[SDK] Fix double buffer data race (#394)

* Fix double buffer data race

- fixes relatively rare data race in double buffering scheme

In `rocprofiler::buffer::instance::emplace`, the `container::record_header_buffer::get_record_headers()` function returned a `std::vector<rocprofiler_record_header_t*>` and then the tool's callback was invoked with it. It was possible for that callback to still be executing while the buffer was being updated. This potentially introduced a scenario where the record behind a `rocprofiler_record_header_t*` was modified (or corrupted) before the tool processed it. In rocprofv3, this would result in a "future" buffer record showing up among "past" buffer records. E.g., a correlation id sequence of 1-15, where the buffer flushes after every five values, could result in this during processing:
|     |     |     |     |      |
|:---:|:---:|:---:|:---:|:---:|
|  1 |  2 |  3 |  4 | 15 |
|  6 |  7 |  8 |  9 | 10 |
| 11 | 12 | 13 | 14 | 15 |

This happened because buffer A (of the double buffering scheme), originally containing corr ids 1-5, stalled after processing corr id 4 (e.g. while writing to disk). Meanwhile, buffer B filled up with 6-10 and started flushing, causing a switch back to buffer A, and buffer A was refilled with 11-15 by the time the callback accessed what was originally corr id 5 but had since been overwritten with corr id 15.

* Update CHANGELOG

* misc minor cleanup

---------

Co-authored-by: Jonathan R. Madsen <jonathanrmadsen@gmail.com>
Este commit está contenido en:
Madsen, Jonathan
2025-05-14 13:19:22 -05:00
cometido por GitHub
padre 6ec9526475
commit 8a1ee46e47
Se han modificado 8 ficheros con 144 adiciones y 66 borrados
+1
Ver fichero
@@ -185,6 +185,7 @@ Full documentation for ROCprofiler-SDK is available at [rocm.docs.amd.com/projec
### Resolved issues
- Fixed missing callbacks around internal thread creation within counter collection service
- Fixed potential data race in rocprofiler-sdk double buffering scheme
### Removed
@@ -82,20 +82,18 @@ record_header_buffer::allocate(size_t num_bytes)
return true;
}
record_header_buffer::record_ptr_vec_t
record_header_buffer::get_record_headers(size_t _n)
size_t
record_header_buffer::get_num_record_headers()
{
auto _lk = rhb_raii_lock{*this};
auto _sz = m_index.load(std::memory_order_acquire);
_n = std::min(_n, _sz);
auto _ret = record_ptr_vec_t{};
_ret.reserve(_n);
for(size_t i = 0; i < _n; ++i)
auto _size = m_index.load(std::memory_order_acquire);
size_t _ret = 0;
for(size_t i = 0; i < _size; ++i)
{
if(auto& itr = m_headers.at(i); itr.hash > 0 && itr.payload != nullptr)
_ret.emplace_back(&itr);
if(auto& itr = m_headers.at(i); itr.hash > 0 && itr.payload != nullptr) ++_ret;
}
return _ret;
}
@@ -23,13 +23,16 @@
#pragma once
#include "lib/common/container/ring_buffer.hpp"
#include "lib/common/scope_destructor.hpp"
#include <rocprofiler-sdk/fwd.h>
#include <atomic>
#include <functional>
#include <limits>
#include <mutex>
#include <shared_mutex>
#include <type_traits>
#include <vector>
namespace rocprofiler
@@ -75,9 +78,13 @@ struct record_header_buffer
template <typename Tp>
bool emplace(uint32_t, uint32_t, Tp&);
/// this function will return a vector of pointers to the record headers
/// at the time of invocation.
record_ptr_vec_t get_record_headers(size_t _n = std::numeric_limits<size_t>::max());
/// this function will return the number of populated record headers, i.e. the
/// headers whose hash is greater than zero and whose payload is not a nullptr
size_t get_num_record_headers();
/// this function will invoke functor with a vector of pointers to the record headers.
/// any additional arguments are forwarded to the functor after that vector.
/// if ClearRecordsT::value is true, the container will be cleared after invoking the functor.
/// returns the number of record headers that were passed to the functor
template <typename ClearRecordsT, typename FuncT, typename... Args>
size_t process_record_headers(ClearRecordsT, FuncT&& functor, Args&&... args);
/// record_header_buffer is a multiple writer, single reader data structure so
/// this function prevents writing via emplace
@@ -322,6 +329,34 @@ record_header_buffer::emplace(Tp& _v)
// if enumerations are not used, use the typeid hash code
return emplace(typeid(Tp).hash_code(), _v);
}
/// Gathers pointers to every populated record header (hash greater than zero and a
/// non-null payload), hands them to the functor together with any extra arguments,
/// optionally clears the container, and reports how many headers were handed over.
template <typename ClearRecordsT, typename FuncT, typename... Args>
size_t
record_header_buffer::process_record_headers(ClearRecordsT, FuncT&& _functor, Args&&... _args)
{
    // guard built from an unlock functor and a lock functor; presumably the lock is
    // taken here and released when the guard goes out of scope -- confirm against
    // scope_destructor
    auto _guard = scope_destructor{[&]() { unlock(); }, [&]() { lock(); }};

    // snapshot of how many header slots are candidates for processing
    const auto _count = m_index.load(std::memory_order_acquire);

    auto _valid = record_ptr_vec_t{};
    _valid.reserve(_count);

    for(size_t idx = 0; idx < _count; ++idx)
    {
        auto& _hdr = m_headers.at(idx);

        // skip slots that do not hold a populated record
        if(!(_hdr.hash > 0) || _hdr.payload == nullptr) continue;

        _valid.emplace_back(&_hdr);
    }

    // the vector is moved into the functor below, so record its size first
    const auto _total = _valid.size();

    // hand the collected headers (and any extra arguments) to the caller's functor
    std::forward<FuncT>(_functor)(std::move(_valid), std::forward<Args>(_args)...);

    // compile-time switch: only wipe the container when the tag type requests it
    if constexpr(ClearRecordsT::value)
    {
        clear();
    }

    return _total;
}
} // namespace container
} // namespace common
} // namespace rocprofiler
+45 -20
Ver fichero
@@ -183,27 +183,52 @@ flush(rocprofiler_buffer_id_t buffer_id, bool wait)
if(!buff_internal_v.is_empty())
{
// get the array of record headers
auto buff_data = buff_internal_v.get_record_headers();
// designates that buffer should be cleared after functor is invoked
constexpr auto clear_buffer_v = std::true_type{};
// invoke buffer callback
try
{
if(buff_v->callback)
{
buff_v->callback(rocprofiler_context_id_t{buff_v->context_id},
rocprofiler_buffer_id_t{buff_v->buffer_id},
buff_data.data(),
buff_data.size(),
buff_v->callback_data,
buff_v->drop_count);
}
} catch(std::exception& e)
{
ROCP_ERROR << "buffer callback threw an exception: " << e.what();
}
// clear the buffer
buff_internal_v.clear();
// invoke the callback within the scoped lock of process_record_headers.
auto num_processed = buff_internal_v.process_record_headers(
clear_buffer_v, [&buffer_id, &idx, &offset, &buff_v](auto&& _headers) {
// invoke buffer callback
try
{
if(buff_v->callback)
{
ROCP_INFO << fmt::format("invoking buffer callback for {} records "
"[buffer_id={}, idx={}, offset={}]",
_headers.size(),
buffer_id.handle,
idx,
offset);
buff_v->callback(rocprofiler_context_id_t{buff_v->context_id},
rocprofiler_buffer_id_t{buff_v->buffer_id},
_headers.data(),
_headers.size(),
buff_v->callback_data,
buff_v->drop_count);
}
else
{
ROCP_TRACE << fmt::format("no buffer callback for {} records "
"[buffer_id={}, idx={}, offset={}]",
_headers.size(),
buffer_id.handle,
idx,
offset);
}
} catch(std::exception& e)
{
ROCP_CI_LOG(ERROR) << "buffer callback threw an exception: " << e.what();
}
});
ROCP_INFO << fmt::format(
"completed buffer callback for {} records [buffer_id={}, idx={}, offset={}]",
num_processed,
buffer_id.handle,
idx,
offset);
}
else
{
+2 -6
Ver fichero
@@ -55,10 +55,7 @@ TEST(rocprofiler_lib, buffer)
EXPECT_EQ(buffer_v->buffer_id, buffer_id->handle);
buffer_v->watermark = common::units::get_page_size();
{
auto records = buffer_v->get_internal_buffer().get_record_headers();
EXPECT_EQ(records.size(), 0);
}
EXPECT_EQ(buffer_v->get_internal_buffer().get_num_record_headers(), 0);
EXPECT_TRUE(buffer_v->get_internal_buffer().allocate(sizeof(rocprofiler_buffer_id_t)));
@@ -67,8 +64,7 @@ TEST(rocprofiler_lib, buffer)
auto data = *buffer_id;
buffer_v->emplace(1, 1, data);
auto records = buffer_v->get_internal_buffer().get_record_headers();
EXPECT_EQ(records.size(), 1);
EXPECT_EQ(buffer_v->get_internal_buffer().get_num_record_headers(), 1);
auto flush_status = buffer::flush(*buffer_id, true);
EXPECT_EQ(flush_status, ROCPROFILER_STATUS_SUCCESS);
@@ -212,6 +212,11 @@ TEST(buffering, parallel)
// wait for all the threads to complete
pthread_barrier_wait(&_emplaced_barrier);
// designates that buffer should be cleared after invoking functor
using clear_buffer_t = std::true_type;
// verify the data pulled out the buffer matches the data put in by the threads
validate(_buffer.get_record_headers(), test_data_types{}, test_data_sizes);
_buffer.process_record_headers(clear_buffer_t{}, [](auto&& _records) {
validate(_records, test_data_types{}, test_data_sizes);
});
}
+17 -10
Ver fichero
@@ -180,8 +180,10 @@ TEST(buffering, save_load)
// and move it to another object and ensure that the data after the save + load + move matches
// the original data placed into the buffer without any data corruption or loss
constexpr auto num_variants = test_data_types::size() * test_data_sizes.size();
constexpr auto data_size = get_data_size(test_data_types{}, test_data_sizes);
// designates that buffer should not be cleared after invoking functor
constexpr auto clear_buffer_v = std::false_type{};
constexpr auto num_variants = test_data_types::size() * test_data_sizes.size();
constexpr auto data_size = get_data_size(test_data_types{}, test_data_sizes);
EXPECT_EQ(num_variants, 120);
@@ -224,7 +226,9 @@ TEST(buffering, save_load)
EXPECT_FALSE(_buffer.is_empty());
// verify the data pulled out the buffer matches the data put in
validate(_buffer.get_record_headers(), test_data_types{}, test_data_sizes);
_buffer.process_record_headers(clear_buffer_v, [](auto&& _records) {
validate(_records, test_data_types{}, test_data_sizes);
});
// save the data to a binary file and clear the buffer so it can "receive" new data (in theory)
{
@@ -235,7 +239,7 @@ TEST(buffering, save_load)
}
// verify that the buffer is empty
EXPECT_EQ(_buffer.get_record_headers().size(), 0) << "buffer was not cleared properly";
EXPECT_EQ(_buffer.get_num_record_headers(), 0) << "buffer was not cleared properly";
// load the data back from the binary file
{
@@ -245,23 +249,26 @@ TEST(buffering, save_load)
}
// verify that, at a high level, all the data was preserved
ASSERT_EQ(_buffer.get_record_headers().size(), num_variants)
ASSERT_EQ(_buffer.get_num_record_headers(), num_variants)
<< "buffer was not saved/loaded properly";
// verify the data is entirely correct
validate(_buffer.get_record_headers(), test_data_types{}, test_data_sizes);
_buffer.process_record_headers(clear_buffer_v, [](auto&& _records) {
validate(_records, test_data_types{}, test_data_sizes);
});
// move the data into another instance of record_header_buffer_t
auto _buffer_v = record_header_buffer_t{std::move(_buffer)};
// make sure the move emptied out the old object and populated the new object
ASSERT_EQ(_buffer.get_record_headers().size(), 0) << "buffer was not moved properly";
ASSERT_EQ(_buffer_v.get_record_headers().size(), num_variants)
<< "buffer was not moved properly";
ASSERT_EQ(_buffer.get_num_record_headers(), 0) << "buffer was not moved properly";
ASSERT_EQ(_buffer_v.get_num_record_headers(), num_variants) << "buffer was not moved properly";
// validate the data in the new object
// verify the data pulled out the buffer matches the data put in by the threads
validate(_buffer_v.get_record_headers(), test_data_types{}, test_data_sizes);
_buffer.process_record_headers(clear_buffer_v, [](auto&& _records) {
validate(_records, test_data_types{}, test_data_sizes);
});
// make sure reset works when empty and when full
EXPECT_EQ(_buffer.reset(), 0) << "buffer should be empty after move";
+28 -17
Ver fichero
@@ -123,26 +123,37 @@ TEST(buffering, serial)
EXPECT_EQ(_f, _fp_history.back()) << "float not equal after emplace_back";
}
// designates that buffer should be cleared after invoking functor
constexpr auto clear_buffer_v = std::true_type{};
// get the records out of the buffer
auto _headers = _buffer.get_record_headers();
for(auto* itr : _headers)
{
ASSERT_TRUE(itr->payload) << "nullptr to payload not expected";
auto _num_headers = _buffer.process_record_headers(
clear_buffer_v,
[](auto&& _headers, auto& _ui_result_v, auto& _fp_result_v) {
for(auto* itr : _headers)
{
ASSERT_TRUE(itr->payload) << "nullptr to payload not expected";
if(itr->hash == typeid(uint_raw_array_t).hash_code())
{
extract_header(_ui_result, itr);
}
else if(itr->hash == typeid(flt_raw_array_t).hash_code())
{
extract_header(_fp_result, itr);
}
else
{
GTEST_FAIL() << "unknown type id hash code: " << std::to_string(itr->hash);
}
}
if(itr->hash == typeid(uint_raw_array_t).hash_code())
{
extract_header(_ui_result_v, itr);
}
else if(itr->hash == typeid(flt_raw_array_t).hash_code())
{
extract_header(_fp_result_v, itr);
}
else
{
GTEST_FAIL() << "unknown type id hash code: " << std::to_string(itr->hash);
}
}
},
_ui_result,
_fp_result);
ASSERT_EQ(_ui_history.size() + _fp_history.size(), _num_headers)
<< "UINT: " << _ui_history.size() << " + FLOAT: " << _fp_history.size()
<< " != HEADERS: " << _num_headers;
// validate that we got the same number of records out that we put in
ASSERT_EQ(_ui_history.size(), _ui_result.size())
<< "UINT: " << _ui_history.size() << " vs. " << _ui_result.size();