From f2c193a7e60fdea2d15c99e224adfcd6b4d13e1a Mon Sep 17 00:00:00 2001 From: "Jonathan R. Madsen" Date: Mon, 1 Apr 2024 20:31:54 -0500 Subject: [PATCH] Update internal threading (#720) - update lib/rocprofiler-sdk/internal_threading.* - use PTL::TaskManager instead of PTL::TaskGroup - easier to handle for our needs - eliminate data race in rocprofiler_flush_buffer - combine memory management of TaskManager and ThreadPool [ROCm/rocprofiler-sdk commit: 092c428b785215030c1a3fe264e8ad6b8d06a865] --- .../rocprofiler-sdk/internal_threading.cpp | 114 ++++++++++++------ .../rocprofiler-sdk/internal_threading.hpp | 34 +++--- 2 files changed, 100 insertions(+), 48 deletions(-) diff --git a/projects/rocprofiler-sdk/source/lib/rocprofiler-sdk/internal_threading.cpp b/projects/rocprofiler-sdk/source/lib/rocprofiler-sdk/internal_threading.cpp index effef6e926..b1492ec846 100644 --- a/projects/rocprofiler-sdk/source/lib/rocprofiler-sdk/internal_threading.cpp +++ b/projects/rocprofiler-sdk/source/lib/rocprofiler-sdk/internal_threading.cpp @@ -34,39 +34,85 @@ #include "lib/rocprofiler-sdk/registration.hpp" #include +#include #include #include #include #include +#include #include namespace rocprofiler { namespace internal_threading { -using thread_pool_vec_t = std::vector>; -// Note: task_group maintains a shared_ptr copy to thread_pool to ensure it is not destroyed -// before the task can be sync'd. -using task_group_vec_t = std::vector>; +namespace +{ +using task_group_vec_t = std::vector; +using thread_pool_config_t = PTL::ThreadPool::Config; -TaskGroup::TaskGroup(std::shared_ptr pool) -: parent_type{static_cast(pool.get())} -, m_pool{std::move(pool)} +auto affinity_functor(intmax_t) +{ + static auto assigned = std::atomic{0}; + intmax_t _assign = assigned++; + return _assign % std::thread::hardware_concurrency(); +} + +auto +get_thread_pool_config() +{ + return thread_pool_config_t{.init = true, + .use_tbb = false, + .use_affinity = false, + .verbose = 0, + .priority = 0, + .pool_size = 1, + .task_queue = nullptr, + .set_affinity = affinity_functor, + .initializer = []() {}, + .finalizer = []() {}}; +} +} // namespace + +TaskGroup::TaskGroup() +: parent_type{new thread_pool_t{get_thread_pool_config()}, false} +, m_pool{parent_type::thread_pool()} {} -ThreadPool::ThreadPool(const parent_type::Config& cfg) -: parent_type{cfg} -{} +TaskGroup::~TaskGroup() +{ + m_pool->destroy_threadpool(); + delete m_pool; +} -ThreadPool::~ThreadPool() { parent_type::destroy_threadpool(); } +void +TaskGroup::exec(std::function&& _func) +{ + auto lk = std::unique_lock{m_mutex}; + m_tasks.emplace_back(parent_type::async(std::move(_func))); +} + +void +TaskGroup::wait() +{ + auto lk = std::unique_lock{m_mutex}; + for(auto& itr : m_tasks) + itr->wait(); + m_tasks.clear(); +} + +void +TaskGroup::join() +{ + wait(); +} namespace { template using library_sequence_t = std::integer_sequence; using creation_notifier_cb_t = void (*)(rocprofiler_runtime_library_t, void*); -using thread_pool_config_t = PTL::ThreadPool::Config; // this is used to loop over the different libraries constexpr auto creation_notifier_library_seq = library_sequence_t()), ...); } -// using thread_pool_vec_t = std::vector>; -// using task_group_vec_t = std::vector>; - -auto*& -get_thread_pools() -{ - static auto* _v = common::static_object::construct(); - return _v; -} - auto*& get_task_groups() { static auto* _v = new task_group_vec_t{}; return _v; } + +void +create_forked_callback_threads() +{ + if(get_task_groups()) + { + for(auto& itr : *get_task_groups()) + { + notify_pre_internal_thread_create(ROCPROFILER_LIBRARY); + itr = new task_group_t{}; + notify_post_internal_thread_create(ROCPROFILER_LIBRARY); + } + } +} } // namespace // initialize the default thread pool @@ -190,7 +240,9 @@ initialize() // registration or else the static objects it is pointing to // will be destroyed before finalize is invoked. create_callback_thread(); - atexit(®istration::finalize); + ::atexit(®istration::finalize); + // ensure the callback threads are created on the forked process + ::pthread_atfork(nullptr, nullptr, create_forked_callback_threads); }); } @@ -203,6 +255,8 @@ finalize() { for(auto& itr : *get_task_groups()) itr->join(); + for(auto& itr : *get_task_groups()) + delete itr; get_task_groups()->clear(); delete get_task_groups(); get_task_groups() = nullptr; @@ -228,18 +282,10 @@ create_callback_thread() notify_pre_internal_thread_create(ROCPROFILER_LIBRARY); // this will be index after emplace_back - auto idx = CHECK_NOTNULL(get_thread_pools())->size(); - - thread_pool_config_t pool_config = {}; - pool_config.pool_size = 1; - - auto& thr_pool = CHECK_NOTNULL(get_thread_pools()) - ->emplace_back(std::make_shared(pool_config)); - - if(!get_task_groups()) get_task_groups() = new task_group_vec_t{}; + auto idx = CHECK_NOTNULL(get_task_groups())->size(); // construct the task group to use the newly created thread pool - get_task_groups()->emplace_back(allocator::make_unique_static(thr_pool)); + get_task_groups()->emplace_back(new task_group_t{}); // notify that rocprofiler library finished creating an internal thread notify_post_internal_thread_create(ROCPROFILER_LIBRARY); @@ -252,7 +298,7 @@ task_group_t* get_task_group(rocprofiler_callback_thread_t cb_tid) { if(!get_task_groups() || get_task_groups()->empty()) return nullptr; - return get_task_groups()->at(cb_tid.handle).get(); + return get_task_groups()->at(cb_tid.handle); } } // namespace internal_threading } // namespace rocprofiler diff --git a/projects/rocprofiler-sdk/source/lib/rocprofiler-sdk/internal_threading.hpp b/projects/rocprofiler-sdk/source/lib/rocprofiler-sdk/internal_threading.hpp index ab0fad3f1e..bfc98776cc 100644 --- a/projects/rocprofiler-sdk/source/lib/rocprofiler-sdk/internal_threading.hpp +++ b/projects/rocprofiler-sdk/source/lib/rocprofiler-sdk/internal_threading.hpp @@ -29,10 +29,12 @@ #include "lib/common/utility.hpp" #include "lib/rocprofiler-sdk/allocator.hpp" -#include +#include #include #include +#include +#include #include #include @@ -40,28 +42,32 @@ namespace rocprofiler { namespace internal_threading { -class ThreadPool : public PTL::ThreadPool +class TaskGroup : private PTL::TaskManager { public: - using parent_type = PTL::ThreadPool; + using thread_pool_t = PTL::ThreadPool; + using parent_type = PTL::TaskManager; + using task_type = PTL::PackagedTask; - ThreadPool(const parent_type::Config&); - ~ThreadPool(); -}; + TaskGroup(); + ~TaskGroup() override; -class TaskGroup : public PTL::TaskGroup -{ -public: - using parent_type = PTL::TaskGroup; + TaskGroup(const TaskGroup&) = delete; + TaskGroup(TaskGroup&&) noexcept = delete; + TaskGroup& operator=(const TaskGroup&) = delete; + TaskGroup& operator=(TaskGroup&&) noexcept = delete; - TaskGroup(std::shared_ptr); + void exec(std::function&&); + void wait(); + void join(); private: - std::shared_ptr m_pool = {}; + std::mutex m_mutex = {}; + thread_pool_t* m_pool = nullptr; + std::deque> m_tasks = {}; }; -using thread_pool_t = ThreadPool; -using task_group_t = TaskGroup; +using task_group_t = TaskGroup; void notify_pre_internal_thread_create(rocprofiler_runtime_library_t); void notify_post_internal_thread_create(rocprofiler_runtime_library_t);