From f95b383dfaf748fac967ea4d5e64741e07368fc9 Mon Sep 17 00:00:00 2001 From: Xianwei Zhang Date: Tue, 7 Jul 2020 15:39:08 -0400 Subject: [PATCH] pmc: add support of concurrent kernel profiling Profiling was previously enabled only in serial mode, i.e., kernels are serialized in execution, and counters are reset at each kernel start and read at kernel completion. This patch adds a concurrent mode, which issues a process-level start packet to reset the counters and then reads them twice, at kernel start and at kernel end, to obtain the counter value difference. The new concurrent profiling mode requires the corresponding support on the aqlprofile side. Change-Id: I94b4442eadc8c64b8fba51b1e4916fc8b895ad21 [ROCm/rocprofiler commit: 61c9df463197f51ab70a2aa35709c702af9f5373] --- projects/rocprofiler/src/core/context.h | 46 ++++++++++++-- .../rocprofiler/src/core/intercept_queue.cpp | 4 +- .../rocprofiler/src/core/intercept_queue.h | 42 ++++++++++--- projects/rocprofiler/src/core/profile.h | 61 ++++++++++++++++--- projects/rocprofiler/src/core/rocprofiler.cpp | 21 ++++++- projects/rocprofiler/test/tool/tool.cpp | 3 + 6 files changed, 154 insertions(+), 23 deletions(-) diff --git a/projects/rocprofiler/src/core/context.h b/projects/rocprofiler/src/core/context.h index 77bf17eafd..02150734ed 100644 --- a/projects/rocprofiler/src/core/context.h +++ b/projects/rocprofiler/src/core/context.h @@ -104,10 +104,12 @@ class Group { } } - hsa_status_t Finalize() { - hsa_status_t status = pmc_profile_.Finalize(start_vector_, stop_vector_, read_vector_); + hsa_status_t Finalize(const bool is_concurrent = false) { + hsa_status_t status = pmc_profile_.Finalize(start_vector_, stop_vector_, + read_vector_, is_concurrent); if (status == HSA_STATUS_SUCCESS) { - status = trace_profile_.Finalize(start_vector_, stop_vector_, read_vector_); + status = trace_profile_.Finalize(start_vector_, stop_vector_, + read_vector_, is_concurrent); } if (status == HSA_STATUS_SUCCESS) { if 
(!pmc_profile_.Empty()) ++n_profiles_; @@ -283,6 +285,30 @@ class Context { } } + /* Handle the completion of kernel-begin 'read' packet */ + static bool HandlerRead(hsa_signal_value_t value, void* arg) { + Group* group = reinterpret_cast(arg); + Context* context = group->GetContext(); + + // Handle the completion signal of read packet at kernel begin + const profile_vector_t profile_vector = context->GetProfiles(group->GetIndex()); + for (auto& tuple : profile_vector) { + // Wait for read packet to complete + util::HsaRsrcFactory::Instance().SignalWaitRestore(tuple.completion_signal, 1); + const profile_t* profile = tuple.profile; + // Copy the counter values, read at kernel begin, to the right half of + // the buffer, so that the next kernel-end read can reuse the left half + char* data = reinterpret_cast(profile->output_buffer.ptr); + const uint32_t num = profile->output_buffer.size / 2; + for(uint32_t i = 0; i < num; ++i) { + data[i+num] = data[i]; // left --> right + data[i] = 0; // reset left + } + } + + return false; + } + static bool Handler(hsa_signal_value_t value, void* arg) { Group* group = reinterpret_cast(arg); Context* context = group->GetContext(); @@ -314,6 +340,9 @@ class Context { return &record_; } + // Concurrent profiling mode + static bool k_concurrent_; + private: Context(const util::AgentInfo* agent_info, Queue* queue, rocprofiler_feature_t* info, const uint32_t info_count, rocprofiler_handler_t handler, void* handler_arg) @@ -368,6 +397,11 @@ class Context { set_[group_index].ResetRefsCount(); const profile_vector_t profile_vector = GetProfiles(group_index); for (auto& tuple : profile_vector) { + // Handler for read packet completion + if (k_concurrent_) { + hsa_amd_signal_async_handler(tuple.completion_signal, HSA_SIGNAL_CONDITION_LT, 1, HandlerRead, + &set_[group_index]); + } // Handler for stop packet completion hsa_amd_signal_async_handler(tuple.completion_signal, HSA_SIGNAL_CONDITION_LT, 1, Handler, &set_[group_index]); @@ -486,7 
+520,7 @@ class Context { void Finalize() { for (unsigned index = 0; index < set_.size(); ++index) { - const hsa_status_t status = set_[index].Finalize(); + const hsa_status_t status = set_[index].Finalize(k_concurrent_); if (status != HSA_STATUS_SUCCESS) EXC_RAISING(status, "context finalize failed"); } } @@ -620,8 +654,12 @@ class Context { hsa_signal_t dispatch_signal_; hsa_signal_t orig_signal_; rocprofiler_dispatch_record_t record_; + }; +#define CONTEXT_INSTANTIATE() \ + bool rocprofiler::Context::k_concurrent_ = false; + } // namespace rocprofiler #endif // SRC_CORE_CONTEXT_H_ diff --git a/projects/rocprofiler/src/core/intercept_queue.cpp b/projects/rocprofiler/src/core/intercept_queue.cpp index 809c00c36a..705fff29b5 100644 --- a/projects/rocprofiler/src/core/intercept_queue.cpp +++ b/projects/rocprofiler/src/core/intercept_queue.cpp @@ -42,7 +42,7 @@ InterceptQueue::queue_id_t InterceptQueue::current_queue_id = 0; rocprofiler_hsa_callback_fun_t InterceptQueue::submit_callback_fun_ = NULL; void* InterceptQueue::submit_callback_arg_ = NULL; -bool InterceptQueue::k_concurrent_ = false; bool InterceptQueue::opt_mode_ = false; - +uint32_t InterceptQueue::k_concurrent_ = K_CONC_OFF; +std::once_flag InterceptQueue::once_flag_; } // namespace rocprofiler diff --git a/projects/rocprofiler/src/core/intercept_queue.h b/projects/rocprofiler/src/core/intercept_queue.h index d5a7a8f697..f0bf06a4d0 100644 --- a/projects/rocprofiler/src/core/intercept_queue.h +++ b/projects/rocprofiler/src/core/intercept_queue.h @@ -41,9 +41,17 @@ THE SOFTWARE. 
#include "util/hsa_rsrc_factory.h" namespace rocprofiler { +enum { + K_CONC_OFF = 0, + K_CONC_PMC = 1, + K_CONC_TRACE = 2 +}; + extern decltype(hsa_queue_create)* hsa_queue_create_fn; extern decltype(hsa_queue_destroy)* hsa_queue_destroy_fn; +void PmcStarter(Context* context); + static std::mutex ctx_a_mutex; typedef std::map ctx_a_map_t; static ctx_a_map_t* ctx_a_map = NULL; @@ -90,8 +98,8 @@ class InterceptQueue { if (!obj_map_) obj_map_ = new obj_map_t; InterceptQueue* obj = new InterceptQueue(agent, *queue, proxy); (*obj_map_)[(uint64_t)(*queue)] = obj; - if (k_concurrent_) { - status = proxy->SetInterceptCB(OnSubmitCB_SQTT, obj); + if (k_concurrent_ == K_CONC_TRACE) { + status = proxy->SetInterceptCB(OnSubmitCB_ctrace, obj); } else if (opt_mode_) { status = proxy->SetInterceptCB(OnSubmitCB_opt, obj); } else { @@ -317,9 +325,27 @@ class InterceptQueue { const pkt_vector_t& start_vector = context->StartPackets(group.index); const pkt_vector_t& stop_vector = context->StopPackets(group.index); - pkt_vector_t packets = start_vector; - packets.insert(packets.end(), *packet); - packets.insert(packets.end(), stop_vector.begin(), stop_vector.end()); + const pkt_vector_t& read_vector = context->ReadPackets(group.index); + pkt_vector_t packets; + + if (k_concurrent_ == K_CONC_OFF) { // serial + packets = start_vector; + packets.insert(packets.end(), *packet); + packets.insert(packets.end(), stop_vector.begin(), stop_vector.end()); + } else { // concurrent + // Start PMC once + std::call_once(once_flag_, PmcStarter, context); + // Reads at both kernel start and end + assert(read_vector.size() == 2 * start_vector.size()); + auto mid = read_vector.begin() + read_vector.size()/2; + // Read at kernel start + packets.insert(packets.end(), read_vector.begin(), mid); + // Kernel dispatch packet + packets.insert(packets.end(), *packet); + // Read at kernel end + packets.insert(packets.end(), mid, read_vector.end()); + } + if (writer != NULL) { writer(&packets[0], packets.size()); 
} else { @@ -347,7 +373,7 @@ class InterceptQueue { } } - static void OnSubmitCB_SQTT(const void* in_packets, uint64_t count, uint64_t user_que_idx, void* data, + static void OnSubmitCB_ctrace(const void* in_packets, uint64_t count, uint64_t user_que_idx, void* data, hsa_amd_queue_intercept_packet_writer writer) { const packet_t* packets_arr = reinterpret_cast(in_packets); InterceptQueue* obj = reinterpret_cast(data); @@ -480,8 +506,8 @@ class InterceptQueue { static void TrackerOn(bool on) { tracker_on_ = on; } static bool IsTrackerOn() { return tracker_on_; } - static bool k_concurrent_; static bool opt_mode_; + static uint32_t k_concurrent_; private: static void queue_event_callback(hsa_status_t status, hsa_queue_t *queue, void *arg) { @@ -595,6 +621,8 @@ class InterceptQueue { const util::AgentInfo* agent_info_; queue_event_callback_t queue_event_callback_; queue_id_t queue_id; + + static std::once_flag once_flag_; }; } // namespace rocprofiler diff --git a/projects/rocprofiler/src/core/profile.h b/projects/rocprofiler/src/core/profile.h index 9ed033759b..61f6537e2b 100644 --- a/projects/rocprofiler/src/core/profile.h +++ b/projects/rocprofiler/src/core/profile.h @@ -119,7 +119,34 @@ class Profile { virtual void Insert(const profile_info_t& info) { info_vector_.push_back(info.rinfo); } - hsa_status_t Finalize(pkt_vector_t& start_vector, pkt_vector_t& stop_vector, pkt_vector_t& read_vector) { + void SetConcurrent(profile_t* profile) { + // Check whether concurrent mode has been set + for (const parameter_t* p = profile->parameters; + p < (profile->parameters + profile->parameter_count); ++p) { + // If yes, stop here + if (p->parameter_name == HSA_VEN_AMD_AQLPROFILE_PARAMETER_NAME_K_CONCURRENT) { + return; + } + } + + // Otherwise, try to set + parameter_t* parameters = new parameter_t[profile->parameter_count+1]; + for (unsigned i = 0; i < profile->parameter_count; ++i) { + parameters[i].parameter_name = profile->parameters[i].parameter_name; + parameters[i].value 
= profile->parameters[i].value; + } + if (profile->parameters) free(const_cast(profile->parameters)); + parameters[profile->parameter_count].parameter_name = + HSA_VEN_AMD_AQLPROFILE_PARAMETER_NAME_K_CONCURRENT; + parameters[profile->parameter_count].value = 1; + profile->parameters = parameters; + profile->parameter_count += 1; + } + + hsa_status_t Finalize(pkt_vector_t& start_vector, pkt_vector_t& stop_vector, + pkt_vector_t& read_vector, bool is_concurrent = false) { + if (is_concurrent) SetConcurrent(&profile_); + hsa_status_t status = HSA_STATUS_SUCCESS; if (!info_vector_.empty()) { @@ -127,11 +154,14 @@ class Profile { const pfn_t* api = rsrc->AqlProfileApi(); packet_t start{}; packet_t stop{}; - packet_t read{}; + packet_t read{}; // read at kernel start + packet_t read2{}; // read at kernel end // Check the profile buffer sizes status = api->hsa_ven_amd_aqlprofile_start(&profile_, NULL); if (status != HSA_STATUS_SUCCESS) AQL_EXC_RAISING(status, "aqlprofile_start(NULL)"); + // Double output buffer size if concurrent + if (is_concurrent) profile_.output_buffer.size *= 2; status = Allocate(rsrc); if (status != HSA_STATUS_SUCCESS) AQL_EXC_RAISING(status, "Allocate()"); @@ -144,21 +174,28 @@ class Profile { #ifdef AQLPROF_NEW_API if (profile_.type == HSA_VEN_AMD_AQLPROFILE_EVENT_TYPE_PMC) { rd_status = api->hsa_ven_amd_aqlprofile_read(&profile_, &read); + if (is_concurrent){ // concurrent: one more read + if (rd_status != HSA_STATUS_SUCCESS) AQL_EXC_RAISING(status, "aqlprofile_read"); + rd_status = api->hsa_ven_amd_aqlprofile_read(&profile_, &read2); + } } #if 0 // Read API returns error if disabled if (rd_status != HSA_STATUS_SUCCESS) AQL_EXC_RAISING(status, "aqlprofile_read"); #endif #endif - // Set completion signal + // Set completion signal of start hsa_signal_t dummy_signal{}; dummy_signal.handle = 0; start.completion_signal = dummy_signal; + + // Set completion signal of read/stop hsa_signal_t post_signal; status = hsa_signal_create(1, 0, NULL, 
&post_signal); if (status != HSA_STATUS_SUCCESS) EXC_RAISING(status, "signal_create " << std::hex << status); stop.completion_signal = post_signal; read.completion_signal = post_signal; + read2.completion_signal = post_signal; completion_signal_ = post_signal; // Fill packet vectors @@ -180,18 +217,24 @@ class Profile { AQL_EXC_RAISING(status, "hsa_ven_amd_aqlprofile_legacy_get_pm4"); if (rd_status == HSA_STATUS_SUCCESS) { - const uint32_t read_index = read_vector.size(); - read_vector.insert(read_vector.end(), LEGACY_SLOT_SIZE_PKT, packet_t{}); - status = api->hsa_ven_amd_aqlprofile_legacy_get_pm4( - &read, reinterpret_cast(&read_vector[read_index])); - if (status != HSA_STATUS_SUCCESS) - AQL_EXC_RAISING(status, "hsa_ven_amd_aqlprofile_legacy_get_pm4"); + pkt_vector_t reads = {read}; + if (is_concurrent) reads.push_back(read2); + for (auto rd : reads) { + const uint32_t read_index = read_vector.size(); + read_vector.insert(read_vector.end(), LEGACY_SLOT_SIZE_PKT, packet_t{}); + status = api->hsa_ven_amd_aqlprofile_legacy_get_pm4( + &rd, reinterpret_cast(&read_vector[read_index])); + if (status != HSA_STATUS_SUCCESS) + AQL_EXC_RAISING(status, "hsa_ven_amd_aqlprofile_legacy_get_pm4"); + } } } else { start_vector.push_back(start); stop_vector.push_back(stop); if (rd_status == HSA_STATUS_SUCCESS) { read_vector.push_back(read); + if (is_concurrent) + read_vector.push_back(read2); } } } diff --git a/projects/rocprofiler/src/core/rocprofiler.cpp b/projects/rocprofiler/src/core/rocprofiler.cpp index 924626fee1..0978ed34cd 100644 --- a/projects/rocprofiler/src/core/rocprofiler.cpp +++ b/projects/rocprofiler/src/core/rocprofiler.cpp @@ -150,6 +150,20 @@ void RestoreHsaApi() { table->amd_ext_->hsa_amd_queue_intercept_register_fn = hsa_amd_queue_intercept_register_fn; } +void PmcStarter(Context* context) { + hsa_agent_t agent = context->GetAgent(); + // Create queue + hsa_queue_t* queue; + hsa_status_t status = rocprofiler::CreateQueuePro(agent, 1, + HSA_QUEUE_TYPE_MULTI, 
NULL, NULL, UINT32_MAX, UINT32_MAX, &queue); + if (status != HSA_STATUS_SUCCESS) EXC_RAISING(status, "CreateQueuePro"); + HsaQueue hsa_queue(NULL, queue); + context->Start(0, &hsa_queue); + context->Read(0, &hsa_queue); + context->GetData(0); + hsa_queue_destroy(queue); +} + void StandaloneIntercept() { ::HsaApiTable* table = kHsaApiTable; table->core_->hsa_queue_create_fn = rocprofiler::CreateQueuePro; @@ -216,7 +230,10 @@ uint32_t LoadTool() { if (settings.code_obj_tracking) intercept_mode |= CODE_OBJ_TRACKING_MODE; if (settings.memcopy_tracking) intercept_mode |= MEMCOPY_INTERCEPT_MODE; if (settings.hsa_intercepting) intercept_mode |= HSA_INTERCEPT_MODE; - if (settings.k_concurrent) InterceptQueue::k_concurrent_ = true; + if (settings.k_concurrent) { + Context::k_concurrent_ = settings.k_concurrent; + InterceptQueue::k_concurrent_ = settings.k_concurrent; + } if (settings.opt_mode) InterceptQueue::opt_mode_ = true; } @@ -429,6 +446,8 @@ util::Logger::mutex_t util::Logger::mutex_; std::atomic util::Logger::instance_{}; } +CONTEXT_INSTANTIATE(); + /////////////////////////////////////////////////////////////////////////////////////////////////// // Public library methods // diff --git a/projects/rocprofiler/test/tool/tool.cpp b/projects/rocprofiler/test/tool/tool.cpp index d820c17535..7f2291244d 100644 --- a/projects/rocprofiler/test/tool/tool.cpp +++ b/projects/rocprofiler/test/tool/tool.cpp @@ -1402,6 +1402,9 @@ extern "C" PUBLIC_API void OnLoadToolProp(rocprofiler_settings_t* settings) fflush(stdout); const uint32_t features_found = metrics_vec.size() + traces_found; + // set a value to indicate tracing mode + if (settings->k_concurrent != 0) settings->k_concurrent = (traces_found == 0) ? 1 : 2; + if (is_spm_trace) { for (uint32_t index = 0; index < features_found; index++) { features[index].kind = ROCPROFILER_FEATURE_KIND_TRACE;