Dynamic expansion of thread data (#294)

* Tests for exceeding OMNITRACE_MAX_THREADS

- tests which exceeds OMNITRACE_MAX_THREADS value for thread creation

* CMake Formatting.cmake update

- include source files in /tests/source directory

* Add unknown-hash= to OMNITRACE_ABORT_FAIL_REGEX

- fail if a timemory hash is not resolved to a name

* Tests for exceeding OMNITRACE_MAX_THREADS

- update

* omnitrace-sample update

- remove env disabling of critical-trace and process-sampling

* core library update

- make_unique in concepts.hpp
- add OMNITRACE_USE_ROCM_SMI to "process_sampling" category
- remove forced disabling of critical-trace in sampling mode
- parentheses for OMNITRACE_PREFER
- use tim::get_hash_id instead of tim::get_combined_hash_id

* core library update (containers)

- added aligned_static_vector.hpp
  - similar to static_vector.hpp but attempts to align to cache line size
- alignment template parameter for stable_vector
- added missing aliases in static_vector
  - consistent with aligned_static_vector aliases

* thread_info update

- track the peak number of threads created
- thread_info::get_peak_num_threads() returns the peak number of threads

* thread_data update

- generic thread_data inherits from base_thread_data
- thread_data reworked to support dynamic expansion
- base_thread_data updated to invoke private_instance() function
- thread_data<optional<T>> uses stable_vector aligned to cache line width
- thread_data<identity<T>> uses stable_vector aligned to cache line width
- thread_data for optional and identity provide private private_instance function + friend to base_thread_data
- component_bundle_cache<T> is now thread_data<component_bundle_cache_impl<T>>

* causal update

- thread_data<T>::instances -> thread_data<T>::instance(construct_on_thread{ ... })
- loop over max_supported_threads (constexpr) -> loop over thread_info::get_peak_num_threads()
- tim::get_combined_hash_id -> tim::get_hash_id
- update progress_bundle usage to new thread_data API

* backtrace/backtrace_metrics component update

- backtrace_metrics update
  - update to new thead_data API
  - add thread CPU time row in perfetto
  - fix potential bug when rusage categories are disabled
  - fix bug in operator-= not subtracting cpu time of rhs
- backtrace update
  - skip all child call-stack below 'tim::openmp::' if sampling_keep_internal = false

* pthread_gotcha component update

- pthread_gotcha::shutdown() invokes pthread_create_gotcha::shutdown()

* pthread_create_gotcha component update

- minor tweak to {start,stop}_bundle functions: pass in thread id
- update to new thread_data API
- track native handles of internal threads
- implement system with pthread_kill to stop dangling bundles

* rocprofiler/roctracer component update

- update to new thread_data API
- loop over max_supported_threads (constexpr) -> loop over thread_info::get_peak_num_threads()

* critical trace (library) update

- update to new thread_data API
- tim::get_combined_hash_id -> tim::get_hash_id

* coverage update

- update to new thread_data API

* tasking update

- update to new thread_data API
- loop over max_supported_threads (constexpr) -> loop over thread_info::get_peak_num_threads()

* roctracer update

- update to new thread_data API
- loop over max_supported_threads (constexpr) -> loop over thread_info::get_peak_num_threads()

* rocm_smi update

- update to new thread_data API

* runtime.cpp update

- update to new thread_data API

* sampling.cpp update

- update to new thread_data API
- loop over max_supported_threads (constexpr) -> loop over thread_info::get_peak_num_threads()

* ompt.cpp update

- invoke pthread_gotcha::shutdown before invoking OMPT finalize function
  - this prevents signals from being delivered to OpenMP threads

* tracing.hpp and tracing.cpp update

- replace get_timemory_hash_{ids,aliases} functions with copy_timemory_hash_ids function
- update to new thread_data API
- loop over max_supported_threads (constexpr) -> loop over thread_info::get_peak_num_threads()
- tim::get_combined_hash_id -> tim::get_hash_id
- improvements to + error checking in thread_init function

* library.cpp update

- move copying timemory hash id/aliases to tracing.cpp
- update to new thread_data API
- loop over max_supported_threads (constexpr) -> loop over thread_info::get_peak_num_threads()

* Update BuildSettings.cmake

- add -Wno-interference-size to suppress warning about use of std::hardware_destructive_interference

* Update fork example

- improve scheme for waiting on child processes via waitpid instead of wait
- support running main routine multiple times
- push/pop regions in child process

* Update lib/common/defines.h.in

- allow use to specify misc values via -D <name>=<value>
  - OMNITRACE_CACHELINE_SIZE
  - OMNITRACE_CACHELINE_SIZE_MIN
  - OMNITRACE_ROCM_MAX_COUNTERS
- remove unused defines
  - OMNITRACE_ROCM_LOOK_AHEAD
  - OMNITRACE_MAX_ROCM_QUEUES

* Update rocprofiler.hpp

- OMNITRACE_MAX_ROCM_COUNTERS -> OMNITRACE_ROCM_MAX_COUNTERS

* Update aligned_static_vector

- set cacheline_align_v from max of OMNITRACE_CACHELINE_SIZE and OMNITRACE_CACHELINE_SIZE_MIN

* Update tracing.cpp

- acquire locks for updating main hash ids/aliases
- only propagate ids/aliases when finalizing

* Update pthread_create_gotcha.cpp

- make sure hash for "start_thread" exists on main thread

* Update causal end to end tests

- if OMNITRACE_BUILD_NUMBER is 1, set OMNITRACE_VERBOSE=0

[ROCm/rocprofiler-systems commit: 518c83e0f9]
This commit is contained in:
Jonathan R. Madsen
2023-10-16 18:04:47 -05:00
committed by GitHub
szülő 63e8cec645
commit a1b11b94f0
43 fájl változott, egészen pontosan 1231 új sor hozzáadva és 433 régi sor törölve
@@ -86,7 +86,7 @@ endif()
#
add_flag_if_avail(
"-W" "-Wall" "-Wno-unknown-pragmas" "-Wno-unused-function" "-Wno-ignored-attributes"
"-Wno-attributes" "-Wno-missing-field-initializers")
"-Wno-attributes" "-Wno-missing-field-initializers" "-Wno-interference-size")
if(OMNITRACE_BUILD_DEBUG)
add_flag_if_avail("-g3" "-fno-omit-frame-pointer")
@@ -64,6 +64,8 @@ if(OMNITRACE_CLANG_FORMAT_EXE
file(GLOB_RECURSE examples ${PROJECT_SOURCE_DIR}/examples/*.cpp
${PROJECT_SOURCE_DIR}/examples/*.c ${PROJECT_SOURCE_DIR}/examples/*.hpp
${PROJECT_SOURCE_DIR}/examples/*.h)
file(GLOB_RECURSE tests_source ${PROJECT_SOURCE_DIR}/tests/source/*.cpp
${PROJECT_SOURCE_DIR}/tests/source/*.hpp)
file(GLOB_RECURSE external ${PROJECT_SOURCE_DIR}/examples/lulesh/external/kokkos/*)
file(
GLOB_RECURSE
@@ -86,6 +88,7 @@ if(OMNITRACE_CLANG_FORMAT_EXE
add_custom_target(
format-omnitrace-source
${OMNITRACE_CLANG_FORMAT_EXE} -i ${sources} ${headers} ${examples}
${tests_source}
COMMENT "[omnitrace] Running C++ formatter ${OMNITRACE_CLANG_FORMAT_EXE}...")
endif()
@@ -5,6 +5,8 @@
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <pthread.h>
#include <set>
#include <string>
#include <sys/wait.h>
#include <thread>
@@ -24,71 +26,111 @@ print_info(const char* _name)
int
run(const char* _name, int nchildren)
{
auto _threads = std::vector<std::thread>{};
auto _barrier = pthread_barrier_t{};
auto _threads = std::vector<std::thread>{};
auto _children = std::vector<pid_t>{};
_children.resize(nchildren, 0);
pthread_barrier_init(&_barrier, nullptr, nchildren + 1);
for(int i = 0; i < nchildren; ++i)
{
omnitrace_user_push_region("launch_child");
auto _run = [i, _name]() {
pid_t _pid = fork();
if(_pid == 0)
auto _run = [&_barrier, &_children, i, _name](uint64_t _nsec) {
pthread_barrier_wait(&_barrier);
_children.at(i) = fork();
if(_children.at(i) == 0)
{
// child code
print_info(_name);
printf("[%s][%i] child job starting...\n", _name, getpid());
auto _sleep = [=]() {
std::this_thread::sleep_for(std::chrono::seconds{ i + 1 });
omnitrace_user_push_region("child_process_child_thread");
std::this_thread::sleep_for(std::chrono::seconds{ _nsec });
omnitrace_user_pop_region("child_process_child_thread");
};
omnitrace_user_push_region("child_process");
std::thread{ _sleep }.join();
omnitrace_user_push_region("child_process");
printf("[%s][%i] child job complete\n", _name, getpid());
exit(EXIT_SUCCESS);
}
else
{
pthread_barrier_wait(&_barrier);
}
};
_threads.emplace_back(_run);
_threads.emplace_back(_run, i + 1);
omnitrace_user_pop_region("launch_child");
}
// all child threads should start executing their fork once this returns
pthread_barrier_wait(&_barrier);
// wait for the threads to successfully fork
pthread_barrier_wait(&_barrier);
omnitrace_user_push_region("wait_for_children");
int _status = 0;
pid_t _wait_pid = 0;
// parent waits for all the child processes
while((_wait_pid = wait(&_status)) > 0)
for(auto& itr : _children)
{
printf("[%s][%i] returned from wait with pid = %i :: ", _name, getpid(),
_wait_pid);
if(WIFEXITED(_status))
while(itr == 0)
{}
printf("[%s][%i] performing waitpid(%i, ...)\n", _name, getpid(), itr);
while((_wait_pid = waitpid(itr, &_status, WUNTRACED | WNOHANG)) <= 0)
{
printf("exited, status=%d\n", WEXITSTATUS(_status));
}
else if(WIFSIGNALED(_status))
{
printf("killed by signal %d\n", WTERMSIG(_status));
}
else if(WIFSTOPPED(_status))
{
printf("stopped by signal %d\n", WSTOPSIG(_status));
}
else if(WIFCONTINUED(_status))
{
printf("continued\n");
}
else
{
printf("unknown\n");
if(_wait_pid == 0) continue;
printf("[%s][%i] returned from waitpid(%i) with pid = %i (status = %i) :: ",
_name, getpid(), itr, _wait_pid, _status);
if(WIFEXITED(_status))
{
printf("exited, status=%d\n", WEXITSTATUS(_status));
}
else if(WIFSIGNALED(_status))
{
printf("killed by signal %d\n", WTERMSIG(_status));
}
else if(WIFSTOPPED(_status))
{
printf("stopped by signal %d\n", WSTOPSIG(_status));
}
else if(WIFCONTINUED(_status))
{
printf("continued\n");
}
else
{
printf("unknown\n");
}
}
}
printf("[%s][%i] joining threads ...\n", _name, getpid());
for(auto& itr : _threads)
itr.join();
omnitrace_user_pop_region("wait_for_children");
printf("[%s][%i] returning (error code: %i) ...\n", _name, getpid(), _status);
return _status;
}
int
main(int argc, char** argv)
{
int _n = 4;
if(argc > 1) _n = std::stoi(argv[1]);
int _nfork = 4;
int _nrep = 1;
if(argc > 1) _nfork = std::stoi(argv[1]);
if(argc > 2) _nrep = std::stoi(argv[2]);
print_info(argv[0]);
return run(argv[0], _n);
for(int i = 0; i < _nrep; ++i)
{
auto _ec = run(argv[0], _nfork);
if(_ec != 0) return _ec;
}
printf("[%s][%i] job complete\n", argv[0], getpid());
return EXIT_SUCCESS;
}
@@ -140,12 +140,6 @@ get_initial_environment()
auto _mode = get_env<std::string>("OMNITRACE_MODE", "sampling", false);
update_env(_env, "OMNITRACE_USE_SAMPLING", (_mode != "causal"));
update_env(_env, "OMNITRACE_CRITICAL_TRACE", false);
update_env(_env, "OMNITRACE_USE_PROCESS_SAMPLING", false);
// update_env(_env, "OMNITRACE_USE_PID", false);
// update_env(_env, "OMNITRACE_TIME_OUTPUT", false);
// update_env(_env, "OMNITRACE_OUTPUT_PATH", "omnitrace-output/%tag%/%launch_time%");
#if defined(OMNITRACE_USE_ROCTRACER) || defined(OMNITRACE_USE_ROCPROFILER)
update_env(_env, "HSA_TOOLS_LIB", _dl_libpath);
@@ -1,6 +1,7 @@
set(OMNITRACE_ABORT_FAIL_REGEX
"### ERROR ###|address of faulting memory reference|exiting with non-zero exit code|terminate called after throwing an instance|calling abort.. in |Exit code: [1-9]"
CACHE INTERNAL "Regex to catch abnormal exits when a PASS_REGULAR_EXPRESSION is set")
"### ERROR ###|unknown-hash=|address of faulting memory reference|exiting with non-zero exit code|terminate called after throwing an instance|calling abort.. in |Exit code: [1-9]"
CACHE INTERNAL "Regex to catch abnormal exits when a PASS_REGULAR_EXPRESSION is set"
FORCE)
# adds a ctest for executable
function(OMNITRACE_ADD_BIN_TEST)
@@ -46,6 +46,17 @@
#define OMNITRACE_HIP_VERSION_MAJOR @OMNITRACE_HIP_VERSION_MAJOR@
#define OMNITRACE_HIP_VERSION_MINOR @OMNITRACE_HIP_VERSION_MINOR@
#define OMNITRACE_HIP_VERSION_PATCH @OMNITRACE_HIP_VERSION_PATCH@
// these can be set via defining the variable in CMake, e.g.:
// cmake -D OMNITRACE_CACHELINE_SIZE=N /path/to/source
// if not defined when configuring cmake, these values fall back to
// default values set in core/containers/aligned_static_vector.hpp.
// the OMNITRACE_CACHELINE_SIZE_MIN is used to ensure portability
#cmakedefine OMNITRACE_CACHELINE_SIZE @OMNITRACE_CACHELINE_SIZE@
#cmakedefine OMNITRACE_CACHELINE_SIZE_MIN @OMNITRACE_CACHELINE_SIZE_MIN@
// misc definitions which can be configured by cmake to override the defaults
#cmakedefine OMNITRACE_ROCM_MAX_COUNTERS @OMNITRACE_ROCM_MAX_COUNTERS@
// clang-format on
#define OMNITRACE_VERSION \
@@ -87,16 +98,22 @@
#endif
// clang-format on
#if !defined(OMNITRACE_MAX_COUNTERS)
# define OMNITRACE_MAX_COUNTERS 25
// in general, we want to make sure the cache line size is not less than
// 64 bytes (most common cacheline size for x86-64 CPUs) so unless
// OMNITRACE_CACHELINE_SIZE was explicitly set, we set the min to 64
// and use the max value of OMNITRACE_CACHELINE_SIZE and
// OMNITRACE_CACHELINE_SIZE_MIN to assure that false-sharing is well
// guarded against
#if !defined(OMNITRACE_CACHELINE_SIZE_MIN)
# if defined(OMNITRACE_CACHELINE_SIZE)
# define OMNITRACE_CACHELINE_SIZE_MIN OMNITRACE_CACHELINE_SIZE
# else
# define OMNITRACE_CACHELINE_SIZE_MIN 64
# endif
#endif
#if !defined(OMNITRACE_ROCM_LOOK_AHEAD)
# define OMNITRACE_ROCM_LOOK_AHEAD 128
#endif
#if !defined(OMNITRACE_MAX_ROCM_QUEUES)
# define OMNITRACE_MAX_ROCM_QUEUES OMNITRACE_MAX_THREADS
#if !defined(OMNITRACE_ROCM_MAX_COUNTERS)
# define OMNITRACE_ROCM_MAX_COUNTERS 25
#endif
#define OMNITRACE_ATTRIBUTE(...) __attribute__((__VA_ARGS__))
@@ -183,7 +183,7 @@ address_range::operator+=(address_range _v)
hash_value_t
address_range::hash() const
{
return (is_range()) ? tim::get_combined_hash_id(hash_value_t{ low }, high)
return (is_range()) ? tim::get_hash_id(hash_value_t{ low }, high)
: hash_value_t{ low };
}
} // namespace binary
@@ -52,6 +52,13 @@ using tim::identity_t; // NOLINT
template <typename Tp>
struct use_placement_new_when_generating_unique_ptr : std::false_type
{};
template <typename Tp, typename... Args>
auto
make_unique(Args&&... args)
{
return unique_ptr_t<Tp>{ new Tp{ std::forward<Args>(args)... } };
}
} // namespace omnitrace
namespace tim
@@ -293,7 +293,7 @@ configure_settings(bool _init)
OMNITRACE_CONFIG_SETTING(
bool, "OMNITRACE_USE_ROCM_SMI",
"Enable sampling GPU power, temp, utilization, and memory usage", true, "backend",
"rocm_smi", "rocm");
"rocm_smi", "rocm", "process_sampling");
OMNITRACE_CONFIG_SETTING(
bool, "OMNITRACE_USE_ROCTX",
@@ -1154,7 +1154,6 @@ configure_mode_settings(const std::shared_ptr<settings>& _config)
{
set_default_setting_value("OMNITRACE_USE_SAMPLING", true);
set_default_setting_value("OMNITRACE_USE_PROCESS_SAMPLING", true);
_set("OMNITRACE_CRITICAL_TRACE", false);
}
if(gpu::device_count() == 0)
@@ -1,7 +1,11 @@
#
set(containers_sources)
set(containers_headers ${CMAKE_CURRENT_LIST_DIR}/stable_vector.hpp
${CMAKE_CURRENT_LIST_DIR}/static_vector.hpp)
set(containers_headers
${CMAKE_CURRENT_LIST_DIR}/aligned_static_vector.hpp
${CMAKE_CURRENT_LIST_DIR}/c_array.hpp
${CMAKE_CURRENT_LIST_DIR}/operators.hpp
${CMAKE_CURRENT_LIST_DIR}/stable_vector.hpp
${CMAKE_CURRENT_LIST_DIR}/static_vector.hpp)
target_sources(omnitrace-core-library PRIVATE ${containers_sources} ${containers_headers})
@@ -0,0 +1,326 @@
// MIT License
//
// Copyright (c) 2022 Advanced Micro Devices, Inc. All Rights Reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
#pragma once
#include "core/common.hpp"
#include "core/containers/operators.hpp"
#include "core/debug.hpp"
#include "core/exception.hpp"
#include <array>
#include <atomic>
#include <cstdlib>
#include <new>
namespace omnitrace
{
namespace container
{
#if !defined(OMNITRACE_CACHELINE_SIZE)
# ifdef __cpp_lib_hardware_interference_size
# define OMNITRACE_CACHELINE_SIZE std::hardware_destructive_interference_size
# else
// 64 bytes on x86-64 │ L1_CACHE_BYTES │ L1_CACHE_SHIFT │ __cacheline_aligned │ ...
# define OMNITRACE_CACHELINE_SIZE 64
# endif
#endif
constexpr std::size_t cacheline_align_v =
std::max<size_t>(OMNITRACE_CACHELINE_SIZE, OMNITRACE_CACHELINE_SIZE_MIN);
template <typename Tp, size_t N, size_t AlignN = cacheline_align_v,
bool AtomicSizeV = false>
struct aligned_static_vector
{
struct aligned_value_type
{
alignas(AlignN) Tp value = {};
};
using count_type = std::conditional_t<AtomicSizeV, std::atomic<size_t>, size_t>;
using this_type = aligned_static_vector<Tp, N>;
using const_this_type = const aligned_static_vector<Tp, N>;
using value_type = Tp;
using array_type = std::array<aligned_value_type, N>;
using reference = value_type&;
using const_reference = const value_type&;
using pointer = value_type*;
using const_pointer = const value_type*;
using size_type = size_t;
using difference_type = std::ptrdiff_t;
aligned_static_vector() = default;
aligned_static_vector(const aligned_static_vector&) = default;
aligned_static_vector(aligned_static_vector&&) noexcept = default;
aligned_static_vector& operator=(const aligned_static_vector&) = default;
aligned_static_vector& operator=(aligned_static_vector&&) noexcept = default;
aligned_static_vector(size_t _n, Tp _v = {});
aligned_static_vector& operator=(std::initializer_list<Tp>&& _v);
aligned_static_vector& operator=(std::pair<std::array<Tp, N>, size_t>&&);
template <typename... Args>
value_type& emplace_back(Args&&... _v);
template <typename Up>
decltype(auto) push_back(Up&& _v)
{
return emplace_back(Tp{ std::forward<Up>(_v) });
}
void pop_back() { --m_size; }
void clear();
void reserve(size_t) noexcept {}
void shrink_to_fit() noexcept {}
auto capacity() noexcept { return N; }
size_t size() const { return m_size; }
bool empty() const { return (size() == 0); }
reference operator[](size_t _idx) { return m_data[_idx].value; }
const_reference operator[](size_t _idx) const { return m_data[_idx].value; }
reference at(size_t _idx) { return m_data.at(_idx).value; }
const_reference at(size_t _idx) const { return m_data.at(_idx).value; }
reference front() { return m_data.front().value; }
const_reference front() const { return m_data.front().value; }
reference back() { return *(m_data.begin() + size() - 1).value; }
const_reference back() const { return *(m_data.begin() + size() - 1).value; }
void swap(this_type& _v);
friend void swap(this_type& _lhs, this_type& _rhs) { _lhs.swap(_rhs); }
template <typename ContainerT>
struct iterator_base
{
iterator_base(ContainerT* c = nullptr, size_type i = 0)
: m_container(c)
, m_index(i)
{}
iterator_base& operator+=(size_type i)
{
m_index += i;
return *this;
}
iterator_base& operator-=(size_type i)
{
m_index -= i;
return *this;
}
iterator_base& operator++()
{
++m_index;
return *this;
}
iterator_base& operator--()
{
--m_index;
return *this;
}
difference_type operator-(const iterator_base& itr)
{
assert(m_container == itr.m_container);
return m_index - itr.m_index;
}
bool operator<(const iterator_base& itr) const
{
assert(m_container == itr.m_container);
return m_index < itr.m_index;
}
bool operator==(const iterator_base& itr) const
{
return m_container == itr.m_container && m_index == itr.m_index;
}
protected:
ContainerT* m_container;
size_type m_index;
};
public:
struct const_iterator;
struct iterator
: public iterator_base<this_type>
, public random_access_iterator_helper<iterator, value_type>
{
using iterator_base<this_type>::iterator_base;
friend struct const_iterator;
reference operator*() { return (*this->m_container)[this->m_index]; }
};
struct const_iterator
: public iterator_base<const_this_type>
, public random_access_iterator_helper<const_iterator, const value_type>
{
using iterator_base<const_this_type>::iterator_base;
const_iterator(const iterator& itr)
: iterator_base<const_this_type>(itr.m_container, itr.m_index)
{}
const_reference operator*() const { return (*this->m_container)[this->m_index]; }
bool operator==(const const_iterator& itr) const
{
return iterator_base<const_this_type>::operator==(itr);
}
friend bool operator==(const iterator& l, const const_iterator& r)
{
return r == l;
}
};
iterator begin() noexcept { return { this, 0 }; }
const_iterator begin() const noexcept { return { this, 0 }; }
const_iterator cbegin() const noexcept { return begin(); }
iterator end() noexcept { return { this, size() }; }
const_iterator end() const noexcept { return { this, size() }; }
const_iterator cend() const noexcept { return end(); }
private:
count_type m_size = count_type{ 0 };
array_type m_data = {};
};
template <typename Tp, size_t N, size_t AlignN, bool AtomicSizeV>
aligned_static_vector<Tp, N, AlignN, AtomicSizeV>::aligned_static_vector(size_t _n, Tp _v)
{
m_data.fill(_v);
if constexpr(AtomicSizeV)
m_size.store(_n);
else
m_size = _n;
}
template <typename Tp, size_t N, size_t AlignN, bool AtomicSizeV>
aligned_static_vector<Tp, N, AlignN, AtomicSizeV>&
aligned_static_vector<Tp, N, AlignN, AtomicSizeV>::operator=(
std::initializer_list<Tp>&& _v)
{
if(OMNITRACE_UNLIKELY(_v.size() > N))
{
throw exception<std::out_of_range>(
std::string{ "aligned_static_vector::operator=(initializer_list) size > " } +
std::to_string(N));
}
clear();
for(auto&& itr : _v)
m_data[m_size++] = itr;
return *this;
}
template <typename Tp, size_t N, size_t AlignN, bool AtomicSizeV>
aligned_static_vector<Tp, N, AlignN, AtomicSizeV>&
aligned_static_vector<Tp, N, AlignN, AtomicSizeV>::operator=(
std::pair<std::array<Tp, N>, size_t>&& _v)
{
if constexpr(AtomicSizeV) m_size.store(0);
for(size_t i = 0; i < N; ++i)
m_data[i].value = std::move(_v.first[i]);
if constexpr(AtomicSizeV)
m_size.store(_v.second);
else
m_size = _v.second;
return *this;
}
template <typename Tp, size_t N, size_t AlignN, bool AtomicSizeV>
void
aligned_static_vector<Tp, N, AlignN, AtomicSizeV>::clear()
{
if constexpr(AtomicSizeV)
m_size.store(0);
else
m_size = 0;
}
template <typename Tp, size_t N, size_t AlignN, bool AtomicSizeV>
void
aligned_static_vector<Tp, N, AlignN, AtomicSizeV>::swap(this_type& _v)
{
if constexpr(AtomicSizeV)
{
auto _t_size = m_size;
auto _v_size = _v.m_size;
std::swap(m_data, _v.m_data);
m_size.store(_v_size);
_v.m_size.store(_t_size);
}
else
{
std::swap(m_size, _v.m_size);
std::swap(m_data, _v.m_data);
}
}
template <typename Tp, size_t N, size_t AlignN, bool AtomicSizeV>
template <typename... Args>
Tp&
aligned_static_vector<Tp, N, AlignN, AtomicSizeV>::emplace_back(Args&&... _v)
{
auto _idx = m_size++;
if(_idx >= N)
{
throw exception<std::out_of_range>(
std::string{ "aligned_static_vector::emplace_back - reached capacity " } +
std::to_string(N));
}
if constexpr(sizeof...(Args) > 0)
{
if constexpr(std::is_assignable<Tp, decltype(std::forward<Args>(_v))...>::value)
m_data[_idx].value = { std::forward<Args>(_v)... };
else if constexpr(std::is_constructible<Tp, decltype(std::forward<Args>(
_v))...>::value)
m_data[_idx].value = Tp{ std::forward<Args>(_v)... };
else
static_assert(
sizeof...(Args) == 0,
"Error! Tp is not assignable or constructible with provided args");
}
else
{
// _v... expands to nothing but is used to suppress unused variable warnings
m_data[_idx].value = { _v... };
}
return m_data[_idx].value;
}
} // namespace container
} // namespace omnitrace
@@ -22,8 +22,8 @@
#pragma once
#include "core/containers/aligned_static_vector.hpp"
#include "core/containers/operators.hpp"
#include "core/containers/static_vector.hpp"
#include "core/defines.hpp"
#include <algorithm>
@@ -38,7 +38,8 @@ namespace omnitrace
{
namespace container
{
template <typename Tp, size_t ChunkSizeV = OMNITRACE_MAX_THREADS>
template <typename Tp, size_t ChunkSizeV = OMNITRACE_MAX_THREADS,
size_t AlignN = alignof(Tp)>
class stable_vector
{
public:
@@ -62,8 +63,8 @@ private:
static_assert(ChunkSizeV > 0, "ChunkSize needs to be greater than zero");
static_assert(is_pow2<ChunkSizeV>::value, "ChunkSize needs to be a power of 2");
using this_type = stable_vector<Tp, ChunkSizeV>;
using const_this_type = const stable_vector<Tp, ChunkSizeV>;
using this_type = stable_vector<Tp, ChunkSizeV, AlignN>;
using const_this_type = const stable_vector<Tp, ChunkSizeV, AlignN>;
template <typename ContainerT>
struct iterator_base
@@ -120,7 +121,6 @@ public:
struct iterator
: public iterator_base<this_type>
//, std::iterator<std::random_access_iterator_tag, value_type>
, public random_access_iterator_helper<iterator, value_type>
{
using iterator_base<this_type>::iterator_base;
@@ -131,7 +131,6 @@ public:
struct const_iterator
: public iterator_base<const_this_type>
//, std::iterator<std::random_access_iterator_tag, const value_type>
, public random_access_iterator_helper<const_iterator, const value_type>
{
using iterator_base<const_this_type>::iterator_base;
@@ -221,7 +220,7 @@ public:
const_reference at(size_type i) const;
private:
using chunk_type = container::static_vector<Tp, ChunkSizeV, true>;
using chunk_type = container::aligned_static_vector<Tp, ChunkSizeV, AlignN, true>;
using storage_type = std::vector<std::unique_ptr<chunk_type>>;
void add_chunk();
@@ -230,17 +229,17 @@ private:
storage_type m_chunks;
};
template <typename Tp, size_t ChunkSizeV>
stable_vector<Tp, ChunkSizeV>::stable_vector(size_type count, const Tp& value)
template <typename Tp, size_t ChunkSizeV, size_t AlignN>
stable_vector<Tp, ChunkSizeV, AlignN>::stable_vector(size_type count, const Tp& value)
{
for(size_type i = 0; i < count; ++i)
{
push_back(value);
emplace_back(value);
}
}
template <typename Tp, size_t ChunkSizeV>
stable_vector<Tp, ChunkSizeV>::stable_vector(size_type count)
template <typename Tp, size_t ChunkSizeV, size_t AlignN>
stable_vector<Tp, ChunkSizeV, AlignN>::stable_vector(size_type count)
{
for(size_type i = 0; i < count; ++i)
{
@@ -248,18 +247,18 @@ stable_vector<Tp, ChunkSizeV>::stable_vector(size_type count)
}
}
template <typename Tp, size_t ChunkSizeV>
template <typename Tp, size_t ChunkSizeV, size_t AlignN>
template <typename InputItrT, typename>
stable_vector<Tp, ChunkSizeV>::stable_vector(InputItrT first, InputItrT last)
stable_vector<Tp, ChunkSizeV, AlignN>::stable_vector(InputItrT first, InputItrT last)
{
for(; first != last; ++first)
{
push_back(*first);
emplace_back(*first);
}
}
template <typename Tp, size_t ChunkSizeV>
stable_vector<Tp, ChunkSizeV>::stable_vector(const stable_vector& other)
template <typename Tp, size_t ChunkSizeV, size_t AlignN>
stable_vector<Tp, ChunkSizeV, AlignN>::stable_vector(const stable_vector& other)
{
for(const auto& chunk : other.m_chunks)
{
@@ -267,38 +266,38 @@ stable_vector<Tp, ChunkSizeV>::stable_vector(const stable_vector& other)
}
}
template <typename Tp, size_t ChunkSizeV>
stable_vector<Tp, ChunkSizeV>::stable_vector(stable_vector&& other) noexcept
template <typename Tp, size_t ChunkSizeV, size_t AlignN>
stable_vector<Tp, ChunkSizeV, AlignN>::stable_vector(stable_vector&& other) noexcept
: m_chunks(std::move(other.m_chunks))
{}
template <typename Tp, size_t ChunkSizeV>
stable_vector<Tp, ChunkSizeV>::stable_vector(std::initializer_list<Tp> ilist)
template <typename Tp, size_t ChunkSizeV, size_t AlignN>
stable_vector<Tp, ChunkSizeV, AlignN>::stable_vector(std::initializer_list<Tp> ilist)
{
for(const auto& t : ilist)
{
push_back(t);
emplace_back(t);
}
}
template <typename Tp, size_t ChunkSizeV>
stable_vector<Tp, ChunkSizeV>&
stable_vector<Tp, ChunkSizeV>::operator=(stable_vector v)
template <typename Tp, size_t ChunkSizeV, size_t AlignN>
stable_vector<Tp, ChunkSizeV, AlignN>&
stable_vector<Tp, ChunkSizeV, AlignN>::operator=(stable_vector v)
{
swap(v);
return *this;
}
template <typename Tp, size_t ChunkSizeV>
template <typename Tp, size_t ChunkSizeV, size_t AlignN>
void
stable_vector<Tp, ChunkSizeV>::add_chunk()
stable_vector<Tp, ChunkSizeV, AlignN>::add_chunk()
{
m_chunks.emplace_back(std::make_unique<chunk_type>());
}
template <typename Tp, size_t ChunkSizeV>
typename stable_vector<Tp, ChunkSizeV>::chunk_type&
stable_vector<Tp, ChunkSizeV>::last_chunk()
template <typename Tp, size_t ChunkSizeV, size_t AlignN>
typename stable_vector<Tp, ChunkSizeV, AlignN>::chunk_type&
stable_vector<Tp, ChunkSizeV, AlignN>::last_chunk()
{
if(OMNITRACE_UNLIKELY(m_chunks.empty() || m_chunks.back()->size() == ChunkSizeV))
{
@@ -308,9 +307,9 @@ stable_vector<Tp, ChunkSizeV>::last_chunk()
return *m_chunks.back();
}
template <typename Tp, size_t ChunkSizeV>
template <typename Tp, size_t ChunkSizeV, size_t AlignN>
void
stable_vector<Tp, ChunkSizeV>::reserve(size_type new_capacity)
stable_vector<Tp, ChunkSizeV, AlignN>::reserve(size_type new_capacity)
{
const size_t initial_capacity = capacity();
for(difference_type i = new_capacity - initial_capacity; i > 0; i -= ChunkSizeV)
@@ -319,45 +318,45 @@ stable_vector<Tp, ChunkSizeV>::reserve(size_type new_capacity)
}
}
template <typename Tp, size_t ChunkSizeV>
template <typename Tp, size_t ChunkSizeV, size_t AlignN>
void
stable_vector<Tp, ChunkSizeV>::push_back(const Tp& t)
stable_vector<Tp, ChunkSizeV, AlignN>::push_back(const Tp& t)
{
last_chunk().push_back(t);
}
template <typename Tp, size_t ChunkSizeV>
template <typename Tp, size_t ChunkSizeV, size_t AlignN>
void
stable_vector<Tp, ChunkSizeV>::push_back(Tp&& t)
stable_vector<Tp, ChunkSizeV, AlignN>::push_back(Tp&& t)
{
last_chunk().push_back(std::move(t));
}
template <typename Tp, size_t ChunkSizeV>
template <typename Tp, size_t ChunkSizeV, size_t AlignN>
template <typename... Args>
void
stable_vector<Tp, ChunkSizeV>::emplace_back(Args&&... args)
stable_vector<Tp, ChunkSizeV, AlignN>::emplace_back(Args&&... args)
{
last_chunk().emplace_back(std::forward<Args>(args)...);
}
template <typename Tp, size_t ChunkSizeV>
typename stable_vector<Tp, ChunkSizeV>::reference
stable_vector<Tp, ChunkSizeV>::operator[](size_type i)
template <typename Tp, size_t ChunkSizeV, size_t AlignN>
typename stable_vector<Tp, ChunkSizeV, AlignN>::reference
stable_vector<Tp, ChunkSizeV, AlignN>::operator[](size_type i)
{
return (*m_chunks[i / ChunkSizeV])[i % ChunkSizeV];
}
template <typename Tp, size_t ChunkSizeV>
typename stable_vector<Tp, ChunkSizeV>::const_reference
stable_vector<Tp, ChunkSizeV>::operator[](size_type i) const
template <typename Tp, size_t ChunkSizeV, size_t AlignN>
typename stable_vector<Tp, ChunkSizeV, AlignN>::const_reference
stable_vector<Tp, ChunkSizeV, AlignN>::operator[](size_type i) const
{
return const_cast<this_type&>(*this)[i];
}
template <typename Tp, size_t ChunkSizeV>
typename stable_vector<Tp, ChunkSizeV>::reference
stable_vector<Tp, ChunkSizeV>::at(size_type i)
template <typename Tp, size_t ChunkSizeV, size_t AlignN>
typename stable_vector<Tp, ChunkSizeV, AlignN>::reference
stable_vector<Tp, ChunkSizeV, AlignN>::at(size_type i)
{
if(OMNITRACE_UNLIKELY(i >= size()))
{
@@ -369,16 +368,16 @@ stable_vector<Tp, ChunkSizeV>::at(size_type i)
return operator[](i);
}
template <typename Tp, size_t ChunkSizeV>
typename stable_vector<Tp, ChunkSizeV>::const_reference
stable_vector<Tp, ChunkSizeV>::at(size_type i) const
template <typename Tp, size_t ChunkSizeV, size_t AlignN>
typename stable_vector<Tp, ChunkSizeV, AlignN>::const_reference
stable_vector<Tp, ChunkSizeV, AlignN>::at(size_type i) const
{
return const_cast<this_type&>(*this).at(i);
}
template <typename Tp, size_t ChunkSizeV, typename... Args>
template <typename Tp, size_t ChunkSizeV, size_t AlignN, typename... Args>
auto
resize(stable_vector<Tp, ChunkSizeV>& _v, size_t _n, Args&&... args)
resize(stable_vector<Tp, ChunkSizeV, AlignN>& _v, size_t _n, Args&&... args)
{
if(_n > _v.capacity()) _v.reserve(_n);
@@ -40,9 +40,16 @@ namespace container
template <typename Tp, size_t N, bool AtomicSizeV = false>
struct static_vector
{
using count_type = std::conditional_t<AtomicSizeV, std::atomic<size_t>, size_t>;
using this_type = static_vector<Tp, N>;
using value_type = Tp;
using count_type = std::conditional_t<AtomicSizeV, std::atomic<size_t>, size_t>;
using this_type = static_vector<Tp, N>;
using value_type = Tp;
using array_type = std::array<Tp, N>;
using reference = value_type&;
using const_reference = const value_type&;
using pointer = value_type*;
using const_pointer = const value_type*;
using size_type = size_t;
using difference_type = std::ptrdiff_t;
static_vector() = default;
static_vector(const static_vector&) = default;
@@ -635,9 +635,10 @@ as_hex<void*>(void*, size_t);
#define OMNITRACE_REQUIRE(...) TIMEMORY_REQUIRE(__VA_ARGS__)
#define OMNITRACE_PREFER(COND) \
(OMNITRACE_LIKELY(COND)) ? ::tim::log::base() \
: (::omnitrace::get_is_continuous_integration()) ? TIMEMORY_FATAL \
: TIMEMORY_WARNING
((OMNITRACE_LIKELY(COND)) \
? ::tim::log::base() \
: ((::omnitrace::get_is_continuous_integration()) ? TIMEMORY_FATAL \
: TIMEMORY_WARNING))
//--------------------------------------------------------------------------------------//
//
@@ -112,10 +112,10 @@ ensure_initialization(bool _offset, int64_t _glob_n, int64_t _offset_n)
auto _exit_info = component::exit_gotcha::get_exit_info();
if(_exit_info.is_known && _exit_info.exit_code != EXIT_SUCCESS) return _offset;
auto _tid = utility::get_thread_index();
auto _max_threads = grow_data(_tid + 1);
auto _tid = utility::get_thread_index();
auto _peak_num_threads = grow_data(_tid + 1);
if(_tid > 0 && _tid < _max_threads)
if(_tid > 0 && _tid < _peak_num_threads)
{
const auto& _info = thread_info::get();
OMNITRACE_BASIC_VERBOSE_F(3,
@@ -123,7 +123,7 @@ ensure_initialization(bool _offset, int64_t _glob_n, int64_t _offset_n)
"offset counter: %li, max threads: %li\n",
std::to_string(static_cast<bool>(_info)).c_str(),
std::to_string(_offset).c_str(), _glob_n, _offset_n,
_max_threads);
_peak_num_threads);
}
return _offset;
@@ -762,29 +762,7 @@ omnitrace_finalize_hidden(void)
[](int) {});
OMNITRACE_DEBUG_F("Copying over all timemory hash information to main thread...\n");
// copy these over so that all hashes are known
auto& _hmain = tim::hash::get_main_hash_ids();
auto& _amain = tim::hash::get_main_hash_aliases();
auto& _hzero = tracing::get_timemory_hash_ids(0);
auto& _azero = tracing::get_timemory_hash_aliases(0);
for(size_t i = 0; i < max_supported_threads; ++i)
{
auto& _hitr = tracing::get_timemory_hash_ids(i);
auto& _aitr = tracing::get_timemory_hash_aliases(i);
if(_hmain && _hitr)
{
for(const auto& itr : *_hitr)
_hmain->emplace(itr.first, itr.second);
}
if(_amain && _aitr)
{
for(auto itr : *_aitr)
_amain->emplace(itr.first, itr.second);
}
}
if(_hzero && _hmain) *_hzero = *_hmain;
if(_azero && _amain) *_azero = *_amain;
tracing::copy_timemory_hash_ids();
// stop the main bundle which has stats for run
if(get_main_bundle())
@@ -809,11 +787,12 @@ omnitrace_finalize_hidden(void)
}
OMNITRACE_DEBUG_F("Stopping and destroying instrumentation bundles...\n");
for(size_t i = 0; i < max_supported_threads; ++i)
for(size_t i = 0; i < thread_info::get_peak_num_threads(); ++i)
{
auto& itr = instrumentation_bundles::instances().at(i);
if(!instrumentation_bundles::get()) continue;
const auto& _info = thread_info::get(i, SequentTID);
while(!itr.bundles.empty())
auto& itr = instrumentation_bundles::get()->at(i);
while(itr != nullptr && !itr->empty())
{
int _lvl = 1;
if(_info->is_offset)
@@ -824,14 +803,11 @@ omnitrace_finalize_hidden(void)
OMNITRACE_VERBOSE_F(_lvl,
"Warning! instrumentation bundle on thread %zu (TID=%li) "
"with label '%s' was not stopped.\n",
i, itr.bundles.back()->tid(),
itr.bundles.back()->key().c_str());
i, itr->back()->tid(), itr->back()->key().c_str());
itr.bundles.back()->stop();
itr.bundles.back()->pop();
itr.allocator.destroy(itr.bundles.back());
itr.allocator.deallocate(itr.bundles.back(), 1);
itr.bundles.pop_back();
itr->back()->stop();
itr->back()->pop();
itr->pop_back();
}
}
@@ -907,15 +883,18 @@ omnitrace_finalize_hidden(void)
// thread-specific data will be wrong if try to stop them from
// the main thread.
auto _thr_verbose = (config::get_use_causal()) ? 1 : 0;
for(auto& itr : thread_data<thread_bundle_t>::instances())
if(thread_data<thread_bundle_t>::get())
{
if(itr && itr->get<comp::wall_clock>() &&
!itr->get<comp::wall_clock>()->get_is_running())
for(auto& itr : *thread_data<thread_bundle_t>::get())
{
std::string _msg = JOIN("", *itr);
auto _pos = _msg.find(">>> ");
if(_pos != std::string::npos) _msg = _msg.substr(_pos + 5);
OMNITRACE_VERBOSE_F(_thr_verbose, "%s\n", _msg.c_str());
if(itr && itr->get<comp::wall_clock>() &&
!itr->get<comp::wall_clock>()->get_is_running())
{
std::string _msg = JOIN("", *itr);
auto _pos = _msg.find(">>> ");
if(_pos != std::string::npos) _msg = _msg.substr(_pos + 5);
OMNITRACE_VERBOSE_F(_thr_verbose, "%s\n", _msg.c_str());
}
}
}
@@ -937,23 +916,25 @@ omnitrace_finalize_hidden(void)
if(get_use_critical_trace() || (get_use_rocm_smi() && get_use_roctracer()))
{
OMNITRACE_VERBOSE_F(1, "Generating the critical trace...\n");
for(size_t i = 0; i < max_supported_threads; ++i)
for(size_t i = 0; i < thread_info::get_peak_num_threads(); ++i)
{
using critical_trace_hash_data =
thread_data<critical_trace::hash_ids, critical_trace::id>;
if(critical_trace_hash_data::instances().at(i))
if(i < critical_trace_hash_data::get()->size() &&
critical_trace_hash_data::get()->at(i))
{
OMNITRACE_DEBUG_F("Copying the hash id data for thread %zu...\n", i);
critical_trace::add_hash_id(*critical_trace_hash_data::instances().at(i));
critical_trace::add_hash_id(*critical_trace_hash_data::get()->at(i));
}
}
for(size_t i = 0; i < max_supported_threads; ++i)
for(size_t i = 0; i < thread_info::get_peak_num_threads(); ++i)
{
using critical_trace_chain_data = thread_data<critical_trace::call_chain>;
if(critical_trace_chain_data::instances().at(i))
if(i < critical_trace_chain_data::get()->size() &&
critical_trace_chain_data::get()->at(i))
{
OMNITRACE_DEBUG_F(
"Updating the critical trace call-chains for thread %zu...\n", i);
@@ -991,6 +972,8 @@ omnitrace_finalize_hidden(void)
coverage::post_process();
}
tracing::copy_timemory_hash_ids();
bool _perfetto_output_error = false;
if(get_use_perfetto())
{
@@ -60,8 +60,7 @@ get_progress_map(int64_t _tid)
auto&
get_progress_allocator(int64_t _tid)
{
static auto& _v = thread_data<progress_allocator_t>::instances(construct_on_init{});
return _v.at(_tid);
return thread_data<progress_allocator_t>::instance(construct_on_thread{ _tid });
}
} // namespace
@@ -95,6 +95,12 @@ auto speedup_dist = []() {
auto perform_experiment_impl_completed = std::unique_ptr<std::promise<void>>{};
auto num_progress_points = std::atomic<size_t>{ 0 };
auto&
get_progress_bundles(int64_t _tid = utility::get_thread_index())
{
return progress_bundles_t::instance(construct_on_thread{ _tid });
}
template <typename ContextT>
auto&
get_engine()
@@ -107,7 +113,7 @@ get_engine()
}();
static thread_local auto _v =
random_engine_t{ tim::get_combined_hash_id(_seed, utility::get_thread_index()) };
random_engine_t{ tim::get_hash_id(_seed, utility::get_thread_index()) };
return _v;
}
@@ -965,11 +971,14 @@ push_progress_point(std::string_view _name)
++num_progress_points;
auto _hash = tim::add_hash_id(_name);
auto& _data = progress_bundles_t::instance(utility::get_thread_index());
auto* _bundle = _data.construct(_hash);
_bundle->push();
_bundle->start();
auto _hash = tim::add_hash_id(_name);
auto& _data = get_progress_bundles();
if(OMNITRACE_LIKELY(_data != nullptr))
{
auto* _bundle = _data->construct(_hash);
_bundle->push();
_bundle->start();
}
}
void
@@ -977,26 +986,26 @@ pop_progress_point(std::string_view _name)
{
if(config::get_causal_end_to_end()) return;
auto& _data = progress_bundles_t::instance(utility::get_thread_index());
if(_data.empty()) return;
auto& _data = get_progress_bundles();
if(OMNITRACE_UNLIKELY(!_data || _data->empty())) return;
if(_name.empty())
{
auto* itr = _data.back();
auto* itr = _data->back();
itr->stop();
itr->pop();
_data.pop_back();
_data->pop_back();
return;
}
else
{
auto _hash = tim::add_hash_id(_name);
for(auto itr = _data.rbegin(); itr != _data.rend(); ++itr)
for(auto itr = _data->rbegin(); itr != _data->rend(); ++itr)
{
if((*itr)->get_hash() == _hash)
{
(*itr)->stop();
(*itr)->pop();
_data.destroy(itr);
_data->destroy(itr);
return;
}
}
@@ -1010,13 +1019,16 @@ mark_progress_point(std::string_view _name, bool _force)
++num_progress_points;
auto _hash = tim::add_hash_id(_name);
auto& _data = progress_bundles_t::instance(utility::get_thread_index());
auto* _bundle = _data.construct(_hash);
_bundle->push();
_bundle->mark();
_bundle->pop();
_data.pop_back();
auto _hash = tim::add_hash_id(_name);
auto& _data = get_progress_bundles();
if(OMNITRACE_LIKELY(_data != nullptr))
{
auto* _bundle = _data->construct(_hash);
_bundle->push();
_bundle->mark();
_bundle->pop();
_data->pop_back();
}
}
uint16_t
@@ -573,7 +573,7 @@ post_process()
block_samples();
for(size_t i = 0; i < max_supported_threads; ++i)
for(size_t i = 0; i < thread_info::get_peak_num_threads(); ++i)
{
auto& _causal = get_causal_sampler(i);
if(_causal) _causal->stop();
@@ -586,7 +586,7 @@ post_process()
auto _allocator = get_causal_sampler_allocator(false);
if(_allocator) _allocator->flush();
for(size_t i = 0; i < max_supported_threads; ++i)
for(size_t i = 0; i < thread_info::get_peak_num_threads(); ++i)
{
auto& _causal = get_causal_sampler(i);
auto _causal_data =
@@ -595,7 +595,7 @@ post_process()
if(!_causal_data.empty()) post_process_causal(i, _causal_data);
}
for(size_t i = 0; i < max_supported_threads; ++i)
for(size_t i = 0; i < thread_info::get_peak_num_threads(); ++i)
{
get_causal_sampler(i).reset();
@@ -121,6 +121,7 @@ backtrace::filter_and_patch(const std::vector<entry_type>& _data)
if(_keep_internal) return 1;
if(_lbl.find("omnitrace_main") != _npos) return 0;
if(_lbl.find("omnitrace::") != _npos) return 0;
if(_lbl.find("tim::openmp::") != _npos) return -1;
if(_lbl.find("tim::") != _npos) return 0;
if(_lbl.find("DYNINST_") != _npos) return 0;
if(_lbl.find("omnitrace_") != _npos) return -1;
@@ -96,30 +96,25 @@ struct perfetto_rusage
unique_ptr_t<std::vector<std::string>>&
get_papi_labels(int64_t _tid)
{
static auto& _v = papi_label_instances::instances(construct_on_init{});
return _v.at(_tid);
return papi_label_instances::instance(construct_on_thread{ _tid });
}
unique_ptr_t<hw_counters>&
get_papi_vector(int64_t _tid)
{
static auto& _v = papi_vector_instances::instances();
if(_tid == threading::get_id()) papi_vector_instances::construct();
return _v.at(_tid);
return papi_vector_instances::instance(construct_on_thread{ _tid });
}
unique_ptr_t<backtrace_metrics>&
get_backtrace_metrics_init(int64_t _tid)
{
static auto& _v = backtrace_metrics_init_instances::instances();
return _v.at(_tid);
return backtrace_metrics_init_instances::instance(construct_on_thread{ _tid });
}
unique_ptr_t<bool>&
get_sampler_running(int64_t _tid)
{
static auto& _v = sampler_running_instances::instances(construct_on_init{}, false);
return _v.at(_tid);
return sampler_running_instances::instance(construct_on_thread{ _tid }, false);
}
} // namespace
@@ -248,6 +243,9 @@ backtrace_metrics::init_perfetto(int64_t _tid, valid_array_t _valid)
if(!perfetto_counter_track<perfetto_rusage>::exists(_tid))
{
if(get_valid(category::thread_cpu_time{}, _valid))
perfetto_counter_track<perfetto_rusage>::emplace(
_tid, JOIN(' ', "Thread CPU time", _tid_name, "(S)"), "sec");
if(get_valid(category::thread_peak_memory{}, _valid))
perfetto_counter_track<perfetto_rusage>::emplace(
_tid, JOIN(' ', "Thread Peak Memory Usage", _tid_name, "(S)"), "MB");
@@ -283,24 +281,35 @@ backtrace_metrics::fini_perfetto(int64_t _tid, valid_array_t _valid)
OMNITRACE_CI_THROW(!_thread_info, "Error! missing thread info for tid=%li\n", _tid);
if(!_thread_info) return;
uint64_t _ts = _thread_info->get_stop();
uint64_t _ts = _thread_info->get_stop();
uint64_t _rusage_idx = 0;
if(get_valid(category::thread_cpu_time{}, _valid))
{
TRACE_COUNTER(trait::name<category::thread_cpu_time>::value,
perfetto_counter_track<perfetto_rusage>::at(_tid, _rusage_idx++),
_ts, 0);
}
if(get_valid(category::thread_peak_memory{}, _valid))
{
TRACE_COUNTER(trait::name<category::thread_peak_memory>::value,
perfetto_counter_track<perfetto_rusage>::at(_tid, 0), _ts, 0);
perfetto_counter_track<perfetto_rusage>::at(_tid, _rusage_idx++),
_ts, 0);
}
if(get_valid(category::thread_context_switch{}, _valid))
{
TRACE_COUNTER(trait::name<category::thread_context_switch>::value,
perfetto_counter_track<perfetto_rusage>::at(_tid, 1), _ts, 0);
perfetto_counter_track<perfetto_rusage>::at(_tid, _rusage_idx++),
_ts, 0);
}
if(get_valid(category::thread_page_fault{}, _valid))
{
TRACE_COUNTER(trait::name<category::thread_page_fault>::value,
perfetto_counter_track<perfetto_rusage>::at(_tid, 2), _ts, 0);
perfetto_counter_track<perfetto_rusage>::at(_tid, _rusage_idx++),
_ts, 0);
}
if(get_valid(type_list<hw_counters>{}, _valid) &&
@@ -321,6 +330,12 @@ backtrace_metrics&
backtrace_metrics::operator-=(const backtrace_metrics& _rhs)
{
auto& _lhs = *this;
if(_lhs(category::thread_cpu_time{}))
{
_lhs.m_cpu -= _rhs.m_cpu;
}
if(_lhs(category::thread_peak_memory{}))
{
_lhs.m_mem_peak -= _rhs.m_mem_peak;
@@ -348,25 +363,34 @@ backtrace_metrics::operator-=(const backtrace_metrics& _rhs)
void
backtrace_metrics::post_process_perfetto(int64_t _tid, uint64_t _ts) const
{
uint64_t _rusage_idx = 0;
if((*this)(category::thread_cpu_time{}))
{
TRACE_COUNTER(trait::name<category::thread_cpu_time>::value,
perfetto_counter_track<perfetto_rusage>::at(_tid, _rusage_idx++),
_ts, m_cpu / units::sec);
}
if((*this)(category::thread_peak_memory{}))
{
TRACE_COUNTER(trait::name<category::thread_peak_memory>::value,
perfetto_counter_track<perfetto_rusage>::at(_tid, 0), _ts,
m_mem_peak / units::megabyte);
perfetto_counter_track<perfetto_rusage>::at(_tid, _rusage_idx++),
_ts, m_mem_peak / units::megabyte);
}
if((*this)(category::thread_context_switch{}))
{
TRACE_COUNTER(trait::name<category::thread_context_switch>::value,
perfetto_counter_track<perfetto_rusage>::at(_tid, 1), _ts,
m_ctx_swch);
perfetto_counter_track<perfetto_rusage>::at(_tid, _rusage_idx++),
_ts, m_ctx_swch);
}
if((*this)(category::thread_page_fault{}))
{
TRACE_COUNTER(trait::name<category::thread_page_fault>::value,
perfetto_counter_track<perfetto_rusage>::at(_tid, 2), _ts,
m_page_flt);
perfetto_counter_track<perfetto_rusage>::at(_tid, _rusage_idx++),
_ts, m_page_flt);
}
if((*this)(type_list<hw_counters>{}) && (*this)(category::thread_hardware_counter{}))
@@ -23,6 +23,7 @@
#include "library/components/pthread_create_gotcha.hpp"
#include "core/config.hpp"
#include "core/debug.hpp"
#include "core/locking.hpp"
#include "core/state.hpp"
#include "core/utility.hpp"
#include "library/causal/delay.hpp"
@@ -32,14 +33,18 @@
#include "library/sampling.hpp"
#include "library/thread_data.hpp"
#include "library/thread_info.hpp"
#include "library/tracing.hpp"
#include <timemory/backends/threading.hpp>
#include <timemory/components/macros.hpp>
#include <timemory/components/timing/wall_clock.hpp>
#include <timemory/hash/types.hpp>
#include <timemory/mpl/types.hpp>
#include <timemory/sampling/allocator.hpp>
#include <timemory/units.hpp>
#include <timemory/utility/types.hpp>
#include <csignal>
#include <ostream>
#include <pthread.h>
#include <utility>
@@ -74,11 +79,12 @@ auto bundles_dtor = scope::destructor{ []() {
template <typename... Args>
inline void
start_bundle(bundle_t& _bundle, Args&&... _args)
start_bundle(bundle_t& _bundle, int64_t _tid, Args&&... _args)
{
if(!get_use_timemory() && !get_use_perfetto()) return;
trait::runtime_enabled<comp::roctracer_data>::set(get_use_roctracer());
OMNITRACE_BASIC_VERBOSE_F(3, "starting bundle '%s'...\n", _bundle.key().c_str());
OMNITRACE_BASIC_VERBOSE_F(3, "starting bundle '%s' in thread %li...\n",
_bundle.key().c_str(), _tid);
if constexpr(sizeof...(Args) > 0)
{
const char* _name = nullptr;
@@ -94,7 +100,7 @@ start_bundle(bundle_t& _bundle, Args&&... _args)
}
if(get_use_timemory())
{
_bundle.push();
_bundle.push(_tid);
_bundle.start();
}
}
@@ -139,7 +145,11 @@ stop_bundle(bundle_t& _bundle, int64_t _tid, Args&&... _args)
}
}
std::set<pthread_create_gotcha::native_handle_t> native_handles = {};
using native_handle_set_t = std::set<pthread_create_gotcha::native_handle_t>;
auto native_handles = native_handle_set_t{};
auto internal_native_handles = native_handle_set_t{};
auto native_handles_mutex = locking::atomic_mutex{};
} // namespace
//--------------------------------------------------------------------------------------//
@@ -210,6 +220,13 @@ pthread_create_gotcha::wrapper::operator()() const
auto _active = (get_state() == ::omnitrace::State::Active && bundles != nullptr &&
bundles_mutex != nullptr);
if(m_config.offset)
{
auto _lk = locking::atomic_lock{ native_handles_mutex };
internal_native_handles.emplace(pthread_self());
}
if(_active && !_coverage && !m_config.offset)
{
_tid = _info->index_data->sequent_value;
@@ -220,13 +237,13 @@ pthread_create_gotcha::wrapper::operator()() const
threading::set_thread_name(TIMEMORY_JOIN(" ", "Thread", _tid).c_str());
auto _manager = tim::manager::instance();
if(_manager) _manager->initialize();
if(!thread_bundle_data_t::instances().at(_tid))
if(!thread_bundle_data_t::get()->at(_tid))
{
thread_data<thread_bundle_t>::construct(
TIMEMORY_JOIN('/', "omnitrace/process", process::get_id(), "thread",
_tid),
quirk::config<quirk::auto_start>{});
thread_bundle_data_t::instances().at(_tid)->start();
thread_bundle_data_t::get()->at(_tid)->start();
}
if(bundles && bundles_mutex)
{
@@ -234,7 +251,7 @@ pthread_create_gotcha::wrapper::operator()() const
_bundle = bundles->emplace(_tid, std::make_shared<bundle_t>("start_thread"))
.first->second;
}
if(_bundle) start_bundle(*_bundle);
if(_bundle) start_bundle(*_bundle, _tid);
get_cpu_cid_stack(_tid, m_config.parent_tid);
if(m_config.enable_causal)
{
@@ -299,13 +316,28 @@ pthread_create_gotcha::wrapper::wrap(void* _arg)
wrapper* _wrapper = static_cast<wrapper*>(_arg);
// store the handle
native_handles.emplace(_self);
{
auto _lk = locking::atomic_lock{ native_handles_mutex };
native_handles.emplace(_self);
}
static thread_local auto _remover = scope::destructor{ []() {
if(get_state() >= omnitrace::State::Finalized) return;
// remove the handle even if original function aborts
auto _lk = locking::atomic_lock{ native_handles_mutex };
native_handles.erase(pthread_self());
} };
(void) _remover;
// execute the original function
void* _ret = (*_wrapper)();
// remove the handle
if(::pthread_equal(_self, pthread_self()) == 0) native_handles.erase(_self);
if(::pthread_equal(_self, pthread_self()) == 0)
{
auto _lk = locking::atomic_lock{ native_handles_mutex };
native_handles.erase(_self);
}
// eliminate memory leak
if(_ret != _arg) delete _wrapper;
@@ -313,6 +345,20 @@ pthread_create_gotcha::wrapper::wrap(void* _arg)
return _ret;
}
namespace
{
const auto shutdown_signal_v = SIGRTMAX - 1;
size_t shutdown_signals_delivered = 0;
void
pthread_create_gotcha_shutdown_handler(int)
{
pthread_create_gotcha::shutdown(threading::get_id());
++shutdown_signals_delivered;
}
} // namespace
void
pthread_create_gotcha::configure()
{
@@ -322,6 +368,8 @@ pthread_create_gotcha::configure()
0, int, pthread_t*, const pthread_attr_t*, void* (*) (void*), void*>(
"pthread_create");
};
tim::hash::add_hash_id("start_thread");
}
void
@@ -335,8 +383,67 @@ pthread_create_gotcha::shutdown()
if(!bundles_mutex || !bundles) return;
unsigned long _ndangling = 0;
for(const auto& itr : *bundles)
{
if(itr.second) ++_ndangling;
}
tracing::copy_timemory_hash_ids();
// enable the signal handler for when the timeout is reached
struct sigaction _action = {};
struct sigaction _former = {};
memset(&_action, 0, sizeof(_action));
memset(&_former, 0, sizeof(_former));
sigemptyset(&_action.sa_mask);
sigemptyset(&_former.sa_mask);
_action.sa_flags = SA_RESTART;
_action.sa_handler = pthread_create_gotcha_shutdown_handler;
// activate signal handler
sigaction(shutdown_signal_v, &_action, &_former);
size_t _expected_shutdown_signals_delivered = 0;
{
auto _lk = locking::atomic_lock{ native_handles_mutex };
for(auto itr : native_handles)
{
// skip sending signals to internal threads
if(internal_native_handles.count(itr) != 0) continue;
if(pthread_equal(pthread_self(), itr) == 0 && pthread_equal(itr, itr) != 0)
{
::pthread_kill(itr, shutdown_signal_v);
++_expected_shutdown_signals_delivered;
}
}
auto _nattempt = 0U;
constexpr auto nmax_attempt = 20U;
while(shutdown_signals_delivered < _expected_shutdown_signals_delivered &&
_nattempt++ < nmax_attempt)
{
std::this_thread::yield();
std::this_thread::sleep_for(std::chrono::milliseconds{ 50 });
}
OMNITRACE_CI_BASIC_FAIL(
shutdown_signals_delivered != _expected_shutdown_signals_delivered,
"Number of signals delivered (%zu) != expected number of signals delievered "
"(%zu)",
shutdown_signals_delivered, _expected_shutdown_signals_delivered);
}
// restore existing signal handler
sigaction(shutdown_signal_v, &_former, nullptr);
// subtract the bundles that had signals delivered
_ndangling -= shutdown_signals_delivered;
// stop any remaining dangling bundles on this thread
std::unique_lock<std::mutex> _lk{ *bundles_mutex };
unsigned long _ndangling = 0;
for(auto itr : *bundles)
{
if(itr.second)
@@ -480,7 +587,8 @@ pthread_create_gotcha::operator()(pthread_t* thread, const pthread_attr_t* attr,
if(_use_bundle)
{
_bundle = bundle_t{ "pthread_create" };
start_bundle(*_bundle, audit::incoming{}, thread, attr, func, arg);
start_bundle(*_bundle, _info->index_data->sequent_value, audit::incoming{},
thread, attr, func, arg);
}
// threads must process their delays before creating a new thread
@@ -97,7 +97,7 @@ pthread_gotcha::shutdown()
if(is_configured)
{
::omnitrace::component::pthread_mutex_gotcha::shutdown();
// ::omnitrace::component::pthread_create_gotcha::shutdown();
::omnitrace::component::pthread_create_gotcha::shutdown();
is_configured = false;
}
}
@@ -61,8 +61,7 @@ unique_ptr_t<rocm_data_t>&
rocm_data(int64_t _tid)
{
using thread_data_t = thread_data<rocm_data_t, rocm_event>;
static auto& _v = thread_data_t::instances(construct_on_init{});
return _v.at(_tid);
return thread_data_t::instance(construct_on_thread{ _tid });
}
rocm_event::rocm_event(uint32_t _dev, uint32_t _thr, uint32_t _queue,
@@ -57,7 +57,7 @@ using rocm_feature_value = std::variant<uint32_t, float, uint64_t, double>;
struct rocm_counter
{
std::array<rocm_metric_type, OMNITRACE_MAX_COUNTERS> counters;
std::array<rocm_metric_type, OMNITRACE_ROCM_MAX_COUNTERS> counters;
};
struct rocm_event
@@ -30,6 +30,7 @@
#include "library/roctracer.hpp"
#include "library/runtime.hpp"
#include "library/thread_data.hpp"
#include "library/thread_info.hpp"
#include <roctracer.h>
@@ -265,7 +266,7 @@ roctracer::setup(void* table, bool on_load_trace)
itr.second();
// make sure all async callbacks are allocated
for(size_t i = 0; i < max_supported_threads; ++i)
for(size_t i = 0; i < thread_info::get_peak_num_threads(); ++i)
hip_exec_activity_callbacks(i);
OMNITRACE_VERBOSE_F(1, "roctracer is setup\n");
@@ -286,9 +287,9 @@ roctracer::shutdown()
OMNITRACE_VERBOSE_F(1, "shutting down roctracer...\n");
OMNITRACE_VERBOSE_F(2, "executing hip_exec_activity_callbacks(0..%zu)\n",
max_supported_threads);
thread_info::get_peak_num_threads());
// make sure all async operations are executed
for(size_t i = 0; i < max_supported_threads; ++i)
for(size_t i = 0; i < thread_info::get_peak_num_threads(); ++i)
hip_exec_activity_callbacks(i);
// callback for hsa
@@ -87,8 +87,7 @@ get_coverage_data()
auto&
get_coverage_count(int64_t _tid = tim::threading::get_id())
{
static auto& _v = coverage_thread_data::instances(construct_on_init{});
return _v.at(_tid);
return coverage_thread_data::instance(construct_on_thread{ _tid });
}
} // namespace
@@ -92,16 +92,8 @@ template <typename Arg0, typename Arg1, typename... Args>
size_t
get_combined_hash(Arg0&& _zero, Arg1&& _one, Args&&... _args)
{
size_t _hash = tim::hash::get_combined_hash_id(std::forward<Arg0>(_zero),
std::forward<Arg1>(_one));
if constexpr(sizeof...(_args) == 0)
{
return _hash;
}
else
{
return get_combined_hash(_hash, std::forward<Args>(_args)...);
}
return tim::hash::get_hash_id(std::forward<Arg0>(_zero), std::forward<Arg1>(_one),
std::forward<Args>(_args)...);
}
} // namespace
@@ -386,15 +378,15 @@ get_update_frequency()
unique_ptr_t<call_chain>&
get(int64_t _tid)
{
static auto& _v = thread_data<call_chain>::instances();
static auto* _v = thread_data<call_chain>::get();
static thread_local auto _once = [_tid]() {
if(!_v.at(0)) _v.at(0) = unique_ptr_t<call_chain>{ new call_chain{} };
if(!_v.at(_tid)) _v.at(_tid) = unique_ptr_t<call_chain>{ new call_chain{} };
if(_tid > 0) *_v.at(_tid) = *_v.at(0);
if(!_v->at(0)) _v->at(0) = unique_ptr_t<call_chain>{ new call_chain{} };
if(!_v->at(_tid)) _v->at(_tid) = unique_ptr_t<call_chain>{ new call_chain{} };
if(_tid > 0) *_v->at(_tid) = *_v->at(0);
return true;
}();
(void) _once;
return _v.at(_tid);
return _v->at(_tid);
}
void
@@ -92,6 +92,7 @@ shutdown()
trait::runtime_enabled<ompt_toolset_t>::set(false);
trait::runtime_enabled<ompt_context_t>::set(false);
comp::user_ompt_bundle::reset();
pthread_gotcha::shutdown();
// call the OMPT finalize callback
if(f_finalize) (*f_finalize)();
}
@@ -156,7 +156,7 @@ join()
if(roctracer::get_thread_pool_state() == State::Active)
{
OMNITRACE_DEBUG_F("waiting for all roctracer tasks to complete...\n");
for(size_t i = 0; i < max_supported_threads; ++i)
for(size_t i = 0; i < thread_info::get_peak_num_threads(); ++i)
roctracer::get_task_group(i).join();
}
else
@@ -167,7 +167,7 @@ join()
if(critical_trace::get_thread_pool_state() == State::Active)
{
OMNITRACE_DEBUG_F("waiting for all critical trace tasks to complete...\n");
for(size_t i = 0; i < max_supported_threads; ++i)
for(size_t i = 0; i < thread_info::get_peak_num_threads(); ++i)
critical_trace::get_task_group(i).join();
}
else
@@ -178,7 +178,7 @@ join()
if(general::get_thread_pool_state() == State::Active)
{
OMNITRACE_DEBUG_F("waiting for all general tasks to complete...\n");
for(size_t i = 0; i < max_supported_threads; ++i)
for(size_t i = 0; i < thread_info::get_peak_num_threads(); ++i)
general::get_task_group(i).join();
}
}
@@ -189,7 +189,7 @@ shutdown()
if(roctracer::get_thread_pool_state() == State::Active)
{
OMNITRACE_DEBUG_F("Waiting on completion of roctracer tasks...\n");
for(size_t i = 0; i < max_supported_threads; ++i)
for(size_t i = 0; i < thread_info::get_peak_num_threads(); ++i)
{
roctracer::get_task_group(i).join();
roctracer::get_task_group(i).clear();
@@ -205,7 +205,7 @@ shutdown()
if(critical_trace::get_thread_pool_state() == State::Active)
{
OMNITRACE_DEBUG_F("Waiting on completion of critical trace tasks...\n");
for(size_t i = 0; i < max_supported_threads; ++i)
for(size_t i = 0; i < thread_info::get_peak_num_threads(); ++i)
{
critical_trace::get_task_group(i).join();
critical_trace::get_task_group(i).clear();
@@ -221,7 +221,7 @@ shutdown()
if(general::get_thread_pool_state() == State::Active)
{
OMNITRACE_DEBUG_F("Waiting on completion of general tasks...\n");
for(size_t i = 0; i < max_supported_threads; ++i)
for(size_t i = 0; i < thread_info::get_peak_num_threads(); ++i)
{
general::get_task_group(i).join();
general::get_task_group(i).clear();
@@ -254,9 +254,9 @@ general::get_task_group(int64_t _tid)
struct local
{};
using thread_data_t = thread_data<PTL::TaskGroup<void>, local>;
static auto& _v =
thread_data_t::instances(construct_on_init{}, &tasking::get_thread_pool());
return *_v.at(_tid);
static thread_local auto& _v =
thread_data_t::instance(construct_on_thread{ _tid }, &tasking::get_thread_pool());
return *_v;
}
PTL::TaskGroup<void>&
@@ -264,11 +264,11 @@ roctracer::get_task_group(int64_t _tid)
{
struct local
{};
using thread_data_t = thread_data<PTL::TaskGroup<void>, local>;
static auto& _v =
(roctracer::get_thread_pool_state() = State::Active,
thread_data_t::instances(construct_on_init{}, &tasking::get_thread_pool()));
return *_v.at(_tid);
using thread_data_t = thread_data<PTL::TaskGroup<void>, local>;
static thread_local auto& _v = (roctracer::get_thread_pool_state() = State::Active,
thread_data_t::instance(construct_on_thread{ _tid },
&tasking::get_thread_pool()));
return *_v;
}
PTL::TaskGroup<void>&
@@ -277,10 +277,11 @@ critical_trace::get_task_group(int64_t _tid)
struct local
{};
using thread_data_t = thread_data<PTL::TaskGroup<void>, local>;
static auto& _v =
static thread_local auto& _v =
(critical_trace::get_thread_pool_state() = State::Active,
thread_data_t::instances(construct_on_init{}, &tasking::get_thread_pool()));
return *_v.at(_tid);
thread_data_t::instance(construct_on_thread{ _tid },
&tasking::get_thread_pool()));
return *_v;
}
} // namespace tasking
} // namespace omnitrace
@@ -164,7 +164,7 @@ config()
{
if(data::device_list.count(i) > 0)
{
_bundle_data.at(i) = &sampler_instances::instances().at(i);
_bundle_data.at(i) = &sampler_instances::get()->at(i);
if(!*_bundle_data.at(i))
*_bundle_data.at(i) = unique_ptr_t<bundle_t>{ new bundle_t{} };
}
@@ -239,7 +239,7 @@ data::post_process(uint32_t _dev_id)
if(device_count < _dev_id) return;
auto& _rocm_smi_v = sampler_instances::instances().at(_dev_id);
auto& _rocm_smi_v = sampler_instances::get()->at(_dev_id);
auto _rocm_smi = (_rocm_smi_v) ? *_rocm_smi_v : std::deque<rocm_smi::data>{};
const auto& _thread_info = thread_info::get(0, InternalTID);
@@ -113,8 +113,7 @@ get_roctracer_hip_data(int64_t _tid = threading::get_id())
{
using data_t = std::unordered_map<uint64_t, roctracer_hip_bundle_t>;
using thread_data_t = thread_data<data_t, category::roctracer>;
static auto& _v = thread_data_t::instances(construct_on_init{});
return _v.at(_tid);
return thread_data_t::instance(construct_on_thread{ _tid });
}
std::unordered_map<uint64_t, const char*>&
@@ -154,8 +153,7 @@ get_roctracer_cid_data(int64_t _tid = threading::get_id())
{
using thread_data_t =
thread_data<std::unordered_map<uint64_t, cid_data>, category::roctracer>;
static auto& _v = thread_data_t::instances(construct_on_init{});
return *_v.at(_tid);
return thread_data_t::instance(construct_on_thread{ _tid });
}
auto&
@@ -163,8 +161,7 @@ get_hip_activity_callbacks(int64_t _tid = threading::get_id())
{
using thread_data_t =
thread_data<std::vector<std::function<void()>>, category::roctracer>;
static auto& _v = thread_data_t::instances(construct_on_init{});
return _v.at(_tid);
return thread_data_t::instance(construct_on_thread{ _tid });
}
using hip_activity_mutex_t = std::decay_t<decltype(get_hip_activity_callbacks())>;
@@ -804,7 +801,7 @@ hip_api_callback(uint32_t domain, uint32_t cid, const void* callback_data, void*
critical_trace::add_hash_id(op_name), _depth);
}
get_roctracer_cid_data(_tid).emplace(
get_roctracer_cid_data(_tid)->emplace(
_roct_cid, cid_data{ _crit_cid, _parent_crit_cid, _depth, _queue });
hip_exec_activity_callbacks(_tid);
@@ -814,7 +811,7 @@ hip_api_callback(uint32_t domain, uint32_t cid, const void* callback_data, void*
hip_exec_activity_callbacks(_tid);
std::tie(_crit_cid, _parent_crit_cid, _depth, std::ignore) =
get_roctracer_cid_data(_tid).at(_roct_cid);
get_roctracer_cid_data(_tid)->at(_roct_cid);
if(get_use_perfetto())
{
@@ -841,7 +838,7 @@ hip_api_callback(uint32_t domain, uint32_t cid, const void* callback_data, void*
};
if(!_stop(_tid))
{
for(size_t i = 0; i < max_supported_threads; ++i)
for(size_t i = 0; i < thread_info::get_peak_num_threads(); ++i)
{
if(_stop(i)) break;
}
@@ -941,8 +938,8 @@ hip_activity_callback(const char* begin, const char* end, void* arg)
if(_critical_trace)
{
auto& _crit_cids = get_roctracer_cid_data(_tid);
if(_crit_cids.find(_roct_cid) != _crit_cids.end())
std::tie(_crit_cid, _pcid, _depth, _queue) = _crit_cids.at(_roct_cid);
if(_crit_cids->find(_roct_cid) != _crit_cids->end())
std::tie(_crit_cid, _pcid, _depth, _queue) = _crit_cids->at(_roct_cid);
else
{
OMNITRACE_VERBOSE_F(3,
@@ -107,19 +107,19 @@ get_cpu_cid_stack(int64_t _tid, int64_t _parent)
using init_data_t = thread_data<bool, omnitrace_cpu_cid_stack>;
using thread_data_t = thread_data<std::vector<uint64_t>, omnitrace_cpu_cid_stack>;
static auto& _v = thread_data_t::instances(construct_on_init{});
static auto& _b = init_data_t::instances(construct_on_init{}, false);
auto& _v_tid = thread_data_t::instance(construct_on_thread{ _tid });
auto& _b_tid = init_data_t::instance(construct_on_thread{ _tid }, false);
auto& _v_tid = _v.at(_tid);
if(_b.at(_tid) && !(*_b.at(_tid)))
if(_b_tid && !(*_b_tid))
{
*_b.at(_tid) = true;
auto _parent_tid = _parent;
*_b_tid = true;
auto _parent_tid = _parent;
auto& _p_tid = thread_data_t::instance(construct_on_thread{ _parent_tid });
// if tid != parent and there is not a valid pointer for the provided parent
// thread id set it to zero since that will always be valid
if(_tid != _parent_tid && !_v.at(_parent_tid)) _parent_tid = 0;
if(_tid != _parent_tid && !_p_tid) _parent_tid = 0;
// copy over the thread ids from the parent if tid != parent
if(_tid != _parent_tid) *_v_tid = *_v.at(_parent_tid);
if(_tid != _parent_tid) *_v_tid = *_p_tid;
}
return _v_tid;
}
@@ -130,9 +130,7 @@ get_cpu_cid_parents(int64_t _tid)
struct omnitrace_cpu_cid_stack
{};
using thread_data_t = thread_data<cpu_cid_parent_map_t, omnitrace_cpu_cid_stack>;
static auto& _v =
thread_data_t::instances(construct_on_init{}, cpu_cid_parent_map_t{});
return _v.at(_tid);
return thread_data_t::instance(construct_on_thread{ _tid }, cpu_cid_parent_map_t{});
}
std::tuple<uint64_t, uint64_t, uint32_t>
@@ -251,23 +251,20 @@ get_signal_names(Tp&& _v)
unique_ptr_t<sampler_t>&
get_sampler(int64_t _tid = threading::get_id())
{
static auto& _v = sampler_instances::instances();
return _v.at(_tid);
static auto* _v = sampler_instances::get();
return _v->at(_tid);
}
unique_ptr_t<bundle_t>&
get_sampler_init(int64_t _tid = threading::get_id())
{
static auto& _v = sampler_init_instances::instances();
if(!_v.at(_tid)) _v.at(_tid) = unique_ptr_t<bundle_t>{ new bundle_t{} };
return _v.at(_tid);
return sampler_init_instances::instance(construct_on_thread{ _tid });
}
unique_ptr_t<bool>&
get_sampler_running(int64_t _tid)
{
static auto& _v = sampler_running_instances::instances(construct_on_init{}, false);
return _v.at(_tid);
return sampler_running_instances::instance(construct_on_thread{ _tid }, false);
}
auto&
@@ -816,10 +813,8 @@ auto static_strings = std::set<std::string>{};
unique_ptr_t<std::set<int>>&
get_signal_types(int64_t _tid)
{
static auto& _v = signal_type_instances::instances();
signal_type_instances::construct(construct_on_thread{ _tid },
omnitrace::get_sampling_signals(_tid));
return _v.at(_tid);
return signal_type_instances::instance(construct_on_thread{ _tid },
omnitrace::get_sampling_signals(_tid));
}
std::set<int>
@@ -834,7 +829,7 @@ shutdown()
{
if(is_child_process())
{
for(auto& itr : sampler_instances::instances())
for(auto& itr : *sampler_instances::get())
itr.release();
return std::set<int>{};
}
@@ -862,7 +857,7 @@ block_signals(std::set<int> _signals)
if(_signals.empty()) _signals = *get_signal_types(threading::get_id());
if(_signals.empty())
{
OMNITRACE_PRINT("No signals to block...\n");
OMNITRACE_VERBOSE(2, "No signals to block...\n");
return;
}
@@ -879,7 +874,7 @@ unblock_signals(std::set<int> _signals)
if(_signals.empty()) _signals = *get_signal_types(threading::get_id());
if(_signals.empty())
{
OMNITRACE_PRINT("No signals to unblock...\n");
OMNITRACE_VERBOSE(2, "No signals to unblock...\n");
return;
}
@@ -908,7 +903,7 @@ post_process()
for(auto& itr : get_sampler_allocators())
if(itr) itr->flush();
for(size_t i = 0; i < max_supported_threads; ++i)
for(size_t i = 0; i < thread_info::get_peak_num_threads(); ++i)
{
auto& _sampler = get_sampler(i);
@@ -1004,7 +999,7 @@ post_process()
get_offload_file().reset(); // remove the temporary file
for(size_t i = 0; i < max_supported_threads; ++i)
for(size_t i = 0; i < thread_info::get_peak_num_threads(); ++i)
get_sampler(i).reset();
for(auto& itr : get_sampler_allocators())
@@ -26,6 +26,7 @@
#include "core/concepts.hpp"
#include "core/config.hpp"
#include "core/containers/stable_vector.hpp"
#include "core/debug.hpp"
#include "core/defines.hpp"
#include "core/state.hpp"
#include "core/timemory.hpp"
@@ -68,7 +69,7 @@ struct base_thread_data
base_thread_data()
{
auto _func = [](int64_t _sz) -> int64_t {
auto& _v = Tp::instance();
decltype(auto) _v = Tp::private_instance();
if(_v && _v->capacity() < static_cast<size_t>(_sz + 1))
{
_v->reserve(_v->capacity() + 1);
@@ -81,19 +82,37 @@ struct base_thread_data
};
template <typename Tp, typename Tag = void, size_t MaxThreads = max_supported_threads>
struct thread_data
struct thread_data;
template <typename Tp, typename Tag, size_t MaxThreads>
struct use_placement_new_when_generating_unique_ptr<thread_data<Tp, Tag, MaxThreads>>
: std::true_type
{};
template <typename Tp, typename Tag, size_t MaxThreads>
struct use_placement_new_when_generating_unique_ptr<
thread_data<std::optional<Tp>, Tag, MaxThreads>> : std::true_type
{};
template <typename Tp, typename Tag, size_t MaxThreads>
struct use_placement_new_when_generating_unique_ptr<
thread_data<identity<Tp>, Tag, MaxThreads>> : std::true_type
{};
template <typename Tp, typename Tag, size_t MaxThreads>
struct thread_data : base_thread_data<thread_data<Tp, Tag, MaxThreads>>
{
using value_type = unique_ptr_t<Tp>;
using instance_array_t = std::array<value_type, MaxThreads>;
using this_type = thread_data<Tp, Tag, MaxThreads>;
using value_type = unique_ptr_t<Tp>;
using array_type =
container::stable_vector<value_type, MaxThreads, container::cacheline_align_v>;
using functor_type = std::function<value_type()>;
template <typename... Args>
static void construct(construct_on_thread&&, Args&&...);
static value_type& instance();
static instance_array_t& instances();
static void construct(construct_on_thread&&, Args&&...);
static value_type& instance();
template <typename... Args>
static value_type& instance(construct_on_thread&&, Args&&...);
template <typename... Args>
static instance_array_t& instances(construct_on_init, Args&&...);
template <typename... Args>
static void construct(Args&&... args)
@@ -107,15 +126,62 @@ struct thread_data
return instance(construct_on_thread{}, std::forward<Args>(args)...);
}
static constexpr size_t size() { return MaxThreads; }
static size_t size() { return private_instance()->m_data.size(); }
decltype(auto) begin() { return instances().begin(); }
decltype(auto) end() { return instances().end(); }
decltype(auto) data() { return m_data; }
decltype(auto) data() const { return m_data; }
decltype(auto) begin() const { return instances().begin(); }
decltype(auto) end() const { return instances().end(); }
decltype(auto) begin() { return m_data.begin(); }
decltype(auto) end() { return m_data.end(); }
decltype(auto) begin() const { return m_data.begin(); }
decltype(auto) end() const { return m_data.end(); }
decltype(auto) at(size_t _idx) { return m_data.at(_idx); }
decltype(auto) at(size_t _idx) const { return m_data.at(_idx); }
decltype(auto) operator[](size_t _idx) { return m_data[_idx]; }
decltype(auto) operator[](size_t _idx) const { return m_data[_idx]; }
decltype(auto) reserve(size_t _n) { return m_data.reserve(_n); }
decltype(auto) capacity() const { return m_data.capacity(); }
decltype(auto) empty() const { return m_data.empty(); }
void resize(size_t _n) { container::resize(m_data, _n, m_init()); }
template <typename Up>
void resize(size_t _n, Up&& _v)
{
static_assert(std::is_assignable<value_type, Up>::value,
"value is not assignable to optional<Tp>");
container::resize(m_data, _n, std::forward<Up>(_v));
}
static array_type* get()
{
return (private_instance()) ? &private_instance()->m_data : nullptr;
}
private:
friend struct base_thread_data<this_type>;
static unique_ptr_t<this_type>& private_instance();
static array_type& instances();
template <typename... Args>
static array_type& instances(construct_on_init, Args&&...);
array_type m_data = array_type(MaxThreads);
functor_type m_init = []() { return value_type{}; };
};
template <typename Tp, typename Tag, size_t MaxThreads>
unique_ptr_t<thread_data<Tp, Tag, MaxThreads>>&
thread_data<Tp, Tag, MaxThreads>::private_instance()
{
static auto _v = unique_ptr_t<this_type>{ new this_type{} };
return _v;
}
template <typename Tp, typename Tag, size_t MaxThreads>
template <typename... Args>
void
@@ -136,11 +202,10 @@ thread_data<Tp, Tag, MaxThreads>::instance()
}
template <typename Tp, typename Tag, size_t MaxThreads>
typename thread_data<Tp, Tag, MaxThreads>::instance_array_t&
typename thread_data<Tp, Tag, MaxThreads>::array_type&
thread_data<Tp, Tag, MaxThreads>::instances()
{
static auto _v = instance_array_t{};
return _v;
return private_instance()->m_data;
}
template <typename Tp, typename Tag, size_t MaxThreads>
@@ -154,29 +219,22 @@ thread_data<Tp, Tag, MaxThreads>::instance(construct_on_thread&& _t, Args&&... _
template <typename Tp, typename Tag, size_t MaxThreads>
template <typename... Args>
typename thread_data<Tp, Tag, MaxThreads>::instance_array_t&
typename thread_data<Tp, Tag, MaxThreads>::array_type&
thread_data<Tp, Tag, MaxThreads>::instances(construct_on_init, Args&&... _args)
{
static auto& _v = [&]() -> instance_array_t& {
static auto& _v = [&]() -> array_type& {
auto& _internal = instances();
for(size_t i = 0; i < MaxThreads; ++i)
_internal.at(i) =
utility::generate<value_type>{}(std::forward<Args>(_args)...);
private_instance()->m_init = [_args...]() {
return utility::generate<value_type>{}(_args...);
};
return _internal;
}();
return _v;
}
template <typename Tp, typename Tag, size_t MaxThreads>
struct use_placement_new_when_generating_unique_ptr<
thread_data<std::optional<Tp>, Tag, MaxThreads>> : std::true_type
{};
template <typename Tp, typename Tag, size_t MaxThreads>
struct use_placement_new_when_generating_unique_ptr<
thread_data<identity<Tp>, Tag, MaxThreads>> : std::true_type
{};
//--------------------------------------------------------------------------------------//
//
// thread_data with std::optional
@@ -189,8 +247,9 @@ struct thread_data<std::optional<Tp>, Tag, MaxThreads>
{
using this_type = thread_data<std::optional<Tp>, Tag, MaxThreads>;
using value_type = std::optional<Tp>;
using array_type = container::stable_vector<value_type, MaxThreads>;
using functor_type = std::function<value_type()>;
using array_type =
container::stable_vector<value_type, MaxThreads, container::cacheline_align_v>;
thread_data() = default;
~thread_data() = default;
@@ -251,6 +310,9 @@ struct thread_data<std::optional<Tp>, Tag, MaxThreads>
}
private:
friend struct base_thread_data<this_type>;
static decltype(auto) private_instance() { return instance(); }
array_type m_data = {};
functor_type m_init = []() { return value_type{}; };
};
@@ -307,9 +369,10 @@ thread_data<std::optional<Tp>, Tag, MaxThreads>::construct(construct_on_thread&&
Args&&... _args)
{
// construct outside of lambda to prevent data-race
static auto& _instance = instance(construct_on_init{});
static auto _constructed = container::stable_vector<bool, MaxThreads>{};
static auto _grow = []() {
static auto& _instance = instance(construct_on_init{});
static auto _constructed =
container::stable_vector<bool, MaxThreads, container::cacheline_align_v>{};
static auto _grow = []() {
container::resize(_constructed, MaxThreads, false);
grow_functors().emplace_back([](int64_t _n) -> int64_t {
if(static_cast<size_t>(_n) >= _constructed.size())
@@ -353,9 +416,10 @@ template <typename Tp, typename Tag, size_t MaxThreads>
struct thread_data<identity<Tp>, Tag, MaxThreads>
: base_thread_data<thread_data<identity<Tp>, Tag, MaxThreads>>
{
using this_type = thread_data<identity<Tp>, Tag, MaxThreads>;
using value_type = Tp;
using array_type = container::stable_vector<value_type, MaxThreads>;
using this_type = thread_data<identity<Tp>, Tag, MaxThreads>;
using value_type = Tp;
using array_type =
container::stable_vector<value_type, MaxThreads, container::cacheline_align_v>;
using functor_type = std::function<value_type()>;
thread_data() = default;
@@ -416,6 +480,9 @@ struct thread_data<identity<Tp>, Tag, MaxThreads>
}
private:
friend struct base_thread_data<this_type>;
static decltype(auto) private_instance() { return instance(); }
array_type m_data = {};
functor_type m_init = []() { return value_type{}; };
};
@@ -470,9 +537,10 @@ thread_data<identity<Tp>, Tag, MaxThreads>::construct(construct_on_thread&& _t,
Args&&... _args)
{
// construct outside of lambda to prevent data-race
static auto& _instance = instance(construct_on_init{});
static auto _constructed = container::stable_vector<bool, MaxThreads>{};
static auto _grow = []() {
static auto& _instance = instance(construct_on_init{});
static auto _constructed =
container::stable_vector<bool, MaxThreads, container::cacheline_align_v>{};
static auto _grow = []() {
container::resize(_constructed, MaxThreads, false);
grow_functors().emplace_back([](int64_t _n) -> int64_t {
if(static_cast<size_t>(_n) >= _constructed.size())
@@ -512,72 +580,70 @@ thread_data<identity<Tp>, Tag, MaxThreads>::instance(construct_on_thread&& _t,
// timemory's ring_buffer_allocator to create contiguous memory-page aligned instances of
// the bundle
template <typename... Tp>
struct component_bundle_cache
struct component_bundle_cache_impl
{
using this_type = component_bundle_cache_impl<Tp...>;
using bundle_type = tim::component_bundle<project::omnitrace, Tp...>;
using this_type = component_bundle_cache<Tp...>;
using allocator_type = tim::data::ring_buffer_allocator<bundle_type>;
using instance_type =
std::array<component_bundle_cache<Tp...>, max_supported_threads>;
using array_type = std::vector<bundle_type*>;
using iterator = typename std::vector<bundle_type*>::iterator;
using const_iterator = typename std::vector<bundle_type*>::const_iterator;
using reverse_iterator = typename std::vector<bundle_type*>::reverse_iterator;
using iterator = typename array_type::iterator;
using const_iterator = typename array_type::const_iterator;
using reverse_iterator = typename array_type::reverse_iterator;
allocator_type allocator = {};
std::vector<bundle_type*> bundles = {};
component_bundle_cache_impl() = default;
~component_bundle_cache_impl() = default;
bool empty() const { return bundles.empty(); }
component_bundle_cache_impl(const component_bundle_cache_impl&) = delete;
component_bundle_cache_impl(component_bundle_cache_impl&&) noexcept = delete;
auto& front() { return bundles.front(); }
auto& front() const { return bundles.front(); }
component_bundle_cache_impl& operator=(const component_bundle_cache_impl&) = delete;
component_bundle_cache_impl& operator=(component_bundle_cache_impl&&) noexcept =
delete;
auto& back() { return bundles.back(); }
auto& back() const { return bundles.back(); }
bool empty() const { return m_bundles.empty(); }
auto begin() { return bundles.begin(); }
auto end() { return bundles.end(); }
auto& front() { return m_bundles.front(); }
auto& front() const { return m_bundles.front(); }
auto rbegin() { return bundles.rbegin(); }
auto rend() { return bundles.rend(); }
auto& back() { return m_bundles.back(); }
auto& back() const { return m_bundles.back(); }
auto begin() const { return bundles.begin(); }
auto end() const { return bundles.end(); }
auto begin() { return m_bundles.begin(); }
auto end() { return m_bundles.end(); }
auto size() const { return bundles.size(); }
auto rbegin() { return m_bundles.rbegin(); }
auto rend() { return m_bundles.rend(); }
auto& at(size_t _idx) { return bundles.at(_idx); }
const auto& at(size_t _idx) const { return bundles.at(_idx); }
auto begin() const { return m_bundles.begin(); }
auto end() const { return m_bundles.end(); }
static auto& instances()
{
static auto _v = instance_type{};
return _v;
}
auto size() const { return m_bundles.size(); }
static auto& instance(int64_t _tid) { return instances().at(_tid); }
auto& at(size_t _idx) { return m_bundles.at(_idx); }
const auto& at(size_t _idx) const { return m_bundles.at(_idx); }
template <typename... Args>
bundle_type* construct(Args&&... args)
{
bundle_type* _v = allocator.allocate(1);
allocator.construct(_v, std::forward<Args>(args)...);
return bundles.emplace_back(_v);
bundle_type* _v = m_allocator.allocate(1);
m_allocator.construct(_v, std::forward<Args>(args)...);
return m_bundles.emplace_back(_v);
}
void destroy(bundle_type* _v, size_t _idx)
{
allocator.destroy(_v);
allocator.deallocate(_v, 1);
bundles.erase(bundles.begin() + _idx);
m_allocator.destroy(_v);
m_allocator.deallocate(_v, 1);
m_bundles.erase(m_bundles.begin() + _idx);
}
void pop_back()
{
bundle_type* _v = bundles.back();
allocator.destroy(_v);
allocator.deallocate(_v, 1);
bundles.pop_back();
bundle_type* _v = m_bundles.back();
m_allocator.destroy(_v);
m_allocator.deallocate(_v, 1);
m_bundles.pop_back();
}
template <typename IterT>
@@ -594,23 +660,28 @@ struct component_bundle_cache
if(_v == end()) return;
itr = _v;
}
allocator.destroy(*itr);
allocator.deallocate(*itr, 1);
bundles.erase(itr);
m_allocator.destroy(*itr);
m_allocator.deallocate(*itr, 1);
m_bundles.erase(itr);
}
private:
allocator_type m_allocator = {};
array_type m_bundles = {};
};
template <typename... Tp>
struct component_bundle_cache<tim::component_bundle<project::omnitrace, Tp...>>
: component_bundle_cache<Tp...>
struct component_bundle_cache_impl<tim::component_bundle<project::omnitrace, Tp...>>
: component_bundle_cache_impl<Tp...>
{
using base_type = component_bundle_cache<Tp...>;
using base_type::allocator;
using base_type::bundles;
using base_type::instances;
using base_type = component_bundle_cache_impl<Tp...>;
};
//--------------------------------------------------------------------------------------//
template <typename... Tp>
using component_bundle_cache = thread_data<component_bundle_cache_impl<Tp...>>;
using instrumentation_bundles = component_bundle_cache<instrumentation_bundle_t>;
extern template struct component_bundle_cache<instrumentation_bundle_t>;
extern template struct component_bundle_cache_impl<instrumentation_bundle_t>;
} // namespace omnitrace
@@ -31,7 +31,7 @@
namespace omnitrace
{
template struct component_bundle_cache<instrumentation_bundle_t>;
template struct component_bundle_cache_impl<instrumentation_bundle_t>;
void
thread_deleter<void>::operator()() const
@@ -105,7 +105,8 @@ init_index_data(int64_t _tid, bool _offset = false)
return itr;
}
const auto unknown_thread = std::optional<thread_info>{};
const auto unknown_thread = std::optional<thread_info>{};
int64_t peak_num_threads = max_supported_threads;
} // namespace
std::string
@@ -123,21 +124,19 @@ grow_data(int64_t _tid)
struct data_growth
{};
static int64_t _max_threads = max_supported_threads;
if(_tid >= _max_threads)
if(_tid >= peak_num_threads)
{
OMNITRACE_SCOPED_THREAD_STATE(ThreadState::Internal);
auto_lock_t _lk{ type_mutex<data_growth>() };
// check again after locking
if(_tid >= _max_threads)
if(_tid >= peak_num_threads)
{
TIMEMORY_PRINTF_WARNING(
stderr, "[%li] Growing thread data from %li to %li...\n", _tid,
_max_threads, _max_threads + max_supported_threads);
peak_num_threads, peak_num_threads + max_supported_threads);
fflush(stderr);
// auto _expected = _max_threads + max_supported_threads;
for(auto itr : grow_functors())
{
if(itr)
@@ -145,14 +144,14 @@ grow_data(int64_t _tid)
int64_t _new_capacity = (*itr)(_tid + 1);
TIMEMORY_PRINTF_WARNING(stderr,
"[%li] Grew thread data from %li to %li...\n",
_tid, _max_threads, _new_capacity);
_tid, peak_num_threads, _new_capacity);
}
}
_max_threads += max_supported_threads;
peak_num_threads += max_supported_threads;
}
}
return _max_threads;
return peak_num_threads;
}
bool
@@ -161,6 +160,12 @@ thread_info::exists()
return (get_info_data() != nullptr);
}
size_t
thread_info::get_peak_num_threads()
{
return peak_num_threads;
}
const std::optional<thread_info>&
thread_info::init(bool _offset)
{
@@ -109,6 +109,7 @@ struct thread_info
std::string as_string() const;
static bool exists();
static size_t get_peak_num_threads();
static const std::optional<thread_info>& init(bool _offset = false);
static const std::optional<thread_info>& get();
static const std::optional<thread_info>& get(native_handle_t&);
@@ -21,14 +21,42 @@
// SOFTWARE.
#include "library/tracing.hpp"
#include "core/concepts.hpp"
#include "core/config.hpp"
#include "core/state.hpp"
#include "library/thread_data.hpp"
#include "library/thread_info.hpp"
#include <timemory/hash/types.hpp>
#include <timemory/process/threading.hpp>
namespace omnitrace
{
namespace tracing
{
namespace
{
tim::hash_map_ptr_t&
get_timemory_hash_ids(int64_t _tid = threading::get_id());
tim::hash_alias_ptr_t&
get_timemory_hash_aliases(int64_t _tid = threading::get_id());
tim::hash_map_ptr_t&
get_timemory_hash_ids(int64_t _tid)
{
return thread_data<identity<tim::hash_map_ptr_t>>::instance(
construct_on_thread{ _tid });
}
tim::hash_alias_ptr_t&
get_timemory_hash_aliases(int64_t _tid)
{
return thread_data<identity<tim::hash_alias_ptr_t>>::instance(
construct_on_thread{ _tid });
}
} // namespace
bool debug_push = tim::get_env("OMNITRACE_DEBUG_PUSH", false) || get_debug_env();
bool debug_pop = tim::get_env("OMNITRACE_DEBUG_POP", false) || get_debug_env();
bool debug_mark = tim::get_env("OMNITRACE_DEBUG_MARK", false) || get_debug_env();
@@ -41,19 +69,52 @@ get_perfetto_track_uuids()
return _v;
}
tim::hash_map_ptr_t&
get_timemory_hash_ids(int64_t _tid)
void
copy_timemory_hash_ids()
{
static auto _v = std::array<tim::hash_map_ptr_t, omnitrace::max_supported_threads>{};
return _v.at(_tid);
}
auto_lock_t _ilk{ type_mutex<tim::hash_map_t>(), std::defer_lock };
auto_lock_t _alk{ type_mutex<tim::hash_alias_map_t>(), std::defer_lock };
tim::hash_alias_ptr_t&
get_timemory_hash_aliases(int64_t _tid)
{
static auto _v =
std::array<tim::hash_alias_ptr_t, omnitrace::max_supported_threads>{};
return _v.at(_tid);
if(!_ilk.owns_lock()) _ilk.lock();
if(!_alk.owns_lock()) _alk.lock();
// copy these over so that all hashes are known
auto& _hmain = tim::hash::get_main_hash_ids();
auto& _amain = tim::hash::get_main_hash_aliases();
OMNITRACE_REQUIRE(_hmain != nullptr) << "no main timemory hash ids";
OMNITRACE_REQUIRE(_amain != nullptr) << "no main timemory hash aliases";
// combine all the hash and alias info into one container
for(size_t i = 0; i < thread_info::get_peak_num_threads(); ++i)
{
auto& _hitr = get_timemory_hash_ids(i);
auto& _aitr = get_timemory_hash_aliases(i);
if(_hitr)
{
for(const auto& itr : *_hitr)
_hmain->emplace(itr.first, itr.second);
}
if(_aitr)
{
for(auto itr : *_aitr)
_amain->emplace(itr.first, itr.second);
}
}
// distribute the contents of that combined container to each thread-specific
// container before finalizing
if(get_state() == State::Finalized)
{
for(size_t i = 0; i < thread_info::get_peak_num_threads(); ++i)
{
auto& _hitr = get_timemory_hash_ids(i);
auto& _aitr = get_timemory_hash_aliases(i);
if(_hitr) *_hitr = *_hmain;
if(_aitr) *_aitr = *_amain;
}
}
}
std::vector<std::function<void()>>&
@@ -94,15 +155,29 @@ thread_init()
if(get_thread_state() == ThreadState::Disabled) return;
static thread_local auto _thread_setup = []() {
if(threading::get_id() > 0)
threading::set_thread_name(JOIN(" ", "Thread", threading::get_id()).c_str());
thread_data<thread_bundle_t>::construct(JOIN('/', "omnitrace/process",
process::get_id(), "thread",
threading::get_id()),
quirk::config<quirk::auto_start>{});
const auto& _tinfo = thread_info::init();
auto _tidx = (_tinfo && _tinfo->index_data) ? _tinfo->index_data->sequent_value
: threading::get_id();
OMNITRACE_REQUIRE(_tidx >= 0)
<< "thread setup failed. thread info not initialized: " << [&_tinfo]() {
if(_tinfo) return JOIN("", *_tinfo);
return std::string{ "no thread_info" };
}();
if(_tidx > 0) threading::set_thread_name(JOIN(" ", "Thread", _tidx).c_str());
thread_data<thread_bundle_t>::construct(
JOIN('/', "omnitrace/process", process::get_id(), "thread", _tidx),
quirk::config<quirk::auto_start>{});
// save the hash maps
get_timemory_hash_ids() = tim::get_hash_ids();
get_timemory_hash_aliases() = tim::get_hash_aliases();
get_timemory_hash_ids(_tidx) = tim::get_hash_ids();
get_timemory_hash_aliases(_tidx) = tim::get_hash_aliases();
OMNITRACE_REQUIRE(get_timemory_hash_ids(_tidx) != nullptr)
<< "no timemory hash ids pointer for thread " << _tidx;
OMNITRACE_REQUIRE(get_timemory_hash_aliases(_tidx) != nullptr)
<< "no timemory hash aliases pointer for thread " << _tidx;
record_thread_start_time();
return true;
}();
@@ -85,11 +85,8 @@ extern OMNITRACE_HIDDEN_API bool debug_mark;
std::unordered_map<hash_value_t, std::string>&
get_perfetto_track_uuids();
tim::hash_map_ptr_t&
get_timemory_hash_ids(int64_t _tid = threading::get_id());
tim::hash_alias_ptr_t&
get_timemory_hash_aliases(int64_t _tid = threading::get_id());
void
copy_timemory_hash_ids();
std::vector<std::function<void()>>&
get_finalization_functions();
@@ -148,7 +145,7 @@ template <typename CategoryT, typename... Args>
auto
get_perfetto_category_uuid(Args&&... _args)
{
return tim::hash::get_combined_hash_id(
return tim::hash::get_hash_id(
tim::hash::get_hash_id(JOIN('_', "omnitrace", trait::name<CategoryT>::value)),
std::forward<Args>(_args)...);
}
@@ -199,7 +196,7 @@ now()
inline auto&
get_instrumentation_bundles(int64_t _tid = threading::get_id())
{
return instrumentation_bundles::instance(_tid);
return instrumentation_bundles::instance(construct_on_thread{ _tid });
}
inline auto&
@@ -290,11 +287,14 @@ push_timemory(CategoryT, std::string_view name, Args&&... args)
if(category_push_disabled<CategoryT>()) return;
auto& _data = tracing::get_instrumentation_bundles();
// this generates a hash for the raw string array
auto _hash = tim::add_hash_id(name);
_data.construct(_hash)->start(std::forward<Args>(args)...);
// increment the profile stack
++get_profile_stack<CategoryT>();
if(OMNITRACE_LIKELY(_data != nullptr))
{
// this generates a hash for the raw string array
auto _hash = tim::add_hash_id(name);
_data->construct(_hash)->start(std::forward<Args>(args)...);
// increment the profile stack
++get_profile_stack<CategoryT>();
}
}
template <typename CategoryT>
@@ -307,23 +307,23 @@ get_timemory(CategoryT, std::string_view name)
auto _hash = tim::hash::get_hash_id(name);
auto& _data = tracing::get_instrumentation_bundles();
if(OMNITRACE_UNLIKELY(_data.bundles.empty()))
if(OMNITRACE_UNLIKELY(_data == nullptr || _data->empty()))
{
OMNITRACE_DEBUG("[%s] skipped %s :: empty bundle stack\n", "omnitrace_pop_trace",
name.data());
return return_type{ nullptr, -1 };
}
auto*& _v_back = _data.bundles.back();
auto*& _v_back = _data->back();
if(OMNITRACE_LIKELY(_v_back->get_hash() == _hash))
{
return std::make_pair(_v_back, _data.bundles.size() - 1);
return std::make_pair(_v_back, _data->size() - 1);
}
else if(_data.bundles.size() > 1)
else if(_data->size() > 1)
{
for(size_t i = _data.bundles.size() - 1; i > 0; --i)
for(size_t i = _data->size() - 1; i > 0; --i)
{
auto*& _v = _data.bundles.at(i - 1);
auto*& _v = _data->at(i - 1);
if(_v->get_hash() == _hash)
{
return std::make_pair(_v, i - 1);
@@ -357,9 +357,8 @@ destroy_timemory(std::pair<instrumentation_bundle_t*, size_t> _data)
if(_data.first)
{
auto& _bundles = tracing::get_instrumentation_bundles();
_bundles.allocator.destroy(_data.first);
_bundles.allocator.deallocate(_data.first, 1);
_bundles.bundles.erase(_bundles.bundles.begin() + _data.second);
if(OMNITRACE_LIKELY(_bundles != nullptr))
_bundles->destroy(_data.first, _data.second);
}
}
@@ -24,3 +24,5 @@ include(${CMAKE_CURRENT_LIST_DIR}/omnitrace-overflow-tests.cmake)
include(${CMAKE_CURRENT_LIST_DIR}/omnitrace-annotate-tests.cmake)
include(${CMAKE_CURRENT_LIST_DIR}/omnitrace-causal-tests.cmake)
include(${CMAKE_CURRENT_LIST_DIR}/omnitrace-python-tests.cmake)
add_subdirectory(source)
@@ -127,6 +127,12 @@ causal_e2e_args_and_validation(_causal_fast_func fast-func "-F" "cpu_fast_func"
causal_e2e_args_and_validation(_causal_line_100 line-100 "-S" "causal.cpp:100" 10 20 20 5)
causal_e2e_args_and_validation(_causal_line_110 line-110 "-S" "causal.cpp:110" 0 0 0 5)
if(OMNITRACE_BUILD_NUMBER GREATER 1)
set(_causal_e2e_environment)
else()
set(_causal_e2e_environment "OMNITRACE_VERBOSE=0")
endif()
omnitrace_add_causal_test(
SKIP_BASELINE
NAME cpu-omni-slow-func-e2e
@@ -138,6 +144,7 @@ omnitrace_add_causal_test(
CAUSAL_VALIDATE_ARGS ${_causal_slow_func_valid}
CAUSAL_PASS_REGEX
"Starting causal experiment #1(.*)causal/experiments.json(.*)causal/experiments.coz"
ENVIRONMENT "${_causal_e2e_environment}"
PROPERTIES PROCESSORS 2 PROCESSOR_AFFINITY OFF)
omnitrace_add_causal_test(
@@ -151,6 +158,7 @@ omnitrace_add_causal_test(
CAUSAL_VALIDATE_ARGS ${_causal_fast_func_valid}
CAUSAL_PASS_REGEX
"Starting causal experiment #1(.*)causal/experiments.json(.*)causal/experiments.coz"
ENVIRONMENT "${_causal_e2e_environment}"
PROPERTIES PROCESSORS 2 PROCESSOR_AFFINITY OFF)
omnitrace_add_causal_test(
@@ -164,6 +172,7 @@ omnitrace_add_causal_test(
CAUSAL_VALIDATE_ARGS ${_causal_line_100_valid}
CAUSAL_PASS_REGEX
"Starting causal experiment #1(.*)causal/experiments.json(.*)causal/experiments.coz"
ENVIRONMENT "${_causal_e2e_environment}"
PROPERTIES PROCESSORS 2 PROCESSOR_AFFINITY OFF)
omnitrace_add_causal_test(
@@ -177,4 +186,5 @@ omnitrace_add_causal_test(
CAUSAL_VALIDATE_ARGS ${_causal_line_110_valid}
CAUSAL_PASS_REGEX
"Starting causal experiment #1(.*)causal/experiments.json(.*)causal/experiments.coz"
ENVIRONMENT "${_causal_e2e_environment}"
PROPERTIES PROCESSORS 2 PROCESSOR_AFFINITY OFF)
@@ -0,0 +1,39 @@
set(CMAKE_BUILD_TYPE "Release")
find_package(Threads REQUIRED)
add_library(tests-compile-options INTERFACE)
target_compile_options(tests-compile-options INTERFACE -g)
add_executable(thread-limit thread-limit.cpp)
target_compile_definitions(thread-limit PRIVATE MAX_THREADS=${OMNITRACE_MAX_THREADS})
target_link_libraries(thread-limit PRIVATE Threads::Threads tests-compile-options)
set(_thread_limit_environment
"${_base_environment}" "OMNITRACE_USE_PERFETTO=ON" "OMNITRACE_USE_TIMEMORY=ON"
"OMNITRACE_COUT_OUTPUT=ON" "OMNITRACE_USE_SAMPLING=ON" "OMNITRACE_SAMPLING_FREQ=250"
"OMNITRACE_VERBOSE=2" "OMNITRACE_TIMEMORY_COMPONENTS=wall_clock,peak_rss,page_rss")
math(EXPR THREAD_LIMIT_TEST_VALUE "${OMNITRACE_MAX_THREADS} + 24")
math(EXPR THREAD_LIMIT_TEST_VALUE_PLUS_ONE "${THREAD_LIMIT_TEST_VALUE} + 1")
set(_thread_limit_pass_regex "\\|${THREAD_LIMIT_TEST_VALUE}>>>")
set(_thread_limit_fail_regex
"\\|${THREAD_LIMIT_TEST_VALUE_PLUS_ONE}>>>|OMNITRACE_ABORT_FAIL_REGEX")
omnitrace_add_test(
SKIP_BASELINE
NAME thread-limit
TARGET thread-limit
LABELS "max-threads"
REWRITE_ARGS -e -v 2 -i 1024 --label return args
RUNTIME_ARGS -e -v 1 -i 1024 --label return args
RUN_ARGS 35 2 ${THREAD_LIMIT_TEST_VALUE}
REWRITE_TIMEOUT 180
RUNTIME_TIMEOUT 360
RUNTIME_PASS_REGEX "${_thread_limit_pass_regex}"
SAMPLING_PASS_REGEX "${_thread_limit_pass_regex}"
REWRITE_RUN_PASS_REGEX "${_thread_limit_pass_regex}"
RUNTIME_FAIL_REGEX "${_thread_limit_fail_regex}"
SAMPLING_FAIL_REGEX "${_thread_limit_fail_regex}"
REWRITE_RUN_FAIL_REGEX "${_thread_limit_fail_regex}"
ENVIRONMENT "${_thread_limit_environment}")
@@ -0,0 +1,86 @@
#include <atomic>
#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <iostream>
#include <mutex>
#include <pthread.h>
#include <random>
#include <ratio>
#include <string>
#include <thread>
#include <vector>
long
fib(long n)
{
return (n < 2) ? n : fib(n - 1) + fib(n - 2);
}
#if !defined(MAX_THREADS)
# define MAX_THREADS 4000
#endif
auto total_duration = std::chrono::duration<long, std::nano>{};
int
main(int argc, char** argv)
{
std::string _name = argv[0];
auto _pos = _name.find_last_of('/');
if(_pos != std::string::npos) _name = _name.substr(_pos + 1);
size_t nthread = 2 * MAX_THREADS;
size_t concurrency = std::thread::hardware_concurrency();
long nfib = 35;
if(argc > 1) nfib = atol(argv[1]);
if(argc > 2) concurrency = atol(argv[2]);
if(argc > 3) nthread = atol(argv[3]);
printf("\n[%s] Threads: %zu\n[%s] concurrency: %zu\n[%s] fibonacci(%li)\n",
_name.c_str(), nthread, _name.c_str(), concurrency, _name.c_str(), nfib);
auto threads = std::vector<std::thread>{};
auto _sync = [_name, &threads]() {
std::this_thread::yield();
for(auto& itr : threads)
itr.join();
threads.clear();
};
threads.reserve(concurrency);
for(size_t i = 0; i < nthread; ++i)
{
if(i > MAX_THREADS - 8)
{
printf("[%s] launching thread %zu (max: %d)...\n", _name.c_str(), i,
MAX_THREADS);
fflush(stdout);
}
threads.emplace_back(
[](auto n) {
auto t0 = std::chrono::steady_clock::now();
n = fib(n);
(void) n;
auto diff = (std::chrono::steady_clock::now() - t0);
static auto _mutex = std::mutex{};
_mutex.lock();
total_duration += diff;
_mutex.unlock();
},
nfib);
if(i % concurrency == (concurrency - 1)) _sync();
}
_sync();
printf("[%s] ... completed with an average of %.3f msec per thread\n", _name.c_str(),
std::chrono::duration_cast<std::chrono::milliseconds>(total_duration).count() *
(1.0 / nthread));
return 0;
}