diff --git a/CHANGELOG.md b/CHANGELOG.md index 45da7316e3..f15868fd4c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -185,6 +185,7 @@ Full documentation for ROCprofiler-SDK is available at [rocm.docs.amd.com/projec ### Resolved issues - Fixed missing callbacks around internal thread creation within counter collection service +- Fixed potential data race in rocprofiler-sdk double buffering scheme ### Removed diff --git a/source/lib/common/container/record_header_buffer.cpp b/source/lib/common/container/record_header_buffer.cpp index 314e3aa11f..0e2cd2b883 100644 --- a/source/lib/common/container/record_header_buffer.cpp +++ b/source/lib/common/container/record_header_buffer.cpp @@ -82,20 +82,18 @@ record_header_buffer::allocate(size_t num_bytes) return true; } -record_header_buffer::record_ptr_vec_t -record_header_buffer::get_record_headers(size_t _n) +size_t +record_header_buffer::get_num_record_headers() { auto _lk = rhb_raii_lock{*this}; - auto _sz = m_index.load(std::memory_order_acquire); - _n = std::min(_n, _sz); - auto _ret = record_ptr_vec_t{}; - _ret.reserve(_n); - for(size_t i = 0; i < _n; ++i) + auto _size = m_index.load(std::memory_order_acquire); + size_t _ret = 0; + for(size_t i = 0; i < _size; ++i) { - if(auto& itr = m_headers.at(i); itr.hash > 0 && itr.payload != nullptr) - _ret.emplace_back(&itr); + if(auto& itr = m_headers.at(i); itr.hash > 0 && itr.payload != nullptr) ++_ret; } + return _ret; } diff --git a/source/lib/common/container/record_header_buffer.hpp b/source/lib/common/container/record_header_buffer.hpp index b84be76aaf..e44873e228 100644 --- a/source/lib/common/container/record_header_buffer.hpp +++ b/source/lib/common/container/record_header_buffer.hpp @@ -23,13 +23,16 @@ #pragma once #include "lib/common/container/ring_buffer.hpp" +#include "lib/common/scope_destructor.hpp" #include #include +#include #include #include #include +#include #include namespace rocprofiler @@ -75,9 +78,13 @@ struct record_header_buffer template bool emplace(uint32_t, uint32_t, Tp&); - /// this function will return a vector of pointers to the record headers - /// at the time of invocation. - record_ptr_vec_t get_record_headers(size_t _n = std::numeric_limits::max()); + /// this function will return the number of record headers + size_t get_num_record_headers(); + + /// this function will invoke functor with a vector of pointers to the record headers. + /// if ClearRecordsV is true, the container will be cleared after invoking the functor + template + size_t process_record_headers(ClearRecordsT, FuncT&& functor, Args&&... args); /// record_header_buffer is a multiple writer, single reader data structure so /// this function prevents writing via emplace @@ -322,6 +329,34 @@ record_header_buffer::emplace(Tp& _v) // if enumerations are not used, use the typeid hash code return emplace(typeid(Tp).hash_code(), _v); } + +template +size_t +record_header_buffer::process_record_headers(ClearRecordsT, FuncT&& _functor, Args&&... _args) +{ + // RAII for lock/unlock + auto _lk = scope_destructor{[&]() { unlock(); }, [&]() { lock(); }}; + + auto _n = m_index.load(std::memory_order_acquire); + auto _records = record_ptr_vec_t{}; + _records.reserve(_n); + for(size_t i = 0; i < _n; ++i) + { + if(auto& itr = m_headers.at(i); itr.hash > 0 && itr.payload != nullptr) + _records.emplace_back(&itr); + } + + // get number of records before vector is moved + auto _num_records = _records.size(); + + // invoke the callback + std::forward(_functor)(std::move(_records), std::forward(_args)...); + + // clear the container + if constexpr(ClearRecordsT::value) clear(); + + return _num_records; +} } // namespace container } // namespace common } // namespace rocprofiler diff --git a/source/lib/rocprofiler-sdk/buffer.cpp b/source/lib/rocprofiler-sdk/buffer.cpp index ba57a53422..b2c034eebf 100644 --- a/source/lib/rocprofiler-sdk/buffer.cpp +++ b/source/lib/rocprofiler-sdk/buffer.cpp @@ -183,27 +183,52 @@ flush(rocprofiler_buffer_id_t buffer_id, bool wait) if(!buff_internal_v.is_empty()) { - // get the array of record headers - auto buff_data = buff_internal_v.get_record_headers(); + // designates that buffer should be cleared after functor is invoked + constexpr auto clear_buffer_v = std::true_type{}; - // invoke buffer callback - try - { - if(buff_v->callback) - { - buff_v->callback(rocprofiler_context_id_t{buff_v->context_id}, - rocprofiler_buffer_id_t{buff_v->buffer_id}, - buff_data.data(), - buff_data.size(), - buff_v->callback_data, - buff_v->drop_count); - } - } catch(std::exception& e) - { - ROCP_ERROR << "buffer callback threw an exception: " << e.what(); - } - // clear the buffer - buff_internal_v.clear(); + // invoke the callback within the scoped lock of process_record_headers. + auto num_processed = buff_internal_v.process_record_headers( + clear_buffer_v, [&buffer_id, &idx, &offset, &buff_v](auto&& _headers) { + // invoke buffer callback + try + { + if(buff_v->callback) + { + ROCP_INFO << fmt::format("invoking buffer callback for {} records " + "[buffer_id={}, idx={}, offset={}]", + _headers.size(), + buffer_id.handle, + idx, + offset); + buff_v->callback(rocprofiler_context_id_t{buff_v->context_id}, + rocprofiler_buffer_id_t{buff_v->buffer_id}, + _headers.data(), + _headers.size(), + buff_v->callback_data, + buff_v->drop_count); + } + else + { + ROCP_TRACE << fmt::format("no buffer callback for {} records " + "[buffer_id={}, idx={}, offset={}]", + _headers.size(), + buffer_id.handle, + idx, + offset); + } + + } catch(std::exception& e) + { + ROCP_CI_LOG(ERROR) << "buffer callback threw an exception: " << e.what(); + } + }); + + ROCP_INFO << fmt::format( + "completed buffer callback for {} records [buffer_id={}, idx={}, offset={}]", + num_processed, + buffer_id.handle, + idx, + offset); } else { diff --git a/source/lib/rocprofiler-sdk/tests/buffer.cpp b/source/lib/rocprofiler-sdk/tests/buffer.cpp index 84471a782f..aa10f325ee 100644 --- a/source/lib/rocprofiler-sdk/tests/buffer.cpp +++ b/source/lib/rocprofiler-sdk/tests/buffer.cpp @@ -55,10 +55,7 @@ TEST(rocprofiler_lib, buffer) EXPECT_EQ(buffer_v->buffer_id, buffer_id->handle); buffer_v->watermark = common::units::get_page_size(); - { - auto records = buffer_v->get_internal_buffer().get_record_headers(); - EXPECT_EQ(records.size(), 0); - } + EXPECT_EQ(buffer_v->get_internal_buffer().get_num_record_headers(), 0); EXPECT_TRUE(buffer_v->get_internal_buffer().allocate(sizeof(rocprofiler_buffer_id_t))); @@ -67,8 +64,7 @@ TEST(rocprofiler_lib, buffer) auto data = *buffer_id; buffer_v->emplace(1, 1, data); - auto records = buffer_v->get_internal_buffer().get_record_headers(); - EXPECT_EQ(records.size(), 1); + EXPECT_EQ(buffer_v->get_internal_buffer().get_num_record_headers(), 1); auto flush_status = buffer::flush(*buffer_id, true); EXPECT_EQ(flush_status, ROCPROFILER_STATUS_SUCCESS); diff --git a/source/lib/tests/buffering/buffering-parallel.cpp b/source/lib/tests/buffering/buffering-parallel.cpp index 4ceaf5aa9d..fbaa3835c1 100644 --- a/source/lib/tests/buffering/buffering-parallel.cpp +++ b/source/lib/tests/buffering/buffering-parallel.cpp @@ -212,6 +212,11 @@ TEST(buffering, parallel) // wait for all the threads to complete pthread_barrier_wait(&_emplaced_barrier); + // designates that buffer should be cleared after invoking functor + using clear_buffer_t = std::true_type; + // verify the data pulled out the buffer matches the data put in by the threads - validate(_buffer.get_record_headers(), test_data_types{}, test_data_sizes); + _buffer.process_record_headers(clear_buffer_t{}, [](auto&& _records) { + validate(_records, test_data_types{}, test_data_sizes); + }); } diff --git a/source/lib/tests/buffering/buffering-save-load.cpp b/source/lib/tests/buffering/buffering-save-load.cpp index 19401b57ea..928510f69a 100644 --- a/source/lib/tests/buffering/buffering-save-load.cpp +++ b/source/lib/tests/buffering/buffering-save-load.cpp @@ -180,8 +180,10 @@ TEST(buffering, save_load) // and move it to another object and ensure that the data after the save + load + move matches // the original data placed into the buffer without any data corruption or loss - constexpr auto num_variants = test_data_types::size() * test_data_sizes.size(); - constexpr auto data_size = get_data_size(test_data_types{}, test_data_sizes); + // designates that buffer should not be cleared after invoking functor + constexpr auto clear_buffer_v = std::false_type{}; + constexpr auto num_variants = test_data_types::size() * test_data_sizes.size(); + constexpr auto data_size = get_data_size(test_data_types{}, test_data_sizes); EXPECT_EQ(num_variants, 120); @@ -224,7 +226,9 @@ TEST(buffering, save_load) EXPECT_FALSE(_buffer.is_empty()); // verify the data pulled out the buffer matches the data put in - validate(_buffer.get_record_headers(), test_data_types{}, test_data_sizes); + _buffer.process_record_headers(clear_buffer_v, [](auto&& _records) { + validate(_records, test_data_types{}, test_data_sizes); + }); // save the data to a binary file and clear the buffer so it can "receive" new data (in theory) { @@ -235,7 +239,7 @@ TEST(buffering, save_load) } // verify that the buffer is empty - EXPECT_EQ(_buffer.get_record_headers().size(), 0) << "buffer was not cleared properly"; + EXPECT_EQ(_buffer.get_num_record_headers(), 0) << "buffer was not cleared properly"; // load the data back from the binary file { @@ -245,23 +249,26 @@ TEST(buffering, save_load) } // verify that, at a high level, all the data was preserved - ASSERT_EQ(_buffer.get_record_headers().size(), num_variants) + ASSERT_EQ(_buffer.get_num_record_headers(), num_variants) << "buffer was not saved/loaded properly"; // verify the data is entirely correct - validate(_buffer.get_record_headers(), test_data_types{}, test_data_sizes); + _buffer.process_record_headers(clear_buffer_v, [](auto&& _records) { + validate(_records, test_data_types{}, test_data_sizes); + }); // move the data into another instance of record_header_buffer_t auto _buffer_v = record_header_buffer_t{std::move(_buffer)}; // make sure the move emptied out the old object and populated the new object - ASSERT_EQ(_buffer.get_record_headers().size(), 0) << "buffer was not moved properly"; - ASSERT_EQ(_buffer_v.get_record_headers().size(), num_variants) - << "buffer was not moved properly"; + ASSERT_EQ(_buffer.get_num_record_headers(), 0) << "buffer was not moved properly"; + ASSERT_EQ(_buffer_v.get_num_record_headers(), num_variants) << "buffer was not moved properly"; // validate the data in the new object // verify the data pulled out the buffer matches the data put in by the threads - validate(_buffer_v.get_record_headers(), test_data_types{}, test_data_sizes); + _buffer.process_record_headers(clear_buffer_v, [](auto&& _records) { + validate(_records, test_data_types{}, test_data_sizes); + }); // make sure reset works when empty and when full EXPECT_EQ(_buffer.reset(), 0) << "buffer should be empty after move"; diff --git a/source/lib/tests/buffering/buffering-serial.cpp b/source/lib/tests/buffering/buffering-serial.cpp index ae55c89fb6..509a9db627 100644 --- a/source/lib/tests/buffering/buffering-serial.cpp +++ b/source/lib/tests/buffering/buffering-serial.cpp @@ -123,26 +123,37 @@ TEST(buffering, serial) EXPECT_EQ(_f, _fp_history.back()) << "float not equal after emplace_back"; } + // designates that buffer should be cleared after invoking functor + constexpr auto clear_buffer_v = std::true_type{}; + // get the records out of the buffer - auto _headers = _buffer.get_record_headers(); - for(auto* itr : _headers) - { - ASSERT_TRUE(itr->payload) << "nullptr to payload not expected"; + auto _num_headers = _buffer.process_record_headers( + clear_buffer_v, + [](auto&& _headers, auto& _ui_result_v, auto& _fp_result_v) { + for(auto* itr : _headers) + { + ASSERT_TRUE(itr->payload) << "nullptr to payload not expected"; - if(itr->hash == typeid(uint_raw_array_t).hash_code()) - { - extract_header(_ui_result, itr); - } - else if(itr->hash == typeid(flt_raw_array_t).hash_code()) - { - extract_header(_fp_result, itr); - } - else - { - GTEST_FAIL() << "unknown type id hash code: " << std::to_string(itr->hash); - } - } + if(itr->hash == typeid(uint_raw_array_t).hash_code()) + { + extract_header(_ui_result_v, itr); + } + else if(itr->hash == typeid(flt_raw_array_t).hash_code()) + { + extract_header(_fp_result_v, itr); + } + else + { + GTEST_FAIL() << "unknown type id hash code: " << std::to_string(itr->hash); + } + } + }, + _ui_result, + _fp_result); + ASSERT_EQ(_ui_history.size() + _fp_history.size(), _num_headers) + << "UINT: " << _ui_history.size() << " + FLOAT: " << _fp_history.size() + << " != HEADERS: " << _num_headers; // validate that we got the same number of records out that we put in ASSERT_EQ(_ui_history.size(), _ui_result.size()) << "UINT: " << _ui_history.size() << " vs. " << _ui_result.size();