[rocprofiler-system]: Enable UCX Communication API tracing (#2306)

## Motivation

Enable UCX communication tracing and the collection of communication metadata.

## Technical Details

Implement UCX API wrappers to trace transport-layer communication. This adds communication data tracking and exposes “UCX Comm Send/Recv” timelines, enabling detailed analysis of MPI, OpenSHMEM, and other UCX-based runtime communication patterns.

- Implements function interception for UCX functions across multiple categories using the gotcha component.
- Extended the comm_data component to track UCX send/recv operations: added ucx_send and ucx_recv labels for Perfetto counter tracks, and integrated UCX data tracking with the existing MPI/RCCL tracking infrastructure.
- Added ROCPROFSYS_USE_UCX configuration option (enabled by default).
- Created a FindUCX.cmake module for UCX header detection. It falls back to the internal UCX headers if system headers are not found.
- Updated all Dockerfiles to include UCX dependencies.
This commit is contained in:
Sajina PK
2026-01-20 13:16:43 -05:00
کامیت شده توسط GitHub
والد 72f0a41658
کامیت 15c82d6da8
25فایلهای تغییر یافته به همراه10613 افزوده شده و 9 حذف شده
@@ -8,6 +8,7 @@ Full documentation for ROCm Systems Profiler is available at [https://rocm.docs.
### Added
- Support for UCX (Unified Communication X) API tracing.
- Documentation for `--trace-legacy` / `-L` CLI flag for direct tracing mode.
- Added dependency to `spdlog` library.
- Added environment variable `ROCPROFSYS_LOG_LEVEL`, which controls the logging level.
@@ -0,0 +1,61 @@
# ------------------------------------------------------------------------------#
#
# Finds headers for UCX (Unified Communication X)
#
# UCX is a high-performance communication framework used as a transport layer
# for MPI and other communication libraries. This module locates UCX headers
# (ucp.h, uct.h) for tracing and interception purposes.
#
# ------------------------------------------------------------------------------#
include(FindPackageHandleStandardArgs)

# ----------------------------------------------------------------------------------------#
# Location of the UCX headers bundled with the project. Used as a fallback when no
# system installation of the headers is found below.
set(UCX_HEADERS_INCLUDE_DIR_INTERNAL
    "${PROJECT_SOURCE_DIR}/source/lib/rocprof-sys/library/tpls/ucx"
    CACHE PATH
    "Path to internal UCX headers"
)

# ----------------------------------------------------------------------------------------#
# Find the UCX headers. The search key is ucp/api/ucp.h, so the result is the directory
# that contains the ucp/ subdirectory (e.g. /usr/include).
find_path(
    UCX_HEADERS_INCLUDE_DIR
    NAMES ucp/api/ucp.h
    PATHS /usr/include /usr/local/include /opt/ucx/include
)

if(NOT EXISTS "${UCX_HEADERS_INCLUDE_DIR}")
    rocprofiler_systems_message(
        AUTHOR_WARNING
        "UCX headers do not exist! Setting UCX_HEADERS_INCLUDE_DIR to internal directory: ${UCX_HEADERS_INCLUDE_DIR_INTERNAL}"
    )
    # FORCE is intentional: find_path() has already created the cache entry for
    # UCX_HEADERS_INCLUDE_DIR, and a plain cache set() would not replace it.
    set(UCX_HEADERS_INCLUDE_DIR
        "${UCX_HEADERS_INCLUDE_DIR_INTERNAL}"
        CACHE PATH
        "Path to UCX headers"
        FORCE
    )
else()
    rocprofiler_systems_message(STATUS "UCX headers found: ${UCX_HEADERS_INCLUDE_DIR}")
endif()

mark_as_advanced(UCX_HEADERS_INCLUDE_DIR)

# ----------------------------------------------------------------------------------------#
# Sets UCX_FOUND based on UCX_HEADERS_INCLUDE_DIR.
find_package_handle_standard_args(UCX DEFAULT_MSG UCX_HEADERS_INCLUDE_DIR)

# ------------------------------------------------------------------------------#
# Header-only imported target. The TARGET guard keeps this module safe to include more
# than once: add_library() fails if the imported target already exists in this scope.
if(UCX_FOUND AND NOT TARGET roc::ucx-headers)
    add_library(roc::ucx-headers INTERFACE IMPORTED)
    target_include_directories(
        roc::ucx-headers
        SYSTEM
        INTERFACE "${UCX_HEADERS_INCLUDE_DIR}"
    )
endif()
# ------------------------------------------------------------------------------#
@@ -42,6 +42,9 @@ rocprofiler_systems_add_interface_library(rocprofiler-systems-mpi
rocprofiler_systems_add_interface_library(rocprofiler-systems-libva
"Provides VA-API headers"
)
rocprofiler_systems_add_interface_library(rocprofiler-systems-ucx
"Provides UCX headers"
)
rocprofiler_systems_add_interface_library(rocprofiler-systems-bfd
"Provides Binary File Descriptor (BFD)"
)
@@ -968,6 +971,9 @@ target_include_directories(
INTERFACE ${LIBVA_HEADERS_INCLUDE_DIR}
)
# Locate the UCX headers and expose them through the rocprofiler-systems-ucx interface
# library. The include directory is quoted so that a path containing spaces or
# semicolons is passed as a single argument.
find_package(UCX ${rocprofiler_systems_FIND_QUIETLY} REQUIRED)
target_include_directories(
    rocprofiler-systems-ucx
    INTERFACE "${UCX_HEADERS_INCLUDE_DIR}"
)
# ----------------------------------------------------------------------------------------#
#
# PTL (Parallel Tasking Library) submodule
@@ -26,7 +26,7 @@ RUN zypper --non-interactive update -y && \
zypper --non-interactive install -y chrpath cmake curl dpkg-devel \
gcc-c++ gcc-fortran git gmock gtest iproute2 libdrm-devel libnuma-devel \
ninja nlohmann_json-devel openmpi3-devel python3-pip rpm-build \
sqlite3-devel wget && \
sqlite3-devel wget libucp-devel libuct-devel && \
python3 -m pip install 'cmake==3.21'
ARG ROCM_VERSION=0.0
@@ -30,7 +30,7 @@ RUN zypper --non-interactive update -y && \
zypper --non-interactive install -y chrpath cmake curl dpkg-devel \
gcc-c++ gcc-fortran git gmock gtest iproute2 ninja nlohmann_json-devel \
openmpi3-devel papi-devel python3-devel python3-pip rpm-build \
sqlite3-devel vim wget && \
sqlite3-devel vim wget libucp-devel libuct-devel && \
zypper --non-interactive clean --all && \
python3 -m pip install 'cmake==3.21' perfetto
@@ -17,7 +17,7 @@ RUN yum groupinstall -y "Development Tools" && \
yum install -y epel-release && crb enable && \
yum install -y --allowerasing chrpath cmake curl dpkg-devel gmock-devel gtest-devel \
iproute json-devel libdrm-devel ninja-build numactl-devel openmpi-devel \
papi-devel python3-pip sqlite-devel texinfo wget which zlib-devel && \
papi-devel python3-pip sqlite-devel texinfo wget which zlib-devel ucx-devel && \
yum clean all && \
python3 -m pip install 'cmake==3.21' && \
python3 -m pip install 'perfetto'
@@ -21,7 +21,7 @@ RUN yum groupinstall -y "Development Tools" && \
yum install -y epel-release && crb enable && \
yum install -y --allowerasing chrpath cmake curl dpkg-devel gmock-devel gtest-devel \
iproute json-devel ninja-build numactl-devel openmpi-devel papi-devel \
python3-devel python3-pip sqlite-devel texinfo wget which vim zlib-devel && \
python3-devel python3-pip sqlite-devel texinfo wget which vim zlib-devel ucx-devel && \
yum clean all && \
python3 -m pip install 'cmake==3.21' perfetto
@@ -29,7 +29,8 @@ RUN apt-get update && \
build-essential chrpath cmake curl flex gettext git-core gnupg2 iproute2 \
libgmock-dev libgtest-dev libnuma1 libopenmpi-dev libpapi-dev libpfm4-dev \
librpm-dev libsqlite3-dev libtool libudev1 lsb-release m4 ninja-build \
nlohmann-json3-dev python3-pip rpm texinfo wget && \
nlohmann-json3-dev python3-pip rpm texinfo wget \
libucx-dev ucx-utils && \
OS_VERSION=$(grep '^VERSION_ID=' /etc/os-release | cut -d'=' -f2 | tr -d '"') && \
OS_ID=$(grep '^ID=' /etc/os-release | cut -d'=' -f2 | tr -d '"') && \
if [ "${OS_ID}" == "ubuntu" ] && [ "${OS_VERSION}" == "22.04" ]; then \
@@ -27,7 +27,8 @@ RUN apt-get update && \
bzip2 chrpath cmake curl environment-modules flex gettext git-core gnupg2 \
gzip iproute2 libgmock-dev libgtest-dev libiberty-dev libpapi-dev libpfm4-dev \
libsqlite3-dev libtool locales lsb-release m4 ninja-build nlohmann-json3-dev \
python3-pip software-properties-common texinfo unzip wget vim zip zlib1g-dev && \
python3-pip software-properties-common texinfo unzip wget vim zip zlib1g-dev \
libucx-dev ucx-utils && \
apt-get autoclean
RUN OS_VERSION=$(grep '^VERSION_ID=' /etc/os-release | cut -d'=' -f2 | tr -d '"') && \
@@ -47,6 +47,7 @@ target_link_libraries(
$<BUILD_INTERFACE:rocprofiler-systems::rocprofiler-systems-elfutils>
$<BUILD_INTERFACE:rocprofiler-systems::rocprofiler-systems-bfd>
$<BUILD_INTERFACE:rocprofiler-systems::rocprofiler-systems-mpi>
$<BUILD_INTERFACE:rocprofiler-systems::rocprofiler-systems-ucx>
$<BUILD_INTERFACE:rocprofiler-systems::rocprofiler-systems-libva>
$<BUILD_INTERFACE:rocprofiler-systems::rocprofiler-systems-ptl>
$<BUILD_INTERFACE:rocprofiler-systems::rocprofiler-systems-rocm>
@@ -127,8 +127,9 @@ ROCPROFSYS_DEFINE_CATEGORY(category, rocm_rccl, ROCPROFSYS_CATEGORY_ROCM_RCCL, "
ROCPROFSYS_DEFINE_CATEGORY(category, pthread, ROCPROFSYS_CATEGORY_PTHREAD, "pthread", "POSIX threading functions")
ROCPROFSYS_DEFINE_CATEGORY(category, kokkos, ROCPROFSYS_CATEGORY_KOKKOS, "kokkos", "KokkosTools regions")
ROCPROFSYS_DEFINE_CATEGORY(category, mpi, ROCPROFSYS_CATEGORY_MPI, "mpi", "MPI regions")
ROCPROFSYS_DEFINE_CATEGORY(category, ucx, ROCPROFSYS_CATEGORY_UCX, "ucx", "UCX regions")
ROCPROFSYS_DEFINE_CATEGORY(category, process_sampling, ROCPROFSYS_CATEGORY_PROCESS_SAMPLING, "process_sampling", "Process-level data")
ROCPROFSYS_DEFINE_CATEGORY(category, comm_data, ROCPROFSYS_CATEGORY_COMM_DATA, "comm_data", "MPI/RCCL counters for tracking amount of data sent or received")
ROCPROFSYS_DEFINE_CATEGORY(category, comm_data, ROCPROFSYS_CATEGORY_COMM_DATA, "comm_data", "MPI/RCCL/UCX counters for tracking amount of data sent or received")
ROCPROFSYS_DEFINE_CATEGORY(category, causal, ROCPROFSYS_CATEGORY_CAUSAL, "causal", "Causal profiling data")
ROCPROFSYS_DEFINE_CATEGORY(category, cpu_freq, ROCPROFSYS_CATEGORY_CPU_FREQ, "cpu_frequency", "CPU frequency (collected in background thread)")
ROCPROFSYS_DEFINE_CATEGORY(category, process_page, ROCPROFSYS_CATEGORY_PROCESS_PAGE, "process_physical_memory", "Physical memory usage (RSS) in process in MB (collected in background thread)")
@@ -207,6 +208,7 @@ using name = perfetto_category<Tp...>;
ROCPROFSYS_PERFETTO_CATEGORY(category::pthread), \
ROCPROFSYS_PERFETTO_CATEGORY(category::kokkos), \
ROCPROFSYS_PERFETTO_CATEGORY(category::mpi), \
ROCPROFSYS_PERFETTO_CATEGORY(category::ucx), \
ROCPROFSYS_PERFETTO_CATEGORY(category::sampling), \
ROCPROFSYS_PERFETTO_CATEGORY(category::process_sampling), \
ROCPROFSYS_PERFETTO_CATEGORY(category::comm_data), \
@@ -379,6 +379,10 @@ configure_settings(bool _init)
"Enable support for MPI functions", true, "mpi", "backend",
"parallelism");
ROCPROFSYS_CONFIG_SETTING(bool, "ROCPROFSYS_USE_UCX",
"Enable support for UCX functions", true, "ucx", "backend",
"parallelism");
ROCPROFSYS_CONFIG_SETTING(
bool, "ROCPROFSYS_USE_RCCLP",
"Enable support for ROCm Communication Collectives Library (RCCL) Performance",
@@ -1943,6 +1947,13 @@ get_use_mpip()
return static_cast<tim::tsettings<bool>&>(*_v->second).get();
}
bool&
get_use_ucx()
{
static auto _v = get_config()->find("ROCPROFSYS_USE_UCX");
return static_cast<tim::tsettings<bool>&>(*_v->second).get();
}
bool
get_use_kokkosp()
{
@@ -225,6 +225,9 @@ get_use_pid();
bool&
get_use_mpip();
bool&
get_use_ucx();
bool
get_use_kokkosp();
@@ -80,6 +80,7 @@ extern "C"
ROCPROFSYS_CATEGORY_PTHREAD,
ROCPROFSYS_CATEGORY_KOKKOS,
ROCPROFSYS_CATEGORY_MPI,
ROCPROFSYS_CATEGORY_UCX,
ROCPROFSYS_CATEGORY_PROCESS_SAMPLING,
ROCPROFSYS_CATEGORY_COMM_DATA,
ROCPROFSYS_CATEGORY_CAUSAL,
@@ -55,6 +55,7 @@
#include "library/components/mpi_gotcha.hpp"
#include "library/components/numa_gotcha.hpp"
#include "library/components/pthread_gotcha.hpp"
#include "library/components/ucx_gotcha.hpp"
#include "library/components/vaapi_gotcha.hpp"
#include "library/coverage.hpp"
#include "library/process_sampler.hpp"
@@ -609,6 +610,12 @@ rocprofsys_init_tooling_hidden(void)
// start these gotchas once settings have been initialized
if(get_init_bundle()) get_init_bundle()->start();
// Start the UCX gotcha wrappers only when UCX support is enabled in the settings.
if(get_use_ucx())
{
    // No trailing newline in the message, consistent with the VA-API log call below.
    LOG_DEBUG("Setting up UCX traces...");
    component::ucx_gotcha::start();
}
if(get_use_vaapi_tracing())
{
LOG_DEBUG("Setting up VA-API traces...");
@@ -900,6 +907,12 @@ rocprofsys_finalize_hidden(void)
fini_bundle_t _finalization{};
_finalization.start();
// Shut down the UCX gotcha wrappers only when UCX support is enabled in the settings.
if(get_use_ucx())
{
    // No trailing newline in the message, consistent with the VA-API log call below.
    LOG_DEBUG("Shutting down UCX tracing...");
    component::ucx_gotcha::shutdown();
}
if(get_use_vaapi_tracing())
{
LOG_DEBUG("Shutting down VA-API tracing...");
@@ -11,6 +11,7 @@ set(component_sources
${CMAKE_CURRENT_LIST_DIR}/kill_gotcha.cpp
${CMAKE_CURRENT_LIST_DIR}/mpi_gotcha.cpp
${CMAKE_CURRENT_LIST_DIR}/numa_gotcha.cpp
${CMAKE_CURRENT_LIST_DIR}/ucx_gotcha.cpp
${CMAKE_CURRENT_LIST_DIR}/vaapi_gotcha.cpp
${CMAKE_CURRENT_LIST_DIR}/pthread_gotcha.cpp
${CMAKE_CURRENT_LIST_DIR}/pthread_create_gotcha.cpp
@@ -32,6 +33,7 @@ set(component_headers
${CMAKE_CURRENT_LIST_DIR}/mpip.hpp
${CMAKE_CURRENT_LIST_DIR}/mpi_gotcha.hpp
${CMAKE_CURRENT_LIST_DIR}/numa_gotcha.hpp
${CMAKE_CURRENT_LIST_DIR}/ucx_gotcha.hpp
${CMAKE_CURRENT_LIST_DIR}/vaapi_gotcha.hpp
${CMAKE_CURRENT_LIST_DIR}/pthread_gotcha.hpp
${CMAKE_CURRENT_LIST_DIR}/pthread_create_gotcha.hpp
@@ -85,6 +85,7 @@ metadata_initialize_comm_data_categories()
trace_cache::get_metadata_registry().add_string(
trait::name<category::comm_data>::value);
trace_cache::get_metadata_registry().add_string(trait::name<category::mpi>::value);
trace_cache::get_metadata_registry().add_string(trait::name<category::ucx>::value);
_is_initialized = true;
}
@@ -128,6 +129,16 @@ metadata_initialize_comm_data_pmc()
trait::name<category::mpi>::description, LONG_DESCRIPTION, COMPONENT, MSG,
rocprofsys::trace_cache::ABSOLUTE, BLOCK, EXPRESSION, 0, 0 });
#endif
trace_cache::get_metadata_registry().add_pmc_info(
{ agent_type::CPU, DEVICE_ID, TARGET_ARCH, EVENT_CODE, INSTANCE_ID,
comm_data::ucx_send::label, "Tracks UCX communication data sizes",
trait::name<category::ucx>::description, LONG_DESCRIPTION, COMPONENT, MSG,
rocprofsys::trace_cache::ABSOLUTE, BLOCK, EXPRESSION, 0, 0 });
trace_cache::get_metadata_registry().add_pmc_info(
{ agent_type::CPU, DEVICE_ID, TARGET_ARCH, EVENT_CODE, INSTANCE_ID,
comm_data::ucx_recv::label, "Tracks UCX communication data sizes",
trait::name<category::ucx>::description, LONG_DESCRIPTION, COMPONENT, MSG,
rocprofsys::trace_cache::ABSOLUTE, BLOCK, EXPRESSION, 0, 0 });
}
template <typename Track>
@@ -172,6 +183,8 @@ comm_data::start()
metadata_initialize_track<mpi_send>();
metadata_initialize_track<mpi_recv>();
#endif
metadata_initialize_track<ucx_send>();
metadata_initialize_track<ucx_recv>();
}
}
@@ -195,7 +208,7 @@ comm_data::configure()
_once = true;
comm_data_tracker_t::label() = "comm_data";
comm_data_tracker_t::description() = "Tracks MPI/RCCL communication data sizes";
comm_data_tracker_t::description() = "Tracks MPI/RCCL/UCX communication data sizes";
comm_data_tracker_t::display_unit() = "MB";
comm_data_tracker_t::unit() = units::megabyte;
@@ -471,6 +484,298 @@ comm_data::audit(const gotcha_data& _data, audit::incoming, const void*, int sen
}
#endif
// UCX communication tracking implementations
// ucp_tag_send_nbx: (void* ep, const void* buffer, size_t count, uint64_t tag, const
// void* param)
void
comm_data::audit(const gotcha_data& _data, audit::incoming, void*, const void*,
size_t count, uint64_t tag, const void*)
{
if(count == 0) return;
if(get_use_perfetto()) write_perfetto_counter_track<ucx_send>(count);
{
cache_comm_data_events<ucx_send>(0, count);
}
if(rocprofsys::get_use_timemory())
{
auto _name = std::string_view{ _data.tool_id };
tracker_t _t{ _name };
add(_t, count);
add(JOIN('/', _name, JOIN('=', "tag", tag)), count);
}
}
// ucp_tag_recv_nbx: (void* worker, void* buffer, size_t count, uint64_t tag, uint64_t
// tag_mask, const void* param)
void
comm_data::audit(const gotcha_data& _data, audit::incoming, void*, void*, size_t count,
uint64_t tag, uint64_t tag_mask, const void*)
{
if(count == 0) return;
if(get_use_perfetto()) write_perfetto_counter_track<ucx_recv>(count);
{
cache_comm_data_events<ucx_recv>(0, count);
}
if(rocprofsys::get_use_timemory())
{
auto _name = std::string_view{ _data.tool_id };
tracker_t _t{ _name };
add(_t, count);
add(JOIN('/', _name, JOIN('=', "tag", tag)), count);
add(JOIN('/', _name, JOIN('=', "tag", tag), JOIN('=', "tag_mask", tag_mask)),
count);
}
}
// ucp_put_nbx: (void* ep, const void* buffer, size_t count, uint64_t remote_addr, void*
// rkey, const void* param)
void
comm_data::audit(const gotcha_data& _data, audit::incoming, void*, const void*,
size_t count, uint64_t remote_addr, void*, const void*)
{
if(count == 0) return;
if(get_use_perfetto()) write_perfetto_counter_track<ucx_send>(count);
{
cache_comm_data_events<ucx_send>(0, count);
}
if(rocprofsys::get_use_timemory())
{
auto _name = std::string_view{ _data.tool_id };
tracker_t _t{ _name };
add(_t, count);
add(JOIN('/', _name, JOIN('=', "remote_addr", remote_addr)), count);
}
}
// ucp_get_nbx: (void* ep, void* buffer, size_t count, uint64_t remote_addr, void* rkey,
// const void* param)
void
comm_data::audit(const gotcha_data& _data, audit::incoming, void*, void*, size_t count,
uint64_t remote_addr, void*, const void*)
{
if(count == 0) return;
if(get_use_perfetto()) write_perfetto_counter_track<ucx_recv>(count);
{
cache_comm_data_events<ucx_recv>(0, count);
}
if(rocprofsys::get_use_timemory())
{
auto _name = std::string_view{ _data.tool_id };
tracker_t _t{ _name };
add(_t, count);
add(JOIN('/', _name, JOIN('=', "remote_addr", remote_addr)), count);
}
}
// ucp_am_send_nbx: (void* ep, unsigned id, const void* header, size_t header_length,
// const void* buffer, size_t count, const void* param)
void
comm_data::audit(const gotcha_data& _data, audit::incoming, void*, unsigned id,
const void*, size_t header_length, const void*, size_t count,
const void*)
{
if(count == 0 && header_length == 0) return;
size_t total_size = header_length + count;
if(get_use_perfetto()) write_perfetto_counter_track<ucx_send>(total_size);
{
cache_comm_data_events<ucx_send>(0, total_size);
}
if(rocprofsys::get_use_timemory())
{
auto _name = std::string_view{ _data.tool_id };
tracker_t _t{ _name };
add(_t, total_size);
add(JOIN('/', _name, JOIN('=', "am_id", id)), total_size);
}
}
// ucp_stream_send_nbx: (void* ep, const void* buffer, size_t count, const void* param)
void
comm_data::audit(const gotcha_data& _data, audit::incoming, void*, const void*,
size_t count, const void*)
{
if(count == 0) return;
if(get_use_perfetto()) write_perfetto_counter_track<ucx_send>(count);
{
cache_comm_data_events<ucx_send>(0, count);
}
if(rocprofsys::get_use_timemory())
{
auto _name = std::string_view{ _data.tool_id };
tracker_t _t{ _name };
add(_t, count);
}
}
// ucp_stream_recv_nbx: (void* ep, void* buffer, size_t count, size_t* length, const void*
// param)
void
comm_data::audit(const gotcha_data& _data, audit::incoming, void*, void*, size_t count,
size_t*, const void*)
{
if(count == 0) return;
if(get_use_perfetto()) write_perfetto_counter_track<ucx_recv>(count);
{
cache_comm_data_events<ucx_recv>(0, count);
}
if(rocprofsys::get_use_timemory())
{
auto _name = std::string_view{ _data.tool_id };
tracker_t _t{ _name };
add(_t, count);
}
}
// Legacy: ucp_tag_send_nb/nbx - send with tag matching (for old-style wrappers)
void
comm_data::audit(const gotcha_data& _data, audit::incoming, void*, size_t count, void*,
void*, void*)
{
if(count == 0) return;
if(get_use_perfetto()) write_perfetto_counter_track<ucx_send>(count);
{
cache_comm_data_events<ucx_send>(0, count);
}
if(rocprofsys::get_use_timemory())
{
auto _name = std::string_view{ _data.tool_id };
tracker_t _t{ _name };
add(_t, count);
}
}
// Legacy: ucp_tag_recv_nb/nbx - receive with tag matching (for old-style wrappers)
void
comm_data::audit(const gotcha_data& _data, audit::incoming, void*, size_t count, void*,
void*, void*, void*, void*)
{
if(count == 0) return;
if(get_use_perfetto()) write_perfetto_counter_track<ucx_recv>(count);
{
cache_comm_data_events<ucx_recv>(0, count);
}
if(rocprofsys::get_use_timemory())
{
auto _name = std::string_view{ _data.tool_id };
tracker_t _t{ _name };
add(_t, count);
}
}
// ucp_put/get operations - RMA
void
comm_data::audit(const gotcha_data& _data, audit::incoming, void*, size_t length,
uint64_t, void*, void*)
{
if(length == 0) return;
bool is_put = _data.tool_id.find("ucp_put") != std::string::npos;
if(get_use_perfetto())
{
if(is_put)
write_perfetto_counter_track<ucx_send>(length);
else
write_perfetto_counter_track<ucx_recv>(length);
}
{
if(is_put)
cache_comm_data_events<ucx_send>(0, length);
else
cache_comm_data_events<ucx_recv>(0, length);
}
if(rocprofsys::get_use_timemory())
{
auto _name = std::string_view{ _data.tool_id };
tracker_t _t{ _name };
add(_t, length);
}
}
// ucp_am_send_nb/nbx - active message send
void
comm_data::audit(const gotcha_data& _data, audit::incoming, void*, unsigned, void*,
size_t header_length, void*, size_t length, unsigned, void*)
{
size_t total_length = header_length + length;
if(total_length == 0) return;
if(get_use_perfetto()) write_perfetto_counter_track<ucx_send>(total_length);
{
cache_comm_data_events<ucx_send>(0, total_length);
}
if(rocprofsys::get_use_timemory())
{
auto _name = std::string_view{ _data.tool_id };
tracker_t _t{ _name };
add(_t, total_length);
}
}
// ucp_stream_send/recv operations
void
comm_data::audit(const gotcha_data& _data, audit::incoming, void*, void*, size_t count,
void*, unsigned, void*)
{
if(count == 0) return;
bool is_send = _data.tool_id.find("send") != std::string::npos;
if(get_use_perfetto())
{
if(is_send)
write_perfetto_counter_track<ucx_send>(count);
else
write_perfetto_counter_track<ucx_recv>(count);
}
{
if(is_send)
cache_comm_data_events<ucx_send>(0, count);
else
cache_comm_data_events<ucx_recv>(0, count);
}
if(rocprofsys::get_use_timemory())
{
auto _name = std::string_view{ _data.tool_id };
tracker_t _t{ _name };
add(_t, count);
}
}
#if defined(ROCPROFSYS_USE_RCCL)
// Kept for reference, but now gathered throught the SDK callbacks.
@@ -77,6 +77,18 @@ struct comm_data : base<comm_data, void>
static constexpr auto label = "MPI Comm Send";
};
// Tag type selecting the UCX receive track. It is used as the template argument of the
// counter-track/event-cache helpers, and `label` is the name registered for the track.
struct ucx_recv
{
static constexpr auto value = "comm_data";
static constexpr auto label = "UCX Comm Recv";
};
// Tag type selecting the UCX send track. It is used as the template argument of the
// counter-track/event-cache helpers, and `label` is the name registered for the track.
struct ucx_send
{
static constexpr auto value = "comm_data";
static constexpr auto label = "UCX Comm Send";
};
ROCPROFSYS_DEFAULT_OBJECT(comm_data)
static void preinit();
@@ -135,6 +147,61 @@ struct comm_data : base<comm_data, void>
MPI_Datatype recvtype, MPI_Comm);
#endif
// UCX communication tracking
// ucp_tag_send_nbx - send with tag matching (5 params: ep, buffer, count, tag, param)
static void audit(const gotcha_data& _data, audit::incoming, void*, const void*,
size_t count, uint64_t tag, const void*);
// ucp_tag_recv_nbx - receive with tag matching (6 params: worker, buffer, count, tag,
// tag_mask, param)
static void audit(const gotcha_data& _data, audit::incoming, void*, void*,
size_t count, uint64_t tag, uint64_t tag_mask, const void*);
// ucp_put_nbx - RMA put operation (6 params: ep, buffer, count, remote_addr, rkey,
// param)
static void audit(const gotcha_data& _data, audit::incoming, void*, const void*,
size_t count, uint64_t remote_addr, void* rkey, const void*);
// ucp_get_nbx - RMA get operation (6 params: ep, buffer, count, remote_addr, rkey,
// param)
static void audit(const gotcha_data& _data, audit::incoming, void*, void*,
size_t count, uint64_t remote_addr, void* rkey, const void*);
// ucp_am_send_nbx - active message send (7 params: ep, id, header, header_length,
// buffer, count, param)
static void audit(const gotcha_data& _data, audit::incoming, void*, unsigned id,
const void* header, size_t header_length, const void* buffer,
size_t count, const void*);
// ucp_stream_send_nbx - stream send (4 params: ep, buffer, count, param)
static void audit(const gotcha_data& _data, audit::incoming, void*, const void*,
size_t count, const void*);
// ucp_stream_recv_nbx - stream receive (5 params: ep, buffer, count, length, param)
static void audit(const gotcha_data& _data, audit::incoming, void*, void*,
size_t count, size_t* length, const void*);
// Legacy UCX functions (kept for compatibility)
// ucp_tag_send_nb/nbx - send with tag matching
static void audit(const gotcha_data& _data, audit::incoming, void*, size_t count,
void*, void*, void*);
// ucp_tag_recv_nb/nbx - receive with tag matching
static void audit(const gotcha_data& _data, audit::incoming, void*, size_t count,
void*, void*, void*, void*, void*);
// ucp_put/get operations - RMA (legacy)
static void audit(const gotcha_data& _data, audit::incoming, void*, size_t length,
uint64_t, void*, void*);
// ucp_am_send_nb/nbx - active message send (legacy)
static void audit(const gotcha_data& _data, audit::incoming, void*, unsigned, void*,
size_t, void*, size_t, unsigned, void*);
// ucp_stream_send/recv operations (legacy)
static void audit(const gotcha_data& _data, audit::incoming, void*, void*,
size_t count, void*, unsigned, void*);
private:
static auto& add(tracker_t& _t, data_type value)
{
@@ -0,0 +1,374 @@
// MIT License
//
// Copyright (c) 2022-2025 Advanced Micro Devices, Inc. All Rights Reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
#include "library/components/ucx_gotcha.hpp"
#include "core/common.hpp"
#include "core/config.hpp"
#include "core/state.hpp"
#include "core/timemory.hpp"
#include "library/components/category_region.hpp"
#include "library/runtime.hpp"
#include <timemory/backends/threading.hpp>
#include <timemory/components/macros.hpp>
#include <timemory/mpl/concepts.hpp>
#include <timemory/utility/types.hpp>
#include <cstddef>
#include <cstdlib>
namespace rocprofsys
{
namespace component
{
namespace
{
auto&
get_ucx_gotcha()
{
static auto _v = tim::lightweight_tuple<ucx_gotcha_t>{};
return _v;
}
} // namespace
void
ucx_gotcha::configure()
{
// don't emit warnings for missing UCX functions unless debug or verbosity >= 3
if(get_verbose_env() < 3 && !get_debug_env())
{
for(size_t i = 0; i < ucx_gotcha_t::capacity(); ++i)
{
auto* itr = ucx_gotcha_t::at(i);
if(itr) itr->verbose = -1;
}
}
ucx_gotcha_t::get_initializer() = []() {
// Active Message
ucx_gotcha_t::configure<0, void*, void*, unsigned, void*, size_t, void*, size_t,
unsigned, void*>("ucp_am_send_nb");
ucx_gotcha_t::configure<1, void*, void*, unsigned, const void*, size_t,
const void*, size_t, const void*>("ucp_am_send_nbx");
ucx_gotcha_t::configure<2, void*, void*, void*, size_t, void*>(
"ucp_am_recv_data_nbx");
ucx_gotcha_t::configure<3, void, void*, void*>("ucp_am_data_release");
// Atomic operations
ucx_gotcha_t::configure<4, void*, void*, uint32_t, uint64_t, void*>(
"ucp_atomic_add32");
ucx_gotcha_t::configure<5, void*, void*, uint64_t, uint64_t, void*>(
"ucp_atomic_add64");
ucx_gotcha_t::configure<6, void*, void*, uint32_t, uint32_t, uint64_t, void*>(
"ucp_atomic_cswap32");
ucx_gotcha_t::configure<7, void*, void*, uint64_t, uint64_t, uint64_t, void*>(
"ucp_atomic_cswap64");
ucx_gotcha_t::configure<8, void*, void*, uint32_t, uint64_t, void*, void*>(
"ucp_atomic_fadd32");
ucx_gotcha_t::configure<9, void*, void*, uint64_t, uint64_t, void*, void*>(
"ucp_atomic_fadd64");
ucx_gotcha_t::configure<10, void*, void*, uint32_t, uint64_t, void*, void*>(
"ucp_atomic_swap32");
ucx_gotcha_t::configure<11, void*, void*, uint64_t, uint64_t, void*, void*>(
"ucp_atomic_swap64");
ucx_gotcha_t::configure<12, int, void*, int, uint64_t, const void*, size_t,
void*>("ucp_atomic_post");
ucx_gotcha_t::configure<13, void*, void*, int, uint64_t, void*, size_t, void*,
void*>("ucp_atomic_fetch_nb");
ucx_gotcha_t::configure<14, void*, void*, unsigned, void*, void*, size_t,
uint64_t, void*>("ucp_atomic_op_nbx");
// Cleanup and config
ucx_gotcha_t::configure<15, void, void*>("ucp_cleanup");
ucx_gotcha_t::configure<16, int, void*, const char*, const char*, const char*>(
"ucp_config_modify");
ucx_gotcha_t::configure<17, int, const char*, const char*, void**>(
"ucp_config_read");
ucx_gotcha_t::configure<18, void, void*>("ucp_config_release");
// Connection management
ucx_gotcha_t::configure<19, void*, void*, unsigned>("ucp_disconnect_nb");
// Datatype
ucx_gotcha_t::configure<20, int, void*, void**>("ucp_dt_create_generic");
ucx_gotcha_t::configure<21, void, void*>("ucp_dt_destroy");
// Endpoint
ucx_gotcha_t::configure<22, int, void*, const void*, void**>("ucp_ep_create");
ucx_gotcha_t::configure<23, void, void*>("ucp_ep_destroy");
ucx_gotcha_t::configure<24, void*, void*, const void*>("ucp_ep_modify_nb");
ucx_gotcha_t::configure<25, void*, void*, const void*>("ucp_ep_close_nbx");
ucx_gotcha_t::configure<26, int, void*>("ucp_ep_flush");
ucx_gotcha_t::configure<27, void*, void*, unsigned, void*>("ucp_ep_flush_nb");
ucx_gotcha_t::configure<28, void*, void*, const void*>("ucp_ep_flush_nbx");
// Listener
ucx_gotcha_t::configure<29, int, void*, const void*, void**>(
"ucp_listener_create");
ucx_gotcha_t::configure<30, void, void*>("ucp_listener_destroy");
ucx_gotcha_t::configure<31, int, void*, void*>("ucp_listener_query");
ucx_gotcha_t::configure<32, int, void*, void*>("ucp_listener_reject");
// Memory
ucx_gotcha_t::configure<33, int, void*, void*, size_t, int>("ucp_mem_advise");
ucx_gotcha_t::configure<34, int, void*, const void*, void**>("ucp_mem_map");
ucx_gotcha_t::configure<35, int, void*, void*>("ucp_mem_unmap");
ucx_gotcha_t::configure<36, int, void*, void*>("ucp_mem_query");
// Put/Get operations
ucx_gotcha_t::configure<37, int, void*, const void*, size_t, uint64_t, void*>(
"ucp_put");
ucx_gotcha_t::configure<38, int, void*, void*, size_t, uint64_t, void*>(
"ucp_get");
ucx_gotcha_t::configure<39, int, void*, const void*, size_t, uint64_t, void*>(
"ucp_put_nbi");
ucx_gotcha_t::configure<40, int, void*, void*, size_t, uint64_t, void*>(
"ucp_get_nbi");
ucx_gotcha_t::configure<41, void*, void*, const void*, size_t, uint64_t, void*,
void*>("ucp_put_nb");
ucx_gotcha_t::configure<42, void*, void*, void*, size_t, uint64_t, void*, void*>(
"ucp_get_nb");
ucx_gotcha_t::configure<43, void*, void*, const void*, size_t, uint64_t, void*,
const void*>("ucp_put_nbx");
ucx_gotcha_t::configure<44, void*, void*, void*, size_t, uint64_t, void*,
const void*>("ucp_get_nbx");
// Request
ucx_gotcha_t::configure<45, void*, void*>("ucp_request_alloc");
ucx_gotcha_t::configure<46, void, void*, void*>("ucp_request_cancel");
ucx_gotcha_t::configure<47, int, void*>("ucp_request_is_completed");
// Remote key
ucx_gotcha_t::configure<48, void, void*>("ucp_rkey_buffer_release");
ucx_gotcha_t::configure<49, void, void*>("ucp_rkey_destroy");
ucx_gotcha_t::configure<50, int, void*, void*, void**, size_t*>("ucp_rkey_pack");
ucx_gotcha_t::configure<51, int, void*, void*, void**>("ucp_rkey_ptr");
// Stream
ucx_gotcha_t::configure<52, void, void*, void*>("ucp_stream_data_release");
ucx_gotcha_t::configure<53, void*, void*, void*, size_t, size_t*, unsigned,
void*>("ucp_stream_recv_data_nb");
ucx_gotcha_t::configure<54, void*, void*, const void*, size_t, void*>(
"ucp_stream_send_nb");
ucx_gotcha_t::configure<55, void*, void*, void*, size_t, size_t*, void*>(
"ucp_stream_recv_nb");
ucx_gotcha_t::configure<56, void*, void*, const void*, size_t, const void*>(
"ucp_stream_send_nbx");
ucx_gotcha_t::configure<57, void*, void*, void*, size_t, size_t*, const void*>(
"ucp_stream_recv_nbx");
ucx_gotcha_t::configure<58, void*, void*>("ucp_stream_worker_poll");
// Tag matching
ucx_gotcha_t::configure<59, void*, void*, void*, void*, size_t, void*, void*>(
"ucp_tag_msg_recv_nb");
ucx_gotcha_t::configure<60, void*, void*, void*, void*, size_t, const void*>(
"ucp_tag_msg_recv_nbx");
ucx_gotcha_t::configure<61, void*, void*, const void*, size_t, void*, void*>(
"ucp_tag_send_nbr");
ucx_gotcha_t::configure<62, void*, void*, void*, size_t, void*, void*, void*>(
"ucp_tag_recv_nbr");
ucx_gotcha_t::configure<63, void*, void*, const void*, size_t, void*, void*>(
"ucp_tag_send_nb");
ucx_gotcha_t::configure<64, void*, void*, void*, size_t, void*, void*, void*>(
"ucp_tag_recv_nb");
ucx_gotcha_t::configure<65, void*, void*, const void*, size_t, uint64_t,
const void*>("ucp_tag_send_nbx");
ucx_gotcha_t::configure<66, void*, void*, void*, size_t, uint64_t, uint64_t,
const void*>("ucp_tag_recv_nbx");
ucx_gotcha_t::configure<67, void*, void*, const void*, size_t, uint64_t, void*>(
"ucp_tag_send_sync_nb");
ucx_gotcha_t::configure<68, void*, void*, const void*, size_t, uint64_t,
const void*>("ucp_tag_send_sync_nbx");
// Worker
ucx_gotcha_t::configure<69, int, void*, const void*, void**>("ucp_worker_create");
ucx_gotcha_t::configure<70, void, void*>("ucp_worker_destroy");
ucx_gotcha_t::configure<71, int, void*, void**, size_t*>(
"ucp_worker_get_address");
ucx_gotcha_t::configure<72, int, void*, int*>("ucp_worker_get_efd");
ucx_gotcha_t::configure<73, int, void*>("ucp_worker_arm");
ucx_gotcha_t::configure<74, int, void*>("ucp_worker_fence");
ucx_gotcha_t::configure<75, int, void*>("ucp_worker_wait");
ucx_gotcha_t::configure<76, int, void*>("ucp_worker_signal");
ucx_gotcha_t::configure<77, int, void*, void*, size_t, void*>(
"ucp_worker_wait_mem");
ucx_gotcha_t::configure<78, int, void*>("ucp_worker_flush");
ucx_gotcha_t::configure<79, void*, void*, unsigned, void*>("ucp_worker_flush_nb");
ucx_gotcha_t::configure<80, void*, void*, unsigned, void*>(
"ucp_worker_flush_nbx");
ucx_gotcha_t::configure<81, int, void*, unsigned, void*, void*, void*>(
"ucp_worker_set_am_handler");
ucx_gotcha_t::configure<82, int, void*, const void*>(
"ucp_worker_set_am_recv_handler");
ucx_gotcha_t::configure<83, unsigned, void*>("ucp_worker_progress");
// UCT Active Message (low-level transport)
ucx_gotcha_t::configure<84, ssize_t, void*, unsigned, void*, void*>(
"uct_ep_am_bcopy");
ucx_gotcha_t::configure<85, ssize_t, void*, unsigned, const void*, unsigned,
const void*, size_t, void*>("uct_ep_am_zcopy");
ucx_gotcha_t::configure<86, ssize_t, void*, unsigned, uint64_t, const void*,
unsigned>("uct_ep_am_short");
ucx_gotcha_t::configure<87, unsigned, void*>("uct_iface_progress");
ucx_gotcha_t::configure<88, int, void*, unsigned, void*, void*, unsigned>(
"uct_iface_set_am_handler");
// Legacy UCX function variants that might be used on older systems
ucx_gotcha_t::configure<89, void*, void*, const void*, size_t, void*>(
"ucp_tag_send");
ucx_gotcha_t::configure<90, void*, void*, void*, size_t, void*, void*>(
"ucp_tag_recv");
ucx_gotcha_t::configure<91, void*, void*, const void*, size_t, int, int, void*>(
"ucp_send");
ucx_gotcha_t::configure<92, void*, void*, void*, size_t, int, int, void*>(
"ucp_recv");
};
}
// Disables the UCX gotcha wrappers by calling ucx_gotcha_t::disable().
void
ucx_gotcha::shutdown()
{
ucx_gotcha_t::disable();
}
// Configures and starts the UCX gotcha bundle, unless the wrappers are already
// reported as running.
void
ucx_gotcha::start()
{
    // Already running: configuring and starting again is skipped.
    if(get_ucx_gotcha().get<ucx_gotcha_t>()->get_is_running()) return;

    configure();
    get_ucx_gotcha().start();
}
// No-op: the body is empty.
// NOTE(review): presumably the wrappers are only torn down through shutdown() -- confirm.
void
ucx_gotcha::stop()
{}
// Generic audit functions now handled by template in header
// Specific audit functions for tag operations
// Incoming audit for wrapped calls taking (ep, buffer, count, tag, param), i.e. the
// argument list documented for ucp_tag_send_nbx in the header.
void
ucx_gotcha::audit(const gotcha_data& _data, audit::incoming, void* ep, const void* buffer,
                  size_t count, uint64_t tag, const void* param)
{
    const std::string_view _region_name{ _data.tool_id };

    // Open the UCX region, annotated with every call argument.
    category_region<category::ucx>::start(_region_name, "ep", ep, "buffer", buffer,
                                          "count", count, "tag", tag, "param", param);

    // Hand the same arguments to the communication data tracking.
    comm_data::audit(_data, audit::incoming{}, ep, buffer, count, tag, param);
}
// Incoming audit for wrapped calls taking (worker, buffer, count, tag, tag_mask, param),
// i.e. the argument list documented for ucp_tag_recv_nbx in the header.
void
ucx_gotcha::audit(const gotcha_data& _data, audit::incoming, void* worker, void* buffer,
                  size_t count, uint64_t tag, uint64_t tag_mask, const void* param)
{
    const std::string_view _region_name{ _data.tool_id };

    // Open the UCX region, annotated with every call argument.
    category_region<category::ucx>::start(_region_name, "worker", worker, "buffer",
                                          buffer, "count", count, "tag", tag, "tag_mask",
                                          tag_mask, "param", param);

    // Hand the same arguments to the communication data tracking.
    comm_data::audit(_data, audit::incoming{}, worker, buffer, count, tag, tag_mask,
                     param);
}
// RMA operations
// Incoming audit for wrapped calls taking (ep, const buffer, count, remote_addr, rkey,
// param), i.e. the argument list documented for ucp_put_nbx in the header.
void
ucx_gotcha::audit(const gotcha_data& _data, audit::incoming, void* ep, const void* buffer,
                  size_t count, uint64_t remote_addr, void* rkey, const void* param)
{
    const std::string_view _region_name{ _data.tool_id };

    // Open the UCX region, annotated with every call argument.
    category_region<category::ucx>::start(_region_name, "ep", ep, "buffer", buffer,
                                          "count", count, "remote_addr", remote_addr,
                                          "rkey", rkey, "param", param);

    // Hand the same arguments to the communication data tracking.
    comm_data::audit(_data, audit::incoming{}, ep, buffer, count, remote_addr, rkey,
                     param);
}
// Incoming audit for wrapped calls taking (ep, mutable buffer, count, remote_addr, rkey,
// param), i.e. the argument list documented for ucp_get_nbx in the header.
void
ucx_gotcha::audit(const gotcha_data& _data, audit::incoming, void* ep, void* buffer,
                  size_t count, uint64_t remote_addr, void* rkey, const void* param)
{
    const std::string_view _region_name{ _data.tool_id };

    // Open the UCX region, annotated with every call argument.
    category_region<category::ucx>::start(_region_name, "ep", ep, "buffer", buffer,
                                          "count", count, "remote_addr", remote_addr,
                                          "rkey", rkey, "param", param);

    // Hand the same arguments to the communication data tracking.
    comm_data::audit(_data, audit::incoming{}, ep, buffer, count, remote_addr, rkey,
                     param);
}
// Active message send
// Incoming audit for wrapped calls taking (ep, id, header, header_length, buffer, count,
// param), i.e. the argument list documented for ucp_am_send_nbx in the header.
void
ucx_gotcha::audit(const gotcha_data& _data, audit::incoming, void* ep, unsigned id,
                  const void* header, size_t header_length, const void* buffer,
                  size_t count, const void* param)
{
    const std::string_view _region_name{ _data.tool_id };

    // Open the UCX region, annotated with every call argument.
    category_region<category::ucx>::start(_region_name, "ep", ep, "id", id, "header",
                                          header, "header_length", header_length,
                                          "buffer", buffer, "count", count, "param",
                                          param);

    // Hand the same arguments to the communication data tracking.
    comm_data::audit(_data, audit::incoming{}, ep, id, header, header_length, buffer,
                     count, param);
}
// Stream operations
// Incoming audit for wrapped calls taking (ep, buffer, count, param), i.e. the argument
// list documented for ucp_stream_send_nbx in the header.
void
ucx_gotcha::audit(const gotcha_data& _data, audit::incoming, void* ep, const void* buffer,
                  size_t count, const void* param)
{
    const std::string_view _region_name{ _data.tool_id };

    // Open the UCX region, annotated with every call argument.
    category_region<category::ucx>::start(_region_name, "ep", ep, "buffer", buffer,
                                          "count", count, "param", param);

    // Hand the same arguments to the communication data tracking.
    comm_data::audit(_data, audit::incoming{}, ep, buffer, count, param);
}
// Incoming audit for wrapped calls taking (ep, buffer, count, length, param), i.e. the
// argument list documented for ucp_stream_recv_nbx in the header.
void
ucx_gotcha::audit(const gotcha_data& _data, audit::incoming, void* ep, void* buffer,
                  size_t count, size_t* length, const void* param)
{
    const std::string_view _region_name{ _data.tool_id };

    // Open the UCX region; "length" is recorded as the pointer itself, it is not
    // dereferenced here.
    category_region<category::ucx>::start(_region_name, "ep", ep, "buffer", buffer,
                                          "count", count, "length", length, "param",
                                          param);

    // Hand the same arguments to the communication data tracking.
    comm_data::audit(_data, audit::incoming{}, ep, buffer, count, length, param);
}
// Outgoing audit for wrapped calls returning a pointer: closes the UCX region and
// records the returned pointer under "return".
void
ucx_gotcha::audit(const gotcha_data& _data, audit::outgoing, void* result)
{
    const std::string_view _region_name{ _data.tool_id };
    category_region<category::ucx>::stop(_region_name, "return", result);
}
// Outgoing audit for wrapped calls returning an int: closes the UCX region and records
// the returned value under "return".
void
ucx_gotcha::audit(const gotcha_data& _data, audit::outgoing, int result)
{
    const std::string_view _region_name{ _data.tool_id };
    category_region<category::ucx>::stop(_region_name, "return", result);
}
} // namespace component
} // namespace rocprofsys
TIMEMORY_STORAGE_INITIALIZER(rocprofsys::component::ucx_gotcha)
@@ -0,0 +1,114 @@
// MIT License
//
// Copyright (c) 2022-2025 Advanced Micro Devices, Inc. All Rights Reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
#pragma once
#include "core/common.hpp"
#include "core/defines.hpp"
#include "core/timemory.hpp"
#include "library/components/comm_data.hpp"
#include <timemory/components/base.hpp>
#include <timemory/components/gotcha/backends.hpp>
#include <cstdint>
#include <cstdlib>
#include <string>
#include <utility>
namespace rocprofsys
{
namespace component
{
// Component holding the audit hooks for wrapped UCX functions. Incoming hooks start a
// category::ucx region; the outgoing hooks (defined in the source file) stop it.
struct ucx_gotcha : tim::component::base<ucx_gotcha, void>
{
    // Number of wrapper slots; used as the size argument of ucx_gotcha_t.
    static constexpr size_t gotcha_capacity = 100;
    using gotcha_data                       = tim::component::gotcha_data;

    ROCPROFSYS_DEFAULT_OBJECT(ucx_gotcha)

    // String identifier of this component.
    static std::string label() { return "ucx_gotcha"; }

    // Registers the gotcha wrappers for the UCX functions.
    static void configure();
    // Disables the gotcha wrappers.
    static void shutdown();
    // Configures and starts the wrappers if they are not already running.
    static void start();
    // Currently empty.
    static void stop();

    // Fallback incoming audit for any argument list without a dedicated overload below:
    // starts the region without recording the arguments.
    template <typename... Args>
    static void audit(const gotcha_data& _data, audit::incoming, Args...)
    {
        category_region<category::ucx>::start(std::string_view{ _data.tool_id });
    }

public:
    // --- Tag matching (uint64_t tags) ---------------------------------------------- //

    // ucp_tag_send_nbx
    static void audit(const gotcha_data& _data, audit::incoming, void* ep,
                      const void* buffer, size_t count, uint64_t tag, const void* param);

    // ucp_tag_recv_nbx
    static void audit(const gotcha_data& _data, audit::incoming, void* worker,
                      void* buffer, size_t count, uint64_t tag, uint64_t tag_mask,
                      const void* param);

    // --- RMA ----------------------------------------------------------------------- //

    // ucp_put_nbx
    static void audit(const gotcha_data& _data, audit::incoming, void* ep,
                      const void* buffer, size_t count, uint64_t remote_addr, void* rkey,
                      const void* param);

    // ucp_get_nbx
    static void audit(const gotcha_data& _data, audit::incoming, void* ep, void* buffer,
                      size_t count, uint64_t remote_addr, void* rkey, const void* param);

    // --- Active messages ----------------------------------------------------------- //

    // ucp_am_send_nbx
    static void audit(const gotcha_data& _data, audit::incoming, void* ep, unsigned id,
                      const void* header, size_t header_length, const void* buffer,
                      size_t count, const void* param);

    // --- Streams ------------------------------------------------------------------- //

    // ucp_stream_send_nbx
    static void audit(const gotcha_data& _data, audit::incoming, void* ep,
                      const void* buffer, size_t count, const void* param);

    // ucp_stream_recv_nbx
    static void audit(const gotcha_data& _data, audit::incoming, void* ep, void* buffer,
                      size_t count, size_t* length, const void* param);

    // --- Return values ------------------------------------------------------------- //

    // Outgoing audit for wrapped functions returning a pointer.
    static void audit(const gotcha_data& _data, audit::outgoing, void* ret);
    // Outgoing audit for wrapped functions returning an int.
    static void audit(const gotcha_data& _data, audit::outgoing, int ret);
};
} // namespace component
// Bundle of the components attached to every wrapped UCX call: the ucx_gotcha audit
// hooks plus the comm_data component, grouped under category::ucx.
using ucx_bundle_t =
tim::component_bundle<category::ucx, component::ucx_gotcha, component::comm_data>;
// gotcha type used to wrap the UCX functions; sized by ucx_gotcha::gotcha_capacity and
// wrapping each call with ucx_bundle_t.
using ucx_gotcha_t = tim::component::gotcha<component::ucx_gotcha::gotcha_capacity,
ucx_bundle_t, category::ucx>;
} // namespace rocprofsys
تفاوت فایلی نمایش داده نمی شود زیرا این فایل بسیار بزرگ است Diff را بارگزاری کن
تفاوت فایلی نمایش داده نمی شود زیرا این فایل بسیار بزرگ است Diff را بارگزاری کن
تفاوت فایلی نمایش داده نمی شود زیرا این فایل بسیار بزرگ است Diff را بارگزاری کن
@@ -35,6 +35,7 @@ include(${CMAKE_CURRENT_LIST_DIR}/rocprof-sys-pthread-tests.cmake)
include(${CMAKE_CURRENT_LIST_DIR}/rocprof-sys-rocm-tests.cmake)
include(${CMAKE_CURRENT_LIST_DIR}/rocprof-sys-user-api-tests.cmake)
include(${CMAKE_CURRENT_LIST_DIR}/rocprof-sys-mpi-tests.cmake)
include(${CMAKE_CURRENT_LIST_DIR}/rocprof-sys-ucx-tests.cmake)
include(${CMAKE_CURRENT_LIST_DIR}/rocprof-sys-kokkos-tests.cmake)
include(${CMAKE_CURRENT_LIST_DIR}/rocprof-sys-openmp-tests.cmake)
include(${CMAKE_CURRENT_LIST_DIR}/rocprof-sys-code-coverage-tests.cmake)
@@ -62,9 +63,12 @@ include(${CMAKE_CURRENT_LIST_DIR}/rocprof-sys-thread-limit-tests.cmake)
#
# -------------------------------------------------------------------------------------- #
# Delete the temporary files that the rocprofiler-systems tests leave in /tmp
# (buffered_storage*.bin and metadata*.json), restricted to files owned by the current
# user. Errors are discarded and `|| true` forces a zero exit status, so this test
# always reports success.
#
# add_test() takes a single COMMAND; only the owner-restricted `find` invocation is
# kept, since an unconditional `rm -f` would also target files owned by other users.
add_test(
    NAME rocprofsys-cleanup-tmp-files
    COMMAND
        sh -c
        "find /tmp -maxdepth 1 -user $(whoami) \\( -name 'buffered_storage*.bin' -o -name 'metadata*.json' \\) -delete 2>/dev/null || true"
    WORKING_DIRECTORY ${PROJECT_BINARY_DIR}
)
@@ -0,0 +1,264 @@
# MIT License
#
# Copyright (c) 2025 Advanced Micro Devices, Inc. All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# -------------------------------------------------------------------------------------- #
#
# UCX tests - MPI examples with UCX transport
#
# -------------------------------------------------------------------------------------- #
# The UCX tests drive the MPI examples (UCX being MPI's transport layer), so stop
# processing this file when neither MPI nor the MPI headers are enabled.
if(NOT (ROCPROFSYS_USE_MPI OR ROCPROFSYS_USE_MPI_HEADERS))
    return()
endif()

# Classify the MPI implementation from the MPI include directories: "openmpi" or
# "mpich" when that substring occurs in the paths, "unknown" otherwise.
if("${MPI_C_COMPILER_INCLUDE_DIRS};${MPI_C_HEADER_DIR}" MATCHES "openmpi")
    set(_DETECTED_MPI_IMPL "openmpi")
elseif("${MPI_C_COMPILER_INCLUDE_DIRS};${MPI_C_HEADER_DIR}" MATCHES "mpich")
    set(_DETECTED_MPI_IMPL "mpich")
else()
    set(_DETECTED_MPI_IMPL "unknown")
endif()

# Everything below relies on OpenMPI-specific settings, so skip any other implementation.
if(NOT "${_DETECTED_MPI_IMPL}" STREQUAL "openmpi")
    message(
        STATUS
        "Skipping UCX tests - requires OpenMPI (detected: ${_DETECTED_MPI_IMPL}). UCX tests use OpenMPI-specific environment variables (OMPI_MCA_*)."
    )
    return()
endif()
# Force OpenMPI to use UCX transport via environment variables.
# This list is appended to every UCX test environment defined in this file.
set(_ucxp_mpi_environment
"OMPI_MCA_pml=ucx" # Use UCX point-to-point messaging layer
"OMPI_MCA_osc=ucx" # Use UCX one-sided communications
"OMPI_MCA_pml_ucx_tls=tcp,self" # Force TCP and self (not sysv/posix/cma which bypass UCX functions)
"OMPI_MCA_pml_ucx_devices=any" # Accept any device (not just InfiniBand/Mellanox)
"OMPI_MCA_btl=^vader,sm" # Disable shared memory BTLs to force communication through UCX
"UCX_TLS=tcp,self" # Tell UCX to use TCP for inter-process, self for intra-process
"OMPI_MCA_pml_base_verbose=100" # Show which PML is selected
"UCX_LOG_LEVEL=info" # Enable UCX logging to show transport usage
)
# Base environment for UCX tests: the common test environment (_base_environment,
# defined outside this file), UCX enabled with moderate verbosity, followed by the
# OpenMPI/UCX transport settings above.
set(_ucx_base_environment
"${_base_environment}"
"ROCPROFSYS_USE_UCX=ON"
"ROCPROFSYS_DEBUG=OFF"
"ROCPROFSYS_VERBOSE=2"
"ROCPROFSYS_DL_VERBOSE=2"
"${_ucxp_mpi_environment}"
)
# First test: UCX availability probe based on the basic mpi-example target, run on two
# ranks. Output reporting that the UCX PML cannot be selected (or that UCX is
# unavailable) is listed in both the FAIL and the SKIP regex.
# NOTE(review): the same patterns appear in both REWRITE_RUN_FAIL_REGEX and
# REWRITE_RUN_SKIP_REGEX -- confirm that skip takes precedence, and how the later UCX
# tests are meant to be marked as skipped when this probe does not find UCX.
# NOTE(review): SKIP_REWRITE is passed together with REWRITE_ARGS / REWRITE_RUN_*
# settings -- confirm the helper still registers the rewrite-run test they apply to.
rocprofiler_systems_add_test(
    SKIP_BASELINE SKIP_RUNTIME SKIP_REWRITE SKIP_SYS_RUN
    NAME "ucx-availability-check"
    TARGET mpi-example
    MPI ON
    NUM_PROCS 2
    LABELS "ucx;availability"
    REWRITE_ARGS -e -v 2 --label file line return args --min-instructions 0
    ENVIRONMENT "${_ucx_base_environment};ROCPROFSYS_VERBOSE=1"
    REWRITE_RUN_PASS_REGEX "UCX.*configured|ucp_|uct_|UCX transport|pml.*ucx"
    REWRITE_RUN_FAIL_REGEX
        "PML ucx cannot be selected|UCX is not available|No UCX support found|Failed to select|ROCPROFSYS_ABORT_FAIL_REGEX"
    REWRITE_RUN_SKIP_REGEX
        "PML ucx cannot be selected|UCX is not available|No UCX support found|Failed to select"
)
# Enhanced UCX environment with more detailed logging: debug output on, higher
# verbosity, in-process perfetto backend with a ring buffer, PID suffixes and MPI
# initialization turned off, followed by the OpenMPI/UCX transport settings.
set(_ucx_environment
"${_base_environment}"
"ROCPROFSYS_USE_UCX=ON"
"ROCPROFSYS_DEBUG=ON"
"ROCPROFSYS_VERBOSE=3"
"ROCPROFSYS_DL_VERBOSE=3"
"ROCPROFSYS_PERFETTO_BACKEND=inprocess"
"ROCPROFSYS_PERFETTO_FILL_POLICY=ring_buffer"
"ROCPROFSYS_USE_PID=OFF"
"ROCPROFSYS_MPI_INIT=OFF"
"${_ucxp_mpi_environment}"
)
# Debug environment - extra verbose for troubleshooting CI issues
# NOTE(review): no test in the visible part of this file references this variable --
# confirm it is still needed.
set(_ucx_debug_environment
"${_ucx_environment}"
"UCX_LOG_LEVEL=debug" # Maximum UCX logging
"OMPI_MCA_mpi_show_mca_params=all" # Show all MCA parameters
)
# Perfetto trace test: runs mpi-example on two ranks over UCX with legacy tracing and
# combined perfetto traces enabled. The rewrite-run passes once the merge-output script
# reports success.
rocprofiler_systems_add_test(
    SKIP_RUNTIME
    NAME "ucx-perfetto"
    TARGET mpi-example
    MPI ON
    NUM_PROCS 2
    LABELS "ucx;perfetto"
    REWRITE_ARGS -e -v 2 --label file line --min-instructions 0
    ENVIRONMENT
        "${_ucx_environment};ROCPROFSYS_VERBOSE=1;ROCPROFSYS_TRACE_LEGACY=ON;ROCPROFSYS_PERFETTO_COMBINE_TRACES=ON"
    REWRITE_RUN_PASS_REGEX "Successfully executed: .+rocprof-sys-merge-output.sh.*"
    REWRITE_RUN_FAIL_REGEX "Script not found|Failed to execute|ROCPROFSYS_ABORT_FAIL_REGEX"
    SYS_RUN_PASS_REGEX "ucp_tag_send|ucp_tag_recv|UCX.*configured|Using UCX|pml.*ucx"
)

# Validate the merged perfetto trace of the sys-run variant: the "UCX Comm Recv" and
# "UCX Comm Send" counter tracks must be present.
rocprofiler_systems_add_validation_test(
    NAME ucx-perfetto-sys-run
    PERFETTO_METRIC "ucx"
    PERFETTO_FILE "merged.proto"
    LABELS "ucx;perfetto"
    ARGS --counter-names "UCX Comm Recv" "UCX Comm Send" -p
)
# Run each MPI example binary over the UCX transport (two ranks, run argument 30) and
# validate the merged perfetto trace of every sys-run variant.
foreach(_UCX_EXAMPLE IN ITEMS all2all allgather allreduce scatter-gather send-recv)
    rocprofiler_systems_add_test(
        SKIP_BASELINE SKIP_RUNTIME SKIP_SAMPLING
        NAME "ucx-${_UCX_EXAMPLE}"
        TARGET mpi-${_UCX_EXAMPLE}
        MPI ON
        NUM_PROCS 2
        LABELS "ucx"
        REWRITE_ARGS -e -v 2 --label file line --min-instructions 0
        RUN_ARGS 30
        ENVIRONMENT
            "${_ucx_environment};ROCPROFSYS_VERBOSE=1;ROCPROFSYS_TRACE_LEGACY=ON;ROCPROFSYS_PERFETTO_COMBINE_TRACES=ON"
        REWRITE_RUN_PASS_REGEX
            "UCX.*trace|ucp_.*trace|Category.*ucx|UCX function.*called"
        SYS_RUN_PASS_REGEX
            "ucp_tag_send|ucp_tag_recv|write_perfetto_counter_track.*ucx"
    )

    # The merged trace must expose the UCX send/recv communication counter tracks.
    rocprofiler_systems_add_validation_test(
        NAME ucx-${_UCX_EXAMPLE}-sys-run
        PERFETTO_METRIC "ucx"
        PERFETTO_FILE "merged.proto"
        LABELS "ucx"
        ARGS --counter-names "UCX Comm Recv" "UCX Comm Send" -p
    )
endforeach()
# UCX combined with MPIP: runs mpi-all2all on two ranks with ROCPROFSYS_USE_MPIP=ON and
# expects the rewrite-run output to mention both UCX and MPI.
rocprofiler_systems_add_test(
    SKIP_RUNTIME
    NAME "ucx-mpip-integration"
    TARGET mpi-all2all
    MPI ON
    NUM_PROCS 2
    LABELS "ucx;mpip"
    REWRITE_ARGS -e -v 2 --label file line args --min-instructions 0
    ENVIRONMENT "${_ucx_environment};ROCPROFSYS_USE_MPIP=ON"
    RUN_ARGS 30
    REWRITE_RUN_PASS_REGEX
        "UCX.*trace.*MPI.*trace|ucp_.*MPI_|Category.*ucx.*Category.*mpi"
)
# Broadcast over UCX with several message sizes; each size is passed to mpi-bcast as
# its run argument.
foreach(_MSG_SIZE IN ITEMS 1024 4096 16384)
    rocprofiler_systems_add_test(
        SKIP_BASELINE SKIP_RUNTIME
        NAME "ucx-bcast-${_MSG_SIZE}"
        TARGET mpi-bcast
        MPI ON
        NUM_PROCS 2
        LABELS "ucx;bcast"
        REWRITE_ARGS -e -v 2 --label file line --min-instructions 0
        ENVIRONMENT "${_ucx_environment}"
        RUN_ARGS ${_MSG_SIZE}
        REWRITE_RUN_PASS_REGEX "UCX.*trace|ucp_.*send|ucp_.*recv|Category.*ucx"
    )
endforeach()
# Active-message coverage: runs mpi-allreduce on two ranks with an additional
# OMPI_MCA_btl setting appended to the UCX environment, and expects active-message
# functions to show up in the rewrite-run output.
rocprofiler_systems_add_test(
    SKIP_BASELINE SKIP_RUNTIME
    NAME "ucx-active-messages"
    TARGET mpi-allreduce
    MPI ON
    NUM_PROCS 2
    LABELS "ucx;am"
    REWRITE_ARGS -e -v 2 --label file line --min-instructions 0
    ENVIRONMENT "${_ucx_environment};OMPI_MCA_btl=^vader,tcp,openib,uct"
    RUN_ARGS 64
    REWRITE_RUN_PASS_REGEX "ucp_am_send|ucp_am_recv|uct_ep_am|Active.*Message"
)