// MIT License // // Copyright (c) 2023 ROCm Developer Tools // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in all // copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE // SOFTWARE. #include "buffering.hpp" #include "lib/common/container/record_header_buffer.hpp" #include "lib/common/mpl.hpp" #include "lib/common/units.hpp" #include #include #include #include #include #include #include namespace { namespace test = ::rocprofiler::test; namespace units = ::rocprofiler::common::units; namespace mpl = ::rocprofiler::common::mpl; using record_header_buffer_t = rocprofiler::common::container::record_header_buffer; // this function returns a random array of values specific to template instantiation template auto& get_generated_array() { static auto _value = []() { auto _v = test::raw_array{}; test::generate(_v, Tp{0}, std::numeric_limits::max()); return _v; }(); return _value; } // these are the array size variants. we use the units to scale up // but technically the data size of the raw_array will be multiplied // by sizeof(Tp) constexpr auto test_data_sizes = std::index_sequence<1 * units::byte, 2 * units::byte, 3 * units::byte, 4 * units::byte, 8 * units::byte, 16 * units::kilobyte, 20 * units::kilobyte, 24 * units::kilobyte, 32 * units::kilobyte, 56 * units::kilobyte, 64 * units::kilobyte, 91 * units::kilobyte, 128 * units::kilobyte, 387 * units::kilobyte, 693 * units::kilobyte, 2 * units::megabyte>{}; // this is the list of array data types we will generate. Effectively, there // will be one raw array for each combination of these types and the test data sizes // (i.e. there will be unique 160 arrays of different types and sizes) using test_data_types = mpl::type_list; // this function creates a thread for each data size for a given type. // all threads are detached and will wait at the first barrier until all // threads have reached it, race to emplace their data in the shared // buffer and then wait at the second barrier until all the threads have // emplacing the data and the main thread has also reached the second // barrier. template void launch_threads(record_header_buffer_t& _buf, pthread_barrier_t& _race_barrier, pthread_barrier_t& _done_barrier, std::index_sequence) { auto _launch = [](record_header_buffer_t* _buf_v, auto* _race_barrier_v, auto* _done_barrier_v, auto* _v) { pthread_barrier_wait(_race_barrier_v); EXPECT_TRUE(_buf_v->emplace(*_v)); pthread_barrier_wait(_done_barrier_v); }; (std::thread{_launch, &_buf, &_race_barrier, &_done_barrier, &get_generated_array()} .detach(), ...); } // expansion for each type template void launch_threads(record_header_buffer_t& _buf, pthread_barrier_t& _race_barrier, pthread_barrier_t& _done_barrier, mpl::type_list, std::index_sequence _seq) { (launch_threads(_buf, _race_barrier, _done_barrier, _seq), ...); } // computes the size of every raw_array size for a given type template constexpr size_t get_data_size(std::index_sequence) { size_t _v = 0; ((_v += sizeof(get_generated_array())), ...); return _v; } // expansion for each type template constexpr size_t get_data_size(mpl::type_list, std::index_sequence _seq) { size_t _v = 0; ((_v += get_data_size(_seq)), ...); return _v; } // validates that the raw array extracted out of the buffer is equal // to the raw array that was placed in the buffer template void validate(const std::vector& _headers) { using data_type = test::raw_array; auto& _ref_data = get_generated_array(); for(auto* itr : _headers) { if(itr->hash == typeid(data_type).hash_code()) { auto* _data = static_cast(itr->payload); EXPECT_EQ(_ref_data, *_data); } } } // expansion for every raw array size for a given data type template void validate(const std::vector& _headers, std::index_sequence) { (validate(_headers), ...); } // expansion for each raw array type template void validate(const std::vector& _headers, mpl::type_list, std::index_sequence _seq) { (validate(_headers, _seq), ...); } } // namespace TEST(buffering, parallel) { // this test launches 160 threads, each with a randomly generated array of data // and has them contend for emplacing their data in the same buffer. The purpose // of this test is to validate that multiple threads can write to the same // (lock-free) buffer without any data corruption or loss. constexpr auto num_variants = test_data_types::size() * test_data_sizes.size(); constexpr auto data_size = get_data_size(test_data_types{}, test_data_sizes); EXPECT_EQ(num_variants, 160); // make a buffer large enough to hold all the data we generate auto _buffer = record_header_buffer_t{data_size}; // create a barrier that all child threads will wait and then race to enqueue their data in the // buffer i.e., we want to maximize contention on inserting into buffer auto _data_race_barrier = pthread_barrier_t{}; pthread_barrier_init(&_data_race_barrier, nullptr, num_variants); // a barrier to signal that all threads have completed placing their data in the buffer auto _emplaced_barrier = pthread_barrier_t{}; pthread_barrier_init(&_emplaced_barrier, nullptr, num_variants + 1); // launch 160 threads launch_threads( _buffer, _data_race_barrier, _emplaced_barrier, test_data_types{}, test_data_sizes); // wait for all the threads to complete pthread_barrier_wait(&_emplaced_barrier); // verify the data pulled out the buffer matches the data put in by the threads validate(_buffer.get_record_headers(), test_data_types{}, test_data_sizes); }