Update internal threading (#720)

- update lib/rocprofiler-sdk/internal_threading.*
- use PTL::TaskManager instead of PTL::TaskGroup
  - easier to handle for our needs
  - eliminate data race in rocprofiler_flush_buffer
  - combine memory management of TaskManager and ThreadPool

[ROCm/rocprofiler-sdk commit: 092c428b78]
Este commit está contenido en:
Jonathan R. Madsen
2024-04-01 20:31:54 -05:00
cometido por GitHub
padre e2dfd3bbc9
commit f2c193a7e6
Se han modificado 2 ficheros con 100 adiciones y 48 borrados
@@ -34,39 +34,85 @@
#include "lib/rocprofiler-sdk/registration.hpp"
#include <glog/logging.h>
#include <pthread.h>
#include <cstdint>
#include <mutex>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>
namespace rocprofiler
{
namespace internal_threading
{
using thread_pool_vec_t = std::vector<std::shared_ptr<thread_pool_t>>;
// Note: task_group maintains a shared_ptr copy to thread_pool to ensure it is not destroyed
// before the task can be sync'd.
using task_group_vec_t = std::vector<allocator::unique_static_ptr_t<task_group_t>>;
namespace
{
using task_group_vec_t = std::vector<task_group_t*>;
using thread_pool_config_t = PTL::ThreadPool::Config;
TaskGroup::TaskGroup(std::shared_ptr<thread_pool_t> pool)
: parent_type{static_cast<PTL::ThreadPool*>(pool.get())}
, m_pool{std::move(pool)}
auto affinity_functor(intmax_t)
{
static auto assigned = std::atomic<intmax_t>{0};
intmax_t _assign = assigned++;
return _assign % std::thread::hardware_concurrency();
}
auto
get_thread_pool_config()
{
return thread_pool_config_t{.init = true,
.use_tbb = false,
.use_affinity = false,
.verbose = 0,
.priority = 0,
.pool_size = 1,
.task_queue = nullptr,
.set_affinity = affinity_functor,
.initializer = []() {},
.finalizer = []() {}};
}
} // namespace
TaskGroup::TaskGroup()
: parent_type{new thread_pool_t{get_thread_pool_config()}, false}
, m_pool{parent_type::thread_pool()}
{}
ThreadPool::ThreadPool(const parent_type::Config& cfg)
: parent_type{cfg}
{}
TaskGroup::~TaskGroup()
{
m_pool->destroy_threadpool();
delete m_pool;
}
ThreadPool::~ThreadPool() { parent_type::destroy_threadpool(); }
void
TaskGroup::exec(std::function<void()>&& _func)
{
auto lk = std::unique_lock<std::mutex>{m_mutex};
m_tasks.emplace_back(parent_type::async(std::move(_func)));
}
void
TaskGroup::wait()
{
auto lk = std::unique_lock<std::mutex>{m_mutex};
for(auto& itr : m_tasks)
itr->wait();
m_tasks.clear();
}
void
TaskGroup::join()
{
wait();
}
namespace
{
template <rocprofiler_runtime_library_t... Idx>
using library_sequence_t = std::integer_sequence<rocprofiler_runtime_library_t, Idx...>;
using creation_notifier_cb_t = void (*)(rocprofiler_runtime_library_t, void*);
using thread_pool_config_t = PTL::ThreadPool::Config;
// this is used to loop over the different libraries
constexpr auto creation_notifier_library_seq = library_sequence_t<ROCPROFILER_LIBRARY,
@@ -162,22 +208,26 @@ execute_creation_notifiers(rocprofiler_runtime_library_t libs,
(execute(get_creation_notifier<Idx>()), ...);
}
// using thread_pool_vec_t = std::vector<std::unique_ptr<thread_pool_t>>;
// using task_group_vec_t = std::vector<std::unique_ptr<task_group_t>>;
auto*&
get_thread_pools()
{
static auto* _v = common::static_object<thread_pool_vec_t>::construct();
return _v;
}
auto*&
get_task_groups()
{
static auto* _v = new task_group_vec_t{};
return _v;
}
void
create_forked_callback_threads()
{
if(get_task_groups())
{
for(auto& itr : *get_task_groups())
{
notify_pre_internal_thread_create(ROCPROFILER_LIBRARY);
itr = new task_group_t{};
notify_post_internal_thread_create(ROCPROFILER_LIBRARY);
}
}
}
} // namespace
// initialize the default thread pool
@@ -190,7 +240,9 @@ initialize()
// registration or else the static objects it is pointing to
// will be destroyed before finalize is invoked.
create_callback_thread();
atexit(&registration::finalize);
::atexit(&registration::finalize);
// ensure the callback threads are created on the forked process
::pthread_atfork(nullptr, nullptr, create_forked_callback_threads);
});
}
@@ -203,6 +255,8 @@ finalize()
{
for(auto& itr : *get_task_groups())
itr->join();
for(auto& itr : *get_task_groups())
delete itr;
get_task_groups()->clear();
delete get_task_groups();
get_task_groups() = nullptr;
@@ -228,18 +282,10 @@ create_callback_thread()
notify_pre_internal_thread_create(ROCPROFILER_LIBRARY);
// this will be index after emplace_back
auto idx = CHECK_NOTNULL(get_thread_pools())->size();
thread_pool_config_t pool_config = {};
pool_config.pool_size = 1;
auto& thr_pool = CHECK_NOTNULL(get_thread_pools())
->emplace_back(std::make_shared<thread_pool_t>(pool_config));
if(!get_task_groups()) get_task_groups() = new task_group_vec_t{};
auto idx = CHECK_NOTNULL(get_task_groups())->size();
// construct the task group to use the newly created thread pool
get_task_groups()->emplace_back(allocator::make_unique_static<task_group_t>(thr_pool));
get_task_groups()->emplace_back(new task_group_t{});
// notify that rocprofiler library finished creating an internal thread
notify_post_internal_thread_create(ROCPROFILER_LIBRARY);
@@ -252,7 +298,7 @@ task_group_t*
get_task_group(rocprofiler_callback_thread_t cb_tid)
{
if(!get_task_groups() || get_task_groups()->empty()) return nullptr;
return get_task_groups()->at(cb_tid.handle).get();
return get_task_groups()->at(cb_tid.handle);
}
} // namespace internal_threading
} // namespace rocprofiler
@@ -29,10 +29,12 @@
#include "lib/common/utility.hpp"
#include "lib/rocprofiler-sdk/allocator.hpp"
#include <PTL/TaskGroup.hh>
#include <PTL/TaskManager.hh>
#include <PTL/ThreadPool.hh>
#include <cstdint>
#include <functional>
#include <mutex>
#include <string>
#include <vector>
@@ -40,28 +42,32 @@ namespace rocprofiler
{
namespace internal_threading
{
class ThreadPool : public PTL::ThreadPool
class TaskGroup : private PTL::TaskManager
{
public:
using parent_type = PTL::ThreadPool;
using thread_pool_t = PTL::ThreadPool;
using parent_type = PTL::TaskManager;
using task_type = PTL::PackagedTask<void>;
ThreadPool(const parent_type::Config&);
~ThreadPool();
};
TaskGroup();
~TaskGroup() override;
class TaskGroup : public PTL::TaskGroup<void>
{
public:
using parent_type = PTL::TaskGroup<void>;
TaskGroup(const TaskGroup&) = delete;
TaskGroup(TaskGroup&&) noexcept = delete;
TaskGroup& operator=(const TaskGroup&) = delete;
TaskGroup& operator=(TaskGroup&&) noexcept = delete;
TaskGroup(std::shared_ptr<ThreadPool>);
void exec(std::function<void()>&&);
void wait();
void join();
private:
std::shared_ptr<ThreadPool> m_pool = {};
std::mutex m_mutex = {};
thread_pool_t* m_pool = nullptr;
std::deque<std::shared_ptr<task_type>> m_tasks = {};
};
using thread_pool_t = ThreadPool;
using task_group_t = TaskGroup;
using task_group_t = TaskGroup;
void notify_pre_internal_thread_create(rocprofiler_runtime_library_t);
void notify_post_internal_thread_create(rocprofiler_runtime_library_t);