diff --git a/projects/rocprofiler-sdk/source/lib/rocprofiler-sdk/buffer.cpp b/projects/rocprofiler-sdk/source/lib/rocprofiler-sdk/buffer.cpp index d79fb28cb6..31575c87a9 100644 --- a/projects/rocprofiler-sdk/source/lib/rocprofiler-sdk/buffer.cpp +++ b/projects/rocprofiler-sdk/source/lib/rocprofiler-sdk/buffer.cpp @@ -161,19 +161,26 @@ flush(rocprofiler_buffer_id_t buffer_id, bool wait) if(wait) task_group->wait(); + auto idx = buff->buffer_idx++; + idx %= buff->buffers.size(); + + ROCP_INFO << fmt::format("executing buffer flush [id={}, index={}]...", buffer_id.handle, idx); + // buffer is currently being flushed or destroyed - if(buff->syncer.test_and_set()) + if(buff->syncer.at(idx).test_and_set()) { + ROCP_INFO << fmt::format( + "waiting for buffer flush to complete [id={}, index={}]...", buffer_id.handle, idx); if(!wait) return ROCPROFILER_STATUS_ERROR_BUFFER_BUSY; - while(buff->syncer.test_and_set()) + while(buff->syncer.at(idx).test_and_set()) { + ROCP_TRACE << fmt::format( + "waiting for buffer flush to complete [id={}, index={}]...", buffer_id.handle, idx); std::this_thread::yield(); - std::this_thread::sleep_for(std::chrono::microseconds{10}); + std::this_thread::sleep_for(buff->sync_wait_usec); } } - auto idx = buff->buffer_idx++; - auto _task = [buffer_id, idx, offset]() { ROCP_ERROR_IF(registration::get_fini_status() > 0) << "executing buffer (" << buffer_id.handle << ") flush task finalization!"; @@ -248,7 +255,9 @@ flush(rocprofiler_buffer_id_t buffer_id, bool wait) ROCP_INFO << "buffer at " << buffer_id.handle << " is empty..."; } - buff_v->syncer.clear(); + ROCP_INFO << fmt::format( + "completed buffer flush [id={}, index={}]...", buffer_id.handle, idx); + buff_v->syncer.at(idx).clear(); }; task_group->exec(std::move(_task)); @@ -331,12 +340,17 @@ rocprofiler_destroy_buffer(rocprofiler_buffer_id_t buffer_id) if(!buff) return ROCPROFILER_STATUS_ERROR_BUFFER_NOT_FOUND; // buffer is currently being flushed or destroyed - if(buff->syncer.test_and_set()) return ROCPROFILER_STATUS_ERROR_BUFFER_BUSY; + for(auto& itr : buff->syncer) + { + if(itr.test_and_set()) return ROCPROFILER_STATUS_ERROR_BUFFER_BUSY; + } for(auto& itr : buff->buffers) itr.reset(); - buff->syncer.clear(); + for(auto& itr : buff->syncer) + itr.clear(); + buff.reset(); return ROCPROFILER_STATUS_SUCCESS; diff --git a/projects/rocprofiler-sdk/source/lib/rocprofiler-sdk/buffer.hpp b/projects/rocprofiler-sdk/source/lib/rocprofiler-sdk/buffer.hpp index 95e10f7668..549e3ead2a 100644 --- a/projects/rocprofiler-sdk/source/lib/rocprofiler-sdk/buffer.hpp +++ b/projects/rocprofiler-sdk/source/lib/rocprofiler-sdk/buffer.hpp @@ -29,6 +29,9 @@ #include "lib/common/container/stable_vector.hpp" #include "lib/common/demangle.hpp" +#include + +#include #include #include #include @@ -40,19 +43,21 @@ namespace buffer { struct instance { - using buffer_t = common::container::record_header_buffer; + using buffer_t = common::container::record_header_buffer; + static constexpr auto size = 2; // double buffering + static constexpr auto sync_wait_usec = std::chrono::microseconds{10}; - mutable std::array buffers = {}; - mutable std::atomic_flag syncer = ATOMIC_FLAG_INIT; // writer and reader lock. - mutable std::atomic buffer_idx = {}; // array index - mutable std::atomic drop_count = {}; - uint64_t watermark = 0; - uint64_t context_id = 0; // rocprofiler_context_id_t value - uint64_t buffer_id = 0; // rocprofiler_buffer_id_t value - uint64_t task_group_id = 0; // thread-pool assignment - rocprofiler_buffer_tracing_cb_t callback = nullptr; - void* callback_data = nullptr; - rocprofiler_buffer_policy_t policy = ROCPROFILER_BUFFER_POLICY_NONE; + mutable std::array buffers = {}; + mutable std::array syncer = {false, false}; // r/w lock + mutable std::atomic buffer_idx = {}; // array index + mutable std::atomic drop_count = {}; + uint64_t watermark = 0; + uint64_t context_id = 0; // rocprofiler_context_id_t value + uint64_t buffer_id = 0; // rocprofiler_buffer_id_t value + uint64_t task_group_id = 0; // thread-pool assignment + rocprofiler_buffer_tracing_cb_t callback = nullptr; + void* callback_data = nullptr; + rocprofiler_buffer_policy_t policy = ROCPROFILER_BUFFER_POLICY_NONE; template bool emplace(uint32_t, uint32_t, Tp&); @@ -115,26 +120,51 @@ template inline bool rocprofiler::buffer::instance::emplace(uint32_t category, uint32_t kind, Tp& value) { + struct local_sync + { + local_sync(uint64_t buffer_id, uint64_t idx, std::atomic_flag& flag) + : m_flag{flag} + { + if(m_flag.test_and_set()) + { + ROCP_INFO << fmt::format( + "waiting for buffer flush to complete [id={}, index={}]...", buffer_id, idx); + while(m_flag.test_and_set()) + { + ROCP_TRACE << fmt::format( + "waiting for buffer flush to complete [id={}, index={}]...", + buffer_id, + idx); + std::this_thread::yield(); + std::this_thread::sleep_for(sync_wait_usec); + } + } + } + + ~local_sync() { m_flag.clear(); } + + std::atomic_flag& m_flag; + }; + // get the index of the current buffer auto get_idx = [this]() { return buffer_idx.load(std::memory_order_acquire) % buffers.size(); }; - - while(syncer.test_and_set()) - { - std::this_thread::yield(); - std::this_thread::sleep_for(std::chrono::microseconds{10}); - } auto idx = get_idx(); - auto success = buffers.at(idx).emplace(category, kind, value); - syncer.clear(); + auto success = false; + { + auto _syncer = local_sync{buffer_id, idx, syncer.at(idx)}; // ensure buffer not flushing + success = buffers.at(idx).emplace(category, kind, value); + } + if(!success) { if(buffers.at(idx).capacity() < sizeof(value)) { - ROCP_CI_LOG(ERROR) << "buffer " << buffer_id - << " too small (size=" << buffers.at(idx).capacity() - << ") to hold an object of type " - << common::cxx_demangle(typeid(value).name()) << " with size " - << sizeof(value); + ROCP_CI_LOG(ERROR) << fmt::format( + "buffer {} too small (size={}) to hold an object of type {} with size {}", + buffer_id, + buffers.at(idx).capacity(), + common::cxx_demangle(typeid(value).name()), + sizeof(value)); return false; } @@ -144,14 +174,9 @@ rocprofiler::buffer::instance::emplace(uint32_t category, uint32_t kind, Tp& val do { buffer::flush(buffer_id, true); - while(syncer.test_and_set()) - { - std::this_thread::yield(); - std::this_thread::sleep_for(std::chrono::microseconds{10}); - } - idx = get_idx(); - success = buffers.at(idx).emplace(category, kind, value); - syncer.clear(); + idx = get_idx(); + auto _syncer = local_sync{buffer_id, idx, syncer.at(idx)}; // wait for flush + success = buffers.at(idx).emplace(category, kind, value); } while(!success); } else diff --git a/projects/rocprofiler-sdk/tests/pytest-packages/tests/rocprofv3.py b/projects/rocprofiler-sdk/tests/pytest-packages/tests/rocprofv3.py index 9cc1acfef3..2a6bacfecb 100644 --- a/projects/rocprofiler-sdk/tests/pytest-packages/tests/rocprofv3.py +++ b/projects/rocprofiler-sdk/tests/pytest-packages/tests/rocprofv3.py @@ -232,6 +232,23 @@ def test_rocpd_data( _js_data ), f"query: {_rpd_query}\n{rpd_category} ({len(_rpd_data)}):\n\t{_rpd_data}\n{js_category} ({len(_js_data)}):\n\t{_js_data}" + # if duplicate entries exist from double buffering synchronization issues, there will be duplicate start and end times + for itr in ["regions", "kernels", "memory_copies", "memory_allocations"]: + _num_rpd_tot = rocpd_data.execute(f"SELECT COUNT(*) FROM {itr}").fetchone()[0] + _num_rpd_start = rocpd_data.execute( + f"SELECT COUNT(DISTINCT(start)) FROM {itr}" + ).fetchone()[0] + _num_rpd_end = rocpd_data.execute( + f"SELECT COUNT(DISTINCT(end)) FROM {itr}" + ).fetchone()[0] + + assert _num_rpd_tot == _num_rpd_start == _num_rpd_end, ( + f"Duplicate records check failed for {itr}: total {itr}={_num_rpd_tot}, " + f"unique starts={_num_rpd_start}, unique ends={_num_rpd_end}. In rocprofv3, " + "this likely means the double buffering scheme updated a buffer with new " + "records while it was being processed in a buffer flush" + ) + def _perform_time_sanity_checks(data): """Helper function to perform time sanity checks on data.""" diff --git a/projects/rocprofiler-sdk/tests/rocprofv3/tracing/CMakeLists.txt b/projects/rocprofiler-sdk/tests/rocprofv3/tracing/CMakeLists.txt index 1a52de6fc8..ea13e20af4 100644 --- a/projects/rocprofiler-sdk/tests/rocprofv3/tracing/CMakeLists.txt +++ b/projects/rocprofiler-sdk/tests/rocprofv3/tracing/CMakeLists.txt @@ -57,7 +57,7 @@ rocprofiler_add_integration_validate_test( --rocpd-input ${CMAKE_CURRENT_BINARY_DIR}/simple-transpose-trace/cmdl-input/out_results.db TIMEOUT 45 - LABELS "integration-tests" + LABELS "integration-tests;rocpd" FIXTURES_REQUIRED rocprofv3-test-trace) rocprofiler_add_integration_execute_test( @@ -69,7 +69,7 @@ rocprofiler_add_integration_execute_test( $ DEPENDS simple-transpose TIMEOUT 45 - LABELS "integration-tests" + LABELS "integration-tests;rocpd" PRELOAD "${PRELOAD_ENV}" FIXTURES_SETUP rocprofv3-test-trace-input-json FAIL_REGULAR_EXPRESSION