Files
rocm-systems/ext-profiler/inspector/inspector.cc
T
Mark Santesson f1308997d0 NCCL 2.28.3-1
Device API (Experimental)
 * Introduces device-side APIs to integrate NCCL communication directly into application kernels.
 * Supports LSA (Load/Store Access) for CUDA P2P communication over NVLink and some PCIe platforms.
 * Supports Multimem for hardware multicast using NVLink SHARP.
 * Adds initial framework for GIN (GPU-Initiated Networking), currently under development.
 * Introduces device communicators created using ncclDevCommCreate.
 * Enables device-side communication operations with synchronization (ncclLsaBarrierSession) and memory accessors (ncclGetLsaPointer, ncclGetLsaMultimemPointer).
 * Experimental APIs - signatures and functionality may evolve in future releases.
 * No ABI compatibility is guaranteed — applications must be recompiled with each new NCCL release.

Symmetric memory improvements
 * Support for aggregating symmetric operations using ncclGroupStart/End APIs.
 * Reimplement symmetric kernels using device API.

New Host APIs
 * Introduce new host collective APIs: ncclAlltoAll, ncclScatter, ncclGather.

CE (Copy Engine) Collectives
 * Reduce SM utilization for alltoall, scatter, gather, and allgather within a single (MN)NVL domain.
 * Free up SM capacity for the application to do computation at the same time.
 * To enable the feature for ncclAllGather, ncclAlltoAll, ncclGather, ncclScatter, register buffers into symmetric windows and use the NCCL_CTA_POLICY_ZERO flag in the communicator config_t.

NCCL Inspector Plugin
 * Introduces an Inspector plugin for always-on performance monitoring.
 * Produces structured JSON output with metadata, execution time, bandwidth, and optional event traces for each NCCL operation.
 * Enables integration with analysis tools such as Performance Exporter to visualize NCCL performance bottlenecks.
 * Lightweight to enable via environment variables NCCL_PROFILER_PLUGIN and NCCL_INSPECTOR_ENABLE.

CMake support (Experiemental)
 * Adds a CMake build system as an alternative to existing Makefiles.
 * Known issues: pkg.build and Device API currently do not work with CMake.
 * The known issues will be addressed in a future release.

Decreased max CTA count from 32 to 16 on Blackwell
 * SM overhead is decreased by 50% with this improvement.
 * This may cause some perf drop on Blackwell because of the reduced SM usage.
 * If the extra SM capacity is not desired, two options are available to restore to previous behavior: 1) Setting NCCL_MIN_CTAS=32 NCCL_MAX_CTAS=32 environment variables; 2) setting communicator config to over-write max CTA count to 32.
 * Based on community feedback, future versions may consider different trade-offs between performance and SM overhead.

Plugins
 * Network
   * App-aware Network plugin. NCCL passes information about communication operations to be executed on the network end point. This allows for better tuning of network end points and their use in the plugins.
   * Improve handling of physical and virtual network devices and load/unload.
   * Network plugin version 11 - add explicit context and communication ID support for per communicator init/finalize.
   * Add Multi-Request Net API. Using this will help NCCL to anticipate multiple send/recv requests and optimize for it. See maxMultiRequestSize field in ncclNetProperties_v11_t.
 * Profiler
   * Add support for API events (group, collective, and p2p) and for tracking kernel launches in the profiler plugin.
   * Add Inspector Profiler Plugin (see section above).
   * Add a hook to Google’s CoMMA profiler on github.
 * Tuner
   * Expose NCCL tuning constants at tuner initialization via ncclTunerConstants_v5_t.
   * Add NVL Domain Information API.
 * Support multiple plugin types from a single shared object.

New Parameterization and ncclConfig changes:
 * Add new option NCCL_MNNVL_CLIQUE_ID=-2 which will use rack serial number to partition the MNNVL clique. This will limit NVLink domains to GPUs within a single rack.
 * Add NCCL_NETDEVS_POLICY to control how NET devices are assigned to GPUs. The default (AUTO) is the policy used in previous versions.
 * Add NCCL_SINGLE_PROC_MEM_REG_ENABLE control variable to enable NVLS UB registration in the “one process, multiple ranks” case as opt in.
 * Move nChannelsPerNetPeer into ncclConfig. NCCL_NCHANNELS_PER_NET_PEER can override the value in ncclConfig.
 * Enable PxN over C2C by default
   * PxN over C2C will improve performance for Grace-Blackwell platforms by allowing NCCL to leverage the NIC attached to a peer GPU over NVLINK, C2C, and PCIe.
   * This behavior can be overridden by setting NCCL_PXN_C2C=0.

