Files

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

3500 wiersze
143 KiB
C++

2018-09-24 16:06:59 -07:00
/*************************************************************************
2022-01-07 06:39:55 -08:00
* Copyright (c) 2015-2022, NVIDIA CORPORATION. All rights reserved.
* Modifications Copyright (c) 2019-2023 Advanced Micro Devices, Inc. All rights reserved.
2022-12-13 07:51:04 +08:00
* Modifications Copyright (c) Microsoft Corporation. Licensed under the MIT License.
2018-09-24 16:06:59 -07:00
*
* See LICENSE.txt for license information
************************************************************************/
#include "nccl.h"
2018-12-13 15:56:12 -08:00
#include "channel.h"
2018-09-24 16:06:59 -07:00
#include "nvmlwrap.h"
2021-04-12 16:00:11 -07:00
#include "gdrwrap.h"
2018-09-24 16:06:59 -07:00
#include "bootstrap.h"
#include "transport.h"
#include "group.h"
#include "net.h"
2020-01-16 16:02:42 -08:00
#include "coll_net.h"
2018-12-13 15:56:12 -08:00
#include "enqueue.h"
2019-11-19 14:57:39 -08:00
#include "graph.h"
#include "argcheck.h"
#include "device.h"
#include "collectives.h"
#if defined(ENABLE_NPKIT)
#include "npkit/npkit.h"
#endif
2023-09-26 05:47:28 -07:00
#include "tuner.h"
2024-12-18 08:26:06 -08:00
#include "ras.h"
2025-05-29 20:56:40 -07:00
#include "profiler.h"
2025-01-27 03:30:22 -08:00
#include "mnnvl.h"
2018-09-24 16:06:59 -07:00
#include <fcntl.h>
#include <unistd.h>
2019-07-05 15:43:00 -07:00
#include <hip/hip_runtime.h>
2018-09-24 16:06:59 -07:00
#include <string.h>
#include <errno.h>
#include <assert.h>
2018-11-13 10:37:20 -08:00
#include <dlfcn.h>
2019-11-19 14:57:39 -08:00
#include <sys/types.h>
#include <sys/stat.h>
2025-09-02 13:21:14 -07:00
#include <sys/resource.h>
2019-11-19 14:57:39 -08:00
#include <unistd.h>
#include "graph/topo.h"
#include "graph/xml.h"
2023-09-12 15:34:40 -04:00
#include "archinfo.h"
2023-09-26 05:47:28 -07:00
#include "param.h"
2025-01-27 03:30:22 -08:00
#include "nvtx_payload_schemas.h"
2025-05-29 20:56:40 -07:00
#include "utils.h"
2025-09-02 13:21:14 -07:00
#include <mutex>
#include "ce_coll.h"
#include "nvtx.h"
2018-09-24 16:06:59 -07:00
2021-01-28 09:45:01 -07:00
// [RCCL]
#include "git_version.h"
#include "rccl_vars.h"
#include "hip_rocm_version_info.h"
//#include <hsa/hsa_ext_amd.h>
#ifdef ENABLE_MSCCLPP
#include "mscclpp/mscclpp_nccl.h"
#endif
#ifdef USE_AMDSMI
#include "amdsmi_wrap.h"
#else
#include "rocm_smi_wrap.h"
#endif
#include "rccl_common.h"
2021-01-28 09:45:01 -07:00
// [/RCCL]
#ifdef ENABLE_ROCSHMEM
#include <rocshmem/rocshmem.hpp>
#define NUM_SYM_BUF 8
#endif
2022-12-13 07:51:04 +08:00
#include "msccl/msccl_lifecycle.h"
2023-09-12 06:30:04 +08:00
#include "msccl/msccl_status.h"
2025-07-30 14:59:28 -07:00
#include "latency_profiler/CollTrace.h"
#include "latency_profiler/CollTraceFunc.h"
2026-01-08 13:55:40 -08:00
#include <cpuid.h>
2022-12-13 07:51:04 +08:00
// Two-level stringification helpers: STR(v) first macro-expands its argument and then
// hands the result to STR2, which turns it into a string literal with the # operator.
// Both are guarded so an earlier definition is left untouched.
#ifndef STR2
#define STR2(v) #v
#endif
#ifndef STR
#define STR(v) STR2(v)
#endif
2018-11-13 10:37:20 -08:00
2024-08-14 15:04:13 -06:00
// Compile-time default (0 or 1) that is passed to NCCL_PARAM(GroupCudaStream, ...) in this file.
// The HIP branches of the condition select 0.
#if CUDART_VERSION >= 9020 || defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__)
2018-09-24 16:06:59 -07:00
#define NCCL_GROUP_CUDA_STREAM 0 // CGMD: CUDA 9.2,10.X Don't need to use an internal CUDA stream
#else
#define NCCL_GROUP_CUDA_STREAM 1 // CGMD: CUDA 9.0,9.1 Need to use an internal CUDA stream
#endif
// 4 MiB. Size of the temporary buffer used for direct ReduceScatter ("RS").
#define TEMP_BUFF_SIZE (4 * 1024 * 1024) // Define Size for Temporary Buffer for Direct RS
2025-04-19 00:21:27 -04:00
using namespace rccl;
// Human-readable name tables used for logging. Each is presumably indexed by the matching
// enum value (function, algorithm, protocol, device reduction op, data type) -- verify
// against the enum declarations, which are not visible here.
// ncclFuncStr is declared with NCCL_NUM_FUNCTIONS+3 slots but lists eight names; any slot
// past the eighth is value-initialized to a null pointer.
const char* ncclFuncStr[NCCL_NUM_FUNCTIONS+3] = { "AllGather", "AllReduce", "AlltoAllPivot", "AllToAllGda", "Broadcast", "Reduce", "ReduceScatter", "SendRecv"};
2024-09-10 05:57:10 -07:00
const char* ncclAlgoStr[NCCL_NUM_ALGORITHMS] = { "Tree", "Ring", "CollNetDirect", "CollNetChain", "NVLS", "NVLSTree", "PAT" };
2020-05-12 14:40:18 -07:00
const char* ncclProtoStr[NCCL_NUM_PROTOCOLS] = { "LL", "LL128", "Simple" };
const char* ncclDevRedOpStr[ncclNumDevRedOps] = { "Sum", "Prod", "MinMax", "PreMulSum", "SumPostDiv" };
2020-12-22 13:28:21 -05:00
// Type suffixes; note the leading underscore in every entry.
const char *ncclTypeStr[ncclNumTypes] = {"_i8", "_u8", "_i32", "_u32", "_i64", "_u64", "_f16", "_f32", "_f64", "_b16"};
2020-05-12 14:40:18 -07:00
2018-09-24 16:06:59 -07:00
// Runtime parameters. Judging from uses in this file (e.g. ncclParamSetCpuStackSize(),
// ncclParamCheckPointers()), NCCL_PARAM(Name, "KEY", default) provides an accessor named
// ncclParam<Name>(). How "KEY" maps to an environment variable is decided by the macro,
// which is defined elsewhere -- TODO confirm in param.h.
// Several defaults are NCCL_CONFIG_UNDEF_INT, i.e. "not set by the user".
NCCL_PARAM(GroupCudaStream, "GROUP_CUDA_STREAM", NCCL_GROUP_CUDA_STREAM);
NCCL_PARAM(CheckPointers, "CHECK_POINTERS", 0);
2023-02-27 02:48:21 -08:00
NCCL_PARAM(CommBlocking, "COMM_BLOCKING", NCCL_CONFIG_UNDEF_INT);
2024-06-11 01:28:01 -07:00
NCCL_PARAM(RuntimeConnect, "RUNTIME_CONNECT", 1);
2025-05-29 20:56:40 -07:00
NCCL_PARAM(WinEnable, "WIN_ENABLE", 1);
NCCL_PARAM(CollnetEnable, "COLLNET_ENABLE", NCCL_CONFIG_UNDEF_INT);
NCCL_PARAM(CtaPolicy, "CTA_POLICY", NCCL_CONFIG_UNDEF_INT);
NCCL_PARAM(NvlsChannels, "NVLS_NCHANNELS", NCCL_CONFIG_UNDEF_INT);
2025-09-02 13:21:14 -07:00
// Non-zero (the default) lets setCpuStackSize() adjust the default pthread stack size.
NCCL_PARAM(SetCpuStackSize, "SET_CPU_STACK_SIZE", 1);
// Accessor defined in another translation unit.
extern int64_t ncclParamSingleProcMemRegEnable();
2018-09-24 16:06:59 -07:00
2021-03-06 20:32:30 -08:00
// Allocation bookkeeping table with MAX_ALLOC_TRACK_NGPU entries, all value-initialized.
// NOTE(review): presumably one entry per GPU -- confirm where allocationTracker is declared.
struct allocationTracker allocTracker[MAX_ALLOC_TRACK_NGPU] = {};
2025-12-02 10:03:15 -08:00
// Forward declaration; the definition is not in this part of the file.
ncclResult_t commReclaim(ncclComm_t comm);
2021-03-06 20:32:30 -08:00
// rocSHMEM integration; compiled only when ENABLE_ROCSHMEM is defined.
#ifdef ENABLE_ROCSHMEM
// Default threshold is 262144 (256 Ki); NOTE(review): presumably a message size in bytes -- confirm at the use site.
RCCL_PARAM(RocshmemThreshold, "ROCSHMEM_THRESHOLD", (size_t)(262144));
// Enabled by default (1).
RCCL_PARAM(RocshmemEnabled, "ROCSHMEM_ENABLE", 1);
// Associates each communicator handle with a rocSHMEM team handle.
std::unordered_map<ncclComm_t, rocshmem::rocshmem_team_t> ncclCommToRshmemTeam;
#endif
#ifdef ENABLE_MSCCLPP
// Hash an ncclUniqueId by running getHash() over its NCCL_UNIQUE_ID_BYTES raw bytes.
size_t std::hash<ncclUniqueId>::operator ()(const ncclUniqueId& uniqueId) const noexcept {
  const auto digest = getHash(uniqueId.internal, NCCL_UNIQUE_ID_BYTES);
  return static_cast<size_t>(digest);
}
// Two unique ids are equal when all NCCL_UNIQUE_ID_BYTES bytes of `internal` match.
bool operator ==(const ncclUniqueId& a, const ncclUniqueId& b) {
  const int byteDiff = memcmp(a.internal, b.internal, NCCL_UNIQUE_ID_BYTES);
  return byteDiff == 0;
}
// Default threshold is 16 MiB (16*1024*1024); NOTE(review): presumably a message size in bytes -- confirm at the use site.
RCCL_PARAM(MscclppThreshold, "MSCCLPP_THRESHOLD", (size_t)(16*1024*1024));
#endif
2025-03-25 15:21:16 -05:00
// MSCCL++ is off unless the parameter below overrides this default.
static constexpr int64_t defaultEnableMscclpp = 0;
2024-09-11 09:55:16 -06:00
RCCL_PARAM(MscclppEnabled, "MSCCLPP_ENABLE", defaultEnableMscclpp);
2025-03-04 12:48:59 -06:00
// Off by default. NOTE(review): how "force" differs from plain enable is decided at the use site, not visible here.
RCCL_PARAM(MscclppForceEnabled, "MSCCLPP_FORCE_ENABLE", 0);
// Turn off cheap fence for gfx942/gfx950
RCCL_PARAM(Gfx9CheapFenceOff, "GFX9_CHEAP_FENCE_OFF", 0);
2024-09-11 09:55:16 -06:00
2021-04-12 16:00:11 -07:00
// GDRCOPY support: Off by default
2025-03-18 08:17:01 -07:00
NCCL_PARAM(GdrCopyEnable, "GDRCOPY_ENABLE", 0);
2021-04-12 16:00:11 -07:00
// GDRCOPY support: global handle. It stays NULL unless initGdrCopy() stores the result of
// ncclGdrInit() in it, which only happens when the GDRCOPY_ENABLE parameter equals 1.
gdr_t ncclGdrCopy = NULL;
// Optionally bring up GDRCopy. ncclGdrInit() is called, and its result stored in the global
// ncclGdrCopy, only when the GDRCOPY_ENABLE parameter is exactly 1. Always returns ncclSuccess.
ncclResult_t initGdrCopy() {
  const bool gdrRequested = (ncclParamGdrCopyEnable() == 1);
  if (!gdrRequested) return ncclSuccess;
  ncclGdrCopy = ncclGdrInit();
  return ncclSuccess;
}
2025-09-02 13:21:14 -07:00
// The default Linux stack size (8MB) is safe.
#define SAFE_STACK_SIZE (8192*1024)
// Helper for setCpuStackSize(): given an initialized attribute object, raise the process-wide
// default pthread stack size to SAFE_STACK_SIZE when the current default is smaller.
// The caller owns `attr` and destroys it whatever this function returns.
static ncclResult_t raiseDefaultStackSizeIfUnsafe(pthread_attr_t* attr) {
  // Query the stack size used for newly launched threads.
  size_t stackSize;
  PTHREADCHECK(pthread_attr_getstacksize(attr, &stackSize), "pthread_attr_getstacksize");
  if (stackSize >= SAFE_STACK_SIZE) return ncclSuccess;

  // GNU libc normally uses RLIMIT_STACK as the default pthread stack size, unless it's set to "unlimited" --
  // in that case a fallback value of 2MB (!) is used.
  // Query the actual resource limit so that we can distinguish between the settings of 2MB and unlimited.
  struct rlimit stackLimit;
  char buf[30];
  SYSCHECK(getrlimit(RLIMIT_STACK, &stackLimit), "getrlimit");
  if (stackLimit.rlim_cur == RLIM_INFINITY) {
    snprintf(buf, sizeof(buf), "%s", "unlimited");
  } else {
    // rlim_t is an unsigned type whose width is platform-defined; cast so that the
    // format specifier always matches the argument.
    snprintf(buf, sizeof(buf), "%lluKB", (unsigned long long)(stackLimit.rlim_cur/1024));
  }
  INFO(NCCL_INIT|NCCL_ENV, "Stack size limit (%s) is unsafe; will use %dKB for newly launched threads",
       buf, SAFE_STACK_SIZE/1024);
  // Change the default pthread stack size (via a nonportable API, which will become necessary if we switch
  // to C++ threads).
  PTHREADCHECK(pthread_attr_setstacksize(attr, SAFE_STACK_SIZE), "pthread_attr_setstacksize");
  PTHREADCHECK(pthread_setattr_default_np(attr), "pthread_setattr_default_np");
  return ncclSuccess;
}

