From 0c53a12a8855199e41837b109df62f22c9b0bfcb Mon Sep 17 00:00:00 2001 From: Kian Cossettini Date: Tue, 7 Oct 2025 19:01:25 -0400 Subject: [PATCH] [rocprofiler-systems] [ROCpd] Add OMPT callbacks to ROCpd (#1016) * Add OMPT to ROCpd * Use correct category * Added wrapper functions for future control * Formatting * Fix naming * Comment change * Remove ompt_get_cb_args * Switched to using region_sample for OMPT * Remove relic function * Remove get_use_rocpd that was used in this pr (one still remains) * Rename ompt_get_args_string and reuse in tool_tracing_callback_stop * Make lock init and destroy cb instant * [Prototype] ROCPD Name fix * [Prototype] ROCPD Name fix P1 * [Prototype] ROCPD Name fix P2 * ROCPD Name fix * Var name changes * Rewrite cb overwrite to single function * [Important] Use parallel_data as key for parallel callback map * Fix workflow failure * Make cpp USE_ROCM consistent with hpp and use default constructor if USE_ROCM = 0 * Add missing ROCPROFILER_VERSION check * Improve readability * Make ompt storage maps thread local * Part 1: Variable name fix, memory cleanup, and fixed asserts * Part 2: Add comments * Part 3: Add CI_THROW * Part 4: Formatting * Part 5: Move #include to cpp --- .../core/trace_cache/metadata_registry.cpp | 114 +++++++- .../core/trace_cache/metadata_registry.hpp | 12 +- .../rocprof-sys/library/rocprofiler-sdk.cpp | 273 +++++++++++++++--- 3 files changed, 361 insertions(+), 38 deletions(-) diff --git a/projects/rocprofiler-systems/source/lib/core/trace_cache/metadata_registry.cpp b/projects/rocprofiler-systems/source/lib/core/trace_cache/metadata_registry.cpp index 160d33ba77..0284f3f585 100644 --- a/projects/rocprofiler-systems/source/lib/core/trace_cache/metadata_registry.cpp +++ b/projects/rocprofiler-systems/source/lib/core/trace_cache/metadata_registry.cpp @@ -21,6 +21,7 @@ // SOFTWARE. #include "metadata_registry.hpp" +#include "core/debug.hpp" #include #include @@ -210,7 +211,7 @@ metadata_registry::get_string_list() const return result; } -#if ROCPROFSYS_USE_ROCM +#if ROCPROFSYS_USE_ROCM > 0 void metadata_registry::add_code_object( @@ -278,6 +279,102 @@ metadata_registry::get_kernel_symbol_list() const return result; } +// As the underlying implementation of callback_name_info_t resizes the category storage +// during emplace, this special method is required +void +metadata_registry::overwrite_callback_names( + std::initializer_list< + std::pair> + rename_table) +{ + if(rename_table.size() == 0) return; + + using callback_kind_t = rocprofiler_callback_tracing_kind_t; + using operation_names_t = std::vector; + + auto category_names = std::vector{}; + auto modified_ops = std::map{}; + + auto extract_operations = [&](callback_kind_t cat) -> operation_names_t { + auto items = m_callback_tracing_info.items(); + const auto* target_category = items[static_cast(cat)]; + + auto operations_data = target_category->items(); + operation_names_t operation_names; + operation_names.reserve(operations_data.size()); + + for(const auto& [op_idx, op_name] : operations_data) + operation_names.push_back(*op_name); + + return operation_names; + }; + + // Store category names + category_names.resize(ROCPROFILER_CALLBACK_TRACING_LAST); + for(callback_kind_t i = ROCPROFILER_CALLBACK_TRACING_NONE; + i < ROCPROFILER_CALLBACK_TRACING_LAST; + i = static_cast(static_cast(i) + 1)) + { + category_names[i] = m_callback_tracing_info.at(i); + } + + // Process list + for(const auto& category_info : rename_table) + { + auto callback_kind = category_info.first; + // Store operations of all following categories + // as they will be deleted + for(callback_kind_t i = + static_cast(static_cast(callback_kind) + 1); + i < ROCPROFILER_CALLBACK_TRACING_LAST; + i = static_cast(static_cast(i) + 1)) + { + if(modified_ops.find(i) != modified_ops.end()) break; + modified_ops[i] = extract_operations(i); + } + + ROCPROFSYS_CI_THROW(modified_ops.find(callback_kind) != modified_ops.end(), + "Overwriting a previously overwritten entry is forbidden"); + + ROCPROFSYS_CI_THROW(!modified_ops.empty() && + callback_kind >= modified_ops.begin()->first, + "Category must have a larger enum value than all previously " + "modified_ops categories"); + + // Overwrite desired category + auto operation_names = extract_operations(callback_kind); + for(const auto& [index, new_value] : category_info.second) + { + ROCPROFSYS_CI_THROW(index < 0 || + static_cast(index) >= operation_names.size(), + "Index is invalid"); + operation_names[index] = new_value; + } + modified_ops[callback_kind] = std::move(operation_names); + } + if(modified_ops.empty()) return; + + // Emplace the changed category operations + for(callback_kind_t i = modified_ops.begin()->first; + i < ROCPROFILER_CALLBACK_TRACING_LAST; + i = static_cast(static_cast(i) + 1)) + { + auto renaming_entry = modified_ops.find(i); + + ROCPROFSYS_CI_THROW(renaming_entry == modified_ops.end(), + "A category that needs to be emplaced is missing"); + + const auto& operations_vec = renaming_entry->second; + m_callback_tracing_info.emplace(i, category_names.at(i).data()); + for(size_t op_idx = 0; op_idx < operations_vec.size(); ++op_idx) + { + m_callback_tracing_info.emplace( + i, static_cast(op_idx), + operations_vec[op_idx].data()); + } + } +} + rocprofiler::sdk::buffer_name_info_t metadata_registry::get_buffer_name_info() const { @@ -292,5 +389,20 @@ metadata_registry::get_callback_tracing_info() const #endif +metadata_registry::metadata_registry() +{ +#if ROCPROFSYS_USE_ROCM > 0 + overwrite_callback_names({ +# if(ROCPROFILER_VERSION >= 600) + { ROCPROFILER_CALLBACK_TRACING_OMPT, + { { ROCPROFILER_OMPT_ID_thread_begin, "omp_thread" }, + { ROCPROFILER_OMPT_ID_thread_end, "omp_thread" }, + { ROCPROFILER_OMPT_ID_parallel_begin, "omp_parallel" }, + { ROCPROFILER_OMPT_ID_parallel_end, "omp_parallel" } } } +# endif + }); +#endif +} + } // namespace trace_cache } // namespace rocprofsys diff --git a/projects/rocprofiler-systems/source/lib/core/trace_cache/metadata_registry.hpp b/projects/rocprofiler-systems/source/lib/core/trace_cache/metadata_registry.hpp index 6e53b34398..499f4fff28 100644 --- a/projects/rocprofiler-systems/source/lib/core/trace_cache/metadata_registry.hpp +++ b/projects/rocprofiler-systems/source/lib/core/trace_cache/metadata_registry.hpp @@ -35,6 +35,8 @@ # include # include #endif +#include +#include #include #include #include @@ -215,7 +217,7 @@ struct metadata_registry private: friend class cache_manager; - metadata_registry() = default; + metadata_registry(); common::synchronized m_process; common::synchronized< std::unordered_set> @@ -240,6 +242,14 @@ private: rocprofiler::sdk::callback_name_info_t m_callback_tracing_info{ rocprofiler::sdk::get_callback_tracing_names() }; + + using callback_rename_map_t = + std::map; + + void overwrite_callback_names( + std::initializer_list< + std::pair> + rename_table); #endif }; diff --git a/projects/rocprofiler-systems/source/lib/rocprof-sys/library/rocprofiler-sdk.cpp b/projects/rocprofiler-systems/source/lib/rocprof-sys/library/rocprofiler-sdk.cpp index dd46c375a2..79614109ac 100644 --- a/projects/rocprofiler-systems/source/lib/rocprof-sys/library/rocprofiler-sdk.cpp +++ b/projects/rocprofiler-systems/source/lib/rocprof-sys/library/rocprofiler-sdk.cpp @@ -110,6 +110,31 @@ typedef struct rocprofiler_stream_id_t #endif +#if(ROCPROFILER_VERSION >= 600) + +struct rocprofsys_ompt_data_storage_t +{ + rocprofiler_callback_tracing_record_t record; + rocprofiler_timestamp_t _beg_ts; + function_args_t args; // Required for orphan ENTER events +}; + +auto +ompt_get_unified_name(const rocprofiler_callback_tracing_record_t& record) +{ + std::string_view _name = + tool_data->callback_tracing_info.at(record.kind, record.operation); + + // Forces omp_parallel begin and end to have same name, allowing track to connect + if(record.operation == ROCPROFILER_OMPT_ID_parallel_begin || + record.operation == ROCPROFILER_OMPT_ID_parallel_end) + _name = "omp_parallel"; + + return _name; +} + +#endif + auto& get_stream_stack() { @@ -613,6 +638,21 @@ cache_memory_allocation(rocprofiler_buffer_tracing_memory_allocation_record_t* r } #endif // clang-format on + +std::string +get_args_string(const function_args_t& args) +{ + std::string args_str; + std::for_each(args.begin(), args.end(), [&args_str](const argument_info& arg) { + const auto* delimiter = ";;"; + std::stringstream ss; + ss << arg.arg_number << delimiter << arg.arg_type << delimiter << arg.arg_name + << delimiter << arg.arg_value << delimiter; + args_str.append(ss.str()); + }); + return args_str; +} + template void tool_tracing_callback_start(CategoryT, rocprofiler_callback_tracing_record_t record, @@ -823,16 +863,7 @@ tool_tracing_callback_stop( { cache_category(); cache_add_thread_info(record.thread_id); - std::string args_str; - - std::for_each(args.begin(), args.end(), [&args_str](const argument_info& arg) { - const auto* delimiter = ";;"; - std::stringstream ss; - ss << arg.arg_number << delimiter << arg.arg_type << delimiter << arg.arg_name - << delimiter << arg.arg_value << delimiter; - args_str.append(ss.str()); - }); - + std::string args_str = get_args_string(args); cache_region(&record, _beg_ts, _end_ts, call_stack->to_string(), args_str, trait::name::value); } @@ -904,31 +935,184 @@ get_kernel_dispatch_timestamps() } #if(ROCPROFILER_VERSION >= 600) + +// An instant event is one that has its beg_ts = end_ts +void +ompt_cache_instant_event( + rocprofiler_callback_tracing_record_t record, rocprofiler_timestamp_t _instant_ts, + std::optional>& _bt_data) +{ + auto args = function_args_t{}; + rocprofiler_iterate_callback_tracing_kind_operation_args( + record, iterate_args_callback, 2, &args); + auto call_stack = get_backtrace(_bt_data); + + cache_category(); + cache_add_thread_info(record.thread_id); + cache_region(&record, _instant_ts, _instant_ts, call_stack->to_string(), + get_args_string(args), trait::name::value); +} + +// OMPT callbacks with no corresponding begin/end are treated as "instant" +void +ompt_cache_orphan_event( + const rocprofsys_ompt_data_storage_t& stored_data, + std::optional>& _bt_data) +{ + auto call_stack = get_backtrace(_bt_data); + cache_category(); + cache_add_thread_info(stored_data.record.thread_id); + cache_region(&stored_data.record, stored_data._beg_ts, stored_data._beg_ts, + call_stack->to_string(), get_args_string(stored_data.args), + trait::name::value); +} + +// Any OMPT callback that can be of phase ENTER or EXIT is a standard callback. +// I.e. it has an ompt_scope_endpoint_t in its definition (excluding +// ROCPROFILER_OMPT_ID_nest_lock as it is a mutex) +auto& +get_ompt_standard_cb_storage() +{ + // uint64_t -> internal id from rocprofiler_correlation_id_t + static thread_local auto _v = + std::unordered_map{}; + return _v; +} + +// An OMPT parallel callback consists of ROCPROFILER_OMPT_ID_parallel_begin and +// ROCPROFILER_OMPT_ID_parallel_end +// As the beginning and end can only occur on the same thread, they are connected into a +// single track called "omp_parallel" for clarity. In this track, the information +// contained within parallel_begin should be displayed as it contains all the information +// that parallel_end has as well as the flags and number of threads/teams that were +// requested. +auto& +get_ompt_parallel_cb_storage() +{ + // uintptr_t -> parallel_data (see callback definition) + static thread_local auto _v = + std::unordered_map{}; + return _v; +} + +void +ompt_push_standard_callback(const rocprofiler_callback_tracing_record_t& record, + const rocprofiler_timestamp_t& _beg_ts) +{ + auto args = function_args_t{}; + rocprofiler_iterate_callback_tracing_kind_operation_args( + record, iterate_args_callback, 1, &args); + get_ompt_standard_cb_storage().emplace( + record.correlation_id.internal, + rocprofsys_ompt_data_storage_t{ record, _beg_ts, args }); +} + +void +ompt_pop_standard_callback( + const rocprofiler_callback_tracing_record_t& record, + const rocprofiler_timestamp_t& _end_ts, + std::optional>& _bt_data) +{ + auto it = get_ompt_standard_cb_storage().find(record.correlation_id.internal); + + if(it == get_ompt_standard_cb_storage().end()) + { + auto args = function_args_t{}; + rocprofiler_iterate_callback_tracing_kind_operation_args( + record, iterate_args_callback, 2, &args); + ompt_cache_orphan_event(rocprofsys_ompt_data_storage_t{ record, _end_ts, args }, + _bt_data); + return; + } + + auto stored_data = it->second; + get_ompt_standard_cb_storage().erase(it); + + auto call_stack = get_backtrace(_bt_data); + cache_category(); + cache_add_thread_info(record.thread_id); + cache_region(&record, stored_data._beg_ts, _end_ts, call_stack->to_string(), + get_args_string(stored_data.args), + trait::name::value); +} + +void +ompt_push_parallel_callback(const rocprofiler_callback_tracing_record_t& record, + const rocprofiler_timestamp_t& _beg_ts) +{ + auto* payload_data = + static_cast(record.payload); + const void* parallel_data_address = payload_data->args.parallel_begin.parallel_data; + + auto args = function_args_t{}; + rocprofiler_iterate_callback_tracing_kind_operation_args( + record, iterate_args_callback, 1, &args); + get_ompt_parallel_cb_storage().emplace( + reinterpret_cast(parallel_data_address), + rocprofsys_ompt_data_storage_t{ record, _beg_ts, args }); +} + +void +ompt_pop_parallel_callback( + const rocprofiler_callback_tracing_record_t& record, + const rocprofiler_timestamp_t& _end_ts, + std::optional>& _bt_data) +{ + auto* payload_data = + static_cast(record.payload); + const void* parallel_data_address = payload_data->args.parallel_end.parallel_data; + + auto it = get_ompt_parallel_cb_storage().find( + reinterpret_cast(parallel_data_address)); + + if(it == get_ompt_parallel_cb_storage().end()) + { + auto args = function_args_t{}; + rocprofiler_iterate_callback_tracing_kind_operation_args( + record, iterate_args_callback, 2, &args); + ompt_cache_orphan_event(rocprofsys_ompt_data_storage_t{ record, _end_ts, args }, + _bt_data); + return; + } + + auto stored_data = it->second; + get_ompt_parallel_cb_storage().erase(it); + auto call_stack = get_backtrace(_bt_data); + + cache_category(); + cache_add_thread_info(record.thread_id); + cache_region(&record, stored_data._beg_ts, _end_ts, call_stack->to_string(), + get_args_string(stored_data.args), + trait::name::value); +} + +void +ompt_finalize_orphan_events() +{ + auto empty_call_stack = + std::optional>{ std::nullopt }; + for(const auto& [parallel_data, stored_data] : get_ompt_parallel_cb_storage()) + { + ompt_cache_orphan_event(stored_data, empty_call_stack); + } + + for(const auto& [correlation_id, stored_data] : get_ompt_standard_cb_storage()) + { + ompt_cache_orphan_event(stored_data, empty_call_stack); + } + + get_ompt_parallel_cb_storage().clear(); + get_ompt_standard_cb_storage().clear(); +} + // To handle events without finalization, perfetto push must occur in start -// Allows capture of worker thread implicit and sync tasks +// Allows capture of worker thread implicit tasks and sync regions void ompt_tracing_callback_start(rocprofiler_callback_tracing_record_t record, rocprofiler_user_data_t* /*user_data*/, rocprofiler_timestamp_t ts) { - static bool is_first_implicit_call = true; - - // Ignore first ompt_implicit_call as this is created after runtime initialization but - // before first region - // Respective end is also not received due to finalization occurring too late - if(is_first_implicit_call && (record.kind == ROCPROFILER_CALLBACK_TRACING_OMPT && - record.operation == ROCPROFILER_OMPT_ID_implicit_task)) - { - is_first_implicit_call = false; - return; - } - - std::string_view _name = - tool_data->callback_tracing_info.at(record.kind, record.operation); - - // Forces omp_parallel begin and end to have same name, allowing perfetto track to - // connect. This will be changed in the future - if(record.operation == ROCPROFILER_OMPT_ID_parallel_begin) _name = "omp_parallel"; + std::string_view _name = ompt_get_unified_name(record); if(get_use_timemory()) { @@ -975,12 +1159,7 @@ ompt_tracing_callback_stop( rocprofiler_timestamp_t ts, std::optional>& _bt_data) { - std::string_view _name = - tool_data->callback_tracing_info.at(record.kind, record.operation); - - // Forces omp_parallel begin and end to have same name, allowing perfetto track to - // connect. This will be changed in the future - if(record.operation == ROCPROFILER_OMPT_ID_parallel_end) _name = "omp_parallel"; + std::string_view _name = ompt_get_unified_name(record); if(get_use_timemory()) { @@ -996,6 +1175,7 @@ ompt_tracing_callback_stop( rocprofiler_iterate_callback_tracing_kind_operation_args(record, save_args, 2, &args); } + uint64_t _end_ts = ts; tracing::pop_perfetto_ts( category::rocm_ompt_api{}, _name.data(), _end_ts, @@ -1076,6 +1256,17 @@ tool_tracing_callback(rocprofiler_callback_tracing_record_t record, } }; +#if(ROCPROFILER_VERSION >= 600) + static bool is_first_implicit_task = false; + if(!is_first_implicit_task && record.operation == ROCPROFILER_OMPT_ID_implicit_task) + { + // We do not capture implicit task with flags = 1 on main thread + // For now, this is identified as the first implicit task call + is_first_implicit_task = true; + return; + } +#endif + auto ts = rocprofiler_timestamp_t{}; ROCPROFILER_CALL(rocprofiler_get_timestamp(&ts)); const char* name = ""; @@ -1128,6 +1319,7 @@ tool_tracing_callback(rocprofiler_callback_tracing_record_t record, case ROCPROFILER_CALLBACK_TRACING_OMPT: { ompt_tracing_callback_start(record, user_data, ts); + ompt_push_standard_callback(record, ts); break; } case ROCPROFILER_CALLBACK_TRACING_ROCDECODE_API: @@ -1211,6 +1403,7 @@ tool_tracing_callback(rocprofiler_callback_tracing_record_t record, case ROCPROFILER_CALLBACK_TRACING_OMPT: { ompt_tracing_callback_stop(record, user_data, ts, _bt_data); + ompt_pop_standard_callback(record, ts, _bt_data); break; } case ROCPROFILER_CALLBACK_TRACING_ROCDECODE_API: @@ -1306,9 +1499,11 @@ tool_tracing_callback(rocprofiler_callback_tracing_record_t record, { case ROCPROFILER_OMPT_ID_parallel_begin: ompt_tracing_callback_start(record, user_data, ts); + ompt_push_parallel_callback(record, ts); break; case ROCPROFILER_OMPT_ID_parallel_end: ompt_tracing_callback_stop(record, user_data, ts, _bt_data); + ompt_pop_parallel_callback(record, ts, _bt_data); break; case ROCPROFILER_OMPT_ID_lock_init: case ROCPROFILER_OMPT_ID_lock_destroy: @@ -1333,10 +1528,12 @@ tool_tracing_callback(rocprofiler_callback_tracing_record_t record, // These callbacks are considered instant events and should start // and immediately call stop as no corresponding "end" will be // received - ompt_tracing_callback_start(record, user_data, ts); + auto start_ts = ts; + ompt_tracing_callback_start(record, user_data, start_ts); ROCPROFILER_CALL( rocprofiler_get_timestamp(&ts)); // Set artificial end ts ompt_tracing_callback_stop(record, user_data, ts, _bt_data); + ompt_cache_instant_event(record, start_ts, _bt_data); break; } default: @@ -2098,6 +2295,10 @@ tool_fini(void* callback_data) static std::atomic_flag _once = ATOMIC_FLAG_INIT; if(_once.test_and_set()) return; +#if(ROCPROFILER_VERSION >= 600) + ompt_finalize_orphan_events(); +#endif + flush(); stop();