Other Improvements:
 * Allow FP8 support for non-reductive operations on pre sm90 devices. (See https://github.com/pytorch/pytorch/pull/151594#discussion_r2135777776)
 * Fix NVLS+CollNet and temporarily disables COLLNET_CHAIN for >8 GPUs.
 * Only consider running interfaces for socket traffic. NCCL will not attempt to use interfaces that do not have the IFF_RUNNING bit. (https://github.com/NVIDIA/nccl/issues/1798)
 * Modernize mutex management. Convert to std::mutex and std::lock_guard.
 * Remove sm35 and sm50 GENCODE targets which have long been deprecated and were causing issues with the latest NCCL release builds.
 * Improved NVLS/NVLSTree tuning prediction to improve algorithm and protocol selection.
 * NVLSTree Tuning Fixes. Update tuning data for H100, GB200-NV72.
 * Respond better to RoCE link flaps. Instead of reporting an “unknown event” it will now report “GID table changed”.
 * Move libvirt bridge interface to the end of possible interfaces so that they are considered last. These interfaces are usually virtual bridges to relay traffic to containers running on the host and cannot be used for traffic to a remote node and are therefore unsuitable.
2025-09-02 13:53:34 -07:00

1531 baris
47 KiB
C++

#include "inspector.h"
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <strings.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>
#include <errno.h>
#include <cstring>
#include "common.h"
#define JSON_CHK(expr) \
do { \
const jsonResult_t res = (expr); \
if (res != jsonSuccess) { \
INFO(NCCL_INSPECTOR, "jsonError: %s\n", jsonErrorString(res)); \
return inspectorJsonError; \
} \
} while (0)
#define INS_CHK(call) \
do { \
inspectorResult_t res = call; \
if (inspectorSuccess != res) { \
INFO(NCCL_INSPECTOR, "%s:%d -> error %d: %s", __FILE__, __LINE__, res, \
inspectorErrorString(res)); \
return res; \
} \
} while (0);
#define JSON_CHK_GOTO(expr, res, label) \
do { \
const jsonResult_t macro_res = (expr); \
if (macro_res != jsonSuccess) { \
INFO(NCCL_INSPECTOR, "jsonError: %s\n", jsonErrorString(macro_res)); \
res = inspectorJsonError; \
goto label; \
} \
} while (0)
#define INS_CUDA_CHK(cmd) \
do { \
cudaError_t err = cmd; \
if (err != cudaSuccess) { \
INFO(NCCL_INSPECTOR, "Cuda failure '%s'", cudaGetErrorString(err)); \
return inspectorCudaError; \
} \
} while (false)
// Global flag to control inspector use
static bool enableNcclInspector = false;
// Global flag to control starting internal dump thread
static bool enableNcclInspectorDumpThread = false;
// Global flag to control verbose dumping (event_trace)
static bool enableNcclInspectorDumpVerbose = false;
// Extra guard to prevent spurious messages for eager pollers that try to dump
// out results before we have initialized
static bool ncclInspectorInit = false;
// Define the global logFn variable
ncclDebugLogger_t logFn = nullptr;
/*
* Description:
*
* Returns the current time in microseconds since the epoch.
*
* Thread Safety:
*
* Thread-safe (uses gettimeofday).
*
* Input:
*
* None.
*
* Output:
*
* None.
*
* Return:
* uint64_t - current time in microseconds.
*
* Error Handling:
* This function uses gettimeofday() which rarely fails. In case of
* failure, the function returns 0. Callers should check for 0 return
* value if precise error handling is required.
*
*/
uint64_t inspectorGetTime() {
uint64_t ts = 0;
timeval tv;
gettimeofday(&tv, 0);
ts = tv.tv_sec * 1000000 + tv.tv_usec;
return ts;
}
/*
* Description:
*
* Converts a string to the corresponding ncclDataType_t enum value.
*
* Thread Safety:
* Thread-safe (read-only string input).
*
* Input:
*
* const char* str - string representation of the datatype.
*
* Output:
*
* None.
*
* Return:
*
* ncclDataType_t - corresponding enum value, or -1 if unknown.
*
*/
ncclDataType_t inspectorStringToDatatype(const char* str) {
if (strcmp(str, "ncclInt8") == 0) return ncclInt8;
if (strcmp(str, "ncclInt32") == 0) return ncclInt32;
if (strcmp(str, "ncclUint32") == 0) return ncclUint32;
if (strcmp(str, "ncclInt64") == 0) return ncclInt64;
if (strcmp(str, "ncclUint64") == 0) return ncclUint64;
if (strcmp(str, "ncclFloat16") == 0) return ncclFloat16;
if (strcmp(str, "ncclFloat32") == 0) return ncclFloat32;
if (strcmp(str, "ncclFloat64") == 0) return ncclFloat64;
if (strcmp(str, "ncclBfloat16") == 0) return ncclBfloat16;
if (strcmp(str, "ncclFloat8e4m3") == 0) return ncclFloat8e4m3;
if (strcmp(str, "ncclFloat8e5m2") == 0) return ncclFloat8e5m2;
return (ncclDataType_t)-1; // Or handle error as appropriate
}
/*
* Description:
*
* Converts a string to the corresponding ncclFunc_t enum value.
*
* Thread Safety:
* Thread-safe (read-only string input).
*
* Input:
* const char* str - string representation of the function (must not be NULL).
*
* Output:
* None.
*
* Return:
* ncclFunc_t - corresponding enum value, or ncclNumFuncs if unknown.
*
* Preconditions:
* - str must not be NULL
*/
ncclFunc_t ncclStringToFunc(const char* str) {
if (strcmp(str, "AllGather") == 0) return ncclFuncAllGather;
if (strcmp(str, "AllReduce") == 0) return ncclFuncAllReduce;
if (strcmp(str, "Broadcast") == 0) return ncclFuncBroadcast;
if (strcmp(str, "Recv") == 0) return ncclFuncRecv;
if (strcmp(str, "Reduce") == 0) return ncclFuncReduce;
if (strcmp(str, "ReduceScatter") == 0) return ncclFuncReduceScatter;
if (strcmp(str, "SendRecv") == 0) return ncclFuncSendRecv;
if (strcmp(str, "Send") == 0) return ncclFuncSend;
return ncclNumFuncs; // Invalid / unknown
}
const char* ncclFuncToString(ncclFunc_t fn) {
switch (fn) {
case ncclFuncAllGather: return "AllGather";
case ncclFuncAllReduce: return "AllReduce";
case ncclFuncBroadcast: return "Broadcast";
case ncclFuncRecv: return "Recv";
case ncclFuncReduce: return "Reduce";
case ncclFuncReduceScatter: return "ReduceScatter";
case ncclFuncSendRecv: return "SendRecv";
case ncclFuncSend: return "Send";
default: return "Invalid";
}
}
struct inspectorDumpThread;
static inspectorDumpThread* dumper = nullptr;
#define UNUSED(x) (void)(x)
inspectorResult_t inspectorLockInit(pthread_rwlock_t* lockRef) {
if (0 != pthread_rwlock_init(lockRef, nullptr)) {
return inspectorLockError;
} else {
return inspectorSuccess;
}
}
inspectorResult_t inspectorLockDestroy(pthread_rwlock_t* lockRef) {
if (0 != pthread_rwlock_destroy(lockRef)) {
return inspectorLockError;
} else {
return inspectorSuccess;
}
}
inspectorResult_t inspectorLockRd(pthread_rwlock_t* lockRef) {
if (0 != pthread_rwlock_rdlock(lockRef)) {
return inspectorLockError;
} else {
return inspectorSuccess;
}
}
inspectorResult_t inspectorLockWr(pthread_rwlock_t* lockRef) {
if (0 != pthread_rwlock_wrlock(lockRef)) {
return inspectorLockError;
} else {
return inspectorSuccess;
}
}
inspectorResult_t inspectorUnlockRWLock(pthread_rwlock_t* lockRef) {
if (0 != pthread_rwlock_unlock(lockRef)) {
return inspectorLockError;
} else {
return inspectorSuccess;
}
}
// TODO inspect these retvals
#define INSPECTOR_LOCK_RD_FLAG(lockRef, lockFlag, debug) \
do { \
if (!lockFlag) { \
INS_CHK(inspectorLockRd(lockRef)); \
} \
lockFlag = true; \
} while (0);
#define INSPECTOR_LOCK_WR_FLAG(lockRef, lockFlag, debug) \
do { \
if (!lockFlag) { \
INS_CHK(inspectorLockWr(lockRef)); \
} \
lockFlag = true; \
} while (0);
#define INSPECTOR_UNLOCK_RW_LOCK_FLAG(lockRef, lockFlag, debug) \
do { \
if (lockFlag) { \
INS_CHK(inspectorUnlockRWLock(lockRef)); \
} \
lockFlag = false; \
} while (0);
struct inspectorCommInfoList {
struct inspectorCommInfo* comms;
uint32_t ncomms;
pthread_rwlock_t guard;
};
struct inspectorState {
struct inspectorCommInfoList liveComms;
struct inspectorCommInfoList deletedComms;
};
static inspectorState g_state;
static inspectorResult_t inspectorCommInfoListInit(struct inspectorCommInfoList* commList) {
if (commList->comms) {
return inspectorGlobalInitError;
}
commList->comms = nullptr;
commList->ncomms = 0;
INS_CHK(inspectorLockInit(&commList->guard));
return inspectorSuccess;
}
static inspectorResult_t inspectorGlobalStateInit() {
memset(&g_state, 0, sizeof(struct inspectorState));
INS_CHK(inspectorCommInfoListInit(&g_state.liveComms));
INS_CHK(inspectorCommInfoListInit(&g_state.deletedComms));
return inspectorSuccess;
}
/*
* Description:
*
* Converts inspectorTimingSource_t enum to a string representation.
*
* Thread Safety:
* Thread-safe (read-only operation).
*
* Input:
* inspectorTimingSource_t timingSource - timing source enum value.
*
* Output:
* None.
*
* Return:
* const char* - string representation of the timing source.
*/
static const char* inspectorTimingSourceToString(inspectorTimingSource_t timingSource) {
switch (timingSource) {
case inspectorTimingSourceKernelGpu:
return "kernel_gpu";
case inspectorTimingSourceKernelCpu:
return "kernel_cpu";
case inspectorTimingSourceCollectiveCpu:
return "collective_cpu";
default:
return "unknown";
}
}
/*
* Description:
*
* Writes the header information for a communicator to the JSON output.
*
* Thread Safety:
* Not thread-safe (should be called with proper locking).
*
* Input:
* jsonFileOutput* jfo - JSON output handle.
* struct inspectorCommInfo* commInfo - communicator info.
*
* Output:
* Header is written to JSON output.
*
* Return:
* inspectorResult_t - success or error code.
*
*/
static inspectorResult_t inspectorCommInfoHeader(jsonFileOutput* jfo,
struct inspectorCommInfo* commInfo) {
JSON_CHK(jsonStartObject(jfo));
JSON_CHK(jsonKey(jfo, "id")); JSON_CHK(jsonStr(jfo, commInfo->commHashStr));
JSON_CHK(jsonKey(jfo, "rank")); JSON_CHK(jsonInt(jfo, commInfo->rank));
JSON_CHK(jsonKey(jfo, "n_ranks")); JSON_CHK(jsonInt(jfo, commInfo->nranks));
JSON_CHK(jsonKey(jfo, "nnodes")); JSON_CHK(jsonUint64(jfo, commInfo->nnodes));
JSON_CHK(jsonFinishObject(jfo));
return inspectorSuccess;
}
/*
* Description:
*
* Writes metadata header information to the JSON output.
*
* Thread Safety:
* Not thread-safe (should be called with proper locking).
*
* Input:
* jsonFileOutput* jfo - JSON output handle.
*
* Output:
* Metadata header is written to JSON output.
*
* Return:
* inspectorResult_t - success or error code.
*
*/
static inspectorResult_t inspectorCommInfoMetaHeader(jsonFileOutput* jfo) {
JSON_CHK(jsonStartObject(jfo));
{
JSON_CHK(jsonKey(jfo, "inspector_output_format_version")); JSON_CHK(jsonStr(jfo, "v4.0"));
JSON_CHK(jsonKey(jfo, "git_rev")); JSON_CHK(jsonStr(jfo, get_git_version_info()));
JSON_CHK(jsonKey(jfo, "rec_mechanism")); JSON_CHK(jsonStr(jfo, "nccl_profiler_interface"));
JSON_CHK(jsonKey(jfo, "dump_timestamp_us")); JSON_CHK(jsonUint64(jfo, inspectorGetTime()));
char hostname[256];
gethostname(hostname, 255);
JSON_CHK(jsonKey(jfo, "hostname")); JSON_CHK(jsonStr(jfo, hostname));
JSON_CHK(jsonKey(jfo, "pid")); JSON_CHK(jsonUint64(jfo, getpid()));
}
JSON_CHK(jsonFinishObject(jfo));
return inspectorSuccess;
}
/*
* Description:
*
* Writes verbose information (event_trace) for a completed
* collective operation to the JSON output.
*
* Thread Safety:
* Not thread-safe (should be called with proper locking).
*
* Input:
* jsonFileOutput* jfo - JSON output handle.
* const struct inspectorCompletedCollInfo* collInfo - completed
* collective info.
*
* Output:
* Verbose collective info is written to JSON output.
*
* Return:
* inspectorResult_t - success or error code.
*
*/
static inline inspectorResult_t inspectorCompletedCollVerbose(jsonFileOutput* jfo,
struct inspectorCompletedCollInfo* collInfo) {
// Add event trace information
JSON_CHK(jsonKey(jfo, "event_trace_sn"));
JSON_CHK(jsonStartObject(jfo));
{
// Collective events
JSON_CHK(jsonKey(jfo, "coll_start_sn")); JSON_CHK(jsonUint64(jfo, collInfo->collEvtTrk.evntTrace[NCCL_INSP_EVT_TRK_COLL_START].sn));
JSON_CHK(jsonKey(jfo, "coll_stop_sn")); JSON_CHK(jsonUint64(jfo, collInfo->collEvtTrk.evntTrace[NCCL_INSP_EVT_TRK_COLL_STOP].sn));
// Kernel events
JSON_CHK(jsonKey(jfo, "kernel_events"));
JSON_CHK(jsonStartList(jfo));
for (uint32_t ch = 0; ch < collInfo->collEvtTrk.nChannels; ch++) {
JSON_CHK(jsonStartObject(jfo));
JSON_CHK(jsonKey(jfo, "channel_id")); JSON_CHK(jsonInt(jfo, ch));
JSON_CHK(jsonKey(jfo, "kernel_start_sn")); JSON_CHK(jsonUint64(jfo, collInfo->collEvtTrk.kernelCh[ch].evntTrace[NCCL_INSP_EVT_TRK_KERNEL_START].sn));
JSON_CHK(jsonKey(jfo, "kernel_stop_sn")); JSON_CHK(jsonUint64(jfo, collInfo->collEvtTrk.kernelCh[ch].evntTrace[NCCL_INSP_EVT_TRK_KERNEL_STOP].sn));
JSON_CHK(jsonKey(jfo, "kernel_record_sn")); JSON_CHK(jsonUint64(jfo, collInfo->collEvtTrk.kernelCh[ch].evntTrace[NCCL_INSP_EVT_TRK_KERNEL_RECORD].sn));
JSON_CHK(jsonFinishObject(jfo));
}
JSON_CHK(jsonFinishList(jfo));
}
JSON_CHK(jsonFinishObject(jfo));
JSON_CHK(jsonKey(jfo, "event_trace_ts"));
JSON_CHK(jsonStartObject(jfo));
{
// Collective events
JSON_CHK(jsonKey(jfo, "coll_start_ts")); JSON_CHK(jsonUint64(jfo, collInfo->collEvtTrk.evntTrace[NCCL_INSP_EVT_TRK_COLL_START].ts));
JSON_CHK(jsonKey(jfo, "coll_stop_ts")); JSON_CHK(jsonUint64(jfo, collInfo->collEvtTrk.evntTrace[NCCL_INSP_EVT_TRK_COLL_STOP].ts));
// Kernel events
JSON_CHK(jsonKey(jfo, "kernel_events"));
JSON_CHK(jsonStartList(jfo));
for (uint32_t ch = 0; ch < collInfo->collEvtTrk.nChannels; ch++) {
JSON_CHK(jsonStartObject(jfo));
JSON_CHK(jsonKey(jfo, "channel_id")); JSON_CHK(jsonInt(jfo, ch));
JSON_CHK(jsonKey(jfo, "kernel_start_ts")); JSON_CHK(jsonUint64(jfo, collInfo->collEvtTrk.kernelCh[ch].evntTrace[NCCL_INSP_EVT_TRK_KERNEL_START].ts));
JSON_CHK(jsonKey(jfo, "kernel_stop_ts")); JSON_CHK(jsonUint64(jfo, collInfo->collEvtTrk.kernelCh[ch].evntTrace[NCCL_INSP_EVT_TRK_KERNEL_STOP].ts));
JSON_CHK(jsonKey(jfo, "kernel_record_ts")); JSON_CHK(jsonUint64(jfo, collInfo->collEvtTrk.kernelCh[ch].evntTrace[NCCL_INSP_EVT_TRK_KERNEL_RECORD].ts));
JSON_CHK(jsonFinishObject(jfo));
}
JSON_CHK(jsonFinishList(jfo));
}
JSON_CHK(jsonFinishObject(jfo));
return inspectorSuccess;
}
/*
* Description:
*
* Writes completed collective operation information to the JSON
* output.
*
* Thread Safety:
* Not thread-safe (should be called with proper locking).
*
* Input:
* jsonFileOutput* jfo - JSON output handle.
* const struct inspectorCompletedCollInfo* collInfo - completed
* collective info.
*
* Output:
* Collective info is written to JSON output.
*
* Return:
* inspectorResult_t - success or error code.
*
*/
static inline inspectorResult_t inspectorCompletedColl(jsonFileOutput* jfo,
struct inspectorCompletedCollInfo* collInfo) {
JSON_CHK(jsonStartObject(jfo));
{
JSON_CHK(jsonKey(jfo, "coll")); JSON_CHK(jsonStr(jfo, ncclFuncToString(collInfo->func)));
JSON_CHK(jsonKey(jfo, "coll_sn")); JSON_CHK(jsonUint64(jfo, collInfo->sn));
JSON_CHK(jsonKey(jfo, "coll_msg_size_bytes")); JSON_CHK(jsonUint64(jfo, collInfo->msgSizeBytes));
JSON_CHK(jsonKey(jfo, "coll_exec_time_us")); JSON_CHK(jsonUint64(jfo, collInfo->execTimeUsecs));
JSON_CHK(jsonKey(jfo, "coll_timing_source")); JSON_CHK(jsonStr(jfo, inspectorTimingSourceToString(collInfo->timingSource)));
JSON_CHK(jsonKey(jfo, "coll_algobw_gbs")); JSON_CHK(jsonDouble(jfo, collInfo->algoBwGbs));
JSON_CHK(jsonKey(jfo, "coll_busbw_gbs")); JSON_CHK(jsonDouble(jfo, collInfo->busBwGbs));
if (enableNcclInspectorDumpVerbose) {
INS_CHK(inspectorCompletedCollVerbose(jfo, collInfo));
}
}
JSON_CHK(jsonFinishObject(jfo));
return inspectorSuccess;
}
/*
* Description:
*
* Dumps the state of a communicator to the JSON output if needed.
*
* Thread Safety:
* Not thread-safe (should be called with proper locking).
*
* Input:
* jsonFileOutput* jfo - JSON output handle.
* inspectorCommInfo* commInfo - communicator info.
* bool* needs_writing - set to true if output was written.
*
* Output:
* State is dumped to JSON output if needed.
*
* Return:
* inspectorResult_t - success or error code.
*
*/
static inspectorResult_t inspectorCommInfoDump(jsonFileOutput* jfo,
inspectorCommInfo* commInfo,
bool* needs_writing) {
*needs_writing = false;
if (commInfo == nullptr)
return inspectorSuccess;
struct inspectorCompletedCollInfo collInfo;
memset(&collInfo, 0, sizeof(struct inspectorCompletedCollInfo));
inspectorLockWr(&commInfo->guard);
if (commInfo->dump) {
*needs_writing = true;
memcpy(&collInfo,
&commInfo->completedCollInfo,
sizeof(struct inspectorCompletedCollInfo));
commInfo->dump = false;
}
inspectorUnlockRWLock(&commInfo->guard);
if (*needs_writing) {
JSON_CHK(jsonLockOutput(jfo));
JSON_CHK(jsonStartObject(jfo));
{
JSON_CHK(jsonKey(jfo, "header"));
inspectorCommInfoHeader(jfo, commInfo);
JSON_CHK(jsonKey(jfo, "metadata"));
inspectorCommInfoMetaHeader(jfo);
JSON_CHK(jsonKey(jfo, "coll_perf"));
INS_CHK(inspectorCompletedColl(jfo, &collInfo));
}
JSON_CHK(jsonFinishObject(jfo));
JSON_CHK(jsonNewline(jfo));
JSON_CHK(jsonUnlockOutput(jfo));
}
return inspectorSuccess;
}
/*
* Description:
*
* Dumps the state of all communicators in a commList to the JSON
* output.
*
* Thread Safety:
* Thread-safe - assumes no locks are taken and acquires all necessary
* locks to iterate through all communicator objects and dump their state.
*
* Input:
* jsonFileOutput* jfo - JSON output handle (must not be NULL).
* struct inspectorCommInfoList* commList - list of communicators (must not be NULL).
*
* Output:
* State of all communicators is dumped to JSON output.
*
* Return:
* inspectorResult_t - success or error code.
*
*/
static inspectorResult_t inspectorCommInfoListDump(jsonFileOutput* jfo,
struct inspectorCommInfoList* commList) {
bool flush = false;
INS_CHK(inspectorLockRd(&commList->guard));
inspectorResult_t res = inspectorSuccess;
if (commList->ncomms > 0) {
for (struct inspectorCommInfo* itr = commList->comms;
itr != nullptr;
itr = itr->next) {
bool needs_writing;
INS_CHK_GOTO(inspectorCommInfoDump(jfo, itr, &needs_writing), res, finalize);
if (needs_writing) {
flush = true;
}
}
if (flush) {
JSON_CHK_GOTO(jsonLockOutput(jfo), res, finalize);
JSON_CHK_GOTO(jsonFlushOutput(jfo), res, finalize);
JSON_CHK_GOTO(jsonUnlockOutput(jfo), res, finalize);
}
}
finalize:
INS_CHK(inspectorUnlockRWLock(&commList->guard));
return res;
}
/*
* Description:
* Finalizes and cleans up a commList, freeing all communicators.
*
* Thread Safety:
* Not thread-safe (should be called with proper locking).
*
* Input:
* struct commList* commList - list of communicators.
*
* Output:
* All communicators are freed.
*
* Return:
* inspectorResult_t - success or error code.
*
*/
static inspectorResult_t inspectorCommInfoListFinalize(struct inspectorCommInfoList* commList) {
struct inspectorCommInfo* nextComm = nullptr;
INS_CHK(inspectorLockWr(&commList->guard));
while (commList->comms != nullptr && commList->ncomms != 0) {
INFO(NCCL_INSPECTOR, "NCCL Inspector: comm %lu still in tracker",
commList->comms->commHash);
nextComm = commList->comms->next;
INS_CHK(inspectorLockDestroy(&commList->comms->guard));
free(commList->comms);
commList->comms = nextComm;
commList->ncomms--;
}
INS_CHK(inspectorUnlockRWLock(&commList->guard));
return inspectorSuccess;
}
/*
* Description:
*
* Ensures the given directory exists and is writable, creating it
* if necessary.
*
* Thread Safety:
* Not thread-safe (should be called during initialization).
*
* Input:
* char* workdir - directory path.
*
* Output:
* Directory is created if needed.
*
* Return:
*
* bool - true if directory exists and is writable, false otherwise.
*
*/
static bool ensureDir(char* workdir) {
struct stat st;
// Check if directory exists
if (stat(workdir, &st) == 0) {
if (S_ISDIR(st.st_mode)) {
// Directory exists, check if it's writable
if (access(workdir, W_OK) == 0) {
return true; // Directory exists and is writable
} else {
INFO(NCCL_INSPECTOR,
"NCCL Inspectoer: dump directory %s exists, but is not "
"writable",
workdir);
return false;
}
} else {
INFO(NCCL_INSPECTOR,
"NCCL Inspector: dump location %s exists, but is not a "
"directory",
workdir);
return false;
}
} else {
// Directory doesn't exist, try to create it
const mode_t mode = 0777;
if (mkdir(workdir, mode) == 0) {
return true; // Directory created successfully
} else {
INFO(NCCL_INSPECTOR,
"NCCL Inspector: failed to create dump directory %s: %s", workdir,
strerror(errno));
return false;
}
}
}
/*
* Description:
*
* Generates the output dump directory path based on environment
* variables.
*
* Thread Safety:
* Not thread-safe (should be called during initialization).
*
* Input:
* char** workdir - pointer to output directory string.
*
* Output:
* workdir is set to the generated directory path.
*
* Return:
* None.
*/
static void genDumpDir(char** workdir) {
char* dumpdir = getenv("NCCL_INSPECTOR_DUMP_DIR");
if (dumpdir != NULL) {
*workdir = strdup(dumpdir);
// TODO check errors here
return;
}
char* jobid = getenv("SLURM_JOBID");
bool badJobId = true;
if (jobid != NULL) {
errno = 0;
const int intid = strtol(jobid, NULL, 10);
if (errno == 0) {
char tmp[2048];
snprintf(tmp, 2048, "nccl-inspector-%d", intid);
*workdir = strdup(tmp);
badJobId = false;
}
}
if (badJobId) {
*workdir = strdup("nccl-inspector-unknown-jobid");
}
}
struct inspectorDumpThread {
bool run{false};
jsonFileOutput* jfo;
char* outputRoot;
uint64_t sampleIntervalUsecs;
pthread_t pthread;
pthread_rwlock_t guard;
inspectorDumpThread(const char* outputRoot, uint64_t sampleIntervalUsecs)
: jfo(nullptr), outputRoot(strdup(outputRoot)), sampleIntervalUsecs(sampleIntervalUsecs) {
if (inspectorLockInit(&guard) != inspectorSuccess) {
INFO(NCCL_INSPECTOR, "NCCL Inspector inspectorDumpThread: couldn't init lock");
}
}
~inspectorDumpThread() {
if (jfo != nullptr) {
jsonFinalizeFileOutput(jfo);
jfo = nullptr;
}
if (outputRoot != nullptr) {
free(outputRoot);
outputRoot = nullptr;
}
if (inspectorLockDestroy(&guard) != inspectorSuccess) {
INFO(NCCL_INSPECTOR, "NCCL Inspector inspectorDumpThread: couldn't destroy lock");
}
}
void startThread() {
inspectorLockWr(&guard);
run = true;
inspectorUnlockRWLock(&guard);
if (pthread_create(&pthread, NULL, dumpMain, this) != 0) {
INFO(NCCL_INSPECTOR,
"NCCL Inspector inspectorDumpThread: couldn't create dump thread!");
return;
}
INFO(NCCL_INSPECTOR, "NCCL Inspector inspectorDumpThread: created");
}
void stopThread() {
INFO(NCCL_ENV, "NCCL Inspector Stopping Dump thread");
inspectorLockWr(&guard);
run = false;
inspectorUnlockRWLock(&guard);
struct timespec ts;
ts.tv_sec = 0;
ts.tv_nsec = 1000000; // 1ms
nanosleep(&ts, NULL);
INFO(NCCL_INSPECTOR, "NCCL Inspector inspectorDumpThread: stopped");
}
inspectorResult_t inspectorStateDump(const char* output_root) {
if (!ncclInspectorInit) {
return inspectorUninitializedError;
}
if (!enableNcclInspector) {
INFO(NCCL_INSPECTOR, "NCCL Inspector is not enabled, will not do ncclAllCommTallyDump");
return inspectorDisabledError;
}
if (jfo == 0) {
char hostname[256];
gethostname(hostname, 255);
char tmp[2048];
snprintf(tmp, sizeof(tmp), "%s/%s-pid%d.log", output_root, hostname, getpid());
jsonResult_t result = jsonInitFileOutput(&jfo, tmp);
if (jsonSuccess != result) {
INFO(NCCL_INSPECTOR, "Cannot open %s for writing: %s", tmp, jsonErrorString(result));
return inspectorFileOpenError;
}
chmod(tmp, 0666);
}
if (jfo != nullptr) {
inspectorCommInfoListDump(jfo, &g_state.liveComms);
inspectorCommInfoListDump(jfo, &g_state.deletedComms);
}
if (g_state.deletedComms.ncomms > 0) {
inspectorCommInfoListFinalize(&g_state.deletedComms);
}
return inspectorSuccess;
}
static void* dumpMain(void* arg) {
inspectorDumpThread* dumper = (inspectorDumpThread*)arg;
inspectorResult_t res = inspectorSuccess;
struct timespec ts;
ts.tv_sec = dumper->sampleIntervalUsecs / 1000000;
ts.tv_nsec = dumper->sampleIntervalUsecs % 1000000;
while (dumper->run) {
inspectorLockWr(&dumper->guard);
if (!dumper->run) {
inspectorUnlockRWLock(&dumper->guard);
break;
}
res = dumper->inspectorStateDump(dumper->outputRoot);
if (res == inspectorFileOpenError || res == inspectorDisabledError) {
inspectorUnlockRWLock(&dumper->guard);
break;
}
inspectorUnlockRWLock(&dumper->guard);
nanosleep(&ts, NULL);
}
return 0;
}
};
/*
* Description:
*
* Shows the NCCL Inspector plugin version and configuration
* environment variables in a structured format similar to NCCL's
* showVersion function.
*
* Thread Safety:
* Thread-safe (read-only environment variable access).
*
* Input:
* None.
*
* Output:
* Logs version and environment variables to debug output.
*
* Return:
* None.
*/
static void showInspectorVersion() {
VERSION("NCCL Inspector Plugin - Version: %s", get_git_version_info());
}
/*
* Description:
*
* Shows all NCCL Inspector environment variables and their values
* in a structured format.
*
* Thread Safety:
* Thread-safe (read-only environment variable access).
*
* Input:
* None.
*
* Output:
* Logs environment variables to debug output.
*
* Return:
* None.
*/
static void showInspectorEnvVars() {
struct {
const char* name;
const char* value;
const char* defaultVal;
const char* description;
} envVars[] = {
{"NCCL_INSPECTOR_ENABLE", getenv("NCCL_INSPECTOR_ENABLE"), "0", "Enable/disable inspector plugin"},
{"NCCL_INSPECTOR_DUMP_THREAD_ENABLE", getenv("NCCL_INSPECTOR_DUMP_THREAD_ENABLE"), "1", "Enable/disable dump thread"},
{"NCCL_INSPECTOR_DUMP_THREAD_INTERVAL_MICROSECONDS", getenv("NCCL_INSPECTOR_DUMP_THREAD_INTERVAL_MICROSECONDS"), "0", "Dump thread interval in microseconds"},
{"NCCL_INSPECTOR_DUMP_DIR", getenv("NCCL_INSPECTOR_DUMP_DIR"), "(auto-generated)", "Output directory for inspector logs"},
{"NCCL_INSPECTOR_DUMP_VERBOSE", getenv("NCCL_INSPECTOR_DUMP_VERBOSE"), "0", "Enable/disable verbose dumping (event_trace)"}
};
const int numEnvVars = sizeof(envVars) / sizeof(envVars[0]);
VERSION("NCCL Inspector Environment Variables:");
for (int i = 0; i < numEnvVars; i++) {
VERSION(" %s = %s%s%s",
envVars[i].name,
envVars[i].value ? envVars[i].value : "(not set)",
envVars[i].value ? "" : ", default=",
envVars[i].value ? "" : envVars[i].defaultVal);
}
}
/*
* Description:
*
* Initializes the global inspector state and starts the dump thread
* if enabled.
*
* Thread Safety:
*
* Not thread-safe (should be called during initialization).
*
* Input:
* None.
*
* Output:
* Global state is initialized and dump thread may be started.
*
* Return:
* inspectorResult_t - success or error code.
*/
inspectorResult_t inspectorGlobalInit(int rank) {
char* str = getenv("NCCL_INSPECTOR_ENABLE");
int enable = str ? atoi(str) : 0; // default disable
enableNcclInspector = enable == 0 ? false : true;
ncclInspectorInit = true;
// Show version and environment configuration (similar to NCCL's showVersion)
if (rank == 0) {
showInspectorVersion();
showInspectorEnvVars();
}
if (enableNcclInspector == false) {
VERSION("NCCL Inspector Plugin DISABLED (NCCL_INSPECTOR_ENABLE=%s)",
str ? str : "0");
return inspectorDisabledError;
}
INS_CHK(inspectorGlobalStateInit());
str = getenv("NCCL_INSPECTOR_DUMP_THREAD_ENABLE");
enable = str ? atoi(str) : 1; // default enable
enableNcclInspectorDumpThread = enable == 0 ? false : true;
str = getenv("NCCL_INSPECTOR_DUMP_VERBOSE");
enable = str ? atoi(str) : 0; // default disable
enableNcclInspectorDumpVerbose = enable == 0 ? false : true;
if (enableNcclInspectorDumpThread) {
str = getenv("NCCL_INSPECTOR_DUMP_THREAD_INTERVAL_MICROSECONDS");
const uint64_t interval = str ? strtoull(str, 0, 0) : 0;
if (interval == 0) {
INFO(NCCL_INSPECTOR, "NCCL Inspector: dump thread enabled but "
"NCCL_INSPECTOR_DUMP_THREAD_INTERVAL_MICROSECONDS is 0; not "
"starting internal dump "
"thread.");
return inspectorSuccess;
}
char* dumpdir;
genDumpDir(&dumpdir);
if (dumpdir != nullptr) {
if (!ensureDir(dumpdir)) {
free(dumpdir);
INFO(NCCL_INSPECTOR, "NCCL Inspector: failed to generate a dump dir; not "
"starting internal dump thread.");
return inspectorSuccess;
}
dumper = new inspectorDumpThread(dumpdir, interval);
dumper->startThread();
INFO(NCCL_INSPECTOR,
"NCCL Inspector enabled with polling interval %lu us and "
"output directory %s",
interval, dumpdir);
free(dumpdir);
} else {
INFO(NCCL_INSPECTOR, "NCCL Inspector: failed to generate a dump "
"dir; not starting internal dump thread.");
}
} else {
INFO(NCCL_INSPECTOR,
"NCCL Inspector: NCCL_INSPECTOR_DUMP_THREAD_ENABLE set to 0; not "
"starting internal dump "
"thread.");
}
return inspectorSuccess;
}
/*
* Description:
*
* Returns a string describing the given inspectorResult_t error
* code.
*
* Thread Safety:
* Thread-safe (read-only operation).
*
* Input:
* inspectorResult_t result - error code.
*
* Output:
* None.
*
* Return:
* const char* - error string.
*/
const char* inspectorErrorString(inspectorResult_t result) {
switch (result) {
case inspectorSuccess:
return "Success";
case inspectorUninitializedError:
return "Inspector is not initialized";
case inspectorMemoryError:
return "Inspector encountered issue allocating memory";
case inspectorFileOpenError:
return "Inspector could not open file";
case inspectorDisabledError:
return "Inspector is disabled";
case inspectorLockError:
return "Inspector encountered error with lock";
case inspectorPthreadError:
return "Inspector encountered error with pthreads";
case inspectorJsonError:
return "Inspector encountered error while emitting JSON";
case inspectorCudaError:
return "Inspector encountered CUDA error";
case inspectorBadHash:
return "Inspector encountered bad communicator hash";
case inspectorDeleteUnknownCommError:
return "Inspector was asked to delete a communicator that it is not "
"tracking";
case inspectorAddDuplicateCommError:
return "Inspector was asked to add a communicator it was already "
"tracking";
case inspectorNop:
return "Inspector NOP";
case inspectorNullTally:
return "Inspector encountered a null OpTally";
case inspectorGlobalInitError:
return "Inspector encountered a repeated global init";
case inspectorReturn:
return "Inspector Unconditional Return";
default:
return "Unknown error";
}
}
/*
* Description:
* Converts a communicator hash to a string.
*
* Thread Safety:
* Thread-safe (writes to provided buffer).
*
* Input:
* uint64_t commHash - communicator hash.
* char hashStr[NCCL_COMM_HASH_LENGTH] - output buffer.
*
* Output:
* hashStr is set to the string representation of commHash.
*
* Return:
* inspectorResult_t - success or error code.
*/
inspectorResult_t inspectorCommGetHashStr(uint64_t commHash,
char hashStr[NCCL_COMM_HASH_LENGTH]) {
snprintf(hashStr, NCCL_COMM_HASH_LENGTH, "0x%lx",
commHash);
return inspectorSuccess;
}
/*
* Description:
* Compares two communicator configurations for equality.
*
* Thread Safety:
* Thread-safe (read-only comparison).
*
* Input:
* uint64_t lCommHash - left communicator hash.
* uint64_t rCommHash - right communicator hash.
* int lRank - left rank.
* int rRank - right rank.
*
* Output:
* None.
*
* Return:
* bool - true if communicators are equal (same hash and rank), false otherwise.
*/
static bool comm_eq(uint64_t lCommHash, uint64_t rCommHash,
int lRank, int rRank) {
return lCommHash == rCommHash && lRank == rRank;
}
/*
* Description:
* Initializes a communicator info structure with the provided parameters.
*
* Thread Safety:
* Not thread-safe - should be called during communicator initialization.
*
* Input:
* struct inspectorCommInfo* commInfo - communicator info structure to initialize (must not be NULL).
* const char* commName - communicator name (can be NULL).
* uint64_t commHash - communicator hash.
* int nnodes - number of nodes (must be > 0).
* int nranks - number of ranks (must be > 0).
* int rank - rank (must be >= 0 and < nranks).
*
* Output:
* commInfo is initialized with the provided parameters.
*
* Return:
* inspectorResult_t - success or error code.
*
* Preconditions:
* - commInfo must not be NULL
* - nnodes must be positive
* - nranks must be positive
* - rank must be non-negative and less than nranks
*/
static inspectorResult_t inspectorFillCommInfo(struct inspectorCommInfo* commInfo,
const char* commName, uint64_t commHash,
int nnodes, int nranks, int rank) {
commInfo->commName = commName;
commInfo->commHash = commHash;
inspectorCommGetHashStr(commHash, commInfo->commHashStr);
commInfo->rank = rank;
commInfo->nranks = nranks;
commInfo->nnodes = nnodes;
commInfo->dump = false;
INS_CHK(inspectorLockInit(&commInfo->guard));
commInfo->next = nullptr;
return inspectorSuccess;
}
/*
* Description:
* Adds a communicator to the global state.
*
* Thread Safety:
* Thread-safe (uses locks internally).
*
* Input:
* struct inspectorCommInfo **commInfo - pointer to output struct (must not be NULL).
* const char* commName - communicator name (can be NULL).
* uint64_t commHash - communicator hash.
* int nNodes - number of nodes (must be > 0).
* int nranks - number of ranks (must be > 0).
* int rank - rank (must be >= 0 and < nranks).
*
* Output:
* commInfo is set to the new communicator struct.
*
* Return:
* inspectorResult_t - success or error code.
*
* Preconditions:
* - commInfo must not be NULL
* - nNodes must be positive
* - nranks must be positive
* - rank must be non-negative and less than nranks
*/
inspectorResult_t inspectorAddComm(struct inspectorCommInfo **commInfo,
const char* commName, uint64_t commHash,
int nNodes, int nranks, int rank) {
struct inspectorCommInfoList* liveCommInfoList = &g_state.liveComms;
struct inspectorCommInfo* commInfoPtr = nullptr;
inspectorResult_t res = inspectorSuccess;
bool locked = false;
INSPECTOR_LOCK_RD_FLAG(&liveCommInfoList->guard, locked,
"inspectorAddComm: commList::guard -rd");
for (struct inspectorCommInfo* itr = liveCommInfoList->comms;
itr != nullptr;
itr = itr->next) {
if (comm_eq(commHash, itr->commHash, rank, itr->rank)) {
INFO(NCCL_INSPECTOR, "NCCL Inspector: comm 0x%lx already in tracker",
commHash);
res = inspectorAddDuplicateCommError;
goto finalize;
}
}
INSPECTOR_UNLOCK_RW_LOCK_FLAG(&liveCommInfoList->guard, locked,
"inspectorAddComm: commList::guard");
commInfoPtr
= (struct inspectorCommInfo*)calloc(1, sizeof(struct inspectorCommInfo));
if (0 == commInfoPtr) {
res = inspectorMemoryError;
goto finalize;
}
INS_CHK_GOTO(inspectorFillCommInfo(commInfoPtr,
commName,
commHash,
nNodes,
nranks,
rank),
res, fail);
INSPECTOR_LOCK_WR_FLAG(&liveCommInfoList->guard, locked,
"inspectorAddComm: commList::guard -wr");
++liveCommInfoList->ncomms;
commInfoPtr->next = liveCommInfoList->comms;
liveCommInfoList->comms = commInfoPtr;
finalize:
INSPECTOR_UNLOCK_RW_LOCK_FLAG(&liveCommInfoList->guard, locked,
"inspectorAddComm: commList::guard");
*commInfo = commInfoPtr;
return res;
fail:
if (commInfoPtr) {
free(commInfoPtr);
commInfoPtr = nullptr;
}
goto finalize;
}
/*
* Description:
*
* Removes a communicator from the global state and moves it to the
* deleted list.
*
* Thread Safety:
* Thread-safe (uses locks internally).
*
* Input:
* struct inspectorCommInfo *commInfo - communicator to remove.
*
* Output:
* Communicator is removed from live list and added to deleted list.
*
* Return:
* inspectorResult_t - success or error code.
*/
inspectorResult_t inspectorDelComm(struct inspectorCommInfo *commInfo) {
struct inspectorCommInfoList* liveCommInfoList = &g_state.liveComms;
struct inspectorCommInfoList* deletedCommInfoList = &g_state.deletedComms;
struct inspectorCommInfo* commInfoPtr = nullptr;
bool locked = false;
INFO(NCCL_INSPECTOR, "NCCL Inspector: DelComm removing 0x%lx",
commInfo->commHash);
INSPECTOR_LOCK_WR_FLAG(&liveCommInfoList->guard, locked,
"inspectorDelComm: liveCommInfoList::guard -wr");
struct inspectorCommInfo** prev_ptr = &liveCommInfoList->comms;
for (struct inspectorCommInfo* itr = liveCommInfoList->comms;
itr != nullptr;
itr = itr->next) {
if (comm_eq(commInfo->commHash, itr->commHash, commInfo->rank, itr->rank)) {
*prev_ptr = itr->next;
liveCommInfoList->ncomms--;
commInfoPtr = itr;
break;
}
prev_ptr = &itr->next;
}
INSPECTOR_UNLOCK_RW_LOCK_FLAG(&liveCommInfoList->guard, locked,
"inspectorDelComm: liveCommInfoList::guard -unlock");
if (!commInfoPtr) {
INFO(NCCL_INSPECTOR, "NCCL Inspector: DelComm can't remove 0x%lx, not present",
commInfo->commHash);
return inspectorDeleteUnknownCommError;
}
inspectorLockWr(&commInfoPtr->guard);
commInfoPtr->dump = false;
inspectorUnlockRWLock(&commInfoPtr->guard);
INSPECTOR_LOCK_WR_FLAG(&deletedCommInfoList->guard, locked,
"inspectorDelComm: deletedCommInfoList::guard -wr");
commInfoPtr->next = deletedCommInfoList->comms;
deletedCommInfoList->comms = commInfoPtr;
deletedCommInfoList->ncomms++;
INSPECTOR_UNLOCK_RW_LOCK_FLAG(&deletedCommInfoList->guard, locked,
"inspectorDelComm: deletedCommInfoList::guard -unlock");
return inspectorSuccess;
}
/*
* Description:
*
* Computes the algorithmic and bus bandwidth (in GB/s) for a given
* NCCL collective operation, based on the communication info and
* completed collective details. The calculation uses the message
* size, execution time, and the type of collective operation to
* determine the effective bandwidths. The 'factor' variable adjusts
* the bus bandwidth calculation according to the communication
* pattern of each collective, as described in the NCCL performance
* documentation:
* https://github.com/NVIDIA/nccl-tests/blob/master/doc/PERFORMANCE.md
*
* Thread Safety:
*
* This function does not perform any locking and assumes the caller
* ensures thread safety if required.
*
* Input:
*
* commInfo - Pointer to inspectorCommInfo structure containing
* communicator details.
*
* completedColl- Pointer to inspectorCompletedCollInfo structure
* containing completed collective info.
*
* collType - The type of NCCL collective operation (ncclFunc_t).
*
* Output:
* Updates the algoBwGbs and busBwGbs fields of the completedColl
* structure.
*
* Return:
* N.A. (void function)
*/
void inspectorComputeCollBw(struct inspectorCommInfo *commInfo,
struct inspectorCompletedCollInfo *completedColl,
ncclFunc_t collType) {
double timeInSec = completedColl->execTimeUsecs / 1000000.0;
double factor = 0.0;
double trafficSize = 0.0;
switch (collType) {
case ncclFuncReduce:
case ncclFuncBroadcast:
trafficSize = (double)completedColl->msgSizeBytes;
factor = 1;
break;
case ncclFuncAllReduce:
trafficSize = (double)completedColl->msgSizeBytes;
factor = ((double)(2 * (commInfo->nranks - 1))) / ((double)commInfo->nranks);
break;
case ncclFuncReduceScatter:
trafficSize = (double)(completedColl->msgSizeBytes * commInfo->nranks);
factor = ((double)(commInfo->nranks - 1)) / ((double)commInfo->nranks);
break;
case ncclFuncAllGather:
trafficSize = (double)(completedColl->msgSizeBytes * commInfo->nranks);
factor = ((double)(commInfo->nranks - 1)) / ((double)commInfo->nranks);
break;
case ncclFuncSendRecv:
case ncclFuncSend:
case ncclFuncRecv:
trafficSize = (double)completedColl->msgSizeBytes;
factor = 1;
break;
default:
trafficSize = 0;
factor = 0.0;
}
completedColl->algoBwGbs = timeInSec != 0 ? (trafficSize / 1.0E9 / timeInSec) : 0;
completedColl->busBwGbs = completedColl->algoBwGbs * factor;
}
/*
* Description:
*
* Helper function to calculate kernel execution time using GPU
* clock values. The GPU clock values are measured in nanoseconds
* from the globaltimer register.
*
* Thread Safety:
* Thread-safe (read-only operations on kernel info).
*
* Input:
* struct inspectorKernelChInfo *kernelCh - kernel channel info
* containing GPU clock values.
*
* Output:
* None.
*
* Return:
* uint64_t - execution time in microseconds, or 0 if invalid timing
* data.
*/
static uint64_t calculateKernelGpuExecTimeUsecs(struct inspectorKernelChInfo *kernelCh) {
if (kernelCh->startGpuClk != 0 && kernelCh->stopGpuClk != 0) {
if (kernelCh->stopGpuClk > kernelCh->startGpuClk) {
uint64_t execTimeNanosecs = kernelCh->stopGpuClk - kernelCh->startGpuClk;
return execTimeNanosecs / 1000;
}
}
return 0;
}
/*
* Description:
*
* Calculates the maximum kernel execution time across all kernel
* channels in a collective operation, using GPU clock values when
* available and falling back to CPU timestamps when necessary.
*
* Thread Safety:
* Thread-safe (read-only operations on collective info).
*
* Input:
* struct inspectorCollInfo *collInfo - collective operation info
* containing kernel channels.
* inspectorTimingSource_t *timingSource - pointer to store the timing source used.
*
* Output:
* timingSource is set to indicate whether GPU, CPU, or collective timing was used.
*
* Return:
*
* uint64_t - maximum execution time in microseconds across all
* kernels, or collective execution time if no kernel
* timing is available.
*
*/
static uint64_t calculateMaxKernelExecTimeUsecs(struct inspectorCollInfo *collInfo,
inspectorTimingSource_t *timingSource) {
uint64_t maxKernelExecTimeUsecs = 0;
inspectorTimingSource_t bestTimingSource = inspectorTimingSourceCollectiveCpu;
for (uint32_t i = 0; i < collInfo->nChannels; i++) {
struct inspectorKernelChInfo *kernelCh = &collInfo->kernelCh[i];
uint64_t gpuExecTimeUsecs = calculateKernelGpuExecTimeUsecs(kernelCh);
if (gpuExecTimeUsecs > 0) {
if (gpuExecTimeUsecs > maxKernelExecTimeUsecs) {
maxKernelExecTimeUsecs = gpuExecTimeUsecs;
bestTimingSource = inspectorTimingSourceKernelGpu;
}
} else {
if (kernelCh->tsCompletedUsec > kernelCh->tsStartUsec) {
uint64_t cpuExecTimeUsecs = kernelCh->tsCompletedUsec - kernelCh->tsStartUsec;
if (cpuExecTimeUsecs > maxKernelExecTimeUsecs) {
maxKernelExecTimeUsecs = cpuExecTimeUsecs;
bestTimingSource = inspectorTimingSourceKernelCpu;
}
}
}
}
if (maxKernelExecTimeUsecs > 0) {
*timingSource = bestTimingSource;
return maxKernelExecTimeUsecs;
} else {
*timingSource = inspectorTimingSourceCollectiveCpu;
return collInfo->tsCompletedUsec - collInfo->tsStartUsec;
}
}
/*
* Description:
*
* Updates the performance information for a completed collective
* operation.
*
* Thread Safety:
* Thread-safe (uses locks internally).
*
* Input:
* struct inspectorCommInfo *commInfo - communicator info.
* struct inspectorCollInfo *collInfo - completed collective info.
*
* Output:
* commInfo is updated with completed collective info.
*
* Return:
* None.
*
*/
void inspectorUpdateCollPerf(struct inspectorCompletedCollInfo *completedColl,
struct inspectorCollInfo *collInfo) {
completedColl->func = ncclStringToFunc(collInfo->func);
completedColl->sn = collInfo->sn;
completedColl->msgSizeBytes = collInfo->msgSizeBytes;
completedColl->execTimeUsecs =
calculateMaxKernelExecTimeUsecs(collInfo, &completedColl->timingSource);
completedColl->collEvtTrk = collInfo->collEvtTrk;
}
/*
* Description:
*
* Finalizes the global inspector state and stops the dump thread if
* running.
*
* Thread Safety:
* Not thread-safe (should be called during teardown).
*
* Input:
* None.
*
* Output:
* Global state is finalized and dump thread is stopped.
*
* Return:
* inspectorResult_t - success or error code.
*
*/
inspectorResult_t inspectorGlobalFinalize() {
if (dumper) {
dumper->stopThread();
delete dumper;
dumper = nullptr;
}
return inspectorSuccess;
}