[rocprofiler-sdk] Fix double-buffering emplace and flush synchronization (#2334)

* Fix buffer tracing synchronization lock

- PR #529 (in rocprofiler-sdk-internal) introduced waiting on the syncer flag when emplacing in a buffer to prevent the overwriting buffer records currently being processed in a buffer flush callback
- The above fix introduced a block on the both buffers when a buffer flush callback was being executed instead of a block on the buffer being flushed.

* Add rocpd tests for duplicate records

* Address code review comments
Dieser Commit ist enthalten in:
Jonathan R. Madsen
2026-01-06 06:06:18 -06:00
committet von GitHub
Ursprung 9e4d1c31c7
Commit 7fcea905f3
4 geänderte Dateien mit 99 neuen und 43 gelöschten Zeilen
@@ -161,19 +161,26 @@ flush(rocprofiler_buffer_id_t buffer_id, bool wait)
if(wait) task_group->wait();
auto idx = buff->buffer_idx++;
idx %= buff->buffers.size();
ROCP_INFO << fmt::format("executing buffer flush [id={}, index={}]...", buffer_id.handle, idx);
// buffer is currently being flushed or destroyed
if(buff->syncer.test_and_set())
if(buff->syncer.at(idx).test_and_set())
{
ROCP_INFO << fmt::format(
"waiting for buffer flush to complete [id={}, index={}]...", buffer_id.handle, idx);
if(!wait) return ROCPROFILER_STATUS_ERROR_BUFFER_BUSY;
while(buff->syncer.test_and_set())
while(buff->syncer.at(idx).test_and_set())
{
ROCP_TRACE << fmt::format(
"waiting for buffer flush to complete [id={}, index={}]...", buffer_id.handle, idx);
std::this_thread::yield();
std::this_thread::sleep_for(std::chrono::microseconds{10});
std::this_thread::sleep_for(buff->sync_wait_usec);
}
}
auto idx = buff->buffer_idx++;
auto _task = [buffer_id, idx, offset]() {
ROCP_ERROR_IF(registration::get_fini_status() > 0)
<< "executing buffer (" << buffer_id.handle << ") flush task finalization!";
@@ -248,7 +255,9 @@ flush(rocprofiler_buffer_id_t buffer_id, bool wait)
ROCP_INFO << "buffer at " << buffer_id.handle << " is empty...";
}
buff_v->syncer.clear();
ROCP_INFO << fmt::format(
"completed buffer flush [id={}, index={}]...", buffer_id.handle, idx);
buff_v->syncer.at(idx).clear();
};
task_group->exec(std::move(_task));
@@ -331,12 +340,17 @@ rocprofiler_destroy_buffer(rocprofiler_buffer_id_t buffer_id)
if(!buff) return ROCPROFILER_STATUS_ERROR_BUFFER_NOT_FOUND;
// buffer is currently being flushed or destroyed
if(buff->syncer.test_and_set()) return ROCPROFILER_STATUS_ERROR_BUFFER_BUSY;
for(auto& itr : buff->syncer)
{
if(itr.test_and_set()) return ROCPROFILER_STATUS_ERROR_BUFFER_BUSY;
}
for(auto& itr : buff->buffers)
itr.reset();
buff->syncer.clear();
for(auto& itr : buff->syncer)
itr.clear();
buff.reset();
return ROCPROFILER_STATUS_SUCCESS;
@@ -29,6 +29,9 @@
#include "lib/common/container/stable_vector.hpp"
#include "lib/common/demangle.hpp"
#include <fmt/format.h>
#include <sys/types.h>
#include <array>
#include <atomic>
#include <cstdint>
@@ -40,19 +43,21 @@ namespace buffer
{
struct instance
{
using buffer_t = common::container::record_header_buffer;
using buffer_t = common::container::record_header_buffer;
static constexpr auto size = 2; // double buffering
static constexpr auto sync_wait_usec = std::chrono::microseconds{10};
mutable std::array<buffer_t, 2> buffers = {};
mutable std::atomic_flag syncer = ATOMIC_FLAG_INIT; // writer and reader lock.
mutable std::atomic<uint32_t> buffer_idx = {}; // array index
mutable std::atomic<uint64_t> drop_count = {};
uint64_t watermark = 0;
uint64_t context_id = 0; // rocprofiler_context_id_t value
uint64_t buffer_id = 0; // rocprofiler_buffer_id_t value
uint64_t task_group_id = 0; // thread-pool assignment
rocprofiler_buffer_tracing_cb_t callback = nullptr;
void* callback_data = nullptr;
rocprofiler_buffer_policy_t policy = ROCPROFILER_BUFFER_POLICY_NONE;
mutable std::array<buffer_t, size> buffers = {};
mutable std::array<std::atomic_flag, size> syncer = {false, false}; // r/w lock
mutable std::atomic<uint32_t> buffer_idx = {}; // array index
mutable std::atomic<uint64_t> drop_count = {};
uint64_t watermark = 0;
uint64_t context_id = 0; // rocprofiler_context_id_t value
uint64_t buffer_id = 0; // rocprofiler_buffer_id_t value
uint64_t task_group_id = 0; // thread-pool assignment
rocprofiler_buffer_tracing_cb_t callback = nullptr;
void* callback_data = nullptr;
rocprofiler_buffer_policy_t policy = ROCPROFILER_BUFFER_POLICY_NONE;
template <typename Tp>
bool emplace(uint32_t, uint32_t, Tp&);
@@ -115,26 +120,51 @@ template <typename Tp>
inline bool
rocprofiler::buffer::instance::emplace(uint32_t category, uint32_t kind, Tp& value)
{
struct local_sync
{
local_sync(uint64_t buffer_id, uint64_t idx, std::atomic_flag& flag)
: m_flag{flag}
{
if(m_flag.test_and_set())
{
ROCP_INFO << fmt::format(
"waiting for buffer flush to complete [id={}, index={}]...", buffer_id, idx);
while(m_flag.test_and_set())
{
ROCP_TRACE << fmt::format(
"waiting for buffer flush to complete [id={}, index={}]...",
buffer_id,
idx);
std::this_thread::yield();
std::this_thread::sleep_for(sync_wait_usec);
}
}
}
~local_sync() { m_flag.clear(); }
std::atomic_flag& m_flag;
};
// get the index of the current buffer
auto get_idx = [this]() { return buffer_idx.load(std::memory_order_acquire) % buffers.size(); };
while(syncer.test_and_set())
{
std::this_thread::yield();
std::this_thread::sleep_for(std::chrono::microseconds{10});
}
auto idx = get_idx();
auto success = buffers.at(idx).emplace(category, kind, value);
syncer.clear();
auto success = false;
{
auto _syncer = local_sync{buffer_id, idx, syncer.at(idx)}; // ensure buffer not flushing
success = buffers.at(idx).emplace(category, kind, value);
}
if(!success)
{
if(buffers.at(idx).capacity() < sizeof(value))
{
ROCP_CI_LOG(ERROR) << "buffer " << buffer_id
<< " too small (size=" << buffers.at(idx).capacity()
<< ") to hold an object of type "
<< common::cxx_demangle(typeid(value).name()) << " with size "
<< sizeof(value);
ROCP_CI_LOG(ERROR) << fmt::format(
"buffer {} too small (size={}) to hold an object of type {} with size {}",
buffer_id,
buffers.at(idx).capacity(),
common::cxx_demangle(typeid(value).name()),
sizeof(value));
return false;
}
@@ -144,14 +174,9 @@ rocprofiler::buffer::instance::emplace(uint32_t category, uint32_t kind, Tp& val
do
{
buffer::flush(buffer_id, true);
while(syncer.test_and_set())
{
std::this_thread::yield();
std::this_thread::sleep_for(std::chrono::microseconds{10});
}
idx = get_idx();
success = buffers.at(idx).emplace(category, kind, value);
syncer.clear();
idx = get_idx();
auto _syncer = local_sync{buffer_id, idx, syncer.at(idx)}; // wait for flush
success = buffers.at(idx).emplace(category, kind, value);
} while(!success);
}
else
@@ -232,6 +232,23 @@ def test_rocpd_data(
_js_data
), f"query: {_rpd_query}\n{rpd_category} ({len(_rpd_data)}):\n\t{_rpd_data}\n{js_category} ({len(_js_data)}):\n\t{_js_data}"
# if duplicate entries exist from double buffering synchronization issues, there will be duplicate start and end times
for itr in ["regions", "kernels", "memory_copies", "memory_allocations"]:
_num_rpd_tot = rocpd_data.execute(f"SELECT COUNT(*) FROM {itr}").fetchone()[0]
_num_rpd_start = rocpd_data.execute(
f"SELECT COUNT(DISTINCT(start)) FROM {itr}"
).fetchone()[0]
_num_rpd_end = rocpd_data.execute(
f"SELECT COUNT(DISTINCT(end)) FROM {itr}"
).fetchone()[0]
assert _num_rpd_tot == _num_rpd_start == _num_rpd_end, (
f"Duplicate records check failed for {itr}: total {itr}={_num_rpd_tot}, "
f"unique starts={_num_rpd_start}, unique ends={_num_rpd_end}. In rocprofv3, "
"this likely means the double buffering scheme updated a buffer with new "
"records while it was being processed in a buffer flush"
)
def _perform_time_sanity_checks(data):
"""Helper function to perform time sanity checks on data."""
@@ -57,7 +57,7 @@ rocprofiler_add_integration_validate_test(
--rocpd-input
${CMAKE_CURRENT_BINARY_DIR}/simple-transpose-trace/cmdl-input/out_results.db
TIMEOUT 45
LABELS "integration-tests"
LABELS "integration-tests;rocpd"
FIXTURES_REQUIRED rocprofv3-test-trace)
rocprofiler_add_integration_execute_test(
@@ -69,7 +69,7 @@ rocprofiler_add_integration_execute_test(
$<TARGET_FILE:simple-transpose>
DEPENDS simple-transpose
TIMEOUT 45
LABELS "integration-tests"
LABELS "integration-tests;rocpd"
PRELOAD "${PRELOAD_ENV}"
FIXTURES_SETUP rocprofv3-test-trace-input-json
FAIL_REGULAR_EXPRESSION