From 36c9a8b4de3b53e0cebd3a30eb390feb92fa4ca2 Mon Sep 17 00:00:00 2001 From: Giovanni LB Date: Tue, 13 Aug 2024 16:12:14 -0300 Subject: [PATCH] SWDEV-477909: Adding fixes for signal management in queue.cpp Change-Id: I24f2ab24c0a4824cf9feff6c1cd91a3eee2ef4be --- src/core/context.h | 4 +- src/core/counters/metrics/eval_metrics.cpp | 2 +- src/core/hsa/packets/packets_generator.cpp | 2 - src/core/hsa/queues/queue.cpp | 184 ++++++++++----------- src/core/hsa/queues/queue.h | 4 +- src/core/session/att/att.cpp | 4 +- src/core/session/att/att.h | 10 +- src/core/session/att/continuous.cpp | 4 +- 8 files changed, 101 insertions(+), 113 deletions(-) diff --git a/src/core/context.h b/src/core/context.h index 9f11eff8f0..fa1cb3ff71 100644 --- a/src/core/context.h +++ b/src/core/context.h @@ -288,8 +288,8 @@ class Context { rinfo->data.kind = ROCPROFILER_DATA_KIND_UNINIT; rinfo->data.result_int64 = 0; } - uint32_t xcc_count = agent_info_->xcc_num; - uint32_t single_xcc_buff_size = tuple.profile->output_buffer.size / (sizeof(uint64_t) * xcc_count); + size_t xcc_count = agent_info_->xcc_num; + size_t single_xcc_buff_size = tuple.profile->output_buffer.size / (sizeof(uint64_t) * xcc_count); callback_data_t callback_data{tuple.profile, tuple.info_vector, tuple.info_vector->size(), NULL, single_xcc_buff_size, 0}; const hsa_status_t status = diff --git a/src/core/counters/metrics/eval_metrics.cpp b/src/core/counters/metrics/eval_metrics.cpp index ee2e84b01a..c4a766cd1d 100644 --- a/src/core/counters/metrics/eval_metrics.cpp +++ b/src/core/counters/metrics/eval_metrics.cpp @@ -104,7 +104,7 @@ bool metrics::ExtractMetricEvents( */ try { HSASupport_Singleton& hsasupport_singleton = HSASupport_Singleton::GetInstance(); - uint32_t xcc_count = hsasupport_singleton.GetHSAAgentInfo(gpu_agent.handle).GetDeviceInfo().getXccCount(); + size_t xcc_count = hsasupport_singleton.GetHSAAgentInfo(gpu_agent.handle).GetDeviceInfo().getXccCount(); for (size_t i = 0; i < metric_names.size(); i++) { diff --git a/src/core/hsa/packets/packets_generator.cpp b/src/core/hsa/packets/packets_generator.cpp index d991232a65..cb7e2c7263 100644 --- a/src/core/hsa/packets/packets_generator.cpp +++ b/src/core/hsa/packets/packets_generator.cpp @@ -740,8 +740,6 @@ void AQLPacketProfile::MoveToCache(hsa_agent_t gpu_agent, std::unique_ptrprofile->output_buffer; - std::lock_guard lk(cache_mutex); auto agent_it = _cache.find(gpu_agent.handle); diff --git a/src/core/hsa/queues/queue.cpp b/src/core/hsa/queues/queue.cpp index 8df3d8fb7c..95bc89e0ad 100644 --- a/src/core/hsa/queues/queue.cpp +++ b/src/core/hsa/queues/queue.cpp @@ -54,15 +54,24 @@ } \ } while (0) -#define __NR_gettid 186 +static thread_local uint32_t THREAD_ID = (uint32_t)syscall(186); #define SIGNAL_DELAY_THRESHOLD 3 std::atomic ACTIVE_INTERRUPT_SIGNAL_COUNT{0}; - namespace rocprofiler { typedef std::vector pmc_callback_data_t; +constexpr uint16_t BARRIER_BIT = 1 << HSA_PACKET_HEADER_BARRIER; + +bool GetNoSerialization() { + const static bool no_serialization = []() { + const char* str = getenv("ROCPROFILER_NO_SERIALIZATION"); + if (str != NULL) return (atol(str) > 0); + return false; + }(); + return no_serialization; +} static inline bool IsEventMatch(const hsa_ven_amd_aqlprofile_event_t& event1, const hsa_ven_amd_aqlprofile_event_t& event2) { @@ -346,7 +355,8 @@ void enable_dispatch(Queue* dispatch_queue) { need this to be invoked more than once in which case we would return false. */ -bool AsyncSignalReadyHandler(hsa_signal_value_t signal_value, void* data) { +bool AsyncSignalReadyHandler(hsa_signal_value_t signal_value, void* data) +{ HSASupport_Singleton& hsasupport_singleton = HSASupport_Singleton::GetInstance(); profiler_serializer_t& serializer = rocprofiler::ROCProfiler_Singleton::GetInstance().GetSerializer(); @@ -357,20 +367,21 @@ bool AsyncSignalReadyHandler(hsa_signal_value_t signal_value, void* data) { ready signal is destroyed and the destructor is notified and the handler is unregistered by returning false */ - if (queue->state == is_destroy::to_destroy) { - { - queue->state = done_destroy; - hsasupport_singleton.GetCoreApiTable().hsa_signal_destroy_fn(queue->GetReadySignal()); - } + if (queue->state == is_destroy::to_destroy) + { + queue->state = done_destroy; + hsasupport_singleton.GetCoreApiTable().hsa_signal_destroy_fn(queue->GetReadySignal()); queue->cv_ready_signal.notify_all(); return false; } queue->cv_ready_signal.notify_all(); + hsasupport_singleton.GetCoreApiTable().hsa_signal_store_screlease_fn(queue->GetReadySignal(), 1); if (serializer.dispatch_queue == nullptr) enable_dispatch(queue); else serializer.dispatch_ready.push_back(queue); + return true; } /* @@ -387,15 +398,6 @@ void SignalAsyncReadyHandler(const hsa_signal_t& signal, void* data) { if (status != HSA_STATUS_SUCCESS) fatal("hsa_amd_signal_async_handler failed"); } -bool GetNoSerialization() { - const static bool no_serialization = []() { - const char* str = getenv("ROCPROFILER_NO_SERIALIZATION"); - if (str != NULL) return (atol(str) > 0); - return false; - }(); - return no_serialization; -} - bool AsyncSignalHandler(hsa_signal_value_t signal_value, void* data) { auto queue_info_session = static_cast(data); @@ -417,7 +419,7 @@ bool AsyncSignalHandler(hsa_signal_value_t signal_value, void* data) for (auto& pending : pending_signals) { - if (hsasupport_singleton.GetCoreApiTable().hsa_signal_load_relaxed_fn(pending->new_signal)) + if (hsasupport_singleton.GetCoreApiTable().hsa_signal_load_scacquire_fn(pending->new_signal)) return true; hsa_amd_profiling_dispatch_time_t time; hsasupport_singleton.GetAmdExtTable().hsa_amd_profiling_get_dispatch_time_fn( @@ -429,7 +431,7 @@ bool AsyncSignalHandler(hsa_signal_value_t signal_value, void* data) } //hsasupport_singleton.GetCoreApiTable().hsa_signal_destroy_fn(pending->new_signal); uint32_t record_count = 1; - uint32_t xcc_count = queue_info_session->xcc_count; + size_t xcc_count = queue_info_session->xcc_count; static thread_local bool is_individual_xcc_mode = [xcc_count]() { if (xcc_count < 2) return false; const char* str = getenv("ROCPROFILER_INDIVIDUAL_XCC_MODE"); @@ -481,25 +483,24 @@ bool AsyncSignalHandler(hsa_signal_value_t signal_value, void* data) } } } - auto* profile = pending->profile ? pending->profile->profile.get() : nullptr; - if (pending->counters_count > 0 && profile && profile->events) + auto profile = (pending->profile && pending->profile->profile && pending->counters_count > 0) ? std::move(pending->profile) : nullptr; + + if (profile && !GetNoSerialization()) { - Packet::AQLPacketProfile::MoveToCache(queue_info_session->agent, std::move(pending->profile)); - if (!GetNoSerialization()) { - profiler_serializer_t& serializer = - rocprofiler::ROCProfiler_Singleton::GetInstance().GetSerializer(); - std::lock_guard serializer_lock(serializer.serializer_mutex); - assert(serializer.dispatch_queue != nullptr); - hsasupport_singleton.GetCoreApiTable().hsa_signal_store_screlease_fn( - queue_info_session->block_signal, 1); - serializer.dispatch_queue = nullptr; - if (!serializer.dispatch_ready.empty()) - { - Queue* queue = serializer.dispatch_ready.front(); - serializer.dispatch_ready.erase(serializer.dispatch_ready.begin()); - enable_dispatch(queue); - } - } + profiler_serializer_t& serializer = + rocprofiler::ROCProfiler_Singleton::GetInstance().GetSerializer(); + std::lock_guard serializer_lock(serializer.serializer_mutex); + assert(serializer.dispatch_queue != nullptr); + hsasupport_singleton.GetCoreApiTable().hsa_signal_store_screlease_fn( + queue_info_session->block_signal, 1); + serializer.dispatch_queue = nullptr; + if (!serializer.dispatch_ready.empty()) + { + Queue* queue = serializer.dispatch_ready.front(); + serializer.dispatch_ready.erase(serializer.dispatch_ready.begin()); + enable_dispatch(queue); + } + ACTIVE_INTERRUPT_SIGNAL_COUNT.fetch_sub(1); } if (pending->new_signal.handle) @@ -508,8 +509,8 @@ bool AsyncSignalHandler(hsa_signal_value_t signal_value, void* data) hsasupport_singleton.GetCoreApiTable().hsa_signal_destroy_fn( queue_info_session->interrupt_signal); - if (pending->counters_count > 0 && profile && profile->events) - ACTIVE_INTERRUPT_SIGNAL_COUNT.fetch_sub(1); + if (profile) + Packet::AQLPacketProfile::MoveToCache(queue_info_session->agent, std::move(profile)); } delete queue_info_session; return false; @@ -555,7 +556,6 @@ void Queue::ResetSessionID(rocprofiler_session_id_t id) bool Queue::CheckNeededProfileConfigs() { - std::unique_lock session_id_lock(session_id_mutex); is_counter_collection_mode = false; is_timestamp_collection_mode = false; is_att_collection_mode = false; @@ -610,7 +610,7 @@ bool Queue::CheckNeededProfileConfigs() return true; } -std::atomic WRITER_ID{0}; +std::atomic WRITER_ID{0}; /** * @brief This function is a queue write interceptor. It intercepts the @@ -631,7 +631,10 @@ void Queue::WriteInterceptor(const void* packets, uint64_t pkt_count, uint64_t u session_id.handle != rocprofiler::ROCProfiler_Singleton::GetInstance().GetCurrentSessionId().handle) { session_id_lock.unlock(); - CheckNeededProfileConfigs(); + { + std::unique_lock _lk(session_id_mutex); + CheckNeededProfileConfigs(); + } session_id_lock.lock(); } @@ -658,19 +661,20 @@ void Queue::WriteInterceptor(const void* packets, uint64_t pkt_count, uint64_t u std::unique_ptr profile_packet{nullptr}; // If counters found in the session - if (session_data_count > 0 && is_counter_collection_mode) { - // Get the PM4 Packets using packets_generator - profile_packet = Packet::AQLPacketProfile::MoveFromCache(queue_info.GetGPUAgent()); - if (!profile_packet) - profile_packet = Packet::InitializeAqlPackets( - queue_info.GetCPUAgent(), queue_info.GetGPUAgent(), session_data, session_id); - - if (ACTIVE_INTERRUPT_SIGNAL_COUNT.fetch_add(1) >= SIGNAL_DELAY_THRESHOLD) + if (session_data_count > 0 && is_counter_collection_mode) + { + if (ACTIVE_INTERRUPT_SIGNAL_COUNT.fetch_add(1) >= SIGNAL_DELAY_THRESHOLD && !GetNoSerialization()) { - queue_info.cv_ready_signal.wait_for(lk, std::chrono::microseconds(1), [] { + // Optimization: Schedule threads out if there is a lot of contention for HSA queues + queue_info.cv_ready_signal.wait_for(lk, std::chrono::microseconds(5), [] { return ACTIVE_INTERRUPT_SIGNAL_COUNT.load() <= SIGNAL_DELAY_THRESHOLD; }); } + + profile_packet = Packet::AQLPacketProfile::MoveFromCache(queue_info.GetGPUAgent()); + if (!profile_packet) // Get the PM4 Packets using packets_generator + profile_packet = Packet::InitializeAqlPackets( + queue_info.GetCPUAgent(), queue_info.GetGPUAgent(), session_data, session_id); } if (profile_packet.get() && !GetNoSerialization()) @@ -691,97 +695,83 @@ void Queue::WriteInterceptor(const void* packets, uint64_t pkt_count, uint64_t u Packet::CreateBarrierPacket(&transformed_packets, &block_signal, &block_signal); } - uint32_t writer_id = WRITER_ID.fetch_add(1, std::memory_order_release); + size_t writer_id = WRITER_ID.fetch_add(1); - if (session_data_count > 0 && is_counter_collection_mode && profile_packet.get()) + if (session_data_count > 0 && is_counter_collection_mode && profile_packet) { auto* start_packet = profile_packet->context->start_packet; - // Adding start packet and its barrier with a dummy signal - hsa_signal_t dummy_signal{}; - dummy_signal.handle = 0; - start_packet->header = HSA_PACKET_TYPE_VENDOR_SPECIFIC << HSA_PACKET_HEADER_TYPE; - Packet::AddVendorSpecificPacket(start_packet, &transformed_packets, dummy_signal); - - Packet::CreateBarrierPacket( - &transformed_packets, - &start_packet->completion_signal, - nullptr - ); + start_packet->header = (HSA_PACKET_TYPE_VENDOR_SPECIFIC << HSA_PACKET_HEADER_TYPE) | BARRIER_BIT; + Packet::AddVendorSpecificPacket(start_packet, &transformed_packets, hsa_signal_t{.handle = 0}); } auto& packet = transformed_packets.emplace_back(packets_arr[i]); - auto& dispatch_packet = reinterpret_cast(packet); + auto& __attribute__((__may_alias__)) dispatch_packet = reinterpret_cast(packet); uint64_t correlation_id = dispatch_packet.reserved2; + if (profile_packet) + dispatch_packet.header |= BARRIER_BIT; - CreateSignal(HSA_AMD_SIGNAL_AMD_GPU_ONLY, &packet.completion_signal); + CreateSignal(0, &packet.completion_signal); // Adding the dispatch packet newly created signal to the pending signals // list to be processed by the signal interrupt rocprofiler_kernel_properties_t kernel_properties = set_kernel_properties(dispatch_packet, queue_info.GetGPUAgent()); - auto* context_backup = profile_packet.get() ? profile_packet->context.get() : nullptr; + auto* read_packet = profile_packet ? profile_packet->context->read_packet : nullptr; + auto* stop_packet = profile_packet ? profile_packet->context->stop_packet : nullptr; + if (session) { uint64_t record_id = rocprofiler::ROCProfiler_Singleton::GetInstance().GetUniqueRecordId(); AddKernelNameWithDispatchID(GetKernelNameFromKsymbols(dispatch_packet.kernel_object), record_id); + if (session_data_count > 0 && profile_packet.get()) { session->GetProfiler()->AddPendingSignals( writer_id, record_id, original_packet.completion_signal, packet.completion_signal, session_id, buffer_id, session_data_count, std::move(profile_packet), - kernel_properties, (uint32_t)syscall(__NR_gettid), user_pkt_index, correlation_id); + kernel_properties, THREAD_ID, user_pkt_index, correlation_id); } else { session->GetProfiler()->AddPendingSignals( writer_id, record_id, original_packet.completion_signal, packet.completion_signal, session_id, buffer_id, session_data_count, nullptr, kernel_properties, - (uint32_t)syscall(__NR_gettid), user_pkt_index, correlation_id); + THREAD_ID, user_pkt_index, correlation_id); } } // Make a copy of the original packet, adding its signal to a barrier // packet and create a new signal for it to get timestamps - if (original_packet.completion_signal.handle) { - hsa_barrier_and_packet_t barrier{}; - barrier.header = HSA_PACKET_TYPE_BARRIER_AND << HSA_PACKET_HEADER_TYPE; - Packet::packet_t* __attribute__((__may_alias__)) pkt = - (reinterpret_cast(&barrier)); - transformed_packets.emplace_back(*pkt).completion_signal = - original_packet.completion_signal; - - { - std::lock_guard lock( - HSASupport_Singleton::GetInstance().signals_timestamps_map_lock); - HSASupport_Singleton::GetInstance() - .signals_timestamps[original_packet.completion_signal.handle] = - new_signal_timestamp_t{packet.completion_signal, std::nullopt}; - } + if (original_packet.completion_signal.handle) + { + std::lock_guard lock(HSASupport_Singleton::GetInstance().signals_timestamps_map_lock); + HSASupport_Singleton::GetInstance() + .signals_timestamps[original_packet.completion_signal.handle] = + new_signal_timestamp_t{packet.completion_signal, std::nullopt}; } hsa_signal_t interrupt_signal{}; - // Adding a barrier packet with the original packet's completion signal. CreateSignal(0, &interrupt_signal); // Adding Stop and Read PM4 Packets - if (session_data_count > 0 && is_counter_collection_mode && context_backup) + if (session_data_count > 0 && is_counter_collection_mode && stop_packet && read_packet) { - hsa_signal_t dummy_signal{}; - context_backup->read_packet->header = (HSA_PACKET_TYPE_VENDOR_SPECIFIC << HSA_PACKET_HEADER_TYPE) | (1 << HSA_PACKET_HEADER_BARRIER); - Packet::AddVendorSpecificPacket(context_backup->read_packet, &transformed_packets, dummy_signal); - context_backup->stop_packet->header = (HSA_PACKET_TYPE_VENDOR_SPECIFIC << HSA_PACKET_HEADER_TYPE) | (1 << HSA_PACKET_HEADER_BARRIER); - Packet::AddVendorSpecificPacket(context_backup->stop_packet, &transformed_packets, interrupt_signal); - - // Added Interrupt Signal with barrier and provided handler for it - Packet::CreateBarrierPacket( &transformed_packets, &interrupt_signal, nullptr); + read_packet->header = (HSA_PACKET_TYPE_VENDOR_SPECIFIC << HSA_PACKET_HEADER_TYPE) | BARRIER_BIT; + Packet::AddVendorSpecificPacket(read_packet, &transformed_packets, original_packet.completion_signal); + stop_packet->header = (HSA_PACKET_TYPE_VENDOR_SPECIFIC << HSA_PACKET_HEADER_TYPE) | BARRIER_BIT; + Packet::AddVendorSpecificPacket(stop_packet, &transformed_packets, interrupt_signal); } else + { + Packet::packet_t pkt{}; + pkt.header = (HSA_PACKET_TYPE_BARRIER_AND << HSA_PACKET_HEADER_TYPE) | BARRIER_BIT; + transformed_packets.emplace_back(pkt).completion_signal = original_packet.completion_signal; Packet::CreateBarrierPacket(&transformed_packets, nullptr, &interrupt_signal); + } rocprofiler::HSAAgentInfo& agentInfo = - rocprofiler::HSASupport_Singleton::GetInstance().GetHSAAgentInfo( - queue_info.GetGPUAgent().handle); + rocprofiler::HSASupport_Singleton::GetInstance().GetHSAAgentInfo(queue_info.GetGPUAgent().handle); // Creating Async Handler to be called every time the interrupt signal is // marked complete SignalAsyncHandler( @@ -792,7 +782,7 @@ void Queue::WriteInterceptor(const void* packets, uint64_t pkt_count, uint64_t u agentInfo.GetDeviceInfo().getXccCount(), queue_info.GetBlockSignal()}); } /* Write the transformed packets to the hardware queue. */ - writer(&transformed_packets[0], transformed_packets.size()); + writer(transformed_packets.data(), transformed_packets.size()); } else if ( !is_att_collection_mode|| !session || diff --git a/src/core/hsa/queues/queue.h b/src/core/hsa/queues/queue.h index 38aefc06c0..9909994c5d 100644 --- a/src/core/hsa/queues/queue.h +++ b/src/core/hsa/queues/queue.h @@ -109,10 +109,10 @@ struct queue_info_session_t { hsa_agent_t agent; rocprofiler_session_id_t session_id; uint64_t queue_id; - uint32_t writer_id; + size_t writer_id; hsa_signal_t interrupt_signal; uint64_t gpu_index; - uint32_t xcc_count; + size_t xcc_count; hsa_signal_t block_signal; }; diff --git a/src/core/session/att/att.cpp b/src/core/session/att/att.cpp index 895083e309..a19e99e449 100644 --- a/src/core/session/att/att.cpp +++ b/src/core/session/att/att.cpp @@ -35,7 +35,7 @@ AttTracer::AttTracer(rocprofiler_buffer_id_t buffer_id, rocprofiler_filter_id_t : buffer_id_(buffer_id), filter_id_(filter_id), session_id_(session_id) {} void AttTracer::AddPendingSignals( - uint32_t writer_id, + size_t writer_id, uint64_t kernel_object, const hsa_signal_t& original_completion_signal, const hsa_signal_t& new_completion_signal, @@ -66,7 +66,7 @@ void AttTracer::AddPendingSignals( }); } -std::vector AttTracer::MovePendingSignals(uint32_t writer_id) +std::vector AttTracer::MovePendingSignals(size_t writer_id) { std::lock_guard lock(sessions_pending_signals_lock_); auto it = sessions_pending_signals_.find(writer_id); diff --git a/src/core/session/att/att.h b/src/core/session/att/att.h index 75dc44c5bf..b4ec6822b0 100644 --- a/src/core/session/att/att.h +++ b/src/core/session/att/att.h @@ -71,7 +71,7 @@ public: rocprofiler_session_id_t session_id ); - void AddPendingSignals(uint32_t writer_id, uint64_t kernel_object, + void AddPendingSignals(size_t writer_id, uint64_t kernel_object, const hsa_signal_t& original_completion_signal, const hsa_signal_t& new_completion_signal, rocprofiler_session_id_t session_id, rocprofiler_buffer_id_t buffer_id, @@ -95,7 +95,7 @@ public: uint64_t agent_handle ); - std::vector MovePendingSignals(uint32_t writer_id); + std::vector MovePendingSignals(size_t writer_id); bool ATTWriteInterceptor( const void* packets, @@ -201,7 +201,7 @@ private: rocprofiler_buffer_id_t buffer_id_; rocprofiler_filter_id_t filter_id_; rocprofiler_session_id_t session_id_; - std::atomic WRITER_ID{0}; + std::atomic WRITER_ID{0}; std::vector kernel_profile_names; std::vector> kernel_profile_dispatch_ids; @@ -209,12 +209,12 @@ private: std::vector att_parameters_data; std::mutex sessions_pending_signals_lock_; - std::map> sessions_pending_signals_; + std::map> sessions_pending_signals_; std::condition_variable has_session_pending_cv; std::atomic bIsSessionDestroying{false}; rocprofiler_record_id_t capture_id; - std::unordered_set active_capture_event_ids; + std::unordered_set active_capture_event_ids; }; } // namespace att diff --git a/src/core/session/att/continuous.cpp b/src/core/session/att/continuous.cpp index 5f02bd673b..10763a916a 100644 --- a/src/core/session/att/continuous.cpp +++ b/src/core/session/att/continuous.cpp @@ -261,12 +261,12 @@ bool AttTracer::ATTContiguousWriteInterceptor( codeobj_event_cnt = new_load_cnt; auto symbols = codeobj_record::get_capture(this->capture_id); - std::unordered_set current_ids; + std::unordered_set current_ids; for (size_t s=0; s