diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index f1f938b86b..003a81b838 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -83,7 +83,7 @@ if ( DEFINED ROCTRACER_TARGET ) set ( TEST_LIB_SRC ${TEST_DIR}/tool/tracer_tool.cpp ${UTIL_SRC} ) add_library ( ${TEST_LIB} SHARED ${TEST_LIB_SRC} ) target_include_directories ( ${TEST_LIB} PRIVATE ${HSA_TEST_DIR} ${ROOT_DIR} ${ROOT_DIR}/inc ${HSA_RUNTIME_INC_PATH} ${ROCM_INC_PATH} ${HIP_INC_DIR} ${HSA_KMT_INC_PATH} ${GEN_INC_DIR} ) - target_link_libraries ( ${TEST_LIB} ${ROCTRACER_TARGET} ${HSA_RUNTIME_LIB} c stdc++ dl pthread rt numa ) + target_link_libraries ( ${TEST_LIB} ${ROCTRACER_TARGET} ${HSA_RUNTIME_LIB} c stdc++ atomic dl pthread rt numa ) install ( TARGETS ${TEST_LIB} LIBRARY DESTINATION lib/${DEST_NAME}) endif () diff --git a/test/tool/trace_buffer.h b/test/tool/trace_buffer.h index 3000e188c3..a7bd2c6d69 100644 --- a/test/tool/trace_buffer.h +++ b/test/tool/trace_buffer.h @@ -18,289 +18,248 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ -#ifndef SRC_CORE_TRACE_BUFFER_H_ -#define SRC_CORE_TRACE_BUFFER_H_ +#ifndef TOOL_TRACE_BUFFER_H_ +#define TOOL_TRACE_BUFFER_H_ #include +#include +#include +#include +#include #include #include #include +#include #include - -#include -#include -#include - -#define FATAL(stream) \ - do { \ - std::ostringstream oss; \ - oss << __FUNCTION__ << "(), " << stream; \ - std::cout << oss.str() << std::endl; \ - abort(); \ - } while (0) - -#define PTHREAD_CALL(call) \ - do { \ - int err = call; \ - if (err != 0) { \ - errno = err; \ - perror(#call); \ - abort(); \ - } \ - } while (0) +#include +#include namespace roctracer { -enum { TRACE_ENTRY_INV = 0, TRACE_ENTRY_INIT = 1, TRACE_ENTRY_COMPL = 2 }; - -enum entry_type_t { - DFLT_ENTRY_TYPE = 0, - API_ENTRY_TYPE = 1, - COPY_ENTRY_TYPE = 2, - KERNEL_ENTRY_TYPE = 3, - NUM_ENTRY_TYPE = 4 -}; - -template struct push_element_fun { - T* const elem_; - T** prev_; - bool fun(T* node) { - if (node->priority_ > elem_->priority_) { - *prev_ = elem_; - elem_->next_elem_ = node; - } else if (node->next_elem_ == NULL) { - node->next_elem_ = elem_; - } else { - prev_ = &(node->next_elem_); - return false; - } - return true; - } - push_element_fun(T* elem, T** prev) : elem_(elem), prev_(prev) {} -}; - -template struct call_element_fun { - void (T::*fptr_)(); - bool fun(T* node) const { - (node->*fptr_)(); - return false; - } - call_element_fun(void (T::*f)()) : fptr_(f) {} -}; struct TraceBufferBase { - typedef std::mutex mutex_t; - - virtual void StartWorkerThread() = 0; - virtual void Flush() = 0; - - static void StartWorkerThreadAll() { - foreach (call_element_fun(&TraceBufferBase::StartWorkerThread)) - ; - } static void FlushAll() { - foreach (call_element_fun(&TraceBufferBase::Flush)) - ; + std::lock_guard lock(mutex_); + + for (auto* trace_buffer = head_; trace_buffer != nullptr; trace_buffer = trace_buffer->next_) + trace_buffer->Flush(); } static void Push(TraceBufferBase* elem) { - if (head_elem_ == NULL) - head_elem_ = elem; - else - foreach (push_element_fun(elem, &head_elem_)) - ; + std::lock_guard lock(mutex_); + + auto** prev_ptr = &head_; + while (*prev_ptr != nullptr && elem->priority_ > (*prev_ptr)->priority_) + prev_ptr = &(*prev_ptr)->next_; + + elem->next_ = *prev_ptr; + *prev_ptr = elem; } - TraceBufferBase(const uint32_t& prior) : priority_(prior), next_elem_(NULL) {} + TraceBufferBase(std::string name, int priority) + : name_(std::move(name)), priority_(priority), next_(nullptr) {} - template static void foreach (const F& f_in) { - std::lock_guard lck(mutex_); - F f = f_in; - TraceBufferBase* p = head_elem_; - while (p != NULL) { - TraceBufferBase* next = p->next_elem_; - if (f.fun(p) == true) break; - p = next; - } - } + virtual void Flush() = 0; - const uint32_t priority_; - TraceBufferBase* next_elem_; - static TraceBufferBase* head_elem_; - static mutex_t mutex_; + const std::string& name() const { return name_; } + + private: + const std::string name_; + const int priority_; + TraceBufferBase* next_; + + static TraceBufferBase* head_; + static std::mutex mutex_; }; -template class TraceBuffer : protected TraceBufferBase { +enum TraceEntryState { TRACE_ENTRY_INVALID = 0, TRACE_ENTRY_INIT = 1, TRACE_ENTRY_COMPLETE = 2 }; + +template > +class TraceBuffer : protected TraceBufferBase { public: - typedef void (*callback_t)(Entry*); - typedef TraceBuffer Obj; - typedef uint64_t pointer_t; - typedef std::recursive_mutex mutex_t; - typedef typename std::list buf_list_t; - typedef typename buf_list_t::iterator buf_list_it_t; + using callback_t = std::function; - struct flush_prm_t { - entry_type_t type; - callback_t fun; - }; + TraceBuffer(std::string name, uint64_t size, callback_t flush_callback, int priority = 0) + : TraceBufferBase(std::move(name), priority), + flush_callback_(std::move(flush_callback)), + size_(size) { + assert(size_ != 0 && "cannot create an empty trace buffer"); - TraceBuffer(const char* name, uint32_t size, const flush_prm_t* flush_prm_arr, - uint32_t flush_prm_count, uint32_t prior = 0) - : TraceBufferBase(prior), size_(size), work_thread_started_(false) { - name_ = strdup(name); - data_ = allocate_fun(); - next_ = allocate_fun(); - read_pointer_ = 0; - write_pointer_ = 0; - end_pointer_ = size; - buf_list_.push_back(data_); + Entry* write_buffer = allocator_.allocate(size_); + assert(write_buffer != nullptr); + buffer_list_.push_back(write_buffer); - memset(f_array_, 0, sizeof(f_array_)); - for (const flush_prm_t* prm = flush_prm_arr; prm < flush_prm_arr + flush_prm_count; prm++) { - const entry_type_t type = prm->type; - if (type >= NUM_ENTRY_TYPE) FATAL("out of f_array bounds (" << type << ")"); - if (f_array_[type] != NULL) FATAL("handler function ptr redefinition (" << type << ")"); - f_array_[type] = prm->fun; - } + read_index_ = 0; + write_index_ = {0, write_buffer}; + AllocateFreeBuffer(); + + // Add this instance to the link list of all trace buffers in the process. TraceBufferBase::Push(this); } ~TraceBuffer() { - StopWorkerThread(); + // Flush the remaining records. After flushing, there should not be any records left in the + // trace buffer. Flush(); - } + assert(read_index_ == write_index_.load().index); - void StartWorkerThread() { - std::lock_guard lck(mutex_); - if (work_thread_started_ == false) { - PTHREAD_CALL(pthread_mutex_init(&work_mutex_, NULL)); - PTHREAD_CALL(pthread_cond_init(&work_cond_, NULL)); - PTHREAD_CALL(pthread_create(&work_thread_, NULL, allocate_worker, this)); - work_thread_started_ = true; - } - } + // Acquire both the writer and worker lock as we are accessing shared variables they protect. + std::unique_lock writer_lock(write_mutex_, std::defer_lock); + std::unique_lock worker_lock(worker_mutex_, std::defer_lock); + std::lock(writer_lock, worker_lock); - void StopWorkerThread() { - std::lock_guard lck(mutex_); - if (work_thread_started_ == true) { - PTHREAD_CALL(pthread_cancel(work_thread_)); - void* res; - PTHREAD_CALL(pthread_join(work_thread_, &res)); - if (res != PTHREAD_CANCELED) FATAL("consumer thread wasn't stopped correctly"); - work_thread_started_ = false; + // Deallocate the buffers. + allocator_.deallocate(write_index_.load().buffer, size_); + allocator_.deallocate(free_buffer_, size_); + + // Stop the worker thread. The worker thread loop checks the 'worker_thread_' std::optional + // after waking up, and exits if it does not have a value. + if (worker_thread_) { + std::thread worker_thread = std::move(worker_thread_.value()); + { + // Tell the worker thread loop to exit. + worker_thread_.reset(); + free_buffer_ = nullptr; + worker_cond_.notify_one(); + } + // Release the worker lock to allow the worker thread to exit. + worker_lock.unlock(); + worker_thread.join(); } } Entry* GetEntry() { - const pointer_t pointer = write_pointer_.fetch_add(1); - if (pointer >= end_pointer_) wrap_buffer(pointer); - if (pointer >= end_pointer_) FATAL("pointer >= end_pointer_ after buffer wrap"); - Entry* entry = data_ + (size_ + pointer - end_pointer_); - entry->valid = TRACE_ENTRY_INV; - entry->type = DFLT_ENTRY_TYPE; - return entry; + auto current = write_index_.load(std::memory_order_relaxed); + + while (true) { + // If the pointer is at the end of the current buffer, switch to the available free buffer and + // notify the worker thread to allocate a new buffer. + if (current.index != 0 && current.index % size_ == 0) { + std::lock_guard lock(write_mutex_); + + // If the worker thread wasn't already started, start it now. This avoids starting a new + // thread when the trace buffer is created. + if (!worker_thread_) { + std::promise ready; + auto future = ready.get_future(); + { + std::lock_guard worker_lock(worker_mutex_); + worker_thread_.emplace(&TraceBuffer::WorkerThreadLoop, this, std::move(ready)); + } + future.wait(); + } + + // Re-check the pointer overflow under the writer lock, another thread could have beaten us + // to it and already bumped the write_index_. + current = write_index_.load(std::memory_order_relaxed); + if (current.index % size_ == 0) { + std::unique_lock worker_lock(worker_mutex_); + + // Wait for the free buffer to become available. + worker_cond_.wait(worker_lock, [this]() { return free_buffer_ != nullptr; }); + + current.buffer = free_buffer_; + buffer_list_.push_back(current.buffer); + write_index_.store({current.index + 1, current.buffer}, std::memory_order_relaxed); + + // Tell the worker thread to allocate a new free buffer. + free_buffer_ = nullptr; + worker_cond_.notify_one(); + + // We successfully allocated a new buffer, return the first element. + return ¤t.buffer[0]; + } + } + + if (write_index_.compare_exchange_weak(current, {current.index + 1, current.buffer}, + std::memory_order_relaxed)) + return ¤t.buffer[current.index % size_]; + } } - void Flush() { flush_buf(); } + // Flush all entries between read_pointer and write_pointer. read_pointer and write_pointer are + // monotonically increasing indices, with read_pointer % size always indexing inside the first + // buffer in the list. Stop flushing if an incomplete entry is found, it will be flushed with + // the next invocation after changing its state to 'complete'. + void Flush() override { + std::lock_guard lock(write_mutex_); + auto write_index = write_index_.load(std::memory_order_relaxed); + + for (auto it = buffer_list_.begin(); it != buffer_list_.end();) { + auto end_of_buffer = read_index_ - read_index_ % size_ + size_; + + while (read_index_ < std::min(write_index.index, end_of_buffer)) { + Entry* entry = &(*it)[read_index_ % size_]; + + // The entry is not yet complete, stop flushing here. + if (entry->valid.load(std::memory_order_acquire) != TRACE_ENTRY_COMPLETE) return; + + flush_callback_(entry); + ++read_index_; + } + + // The buffer is still in use or the read pointer did not reach the end of the buffer. + if (*it == write_index.buffer || read_index_ != end_of_buffer) return; + + // All entries in the current buffer are now processed. Destroy the buffer and move onto the + // next buffer in the list. + allocator_.deallocate(*it, size_); + it = buffer_list_.erase(it); + } + } private: - void flush_buf() { - std::lock_guard lck(mutex_); + void AllocateFreeBuffer() { + assert(free_buffer_ == nullptr); - pointer_t pointer = read_pointer_; - pointer_t curr_pointer = write_pointer_.load(std::memory_order_relaxed); - buf_list_it_t it = buf_list_.begin(); - buf_list_it_t end_it = buf_list_.end(); - while (it != end_it) { - Entry* buf = *it; - Entry* ptr = buf + (pointer % size_); - Entry* end_ptr = buf + size_; - while ((ptr < end_ptr) && (pointer < curr_pointer)) { - if (ptr->valid != TRACE_ENTRY_COMPL) break; + free_buffer_ = allocator_.allocate(size_); + assert(free_buffer_ != nullptr); - entry_type_t type = ptr->type; - if (type >= NUM_ENTRY_TYPE) FATAL("out of f_array bounds (" << type << ")"); - callback_t f_ptr = f_array_[type]; - if (f_ptr == NULL) FATAL("f_ptr == NULL"); - (*f_ptr)(ptr); + for (size_t i = 0; i < size_; ++i) + free_buffer_[i].valid.store(TRACE_ENTRY_INVALID, std::memory_order_relaxed); + } - ptr++; - pointer++; - } + void WorkerThreadLoop(std::promise ready) { + std::unique_lock lock(worker_mutex_); - buf_list_it_t prev = it; - it++; - if (ptr == end_ptr) { - free_fun(*prev); - buf_list_.erase(prev); - } - if (pointer == curr_pointer) break; + // This worker thread is now ready to accept work. + ready.set_value(); + + while (true) { + worker_cond_.wait(lock, [this]() { return free_buffer_ == nullptr; }); + if (!worker_thread_) break; + AllocateFreeBuffer(); + worker_cond_.notify_one(); } - - read_pointer_ = pointer; } - inline Entry* allocate_fun() { - Entry* ptr = (Entry*)malloc(size_ * sizeof(Entry)); - if (ptr == NULL) FATAL("malloc failed"); - // memset(ptr, 0, size_ * sizeof(Entry)); - return ptr; - } + // The WriteIndex is used to store both the index and the buffer associated with that index (the + // buffer contains the trace buffer records at [index - index % size, index - index % size_t + + // size_ - 1]) in a single atomic variable. + struct WriteIndex { + uint64_t index; + Entry* buffer; + }; - inline void free_fun(void* ptr) { free(ptr); } + const callback_t flush_callback_; + const uint64_t size_; - static void* allocate_worker(void* arg) { - Obj* obj = (Obj*)arg; + uint64_t read_index_; // The index of the next record to flush. + std::atomic write_index_; // The index of the next record that could be written. + Entry* free_buffer_{nullptr}; // The next available free buffer. - while (1) { - PTHREAD_CALL(pthread_mutex_lock(&(obj->work_mutex_))); - while (obj->next_ != NULL) { - PTHREAD_CALL(pthread_cond_wait(&(obj->work_cond_), &(obj->work_mutex_))); - } - obj->next_ = obj->allocate_fun(); - PTHREAD_CALL(pthread_mutex_unlock(&(obj->work_mutex_))); - } + std::optional worker_thread_; + std::mutex worker_mutex_; + std::condition_variable worker_cond_; - return NULL; - } - - void wrap_buffer(const pointer_t pointer) { - std::lock_guard lck(mutex_); - if (work_thread_started_ == false) StartWorkerThread(); - - PTHREAD_CALL(pthread_mutex_lock(&work_mutex_)); - if (pointer >= end_pointer_) { - data_ = next_; - next_ = NULL; - PTHREAD_CALL(pthread_cond_signal(&work_cond_)); - end_pointer_ += size_; - if (end_pointer_ == 0) FATAL("pointer overflow"); - buf_list_.push_back(data_); - } - PTHREAD_CALL(pthread_mutex_unlock(&work_mutex_)); - } - - const char* name_; - const uint32_t size_; - Entry* data_; - Entry* next_; - pointer_t read_pointer_; - volatile std::atomic write_pointer_; - volatile std::atomic end_pointer_; - buf_list_t buf_list_; - callback_t f_array_[NUM_ENTRY_TYPE]; - - pthread_t work_thread_; - pthread_mutex_t work_mutex_; - pthread_cond_t work_cond_; - bool work_thread_started_; - - mutex_t mutex_; + std::mutex write_mutex_; + std::list buffer_list_; + Allocator allocator_; }; } // namespace roctracer #define TRACE_BUFFER_INSTANTIATE() \ - roctracer::TraceBufferBase* roctracer::TraceBufferBase::head_elem_ = NULL; \ - roctracer::TraceBufferBase::mutex_t roctracer::TraceBufferBase::mutex_; + roctracer::TraceBufferBase* roctracer::TraceBufferBase::head_ = nullptr; \ + std::mutex roctracer::TraceBufferBase::mutex_; -#endif // SRC_CORE_TRACE_BUFFER_H_ +#endif // TOOL_TRACE_BUFFER_H_ diff --git a/test/tool/tracer_tool.cpp b/test/tool/tracer_tool.cpp index 9e23becbe9..edb07184b5 100644 --- a/test/tool/tracer_tool.cpp +++ b/test/tool/tracer_tool.cpp @@ -18,6 +18,7 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ +#include #include #include @@ -235,8 +236,7 @@ void* flush_thr_fun(void*) { // rocTX annotation tracing struct roctx_trace_entry_t { - std::atomic valid; - roctracer::entry_type_t type; + std::atomic valid; uint32_t cid; timestamp_t time; uint32_t pid; @@ -245,9 +245,6 @@ struct roctx_trace_entry_t { const char* message; }; -void roctx_flush_cb(roctx_trace_entry_t* entry); -constexpr roctracer::TraceBuffer::flush_prm_t roctx_flush_prm = { - roctracer::DFLT_ENTRY_TYPE, roctx_flush_cb}; roctracer::TraceBuffer* roctx_trace_buffer = NULL; // rocTX callback function @@ -265,7 +262,7 @@ static inline void roctx_callback_fun(uint32_t domain, uint32_t cid, uint32_t ti entry->tid = tid; entry->rid = rid; entry->message = (message != NULL) ? strdup(message) : NULL; - entry->valid.store(roctracer::TRACE_ENTRY_COMPL, std::memory_order_release); + entry->valid.store(roctracer::TRACE_ENTRY_COMPLETE, std::memory_order_release); } void roctx_api_callback(uint32_t domain, uint32_t cid, const void* callback_data, void* arg) { @@ -314,7 +311,6 @@ void roctx_flush_cb(roctx_trace_entry_t* entry) { struct hsa_api_trace_entry_t { std::atomic valid; - roctracer::entry_type_t type; uint32_t cid; timestamp_t begin; timestamp_t end; @@ -323,9 +319,6 @@ struct hsa_api_trace_entry_t { hsa_api_data_t data; }; -void hsa_api_flush_cb(hsa_api_trace_entry_t* entry); -constexpr roctracer::TraceBuffer::flush_prm_t hsa_flush_prm = { - roctracer::DFLT_ENTRY_TYPE, hsa_api_flush_cb}; roctracer::TraceBuffer* hsa_api_trace_buffer = NULL; // HSA API callback function @@ -345,7 +338,7 @@ void hsa_api_callback(uint32_t domain, uint32_t cid, const void* callback_data, entry->pid = GetPid(); entry->tid = GetTid(); entry->data = *data; - entry->valid.store(roctracer::TRACE_ENTRY_COMPL, std::memory_order_release); + entry->valid.store(roctracer::TRACE_ENTRY_COMPLETE, std::memory_order_release); } } @@ -362,7 +355,6 @@ void hsa_api_flush_cb(hsa_api_trace_entry_t* entry) { struct hip_api_trace_entry_t { std::atomic valid; - roctracer::entry_type_t type; uint32_t domain; uint32_t cid; timestamp_t begin; @@ -374,9 +366,6 @@ struct hip_api_trace_entry_t { void* ptr; }; -void hip_api_flush_cb(hip_api_trace_entry_t* entry); -constexpr roctracer::TraceBuffer::flush_prm_t hip_api_flush_prm = { - roctracer::DFLT_ENTRY_TYPE, hip_api_flush_cb}; roctracer::TraceBuffer* hip_api_trace_buffer = NULL; static inline bool is_hip_kernel_launch_api(const uint32_t& cid) { @@ -455,7 +444,7 @@ void hip_api_callback(uint32_t domain, uint32_t cid, const void* callback_data, } } - entry->valid.store(roctracer::TRACE_ENTRY_COMPL, std::memory_order_release); + entry->valid.store(roctracer::TRACE_ENTRY_COMPLETE, std::memory_order_release); } DEBUG_TRACE( @@ -480,7 +469,7 @@ void mark_api_callback(uint32_t domain, uint32_t cid, const void* callback_data, entry->data = {}; entry->name = strdup(name); entry->ptr = NULL; - entry->valid.store(roctracer::TRACE_ENTRY_COMPL, std::memory_order_release); + entry->valid.store(roctracer::TRACE_ENTRY_COMPLETE, std::memory_order_release); } typedef std::map hip_kernel_map_t; @@ -572,15 +561,11 @@ void hip_api_flush_cb(hip_api_trace_entry_t* entry) { struct hip_act_trace_entry_t { std::atomic valid; - roctracer::entry_type_t type; uint32_t kind; timestamp_t dur; uint64_t correlation_id; }; -void hip_act_flush_cb(hip_act_trace_entry_t* entry); -constexpr roctracer::TraceBuffer::flush_prm_t hip_act_flush_prm = { - roctracer::DFLT_ENTRY_TYPE, hip_act_flush_cb}; roctracer::TraceBuffer* hip_act_trace_buffer = NULL; // HIP ACT trace buffer flush callback @@ -631,7 +616,7 @@ void pool_activity_callback(const char* begin, const char* end, void* arg) { entry->kind = record->kind; entry->dur = record->end_ns - record->begin_ns; entry->correlation_id = record->correlation_id; - entry->valid.store(roctracer::TRACE_ENTRY_COMPL, std::memory_order_release); + entry->valid.store(roctracer::TRACE_ENTRY_COMPLETE, std::memory_order_release); } else { fprintf(hcc_activity_file_handle, "%lu:%lu %d:%lu %s:%lu:%u\n", record->begin_ns, record->end_ns, record->device_id, record->queue_id, name, record->correlation_id, @@ -778,7 +763,7 @@ void tool_unload() { PTHREAD_CALL(pthread_cancel(flush_thread)); void* res; PTHREAD_CALL(pthread_join(flush_thread, &res)); - if (res != PTHREAD_CANCELED) FATAL("flush thread wasn't stopped correctly"); + if (res != PTHREAD_CANCELED) fatal("flush thread wasn't stopped correctly"); } if (trace_roctx) { @@ -1125,13 +1110,13 @@ extern "C" CONSTRUCTOR_API void constructor() { ONLOAD_TRACE_BEG(); roctracer::hip_support::HIP_depth_max = 0; roctx_trace_buffer = - new roctracer::TraceBuffer("rocTX API", 0x200000, &roctx_flush_prm, 1); + new roctracer::TraceBuffer("rocTX API", 0x200000, roctx_flush_cb); hip_api_trace_buffer = - new roctracer::TraceBuffer("HIP API", 0x200000, &hip_api_flush_prm, 1); - hip_act_trace_buffer = new roctracer::TraceBuffer( - "HIP ACT", 0x200000, &hip_act_flush_prm, 1, 1); + new roctracer::TraceBuffer("HIP API", 0x200000, hip_api_flush_cb); + hip_act_trace_buffer = + new roctracer::TraceBuffer("HIP ACT", 0x200000, hip_act_flush_cb, 1); hsa_api_trace_buffer = - new roctracer::TraceBuffer("HSA API", 0x200000, &hsa_flush_prm, 1); + new roctracer::TraceBuffer("HSA API", 0x200000, hsa_api_flush_cb); roctracer_load(); tool_load(); ONLOAD_TRACE_END();