diff --git a/projects/roctracer/src/roctracer/memory_pool.h b/projects/roctracer/src/roctracer/memory_pool.h index d23b195841..4f3bfd96e3 100644 --- a/projects/roctracer/src/roctracer/memory_pool.h +++ b/projects/roctracer/src/roctracer/memory_pool.h @@ -24,9 +24,11 @@ #include #include #include +#include #include #include #include +#include namespace roctracer { @@ -44,7 +46,8 @@ class MemoryPool { pool_end_ = pool_begin_ + allocation_size; buffer_begin_ = pool_begin_; buffer_end_ = buffer_begin_ + properties_.buffer_size; - write_ptr_ = buffer_begin_; + record_ptr_ = buffer_begin_; /* records are appended upward from the start of the current buffer */ + data_ptr_ = buffer_end_; /* copied record data is stored downward from the end of the current buffer */ // Create a consumer thread and wait for it to be ready to accept work. std::promise ready; @@ -67,38 +70,65 @@ class MemoryPool { MemoryPool(const MemoryPool&) = delete; MemoryPool& operator=(const MemoryPool&) = delete; - template void Write(Record&& record) { + template > + void Write(Record&& record, const void* data, size_t data_size, Functor&& store_data = {}) { /* Appends record to the current buffer. When data fits, data_size bytes are copied into the tail of the same buffer and store_data(record, copy) is called; otherwise store_data receives the original pointer and the call blocks until the consumer has processed the flush. */ + assert(data != nullptr || data_size == 0); // If data is null, then data_size must be 0 + std::lock_guard producer_lock(producer_mutex_); - char* next = write_ptr_ + sizeof(record); - if (next > buffer_end_) { - NotifyConsumerThread(buffer_begin_, write_ptr_); - // Switch buffers - buffer_begin_ = (buffer_end_ == pool_end_) ? pool_begin_ : buffer_end_; - buffer_end_ = buffer_begin_ + properties_.buffer_size; - write_ptr_ = buffer_begin_; + // The amount of memory reserved in the buffer to store data. If the data cannot fit because it + // is larger than the buffer size minus one record, then the data won't be copied into the + // buffer. + size_t reserve_data_size = + data_size <= (properties_.buffer_size - sizeof(Record)) ? 
data_size : 0; /* NOTE(review): presumably buffer_size is unsigned and at least sizeof(Record); if it were smaller the subtraction would wrap - TODO confirm against roctracer_properties_t */ - next = write_ptr_ + sizeof(record); - assert(next <= buffer_end_ && "buffer size is less then the record size"); + std::byte* next_record = record_ptr_ + sizeof(Record); + if (next_record > (data_ptr_ - reserve_data_size)) { /* the new record would run into the space needed for its data: hand the current buffer to the consumer and continue in the next one */ + NotifyConsumerThread(buffer_begin_, record_ptr_); + SwitchBuffers(); + next_record = record_ptr_ + sizeof(Record); + assert(next_record <= buffer_end_ && "buffer size is less then the record size"); + } + + // Store data in the record. Copy the data first if it fits in the buffer + // (reserve_data_size != 0). + if (reserve_data_size) { + data_ptr_ -= data_size; + ::memcpy(data_ptr_, data, data_size); + store_data(record, data_ptr_); + } else if (data != nullptr) { + store_data(record, data); /* data was not copied: store_data receives the original pointer passed by the caller */ } // Store the record into the buffer, and increment the write pointer. - ::memcpy(write_ptr_, &record, sizeof(record)); - write_ptr_ = next; + ::memcpy(record_ptr_, &record, sizeof(Record)); + record_ptr_ = next_record; + + // If the data does not fit in the buffer, flush the buffer with the record as is. We don't copy + // the data so we make sure that the record and its data are processed by waiting until the + // flush is complete. + if (data != nullptr && reserve_data_size == 0) { + NotifyConsumerThread(buffer_begin_, record_ptr_); + SwitchBuffers(); + { + std::unique_lock consumer_lock(consumer_mutex_); + consumer_cond_.wait(consumer_lock, [this]() { return !consumer_arg_.valid; }); + } + } + } + template void Write(Record&& record) { /* overload for a record without extra data: forwards to the Write above with a null data pointer, size 0 and a default store_data */ + using DataPtr = void*; + Write(std::forward(record), DataPtr(nullptr), 0, {}); } // Flush the records and block until they are all made visible to the client. void Flush() { { std::lock_guard producer_lock(producer_mutex_); - if (write_ptr_ == buffer_begin_) return; + if (record_ptr_ == buffer_begin_) return; - NotifyConsumerThread(buffer_begin_, write_ptr_); - - // Switch buffers - buffer_begin_ = (buffer_end_ == pool_end_) ? 
pool_begin_ : buffer_end_; - buffer_end_ = buffer_begin_ + properties_.buffer_size; - write_ptr_ = buffer_begin_; + NotifyConsumerThread(buffer_begin_, record_ptr_); + SwitchBuffers(); } { // Wait for the current operation to complete. @@ -108,6 +138,13 @@ class MemoryPool { } private: + void SwitchBuffers() { /* Makes the next buffer_size-byte buffer current, wrapping to pool_begin_ once buffer_end_ has reached pool_end_, and resets the record and data cursors. Callers in this patch invoke it with producer_mutex_ held. */ + buffer_begin_ = (buffer_end_ == pool_end_) ? pool_begin_ : buffer_end_; + buffer_end_ = buffer_begin_ + properties_.buffer_size; + record_ptr_ = buffer_begin_; + data_ptr_ = buffer_end_; + } + void ConsumerThreadLoop(std::promise ready) { std::unique_lock consumer_lock(consumer_mutex_); @@ -120,7 +157,8 @@ class MemoryPool { // begin == end == nullptr means the thread needs to exit. if (consumer_arg_.begin == nullptr && consumer_arg_.end == nullptr) break; - properties_.buffer_callback_fun(consumer_arg_.begin, consumer_arg_.end, + properties_.buffer_callback_fun(reinterpret_cast(consumer_arg_.begin), + reinterpret_cast(consumer_arg_.end), properties_.buffer_callback_arg); // Mark this operation as complete (valid=false) and notify all producers that may be @@ -131,7 +169,7 @@ class MemoryPool { } } - void NotifyConsumerThread(const char* data_begin, const char* data_end) { + void NotifyConsumerThread(const std::byte* data_begin, const std::byte* data_end) { std::unique_lock consumer_lock(consumer_mutex_); // If consumer_arg_ is still in use (valid=true), then wait for the consumer thread to finish @@ -148,18 +186,18 @@ class MemoryPool { consumer_cond_.notify_all(); } - void AllocateMemory(char** ptr, size_t size) const { + void AllocateMemory(std::byte** ptr, size_t size) const { if (properties_.alloc_fun != nullptr) { // Use the custom allocator provided in the properties. - properties_.alloc_fun(ptr, size, properties_.alloc_arg); + properties_.alloc_fun(reinterpret_cast(ptr), size, properties_.alloc_arg); return; } // No custom allocator was provided so use the default malloc/realloc/free allocator. 
if (*ptr == nullptr) { - *ptr = reinterpret_cast(malloc(size)); + *ptr = static_cast(malloc(size)); } else if (size != 0) { - *ptr = reinterpret_cast(realloc(*ptr, size)); + *ptr = static_cast(realloc(*ptr, size)); } else { free(*ptr); *ptr = nullptr; @@ -170,18 +208,19 @@ class MemoryPool { const roctracer_properties_t properties_; // Pool definition - char* pool_begin_; // FIXME: shouldn't these be void*? - char* pool_end_; - char* buffer_begin_; - char* buffer_end_; - char* write_ptr_; + std::byte* pool_begin_; + std::byte* pool_end_; + std::byte* buffer_begin_; + std::byte* buffer_end_; + std::byte* record_ptr_; /* position in the current buffer where the next record is written */ + std::byte* data_ptr_; /* lowest address of the record data copied at the end of the current buffer */ std::mutex producer_mutex_; // Consumer thread std::thread consumer_thread_; struct { - const char* begin; - const char* end; + const std::byte* begin; + const std::byte* end; bool valid = false; } consumer_arg_; diff --git a/projects/roctracer/test/CMakeLists.txt b/projects/roctracer/test/CMakeLists.txt index 66961ca1b4..8074ddd0f0 100644 --- a/projects/roctracer/test/CMakeLists.txt +++ b/projects/roctracer/test/CMakeLists.txt @@ -80,12 +80,6 @@ target_include_directories(codeobj_test PRIVATE ${CMAKE_CURRENT_SOURCE_DIR} ${PR target_link_libraries(codeobj_test roctracer) add_dependencies(mytest codeobj_test) -## Build the trace_buffer test -add_executable(trace_buffer EXCLUDE_FROM_ALL directed/trace_buffer.cpp) -target_include_directories(trace_buffer PRIVATE ${PROJECT_SOURCE_DIR}/src/tracer_tool) -target_link_libraries(trace_buffer Threads::Threads atomic) -add_dependencies(mytest trace_buffer) - ## Build the hsa (standalone) copy test function(generate_hsaco TARGET_ID INPUT_FILE OUTPUT_FILE) separate_arguments(CLANG_ARG_LIST UNIX_COMMAND @@ -126,6 +120,18 @@ add_executable(load_unload_reload_test EXCLUDE_FROM_ALL hsa/load_unload_reload.c target_link_libraries(load_unload_reload_test hsa-runtime64::hsa-runtime64) add_dependencies(mytest load_unload_reload_test) +## Build the trace_buffer test +add_executable(trace_buffer 
EXCLUDE_FROM_ALL directed/trace_buffer.cpp) +target_include_directories(trace_buffer PRIVATE ${PROJECT_SOURCE_DIR}/src/tracer_tool) +target_link_libraries(trace_buffer Threads::Threads atomic) +add_dependencies(mytest trace_buffer) + +## Build the memory_pool test +add_executable(memory_pool EXCLUDE_FROM_ALL directed/memory_pool.cpp) +target_include_directories(memory_pool PRIVATE ${PROJECT_SOURCE_DIR}/src/roctracer ${PROJECT_SOURCE_DIR}/inc) +target_link_libraries(memory_pool Threads::Threads atomic) +add_dependencies(mytest memory_pool) + ## Copy the golden traces and test scripts configure_file(run.sh ${PROJECT_BINARY_DIR} COPYONLY) execute_process(COMMAND ${CMAKE_COMMAND} -E create_symlink run.sh ${PROJECT_BINARY_DIR}/run_ci.sh) diff --git a/projects/roctracer/test/directed/memory_pool.cpp b/projects/roctracer/test/directed/memory_pool.cpp new file mode 100644 index 0000000000..08dd5b59ed --- /dev/null +++ b/projects/roctracer/test/directed/memory_pool.cpp @@ -0,0 +1,125 @@ +/* Copyright (c) 2022 Advanced Micro Devices, Inc. + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in + all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + THE SOFTWARE. */ + +#include "roctracer.h" +#include "memory_pool.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace roctracer; + +namespace { + +std::ifstream cpuinfo("/proc/cpuinfo"); +const std::size_t num_cpu_cores = + std::count(std::istream_iterator(cpuinfo), std::istream_iterator(), + std::string("processor")); /* counts occurrences of the token processor read from /proc/cpuinfo; the result is later clamped to [min_num_threads, max_num_threads] */ + +constexpr std::size_t num_iterations = 1000; +constexpr std::size_t min_num_threads = 10; +constexpr std::size_t max_num_threads = 50; + +void fatal_error(const char* message) { /* prints message to std::cerr and aborts the test process */ + std::cerr << message << std::endl; + abort(); +} + +} // namespace + +int main() { /* directed MemoryPool test: four scenarios, each one calls fatal_error() when the observed counters differ from the expected ones */ + constexpr size_t buffer_size = 10 * sizeof(roctracer_record_t); /* room for 10 records per buffer */ + constexpr size_t max_data_size = buffer_size - sizeof(roctracer_record_t); /* largest data size the pool still copies into a buffer next to one record */ + + size_t flush_count = 0, record_count = 0; + auto flush_callback = [&flush_count, &record_count](const char* begin, const char* end) { /* counts buffer callbacks and the number of records in each delivered range */ + ++flush_count; + std::this_thread::sleep_for(std::chrono::microseconds(10)); + record_count += (end - begin) / sizeof(roctracer_record_t); + }; + + roctracer_properties_t properties{}; + properties.buffer_callback_fun = [](const char* begin, const char* end, void* arg) { + (*static_cast(arg))(begin, end); + }; + properties.buffer_callback_arg = &flush_callback; + properties.buffer_size = buffer_size; + MemoryPool pool(properties); + + const void* original_data; + std::atomic relocation_count{0}; + auto relocate_data = [&relocation_count, &original_data](roctracer_record_t&, const void* data) { /* counts calls whose data pointer differs from original_data, i.e. the pool handed out its own copy */ + if (data != original_data) ++relocation_count; + }; + + // test1: the record and data fit in the buffer: no flush, data should get relocated. 
+ constexpr char data_fits[max_data_size] = {0}; + original_data = data_fits; + pool.Write(roctracer_record_t{}, data_fits, sizeof(data_fits), relocate_data); // F=0, R=1 (F: expected flush_count, R: expected relocation_count after the call) + pool.Flush(); // F=1, R=1 + if (flush_count != 1 || relocation_count != 1) fatal_error("failed test1"); + + flush_count = record_count = relocation_count = 0; + + // test2: the records and data do not fit in the buffer: 1 flush, data should get relocated. + pool.Write(roctracer_record_t{}); // F=0, R=0 + pool.Write(roctracer_record_t{}, data_fits, sizeof(data_fits), relocate_data); // F=1, R=1 + pool.Flush(); // F=2, R=1 + if (flush_count != 2 || relocation_count != 1) fatal_error("failed test2"); + + flush_count = record_count = relocation_count = 0; + + // test3: data does not fit in the buffer: 1 Flush, data is not relocated, all records should be + // processed. + constexpr char does_not_fit[max_data_size + 1] = {0}; + original_data = does_not_fit; + pool.Write(roctracer_record_t{}, does_not_fit, sizeof(does_not_fit), relocate_data); // F=1, R=0 + if (flush_count != 1 || relocation_count != 0 || record_count != 1) fatal_error("failed test3"); + + flush_count = record_count = relocation_count = 0; + + // test4: stress test writing and flushing. + const std::size_t num_threads = std::clamp(num_cpu_cores, min_num_threads, max_num_threads); + std::vector threads(num_threads); + + // Start the worker threads. Each thread will write 'num_iterations' records in the memory + // pool, then exit. + for (auto&& thread : threads) { + thread = std::thread([&pool]() { + for (std::size_t j = 0; j < num_iterations; ++j) pool.Write(roctracer_record_t{}); + }); + } + + // Wait for all the threads to complete, then flush the memory pool. 
+ for (auto&& thread : threads) thread.join(); + pool.Flush(); + + if (record_count != num_iterations * threads.size() || + flush_count != (record_count / (buffer_size / sizeof(roctracer_record_t)))) + fatal_error("failed test4"); /* expects every written record to be delivered and one callback per full buffer of records */ + + return 0; +} \ No newline at end of file diff --git a/projects/roctracer/test/golden_traces/tests_trace_cmp_levels.txt b/projects/roctracer/test/golden_traces/tests_trace_cmp_levels.txt index 0355a16c72..8ab4e93b45 100644 --- a/projects/roctracer/test/golden_traces/tests_trace_cmp_levels.txt +++ b/projects/roctracer/test/golden_traces/tests_trace_cmp_levels.txt @@ -16,6 +16,7 @@ copy_hsa_input_trace --check-events .* load_unload_reload_trace --check-order .* hsa_co_trace --check-none code_obj_trace --check-none -trace_buffer_trace --check-none +trace_buffer --check-none +memory_pool --check-none roctx_test_trace --check-count .* backward_compat_test_trace --check-none \ No newline at end of file diff --git a/projects/roctracer/test/run.sh b/projects/roctracer/test/run.sh index baebac067f..5878650ebe 100755 --- a/projects/roctracer/test/run.sh +++ b/projects/roctracer/test/run.sh @@ -186,7 +186,8 @@ eval_test "tool tracer codeobj" ./test/MatrixTranspose code_obj_trace #valgrind --tool=massif $tbin #ms_print massif.out. -eval_test "directed TraceBuffer test" ./test/trace_buffer trace_buffer_trace +eval_test "directed TraceBuffer test" ./test/trace_buffer trace_buffer +eval_test "directed MemoryPool test" ./test/memory_pool memory_pool eval_test "backward compatibilty tests" ./test/backward_compat_test backward_compat_test_trace