SDK: counter collection serialization per device (#1157)

Migrates profiler_serializer class in QueueController to have an instance per-agent instead of one globally. Other changes in this commit are to allow for maps of the queues associated with each agent to be passed to profiler_serializer when it is turned on/off. Existing test cases cover whether or not the kernels are serialized (multistream app). New test case added to show that this serialization only occurs on a per device level with a kernel launched on one device waiting for a value to be set on the other.
This commit is contained in:
Benjamin Welton
2024-10-25 13:13:36 -07:00
committed by GitHub
parent 5e1643cf81
commit 4a5b1d98c2
13 changed files with 275 additions and 51 deletions
+1
View File
@@ -119,6 +119,7 @@ Full documentation for ROCprofiler-SDK is available at [Click Here](source/docs/
- Fix Support for derived counters in reduce operation and bug fix for max in reduce
- Check to force tools to initialize context id with zero.
- Fix to handle a range of values for select() dimension in expressions parser.
- PMC dispatch based Counter Collection Serialization is now per-device instead of global across all devices.
### Removed
+22
View File
@@ -53,6 +53,28 @@ set_tests_properties(
"${counter-collection-buffer-env}" FAIL_REGULAR_EXPRESSION
"${ROCPROFILER_DEFAULT_FAIL_REGEX}")
set_source_files_properties(per_dev_serialization.cpp PROPERTIES LANGUAGE HIP)
add_executable(counter-collection-buffer-device-serialization)
target_sources(counter-collection-buffer-device-serialization
PRIVATE per_dev_serialization.cpp)
target_link_libraries(counter-collection-buffer-device-serialization
PRIVATE counter-collection-buffer-client Threads::Threads)
rocprofiler_samples_get_ld_library_path_env(LIBRARY_PATH_ENV)
rocprofiler_samples_get_preload_env(PRELOAD_ENV counter-collection-buffer-client)
set(counter-collection-buffer-device-serialization-env "${PRELOAD_ENV}"
"${LIBRARY_PATH_ENV}")
add_test(NAME counter-collection-buffer-device-serialization
COMMAND $<TARGET_FILE:counter-collection-buffer-device-serialization>)
set_tests_properties(
counter-collection-buffer-device-serialization
PROPERTIES TIMEOUT 120 LABELS "samples" ENVIRONMENT
"${counter-collection-buffer-device-serialization-env}"
FAIL_REGULAR_EXPRESSION "${ROCPROFILER_DEFAULT_FAIL_REGEX}")
add_library(counter-collection-callback-client SHARED)
target_sources(counter-collection-callback-client PRIVATE callback_client.cpp client.hpp)
target_link_libraries(
+31 -1
View File
@@ -22,6 +22,7 @@
#include "client.hpp"
#include <cstdint>
#include <fstream>
#include <functional>
#include <iostream>
@@ -74,6 +75,13 @@ get_buffer()
return buf;
}
std::unordered_map<uint64_t, std::vector<rocprofiler_record_dimension_info_t>>**
dimension_cache()
{
static std::unordered_map<uint64_t, std::vector<rocprofiler_record_dimension_info_t>>* cache;
return &cache;
}
/**
* For a given counter, query the dimensions that it has. Typically you will
* want to call this function once to get the dimensions and cache them.
@@ -81,6 +89,20 @@ get_buffer()
std::vector<rocprofiler_record_dimension_info_t>
counter_dimensions(rocprofiler_counter_id_t counter)
{
if(*dimension_cache() == nullptr) return {};
if((*dimension_cache())->count(counter.handle) > 0)
{
return (*dimension_cache())->at(counter.handle);
}
return {};
}
void
fill_dimension_cache(rocprofiler_counter_id_t counter)
{
assert(*dimension_cache() != nullptr);
std::vector<rocprofiler_record_dimension_info_t> dims;
rocprofiler_available_dimensions_cb_t cb =
[](rocprofiler_counter_id_t,
@@ -97,7 +119,7 @@ counter_dimensions(rocprofiler_counter_id_t counter)
};
ROCPROFILER_CALL(rocprofiler_iterate_counter_dimensions(counter, cb, &dims),
"Could not iterate counter dimensions");
return dims;
(*dimension_cache())->emplace(counter.handle, dims);
}
/**
@@ -251,6 +273,7 @@ build_profile_for_agent(rocprofiler_agent_id_t agent,
{
std::clog << "Counter: " << counter.handle << " " << version.name << "\n";
collect_counters.push_back(counter);
fill_dimension_cache(counter);
}
}
@@ -375,6 +398,10 @@ tool_fini(void* user_data)
auto* output_stream = static_cast<std::ostream*>(user_data);
*output_stream << std::flush;
if(output_stream != &std::cout && output_stream != &std::cerr) delete output_stream;
auto* tmp_ptr = *dimension_cache();
*dimension_cache() = nullptr;
delete tmp_ptr;
}
} // namespace
@@ -416,6 +443,9 @@ rocprofiler_configure(uint32_t version,
&tool_fini,
static_cast<void*>(output_stream)};
*dimension_cache() =
new std::unordered_map<uint64_t, std::vector<rocprofiler_record_dimension_info_t>>();
// return pointer to configure data
return &cfg;
}
@@ -0,0 +1,71 @@
// MIT License
//
// Copyright (c) 2023 Advanced Micro Devices, Inc. All rights reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
#include <hip/hip_runtime.h>
#include "client.hpp"
#define HIP_CALL(call) \
do \
{ \
hipError_t err = call; \
if(err != hipSuccess) \
{ \
fprintf(stderr, "%s\n", hipGetErrorString(err)); \
abort(); \
} \
} while(0)
__global__ void
kernelA(int* wait_on, int value, int* no_opt)
{
while(*wait_on != value)
{
(*no_opt)++;
};
(*wait_on)--;
}
int
main(int, char**)
{
int ntotdevice = 0;
HIP_CALL(hipGetDeviceCount(&ntotdevice));
if(ntotdevice < 2) return 0;
start();
int* check_value = nullptr;
int* no_opt = nullptr;
HIP_CALL(hipMallocManaged(&check_value, sizeof(*check_value)));
HIP_CALL(hipMallocManaged(&no_opt, sizeof(*no_opt)));
*no_opt = 0;
*check_value = 1;
// Will hang if per-device serialization is not functional
HIP_CALL(hipSetDevice(0));
hipLaunchKernelGGL(kernelA, dim3(1), dim3(1), 0, 0, check_value, 0, no_opt);
HIP_CALL(hipSetDevice(1));
hipLaunchKernelGGL(kernelA, dim3(1), dim3(1), 0, 0, check_value, 1, no_opt);
HIP_CALL(hipSetDevice(0));
HIP_CALL(hipDeviceSynchronize());
std::cerr << "Run complete\n";
}
@@ -62,12 +62,14 @@ queue_cb(const context::context* ctx,
// and maybe adds barrier packets if the state is transitioning from serialized <->
// unserialized
auto maybe_add_serialization = [&](auto& gen_pkt) {
CHECK_NOTNULL(hsa::get_queue_controller())->serializer().rlock([&](const auto& serializer) {
for(auto& s_pkt : serializer.kernel_dispatch(queue))
{
gen_pkt->before_krn_pkt.push_back(s_pkt.ext_amd_aql_pm4);
}
});
CHECK_NOTNULL(hsa::get_queue_controller())
->serializer(&queue)
.rlock([&](const auto& serializer) {
for(auto& s_pkt : serializer.kernel_dispatch(queue))
{
gen_pkt->before_krn_pkt.push_back(s_pkt.ext_amd_aql_pm4);
}
});
};
// Packet generated when no instrumentation is performed. May contain serialization
@@ -189,9 +191,9 @@ completed_cb(const context::context* ctx,
if(!pkt) return;
CHECK_NOTNULL(hsa::get_queue_controller())->serializer().wlock([&](auto& serializer) {
serializer.kernel_completion_signal(session.queue);
});
CHECK_NOTNULL(hsa::get_queue_controller())
->serializer(&session.queue)
.wlock([&](auto& serializer) { serializer.kernel_completion_signal(session.queue); });
// We have no profile config, nothing to output.
if(!prof_config) return;
@@ -49,13 +49,13 @@ hsa_barrier::~hsa_barrier()
}
void
hsa_barrier::set_barrier(const queue_map_t& q)
hsa_barrier::set_barrier(const queue_map_ptr_t& q)
{
_core_api.hsa_signal_store_screlease_fn(_barrier_signal, 1);
_queue_waiting.wlock([&](auto& queue_waiting) {
for(const auto& [_, queue] : q)
{
queue->lock_queue([ptr = queue.get(), &queue_waiting]() {
queue->lock_queue([ptr = queue, &queue_waiting]() {
if(ptr->active_async_packets() > 0)
{
queue_waiting[ptr->get_id().handle] = ptr->active_async_packets();
@@ -46,12 +46,11 @@ class Queue;
class hsa_barrier
{
public:
using queue_map_t = std::unordered_map<hsa_queue_t*, std::unique_ptr<Queue>>;
using queue_map_ptr_t = std::unordered_map<hsa_queue_t*, Queue*>;
hsa_barrier(std::function<void()>&& finished, CoreApiTable core_api);
~hsa_barrier();
void set_barrier(const queue_map_t& q);
void set_barrier(const queue_map_ptr_t& q);
std::optional<rocprofiler_packet> enqueue_packet(const Queue* queue);
bool register_completion(const Queue* queue);
@@ -36,7 +36,7 @@ profiler_serializer_ready_signal_handler(hsa_signal_value_t /* signal_value */,
auto* hsa_queue = static_cast<hsa_queue_t*>(data);
const auto* queue = CHECK_NOTNULL(get_queue_controller())->get_queue(*hsa_queue);
CHECK(queue);
CHECK_NOTNULL(get_queue_controller())->serializer().wlock([&](auto& serializer) {
CHECK_NOTNULL(get_queue_controller())->serializer(queue).wlock([&](auto& serializer) {
serializer.queue_ready(hsa_queue, *queue);
});
return true;
@@ -242,7 +242,7 @@ profiler_serializer::destroy_queue(hsa_queue_t* id, const Queue& queue)
// Enable the serializer
void
profiler_serializer::enable(const queue_map_t& queues)
profiler_serializer::enable(const hsa_barrier::queue_map_ptr_t& queues)
{
if(_serializer_status == Status::ENABLED) return;
@@ -257,14 +257,14 @@ profiler_serializer::enable(const queue_map_t& queues)
std::make_unique<hsa_barrier>(
[] {}, CHECK_NOTNULL(get_queue_controller())->get_core_table()));
_serializer_status = Status::ENABLED;
_barrier.back().barrier->set_barrier(queues);
_barrier.back().barrier->set_barrier(queues);
ROCP_INFO << "Profiler serialization enabled";
}
// Disable the serializer
void
profiler_serializer::disable(const queue_map_t& queues)
profiler_serializer::disable(const hsa_barrier::queue_map_ptr_t& queues)
{
if(_serializer_status == Status::DISABLED) return;
@@ -279,8 +279,8 @@ profiler_serializer::disable(const queue_map_t& queues)
std::make_unique<hsa_barrier>(
[] {}, CHECK_NOTNULL(get_queue_controller())->get_core_table()));
_serializer_status = Status::DISABLED;
_barrier.back().barrier->set_barrier(queues);
_barrier.back().barrier->set_barrier(queues);
ROCP_INFO << "Profiler serialization disabled";
}
@@ -68,7 +68,6 @@ public:
std::unique_ptr<hsa_barrier> barrier;
};
using queue_map_t = std::unordered_map<hsa_queue_t*, std::unique_ptr<Queue>>;
void kernel_completion_signal(const Queue&);
// Signal a kernel dispatch is taking place, generates packets needed to be
// inserted to support kernel dispatch
@@ -76,9 +75,9 @@ public:
void queue_ready(hsa_queue_t* hsa_queue, const Queue& queue);
// Enable the serializer
void enable(const queue_map_t& queues);
void enable(const hsa_barrier::queue_map_ptr_t& queues);
// Disable the serializer
void disable(const queue_map_t& queues);
void disable(const hsa_barrier::queue_map_ptr_t& queues);
void destroy_queue(hsa_queue_t* id, const Queue& queue);
@@ -28,6 +28,7 @@
#include "lib/rocprofiler-sdk/registration.hpp"
#include <rocprofiler-sdk/fwd.h>
#include <memory>
namespace rocprofiler
{
@@ -62,8 +63,9 @@ create_queue(hsa_agent_t agent,
controller->get_ext_table(),
queue);
controller->serializer().wlock(
[&](auto& serializer) { serializer.add_queue(queue, *new_queue); });
controller->serializer(new_queue.get()).wlock([&](auto& serializer) {
serializer.add_queue(queue, *new_queue);
});
controller->add_queue(*queue, std::move(new_queue));
return HSA_STATUS_SUCCESS;
@@ -316,23 +318,88 @@ QueueController::get_queue(const hsa_queue_t& _hsa_queue) const
_hsa_queue);
}
common::Synchronized<hsa::profiler_serializer>&
QueueController::serializer(const Queue* queue)
{
CHECK(queue);
common::Synchronized<hsa::profiler_serializer>* ret = nullptr;
_profiler_serializer.ulock(
[&](const auto& m) {
if(auto ptr = m.find(queue->get_agent().get_rocp_agent()->id); ptr != m.end())
{
ret = ptr->second.get();
return true;
}
return false;
},
[&](auto& m) {
ret = m.emplace(queue->get_agent().get_rocp_agent()->id,
std::make_shared<common::Synchronized<hsa::profiler_serializer>>())
.first->second.get();
if(_serialized_enabled.load() == true)
{
ret->wlock([&](auto& serializer) { serializer.enable({}); });
}
return true;
});
return *ret;
}
namespace
{
std::unordered_map<rocprofiler_agent_id_t, hsa_barrier::queue_map_ptr_t>
per_dev_map(const QueueController::queue_map_t& _queues_v)
{
std::unordered_map<rocprofiler_agent_id_t, hsa_barrier::queue_map_ptr_t> dmap;
for(const auto& [k, v] : _queues_v)
{
dmap[v->get_agent().get_rocp_agent()->id][k] = v.get();
}
return dmap;
}
}; // namespace
void
QueueController::disable_serialization()
{
_queues.rlock([](const queue_map_t& _queues_v) {
if(get_queue_controller())
get_queue_controller()->serializer().wlock(
[&](auto& serializer) { serializer.disable(_queues_v); });
_queues.rlock([&](const queue_map_t& _queues_v) {
_serialized_enabled.store(false);
auto pd_map = per_dev_map(_queues_v);
_profiler_serializer.wlock([&](auto& m) {
for(auto& [k, v] : m)
{
if(auto it = pd_map.find(k); it != pd_map.end())
{
v->wlock([&](auto& serializer) { serializer.disable(it->second); });
}
else
{
v->wlock([&](auto& serializer) { serializer.disable({}); });
}
}
});
});
}
void
QueueController::enable_serialization()
{
_queues.rlock([](const queue_map_t& _queues_v) {
if(get_queue_controller())
get_queue_controller()->serializer().wlock(
[&](auto& serializer) { serializer.enable(_queues_v); });
_queues.rlock([&](const queue_map_t& _queues_v) {
_serialized_enabled.store(true);
auto pd_map = per_dev_map(_queues_v);
_profiler_serializer.wlock([&](auto& m) {
for(auto& [k, v] : m)
{
if(auto it = pd_map.find(k); it != pd_map.end())
{
v->wlock([&](auto& serializer) { serializer.enable(it->second); });
}
else
{
v->wlock([&](auto& serializer) { serializer.enable({}); });
}
}
});
});
}
@@ -23,6 +23,7 @@
#pragma once
#include <rocprofiler-sdk/rocprofiler.h>
#include <rocprofiler-sdk/cxx/hash.hpp>
#include "lib/rocprofiler-sdk/hsa/profile_serializer.hpp"
#include "lib/rocprofiler-sdk/hsa/queue.hpp"
@@ -83,7 +84,7 @@ public:
void iterate_callbacks(const callback_iterator_cb_t&) const;
common::Synchronized<hsa::profiler_serializer>& serializer() { return _profiler_serializer; }
common::Synchronized<hsa::profiler_serializer>& serializer(const Queue*);
/**
* Disable serialization for QueueController, has no effect if counter collection
@@ -107,12 +108,16 @@ private:
using client_id_map_t = std::unordered_map<ClientID, agent_callback_tuple_t>;
using resource_alloc_t = void(const AgentCache&, const CoreApiTable&, const AmdExtTable&);
CoreApiTable _core_table = {};
AmdExtTable _ext_table = {};
common::Synchronized<queue_map_t> _queues = {};
common::Synchronized<client_id_map_t> _callback_cache = {};
agent_cache_map_t _supported_agents = {};
common::Synchronized<hsa::profiler_serializer> _profiler_serializer;
CoreApiTable _core_table = {};
AmdExtTable _ext_table = {};
common::Synchronized<queue_map_t> _queues = {};
common::Synchronized<client_id_map_t> _callback_cache = {};
agent_cache_map_t _supported_agents = {};
std::atomic<bool> _serialized_enabled = {false};
common::Synchronized<
std::unordered_map<rocprofiler_agent_id_t,
std::shared_ptr<common::Synchronized<hsa::profiler_serializer>>>>
_profiler_serializer;
};
QueueController*
@@ -192,8 +192,14 @@ TEST(hsa_barrier, no_block_single)
auto queues = create_queue_map(1);
// Immediate return of barrier due to no active async packets
hsa::hsa_barrier barrier(finished_func, get_api_table());
barrier.set_barrier(queues);
hsa::hsa_barrier barrier(finished_func, get_api_table());
hsa_barrier::queue_map_ptr_t q_map;
for(const auto& [k, v] : queues)
{
q_map[k] = v.get();
}
barrier.set_barrier(q_map);
executed_handlers = 0;
ASSERT_TRUE(barrier.complete());
should_execute_handler = true;
@@ -224,7 +230,14 @@ TEST(hsa_barrier, no_block_multi)
// Immediate return of barrier due to no active async packets
hsa::hsa_barrier barrier(finished_func, get_api_table());
barrier.set_barrier(queues);
hsa_barrier::queue_map_ptr_t q_map;
for(const auto& [k, v] : queues)
{
q_map[k] = v.get();
}
barrier.set_barrier(q_map);
ASSERT_TRUE(barrier.complete());
should_execute_handler = true;
executed_handlers = 0;
@@ -265,7 +278,13 @@ TEST(hsa_barrier, block_single)
should_execute_handler = false;
executed_handlers = 0;
barrier.set_barrier(queues);
hsa_barrier::queue_map_ptr_t q_map;
for(const auto& [k, v] : queues)
{
q_map[k] = v.get();
}
barrier.set_barrier(q_map);
ASSERT_FALSE(barrier.complete());
should_execute_handler = false;
@@ -323,7 +342,13 @@ TEST(hsa_barrier, block_multi)
should_execute_handler = false;
executed_handlers = 0;
barrier.set_barrier(queues);
hsa_barrier::queue_map_ptr_t q_map;
for(const auto& [k, v] : queues)
{
q_map[k] = v.get();
}
barrier.set_barrier(q_map);
ASSERT_FALSE(barrier.complete());
should_execute_handler = false;
@@ -334,10 +334,12 @@ DispatchThreadTracer::pre_kernel_call(const hsa::Queue& queue,
// and maybe adds barrier packets if the state is transitioning from serialized <->
// unserialized
auto maybe_add_serialization = [&](auto& gen_pkt) {
CHECK_NOTNULL(hsa::get_queue_controller())->serializer().rlock([&](const auto& serializer) {
for(auto& s_pkt : serializer.kernel_dispatch(queue))
gen_pkt->before_krn_pkt.push_back(s_pkt.ext_amd_aql_pm4);
});
CHECK_NOTNULL(hsa::get_queue_controller())
->serializer(&queue)
.rlock([&](const auto& serializer) {
for(auto& s_pkt : serializer.kernel_dispatch(queue))
gen_pkt->before_krn_pkt.push_back(s_pkt.ext_amd_aql_pm4);
});
};
auto control_flags = params.dispatch_cb_fn(queue.get_id(),
@@ -381,8 +383,9 @@ public:
auto* controller = hsa::get_queue_controller();
if(!controller) return;
controller->serializer().wlock(
[&](auto& serializer) { serializer.kernel_completion_signal(session.queue); });
controller->serializer(&session.queue).wlock([&](auto& serializer) {
serializer.kernel_completion_signal(session.queue);
});
}
const hsa::Queue::queue_info_session_t& session;
};