SWDEV-477909: Adding fixes for signal management in queue.cpp

Change-Id: I24f2ab24c0a4824cf9feff6c1cd91a3eee2ef4be
This commit is contained in:
Giovanni LB
2024-08-13 16:12:14 -03:00
parent 865e64753a
commit 36c9a8b4de
8 changed files: 101 additions and 113 deletions
+2 -2
Просмотреть файл
@@ -288,8 +288,8 @@ class Context {
rinfo->data.kind = ROCPROFILER_DATA_KIND_UNINIT;
rinfo->data.result_int64 = 0;
}
uint32_t xcc_count = agent_info_->xcc_num;
uint32_t single_xcc_buff_size = tuple.profile->output_buffer.size / (sizeof(uint64_t) * xcc_count);
size_t xcc_count = agent_info_->xcc_num;
size_t single_xcc_buff_size = tuple.profile->output_buffer.size / (sizeof(uint64_t) * xcc_count);
callback_data_t callback_data{tuple.profile, tuple.info_vector, tuple.info_vector->size(),
NULL, single_xcc_buff_size, 0};
const hsa_status_t status =
+1 -1
Просмотреть файл
@@ -104,7 +104,7 @@ bool metrics::ExtractMetricEvents(
*/
try {
HSASupport_Singleton& hsasupport_singleton = HSASupport_Singleton::GetInstance();
uint32_t xcc_count = hsasupport_singleton.GetHSAAgentInfo(gpu_agent.handle).GetDeviceInfo().getXccCount();
size_t xcc_count = hsasupport_singleton.GetHSAAgentInfo(gpu_agent.handle).GetDeviceInfo().getXccCount();
for (size_t i = 0; i < metric_names.size(); i++) {
-2
Просмотреть файл
@@ -740,8 +740,6 @@ void AQLPacketProfile::MoveToCache(hsa_agent_t gpu_agent, std::unique_ptr<AQLPac
{
if (!packet.get()) return;
auto& output_buffer = packet->profile->output_buffer;
std::lock_guard<std::mutex> lk(cache_mutex);
auto agent_it = _cache.find(gpu_agent.handle);
+87 -97
Просмотреть файл
@@ -54,15 +54,24 @@
} \
} while (0)
// Syscall number handed to syscall() to obtain the calling thread's id.
// Only defined here when no earlier header already provides it, so a system
// definition is never overridden.
// NOTE(review): 186 is presumably the x86-64 Linux value — confirm before
// relying on this fallback on another architecture.
#ifndef __NR_gettid
#define __NR_gettid 186
#endif
// Id of the current thread, initialised once per thread instead of issuing a
// syscall for every intercepted packet; passed to AddPendingSignals().
static thread_local uint32_t THREAD_ID = static_cast<uint32_t>(syscall(__NR_gettid));
// Value ACTIVE_INTERRUPT_SIGNAL_COUNT is compared against before the write
// interceptor waits on the queue's ready-signal condition variable.
#define SIGNAL_DELAY_THRESHOLD 3
// Counter incremented by Queue::WriteInterceptor for each counter-collection
// dispatch and decremented by AsyncSignalHandler.
std::atomic<uint32_t> ACTIVE_INTERRUPT_SIGNAL_COUNT{0};
namespace rocprofiler {
// Collection of aqlprofile info-data records.
using pmc_callback_data_t = std::vector<hsa_ven_amd_aqlprofile_info_data_t>;
// Header mask with only the bit at position HSA_PACKET_HEADER_BARRIER set;
// OR-ed into AQL packet headers by the write interceptor.
constexpr uint16_t BARRIER_BIT = static_cast<uint16_t>(1 << HSA_PACKET_HEADER_BARRIER);
/**
 * @brief Reports whether the ROCPROFILER_NO_SERIALIZATION environment
 *        variable is set to a value that parses (via atol) as greater than 0.
 *
 * The environment is read only on the first call; the result is stored in a
 * function-local static, so later changes to the variable are not observed.
 *
 * @return true when the variable is present and its numeric value is > 0,
 *         false otherwise.
 */
bool GetNoSerialization() {
  static const bool serialization_disabled = [] {
    const char* env_value = getenv("ROCPROFILER_NO_SERIALIZATION");
    return env_value != nullptr && atol(env_value) > 0;
  }();
  return serialization_disabled;
}
static inline bool IsEventMatch(const hsa_ven_amd_aqlprofile_event_t& event1,
const hsa_ven_amd_aqlprofile_event_t& event2) {
@@ -346,7 +355,8 @@ void enable_dispatch(Queue* dispatch_queue) {
need this to be invoked more than once in which case we would return false.
*/
bool AsyncSignalReadyHandler(hsa_signal_value_t signal_value, void* data) {
bool AsyncSignalReadyHandler(hsa_signal_value_t signal_value, void* data)
{
HSASupport_Singleton& hsasupport_singleton = HSASupport_Singleton::GetInstance();
profiler_serializer_t& serializer =
rocprofiler::ROCProfiler_Singleton::GetInstance().GetSerializer();
@@ -357,20 +367,21 @@ bool AsyncSignalReadyHandler(hsa_signal_value_t signal_value, void* data) {
ready signal is destroyed and
the destructor is notified and the handler is unregistered by returning false
*/
if (queue->state == is_destroy::to_destroy) {
{
queue->state = done_destroy;
hsasupport_singleton.GetCoreApiTable().hsa_signal_destroy_fn(queue->GetReadySignal());
}
if (queue->state == is_destroy::to_destroy)
{
queue->state = done_destroy;
hsasupport_singleton.GetCoreApiTable().hsa_signal_destroy_fn(queue->GetReadySignal());
queue->cv_ready_signal.notify_all();
return false;
}
queue->cv_ready_signal.notify_all();
hsasupport_singleton.GetCoreApiTable().hsa_signal_store_screlease_fn(queue->GetReadySignal(), 1);
if (serializer.dispatch_queue == nullptr)
enable_dispatch(queue);
else
serializer.dispatch_ready.push_back(queue);
return true;
}
/*
@@ -387,15 +398,6 @@ void SignalAsyncReadyHandler(const hsa_signal_t& signal, void* data) {
if (status != HSA_STATUS_SUCCESS) fatal("hsa_amd_signal_async_handler failed");
}
/**
 * @brief Tells whether ROCPROFILER_NO_SERIALIZATION holds a value that atol
 *        converts to a number greater than 0.
 *
 * The lookup happens once, on the first invocation, and the answer is kept in
 * a function-local static for all subsequent calls.
 *
 * @return true if the variable exists and converts to a value > 0; false if
 *         it is unset or converts to 0 or a negative number.
 */
bool GetNoSerialization() {
  static const bool cached_flag = []() -> bool {
    const char* raw = getenv("ROCPROFILER_NO_SERIALIZATION");
    if (raw == nullptr) {
      return false;
    }
    const long parsed = atol(raw);
    return parsed > 0;
  }();
  return cached_flag;
}
bool AsyncSignalHandler(hsa_signal_value_t signal_value, void* data)
{
auto queue_info_session = static_cast<queue_info_session_t*>(data);
@@ -417,7 +419,7 @@ bool AsyncSignalHandler(hsa_signal_value_t signal_value, void* data)
for (auto& pending : pending_signals)
{
if (hsasupport_singleton.GetCoreApiTable().hsa_signal_load_relaxed_fn(pending->new_signal))
if (hsasupport_singleton.GetCoreApiTable().hsa_signal_load_scacquire_fn(pending->new_signal))
return true;
hsa_amd_profiling_dispatch_time_t time;
hsasupport_singleton.GetAmdExtTable().hsa_amd_profiling_get_dispatch_time_fn(
@@ -429,7 +431,7 @@ bool AsyncSignalHandler(hsa_signal_value_t signal_value, void* data)
}
//hsasupport_singleton.GetCoreApiTable().hsa_signal_destroy_fn(pending->new_signal);
uint32_t record_count = 1;
uint32_t xcc_count = queue_info_session->xcc_count;
size_t xcc_count = queue_info_session->xcc_count;
static thread_local bool is_individual_xcc_mode = [xcc_count]() {
if (xcc_count < 2) return false;
const char* str = getenv("ROCPROFILER_INDIVIDUAL_XCC_MODE");
@@ -481,25 +483,24 @@ bool AsyncSignalHandler(hsa_signal_value_t signal_value, void* data)
}
}
}
auto* profile = pending->profile ? pending->profile->profile.get() : nullptr;
if (pending->counters_count > 0 && profile && profile->events)
auto profile = (pending->profile && pending->profile->profile && pending->counters_count > 0) ? std::move(pending->profile) : nullptr;
if (profile && !GetNoSerialization())
{
Packet::AQLPacketProfile::MoveToCache(queue_info_session->agent, std::move(pending->profile));
if (!GetNoSerialization()) {
profiler_serializer_t& serializer =
rocprofiler::ROCProfiler_Singleton::GetInstance().GetSerializer();
std::lock_guard<std::mutex> serializer_lock(serializer.serializer_mutex);
assert(serializer.dispatch_queue != nullptr);
hsasupport_singleton.GetCoreApiTable().hsa_signal_store_screlease_fn(
queue_info_session->block_signal, 1);
serializer.dispatch_queue = nullptr;
if (!serializer.dispatch_ready.empty())
{
Queue* queue = serializer.dispatch_ready.front();
serializer.dispatch_ready.erase(serializer.dispatch_ready.begin());
enable_dispatch(queue);
}
}
profiler_serializer_t& serializer =
rocprofiler::ROCProfiler_Singleton::GetInstance().GetSerializer();
std::lock_guard<std::mutex> serializer_lock(serializer.serializer_mutex);
assert(serializer.dispatch_queue != nullptr);
hsasupport_singleton.GetCoreApiTable().hsa_signal_store_screlease_fn(
queue_info_session->block_signal, 1);
serializer.dispatch_queue = nullptr;
if (!serializer.dispatch_ready.empty())
{
Queue* queue = serializer.dispatch_ready.front();
serializer.dispatch_ready.erase(serializer.dispatch_ready.begin());
enable_dispatch(queue);
}
ACTIVE_INTERRUPT_SIGNAL_COUNT.fetch_sub(1);
}
if (pending->new_signal.handle)
@@ -508,8 +509,8 @@ bool AsyncSignalHandler(hsa_signal_value_t signal_value, void* data)
hsasupport_singleton.GetCoreApiTable().hsa_signal_destroy_fn(
queue_info_session->interrupt_signal);
if (pending->counters_count > 0 && profile && profile->events)
ACTIVE_INTERRUPT_SIGNAL_COUNT.fetch_sub(1);
if (profile)
Packet::AQLPacketProfile::MoveToCache(queue_info_session->agent, std::move(profile));
}
delete queue_info_session;
return false;
@@ -555,7 +556,6 @@ void Queue::ResetSessionID(rocprofiler_session_id_t id)
bool Queue::CheckNeededProfileConfigs()
{
std::unique_lock<std::shared_mutex> session_id_lock(session_id_mutex);
is_counter_collection_mode = false;
is_timestamp_collection_mode = false;
is_att_collection_mode = false;
@@ -610,7 +610,7 @@ bool Queue::CheckNeededProfileConfigs()
return true;
}
std::atomic<uint32_t> WRITER_ID{0};
std::atomic<size_t> WRITER_ID{0};
/**
* @brief This function is a queue write interceptor. It intercepts the
@@ -631,7 +631,10 @@ void Queue::WriteInterceptor(const void* packets, uint64_t pkt_count, uint64_t u
session_id.handle != rocprofiler::ROCProfiler_Singleton::GetInstance().GetCurrentSessionId().handle)
{
session_id_lock.unlock();
CheckNeededProfileConfigs();
{
std::unique_lock<std::shared_mutex> _lk(session_id_mutex);
CheckNeededProfileConfigs();
}
session_id_lock.lock();
}
@@ -658,19 +661,20 @@ void Queue::WriteInterceptor(const void* packets, uint64_t pkt_count, uint64_t u
std::unique_ptr<Packet::AQLPacketProfile> profile_packet{nullptr};
// If counters found in the session
if (session_data_count > 0 && is_counter_collection_mode) {
// Get the PM4 Packets using packets_generator
profile_packet = Packet::AQLPacketProfile::MoveFromCache(queue_info.GetGPUAgent());
if (!profile_packet)
profile_packet = Packet::InitializeAqlPackets(
queue_info.GetCPUAgent(), queue_info.GetGPUAgent(), session_data, session_id);
if (ACTIVE_INTERRUPT_SIGNAL_COUNT.fetch_add(1) >= SIGNAL_DELAY_THRESHOLD)
if (session_data_count > 0 && is_counter_collection_mode)
{
if (ACTIVE_INTERRUPT_SIGNAL_COUNT.fetch_add(1) >= SIGNAL_DELAY_THRESHOLD && !GetNoSerialization())
{
queue_info.cv_ready_signal.wait_for(lk, std::chrono::microseconds(1), [] {
// Optimization: Schedule threads out if there is a lot of contention for HSA queues
queue_info.cv_ready_signal.wait_for(lk, std::chrono::microseconds(5), [] {
return ACTIVE_INTERRUPT_SIGNAL_COUNT.load() <= SIGNAL_DELAY_THRESHOLD;
});
}
profile_packet = Packet::AQLPacketProfile::MoveFromCache(queue_info.GetGPUAgent());
if (!profile_packet) // Get the PM4 Packets using packets_generator
profile_packet = Packet::InitializeAqlPackets(
queue_info.GetCPUAgent(), queue_info.GetGPUAgent(), session_data, session_id);
}
if (profile_packet.get() && !GetNoSerialization())
@@ -691,97 +695,83 @@ void Queue::WriteInterceptor(const void* packets, uint64_t pkt_count, uint64_t u
Packet::CreateBarrierPacket(&transformed_packets, &block_signal, &block_signal);
}
uint32_t writer_id = WRITER_ID.fetch_add(1, std::memory_order_release);
size_t writer_id = WRITER_ID.fetch_add(1);
if (session_data_count > 0 && is_counter_collection_mode && profile_packet.get())
if (session_data_count > 0 && is_counter_collection_mode && profile_packet)
{
auto* start_packet = profile_packet->context->start_packet;
// Adding start packet and its barrier with a dummy signal
hsa_signal_t dummy_signal{};
dummy_signal.handle = 0;
start_packet->header = HSA_PACKET_TYPE_VENDOR_SPECIFIC << HSA_PACKET_HEADER_TYPE;
Packet::AddVendorSpecificPacket(start_packet, &transformed_packets, dummy_signal);
Packet::CreateBarrierPacket(
&transformed_packets,
&start_packet->completion_signal,
nullptr
);
start_packet->header = (HSA_PACKET_TYPE_VENDOR_SPECIFIC << HSA_PACKET_HEADER_TYPE) | BARRIER_BIT;
Packet::AddVendorSpecificPacket(start_packet, &transformed_packets, hsa_signal_t{.handle = 0});
}
auto& packet = transformed_packets.emplace_back(packets_arr[i]);
auto& dispatch_packet = reinterpret_cast<hsa_kernel_dispatch_packet_t&>(packet);
auto& __attribute__((__may_alias__)) dispatch_packet = reinterpret_cast<hsa_kernel_dispatch_packet_t&>(packet);
uint64_t correlation_id = dispatch_packet.reserved2;
if (profile_packet)
dispatch_packet.header |= BARRIER_BIT;
CreateSignal(HSA_AMD_SIGNAL_AMD_GPU_ONLY, &packet.completion_signal);
CreateSignal(0, &packet.completion_signal);
// Adding the dispatch packet newly created signal to the pending signals
// list to be processed by the signal interrupt
rocprofiler_kernel_properties_t kernel_properties =
set_kernel_properties(dispatch_packet, queue_info.GetGPUAgent());
auto* context_backup = profile_packet.get() ? profile_packet->context.get() : nullptr;
auto* read_packet = profile_packet ? profile_packet->context->read_packet : nullptr;
auto* stop_packet = profile_packet ? profile_packet->context->stop_packet : nullptr;
if (session) {
uint64_t record_id = rocprofiler::ROCProfiler_Singleton::GetInstance().GetUniqueRecordId();
AddKernelNameWithDispatchID(GetKernelNameFromKsymbols(dispatch_packet.kernel_object),
record_id);
if (session_data_count > 0 && profile_packet.get())
{
session->GetProfiler()->AddPendingSignals(
writer_id, record_id, original_packet.completion_signal, packet.completion_signal,
session_id, buffer_id, session_data_count, std::move(profile_packet),
kernel_properties, (uint32_t)syscall(__NR_gettid), user_pkt_index, correlation_id);
kernel_properties, THREAD_ID, user_pkt_index, correlation_id);
}
else
{
session->GetProfiler()->AddPendingSignals(
writer_id, record_id, original_packet.completion_signal, packet.completion_signal,
session_id, buffer_id, session_data_count, nullptr, kernel_properties,
(uint32_t)syscall(__NR_gettid), user_pkt_index, correlation_id);
THREAD_ID, user_pkt_index, correlation_id);
}
}
// Make a copy of the original packet, adding its signal to a barrier
// packet and create a new signal for it to get timestamps
if (original_packet.completion_signal.handle) {
hsa_barrier_and_packet_t barrier{};
barrier.header = HSA_PACKET_TYPE_BARRIER_AND << HSA_PACKET_HEADER_TYPE;
Packet::packet_t* __attribute__((__may_alias__)) pkt =
(reinterpret_cast<Packet::packet_t*>(&barrier));
transformed_packets.emplace_back(*pkt).completion_signal =
original_packet.completion_signal;
{
std::lock_guard<std::mutex> lock(
HSASupport_Singleton::GetInstance().signals_timestamps_map_lock);
HSASupport_Singleton::GetInstance()
.signals_timestamps[original_packet.completion_signal.handle] =
new_signal_timestamp_t{packet.completion_signal, std::nullopt};
}
if (original_packet.completion_signal.handle)
{
std::lock_guard<std::mutex> lock(HSASupport_Singleton::GetInstance().signals_timestamps_map_lock);
HSASupport_Singleton::GetInstance()
.signals_timestamps[original_packet.completion_signal.handle] =
new_signal_timestamp_t{packet.completion_signal, std::nullopt};
}
hsa_signal_t interrupt_signal{};
// Adding a barrier packet with the original packet's completion signal.
CreateSignal(0, &interrupt_signal);
// Adding Stop and Read PM4 Packets
if (session_data_count > 0 && is_counter_collection_mode && context_backup)
if (session_data_count > 0 && is_counter_collection_mode && stop_packet && read_packet)
{
hsa_signal_t dummy_signal{};
context_backup->read_packet->header = (HSA_PACKET_TYPE_VENDOR_SPECIFIC << HSA_PACKET_HEADER_TYPE) | (1 << HSA_PACKET_HEADER_BARRIER);
Packet::AddVendorSpecificPacket(context_backup->read_packet, &transformed_packets, dummy_signal);
context_backup->stop_packet->header = (HSA_PACKET_TYPE_VENDOR_SPECIFIC << HSA_PACKET_HEADER_TYPE) | (1 << HSA_PACKET_HEADER_BARRIER);
Packet::AddVendorSpecificPacket(context_backup->stop_packet, &transformed_packets, interrupt_signal);
// Added Interrupt Signal with barrier and provided handler for it
Packet::CreateBarrierPacket( &transformed_packets, &interrupt_signal, nullptr);
read_packet->header = (HSA_PACKET_TYPE_VENDOR_SPECIFIC << HSA_PACKET_HEADER_TYPE) | BARRIER_BIT;
Packet::AddVendorSpecificPacket(read_packet, &transformed_packets, original_packet.completion_signal);
stop_packet->header = (HSA_PACKET_TYPE_VENDOR_SPECIFIC << HSA_PACKET_HEADER_TYPE) | BARRIER_BIT;
Packet::AddVendorSpecificPacket(stop_packet, &transformed_packets, interrupt_signal);
}
else
{
Packet::packet_t pkt{};
pkt.header = (HSA_PACKET_TYPE_BARRIER_AND << HSA_PACKET_HEADER_TYPE) | BARRIER_BIT;
transformed_packets.emplace_back(pkt).completion_signal = original_packet.completion_signal;
Packet::CreateBarrierPacket(&transformed_packets, nullptr, &interrupt_signal);
}
rocprofiler::HSAAgentInfo& agentInfo =
rocprofiler::HSASupport_Singleton::GetInstance().GetHSAAgentInfo(
queue_info.GetGPUAgent().handle);
rocprofiler::HSASupport_Singleton::GetInstance().GetHSAAgentInfo(queue_info.GetGPUAgent().handle);
// Creating Async Handler to be called every time the interrupt signal is
// marked complete
SignalAsyncHandler(
@@ -792,7 +782,7 @@ void Queue::WriteInterceptor(const void* packets, uint64_t pkt_count, uint64_t u
agentInfo.GetDeviceInfo().getXccCount(), queue_info.GetBlockSignal()});
}
/* Write the transformed packets to the hardware queue. */
writer(&transformed_packets[0], transformed_packets.size());
writer(transformed_packets.data(), transformed_packets.size());
} else if (
!is_att_collection_mode||
!session ||
+2 -2
Просмотреть файл
@@ -109,10 +109,10 @@ struct queue_info_session_t {
hsa_agent_t agent;
rocprofiler_session_id_t session_id;
uint64_t queue_id;
uint32_t writer_id;
size_t writer_id;
hsa_signal_t interrupt_signal;
uint64_t gpu_index;
uint32_t xcc_count;
size_t xcc_count;
hsa_signal_t block_signal;
};
+2 -2
Просмотреть файл
@@ -35,7 +35,7 @@ AttTracer::AttTracer(rocprofiler_buffer_id_t buffer_id, rocprofiler_filter_id_t
: buffer_id_(buffer_id), filter_id_(filter_id), session_id_(session_id) {}
void AttTracer::AddPendingSignals(
uint32_t writer_id,
size_t writer_id,
uint64_t kernel_object,
const hsa_signal_t& original_completion_signal,
const hsa_signal_t& new_completion_signal,
@@ -66,7 +66,7 @@ void AttTracer::AddPendingSignals(
});
}
std::vector<att_pending_signal_t> AttTracer::MovePendingSignals(uint32_t writer_id)
std::vector<att_pending_signal_t> AttTracer::MovePendingSignals(size_t writer_id)
{
std::lock_guard<std::mutex> lock(sessions_pending_signals_lock_);
auto it = sessions_pending_signals_.find(writer_id);
+5 -5
Просмотреть файл
@@ -71,7 +71,7 @@ public:
rocprofiler_session_id_t session_id
);
void AddPendingSignals(uint32_t writer_id, uint64_t kernel_object,
void AddPendingSignals(size_t writer_id, uint64_t kernel_object,
const hsa_signal_t& original_completion_signal,
const hsa_signal_t& new_completion_signal,
rocprofiler_session_id_t session_id, rocprofiler_buffer_id_t buffer_id,
@@ -95,7 +95,7 @@ public:
uint64_t agent_handle
);
std::vector<att_pending_signal_t> MovePendingSignals(uint32_t writer_id);
std::vector<att_pending_signal_t> MovePendingSignals(size_t writer_id);
bool ATTWriteInterceptor(
const void* packets,
@@ -201,7 +201,7 @@ private:
rocprofiler_buffer_id_t buffer_id_;
rocprofiler_filter_id_t filter_id_;
rocprofiler_session_id_t session_id_;
std::atomic<uint32_t> WRITER_ID{0};
std::atomic<size_t> WRITER_ID{0};
std::vector<std::string> kernel_profile_names;
std::vector<std::pair<uint64_t,uint64_t>> kernel_profile_dispatch_ids;
@@ -209,12 +209,12 @@ private:
std::vector<rocprofiler_att_parameter_t> att_parameters_data;
std::mutex sessions_pending_signals_lock_;
std::map<uint32_t, std::vector<att_pending_signal_t>> sessions_pending_signals_;
std::map<size_t, std::vector<att_pending_signal_t>> sessions_pending_signals_;
std::condition_variable has_session_pending_cv;
std::atomic<bool> bIsSessionDestroying{false};
rocprofiler_record_id_t capture_id;
std::unordered_set<uint32_t> active_capture_event_ids;
std::unordered_set<size_t> active_capture_event_ids;
};
} // namespace att
+2 -2
Просмотреть файл
@@ -261,12 +261,12 @@ bool AttTracer::ATTContiguousWriteInterceptor(
codeobj_event_cnt = new_load_cnt;
auto symbols = codeobj_record::get_capture(this->capture_id);
std::unordered_set<uint32_t> current_ids;
std::unordered_set<size_t> current_ids;
for (size_t s=0; s<symbols.count; s++)
current_ids.insert(symbols.symbols[s].att_marker_id);
for (uint32_t prev_id : active_capture_event_ids)
for (size_t prev_id : active_capture_event_ids)
if (current_ids.find(prev_id) == current_ids.end())
InsertUnloadMarker(transformed, queue_info.GetGPUAgent(), prev_id);