From e7d45624d0a1f7aaa88db88c2f291a96ae8eb04d Mon Sep 17 00:00:00 2001 From: "Kuricheti, Mythreya" Date: Thu, 5 Dec 2024 17:58:38 -0800 Subject: [PATCH] Fix page-migration background thread on fork (#31) * Fix page-migration background thread on fork After falling off main in the forked child, all the children try to join on the parent's monitoring thread. This results in a deadlock. Parent is waiting for the child to exit, but the child is trying to join the parent's thread which is signaled from the parent's static destructors. Even with just one parent and child, due to copy-on-write semantics, a child signalling the background thread to join will still block (thread's updated state is not visible in the child). This fix creates background threads on fork per-child with a pthread_atfork handler, ensuring that each child has its own monitoring thread. * Formatting fixes * Detach page-migration background thread and update test timeout * Attach files with ctest * Update corr-id assert * Tweak on-fork, simplify background thread * Revert thread detach --- .../page_migration/page_migration.cpp | 144 +++++++++--------- tests/page-migration/CMakeLists.txt | 10 +- tests/page-migration/validate.py | 19 ++- 3 files changed, 95 insertions(+), 78 deletions(-) diff --git a/source/lib/rocprofiler-sdk/page_migration/page_migration.cpp b/source/lib/rocprofiler-sdk/page_migration/page_migration.cpp index 0afb670d13..6bd5c29381 100644 --- a/source/lib/rocprofiler-sdk/page_migration/page_migration.cpp +++ b/source/lib/rocprofiler-sdk/page_migration/page_migration.cpp @@ -539,11 +539,14 @@ handle_reporting(std::string_view event_data) } // namespace +namespace +{ +void poll_events(small_vector); +} + // KFD utils namespace kfd { -void poll_events(small_vector); - using fd_flags_t = decltype(EFD_NONBLOCK); using fd_t = decltype(pollfd::fd); constexpr auto KFD_DEVICE_PATH{"/dev/kfd"}; @@ -619,17 +622,18 @@ struct poll_kfd_t const rocprofiler_agent_t* agent = nullptr; }; - 
kfd_device_fd kfd_fd = {}; - small_vector file_handles = {}; - pollfd thread_notify = {}; - std::thread bg_thread = {}; - bool active = {false}; + kfd_device_fd kfd_fd = {}; + pollfd thread_notify = {}; + std::thread bg_thread = {}; + bool active = {false}; poll_kfd_t() = default; poll_kfd_t(const small_vector& rprof_ev) : kfd_fd{kfd_device_fd{}} { + small_vector file_handles = {}; + const auto kfd_flags = kfd_bitmask(rprof_ev, std::make_index_sequence{}); @@ -703,7 +707,24 @@ struct poll_kfd_t poll_kfd_t(poll_kfd_t&&) = default; poll_kfd_t& operator=(poll_kfd_t&&) = default; - ~poll_kfd_t(); + ~poll_kfd_t() + { + ROCP_TRACE << fmt::format("Terminating poll_kfd\n"); + if(!active) return; + + // wake thread up + auto bytes_written{-1}; + do + { + bytes_written = write(thread_notify.fd, "E", 1); + } while(bytes_written == -1 && (errno == EINTR || errno == EAGAIN)); + + bg_thread.join(); + + close(thread_notify.fd); + + ROCP_TRACE << fmt::format("Background thread signalled\n"); + } node_fd_t get_node_fd(int gpu_node_id) const { @@ -716,48 +737,36 @@ struct poll_kfd_t return args.anon_fd; } }; +} // namespace +} // namespace kfd // for all contexts -struct page_migration_config +struct config { - bool should_exit() const { return m_should_exit.load(); } - void set_exit(bool val) { m_should_exit.store(val); } - - uint64_t enabled_events = 0; - kfd::poll_kfd_t* kfd_handle = nullptr; - private: - std::atomic m_should_exit = false; + kfd::poll_kfd_t kfd_handle{}; + + static inline config* _config{nullptr}; + + config(const small_vector& _event_ids) + : kfd_handle{_event_ids} + {} + +public: + static void init(const small_vector& event_ids) { _config = new config{event_ids}; } + + static void reset() + { + config* ptr = nullptr; + std::swap(ptr, _config); + delete ptr; + } + + static void reset_on_fork() { _config = nullptr; } }; -page_migration_config& -get_config() +namespace { - static auto& state = *common::static_object::construct(); - return state; -} - 
-kfd::poll_kfd_t::~poll_kfd_t() -{ - ROCP_TRACE << fmt::format("Terminating poll_kfd\n"); - if(!active) return; - - // wake thread up - kfd::get_config().set_exit(true); - auto bytes_written{-1}; - do - { - bytes_written = write(thread_notify.fd, "E", 1); - } while(bytes_written == -1 && (errno == EINTR || errno == EAGAIN)); - - if(bg_thread.joinable()) bg_thread.join(); - ROCP_TRACE << fmt::format("Background thread terminated\n"); - - for(const auto& f : file_handles) - close(f.fd); -} -} // namespace - void poll_events(small_vector file_handles) { @@ -778,7 +787,7 @@ poll_events(small_vector file_handles) "Handle = {}, events = {}, revents = {}\n", fd.fd, fd.events, fd.revents); } - while(!kfd::get_config().should_exit()) + while(true) { auto poll_ret = poll(file_handles.data(), file_handles.size(), -1); @@ -787,6 +796,10 @@ poll_events(small_vector file_handles) if((exitfd.revents & POLLIN) != 0) { + for(const auto& f : file_handles) + { + close(f.fd); + } ROCP_INFO << "Terminating background thread\n"; return; } @@ -809,7 +822,6 @@ poll_events(small_vector file_handles) } } } -} // namespace kfd template const char* @@ -851,19 +863,17 @@ to_bitmask(small_vector& _id_list, std::index_sequence) (_emplace(_id_list, page_migration_info::operation), ...); } -namespace -{ -rocprofiler_status_t -init(const small_vector& event_ids) +template +rocprofiler_status_t init(std::index_sequence) { + static const small_vector event_ids{Inxs...}; // Check if version is more than 1.11 auto ver = kfd::get_version(); if(ver.major_version * 1000 + ver.minor_version > 1011) { if(!context::get_registered_contexts(context_filter).empty()) { - if(!kfd::get_config().kfd_handle) - kfd::get_config().kfd_handle = new kfd::poll_kfd_t{event_ids}; + config::init(event_ids); } return ROCPROFILER_STATUS_SUCCESS; } @@ -879,31 +889,28 @@ init(const small_vector& event_ids) } } // namespace +} // namespace page_migration +} // namespace rocprofiler + +namespace rocprofiler::page_migration +{ 
rocprofiler_status_t init() { - // Testing page migration - return init({ - ROCPROFILER_PAGE_MIGRATION_PAGE_MIGRATE_START, - ROCPROFILER_PAGE_MIGRATION_PAGE_MIGRATE_END, - ROCPROFILER_PAGE_MIGRATION_PAGE_FAULT_START, - ROCPROFILER_PAGE_MIGRATION_PAGE_FAULT_END, - ROCPROFILER_PAGE_MIGRATION_QUEUE_EVICTION, - ROCPROFILER_PAGE_MIGRATION_QUEUE_RESTORE, - ROCPROFILER_PAGE_MIGRATION_UNMAP_FROM_GPU, - ROCPROFILER_PAGE_MIGRATION_DROPPED_EVENT, + pthread_atfork(nullptr, nullptr, []() { + // null out child's copy on fork and reinitialize + // otherwise all children wait on the same thread to join + config::reset_on_fork(); + init(std::make_index_sequence{}); }); + + return init(std::make_index_sequence{}); } void finalize() { - if(kfd::get_config().kfd_handle) - { - kfd::poll_kfd_t* _handle = nullptr; - std::swap(kfd::get_config().kfd_handle, _handle); - delete _handle; - } + config::reset(); } const char* @@ -920,5 +927,4 @@ get_ids() get_ids(_data, std::make_index_sequence{}); return _data; } -} // namespace page_migration -} // namespace rocprofiler +} // namespace rocprofiler::page_migration diff --git a/tests/page-migration/CMakeLists.txt b/tests/page-migration/CMakeLists.txt index 8f0fd6d9f7..8516d0272b 100644 --- a/tests/page-migration/CMakeLists.txt +++ b/tests/page-migration/CMakeLists.txt @@ -28,7 +28,7 @@ set(page-migration-env set_tests_properties( test-page-migration-execute PROPERTIES TIMEOUT - 45 + 60 LABELS "integration-tests" ENVIRONMENT @@ -38,7 +38,9 @@ set_tests_properties( SKIP_REGULAR_EXPRESSION "KFD does not support SVM event reporting" WORKING_DIRECTORY - ${CMAKE_CURRENT_BINARY_DIR}) + ${CMAKE_CURRENT_BINARY_DIR} + ATTACHED_FILES + ${CMAKE_CURRENT_BINARY_DIR}/page-migration-test.json) # copy to binary directory rocprofiler_configure_pytest_files(COPY validate.py conftest.py CONFIG pytest.ini) @@ -60,4 +62,6 @@ set_tests_properties( SKIP_REGULAR_EXPRESSION "KFD does not support SVM event reporting" WORKING_DIRECTORY - ${CMAKE_CURRENT_BINARY_DIR}) + 
${CMAKE_CURRENT_BINARY_DIR} + ATTACHED_FILES + ${CMAKE_CURRENT_BINARY_DIR}/page-migration-test.json) diff --git a/tests/page-migration/validate.py b/tests/page-migration/validate.py index 4b8ca93742..4d859f693e 100644 --- a/tests/page-migration/validate.py +++ b/tests/page-migration/validate.py @@ -243,16 +243,23 @@ def test_retired_correlation_ids(input_data): api_corr_ids = _sort_dict(api_corr_ids) async_corr_ids = _sort_dict(async_corr_ids) retired_corr_ids = _sort_dict(retired_corr_ids) + missing_corr_ids = {} for cid, itr in async_corr_ids.items(): - assert cid in retired_corr_ids.keys() - ts = retired_corr_ids[cid]["timestamp"] - assert (ts - itr["end_timestamp"]) > 0, f"correlation-id: {cid}, data: {itr}" + if cid not in retired_corr_ids.keys(): + missing_corr_ids[cid] = itr + else: + ts = retired_corr_ids[cid]["timestamp"] + assert (ts - itr["end_timestamp"]) > 0, f"correlation-id: {cid}, data: {itr}" for cid, itr in api_corr_ids.items(): - assert cid in retired_corr_ids.keys() - ts = retired_corr_ids[cid]["timestamp"] - assert (ts - itr["end_timestamp"]) > 0, f"correlation-id: {cid}, data: {itr}" + if cid not in retired_corr_ids.keys(): + missing_corr_ids[cid] = itr + else: + ts = retired_corr_ids[cid]["timestamp"] + assert (ts - itr["end_timestamp"]) > 0, f"correlation-id: {cid}, data: {itr}" + + assert len(missing_corr_ids) == 0, f"{missing_corr_ids}" assert len(api_corr_ids.keys()) == (len(retired_corr_ids.keys()))