Files
rocm-systems/examples/fork/fork.cpp
T
Jonathan R. Madsen 518c83e0f9 Dynamic expansion of thread data (#294)
* Tests for exceeding OMNITRACE_MAX_THREADS

- tests which exceed the OMNITRACE_MAX_THREADS value for thread creation

* CMake Formatting.cmake update

- include source files in /tests/source directory

* Add unknown-hash= to OMNITRACE_ABORT_FAIL_REGEX

- fail if a timemory hash is not resolved to a name

* Tests for exceeding OMNITRACE_MAX_THREADS

- update

* omnitrace-sample update

- remove env disabling of critical-trace and process-sampling

* core library update

- make_unique in concepts.hpp
- add OMNITRACE_USE_ROCM_SMI to "process_sampling" category
- remove forced disabling of critical-trace in sampling mode
- parentheses for OMNITRACE_PREFER
- use tim::get_hash_id instead of tim::get_combined_hash_id

* core library update (containers)

- added aligned_static_vector.hpp
  - similar to static_vector.hpp but attempts to align to cache line size
- alignment template parameter for stable_vector
- added missing aliases in static_vector
  - consistent with aligned_static_vector aliases

* thread_info update

- track the peak number of threads created
- thread_info::get_peak_num_threads() returns the peak number of threads

* thread_data update

- generic thread_data inherits from base_thread_data
- thread_data reworked to support dynamic expansion
- base_thread_data updated to invoke private_instance() function
- thread_data<optional<T>> uses stable_vector aligned to cache line width
- thread_data<identity<T>> uses stable_vector aligned to cache line width
- thread_data for optional and identity provide private private_instance function + friend to base_thread_data
- component_bundle_cache<T> is now thread_data<component_bundle_cache_impl<T>>

* causal update

- thread_data<T>::instances -> thread_data<T>::instance(construct_on_thread{ ... })
- loop over max_supported_threads (constexpr) -> loop over thread_info::get_peak_num_threads()
- tim::get_combined_hash_id -> tim::get_hash_id
- update progress_bundle usage to new thread_data API

* backtrace/backtrace_metrics component update

- backtrace_metrics update
  - update to new thread_data API
  - add thread CPU time row in perfetto
  - fix potential bug when rusage categories are disabled
  - fix bug in operator-= not subtracting cpu time of rhs
- backtrace update
  - skip all child call-stack below 'tim::openmp::' if sampling_keep_internal = false

* pthread_gotcha component update

- pthread_gotcha::shutdown() invokes pthread_create_gotcha::shutdown()

* pthread_create_gotcha component update

- minor tweak to {start,stop}_bundle functions: pass in thread id
- update to new thread_data API
- track native handles of internal threads
- implement system with pthread_kill to stop dangling bundles

* rocprofiler/roctracer component update

- update to new thread_data API
- loop over max_supported_threads (constexpr) -> loop over thread_info::get_peak_num_threads()

* critical trace (library) update

- update to new thread_data API
- tim::get_combined_hash_id -> tim::get_hash_id

* coverage update

- update to new thread_data API

* tasking update

- update to new thread_data API
- loop over max_supported_threads (constexpr) -> loop over thread_info::get_peak_num_threads()

* roctracer update

- update to new thread_data API
- loop over max_supported_threads (constexpr) -> loop over thread_info::get_peak_num_threads()

* rocm_smi update

- update to new thread_data API

* runtime.cpp update

- update to new thread_data API

* sampling.cpp update

- update to new thread_data API
- loop over max_supported_threads (constexpr) -> loop over thread_info::get_peak_num_threads()

* ompt.cpp update

- invoke pthread_gotcha::shutdown before invoking OMPT finalize function
  - this prevents signals from being delivered to OpenMP threads

* tracing.hpp and tracing.cpp update

- replace get_timemory_hash_{ids,aliases} functions with copy_timemory_hash_ids function
- update to new thread_data API
- loop over max_supported_threads (constexpr) -> loop over thread_info::get_peak_num_threads()
- tim::get_combined_hash_id -> tim::get_hash_id
- improvements to + error checking in thread_init function

* library.cpp update

- move copying timemory hash id/aliases to tracing.cpp
- update to new thread_data API
- loop over max_supported_threads (constexpr) -> loop over thread_info::get_peak_num_threads()

* Update BuildSettings.cmake

- add -Wno-interference-size to suppress warning about use of std::hardware_destructive_interference_size

* Update fork example

- improve scheme for waiting on child processes via waitpid instead of wait
- support running main routine multiple times
- push/pop regions in child process

* Update lib/common/defines.h.in

- allow user to specify misc values via -D <name>=<value>
  - OMNITRACE_CACHELINE_SIZE
  - OMNITRACE_CACHELINE_SIZE_MIN
  - OMNITRACE_ROCM_MAX_COUNTERS
- remove unused defines
  - OMNITRACE_ROCM_LOOK_AHEAD
  - OMNITRACE_MAX_ROCM_QUEUES

* Update rocprofiler.hpp

- OMNITRACE_MAX_ROCM_COUNTERS -> OMNITRACE_ROCM_MAX_COUNTERS

* Update aligned_static_vector

- set cacheline_align_v from max of OMNITRACE_CACHELINE_SIZE and OMNITRACE_CACHELINE_SIZE_MIN

* Update tracing.cpp

- acquire locks for updating main hash ids/aliases
- only propagate ids/aliases when finalizing

* Update pthread_create_gotcha.cpp

- make sure hash for "start_thread" exists on main thread

* Update causal end to end tests

- if OMNITRACE_BUILD_NUMBER is 1, set OMNITRACE_VERBOSE=0
2023-10-16 18:04:47 -05:00

137 lines
3.9 KiB
C++

#include <omnitrace/user.h>
#include <cerrno>
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <exception>
#include <pthread.h>
#include <set>
#include <string>
#include <sys/wait.h>
#include <thread>
#include <unistd.h>
#include <vector>
// Prints the calling process' pid and parent pid, tagged with `_name`.
// Both stdout and stderr are flushed immediately before and after the
// message is written.
void
print_info(const char* _name)
{
    const auto _flush_streams = []() {
        fflush(stdout);
        fflush(stderr);
    };

    _flush_streams();
    printf("[%s] pid = %i, ppid = %i\n", _name, getpid(), getppid());
    _flush_streams();
}
namespace
{
// Prints a one-line, human-readable description of a waitpid() status word.
void
print_wait_status(int _status)
{
    if(WIFEXITED(_status))
    {
        printf("exited, status=%d\n", WEXITSTATUS(_status));
    }
    else if(WIFSIGNALED(_status))
    {
        printf("killed by signal %d\n", WTERMSIG(_status));
    }
    else if(WIFSTOPPED(_status))
    {
        printf("stopped by signal %d\n", WSTOPSIG(_status));
    }
    else if(WIFCONTINUED(_status))
    {
        printf("continued\n");
    }
    else
    {
        printf("unknown\n");
    }
}

// Polls waitpid() (non-blocking) until the child `_child` has terminated.
//
// Returns the child's exit code when it exited normally, 128 + the signal
// number when it was killed by a signal (the usual shell convention), and
// EXIT_FAILURE when waitpid() itself fails.
int
wait_for_child(const char* _name, pid_t _child)
{
    printf("[%s][%i] performing waitpid(%i, ...)\n", _name, getpid(), _child);
    while(true)
    {
        int         _status   = 0;
        const pid_t _wait_pid = waitpid(_child, &_status, WUNTRACED | WNOHANG);

        // WNOHANG: zero means the child has not changed state yet
        if(_wait_pid == 0) continue;

        if(_wait_pid < 0)
        {
            const int _err = errno;
            if(_err == EINTR) continue;
            // any other error (e.g. ECHILD) will never resolve by retrying
            printf("[%s][%i] waitpid(%i) failed: %s\n", _name, getpid(), _child,
                   strerror(_err));
            return EXIT_FAILURE;
        }

        printf("[%s][%i] returned from waitpid(%i) with pid = %i (status = %i) :: ",
               _name, getpid(), _child, _wait_pid, _status);
        print_wait_status(_status);

        if(WIFEXITED(_status)) return WEXITSTATUS(_status);
        if(WIFSIGNALED(_status)) return 128 + WTERMSIG(_status);
        // WUNTRACED also reports stopped children: those have not finished,
        // so keep waiting for the actual termination.
    }
}
}  // namespace

// Launches `nchildren` threads which each fork() one child process, then waits
// for every child process to terminate and joins the threads.
//
// Each child process prints its info, sleeps for (index + 1) seconds on a
// secondary thread and then exits with EXIT_SUCCESS.
//
// _name      label used as the prefix of every message
// nchildren  number of child processes to fork (negative values are treated as 0)
//
// Returns 0 when every child exited successfully, otherwise the first non-zero
// decoded exit code (see wait_for_child), or EXIT_FAILURE if the barrier could
// not be initialized or a fork() failed.
int
run(const char* _name, int nchildren)
{
    // a negative count would make resize() throw and the barrier count invalid
    if(nchildren < 0) nchildren = 0;

    auto _barrier  = pthread_barrier_t{};
    auto _threads  = std::vector<std::thread>{};
    auto _children = std::vector<pid_t>{};
    _children.resize(nchildren, 0);
    _threads.reserve(nchildren);

    // every launcher thread + this thread participate in the barrier
    const int _init_err = pthread_barrier_init(&_barrier, nullptr, nchildren + 1);
    if(_init_err != 0)
    {
        printf("[%s][%i] pthread_barrier_init failed: %s\n", _name, getpid(),
               strerror(_init_err));
        return EXIT_FAILURE;
    }

    for(int i = 0; i < nchildren; ++i)
    {
        omnitrace_user_push_region("launch_child");
        auto _run = [&_barrier, &_children, i, _name](uint64_t _nsec) {
            // hold until all launcher threads have been created
            pthread_barrier_wait(&_barrier);
            _children.at(i) = fork();
            if(_children.at(i) == 0)
            {
                // child code
                print_info(_name);
                printf("[%s][%i] child job starting...\n", _name, getpid());
                auto _sleep = [=]() {
                    omnitrace_user_push_region("child_process_child_thread");
                    std::this_thread::sleep_for(std::chrono::seconds{ _nsec });
                    omnitrace_user_pop_region("child_process_child_thread");
                };
                omnitrace_user_push_region("child_process");
                std::thread{ _sleep }.join();
                // close the region opened above (was a second push)
                omnitrace_user_pop_region("child_process");
                printf("[%s][%i] child job complete\n", _name, getpid());
                exit(EXIT_SUCCESS);
            }
            else
            {
                // parent side (also reached when fork() failed): signal that
                // the slot in _children has been written
                pthread_barrier_wait(&_barrier);
            }
        };
        _threads.emplace_back(_run, i + 1);
        omnitrace_user_pop_region("launch_child");
    }

    // all child threads should start executing their fork once this returns
    pthread_barrier_wait(&_barrier);
    // wait for the threads to successfully fork; once this returns every entry
    // of _children holds the result of its fork() call
    pthread_barrier_wait(&_barrier);

    omnitrace_user_push_region("wait_for_children");

    int _ec = 0;
    // parent waits for all the child processes
    for(const auto& itr : _children)
    {
        int _child_ec = EXIT_FAILURE;
        if(itr > 0)
        {
            _child_ec = wait_for_child(_name, itr);
        }
        else
        {
            // fork() failed; waitpid(-1, ...) would reap an arbitrary child
            printf("[%s][%i] fork failed, no child process to wait for\n", _name,
                   getpid());
        }
        // report the first failure instead of only the last child's status
        if(_ec == 0) _ec = _child_ec;
    }

    printf("[%s][%i] joining threads ...\n", _name, getpid());
    for(auto& itr : _threads)
        itr.join();

    // no thread can be using the barrier after the joins
    pthread_barrier_destroy(&_barrier);

    omnitrace_user_pop_region("wait_for_children");

    printf("[%s][%i] returning (error code: %i) ...\n", _name, getpid(), _ec);
    return _ec;
}
// Entry point.
//
// usage: <exe> [NUM_FORKS] [NUM_REPETITIONS]
//
//   NUM_FORKS        number of child processes forked per repetition (default: 4)
//   NUM_REPETITIONS  number of times the fork routine is executed (default: 1)
//
// Returns EXIT_SUCCESS when every repetition succeeded, EXIT_FAILURE on invalid
// arguments, otherwise a non-zero exit status derived from the failing run().
int
main(int argc, char** argv)
{
    // argv[0] is not guaranteed to be present
    const char* _exe = (argc > 0 && argv[0] != nullptr) ? argv[0] : "fork";

    int _nfork = 4;
    int _nrep  = 1;
    try
    {
        if(argc > 1) _nfork = std::stoi(argv[1]);
        if(argc > 2) _nrep = std::stoi(argv[2]);
    } catch(const std::exception& _e)
    {
        // std::stoi throws on non-numeric or out-of-range input
        fprintf(stderr, "[%s] invalid argument (%s)\n", _exe, _e.what());
        fprintf(stderr, "usage: %s [NUM_FORKS] [NUM_REPETITIONS]\n", _exe);
        return EXIT_FAILURE;
    }

    if(_nfork < 0)
    {
        fprintf(stderr, "[%s] NUM_FORKS must be >= 0 (got %i)\n", _exe, _nfork);
        return EXIT_FAILURE;
    }

    print_info(_exe);

    for(int i = 0; i < _nrep; ++i)
    {
        auto _ec = run(_exe, _nfork);
        if(_ec != 0)
        {
            // only the low 8 bits of the value returned from main reach the
            // parent process: make sure a failure never truncates to zero
            return ((_ec & 0xff) != 0) ? _ec : EXIT_FAILURE;
        }
    }

    printf("[%s][%i] job complete\n", _exe, getpid());
    return EXIT_SUCCESS;
}