// Make sure threads launched from now on get at least SAFE_STACK_SIZE bytes of stack.
// Does nothing when the SET_CPU_STACK_SIZE parameter is 0.
// Returns ncclSuccess, or the error produced by the failing pthread/getrlimit check.
static ncclResult_t setCpuStackSize() {
  if (ncclParamSetCpuStackSize() == 0) return ncclSuccess;

  pthread_attr_t attr;
  PTHREADCHECK(pthread_attr_init(&attr), "pthread_attr_init");
  const ncclResult_t ret = raiseDefaultStackSizeIfUnsafe(&attr);
  if (ret != ncclSuccess) {
    // Release the attribute object on the failure path too; the first error is the one reported.
    (void)pthread_attr_destroy(&attr);
    return ret;
  }
  PTHREADCHECK(pthread_attr_destroy(&attr), "pthread_attr_destroy");
  return ncclSuccess;
}
2024-06-11 01:28:01 -07:00
// Outcome of the one-time initialization: written by initOnceFunc(), returned by ncclInit().
static ncclResult_t initResult = ncclSuccess;
2025-09-02 13:21:14 -07:00
// Guards initOnceFunc() so that std::call_once in ncclInit() runs it a single time.
static std::once_flag initOnceFlag;
2022-05-24 02:02:31 -07:00
// Validate the HSA_NO_SCRATCH_RECLAIM environment setting against the HIP runtime version,
// the GPU firmware version and the architecture name of device 0.
// Returns ncclSuccess when validHsaScratchEnvSetting() accepts the combination and
// ncclSystemError (after logging two ERROR lines) otherwise.
ncclResult_t checkHsaEnvSetting() {
  // User-supplied value of HSA_NO_SCRATCH_RECLAIM (NULL when unset).
  const char* hsaScratchEnv = getenv("HSA_NO_SCRATCH_RECLAIM");

  // The runtime version is a packed integer, e.g. 6.2.41133 -> 60241133.
  int hipRuntimeVersion = 0;
  CUDACHECK(hipRuntimeGetVersion(&hipRuntimeVersion));

  const int firmwareVersion = getFirmwareVersion();

  // Only device 0 is queried for the architecture name.
  hipDeviceProp_t devProp;
  CUDACHECK(hipGetDeviceProperties(&devProp, 0));

  INFO(NCCL_INIT, "Hipruntime version: %d, firmware version: %d", hipRuntimeVersion, firmwareVersion);

  const bool settingOk =
      validHsaScratchEnvSetting(hsaScratchEnv, hipRuntimeVersion, firmwareVersion, devProp.gcnArchName);
  if (settingOk) return ncclSuccess;

  // These messages are emitted unconditionally on failure.
  ERROR("HSA_NO_SCRATCH_RECLAIM=1 must be set to avoid performance degradation with the current HIP configuration. (Runtime version:%d, GPU Firmware version:%d)", hipRuntimeVersion, firmwareVersion);
  ERROR("Please set HSA_NO_SCRATCH_RECLAIM=1 and rerun.");
  return ncclSystemError;
}
// Fail the job if build flag HIP_HOST_UNCACHED_MEMORY is not set on mi350x.
// When the flag is defined at build time the check always succeeds. Otherwise the first GPU
// node of comm->topo is tested against "gfx950" and a match yields ncclSystemError.
ncclResult_t checkHostUncacheMemSetting(struct ncclComm* comm) {
#ifdef HIP_HOST_UNCACHED_MEMORY
  return ncclSuccess;
#else
  const bool isGfx950 = IsArchMatch(comm->topo->nodes[GPU].nodes[0].gpu.gcn, "gfx950");
  if (!isGfx950) return ncclSuccess;
  ERROR("Build flag HIP_HOST_UNCACHED_MEMORY must be set to avoid memory corruption on mi350x");
  return ncclSystemError;
#endif
}
2024-06-11 01:28:01 -07:00
static void initOnceFunc() {
NCCLCHECKGOTO(checkHsaEnvSetting(), initResult, exit);
2024-06-11 01:28:01 -07:00
initEnv();
2025-09-02 13:21:14 -07:00
setCpuStackSize();
2024-06-11 01:28:01 -07:00
initGdrCopy();
// Always initialize bootstrap network
NCCLCHECKGOTO(bootstrapNetInit(), initResult, exit);
2022-05-24 02:02:31 -07:00
#ifndef NVTX_NO_IMPL
2024-06-11 01:28:01 -07:00
initNvtxRegisteredEnums();
#endif
2024-06-11 01:28:01 -07:00
exit:;
}
2022-05-24 02:02:31 -07:00
2018-09-24 16:06:59 -07:00
// Library entry-point initialization.
// On every call it inspects the host configuration and emits warnings (NUMA auto balancing,
// missing "iommu=pt", missing HSA_FORCE_FINE_GRAIN_PCIE); then it runs initOnceFunc() exactly
// once through std::call_once.
// Returns an error from reading the /proc or /sys entries, otherwise the recorded
// one-time initialization result (`initResult`).
static ncclResult_t ncclInit() {
  char strValue[2048];
  NCCLCHECK(ncclTopoGetStrFromSys("/proc/sys/kernel", "numa_balancing", strValue));
  if (strcmp(strValue, "1") == 0) {
    WARN("NUMA auto balancing enabled which can lead to variability in the RCCL performance! Disable by \"sudo sysctl kernel.numa_balancing=0\"");
  }

  NCCLCHECK(ncclTopoGetStrFromSys("/proc", "version", strValue));
  // The kernel release is the third space-separated token of /proc/version.
  // Stop as soon as the string runs out of tokens so that verStr is either valid or NULL.
  char* state = NULL;
  char* verStr = strtok_r(strValue, " ", &state);
  for (int i = 0; i < 2 && verStr != NULL; i++) {
    verStr = strtok_r(NULL, " ", &state);
  }
  // A NULL verStr must reach neither "%s" nor strstr().
  INFO(NCCL_INIT, "Kernel version: %s", verStr != NULL ? verStr : "unknown");

  // The remaining checks are skipped on kernels whose version string contains "cray".
  // An unparsable version string is treated like a non-cray kernel.
  if (verStr == NULL || strstr(verStr, "cray") == NULL) {
    unsigned int eax, ebx, ecx, edx;
    if (!__get_cpuid(1, &eax, &ebx, &ecx, &edx))
      ecx = 0; // cpuid not supported
    // strValue is reused from here on; verStr (which points into it) is no longer read.
    NCCLCHECK(ncclTopoGetStrFromSys("/sys/devices/virtual/dmi/id", "bios_version", strValue));
    // Check BIOS string and hypervisor presence on ecx bit 31
    if (strncmp("Hyper-V UEFI Release", strValue, 20) != 0 && (ecx & (1u << 31)) == 0) {
      // Start from an empty string so that a failed open/read never leaves the BIOS
      // version behind to be searched as if it were the kernel command line.
      strValue[0] = '\0';
      FILE* file = fopen("/proc/cmdline", "r");
      if (file != NULL) {
        const size_t len = fread(strValue, 1, sizeof(strValue) - 1, file);
        strValue[len] = '\0';
        fclose(file);
      }
      if (strstr(strValue, "iommu=pt") == NULL) {
        WARN("Missing \"iommu=pt\" from kernel command line which can lead to system instablity or hang!");
      }
    }
#ifndef HIP_UNCACHED_MEMORY
    char *env = getenv("HSA_FORCE_FINE_GRAIN_PCIE");
    if (env == NULL || strcmp(env, "1") != 0) {
      WARN("Missing \"HSA_FORCE_FINE_GRAIN_PCIE=1\" from environment which can lead to low RCCL performance, system instablity or hang!");
    }
#endif
  }
  std::call_once(initOnceFlag, initOnceFunc);
  return initResult;
}
NCCL_API(ncclResult_t, ncclGetVersion, int* version);
// Store NCCL_VERSION_CODE in *version.
// The call is recorded first, so it is recorded even when the argument is rejected.
// Returns ncclInvalidArgument for a NULL pointer, ncclSuccess otherwise.
ncclResult_t ncclGetVersion_impl(int* version) {
  Recorder::instance().record("GetVersion");
  if (version != NULL) {
    *version = NCCL_VERSION_CODE;
    return ncclSuccess;
  }
  return ncclInvalidArgument;
}
NCCL_API(ncclResult_t, ncclGetUniqueId, ncclUniqueId* out);
// Produce a fresh unique id in *out.
// Runs ncclInit(), validates the pointer, obtains a bootstrap handle and copies it into the
// zero-filled id; the call is then recorded and traced.
// Any failing step is returned through NCCLCHECK.
ncclResult_t ncclGetUniqueId_impl(ncclUniqueId* out) {
  NCCLCHECK(ncclInit());
  NCCLCHECK(PtrCheck(out, "GetUniqueId", "out"));

  struct ncclBootstrapHandle bootHandle;
  NCCLCHECK(bootstrapGetUniqueId(&bootHandle));

  // ncclUniqueId and bootstrapHandle don't have the same size and alignment:
  // clear the whole id first so no byte is left undefined, then copy the handle bytewise.
  memset(out, 0, sizeof(ncclUniqueId));
  memcpy(out, &bootHandle, sizeof(bootHandle));

  Recorder::instance().record(rrGetUniqueId, -1, -1, out);
  TRACE_CALL("ncclGetUniqueId(0x%llx)", (unsigned long long)getHash(out->internal, NCCL_UNIQUE_ID_BYTES));
  return ncclSuccess;
}
2019-03-14 19:39:20 -07:00
// Prevent compiler from optimizing out these operations
2019-12-06 18:14:55 +01:00
// Function attribute that disables optimization for the annotated function:
// clang spells it `optnone`, other compilers get the GCC-style optimize("O0").
// It is applied to commPoison() and #undef'd again further down in this file.
#ifdef __clang__
2019-12-09 18:31:13 +01:00
#define NCCL_NO_OPTIMIZE __attribute__((optnone))
2019-12-06 18:14:55 +01:00
#else
#define NCCL_NO_OPTIMIZE __attribute__((optimize("O0")))
#endif
// Overwrite the identifying fields of a communicator that is about to be freed:
// rank, cudaDev, busId and nRanks become -1 and both magic words become 0.
// Compiled without optimization so the stores are not removed.
void NCCL_NO_OPTIMIZE commPoison(ncclComm_t comm) {
  // Important that this does not trash intraComm0.
  comm->nRanks = -1;
  comm->busId = -1;
  comm->cudaDev = -1;
  comm->rank = -1;
  comm->endMagic = 0;
  comm->startMagic = 0;
}
2025-06-25 21:01:34 -07:00
// Tracing and debugging switches, all off (0) by default.
// EnableProxyTrace is both declared and defined here; commFree() dumps the proxy trace when it is set.
RCCL_PARAM_DECLARE(EnableProxyTrace);
RCCL_PARAM(EnableProxyTrace, "ENABLE_PROXY_TRACE", 0);
RCCL_PARAM(KernelCollTraceEnable, "KERNEL_COLL_TRACE_ENABLE", 0);
2025-01-31 07:51:10 -08:00
RCCL_PARAM(KernelCollTraceThreadEnable, "KERNEL_COLL_TRACE_THREAD_ENABLE", 0);
// When 1, commAlloc()/commFree() track and drop the CUDA context of the communicator.
RCCL_PARAM(EnableContextTracking, "ENABLE_CONTEXT_TRACKING", 0);
2019-11-26 16:33:13 -08:00
#ifdef ENABLE_COLLTRACE
2024-01-18 15:07:16 -07:00
// Should be in sync with 'ALL_COLLS' in Generator.cmake
2019-11-26 16:33:13 -08:00
void *ncclCommThreadMain(void *arg) {
ncclComm_t comm = (ncclComm_t)arg;
2023-08-03 07:16:12 -07:00
int head[MAXCHANNELS];
2023-07-21 07:31:27 -07:00
double vega_gpu_rtc_freq;
2023-08-03 07:16:12 -07:00
2023-09-12 15:34:40 -04:00
vega_gpu_rtc_freq = GetDeviceWallClockRateInKhz(comm->cudaDev) * 1.0E3;
2025-02-12 13:36:31 -08:00
for (int channel = 0; channel < MAXCHANNELS; channel++) {
int tail = comm->collTraceTail[channel].tail;
if (tail < COLLTRACE_NUM_ITEMS)
head[channel] = 0;
else
head[channel] = tail - COLLTRACE_NUM_ITEMS;
}
2019-11-26 16:33:13 -08:00
do {
int numActiveChans = MAXCHANNELS;
2023-08-03 07:16:12 -07:00
for (int channel = 0; channel < MAXCHANNELS; channel++) {
2025-01-31 07:51:10 -08:00
int tail = comm->collTraceTail[channel].tail;
2023-08-03 07:16:12 -07:00
int count;
2025-01-31 07:51:10 -08:00
count = tail - head[channel];
if (count == 0) {
numActiveChans--;
2023-08-03 07:16:12 -07:00
continue;
}
for (int i = 0; i < count; i++) {
2025-02-12 13:36:31 -08:00
volatile struct ncclCollTrace *td = comm->collTrace+COLLTRACE_NUM_ITEMS*channel+head[channel]%COLLTRACE_NUM_ITEMS;
const uint8_t type = td->type;
2023-08-03 07:16:12 -07:00
if (type == ncclCollTraceNotReady)
2026-01-08 16:07:18 -08:00
break;
head[channel] ++;
2023-08-03 07:16:12 -07:00
char line[1024];
int offset = 0;
const uint16_t fIdx = td->funcIndex;
2023-08-03 07:16:12 -07:00
if (type == ncclCollTraceDataType) {
2025-02-12 13:36:31 -08:00
sprintf(line, "## [%012.6f] [%02d:%02d-%02d:%02x] L:%04d DT %08x %016lx %016lx",
2026-01-08 16:07:18 -08:00
(double)(td->timeStamp)/vega_gpu_rtc_freq, comm->rank, channel, td->channelId, td->tid, fIdx, td->data_0, td->opCount, td->data_1);
} else {
if (type & ncclCollTraceP2pElemType)
2026-01-08 16:07:18 -08:00
sprintf(line, "## [%012.6f] [%02d:%02d-%02d:%02x] %06x-%06x", (double)(td->timeStamp)/vega_gpu_rtc_freq, comm->rank, channel, td->channelId, td->tid, td->p2pOpCount[0], td->p2pOpCount[1]);
else
2026-01-08 16:07:18 -08:00
sprintf(line, "## [%012.6f] [%02d:%02d-%02d:%02x] %06lx", (double)(td->timeStamp)/vega_gpu_rtc_freq, comm->rank, channel, td->channelId, td->tid, td->opCount);
2023-08-03 07:16:12 -07:00
offset = strlen(line);
if (type == ncclCollTraceCollElemType) {
sprintf(line+offset, " CE %s nw %d bi %d nc %d root %d busId %lx nRanks %d", funcNames[fIdx], td->coll.nWarps, td->coll.bid, td->coll.nChannels, td->coll.root, comm->busId, comm->nRanks);
2023-08-03 07:16:12 -07:00
} else if (type == ncclCollTraceP2pElemType) {
sprintf(line+offset, " Recv %d -> %d/%d/%d/%d ConnIdx/LL/Reg/nc %d/%d/%d/%d -> Send %d cb %d busId %lx nRanks %d",
td->p2p.recvRank, td->p2p.recvConnIndex, td->p2p.recvProtoLL, td->p2p.recvRegistered, td->p2p.nRecvChannels, td->p2p.sendConnIndex, td->p2p.sendProtoLL, td->p2p.sendRegistered, td->p2p.nSendChannels, td->p2p.sendRank, td->p2p.channelBase,
comm->busId, comm->nRanks);
2023-08-03 07:16:12 -07:00
} else {
switch (type&0xf) {
case ncclCollTraceKernelLaunchType:
case ncclCollTraceCollLaunchType:
if ((type&0xf) == ncclCollTraceKernelLaunchType)
2026-01-08 16:07:18 -08:00
sprintf(line+offset, " KL %s [%02d:%02d-%02d:%02x] HWID %d:%x ", funcNames[fIdx], comm->rank, channel, td->channelId, td->tid, td->xccId, td->data_0);
2023-08-03 07:16:12 -07:00
else if ((type&0xf) == ncclCollTraceCollLaunchType)
2026-01-08 16:07:18 -08:00
sprintf(line+offset, " CL %s [%02d:%02d-%02d:%02x] %d ", funcNames[fIdx], comm->rank, channel, td->channelId, td->tid, td->batchIx);
2023-08-03 07:16:12 -07:00
offset = strlen(line);
if ((type&0xf0) == ncclCollTraceCollElemType)
sprintf(line+offset, " nw %d bi %d nc %d root %d busId %lx nRanks %d", td->coll.nWarps, td->coll.bid, td->coll.nChannels, td->coll.root, comm->busId, comm->nRanks);
2023-08-03 07:16:12 -07:00
else if ((type&0xf0) == ncclCollTraceP2pElemType)
sprintf(line+offset, " Recv %d -> %d/%d/%d/%d ConnIdx/LL/Reg/nc %d/%d/%d/%d -> Send %d cb %d busId %lx nRanks %d",
td->p2p.recvRank, td->p2p.recvConnIndex, td->p2p.recvProtoLL, td->p2p.recvRegistered, td->p2p.nRecvChannels, td->p2p.sendConnIndex, td->p2p.sendProtoLL, td->p2p.sendRegistered, td->p2p.nSendChannels, td->p2p.sendRank, td->p2p.channelBase,
comm->busId, comm->nRanks);
2023-08-03 07:16:12 -07:00
break;
case ncclCollTraceKernelEndType:
2026-01-08 16:07:18 -08:00
sprintf(line+offset, " KE %s [%02d:%02d-%02d:%02x] busId %lx nRanks %d", funcNames[fIdx], comm->rank, channel, td->channelId, td->tid, comm->busId, comm->nRanks);
2023-08-03 07:16:12 -07:00
break;
case ncclCollTraceAbortType:
2026-01-08 16:07:18 -08:00
sprintf(line+offset, " KA %s [%02d:%02d-%02d:%02x]", funcNames[fIdx], comm->rank, channel, td->channelId, td->tid);
2023-08-03 07:16:12 -07:00
break;
default:
sprintf(line+offset, " unknown collective trace data type");
break;
}
}
}
INFO(NCCL_COLL, "%s td->type:%d", line, type);
2026-01-08 16:07:18 -08:00
volatile uint8_t *tdtype = &td->type;
*tdtype = ncclCollTraceNotReady;
(*tdtype); // read back for flushing
2019-11-26 16:33:13 -08:00
}
}
if (comm->collTraceExit && numActiveChans == 0)
break;
usleep(1000); //sleep 1ms
} while(true);
2025-01-31 07:51:10 -08:00
if (comm->collTraceThread)
pthread_exit(NULL);
else
return 0;
2019-11-26 16:33:13 -08:00
}
#endif
2019-12-06 18:14:55 +01:00
#undef NCCL_NO_OPTIMIZE
2022-05-24 02:02:31 -07:00
// Destructor callback: releases dtor->obj with free(). Always returns ncclSuccess.
static ncclResult_t ncclDestructorFnFree(struct ncclDestructor* dtor) {
  void* const hostObj = dtor->obj;
  free(hostObj);
  return ncclSuccess;
}
// Register `obj` to be released with free() when the communicator's destructor list is run.
// The list node is taken from comm->memPermanent and becomes the new head of the list.
void ncclCommPushFree(struct ncclComm* comm, void* obj) {
  struct ncclDestructor* node = ncclMemoryStackAlloc<struct ncclDestructor>(&comm->memPermanent);
  node->obj = obj;
  node->fn = ncclDestructorFnFree;
  node->next = comm->destructorHead;
  comm->destructorHead = node;
}
// Destructor callback: releases dtor->obj with ncclCudaFree() and propagates its error.
static ncclResult_t ncclDestructorFnCudaFree(struct ncclDestructor* dtor) {
  void* const devObj = dtor->obj;
  NCCLCHECK(ncclCudaFree(devObj));
  return ncclSuccess;
}
// Register `obj` to be released with ncclCudaFree() when the communicator's destructor list is run.
// The list node is taken from comm->memPermanent and becomes the new head of the list.
void ncclCommPushCudaFree(struct ncclComm* comm, void* obj) {
  struct ncclDestructor* node = ncclMemoryStackAlloc<struct ncclDestructor>(&comm->memPermanent);
  node->obj = obj;
  node->fn = ncclDestructorFnCudaFree;
  node->next = comm->destructorHead;
  comm->destructorHead = node;
}
// Destructor callback: releases dtor->obj with ncclCudaHostFree() and propagates its error.
static ncclResult_t ncclDestructorFnCudaHostFree(struct ncclDestructor* dtor) {
  void* const pinnedObj = dtor->obj;
  NCCLCHECK(ncclCudaHostFree(pinnedObj));
  return ncclSuccess;
}
// Register `obj` to be released with ncclCudaHostFree() when the communicator's destructor list is run.
// The list node is taken from comm->memPermanent and becomes the new head of the list.
void ncclCommPushCudaHostFree(struct ncclComm* comm, void* obj) {
  struct ncclDestructor* node = ncclMemoryStackAlloc<struct ncclDestructor>(&comm->memPermanent);
  node->obj = obj;
  node->fn = ncclDestructorFnCudaHostFree;
  node->next = comm->destructorHead;
  comm->destructorHead = node;
}
// Destructor callback: releases the GDR handle stored in dtor->obj with ncclGdrCudaFree()
// and propagates its error.
static ncclResult_t ncclDestructorFnCudaGdrFree(struct ncclDestructor* dtor) {
  void* const gdrHandle = dtor->obj;
  NCCLCHECK(ncclGdrCudaFree(gdrHandle));
  return ncclSuccess;
}
// Register `handle` to be released with ncclGdrCudaFree() when the communicator's destructor list is run.
// The list node is taken from comm->memPermanent and becomes the new head of the list.
void ncclCommPushCudaGdrFree(struct ncclComm* comm, void* handle) {
  struct ncclDestructor* node = ncclMemoryStackAlloc<struct ncclDestructor>(&comm->memPermanent);
  node->obj = handle;
  node->fn = ncclDestructorFnCudaGdrFree;
  node->next = comm->destructorHead;
  comm->destructorHead = node;
}
2018-09-24 16:06:59 -07:00
// Release every local resource owned by `comm`, then the communicator object itself.
//
// A NULL communicator is accepted and reported as success. The teardown order below is
// significant (finalizers, thread joins, host/device buffers, shared resources, registered
// destructors, memory stacks, abort flag, plugins, final free). A failing checked step
// returns its error immediately and leaves the remaining resources untouched.
// The identifying fields of `comm` are poisoned before the final free(), so every log
// message emitted after that point has to use values captured beforehand.
static ncclResult_t commFree(ncclComm_t comm) {
  int abort = 0;
  /* commFree() should not involve any sync among ranks. */
  if (comm == NULL)
    return ncclSuccess;

  NCCLCHECK(ncclCeFinalize(comm));
  // tempBuff is allocated per-communicator for direct ReduceScatter on gfx950.
  // It is owned by the communicator; free it during communicator teardown.
  if (comm->tempBuff) {
    NCCLCHECK(ncclCudaFree(comm->tempBuff));
    comm->tempBuff = nullptr;
  }
  if (comm->symmetricSupport) {
    NCCLCHECK(ncclSymkFinalize(comm));
    NCCLCHECK(ncclDevrFinalize(comm));
  }
  NCCLCHECK(ncclRasCommFini(comm));
  /* in commReclaim, we have guaranteed only last rank which calls ncclCommDestroy() will
   * free all intra-process communicators; therefore, we only need to focus on local
   * resource cleanup in commFree(). */
  if (comm->proxyState && comm->proxyRefCountOld == 0 && comm->proxyState->thread) {
    PTHREADCHECK(pthread_join(comm->proxyState->thread, nullptr), "pthread_join");
    if (comm->proxyState->threadUDS) {
      // UDS support
      PTHREADCHECK(pthread_join(comm->proxyState->threadUDS, nullptr), "pthread_join");
    }
  }

  if (comm->memPool) CUDACHECK(cudaMemPoolDestroy(comm->memPool));

  delete[] comm->userRedOps;
  free(comm->connectSend);
  free(comm->connectRecv);

  if (rcclParamEnableProxyTrace()) {
    WARN("commFree() ProxyTrace:");
    if (comm->proxyState && comm->proxyState->proxyTrace){
      WARN("%s", comm->proxyState->proxyTrace->dump().c_str());
    }
  }

#ifdef ENABLE_PROFILING
  // Copy the device-side profiling records to the host and print them.
  struct ncclProf *prof;
  prof = (struct ncclProf*)malloc(sizeof(struct ncclProf)*MAXCHANNELS*PROFILE_NUM_LAUNCHES);
  if (prof == nullptr) {
    WARN("Failed to allocate profiling buffer");
    // Skip profiling but continue with destruction
    goto skip_profiling;
  }
  CUDACHECK(hipMemcpy(prof, comm->devComm->devProf, sizeof(struct ncclProf)*MAXCHANNELS*PROFILE_NUM_LAUNCHES, hipMemcpyDeviceToHost));
#define VEGA_GPU_RTC_FREQUENCY 2.5E7
  for (int i=0; i<comm->nChannels; i++) {
    // NOTE(review): the launch count is read from prof[MAXCHANNELS*i] while the records are
    // indexed as prof[MAXCHANNELS*s+i] -- confirm the intended layout.
    for (int s=0; s<prof[MAXCHANNELS*i].seq; s++) {
      if (prof[MAXCHANNELS*s+i].count == 0) continue;
      for (int j=0; j<prof[MAXCHANNELS*s+i].count; j++) {
        INFO(NCCL_INIT, "# [%02d:%02d] %02d-%02d L:%04u %6.2fus", comm->rank, i, s, j, prof[MAXCHANNELS*s+i].elem[j].line, (prof[MAXCHANNELS*s+i].elem[j].timeStamp-prof[MAXCHANNELS*s+i].elem[0].timeStamp)/VEGA_GPU_RTC_FREQUENCY*1.0E6);
      }
    }
  }
  free(prof);
  CUDACHECK(hipFree(comm->devComm->devProf));
skip_profiling:
#endif

#ifdef ENABLE_COLLTRACE
  // Ask the trace consumer to finish: join it if it runs as a thread, otherwise drain inline.
  comm->collTraceExit = 1;
  if (comm->collTraceEnabled) {
    if (comm->collTraceThread)
      pthread_join(comm->collTraceThread, NULL);
    else
      ncclCommThreadMain((void *)comm);
  }
  NCCLCHECK(ncclCudaFree((void *)comm->collTrace));
  NCCLCHECK(ncclCudaHostFree((void *)comm->collTraceTail));
#endif

  free(comm->peerInfo);
  if (comm->topo)
    ncclTopoFree(comm->topo);
  if (comm->nodeRanks) {
    for (int n=0; n<comm->nNodes; n++) free(comm->nodeRanks[n].localRankToRank);
    free(comm->nodeRanks);
  }
  free(comm->rankToNode);
  free(comm->rankToLocalRank);
  free(comm->collNetHeads);
  free(comm->clique.ranks);

  if (comm->bootstrap)
    NCCLCHECK(bootstrapClose(comm->bootstrap));

  for (int channel=0; channel<MAXCHANNELS; channel++)
    NCCLCHECK(freeChannel(comm->channels+channel, comm->nRanks, 1, comm->localRanks));

  if (comm->doneEvent != NULL)
    CUDACHECK(hipEventDestroy(comm->doneEvent));

  // Shared resources are reference counted; only the last user tears them down.
  if (comm->sharedRes) {
    if (ncclAtomicRefCountDecrement(&comm->sharedRes->refCount) == 0) {
      for (int c=0; c<MAXCHANNELS; c++) {
        if (comm->sharedRes->peers[c]) free(comm->sharedRes->peers[c]);
        if (comm->sharedRes->devPeers[c]) ncclCudaFree(comm->sharedRes->devPeers[c]);
      }
      free(comm->sharedRes->tpRankToLocalRank);
      NCCLCHECK(ncclStrongStreamDestruct(&comm->sharedRes->hostStream));
      NCCLCHECK(ncclStrongStreamDestruct(&comm->sharedRes->deviceStream));
      CUDACHECK(cudaEventDestroy(comm->sharedRes->launchEvent));
      CUDACHECK(cudaEventDestroy(comm->sharedRes->scratchEvent));
      NCCLCHECK(ncclProxyDestroy(comm));
      free(comm->sharedRes);
    }
  }

#if CUDART_VERSION >= 12010
  if (comm->nvlsSupport) NCCLCHECK(ncclNvlsFree(comm));
#endif

  // Run the registered destructors, most recently pushed first. The list nodes live in
  // comm->memPermanent, which is why that stack is destructed only after the walk.
  struct ncclDestructor* dtor = comm->destructorHead;
  while (dtor != nullptr) {
    NCCLCHECK(dtor->fn(dtor));
    dtor = dtor->next;
  }

  ncclMemoryStackDestruct(&comm->memScoped);
  ncclMemoryStackDestruct(&comm->memPermanent);

  // Remember whether this teardown follows an abort; the flag storage may be freed next.
  abort = *comm->abortFlag;
  if (ncclAtomicRefCountDecrement(comm->abortFlagRefCount) == 0) {
    free(comm->abortFlag);
    NCCLCHECK(ncclCudaHostFree((void*)comm->abortFlagDev));
    free(comm->abortFlagRefCount);
  }
  free((void*)comm->config.netName);
  free(comm->topParentRanks);
  free(comm->topParentLocalRanks);
  free(comm->gproxyConn);
  free(comm->archName);

  NCCLCHECK(ncclRegCleanup(comm));

  NCCLCHECK(ncclDestroySideStream(comm->cudaDev));
  INFO(NCCL_INIT,"comm %p rank %d nranks %d cudaDev %d busId %lx - %s COMPLETE", comm, comm->rank, comm->nRanks, comm->cudaDev, comm->busId, abort ? "Abort" : "Destroy");

  // commPoison() overwrites comm->cudaDev with -1; keep the real device index for the
  // context-tracking log line below.
  const int cudaDev = comm->cudaDev;
  commPoison(comm); // poison comm before free to avoid comm reuse.
  NCCLCHECK(ncclProfilerPluginFinalize(comm));
  NCCLCHECK(ncclNetFinalize(comm));
  // Disable until we validate NCCL_LAUNCH_IMPLICIT_ORDER support.
  // but can be enabled via environment variable
  if (rcclParamEnableContextTracking() == 1) {
    ncclCudaContextDrop(comm->context);
    INFO(NCCL_INIT, "cudaDev %d context tracking destroyed", cudaDev);
  }
  free(comm);

  return ncclSuccess;
}
2025-07-14 08:55:39 -07:00
// Further runtime parameters; each line gives the key string and the built-in default.
// NOTE(review): the consumers of most of these are outside this part of the file.
RCCL_PARAM(P2pNetDisable, "P2P_NET_DISABLE", 1);
RCCL_PARAM(PivotAlltoallEnable, "PIVOT_ALLTOALL_ENABLE", 1);
2022-09-08 14:45:27 -07:00
RCCL_PARAM(LL128ForceEnable, "LL128_FORCE_ENABLE", 0);
2021-07-08 14:12:04 -07:00
NCCL_PARAM(AggChannelSize, "AGG_CHANNEL_SIZE", -2);
2021-09-08 13:56:25 -07:00
NCCL_PARAM(DisableGraphHelper, "GRAPH_HELPER_DISABLE", 0);
2022-05-24 02:02:31 -07:00
// GDRCOPY support: FIFO_ENABLE when enabled locates a workFifo in CUDA memory
NCCL_PARAM(GdrCopyFifoEnable, "GDRCOPY_FIFO_ENABLE", 1);
// 1<<22 bytes = 4 MiB.
#define NCCL_WORK_FIFO_BYTES_DEFAULT (1<<22)
2024-06-11 01:28:01 -07:00
NCCL_PARAM(WorkFifoBytes, "WORK_FIFO_BYTES", NCCL_WORK_FIFO_BYTES_DEFAULT);
// INT64_MAX default; NOTE(review): presumably means "no user-imposed limit" -- confirm at the use site.
NCCL_PARAM(WorkArgsBytes, "WORK_ARGS_BYTES", INT64_MAX);
2022-05-24 02:02:31 -07:00
// Global launch mode; it is only defined here and assigned elsewhere.
enum ncclLaunchMode ncclParamLaunchMode;
// Detect DMA-BUF support.
// Returns ncclSuccess when DMA-BUF can be used and ncclInternalError otherwise; the caller
// in this file only compares the result against ncclSuccess.
static ncclResult_t dmaBufSupported(struct ncclComm* comm) {
  // The network plugin must offer DMA-BUF registration ...
  if (comm->ncclNet->regMrDmaBuf == NULL) return ncclInternalError;
  // ... and the ROCm library must load (only attempted when the first test passes).
  if (rocmLibraryInit() != ncclSuccess) return ncclInternalError;
#if CUDA_VERSION >= 11070
  int cudaDriverVersion;
  CUDACHECK(cudaDriverGetVersion(&cudaDriverVersion));
  if (CUPFN(cuDeviceGet) == NULL || cudaDriverVersion < 11070) return ncclInternalError;
  CUdevice dev;
  CUCHECK(cuDeviceGet(&dev, comm->cudaDev));
  // Query device to see if DMA-BUF support is available
  int flag = 0;
  (void) CUPFN(cuDeviceGetAttribute(&flag, CU_DEVICE_ATTRIBUTE_DMA_BUF_SUPPORTED, dev));
  if (flag == 0) return ncclInternalError;
  INFO(NCCL_INIT, "DMA-BUF is available on GPU device %d", comm->cudaDev);
  return ncclSuccess;
#else
  // HIP path: support is present when the HSA export entry point was resolved.
  if (pfn_hsa_amd_portable_export_dmabuf == NULL) return ncclInternalError;
  return ncclSuccess;
#endif
  // Not reached: both preprocessor branches return.
  return ncclInternalError;
}
2022-08-18 02:53:17 -07:00
// Check that `comm` can accept a new operation.
//  - If the abort flag is set, the communicator's group job is aborted and ncclSuccess is returned.
//  - Otherwise the asynchronous error state is queried: ncclInProgress is reported as
//    ncclInvalidArgument (with a warning); any other state is returned unchanged.
ncclResult_t ncclCommEnsureReady(ncclComm_t comm) {
  if (__atomic_load_n(comm->abortFlag, __ATOMIC_ACQUIRE)) {
    ncclGroupJobAbort(comm->groupJob);
    return ncclSuccess;
  }

  ncclResult_t asyncState = ncclSuccess;
  NCCLCHECK(ncclCommGetAsyncError(comm, &asyncState));
  /* if the state is not ncclInProgress, we just keep it. */
  if (asyncState != ncclInProgress) return asyncState;

  WARN("Attempt to use communicator before the previous operation returned ncclSuccess");
  return ncclInvalidArgument;
}
// Off (0) by default. NOTE(review): presumably a testing aid for fault injection; its consumers are not visible in this part of the file.
RCCL_PARAM(InjectFaults, "INJECT_FAULTS", 0);
2023-04-03 05:32:07 -07:00
// Initializes the host-side state of a communicator.
//   comm   - communicator to fill in.
//   parent - parent communicator or NULL; when parent->shareResources is set, the parent's
//            sharedRes object is reused (and its refCount incremented) instead of creating one.
//   ndev   - number of ranks; must be >= 1.
//   rank   - rank of this communicator; must satisfy 0 <= rank < ndev.
// Returns ncclInvalidArgument for a bad ndev/rank, ncclInvalidUsage when resources are shared
// but parent and child ended up with different ncclNet, the first error reported through
// NCCLCHECK/CUDACHECK, or ncclSuccess.
// NOTE(review): the early returns below do not release what was created before them;
// presumably the caller tears the comm down on failure - confirm.
static ncclResult_t commAlloc(struct ncclComm* comm, struct ncclComm* parent, int ndev, int rank) {
2018-09-24 16:06:59 -07:00
if (ndev < 1) {
WARN("invalid device count (%d) requested", ndev);
return ncclInvalidArgument;
}
// Also rejects negative ranks, although the message only mentions the upper bound.
if (rank >= ndev || rank < 0) {
WARN("rank %d exceeds ndev=%d", rank, ndev);
return ncclInvalidArgument;
}
2022-05-24 02:02:31 -07:00
ncclMemoryStackConstruct(&comm->memPermanent);
ncclMemoryStackConstruct(&comm->memScoped);
comm->destructorHead = nullptr;
comm->rank = rank;
comm->nRanks = ndev;
NCCLCHECK(ncclNetInit(comm));
2023-04-03 05:32:07 -07:00
INFO(NCCL_INIT, "Using network %s", comm->ncclNet->name);
2022-05-24 02:02:31 -07:00
2025-05-29 20:56:40 -07:00
// When resources are shared with the parent, both comms must use the same ncclNet.
if (parent && parent->shareResources) {
2023-04-03 05:32:07 -07:00
if (parent->ncclNet != comm->ncclNet) {
WARN("Split shares resources, but parent comm netName %s is different from child comm netName %s", parent->ncclNet->name, comm->ncclNet->name);
return ncclInvalidUsage;
}
}
2018-09-24 16:06:59 -07:00
// Try to create a CUDA object right away. If there is something wrong with
// the device we're on (failure cause #1) , better know it early.
2019-07-05 15:43:00 -07:00
hipEvent_t doneEvent;
CUDACHECK(hipEventCreateWithFlags(&doneEvent, hipEventDisableTiming));
2018-09-24 16:06:59 -07:00
comm->doneEvent = doneEvent;
comm->lastStream = nullptr;
2023-04-03 05:32:07 -07:00
CUDACHECK(cudaGetDevice(&comm->cudaDev));
2018-09-24 16:06:59 -07:00
2025-12-02 10:03:15 -08:00
// RCCL: create persistent stream for calloc
NCCLCHECK(ncclCreateSideStream(comm->cudaDev));
// Disable until we validate NCCL_LAUNCH_IMPLICIT_ORDER support.
// but can be enabled via environment variable
if (rcclParamEnableContextTracking() == 1) {
NCCLCHECK(ncclCudaContextTrack(&comm->context));
INFO(NCCL_INIT, "cudaDev %d context tracking created", comm->cudaDev);
}
2025-03-12 13:46:21 -07:00
2019-11-19 14:57:39 -08:00
// Resolve the device's bus id, convert it to its string form and use that string to look up
// the SMI device index, which is stored in comm->nvmlDev.
NCCLCHECK(getBusId(comm->cudaDev, &comm->busId));
char busId[]="0000:00:00.0";
NCCLCHECK(int64ToBusId(comm->busId, busId));
#ifdef USE_AMDSMI
NCCLCHECK(amd_smi_init());
NCCLCHECK(amd_smi_getDeviceIndexByPciBusId(busId, (unsigned int*)&comm->nvmlDev));
#else
NCCLCHECK(rocm_smi_init());
NCCLCHECK(rocm_smi_getDeviceIndexByPciBusId(busId, (unsigned int*)&comm->nvmlDev));
#endif
2022-11-29 04:27:46 -08:00
comm->compCap = ncclCudaCompCap();
TRACE(NCCL_INIT,"comm %p rank %d nranks %d cudaDev %d busId %lx compCap %d", comm, rank, ndev, comm->cudaDev, comm->busId, comm->compCap);
2018-12-13 15:56:12 -08:00
2018-09-24 16:06:59 -07:00
comm->checkPointers = ncclParamCheckPointers() == 1 ? true : false;
2022-05-24 02:02:31 -07:00
comm->dmaBufSupport = (dmaBufSupported(comm) == ncclSuccess) ? true : false;
2018-09-24 16:06:59 -07:00
2019-11-26 16:33:13 -08:00
#ifdef ENABLE_COLLTRACE
2026-01-08 16:07:18 -08:00
// Collective-trace buffers: a host-allocated tail array with MAXCHANNELS entries and a
// device-allocated trace array with COLLTRACE_NUM_ITEMS*MAXCHANNELS entries.
NCCLCHECK(ncclCudaHostCalloc(&comm->collTraceTail, MAXCHANNELS));
#if defined(HIP_UNCACHED_MEMORY)
NCCLCHECK(ncclCudaCalloc(&comm->collTrace, COLLTRACE_NUM_ITEMS*MAXCHANNELS, hipDeviceMallocUncached));
2026-01-08 16:07:18 -08:00
#else
NCCLCHECK(ncclCudaCalloc(&comm->collTrace, COLLTRACE_NUM_ITEMS*MAXCHANNELS));
2026-01-08 16:07:18 -08:00
#endif
2023-08-03 07:16:12 -07:00
comm->collTraceExit = 0;
2025-01-31 07:51:10 -08:00
comm->collTraceEnabled = false; // we can enable colltrace without starting a thread
if ((ncclDebugLevel >= NCCL_LOG_INFO) && rcclParamKernelCollTraceEnable()) {
comm->collTraceEnabled = true;
// NOTE(review): the return value of pthread_create is not checked, and collTraceThread is
// only assigned inside this enabled branch.
if (rcclParamKernelCollTraceThreadEnable())
pthread_create(&comm->collTraceThread, NULL, ncclCommThreadMain, (void *)comm);
else
comm->collTraceThread = 0;
}
2019-11-26 16:33:13 -08:00
#endif
// Fault injection: the requested value is only honoured when compiled with
// ENABLE_FAULT_INJECTION; otherwise it is reported and ignored.
if (rcclParamInjectFaults() != 0) {
#ifdef ENABLE_FAULT_INJECTION
comm->faults = rcclParamInjectFaults();
if (comm->rank == 0) INFO(NCCL_INIT, "Enabled RCCL faults injection with value 0x%lx", comm->faults);
#else
WARN("Ignore faults injection of value 0x%lx as RCCL is not compiled to support it", rcclParamInjectFaults());
#endif
}
2023-06-13 00:19:57 -07:00
memset(comm->collNetSupportMatrix, 0, sizeof(comm->collNetSupportMatrix));
2020-09-04 14:35:05 -07:00
2022-05-24 02:02:31 -07:00
ncclMemoryPoolConstruct(&comm->memPool_ncclKernelPlan);
ncclMemoryPoolConstruct(&comm->memPool_ncclProxyOp);
2025-05-29 20:56:40 -07:00
// The list links are set to the non-null placeholder address 0x1.
// NOTE(review): presumably a sentinel meaning "not in any list" - confirm against the group code.
for (int i = 0; i < ncclGroupTaskTypeNum; i++) {
comm->groupNext[i] = reinterpret_cast<struct ncclComm*>(0x1);
}
2022-05-24 02:02:31 -07:00
comm->preconnectNext = reinterpret_cast<struct ncclComm*>(0x1);
2021-05-11 18:16:30 -07:00
2020-09-04 14:35:05 -07:00
// One connectSend/connectRecv element per (rank, connection index); each element needs
// at least MAXCHANNELS bits, which the static_asserts enforce.
static_assert(MAXCHANNELS <= sizeof(*comm->connectSend)*8, "comm->connectSend must have enough bits for all channels");
static_assert(MAXCHANNELS <= sizeof(*comm->connectRecv)*8, "comm->connectRecv must have enough bits for all channels");
NCCLCHECK(ncclCalloc(&comm->connectSend, comm->nRanks*NCCL_MAX_CONNS));
NCCLCHECK(ncclCalloc(&comm->connectRecv, comm->nRanks*NCCL_MAX_CONNS));
2020-09-04 14:35:05 -07:00
2020-05-12 14:40:18 -07:00
// Mark channels as non initialized.
2022-05-24 02:02:31 -07:00
for (int c=0; c < MAXCHANNELS; c++) comm->channels[c].id = -1;
2025-05-29 20:56:40 -07:00
// Either create a fresh shared-resources object owned by this comm, or attach to the parent's.
if (parent == NULL || !parent->shareResources) {
2023-04-03 05:32:07 -07:00
struct ncclSharedResources* sharedRes = NULL;
NCCLCHECK(ncclCalloc(&sharedRes, 1));
/* most of attributes are assigned later in initTransportsRank(). */
sharedRes->owner = comm;
sharedRes->tpNRanks = comm->nRanks;
NCCLCHECK(ncclCalloc(&sharedRes->tpRankToLocalRank, comm->nRanks));
NCCLCHECK(ncclStrongStreamConstruct(&sharedRes->deviceStream));
NCCLCHECK(ncclStrongStreamConstruct(&sharedRes->hostStream));
2025-03-12 13:46:21 -07:00
CUDACHECK(cudaEventCreateWithFlags(&sharedRes->launchEvent, cudaEventDisableTiming));
CUDACHECK(cudaEventCreateWithFlags(&sharedRes->scratchEvent, cudaEventDisableTiming));
2023-04-03 05:32:07 -07:00
comm->sharedRes = sharedRes;
sharedRes->refCount = 1;
} else {
comm->sharedRes = parent->sharedRes;
ncclAtomicRefCountIncrement(&parent->sharedRes->refCount);
}
2019-11-26 16:33:13 -08:00
CUDACHECK(hipDeviceGetAttribute(&comm->WarpSize, hipDeviceAttributeWarpSize, comm->cudaDev));
2023-04-03 05:32:07 -07:00
// If no mapping was provided beforehand, topParentRanks defaults to the identity mapping.
if (comm->topParentRanks == NULL) {
NCCLCHECK(ncclCalloc(&comm->topParentRanks, comm->nRanks));
for (int i = 0; i < comm->nRanks; ++i)
comm->topParentRanks[i] = i;
}
ncclIntruQueueMpscConstruct(&comm->callbackQueue);
2024-09-10 05:57:10 -07:00
ncclIntruQueueConstruct(&comm->legacyRegCleanupQueue);
2025-09-02 13:21:14 -07:00
ncclIntruQueueConstruct(&comm->ceInitTaskQueue);
2024-02-05 05:06:02 -08:00
comm->regCache.pageSize = sysconf(_SC_PAGESIZE);
2024-09-10 05:57:10 -07:00
// Create a memory pool located on comm->cudaDev and set its release threshold to the
// largest uint64_t value.
do {
cudaMemPoolProps props = {};
props.allocType = cudaMemAllocationTypePinned;
props.handleTypes = cudaMemHandleTypeNone;
props.location.type = cudaMemLocationTypeDevice;
props.location.id = comm->cudaDev;
CUDACHECK(cudaMemPoolCreate(&comm->memPool, &props));
uint64_t releaseThreshold = ~uint64_t(0);
CUDACHECK(cudaMemPoolSetAttribute(comm->memPool, cudaMemPoolAttrReleaseThreshold, &releaseThreshold));
} while (0);
ncclIntruQueueConstruct(&comm->eventCallbackQueue);
2018-09-24 16:06:59 -07:00
return ncclSuccess;
}
// Builds the device-side image of the communicator.
// A host staging structure (tmpCommAndChans) is filled from `comm`, the device structure and its
// side arrays are allocated on the shared device stream, and the staging structure is copied to
// the device with an asynchronous memcpy. The work FIFO buffer and the profiler counters are
// allocated here as well.
//   comm - communicator whose device state is set up.
// Returns the first error encountered, or ncclSuccess. Every failure after the stream acquisition
// is routed through `fail`/`exit` so that the stream is released and synchronized before returning.
static ncclResult_t devCommSetup(ncclComm_t comm) {
  ncclResult_t ret = ncclSuccess;
  int nRanks = comm->nRanks;
  struct ncclKernelCommAndChannels tmpCommAndChans;
  struct ncclKernelCommAndChannels *devCommAndChans = NULL;
  // NOTE(review): the non-HIP branch below still references ccStatus, whose declaration is
  // commented out here; that branch is not compiled for HIP builds.
  //struct ncclNvmlCCStatus ccStatus; //unused variable - compiler warning
  bool ccEnable = false;
  cudaStream_t deviceStream;

  memset(&tmpCommAndChans, '\0', sizeof(tmpCommAndChans));
  NCCLCHECKGOTO(ncclStrongStreamAcquire(ncclCudaGraphNone(), &comm->sharedRes->deviceStream, /*concurrent=*/false, &deviceStream), ret, fail);
  NCCLCHECKGOTO(ncclCudaCallocAsync(&devCommAndChans, 1, deviceStream), ret, fail);
  ncclCommPushCudaFree(comm, devCommAndChans);
  NCCLCHECKGOTO(ncclCudaCallocAsync(&tmpCommAndChans.comm.rankToLocalRank, comm->nRanks, deviceStream), ret, fail);
  ncclCommPushCudaFree(comm, tmpCommAndChans.comm.rankToLocalRank);
  NCCLCHECKGOTO(ncclCudaMemcpyAsync(tmpCommAndChans.comm.rankToLocalRank, comm->rankToLocalRank, comm->nRanks, deviceStream), ret, fail);
  comm->devComm = &devCommAndChans->comm;
  tmpCommAndChans.comm.rank = comm->rank;
  tmpCommAndChans.comm.nRanks = nRanks;
  tmpCommAndChans.comm.node = comm->node;
  tmpCommAndChans.comm.nNodes = comm->nNodes;
  tmpCommAndChans.comm.abortFlag = comm->abortFlagDev;
  tmpCommAndChans.comm.isAllNvlink = comm->isAllNvlink;
  tmpCommAndChans.comm.p2pnChannelsPerPeer = comm->p2pnChannelsPerPeer;
  for (int p=0; p < NCCL_NUM_PROTOCOLS; p++) {
    tmpCommAndChans.comm.buffSizes[p] = comm->buffSizes[p];
  }
  tmpCommAndChans.comm.p2pChunkSize = comm->p2pChunkSize;
  tmpCommAndChans.comm.channels = &devCommAndChans->channels[0];

  comm->workArgsBytes = std::min<size_t>(ncclParamWorkArgsBytes(), ncclMaxKernelArgsSize(comm->cudaArch));

  // Work FIFO size: must be a power of two (otherwise the default is used) and is capped at 1 GiB.
#if !defined(__HIP_PLATFORM_AMD__) && !defined(__HIPCC__)
  memset(&ccStatus, 0, sizeof(ccStatus));
  ccEnable = (ncclSuccess == ncclNvmlGetCCStatus(&ccStatus)) && (ccStatus.CCEnabled || ccStatus.multiGpuProtectedPCIE || ccStatus.multiGpuNVLE);
  if (ccEnable) {
    comm->workFifoBytes = 0;
  } else {
    comm->workFifoBytes = ncclParamWorkFifoBytes();
    if (0 != (comm->workFifoBytes & (comm->workFifoBytes-1))) {
      WARN("NCCL_WORK_FIFO_BYTES=%d is being ignored because it is not a power of 2.", comm->workFifoBytes);
      comm->workFifoBytes = NCCL_WORK_FIFO_BYTES_DEFAULT;
    }
    comm->workFifoBytes = std::min(comm->workFifoBytes, 1u<<30);
  }
#else
  comm->workFifoBytes = ncclParamWorkFifoBytes();
  if (0 != (comm->workFifoBytes & (comm->workFifoBytes-1))) {
    WARN("NCCL_WORK_FIFO_BYTES=%d is being ignored because it is not a power of 2.", comm->workFifoBytes);
    comm->workFifoBytes = NCCL_WORK_FIFO_BYTES_DEFAULT;
  }
  comm->workFifoBytes = std::min(comm->workFifoBytes, 1u<<30);
#endif

  if (comm->rank == 0) {
    INFO(NCCL_INIT, "CC %s, workFifoBytes %d", ccEnable ? "On" : "Off", comm->workFifoBytes);
  }

  if (ncclGdrCopy != NULL && ncclParamGdrCopyFifoEnable() == 1) {
    // The workFifoBuf lives in GDR mapped CUDA memory.
    NCCLCHECKGOTO(ncclGdrCudaCalloc(&comm->workFifoBuf, &comm->workFifoBufDev, comm->workFifoBytes, &comm->workFifoBufGdrHandle), ret, fail);
    ncclCommPushCudaGdrFree(comm, comm->workFifoBufGdrHandle);
  } else {
    // The workFifoBuf lives in cudaHost memory.
    comm->workFifoBufGdrHandle = nullptr;
    NCCLCHECKGOTO(ncclCudaHostCalloc(&comm->workFifoBuf, comm->workFifoBytes), ret, fail);
    ncclCommPushCudaHostFree(comm, comm->workFifoBuf);
    comm->workFifoBufDev = comm->workFifoBuf;
  }

  comm->workFifoProduced = 0;
  comm->workFifoProducedLastRecorded = 0;
  comm->workFifoConsumed = 0;

  // Alloc profiler counters for the kernel
  NCCLCHECKGOTO(ncclCudaHostCalloc(&comm->profiler.workStarted, MAXCHANNELS), ret, fail);
  NCCLCHECKGOTO(ncclCudaHostCalloc(&comm->profiler.workCompleted, MAXCHANNELS), ret, fail);
  tmpCommAndChans.comm.workStarted = comm->profiler.workStarted;
  tmpCommAndChans.comm.workCompleted = comm->profiler.workCompleted;
  ncclCommPushCudaHostFree(comm, comm->profiler.workStarted);
  ncclCommPushCudaHostFree(comm, comm->profiler.workCompleted);

  if (comm->collNetDenseToUserRank != nullptr) {
    NCCLCHECKGOTO(ncclCudaCallocAsync(&tmpCommAndChans.comm.collNetDenseToUserRank, nRanks, deviceStream), ret, fail);
    ncclCommPushCudaFree(comm, tmpCommAndChans.comm.collNetDenseToUserRank);
    NCCLCHECKGOTO(ncclCudaMemcpyAsync(tmpCommAndChans.comm.collNetDenseToUserRank, comm->collNetDenseToUserRank, nRanks, deviceStream), ret, fail);
  }

  // Per-channel state. ring is copied wholesale first, then its userRanks pointer is redirected
  // to the channel's devRingUserRanks buffer, which is filled from the host ring when one exists.
  for (int c=0; c < MAXCHANNELS; c++) {
    tmpCommAndChans.channels[c].peers = comm->channels[c].devPeers;
    tmpCommAndChans.channels[c].ring = comm->channels[c].ring;
    tmpCommAndChans.channels[c].ring.userRanks = comm->channels[c].devRingUserRanks;
    tmpCommAndChans.channels[c].tree = comm->channels[c].tree;
    tmpCommAndChans.channels[c].collnetChain = comm->channels[c].collnetChain;
    tmpCommAndChans.channels[c].collnetDirect = comm->channels[c].collnetDirect;
    tmpCommAndChans.channels[c].binTree = comm->channels[c].binTree;
    tmpCommAndChans.channels[c].nvls = comm->channels[c].nvls;

    if (comm->channels[c].ring.userRanks != nullptr) {
      NCCLCHECKGOTO(ncclCudaMemcpyAsync(tmpCommAndChans.channels[c].ring.userRanks, comm->channels[c].ring.userRanks, nRanks, deviceStream), ret, fail);
    }
  }

#ifdef ENABLE_COLLTRACE
  tmpCommAndChans.comm.collTrace = comm->collTrace;
  tmpCommAndChans.comm.collTraceTail = comm->collTraceTail;
  tmpCommAndChans.comm.collTraceThread = comm->collTraceThread;
#endif

#if defined(ENABLE_NPKIT)
  WARN("NPKIT is deprecated, please use Profiler Plugin instead!");
  // Init NPKit. Failures must go through `fail` so the acquired stream is released.
  NCCLCHECKGOTO(NpKit::Init(comm->rank), ret, fail);
  tmpCommAndChans.comm.npKitEventCollectContexts = NpKit::GetGpuEventCollectContexts();
  tmpCommAndChans.comm.cpuTimestamp = NpKit::GetCpuTimestamp();
#endif

#ifdef ENABLE_PROFILING
  // Failures must go through `fail` so the acquired stream is released.
  NCCLCHECKGOTO(ncclCudaCalloc(&tmpCommAndChans.comm.devProf, MAXCHANNELS*PROFILE_NUM_LAUNCHES), ret, fail);
#endif

#ifdef ENABLE_FAULT_INJECTION
  tmpCommAndChans.comm.faults = comm->faults;
#endif

  // Publish the fully populated staging structure to the device.
  NCCLCHECKGOTO(ncclCudaMemcpyAsync(devCommAndChans, &tmpCommAndChans, 1, deviceStream), ret, fail);
exit:
  NCCLCHECK(ncclStrongStreamRelease(ncclCudaGraphNone(), &comm->sharedRes->deviceStream, /*concurrent=*/false));
  NCCLCHECK(ncclStrongStreamSynchronize(&comm->sharedRes->deviceStream));
  return ret;
fail:
  goto exit;
}
// Pre-process the string so that running "strings" on the lib can quickly reveal the version.
// VERSION_STRING and VERSION_STRING_EXTENDED are the literals printed by showVersion().
// The HIP/ROCm spelling is selected when __HIP_PLATFORM_AMD__ or __HIPCC__ is defined,
// the NCCL/CUDA spelling otherwise.
2024-08-14 15:04:13 -06:00
#if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__)
#define VERSION_STRING "RCCL version : " STR(NCCL_MAJOR) "." STR(NCCL_MINOR) "." STR(NCCL_PATCH) NCCL_SUFFIX
#define VERSION_STRING_EXTENDED "HIP version : " HIP_BUILD_INFO "\nROCm version : " ROCM_BUILD_INFO
2019-07-05 15:43:00 -07:00
#else
#define VERSION_STRING "NCCL version " STR(NCCL_MAJOR) "." STR(NCCL_MINOR) "." STR(NCCL_PATCH) NCCL_SUFFIX
#define VERSION_STRING_EXTENDED "CUDA version " STR(CUDA_MAJOR) "." STR(CUDA_MINOR)
2019-07-05 15:43:00 -07:00
#endif
2018-09-24 16:06:59 -07:00
static void showVersion() {
char versionInfo[2048+2*HOST_NAME_MAX], hostInfo[HOST_NAME_MAX], libPathInfo[2048];
2024-08-14 15:04:13 -06:00
// Retrieve Hostname info
if (gethostname(hostInfo, sizeof(hostInfo)-1) != 0) {
// Returns Unknown in hostInfo if function call unsuccessful
strncpy(hostInfo, "Unknown", sizeof(hostInfo)-1);
}
// Retrieve librccl path
Dl_info pathInfo;
if (dladdr((void*)ncclCommInitRank, &pathInfo)) {
strncpy(libPathInfo, pathInfo.dli_fname, sizeof(libPathInfo)-1);
} else {
// Sets libPath to Unknown if the above function call is not successful
strncpy(libPathInfo, "Unknown", sizeof(libPathInfo)-1);
}
snprintf(versionInfo, sizeof(versionInfo),
"%s-%s\n%s\n"
"%-12s : %s\n%12s : %s",
VERSION_STRING, rcclGitHash, VERSION_STRING_EXTENDED,
"Hostname", hostInfo, "Librccl path", libPathInfo
);
2024-06-11 01:28:01 -07:00
if (ncclDebugLevel == NCCL_LOG_VERSION || ncclDebugLevel == NCCL_LOG_WARN) {
VERSION("%s", versionInfo);
2024-06-11 01:28:01 -07:00
} else {
INFO(NCCL_ALL,"%s", versionInfo);
2018-09-24 16:06:59 -07:00
}
}
2025-01-27 03:30:22 -08:00
// MNNVL overrides read by fillInfo() (only in the non-HIP build), both defaulting to -1:
//  - ncclParamMNNVLUUID(): any value other than -1 is copied into both halves of
//    fabricInfo.clusterUuid.
//  - ncclParamMNNVLCliqueId(): -2 derives cliqueId from a hash of the chassis serial number;
//    any other value except -1 is used directly as cliqueId.
NCCL_PARAM(MNNVLUUID, "MNNVL_UUID", -1);
2024-09-10 05:57:10 -07:00
NCCL_PARAM(MNNVLCliqueId, "MNNVL_CLIQUE_ID", -1);
2019-11-19 14:57:39 -08:00
// Fills the ncclPeerInfo entry describing the local rank.
//   comm     - local communicator; its hasFineGrain, minCompCap and maxCompCap fields are also
//              updated here.
//   info     - entry to fill (rank, devices, version, hashes, memory size, shm device, bus id,
//              fine-grain/GDR capabilities, comm pointer, compute capability).
//   commHash - value added to both the host hash and the pid hash.
// Returns ncclSuccess, or the first error reported by NCCLCHECK/CUDACHECK/SYSCHECK.
static ncclResult_t fillInfo(struct ncclComm* comm, struct ncclPeerInfo* info, uint64_t commHash) {
2025-05-29 20:56:40 -07:00
cudaDeviceProp prop;
2019-11-19 14:57:39 -08:00
info->rank = comm->rank;
info->cudaDev = comm->cudaDev;
info->nvmlDev = comm->nvmlDev;
2024-09-10 05:57:10 -07:00
NCCLCHECK(ncclGetVersion(&info->version));
2019-07-05 15:43:00 -07:00
info->hostHash=getHostHash()+commHash;
info->pidHash=getPidHash()+commHash;
2024-06-11 01:28:01 -07:00
info->cuMemSupport = ncclCuMemEnable();
2025-05-29 20:56:40 -07:00
CUDACHECK(cudaGetDeviceProperties(&prop, comm->cudaDev));
// NOTE(review): presumably rounds the device memory size up to a multiple of 4 GiB (1L << 32);
// confirm against the ROUNDUP macro.
info->totalGlobalMem = ROUNDUP(prop.totalGlobalMem, (1L << 32));
2018-12-13 15:56:12 -08:00
2019-11-19 14:57:39 -08:00
// Get the device MAJOR:MINOR of /dev/shm so we can use that
// information to decide whether we can use SHM for inter-process
// communication in a container environment
struct stat statbuf;
SYSCHECK(stat("/dev/shm", &statbuf), "stat");
info->shmDev = statbuf.st_dev;
info->busId = comm->busId;
// detect if fine grained memory is available on this GPU
// The probe allocates a single int with the uncached (when HIP_UNCACHED_MEMORY is defined) or
// fine-grained flag and frees it again; only the success of the allocation matters.
int *ptr;
2023-07-21 07:31:27 -07:00
#if defined(HIP_UNCACHED_MEMORY)
if (hipExtMallocWithFlags((void**)&ptr, sizeof(int), hipDeviceMallocUncached) == hipSuccess) {
2023-07-21 07:31:27 -07:00
#else
if (hipExtMallocWithFlags((void**)&ptr, sizeof(int), hipDeviceMallocFinegrained) == hipSuccess) {
2023-07-21 07:31:27 -07:00
#endif
CUDACHECK(hipFree(ptr));
info->hasFineGrain = true;
2024-06-25 08:00:15 -07:00
// GPU supports GDR if DMABUF is supported
if (dmaBufSupported(comm) == ncclSuccess)
info->gdrSupport = 1;
else
NCCLCHECK(ncclGpuGdrSupport(comm, &info->gdrSupport));
}
else {
// Without fine-grained memory, GDR is reported as unsupported.
info->hasFineGrain = false;
info->gdrSupport = 0;
}
comm->hasFineGrain = info->hasFineGrain;
2022-01-07 06:39:55 -08:00
info->comm = comm;
2023-04-03 05:32:07 -07:00
// min/max compute capability start at the local value.
info->cudaCompCap = comm->minCompCap = comm->maxCompCap = comm->compCap;
2024-02-05 05:06:02 -08:00
2024-08-14 15:04:13 -06:00
#if !defined(__HIP_PLATFORM_AMD__) && !defined(__HIPCC__)
2024-02-05 05:06:02 -08:00
// MNNVL support
{
// MNNVL: Request the fabric UUID and partition info
char busId[NVML_DEVICE_PCI_BUS_ID_BUFFER_SIZE];
nvmlDevice_t nvmlDev;
NCCLCHECK(int64ToBusId(info->busId, busId));
NCCLCHECK(ncclNvmlDeviceGetHandleByPciBusId(busId, &nvmlDev));
// Default to "not supported"; the result of the query below is deliberately ignored, so the
// state stays at this default unless the call overwrites it.
info->fabricInfo.state = NVML_GPU_FABRIC_STATE_NOT_SUPPORTED;
(void) ncclNvmlDeviceGetGpuFabricInfoV(nvmlDev, &info->fabricInfo);
if (info->fabricInfo.state != NVML_GPU_FABRIC_STATE_NOT_SUPPORTED) {
2025-09-02 13:21:14 -07:00
unsigned long uuid0 = 0;
unsigned long uuid1 = 0;
2025-01-27 03:30:22 -08:00
// User override: the same parameter value is written to both halves of the cluster UUID.
if (ncclParamMNNVLUUID() != -1) {
2025-09-02 13:21:14 -07:00
unsigned long temp_uuid0 = (unsigned long)ncclParamMNNVLUUID();
unsigned long temp_uuid1 = (unsigned long)ncclParamMNNVLUUID();
memcpy(info->fabricInfo.clusterUuid, &temp_uuid0, sizeof(temp_uuid0));
memcpy(info->fabricInfo.clusterUuid + sizeof(temp_uuid0), &temp_uuid1, sizeof(temp_uuid1));
2025-01-27 03:30:22 -08:00
}
2025-09-02 13:21:14 -07:00
// Read the (possibly overridden) UUID back as two words; they are only used for the log below.
memcpy(&uuid0, info->fabricInfo.clusterUuid, sizeof(uuid0));
memcpy(&uuid1, info->fabricInfo.clusterUuid + sizeof(uuid0), sizeof(uuid1));
// Clique id selection: -2 hashes the chassis serial number, any other value except -1 is
// used as-is, and -1 keeps the value already present in fabricInfo.
if (ncclParamMNNVLCliqueId() == -2) {
nvmlPlatformInfo_t platformInfo = { 0 };
NCCLCHECK(ncclNvmlDeviceGetPlatformInfo(nvmlDev, &platformInfo));
INFO(NCCL_INIT, "MNNVL rack serial %s slot %d tray %d hostId %d peerType %d moduleId %d",
platformInfo.chassisSerialNumber, platformInfo.slotNumber, platformInfo.trayIndex,
platformInfo.hostId, platformInfo.peerType, platformInfo.moduleId);
// Use a hash of the Rack serial number to partition the NVLD clique
info->fabricInfo.cliqueId = getHash(platformInfo.chassisSerialNumber, sizeof(platformInfo.chassisSerialNumber));
} else if (ncclParamMNNVLCliqueId() != -1) info->fabricInfo.cliqueId = ncclParamMNNVLCliqueId();
2024-02-05 05:06:02 -08:00
INFO(NCCL_INIT, "MNNVL busId 0x%lx fabric UUID %lx.%lx cliqueId 0x%x state %d healthMask 0x%x",
info->busId,
2025-09-02 13:21:14 -07:00
uuid0, uuid1,
2024-02-05 05:06:02 -08:00
info->fabricInfo.cliqueId, info->fabricInfo.state, info->fabricInfo.healthMask);
}
}
#endif
2024-02-05 05:06:02 -08:00
2018-09-24 16:06:59 -07:00
return ncclSuccess;
}
2019-11-19 14:57:39 -08:00
// Initializes channel `channelId` and fills in its ring description.
//   comm      - communicator owning the channel.
//   channelId - index of the channel to initialize.
//   rank      - local rank.
//   nranks    - number of entries in ringRanks.
//   ringRanks - ranks in ring order.
// ring->index receives the ring distance of `rank` from rank 0, and ring->userRanks receives
// ringRanks rotated so that it starts at `rank`.
// Returns ncclSuccess, or the error reported by initChannel().
static ncclResult_t setupChannel(struct ncclComm* comm, int channelId, int rank, int nranks, int* ringRanks) {
  TRACE(NCCL_INIT, "rank %d nranks %d", rank, nranks);
  NCCLCHECK(initChannel(comm, channelId));

  struct ncclRing* ring = &comm->channels[channelId].ring;

  // Locate rank 0 and the local rank inside the ring; the last match wins, and both positions
  // stay at 0 when no entry matches.
  int zeroPos = 0;
  int selfPos = 0;
  for (int pos = 0; pos < nranks; ++pos) {
    if (ringRanks[pos] == 0) {
      zeroPos = pos;
    }
    if (ringRanks[pos] == rank) {
      selfPos = pos;
    }
  }

  ring->index = (selfPos - zeroPos + nranks) % nranks;

  // Rotate the ring so that entry 0 is the local rank.
  for (int dst = 0; dst < nranks; ++dst) {
    const int src = (dst + selfPos) % nranks;
    ring->userRanks[dst] = ringRanks[src];
  }
  return ncclSuccess;
}
// Compile-time default buffer sizes, defined only for the non-HIP build; the HIP build of
// computeBuffSizes() obtains its defaults from rcclSetDefaultBuffSizes() instead.
#if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__)
#else
2020-05-12 14:40:18 -07:00
#define DEFAULT_LL_BUFFSIZE (NCCL_LL_LINES_PER_THREAD*NCCL_LL_MAX_NTHREADS*NCCL_STEPS*sizeof(union ncclLLFifoLine))
#define DEFAULT_LL128_BUFFSIZE (NCCL_LL128_ELEMS_PER_THREAD*NCCL_LL128_MAX_NTHREADS*NCCL_STEPS*sizeof(uint64_t))
2020-09-04 14:35:05 -07:00
#define DEFAULT_BUFFSIZE (1 << 22) /* 4MiB */
#endif
2020-05-12 14:40:18 -07:00
// Per-protocol buffer size overrides; computeBuffSizes() treats the default -2 as "not set"
// and falls back to the protocol's default size.
NCCL_PARAM(BuffSize, "BUFFSIZE", -2);
NCCL_PARAM(LlBuffSize, "LL_BUFFSIZE", -2);
NCCL_PARAM(Ll128BuffSize, "LL128_BUFFSIZE", -2);
// P2P chunk sizes; computeBuffSizes() picks the Net value for multi-node comms (unless a valid
// size was already set), the Nvl value when comm->isAllNvlink is set, and the Pci value otherwise.
// Default value of P2P_NET_CHUNKSIZE may be overwritten by changes in src/rccl_wrap.cc
2022-08-18 02:53:17 -07:00
NCCL_PARAM(P2pNetChunkSize, "P2P_NET_CHUNKSIZE", (1 << 17)); /* 128 kB */
2022-11-29 04:27:46 -08:00
NCCL_PARAM(P2pPciChunkSize, "P2P_PCI_CHUNKSIZE", (1 << 17)); /* 128 kB */
NCCL_PARAM(P2pNvlChunkSize, "P2P_NVL_CHUNKSIZE", (1 << 19)); /* 512 kB */
2022-08-18 02:53:17 -07:00
2020-05-12 14:40:18 -07:00
// Computes the per-protocol buffer sizes and the P2P chunk size of a communicator.
//   comm - communicator to update (buffSizes[], p2pChunkSize, and sharedRes->tpP2pChunkSize when
//          this comm owns the shared resources).
// Always returns ncclSuccess.
static ncclResult_t computeBuffSizes(struct ncclComm* comm) {
  // Order follows the protocol indices: LL, LL128, Simple. -2 means "not set by the user".
  int64_t requested[NCCL_NUM_PROTOCOLS] = { ncclParamLlBuffSize(), ncclParamLl128BuffSize(), ncclParamBuffSize() };
#if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__)
  int fallback[NCCL_NUM_PROTOCOLS];
  rcclSetDefaultBuffSizes(comm, fallback);
#else
  int fallback[NCCL_NUM_PROTOCOLS] = { DEFAULT_LL_BUFFSIZE, DEFAULT_LL128_BUFFSIZE, DEFAULT_BUFFSIZE };
#endif

  for (int proto = 0; proto < NCCL_NUM_PROTOCOLS; ++proto) {
    if (requested[proto] != -2) {
      comm->buffSizes[proto] = requested[proto];
    } else {
      comm->buffSizes[proto] = fallback[proto];
    }
  }

  // Pick the P2P chunk size according to the kind of interconnect.
  if (comm->nNodes > 1) {
    rcclSetP2pNetChunkSize(comm, comm->p2pChunkSize);
    // Keep the size set above when it is valid, otherwise use the parameter.
    if (!(comm->p2pChunkSize > RCCL_VALUE_INVALID)) {
      comm->p2pChunkSize = ncclParamP2pNetChunkSize();
    }
  } else if (comm->isAllNvlink) {
    comm->p2pChunkSize = ncclParamP2pNvlChunkSize();
  } else {
    comm->p2pChunkSize = ncclParamP2pPciChunkSize();
  }

  // Make sure P2P chunksize is not larger than coll chunksize.
  if (comm->p2pChunkSize * NCCL_STEPS > comm->buffSizes[NCCL_PROTO_SIMPLE]) {
    comm->p2pChunkSize = comm->buffSizes[NCCL_PROTO_SIMPLE]/NCCL_STEPS;
  }

  if (comm->sharedRes->owner == comm) {
    // The owner publishes its chunk size for comms sharing its resources.
    comm->sharedRes->tpP2pChunkSize = comm->p2pChunkSize;
  } else {
    /* make sure split comm p2pChunkSize won't exceed shared p2pChunkSize. */
    comm->p2pChunkSize = std::min(comm->p2pChunkSize, comm->sharedRes->tpP2pChunkSize);
  }

  INFO(NCCL_INIT, "P2P Chunksize set to %d", comm->p2pChunkSize);
  return ncclSuccess;
}
2020-01-16 16:02:42 -08:00
// Parameters used during communicator initialization.
NCCL_PARAM(GraphDumpFileRank, "GRAPH_DUMP_FILE_RANK", 0);
2021-05-11 18:16:30 -07:00
NCCL_PARAM(CollNetNodeThreshold, "COLLNET_NODE_THRESHOLD", 2);
NCCL_PARAM(NvbPreconnect, "NVB_PRECONNECT", 0);
2023-02-02 12:52:47 -08:00
NCCL_PARAM(AllocP2pNetLLBuffers, "ALLOC_P2P_NET_LL_BUFFERS", 0);
// Warp-speed parameter accessors are only declared here; their definitions are not in this part of the file.
#ifdef ENABLE_WARP_SPEED
extern int64_t rcclParamWarpSpeedEnable();
extern int64_t rcclParamWarpSpeedAutoMode();
extern int64_t rcclParamWarpSpeedCuCount();
#endif
2024-02-05 05:06:02 -08:00
// MNNVL: Flag to indicate whether to enable Multi-Node NVLink
// As used by initTransportsRank(): 1 always runs the MNNVL check, 0 never does, and any other
// value (default 2) runs it only for multi-node comms whose user P2P level is not 0.
2024-03-26 06:08:55 -07:00
NCCL_PARAM(MNNVLEnable, "MNNVL_ENABLE", 2);
2024-06-11 01:28:01 -07:00
// Indices into the timers[TIMERS_INIT_COUNT] array passed to initTransportsRank().
#define TIMER_INIT_TOTAL 0
#define TIMER_INIT_KERNELS 1
#define TIMER_INIT_BOOTSTRAP 2
#define TIMER_INIT_ALLGATHER 3
#define TIMER_INIT_TOPO 4
#define TIMER_INIT_GRAPHS 5
#define TIMER_INIT_CONNECT 6
2024-09-10 05:57:10 -07:00
#define TIMER_INIT_ALLOC 7
#define TIMERS_INIT_COUNT 8
2024-06-11 01:28:01 -07:00
2025-09-02 13:21:14 -07:00
// Fills comm->nvlDomainInfo from the node topology: the domain count is the node count, and the
// min/max ranks per domain are the min/max local ranks.
//   comm - communicator to update.
// Always returns ncclSuccess.
static ncclResult_t initNvlDomainInfo(struct ncclComm* comm) {
  auto& domainInfo = comm->nvlDomainInfo;
  domainInfo.nNvlDomains = comm->nNodes;
  domainInfo.minRanksPerNvlDomain = comm->minLocalRanks;
  domainInfo.maxRanksPerNvlDomain = comm->maxLocalRanks;

  TRACE(NCCL_INIT, "NVLink domains: %d domains, min ranks per domain: %d, max ranks per domain: %d",
        comm->nNodes, domainInfo.minRanksPerNvlDomain, domainInfo.maxRanksPerNvlDomain);
  return ncclSuccess;
}
2024-06-11 01:28:01 -07:00
static ncclResult_t initTransportsRank(struct ncclComm* comm, struct ncclComm* parent, uint64_t timers[TIMERS_INIT_COUNT]) {
2020-09-04 14:35:05 -07:00
// We use 2 AllGathers
// 1. { peerInfo, comm, compCap}
// 2. { nChannels, graphInfo, topoRanks }
2022-11-29 04:27:46 -08:00
ncclResult_t ret = ncclSuccess;
2018-09-24 16:06:59 -07:00
int rank = comm->rank;
int nranks = comm->nRanks;
2024-02-26 02:52:26 -08:00
int nNodes = 1;
2022-11-29 04:27:46 -08:00
cpu_set_t affinitySave;
2024-06-11 01:28:01 -07:00
struct ncclTopoGraph* ringGraph = &comm->graphs[NCCL_ALGO_RING];
struct ncclTopoGraph* treeGraph = &comm->graphs[NCCL_ALGO_TREE];
struct ncclTopoGraph* collNetChainGraph = &comm->graphs[NCCL_ALGO_COLLNET_CHAIN];
struct ncclTopoGraph* collNetDirectGraph = &comm->graphs[NCCL_ALGO_COLLNET_DIRECT];
struct ncclTopoGraph* nvlsGraph = &comm->graphs[NCCL_ALGO_NVLS];
2024-09-10 05:57:10 -07:00
struct ncclTopoGraph* graphs[NCCL_NUM_ALGORITHMS] = { treeGraph, ringGraph, collNetDirectGraph, collNetChainGraph, nvlsGraph, nvlsGraph, treeGraph };
2022-11-29 04:27:46 -08:00
struct graphInfo {
int pattern;
int nChannels;
int sameChannels;
float bwIntra;
float bwInter;
int typeIntra;
int typeInter;
2024-03-26 06:08:55 -07:00
int crossNic;
2022-11-29 04:27:46 -08:00
};
struct allGatherInfo {
2023-04-03 05:32:07 -07:00
struct graphInfo graphInfo[NCCL_NUM_ALGORITHMS];
2022-11-29 04:27:46 -08:00
struct ncclTopoRanks topoRanks;
2024-06-11 01:28:01 -07:00
int cpuArch;
int cpuVendor;
2025-05-29 20:56:40 -07:00
int localRanks;
int nc;
bool pivotA2AEnabled;
bool ll128Enabled;
bool mscclEnabled;
2022-11-29 04:27:46 -08:00
};
int nChannelsOrig;
struct allGatherInfo *allGather3Data = NULL;
struct ncclTopoRanks** allTopoRanks = NULL;
int *nodesFirstRank = NULL, *nodesTreePatterns = NULL;
int *rings = NULL;
int* nvbPeers = NULL;
struct ncclProxyConnector proxyConn;
int* pxnPeers = NULL;
2023-04-03 05:32:07 -07:00
int *topParentLocalRanks = NULL;
2025-05-29 20:56:40 -07:00
int p2pLevel = -1;
2018-09-24 16:06:59 -07:00
2023-09-12 06:30:04 +08:00
bool needsProxy = false;
bool mscclNeedsProxy = needsProxy;
2024-06-11 01:28:01 -07:00
timers[TIMER_INIT_ALLGATHER] = clockNano();
2018-12-13 15:56:12 -08:00
// AllGather1 - begin
2022-11-29 04:27:46 -08:00
NCCLCHECKGOTO(ncclCalloc(&comm->peerInfo, nranks+1), ret, fail); // Extra rank to represent CollNet root
2023-04-03 05:32:07 -07:00
NCCLCHECKGOTO(fillInfo(comm, comm->peerInfo+rank, comm->commHash), ret, fail);
2022-11-29 04:27:46 -08:00
NCCLCHECKGOTO(bootstrapAllGather(comm->bootstrap, comm->peerInfo, sizeof(struct ncclPeerInfo)), ret, fail);
2025-05-29 20:56:40 -07:00
__atomic_store_n(&comm->peerInfoValid, true, __ATOMIC_RELEASE);
2020-09-04 14:35:05 -07:00
2024-06-11 01:28:01 -07:00
comm->cuMemSupport = 1;
2020-09-04 14:35:05 -07:00
for (int i = 0; i < nranks; i++) {
2024-09-10 05:57:10 -07:00
if (comm->peerInfo[i].version != comm->peerInfo[rank].version) {
WARN("Mismatched NCCL version detected : rank %d version %d rank %d version %d",
i, comm->peerInfo[i].version, rank, comm->peerInfo[rank].version);
ret = ncclInvalidUsage;
goto fail;
}
2024-02-26 02:52:26 -08:00
if (comm->peerInfo[i].hostHash != comm->peerInfo[rank].hostHash) nNodes++;
2024-06-11 01:28:01 -07:00
if (!comm->peerInfo[i].cuMemSupport) comm->cuMemSupport = 0;
2022-01-07 06:39:55 -08:00
if ((i != rank) && (comm->peerInfo[i].hostHash == comm->peerInfo[rank].hostHash) && (comm->peerInfo[i].busId == comm->peerInfo[rank].busId)) {
WARN("Duplicate GPU detected : rank %d and rank %d both on CUDA device %lx", rank, i, comm->peerInfo[rank].busId);
2022-11-29 04:27:46 -08:00
ret = ncclInvalidUsage;
goto fail;
2020-09-04 14:35:05 -07:00
}
}
2024-04-15 12:03:57 -06:00
2018-12-13 15:56:12 -08:00
// AllGather1 - end
2024-06-11 01:28:01 -07:00
timers[TIMER_INIT_ALLGATHER] = clockNano() - timers[TIMER_INIT_ALLGATHER];
2018-12-13 15:56:12 -08:00
2025-01-27 03:30:22 -08:00
// Check for MNNVL support
2025-05-29 20:56:40 -07:00
NCCLCHECKGOTO(ncclGetUserP2pLevel(&p2pLevel), ret, fail);
if ((nNodes > 1 && ncclParamMNNVLEnable() != 0 && p2pLevel != 0) || ncclParamMNNVLEnable() == 1) {
2025-01-27 03:30:22 -08:00
NCCLCHECKGOTO(ncclMnnvlCheck(comm), ret, fail);
2024-02-05 05:06:02 -08:00
}
2022-11-29 04:27:46 -08:00
do {
// Compute intra-process ranks
int intraProcRank0 = -1, intraProcRank = -1, intraProcRanks = 0;
2024-03-26 06:08:55 -07:00
for (int i = 0; i < nranks; i++) comm->minCompCap = std::min(comm->minCompCap, comm->peerInfo[i].cudaCompCap);
for (int i = 0; i < nranks; i++) comm->maxCompCap = std::max(comm->maxCompCap, comm->peerInfo[i].cudaCompCap);
2023-09-26 05:47:28 -07:00
comm->nvlsRegSupport = 1;
2022-11-29 04:27:46 -08:00
for (int i = 0; i < nranks; i++) {
if ((comm->peerInfo[i].hostHash == comm->peerInfo[rank].hostHash)
&& (comm->peerInfo[i].pidHash == comm->peerInfo[rank].pidHash)) {
// Rank is in same process
if (intraProcRanks == 0) intraProcRank0 = i;
if (i == rank) intraProcRank = intraProcRanks;
intraProcRanks++;
if (intraProcRank0 == rank && rank != i) {
comm->peerInfo[i].comm->intraNext = comm->intraNext;
comm->intraNext = comm->peerInfo[i].comm;
}
}
2023-09-26 05:47:28 -07:00
if (comm->nvlsRegSupport) {
for (int j = i + 1; j < nranks; j++) {
if (comm->peerInfo[i].hostHash == comm->peerInfo[j].hostHash &&
comm->peerInfo[i].pidHash == comm->peerInfo[j].pidHash) {
comm->nvlsRegSupport = 0;
break;
}
}
}
2022-11-29 04:27:46 -08:00
}
2024-03-26 06:08:55 -07:00
// Buffer Registration is not supported with MNNVL
if (comm->MNNVL) comm->nvlsRegSupport = 0;
2025-09-02 13:21:14 -07:00
else if (ncclParamSingleProcMemRegEnable()) comm->nvlsRegSupport = 1;
2024-03-26 06:08:55 -07:00
2022-11-29 04:27:46 -08:00
TRACE(NCCL_INIT,"pidHash[%d] %lx intraProcRank %d intraProcRanks %d intraProcRank0 %d",
rank, comm->peerInfo[rank].pidHash, intraProcRank, intraProcRanks, intraProcRank0);
if (intraProcRank == -1 || intraProcRank0 == -1 || comm->peerInfo[intraProcRank0].comm == NULL) {
WARN("Failed to determine intra proc ranks rank %d hostHash %lx pidHash %lx intraProcRank %d intraProcRanks %d intraProcRank0 %d",
rank, comm->peerInfo[rank].hostHash, comm->peerInfo[rank].pidHash,
intraProcRank, intraProcRanks, intraProcRank0);
ret = ncclInternalError;
goto fail;
}
#if defined(ENABLE_NPKIT)
if (intraProcRanks != 1) {
WARN("NPKit currently does not support more than 1 device per process");
ret = ncclInternalError;
goto fail;
}
#endif
2022-11-29 04:27:46 -08:00
struct ncclComm* comm0 = comm->peerInfo[intraProcRank0].comm;
assert(intraProcRank==0 ? comm==comm0 : true);
comm->intraComm0 = comm0;
comm->intraRank = intraProcRank;
comm->intraRanks = intraProcRanks;
comm->intraBarrierPhase = 0;
comm->intraBarrierCounter = 0;
comm->intraBarrierGate = 0;
} while(0);
2024-06-11 01:28:01 -07:00
timers[TIMER_INIT_TOPO] = clockNano();
2024-12-18 08:26:06 -08:00
// Dump XML if requested by user
const char* dumpXmlFile;
dumpXmlFile = ncclGetEnv("NCCL_TOPO_DUMP_FILE");
if (dumpXmlFile) {
NCCLCHECKGOTO(ncclTopoGetSystem(comm, NULL, dumpXmlFile), ret, fail);
}
2019-11-19 14:57:39 -08:00
// Topo detection / System graph creation
2022-11-29 04:27:46 -08:00
NCCLCHECKGOTO(ncclTopoGetSystem(comm, &comm->topo), ret, fail);
comm->topo->tuning = rcclGetTuningIndexForArch(comm->archName);
INFO(NCCL_INIT, "Tuning index set to: %d", comm->topo->tuning);
// save nRanks to ncclTopoSystem as indicator of multi-node
comm->topo->nRanks = comm->nRanks;
2021-11-05 08:53:47 -07:00
// init netGdrLevel
comm->topo->netGdrLevel = -2;
// init Pivot A2A related fields
comm->topo->pivotA2AEnabled = false;
comm->topo->pivotA2ANumBiRings = 0;
2022-09-08 14:45:27 -07:00
// LL128
comm->topo->ll128Enabled = false;
// Topology hint for MSCCL internal scheduler about whether to enable MSCCL
comm->topo->mscclEnabled = false;
// Topology hint if tree has been defined by model or User
comm->topo->treeDefined = false;
2019-11-19 14:57:39 -08:00
// Compute paths between GPUs and NICs
2022-11-29 04:27:46 -08:00
NCCLCHECKGOTO(ncclTopoComputePaths(comm->topo, comm), ret, fail);
2019-11-19 14:57:39 -08:00
// Remove inaccessible GPUs and unused NICs
2022-11-29 04:27:46 -08:00
NCCLCHECKGOTO(ncclTopoTrimSystem(comm->topo, comm), ret, fail);
2019-11-19 14:57:39 -08:00
// Recompute paths after trimming
2022-11-29 04:27:46 -08:00
NCCLCHECKGOTO(ncclTopoComputePaths(comm->topo, comm), ret, fail);
2020-01-16 16:02:42 -08:00
// Init search
2022-11-29 04:27:46 -08:00
NCCLCHECKGOTO(ncclTopoSearchInit(comm->topo), ret, fail);
2024-06-11 01:28:01 -07:00
// Decide on comm's CPU architecture.
NCCLCHECKGOTO(ncclTopoComputeCommCPU(comm), ret, fail);
2019-11-19 14:57:39 -08:00
// Print final topology
2022-11-29 04:27:46 -08:00
NCCLCHECKGOTO(ncclTopoPrint(comm->topo), ret, fail);
2024-06-11 01:28:01 -07:00
timers[TIMER_INIT_TOPO] = clockNano() - timers[TIMER_INIT_TOPO];
2019-11-19 14:57:39 -08:00
2022-01-07 06:39:55 -08:00
// Set affinity to a CPU local to our GPU, so that all memory we allocate
// on the host is local.
2022-11-29 04:27:46 -08:00
NCCLCHECKGOTO(ncclTopoGetCpuAffinity(comm->topo, comm->rank, &comm->cpuAffinity), ret, fail);
2022-01-07 06:39:55 -08:00
if (CPU_COUNT(&comm->cpuAffinity)) {
sched_getaffinity(0, sizeof(cpu_set_t), &affinitySave);
sched_setaffinity(0, sizeof(cpu_set_t), &comm->cpuAffinity);
}
2023-04-03 05:32:07 -07:00
// Determine local CollNet support
2025-05-29 20:56:40 -07:00
if (!collNetSupport(comm)) {
comm->config.collnetEnable = 0;
2023-04-03 05:32:07 -07:00
}
// Determine local Nvls support
NCCLCHECK(ncclNvlsInit(comm));
2022-01-07 06:39:55 -08:00
2024-04-15 12:03:57 -06:00
// [RCCL] Compute hostIdx (based on hostHash)
{
comm->topo->nHosts = 0;
for (int r = 0; r < nranks; r++) {
int isNewHost = 1;
// Check if this is the first time this hostname has been used
for (int i = 0; i < r && isNewHost; i++) {
if (comm->peerInfo[i].hostHash == comm->peerInfo[r].hostHash) {
isNewHost = 0;
}
}
if (isNewHost)
{
// Check if this is the same hostname associated with this rank
if (comm->peerInfo[r].hostHash == comm->peerInfo[rank].hostHash)
comm->topo->hostIdx = comm->topo->nHosts;
comm->topo->nHosts++;
}
}
}
2024-06-11 01:28:01 -07:00
timers[TIMER_INIT_GRAPHS] = clockNano();
2021-04-07 11:29:44 -06:00
// Get rings and trees
2024-06-11 01:28:01 -07:00
memset(ringGraph, 0, sizeof(struct ncclTopoGraph));
ringGraph->id = 0;
ringGraph->pattern = NCCL_TOPO_PATTERN_RING;
ringGraph->minChannels = 1;
ringGraph->maxChannels = MAXCHANNELS/2;
NCCLCHECKGOTO(ncclTopoCompute(comm->topo, ringGraph), ret, fail);
NCCLCHECKGOTO(ncclTopoPrintGraph(comm->topo, ringGraph), ret, fail);
memset(treeGraph, 0, sizeof(struct ncclTopoGraph));
treeGraph->id = 1;
treeGraph->pattern = NCCL_TOPO_PATTERN_BALANCED_TREE;
treeGraph->minChannels = ringGraph->nChannels;
treeGraph->maxChannels = ringGraph->nChannels;
NCCLCHECKGOTO(ncclTopoCompute(comm->topo, treeGraph), ret, fail);
NCCLCHECKGOTO(ncclTopoPrintGraph(comm->topo, treeGraph), ret, fail);
memset(collNetChainGraph, 0, sizeof(struct ncclTopoGraph));
collNetChainGraph->id = 2;
collNetChainGraph->pattern = NCCL_TOPO_PATTERN_TREE;
collNetChainGraph->collNet = 1;
collNetChainGraph->minChannels = ringGraph->nChannels;
collNetChainGraph->maxChannels = ringGraph->nChannels;
memset(collNetDirectGraph, 0, sizeof(struct ncclTopoGraph));
2024-09-10 05:57:10 -07:00
collNetDirectGraph->id = 4;
2024-06-11 01:28:01 -07:00
collNetDirectGraph->pattern = NCCL_TOPO_PATTERN_COLLNET_DIRECT;
collNetDirectGraph->collNet = 1;
collNetDirectGraph->minChannels = 1;
collNetDirectGraph->maxChannels = MAXCHANNELS;
2025-05-29 20:56:40 -07:00
if (comm->config.collnetEnable) {
2024-06-11 01:28:01 -07:00
NCCLCHECKGOTO(ncclTopoCompute(comm->topo, collNetChainGraph), ret, fail);
NCCLCHECKGOTO(ncclTopoPrintGraph(comm->topo, collNetChainGraph), ret, fail);
NCCLCHECKGOTO(ncclTopoCompute(comm->topo, collNetDirectGraph), ret, fail);
NCCLCHECKGOTO(ncclTopoPrintGraph(comm->topo, collNetDirectGraph), ret, fail);
2023-04-03 05:32:07 -07:00
}
2024-06-11 01:28:01 -07:00
memset(nvlsGraph, 0, sizeof(struct ncclTopoGraph));
nvlsGraph->id = 3;
nvlsGraph->pattern = NCCL_TOPO_PATTERN_NVLS;
nvlsGraph->minChannels = 1;
nvlsGraph->maxChannels = MAXCHANNELS;
2023-04-03 05:32:07 -07:00
if (comm->nvlsSupport) {
2024-06-11 01:28:01 -07:00
NCCLCHECKGOTO(ncclTopoCompute(comm->topo, nvlsGraph), ret, fail);
NCCLCHECKGOTO(ncclTopoPrintGraph(comm->topo, nvlsGraph), ret, fail);
2023-04-03 05:32:07 -07:00
}
2024-06-11 01:28:01 -07:00
timers[TIMER_INIT_GRAPHS] = clockNano() - timers[TIMER_INIT_GRAPHS];
2021-04-07 11:29:44 -06:00
bool allXgmi, hasPeerAccess;
allXgmi = true;
hasPeerAccess = true;
// Check that all the GPUs have peer access to one another and are XGMI connected
for (int i = 0; i < nranks && hasPeerAccess; i++) {
int cudaDev1 = comm->peerInfo[i].cudaDev;
for (int j = 0; j < nranks; j++) {
if (i == j) continue;
int cudaDev2 = comm->peerInfo[j].cudaDev;
int p2p;
if (hipDeviceCanAccessPeer(&p2p, cudaDev1, cudaDev2) != hipSuccess || !p2p)
{
hasPeerAccess = false;
break;
}
bool isXGMI;
// Limit to single intermediate GPU for enabling clique
NCCLCHECK(ncclTopoGetLinkType(comm->topo, i, j, &isXGMI, 1));
allXgmi &= isXGMI;
}
}
2022-08-18 02:53:17 -07:00
// Initialize num P2P LL buffers for this communicator
comm->allocP2pNetLLBuffers = ncclParamAllocP2pNetLLBuffers() == 1;
2020-01-16 16:02:42 -08:00
if (comm->rank == ncclParamGraphDumpFileRank()) {
2024-06-11 01:28:01 -07:00
struct ncclTopoGraph* dumpGraphs[5] = { ringGraph, treeGraph, collNetDirectGraph, collNetChainGraph, nvlsGraph };
NCCLCHECKGOTO(ncclTopoDumpGraphs(comm->topo, 5, dumpGraphs), ret, fail);
2021-04-12 16:00:11 -07:00
}
if ((comm->topo->type & RCCL_TOPO_4P2H_ROME) && (comm->topo->type & RCCL_TOPO_GDR_ALL)) {
if (rcclParamP2pNetDisable() == 0) {
2022-05-31 11:31:30 -07:00
if (!(comm->topo->type & RCCL_TOPO_FORCE_INTRA)) comm->p2pNet = 1;
INFO(NCCL_INIT, "RCCL enabled same node P2P over network");
}
else
INFO(NCCL_INIT, "RCCL force disabled same node P2P over network");
}
2024-06-11 01:28:01 -07:00
// Because timers[TIMER_INIT_ALLGATHER] already contains the timing of the first allgather,
// we temporarily store the start time of the subsequent one in an as-of-yet unused CONNECT timer.
timers[TIMER_INIT_CONNECT] = clockNano();
2018-12-13 15:56:12 -08:00
// AllGather3 - begin
2022-11-29 04:27:46 -08:00
NCCLCHECKGOTO(ncclCalloc(&allGather3Data, nranks), ret, fail);
2020-08-26 11:40:11 -07:00
int idx;
NCCLCHECK(ncclTopoIdToIndex(comm->topo, GPU, comm->busId, &idx));
allGather3Data[rank].nc = 2;
if (comm->topo->nodes[GPU].count == comm->topo->nRanks &&
2023-09-12 15:34:40 -04:00
IsArchMatch(comm->topo->nodes[GPU].nodes[idx].gpu.gcn, "gfx906") && allXgmi)
allGather3Data[rank].nc = 4;
2023-09-12 15:34:40 -04:00
if (IsArchMatch(comm->topo->nodes[GPU].nodes[idx].gpu.gcn, "gfx908"))
allGather3Data[rank].nc = std::max(4/ringGraph->nChannels, 2);
if (comm->topo->nodes[GPU].count == comm->topo->nRanks &&
2022-08-22 19:09:22 +00:00
(comm->topo->type & RCCL_TOPO_CR8G))
allGather3Data[rank].nc = 4;
if (comm->topo->nodes[GPU].count == comm->topo->nRanks &&
2023-09-12 15:34:40 -04:00
IsArchMatch(comm->topo->nodes[GPU].nodes[idx].gpu.gcn, "gfx90a"))
allGather3Data[rank].nc = 4;
2023-09-12 15:34:40 -04:00
if (IsArchMatch(comm->topo->nodes[GPU].nodes[idx].gpu.gcn, "gfx90a"))
allGather3Data[rank].nc = std::max(allGather3Data[rank].nc, 4/ringGraph->nChannels);
if (ringGraph->nChannels > MAXCHANNELS/2)
2022-01-14 10:03:30 -08:00
allGather3Data[rank].nc = 1;
comm -> gfx9CheapFenceOff = 1;
#ifdef HIP_UNCACHED_MEMORY
if(!rcclParamGfx9CheapFenceOff()){
if(IsArchMatch(comm->topo->nodes[GPU].nodes[idx].gpu.gcn, "gfx942")){
comm -> gfx9CheapFenceOff = 0;
}
else if(IsArchMatch(comm->topo->nodes[GPU].nodes[idx].gpu.gcn, "gfx950")){
comm -> gfx9CheapFenceOff = ROCM_VERSION < 70002 && nNodes > 1; // Enable for single node only prior to ROCm 7.0.2
}
}
INFO(NCCL_INIT, "GFX9 cheap fence is %s", comm -> gfx9CheapFenceOff ? "OFF" : "ON");
#endif
// RCCL: Only use one slice per primitive on some single-node gfx9xx systems; currently only enabled for AllReduce, ReduceScatter, and AllGather
if (IsArchMatch(comm->topo->nodes[GPU].nodes[idx].gpu.gcn, "gfx942") || IsArchMatch(comm->topo->nodes[GPU].nodes[idx].gpu.gcn, "gfx950")){
comm->rcclUseOneSlice = nNodes == 1;
}
if (IsArchMatch(comm->topo->nodes[GPU].nodes[idx].gpu.gcn, "gfx942")) {
// Multi-node MI300A
int managed = 0;
CUDACHECK(hipDeviceGetAttribute(&managed, hipDeviceAttributeDirectManagedMemAccessFromHost, 0));
if (managed && nNodes > 1) {
// This forces the minimum channels to 24
allGather3Data[rank].nc = 6;
} else {
// MI300X
if (nranks == 2)
// NCCL_MIN_NCHANNELS=32
allGather3Data[rank].nc = 16;
else if (nranks == 4)
// NCCL_MIN_NCHANNELS=24
allGather3Data[rank].nc = 4;
}
}
#ifdef ENABLE_WARP_SPEED
comm->topo->warpSpeedEnabled = (rcclParamWarpSpeedEnable() != 0 || rcclParamWarpSpeedAutoMode() != 0 || rcclParamWarpSpeedCuCount() > 0);
#endif
// For single-node communicators that do not use the full XGMI links per GPU (i.e., nranks < 8),
// inflate nChannels a bit to achieve higher bandwidth.
if (IsArchMatch(comm->topo->nodes[GPU].nodes[idx].gpu.gcn, "gfx950")) {
if (nranks == 2 && nNodes == 1){
allGather3Data[rank].nc = 16;
} else if (nranks == 4 && nNodes == 1){
allGather3Data[rank].nc = 8;
} else {
allGather3Data[rank].nc = 4;
}
}
#ifdef ENABLE_WARP_SPEED
// Double default channels for WarpSpeed enabled communicators
if (comm->topo->warpSpeedEnabled) {
allGather3Data[rank].nc *= 2;
}
#endif
2022-05-17 18:14:04 -07:00
allGather3Data[rank].pivotA2AEnabled = comm->topo->pivotA2AEnabled && rcclParamPivotAlltoallEnable();
2022-09-08 14:45:27 -07:00
comm->topo->ll128Enabled = comm->topo->ll128Enabled || rcclParamLL128ForceEnable();
allGather3Data[rank].ll128Enabled = comm->topo->ll128Enabled;
allGather3Data[rank].mscclEnabled = comm->topo->mscclEnabled;
2019-11-19 14:57:39 -08:00
2023-04-03 05:32:07 -07:00
for (int a=0; a<NCCL_NUM_ALGORITHMS; a++) {
allGather3Data[rank].graphInfo[a].pattern = graphs[a]->pattern;
allGather3Data[rank].graphInfo[a].nChannels = graphs[a]->nChannels;
allGather3Data[rank].graphInfo[a].sameChannels = graphs[a]->sameChannels;
allGather3Data[rank].graphInfo[a].bwIntra = graphs[a]->bwIntra;
allGather3Data[rank].graphInfo[a].bwInter = graphs[a]->bwInter;
allGather3Data[rank].graphInfo[a].typeIntra = graphs[a]->typeIntra;
allGather3Data[rank].graphInfo[a].typeInter = graphs[a]->typeInter;
2024-03-26 06:08:55 -07:00
allGather3Data[rank].graphInfo[a].crossNic = graphs[a]->crossNic;
2023-04-03 05:32:07 -07:00
}
2019-11-19 14:57:39 -08:00
2024-06-11 01:28:01 -07:00
allGather3Data[rank].cpuArch = comm->cpuArch;
allGather3Data[rank].cpuVendor = comm->cpuVendor;
comm->nChannels = std::min(treeGraph->nChannels, ringGraph->nChannels);
// For a single-rank job there is no topology constraint, so ncclTopoCompute drives the ring to its allowed maximum, which results in 4 x MAXCHANNELS channels for single-rank comms and causes issues.
if (comm->nRanks == 1) {
comm->nChannels = treeGraph->nChannels = ringGraph->nChannels = 8;
}
2023-04-03 05:32:07 -07:00
NCCLCHECKGOTO(ncclTopoPreset(comm, graphs, &allGather3Data[rank].topoRanks), ret, fail);
2019-11-19 14:57:39 -08:00
2022-11-29 04:27:46 -08:00
NCCLCHECKGOTO(bootstrapAllGather(comm->bootstrap, allGather3Data, sizeof(*allGather3Data)), ret, fail);
2018-12-13 15:56:12 -08:00
2019-11-19 14:57:39 -08:00
// Determine nNodes, firstRanks, ...
2022-11-29 04:27:46 -08:00
NCCLCHECKGOTO(ncclCalloc(&nodesFirstRank, nranks), ret, fail);
NCCLCHECKGOTO(ncclCalloc(&nodesTreePatterns, nranks), ret, fail);
NCCLCHECKGOTO(ncclCalloc(&comm->rankToNode, comm->nRanks), ret, fail);
2022-01-07 06:39:55 -08:00
for (int r=0; r<nranks; r++) {
int node;
int firstRank = allGather3Data[r].topoRanks.ringRecv[0];
for (node=0; node<comm->nNodes && nodesFirstRank[node] != firstRank; node++);
if (node == comm->nNodes) {
comm->nNodes++;
2019-11-19 14:57:39 -08:00
nodesFirstRank[node] = firstRank;
2020-09-04 14:35:05 -07:00
// Record tree pattern of each node as they can be different depending on sm arch
2023-04-03 05:32:07 -07:00
nodesTreePatterns[node] = allGather3Data[r].graphInfo[NCCL_ALGO_TREE].pattern;
2019-11-19 14:57:39 -08:00
}
2022-01-07 06:39:55 -08:00
comm->rankToNode[r] = node;
2024-06-11 01:28:01 -07:00
if (comm->cpuArch != allGather3Data[r].cpuArch &&
comm->cpuArch != NCCL_TOPO_CPU_ARCH_MIXED) {
comm->cpuArch = NCCL_TOPO_CPU_ARCH_MIXED;
}
if (comm->cpuVendor != allGather3Data[r].cpuVendor &&
comm->cpuVendor != NCCL_TOPO_CPU_VENDOR_MIXED) {
comm->cpuVendor = NCCL_TOPO_CPU_VENDOR_MIXED;
}
2022-01-07 06:39:55 -08:00
}
2024-06-11 01:28:01 -07:00
// Alert the user to the presence of mixed CPUs. In the past this has caused
// locks in some collective routines. This may help debug issues in the future.
if (rank==0) {
if (comm->cpuArch == NCCL_TOPO_CPU_ARCH_MIXED) {
INFO(NCCL_GRAPH, "CPUs with mixed architecture were detected.");
}
if (comm->cpuVendor == NCCL_TOPO_CPU_VENDOR_MIXED) {
INFO(NCCL_GRAPH, "CPUs with mixed vendors were detected.");
}
2022-01-07 06:39:55 -08:00
}
2024-06-11 01:28:01 -07:00
2022-01-07 06:39:55 -08:00
// Now that we know nNodes, alloc nodeRanks and compute localRanks for each node
2022-11-29 04:27:46 -08:00
NCCLCHECKGOTO(ncclCalloc(&comm->nodeRanks, comm->nNodes), ret, fail);
NCCLCHECKGOTO(ncclCalloc(&comm->rankToLocalRank, comm->nRanks), ret, fail);
2022-01-07 06:39:55 -08:00
for (int r=0; r<comm->nRanks; r++) {
int node = comm->rankToNode[r];
comm->rankToLocalRank[r] = comm->nodeRanks[node].localRanks;
comm->nodeRanks[node].localRanks++;
}
2025-09-02 13:21:14 -07:00
comm->minLocalRanks = INT_MAX;
2022-01-07 06:39:55 -08:00
// Allocate ranks arrays for each node
for (int n=0; n<comm->nNodes; n++) {
2022-11-29 04:27:46 -08:00
NCCLCHECKGOTO(ncclCalloc(&comm->nodeRanks[n].localRankToRank, comm->nodeRanks[n].localRanks), ret, fail);
2022-01-07 06:39:55 -08:00
comm->maxLocalRanks = std::max(comm->maxLocalRanks, comm->nodeRanks[n].localRanks);
2025-09-02 13:21:14 -07:00
comm->minLocalRanks = std::min(comm->minLocalRanks, comm->nodeRanks[n].localRanks);
2022-01-07 06:39:55 -08:00
comm->nodeRanks[n].localRanks = 0;
}
// And fill the ranks arrays
for (int r=0; r<comm->nRanks; r++) {
int node = comm->rankToNode[r];
comm->nodeRanks[node].localRankToRank[comm->nodeRanks[node].localRanks++] = r;
}
comm->node = comm->rankToNode[rank];
comm->localRankToRank = comm->nodeRanks[comm->node].localRankToRank;
comm->localRank = comm->rankToLocalRank[rank];
comm->localRanks = comm->nodeRanks[comm->node].localRanks;
2025-09-02 13:21:14 -07:00
NCCLCHECKGOTO(initNvlDomainInfo(comm), ret, fail);
2022-01-07 06:39:55 -08:00
TRACE(NCCL_INIT,"hostHash[%d] %lx localRank %d localRanks %d localRank0 %d",
rank, comm->peerInfo[rank].hostHash, comm->localRank, comm->localRanks, comm->localRankToRank[0]);
if (comm->localRank == -1 || comm->localRankToRank[0] == -1 || comm->localRanks == 0) {
WARN("Failed to determine local ranks rank %d hostHash %lx pidHash %lx localRank %d localRanks %d localRank0 %d",
rank, comm->peerInfo[rank].hostHash, comm->peerInfo[rank].pidHash,
comm->localRank, comm->localRanks, comm->localRankToRank[0]);
2022-11-29 04:27:46 -08:00
ret = ncclInternalError;
goto fail;
2019-11-19 14:57:39 -08:00
}
2018-09-24 16:06:59 -07:00
2024-02-05 05:06:02 -08:00
INFO(NCCL_INIT, "comm %p rank %d nRanks %d nNodes %d localRanks %d localRank %d MNNVL %d",
comm, rank, comm->nRanks, comm->nNodes, comm->localRanks, comm->localRank, comm->MNNVL);
2022-11-29 04:27:46 -08:00
nChannelsOrig = comm->nChannels;
NCCLCHECKGOTO(ncclCalloc(&allTopoRanks, comm->nRanks), ret, fail);
int nc;
nc = allGather3Data[0].nc;
2019-11-19 14:57:39 -08:00
for (int i=0; i<nranks; i++) {
allTopoRanks[i] = &allGather3Data[i].topoRanks;
nc = std::min(allGather3Data[i].nc, nc);
2019-11-19 14:57:39 -08:00
// Make sure we align all ranks so that the tuning is consistent across ranks
comm->topo->pivotA2AEnabled = comm->topo->pivotA2AEnabled && allGather3Data[i].pivotA2AEnabled;
2022-09-08 14:45:27 -07:00
comm->topo->ll128Enabled = comm->topo->ll128Enabled && allGather3Data[i].ll128Enabled;
comm->topo->mscclEnabled = comm->topo->mscclEnabled && allGather3Data[i].mscclEnabled;
2023-04-03 05:32:07 -07:00
for (int a=0; a<NCCL_NUM_ALGORITHMS; a++) {
graphs[a]->nChannels = std::min(allGather3Data[i].graphInfo[a].nChannels, graphs[a]->nChannels);
graphs[a]->sameChannels = std::min(allGather3Data[i].graphInfo[a].sameChannels, graphs[a]->sameChannels);
graphs[a]->bwIntra = std::min(allGather3Data[i].graphInfo[a].bwIntra, graphs[a]->bwIntra);
graphs[a]->bwInter = std::min(allGather3Data[i].graphInfo[a].bwInter, graphs[a]->bwInter);
graphs[a]->typeIntra = std::max(allGather3Data[i].graphInfo[a].typeIntra, graphs[a]->typeIntra);
graphs[a]->typeInter = std::max(allGather3Data[i].graphInfo[a].typeInter, graphs[a]->typeInter);
2024-03-26 06:08:55 -07:00
graphs[a]->crossNic = std::max(allGather3Data[i].graphInfo[a].crossNic, graphs[a]->crossNic);
2023-04-03 05:32:07 -07:00
}
2025-03-12 13:46:21 -07:00
comm->maxTreePattern = std::max(comm->maxTreePattern, allGather3Data[i].graphInfo[NCCL_ALGO_TREE].pattern);
2019-11-19 14:57:39 -08:00
}
2025-05-29 20:56:40 -07:00
if (graphs[NCCL_ALGO_COLLNET_CHAIN]->nChannels == 0) comm->config.collnetEnable = 0;
2024-03-26 06:08:55 -07:00
if (graphs[NCCL_ALGO_NVLS]->nChannels == 0) comm->nvlsSupport = comm->nvlsChannels = 0;
2018-12-13 15:56:12 -08:00
comm->nChannels = treeGraph->nChannels = ringGraph->nChannels =
(comm->topo->nodes[GPU].count != comm->topo->nRanks && comm->topo->nodes[NET].count)
? std::min(treeGraph->nChannels, ringGraph->nChannels) : ringGraph->nChannels;
2019-11-19 14:57:39 -08:00
if (comm->nChannels < nChannelsOrig) {
// We started duplicating channels during Preset(), so we need to move the
// duplicated channels since we have removed some.
for (int i=0; i<comm->nChannels; i++) memcpy(comm->channels+comm->nChannels+i, comm->channels+nChannelsOrig+i, sizeof(struct ncclChannel));
2018-09-24 16:06:59 -07:00
}
2018-12-13 15:56:12 -08:00
2022-01-07 06:39:55 -08:00
// Determine CollNet support after all-gather now that we know nNodes and each node localRanks
2025-05-29 20:56:40 -07:00
if (comm->config.collnetEnable == 1) {
2022-01-07 06:39:55 -08:00
int collNetNodeThreshold = ncclParamCollNetNodeThreshold();
if (comm->nNodes < collNetNodeThreshold) {
2021-05-11 18:16:30 -07:00
INFO(NCCL_INIT, "Communicator has %d nodes which is less than CollNet node threshold %d, disabling CollNet", comm->nNodes, collNetNodeThreshold);
2025-05-29 20:56:40 -07:00
comm->config.collnetEnable = 0;
2022-01-07 06:39:55 -08:00
}
2021-05-11 18:16:30 -07:00
}
2025-01-27 03:30:22 -08:00
NCCLCHECK(ncclTopoPathAllNVLink(comm->topo, &comm->isAllNvlink));
2024-12-18 08:26:06 -08:00
comm->isOneRPN = (comm->maxLocalRanks == 1);
2021-05-11 18:16:30 -07:00
2022-11-29 04:27:46 -08:00
NCCLCHECKGOTO(ncclCalloc(&rings, nranks*MAXCHANNELS), ret, fail);
2019-11-19 14:57:39 -08:00
NCCLCHECKGOTO(ncclTopoPostset(comm, nodesFirstRank, nodesTreePatterns, allTopoRanks, rings, graphs, parent, nc), ret, fail);
if (comm->topo->treeDefined) NCCLCHECK(ncclTreeBasePostset(comm, treeGraph));
2019-11-19 14:57:39 -08:00
// AllGather3 - end
2024-06-11 01:28:01 -07:00
timers[TIMER_INIT_ALLGATHER] += clockNano() - timers[TIMER_INIT_CONNECT];
2019-11-19 14:57:39 -08:00
TRACE(NCCL_INIT, "rank %d nranks %d - BUILT %d TREES/RINGS", rank, nranks, comm->nChannels);
char line[4096];
2019-11-19 14:57:39 -08:00
line[0]='\0';
for (int c=0; c<comm->nChannels; c++) {
2020-09-04 14:35:05 -07:00
struct ncclTree* tree = &comm->channels[c].tree;
snprintf(line+strlen(line), 2047-strlen(line), " [%d] %d/%d/%d->%d->%d",
2020-09-04 14:35:05 -07:00
c, tree->down[0], tree->down[1], tree->down[2], rank, tree->up);
INFO(NCCL_GRAPH, "Ring %d : %d -> %d -> %d comm %p nRanks %02d busId %lx", c, comm->channels[c].ring.prev,
comm->rank, comm->channels[c].ring.next, comm, comm->nRanks, comm->busId);
2019-11-19 14:57:39 -08:00
}
line[4095] = '\0';
INFO(NCCL_INIT, "Trees%s comm %p nRanks %02d busId %lx", line, comm, comm->nRanks, comm->busId);
2018-09-24 16:06:59 -07:00
2022-11-29 04:27:46 -08:00
NCCLCHECKGOTO(computeBuffSizes(comm), ret, fail);
2020-05-12 14:40:18 -07:00
2023-04-03 05:32:07 -07:00
// Compute nChannels per peer for p2p
NCCLCHECKGOTO(ncclTopoComputeP2pChannels(comm), ret, fail);
/* By this point, all info of comm should be known. We can initialize shared resources and
 * map localRanks to top parent local ranks. NOTE: this sharedRes init must be put before
 * all proxy operations. */
if (comm->sharedRes->owner == comm) {
comm->sharedRes->tpNLocalRanks = comm->localRanks;
comm->sharedRes->magic = comm->magic;
comm->sharedRes->tpNChannels = comm->nChannels;
comm->sharedRes->tpP2pNChannels = comm->p2pnChannels;
memcpy(comm->sharedRes->tpRankToLocalRank, comm->rankToLocalRank, sizeof(int) * comm->nRanks);
}
NCCLCHECKGOTO(ncclCalloc(&topParentLocalRanks, comm->localRanks), ret, fail);
for (int i = 0; i < comm->localRanks; ++i) {
int tpRank = comm->topParentRanks[comm->localRankToRank[i]];
topParentLocalRanks[i] = comm->sharedRes->tpRankToLocalRank[tpRank];
}
comm->topParentLocalRanks = topParentLocalRanks;
2025-05-29 20:56:40 -07:00
// Profiler plugin context has to be initialized before proxy thread
NCCLCHECK(ncclProfilerPluginInit(comm));
NCCLCHECKGOTO(ncclTransportCheckP2pType(comm, &comm->isAllDirectP2p, &comm->directMode), ret, fail);
2023-04-03 05:32:07 -07:00
// Launch proxy service thread, after this, the proxy calls can be used.
2025-05-29 20:56:40 -07:00
if (parent && parent->shareResources) {
2024-02-05 05:06:02 -08:00
comm->proxyState = parent->sharedRes->proxyState;
ncclAtomicRefCountIncrement(&parent->sharedRes->proxyState->refCount);
} else {
NCCLCHECKGOTO(ncclProxyCreate(comm), ret, fail);
}
2024-09-10 05:57:10 -07:00
NCCLCHECKGOTO(ncclCalloc(&comm->gproxyConn, comm->nRanks), ret, fail);
2024-06-11 01:28:01 -07:00
timers[TIMER_INIT_CONNECT] = clockNano();
do { // Build p2p schedule
int node = comm->node;
int nNodes = comm->nNodes;
int nRanks = comm->nRanks;
int local = comm->localRank;
int nLocals = comm->maxLocalRanks;
struct ncclNodeRanks* nodeRanks = comm->nodeRanks;
bool flat = false;
for (int node = 0; node < nNodes; node++) {
if (nodeRanks[node].localRanks != nLocals) {
flat = true;
nNodes = 1; node = 0;
nLocals = nRanks; local = rank;
break;
}
}
int nNodesPow2 = pow2Up(nNodes);
int nLocalsPow2 = pow2Up(nLocals);
comm->p2pSchedule = ncclMemoryStackAlloc<ncclComm::P2pSchedulePair>(&comm->memPermanent, nRanks);
comm->planner.peers = ncclMemoryStackAlloc<ncclKernelPlanner::Peer>(&comm->memPermanent, nRanks);
uint32_t nodeRound = 0;
uint32_t nodeDelta = 0;
int round = 0;
// When enumerating peer deltas we use the quadratic formula (x*x+x)/2 mod N.
// Since that formula only produces valid permutations when N is a pow of 2,
// we let N = pow2Up(n) and filter out results greater-eq to n.
// Example sequence for 16 ranks: 0, 1, 3, 6, 10, 15, 5, 12, 4, 13, 7, 2, 14, 11, 9, 8
do {
if (nodeDelta < nNodes) { // Filter nonsensical node deltas
int sendNode = (node + nodeDelta) % nNodes;
int recvNode = (node - nodeDelta + nNodes) % nNodes;
uint32_t localRound = 0;
uint32_t localDelta = 0;
do {
if (localDelta < nLocals) { // Filter nonsensical node-local deltas
int sendLocal = (local + localDelta) % nLocals;
int recvLocal = (local - localDelta + nLocals) % nLocals;
comm->p2pSchedule[round].sendRank = flat ? sendLocal : nodeRanks[sendNode].localRankToRank[sendLocal];
comm->p2pSchedule[round].recvRank = flat ? recvLocal : nodeRanks[recvNode].localRankToRank[recvLocal];
round += 1;
}
localRound += 1;
localDelta = (localDelta + localRound) & (nLocalsPow2 - 1); // Quadratic update
} while (localRound != nLocalsPow2);
}
nodeRound += 1;
nodeDelta = (nodeDelta + nodeRound) & (nNodesPow2 - 1); // Quadratic update
} while (nodeRound != nNodesPow2);
2023-04-03 05:32:07 -07:00
2024-06-11 01:28:01 -07:00
if (round != nRanks) {
WARN("P2p schedule creation has bugs.");
ret = ncclInternalError;
goto fail;
}
2024-06-11 01:28:01 -07:00
} while (0);
2020-09-04 14:35:05 -07:00
2024-06-11 01:28:01 -07:00
comm->runtimeConn = comm->cuMemSupport && ncclParamRuntimeConnect();
if (comm->runtimeConn) {
2024-02-05 05:06:02 -08:00
for (int c=0; c<comm->nChannels; c++) {
2024-06-11 01:28:01 -07:00
NCCLCHECKGOTO(setupChannel(comm, c, rank, nranks, rings+c*nranks), ret, fail);
2023-04-03 05:32:07 -07:00
}
2025-05-29 20:56:40 -07:00
// Attempt to setup NVLS, may silently fail and disable NVLS
2024-06-11 01:28:01 -07:00
NCCLCHECKGOTO(ncclNvlsSetup(comm, parent), ret, fail);
// Check if we can setup CollNet
2025-05-29 20:56:40 -07:00
if (comm->config.collnetEnable) ncclCollNetSetup(comm, parent, graphs);
2024-06-11 01:28:01 -07:00
} else {
for (int c=0; c<comm->nChannels; c++) {
NCCLCHECKGOTO(setupChannel(comm, c, rank, nranks, rings+c*nranks), ret, fail);
}
NCCLCHECKGOTO(ncclTransportRingConnect(comm), ret, fail);
2021-05-11 18:16:30 -07:00
2025-03-04 13:30:36 -05:00
// Connect NET for intranode use
if (comm->graphs[NCCL_ALGO_RING].nIntraChannels && rcclParamP2pNetDisable() == 0) {
comm->useIntraNet = 1;
for (int c = 0; c < comm->nChannels; c++) {
struct ncclChannel* channel = comm->channels+c;
if (comm->nRanks == 1) continue;
NCCLCHECKGOTO(ncclTransportP2pConnect(comm, c, 1, &channel->ring.prev, 1, &channel->ring.next, NCCL_CONN_IDX_P2P_NET), ret, fail);
}
NCCLCHECKGOTO(ncclTransportP2pSetup(comm, &comm->graphs[NCCL_ALGO_RING], NCCL_CONN_IDX_P2P_NET), ret, fail);
}
2024-06-11 01:28:01 -07:00
// Connect Trees
NCCLCHECKGOTO(ncclTransportTreeConnect(comm), ret, fail);
2023-02-27 02:48:21 -08:00
2024-09-10 05:57:10 -07:00
// Connect PAT only for communicators with 1 GPU per node
if (comm->maxLocalRanks == 1) NCCLCHECKGOTO(ncclTransportPatConnect(comm), ret, fail);
2025-05-29 20:56:40 -07:00
// Attempt to setup NVLS, may silently fail and disable NVLS
2024-06-11 01:28:01 -07:00
NCCLCHECKGOTO(ncclNvlsSetup(comm, parent), ret, fail);
NCCLCHECKGOTO(ncclNvlsBufferSetup(comm), ret, fail);
2018-09-24 16:06:59 -07:00
2024-06-11 01:28:01 -07:00
// And NVLS trees if needed
NCCLCHECKGOTO(ncclNvlsTreeConnect(comm), ret, fail);
2020-05-12 14:40:18 -07:00
2024-06-11 01:28:01 -07:00
// Check if we can setup CollNet
2025-05-29 20:56:40 -07:00
if (comm->config.collnetEnable) {
2024-06-11 01:28:01 -07:00
ncclCollNetSetup(comm, parent, graphs);
NCCLCHECKGOTO(ncclCollNetChainBufferSetup(comm), ret, fail);
2024-09-10 05:57:10 -07:00
if (comm->maxLocalRanks <= NCCL_MAX_DIRECT_ARITY+1) {
NCCLCHECKGOTO(ncclCollNetDirectBufferSetup(comm), ret, fail);
}
2024-06-11 01:28:01 -07:00
}
2023-02-27 02:48:21 -08:00
2024-06-11 01:28:01 -07:00
// Connect to local net proxy
2024-09-10 05:57:10 -07:00
NCCLCHECKGOTO(ncclProxyConnect(comm, TRANSPORT_NET, 1, comm->rank, &proxyConn), ret, fail);
2024-06-11 01:28:01 -07:00
NCCLCHECKGOTO(ncclProxyCallBlocking(comm, &proxyConn, ncclProxyMsgSharedInit, &comm->p2pnChannels, sizeof(int), NULL, 0), ret, fail);
// Then to remote ones when using PXN
if (ncclPxnDisable(comm) == 0) {
int nranks;
NCCLCHECKGOTO(ncclTopoGetPxnRanks(comm, &pxnPeers, &nranks), ret, fail);
for (int r=0; r<nranks; r++) {
2024-09-10 05:57:10 -07:00
NCCLCHECKGOTO(ncclProxyConnect(comm, TRANSPORT_NET, 1, pxnPeers[r], &proxyConn), ret, fail);
2024-06-11 01:28:01 -07:00
NCCLCHECKGOTO(ncclProxyCallBlocking(comm, &proxyConn, ncclProxyMsgSharedInit, &comm->p2pnChannels, sizeof(int), NULL, 0), ret, fail);
2022-05-24 02:02:31 -07:00
}
}
2024-06-11 01:28:01 -07:00
if (ncclParamNvbPreconnect()) {
// Connect p2p when using NVB path
int nvbNpeers;
NCCLCHECKGOTO(ncclTopoGetNvbGpus(comm->topo, comm->rank, &nvbNpeers, &nvbPeers), ret, fail);
for (int r=0; r<nvbNpeers; r++) {
int peer = nvbPeers[r];
int sendRound=0, recvRound=0;
while (comm->p2pSchedule[sendRound].sendRank != peer) sendRound++;
while (comm->p2pSchedule[recvRound].recvRank != peer) recvRound++;
uint8_t sendBase = ncclP2pChannelBaseForRound(comm, sendRound);
uint8_t recvBase = ncclP2pChannelBaseForRound(comm, recvRound);
for (int c=0; c<comm->p2pnChannelsPerPeer; c++) {
int channelId;
channelId = ncclP2pChannelForPart(comm->p2pnChannels, sendBase, c, comm->p2pnChannelsPerPeer, comm->nNodes);
2024-06-11 01:28:01 -07:00
if (comm->channels[channelId].peers[peer]->send[1].connected == 0) {
comm->connectSend[peer].masks[channelId/64] |= (1UL<<(channelId%64));
2024-06-11 01:28:01 -07:00
}
channelId = ncclP2pChannelForPart(comm->p2pnChannels, recvBase, c, comm->p2pnChannelsPerPeer, comm->nNodes);
2024-06-11 01:28:01 -07:00
if (comm->channels[channelId].peers[peer]->recv[1].connected == 0) {
comm->connectRecv[peer].masks[channelId/64] |= (1UL<<(channelId%64));
2024-06-11 01:28:01 -07:00
}
2021-05-11 18:16:30 -07:00
}
}
2023-02-27 02:48:21 -08:00
2024-06-11 01:28:01 -07:00
NCCLCHECKGOTO(ncclTransportP2pSetup(comm, NULL, 1), ret, fail);
}
2021-05-11 18:16:30 -07:00
}
2024-06-11 01:28:01 -07:00
TRACE(NCCL_INIT, "rank %d nranks %d - CONNECTED %d RINGS AND TREES", rank, nranks, comm->nChannels);
2022-01-07 06:39:55 -08:00
2024-06-11 01:28:01 -07:00
// Compute time models for algorithm and protocol combinations
2025-09-02 13:21:14 -07:00
NCCLCHECKGOTO(ncclTopoInitTunerConstants(comm), ret, fail);
NCCLCHECKGOTO(ncclTunerPluginLoad(comm), ret, fail);
if (comm->tuner) {
NCCLCHECK(comm->tuner->init(&comm->tunerContext, comm->commHash, comm->nRanks, comm->nNodes, ncclDebugLog, &comm->nvlDomainInfo, &comm->tunerConstants));
}
2024-06-11 01:28:01 -07:00
NCCLCHECKGOTO(ncclTopoTuneModel(comm, comm->minCompCap, comm->maxCompCap, graphs), ret, fail);
INFO(NCCL_INIT, "comm:%p, nRanks:%d, nNodes:%d, coll channels:%d collnet channels:%d, nvls channels:%d, p2p channels:%d, p2p channels per peer:%d", comm, comm->nRanks, comm->nNodes, comm->nChannels, comm->nChannels, comm->nvlsChannels, comm->p2pnChannels, comm->p2pnChannelsPerPeer);
2022-05-24 02:02:31 -07:00
if (comm->intraRank == 0) { // Load ncclParamLaunchMode
2023-09-26 05:47:28 -07:00
const char* str = ncclGetEnv("NCCL_LAUNCH_MODE");
2022-05-24 02:02:31 -07:00
enum ncclLaunchMode mode, modeOld;
if (str && strcasecmp(str, "GROUP") == 0) {
mode = ncclLaunchModeGroup;
} else {
mode = ncclLaunchModeParallel;
}
// In theory we could be racing with other communicators not associated with
// this one if the user is connecting to multiple ncclUniqueId's concurrently.
modeOld = __atomic_exchange_n(&ncclParamLaunchMode, mode, __ATOMIC_RELAXED);
if (modeOld == ncclLaunchModeInvalid && str && str[0]!='\0') {
INFO(NCCL_ENV, "NCCL_LAUNCH_MODE set by environment to %s", mode == ncclLaunchModeParallel ? "PARALLEL" : "GROUP");
}
}
2025-05-29 20:56:40 -07:00
comm->symmetricSupport = comm->isAllDirectP2p && comm->nNodes == 1 && ncclParamWinEnable() && ncclCuMemEnable();
2025-09-02 13:21:14 -07:00
comm->devrState.bigSize = 0;
comm->ceColl.baseUCSymReadyPtr = NULL;
comm->ceColl.baseUCSymComplPtr = NULL;
2025-05-29 20:56:40 -07:00
2022-11-29 04:27:46 -08:00
// Call devCommSetup before the last barrier, making sure we don't have a thread running in front and starting to
// launch NCCL kernels before all cuda mem allocation is complete. That could cause a deadlock.
NCCLCHECKGOTO(devCommSetup(comm), ret, fail);
2025-05-29 20:56:40 -07:00
2024-06-11 01:28:01 -07:00
timers[TIMER_INIT_CONNECT] = clockNano() - timers[TIMER_INIT_CONNECT];
if (mscclEnabled() && (comm->topo->mscclEnabled || mscclForceEnabled())) {
WARN("MSCCL is deprecated, please be careful with this feature!");
2023-09-12 06:30:04 +08:00
NCCLCHECK(mscclInit(comm));
mscclStatus& status = mscclGetStatus(comm);
2023-09-12 06:30:04 +08:00
status.needsProxy |= mscclNeedsProxy;
}
2021-07-08 14:12:04 -07:00
/* Local intra-node barrier */
2024-03-26 06:08:55 -07:00
NCCLCHECKGOTO(bootstrapIntraNodeBarrier(comm->bootstrap, comm->localRankToRank, comm->localRank, comm->localRanks, comm->localRankToRank[0]), ret, fail);
2018-12-13 15:56:12 -08:00
// We should have allocated all buffers, collective fifos, ... we can
// restore the affinity.
2018-12-13 15:56:12 -08:00
TRACE(NCCL_INIT, "rank %d nranks %d - DONE", rank, nranks);
2022-11-29 04:27:46 -08:00
exit:
if (CPU_COUNT(&comm->cpuAffinity)) sched_setaffinity(0, sizeof(cpu_set_t), &affinitySave);
2023-04-03 05:32:07 -07:00
/* If split resource is shared, we are not able to unlink the proxy ops pool here since the child comm can
* attach the proxy ops pool of parent at any time; otherwise, unlink it here to make sure the pool will be
* properly cleaned up. */
2025-05-29 20:56:40 -07:00
if (comm->sharedRes->owner == comm && !comm->shareResources && ret == ncclSuccess && !ncclCuMemEnable()) ncclProxyShmUnlink(comm);
2022-11-29 04:27:46 -08:00
free(allTopoRanks);
free(nodesTreePatterns);
free(nodesFirstRank);
free(allGather3Data);
free(rings);
free(nvbPeers);
free(pxnPeers);
return ret;
fail:
goto exit;
2018-09-24 16:06:59 -07:00
}
// Tunable parameters consulted during communicator initialization.
// SetStackSize: defaults to 1 when kernels are built with indirect function
// calls and to 0 otherwise; ncclCommInitRankFunc reads it through
// ncclParamSetStackSize() to decide whether to raise cudaLimitStackSize.
#ifdef USE_INDIRECT_FUNCTION_CALL
NCCL_PARAM(SetStackSize, "SET_STACK_SIZE", 1);
#else
2021-04-12 16:00:11 -07:00
NCCL_PARAM(SetStackSize, "SET_STACK_SIZE", 0);
#endif
2024-01-07 20:25:02 -08:00
// Non-zero value replaces the computed kernel stack size (read through
// rcclParamStackSizeOverride()); 0 means "no override".
RCCL_PARAM(StackSizeOverride, "STACK_SIZE_OVERRIDE", 0);
2023-02-27 02:48:21 -08:00
// Environment counterpart of ncclConfig_t::cgaClusterSize; left at
// NCCL_CONFIG_UNDEF_INT when the variable is not set.
NCCL_PARAM(CGAClusterSize, "CGA_CLUSTER_SIZE", NCCL_CONFIG_UNDEF_INT);
// Match config max/minCTAs
NCCL_PARAM(MaxCTAs, "MAX_CTAS", NCCL_CONFIG_UNDEF_INT);
NCCL_PARAM(MinCTAs, "MIN_CTAS", NCCL_CONFIG_UNDEF_INT);
// Upper bound applied to the CGA cluster size taken from the environment
// (see envConfigOverride).
#define NCCL_MAX_CGA_CLUSTER_SIZE 8
2021-04-12 16:00:11 -07:00
2025-09-02 13:21:14 -07:00
// Environment counterparts of ncclConfig_t::nChannelsPerNetPeer and
// ncclConfig_t::nvlinkCentricSched.
// NOTE(review): NvlinkUtilCentricSchedEnable defaults to 0 rather than
// NCCL_CONFIG_UNDEF_INT, so envConfigOverride always sees it as "set" and
// overwrites the user-provided nvlinkCentricSched value -- confirm this is intended.
NCCL_PARAM(NChannelsPerNetPeer, "NCHANNELS_PER_NET_PEER", NCCL_CONFIG_UNDEF_INT);
NCCL_PARAM(NvlinkUtilCentricSchedEnable, "NVLINK_UTIL_CENTRIC_SCHED_ENABLE", 0);
2024-09-10 05:57:10 -07:00
// Capacity (including the terminating NUL) of ncclCommInitRankAsyncJob::funcName.
#define NCCL_COMMINIT_FUNCNAME_LEN 128
2022-05-24 02:02:31 -07:00
struct ncclCommInitRankAsyncJob {
struct ncclAsyncJob base;
2023-04-03 05:32:07 -07:00
struct ncclComm* comm;
struct ncclComm** newcomm;
int cudaDev;
// For ncclCommInitRank
2024-09-10 05:57:10 -07:00
int nranks, myrank, nId;
ncclUniqueId* commId;
2023-04-03 05:32:07 -07:00
// for ncclCommSplit
struct ncclComm* parent;
int color, key;
2024-12-18 08:26:06 -08:00
int splitCount;
2025-05-29 20:56:40 -07:00
// For Shrink
int* excludeRanksList;
int excludeRanksCount;
2024-09-10 05:57:10 -07:00
// name of the function calling
char funcName[NCCL_COMMINIT_FUNCNAME_LEN];
2022-05-24 02:02:31 -07:00
};
2022-08-18 02:53:17 -07:00
// Argument bundle for an asynchronous communicator-finalize job.
struct ncclCommFinalizeAsyncJob {
// Generic async-job header; kept as the first member like in
// ncclCommInitRankAsyncJob.
struct ncclAsyncJob base;
// Communicator the job operates on.
ncclComm_t comm;
};
2023-04-03 05:32:07 -07:00
// Environment counterparts of ncclConfig_t::splitShare and
// ncclConfig_t::shrinkShare. Both stay at NCCL_CONFIG_UNDEF_INT when unset, in
// which case envConfigOverride leaves the configured value untouched.
NCCL_PARAM(CommSplitShareResources, "COMM_SPLIT_SHARE_RESOURCES", NCCL_CONFIG_UNDEF_INT);
2025-05-29 20:56:40 -07:00
NCCL_PARAM(CommShrinkShareResources, "COMM_SHRINK_SHARE_RESOURCES", NCCL_CONFIG_UNDEF_INT);
2023-04-03 05:32:07 -07:00
2024-09-10 05:57:10 -07:00
// Per-rank record exchanged between all ranks of the parent communicator in
// commGetSplitInfo (one entry per parent rank, gathered with bootstrapAllGather).
typedef struct{
// Ordering key: ranks sharing a color are sorted by ascending key.
int key;
// Group selector: ranks with the same color end up in the same child communicator.
int color;
} commSplitInfo;
2023-04-03 05:32:07 -07:00
// Exchange (color, key) with every rank of `parent` and derive the layout of
// the child communicator this rank belongs to.
//
//   comm            unused here; kept for interface compatibility
//   parent          communicator being split
//   color, key      this rank's split arguments
//   nRanksRet       out: number of parent ranks sharing `color`
//   myRankRet       out: this rank's index within the child
//   parentRanksRet  out: parent ranks of the child members, ordered by key
//                   (ties keep parent-rank order); must hold parent->nRanks ints
//
// Every rank takes part in the all-gather, but a rank passing
// NCCL_SPLIT_NOCOLOR leaves all three outputs untouched.
static ncclResult_t commGetSplitInfo(struct ncclComm* comm, struct ncclComm* parent, int color, int key, int* nRanksRet, int* myRankRet, int* parentRanksRet) {
  ncclResult_t ret = ncclSuccess;
  commSplitInfo* gathered = NULL;

  NCCLCHECKGOTO(ncclCalloc(&gathered, parent->nRanks), ret, fail);

  // Publish our own entry, then collect everybody else's.
  gathered[parent->rank].color = color;
  gathered[parent->rank].key = key;
  NCCLCHECKGOTO(bootstrapAllGather(parent->bootstrap, gathered, sizeof(commSplitInfo)), ret, fail);

  // A rank without a color only had to participate in the exchange above.
  if (color != NCCL_SPLIT_NOCOLOR) {
    int members = 0;
    int selfIndex = 0;

    memset(parentRanksRet, 0xff, sizeof(int) * parent->nRanks);
    for (int candidate = 0; candidate < parent->nRanks; ++candidate) {
      if (gathered[candidate].color != color) continue;
      // Stable insertion sort by key: walk back from the end, moving every
      // entry with a strictly larger key one slot to the right.
      int slot = members;
      while (slot > 0 && gathered[parentRanksRet[slot - 1]].key > gathered[candidate].key) {
        parentRanksRet[slot] = parentRanksRet[slot - 1];
        --slot;
      }
      parentRanksRet[slot] = candidate;
      ++members;
    }

    // Locate ourselves in the sorted member list.
    for (int idx = 0; idx < members; ++idx) {
      if (parentRanksRet[idx] == parent->rank) {
        selfIndex = idx;
        break;
      }
    }

    *nRanksRet = members;
    *myRankRet = selfIndex;
  }

exit:
  free(gathered);
  return ret;
fail:
  goto exit;
}
2025-05-29 20:56:40 -07:00
// Build the rank layout of a shrunk communicator: every parent rank except the
// ones listed in excludeRanksList, in ascending parent-rank order.
//
//   parentRanks        number of ranks in the parent communicator
//   parentRank         calling rank's index in the parent
//   excludeRanksList   parent ranks to drop; assumed sorted ascending
//   excludeRanksCount  number of entries in excludeRanksList
//   nRanksRet          out: number of ranks kept
//   myRankRet          out: caller's index among the kept ranks (left
//                      untouched when the caller itself is excluded)
//   parentRanksRet     out: kept parent ranks; must hold parentRanks ints
//
// Always returns ncclSuccess.
static ncclResult_t getParentRanks(int parentRanks, int parentRank, int* excludeRanksList, int excludeRanksCount, int* nRanksRet, int* myRankRet, int* parentRanksRet) {
  int count = 0, j = 0;
  for (int i = 0; i < parentRanks; i++) {
    // we assume excludeRanksList is sorted
    if (j < excludeRanksCount && excludeRanksList[j] == i) {
      // Consume every entry naming this rank so that a duplicated entry does
      // not stall the cursor and let later excluded ranks slip through.
      do {
        j++;
      } while (j < excludeRanksCount && excludeRanksList[j] == i);
      continue;
    }
    if (i == parentRank) *myRankRet = count;
    parentRanksRet[count++] = i;
  }
  // Report the number of entries actually written. For a well-formed list this
  // equals parentRanks - excludeRanksCount, but unlike that formula it stays
  // consistent with parentRanksRet when the list holds duplicates or
  // out-of-range values.
  *nRanksRet = count;
  return ncclSuccess;
}
2022-05-24 02:02:31 -07:00
// Asynchronous job body that brings up one communicator.
//
// `job_` is the `base` member of a ncclCommInitRankAsyncJob. Depending on the
// job this performs a fresh init (job->parent == NULL), a split, or a shrink
// (job->parent != NULL and job->excludeRanksCount != 0):
//   1. bind to job->cudaDev and query device properties,
//   2. load kernels and optionally raise the device stack-size limit,
//   3. allocate the communicator and bootstrap it,
//   4. set up transports plus the optional rocSHMEM / MSCCL++ integrations,
//   5. log timings.
//
// Returns ncclSuccess, or the first error encountered. Every failure goes
// through `fail` so that comm->initState records the error, and through `exit`
// so that job->newcomm is published and temporaries are released.
static ncclResult_t ncclCommInitRankFunc(struct ncclAsyncJob* job_) {
  struct ncclCommInitRankAsyncJob* job = (struct ncclCommInitRankAsyncJob*)job_;
  ncclComm_t comm = job->comm;
  ncclResult_t res = ncclSuccess;
  int archMajor, archMinor;
  size_t maxLocalSizeBytes = 0;
  int cudaDev = job->cudaDev;
  int* parentRanks = NULL;
  int cudaArch;
  int maxSharedMem = 0;
  double sum_timers = 0;
  uint64_t timers[TIMERS_INIT_COUNT] = {0};
  unsigned long long commIdHash;
  char* archName = NULL;
  // The strdup'd archName belongs to this function until it is stored in
  // comm->archName; this flag tells the exit path whether it still has to free it.
  bool archNameOwnedByComm = false;
  int cuCount;
  hipDeviceProp_t devProp;
#ifdef USE_INDIRECT_FUNCTION_CALL
  int64_t stackSize;
#endif

  timers[TIMER_INIT_TOTAL] = clockNano();
  CUDACHECKGOTO(cudaSetDevice(cudaDev), res, fail);
  CUDACHECKGOTO(cudaDeviceGetAttribute(&maxSharedMem, cudaDevAttrMaxSharedMemoryPerBlockOptin, cudaDev), res, fail);
  CUDACHECKGOTO(cudaDeviceGetAttribute(&archMajor, cudaDevAttrComputeCapabilityMajor, cudaDev), res, fail);
  CUDACHECKGOTO(cudaDeviceGetAttribute(&archMinor, cudaDevAttrComputeCapabilityMinor, cudaDev), res, fail);
  cudaArch = 100*archMajor + 10*archMinor;
  CUDACHECKGOTO(hipGetDeviceProperties(&devProp, cudaDev), res, fail);
  cuCount = devProp.multiProcessorCount;
  archName = strdup(devProp.gcnArchName);
  if (!archName) {
    res = ncclSystemError;
    WARN("strdup failed for architecture name");
    goto fail;
  }

  timers[TIMER_INIT_KERNELS] = clockNano();
  NCCLCHECKGOTO(ncclInitKernelsForDevice(cudaArch, maxSharedMem, &maxLocalSizeBytes), res, fail);
  // Set the maximum kernel stack size of all kernels to avoid
  // a CUDA memory reconfig on load (c.f. NVSHMEM issue)
#ifdef USE_INDIRECT_FUNCTION_CALL
  if (ncclParamSetStackSize() == 1 && !IsArchMatch(archName,"gfx942") && !IsArchMatch(archName,"gfx950")) {
    stackSize = rcclParamStackSizeOverride() ? rcclParamStackSizeOverride() : maxLocalSizeBytes;
    if (stackSize == 0) {
      if (IsArchMatch(archName,"gfx906"))
        stackSize = 1024;
      else
        stackSize = 512;
    }
    INFO(NCCL_INIT, "Setting cudaLimitStackSize to %zi maxLocalSizeBytes %zi", stackSize, maxLocalSizeBytes);
    CUDACHECKIGNORE(cudaDeviceSetLimit(cudaLimitStackSize, stackSize));
  }
#endif
  if (maxLocalSizeBytes > 0 && ncclParamSetStackSize() == 1) {
    TRACE(NCCL_INIT, "Setting cudaLimitStackSize to %zu", maxLocalSizeBytes);
    CUDACHECKIGNORE(cudaDeviceSetLimit(cudaLimitStackSize, maxLocalSizeBytes));
  }
  timers[TIMER_INIT_KERNELS] = clockNano() - timers[TIMER_INIT_KERNELS];

  if (job->parent) {
    NCCLCHECKGOTO(ncclCalloc(&parentRanks, job->parent->nRanks), res, fail);
    if (job->excludeRanksCount) {
      NCCLCHECKGOTO(getParentRanks(job->parent->nRanks, job->parent->rank, job->excludeRanksList, job->excludeRanksCount, &job->nranks, &job->myrank, parentRanks), res, fail);
    } else {
      NCCLCHECKGOTO(commGetSplitInfo(comm, job->parent, job->color, job->key, &job->nranks, &job->myrank, parentRanks), res, fail);
      // Negative color does not create a new comm object. We needed to take part in the allgather, but we're done now.
      if (job->color == NCCL_SPLIT_NOCOLOR) goto exit;
    }
    // child hash obtained from (parent hash, split count, color)
    uint64_t hacc[2] = {1, 1};
    eatHash(hacc, &job->parent->commHash);
    eatHash(hacc, &job->splitCount);
    eatHash(hacc, &job->color);
    comm->commHash = digestHash(hacc);
    timers[TIMER_INIT_ALLOC] = clockNano();
    NCCLCHECKGOTO(commAlloc(comm, job->parent, job->nranks, job->myrank), res, fail);
    timers[TIMER_INIT_ALLOC] = clockNano() - timers[TIMER_INIT_ALLOC];
    INFO(NCCL_INIT, "%s comm %p rank %d nranks %d cudaDev %d nvmlDev %d busId %lx parent %p splitCount %d color %d key %d- Init START", job->funcName,
         comm, comm->rank, comm->nRanks, comm->cudaDev, comm->nvmlDev, comm->busId, job->parent, job->splitCount, job->color, job->key);
    timers[TIMER_INIT_BOOTSTRAP] = clockNano();
    NCCLCHECKGOTO(bootstrapSplit(comm->commHash, comm, job->parent, job->color, job->key, parentRanks), res, fail);
    timers[TIMER_INIT_BOOTSTRAP] = clockNano() - timers[TIMER_INIT_BOOTSTRAP];
    // debug info, no commId was used
    commIdHash = 0;
  } else {
    // obtain a unique hash using the first commId
    comm->commHash = commIdHash = getHash(job->commId->internal, NCCL_UNIQUE_ID_BYTES);
    timers[TIMER_INIT_ALLOC] = clockNano();
    NCCLCHECKGOTO(commAlloc(comm, NULL, job->nranks, job->myrank), res, fail);
    timers[TIMER_INIT_ALLOC] = clockNano() - timers[TIMER_INIT_ALLOC];
    INFO(NCCL_INIT, "%s comm %p rank %d nranks %d cudaDev %d nvmlDev %d busId %lx commId 0x%llx - Init START", job->funcName,
         comm, comm->rank, comm->nRanks, comm->cudaDev, comm->nvmlDev, comm->busId, commIdHash);
    timers[TIMER_INIT_BOOTSTRAP] = clockNano();
    NCCLCHECKGOTO(bootstrapInit(job->nId, (struct ncclBootstrapHandle*)job->commId, comm), res, fail);
    timers[TIMER_INIT_BOOTSTRAP] = clockNano() - timers[TIMER_INIT_BOOTSTRAP];
  }
  comm->cudaArch = cudaArch;
  comm->archName = archName;
  archNameOwnedByComm = true;
  comm->cuCount = cuCount;

  NCCLCHECKGOTO(initTransportsRank(comm, job->parent, timers), res, fail);
  // Check if using host uncached mem correctly
  NCCLCHECKGOTO(checkHostUncacheMemSetting(comm), res, fail);
  // RCCL: determine and set unroll factor for comm
  NCCLCHECKGOTO(commSetUnrollFactor(comm), res, fail);
#ifdef ENABLE_ROCSHMEM
  if (rcclParamRocshmemEnabled()) { // @TODO - This doesn't seem to disable when I set ROCSHMEM_ENABLE=0 on command line
    INFO(NCCL_INIT,"Initializing rocSHMEM inside of RCCL");
    int ret;
    rocshmem::rocshmem_uniqueid_t rocshmemUniqueId;
    rocshmem::rocshmem_init_attr_t rocshmemAttr;
    if(comm->rank == 0 ) {
      ret = rocshmem::rocshmem_get_uniqueid (&rocshmemUniqueId);
      if (ret != rocshmem::ROCSHMEM_SUCCESS) {
        ERROR("Error in rocshmem_get_uniqueid, Rocshmem cannot be initialized.");
        res = ncclSystemError;
        goto fail;
      }
    }
    NCCLCHECKGOTO(bootstrapBroadcast(comm->bootstrap, comm->rank, comm->nRanks, 0, &rocshmemUniqueId,
                                     sizeof(rocshmemUniqueId)), res, fail);
    ret = rocshmem::rocshmem_set_attr_uniqueid_args(job->myrank, job->nranks, &rocshmemUniqueId, &rocshmemAttr);
    if (ret != rocshmem::ROCSHMEM_SUCCESS) {
      ERROR("Error in rocshmem_set_attr_uniqueid_args, Rocshmem cannot be initialized.");
      res = ncclSystemError;
      goto fail;
    }
    ret = rocshmem::rocshmem_init_attr(rocshmem::ROCSHMEM_INIT_WITH_UNIQUEID, &rocshmemAttr);
    if (ret != rocshmem::ROCSHMEM_SUCCESS) {
      ERROR("Error in rocshmem_init_attr, Rocshmem cannot be initialized.");
      res = ncclSystemError;
      goto fail;
    }
    comm->sourceRshmem = (void**) malloc(NUM_SYM_BUF * sizeof(void *));
    comm->destRshmem = (void**) malloc(NUM_SYM_BUF * sizeof(void *));
    // Both tables are indexed right below; bail out instead of dereferencing NULL.
    if (comm->sourceRshmem == NULL || comm->destRshmem == NULL) {
      WARN("Failed to allocate rocSHMEM symmetric buffer tables");
      res = ncclSystemError;
      goto fail;
    }
    for (int i = 0; i < NUM_SYM_BUF; i++) {
      comm->sourceRshmem[i] = (void *)rocshmem::rocshmem_malloc((size_t)(1*1024*1024));
      comm->destRshmem[i] = (void *)rocshmem::rocshmem_malloc((size_t)(1*1024*1024));
    }
    comm->enableRocshmem = rcclParamRocshmemEnabled();
    comm->rocshmemThreshold = rcclParamRocshmemThreshold();
    comm->numSymBuf = NUM_SYM_BUF;
    comm->symId = 0;
    //rocshmem::rocshmem_team_t team_reduce_world_dup;
    comm->team_reduce_world_dup = rocshmem::ROCSHMEM_TEAM_INVALID;
    rocshmem::rocshmem_team_split_strided(rocshmem::ROCSHMEM_TEAM_WORLD, 0, 1, job->nranks, nullptr, 0,
                                          &(comm->team_reduce_world_dup));
    ncclCommToRshmemTeam[comm] = comm->team_reduce_world_dup;
    CUDACHECKGOTO(hipDeviceSynchronize(), res, fail);
  }
#endif
  // Allocate Temp Buffer for Direct Reduce Scatter
  if (IsArchMatch(archName,"gfx950")) {
    NCCLCHECKGOTO(ncclCudaMalloc(&(comm->tempBuff), TEMP_BUFF_SIZE), res, fail);
  }
#ifdef ENABLE_MSCCLPP
  if (job->parent) {
    // A child of an MSCCL++-compatible communicator reuses the parent's
    // mscclpp_comm and is registered under the parent's unique id.
    if (job->parent->mscclppCompatible) {
      INFO(NCCL_INIT, "MSCCL++: Splitting a compatible communicator; using parent mscclpp_comm");
      comm->mscclppCompatible = true;
      comm->mscclpp_threshold = job->parent->mscclpp_threshold;
      comm->mscclpp_comm = job->parent->mscclpp_comm;
      const ncclUniqueId& parentUniqueId = ncclCommToUniqueIdMap[job->parent];
      auto& mscclppUniqueId = mscclpp_uniqueIdMap[parentUniqueId];
      mscclpp_uniqueIdReverseMap[mscclppUniqueId].insert(parentUniqueId);
      ncclCommToUniqueIdMap[comm] = parentUniqueId;
    }
  }
  else
#endif
  if (rcclParamMscclppEnabled()) {
#ifdef ENABLE_MSCCLPP
    if (mscclEnabled() && (comm->topo->mscclEnabled || mscclForceEnabled()) && mscclppCommCompatible(comm)) {
      comm->mscclppCompatible = IsArchMatch(archName, "gfx942") || IsArchMatch(archName, "gfx950");
      if (comm->mscclppCompatible) {
        bool mapContainsId = (mscclpp_uniqueIdMap.count(*job->commId) > 0);
        auto& mscclppUniqueId = mscclpp_uniqueIdMap[*job->commId];
        // Local rank 0 creates the MSCCL++ id (unless one is already cached
        // for this commId) and shares it with the other local ranks.
        if (comm->localRank == 0 && !mapContainsId) {
          NCCLCHECKGOTO(mscclpp_ncclGetUniqueId(&mscclppUniqueId), res, fail);
          TRACE_CALL("mscclpp_ncclGetUniqueId(0x%llx)", (unsigned long long)getHash(mscclppUniqueId.internal, NCCL_UNIQUE_ID_BYTES));
        }
        NCCLCHECKGOTO(bootstrapIntraNodeBroadcast(comm->bootstrap, comm->localRankToRank, comm->localRank, comm->localRanks, 0, &mscclppUniqueId, sizeof(mscclppUniqueId)), res, fail);
        unsigned long long mscclppUniqueIdHash; (void)mscclppUniqueIdHash;
        TRACE_CALL("bootstrapIntraNodeBroadcast(rank=%d, nranks=%d, root=%d, bcastData=hash:0x%llx)", comm->localRank, comm->localRanks, 0, (mscclppUniqueIdHash = (unsigned long long)getHash(mscclppUniqueId.internal, NCCL_UNIQUE_ID_BYTES)));
        mscclpp_uniqueIdReverseMap[mscclppUniqueId].insert(*job->commId);
        comm->mscclpp_threshold = rcclParamMscclppThreshold();
        INFO(NCCL_INIT, "MSCCL++: Enabled! Msg size threshold=%zu", comm->mscclpp_threshold);
        NCCLCHECKGOTO(mscclpp_ncclCommInitRank(&(comm->mscclpp_comm), job->nranks, mscclppUniqueId, job->myrank), res, fail);
        TRACE_CALL("mscclpp_ncclCommInitRank (*comm=%p, nranks=%d, commId=hash:0x%llx, myrank=%d)", comm->mscclpp_comm, job->nranks, mscclppUniqueIdHash, job->myrank);
        mscclpp_commToUniqueIdMap[comm->mscclpp_comm] = mscclppUniqueId;
        ncclCommToUniqueIdMap[comm] = *job->commId;
        if (rcclParamMscclppForceEnabled()) {
          comm->mscclppForceEnable = true;
        } else {
          comm->mscclppForceEnable = false;
        }
      } else {
        WARN("MSCCL++: Cannot enable MSCCL++ on %s architecture", devProp.gcnArchName);
      }
    } else {
      comm->mscclppCompatible = false;
      WARN("MSCCL++: Cannot enable MSCCL++; environment is not MSCCL compatible");
    }
#else
    WARN("MSCCL++: Feature not enabled. ENABLE_MSCCLPP must be defined at compile-time to enable this feature.");
#endif
  }
  NCCLCHECKGOTO(latency_profiler::collTraceInit(comm), res, fail);
  // update communicator state
  comm->initState = ncclSuccess;
  timers[TIMER_INIT_TOTAL] = clockNano() - timers[TIMER_INIT_TOTAL];

  // Trace this call for replay tool
  if (job->parent) {
    /* unlink child abort flag. */
    __atomic_store_n(&job->parent->childAbortFlag, NULL, __ATOMIC_RELEASE);
    TRACE_CALL("ncclCommSplit(%p, %d, %d, %p, %d, %d)", job->parent, job->color, job->key, comm, comm->rank, comm->nRanks);
    INFO(NCCL_INIT, "%s comm %p rank %d nranks %d cudaDev %d nvmlDev %d busId %lx parent %p splitCount %d color %d key %d - Init COMPLETE", job->funcName,
         comm, comm->rank, comm->nRanks, comm->cudaDev, comm->nvmlDev, comm->busId, job->parent, job->splitCount, job->color, job->key);
  } else {
    // the name for the replay tool is ncclCommInitRank for all the variations
    TRACE_CALL("ncclCommInitRank(%p, %d, 0x%llx, %d, %d)", comm, comm->nRanks, commIdHash, comm->rank, comm->cudaDev);
    INFO(NCCL_INIT, "%s comm %p rank %d nranks %d cudaDev %d nvmlDev %d busId %lx commId 0x%llx - Init COMPLETE", job->funcName,
         comm, comm->rank, comm->nRanks, comm->cudaDev, comm->nvmlDev, comm->busId, commIdHash);
  }
  // "rest" in the timing line is the total minus every individually timed
  // phase; the summation starts at index 1 so the total itself is excluded.
  sum_timers = 0.0;
  for (int it = 1; it < TIMERS_INIT_COUNT; ++it)
    sum_timers += (timers[it] / 1e9);
  INFO(NCCL_INIT | NCCL_PROFILE,
       "Init timings - %s: rank %d nranks %d total %.2f (kernels %.2f, alloc %.2f, bootstrap %.2f, allgathers %.2f, topo %.2f, graphs %.2f, "
       "connections %.2f, rest %.2f)",
       job->funcName, comm->rank, comm->nRanks,
       timers[TIMER_INIT_TOTAL] / 1e9, timers[TIMER_INIT_KERNELS] / 1e9, timers[TIMER_INIT_ALLOC] / 1e9,
       timers[TIMER_INIT_BOOTSTRAP] / 1e9, timers[TIMER_INIT_ALLGATHER] / 1e9, timers[TIMER_INIT_TOPO] / 1e9,
       timers[TIMER_INIT_GRAPHS] / 1e9, timers[TIMER_INIT_CONNECT] / 1e9, timers[TIMER_INIT_TOTAL] / 1e9 - sum_timers);
exit:
  if (job->newcomm) {
    /* assign it to user pointer. */
    __atomic_store_n(job->newcomm, comm, __ATOMIC_RELEASE);
  }
  free(parentRanks);
  // Release the architecture string when we left before handing it to the
  // communicator (early failure, or a NCCL_SPLIT_NOCOLOR split).
  if (!archNameOwnedByComm) free(archName);
  return res;
fail:
  comm->initState = res;
  goto exit;
}
2023-02-27 02:48:21 -08:00
// Resolve one field of a config struct: when `config->field` still holds the
// `undef` sentinel it is replaced by `defvalue`; otherwise the user-supplied
// value is kept and logged under NCCL_ENV as "Comm config <fieldStr> set to
// <value>", printed with the printf-style `format`.
// Expands to a bare if/else statement, so it must be used where a statement is allowed.
#define NCCL_CONFIG_DEFAULT(config, field, undef, defvalue, fieldStr, format) \
if (config->field == undef) { \
config->field = defvalue; \
} else { \
INFO(NCCL_ENV, "Comm config " fieldStr " set to " format, config->field); \
}
2023-04-03 05:32:07 -07:00
// Apply environment-variable overrides on top of comm->config and then clamp
// the result to values the library can honour.
//
// Each parameter is read in turn; a value that is unset (NCCL_CONFIG_UNDEF_INT
// / missing variable) or out of range leaves the configured value in place.
// comm->config.netName is always replaced by a freshly malloc'ed copy of the
// effective name (NCCL_NET if set, otherwise the configured name) or by NULL.
//
// Returns ncclSystemError when the network name cannot be duplicated, and
// ncclSuccess otherwise.
static ncclResult_t envConfigOverride(ncclComm_t comm) {
  // Remember the configured name before anything is overwritten.
  const char* const configuredNetName = comm->config.netName;

  /* override configuration with env variable. */
  const int blockingEnv = ncclParamCommBlocking();
  if (blockingEnv == 0 || blockingEnv == 1) {
    comm->config.blocking = blockingEnv;
  }

  const int cgaClusterSizeEnv = ncclParamCGAClusterSize();
  if (cgaClusterSizeEnv > NCCL_MAX_CGA_CLUSTER_SIZE) {
    INFO(NCCL_ENV, "NCCL_CGA_CLUSTER_SIZE value %d is too big. Limiting value to %d.", cgaClusterSizeEnv, NCCL_MAX_CGA_CLUSTER_SIZE);
    comm->config.cgaClusterSize = NCCL_MAX_CGA_CLUSTER_SIZE;
  } else if (cgaClusterSizeEnv >= 0) {
    comm->config.cgaClusterSize = cgaClusterSizeEnv;
  }

  const int minCTAsEnv = ncclParamMinCTAs();
  if (minCTAsEnv != NCCL_CONFIG_UNDEF_INT) {
    if (minCTAsEnv > 0) {
      comm->config.minCTAs = minCTAsEnv;
    } else {
      INFO(NCCL_ENV, "NCCL_MIN_CTAS %d is too low, leaving it set at %d", minCTAsEnv, comm->config.minCTAs);
    }
  }

  const int maxCTAsEnv = ncclParamMaxCTAs();
  if (maxCTAsEnv != NCCL_CONFIG_UNDEF_INT) {
    if (maxCTAsEnv > 0) {
      comm->config.maxCTAs = maxCTAsEnv;
    } else {
      INFO(NCCL_ENV, "NCCL_MAX_CTAS %d is too low, leaving it set at %d", maxCTAsEnv, comm->config.maxCTAs);
    }
  }

  /* override configuration with env variable. */
  const int nChannelsPerNetPeerEnv = ncclParamNChannelsPerNetPeer();
  if (nChannelsPerNetPeerEnv != NCCL_CONFIG_UNDEF_INT) {
    if (nChannelsPerNetPeerEnv > 0) {
      comm->config.nChannelsPerNetPeer = nChannelsPerNetPeerEnv;
    } else {
      INFO(NCCL_ENV, "NCCL_NCHANNELS_PER_NET_PEER %d is too low, leaving it set at %d", nChannelsPerNetPeerEnv, comm->config.nChannelsPerNetPeer);
    }
  }

  const int nvlinkUtilCentricSchedEnableEnv = ncclParamNvlinkUtilCentricSchedEnable();
  if (nvlinkUtilCentricSchedEnableEnv != NCCL_CONFIG_UNDEF_INT) {
    if (nvlinkUtilCentricSchedEnableEnv == 0 || nvlinkUtilCentricSchedEnableEnv == 1) {
      comm->config.nvlinkCentricSched = nvlinkUtilCentricSchedEnableEnv;
    } else {
      INFO(NCCL_ENV, "NCCL_NVLINK_UTIL_CENTRIC_SCHED_ENABLE %d is not valid, leaving it set at %d", nvlinkUtilCentricSchedEnableEnv, comm->config.nvlinkCentricSched);
    }
  }

  // NCCL_NET wins over the configured name; either way the communicator ends
  // up with its own copy of the string.
  const char* const envNetName = ncclGetEnv("NCCL_NET");
  const char* const effectiveNetName = envNetName ? envNetName : configuredNetName;
  if (effectiveNetName == NULL) {
    comm->config.netName = NULL;
  } else {
    const int netNameLen = strlen(effectiveNetName) + 1;
    comm->config.netName = (char*)malloc(netNameLen);
    if (comm->config.netName == nullptr) {
      WARN("Failed to allocate memory for network name");
      return ncclSystemError;
    }
    memcpy((void*)comm->config.netName, effectiveNetName, netNameLen);
  }

  const int splitShareEnv = ncclParamCommSplitShareResources();
  if (splitShareEnv != NCCL_CONFIG_UNDEF_INT) {
    comm->config.splitShare = splitShareEnv;
  }

  const int shrinkShareEnv = ncclParamCommShrinkShareResources();
  if (shrinkShareEnv != NCCL_CONFIG_UNDEF_INT) {
    comm->config.shrinkShare = shrinkShareEnv;
  }

  // NCCL_COLLNET_ENABLE needs to be reloaded each time for comm init
  // since users might change the env on the fly to enable/disable collnet
  const char* const collnetEnableEnv = ncclGetEnv("NCCL_COLLNET_ENABLE");
  if (collnetEnableEnv != NULL) {
    const int collnetEnableInt = (int)strtol(collnetEnableEnv, NULL, 0);
    if (collnetEnableInt != NCCL_CONFIG_UNDEF_INT) {
      comm->config.collnetEnable = collnetEnableInt;
      INFO(NCCL_ENV, "NCCL_COLLNET_ENABLE set by environment to %d.", collnetEnableInt);
    }
  }

  const int ctaPolicyEnv = ncclParamCtaPolicy();
  if (ctaPolicyEnv != NCCL_CONFIG_UNDEF_INT) {
    comm->config.CTAPolicy = ctaPolicyEnv;
  }

  const int nvlsCTAsEnv = ncclParamNvlsChannels();
  if (nvlsCTAsEnv != NCCL_CONFIG_UNDEF_INT) {
    comm->config.nvlsCTAs = nvlsCTAsEnv;
  }

  /* cap channels if needed */
  if (comm->config.minCTAs > MAXCHANNELS) {
    INFO(NCCL_ENV, "minCTAs %d is larger than #channels upper limit %d, cap it to %d", comm->config.minCTAs, MAXCHANNELS, MAXCHANNELS);
    comm->config.minCTAs = MAXCHANNELS;
  }
  if (comm->config.maxCTAs > MAXCHANNELS) {
    INFO(NCCL_ENV, "maxCTAs %d is larger than #channels upper limit %d, cap it to %d", comm->config.maxCTAs, MAXCHANNELS, MAXCHANNELS);
    comm->config.maxCTAs = MAXCHANNELS;
  }
  if (comm->config.minCTAs > comm->config.maxCTAs) {
    INFO(NCCL_ENV, "minCTAs %d is larger than maxCTAs %d, set both to %d", comm->config.minCTAs, comm->config.maxCTAs, comm->config.maxCTAs);
    comm->config.minCTAs = comm->config.maxCTAs;
  }

  /* values coming from the environment are not validated above; reset the
   * invalid ones to a safe default here. */
  if (!(comm->config.splitShare == 0 || comm->config.splitShare == 1)) {
    INFO(NCCL_ENV, "splitShare %d is not a valid value 0/1, set it to 0", comm->config.splitShare);
    comm->config.splitShare = 0;
  }
  if (!(comm->config.collnetEnable == 0 || comm->config.collnetEnable == 1)) {
    INFO(NCCL_ENV, "collnetEnable %d is not a valid value 0/1, set it to 0", comm->config.collnetEnable);
    comm->config.collnetEnable = 0;
  }
  if (comm->config.CTAPolicy < NCCL_CTA_POLICY_DEFAULT || comm->config.CTAPolicy > NCCL_CTA_POLICY_ZERO) {
    INFO(NCCL_ENV, "CTAPolicy %d is not a valid value, set it to %d", comm->config.CTAPolicy, NCCL_CTA_POLICY_DEFAULT);
    comm->config.CTAPolicy = NCCL_CTA_POLICY_DEFAULT;
  }
  if (comm->config.nvlsCTAs != NCCL_CONFIG_UNDEF_INT && comm->config.nvlsCTAs <= 0) {
    INFO(NCCL_ENV, "nvlsCTAs %d is not a valid value, NCCL will decide the default value automatically", comm->config.nvlsCTAs);
    comm->config.nvlsCTAs = NCCL_CONFIG_UNDEF_INT;
  }

  return ncclSuccess;
}
// Give a child communicator its parent's configuration: the parent's config
// is copied bit-for-bit, then environment overrides are applied again on the
// child (which also gives the child its own copy of the network name).
// Propagates any error reported by envConfigOverride.
static ncclResult_t copyCommConfig(ncclComm_t childComm, ncclComm_t parnet) {
  const ncclConfig_t* inherited = &parnet->config;
  ncclConfig_t* target = &childComm->config;
  memcpy(target, inherited, sizeof(*target));
  NCCLCHECK(envConfigOverride(childComm));
  return ncclSuccess;
}
// Validate a user-supplied ncclConfig_t and install it in comm->config.
//
// Steps:
//   1. copy at most sizeof(ncclConfig_t) bytes of the user struct (its leading
//      size_t gives the size the caller was compiled with) and check the magic,
//   2. reset fields the caller's NCCL version did not know about to "undefined",
//   3. range-check every field that is defined,
//   4. replace undefined fields by the library defaults,
//   5. store the result in the communicator and apply environment overrides.
//
// Returns ncclInvalidArgument for an uninitialized or out-of-range config,
// whatever envConfigOverride reports on failure, and ncclSuccess otherwise.
static ncclResult_t parseCommConfig(ncclComm_t comm, ncclConfig_t *config) {
  ncclResult_t ret = ncclSuccess;
  /* config must not be NULL in this function */
  ncclConfig_t defaultConfig = NCCL_CONFIG_INITIALIZER;
  ncclConfig_t internalConfig = NCCL_CONFIG_INITIALIZER;
  ncclConfig_t *internalConfigPtr;
  size_t realSize;

  // Clear the magic so that a user struct too short to carry one is rejected below.
  internalConfig.magic = 0;
  internalConfigPtr = &internalConfig;
  if (config) {
    memcpy((void*)&realSize, (void*)config, sizeof(size_t));
    realSize = realSize > sizeof(ncclConfig_t) ? sizeof(ncclConfig_t) : realSize;
    memcpy((void*)internalConfigPtr, (void*)config, realSize);
    if (internalConfigPtr->magic != 0xcafebeef) {
      WARN("ncclConfig_t argument not initialized via NCCL_CONFIG_INITIALIZER");
      ret = ncclInvalidArgument;
      goto fail;
    }

    /* check version. */
    if (internalConfigPtr->version < NCCL_VERSION(2, 14, 0)) {
      internalConfigPtr->blocking = defaultConfig.blocking;
    }

    if (internalConfigPtr->version < NCCL_VERSION(2, 17, 0)) {
      internalConfigPtr->cgaClusterSize = defaultConfig.cgaClusterSize;
      internalConfigPtr->minCTAs = defaultConfig.minCTAs;
      internalConfigPtr->maxCTAs = defaultConfig.maxCTAs;
      internalConfigPtr->netName = defaultConfig.netName;
    }

    if (internalConfigPtr->version < NCCL_VERSION(2, 25, 0)) {
      internalConfigPtr->trafficClass = defaultConfig.trafficClass;
    }

    if (internalConfigPtr->version < NCCL_VERSION(2, 27, 0)) {
      internalConfigPtr->collnetEnable = defaultConfig.collnetEnable;
      internalConfigPtr->CTAPolicy = defaultConfig.CTAPolicy;
      internalConfigPtr->shrinkShare = defaultConfig.shrinkShare;
      internalConfigPtr->nvlsCTAs = defaultConfig.nvlsCTAs;
    }

    if (internalConfigPtr->version < NCCL_VERSION(2, 28, 0)) {
      internalConfigPtr->nChannelsPerNetPeer = defaultConfig.nChannelsPerNetPeer;
      internalConfigPtr->nvlinkCentricSched = defaultConfig.nvlinkCentricSched;
    }
  }

  /* check input config attributes, -1 means user-undefined and we should use default value from NCCL. */
  if (internalConfigPtr->blocking != NCCL_CONFIG_UNDEF_INT && internalConfigPtr->blocking != 0 && internalConfigPtr->blocking != 1) {
    WARN("Invalid config blocking attribute value %d", internalConfigPtr->blocking);
    ret = ncclInvalidArgument;
    goto fail;
  }

  if (internalConfigPtr->cgaClusterSize != NCCL_CONFIG_UNDEF_INT && internalConfigPtr->cgaClusterSize < 0) {
    WARN("Invalid config cgaClusterSize attribute value %d", internalConfigPtr->cgaClusterSize);
    ret = ncclInvalidArgument;
    goto fail;
  }

  // min and max are only compared against each other when both are defined:
  // comparing a defined minCTAs with the "undefined" sentinel in maxCTAs would
  // reject a config that sets only minCTAs. The remaining cross-check against
  // the defaulted/env-overridden values is done by envConfigOverride.
  if ((internalConfigPtr->minCTAs != NCCL_CONFIG_UNDEF_INT &&
       internalConfigPtr->minCTAs <= 0) ||
      (internalConfigPtr->maxCTAs != NCCL_CONFIG_UNDEF_INT &&
       internalConfigPtr->maxCTAs <= 0) ||
      (internalConfigPtr->minCTAs != NCCL_CONFIG_UNDEF_INT &&
       internalConfigPtr->maxCTAs != NCCL_CONFIG_UNDEF_INT &&
       internalConfigPtr->minCTAs > internalConfigPtr->maxCTAs)) {
    WARN("Invalid config min/max channels attribute value %d/%d", internalConfigPtr->minCTAs, internalConfigPtr->maxCTAs);
    ret = ncclInvalidArgument;
    goto fail;
  }

  if (internalConfigPtr->splitShare != NCCL_CONFIG_UNDEF_INT && internalConfigPtr->splitShare != 0 && internalConfigPtr->splitShare != 1) {
    WARN("Invalid config splitShare attribute value %d", internalConfigPtr->splitShare);
    ret = ncclInvalidArgument;
    goto fail;
  }

  if (internalConfigPtr->collnetEnable != NCCL_CONFIG_UNDEF_INT && (internalConfigPtr->collnetEnable < 0 || internalConfigPtr->collnetEnable > 1)) {
    WARN("Invalid config collnetEnable attribute value %d", internalConfigPtr->collnetEnable);
    ret = ncclInvalidArgument;
    goto fail;
  }

  if (internalConfigPtr->CTAPolicy != NCCL_CONFIG_UNDEF_INT && (internalConfigPtr->CTAPolicy < NCCL_CTA_POLICY_DEFAULT ||
      internalConfigPtr->CTAPolicy > NCCL_CTA_POLICY_ZERO)) {
    WARN("Invalid config policy attribute value %d", internalConfigPtr->CTAPolicy);
    ret = ncclInvalidArgument;
    goto fail;
  }

  if (internalConfigPtr->shrinkShare != NCCL_CONFIG_UNDEF_INT && internalConfigPtr->shrinkShare != 0 && internalConfigPtr->shrinkShare != 1) {
    WARN("Invalid config shrinkShare attribute value %d", internalConfigPtr->shrinkShare);
    ret = ncclInvalidArgument;
    goto fail;
  }

  if (internalConfigPtr->nvlsCTAs != NCCL_CONFIG_UNDEF_INT && internalConfigPtr->nvlsCTAs <= 0) {
    WARN("Invalid config nvlsCTAs attribute value %d", internalConfigPtr->nvlsCTAs);
    ret = ncclInvalidArgument;
    goto fail;
  }

  if (internalConfigPtr->nChannelsPerNetPeer != NCCL_CONFIG_UNDEF_INT && (internalConfigPtr->nChannelsPerNetPeer <= 0 || internalConfigPtr->nChannelsPerNetPeer > MAXCHANNELS)) {
    WARN("Invalid config nChannelsPerNetPeer attribute value %d", internalConfigPtr->nChannelsPerNetPeer);
    ret = ncclInvalidArgument;
    goto fail;
  }

  if (internalConfigPtr->nvlinkCentricSched != NCCL_CONFIG_UNDEF_INT && internalConfigPtr->nvlinkCentricSched != 0 && internalConfigPtr->nvlinkCentricSched != 1) {
    WARN("Invalid config nvlinkCentricSched attribute value %d", internalConfigPtr->nvlinkCentricSched);
    ret = ncclInvalidArgument;
    goto fail;
  }

  /* default config value can be tuned on different platform. */
  NCCL_CONFIG_DEFAULT(internalConfigPtr, blocking, NCCL_CONFIG_UNDEF_INT, 1, "Blocking", "%d");
  NCCL_CONFIG_DEFAULT(internalConfigPtr, cgaClusterSize, NCCL_CONFIG_UNDEF_INT, 4, "CGA cluster size", "%d");
  NCCL_CONFIG_DEFAULT(internalConfigPtr, minCTAs, NCCL_CONFIG_UNDEF_INT, 1, "Min CTAs", "%d");
  NCCL_CONFIG_DEFAULT(internalConfigPtr, maxCTAs, NCCL_CONFIG_UNDEF_INT, MAXCHANNELS, "Max CTAs", "%d");
  NCCL_CONFIG_DEFAULT(internalConfigPtr, netName, NCCL_CONFIG_UNDEF_PTR, NULL, "Net name", "%s");
  NCCL_CONFIG_DEFAULT(internalConfigPtr, splitShare, NCCL_CONFIG_UNDEF_INT, 0, "Split share", "%d");
  NCCL_CONFIG_DEFAULT(internalConfigPtr, trafficClass, NCCL_CONFIG_UNDEF_INT, NCCL_CONFIG_UNDEF_INT, "Traffic class", "%d");
  NCCL_CONFIG_DEFAULT(internalConfigPtr, commName, NCCL_CONFIG_UNDEF_PTR, NULL, "Comm name", "%s");
  NCCL_CONFIG_DEFAULT(internalConfigPtr, collnetEnable, NCCL_CONFIG_UNDEF_INT, 0, "Collnet enable", "%d");
  NCCL_CONFIG_DEFAULT(internalConfigPtr, CTAPolicy, NCCL_CONFIG_UNDEF_INT, NCCL_CTA_POLICY_DEFAULT, "CTA policy flags", "%d");
  NCCL_CONFIG_DEFAULT(internalConfigPtr, shrinkShare, NCCL_CONFIG_UNDEF_INT, 0, "shrinkShare", "%d");
  NCCL_CONFIG_DEFAULT(internalConfigPtr, nvlsCTAs, NCCL_CONFIG_UNDEF_INT, NCCL_CONFIG_UNDEF_INT, "nvlsCTAs", "%d");
  NCCL_CONFIG_DEFAULT(internalConfigPtr, nChannelsPerNetPeer, NCCL_CONFIG_UNDEF_INT,
                      NCCL_CONFIG_UNDEF_INT, "nChannelsPerNetPeer", "%d");
  NCCL_CONFIG_DEFAULT(internalConfigPtr, nvlinkCentricSched, NCCL_CONFIG_UNDEF_INT, 0, "nvlinkCentricSched", "%d");

  /* assign config to communicator */
  comm->config.blocking = internalConfigPtr->blocking;
  comm->config.cgaClusterSize = internalConfigPtr->cgaClusterSize;
  comm->config.minCTAs = internalConfigPtr->minCTAs;
  comm->config.maxCTAs = internalConfigPtr->maxCTAs;
  comm->config.netName = internalConfigPtr->netName;
  comm->config.splitShare = internalConfigPtr->splitShare;
  comm->config.trafficClass = internalConfigPtr->trafficClass;
  comm->config.commName = internalConfigPtr->commName;
  comm->config.collnetEnable = internalConfigPtr->collnetEnable;
  comm->config.CTAPolicy = internalConfigPtr->CTAPolicy;
  comm->config.shrinkShare = internalConfigPtr->shrinkShare;
  comm->config.nvlsCTAs = internalConfigPtr->nvlsCTAs;
  comm->config.nChannelsPerNetPeer = internalConfigPtr->nChannelsPerNetPeer;
  comm->config.nvlinkCentricSched = internalConfigPtr->nvlinkCentricSched;
  NCCLCHECKGOTO(envConfigOverride(comm), ret, fail);

exit:
  return ret;
fail:
  goto exit;
}
2024-09-10 05:57:10 -07:00
// Destructor handed to ncclAsyncLaunch for communicator-init jobs: releases the
// job's private copy of the unique ids, then the job structure itself.
static void ncclCommInitJobFree(void* _job) {
  struct ncclCommInitRankAsyncJob* initJob = static_cast<struct ncclCommInitRankAsyncJob*>(_job);
  free(initJob->commId);
  free(initJob);
}
/**
 * Shared implementation behind the ncclCommInitRank* entry points and ncclCommInitAll.
 *
 * Allocates the communicator and its abort flags, applies `config`, publishes the
 * communicator through `newcomm`, and hands an init job to ncclAsyncLaunch.
 *
 * @param newcomm  [out] receives the new communicator; reset to NULL on the
 *                 argument-check and `fail` paths.
 * @param nranks   number of ranks in the communicator.
 * @param nId      number of entries in `commId`; must satisfy 0 < nId <= nranks.
 * @param commId   array of `nId` unique ids; copied into the job, so the caller keeps ownership.
 * @param myrank   rank of the caller, must be in [0, nranks).
 * @param cudaDev  device index stored in the init job.
 * @param config   communicator configuration; must not be NULL.
 * @param funcName name of the calling entry point, stored in the job.
 * @return the result filtered through ncclGroupErrCheck, or ncclInvalidArgument for a bad nId.
 */
static ncclResult_t ncclCommInitRankDev(ncclComm_t* newcomm, int nranks, int nId, ncclUniqueId* commId, int myrank, int cudaDev, ncclConfig_t *config, const char funcName[]) {
  if (nId <= 0 || nId > nranks) {
    WARN("improper usage of ncclCommInitRank: nId = %d, nranks=%d", nId, nranks);
    // Callers test *newcomm after a failure; do not leave caller garbage in it.
    if (newcomm) *newcomm = NULL;
    return ncclInvalidArgument;
  }

  ncclResult_t res = ncclSuccess;
  const char* commIdEnv = NULL;
  ncclComm_t comm = NULL;
  struct ncclCommInitRankAsyncJob* job = NULL;
  bool launchedJob = false;
  // first call ncclInit, this will setup the environment
  NCCLCHECKGOTO(ncclInit(), res, fail);

  if (ncclDebugLevel > NCCL_LOG_WARN || (ncclDebugLevel >= NCCL_LOG_VERSION && myrank == 0)) {
    static std::once_flag once;
    std::call_once(once, showVersion);
  }
  // Make sure the CUDA runtime is initialized.
  CUDACHECKGOTO(cudaFree(NULL), res, fail);

  NCCLCHECKGOTO(PtrCheck(newcomm, "CommInitRank", "newcomm"), res, fail);
  NCCLCHECKGOTO(PtrCheck(config, "CommInitRank", "config"), res, fail);
  // commId is memcpy'd below; reject NULL instead of crashing.
  NCCLCHECKGOTO(PtrCheck(commId, "CommInitRank", "commId"), res, fail);
  if (nranks < 1 || myrank < 0 || myrank >= nranks) {
    WARN("Invalid rank requested : %d/%d", myrank, nranks);
    res = ncclInvalidArgument;
    goto fail;
  }

  NCCLCHECKGOTO(ncclCalloc(&comm, 1), res, fail);
  NCCLCHECKGOTO(ncclCalloc(&comm->abortFlag, 1), res, fail);
  NCCLCHECKGOTO(ncclCudaHostCalloc(&comm->abortFlagDev, 1), res, fail);
  NCCLCHECKGOTO(ncclCalloc(&comm->abortFlagRefCount, 1), res, fail);
  comm->startMagic = comm->endMagic = NCCL_MAGIC; // Used to detect comm corruption.
  *comm->abortFlagRefCount = 1;
  NCCLCHECKGOTO(parseCommConfig(comm, config), res, fail);
  /* start with ncclInProgress and will be changed to ncclSuccess if init succeeds. */
  comm->initState = ncclInProgress;
  *newcomm = comm;

  NCCLCHECKGOTO(ncclCalloc(&job, 1), res, fail);
  job->nId = nId;
  job->comm = comm;
  job->nranks = nranks;
  job->myrank = myrank;
  job->cudaDev = cudaDev;
  snprintf(job->funcName, NCCL_COMMINIT_FUNCNAME_LEN, "%s", funcName);
  // need to copy the commIds to allow async commInit and to avoid alignment issues when casting from ncclUniqueId to ncclBootstrapHandle
  // ncclUniqueIds and ncclBootstrapHandle don't have the same alignment requirements.
  // Therefore the array of Ids coming from the user might not be properly aligned to be cast into a ncclBootstrapHandle
  // copying into allocated memory guarantees that the memory is properly aligned for any objects, removing that issue
  NCCLCHECKGOTO(ncclCalloc(&job->commId, nId), res, fail);
  memcpy(job->commId, commId, nId * NCCL_UNIQUE_ID_BYTES);

  commIdEnv = ncclGetEnv("NCCL_COMM_ID");
  if (commIdEnv && myrank == 0) {
    INFO(NCCL_ENV, "NCCL_COMM_ID set by environment to %s", commIdEnv);
    if (nId > 1) {
      INFO(NCCL_INIT | NCCL_ENV, "NCCL_COMM_ID cannot be used with more than one ncclUniqueId");
      job->nId = 1;
    }
    // start the bootstrap root before bootstrapping, use only the first handle
    NCCLCHECKGOTO(bootstrapCreateRoot((struct ncclBootstrapHandle*)&job->commId[0], true), res, fail);
  }
  // Set before the launch: from here on the `fail` path must not free the job itself.
  launchedJob = true;
  NCCLCHECKGOTO(ncclAsyncLaunch((struct ncclAsyncJob*)job, ncclCommInitRankFunc, NULL, ncclCommInitJobFree, comm), res, fail);

exit:
  // for logging only, not ready for replaying
  // !recording at sink
  NCCLCHECK(Recorder::instance().record(rrCommInitDev, nranks, myrank, commId, comm, cudaDev));
  return ncclGroupErrCheck(res);
fail:
  if (job && !launchedJob) ncclCommInitJobFree(job);
  if (comm) {
    free(comm->abortFlag);
    if (comm->abortFlagDev) (void)ncclCudaHostFree((void*)comm->abortFlagDev);
    free(comm->abortFlagRefCount);
    free(comm);
    // `exit` hands comm to the recorder; pass NULL (as on failures before allocation)
    // rather than a dangling pointer.
    comm = NULL;
  }
  if (newcomm) *newcomm = NULL;
  goto exit;
}
2019-11-19 14:57:39 -08:00
NCCL_API(ncclResult_t, ncclCommInitRank, ncclComm_t* newcomm, int nranks, ncclUniqueId commId, int myrank);
// Creates a communicator for `myrank` out of `nranks` using a single unique id and the
// default configuration, bound to the device that is current on the calling thread.
ncclResult_t ncclCommInitRank_impl(ncclComm_t* newcomm, int nranks, ncclUniqueId commId, int myrank) {
  NCCLCHECK(Recorder::instance().record(rrCommInitRank, nranks, myrank, &commId));
  NVTX3_RANGE(NcclNvtxParamsCommInitRank)

  // Load the CUDA driver and dlsym hooks (can fail on old drivers)
  rocmLibraryInit();

  ncclConfig_t defaultConfig = NCCL_CONFIG_INITIALIZER;
  int cudaDev;
  CUDACHECK(cudaGetDevice(&cudaDev));

  // One unique id (nId == 1) with the default configuration.
  NCCLCHECK(ncclCommInitRankDev(newcomm, nranks, 1, &commId, myrank, cudaDev, &defaultConfig, __func__));

  NVTX3_RANGE_ADD_PAYLOAD(CommInitRank, NcclNvtxParamsCommInitRankSchema,
    NVTX3_PAYLOAD((*newcomm)->commHash, nranks, myrank, cudaDev));
  return ncclSuccess;
}
NCCL_API(ncclResult_t, ncclCommInitAll, ncclComm_t* comms, int ndev, const int* devlist);
/**
 * Creates `ndev` communicators in one process, one per device.
 *
 * @param comms   [out] array of `ndev` communicator handles.
 * @param ndev    number of devices/communicators; negative values are rejected.
 * @param devlist optional list of `ndev` device indices; when NULL, devices 0..ndev-1 are used.
 *                Entries must be valid device indices and must not repeat.
 * @return ncclSuccess, ncclInvalidArgument (bad pointer, count or device index),
 *         ncclInvalidUsage (duplicate device) or the first error reported by a callee.
 *
 * The device that was current on entry is restored before returning.
 */
ncclResult_t ncclCommInitAll_impl(ncclComm_t* comms, int ndev, const int* devlist) {
  Recorder::instance().record(comms, ndev, devlist);
  ncclResult_t ret = ncclSuccess;
  int totalnDev;
  int *gpuFlags = NULL;
  ncclConfig_t config = NCCL_CONFIG_INITIALIZER;
  int oldDev = 0;
  // True while the internal group opened below still has to be closed.
  bool groupOpen = false;

  NVTX3_RANGE(NcclNvtxParamsCommInitAll);

  // Load the CUDA driver and dlsym hooks (can fail on old drivers)
  rocmLibraryInit();

  CUDACHECK(cudaGetDevice(&oldDev));
  NCCLCHECKGOTO(PtrCheck(comms, "CommInitAll", "comms"), ret, fail);
  if (ndev < 0) {
    WARN("Invalid device count requested : %d", ndev);
    ret = ncclInvalidArgument;
    goto fail;
  }

  CUDACHECKGOTO(cudaGetDeviceCount(&totalnDev), ret, fail);
  if (devlist) {
    NCCLCHECKGOTO(ncclCalloc(&gpuFlags, totalnDev), ret, fail);
    for (int i = 0; i < ndev; ++i) {
      /* invalid device check. */
      if (devlist[i] < 0 || devlist[i] >= totalnDev) {
        WARN("Invalid device %d (totalnDev=%d)", devlist[i], totalnDev);
        ret = ncclInvalidArgument;
        goto fail;
      }

      /* duplicate device check. */
      if (gpuFlags[devlist[i]] != 0) {
        WARN("Duplicate device %d in devlist", devlist[i]);
        ret = ncclInvalidUsage;
        goto fail;
      }

      gpuFlags[devlist[i]] = 1;
    }
    free(gpuFlags);
    gpuFlags = nullptr;
  }

  ncclUniqueId uniqueId;
  NCCLCHECKGOTO(ncclGetUniqueId(&uniqueId), ret, fail);
  NCCLCHECKGOTO(ncclGroupStartInternal(), ret, fail);
  groupOpen = true;
  for (int i=0; i<ndev; i++) {
    // Ignore return codes .. we need to call ncclGroupEnd to clean up anyway
    int dev = devlist ? devlist[i] : i;
    CUDACHECKGOTO(cudaSetDevice(dev), ret, fail);
    ncclCommInitRankDev(comms+i, ndev,1, &uniqueId, i, dev, &config, __func__);
  }
  groupOpen = false;
  NCCLCHECKGOTO(ncclGroupEndInternal(), ret, fail);
  // comms[0] only exists when at least one communicator was requested.
  if (ndev > 0) {
    NVTX3_RANGE_ADD_PAYLOAD(CommInitAll, NcclNvtxParamsCommInitAllSchema,
      NVTX3_PAYLOAD(comms[0]->commHash, ndev));
  }

exit:
  (void)cudaSetDevice(oldDev);
  free(gpuFlags);
  return ret;
fail:
  if (groupOpen) {
    // A failure inside the loop must still close the group opened above; flag the
    // error first so the group is ended with it. `ret` keeps the original error.
    groupOpen = false;
    (void)ncclGroupErrCheck(ret);
    (void)ncclGroupEndInternal();
  }
  goto exit;
}
2022-08-18 02:53:17 -07:00
// Stores `nextState` into comm->asyncResult with release ordering.
// Returns ncclInvalidArgument (after a warning) when comm is NULL or nextState is
// outside [0, ncclNumResults); ncclSuccess otherwise.
ncclResult_t ncclCommSetAsyncError(ncclComm_t comm, ncclResult_t nextState) {
  const bool stateInRange = (nextState >= 0 && nextState < ncclNumResults);
  if (!stateInRange || comm == NULL) {
    WARN("ncclCommSetAsyncError: error comm %p sets state %d", comm, nextState);
    return ncclInvalidArgument;
  }

  __atomic_store_n(&comm->asyncResult, nextState, __ATOMIC_RELEASE);
  return ncclSuccess;
}
NCCL_API(ncclResult_t, ncclCommInitRankConfig, ncclComm_t* comm, int nranks, ncclUniqueId commId, int myrank, ncclConfig_t *config);
/**
 * Creates a communicator from a single unique id with an optional configuration.
 *
 * @param newcomm [out] receives the communicator; NULL when creation fails in this call.
 * @param nranks  number of ranks in the communicator.
 * @param commId  unique id shared by all ranks.
 * @param myrank  rank of the caller.
 * @param config  configuration to apply; NULL selects NCCL_CONFIG_INITIALIZER defaults.
 * @return the init result; for a non-blocking communicator, the value read back by
 *         ncclCommGetAsyncError.
 */
ncclResult_t ncclCommInitRankConfig_impl(ncclComm_t *newcomm, int nranks, ncclUniqueId commId, int myrank, ncclConfig_t *config) {
  Recorder::instance().record(rrCommInitRankConfig, nranks, myrank, &commId, config);
  int cudaDev = -1;
  ncclResult_t ret = ncclSuccess;
  ncclConfig_t internalConfig = NCCL_CONFIG_INITIALIZER;
  ncclConfig_t *internalConfigPtr = NULL;

  NVTX3_RANGE(NcclNvtxParamsCommInitRankConfig);

  // The exit/fail paths below dereference *newcomm; make sure it never holds caller
  // garbage if we fail before ncclCommInitRankDev assigns it.
  if (newcomm) *newcomm = NULL;

  NCCLCHECK(ncclGroupStartInternal());

  rocmLibraryInit();
  // Must not return directly here: the group opened above has to be closed at `exit`.
  CUDACHECKGOTO(cudaGetDevice(&cudaDev), ret, fail);

  if (config == NULL)
    internalConfigPtr = &internalConfig;
  else
    internalConfigPtr = config;
  NCCLCHECKGOTO(ncclCommInitRankDev(newcomm, nranks, 1, &commId, myrank, cudaDev, internalConfigPtr, __func__), ret, fail);

exit:
  ncclGroupErrCheck(ret);
  NCCLCHECK(ncclGroupEndInternal());
  if (newcomm && *newcomm) {
    if (!(*newcomm)->config.blocking) {
      (void) ncclCommGetAsyncError(*newcomm, &ret);
    }
    NVTX3_RANGE_ADD_PAYLOAD(CommInitRankConfig, NcclNvtxParamsCommInitRankSchema,
      NVTX3_PAYLOAD((*newcomm)->commHash, nranks, myrank, cudaDev));
  }
  return ret;
fail:
  if (newcomm && *newcomm && !(*newcomm)->config.blocking) (void) ncclCommSetAsyncError(*newcomm, ret);
  goto exit;
}
NCCL_API(ncclResult_t, ncclCommInitRankScalable, ncclComm_t* newcomm, int nranks, int myrank, int nId, ncclUniqueId* commId, ncclConfig_t* config);
/**
 * Creates a communicator from an array of unique ids with an optional configuration.
 *
 * @param newcomm [out] receives the communicator; NULL when creation fails in this call.
 * @param nranks  number of ranks in the communicator.
 * @param myrank  rank of the caller.
 * @param nId     number of entries in `commId`.
 * @param commId  array of `nId` unique ids.
 * @param config  configuration to apply; NULL selects NCCL_CONFIG_INITIALIZER defaults.
 * @return the init result; for a non-blocking communicator, the value read back by
 *         ncclCommGetAsyncError.
 */
ncclResult_t ncclCommInitRankScalable(ncclComm_t* newcomm, int nranks, int myrank, int nId, ncclUniqueId* commId, ncclConfig_t* config) {
  NVTX3_RANGE(NcclNvtxParamsCommInitRankScalable);

  int cudaDev = -1;
  ncclResult_t ret = ncclSuccess;
  ncclConfig_t internalConfig = NCCL_CONFIG_INITIALIZER;
  ncclConfig_t *internalConfigPtr = NULL;

  // The exit/fail paths below dereference *newcomm; make sure it never holds caller
  // garbage if we fail before ncclCommInitRankDev assigns it.
  if (newcomm) *newcomm = NULL;

  NCCLCHECK(ncclGroupStartInternal());

  rocmLibraryInit();
  // Must not return directly here: the group opened above has to be closed at `exit`.
  CUDACHECKGOTO(cudaGetDevice(&cudaDev), ret, fail);

  if (config == NULL)
    internalConfigPtr = &internalConfig;
  else
    internalConfigPtr = config;
  NCCLCHECKGOTO(ncclCommInitRankDev(newcomm, nranks, nId, commId, myrank, cudaDev, internalConfigPtr, __func__), ret, fail);

exit:
  ncclGroupErrCheck(ret);
  NCCLCHECK(ncclGroupEndInternal());
  if (newcomm && *newcomm) {
    if (!(*newcomm)->config.blocking) {
      (void) ncclCommGetAsyncError(*newcomm, &ret);
    }
    NVTX3_RANGE_ADD_PAYLOAD(CommInitRankScalable, NcclNvtxParamsCommInitRankSchema,
      NVTX3_PAYLOAD((*newcomm)->commHash, nranks, myrank, cudaDev));
  }
  return ret;
fail:
  if (newcomm && *newcomm && !(*newcomm)->config.blocking) (void) ncclCommSetAsyncError(*newcomm, ret);
  goto exit;
}
// Async job body that quiesces a communicator before it is reclaimed: selects the
// comm's device, tears down collective tracing, and (only for a successfully
// initialized comm) synchronizes the shared streams, drains pending callbacks and
// runs the legacy registration cleanup queue. Finally stops the proxy.
// Returns the status of the last step executed.
static ncclResult_t commDestroySync(struct ncclAsyncJob* job_) {
  struct ncclCommFinalizeAsyncJob* finalizeJob = (struct ncclCommFinalizeAsyncJob*) job_;
  ncclComm_t comm = finalizeJob->comm;
  ncclResult_t ret = ncclSuccess;

  CUDACHECKGOTO(cudaSetDevice(comm->cudaDev), ret, fail);

  NCCLCHECKGOTO(latency_profiler::collTraceDestroy(comm), ret, fail);

  TRACE(NCCL_INIT, "Destroying comm %p rank %d abortFlag %d asyncResult %d", comm, comm->rank, *comm->abortFlag, comm->asyncResult);

  if (comm->initState == ncclSuccess) {
    // Stream synchronization failures are reported but do not stop the teardown.
    ret = ncclStrongStreamSynchronize(&comm->sharedRes->hostStream);
    if (ret != ncclSuccess) {
      WARN("commDestroySync: comm %p rank %d sync hostStream error %d\n", comm, comm->rank, ret);
    }
    ret = ncclStrongStreamSynchronize(&comm->sharedRes->deviceStream);
    if (ret != ncclSuccess) {
      WARN("commDestroySync: comm %p rank %d sync deviceStream error %d\n", comm, comm->rank, ret);
    }

    NCCLCHECKGOTO(ncclCommPollEventCallbacks(comm, true), ret, fail);
    NCCLCHECKGOTO(ncclCommPollCallbacks(comm, false), ret, fail);
    // And keep polling until all graphs referencing us die.
    while (comm->localPersistentRefs != 0) {
      NCCLCHECKGOTO(ncclCommPollCallbacks(comm, /*waitSome=*/true), ret, fail);
    }

    // Run every queued legacy cleanup callback; a failing one is only logged.
    while (!ncclIntruQueueEmpty(&comm->legacyRegCleanupQueue)) {
      struct ncclCommCallback* cb = ncclIntruQueueDequeue(&comm->legacyRegCleanupQueue);
      const ncclResult_t cbStatus = cb->fn(comm, cb);
      if (cbStatus != ncclSuccess) {
        WARN("Legacy IPC cleanup callback failed comm %p (rank = %d) cb %p", comm, comm->rank, cb);
      }
    }
  }

  ret = ncclProxyStop(comm);
  if (ret != ncclSuccess) {
    WARN("ncclProxyStop: comm %p (rank = %d) destroys proxy resource error %d", comm, comm->rank, ret);
  }

exit:
  return ret;
fail:
  goto exit;
}
// Releases the resources of one communicator: tuner plugin, MSCCL state and the
// communicator itself (commFree). With ENABLE_NPKIT, also dumps NPKit events to
// NPKIT_DUMP_DIR (default "./npkit_dump") and shuts NPKit down.
static ncclResult_t commCleanup(ncclComm_t comm) {
  // Read the topology flag up front, before any teardown below.
  const bool topoHasMsccl = comm->topo->mscclEnabled;

  CUDACHECK(cudaSetDevice(comm->cudaDev));

  if (comm->tuner != NULL) {
    NCCLCHECK(comm->tuner->finalize(comm->tunerContext));
    NCCLCHECK(ncclTunerPluginUnload(comm));
  }

  if (mscclEnabled() && (topoHasMsccl || mscclForceEnabled())) {
    NCCLCHECK(mscclTeardown(comm));
  }

  NCCLCHECK(commFree(comm));

#if defined(ENABLE_NPKIT)
  // Dump NPKit events and shutdown
  const char* dumpDir = getenv("NPKIT_DUMP_DIR");
  if (dumpDir == nullptr) {
    dumpDir = "./npkit_dump";
    INFO(NCCL_INIT, "NPKIT_DUMP_DIR is not set, using default directory: %s", dumpDir);
  }
  // Create the directory when it cannot be stat'ed; a failed mkdir is only warned about.
  struct stat dirInfo;
  if (stat(dumpDir, &dirInfo) != 0 && mkdir(dumpDir, 0755) != 0) {
    WARN("Failed to create NPKIT_DUMP_DIR directory: %s", dumpDir);
  }
  NCCLCHECK(NpKit::Dump(dumpDir));
  NCCLCHECK(NpKit::Shutdown());
#endif

  return ncclSuccess;
}
2022-08-18 02:53:17 -07:00
NCCL_API(ncclResult_t, ncclCommFinalize, ncclComm_t comm);
// Launches commDestroySync for `comm` as an async job inside an internal group.
// A NULL comm is a no-op; a second finalize on the same comm yields ncclInvalidArgument.
// For a non-blocking comm the returned value is the one read by ncclCommGetAsyncError.
ncclResult_t ncclCommFinalize_impl(ncclComm_t comm) {
  NCCLCHECK(Recorder::instance().record(rrCommFinalize, comm));
  NVTX3_RANGE(NcclNvtxParamsCommFinalize);

  ncclResult_t ret = ncclSuccess;
  struct ncclCommFinalizeAsyncJob *finalizeJob = NULL;

  NCCLCHECK(ncclGroupStartInternal());
  if (comm == NULL) goto exit;

  /* wait comm ready before finalize. */
  NCCLCHECKGOTO(ncclCommEnsureReady(comm), ret, fail);

  /* prevent double finalize. */
  if (comm->finalizeCalled) {
    ret = ncclInvalidArgument;
    goto fail;
  }
  comm->finalizeCalled = true;

  /* launch async thread to finalize comm. */
  NCCLCHECKGOTO(ncclCalloc(&finalizeJob, 1), ret, fail);
  finalizeJob->comm = comm;
  NCCLCHECKGOTO(ncclAsyncLaunch((struct ncclAsyncJob*)finalizeJob, commDestroySync, NULL, free, comm), ret, fail);

exit:
  ncclGroupErrCheck(ret);
  NCCLCHECK(ncclGroupEndInternal());
  if (comm) {
    if (!comm->config.blocking) {
      NCCLCHECK(ncclCommGetAsyncError(comm, &ret));
    }
    NVTX3_RANGE_ADD_PAYLOAD(CommFinalize, NcclNvtxParamsCommFinalizeSchema,
      NVTX3_PAYLOAD(comm->commHash));
  }
  return ret;
fail:
  if (comm && !comm->config.blocking) (void) ncclCommSetAsyncError(comm, ret);
  goto exit;
}
2024-06-11 01:28:01 -07:00
// Async job body shared by destroy and abort. Each call bumps the finalize counter
// kept on intraComm0; the call that brings it to comm->intraRanks walks the intra
// list twice: first running commDestroySync on every comm that was not finalized,
// then commCleanup on every comm. Failures are logged; the function returns ncclSuccess.
static ncclResult_t commReclaim(struct ncclAsyncJob* job_) {
  struct ncclCommFinalizeAsyncJob* reclaimJob = (struct ncclCommFinalizeAsyncJob*) job_;
  ncclComm_t comm = reclaimJob->comm;
  ncclResult_t ret = ncclSuccess;

  if (comm->intraComm0 == NULL) return ncclSuccess;

  int intraRanks = comm->intraRanks;
  ncclComm_t intracomm0 = comm->intraComm0;
  int *finalizeRankCnt = &intracomm0->finalizeRankCnt;

  assert(intracomm0 != NULL && finalizeRankCnt != NULL);
  int curRankCnt = __atomic_add_fetch(finalizeRankCnt, 1, __ATOMIC_ACQ_REL);
  if (curRankCnt != intraRanks) return ncclSuccess;

  /* this is the last call to ncclCommDestroy/Abort, we need to make sure all comms
   * in the process have been finalized before we free local resources. */
  for (ncclComm_t cur = intracomm0; cur != NULL; ) {
    int curRank = cur->rank; /* Debug info */
    ncclComm_t next = cur->intraNext;

    if (!cur->finalizeCalled) {
      struct ncclCommFinalizeAsyncJob syncJob;
      syncJob.comm = cur;
      /* every comm aborts, commDestroySync should not be blocked. */
      ret = commDestroySync((struct ncclAsyncJob*) &syncJob);
      if (ret != ncclSuccess)
        WARN("commReclaim: comm %p (rank = %d) in commDestroySync, error %d", cur, curRank, ret);
    }
    cur = next;
  }

  /* free local resources. */
  for (ncclComm_t cur = intracomm0; cur != NULL; ) {
    // Capture rank and successor first: commCleanup releases `cur`.
    int curRank = cur->rank; /* Debug info */
    ncclComm_t next = cur->intraNext;

    ret = commCleanup(cur);
    if (ret != ncclSuccess) {
      // We pass a freed pointer, but we don't dereference; we merely print its value, so it's OK.
      // coverity[pass_freed_arg]
      WARN("commReclaim: cleanup comm %p rank %d failed in destroy/abort, error %d", cur, curRank, ret);
    }
    cur = next;
  }

  return ncclSuccess;
}
2018-12-13 15:56:12 -08:00
NCCL_API(ncclResult_t, ncclCommDestroy, ncclComm_t comm);
/**
 * Destroys a communicator by launching commReclaim as an async job inside an
 * internal group.
 *
 * @param comm communicator to destroy; NULL is accepted and is a no-op.
 * @return ncclSuccess, ncclInvalidArgument when the comm looks already destroyed
 *         (rank/nRanks/cudaDev/busId equal to -1), or the first error from a callee.
 *
 * Every path taken after ncclGroupStartInternal leaves through `exit`, so the
 * internal group is always closed again.
 */
ncclResult_t ncclCommDestroy_impl(ncclComm_t comm) {
  NCCLCHECK(Recorder::instance().record(rrCommDestroy, comm));
  if (comm == NULL) {
    NCCL_NVTX3_FUNC_RANGE;
    return ncclSuccess;
  }

  INFO(NCCL_INIT, "Memory used = %ld", allocTracker[comm->cudaDev].totalAllocSize);

#ifdef ENABLE_MSCCLPP
  if (comm->mscclppCompatible) {
    auto& mscclppUniqueId = mscclpp_commToUniqueIdMap[comm->mscclpp_comm];
    auto& uniqueIds = mscclpp_uniqueIdReverseMap[mscclppUniqueId];
    auto& ncclUniqueId = ncclCommToUniqueIdMap[comm];
    if (uniqueIds.find(ncclUniqueId) == uniqueIds.end()) {
      WARN("MSCCL++: comm=%p not found in mscclpp_uniqueIdReverseMap for key=%p", comm, comm->mscclpp_comm);
    }
    uniqueIds.erase(ncclUniqueId);
    // Last NCCL comm mapped to this MSCCL++ comm: destroy the MSCCL++ comm as well.
    if (uniqueIds.size() == 0) {
      mscclpp_uniqueIdReverseMap.erase(mscclppUniqueId);
      ncclResult_t res = mscclpp_ncclCommDestroy(comm->mscclpp_comm);
      TRACE_CALL("mscclpp_ncclCommDestroy");
      if (res != ncclSuccess) {
        WARN("MSCCL++: mscclpp_ncclCommDestroy failed (%s)", ncclGetErrorString(res));
      }
    }
    comm->mscclppCompatible = false;
    comm->mscclpp_comm = nullptr;
  }
#endif

#ifdef ENABLE_ROCSHMEM
  if (comm->enableRocshmem) {
    for (int i = 0; i < NUM_SYM_BUF; i++) {
      rocshmem::rocshmem_free(comm->sourceRshmem[i]);
      rocshmem::rocshmem_free(comm->destRshmem[i]);
    }
    free(comm->sourceRshmem);
    free(comm->destRshmem);
    //TODO: subcomm check
    rocshmem::rocshmem_team_t team;
    if (!ncclCommToRshmemTeam.empty()) {
      team = ncclCommToRshmemTeam[comm];
      rocshmem::rocshmem_team_destroy(team);
      ncclCommToRshmemTeam.erase(comm);
    }
    // Finalize rocSHMEM once no team is left.
    if (ncclCommToRshmemTeam.empty()) {
      rocshmem::rocshmem_finalize();
    }
  }
#endif

  int rank = comm->rank, nranks = comm->nRanks, cudaDev = comm->cudaDev;
  struct ncclCommFinalizeAsyncJob *job = NULL;
  ncclResult_t res = ncclSuccess;

  NVTX3_FUNC_WITH_PARAMS(CommDestroy, NcclNvtxParamsCommInitRank,
    NVTX3_PAYLOAD(comm->commHash, nranks, rank, cudaDev));

  TRACE(NCCL_INIT, "comm %p rank %d nRanks %d cudaDev %d busId %lx", comm, rank, nranks, cudaDev, comm->busId);
  NCCLCHECK(ncclGroupStartInternal());
  // Try and prevent a double free of the comm struct (user error)
  if (comm->rank == -1 || comm->nRanks == -1 || comm->cudaDev == -1 || comm->busId == -1) {
    WARN("comm %p has already been destroyed", comm);
    // Leave through `fail`/`exit` so the group started above is ended.
    res = ncclInvalidArgument;
    goto fail;
  }

  comm->destroyFlag = 1;
  /* init thread must be joined before we destroy the comm. */
  NCCLCHECKGOTO(ncclCommEnsureReady(comm), res, fail);
  NCCLCHECKGOTO(ncclCalloc(&job, 1), res, fail);
  job->comm = comm;
  NCCLCHECKGOTO(ncclAsyncLaunch((struct ncclAsyncJob*)job, commReclaim, NULL, free, comm), res, fail);

exit:
  ncclGroupErrCheck(res);
  NCCLCHECK(ncclGroupEndInternal());
  return res;
fail:
  goto exit;
}
2025-05-29 20:56:40 -07:00
// Writes `value` (release ordering) to the comm's abort flags: first the child
// flags, when a child abort flag is registered, then the comm's own host and
// device flags. Always returns ncclSuccess.
static ncclResult_t setCommAbortFlags(ncclComm_t comm, int value) {
  const bool hasChildFlags = (comm->childAbortFlag != nullptr);
  if (hasChildFlags) {
    __atomic_store_n(comm->childAbortFlag, value, __ATOMIC_RELEASE);
    __atomic_store_n(comm->childAbortFlagDev, value, __ATOMIC_RELEASE);
  }

  __atomic_store_n(comm->abortFlag, value, __ATOMIC_RELEASE);
  __atomic_store_n(comm->abortFlagDev, value, __ATOMIC_RELEASE);

  return ncclSuccess;
}
2018-12-13 15:56:12 -08:00
NCCL_API(ncclResult_t, ncclCommAbort, ncclComm_t comm);
// Aborts a communicator: raises its abort flags, waits for the init thread
// (ignoring any init error) and launches commReclaim as an async job inside an
// internal group. A NULL comm is a no-op.
ncclResult_t ncclCommAbort_impl(ncclComm_t comm) {
  NCCLCHECK(Recorder::instance().record(rrCommAbort, comm));
  NVTX3_RANGE(NcclNvtxParamsCommAbort);

  if (comm == NULL) return ncclSuccess;

  INFO(NCCL_INIT, "comm %p rank %d nRanks %d cudaDev %d busId %lx - Abort START",
       comm, comm->rank, comm->nRanks, comm->cudaDev, comm->busId);
  NCCLCHECK(ncclGroupStartInternal());

  // Ask anything that might still be running on the device to quit
  NCCLCHECK(setCommAbortFlags(comm,1));
  comm->destroyFlag = 1;

  /* init thread must be joined before we destroy the comm,
   * and we should ignore the init error here. */
  (void)ncclCommEnsureReady(comm);

  // once the comm is ready, we can access ranks etc
  int rank = comm->rank;
  int nranks = comm->nRanks;
  int cudaDev = comm->cudaDev;
  struct ncclCommFinalizeAsyncJob *reclaimJob = NULL;
  ncclResult_t res = ncclSuccess;

  NVTX3_RANGE_ADD_PAYLOAD(CommAbort, NcclNvtxParamsCommInitRankSchema,
    NVTX3_PAYLOAD(comm->commHash, nranks, rank, cudaDev));
  TRACE(NCCL_INIT, "comm %p rank %d nRanks %d cudaDev %d busId %lx", comm, rank, nranks, cudaDev, comm->busId);

  NCCLCHECKGOTO(ncclCalloc(&reclaimJob, 1), res, fail);
  reclaimJob->comm = comm;
  NCCLCHECKGOTO(ncclAsyncLaunch((struct ncclAsyncJob*)reclaimJob, commReclaim, NULL, free, comm), res, fail);

exit:
  ncclGroupErrCheck(res);
  NCCLCHECK(ncclGroupEndInternal());
  return res;
fail:
  goto exit;
}
2025-05-29 20:56:40 -07:00
// Destructor for child-communicator init jobs: frees the job's copy of the
// excluded-ranks list (when one was made) and then the job itself.
static void childCommCleanupJob(void* job) {
  struct ncclCommInitRankAsyncJob* childJob = static_cast<struct ncclCommInitRankAsyncJob*>(job);
  if (childJob->excludeRanksList != NULL) {
    free(childJob->excludeRanksList);
  }
  free(childJob);
}
// initializing a child communicator (for both split and shrink)
static ncclResult_t ncclCommInitChildComm(ncclComm_t comm, ncclComm_t* newcomm, bool isShrink, int flags, int color, int key, int* excludeRanksList, int excludeRanksCount,
ncclConfig_t* config, const char* caller) {
2023-04-03 05:32:07 -07:00
struct ncclCommInitRankAsyncJob *job = NULL;
struct ncclComm* childComm = NCCL_COMM_NULL;
ncclResult_t res = ncclSuccess;
2024-09-10 05:57:10 -07:00
int oldDev;
CUDACHECK(cudaGetDevice(&oldDev));
2025-05-29 20:56:40 -07:00
NCCLCHECKGOTO(CommCheck(comm, caller, "comm"), res, exit);
NCCLCHECKGOTO(PtrCheck(newcomm, caller, "newcomm"), res, exit);
if (isShrink) {
NCCLCHECKGOTO(PtrCheck(excludeRanksList, caller, "excludeRanksList"), res, exit);
NCCLCHECKGOTO(excludeRanksCount > 0 ? ncclSuccess : ncclInvalidArgument, res, exit);
// excludeRanksList may not be sorted, need to sort it
qsort(excludeRanksList, excludeRanksCount, sizeof(int), compareInts);
// ranks in excludeRanksList should not call into this function
NCCLCHECKGOTO(bsearch(&comm->rank, excludeRanksList, excludeRanksCount, sizeof(int), compareInts) ? ncclInvalidArgument : ncclSuccess, res, exit);
}
NCCLCHECKGOTO(ncclCommEnsureReady(comm), res, exit);
CUDACHECKGOTO(cudaSetDevice(comm->cudaDev), res, exit);
2024-09-10 05:57:10 -07:00
2023-04-03 05:32:07 -07:00
/* *newcomm should be NCCL_COMM_NULL until comm split fully complete. */
*newcomm = NCCL_COMM_NULL;
2025-05-29 20:56:40 -07:00
if (!isShrink && color == NCCL_SPLIT_NOCOLOR) {
2023-04-03 05:32:07 -07:00
INFO(NCCL_INIT, "Rank %d has color with NCCL_SPLIT_NOCOLOR, not creating a new communicator", comm->rank);
} else {
NCCLCHECKGOTO(ncclCalloc(&childComm, 1), res, fail);
2024-03-26 06:08:55 -07:00
childComm->startMagic = childComm->endMagic = NCCL_MAGIC;
2025-05-29 20:56:40 -07:00
// Set the shareResource field, this is used throughout the init and must be reset every time.
// If we shrink, we only reuse resources if we shrink in the default mode
comm->shareResources = isShrink ? (!(flags & NCCL_SHRINK_ABORT) && comm->config.shrinkShare) : comm->config.splitShare;
if (comm->shareResources) {
2023-04-03 05:32:07 -07:00
childComm->abortFlag = comm->abortFlag;
2024-06-11 01:28:01 -07:00
childComm->abortFlagDev = comm->abortFlagDev;
2023-04-03 05:32:07 -07:00
childComm->abortFlagRefCount = comm->abortFlagRefCount;
2023-06-13 00:19:57 -07:00
comm->childAbortFlag = NULL;
ncclAtomicRefCountIncrement(comm->abortFlagRefCount);
2023-04-03 05:32:07 -07:00
} else {
2024-06-11 01:28:01 -07:00
NCCLCHECKGOTO(ncclCalloc(&childComm->abortFlag, 1), res, fail);
NCCLCHECKGOTO(ncclCudaHostCalloc(&childComm->abortFlagDev, 1), res, fail);
NCCLCHECKGOTO(ncclCalloc(&childComm->abortFlagRefCount, 1), res, fail);
2023-04-03 05:32:07 -07:00
/* temporarily used to abort everything during child comm init. */
comm->childAbortFlag = childComm->abortFlag;
2024-06-11 01:28:01 -07:00
comm->childAbortFlagDev = childComm->abortFlagDev;
2023-04-03 05:32:07 -07:00
*childComm->abortFlagRefCount = 1;
}
if (config == NULL) {
NCCLCHECKGOTO(copyCommConfig(childComm, comm), res, fail);
} else {
NCCLCHECKGOTO(parseCommConfig(childComm, config), res, fail);
}
2025-05-29 20:56:40 -07:00
/* start with ncclInternalError and will be changed to ncclSuccess if init succeeds. */
childComm->initState = ncclInternalError;
2023-04-03 05:32:07 -07:00
}
NCCLCHECKGOTO(ncclCalloc(&job, 1), res, fail);
job->comm = childComm;
job->newcomm = newcomm;
job->parent = comm;
job->color = color;
job->key = key;
2025-05-29 20:56:40 -07:00
if (excludeRanksList) {
// need to copy the list of ranks to exclude because the job is async
job->excludeRanksCount = excludeRanksCount;
NCCLCHECKGOTO(ncclCalloc(&job->excludeRanksList, excludeRanksCount), res, fail);
memcpy(job->excludeRanksList, excludeRanksList, excludeRanksCount * sizeof(int));
} else {
// each split has to lead to a unique comm, so increment the splitCount
job->splitCount = ++comm->splitCount;
job->excludeRanksList = NULL;
}
2023-04-03 05:32:07 -07:00
job->cudaDev = comm->cudaDev;
2025-05-29 20:56:40 -07:00
snprintf(job->funcName, NCCL_COMMINIT_FUNCNAME_LEN, "%s", caller);
NCCLCHECKGOTO(ncclAsyncLaunch((struct ncclAsyncJob*)job, ncclCommInitRankFunc, /*undo=*/NULL, /*destructor=*/childCommCleanupJob, comm), res, fail);
2023-04-03 05:32:07 -07:00
exit:
2025-04-19 00:21:27 -04:00
// for loggin only, not ready for replaying
// TODO: further integrate overloaded record header
// !recording at sink
Recorder::instance().record(rrCommSplit, color, key, (ncclUniqueId*)comm, config, *newcomm);
2024-12-18 08:26:06 -08:00
(void)cudaSetDevice(oldDev);
2023-04-03 05:32:07 -07:00
return res;
fail:
if (childComm) {
2025-05-29 20:56:40 -07:00
if (!comm->shareResources) {
if (childComm->abortFlag) free(childComm->abortFlag);
2024-06-11 01:28:01 -07:00
if (childComm->abortFlagDev) ncclCudaHostFree(childComm->abortFlagDev);
2025-05-29 20:56:40 -07:00
if (childComm->abortFlagRefCount) free(childComm->abortFlagRefCount);
2023-04-03 05:32:07 -07:00
}
free(childComm);
}
if (newcomm) *newcomm = NULL;
goto exit;
}
2025-05-29 20:56:40 -07:00
NCCL_API(ncclResult_t, ncclCommShrink, ncclComm_t comm, int* excludeRanksList, int excludeRanksCount, ncclComm_t* newcomm, ncclConfig_t* config, int shrinkFlags);
/**
 * Creates a child communicator that leaves out the ranks in `excludeRanksList`.
 *
 * @param comm              parent communicator.
 * @param excludeRanksList  ranks to exclude; passed on to ncclCommInitChildComm.
 * @param excludeRanksCount number of entries in `excludeRanksList`.
 * @param newcomm           [out] the shrunk communicator.
 * @param config            optional configuration for the new communicator.
 * @param shrinkFlags       NCCL_SHRINK_ABORT first raises the abort flags, synchronizes
 *                          the device stream and clears the flags again.
 * @return ncclSuccess or the first error encountered.
 */
ncclResult_t ncclCommShrink_impl(ncclComm_t comm, int* excludeRanksList, int excludeRanksCount, ncclComm_t *newcomm, ncclConfig_t* config, int shrinkFlags) {
  NVTX3_RANGE(NcclNvtxParamsCommShrink)
  ncclResult_t res = ncclSuccess;
  NCCLCHECK(ncclGroupStartInternal());
  // comm is dereferenced below (abort flags, sharedRes, rank) before the child-init
  // helper gets a chance to validate it, so validate it here first.
  NCCLCHECKGOTO(CommCheck(comm, __func__, "comm"), res, exit);
  // Handle error mode by setting abort flags and waiting for kernels to complete and unset the flags to avoid bootstrap issues
  if (shrinkFlags & NCCL_SHRINK_ABORT) {
    NCCLCHECKGOTO(setCommAbortFlags(comm, 1), res, exit);
    NCCLCHECKGOTO(ncclStrongStreamSynchronize(&comm->sharedRes->deviceStream), res, exit);
    NCCLCHECKGOTO(setCommAbortFlags(comm, 0), res, exit);
  }
  NCCLCHECKGOTO(ncclCommInitChildComm(comm, newcomm, /*isShrink=*/true, shrinkFlags, /*color=*/0, /*key=*/comm->rank, excludeRanksList, excludeRanksCount, config, __func__), res, exit);

  if (*newcomm) NVTX3_RANGE_ADD_PAYLOAD(CommShrink, NcclNvtxParamsCommShrinkSchema, NVTX3_PAYLOAD(comm->commHash, comm->nRanks, comm->rank, comm->cudaDev, excludeRanksCount));

exit:
  (void)ncclGroupErrCheck(res);
  NCCLCHECK(ncclGroupEndInternal());
  return res;
}
NCCL_API(ncclResult_t, ncclCommSplit, ncclComm_t comm, int color, int key, ncclComm_t *newcomm, ncclConfig_t *config);
// Splits `comm` by color/key through ncclCommInitChildComm (split mode, no excluded
// ranks) inside an internal group. The NVTX payload is attached only when a child
// communicator was produced.
ncclResult_t ncclCommSplit_impl(ncclComm_t comm, int color, int key, ncclComm_t *newcomm, ncclConfig_t *config) {
  NVTX3_RANGE(NcclNvtxParamsCommSplit)

  ncclResult_t res = ncclSuccess;
  NCCLCHECK(ncclGroupStartInternal());

  NCCLCHECKGOTO(ncclCommInitChildComm(comm, newcomm, /*isShrink=*/false, /*shrink mode=*/NCCL_SHRINK_DEFAULT, color, key, NULL, 0, config, __func__), res, exit);

  if (*newcomm)
    NVTX3_RANGE_ADD_PAYLOAD(CommSplit, NcclNvtxParamsCommSplitSchema, NVTX3_PAYLOAD((*newcomm)->commHash, comm->commHash, comm->nRanks, comm->rank, comm->cudaDev, color, key));

exit:
  (void)ncclGroupErrCheck(res);
  NCCLCHECK(ncclGroupEndInternal());
  return res;
}
2018-09-24 16:06:59 -07:00
NCCL_API(const char*, ncclGetErrorString, ncclResult_t code);
// Maps a result code to a static, human-readable message. Codes without a
// dedicated message yield "unknown result code".
const char* ncclGetErrorString_impl(ncclResult_t code) {
  Recorder::instance().record("GetErrorString");

  const char* message = "unknown result code";
  switch (code) {
    case ncclSuccess:
      message = "no error";
      break;
    case ncclUnhandledCudaError:
      message = "unhandled cuda error (run with NCCL_DEBUG=INFO for details)";
      break;
    case ncclSystemError:
      message = "unhandled system error (run with NCCL_DEBUG=INFO for details)";
      break;
    case ncclInternalError:
      message = "internal error - please report this issue to the NCCL developers";
      break;
    case ncclInvalidArgument:
      message = "invalid argument (run with NCCL_DEBUG=WARN for details)";
      break;
    case ncclInvalidUsage:
      message = "invalid usage (run with NCCL_DEBUG=WARN for details)";
      break;
    case ncclRemoteError:
      message = "remote process exited or there was a network error";
      break;
    case ncclInProgress:
      message = "NCCL operation in progress";
      break;
    default:
      break;
  }
  return message;
}
2022-05-24 02:02:31 -07:00
/* Returns a human-readable message of the last error that occurred.
 * comm is currently unused and can be set to NULL.
 * The returned pointer is the ncclLastError buffer itself, not a copy.
 */
NCCL_API(const char*, ncclGetLastError, const ncclComm_t comm);
const char* ncclGetLastError_impl(ncclComm_t comm) {
  // Label fixed from the misspelled "GetLastEror" so recorded traces carry the
  // correct API name.
  Recorder::instance().record("GetLastError");
  return ncclLastError;
}
2018-12-13 15:56:12 -08:00
NCCL_API(ncclResult_t, ncclCommGetAsyncError, ncclComm_t comm, ncclResult_t *asyncError);
/* Reports the asynchronous status of comm through *asyncError.
 *
 * The communicator's own asyncResult is read first; if it is ncclSuccess and a
 * proxy state exists, the proxy's asyncResult is reported instead. When the
 * reported status is ncclSuccess and a group job is still linked to comm, that
 * job is completed and unlinked.
 *
 * Returns ncclSuccess unless argument validation or completing the group job
 * fails.
 */
ncclResult_t ncclCommGetAsyncError_impl(ncclComm_t comm, ncclResult_t *asyncError) {
  Recorder::instance().record("GetAsyncError");
  NCCLCHECK(CommCheck(comm, "ncclGetAsyncError", "comm"));
  NCCLCHECK(PtrCheck(asyncError, "ncclGetAsyncError", "asyncError"));

  *asyncError = __atomic_load_n(&comm->asyncResult, __ATOMIC_ACQUIRE);
  if (*asyncError == ncclSuccess && comm->proxyState) {
    *asyncError = __atomic_load_n(&comm->proxyState->asyncResult, __ATOMIC_ACQUIRE);
  }

  // Nothing more to do when an error is pending or no group job is linked.
  if (*asyncError != ncclSuccess || !comm->groupJob) {
    return ncclSuccess;
  }

  /* A linked group job exists and no error is pending: complete it. */
  NCCLCHECK(ncclGroupJobComplete(comm->groupJob));
  comm->groupJob = NULL;
  return ncclSuccess;
}
2018-09-24 16:06:59 -07:00
NCCL_API(ncclResult_t, ncclCommCount, const ncclComm_t comm, int* count);
/* Stores the number of ranks of comm (comm->nRanks) into *count.
 * Fails if comm or count does not pass validation, or if the communicator
 * cannot be brought to the ready state.
 */
ncclResult_t ncclCommCount_impl(const ncclComm_t comm, int* count) {
  Recorder::instance().record("CommCount");
  NCCL_NVTX3_FUNC_RANGE;

  // Argument validation.
  NCCLCHECK(CommCheck(comm, "CommCount", "comm"));
  NCCLCHECK(PtrCheck(count, "CommCount", "count"));

  /* The init thread has to be joined before comm's attributes are read. */
  NCCLCHECK(ncclCommEnsureReady(comm));

  const int rankCount = comm->nRanks;
  *count = rankCount;
  return ncclSuccess;
}
NCCL_API(ncclResult_t, ncclCommCuDevice, const ncclComm_t comm, int* devid);
/* Stores the device index recorded in comm (comm->cudaDev) into *devid.
 * Fails if comm or devid does not pass validation, or if the communicator
 * cannot be brought to the ready state.
 */
ncclResult_t ncclCommCuDevice_impl(const ncclComm_t comm, int* devid) {
  Recorder::instance().record("CuDevice");
  NCCL_NVTX3_FUNC_RANGE;

  // Argument validation.
  NCCLCHECK(CommCheck(comm, "CommCuDevice", "comm"));
  NCCLCHECK(PtrCheck(devid, "CommCuDevice", "devid"));

  // comm's fields are only read once the communicator reports ready.
  NCCLCHECK(ncclCommEnsureReady(comm));

  const int device = comm->cudaDev;
  *devid = device;
  return ncclSuccess;
}
NCCL_API(ncclResult_t, ncclCommUserRank, const ncclComm_t comm, int* rank);
/* Stores the caller's rank within comm (comm->rank) into *rank.
 * Fails if comm or rank does not pass validation, or if the communicator
 * cannot be brought to the ready state.
 */
ncclResult_t ncclCommUserRank_impl(const ncclComm_t comm, int* rank) {
  Recorder::instance().record("CommUserRank");
  NCCL_NVTX3_FUNC_RANGE;

  // Argument validation.
  NCCLCHECK(CommCheck(comm, "CommUserRank", "comm"));
  NCCLCHECK(PtrCheck(rank, "CommUserRank", "rank"));

  // comm's fields are only read once the communicator reports ready.
  NCCLCHECK(ncclCommEnsureReady(comm));

  const int myRank = comm->rank;
  *rank = myRank;
  return ncclSuccess;
}