[rocprofiler-systems] [ROCpd] Add OMPT callbacks to ROCpd (#1016)

* Add OMPT to ROCpd

* Use correct category

* Added wrapper functions for future control

* Formatting

* Fix naming

* Comment change

* Remove ompt_get_cb_args

* Switched to using region_sample for OMPT

* Remove relic function

* Remove get_use_rocpd that was used in this pr (one still remains)

* Rename ompt_get_args_string and reuse in tool_tracing_callback_stop

* Make lock init and destroy cb instant

* [Prototype] ROCPD Name fix

* [Prototype] ROCPD Name fix P1

* [Prototype] ROCPD Name fix P2

* ROCPD Name fix

* Var name changes

* Rewrite cb overwrite to single function

* [Important] Use parallel_data as key for parallel callback map

* Fix workflow failure

* Make cpp USE_ROCM consistent with hpp and use default constructor if USE_ROCM = 0

* Add missing ROCPROFILER_VERSION check

* Improve readability

* Make ompt storage maps thread local

* Part 1: Variable name fix, memory cleanup, and fixed asserts

* Part 2: Add comments

* Part 3: Add CI_THROW

* Part 4: Formatting

* Part 5: Move #include to cpp
Tento commit je obsažen v:
Kian Cossettini
2025-10-07 19:01:25 -04:00
odevzdal GitHub
rodič d6bdc53f1a
revize 0c53a12a88
3 změnil soubory, kde provedl 361 přidání a 38 odebrání
@@ -21,6 +21,7 @@
// SOFTWARE.
#include "metadata_registry.hpp"
#include "core/debug.hpp"
#include <algorithm>
#include <cstdint>
@@ -210,7 +211,7 @@ metadata_registry::get_string_list() const
return result;
}
#if ROCPROFSYS_USE_ROCM
#if ROCPROFSYS_USE_ROCM > 0
void
metadata_registry::add_code_object(
@@ -278,6 +279,102 @@ metadata_registry::get_kernel_symbol_list() const
return result;
}
// As the underlying implementation of callback_name_info_t resizes the category storage
// during emplace, this special method is required
void
metadata_registry::overwrite_callback_names(
std::initializer_list<
std::pair<rocprofiler_callback_tracing_kind_t, callback_rename_map_t>>
rename_table)
{
if(rename_table.size() == 0) return;
using callback_kind_t = rocprofiler_callback_tracing_kind_t;
using operation_names_t = std::vector<std::string_view>;
auto category_names = std::vector<std::string_view>{};
auto modified_ops = std::map<callback_kind_t, operation_names_t>{};
auto extract_operations = [&](callback_kind_t cat) -> operation_names_t {
auto items = m_callback_tracing_info.items();
const auto* target_category = items[static_cast<size_t>(cat)];
auto operations_data = target_category->items();
operation_names_t operation_names;
operation_names.reserve(operations_data.size());
for(const auto& [op_idx, op_name] : operations_data)
operation_names.push_back(*op_name);
return operation_names;
};
// Store category names
category_names.resize(ROCPROFILER_CALLBACK_TRACING_LAST);
for(callback_kind_t i = ROCPROFILER_CALLBACK_TRACING_NONE;
i < ROCPROFILER_CALLBACK_TRACING_LAST;
i = static_cast<callback_kind_t>(static_cast<int>(i) + 1))
{
category_names[i] = m_callback_tracing_info.at(i);
}
// Process list
for(const auto& category_info : rename_table)
{
auto callback_kind = category_info.first;
// Store operations of all following categories
// as they will be deleted
for(callback_kind_t i =
static_cast<callback_kind_t>(static_cast<int>(callback_kind) + 1);
i < ROCPROFILER_CALLBACK_TRACING_LAST;
i = static_cast<callback_kind_t>(static_cast<int>(i) + 1))
{
if(modified_ops.find(i) != modified_ops.end()) break;
modified_ops[i] = extract_operations(i);
}
ROCPROFSYS_CI_THROW(modified_ops.find(callback_kind) != modified_ops.end(),
"Overwriting a previously overwritten entry is forbidden");
ROCPROFSYS_CI_THROW(!modified_ops.empty() &&
callback_kind >= modified_ops.begin()->first,
"Category must have a larger enum value than all previously "
"modified_ops categories");
// Overwrite desired category
auto operation_names = extract_operations(callback_kind);
for(const auto& [index, new_value] : category_info.second)
{
ROCPROFSYS_CI_THROW(index < 0 ||
static_cast<size_t>(index) >= operation_names.size(),
"Index is invalid");
operation_names[index] = new_value;
}
modified_ops[callback_kind] = std::move(operation_names);
}
if(modified_ops.empty()) return;
// Emplace the changed category operations
for(callback_kind_t i = modified_ops.begin()->first;
i < ROCPROFILER_CALLBACK_TRACING_LAST;
i = static_cast<callback_kind_t>(static_cast<int>(i) + 1))
{
auto renaming_entry = modified_ops.find(i);
ROCPROFSYS_CI_THROW(renaming_entry == modified_ops.end(),
"A category that needs to be emplaced is missing");
const auto& operations_vec = renaming_entry->second;
m_callback_tracing_info.emplace(i, category_names.at(i).data());
for(size_t op_idx = 0; op_idx < operations_vec.size(); ++op_idx)
{
m_callback_tracing_info.emplace(
i, static_cast<rocprofiler_tracing_operation_t>(op_idx),
operations_vec[op_idx].data());
}
}
}
rocprofiler::sdk::buffer_name_info_t<const char*>
metadata_registry::get_buffer_name_info() const
{
@@ -292,5 +389,20 @@ metadata_registry::get_callback_tracing_info() const
#endif
metadata_registry::metadata_registry()
{
#if ROCPROFSYS_USE_ROCM > 0
overwrite_callback_names({
# if(ROCPROFILER_VERSION >= 600)
{ ROCPROFILER_CALLBACK_TRACING_OMPT,
{ { ROCPROFILER_OMPT_ID_thread_begin, "omp_thread" },
{ ROCPROFILER_OMPT_ID_thread_end, "omp_thread" },
{ ROCPROFILER_OMPT_ID_parallel_begin, "omp_parallel" },
{ ROCPROFILER_OMPT_ID_parallel_end, "omp_parallel" } } }
# endif
});
#endif
}
} // namespace trace_cache
} // namespace rocprofsys
@@ -35,6 +35,8 @@
# include <rocprofiler-sdk/callback_tracing.h>
# include <rocprofiler-sdk/cxx/name_info.hpp>
#endif
#include <initializer_list>
#include <map>
#include <set>
#include <sstream>
#include <stdint.h>
@@ -215,7 +217,7 @@ struct metadata_registry
private:
friend class cache_manager;
metadata_registry() = default;
metadata_registry();
common::synchronized<info::process> m_process;
common::synchronized<
std::unordered_set<info::pmc, info::pmc_info_hash, info::pmc_info_equal>>
@@ -240,6 +242,14 @@ private:
rocprofiler::sdk::callback_name_info_t<const char*> m_callback_tracing_info{
rocprofiler::sdk::get_callback_tracing_names<const char*>()
};
using callback_rename_map_t =
std::map<rocprofiler_tracing_operation_t, std::string_view>;
void overwrite_callback_names(
std::initializer_list<
std::pair<rocprofiler_callback_tracing_kind_t, callback_rename_map_t>>
rename_table);
#endif
};
@@ -110,6 +110,31 @@ typedef struct rocprofiler_stream_id_t
#endif
#if(ROCPROFILER_VERSION >= 600)
struct rocprofsys_ompt_data_storage_t
{
rocprofiler_callback_tracing_record_t record;
rocprofiler_timestamp_t _beg_ts;
function_args_t args; // Required for orphan ENTER events
};
auto
ompt_get_unified_name(const rocprofiler_callback_tracing_record_t& record)
{
std::string_view _name =
tool_data->callback_tracing_info.at(record.kind, record.operation);
// Forces omp_parallel begin and end to have same name, allowing track to connect
if(record.operation == ROCPROFILER_OMPT_ID_parallel_begin ||
record.operation == ROCPROFILER_OMPT_ID_parallel_end)
_name = "omp_parallel";
return _name;
}
#endif
auto&
get_stream_stack()
{
@@ -613,6 +638,21 @@ cache_memory_allocation(rocprofiler_buffer_tracing_memory_allocation_record_t* r
}
#endif
// clang-format on
std::string
get_args_string(const function_args_t& args)
{
std::string args_str;
std::for_each(args.begin(), args.end(), [&args_str](const argument_info& arg) {
const auto* delimiter = ";;";
std::stringstream ss;
ss << arg.arg_number << delimiter << arg.arg_type << delimiter << arg.arg_name
<< delimiter << arg.arg_value << delimiter;
args_str.append(ss.str());
});
return args_str;
}
template <typename CategoryT>
void
tool_tracing_callback_start(CategoryT, rocprofiler_callback_tracing_record_t record,
@@ -823,16 +863,7 @@ tool_tracing_callback_stop(
{
cache_category<CategoryT>();
cache_add_thread_info(record.thread_id);
std::string args_str;
std::for_each(args.begin(), args.end(), [&args_str](const argument_info& arg) {
const auto* delimiter = ";;";
std::stringstream ss;
ss << arg.arg_number << delimiter << arg.arg_type << delimiter << arg.arg_name
<< delimiter << arg.arg_value << delimiter;
args_str.append(ss.str());
});
std::string args_str = get_args_string(args);
cache_region(&record, _beg_ts, _end_ts, call_stack->to_string(), args_str,
trait::name<CategoryT>::value);
}
@@ -904,31 +935,184 @@ get_kernel_dispatch_timestamps()
}
#if(ROCPROFILER_VERSION >= 600)
// An instant event is one that has its beg_ts = end_ts
void
ompt_cache_instant_event(
rocprofiler_callback_tracing_record_t record, rocprofiler_timestamp_t _instant_ts,
std::optional<std::vector<tim::unwind::processed_entry>>& _bt_data)
{
auto args = function_args_t{};
rocprofiler_iterate_callback_tracing_kind_operation_args(
record, iterate_args_callback, 2, &args);
auto call_stack = get_backtrace(_bt_data);
cache_category<category::rocm_ompt_api>();
cache_add_thread_info(record.thread_id);
cache_region(&record, _instant_ts, _instant_ts, call_stack->to_string(),
get_args_string(args), trait::name<category::rocm_ompt_api>::value);
}
// OMPT callbacks with no corresponding begin/end are treated as "instant"
void
ompt_cache_orphan_event(
const rocprofsys_ompt_data_storage_t& stored_data,
std::optional<std::vector<tim::unwind::processed_entry>>& _bt_data)
{
auto call_stack = get_backtrace(_bt_data);
cache_category<category::rocm_ompt_api>();
cache_add_thread_info(stored_data.record.thread_id);
cache_region(&stored_data.record, stored_data._beg_ts, stored_data._beg_ts,
call_stack->to_string(), get_args_string(stored_data.args),
trait::name<category::rocm_ompt_api>::value);
}
// Any OMPT callback that can be of phase ENTER or EXIT is a standard callback.
// I.e. it has an ompt_scope_endpoint_t in its definition (excluding
// ROCPROFILER_OMPT_ID_nest_lock as it is a mutex)
auto&
get_ompt_standard_cb_storage()
{
// uint64_t -> internal id from rocprofiler_correlation_id_t
static thread_local auto _v =
std::unordered_map<uint64_t, rocprofsys_ompt_data_storage_t>{};
return _v;
}
// An OMPT parallel callback consists of ROCPROFILER_OMPT_ID_parallel_begin and
// ROCPROFILER_OMPT_ID_parallel_end
// As the beginning and end can only occur on the same thread, they are connected into a
// single track called "omp_parallel" for clarity. In this track, the information
// contained within parallel_begin should be displayed as it contains all the information
// that parallel_end has as well as the flags and number of threads/teams that were
// requested.
auto&
get_ompt_parallel_cb_storage()
{
// uintptr_t -> parallel_data (see callback definition)
static thread_local auto _v =
std::unordered_map<uintptr_t, rocprofsys_ompt_data_storage_t>{};
return _v;
}
void
ompt_push_standard_callback(const rocprofiler_callback_tracing_record_t& record,
const rocprofiler_timestamp_t& _beg_ts)
{
auto args = function_args_t{};
rocprofiler_iterate_callback_tracing_kind_operation_args(
record, iterate_args_callback, 1, &args);
get_ompt_standard_cb_storage().emplace(
record.correlation_id.internal,
rocprofsys_ompt_data_storage_t{ record, _beg_ts, args });
}
void
ompt_pop_standard_callback(
const rocprofiler_callback_tracing_record_t& record,
const rocprofiler_timestamp_t& _end_ts,
std::optional<std::vector<tim::unwind::processed_entry>>& _bt_data)
{
auto it = get_ompt_standard_cb_storage().find(record.correlation_id.internal);
if(it == get_ompt_standard_cb_storage().end())
{
auto args = function_args_t{};
rocprofiler_iterate_callback_tracing_kind_operation_args(
record, iterate_args_callback, 2, &args);
ompt_cache_orphan_event(rocprofsys_ompt_data_storage_t{ record, _end_ts, args },
_bt_data);
return;
}
auto stored_data = it->second;
get_ompt_standard_cb_storage().erase(it);
auto call_stack = get_backtrace(_bt_data);
cache_category<category::rocm_ompt_api>();
cache_add_thread_info(record.thread_id);
cache_region(&record, stored_data._beg_ts, _end_ts, call_stack->to_string(),
get_args_string(stored_data.args),
trait::name<category::rocm_ompt_api>::value);
}
void
ompt_push_parallel_callback(const rocprofiler_callback_tracing_record_t& record,
const rocprofiler_timestamp_t& _beg_ts)
{
auto* payload_data =
static_cast<rocprofiler_callback_tracing_ompt_data_t*>(record.payload);
const void* parallel_data_address = payload_data->args.parallel_begin.parallel_data;
auto args = function_args_t{};
rocprofiler_iterate_callback_tracing_kind_operation_args(
record, iterate_args_callback, 1, &args);
get_ompt_parallel_cb_storage().emplace(
reinterpret_cast<uintptr_t>(parallel_data_address),
rocprofsys_ompt_data_storage_t{ record, _beg_ts, args });
}
void
ompt_pop_parallel_callback(
const rocprofiler_callback_tracing_record_t& record,
const rocprofiler_timestamp_t& _end_ts,
std::optional<std::vector<tim::unwind::processed_entry>>& _bt_data)
{
auto* payload_data =
static_cast<rocprofiler_callback_tracing_ompt_data_t*>(record.payload);
const void* parallel_data_address = payload_data->args.parallel_end.parallel_data;
auto it = get_ompt_parallel_cb_storage().find(
reinterpret_cast<uintptr_t>(parallel_data_address));
if(it == get_ompt_parallel_cb_storage().end())
{
auto args = function_args_t{};
rocprofiler_iterate_callback_tracing_kind_operation_args(
record, iterate_args_callback, 2, &args);
ompt_cache_orphan_event(rocprofsys_ompt_data_storage_t{ record, _end_ts, args },
_bt_data);
return;
}
auto stored_data = it->second;
get_ompt_parallel_cb_storage().erase(it);
auto call_stack = get_backtrace(_bt_data);
cache_category<category::rocm_ompt_api>();
cache_add_thread_info(record.thread_id);
cache_region(&record, stored_data._beg_ts, _end_ts, call_stack->to_string(),
get_args_string(stored_data.args),
trait::name<category::rocm_ompt_api>::value);
}
void
ompt_finalize_orphan_events()
{
auto empty_call_stack =
std::optional<std::vector<tim::unwind::processed_entry>>{ std::nullopt };
for(const auto& [parallel_data, stored_data] : get_ompt_parallel_cb_storage())
{
ompt_cache_orphan_event(stored_data, empty_call_stack);
}
for(const auto& [correlation_id, stored_data] : get_ompt_standard_cb_storage())
{
ompt_cache_orphan_event(stored_data, empty_call_stack);
}
get_ompt_parallel_cb_storage().clear();
get_ompt_standard_cb_storage().clear();
}
// To handle events without finalization, perfetto push must occur in start
// Allows capture of worker thread implicit and sync tasks
// Allows capture of worker thread implicit tasks and sync regions
void
ompt_tracing_callback_start(rocprofiler_callback_tracing_record_t record,
rocprofiler_user_data_t* /*user_data*/,
rocprofiler_timestamp_t ts)
{
static bool is_first_implicit_call = true;
// Ignore first ompt_implicit_call as this is created after runtime initialization but
// before first region
// Respective end is also not received due to finalization occurring too late
if(is_first_implicit_call && (record.kind == ROCPROFILER_CALLBACK_TRACING_OMPT &&
record.operation == ROCPROFILER_OMPT_ID_implicit_task))
{
is_first_implicit_call = false;
return;
}
std::string_view _name =
tool_data->callback_tracing_info.at(record.kind, record.operation);
// Forces omp_parallel begin and end to have same name, allowing perfetto track to
// connect. This will be changed in the future
if(record.operation == ROCPROFILER_OMPT_ID_parallel_begin) _name = "omp_parallel";
std::string_view _name = ompt_get_unified_name(record);
if(get_use_timemory())
{
@@ -975,12 +1159,7 @@ ompt_tracing_callback_stop(
rocprofiler_timestamp_t ts,
std::optional<std::vector<tim::unwind::processed_entry>>& _bt_data)
{
std::string_view _name =
tool_data->callback_tracing_info.at(record.kind, record.operation);
// Forces omp_parallel begin and end to have same name, allowing perfetto track to
// connect. This will be changed in the future
if(record.operation == ROCPROFILER_OMPT_ID_parallel_end) _name = "omp_parallel";
std::string_view _name = ompt_get_unified_name(record);
if(get_use_timemory())
{
@@ -996,6 +1175,7 @@ ompt_tracing_callback_stop(
rocprofiler_iterate_callback_tracing_kind_operation_args(record, save_args, 2,
&args);
}
uint64_t _end_ts = ts;
tracing::pop_perfetto_ts(
category::rocm_ompt_api{}, _name.data(), _end_ts,
@@ -1076,6 +1256,17 @@ tool_tracing_callback(rocprofiler_callback_tracing_record_t record,
}
};
#if(ROCPROFILER_VERSION >= 600)
static bool is_first_implicit_task = false;
if(!is_first_implicit_task && record.operation == ROCPROFILER_OMPT_ID_implicit_task)
{
// We do not capture implicit task with flags = 1 on main thread
// For now, this is identified as the first implicit task call
is_first_implicit_task = true;
return;
}
#endif
auto ts = rocprofiler_timestamp_t{};
ROCPROFILER_CALL(rocprofiler_get_timestamp(&ts));
const char* name = "";
@@ -1128,6 +1319,7 @@ tool_tracing_callback(rocprofiler_callback_tracing_record_t record,
case ROCPROFILER_CALLBACK_TRACING_OMPT:
{
ompt_tracing_callback_start(record, user_data, ts);
ompt_push_standard_callback(record, ts);
break;
}
case ROCPROFILER_CALLBACK_TRACING_ROCDECODE_API:
@@ -1211,6 +1403,7 @@ tool_tracing_callback(rocprofiler_callback_tracing_record_t record,
case ROCPROFILER_CALLBACK_TRACING_OMPT:
{
ompt_tracing_callback_stop(record, user_data, ts, _bt_data);
ompt_pop_standard_callback(record, ts, _bt_data);
break;
}
case ROCPROFILER_CALLBACK_TRACING_ROCDECODE_API:
@@ -1306,9 +1499,11 @@ tool_tracing_callback(rocprofiler_callback_tracing_record_t record,
{
case ROCPROFILER_OMPT_ID_parallel_begin:
ompt_tracing_callback_start(record, user_data, ts);
ompt_push_parallel_callback(record, ts);
break;
case ROCPROFILER_OMPT_ID_parallel_end:
ompt_tracing_callback_stop(record, user_data, ts, _bt_data);
ompt_pop_parallel_callback(record, ts, _bt_data);
break;
case ROCPROFILER_OMPT_ID_lock_init:
case ROCPROFILER_OMPT_ID_lock_destroy:
@@ -1333,10 +1528,12 @@ tool_tracing_callback(rocprofiler_callback_tracing_record_t record,
// These callbacks are considered instant events and should start
// and immediately call stop as no corresponding "end" will be
// received
ompt_tracing_callback_start(record, user_data, ts);
auto start_ts = ts;
ompt_tracing_callback_start(record, user_data, start_ts);
ROCPROFILER_CALL(
rocprofiler_get_timestamp(&ts)); // Set artificial end ts
ompt_tracing_callback_stop(record, user_data, ts, _bt_data);
ompt_cache_instant_event(record, start_ts, _bt_data);
break;
}
default:
@@ -2098,6 +2295,10 @@ tool_fini(void* callback_data)
static std::atomic_flag _once = ATOMIC_FLAG_INIT;
if(_once.test_and_set()) return;
#if(ROCPROFILER_VERSION >= 600)
ompt_finalize_orphan_events();
#endif
flush();
stop();