Files
systems-assistant[bot] 1211790607 Direct Reduce Scatter Implementation (#2765)
* Add new implementation of direct send/recv reduce scatter

* Resolved conflicts

* Add multiple channels support to the reduction kernel of direct reduce scatter and adjust offset into buffer to utilize multiple channels.

* Resolve validation issue when number of elements is not divisible by number of channels leaving elements unaccounted for in reduction.

* fix proxy hang

* set maxSrcs to 64 in reduceCopy

* optimize multi-channel code

* fix validation issue in single node MI300

* Tune the message size range for 2,4, and 8 Nodes

* Move Direct RS into separate kernel

* Add Copyright

* resolve review comments

* resolve review comments

* fix merge build issue

* revert move Direct RS into separate kernel

* address review comments

* address review comments

---------

Co-authored-by: KawtharShafie <kawtharshafie@gmail.com>
Co-authored-by: Ghadeer Alabandi <abandiga@gmail.com>
Co-authored-by: systems-assistant[bot] <systems-assistant[bot]@users.noreply.github.com>
2026-01-30 09:27:27 -06:00

3500 строки
143 KiB
C++
Исходник Постоянная ссылка Ответственный История

Этот файл содержит неоднозначные символы Юникода
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/*************************************************************************
* Copyright (c) 2015-2022, NVIDIA CORPORATION. All rights reserved.
* Modifications Copyright (c) 2019-2023 Advanced Micro Devices, Inc. All rights reserved.
* Modifications Copyright (c) Microsoft Corporation. Licensed under the MIT License.
*
* See LICENSE.txt for license information
************************************************************************/
#include "nccl.h"
#include "channel.h"
#include "nvmlwrap.h"
#include "gdrwrap.h"
#include "bootstrap.h"
#include "transport.h"
#include "group.h"
#include "net.h"
#include "coll_net.h"
#include "enqueue.h"
#include "graph.h"
#include "argcheck.h"
#include "device.h"
#include "collectives.h"
#if defined(ENABLE_NPKIT)
#include "npkit/npkit.h"
#endif
#include "tuner.h"
#include "ras.h"
#include "profiler.h"
#include "mnnvl.h"
#include <fcntl.h>
#include <unistd.h>
#include <hip/hip_runtime.h>
#include <string.h>
#include <errno.h>
#include <assert.h>
#include <dlfcn.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/resource.h>
#include <unistd.h>
#include "graph/topo.h"
#include "graph/xml.h"
#include "archinfo.h"
#include "param.h"
#include "nvtx_payload_schemas.h"
#include "utils.h"
#include <mutex>
#include "ce_coll.h"
#include "nvtx.h"
// [RCCL]
#include "git_version.h"
#include "rccl_vars.h"
#include "hip_rocm_version_info.h"
//#include <hsa/hsa_ext_amd.h>
#ifdef ENABLE_MSCCLPP
#include "mscclpp/mscclpp_nccl.h"
#endif
#ifdef USE_AMDSMI
#include "amdsmi_wrap.h"
#else
#include "rocm_smi_wrap.h"
#endif
#include "rccl_common.h"
// [/RCCL]
#ifdef ENABLE_ROCSHMEM
#include <rocshmem/rocshmem.hpp>
#define NUM_SYM_BUF 8
#endif
#include "msccl/msccl_lifecycle.h"
#include "msccl/msccl_status.h"
#include "latency_profiler/CollTrace.h"
#include "latency_profiler/CollTraceFunc.h"
#include <cpuid.h>
#ifndef STR2
#define STR2(v) #v
#endif
#ifndef STR
#define STR(v) STR2(v)
#endif
#if CUDART_VERSION >= 9020 || defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__)
#define NCCL_GROUP_CUDA_STREAM 0 // CGMD: CUDA 9.2,10.X Don't need to use an internal CUDA stream
#else
#define NCCL_GROUP_CUDA_STREAM 1 // CGMD: CUDA 9.0,9.1 Need to use an internal CUDA stream
#endif
#define TEMP_BUFF_SIZE (4 * 1024 * 1024) // Define Size for Temporary Buffer for Direct RS
using namespace rccl;
// Human-readable name tables for collective functions, algorithms, protocols,
// device reduction operators and data-type suffixes.
// NOTE(review): ncclFuncStr is declared with NCCL_NUM_FUNCTIONS+3 slots but lists 8 names;
// any slot without an initializer is a null pointer — confirm the order and count match the enums.
const char* ncclFuncStr[NCCL_NUM_FUNCTIONS+3] = { "AllGather", "AllReduce", "AlltoAllPivot", "AllToAllGda", "Broadcast", "Reduce", "ReduceScatter", "SendRecv"};
const char* ncclAlgoStr[NCCL_NUM_ALGORITHMS] = { "Tree", "Ring", "CollNetDirect", "CollNetChain", "NVLS", "NVLSTree", "PAT" };
const char* ncclProtoStr[NCCL_NUM_PROTOCOLS] = { "LL", "LL128", "Simple" };
const char* ncclDevRedOpStr[ncclNumDevRedOps] = { "Sum", "Prod", "MinMax", "PreMulSum", "SumPostDiv" };
// NOTE(review): 10 suffixes are listed; if ncclNumTypes is larger the trailing entries are null — confirm.
const char *ncclTypeStr[ncclNumTypes] = {"_i8", "_u8", "_i32", "_u32", "_i64", "_u64", "_f16", "_f32", "_f64", "_b16"};
// Runtime parameters. Each NCCL_PARAM(Name, "KEY", default) entry is read in this file through
// an accessor named ncclParam<Name>() (e.g. ncclParamCheckPointers(), ncclParamSetCpuStackSize()).
// NOTE(review): "KEY" is presumably the environment-variable suffix handled by the macro — confirm in param.h.
NCCL_PARAM(GroupCudaStream, "GROUP_CUDA_STREAM", NCCL_GROUP_CUDA_STREAM);
NCCL_PARAM(CheckPointers, "CHECK_POINTERS", 0);
NCCL_PARAM(CommBlocking, "COMM_BLOCKING", NCCL_CONFIG_UNDEF_INT);
NCCL_PARAM(RuntimeConnect, "RUNTIME_CONNECT", 1);
NCCL_PARAM(WinEnable, "WIN_ENABLE", 1);
NCCL_PARAM(CollnetEnable, "COLLNET_ENABLE", NCCL_CONFIG_UNDEF_INT);
NCCL_PARAM(CtaPolicy, "CTA_POLICY", NCCL_CONFIG_UNDEF_INT);
NCCL_PARAM(NvlsChannels, "NVLS_NCHANNELS", NCCL_CONFIG_UNDEF_INT);
// Non-zero (the default) lets setCpuStackSize() adjust the default pthread stack size.
NCCL_PARAM(SetCpuStackSize, "SET_CPU_STACK_SIZE", 1);
// Accessor defined in another translation unit.
extern int64_t ncclParamSingleProcMemRegEnable();
// Allocation tracking table with MAX_ALLOC_TRACK_NGPU entries, value-initialized at start-up.
struct allocationTracker allocTracker[MAX_ALLOC_TRACK_NGPU] = {};
// Forward declaration; the definition is outside this part of the file.
ncclResult_t commReclaim(ncclComm_t comm);
#ifdef ENABLE_ROCSHMEM
RCCL_PARAM(RocshmemThreshold, "ROCSHMEM_THRESHOLD", (size_t)(262144));
RCCL_PARAM(RocshmemEnabled, "ROCSHMEM_ENABLE", 1);
std::unordered_map<ncclComm_t, rocshmem::rocshmem_team_t> ncclCommToRshmemTeam;
#endif
#ifdef ENABLE_MSCCLPP
// Hash of a unique id: digest of its raw `internal` bytes as produced by getHash().
size_t std::hash<ncclUniqueId>::operator ()(const ncclUniqueId& uniqueId) const noexcept {
  const auto digest = getHash(uniqueId.internal, NCCL_UNIQUE_ID_BYTES);
  return (size_t)digest;
}
// Two unique ids are equal when all NCCL_UNIQUE_ID_BYTES of their `internal` payload match.
bool operator ==(const ncclUniqueId& a, const ncclUniqueId& b) {
  const int byteDiff = memcmp(a.internal, b.internal, NCCL_UNIQUE_ID_BYTES);
  return byteDiff == 0;
}
RCCL_PARAM(MscclppThreshold, "MSCCLPP_THRESHOLD", (size_t)(16*1024*1024));
#endif
// Default for the MscclppEnabled parameter below (0).
static constexpr int64_t defaultEnableMscclpp = 0;
RCCL_PARAM(MscclppEnabled, "MSCCLPP_ENABLE", defaultEnableMscclpp);
RCCL_PARAM(MscclppForceEnabled, "MSCCLPP_FORCE_ENABLE", 0);
// Turn off cheap fence for gfx942/gfx950
RCCL_PARAM(Gfx9CheapFenceOff, "GFX9_CHEAP_FENCE_OFF", 0);
// GDRCOPY support: Off by default
NCCL_PARAM(GdrCopyEnable, "GDRCOPY_ENABLE", 0);
// GDRCOPY support: handle assigned by initGdrCopy(); stays NULL unless GdrCopyEnable is 1.
gdr_t ncclGdrCopy = NULL;
// Initializes the global GDRCOPY handle when the GdrCopyEnable parameter is exactly 1.
// Always returns ncclSuccess; ncclGdrCopy is left untouched when the feature is off.
ncclResult_t initGdrCopy() {
  const bool gdrCopyRequested = (ncclParamGdrCopyEnable() == 1);
  if (gdrCopyRequested) ncclGdrCopy = ncclGdrInit();
  return ncclSuccess;
}
// The default Linux stack size (8MB) is safe.
#define SAFE_STACK_SIZE (8192*1024)
// Raises the default stack size for threads created afterwards to SAFE_STACK_SIZE when the
// current default is smaller. Does nothing when the SetCpuStackSize parameter is 0.
// Returns ncclSuccess, or whatever PTHREADCHECK/SYSCHECK return when an underlying call fails.
static ncclResult_t setCpuStackSize() {
  if (ncclParamSetCpuStackSize() == 0) return ncclSuccess;

  // Query the stack size used for newly launched threads.
  pthread_attr_t threadAttr;
  size_t defaultStack;
  PTHREADCHECK(pthread_attr_init(&threadAttr), "pthread_attr_init");
  PTHREADCHECK(pthread_attr_getstacksize(&threadAttr, &defaultStack), "pthread_attr_getstacksize");

  if (defaultStack < SAFE_STACK_SIZE) {
    // GNU libc normally derives the default pthread stack size from RLIMIT_STACK, except when the
    // limit is "unlimited", in which case a 2MB fallback is used. Read the real limit so the log
    // message can tell those two situations apart.
    struct rlimit limit;
    char limitText[30];
    SYSCHECK(getrlimit(RLIMIT_STACK, &limit), "getrlimit");
    if (limit.rlim_cur != RLIM_INFINITY) {
      snprintf(limitText, sizeof(limitText), "%ldKB", limit.rlim_cur/1024);
    } else {
      strcpy(limitText, "unlimited");
    }
    INFO(NCCL_INIT|NCCL_ENV, "Stack size limit (%s) is unsafe; will use %dKB for newly launched threads",
         limitText, SAFE_STACK_SIZE/1024);

    // Change the process-wide default through the non-portable glibc API.
    PTHREADCHECK(pthread_attr_setstacksize(&threadAttr, SAFE_STACK_SIZE), "pthread_attr_setstacksize");
    PTHREADCHECK(pthread_setattr_default_np(&threadAttr), "pthread_setattr_default_np");
  }

  PTHREADCHECK(pthread_attr_destroy(&threadAttr), "pthread_attr_destroy");
  return ncclSuccess;
}
static ncclResult_t initResult = ncclSuccess;
static std::once_flag initOnceFlag;
// Validates the user's HSA_NO_SCRATCH_RECLAIM setting against the HIP runtime version, the GPU
// firmware version and the architecture name of device 0.
// Returns ncclSuccess when validHsaScratchEnvSetting() accepts the combination, ncclSystemError
// (after reporting through ERROR) otherwise.
ncclResult_t checkHsaEnvSetting() {
  // User-specified value, may be NULL when the variable is not set.
  const char* scratchReclaimEnv = getenv("HSA_NO_SCRATCH_RECLAIM");

  // The runtime version is a packed integer, e.g. 6.2.41133 -> 60241133.
  int runtimeVer = 0;
  CUDACHECK(hipRuntimeGetVersion(&runtimeVer));
  const int fwVer = getFirmwareVersion();

  // Device 0 is used as the representative GPU.
  hipDeviceProp_t props;
  CUDACHECK(hipGetDeviceProperties(&props, 0));
  INFO(NCCL_INIT, "Hipruntime version: %d, firmware version: %d", runtimeVer, fwVer);

  if (validHsaScratchEnvSetting(scratchReclaimEnv, runtimeVer, fwVer, props.gcnArchName)) {
    return ncclSuccess;
  }

  ERROR("HSA_NO_SCRATCH_RECLAIM=1 must be set to avoid performance degradation with the current HIP configuration. (Runtime version:%d, GPU Firmware version:%d)", runtimeVer, fwVer);
  ERROR("Please set HSA_NO_SCRATCH_RECLAIM=1 and rerun.");
  return ncclSystemError;
}
// Fail the job if build flag HIP_HOST_UNCACHED_MEMORY is not set on mi350x
// Rejects builds without HIP_HOST_UNCACHED_MEMORY when the first GPU in the communicator's
// topology matches "gfx950". With the build flag defined the check always succeeds.
ncclResult_t checkHostUncacheMemSetting(struct ncclComm* comm) {
#if defined(HIP_HOST_UNCACHED_MEMORY)
  return ncclSuccess;
#else
  const bool isGfx950 = IsArchMatch(comm->topo->nodes[GPU].nodes[0].gpu.gcn, "gfx950");
  if (!isGfx950) return ncclSuccess;

  ERROR("Build flag HIP_HOST_UNCACHED_MEMORY must be set to avoid memory corruption on mi350x");
  return ncclSystemError;
#endif
}
// One-time initialization body, invoked through std::call_once(initOnceFlag, ...) in ncclInit().
// The outcome of the checked steps is reported through the file-level initResult variable.
static void initOnceFunc() {
// Skip the rest of the initialization when the HSA environment check fails.
NCCLCHECKGOTO(checkHsaEnvSetting(), initResult, exit);
initEnv();
// NOTE(review): the ncclResult_t returned by setCpuStackSize() and initGdrCopy() is discarded here.
setCpuStackSize();
initGdrCopy();
// Always initialize bootstrap network
NCCLCHECKGOTO(bootstrapNetInit(), initResult, exit);
#ifndef NVTX_NO_IMPL
initNvtxRegisteredEnums();
#endif
// Common exit point for the NCCLCHECKGOTO calls above.
exit:;
}
// Library initialization entry point used by the public API functions.
//
// On every call it inspects a few system settings and emits warnings for configurations known to
// hurt RCCL (NUMA auto balancing, missing "iommu=pt", missing HSA_FORCE_FINE_GRAIN_PCIE=1), then
// runs initOnceFunc() exactly once through std::call_once.
//
// Returns the result recorded by the one-time initialization (initResult), or the error of a
// failing NCCLCHECK call.
static ncclResult_t ncclInit() {
  char strValue[2048];
  NCCLCHECK(ncclTopoGetStrFromSys("/proc/sys/kernel", "numa_balancing", strValue));
  if (strcmp(strValue, "1") == 0)
    WARN("NUMA auto balancing enabled which can lead to variability in the RCCL performance! Disable by \"sudo sysctl kernel.numa_balancing=0\"");

  NCCLCHECK(ncclTopoGetStrFromSys("/proc", "version", strValue));
  // The third space-separated token of /proc/version is reported as the kernel version.
  // strtok_r() returns NULL when the string has fewer tokens (e.g. it is empty), so verStr must
  // be checked before it is printed or searched.
  char *state = NULL;
  char *verStr = strtok_r(strValue, " ", &state);
  for (int i = 0; i < 2 && verStr != NULL; i++) {
    verStr = strtok_r(NULL, " ", &state);
  }
  INFO(NCCL_INIT, "Kernel version: %s", verStr != NULL ? verStr : "unknown");

  // An unknown kernel version is treated like a non-cray kernel: the checks below still run.
  if (verStr == NULL || strstr(verStr, "cray") == NULL) {
    unsigned int eax, ebx, ecx, edx;
    if (!__get_cpuid(1, &eax, &ebx, &ecx, &edx))
      ecx = 0; // cpuid not supported
    NCCLCHECK(ncclTopoGetStrFromSys("/sys/devices/virtual/dmi/id", "bios_version", strValue));
    // Check BIOS string and hypervisor presence on ecx bit 31
    if (strncmp("Hyper-V UEFI Release", strValue, 20) != 0 && (ecx & (1u << 31)) == 0) {
      // Reset the buffer so that the search below only ever sees the kernel command line, never
      // the BIOS version left over from the previous read when /proc/cmdline is unreadable.
      strValue[0] = '\0';
      FILE* file;
      if ((file = fopen("/proc/cmdline", "r")) != NULL) {
        if (feof(file) == 0 && ferror(file) == 0) {
          size_t len = fread(strValue, 1, sizeof(strValue)-1, file);
          strValue[len] = '\0';
        }
        fclose(file);
      }
      if (strstr(strValue, "iommu=pt") == NULL)
        WARN("Missing \"iommu=pt\" from kernel command line which can lead to system instablity or hang!");
    }
#ifndef HIP_UNCACHED_MEMORY
    char *env = getenv("HSA_FORCE_FINE_GRAIN_PCIE");
    if (env == NULL || strcmp(env, "1") != 0)
      WARN("Missing \"HSA_FORCE_FINE_GRAIN_PCIE=1\" from environment which can lead to low RCCL performance, system instablity or hang!");
#endif
  }
  std::call_once(initOnceFlag, initOnceFunc);
  return initResult;
}
NCCL_API(ncclResult_t, ncclGetVersion, int* version);
// Stores NCCL_VERSION_CODE in *version.
// Returns ncclInvalidArgument when `version` is NULL, ncclSuccess otherwise. The call is
// recorded through Recorder in both cases.
ncclResult_t ncclGetVersion_impl(int* version) {
  Recorder::instance().record("GetVersion");
  if (version != NULL) {
    *version = NCCL_VERSION_CODE;
    return ncclSuccess;
  }
  return ncclInvalidArgument;
}
NCCL_API(ncclResult_t, ncclGetUniqueId, ncclUniqueId* out);
// Produces a new unique id in *out from a freshly obtained bootstrap handle.
// Runs ncclInit() first; fails with the corresponding error when initialization, the pointer
// check or bootstrapGetUniqueId() fails.
ncclResult_t ncclGetUniqueId_impl(ncclUniqueId* out) {
  NCCLCHECK(ncclInit());
  NCCLCHECK(PtrCheck(out, "GetUniqueId", "out"));

  struct ncclBootstrapHandle bootHandle;
  NCCLCHECK(bootstrapGetUniqueId(&bootHandle));

  // The unique id and the bootstrap handle differ in size and alignment: clear the whole id so
  // no byte is left undefined, then copy the handle bytes in (memcpy avoids alignment issues).
  memset(out, 0, sizeof(*out));
  memcpy(out, &bootHandle, sizeof(bootHandle));

  Recorder::instance().record(rrGetUniqueId, -1, -1, out);
  TRACE_CALL("ncclGetUniqueId(0x%llx)", (unsigned long long)getHash(out->internal, NCCL_UNIQUE_ID_BYTES));
  return ncclSuccess;
}
// Prevent compiler from optimizing out these operations
#ifdef __clang__
#define NCCL_NO_OPTIMIZE __attribute__((optnone))
#else
#define NCCL_NO_OPTIMIZE __attribute__((optimize("O0")))
#endif
// Overwrites the identifying fields of a communicator with invalid values (-1) and clears both
// magic markers, so that a stale handle can be recognised after the communicator is freed.
void NCCL_NO_OPTIMIZE commPoison(ncclComm_t comm) {
  // Important that this does not trash intraComm0.
  comm->nRanks = -1;
  comm->busId = -1;
  comm->cudaDev = -1;
  comm->rank = -1;
  comm->endMagic = 0;
  comm->startMagic = 0;
}
// Tracing-related parameters, all defaulting to 0. They are read in this file through
// rcclParam<Name>() accessors (e.g. rcclParamEnableProxyTrace(), rcclParamKernelCollTraceEnable()).
RCCL_PARAM_DECLARE(EnableProxyTrace);
RCCL_PARAM(EnableProxyTrace, "ENABLE_PROXY_TRACE", 0);
RCCL_PARAM(KernelCollTraceEnable, "KERNEL_COLL_TRACE_ENABLE", 0);
RCCL_PARAM(KernelCollTraceThreadEnable, "KERNEL_COLL_TRACE_THREAD_ENABLE", 0);
// When set to 1, commAlloc()/commFree() track and drop the device context of the communicator.
RCCL_PARAM(EnableContextTracking, "ENABLE_CONTEXT_TRACKING", 0);
#ifdef ENABLE_COLLTRACE
// Should be in sync with 'ALL_COLLS' in Generator.cmake
// Collective-trace consumer. Started as a pthread from commAlloc() or called directly from
// commFree(); it drains the per-channel trace entries stored in comm->collTrace and prints each
// of them through INFO(NCCL_COLL, ...).
// arg: the ncclComm_t whose trace buffers are consumed.
// Leaves through pthread_exit(NULL) when comm->collTraceThread is set, otherwise returns 0.
void *ncclCommThreadMain(void *arg) {
ncclComm_t comm = (ncclComm_t)arg;
// Per-channel index of the next entry to consume.
int head[MAXCHANNELS];
double vega_gpu_rtc_freq;
// Clock rate is reported in kHz; scale by 1.0E3 before dividing time stamps by it.
vega_gpu_rtc_freq = GetDeviceWallClockRateInKhz(comm->cudaDev) * 1.0E3;
// Start at 0, or COLLTRACE_NUM_ITEMS entries behind the tail when the tail is already past that.
for (int channel = 0; channel < MAXCHANNELS; channel++) {
int tail = comm->collTraceTail[channel].tail;
if (tail < COLLTRACE_NUM_ITEMS)
head[channel] = 0;
else
head[channel] = tail - COLLTRACE_NUM_ITEMS;
}
do {
// Counts channels that had pending entries during this pass.
int numActiveChans = MAXCHANNELS;
for (int channel = 0; channel < MAXCHANNELS; channel++) {
int tail = comm->collTraceTail[channel].tail;
int count;
count = tail - head[channel];
if (count == 0) {
numActiveChans--;
continue;
}
for (int i = 0; i < count; i++) {
// Each channel owns COLLTRACE_NUM_ITEMS consecutive entries, indexed modulo that size.
volatile struct ncclCollTrace *td = comm->collTrace+COLLTRACE_NUM_ITEMS*channel+head[channel]%COLLTRACE_NUM_ITEMS;
const uint8_t type = td->type;
// Stop at the first entry that is still marked not ready; it is retried on a later pass.
if (type == ncclCollTraceNotReady)
break;
head[channel] ++;
// NOTE(review): the sprintf calls below write into this fixed 1024-byte buffer without bounds checks.
char line[1024];
int offset = 0;
const uint16_t fIdx = td->funcIndex;
if (type == ncclCollTraceDataType) {
sprintf(line, "## [%012.6f] [%02d:%02d-%02d:%02x] L:%04d DT %08x %016lx %016lx",
(double)(td->timeStamp)/vega_gpu_rtc_freq, comm->rank, channel, td->channelId, td->tid, fIdx, td->data_0, td->opCount, td->data_1);
} else {
// Common prefix: time stamp, rank/channel/thread ids and the operation counter(s).
if (type & ncclCollTraceP2pElemType)
sprintf(line, "## [%012.6f] [%02d:%02d-%02d:%02x] %06x-%06x", (double)(td->timeStamp)/vega_gpu_rtc_freq, comm->rank, channel, td->channelId, td->tid, td->p2pOpCount[0], td->p2pOpCount[1]);
else
sprintf(line, "## [%012.6f] [%02d:%02d-%02d:%02x] %06lx", (double)(td->timeStamp)/vega_gpu_rtc_freq, comm->rank, channel, td->channelId, td->tid, td->opCount);
offset = strlen(line);
if (type == ncclCollTraceCollElemType) {
sprintf(line+offset, " CE %s nw %d bi %d nc %d root %d busId %lx nRanks %d", funcNames[fIdx], td->coll.nWarps, td->coll.bid, td->coll.nChannels, td->coll.root, comm->busId, comm->nRanks);
} else if (type == ncclCollTraceP2pElemType) {
sprintf(line+offset, " Recv %d -> %d/%d/%d/%d ConnIdx/LL/Reg/nc %d/%d/%d/%d -> Send %d cb %d busId %lx nRanks %d",
td->p2p.recvRank, td->p2p.recvConnIndex, td->p2p.recvProtoLL, td->p2p.recvRegistered, td->p2p.nRecvChannels, td->p2p.sendConnIndex, td->p2p.sendProtoLL, td->p2p.sendRegistered, td->p2p.nSendChannels, td->p2p.sendRank, td->p2p.channelBase,
comm->busId, comm->nRanks);
} else {
// The low nibble selects the event kind; the high nibble selects the element payload printed after it.
switch (type&0xf) {
case ncclCollTraceKernelLaunchType:
case ncclCollTraceCollLaunchType:
if ((type&0xf) == ncclCollTraceKernelLaunchType)
sprintf(line+offset, " KL %s [%02d:%02d-%02d:%02x] HWID %d:%x ", funcNames[fIdx], comm->rank, channel, td->channelId, td->tid, td->xccId, td->data_0);
else if ((type&0xf) == ncclCollTraceCollLaunchType)
sprintf(line+offset, " CL %s [%02d:%02d-%02d:%02x] %d ", funcNames[fIdx], comm->rank, channel, td->channelId, td->tid, td->batchIx);
offset = strlen(line);
if ((type&0xf0) == ncclCollTraceCollElemType)
sprintf(line+offset, " nw %d bi %d nc %d root %d busId %lx nRanks %d", td->coll.nWarps, td->coll.bid, td->coll.nChannels, td->coll.root, comm->busId, comm->nRanks);
else if ((type&0xf0) == ncclCollTraceP2pElemType)
sprintf(line+offset, " Recv %d -> %d/%d/%d/%d ConnIdx/LL/Reg/nc %d/%d/%d/%d -> Send %d cb %d busId %lx nRanks %d",
td->p2p.recvRank, td->p2p.recvConnIndex, td->p2p.recvProtoLL, td->p2p.recvRegistered, td->p2p.nRecvChannels, td->p2p.sendConnIndex, td->p2p.sendProtoLL, td->p2p.sendRegistered, td->p2p.nSendChannels, td->p2p.sendRank, td->p2p.channelBase,
comm->busId, comm->nRanks);
break;
case ncclCollTraceKernelEndType:
sprintf(line+offset, " KE %s [%02d:%02d-%02d:%02x] busId %lx nRanks %d", funcNames[fIdx], comm->rank, channel, td->channelId, td->tid, comm->busId, comm->nRanks);
break;
case ncclCollTraceAbortType:
sprintf(line+offset, " KA %s [%02d:%02d-%02d:%02x]", funcNames[fIdx], comm->rank, channel, td->channelId, td->tid);
break;
default:
sprintf(line+offset, " unknown collective trace data type");
break;
}
}
}
INFO(NCCL_COLL, "%s td->type:%d", line, type);
// Hand the slot back by marking it not ready again.
volatile uint8_t *tdtype = &td->type;
*tdtype = ncclCollTraceNotReady;
(*tdtype); // read back for flushing
}
}
// Leave only after exit was requested and a full pass found no pending entries on any channel.
if (comm->collTraceExit && numActiveChans == 0)
break;
usleep(1000); //sleep 1ms
} while(true);
// A non-zero collTraceThread means this function runs as the trace thread.
if (comm->collTraceThread)
pthread_exit(NULL);
else
return 0;
}
#endif
#undef NCCL_NO_OPTIMIZE
// Destructor callback: releases the host allocation held in dtor->obj with free().
static ncclResult_t ncclDestructorFnFree(struct ncclDestructor* dtor) {
  void* hostPtr = dtor->obj;
  free(hostPtr);
  return ncclSuccess;
}
// Registers `obj` to be released with free() when the communicator's destructor list is run.
// The list node lives in comm->memPermanent and is pushed at the head of comm->destructorHead.
void ncclCommPushFree(struct ncclComm* comm, void* obj) {
  struct ncclDestructor* node = ncclMemoryStackAlloc<struct ncclDestructor>(&comm->memPermanent);
  node->obj = obj;
  node->fn = ncclDestructorFnFree;
  node->next = comm->destructorHead;
  comm->destructorHead = node;
}
// Destructor callback: releases dtor->obj through ncclCudaFree() and propagates its failure.
static ncclResult_t ncclDestructorFnCudaFree(struct ncclDestructor* dtor) {
  void* devPtr = dtor->obj;
  NCCLCHECK(ncclCudaFree(devPtr));
  return ncclSuccess;
}
// Registers `obj` to be released with ncclCudaFree() when the communicator's destructor list
// is run. The node is allocated from comm->memPermanent and becomes the new list head.
void ncclCommPushCudaFree(struct ncclComm* comm, void* obj) {
  struct ncclDestructor* node = ncclMemoryStackAlloc<struct ncclDestructor>(&comm->memPermanent);
  node->obj = obj;
  node->fn = ncclDestructorFnCudaFree;
  node->next = comm->destructorHead;
  comm->destructorHead = node;
}
// Destructor callback: releases dtor->obj through ncclCudaHostFree() and propagates its failure.
static ncclResult_t ncclDestructorFnCudaHostFree(struct ncclDestructor* dtor) {
  void* pinnedPtr = dtor->obj;
  NCCLCHECK(ncclCudaHostFree(pinnedPtr));
  return ncclSuccess;
}
// Registers `obj` to be released with ncclCudaHostFree() when the communicator's destructor
// list is run. The node is allocated from comm->memPermanent and becomes the new list head.
void ncclCommPushCudaHostFree(struct ncclComm* comm, void* obj) {
  struct ncclDestructor* node = ncclMemoryStackAlloc<struct ncclDestructor>(&comm->memPermanent);
  node->obj = obj;
  node->fn = ncclDestructorFnCudaHostFree;
  node->next = comm->destructorHead;
  comm->destructorHead = node;
}
// Destructor callback: releases the handle in dtor->obj through ncclGdrCudaFree() and
// propagates its failure.
static ncclResult_t ncclDestructorFnCudaGdrFree(struct ncclDestructor* dtor) {
  void* gdrHandle = dtor->obj;
  NCCLCHECK(ncclGdrCudaFree(gdrHandle));
  return ncclSuccess;
}
// Registers `handle` to be released with ncclGdrCudaFree() when the communicator's destructor
// list is run. The node is allocated from comm->memPermanent and becomes the new list head.
void ncclCommPushCudaGdrFree(struct ncclComm* comm, void* handle) {
  struct ncclDestructor* node = ncclMemoryStackAlloc<struct ncclDestructor>(&comm->memPermanent);
  node->obj = handle;
  node->fn = ncclDestructorFnCudaGdrFree;
  node->next = comm->destructorHead;
  comm->destructorHead = node;
}
// Releases all local resources owned by `comm` and finally frees the communicator itself.
//
// comm: communicator to destroy; NULL is accepted and is a no-op.
// Returns ncclSuccess, or the error of the first failing NCCLCHECK/CUDACHECK/PTHREADCHECK call
// (in which case the teardown stops at that point).
//
// The steps are order dependent: sub-systems are finalized first, then host/device buffers,
// shared resources (when this is the last reference), the registered destructor list, the
// memory stacks, and last the communicator object, which is poisoned before being freed.
static ncclResult_t commFree(ncclComm_t comm) {
  int abort = 0;
  /* commFree() should not involve any sync among ranks. */
  if (comm == NULL)
    return ncclSuccess;

  NCCLCHECK(ncclCeFinalize(comm));
  // tempBuff is allocated per-communicator for direct ReduceScatter on gfx950.
  // It is owned by the communicator; free it during communicator teardown.
  if (comm->tempBuff) {
    NCCLCHECK(ncclCudaFree(comm->tempBuff));
    comm->tempBuff = nullptr;
  }
  if (comm->symmetricSupport) {
    NCCLCHECK(ncclSymkFinalize(comm));
    NCCLCHECK(ncclDevrFinalize(comm));
  }
  NCCLCHECK(ncclRasCommFini(comm));

  /* in commReclaim, we have guaranteed only last rank which calls ncclCommDestroy() will
   * free all intra-process communicators; therefore, we only need to focus on local
   * resource cleanup in commFree(). */
  if (comm->proxyState && comm->proxyRefCountOld == 0 && comm->proxyState->thread) {
    PTHREADCHECK(pthread_join(comm->proxyState->thread, nullptr), "pthread_join");
    if (comm->proxyState->threadUDS) {
      // UDS support
      PTHREADCHECK(pthread_join(comm->proxyState->threadUDS, nullptr), "pthread_join");
    }
  }

  if (comm->memPool) CUDACHECK(cudaMemPoolDestroy(comm->memPool));

  delete[] comm->userRedOps;

  free(comm->connectSend);
  free(comm->connectRecv);

  // Dump the proxy trace (when available) if proxy tracing was requested.
  if (rcclParamEnableProxyTrace()) {
    WARN("commFree() ProxyTrace:");
    if (comm->proxyState && comm->proxyState->proxyTrace){
      WARN("%s", comm->proxyState->proxyTrace->dump().c_str());
    }
  }

#ifdef ENABLE_PROFILING
  // Copy the device profiling records to the host and print them per channel/sequence.
  struct ncclProf *prof, *prof_seq;
  prof = (struct ncclProf*)malloc(sizeof(struct ncclProf)*MAXCHANNELS*PROFILE_NUM_LAUNCHES);
  if (prof == nullptr) {
    WARN("Failed to allocate profiling buffer");
    // Skip profiling but continue with destruction
    goto skip_profiling;
  }
  CUDACHECK(hipMemcpy(prof, comm->devComm->devProf, sizeof(struct ncclProf)*MAXCHANNELS*PROFILE_NUM_LAUNCHES, hipMemcpyDeviceToHost));
#define VEGA_GPU_RTC_FREQUENCY 2.5E7
  // NOTE(review): the sequence count is read from prof[MAXCHANNELS*i] while the entries are
  // indexed as prof[MAXCHANNELS*s+i]; confirm the intended layout.
  for (int i=0; i<comm->nChannels; i++) {
    for (int s=0; s<prof[MAXCHANNELS*i].seq; s++) {
      if (prof[MAXCHANNELS*s+i].count == 0) continue;
      for (int j=0; j<prof[MAXCHANNELS*s+i].count; j++) {
        INFO(NCCL_INIT, "# [%02d:%02d] %02d-%02d L:%04u %6.2fus", comm->rank, i, s, j, prof[MAXCHANNELS*s+i].elem[j].line, (prof[MAXCHANNELS*s+i].elem[j].timeStamp-prof[MAXCHANNELS*s+i].elem[0].timeStamp)/VEGA_GPU_RTC_FREQUENCY*1.0E6);
      }
    }
  }
  free(prof);
  CUDACHECK(hipFree(comm->devComm->devProf));
skip_profiling:
#endif

#ifdef ENABLE_COLLTRACE
  // Ask the trace consumer to stop: join the thread when one exists, otherwise drain the
  // remaining entries synchronously on this thread.
  comm->collTraceExit = 1;
  if (comm->collTraceEnabled) {
    if (comm->collTraceThread)
      pthread_join(comm->collTraceThread, NULL);
    else
      ncclCommThreadMain((void *)comm);
  }
  NCCLCHECK(ncclCudaFree((void *)comm->collTrace));
  NCCLCHECK(ncclCudaHostFree((void *)comm->collTraceTail));
#endif

  free(comm->peerInfo);
  if (comm->topo)
    ncclTopoFree(comm->topo);
  if (comm->nodeRanks) {
    for (int n=0; n<comm->nNodes; n++) free(comm->nodeRanks[n].localRankToRank);
    free(comm->nodeRanks);
  }
  free(comm->rankToNode);
  free(comm->rankToLocalRank);
  free(comm->collNetHeads);
  free(comm->clique.ranks);

  if (comm->bootstrap)
    NCCLCHECK(bootstrapClose(comm->bootstrap));

  for (int channel=0; channel<MAXCHANNELS; channel++)
    NCCLCHECK(freeChannel(comm->channels+channel, comm->nRanks, 1, comm->localRanks));

  if (comm->doneEvent != NULL)
    CUDACHECK(hipEventDestroy(comm->doneEvent));

  // Shared resources are released only by the communicator dropping the last reference.
  if (comm->sharedRes) {
    if (ncclAtomicRefCountDecrement(&comm->sharedRes->refCount) == 0) {
      for (int c=0; c<MAXCHANNELS; c++) {
        if (comm->sharedRes->peers[c]) free(comm->sharedRes->peers[c]);
        if (comm->sharedRes->devPeers[c]) ncclCudaFree(comm->sharedRes->devPeers[c]);
      }
      free(comm->sharedRes->tpRankToLocalRank);
      NCCLCHECK(ncclStrongStreamDestruct(&comm->sharedRes->hostStream));
      NCCLCHECK(ncclStrongStreamDestruct(&comm->sharedRes->deviceStream));
      CUDACHECK(cudaEventDestroy(comm->sharedRes->launchEvent));
      CUDACHECK(cudaEventDestroy(comm->sharedRes->scratchEvent));
      NCCLCHECK(ncclProxyDestroy(comm));
      free(comm->sharedRes);
    }
  }

#if CUDART_VERSION >= 12010
  if (comm->nvlsSupport) NCCLCHECK(ncclNvlsFree(comm));
#endif

  // Run the destructors registered through the ncclCommPush*Free() helpers. The nodes live in
  // comm->memPermanent, so this must happen before the memory stacks are destructed.
  struct ncclDestructor* dtor = comm->destructorHead;
  while (dtor != nullptr) {
    NCCLCHECK(dtor->fn(dtor));
    dtor = dtor->next;
  }

  ncclMemoryStackDestruct(&comm->memScoped);
  ncclMemoryStackDestruct(&comm->memPermanent);

  // Read the abort state for the final log message before the flag may be released.
  abort = *comm->abortFlag;
  if (ncclAtomicRefCountDecrement(comm->abortFlagRefCount) == 0) {
    free(comm->abortFlag);
    NCCLCHECK(ncclCudaHostFree((void*)comm->abortFlagDev));
    free(comm->abortFlagRefCount);
  }
  free((void*)comm->config.netName);

  free(comm->topParentRanks);
  free(comm->topParentLocalRanks);
  free(comm->gproxyConn);
  free(comm->archName);

  NCCLCHECK(ncclRegCleanup(comm));

  NCCLCHECK(ncclDestroySideStream(comm->cudaDev));

  INFO(NCCL_INIT,"comm %p rank %d nranks %d cudaDev %d busId %lx - %s COMPLETE", comm, comm->rank, comm->nRanks, comm->cudaDev, comm->busId, abort ? "Abort" : "Destroy");

  // commPoison() overwrites comm->cudaDev with -1; keep the real device index for the log
  // message emitted after poisoning.
  const int cudaDev = comm->cudaDev;
  commPoison(comm); // poison comm before free to avoid comm reuse.
  NCCLCHECK(ncclProfilerPluginFinalize(comm));
  NCCLCHECK(ncclNetFinalize(comm));
  // Disable until we validate NCCL_LAUNCH_IMPLICIT_ORDER support.
  // but can be enabled via environment variable
  if (rcclParamEnableContextTracking() == 1) {
    ncclCudaContextDrop(comm->context);
    INFO(NCCL_INIT, "cudaDev %d context tracking destroyed", cudaDev);
  }
  free(comm);

  return ncclSuccess;
}
// Further runtime parameters; the third macro argument is the default value.
RCCL_PARAM(P2pNetDisable, "P2P_NET_DISABLE", 1);
RCCL_PARAM(PivotAlltoallEnable, "PIVOT_ALLTOALL_ENABLE", 1);
RCCL_PARAM(LL128ForceEnable, "LL128_FORCE_ENABLE", 0);
NCCL_PARAM(AggChannelSize, "AGG_CHANNEL_SIZE", -2);
NCCL_PARAM(DisableGraphHelper, "GRAPH_HELPER_DISABLE", 0);
// GDRCOPY support: FIFO_ENABLE when enabled locates a workFifo in CUDA memory
NCCL_PARAM(GdrCopyFifoEnable, "GDRCOPY_FIFO_ENABLE", 1);
// Default work FIFO size: 1<<22 bytes (4 MiB).
#define NCCL_WORK_FIFO_BYTES_DEFAULT (1<<22)
NCCL_PARAM(WorkFifoBytes, "WORK_FIFO_BYTES", NCCL_WORK_FIFO_BYTES_DEFAULT);
// INT64_MAX default; devCommSetup() takes the minimum of this value and ncclMaxKernelArgsSize().
NCCL_PARAM(WorkArgsBytes, "WORK_ARGS_BYTES", INT64_MAX);
enum ncclLaunchMode ncclParamLaunchMode;
// Detect DMA-BUF support
// Reports whether DMA-BUF registration can be used for this communicator.
// Returns ncclSuccess when supported and ncclInternalError otherwise; the caller (commAlloc)
// only compares the result against ncclSuccess.
static ncclResult_t dmaBufSupported(struct ncclComm* comm) {
  // The network plugin must expose regMrDmaBuf and the ROCm library must initialize.
  if (comm->ncclNet->regMrDmaBuf == NULL) return ncclInternalError;
  if (rocmLibraryInit() != ncclSuccess) return ncclInternalError;
#if CUDA_VERSION >= 11070
  int flag = 0;
  CUdevice dev;
  int cudaDriverVersion;
  CUDACHECK(cudaDriverGetVersion(&cudaDriverVersion));
  if (CUPFN(cuDeviceGet) == NULL || cudaDriverVersion < 11070) return ncclInternalError;
  CUCHECK(cuDeviceGet(&dev, comm->cudaDev));
  // Query device to see if DMA-BUF support is available
  (void) CUPFN(cuDeviceGetAttribute(&flag, CU_DEVICE_ATTRIBUTE_DMA_BUF_SUPPORTED, dev));
  if (flag == 0) return ncclInternalError;
  INFO(NCCL_INIT, "DMA-BUF is available on GPU device %d", comm->cudaDev);
  return ncclSuccess;
#else
  // Without the CUDA path, support depends on the dmabuf export entry point being resolved.
  if (pfn_hsa_amd_portable_export_dmabuf == NULL) return ncclInternalError;
  return ncclSuccess;
#endif
  return ncclInternalError;
}
// Checks that `comm` can accept a new operation.
// - Abort flag set: the pending group job is aborted and ncclSuccess is returned.
// - Otherwise the asynchronous error state is queried; ncclInProgress is turned into
//   ncclInvalidArgument (with a warning), any other state is returned unchanged.
ncclResult_t ncclCommEnsureReady(ncclComm_t comm) {
  if (__atomic_load_n(comm->abortFlag, __ATOMIC_ACQUIRE)) {
    ncclGroupJobAbort(comm->groupJob);
    return ncclSuccess;
  }

  ncclResult_t asyncState = ncclSuccess;
  NCCLCHECK(ncclCommGetAsyncError(comm, &asyncState));
  if (asyncState != ncclInProgress) {
    /* anything other than ncclInProgress is handed back as is. */
    return asyncState;
  }

  WARN("Attempt to use communicator before the previous operation returned ncclSuccess");
  return ncclInvalidArgument;
}
RCCL_PARAM(InjectFaults, "INJECT_FAULTS", 0);
// Initializes the host-side state of a freshly allocated communicator.
//
// comm:   communicator object to fill in.
// parent: parent communicator for a split, or NULL; when parent->shareResources is set the new
//         communicator reuses the parent's shared resources (and must use the same network).
// ndev:   number of ranks in the communicator (must be >= 1).
// rank:   rank of the caller, 0 <= rank < ndev.
//
// Returns ncclInvalidArgument for an invalid ndev/rank, ncclInvalidUsage for a network
// mismatch with a resource-sharing parent, or the error of the first failing checked call.
static ncclResult_t commAlloc(struct ncclComm* comm, struct ncclComm* parent, int ndev, int rank) {
  if (ndev < 1) {
    WARN("invalid device count (%d) requested", ndev);
    return ncclInvalidArgument;
  }
  if (rank >= ndev || rank < 0) {
    WARN("rank %d exceeds ndev=%d", rank, ndev);
    return ncclInvalidArgument;
  }

  ncclMemoryStackConstruct(&comm->memPermanent);
  ncclMemoryStackConstruct(&comm->memScoped);
  comm->destructorHead = nullptr;
  comm->rank = rank;
  comm->nRanks = ndev;

  NCCLCHECK(ncclNetInit(comm));
  INFO(NCCL_INIT, "Using network %s", comm->ncclNet->name);

  if (parent && parent->shareResources) {
    if (parent->ncclNet != comm->ncclNet) {
      WARN("Split shares resources, but parent comm netName %s is different from child comm netName %s", parent->ncclNet->name, comm->ncclNet->name);
      return ncclInvalidUsage;
    }
  }
  // Try to create a CUDA object right away. If there is something wrong with
  // the device we're on (failure cause #1) , better know it early.
  hipEvent_t doneEvent;
  CUDACHECK(hipEventCreateWithFlags(&doneEvent, hipEventDisableTiming));
  comm->doneEvent = doneEvent;
  comm->lastStream = nullptr;
  CUDACHECK(cudaGetDevice(&comm->cudaDev));

  // RCCL: create persistent stream for calloc
  NCCLCHECK(ncclCreateSideStream(comm->cudaDev));

  // Disable until we validate NCCL_LAUNCH_IMPLICIT_ORDER support.
  // but can be enabled via environment variable
  if (rcclParamEnableContextTracking() == 1) {
    NCCLCHECK(ncclCudaContextTrack(&comm->context));
    INFO(NCCL_INIT, "cudaDev %d context tracking created", comm->cudaDev);
  }

  // Resolve the SMI device index from the PCI bus id of the current device.
  NCCLCHECK(getBusId(comm->cudaDev, &comm->busId));
  char busId[]="0000:00:00.0";
  NCCLCHECK(int64ToBusId(comm->busId, busId));
#ifdef USE_AMDSMI
  NCCLCHECK(amd_smi_init());
  NCCLCHECK(amd_smi_getDeviceIndexByPciBusId(busId, (unsigned int*)&comm->nvmlDev));
#else
  NCCLCHECK(rocm_smi_init());
  NCCLCHECK(rocm_smi_getDeviceIndexByPciBusId(busId, (unsigned int*)&comm->nvmlDev));
#endif

  comm->compCap = ncclCudaCompCap();
  TRACE(NCCL_INIT,"comm %p rank %d nranks %d cudaDev %d busId %lx compCap %d", comm, rank, ndev, comm->cudaDev, comm->busId, comm->compCap);

  comm->checkPointers = ncclParamCheckPointers() == 1 ? true : false;
  comm->dmaBufSupport = (dmaBufSupported(comm) == ncclSuccess) ? true : false;

#ifdef ENABLE_COLLTRACE
  NCCLCHECK(ncclCudaHostCalloc(&comm->collTraceTail, MAXCHANNELS));
#if defined(HIP_UNCACHED_MEMORY)
  NCCLCHECK(ncclCudaCalloc(&comm->collTrace, COLLTRACE_NUM_ITEMS*MAXCHANNELS, hipDeviceMallocUncached));
#else
  NCCLCHECK(ncclCudaCalloc(&comm->collTrace, COLLTRACE_NUM_ITEMS*MAXCHANNELS));
#endif
  comm->collTraceExit = 0;
  comm->collTraceEnabled = false; // we can enable colltrace without starting a thread
  if ((ncclDebugLevel >= NCCL_LOG_INFO) && rcclParamKernelCollTraceEnable()) {
    comm->collTraceEnabled = true;
    if (rcclParamKernelCollTraceThreadEnable()) {
      // On failure pthread_create() leaves the thread id undefined. Reset it to 0 so that
      // commFree() does not pthread_join() a garbage id and instead drains the trace
      // synchronously, exactly as in the thread-less mode.
      const int rc = pthread_create(&comm->collTraceThread, NULL, ncclCommThreadMain, (void *)comm);
      if (rc != 0) {
        WARN("Failed to create collective trace thread (%s); trace will be drained at communicator destruction", strerror(rc));
        comm->collTraceThread = 0;
      }
    } else {
      comm->collTraceThread = 0;
    }
  }
#endif
  if (rcclParamInjectFaults() != 0) {
#ifdef ENABLE_FAULT_INJECTION
    comm->faults = rcclParamInjectFaults();
    if (comm->rank == 0) INFO(NCCL_INIT, "Enabled RCCL faults injection with value 0x%lx", comm->faults);
#else
    WARN("Ignore faults injection of value 0x%lx as RCCL is not compiled to support it", rcclParamInjectFaults());
#endif
  }
  memset(comm->collNetSupportMatrix, 0, sizeof(comm->collNetSupportMatrix));

  ncclMemoryPoolConstruct(&comm->memPool_ncclKernelPlan);
  ncclMemoryPoolConstruct(&comm->memPool_ncclProxyOp);

  // 0x1 is stored as the initial value of the group/preconnect list links.
  for (int i = 0; i < ncclGroupTaskTypeNum; i++) {
    comm->groupNext[i] = reinterpret_cast<struct ncclComm*>(0x1);
  }
  comm->preconnectNext = reinterpret_cast<struct ncclComm*>(0x1);

  static_assert(MAXCHANNELS <= sizeof(*comm->connectSend)*8, "comm->connectSend must have enough bits for all channels");
  static_assert(MAXCHANNELS <= sizeof(*comm->connectRecv)*8, "comm->connectRecv must have enough bits for all channels");
  NCCLCHECK(ncclCalloc(&comm->connectSend, comm->nRanks*NCCL_MAX_CONNS));
  NCCLCHECK(ncclCalloc(&comm->connectRecv, comm->nRanks*NCCL_MAX_CONNS));

  // Mark channels as non initialized.
  for (int c=0; c < MAXCHANNELS; c++) comm->channels[c].id = -1;

  if (parent == NULL || !parent->shareResources) {
    // This communicator owns a fresh set of shared resources.
    struct ncclSharedResources* sharedRes = NULL;
    NCCLCHECK(ncclCalloc(&sharedRes, 1));
    /* most of attributes are assigned later in initTransportsRank(). */
    sharedRes->owner = comm;
    sharedRes->tpNRanks = comm->nRanks;
    NCCLCHECK(ncclCalloc(&sharedRes->tpRankToLocalRank, comm->nRanks));
    NCCLCHECK(ncclStrongStreamConstruct(&sharedRes->deviceStream));
    NCCLCHECK(ncclStrongStreamConstruct(&sharedRes->hostStream));
    CUDACHECK(cudaEventCreateWithFlags(&sharedRes->launchEvent, cudaEventDisableTiming));
    CUDACHECK(cudaEventCreateWithFlags(&sharedRes->scratchEvent, cudaEventDisableTiming));
    comm->sharedRes = sharedRes;
    sharedRes->refCount = 1;
  } else {
    // Reuse the parent's shared resources and take an additional reference on them.
    comm->sharedRes = parent->sharedRes;
    ncclAtomicRefCountIncrement(&parent->sharedRes->refCount);
  }

  CUDACHECK(hipDeviceGetAttribute(&comm->WarpSize, hipDeviceAttributeWarpSize, comm->cudaDev));

  // Without a pre-set mapping every rank is its own top-parent rank.
  if (comm->topParentRanks == NULL) {
    NCCLCHECK(ncclCalloc(&comm->topParentRanks, comm->nRanks));
    for (int i = 0; i < comm->nRanks; ++i)
      comm->topParentRanks[i] = i;
  }

  ncclIntruQueueMpscConstruct(&comm->callbackQueue);
  ncclIntruQueueConstruct(&comm->legacyRegCleanupQueue);
  ncclIntruQueueConstruct(&comm->ceInitTaskQueue);

  comm->regCache.pageSize = sysconf(_SC_PAGESIZE);

  // Per-communicator device memory pool; the release threshold is set to the maximum
  // 64-bit value.
  do {
    cudaMemPoolProps props = {};
    props.allocType = cudaMemAllocationTypePinned;
    props.handleTypes = cudaMemHandleTypeNone;
    props.location.type = cudaMemLocationTypeDevice;
    props.location.id = comm->cudaDev;
    CUDACHECK(cudaMemPoolCreate(&comm->memPool, &props));
    uint64_t releaseThreshold = ~uint64_t(0);
    CUDACHECK(cudaMemPoolSetAttribute(comm->memPool, cudaMemPoolAttrReleaseThreshold, &releaseThreshold));
  } while (0);

  ncclIntruQueueConstruct(&comm->eventCallbackQueue);

  return ncclSuccess;
}
/**
 * Build the device-side image of the communicator and upload it.
 *
 * A host staging copy (tmpCommAndChans) is filled in from `comm` and then
 * copied into a freshly allocated device structure, whose `comm` member
 * becomes comm->devComm. Along the way this function also sizes and allocates
 * the work FIFO and the per-channel profiler counters, and copies the
 * rank-to-local-rank, CollNet and ring user-rank tables to the device.
 *
 * All asynchronous work is queued on the shared device stream, which is
 * acquired at the top and released + synchronized at `exit`. Every failure
 * after the acquire must therefore go through `fail`/`exit` instead of
 * returning directly, otherwise the stream stays acquired.
 *
 * @param comm Communicator to set up.
 * @return ncclSuccess, or the first error reported by a callee.
 */
static ncclResult_t devCommSetup(ncclComm_t comm) {
  ncclResult_t ret = ncclSuccess;
  int nRanks = comm->nRanks;
  struct ncclKernelCommAndChannels tmpCommAndChans;
  struct ncclKernelCommAndChannels *devCommAndChans = NULL;
  // NOTE(review): the non-HIP branch below still references ccStatus; it only
  // compiles if this declaration is restored for that configuration.
  //struct ncclNvmlCCStatus ccStatus; //unused variable - compiler warning
  bool ccEnable = false;
  cudaStream_t deviceStream;

  memset(&tmpCommAndChans, '\0', sizeof(tmpCommAndChans));
  NCCLCHECKGOTO(ncclStrongStreamAcquire(ncclCudaGraphNone(), &comm->sharedRes->deviceStream, /*concurrent=*/false, &deviceStream), ret, fail);

  // Device copy of the communicator + channels; freed with the communicator.
  NCCLCHECKGOTO(ncclCudaCallocAsync(&devCommAndChans, 1, deviceStream), ret, fail);
  ncclCommPushCudaFree(comm, devCommAndChans);

  // Device copy of the rank -> local rank table.
  NCCLCHECKGOTO(ncclCudaCallocAsync(&tmpCommAndChans.comm.rankToLocalRank, comm->nRanks, deviceStream), ret, fail);
  ncclCommPushCudaFree(comm, tmpCommAndChans.comm.rankToLocalRank);
  NCCLCHECKGOTO(ncclCudaMemcpyAsync(tmpCommAndChans.comm.rankToLocalRank, comm->rankToLocalRank, comm->nRanks, deviceStream), ret, fail);

  comm->devComm = &devCommAndChans->comm;
  tmpCommAndChans.comm.rank = comm->rank;
  tmpCommAndChans.comm.nRanks = nRanks;
  tmpCommAndChans.comm.node = comm->node;
  tmpCommAndChans.comm.nNodes = comm->nNodes;
  tmpCommAndChans.comm.abortFlag = comm->abortFlagDev;
  tmpCommAndChans.comm.isAllNvlink = comm->isAllNvlink;
  tmpCommAndChans.comm.p2pnChannelsPerPeer = comm->p2pnChannelsPerPeer;
  for (int p=0; p < NCCL_NUM_PROTOCOLS; p++) {
    tmpCommAndChans.comm.buffSizes[p] = comm->buffSizes[p];
  }
  tmpCommAndChans.comm.p2pChunkSize = comm->p2pChunkSize;
  tmpCommAndChans.comm.channels = &devCommAndChans->channels[0];

  comm->workArgsBytes = std::min<size_t>(ncclParamWorkArgsBytes(), ncclMaxKernelArgsSize(comm->cudaArch));

  // Work FIFO size: must be a power of two and is capped at 1 GiB.
#if !defined(__HIP_PLATFORM_AMD__) && !defined(__HIPCC__)
  memset(&ccStatus, 0, sizeof(ccStatus));
  ccEnable = (ncclSuccess == ncclNvmlGetCCStatus(&ccStatus)) && (ccStatus.CCEnabled || ccStatus.multiGpuProtectedPCIE || ccStatus.multiGpuNVLE);
  if (ccEnable) {
    comm->workFifoBytes = 0;
  } else {
    comm->workFifoBytes = ncclParamWorkFifoBytes();
    if (0 != (comm->workFifoBytes & (comm->workFifoBytes-1))) {
      WARN("NCCL_WORK_FIFO_BYTES=%d is being ignored because it is not a power of 2.", comm->workFifoBytes);
      comm->workFifoBytes = NCCL_WORK_FIFO_BYTES_DEFAULT;
    }
    comm->workFifoBytes = std::min(comm->workFifoBytes, 1u<<30);
  }
#else
  comm->workFifoBytes = ncclParamWorkFifoBytes();
  if (0 != (comm->workFifoBytes & (comm->workFifoBytes-1))) {
    WARN("NCCL_WORK_FIFO_BYTES=%d is being ignored because it is not a power of 2.", comm->workFifoBytes);
    comm->workFifoBytes = NCCL_WORK_FIFO_BYTES_DEFAULT;
  }
  comm->workFifoBytes = std::min(comm->workFifoBytes, 1u<<30);
#endif

  if (comm->rank == 0) {
    INFO(NCCL_INIT, "CC %s, workFifoBytes %d", ccEnable ? "On" : "Off", comm->workFifoBytes);
  }

  if (ncclGdrCopy != NULL && ncclParamGdrCopyFifoEnable() == 1) {
    // The workFifoBuf lives in GDR mapped CUDA memory.
    NCCLCHECKGOTO(ncclGdrCudaCalloc(&comm->workFifoBuf, &comm->workFifoBufDev, comm->workFifoBytes, &comm->workFifoBufGdrHandle), ret, fail);
    ncclCommPushCudaGdrFree(comm, comm->workFifoBufGdrHandle);
  } else {
    // The workFifoBuf lives in cudaHost memory.
    comm->workFifoBufGdrHandle = nullptr;
    NCCLCHECKGOTO(ncclCudaHostCalloc(&comm->workFifoBuf, comm->workFifoBytes), ret, fail);
    ncclCommPushCudaHostFree(comm, comm->workFifoBuf);
    comm->workFifoBufDev = comm->workFifoBuf;
  }

  comm->workFifoProduced = 0;
  comm->workFifoProducedLastRecorded = 0;
  comm->workFifoConsumed = 0;

  // Alloc profiler counters for the kernel. Each buffer is registered for
  // cleanup right after its own allocation so that a failure of the second
  // allocation does not leak the first.
  NCCLCHECKGOTO(ncclCudaHostCalloc(&comm->profiler.workStarted, MAXCHANNELS), ret, fail);
  ncclCommPushCudaHostFree(comm, comm->profiler.workStarted);
  NCCLCHECKGOTO(ncclCudaHostCalloc(&comm->profiler.workCompleted, MAXCHANNELS), ret, fail);
  ncclCommPushCudaHostFree(comm, comm->profiler.workCompleted);
  tmpCommAndChans.comm.workStarted = comm->profiler.workStarted;
  tmpCommAndChans.comm.workCompleted = comm->profiler.workCompleted;

  if (comm->collNetDenseToUserRank != nullptr) {
    NCCLCHECKGOTO(ncclCudaCallocAsync(&tmpCommAndChans.comm.collNetDenseToUserRank, nRanks, deviceStream), ret, fail);
    ncclCommPushCudaFree(comm, tmpCommAndChans.comm.collNetDenseToUserRank);
    NCCLCHECKGOTO(ncclCudaMemcpyAsync(tmpCommAndChans.comm.collNetDenseToUserRank, comm->collNetDenseToUserRank, nRanks, deviceStream), ret, fail);
  }

  // Per-channel state. The ring is copied by value and then its userRanks
  // pointer is redirected to the device-side array before the host contents
  // are uploaded into it.
  for (int c=0; c < MAXCHANNELS; c++) {
    tmpCommAndChans.channels[c].peers = comm->channels[c].devPeers;
    tmpCommAndChans.channels[c].ring = comm->channels[c].ring;
    tmpCommAndChans.channels[c].ring.userRanks = comm->channels[c].devRingUserRanks;
    tmpCommAndChans.channels[c].tree = comm->channels[c].tree;
    tmpCommAndChans.channels[c].collnetChain = comm->channels[c].collnetChain;
    tmpCommAndChans.channels[c].collnetDirect = comm->channels[c].collnetDirect;
    tmpCommAndChans.channels[c].binTree = comm->channels[c].binTree;
    tmpCommAndChans.channels[c].nvls = comm->channels[c].nvls;

    if (comm->channels[c].ring.userRanks != nullptr) {
      NCCLCHECKGOTO(ncclCudaMemcpyAsync(tmpCommAndChans.channels[c].ring.userRanks, comm->channels[c].ring.userRanks, nRanks, deviceStream), ret, fail);
    }
  }

#ifdef ENABLE_COLLTRACE
  tmpCommAndChans.comm.collTrace = comm->collTrace;
  tmpCommAndChans.comm.collTraceTail = comm->collTraceTail;
  tmpCommAndChans.comm.collTraceThread = comm->collTraceThread;
#endif

#if defined(ENABLE_NPKIT)
  WARN("NPKIT is deprecated, please use Profiler Plugin instead!");
  // Init NPKit. Use the goto form so the device stream is released on failure.
  NCCLCHECKGOTO(NpKit::Init(comm->rank), ret, fail);
  tmpCommAndChans.comm.npKitEventCollectContexts = NpKit::GetGpuEventCollectContexts();
  tmpCommAndChans.comm.cpuTimestamp = NpKit::GetCpuTimestamp();
#endif

#ifdef ENABLE_PROFILING
  // Use the goto form so the device stream is released on failure.
  NCCLCHECKGOTO(ncclCudaCalloc(&tmpCommAndChans.comm.devProf, MAXCHANNELS*PROFILE_NUM_LAUNCHES), ret, fail);
#endif

#ifdef ENABLE_FAULT_INJECTION
  tmpCommAndChans.comm.faults = comm->faults;
#endif

  // Upload the fully populated staging copy.
  NCCLCHECKGOTO(ncclCudaMemcpyAsync(devCommAndChans, &tmpCommAndChans, 1, deviceStream), ret, fail);

exit:
  // Reached on both success and failure: hand the stream back and wait for
  // the queued work before reporting the result.
  NCCLCHECK(ncclStrongStreamRelease(ncclCudaGraphNone(), &comm->sharedRes->deviceStream, /*concurrent=*/false));
  NCCLCHECK(ncclStrongStreamSynchronize(&comm->sharedRes->deviceStream));
  return ret;
fail:
  goto exit;
}
// Pre-process the string so that running "strings" on the lib can quickly reveal the version.
// VERSION_STRING holds the library name followed by major.minor.patch and NCCL_SUFFIX;
// VERSION_STRING_EXTENDED holds the toolkit build information. Both are printed by showVersion().
#if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__)
// HIP build: report as RCCL, with the HIP and ROCm build info strings.
#define VERSION_STRING "RCCL version : " STR(NCCL_MAJOR) "." STR(NCCL_MINOR) "." STR(NCCL_PATCH) NCCL_SUFFIX
#define VERSION_STRING_EXTENDED "HIP version : " HIP_BUILD_INFO "\nROCm version : " ROCM_BUILD_INFO
#else
// Non-HIP build: report as NCCL, with the CUDA major.minor version.
#define VERSION_STRING "NCCL version " STR(NCCL_MAJOR) "." STR(NCCL_MINOR) "." STR(NCCL_PATCH) NCCL_SUFFIX
#define VERSION_STRING_EXTENDED "CUDA version " STR(CUDA_MAJOR) "." STR(CUDA_MINOR)
#endif
/**
 * Log the library version banner: version string, git hash, toolkit build
 * info, hostname and the path the library was loaded from.
 *
 * The banner is emitted through VERSION() when the debug level is
 * NCCL_LOG_VERSION or NCCL_LOG_WARN, and through INFO(NCCL_ALL, ...) otherwise.
 */
static void showVersion() {
  char versionInfo[2048+2*HOST_NAME_MAX], hostInfo[HOST_NAME_MAX], libPathInfo[2048];

  // Retrieve Hostname info
  if (gethostname(hostInfo, sizeof(hostInfo)-1) != 0) {
    // Returns Unknown in hostInfo if function call unsuccessful
    strncpy(hostInfo, "Unknown", sizeof(hostInfo)-1);
  }
  // The buffer is uninitialized stack memory and gethostname() is not
  // guaranteed to NUL-terminate a truncated name, so terminate explicitly.
  hostInfo[sizeof(hostInfo)-1] = '\0';

  // Retrieve librccl path
  Dl_info pathInfo;
  if (dladdr((void*)ncclCommInitRank, &pathInfo) && pathInfo.dli_fname != nullptr) {
    strncpy(libPathInfo, pathInfo.dli_fname, sizeof(libPathInfo)-1);
  } else {
    // Sets libPath to Unknown if the above function call is not successful
    strncpy(libPathInfo, "Unknown", sizeof(libPathInfo)-1);
  }
  // strncpy() leaves the last byte untouched (and the string unterminated)
  // when the source is at least sizeof-1 characters long.
  libPathInfo[sizeof(libPathInfo)-1] = '\0';

  snprintf(versionInfo, sizeof(versionInfo),
           "%s-%s\n%s\n"
           "%-12s : %s\n%12s : %s",
           VERSION_STRING, rcclGitHash, VERSION_STRING_EXTENDED,
           "Hostname", hostInfo, "Librccl path", libPathInfo
  );

  if (ncclDebugLevel == NCCL_LOG_VERSION || ncclDebugLevel == NCCL_LOG_WARN) {
    VERSION("%s", versionInfo);
  } else {
    INFO(NCCL_ALL,"%s", versionInfo);
  }
}
// MNNVL overrides read by fillInfo() in the non-HIP build:
//  - MNNVL_UUID: when not -1, the value is copied into both halves of fabricInfo.clusterUuid.
//  - MNNVL_CLIQUE_ID: -1 leaves the reported clique id untouched; -2 replaces it with a hash of
//    the chassis serial number; any other value is used as the clique id directly.
NCCL_PARAM(MNNVLUUID, "MNNVL_UUID", -1);
NCCL_PARAM(MNNVLCliqueId, "MNNVL_CLIQUE_ID", -1);
/**
 * Fill in this rank's ncclPeerInfo record.
 *
 * Records the rank's identity (rank, device ids, bus id, library version),
 * host and process hashes offset by commHash, device memory size rounded up
 * to a multiple of 4 GiB, the device id of /dev/shm, and whether the GPU
 * offers fine-grained (or uncached) memory together with GDR support. Also
 * seeds comm->hasFineGrain and comm->minCompCap / comm->maxCompCap.
 *
 * @param comm     Communicator the record describes.
 * @param info     Record to populate.
 * @param commHash Value added to the host and pid hashes.
 * @return ncclSuccess, or the first error reported by a callee.
 */
static ncclResult_t fillInfo(struct ncclComm* comm, struct ncclPeerInfo* info, uint64_t commHash) {
  // Identity of this rank.
  info->rank = comm->rank;
  info->cudaDev = comm->cudaDev;
  info->nvmlDev = comm->nvmlDev;
  NCCLCHECK(ncclGetVersion(&info->version));
  info->hostHash = getHostHash() + commHash;
  info->pidHash = getPidHash() + commHash;
  info->cuMemSupport = ncclCuMemEnable();

  // Device memory size, rounded up to the next multiple of 2^32 bytes.
  cudaDeviceProp devProps;
  CUDACHECK(cudaGetDeviceProperties(&devProps, comm->cudaDev));
  info->totalGlobalMem = ROUNDUP(devProps.totalGlobalMem, (1L << 32));

  // Get the device MAJOR:MINOR of /dev/shm so we can use that
  // information to decide whether we can use SHM for inter-process
  // communication in a container environment
  struct stat shmStat;
  SYSCHECK(stat("/dev/shm", &shmStat), "stat");
  info->shmDev = shmStat.st_dev;

  info->busId = comm->busId;

  // Detect fine-grained memory support by attempting a tiny allocation with
  // the relevant flag; the allocation itself is released immediately.
#if defined(HIP_UNCACHED_MEMORY)
  const unsigned int probeFlags = hipDeviceMallocUncached;
#else
  const unsigned int probeFlags = hipDeviceMallocFinegrained;
#endif
  int* probe;
  const bool probeOk = (hipExtMallocWithFlags((void**)&probe, sizeof(int), probeFlags) == hipSuccess);
  if (!probeOk) {
    info->hasFineGrain = false;
    info->gdrSupport = 0;
  } else {
    CUDACHECK(hipFree(probe));
    info->hasFineGrain = true;
    // GPU supports GDR if DMABUF is supported
    if (dmaBufSupported(comm) == ncclSuccess) {
      info->gdrSupport = 1;
    } else {
      NCCLCHECK(ncclGpuGdrSupport(comm, &info->gdrSupport));
    }
  }
  comm->hasFineGrain = info->hasFineGrain;

  info->comm = comm;
  // Min/max start out as our own compute capability.
  comm->maxCompCap = comm->compCap;
  comm->minCompCap = comm->maxCompCap;
  info->cudaCompCap = comm->minCompCap;

#if !defined(__HIP_PLATFORM_AMD__) && !defined(__HIPCC__)
  // MNNVL support
  {
    // MNNVL: Request the fabric UUID and partition info
    char busIdStr[NVML_DEVICE_PCI_BUS_ID_BUFFER_SIZE];
    nvmlDevice_t nvmlHandle;
    NCCLCHECK(int64ToBusId(info->busId, busIdStr));
    NCCLCHECK(ncclNvmlDeviceGetHandleByPciBusId(busIdStr, &nvmlHandle));
    // Pre-set "not supported"; the query result is deliberately ignored and
    // only the resulting state is inspected.
    info->fabricInfo.state = NVML_GPU_FABRIC_STATE_NOT_SUPPORTED;
    (void) ncclNvmlDeviceGetGpuFabricInfoV(nvmlHandle, &info->fabricInfo);
    if (info->fabricInfo.state != NVML_GPU_FABRIC_STATE_NOT_SUPPORTED) {
      unsigned long uuidLo = 0;
      unsigned long uuidHi = 0;
      if (ncclParamMNNVLUUID() != -1) {
        // User override: the same value is written to both halves.
        unsigned long overrideLo = (unsigned long)ncclParamMNNVLUUID();
        unsigned long overrideHi = (unsigned long)ncclParamMNNVLUUID();
        memcpy(info->fabricInfo.clusterUuid, &overrideLo, sizeof(overrideLo));
        memcpy(info->fabricInfo.clusterUuid + sizeof(overrideLo), &overrideHi, sizeof(overrideHi));
      }
      memcpy(&uuidLo, info->fabricInfo.clusterUuid, sizeof(uuidLo));
      memcpy(&uuidHi, info->fabricInfo.clusterUuid + sizeof(uuidLo), sizeof(uuidHi));
      if (ncclParamMNNVLCliqueId() == -2) {
        nvmlPlatformInfo_t platformInfo = { 0 };
        NCCLCHECK(ncclNvmlDeviceGetPlatformInfo(nvmlHandle, &platformInfo));
        INFO(NCCL_INIT, "MNNVL rack serial %s slot %d tray %d hostId %d peerType %d moduleId %d",
             platformInfo.chassisSerialNumber, platformInfo.slotNumber, platformInfo.trayIndex,
             platformInfo.hostId, platformInfo.peerType, platformInfo.moduleId);
        // Use a hash of the Rack serial number to partition the NVLD clique
        info->fabricInfo.cliqueId = getHash(platformInfo.chassisSerialNumber, sizeof(platformInfo.chassisSerialNumber));
      } else if (ncclParamMNNVLCliqueId() != -1) {
        info->fabricInfo.cliqueId = ncclParamMNNVLCliqueId();
      }
      INFO(NCCL_INIT, "MNNVL busId 0x%lx fabric UUID %lx.%lx cliqueId 0x%x state %d healthMask 0x%x",
           info->busId,
           uuidLo, uuidHi,
           info->fabricInfo.cliqueId, info->fabricInfo.state, info->fabricInfo.healthMask);
    }
  }
#endif

  return ncclSuccess;
}
/**
 * Initialize one channel and set up its ring from this rank's point of view.
 *
 * @param comm      Communicator owning the channel.
 * @param channelId Index of the channel to initialize.
 * @param rank      This rank.
 * @param nranks    Number of entries in ringRanks.
 * @param ringRanks Ring order, as a list of ranks.
 * @return ncclSuccess, or the error from initChannel().
 *
 * On return ring->index is this rank's distance from rank 0 along the ring,
 * and ring->userRanks is ringRanks rotated so that it starts at this rank.
 */
static ncclResult_t setupChannel(struct ncclComm* comm, int channelId, int rank, int nranks, int* ringRanks) {
  TRACE(NCCL_INIT, "rank %d nranks %d", rank, nranks);
  NCCLCHECK(initChannel(comm, channelId));

  struct ncclRing* ring = &comm->channels[channelId].ring;

  // Locate rank 0 and ourselves in the ring; both default to position 0 when
  // not found, and the last match wins if a rank appears more than once.
  int posOfZero = 0;
  int posOfSelf = 0;
  for (int pos = 0; pos < nranks; pos++) {
    const int member = ringRanks[pos];
    if (member == 0) posOfZero = pos;
    if (member == rank) posOfSelf = pos;
  }

  // Adding nranks keeps the left operand of % non-negative.
  ring->index = (posOfSelf - posOfZero + nranks) % nranks;

  // Rotate the ring so that entry 0 is this rank.
  for (int dst = 0; dst < nranks; dst++) {
    const int src = (dst + posOfSelf) % nranks;
    ring->userRanks[dst] = ringRanks[src];
  }
  return ncclSuccess;
}
// Built-in per-protocol buffer sizes. They are only defined for the non-HIP build; on HIP
// builds computeBuffSizes() obtains its defaults from rcclSetDefaultBuffSizes() instead.
#if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__)
#else
#define DEFAULT_LL_BUFFSIZE (NCCL_LL_LINES_PER_THREAD*NCCL_LL_MAX_NTHREADS*NCCL_STEPS*sizeof(union ncclLLFifoLine))
#define DEFAULT_LL128_BUFFSIZE (NCCL_LL128_ELEMS_PER_THREAD*NCCL_LL128_MAX_NTHREADS*NCCL_STEPS*sizeof(uint64_t))
#define DEFAULT_BUFFSIZE (1 << 22) /* 4MiB */
#endif
// User overrides for the per-protocol buffer sizes. The default of -2 is the "not set" marker:
// computeBuffSizes() falls back to the built-in default for any protocol whose value is -2.
NCCL_PARAM(BuffSize, "BUFFSIZE", -2);
NCCL_PARAM(LlBuffSize, "LL_BUFFSIZE", -2);
NCCL_PARAM(Ll128BuffSize, "LL128_BUFFSIZE", -2);
// P2P chunk sizes, selected in computeBuffSizes(): the NET value applies to multi-node
// communicators, the NVL value when comm->isAllNvlink is set, and the PCI value otherwise.
// Default value of P2P_NET_CHUNKSIZE may be overwritten by changes in src/rccl_wrap.cc
NCCL_PARAM(P2pNetChunkSize, "P2P_NET_CHUNKSIZE", (1 << 17)); /* 128 kB */
NCCL_PARAM(P2pPciChunkSize, "P2P_PCI_CHUNKSIZE", (1 << 17)); /* 128 kB */
NCCL_PARAM(P2pNvlChunkSize, "P2P_NVL_CHUNKSIZE", (1 << 19)); /* 512 kB */
/**
 * Decide the per-protocol buffer sizes and the P2P chunk size for `comm`.
 *
 * Buffer sizes come from the LL_BUFFSIZE / LL128_BUFFSIZE / BUFFSIZE
 * parameters, with -2 meaning "use the built-in default". The P2P chunk size
 * depends on the communicator's span (multi-node, all-NVLink, or other), is
 * limited so that NCCL_STEPS chunks fit in the SIMPLE protocol buffer, and for
 * communicators that do not own their shared resources is further limited to
 * the owner's chunk size.
 *
 * @param comm Communicator to configure; comm->sharedRes must be set.
 * @return ncclSuccess.
 */
static ncclResult_t computeBuffSizes(struct ncclComm* comm) {
  // Order matches the protocol indices: LL, LL128, SIMPLE.
  int64_t requested[NCCL_NUM_PROTOCOLS] = { ncclParamLlBuffSize(), ncclParamLl128BuffSize(), ncclParamBuffSize() };
#if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__)
  int fallback[NCCL_NUM_PROTOCOLS];
  rcclSetDefaultBuffSizes(comm, fallback);
#else
  int fallback[NCCL_NUM_PROTOCOLS] = { DEFAULT_LL_BUFFSIZE, DEFAULT_LL128_BUFFSIZE, DEFAULT_BUFFSIZE };
#endif

  for (int proto = 0; proto < NCCL_NUM_PROTOCOLS; proto++) {
    if (requested[proto] != -2) {
      comm->buffSizes[proto] = requested[proto];
    } else {
      comm->buffSizes[proto] = fallback[proto];
    }
  }

  // Pick the base P2P chunk size.
  if (comm->nNodes > 1) {
    rcclSetP2pNetChunkSize(comm, comm->p2pChunkSize);
    // Keep the value chosen above only if it is valid; otherwise use the parameter.
    if (!(comm->p2pChunkSize > RCCL_VALUE_INVALID)) {
      comm->p2pChunkSize = ncclParamP2pNetChunkSize();
    }
  } else if (comm->isAllNvlink) {
    comm->p2pChunkSize = ncclParamP2pNvlChunkSize();
  } else {
    comm->p2pChunkSize = ncclParamP2pPciChunkSize();
  }

  // Make sure P2P chunksize is not larger than coll chunksize.
  if (comm->p2pChunkSize * NCCL_STEPS > comm->buffSizes[NCCL_PROTO_SIMPLE]) {
    comm->p2pChunkSize = comm->buffSizes[NCCL_PROTO_SIMPLE]/NCCL_STEPS;
  }

  if (comm->sharedRes->owner == comm) {
    // The owner publishes its chunk size for communicators sharing its resources.
    comm->sharedRes->tpP2pChunkSize = comm->p2pChunkSize;
  } else {
    /* make sure split comm p2pChunkSize won't exceed shared p2pChunkSize. */
    comm->p2pChunkSize = std::min(comm->p2pChunkSize, comm->sharedRes->tpP2pChunkSize);
  }

  INFO(NCCL_INIT, "P2P Chunksize set to %d", comm->p2pChunkSize);
  return ncclSuccess;
}
// Rank whose communicator dumps the computed topology graphs in initTransportsRank().
NCCL_PARAM(GraphDumpFileRank, "GRAPH_DUMP_FILE_RANK", 0);
NCCL_PARAM(CollNetNodeThreshold, "COLLNET_NODE_THRESHOLD", 2);
NCCL_PARAM(NvbPreconnect, "NVB_PRECONNECT", 0);
// initTransportsRank() sets comm->allocP2pNetLLBuffers when this parameter equals 1.
NCCL_PARAM(AllocP2pNetLLBuffers, "ALLOC_P2P_NET_LL_BUFFERS", 0);
#ifdef ENABLE_WARP_SPEED
// WarpSpeed parameter accessors, declared here and defined elsewhere. initTransportsRank()
// treats WarpSpeed as enabled when any of them is non-zero (CU count: greater than zero).
extern int64_t rcclParamWarpSpeedEnable();
extern int64_t rcclParamWarpSpeedAutoMode();
extern int64_t rcclParamWarpSpeedCuCount();
#endif
// MNNVL: Flag to indicate whether to enable Multi-Node NVLink
// As used in initTransportsRank(): 0 never runs the MNNVL check, 1 always runs it, and any
// other value runs it only when more than one node was detected and the user P2P level is not 0.
NCCL_PARAM(MNNVLEnable, "MNNVL_ENABLE", 2);
// Indices into the timers[] array passed to initTransportsRank();
// TIMERS_INIT_COUNT is the number of entries in that array.
#define TIMER_INIT_TOTAL 0
#define TIMER_INIT_KERNELS 1
#define TIMER_INIT_BOOTSTRAP 2
#define TIMER_INIT_ALLGATHER 3
#define TIMER_INIT_TOPO 4
#define TIMER_INIT_GRAPHS 5
#define TIMER_INIT_CONNECT 6
#define TIMER_INIT_ALLOC 7
#define TIMERS_INIT_COUNT 8
/**
 * Populate comm->nvlDomainInfo from the node layout already stored in `comm`:
 * one domain per node, with the per-domain rank bounds taken from
 * comm->minLocalRanks and comm->maxLocalRanks.
 *
 * @param comm Communicator whose nNodes / minLocalRanks / maxLocalRanks are set.
 * @return ncclSuccess.
 */
static ncclResult_t initNvlDomainInfo(struct ncclComm* comm) {
  auto& domains = comm->nvlDomainInfo;

  domains.nNvlDomains = comm->nNodes;
  domains.minRanksPerNvlDomain = comm->minLocalRanks;
  domains.maxRanksPerNvlDomain = comm->maxLocalRanks;

  TRACE(NCCL_INIT, "NVLink domains: %d domains, min ranks per domain: %d, max ranks per domain: %d",
        comm->nNodes, domains.minRanksPerNvlDomain, domains.maxRanksPerNvlDomain);

  return ncclSuccess;
}
static ncclResult_t initTransportsRank(struct ncclComm* comm, struct ncclComm* parent, uint64_t timers[TIMERS_INIT_COUNT]) {
// We use 2 AllGathers
// 1. { peerInfo, comm, compCap}
// 2. { nChannels, graphInfo, topoRanks }
ncclResult_t ret = ncclSuccess;
int rank = comm->rank;
int nranks = comm->nRanks;
int nNodes = 1;
cpu_set_t affinitySave;
struct ncclTopoGraph* ringGraph = &comm->graphs[NCCL_ALGO_RING];
struct ncclTopoGraph* treeGraph = &comm->graphs[NCCL_ALGO_TREE];
struct ncclTopoGraph* collNetChainGraph = &comm->graphs[NCCL_ALGO_COLLNET_CHAIN];
struct ncclTopoGraph* collNetDirectGraph = &comm->graphs[NCCL_ALGO_COLLNET_DIRECT];
struct ncclTopoGraph* nvlsGraph = &comm->graphs[NCCL_ALGO_NVLS];
struct ncclTopoGraph* graphs[NCCL_NUM_ALGORITHMS] = { treeGraph, ringGraph, collNetDirectGraph, collNetChainGraph, nvlsGraph, nvlsGraph, treeGraph };
struct graphInfo {
int pattern;
int nChannels;
int sameChannels;
float bwIntra;
float bwInter;
int typeIntra;
int typeInter;
int crossNic;
};
struct allGatherInfo {
struct graphInfo graphInfo[NCCL_NUM_ALGORITHMS];
struct ncclTopoRanks topoRanks;
int cpuArch;
int cpuVendor;
int localRanks;
int nc;
bool pivotA2AEnabled;
bool ll128Enabled;
bool mscclEnabled;
};
int nChannelsOrig;
struct allGatherInfo *allGather3Data = NULL;
struct ncclTopoRanks** allTopoRanks = NULL;
int *nodesFirstRank = NULL, *nodesTreePatterns = NULL;
int *rings = NULL;
int* nvbPeers = NULL;
struct ncclProxyConnector proxyConn;
int* pxnPeers = NULL;
int *topParentLocalRanks = NULL;
int p2pLevel = -1;
bool needsProxy = false;
bool mscclNeedsProxy = needsProxy;
timers[TIMER_INIT_ALLGATHER] = clockNano();
// AllGather1 - begin
NCCLCHECKGOTO(ncclCalloc(&comm->peerInfo, nranks+1), ret, fail); // Extra rank to represent CollNet root
NCCLCHECKGOTO(fillInfo(comm, comm->peerInfo+rank, comm->commHash), ret, fail);
NCCLCHECKGOTO(bootstrapAllGather(comm->bootstrap, comm->peerInfo, sizeof(struct ncclPeerInfo)), ret, fail);
__atomic_store_n(&comm->peerInfoValid, true, __ATOMIC_RELEASE);
comm->cuMemSupport = 1;
for (int i = 0; i < nranks; i++) {
if (comm->peerInfo[i].version != comm->peerInfo[rank].version) {
WARN("Mismatched NCCL version detected : rank %d version %d rank %d version %d",
i, comm->peerInfo[i].version, rank, comm->peerInfo[rank].version);
ret = ncclInvalidUsage;
goto fail;
}
if (comm->peerInfo[i].hostHash != comm->peerInfo[rank].hostHash) nNodes++;
if (!comm->peerInfo[i].cuMemSupport) comm->cuMemSupport = 0;
if ((i != rank) && (comm->peerInfo[i].hostHash == comm->peerInfo[rank].hostHash) && (comm->peerInfo[i].busId == comm->peerInfo[rank].busId)) {
WARN("Duplicate GPU detected : rank %d and rank %d both on CUDA device %lx", rank, i, comm->peerInfo[rank].busId);
ret = ncclInvalidUsage;
goto fail;
}
}
// AllGather1 - end
timers[TIMER_INIT_ALLGATHER] = clockNano() - timers[TIMER_INIT_ALLGATHER];
// Check for MNNVL support
NCCLCHECKGOTO(ncclGetUserP2pLevel(&p2pLevel), ret, fail);
if ((nNodes > 1 && ncclParamMNNVLEnable() != 0 && p2pLevel != 0) || ncclParamMNNVLEnable() == 1) {
NCCLCHECKGOTO(ncclMnnvlCheck(comm), ret, fail);
}
do {
// Compute intra-process ranks
int intraProcRank0 = -1, intraProcRank = -1, intraProcRanks = 0;
for (int i = 0; i < nranks; i++) comm->minCompCap = std::min(comm->minCompCap, comm->peerInfo[i].cudaCompCap);
for (int i = 0; i < nranks; i++) comm->maxCompCap = std::max(comm->maxCompCap, comm->peerInfo[i].cudaCompCap);
comm->nvlsRegSupport = 1;
for (int i = 0; i < nranks; i++) {
if ((comm->peerInfo[i].hostHash == comm->peerInfo[rank].hostHash)
&& (comm->peerInfo[i].pidHash == comm->peerInfo[rank].pidHash)) {
// Rank is in same process
if (intraProcRanks == 0) intraProcRank0 = i;
if (i == rank) intraProcRank = intraProcRanks;
intraProcRanks++;
if (intraProcRank0 == rank && rank != i) {
comm->peerInfo[i].comm->intraNext = comm->intraNext;
comm->intraNext = comm->peerInfo[i].comm;
}
}
if (comm->nvlsRegSupport) {
for (int j = i + 1; j < nranks; j++) {
if (comm->peerInfo[i].hostHash == comm->peerInfo[j].hostHash &&
comm->peerInfo[i].pidHash == comm->peerInfo[j].pidHash) {
comm->nvlsRegSupport = 0;
break;
}
}
}
}
// Buffer Registration is not supported with MNNVL
if (comm->MNNVL) comm->nvlsRegSupport = 0;
else if (ncclParamSingleProcMemRegEnable()) comm->nvlsRegSupport = 1;
TRACE(NCCL_INIT,"pidHash[%d] %lx intraProcRank %d intraProcRanks %d intraProcRank0 %d",
rank, comm->peerInfo[rank].pidHash, intraProcRank, intraProcRanks, intraProcRank0);
if (intraProcRank == -1 || intraProcRank0 == -1 || comm->peerInfo[intraProcRank0].comm == NULL) {
WARN("Failed to determine intra proc ranks rank %d hostHash %lx pidHash %lx intraProcRank %d intraProcRanks %d intraProcRank0 %d",
rank, comm->peerInfo[rank].hostHash, comm->peerInfo[rank].pidHash,
intraProcRank, intraProcRanks, intraProcRank0);
ret = ncclInternalError;
goto fail;
}
#if defined(ENABLE_NPKIT)
if (intraProcRanks != 1) {
WARN("NPKit currently does not support more than 1 device per process");
ret = ncclInternalError;
goto fail;
}
#endif
struct ncclComm* comm0 = comm->peerInfo[intraProcRank0].comm;
assert(intraProcRank==0 ? comm==comm0 : true);
comm->intraComm0 = comm0;
comm->intraRank = intraProcRank;
comm->intraRanks = intraProcRanks;
comm->intraBarrierPhase = 0;
comm->intraBarrierCounter = 0;
comm->intraBarrierGate = 0;
} while(0);
timers[TIMER_INIT_TOPO] = clockNano();
// Dump XML if requested by user
const char* dumpXmlFile;
dumpXmlFile = ncclGetEnv("NCCL_TOPO_DUMP_FILE");
if (dumpXmlFile) {
NCCLCHECKGOTO(ncclTopoGetSystem(comm, NULL, dumpXmlFile), ret, fail);
}
// Topo detection / System graph creation
NCCLCHECKGOTO(ncclTopoGetSystem(comm, &comm->topo), ret, fail);
comm->topo->tuning = rcclGetTuningIndexForArch(comm->archName);
INFO(NCCL_INIT, "Tuning index set to: %d", comm->topo->tuning);
// save nRanks to ncclTopoSystem as indicator of multi-node
comm->topo->nRanks = comm->nRanks;
// init netGdrLevel
comm->topo->netGdrLevel = -2;
// init Pivot A2A related fields
comm->topo->pivotA2AEnabled = false;
comm->topo->pivotA2ANumBiRings = 0;
// LL128
comm->topo->ll128Enabled = false;
// Topology hint for MSCCL internal scheduler about whether to enable MSCCL
comm->topo->mscclEnabled = false;
// Topology hint if tree has been defined by model or User
comm->topo->treeDefined = false;
// Compute paths between GPUs and NICs
NCCLCHECKGOTO(ncclTopoComputePaths(comm->topo, comm), ret, fail);
// Remove inaccessible GPUs and unused NICs
NCCLCHECKGOTO(ncclTopoTrimSystem(comm->topo, comm), ret, fail);
// Recompute paths after trimming
NCCLCHECKGOTO(ncclTopoComputePaths(comm->topo, comm), ret, fail);
// Init search
NCCLCHECKGOTO(ncclTopoSearchInit(comm->topo), ret, fail);
// Decide on comm's CPU architecture.
NCCLCHECKGOTO(ncclTopoComputeCommCPU(comm), ret, fail);
// Print final topology
NCCLCHECKGOTO(ncclTopoPrint(comm->topo), ret, fail);
timers[TIMER_INIT_TOPO] = clockNano() - timers[TIMER_INIT_TOPO];
// Set affinity to a CPU local to our GPU, so that all memory we allocate
// on the host is local.
NCCLCHECKGOTO(ncclTopoGetCpuAffinity(comm->topo, comm->rank, &comm->cpuAffinity), ret, fail);
if (CPU_COUNT(&comm->cpuAffinity)) {
sched_getaffinity(0, sizeof(cpu_set_t), &affinitySave);
sched_setaffinity(0, sizeof(cpu_set_t), &comm->cpuAffinity);
}
// Determine local CollNet support
if (!collNetSupport(comm)) {
comm->config.collnetEnable = 0;
}
// Determine local Nvls support
NCCLCHECK(ncclNvlsInit(comm));
// [RCCL] Compute hostIdx (based on hostHash)
{
comm->topo->nHosts = 0;
for (int r = 0; r < nranks; r++) {
int isNewHost = 1;
// Check if this is the first time this hostname has been used
for (int i = 0; i < r && isNewHost; i++) {
if (comm->peerInfo[i].hostHash == comm->peerInfo[r].hostHash) {
isNewHost = 0;
}
}
if (isNewHost)
{
// Check if this is the same hostname associated with this rank
if (comm->peerInfo[r].hostHash == comm->peerInfo[rank].hostHash)
comm->topo->hostIdx = comm->topo->nHosts;
comm->topo->nHosts++;
}
}
}
timers[TIMER_INIT_GRAPHS] = clockNano();
// Get rings and trees
memset(ringGraph, 0, sizeof(struct ncclTopoGraph));
ringGraph->id = 0;
ringGraph->pattern = NCCL_TOPO_PATTERN_RING;
ringGraph->minChannels = 1;
ringGraph->maxChannels = MAXCHANNELS/2;
NCCLCHECKGOTO(ncclTopoCompute(comm->topo, ringGraph), ret, fail);
NCCLCHECKGOTO(ncclTopoPrintGraph(comm->topo, ringGraph), ret, fail);
memset(treeGraph, 0, sizeof(struct ncclTopoGraph));
treeGraph->id = 1;
treeGraph->pattern = NCCL_TOPO_PATTERN_BALANCED_TREE;
treeGraph->minChannels = ringGraph->nChannels;
treeGraph->maxChannels = ringGraph->nChannels;
NCCLCHECKGOTO(ncclTopoCompute(comm->topo, treeGraph), ret, fail);
NCCLCHECKGOTO(ncclTopoPrintGraph(comm->topo, treeGraph), ret, fail);
memset(collNetChainGraph, 0, sizeof(struct ncclTopoGraph));
collNetChainGraph->id = 2;
collNetChainGraph->pattern = NCCL_TOPO_PATTERN_TREE;
collNetChainGraph->collNet = 1;
collNetChainGraph->minChannels = ringGraph->nChannels;
collNetChainGraph->maxChannels = ringGraph->nChannels;
memset(collNetDirectGraph, 0, sizeof(struct ncclTopoGraph));
collNetDirectGraph->id = 4;
collNetDirectGraph->pattern = NCCL_TOPO_PATTERN_COLLNET_DIRECT;
collNetDirectGraph->collNet = 1;
collNetDirectGraph->minChannels = 1;
collNetDirectGraph->maxChannels = MAXCHANNELS;
if (comm->config.collnetEnable) {
NCCLCHECKGOTO(ncclTopoCompute(comm->topo, collNetChainGraph), ret, fail);
NCCLCHECKGOTO(ncclTopoPrintGraph(comm->topo, collNetChainGraph), ret, fail);
NCCLCHECKGOTO(ncclTopoCompute(comm->topo, collNetDirectGraph), ret, fail);
NCCLCHECKGOTO(ncclTopoPrintGraph(comm->topo, collNetDirectGraph), ret, fail);
}
memset(nvlsGraph, 0, sizeof(struct ncclTopoGraph));
nvlsGraph->id = 3;
nvlsGraph->pattern = NCCL_TOPO_PATTERN_NVLS;
nvlsGraph->minChannels = 1;
nvlsGraph->maxChannels = MAXCHANNELS;
if (comm->nvlsSupport) {
NCCLCHECKGOTO(ncclTopoCompute(comm->topo, nvlsGraph), ret, fail);
NCCLCHECKGOTO(ncclTopoPrintGraph(comm->topo, nvlsGraph), ret, fail);
}
timers[TIMER_INIT_GRAPHS] = clockNano() - timers[TIMER_INIT_GRAPHS];
bool allXgmi, hasPeerAccess;
allXgmi = true;
hasPeerAccess = true;
// Check that all the GPUs have peer access to one another and are XGMI connected
for (int i = 0; i < nranks && hasPeerAccess; i++) {
int cudaDev1 = comm->peerInfo[i].cudaDev;
for (int j = 0; j < nranks; j++) {
if (i == j) continue;
int cudaDev2 = comm->peerInfo[j].cudaDev;
int p2p;
if (hipDeviceCanAccessPeer(&p2p, cudaDev1, cudaDev2) != hipSuccess || !p2p)
{
hasPeerAccess = false;
break;
}
bool isXGMI;
// Limit to single intermediate GPU for enabling clique
NCCLCHECK(ncclTopoGetLinkType(comm->topo, i, j, &isXGMI, 1));
allXgmi &= isXGMI;
}
}
// Initialize num P2P LL buffers for this communicator
comm->allocP2pNetLLBuffers = ncclParamAllocP2pNetLLBuffers() == 1;
if (comm->rank == ncclParamGraphDumpFileRank()) {
struct ncclTopoGraph* dumpGraphs[5] = { ringGraph, treeGraph, collNetDirectGraph, collNetChainGraph, nvlsGraph };
NCCLCHECKGOTO(ncclTopoDumpGraphs(comm->topo, 5, dumpGraphs), ret, fail);
}
if ((comm->topo->type & RCCL_TOPO_4P2H_ROME) && (comm->topo->type & RCCL_TOPO_GDR_ALL)) {
if (rcclParamP2pNetDisable() == 0) {
if (!(comm->topo->type & RCCL_TOPO_FORCE_INTRA)) comm->p2pNet = 1;
INFO(NCCL_INIT, "RCCL enabled same node P2P over network");
}
else
INFO(NCCL_INIT, "RCCL force disabled same node P2P over network");
}
// Because timers[TIMER_INIT_ALLGATHER] already contains the timing of the first allgather,
// we temporarily store the start time of the subsequent one in the as-yet-unused CONNECT timer.
timers[TIMER_INIT_CONNECT] = clockNano();
// AllGather3 - begin
NCCLCHECKGOTO(ncclCalloc(&allGather3Data, nranks), ret, fail);
int idx;
NCCLCHECK(ncclTopoIdToIndex(comm->topo, GPU, comm->busId, &idx));
allGather3Data[rank].nc = 2;
if (comm->topo->nodes[GPU].count == comm->topo->nRanks &&
IsArchMatch(comm->topo->nodes[GPU].nodes[idx].gpu.gcn, "gfx906") && allXgmi)
allGather3Data[rank].nc = 4;
if (IsArchMatch(comm->topo->nodes[GPU].nodes[idx].gpu.gcn, "gfx908"))
allGather3Data[rank].nc = std::max(4/ringGraph->nChannels, 2);
if (comm->topo->nodes[GPU].count == comm->topo->nRanks &&
(comm->topo->type & RCCL_TOPO_CR8G))
allGather3Data[rank].nc = 4;
if (comm->topo->nodes[GPU].count == comm->topo->nRanks &&
IsArchMatch(comm->topo->nodes[GPU].nodes[idx].gpu.gcn, "gfx90a"))
allGather3Data[rank].nc = 4;
if (IsArchMatch(comm->topo->nodes[GPU].nodes[idx].gpu.gcn, "gfx90a"))
allGather3Data[rank].nc = std::max(allGather3Data[rank].nc, 4/ringGraph->nChannels);
if (ringGraph->nChannels > MAXCHANNELS/2)
allGather3Data[rank].nc = 1;
comm -> gfx9CheapFenceOff = 1;
#ifdef HIP_UNCACHED_MEMORY
if(!rcclParamGfx9CheapFenceOff()){
if(IsArchMatch(comm->topo->nodes[GPU].nodes[idx].gpu.gcn, "gfx942")){
comm -> gfx9CheapFenceOff = 0;
}
else if(IsArchMatch(comm->topo->nodes[GPU].nodes[idx].gpu.gcn, "gfx950")){
comm -> gfx9CheapFenceOff = ROCM_VERSION < 70002 && nNodes > 1; // Enable for single node only prior to ROCm 7.0.2
}
}
INFO(NCCL_INIT, "GFX9 cheap fence is %s", comm -> gfx9CheapFenceOff ? "OFF" : "ON");
#endif
// RCCL: Only use one slice per primitive on some single node gfx9xx systems, only currently enabled for AllReduce, ReduceScatter, and AllGather
if (IsArchMatch(comm->topo->nodes[GPU].nodes[idx].gpu.gcn, "gfx942") || IsArchMatch(comm->topo->nodes[GPU].nodes[idx].gpu.gcn, "gfx950")){
comm->rcclUseOneSlice = nNodes == 1;
}
if (IsArchMatch(comm->topo->nodes[GPU].nodes[idx].gpu.gcn, "gfx942")) {
// Multi-node MI300A
int managed = 0;
CUDACHECK(hipDeviceGetAttribute(&managed, hipDeviceAttributeDirectManagedMemAccessFromHost, 0));
if (managed && nNodes > 1) {
// This forces the minimum channels to 24
allGather3Data[rank].nc = 6;
} else {
// MI300X
if (nranks == 2)
// NCCL_MIN_NCHANNELS=32
allGather3Data[rank].nc = 16;
else if (nranks == 4)
// NCCL_MIN_NCHANNELS=24
allGather3Data[rank].nc = 4;
}
}
#ifdef ENABLE_WARP_SPEED
comm->topo->warpSpeedEnabled = (rcclParamWarpSpeedEnable() != 0 || rcclParamWarpSpeedAutoMode() != 0 || rcclParamWarpSpeedCuCount() > 0);
#endif
// For single-node communicators that do not use the full set of xGMI links per GPU, i.e., nranks < 8,
// inflate nChannels a bit to achieve higher bandwidth.
if (IsArchMatch(comm->topo->nodes[GPU].nodes[idx].gpu.gcn, "gfx950")) {
if (nranks == 2 && nNodes == 1){
allGather3Data[rank].nc = 16;
} else if (nranks == 4 && nNodes == 1){
allGather3Data[rank].nc = 8;
} else {
allGather3Data[rank].nc = 4;
}
}
#ifdef ENABLE_WARP_SPEED
// Double default channels for WarpSpeed enabled communicators
if (comm->topo->warpSpeedEnabled) {
allGather3Data[rank].nc *= 2;
}
#endif
allGather3Data[rank].pivotA2AEnabled = comm->topo->pivotA2AEnabled && rcclParamPivotAlltoallEnable();
comm->topo->ll128Enabled = comm->topo->ll128Enabled || rcclParamLL128ForceEnable();
allGather3Data[rank].ll128Enabled = comm->topo->ll128Enabled;
allGather3Data[rank].mscclEnabled = comm->topo->mscclEnabled;
for (int a=0; a<NCCL_NUM_ALGORITHMS; a++) {
allGather3Data[rank].graphInfo[a].pattern = graphs[a]->pattern;
allGather3Data[rank].graphInfo[a].nChannels = graphs[a]->nChannels;
allGather3Data[rank].graphInfo[a].sameChannels = graphs[a]->sameChannels;
allGather3Data[rank].graphInfo[a].bwIntra = graphs[a]->bwIntra;
allGather3Data[rank].graphInfo[a].bwInter = graphs[a]->bwInter;
allGather3Data[rank].graphInfo[a].typeIntra = graphs[a]->typeIntra;
allGather3Data[rank].graphInfo[a].typeInter = graphs[a]->typeInter;
allGather3Data[rank].graphInfo[a].crossNic = graphs[a]->crossNic;
}
allGather3Data[rank].cpuArch = comm->cpuArch;
allGather3Data[rank].cpuVendor = comm->cpuVendor;
comm->nChannels = std::min(treeGraph->nChannels, ringGraph->nChannels);
// For a 1-rank job there is no topology constraint, so ncclTopoCompute drives the ring to its allowed maximum, which results in 4 x MAXCHANNELS channels for single-rank comms and causes issues.
if (comm->nRanks == 1) {
comm->nChannels = treeGraph->nChannels = ringGraph->nChannels = 8;
}
NCCLCHECKGOTO(ncclTopoPreset(comm, graphs, &allGather3Data[rank].topoRanks), ret, fail);
NCCLCHECKGOTO(bootstrapAllGather(comm->bootstrap, allGather3Data, sizeof(*allGather3Data)), ret, fail);
// Determine nNodes, firstRanks, ...
NCCLCHECKGOTO(ncclCalloc(&nodesFirstRank, nranks), ret, fail);
NCCLCHECKGOTO(ncclCalloc(&nodesTreePatterns, nranks), ret, fail);
NCCLCHECKGOTO(ncclCalloc(&comm->rankToNode, comm->nRanks), ret, fail);
for (int r=0; r<nranks; r++) {
int node;
int firstRank = allGather3Data[r].topoRanks.ringRecv[0];
for (node=0; node<comm->nNodes && nodesFirstRank[node] != firstRank; node++);
if (node == comm->nNodes) {
comm->nNodes++;
nodesFirstRank[node] = firstRank;
// Record tree pattern of each node as they can be different depending on sm arch
nodesTreePatterns[node] = allGather3Data[r].graphInfo[NCCL_ALGO_TREE].pattern;
}
comm->rankToNode[r] = node;
if (comm->cpuArch != allGather3Data[r].cpuArch &&
comm->cpuArch != NCCL_TOPO_CPU_ARCH_MIXED) {
comm->cpuArch = NCCL_TOPO_CPU_ARCH_MIXED;
}
if (comm->cpuVendor != allGather3Data[r].cpuVendor &&
comm->cpuVendor != NCCL_TOPO_CPU_VENDOR_MIXED) {
comm->cpuVendor = NCCL_TOPO_CPU_VENDOR_MIXED;
}
}
// Alert the user to the presence of mixed CPUs. In the past this has caused
// locks in some collective routines. This may help debug issues in the future.
if (rank==0) {
if (comm->cpuArch == NCCL_TOPO_CPU_ARCH_MIXED) {
INFO(NCCL_GRAPH, "CPUs with mixed architecture were detected.");
}
if (comm->cpuVendor == NCCL_TOPO_CPU_VENDOR_MIXED) {
INFO(NCCL_GRAPH, "CPUs with mixed vendors were detected.");
}
}
// Now that we know nNodes, alloc nodeRanks and compute localRanks for each node
NCCLCHECKGOTO(ncclCalloc(&comm->nodeRanks, comm->nNodes), ret, fail);
NCCLCHECKGOTO(ncclCalloc(&comm->rankToLocalRank, comm->nRanks), ret, fail);
for (int r=0; r<comm->nRanks; r++) {
int node = comm->rankToNode[r];
comm->rankToLocalRank[r] = comm->nodeRanks[node].localRanks;
comm->nodeRanks[node].localRanks++;
}
comm->minLocalRanks = INT_MAX;
// Allocate ranks arrays for each node
for (int n=0; n<comm->nNodes; n++) {
NCCLCHECKGOTO(ncclCalloc(&comm->nodeRanks[n].localRankToRank, comm->nodeRanks[n].localRanks), ret, fail);
comm->maxLocalRanks = std::max(comm->maxLocalRanks, comm->nodeRanks[n].localRanks);
comm->minLocalRanks = std::min(comm->minLocalRanks, comm->nodeRanks[n].localRanks);
comm->nodeRanks[n].localRanks = 0;
}
// And fill the ranks arrays
for (int r=0; r<comm->nRanks; r++) {
int node = comm->rankToNode[r];
comm->nodeRanks[node].localRankToRank[comm->nodeRanks[node].localRanks++] = r;
}
comm->node = comm->rankToNode[rank];
comm->localRankToRank = comm->nodeRanks[comm->node].localRankToRank;
comm->localRank = comm->rankToLocalRank[rank];
comm->localRanks = comm->nodeRanks[comm->node].localRanks;
NCCLCHECKGOTO(initNvlDomainInfo(comm), ret, fail);
TRACE(NCCL_INIT,"hostHash[%d] %lx localRank %d localRanks %d localRank0 %d",
rank, comm->peerInfo[rank].hostHash, comm->localRank, comm->localRanks, comm->localRankToRank[0]);
if (comm->localRank == -1 || comm->localRankToRank[0] == -1 || comm->localRanks == 0) {
WARN("Failed to determine local ranks rank %d hostHash %lx pidHash %lx localRank %d localRanks %d localRank0 %d",
rank, comm->peerInfo[rank].hostHash, comm->peerInfo[rank].pidHash,
comm->localRank, comm->localRanks, comm->localRankToRank[0]);
ret = ncclInternalError;
goto fail;
}
INFO(NCCL_INIT, "comm %p rank %d nRanks %d nNodes %d localRanks %d localRank %d MNNVL %d",
comm, rank, comm->nRanks, comm->nNodes, comm->localRanks, comm->localRank, comm->MNNVL);
nChannelsOrig = comm->nChannels;
NCCLCHECKGOTO(ncclCalloc(&allTopoRanks, comm->nRanks), ret, fail);
int nc;
nc = allGather3Data[0].nc;
for (int i=0; i<nranks; i++) {
allTopoRanks[i] = &allGather3Data[i].topoRanks;
nc = std::min(allGather3Data[i].nc, nc);
// Make sure we align all ranks so that the tuning is consistent across ranks
comm->topo->pivotA2AEnabled = comm->topo->pivotA2AEnabled && allGather3Data[i].pivotA2AEnabled;
comm->topo->ll128Enabled = comm->topo->ll128Enabled && allGather3Data[i].ll128Enabled;
comm->topo->mscclEnabled = comm->topo->mscclEnabled && allGather3Data[i].mscclEnabled;
for (int a=0; a<NCCL_NUM_ALGORITHMS; a++) {
graphs[a]->nChannels = std::min(allGather3Data[i].graphInfo[a].nChannels, graphs[a]->nChannels);
graphs[a]->sameChannels = std::min(allGather3Data[i].graphInfo[a].sameChannels, graphs[a]->sameChannels);
graphs[a]->bwIntra = std::min(allGather3Data[i].graphInfo[a].bwIntra, graphs[a]->bwIntra);
graphs[a]->bwInter = std::min(allGather3Data[i].graphInfo[a].bwInter, graphs[a]->bwInter);
graphs[a]->typeIntra = std::max(allGather3Data[i].graphInfo[a].typeIntra, graphs[a]->typeIntra);
graphs[a]->typeInter = std::max(allGather3Data[i].graphInfo[a].typeInter, graphs[a]->typeInter);
graphs[a]->crossNic = std::max(allGather3Data[i].graphInfo[a].crossNic, graphs[a]->crossNic);
}
comm->maxTreePattern = std::max(comm->maxTreePattern, allGather3Data[i].graphInfo[NCCL_ALGO_TREE].pattern);
}
if (graphs[NCCL_ALGO_COLLNET_CHAIN]->nChannels == 0) comm->config.collnetEnable = 0;
if (graphs[NCCL_ALGO_NVLS]->nChannels == 0) comm->nvlsSupport = comm->nvlsChannels = 0;
comm->nChannels = treeGraph->nChannels = ringGraph->nChannels =
(comm->topo->nodes[GPU].count != comm->topo->nRanks && comm->topo->nodes[NET].count)
? std::min(treeGraph->nChannels, ringGraph->nChannels) : ringGraph->nChannels;
if (comm->nChannels < nChannelsOrig) {
// We started duplicating channels during Preset(), so we need to move the
// duplicated channels since we have removed some.
for (int i=0; i<comm->nChannels; i++) memcpy(comm->channels+comm->nChannels+i, comm->channels+nChannelsOrig+i, sizeof(struct ncclChannel));
}
// Determine CollNet support after all-gather now that we know nNodes and each node localRanks
if (comm->config.collnetEnable == 1) {
int collNetNodeThreshold = ncclParamCollNetNodeThreshold();
if (comm->nNodes < collNetNodeThreshold) {
INFO(NCCL_INIT, "Communicator has %d nodes which is less than CollNet node threshold %d, disabling CollNet", comm->nNodes, collNetNodeThreshold);
comm->config.collnetEnable = 0;
}
}
NCCLCHECK(ncclTopoPathAllNVLink(comm->topo, &comm->isAllNvlink));
comm->isOneRPN = (comm->maxLocalRanks == 1);
NCCLCHECKGOTO(ncclCalloc(&rings, nranks*MAXCHANNELS), ret, fail);
NCCLCHECKGOTO(ncclTopoPostset(comm, nodesFirstRank, nodesTreePatterns, allTopoRanks, rings, graphs, parent, nc), ret, fail);
if (comm->topo->treeDefined) NCCLCHECK(ncclTreeBasePostset(comm, treeGraph));
// AllGather3 - end
timers[TIMER_INIT_ALLGATHER] += clockNano() - timers[TIMER_INIT_CONNECT];
TRACE(NCCL_INIT, "rank %d nranks %d - BUILT %d TREES/RINGS", rank, nranks, comm->nChannels);
char line[4096];
line[0]='\0';
for (int c=0; c<comm->nChannels; c++) {
struct ncclTree* tree = &comm->channels[c].tree;
snprintf(line+strlen(line), 2047-strlen(line), " [%d] %d/%d/%d->%d->%d",
c, tree->down[0], tree->down[1], tree->down[2], rank, tree->up);
INFO(NCCL_GRAPH, "Ring %d : %d -> %d -> %d comm %p nRanks %02d busId %lx", c, comm->channels[c].ring.prev,
comm->rank, comm->channels[c].ring.next, comm, comm->nRanks, comm->busId);
}
line[4095] = '\0';
INFO(NCCL_INIT, "Trees%s comm %p nRanks %02d busId %lx", line, comm, comm->nRanks, comm->busId);
NCCLCHECKGOTO(computeBuffSizes(comm), ret, fail);
// Compute nChannels per peer for p2p
NCCLCHECKGOTO(ncclTopoComputeP2pChannels(comm), ret, fail);
/* until now, all info of comm should be known. We can initialize shared resources and
* map localRanks to top parent local ranks. NOTE: this shareRes init must be put before
* all proxy operations. */
if (comm->sharedRes->owner == comm) {
comm->sharedRes->tpNLocalRanks = comm->localRanks;
comm->sharedRes->magic = comm->magic;
comm->sharedRes->tpNChannels = comm->nChannels;
comm->sharedRes->tpP2pNChannels = comm->p2pnChannels;
memcpy(comm->sharedRes->tpRankToLocalRank, comm->rankToLocalRank, sizeof(int) * comm->nRanks);
}
NCCLCHECKGOTO(ncclCalloc(&topParentLocalRanks, comm->localRanks), ret, fail);
for (int i = 0; i < comm->localRanks; ++i) {
int tpRank = comm->topParentRanks[comm->localRankToRank[i]];
topParentLocalRanks[i] = comm->sharedRes->tpRankToLocalRank[tpRank];
}
comm->topParentLocalRanks = topParentLocalRanks;
// Profiler plugin context has to be initialized before proxy thread
NCCLCHECK(ncclProfilerPluginInit(comm));
NCCLCHECKGOTO(ncclTransportCheckP2pType(comm, &comm->isAllDirectP2p, &comm->directMode), ret, fail);
// Launch proxy service thread, after this, the proxy calls can be used.
if (parent && parent->shareResources) {
comm->proxyState = parent->sharedRes->proxyState;
ncclAtomicRefCountIncrement(&parent->sharedRes->proxyState->refCount);
} else {
NCCLCHECKGOTO(ncclProxyCreate(comm), ret, fail);
}
NCCLCHECKGOTO(ncclCalloc(&comm->gproxyConn, comm->nRanks), ret, fail);
timers[TIMER_INIT_CONNECT] = clockNano();
do { // Build p2p schedule
int node = comm->node;
int nNodes = comm->nNodes;
int nRanks = comm->nRanks;
int local = comm->localRank;
int nLocals = comm->maxLocalRanks;
struct ncclNodeRanks* nodeRanks = comm->nodeRanks;
bool flat = false;
for (int node = 0; node < nNodes; node++) {
if (nodeRanks[node].localRanks != nLocals) {
flat = true;
nNodes = 1; node = 0;
nLocals = nRanks; local = rank;
break;
}
}
int nNodesPow2 = pow2Up(nNodes);
int nLocalsPow2 = pow2Up(nLocals);
comm->p2pSchedule = ncclMemoryStackAlloc<ncclComm::P2pSchedulePair>(&comm->memPermanent, nRanks);
comm->planner.peers = ncclMemoryStackAlloc<ncclKernelPlanner::Peer>(&comm->memPermanent, nRanks);
uint32_t nodeRound = 0;
uint32_t nodeDelta = 0;
int round = 0;
// When enumerating peer deltas we use the quadratic formula (x*x+x)/2 mod N.
// Since that formula only produces valid permutations when N is a pow of 2,
// we let N = pow2Up(n) and filter out results greater-eq to n.
// Example sequence for 16 ranks: 0, 1, 3, 6, 10, 15, 5, 12, 4, 13, 7, 2, 14, 11, 9, 8
do {
if (nodeDelta < nNodes) { // Filter nonsensical node deltas
int sendNode = (node + nodeDelta) % nNodes;
int recvNode = (node - nodeDelta + nNodes) % nNodes;
uint32_t localRound = 0;
uint32_t localDelta = 0;
do {
if (localDelta < nLocals) { // Filter nonsensical node-local deltas
int sendLocal = (local + localDelta) % nLocals;
int recvLocal = (local - localDelta + nLocals) % nLocals;
comm->p2pSchedule[round].sendRank = flat ? sendLocal : nodeRanks[sendNode].localRankToRank[sendLocal];
comm->p2pSchedule[round].recvRank = flat ? recvLocal : nodeRanks[recvNode].localRankToRank[recvLocal];
round += 1;
}
localRound += 1;
localDelta = (localDelta + localRound) & (nLocalsPow2 - 1); // Quadratic update
} while (localRound != nLocalsPow2);
}
nodeRound += 1;
nodeDelta = (nodeDelta + nodeRound) & (nNodesPow2 - 1); // Quadratic update
} while (nodeRound != nNodesPow2);
if (round != nRanks) {
WARN("P2p schedule creation has bugs.");
ret = ncclInternalError;
goto fail;
}
} while (0);
comm->runtimeConn = comm->cuMemSupport && ncclParamRuntimeConnect();
if (comm->runtimeConn) {
for (int c=0; c<comm->nChannels; c++) {
NCCLCHECKGOTO(setupChannel(comm, c, rank, nranks, rings+c*nranks), ret, fail);
}
// Attempt to setup NVLS, may silently fail and disable NVLS
NCCLCHECKGOTO(ncclNvlsSetup(comm, parent), ret, fail);
// Check if we can setup CollNet
if (comm->config.collnetEnable) ncclCollNetSetup(comm, parent, graphs);
} else {
for (int c=0; c<comm->nChannels; c++) {
NCCLCHECKGOTO(setupChannel(comm, c, rank, nranks, rings+c*nranks), ret, fail);
}
NCCLCHECKGOTO(ncclTransportRingConnect(comm), ret, fail);
// Connect NET for intranode use
if (comm->graphs[NCCL_ALGO_RING].nIntraChannels && rcclParamP2pNetDisable() == 0) {
comm->useIntraNet = 1;
for (int c = 0; c < comm->nChannels; c++) {
struct ncclChannel* channel = comm->channels+c;
if (comm->nRanks == 1) continue;
NCCLCHECKGOTO(ncclTransportP2pConnect(comm, c, 1, &channel->ring.prev, 1, &channel->ring.next, NCCL_CONN_IDX_P2P_NET), ret, fail);
}
NCCLCHECKGOTO(ncclTransportP2pSetup(comm, &comm->graphs[NCCL_ALGO_RING], NCCL_CONN_IDX_P2P_NET), ret, fail);
}
// Connect Trees
NCCLCHECKGOTO(ncclTransportTreeConnect(comm), ret, fail);
// Connect PAT only for communicators with 1 GPU per node
if (comm->maxLocalRanks == 1) NCCLCHECKGOTO(ncclTransportPatConnect(comm), ret, fail);
// Attempt to setup NVLS, may silently fail and disable NVLS
NCCLCHECKGOTO(ncclNvlsSetup(comm, parent), ret, fail);
NCCLCHECKGOTO(ncclNvlsBufferSetup(comm), ret, fail);
// And NVLS trees if needed
NCCLCHECKGOTO(ncclNvlsTreeConnect(comm), ret, fail);
// Check if we can setup CollNet
if (comm->config.collnetEnable) {
ncclCollNetSetup(comm, parent, graphs);
NCCLCHECKGOTO(ncclCollNetChainBufferSetup(comm), ret, fail);
if (comm->maxLocalRanks <= NCCL_MAX_DIRECT_ARITY+1) {
NCCLCHECKGOTO(ncclCollNetDirectBufferSetup(comm), ret, fail);
}
}
// Connect to local net proxy
NCCLCHECKGOTO(ncclProxyConnect(comm, TRANSPORT_NET, 1, comm->rank, &proxyConn), ret, fail);
NCCLCHECKGOTO(ncclProxyCallBlocking(comm, &proxyConn, ncclProxyMsgSharedInit, &comm->p2pnChannels, sizeof(int), NULL, 0), ret, fail);
// Then to remote ones when using PXN
if (ncclPxnDisable(comm) == 0) {
int nranks;
NCCLCHECKGOTO(ncclTopoGetPxnRanks(comm, &pxnPeers, &nranks), ret, fail);
for (int r=0; r<nranks; r++) {
NCCLCHECKGOTO(ncclProxyConnect(comm, TRANSPORT_NET, 1, pxnPeers[r], &proxyConn), ret, fail);
NCCLCHECKGOTO(ncclProxyCallBlocking(comm, &proxyConn, ncclProxyMsgSharedInit, &comm->p2pnChannels, sizeof(int), NULL, 0), ret, fail);
}
}
if (ncclParamNvbPreconnect()) {
// Connect p2p when using NVB path
int nvbNpeers;
NCCLCHECKGOTO(ncclTopoGetNvbGpus(comm->topo, comm->rank, &nvbNpeers, &nvbPeers), ret, fail);
for (int r=0; r<nvbNpeers; r++) {
int peer = nvbPeers[r];
int sendRound=0, recvRound=0;
while (comm->p2pSchedule[sendRound].sendRank != peer) sendRound++;
while (comm->p2pSchedule[recvRound].recvRank != peer) recvRound++;
uint8_t sendBase = ncclP2pChannelBaseForRound(comm, sendRound);
uint8_t recvBase = ncclP2pChannelBaseForRound(comm, recvRound);
for (int c=0; c<comm->p2pnChannelsPerPeer; c++) {
int channelId;
channelId = ncclP2pChannelForPart(comm->p2pnChannels, sendBase, c, comm->p2pnChannelsPerPeer, comm->nNodes);
if (comm->channels[channelId].peers[peer]->send[1].connected == 0) {
comm->connectSend[peer].masks[channelId/64] |= (1UL<<(channelId%64));
}
channelId = ncclP2pChannelForPart(comm->p2pnChannels, recvBase, c, comm->p2pnChannelsPerPeer, comm->nNodes);
if (comm->channels[channelId].peers[peer]->recv[1].connected == 0) {
comm->connectRecv[peer].masks[channelId/64] |= (1UL<<(channelId%64));
}
}
}
NCCLCHECKGOTO(ncclTransportP2pSetup(comm, NULL, 1), ret, fail);
}
}
TRACE(NCCL_INIT, "rank %d nranks %d - CONNECTED %d RINGS AND TREES", rank, nranks, comm->nChannels);
// Compute time models for algorithm and protocol combinations
NCCLCHECKGOTO(ncclTopoInitTunerConstants(comm), ret, fail);
NCCLCHECKGOTO(ncclTunerPluginLoad(comm), ret, fail);
if (comm->tuner) {
NCCLCHECK(comm->tuner->init(&comm->tunerContext, comm->commHash, comm->nRanks, comm->nNodes, ncclDebugLog, &comm->nvlDomainInfo, &comm->tunerConstants));
}
NCCLCHECKGOTO(ncclTopoTuneModel(comm, comm->minCompCap, comm->maxCompCap, graphs), ret, fail);
INFO(NCCL_INIT, "comm:%p, nRanks:%d, nNodes:%d, coll channels:%d collnet channels:%d, nvls channels:%d, p2p channels:%d, p2p channels per peer:%d", comm, comm->nRanks, comm->nNodes, comm->nChannels, comm->nChannels, comm->nvlsChannels, comm->p2pnChannels, comm->p2pnChannelsPerPeer);
if (comm->intraRank == 0) { // Load ncclParamLaunchMode
const char* str = ncclGetEnv("NCCL_LAUNCH_MODE");
enum ncclLaunchMode mode, modeOld;
if (str && strcasecmp(str, "GROUP") == 0) {
mode = ncclLaunchModeGroup;
} else {
mode = ncclLaunchModeParallel;
}
// In theory we could be racing with other communicators not associated with
// this one if the user is connecting to multiple ncclUniqueId's concurrently.
modeOld = __atomic_exchange_n(&ncclParamLaunchMode, mode, __ATOMIC_RELAXED);
if (modeOld == ncclLaunchModeInvalid && str && str[0]!='\0') {
INFO(NCCL_ENV, "NCCL_LAUNCH_MODE set by environment to %s", mode == ncclLaunchModeParallel ? "PARALLEL" : "GROUP");
}
}
comm->symmetricSupport = comm->isAllDirectP2p && comm->nNodes == 1 && ncclParamWinEnable() && ncclCuMemEnable();
comm->devrState.bigSize = 0;
comm->ceColl.baseUCSymReadyPtr = NULL;
comm->ceColl.baseUCSymComplPtr = NULL;
// Call devCommSetup before the last barrier, making sure we don't have a thread running in front and starting to
// launch NCCL kernels before all cuda mem allocation is complete. That could cause a deadlock.
NCCLCHECKGOTO(devCommSetup(comm), ret, fail);
timers[TIMER_INIT_CONNECT] = clockNano() - timers[TIMER_INIT_CONNECT];
if (mscclEnabled() && (comm->topo->mscclEnabled || mscclForceEnabled())) {
WARN("MSCCL is deprecated, please be careful with this feature!");
NCCLCHECK(mscclInit(comm));
mscclStatus& status = mscclGetStatus(comm);
status.needsProxy |= mscclNeedsProxy;
}
/* Local intra-node barrier */
NCCLCHECKGOTO(bootstrapIntraNodeBarrier(comm->bootstrap, comm->localRankToRank, comm->localRank, comm->localRanks, comm->localRankToRank[0]), ret, fail);
// We should have allocated all buffers, collective fifos, ... we can
// restore the affinity.
TRACE(NCCL_INIT, "rank %d nranks %d - DONE", rank, nranks);
exit:
if (CPU_COUNT(&comm->cpuAffinity)) sched_setaffinity(0, sizeof(cpu_set_t), &affinitySave);
/* If split resource is shared, we are not able to unlink the proxy ops pool here since the child comm can
* attach the proxy ops pool of parent at any time; otherwise, unlink it here to make sure the pool will be
* properly cleaned up. */
if (comm->sharedRes->owner == comm && !comm->shareResources && ret == ncclSuccess && !ncclCuMemEnable()) ncclProxyShmUnlink(comm);
free(allTopoRanks);
free(nodesTreePatterns);
free(nodesFirstRank);
free(allGather3Data);
free(rings);
free(nvbPeers);
free(pxnPeers);
return ret;
fail:
goto exit;
}
// SET_STACK_SIZE: gates the cudaDeviceSetLimit(cudaLimitStackSize, ...) calls made in
// ncclCommInitRankFunc. The default is 1 when the build uses indirect function calls
// and 0 otherwise.
#ifdef USE_INDIRECT_FUNCTION_CALL
NCCL_PARAM(SetStackSize, "SET_STACK_SIZE", 1);
#else
NCCL_PARAM(SetStackSize, "SET_STACK_SIZE", 0);
#endif
// STACK_SIZE_OVERRIDE: when non-zero, ncclCommInitRankFunc uses this value as the stack
// size instead of the size reported by ncclInitKernelsForDevice (indirect-call builds only).
RCCL_PARAM(StackSizeOverride, "STACK_SIZE_OVERRIDE", 0);
// CGA_CLUSTER_SIZE: read by envConfigOverride; values in [0, NCCL_MAX_CGA_CLUSTER_SIZE]
// are applied as-is, larger values are clamped to NCCL_MAX_CGA_CLUSTER_SIZE.
NCCL_PARAM(CGAClusterSize, "CGA_CLUSTER_SIZE", NCCL_CONFIG_UNDEF_INT);
// Match config max/minCTAs. envConfigOverride ignores values <= 0 for both.
NCCL_PARAM(MaxCTAs, "MAX_CTAS", NCCL_CONFIG_UNDEF_INT);
NCCL_PARAM(MinCTAs, "MIN_CTAS", NCCL_CONFIG_UNDEF_INT);
// Upper bound accepted for the CGA cluster size override.
#define NCCL_MAX_CGA_CLUSTER_SIZE 8
// NCHANNELS_PER_NET_PEER: read by envConfigOverride; values <= 0 are ignored.
NCCL_PARAM(NChannelsPerNetPeer, "NCHANNELS_PER_NET_PEER", NCCL_CONFIG_UNDEF_INT);
// NVLINK_UTIL_CENTRIC_SCHED_ENABLE: read by envConfigOverride, which accepts only 0 or 1.
// NOTE(review): the default here is 0, while envConfigOverride compares the value against
// NCCL_CONFIG_UNDEF_INT to detect "unset" -- confirm that the mismatch is intended.
NCCL_PARAM(NvlinkUtilCentricSchedEnable, "NVLINK_UTIL_CENTRIC_SCHED_ENABLE", 0);
// Capacity, in bytes, of ncclCommInitRankAsyncJob::funcName.
#define NCCL_COMMINIT_FUNCNAME_LEN 128
// Arguments of the asynchronous communicator-initialization job run by
// ncclCommInitRankFunc. A single descriptor serves the init-rank, split and
// shrink paths; which fields are meaningful depends on the path taken.
struct ncclCommInitRankAsyncJob {
// ncclCommInitRankFunc casts the ncclAsyncJob* it receives back to this struct,
// so `base` has to remain the first member.
struct ncclAsyncJob base;
// Communicator being initialized.
struct ncclComm* comm;
// Optional output handle: when non-NULL, ncclCommInitRankFunc stores `comm`
// into it (release store) right before returning.
struct ncclComm** newcomm;
// Device index handed to cudaSetDevice() and to the device attribute queries.
int cudaDev;
// For ncclCommInitRank: communicator size, this rank's index, and the value passed as
// the first argument of bootstrapInit. On the split/shrink paths nranks and myrank are
// filled in by commGetSplitInfo / getParentRanks instead.
int nranks, myrank, nId;
// Unique id(s) for ncclCommInitRank: hashed to produce commHash and passed to
// bootstrapInit as the bootstrap handle.
ncclUniqueId* commId;
// For ncclCommSplit: parent communicator (NULL when initializing from a unique id),
// plus the color/key pair exchanged by commGetSplitInfo.
struct ncclComm* parent;
int color, key;
// Mixed into the child communicator hash together with the parent hash and color.
int splitCount;
// For Shrink: parent ranks to leave out. getParentRanks assumes this list is sorted.
// A non-zero excludeRanksCount selects the shrink path in ncclCommInitRankFunc.
int* excludeRanksList;
int excludeRanksCount;
// Name of the calling API function; only used in log messages.
char funcName[NCCL_COMMINIT_FUNCNAME_LEN];
};
// Arguments of an asynchronous communicator-finalize job.
// NOTE(review): the function consuming this descriptor is not visible in this part of the file.
struct ncclCommFinalizeAsyncJob {
// Generic async-job header.
struct ncclAsyncJob base;
// Communicator the job operates on.
ncclComm_t comm;
};
// Environment-backed parameters COMM_SPLIT_SHARE_RESOURCES and COMM_SHRINK_SHARE_RESOURCES,
// both defaulting to NCCL_CONFIG_UNDEF_INT.
// NOTE(review): presumably read by envConfigOverride (it declares splitShareEnv and
// shrinkShareEnv), but the reads themselves are outside the visible range -- confirm.
NCCL_PARAM(CommSplitShareResources, "COMM_SPLIT_SHARE_RESOURCES", NCCL_CONFIG_UNDEF_INT);
NCCL_PARAM(CommShrinkShareResources, "COMM_SHRINK_SHARE_RESOURCES", NCCL_CONFIG_UNDEF_INT);
// Per-rank record exchanged by commGetSplitInfo: one entry per parent rank is
// all-gathered as raw bytes (sizeof(commSplitInfo)), so the layout must be the
// same on every rank.
typedef struct{
// Ordering key: ranks sharing a color are sorted by ascending key.
int key;
// Group selector: only ranks with the same color end up in the same child communicator.
int color;
} commSplitInfo;
// Exchanges (color, key) with every rank of `parent` and works out the shape of
// the child communicator this rank belongs to.
//
//   comm           - unused here; kept for interface compatibility.
//   parent         - communicator being split; its bootstrap carries the all-gather.
//   color, key     - this rank's split arguments.
//   nRanksRet      - out: number of parent ranks that share `color`.
//   myRankRet      - out: position of this rank inside that group.
//   parentRanksRet - out: caller-provided array of parent->nRanks ints. The first
//                    *nRanksRet entries receive the parent ranks of the group,
//                    ordered by ascending key (ties keep parent-rank order); the
//                    remaining entries are left at -1.
//
// When color == NCCL_SPLIT_NOCOLOR the rank still takes part in the all-gather,
// but none of the output arguments is written.
static ncclResult_t commGetSplitInfo(struct ncclComm* comm, struct ncclComm* parent, int color, int key, int* nRanksRet, int* myRankRet, int* parentRanksRet) {
  ncclResult_t ret = ncclSuccess;
  commSplitInfo* info = NULL;
  int groupSize = 0;
  int selfIndex = 0;

  NCCLCHECKGOTO(ncclCalloc(&info, parent->nRanks), ret, fail);

  // Publish our own entry, then collect the entries of all other parent ranks.
  info[parent->rank].key = key;
  info[parent->rank].color = color;
  NCCLCHECKGOTO(bootstrapAllGather(parent->bootstrap, info, sizeof(commSplitInfo)), ret, fail);

  // Ranks that opted out do not build a group.
  if (color != NCCL_SPLIT_NOCOLOR) {
    // 0xff in every byte makes each int slot read as -1.
    memset(parentRanksRet, 0xff, sizeof(int) * parent->nRanks);

    // Insertion sort by key over the parent ranks that share our color.
    for (int p = 0; p < parent->nRanks; p++) {
      if (info[p].color == color) {
        // First slot whose key is strictly greater than ours: equal keys stay in
        // parent-rank order.
        int slot = 0;
        for (; slot < groupSize; slot++) {
          if (info[parentRanksRet[slot]].key > info[p].key) break;
        }
        // Open a hole at `slot` by shifting the tail up by one.
        int r = groupSize;
        while (r > slot) {
          parentRanksRet[r] = parentRanksRet[r - 1];
          r--;
        }
        parentRanksRet[slot] = p;
        groupSize++;
      }
    }

    // Locate ourselves in the sorted group.
    for (int idx = 0; idx < groupSize; idx++) {
      if (parentRanksRet[idx] == parent->rank) selfIndex = idx;
    }

    *nRanksRet = groupSize;
    *myRankRet = selfIndex;
  }

exit:
  free(info);
  return ret;
fail:
  goto exit;
}
// Builds the list of parent ranks that survive a shrink.
//
//   parentRanks       - number of ranks in the parent communicator.
//   parentRank        - this rank's index in the parent communicator.
//   excludeRanksList  - parent ranks to drop; must be sorted in ascending order.
//   excludeRanksCount - number of entries in excludeRanksList.
//   nRanksRet         - out: number of surviving ranks.
//   myRankRet         - out: index of parentRank among the survivors. Left untouched
//                       when parentRank itself is excluded.
//   parentRanksRet    - out: caller-provided array with room for parentRanks ints;
//                       receives the surviving parent ranks in ascending order.
//
// Always returns ncclSuccess.
static ncclResult_t getParentRanks(int parentRanks, int parentRank, int* excludeRanksList, int excludeRanksCount, int* nRanksRet, int* myRankRet, int* parentRanksRet) {
  int count = 0, j = 0;
  for (int i = 0; i < parentRanks; i++) {
    // we assume excludeRanksList is sorted
    // Step over entries that can no longer match (duplicates of an already
    // excluded rank, negative values); otherwise the cursor would stall on them
    // and every later exclusion would be silently ignored.
    while (j < excludeRanksCount && excludeRanksList[j] < i) j++;
    if (j < excludeRanksCount && excludeRanksList[j] == i) {
      j++;
      continue;
    }
    if (i == parentRank) *myRankRet = count;
    parentRanksRet[count++] = i;
  }
  // Report the number of entries actually written rather than
  // parentRanks - excludeRanksCount: the two only agree when every exclude entry
  // is unique and in range, and the size must always match the array contents.
  *nRanksRet = count;
  return ncclSuccess;
}
// Body of the asynchronous communicator-initialization job.
//
// `job_` is really a ncclCommInitRankAsyncJob. The function:
//   1. selects the device and queries its properties (arch, CU count, arch name);
//   2. initializes kernels and, when enabled, the device stack size limit;
//   3. allocates and bootstraps the communicator, either from a parent
//      (split / shrink) or from a unique id;
//   4. sets up transports and the optional rocSHMEM / MSCCL++ / trace features;
//   5. logs the init timings.
//
// Returns ncclSuccess or the first error encountered. Every failure is routed
// through `fail` so that comm->initState records the error, and through `exit`
// so that *job->newcomm is published and temporary allocations are released.
// A split with color NCCL_SPLIT_NOCOLOR returns ncclSuccess right after taking
// part in the split all-gather, without initializing the communicator.
static ncclResult_t ncclCommInitRankFunc(struct ncclAsyncJob* job_) {
  struct ncclCommInitRankAsyncJob* job = (struct ncclCommInitRankAsyncJob*)job_;
  ncclComm_t comm = job->comm;
  ncclResult_t res = ncclSuccess;
  int archMajor, archMinor;
  size_t maxLocalSizeBytes = 0;
  int cudaDev = job->cudaDev;
  int* parentRanks = NULL;
  int cudaArch;
  int maxSharedMem = 0;
  double sum_timers = 0;
  uint64_t timers[TIMERS_INIT_COUNT] = {0};
  unsigned long long commIdHash;
  // strdup'd copy of the device arch name; owned locally until it is stored in
  // comm->archName, after which the communicator owns it.
  char* archName = NULL;
  bool archNameOwnedByComm = false;
  int cuCount;
  hipDeviceProp_t devProp;
#ifdef USE_INDIRECT_FUNCTION_CALL
  int64_t stackSize;
#endif

  timers[TIMER_INIT_TOTAL] = clockNano();
  CUDACHECKGOTO(cudaSetDevice(cudaDev), res, fail);
  CUDACHECKGOTO(cudaDeviceGetAttribute(&maxSharedMem, cudaDevAttrMaxSharedMemoryPerBlockOptin, cudaDev), res, fail);
  CUDACHECKGOTO(cudaDeviceGetAttribute(&archMajor, cudaDevAttrComputeCapabilityMajor, cudaDev), res, fail);
  CUDACHECKGOTO(cudaDeviceGetAttribute(&archMinor, cudaDevAttrComputeCapabilityMinor, cudaDev), res, fail);
  cudaArch = 100*archMajor + 10*archMinor;
  CUDACHECKGOTO(hipGetDeviceProperties(&devProp, cudaDev), res, fail);
  cuCount = devProp.multiProcessorCount;
  archName = strdup(devProp.gcnArchName);
  if (!archName) {
    res = ncclSystemError;
    WARN("strdup failed for architecture name");
    goto fail;
  }

  timers[TIMER_INIT_KERNELS] = clockNano();
  // Go through `fail` (not a bare return) so archName is released and the error
  // is recorded in comm->initState.
  NCCLCHECKGOTO(ncclInitKernelsForDevice(cudaArch, maxSharedMem, &maxLocalSizeBytes), res, fail);
  // Set the maximum kernel stack size of all kernels to avoid
  // a CUDA memory reconfig on load (c.f. NVSHMEM issue)
#ifdef USE_INDIRECT_FUNCTION_CALL
  if (ncclParamSetStackSize() == 1 && !IsArchMatch(archName,"gfx942") && !IsArchMatch(archName,"gfx950")) {
    // An explicit override wins over the size reported by the kernels.
    stackSize = rcclParamStackSizeOverride() ? rcclParamStackSizeOverride() : maxLocalSizeBytes;
    if (stackSize == 0) {
      // Nothing reported and no override: fall back to a per-arch default.
      if (IsArchMatch(archName,"gfx906"))
        stackSize = 1024;
      else
        stackSize = 512;
    }
    INFO(NCCL_INIT, "Setting cudaLimitStackSize to %zi maxLocalSizeBytes %zi", stackSize, maxLocalSizeBytes);
    CUDACHECKIGNORE(cudaDeviceSetLimit(cudaLimitStackSize, stackSize));
  }
#endif
  if (maxLocalSizeBytes > 0 && ncclParamSetStackSize() == 1) {
    TRACE(NCCL_INIT, "Setting cudaLimitStackSize to %zu", maxLocalSizeBytes);
    CUDACHECKIGNORE(cudaDeviceSetLimit(cudaLimitStackSize, maxLocalSizeBytes));
  }
  timers[TIMER_INIT_KERNELS] = clockNano() - timers[TIMER_INIT_KERNELS];

  if (job->parent) {
    // Child communicator: derive nranks/myrank and the parent-rank mapping.
    NCCLCHECKGOTO(ncclCalloc(&parentRanks, job->parent->nRanks), res, fail);
    if (job->excludeRanksCount) {
      NCCLCHECKGOTO(getParentRanks(job->parent->nRanks, job->parent->rank, job->excludeRanksList, job->excludeRanksCount, &job->nranks, &job->myrank, parentRanks), res, fail);
    } else {
      NCCLCHECKGOTO(commGetSplitInfo(comm, job->parent, job->color, job->key, &job->nranks, &job->myrank, parentRanks), res, fail);
      // Negative color does not create a new comm object. We needed to take part in the allgather, but we're done now.
      if (job->color == NCCL_SPLIT_NOCOLOR) goto exit;
    }
    // child hash obtained from (parent hash, split count, color)
    uint64_t hacc[2] = {1, 1};
    eatHash(hacc, &job->parent->commHash);
    eatHash(hacc, &job->splitCount);
    eatHash(hacc, &job->color);
    comm->commHash = digestHash(hacc);
    timers[TIMER_INIT_ALLOC] = clockNano();
    NCCLCHECKGOTO(commAlloc(comm, job->parent, job->nranks, job->myrank), res, fail);
    timers[TIMER_INIT_ALLOC] = clockNano() - timers[TIMER_INIT_ALLOC];
    INFO(NCCL_INIT, "%s comm %p rank %d nranks %d cudaDev %d nvmlDev %d busId %lx parent %p splitCount %d color %d key %d- Init START", job->funcName,
         comm, comm->rank, comm->nRanks, comm->cudaDev, comm->nvmlDev, comm->busId, job->parent, job->splitCount, job->color, job->key);
    timers[TIMER_INIT_BOOTSTRAP] = clockNano();
    NCCLCHECKGOTO(bootstrapSplit(comm->commHash, comm, job->parent, job->color, job->key, parentRanks), res, fail);
    timers[TIMER_INIT_BOOTSTRAP] = clockNano() - timers[TIMER_INIT_BOOTSTRAP];
    // debug info, no commId was used
    commIdHash = 0;
  } else {
    // obtain a unique hash using the first commId
    comm->commHash = commIdHash = getHash(job->commId->internal, NCCL_UNIQUE_ID_BYTES);
    timers[TIMER_INIT_ALLOC] = clockNano();
    NCCLCHECKGOTO(commAlloc(comm, NULL, job->nranks, job->myrank), res, fail);
    timers[TIMER_INIT_ALLOC] = clockNano() - timers[TIMER_INIT_ALLOC];
    INFO(NCCL_INIT, "%s comm %p rank %d nranks %d cudaDev %d nvmlDev %d busId %lx commId 0x%llx - Init START", job->funcName,
         comm, comm->rank, comm->nRanks, comm->cudaDev, comm->nvmlDev, comm->busId, commIdHash);
    timers[TIMER_INIT_BOOTSTRAP] = clockNano();
    NCCLCHECKGOTO(bootstrapInit(job->nId, (struct ncclBootstrapHandle*)job->commId, comm), res, fail);
    timers[TIMER_INIT_BOOTSTRAP] = clockNano() - timers[TIMER_INIT_BOOTSTRAP];
  }

  comm->cudaArch = cudaArch;
  // From here on the communicator holds the arch name; do not free it at exit.
  comm->archName = archName;
  archNameOwnedByComm = true;
  comm->cuCount = cuCount;

  NCCLCHECKGOTO(initTransportsRank(comm, job->parent, timers), res, fail);
  // Check if using host uncached mem correctly
  NCCLCHECKGOTO(checkHostUncacheMemSetting(comm), res, fail);
  // RCCL: determine and set unroll factor for comm
  NCCLCHECKGOTO(commSetUnrollFactor(comm), res, fail);

#ifdef ENABLE_ROCSHMEM
  if (rcclParamRocshmemEnabled()) { // @TODO - This doesn't seem to disable when I set ROCSHMEM_ENABLE=0 on command line
    INFO(NCCL_INIT,"Initializing rocSHMEM inside of RCCL");
    int ret;
    rocshmem::rocshmem_uniqueid_t rocshmemUniqueId;
    rocshmem::rocshmem_init_attr_t rocshmemAttr;
    // Rank 0 creates the rocSHMEM unique id, which is then broadcast to all ranks.
    if(comm->rank == 0 ) {
      ret = rocshmem::rocshmem_get_uniqueid (&rocshmemUniqueId);
      if (ret != rocshmem::ROCSHMEM_SUCCESS) {
        ERROR("Error in rocshmem_get_uniqueid, Rocshmem cannot be initialized.");
        res = ncclSystemError;
        goto fail;
      }
    }
    NCCLCHECKGOTO(bootstrapBroadcast(comm->bootstrap, comm->rank, comm->nRanks, 0, &rocshmemUniqueId,
                                     sizeof(rocshmemUniqueId)), res, fail);
    ret = rocshmem::rocshmem_set_attr_uniqueid_args(job->myrank, job->nranks, &rocshmemUniqueId, &rocshmemAttr);
    if (ret != rocshmem::ROCSHMEM_SUCCESS) {
      ERROR("Error in rocshmem_set_attr_uniqueid_args, Rocshmem cannot be initialized.");
      res = ncclSystemError;
      goto fail;
    }
    ret = rocshmem::rocshmem_init_attr(rocshmem::ROCSHMEM_INIT_WITH_UNIQUEID, &rocshmemAttr);
    if (ret != rocshmem::ROCSHMEM_SUCCESS) {
      ERROR("Error in rocshmem_init_attr, Rocshmem cannot be initialized.");
      res = ncclSystemError;
      goto fail;
    }
    // NUM_SYM_BUF source/destination buffers of 1 MiB each.
    comm->sourceRshmem = (void**) malloc(NUM_SYM_BUF * sizeof(void *));
    comm->destRshmem = (void**) malloc(NUM_SYM_BUF * sizeof(void *));
    for (int i = 0; i < NUM_SYM_BUF; i++) {
      comm->sourceRshmem[i] = (void *)rocshmem::rocshmem_malloc((size_t)(1*1024*1024));
      comm->destRshmem[i] = (void *)rocshmem::rocshmem_malloc((size_t)(1*1024*1024));
    }
    comm->enableRocshmem = rcclParamRocshmemEnabled();
    comm->rocshmemThreshold = rcclParamRocshmemThreshold();
    comm->numSymBuf = NUM_SYM_BUF;
    comm->symId = 0;
    //rocshmem::rocshmem_team_t team_reduce_world_dup;
    comm->team_reduce_world_dup = rocshmem::ROCSHMEM_TEAM_INVALID;
    rocshmem::rocshmem_team_split_strided(rocshmem::ROCSHMEM_TEAM_WORLD, 0, 1, job->nranks, nullptr, 0,
                                          &(comm->team_reduce_world_dup));
    ncclCommToRshmemTeam[comm] = comm->team_reduce_world_dup;
    CUDACHECKGOTO(hipDeviceSynchronize(), res, fail);
  }
#endif

  // Allocate Temp Buffer for Direct Reduce Scatter
  if (IsArchMatch(archName,"gfx950")) {
    NCCLCHECKGOTO(ncclCudaMalloc(&(comm->tempBuff), TEMP_BUFF_SIZE), res, fail);
  }

#ifdef ENABLE_MSCCLPP
  if (job->parent) {
    // A child of an MSCCL++-compatible communicator reuses the parent's mscclpp_comm.
    if (job->parent->mscclppCompatible) {
      INFO(NCCL_INIT, "MSCCL++: Splitting a compatible communicator; using parent mscclpp_comm");
      comm->mscclppCompatible = true;
      comm->mscclpp_threshold = job->parent->mscclpp_threshold;
      comm->mscclpp_comm = job->parent->mscclpp_comm;
      const ncclUniqueId& parentUniqueId = ncclCommToUniqueIdMap[job->parent];
      auto& mscclppUniqueId = mscclpp_uniqueIdMap[parentUniqueId];
      mscclpp_uniqueIdReverseMap[mscclppUniqueId].insert(parentUniqueId);
      ncclCommToUniqueIdMap[comm] = parentUniqueId;
    }
  }
  else
#endif
  if (rcclParamMscclppEnabled()) {
#ifdef ENABLE_MSCCLPP
    if (mscclEnabled() && (comm->topo->mscclEnabled || mscclForceEnabled()) && mscclppCommCompatible(comm)) {
      comm->mscclppCompatible = IsArchMatch(archName, "gfx942") || IsArchMatch(archName, "gfx950");
      if (comm->mscclppCompatible) {
        bool mapContainsId = (mscclpp_uniqueIdMap.count(*job->commId) > 0);
        auto& mscclppUniqueId = mscclpp_uniqueIdMap[*job->commId];
        // Local rank 0 creates the MSCCL++ id (unless one is already mapped) and
        // broadcasts it within the node.
        if (comm->localRank == 0 && !mapContainsId) {
          NCCLCHECKGOTO(mscclpp_ncclGetUniqueId(&mscclppUniqueId), res, fail);
          TRACE_CALL("mscclpp_ncclGetUniqueId(0x%llx)", (unsigned long long)getHash(mscclppUniqueId.internal, NCCL_UNIQUE_ID_BYTES));
        }
        NCCLCHECKGOTO(bootstrapIntraNodeBroadcast(comm->bootstrap, comm->localRankToRank, comm->localRank, comm->localRanks, 0, &mscclppUniqueId, sizeof(mscclppUniqueId)), res, fail);
        unsigned long long mscclppUniqueIdHash; (void)mscclppUniqueIdHash;
        TRACE_CALL("bootstrapIntraNodeBroadcast(rank=%d, nranks=%d, root=%d, bcastData=hash:0x%llx)", comm->localRank, comm->localRanks, 0, (mscclppUniqueIdHash = (unsigned long long)getHash(mscclppUniqueId.internal, NCCL_UNIQUE_ID_BYTES)));
        mscclpp_uniqueIdReverseMap[mscclppUniqueId].insert(*job->commId);
        comm->mscclpp_threshold = rcclParamMscclppThreshold();
        INFO(NCCL_INIT, "MSCCL++: Enabled! Msg size threshold=%zu", comm->mscclpp_threshold);
        NCCLCHECKGOTO(mscclpp_ncclCommInitRank(&(comm->mscclpp_comm), job->nranks, mscclppUniqueId, job->myrank), res, fail);
        TRACE_CALL("mscclpp_ncclCommInitRank (*comm=%p, nranks=%d, commId=hash:0x%llx, myrank=%d)", comm->mscclpp_comm, job->nranks, mscclppUniqueIdHash, job->myrank);
        mscclpp_commToUniqueIdMap[comm->mscclpp_comm] = mscclppUniqueId;
        ncclCommToUniqueIdMap[comm] = *job->commId;
        if (rcclParamMscclppForceEnabled()) {
          comm->mscclppForceEnable = true;
        } else {
          comm->mscclppForceEnable = false;
        }
      } else {
        WARN("MSCCL++: Cannot enable MSCCL++ on %s architecture", devProp.gcnArchName);
      }
    } else {
      comm->mscclppCompatible = false;
      WARN("MSCCL++: Cannot enable MSCCL++; environment is not MSCCL compatible");
    }
#else
    WARN("MSCCL++: Feature not enabled. ENABLE_MSCCLPP must be defined at compile-time to enable this feature.");
#endif
  }

  NCCLCHECKGOTO(latency_profiler::collTraceInit(comm), res, fail);

  // update communicator state
  comm->initState = ncclSuccess;
  timers[TIMER_INIT_TOTAL] = clockNano() - timers[TIMER_INIT_TOTAL];

  // Trace this call for replay tool
  if (job->parent) {
    /* unlink child abort flag. */
    __atomic_store_n(&job->parent->childAbortFlag, NULL, __ATOMIC_RELEASE);
    TRACE_CALL("ncclCommSplit(%p, %d, %d, %p, %d, %d)", job->parent, job->color, job->key, comm, comm->rank, comm->nRanks);
    INFO(NCCL_INIT, "%s comm %p rank %d nranks %d cudaDev %d nvmlDev %d busId %lx parent %p splitCount %d color %d key %d - Init COMPLETE", job->funcName,
         comm, comm->rank, comm->nRanks, comm->cudaDev, comm->nvmlDev, comm->busId, job->parent, job->splitCount, job->color, job->key);
  } else {
    // the name for the replay tool is ncclCommInitRank for all the variations
    TRACE_CALL("ncclCommInitRank(%p, %d, 0x%llx, %d, %d)", comm, comm->nRanks, commIdHash, comm->rank, comm->cudaDev);
    INFO(NCCL_INIT, "%s comm %p rank %d nranks %d cudaDev %d nvmlDev %d busId %lx commId 0x%llx - Init COMPLETE", job->funcName,
         comm, comm->rank, comm->nRanks, comm->cudaDev, comm->nvmlDev, comm->busId, commIdHash);
  }

  // "rest" below is the total minus every individually timed phase; index 0 is
  // skipped by the loop so that it is not subtracted as well.
  sum_timers = 0.0;
  for (int it = 1; it < TIMERS_INIT_COUNT; ++it)
    sum_timers += (timers[it] / 1e9);
  INFO(NCCL_INIT | NCCL_PROFILE,
       "Init timings - %s: rank %d nranks %d total %.2f (kernels %.2f, alloc %.2f, bootstrap %.2f, allgathers %.2f, topo %.2f, graphs %.2f, "
       "connections %.2f, rest %.2f)",
       job->funcName, comm->rank, comm->nRanks,
       timers[TIMER_INIT_TOTAL] / 1e9, timers[TIMER_INIT_KERNELS] / 1e9, timers[TIMER_INIT_ALLOC] / 1e9,
       timers[TIMER_INIT_BOOTSTRAP] / 1e9, timers[TIMER_INIT_ALLGATHER] / 1e9, timers[TIMER_INIT_TOPO] / 1e9,
       timers[TIMER_INIT_GRAPHS] / 1e9, timers[TIMER_INIT_CONNECT] / 1e9, timers[TIMER_INIT_TOTAL] / 1e9 - sum_timers);

exit:
  if (job->newcomm) {
    /* assign it to user pointer. */
    __atomic_store_n(job->newcomm, comm, __ATOMIC_RELEASE);
  }
  // Release the arch name when it never reached the communicator (early failure
  // or the NCCL_SPLIT_NOCOLOR early exit). free(NULL) is a no-op.
  if (!archNameOwnedByComm) free(archName);
  free(parentRanks);
  return res;
fail:
  comm->initState = res;
  goto exit;
}
// Applies a default to one field of a config struct.
//   config   - pointer to the config struct.
//   field    - member to check.
//   undef    - sentinel meaning "not set by the user".
//   defvalue - value stored when the field still equals `undef`.
//   fieldStr - field name as a string literal, used in the log message.
//   format   - printf conversion (string literal) matching the field's type.
// When the field was already set, its value is logged through INFO(NCCL_ENV, ...)
// and left unchanged.
// The expansion is a bare if/else statement (no do { } while (0) wrapper), so do
// not use it as the body of an unbraced if/else.
#define NCCL_CONFIG_DEFAULT(config, field, undef, defvalue, fieldStr, format) \
if (config->field == undef) { \
config->field = defvalue; \
} else { \
INFO(NCCL_ENV, "Comm config " fieldStr " set to " format, config->field); \
}
/* Apply environment-variable overrides on top of comm->config and then clamp
 * the resulting values to the ranges accepted here.
 * The network name is always re-stored as a private heap copy (or NULL).
 * Returns ncclSystemError when that copy cannot be allocated, ncclSuccess otherwise. */
static ncclResult_t envConfigOverride(ncclComm_t comm) {
  ncclResult_t ret = ncclSuccess;
  auto& cfg = comm->config;
  // Name requested through the config, captured before the field is rewritten below.
  const char* requestedNetName = cfg.netName;

  /* Blocking mode: only 0 and 1 are honoured. */
  const int blockingEnv = ncclParamCommBlocking();
  if (blockingEnv == 0 || blockingEnv == 1) {
    cfg.blocking = blockingEnv;
  }

  /* CGA cluster size: oversized values are limited, negative ones ignored. */
  const int cgaClusterSizeEnv = ncclParamCGAClusterSize();
  if (cgaClusterSizeEnv > NCCL_MAX_CGA_CLUSTER_SIZE) {
    INFO(NCCL_ENV, "NCCL_CGA_CLUSTER_SIZE value %d is too big. Limiting value to %d.", cgaClusterSizeEnv, NCCL_MAX_CGA_CLUSTER_SIZE);
    cfg.cgaClusterSize = NCCL_MAX_CGA_CLUSTER_SIZE;
  } else if (cgaClusterSizeEnv >= 0) {
    cfg.cgaClusterSize = cgaClusterSizeEnv;
  }

  /* Min / max CTAs: non-positive values are reported and ignored. */
  const int minCTAsEnv = ncclParamMinCTAs();
  if (minCTAsEnv != NCCL_CONFIG_UNDEF_INT) {
    if (minCTAsEnv > 0) {
      cfg.minCTAs = minCTAsEnv;
    } else {
      INFO(NCCL_ENV, "NCCL_MIN_CTAS %d is too low, leaving it set at %d", minCTAsEnv, cfg.minCTAs);
    }
  }
  const int maxCTAsEnv = ncclParamMaxCTAs();
  if (maxCTAsEnv != NCCL_CONFIG_UNDEF_INT) {
    if (maxCTAsEnv > 0) {
      cfg.maxCTAs = maxCTAsEnv;
    } else {
      INFO(NCCL_ENV, "NCCL_MAX_CTAS %d is too low, leaving it set at %d", maxCTAsEnv, cfg.maxCTAs);
    }
  }

  /* Channels per network peer: non-positive values are reported and ignored. */
  const int nChannelsPerNetPeerEnv = ncclParamNChannelsPerNetPeer();
  if (nChannelsPerNetPeerEnv != NCCL_CONFIG_UNDEF_INT) {
    if (nChannelsPerNetPeerEnv > 0) {
      cfg.nChannelsPerNetPeer = nChannelsPerNetPeerEnv;
    } else {
      INFO(NCCL_ENV, "NCCL_NCHANNELS_PER_NET_PEER %d is too low, leaving it set at %d", nChannelsPerNetPeerEnv, cfg.nChannelsPerNetPeer);
    }
  }

  /* NVLink-centric scheduling: only 0 and 1 are accepted. */
  const int nvlinkUtilCentricSchedEnableEnv = ncclParamNvlinkUtilCentricSchedEnable();
  if (nvlinkUtilCentricSchedEnableEnv != NCCL_CONFIG_UNDEF_INT) {
    if (nvlinkUtilCentricSchedEnableEnv == 0 || nvlinkUtilCentricSchedEnableEnv == 1) {
      cfg.nvlinkCentricSched = nvlinkUtilCentricSchedEnableEnv;
    } else {
      INFO(NCCL_ENV, "NCCL_NVLINK_UTIL_CENTRIC_SCHED_ENABLE %d is not valid, leaving it set at %d", nvlinkUtilCentricSchedEnableEnv, cfg.nvlinkCentricSched);
    }
  }

  /* Network name: NCCL_NET takes precedence over the configured name. */
  const char* envNetName = ncclGetEnv("NCCL_NET");
  const char* chosenNetName = envNetName ? envNetName : requestedNetName;
  if (chosenNetName == NULL) {
    cfg.netName = NULL;
  } else {
    int netNameLen = strlen(chosenNetName) + 1;  // includes the terminating NUL
    cfg.netName = (char*)malloc(netNameLen);
    if (cfg.netName == nullptr) {
      WARN("Failed to allocate memory for network name");
      return ncclSystemError;
    }
    memcpy((void*)cfg.netName, chosenNetName, netNameLen);
  }

  /* Resource sharing for split / shrink. */
  const int splitShareEnv = ncclParamCommSplitShareResources();
  if (splitShareEnv != NCCL_CONFIG_UNDEF_INT) {
    cfg.splitShare = splitShareEnv;
  }
  const int shrinkShareEnv = ncclParamCommShrinkShareResources();
  if (shrinkShareEnv != NCCL_CONFIG_UNDEF_INT) {
    cfg.shrinkShare = shrinkShareEnv;
  }

  // NCCL_COLLNET_ENABLE needs to be reloaded each time for comm init
  // since users might change the env on the fly to enable/disable collnet
  const char* collnetEnableEnv = ncclGetEnv("NCCL_COLLNET_ENABLE");
  if (collnetEnableEnv != NULL) {
    const int collnetEnableInt = (int)strtol(collnetEnableEnv, NULL, 0);
    if (collnetEnableInt != NCCL_CONFIG_UNDEF_INT) {
      cfg.collnetEnable = collnetEnableInt;
      INFO(NCCL_ENV, "NCCL_COLLNET_ENABLE set by environment to %d.", collnetEnableInt);
    }
  }

  const int ctaPolicyEnv = ncclParamCtaPolicy();
  if (ctaPolicyEnv != NCCL_CONFIG_UNDEF_INT) {
    cfg.CTAPolicy = ctaPolicyEnv;
  }

  const int nvlsCTAsEnv = ncclParamNvlsChannels();
  if (nvlsCTAsEnv != NCCL_CONFIG_UNDEF_INT) {
    cfg.nvlsCTAs = nvlsCTAsEnv;
  }

  /* Cap the channel counts, then make sure min does not exceed max. */
  if (cfg.minCTAs > MAXCHANNELS) {
    INFO(NCCL_ENV, "minCTAs %d is larger than #channels upper limit %d, cap it to %d", cfg.minCTAs, MAXCHANNELS, MAXCHANNELS);
    cfg.minCTAs = MAXCHANNELS;
  }
  if (cfg.maxCTAs > MAXCHANNELS) {
    INFO(NCCL_ENV, "maxCTAs %d is larger than #channels upper limit %d, cap it to %d", cfg.maxCTAs, MAXCHANNELS, MAXCHANNELS);
    cfg.maxCTAs = MAXCHANNELS;
  }
  if (cfg.minCTAs > cfg.maxCTAs) {
    INFO(NCCL_ENV, "minCTAs %d is larger than maxCTAs %d, set both to %d", cfg.minCTAs, cfg.maxCTAs, cfg.maxCTAs);
    cfg.minCTAs = cfg.maxCTAs;
  }

  /* Reset any remaining out-of-range values to a safe setting. */
  if (cfg.splitShare != 1 && cfg.splitShare != 0) {
    INFO(NCCL_ENV, "splitShare %d is not a valid value 0/1, set it to 0", cfg.splitShare);
    cfg.splitShare = 0;
  }
  if (cfg.collnetEnable != 1 && cfg.collnetEnable != 0) {
    INFO(NCCL_ENV, "collnetEnable %d is not a valid value 0/1, set it to 0", cfg.collnetEnable);
    cfg.collnetEnable = 0;
  }
  if (cfg.CTAPolicy < NCCL_CTA_POLICY_DEFAULT || cfg.CTAPolicy > NCCL_CTA_POLICY_ZERO) {
    INFO(NCCL_ENV, "CTAPolicy %d is not a valid value, set it to %d", cfg.CTAPolicy, NCCL_CTA_POLICY_DEFAULT);
    cfg.CTAPolicy = NCCL_CTA_POLICY_DEFAULT;
  }
  if (cfg.nvlsCTAs != NCCL_CONFIG_UNDEF_INT && cfg.nvlsCTAs <= 0) {
    INFO(NCCL_ENV, "nvlsCTAs %d is not a valid value, NCCL will decide the default value automatically", cfg.nvlsCTAs);
    cfg.nvlsCTAs = NCCL_CONFIG_UNDEF_INT;
  }
  return ret;
}
/* Seed a child communicator's configuration from its parent and then re-apply
 * the environment overrides on the child.
 * Returns the first error reported by envConfigOverride(), ncclSuccess otherwise. */
static ncclResult_t copyCommConfig(ncclComm_t childComm, ncclComm_t parnet) {
  // Start from a byte-for-byte copy of the parent's configuration.
  memcpy(&childComm->config, &parnet->config, sizeof(childComm->config));
  // The override pass also replaces the copied netName pointer with the child's own heap copy.
  NCCLCHECK(envConfigOverride(childComm));
  return ncclSuccess;
}
/* Validate a user-supplied ncclConfig_t and store the resolved settings in comm->config.
 *
 * The caller's structure is copied into a local one (at most sizeof(ncclConfig_t)
 * bytes, using the size value read from the start of the structure), fields that
 * are newer than the caller's declared version are reset to their defaults, every
 * attribute is range checked, undefined attributes receive their default value,
 * and finally the environment overrides are applied via envConfigOverride().
 *
 * comm   - communicator whose config is filled in.
 * config - user configuration; when NULL the built-in defaults are used.
 *
 * Returns ncclInvalidArgument when the structure was not initialized through
 * NCCL_CONFIG_INITIALIZER or an attribute is out of range; otherwise the result
 * of envConfigOverride(). */
static ncclResult_t parseCommConfig(ncclComm_t comm, ncclConfig_t *config) {
  ncclResult_t ret = ncclSuccess;
  /* All locals are declared up front: the error paths below use goto. */
  ncclConfig_t defaultConfig = NCCL_CONFIG_INITIALIZER;
  ncclConfig_t internalConfig = NCCL_CONFIG_INITIALIZER;
  ncclConfig_t *internalConfigPtr;
  size_t realSize;

  internalConfig.magic = 0;
  internalConfigPtr = &internalConfig;
  if (config) {
    /* Never copy more than this build's structure size, whatever size the caller recorded. */
    memcpy((void*)&realSize, (void*)config, sizeof(size_t));
    realSize = realSize > sizeof(ncclConfig_t) ? sizeof(ncclConfig_t) : realSize;
    memcpy((void*)internalConfigPtr, (void*)config, realSize);
    if (internalConfigPtr->magic != 0xcafebeef) {
      WARN("ncclConfig_t argument not initialized via NCCL_CONFIG_INITIALIZER");
      ret = ncclInvalidArgument;
      goto fail;
    }

    /* check version: ignore fields the caller's declared version predates. */
    if (internalConfigPtr->version < NCCL_VERSION(2, 14, 0)) {
      internalConfigPtr->blocking = defaultConfig.blocking;
    }

    if (internalConfigPtr->version < NCCL_VERSION(2, 17, 0)) {
      internalConfigPtr->cgaClusterSize = defaultConfig.cgaClusterSize;
      internalConfigPtr->minCTAs = defaultConfig.minCTAs;
      internalConfigPtr->maxCTAs = defaultConfig.maxCTAs;
      internalConfigPtr->netName = defaultConfig.netName;
    }

    if (internalConfigPtr->version < NCCL_VERSION(2, 25, 0)) {
      internalConfigPtr->trafficClass = defaultConfig.trafficClass;
    }

    if (internalConfigPtr->version < NCCL_VERSION(2, 27, 0)) {
      internalConfigPtr->collnetEnable = defaultConfig.collnetEnable;
      internalConfigPtr->CTAPolicy = defaultConfig.CTAPolicy;
      internalConfigPtr->shrinkShare = defaultConfig.shrinkShare;
      internalConfigPtr->nvlsCTAs = defaultConfig.nvlsCTAs;
    }

    if (internalConfigPtr->version < NCCL_VERSION(2, 28, 0)) {
      internalConfigPtr->nChannelsPerNetPeer = defaultConfig.nChannelsPerNetPeer;
      internalConfigPtr->nvlinkCentricSched = defaultConfig.nvlinkCentricSched;
    }
  }

  /* check input config attributes; NCCL_CONFIG_UNDEF_INT means user-undefined and the default value is used. */
  if (internalConfigPtr->blocking != NCCL_CONFIG_UNDEF_INT && internalConfigPtr->blocking != 0 && internalConfigPtr->blocking != 1) {
    WARN("Invalid config blocking attribute value %d", internalConfigPtr->blocking);
    ret = ncclInvalidArgument;
    goto fail;
  }

  if (internalConfigPtr->cgaClusterSize != NCCL_CONFIG_UNDEF_INT && internalConfigPtr->cgaClusterSize < 0) {
    WARN("Invalid config cgaClusterSize attribute value %d", internalConfigPtr->cgaClusterSize);
    ret = ncclInvalidArgument;
    goto fail;
  }

  /* The min/max ordering is only meaningful when both values were provided:
   * comparing against the undefined sentinel would reject a configuration
   * that sets minCTAs alone. */
  if ((internalConfigPtr->minCTAs != NCCL_CONFIG_UNDEF_INT &&
       internalConfigPtr->minCTAs <= 0) ||
      (internalConfigPtr->maxCTAs != NCCL_CONFIG_UNDEF_INT &&
       internalConfigPtr->maxCTAs <= 0) ||
      (internalConfigPtr->minCTAs != NCCL_CONFIG_UNDEF_INT &&
       internalConfigPtr->maxCTAs != NCCL_CONFIG_UNDEF_INT &&
       internalConfigPtr->minCTAs > internalConfigPtr->maxCTAs)) {
    WARN("Invalid config min/max channels attribute value %d/%d", internalConfigPtr->minCTAs, internalConfigPtr->maxCTAs);
    ret = ncclInvalidArgument;
    goto fail;
  }

  if (internalConfigPtr->splitShare != NCCL_CONFIG_UNDEF_INT && internalConfigPtr->splitShare != 0 && internalConfigPtr->splitShare != 1) {
    WARN("Invalid config splitShare attribute value %d", internalConfigPtr->splitShare);
    ret = ncclInvalidArgument;
    goto fail;
  }

  if (internalConfigPtr->collnetEnable != NCCL_CONFIG_UNDEF_INT && (internalConfigPtr->collnetEnable < 0 || internalConfigPtr->collnetEnable > 1)) {
    WARN("Invalid config collnetEnable attribute value %d", internalConfigPtr->collnetEnable);
    ret = ncclInvalidArgument;
    goto fail;
  }

  if (internalConfigPtr->CTAPolicy != NCCL_CONFIG_UNDEF_INT && (internalConfigPtr->CTAPolicy < NCCL_CTA_POLICY_DEFAULT ||
    internalConfigPtr->CTAPolicy > NCCL_CTA_POLICY_ZERO)) {
    WARN("Invalid config policy attribute value %d", internalConfigPtr->CTAPolicy);
    ret = ncclInvalidArgument;
    goto fail;
  }

  if (internalConfigPtr->shrinkShare != NCCL_CONFIG_UNDEF_INT && internalConfigPtr->shrinkShare != 0 && internalConfigPtr->shrinkShare != 1) {
    WARN("Invalid config shrinkShare attribute value %d", internalConfigPtr->shrinkShare);
    ret = ncclInvalidArgument;
    goto fail;
  }

  if (internalConfigPtr->nvlsCTAs != NCCL_CONFIG_UNDEF_INT && internalConfigPtr->nvlsCTAs <= 0) {
    WARN("Invalid config nvlsCTAs attribute value %d", internalConfigPtr->nvlsCTAs);
    ret = ncclInvalidArgument;
    goto fail;
  }

  if (internalConfigPtr->nChannelsPerNetPeer != NCCL_CONFIG_UNDEF_INT && (internalConfigPtr->nChannelsPerNetPeer <= 0 || internalConfigPtr->nChannelsPerNetPeer > MAXCHANNELS)) {
    WARN("Invalid config nChannelsPerNetPeer attribute value %d", internalConfigPtr->nChannelsPerNetPeer);
    ret = ncclInvalidArgument;
    goto fail;
  }

  if (internalConfigPtr->nvlinkCentricSched != NCCL_CONFIG_UNDEF_INT && internalConfigPtr->nvlinkCentricSched != 0 && internalConfigPtr->nvlinkCentricSched != 1) {
    WARN("Invalid config nvlinkCentricSched attribute value %d", internalConfigPtr->nvlinkCentricSched);
    ret = ncclInvalidArgument;
    goto fail;
  }

  /* default config value can be tuned on different platform. */
  NCCL_CONFIG_DEFAULT(internalConfigPtr, blocking, NCCL_CONFIG_UNDEF_INT, 1, "Blocking", "%d");
  NCCL_CONFIG_DEFAULT(internalConfigPtr, cgaClusterSize, NCCL_CONFIG_UNDEF_INT, 4, "CGA cluster size", "%d");
  NCCL_CONFIG_DEFAULT(internalConfigPtr, minCTAs, NCCL_CONFIG_UNDEF_INT, 1, "Min CTAs", "%d");
  NCCL_CONFIG_DEFAULT(internalConfigPtr, maxCTAs, NCCL_CONFIG_UNDEF_INT, MAXCHANNELS, "Max CTAs", "%d");
  NCCL_CONFIG_DEFAULT(internalConfigPtr, netName, NCCL_CONFIG_UNDEF_PTR, NULL, "Net name", "%s");
  NCCL_CONFIG_DEFAULT(internalConfigPtr, splitShare, NCCL_CONFIG_UNDEF_INT, 0, "Split share", "%d");
  NCCL_CONFIG_DEFAULT(internalConfigPtr, trafficClass, NCCL_CONFIG_UNDEF_INT, NCCL_CONFIG_UNDEF_INT, "Traffic class", "%d");
  NCCL_CONFIG_DEFAULT(internalConfigPtr, commName, NCCL_CONFIG_UNDEF_PTR, NULL, "Comm name", "%s");
  NCCL_CONFIG_DEFAULT(internalConfigPtr, collnetEnable, NCCL_CONFIG_UNDEF_INT, 0, "Collnet enable", "%d");
  NCCL_CONFIG_DEFAULT(internalConfigPtr, CTAPolicy, NCCL_CONFIG_UNDEF_INT, NCCL_CTA_POLICY_DEFAULT, "CTA policy flags", "%d");
  NCCL_CONFIG_DEFAULT(internalConfigPtr, shrinkShare, NCCL_CONFIG_UNDEF_INT, 0, "shrinkShare", "%d");
  NCCL_CONFIG_DEFAULT(internalConfigPtr, nvlsCTAs, NCCL_CONFIG_UNDEF_INT, NCCL_CONFIG_UNDEF_INT, "nvlsCTAs", "%d");
  NCCL_CONFIG_DEFAULT(internalConfigPtr, nChannelsPerNetPeer, NCCL_CONFIG_UNDEF_INT,
                      NCCL_CONFIG_UNDEF_INT, "nChannelsPerNetPeer", "%d");
  NCCL_CONFIG_DEFAULT(internalConfigPtr, nvlinkCentricSched, NCCL_CONFIG_UNDEF_INT, 0, "nvlinkCentricSched", "%d");

  /* assign config to communicator */
  comm->config.blocking = internalConfigPtr->blocking;
  comm->config.cgaClusterSize = internalConfigPtr->cgaClusterSize;
  comm->config.minCTAs = internalConfigPtr->minCTAs;
  comm->config.maxCTAs = internalConfigPtr->maxCTAs;
  comm->config.netName = internalConfigPtr->netName;
  comm->config.splitShare = internalConfigPtr->splitShare;
  comm->config.trafficClass = internalConfigPtr->trafficClass;
  comm->config.commName = internalConfigPtr->commName;
  comm->config.collnetEnable = internalConfigPtr->collnetEnable;
  comm->config.CTAPolicy = internalConfigPtr->CTAPolicy;
  comm->config.shrinkShare = internalConfigPtr->shrinkShare;
  comm->config.nvlsCTAs = internalConfigPtr->nvlsCTAs;
  comm->config.nChannelsPerNetPeer = internalConfigPtr->nChannelsPerNetPeer;
  comm->config.nvlinkCentricSched = internalConfigPtr->nvlinkCentricSched;

  /* Environment variables take precedence over the values assigned above. */
  NCCLCHECKGOTO(envConfigOverride(comm), ret, fail);

exit:
  return ret;
fail:
  goto exit;
}
/* Destructor for a communicator-init job: releases the job's private copy of
 * the unique ids and then the job object itself. */
static void ncclCommInitJobFree(void* _job) {
  auto* initJob = static_cast<struct ncclCommInitRankAsyncJob*>(_job);
  free(initJob->commId);
  free(initJob);
}
// Shared back end of the ncclCommInit* entry points in this file: validates the
// arguments, allocates the communicator together with its abort flags, applies
// `config`, and hands the rest of the initialization to ncclCommInitRankFunc
// through ncclAsyncLaunch.
//   newcomm  - receives the new communicator; reset to NULL on failure.
//   nranks   - number of ranks in the communicator.
//   nId      - number of entries in `commId`; must satisfy 1 <= nId <= nranks.
//   commId   - array of `nId` unique ids; copied into the job.
//   myrank   - rank of the caller, 0 <= myrank < nranks.
//   cudaDev  - device index stored in the job.
//   config   - configuration to apply; must not be NULL (checked with PtrCheck).
//   funcName - name of the calling API, copied into the job.
// Returns `res` passed through ncclGroupErrCheck, or ncclInvalidArgument
// directly when `nId` is out of range.
static ncclResult_t ncclCommInitRankDev(ncclComm_t* newcomm, int nranks, int nId, ncclUniqueId* commId, int myrank, int cudaDev, ncclConfig_t *config, const char funcName[]) {
// Rejected before anything is allocated, so a plain return is sufficient here.
if (nId <= 0 || nId > nranks) {
WARN("improper usage of ncclCommInitRank: nId = %d, nranks=%d", nId, nranks);
return ncclInvalidArgument;
}
// All locals are declared before the first goto.
ncclResult_t res = ncclSuccess;
const char* commIdEnv = NULL;
ncclComm_t comm = NULL;
struct ncclCommInitRankAsyncJob* job = NULL;
bool launchedJob = false;
// first call ncclInit, this will setup the environment
NCCLCHECKGOTO(ncclInit(), res, fail);
// showVersion runs at most once per process (std::call_once).
if (ncclDebugLevel > NCCL_LOG_WARN || (ncclDebugLevel >= NCCL_LOG_VERSION && myrank == 0)) {
static std::once_flag once;
std::call_once(once, showVersion);
}
// Make sure the CUDA runtime is initialized.
CUDACHECKGOTO(cudaFree(NULL), res, fail);
NCCLCHECKGOTO(PtrCheck(newcomm, "CommInitRank", "newcomm"), res, fail);
NCCLCHECKGOTO(PtrCheck(config, "CommInitRank", "config"), res, fail);
if (nranks < 1 || myrank < 0 || myrank >= nranks) {
WARN("Invalid rank requested : %d/%d", myrank, nranks);
res = ncclInvalidArgument;
goto fail;
}
// Allocate the communicator and its abort flags; all of these are released in the fail path below.
NCCLCHECKGOTO(ncclCalloc(&comm, 1), res, fail);
NCCLCHECKGOTO(ncclCalloc(&comm->abortFlag, 1), res, fail);
NCCLCHECKGOTO(ncclCudaHostCalloc(&comm->abortFlagDev, 1), res, fail);
NCCLCHECKGOTO(ncclCalloc(&comm->abortFlagRefCount, 1), res, fail);
comm->startMagic = comm->endMagic = NCCL_MAGIC; // Used to detect comm corruption.
*comm->abortFlagRefCount = 1;
NCCLCHECKGOTO(parseCommConfig(comm, config), res, fail);
/* start with ncclInProgress and will be changed to ncclSuccess if init succeeds. */
comm->initState = ncclInProgress;
*newcomm = comm;
// Build the job describing this initialization request.
NCCLCHECKGOTO(ncclCalloc(&job, 1), res, fail);
job->nId = nId;
job->comm = comm;
job->nranks = nranks;
job->myrank = myrank;
job->cudaDev = cudaDev;
snprintf(job->funcName, NCCL_COMMINIT_FUNCNAME_LEN, "%s", funcName);
// need to copy the commIds to allow async commInit and to avoid alignment issues when casting from ncclUniqueId and ncclBootstrapHandle
// ncclUniqueIds and ncclBootstrapHandle don't have the same alignment requirements.
// Therefore the array of Ids coming from the user might not be properly aligned to be cast into a ncclBootstrapHandle
// copying into allocated memory guarantees that the memory is properly aligned for any objects, removing that issue
NCCLCHECKGOTO(ncclCalloc(&job->commId, nId), res, fail);
memcpy(job->commId, commId, nId * NCCL_UNIQUE_ID_BYTES);
// When NCCL_COMM_ID is set, rank 0 creates the bootstrap root itself.
commIdEnv = ncclGetEnv("NCCL_COMM_ID");
if (commIdEnv && myrank == 0) {
INFO(NCCL_ENV, "NCCL_COMM_ID set by environment to %s", commIdEnv);
if (nId > 1) {
INFO(NCCL_INIT | NCCL_ENV, "NCCL_COMM_ID cannot be used with more than one ncclUniqueId");
job->nId = 1;
}
// start the bootstrap root before bootstrapping, use only the first handle
NCCLCHECKGOTO(bootstrapCreateRoot((struct ncclBootstrapHandle*)&job->commId[0], true), res, fail);
}
// Set before the launch so that the fail path does not free the job itself
// (ncclCommInitJobFree is handed to ncclAsyncLaunch as the job destructor).
launchedJob = true;
NCCLCHECKGOTO(ncclAsyncLaunch((struct ncclAsyncJob*)job, ncclCommInitRankFunc, NULL, ncclCommInitJobFree, comm), res, fail);
exit:
// for logging only, not ready for replaying
// !recording at sink
// Note: a recorder failure returns from here directly, bypassing ncclGroupErrCheck(res).
NCCLCHECK(Recorder::instance().record(rrCommInitDev, nranks, myrank, commId, comm, cudaDev));
return ncclGroupErrCheck(res);
fail:
// Release whatever was allocated so far and report no communicator to the caller.
if (job && !launchedJob) ncclCommInitJobFree(job);
if (comm) {
// NOTE(review): comm->config.netName may hold a heap copy made by envConfigOverride and
// does not appear to be released on this path - confirm whether that is intended.
free(comm->abortFlag);
if (comm->abortFlagDev) (void)ncclCudaHostFree((void*)comm->abortFlagDev);
free(comm->abortFlagRefCount);
free(comm);
}
if (newcomm) *newcomm = NULL;
goto exit;
}
NCCL_API(ncclResult_t, ncclCommInitRank, ncclComm_t* newcomm, int nranks, ncclUniqueId commId, int myrank);
// Creates a communicator for rank `myrank` out of `nranks` on the current device,
// using a single unique id and the default configuration (NCCL_CONFIG_INITIALIZER).
// Returns the first error from the recorder, cudaGetDevice or ncclCommInitRankDev,
// ncclSuccess otherwise.
ncclResult_t ncclCommInitRank_impl(ncclComm_t* newcomm, int nranks, ncclUniqueId commId, int myrank) {
NCCLCHECK(Recorder::instance().record(rrCommInitRank, nranks, myrank, &commId));
NVTX3_RANGE(NcclNvtxParamsCommInitRank)
// Load the CUDA driver and dlsym hooks (can fail on old drivers)
rocmLibraryInit();
int cudaDev;
ncclConfig_t config = NCCL_CONFIG_INITIALIZER;
CUDACHECK(cudaGetDevice(&cudaDev));
// nId is 1: exactly one unique id is passed.
NCCLCHECK(ncclCommInitRankDev(newcomm, nranks, 1, &commId, myrank, cudaDev, &config, __func__));
// Only reached on success, where *newcomm has been set by ncclCommInitRankDev.
NVTX3_RANGE_ADD_PAYLOAD(CommInitRank, NcclNvtxParamsCommInitRankSchema,
NVTX3_PAYLOAD((*newcomm)->commHash, nranks, myrank, cudaDev));
return ncclSuccess;
}
NCCL_API(ncclResult_t, ncclCommInitAll, ncclComm_t* comms, int ndev, const int* devlist);
/* Create `ndev` communicators within this process, one per device.
 *
 * comms   - output array of `ndev` communicators; must not be NULL.
 * ndev    - number of devices / ranks; must not be negative.
 * devlist - optional list of `ndev` device indices; when NULL, devices
 *           0..ndev-1 are used. Entries must be valid and must not repeat.
 *
 * The device that was current on entry is restored before returning.
 * Returns ncclInvalidArgument for a negative count or an invalid device,
 * ncclInvalidUsage for a duplicated device, or the first error reported
 * while creating the communicators. */
ncclResult_t ncclCommInitAll_impl(ncclComm_t* comms, int ndev, const int* devlist) {
  Recorder::instance().record(comms, ndev, devlist);
  ncclResult_t ret = ncclSuccess;
  int totalnDev;
  int *gpuFlags = NULL;
  ncclConfig_t config = NCCL_CONFIG_INITIALIZER;
  int oldDev = 0;

  NVTX3_RANGE(NcclNvtxParamsCommInitAll);

  // Load the CUDA driver and dlsym hooks (can fail on old drivers)
  rocmLibraryInit();

  CUDACHECK(cudaGetDevice(&oldDev));
  NCCLCHECKGOTO(PtrCheck(comms, "CommInitAll", "comms"), ret, fail);
  if (ndev < 0) {
    WARN("Invalid device count requested : %d", ndev);
    ret = ncclInvalidArgument;
    goto fail;
  }

  CUDACHECKGOTO(cudaGetDeviceCount(&totalnDev), ret, fail);
  if (devlist) {
    /* gpuFlags[d] becomes 1 once device d has been seen in devlist. */
    NCCLCHECKGOTO(ncclCalloc(&gpuFlags, totalnDev), ret, fail);
    for (int i = 0; i < ndev; ++i) {
      /* invalid device check. */
      if (devlist[i] < 0 || devlist[i] >= totalnDev) {
        WARN("Invalid device %d (totalnDev=%d)", devlist[i], totalnDev);
        ret = ncclInvalidArgument;
        goto fail;
      }

      /* duplicate device check. */
      if (gpuFlags[devlist[i]] != 0) {
        ret = ncclInvalidUsage;
        goto fail;
      }

      gpuFlags[devlist[i]] = 1;
    }
    free(gpuFlags);
    gpuFlags = nullptr;
  }

  ncclUniqueId uniqueId;
  NCCLCHECKGOTO(ncclGetUniqueId(&uniqueId), ret, fail);
  NCCLCHECKGOTO(ncclGroupStartInternal(), ret, fail);
  for (int i=0; i<ndev; i++) {
    int dev = devlist ? devlist[i] : i;
    // A device-selection failure must still close the group opened above,
    // so it leaves the loop through closeGroup instead of fail.
    CUDACHECKGOTO(cudaSetDevice(dev), ret, closeGroup);
    // Ignore return codes .. we need to call ncclGroupEnd to clean up anyway
    ncclCommInitRankDev(comms+i, ndev,1, &uniqueId, i, dev, &config, __func__);
  }

closeGroup:
  if (ret != ncclSuccess) {
    // Record the error in the group state and close the group so that the
    // group start above is always matched by a group end.
    ncclGroupErrCheck(ret);
    (void)ncclGroupEndInternal();
    goto fail;
  }
  NCCLCHECKGOTO(ncclGroupEndInternal(), ret, fail);

  // comms[0] only exists when at least one communicator was requested.
  if (ndev > 0 && comms[0] != NULL) {
    NVTX3_RANGE_ADD_PAYLOAD(CommInitAll, NcclNvtxParamsCommInitAllSchema,
      NVTX3_PAYLOAD(comms[0]->commHash, ndev));
  }

exit:
  (void)cudaSetDevice(oldDev);
  free(gpuFlags);
  return ret;
fail:
  goto exit;
}
/* Store `nextState` as the communicator's asynchronous result.
 * Returns ncclInvalidArgument (after a warning) when `comm` is NULL or
 * `nextState` lies outside [0, ncclNumResults); ncclSuccess otherwise. */
ncclResult_t ncclCommSetAsyncError(ncclComm_t comm, ncclResult_t nextState) {
  const bool stateInRange = (nextState >= 0 && nextState < ncclNumResults);
  if (!stateInRange || comm == NULL) {
    WARN("ncclCommSetAsyncError: error comm %p sets state %d", comm, nextState);
    return ncclInvalidArgument;
  }

  // Release ordering on the store of the new state.
  __atomic_store_n(&comm->asyncResult, nextState, __ATOMIC_RELEASE);
  return ncclSuccess;
}
NCCL_API(ncclResult_t, ncclCommInitRankConfig, ncclComm_t* comm, int nranks, ncclUniqueId commId, int myrank, ncclConfig_t *config);
/* Create a communicator for rank `myrank` out of `nranks` on the current
 * device, using a single unique id and an optional configuration.
 *
 * newcomm - receives the new communicator.
 * config  - configuration to apply; NULL selects NCCL_CONFIG_INITIALIZER.
 *
 * For a non-blocking communicator the returned value is the communicator's
 * current asynchronous state; on failure that state is set to the error. */
ncclResult_t ncclCommInitRankConfig_impl(ncclComm_t *newcomm, int nranks, ncclUniqueId commId, int myrank, ncclConfig_t *config) {
  Recorder::instance().record(rrCommInitRankConfig, nranks, myrank, &commId, config);
  int cudaDev;
  ncclResult_t ret = ncclSuccess;
  ncclConfig_t internalConfig = NCCL_CONFIG_INITIALIZER;
  ncclConfig_t *internalConfigPtr = NULL;

  NVTX3_RANGE(NcclNvtxParamsCommInitRankConfig);

  // Query the device before opening the group: an early return on failure
  // would otherwise leave the group start without a matching group end.
  rocmLibraryInit();
  CUDACHECK(cudaGetDevice(&cudaDev));

  NCCLCHECK(ncclGroupStartInternal());

  if (config == NULL)
    internalConfigPtr = &internalConfig;
  else
    internalConfigPtr = config;
  NCCLCHECKGOTO(ncclCommInitRankDev(newcomm, nranks, 1, &commId, myrank, cudaDev, internalConfigPtr, __func__), ret, fail);

exit:
  // Report the outcome to the group state before closing the group.
  ncclGroupErrCheck(ret);
  NCCLCHECK(ncclGroupEndInternal());
  if (newcomm && *newcomm) {
    if (!(*newcomm)->config.blocking) {
      // Non-blocking: hand back the communicator's asynchronous state.
      (void) ncclCommGetAsyncError(*newcomm, &ret);
    }
    NVTX3_RANGE_ADD_PAYLOAD(CommInitRankConfig, NcclNvtxParamsCommInitRankSchema,
      NVTX3_PAYLOAD((*newcomm)->commHash, nranks, myrank, cudaDev));
  }
  return ret;
fail:
  if (newcomm && *newcomm && !(*newcomm)->config.blocking) (void) ncclCommSetAsyncError(*newcomm, ret);
  goto exit;
}
NCCL_API(ncclResult_t, ncclCommInitRankScalable, ncclComm_t* newcomm, int nranks, int myrank, int nId, ncclUniqueId* commId, ncclConfig_t* config);
/* Create a communicator for rank `myrank` out of `nranks` on the current
 * device from an array of `nId` unique ids and an optional configuration.
 *
 * newcomm - receives the new communicator.
 * nId     - number of entries in `commId`.
 * config  - configuration to apply; NULL selects NCCL_CONFIG_INITIALIZER.
 *
 * For a non-blocking communicator the returned value is the communicator's
 * current asynchronous state; on failure that state is set to the error. */
ncclResult_t ncclCommInitRankScalable(ncclComm_t* newcomm, int nranks, int myrank, int nId, ncclUniqueId* commId, ncclConfig_t* config) {
  NVTX3_RANGE(NcclNvtxParamsCommInitRankScalable);

  int cudaDev;
  ncclResult_t ret = ncclSuccess;
  ncclConfig_t internalConfig = NCCL_CONFIG_INITIALIZER;
  ncclConfig_t *internalConfigPtr = NULL;

  // Query the device before opening the group: an early return on failure
  // would otherwise leave the group start without a matching group end.
  rocmLibraryInit();
  CUDACHECK(cudaGetDevice(&cudaDev));

  NCCLCHECK(ncclGroupStartInternal());

  if (config == NULL)
    internalConfigPtr = &internalConfig;
  else
    internalConfigPtr = config;
  NCCLCHECKGOTO(ncclCommInitRankDev(newcomm, nranks, nId, commId, myrank, cudaDev, internalConfigPtr, __func__), ret, fail);

exit:
  // Report the outcome to the group state before closing the group.
  ncclGroupErrCheck(ret);
  NCCLCHECK(ncclGroupEndInternal());
  if (newcomm && *newcomm) {
    if (!(*newcomm)->config.blocking) {
      // Non-blocking: hand back the communicator's asynchronous state.
      (void) ncclCommGetAsyncError(*newcomm, &ret);
    }
    NVTX3_RANGE_ADD_PAYLOAD(CommInitRankScalable, NcclNvtxParamsCommInitRankSchema,
      NVTX3_PAYLOAD((*newcomm)->commHash, nranks, myrank, cudaDev));
  }
  return ret;
fail:
  if (newcomm && *newcomm && !(*newcomm)->config.blocking) (void) ncclCommSetAsyncError(*newcomm, ret);
  goto exit;
}
// Job body that quiesces a communicator: selects its device, tears down the
// collective trace, and - only when initialization succeeded - synchronizes the
// shared host/device streams, drains pending callbacks and runs the legacy
// registration cleanup queue. The proxy is stopped in every case.
// `job_` is a ncclCommFinalizeAsyncJob; only its `comm` member is read.
// Returns the result of ncclProxyStop unless an earlier checked call failed.
static ncclResult_t commDestroySync(struct ncclAsyncJob* job_) {
struct ncclCommFinalizeAsyncJob* job = (struct ncclCommFinalizeAsyncJob*) job_;
ncclComm_t comm = job->comm;
ncclResult_t ret = ncclSuccess;
CUDACHECKGOTO(cudaSetDevice(comm->cudaDev), ret, fail);
NCCLCHECKGOTO(latency_profiler::collTraceDestroy(comm), ret, fail);
TRACE(NCCL_INIT, "Destroying comm %p rank %d abortFlag %d asyncResult %d", comm, comm->rank, *comm->abortFlag, comm->asyncResult);
if (comm->initState == ncclSuccess) {
// Stream synchronization errors are only reported; `ret` is overwritten by the following calls.
if ((ret = ncclStrongStreamSynchronize(&comm->sharedRes->hostStream)) != ncclSuccess) {
WARN("commDestroySync: comm %p rank %d sync hostStream error %d\n", comm, comm->rank, ret);
}
if ((ret = ncclStrongStreamSynchronize(&comm->sharedRes->deviceStream)) != ncclSuccess) {
WARN("commDestroySync: comm %p rank %d sync deviceStream error %d\n", comm, comm->rank, ret);
}
NCCLCHECKGOTO(ncclCommPollEventCallbacks(comm, true), ret, fail);
NCCLCHECKGOTO(ncclCommPollCallbacks(comm, false), ret, fail);
// And keep polling until all graphs referencing us die.
while (comm->localPersistentRefs != 0) {
NCCLCHECKGOTO(ncclCommPollCallbacks(comm, /*waitSome=*/true), ret, fail);
}
// Run every queued legacy registration cleanup callback; failures are reported but do not stop the loop.
while (!ncclIntruQueueEmpty(&comm->legacyRegCleanupQueue)) {
struct ncclCommCallback* cb = ncclIntruQueueDequeue(&comm->legacyRegCleanupQueue);
if (cb->fn(comm, cb) != ncclSuccess) {
WARN("Legacy IPC cleanup callback failed comm %p (rank = %d) cb %p", comm, comm->rank, cb);
}
}
}
// Reached regardless of initState; a failure is reported and returned through `ret`.
if ((ret = ncclProxyStop(comm)) != ncclSuccess) {
WARN("ncclProxyStop: comm %p (rank = %d) destroys proxy resource error %d", comm, comm->rank, ret);
}
exit:
return ret;
fail:
goto exit;
}
// Releases the resources of one communicator: finalizes and unloads the tuner
// plugin when one is attached, tears down MSCCL when it is enabled, and finally
// calls commFree. With ENABLE_NPKIT the NPKit events are dumped afterwards.
// Every step is wrapped in NCCLCHECK/CUDACHECK, so the first failure returns
// immediately and the remaining steps are skipped.
static ncclResult_t commCleanup(ncclComm_t comm) {
// Read before commFree(comm) below, after which `comm` is no longer dereferenced here.
bool mscclEnabledForTopo = comm->topo->mscclEnabled;
CUDACHECK(cudaSetDevice(comm->cudaDev));
if (comm->tuner != NULL) {
NCCLCHECK(comm->tuner->finalize(comm->tunerContext));
NCCLCHECK(ncclTunerPluginUnload(comm));
}
if (mscclEnabled() && (mscclEnabledForTopo || mscclForceEnabled())) {
NCCLCHECK(mscclTeardown(comm));
}
NCCLCHECK(commFree(comm));
#if defined(ENABLE_NPKIT)
// Dump NPKit events and shutdown
const char* npkitDumpDir = getenv("NPKIT_DUMP_DIR");
if (npkitDumpDir == nullptr) {
npkitDumpDir = "./npkit_dump";
INFO(NCCL_INIT, "NPKIT_DUMP_DIR is not set, using default directory: %s", npkitDumpDir);
}
// Create the dump directory when stat() cannot find it; a creation failure is only reported.
struct stat st;
if (stat(npkitDumpDir, &st) != 0) {
if (mkdir(npkitDumpDir, 0755) != 0) {
WARN("Failed to create NPKIT_DUMP_DIR directory: %s", npkitDumpDir);
}
}
NCCLCHECK(NpKit::Dump(npkitDumpDir));
NCCLCHECK(NpKit::Shutdown());
#endif
return ncclSuccess;
}
NCCL_API(ncclResult_t, ncclCommFinalize, ncclComm_t comm);
// Finalizes a communicator by launching commDestroySync as an asynchronous job
// inside an internal group. A NULL `comm` is accepted and does nothing beyond
// opening and closing the group. Calling it a second time on the same
// communicator yields ncclInvalidArgument.
// For a non-blocking communicator the returned value is the communicator's
// current asynchronous state; on failure that state is set to the error.
ncclResult_t ncclCommFinalize_impl(ncclComm_t comm) {
NCCLCHECK(Recorder::instance().record(rrCommFinalize, comm));
NVTX3_RANGE(NcclNvtxParamsCommFinalize);
ncclResult_t ret = ncclSuccess;
struct ncclCommFinalizeAsyncJob *job = NULL;
NCCLCHECK(ncclGroupStartInternal());
if (comm == NULL) goto exit;
/* wait comm ready before finalize. */
NCCLCHECKGOTO(ncclCommEnsureReady(comm), ret, fail);
/* prevent double finalize. */
if (comm->finalizeCalled) {
ret = ncclInvalidArgument;
goto fail;
}
comm->finalizeCalled = true;
/* launch async thread to finalize comm. */
NCCLCHECKGOTO(ncclCalloc(&job, 1), ret, fail);
job->comm = comm;
// `free` is passed as the job destructor.
NCCLCHECKGOTO(ncclAsyncLaunch((struct ncclAsyncJob*)job, commDestroySync, NULL, free, comm), ret, fail);
exit:
// Report the outcome to the group state, then close the group opened above.
ncclGroupErrCheck(ret);
NCCLCHECK(ncclGroupEndInternal());
if (comm) {
if (!comm->config.blocking) {
NCCLCHECK(ncclCommGetAsyncError(comm, &ret));
}
NVTX3_RANGE_ADD_PAYLOAD(CommFinalize, NcclNvtxParamsCommFinalizeSchema,
NVTX3_PAYLOAD(comm->commHash));
}
return ret;
fail:
if (comm && !comm->config.blocking) (void) ncclCommSetAsyncError(comm, ret);
goto exit;
}
// Job body launched by ncclCommDestroy_impl and ncclCommAbort_impl.
// Each call increments intraComm0->finalizeRankCnt; the call that brings the
// counter up to comm->intraRanks walks the intraNext list starting at
// intraComm0, runs commDestroySync on every communicator whose finalizeCalled
// is still false, and then runs commCleanup on every communicator in the list.
// Failures of those steps are reported with WARN only: the function always
// returns ncclSuccess.
static ncclResult_t commReclaim(struct ncclAsyncJob* job_) {
struct ncclCommFinalizeAsyncJob* job = (struct ncclCommFinalizeAsyncJob*) job_;
ncclComm_t comm = job->comm;
ncclResult_t ret = ncclSuccess;
if (comm->intraComm0 != NULL) {
int curRankCnt;
int curRank; /* Debug info */
int intraRanks = comm->intraRanks;
ncclComm_t intracomm0 = comm->intraComm0;
int *finalizeRankCnt = &intracomm0->finalizeRankCnt;
assert(intracomm0 != NULL && finalizeRankCnt != NULL);
// Atomically count this call; only the call that reaches intraRanks performs the teardown.
curRankCnt = __atomic_add_fetch(finalizeRankCnt, 1, __ATOMIC_ACQ_REL);
if (curRankCnt == intraRanks) {
ncclComm_t curIntraComm;
ncclComm_t nextIntraComm = intracomm0;
/* this is the last call to ncclCommDestroy/Abort, we need to make sure all comms
* in the process have been finalized before we free local resources. */
while (nextIntraComm) {
curIntraComm = nextIntraComm;
curRank = curIntraComm->rank;
nextIntraComm = nextIntraComm->intraNext;
if (curIntraComm->finalizeCalled == false) {
// Stack-allocated job (shadows the outer `job`); only its `comm` member is set.
struct ncclCommFinalizeAsyncJob job;
job.comm = curIntraComm;
/* every comm aborts, commDestroySync should not be blocked. */
if ((ret = commDestroySync((struct ncclAsyncJob*) &job)) != ncclSuccess)
WARN("commReclaim: comm %p (rank = %d) in commDestroySync, error %d", curIntraComm, curRank, ret);
}
}
/* free local resources. */
nextIntraComm = intracomm0;
while (nextIntraComm) {
curIntraComm = nextIntraComm;
// rank and next pointer are read before commCleanup, which frees the communicator.
curRank = curIntraComm->rank;
nextIntraComm = nextIntraComm->intraNext;
if ((ret = commCleanup(curIntraComm)) != ncclSuccess) {
// We pass a freed pointer, but we don't dereference; we merely print its value, so it's OK.
// coverity[pass_freed_arg]
WARN("commReclaim: cleanup comm %p rank %d failed in destroy/abort, error %d", curIntraComm, curRank, ret);
}
}
}
}
return ncclSuccess;
}
NCCL_API(ncclResult_t, ncclCommDestroy, ncclComm_t comm);
/* Destroy a communicator.
 *
 * A NULL `comm` is accepted and returns ncclSuccess. Otherwise the optional
 * MSCCL++ / rocSHMEM state attached to the communicator is released, the
 * communicator is marked as being destroyed, and commReclaim is launched as an
 * asynchronous job inside an internal group.
 *
 * Returns ncclInvalidArgument when the communicator looks already destroyed
 * (rank, nRanks, cudaDev or busId equal to -1), otherwise the first error
 * encountered, or ncclSuccess. */
ncclResult_t ncclCommDestroy_impl(ncclComm_t comm) {
  NCCLCHECK(Recorder::instance().record(rrCommDestroy, comm));
  if (comm == NULL) {
    NCCL_NVTX3_FUNC_RANGE;
    return ncclSuccess;
  }
  INFO(NCCL_INIT, "Memory used = %ld", allocTracker[comm->cudaDev].totalAllocSize);

#ifdef ENABLE_MSCCLPP
  if (comm->mscclppCompatible) {
    // Drop this communicator from the set sharing the MSCCL++ communicator and
    // destroy the MSCCL++ communicator once that set becomes empty.
    auto& mscclppUniqueId = mscclpp_commToUniqueIdMap[comm->mscclpp_comm];
    auto& uniqueIds = mscclpp_uniqueIdReverseMap[mscclppUniqueId];
    auto& ncclUniqueId = ncclCommToUniqueIdMap[comm];
    if (uniqueIds.find(ncclUniqueId) == uniqueIds.end()) {
      WARN("MSCCL++: comm=%p not found in mscclpp_uniqueIdReverseMap for key=%p", comm, comm->mscclpp_comm);
    }
    uniqueIds.erase(ncclUniqueId);
    if (uniqueIds.size() == 0) {
      mscclpp_uniqueIdReverseMap.erase(mscclppUniqueId);
      ncclResult_t res = mscclpp_ncclCommDestroy(comm->mscclpp_comm);
      TRACE_CALL("mscclpp_ncclCommDestroy");
      if (res != ncclSuccess) {
        WARN("MSCCL++: mscclpp_ncclCommDestroy failed (%s)", ncclGetErrorString(res));
      }
    }
    comm->mscclppCompatible = false;
    comm->mscclpp_comm = nullptr;
  }
#endif

#ifdef ENABLE_ROCSHMEM
  if (comm->enableRocshmem) {
    for (int i = 0; i < NUM_SYM_BUF; i++) {
      rocshmem::rocshmem_free(comm->sourceRshmem[i]);
      rocshmem::rocshmem_free(comm->destRshmem[i]);
    }
    free(comm->sourceRshmem);
    free(comm->destRshmem);
    //TODO: subcomm check
    rocshmem::rocshmem_team_t team;
    if (!ncclCommToRshmemTeam.empty()) {
      team = ncclCommToRshmemTeam[comm];
      rocshmem::rocshmem_team_destroy(team);
      ncclCommToRshmemTeam.erase(comm);
    }
    // Finalize rocSHMEM once no communicator holds a team any more.
    if (ncclCommToRshmemTeam.empty()) {
      rocshmem::rocshmem_finalize();
    }
  }
#endif

  int rank = comm->rank, nranks = comm->nRanks, cudaDev = comm->cudaDev;
  struct ncclCommFinalizeAsyncJob *job = NULL;
  ncclResult_t res = ncclSuccess;

  NVTX3_FUNC_WITH_PARAMS(CommDestroy, NcclNvtxParamsCommInitRank,
    NVTX3_PAYLOAD(comm->commHash, nranks, rank, cudaDev));

  TRACE(NCCL_INIT, "comm %p rank %d nRanks %d cudaDev %d busId %lx", comm, rank, nranks, cudaDev, comm->busId);

  // Try and prevent a double free of the comm struct (user error).
  // Checked before the group is opened so that this early return cannot
  // leave a group start without its matching group end.
  if (comm->rank == -1 || comm->nRanks == -1 || comm->cudaDev == -1 || comm->busId == -1) {
    WARN("comm %p has already been destroyed", comm);
    return ncclInvalidArgument;
  }

  NCCLCHECK(ncclGroupStartInternal());

  comm->destroyFlag = 1;
  /* init thread must be joined before we destroy the comm.
   * From here on every failure goes through `fail` so the group is closed. */
  NCCLCHECKGOTO(ncclCommEnsureReady(comm), res, fail);
  NCCLCHECKGOTO(ncclCalloc(&job, 1), res, fail);
  job->comm = comm;
  // `free` is passed as the job destructor.
  NCCLCHECKGOTO(ncclAsyncLaunch((struct ncclAsyncJob*)job, commReclaim, NULL, free, comm), res, fail);

exit:
  // Report the outcome to the group state, then close the group opened above.
  ncclGroupErrCheck(res);
  NCCLCHECK(ncclGroupEndInternal());
  return res;
fail:
  goto exit;
}
/* Write `value` into the communicator's abort flags (the host flag and its
 * `Dev` counterpart) using release stores. When a child abort flag is
 * registered, the child pair is written first. Always returns ncclSuccess. */
static ncclResult_t setCommAbortFlags(ncclComm_t comm, int value) {
  const bool hasChildFlag = (comm->childAbortFlag != nullptr);
  if (hasChildFlag) {
    __atomic_store_n(comm->childAbortFlag, value, __ATOMIC_RELEASE);
    __atomic_store_n(comm->childAbortFlagDev, value, __ATOMIC_RELEASE);
  }

  // The communicator's own flags are always written.
  __atomic_store_n(comm->abortFlag, value, __ATOMIC_RELEASE);
  __atomic_store_n(comm->abortFlagDev, value, __ATOMIC_RELEASE);
  return ncclSuccess;
}
NCCL_API(ncclResult_t, ncclCommAbort, ncclComm_t comm);
// Aborts a communicator: raises its abort flags, marks it as being destroyed,
// waits for initialization to settle (ignoring an init error) and launches
// commReclaim as an asynchronous job inside an internal group.
// A NULL `comm` is accepted and returns ncclSuccess.
// Returns the first error from the job allocation/launch or from closing the
// group, ncclSuccess otherwise.
ncclResult_t ncclCommAbort_impl(ncclComm_t comm) {
NCCLCHECK(Recorder::instance().record(rrCommAbort, comm));
NVTX3_RANGE(NcclNvtxParamsCommAbort);
if (comm == NULL) {
return ncclSuccess;
}
INFO(NCCL_INIT, "comm %p rank %d nRanks %d cudaDev %d busId %lx - Abort START",
comm, comm->rank, comm->nRanks, comm->cudaDev, comm->busId);
NCCLCHECK(ncclGroupStartInternal());
// Ask anything that might still be running on the device to quit
NCCLCHECK(setCommAbortFlags(comm,1));
comm->destroyFlag = 1;
/* init thread must be joined before we destroy the comm,
* and we should ignore the init error here. */
(void)ncclCommEnsureReady(comm);
// once the comm is ready, we can access ranks etc
int rank = comm->rank, nranks = comm->nRanks, cudaDev = comm->cudaDev;
struct ncclCommFinalizeAsyncJob *job = NULL;
ncclResult_t res = ncclSuccess;
NVTX3_RANGE_ADD_PAYLOAD(CommAbort, NcclNvtxParamsCommInitRankSchema,
NVTX3_PAYLOAD(comm->commHash, nranks, rank, cudaDev));
TRACE(NCCL_INIT, "comm %p rank %d nRanks %d cudaDev %d busId %lx", comm, rank, nranks, cudaDev, comm->busId);
NCCLCHECKGOTO(ncclCalloc(&job, 1), res, fail);
job->comm = comm;
// `free` is passed as the job destructor.
NCCLCHECKGOTO(ncclAsyncLaunch((struct ncclAsyncJob*)job, commReclaim, NULL, free, comm), res, fail);
exit:
// Report the outcome to the group state, then close the group opened above.
ncclGroupErrCheck(res);
NCCLCHECK(ncclGroupEndInternal());
return res;
fail:
goto exit;
}
/* Destructor for a child-communicator init job: releases the job's
 * excluded-ranks list (if any) and then the job object itself. */
static void childCommCleanupJob(void* job) {
  auto* initJob = static_cast<struct ncclCommInitRankAsyncJob*>(job);
  // free(NULL) is a no-op, so an absent list needs no special handling.
  free(initJob->excludeRanksList);
  free(initJob);
}
// Initializes a child communicator (shared by ncclCommSplit and ncclCommShrink).
//
//   comm              parent communicator; checked with CommCheck() and ncclCommEnsureReady().
//   newcomm           out: reset to NCCL_COMM_NULL here and handed to the async job as job->newcomm.
//   isShrink          true for shrink, false for split.
//   flags             shrink flags; NCCL_SHRINK_ABORT disables resource sharing with the parent.
//   color, key        forwarded to the async job. On a split, color == NCCL_SPLIT_NOCOLOR means
//                     no child communicator is allocated (the job is still launched).
//   excludeRanksList  shrink only: ranks to drop. Sorted IN PLACE, then copied into the job.
//   excludeRanksCount number of entries in excludeRanksList; must be > 0 when shrinking.
//   config            child configuration, or NULL to copy the parent's configuration.
//   caller            API name used by the argument checks and stored in job->funcName.
//
// Returns ncclSuccess once the init job has been passed to ncclAsyncLaunch, otherwise the first
// error encountered. The CUDA device that was current on entry is restored before returning.
static ncclResult_t ncclCommInitChildComm(ncclComm_t comm, ncclComm_t* newcomm, bool isShrink, int flags, int color, int key, int* excludeRanksList, int excludeRanksCount,
                                          ncclConfig_t* config, const char* caller) {
  struct ncclCommInitRankAsyncJob *job = NULL;
  struct ncclComm* childComm = NCCL_COMM_NULL;
  ncclResult_t res = ncclSuccess;
  // Set right before the job is passed to ncclAsyncLaunch (which receives childCommCleanupJob
  // as its destructor); from then on this function must not free the job itself.
  bool jobHandedOff = false;
  int oldDev;

  CUDACHECK(cudaGetDevice(&oldDev));

  NCCLCHECKGOTO(CommCheck(comm, caller, "comm"), res, exit);
  NCCLCHECKGOTO(PtrCheck(newcomm, caller, "newcomm"), res, exit);
  if (isShrink) {
    NCCLCHECKGOTO(PtrCheck(excludeRanksList, caller, "excludeRanksList"), res, exit);
    NCCLCHECKGOTO(excludeRanksCount > 0 ? ncclSuccess : ncclInvalidArgument, res, exit);
    // excludeRanksList may not be sorted, need to sort it
    qsort(excludeRanksList, excludeRanksCount, sizeof(int), compareInts);
    // ranks in excludeRanksList should not call into this function
    NCCLCHECKGOTO(bsearch(&comm->rank, excludeRanksList, excludeRanksCount, sizeof(int), compareInts) ? ncclInvalidArgument : ncclSuccess, res, exit);
  }
  NCCLCHECKGOTO(ncclCommEnsureReady(comm), res, exit);
  CUDACHECKGOTO(cudaSetDevice(comm->cudaDev), res, exit);

  /* *newcomm should be NCCL_COMM_NULL until comm split fully complete. */
  *newcomm = NCCL_COMM_NULL;
  if (!isShrink && color == NCCL_SPLIT_NOCOLOR) {
    INFO(NCCL_INIT, "Rank %d has color with NCCL_SPLIT_NOCOLOR, not creating a new communicator", comm->rank);
  } else {
    NCCLCHECKGOTO(ncclCalloc(&childComm, 1), res, fail);
    childComm->startMagic = childComm->endMagic = NCCL_MAGIC;

    // Set the shareResource field, this is used throughout the init and must be reset every time.
    // If we shrink, we only reuse resources if we shrink in the default mode
    comm->shareResources = isShrink ? (!(flags & NCCL_SHRINK_ABORT) && comm->config.shrinkShare) : comm->config.splitShare;
    if (comm->shareResources) {
      // The child aliases the parent's abort flags and takes a reference on them.
      childComm->abortFlag = comm->abortFlag;
      childComm->abortFlagDev = comm->abortFlagDev;
      childComm->abortFlagRefCount = comm->abortFlagRefCount;
      comm->childAbortFlag = NULL;
      ncclAtomicRefCountIncrement(comm->abortFlagRefCount);
    } else {
      NCCLCHECKGOTO(ncclCalloc(&childComm->abortFlag, 1), res, fail);
      NCCLCHECKGOTO(ncclCudaHostCalloc(&childComm->abortFlagDev, 1), res, fail);
      NCCLCHECKGOTO(ncclCalloc(&childComm->abortFlagRefCount, 1), res, fail);
      /* temporarily used to abort everything during child comm init. */
      comm->childAbortFlag = childComm->abortFlag;
      comm->childAbortFlagDev = childComm->abortFlagDev;
      *childComm->abortFlagRefCount = 1;
    }

    if (config == NULL) {
      NCCLCHECKGOTO(copyCommConfig(childComm, comm), res, fail);
    } else {
      NCCLCHECKGOTO(parseCommConfig(childComm, config), res, fail);
    }

    /* start with ncclInternalError and will be changed to ncclSuccess if init succeeds. */
    childComm->initState = ncclInternalError;
  }

  NCCLCHECKGOTO(ncclCalloc(&job, 1), res, fail);
  job->comm = childComm;
  job->newcomm = newcomm;
  job->parent = comm;
  job->color = color;
  job->key = key;
  if (excludeRanksList) {
    // need to copy the list of ranks to exclude because the job is async
    job->excludeRanksCount = excludeRanksCount;
    NCCLCHECKGOTO(ncclCalloc(&job->excludeRanksList, excludeRanksCount), res, fail);
    memcpy(job->excludeRanksList, excludeRanksList, excludeRanksCount * sizeof(int));
  } else {
    // each split has to lead to a unique comm, so increment the splitCount
    job->splitCount = ++comm->splitCount;
    job->excludeRanksList = NULL;
  }
  job->cudaDev = comm->cudaDev;
  snprintf(job->funcName, NCCL_COMMINIT_FUNCNAME_LEN, "%s", caller);

  jobHandedOff = true;
  NCCLCHECKGOTO(ncclAsyncLaunch((struct ncclAsyncJob*)job, ncclCommInitRankFunc, /*undo=*/NULL, /*destructor=*/childCommCleanupJob, comm), res, fail);

exit:
  {
    // for logging only, not ready for replaying
    // TODO: further integrate overloaded record header
    // !recording at sink
    // newcomm is NULL here when PtrCheck() rejected it above, so it must not be dereferenced blindly.
    ncclComm_t recordedComm = (newcomm != NULL) ? *newcomm : NCCL_COMM_NULL;
    Recorder::instance().record(rrCommSplit, color, key, (ncclUniqueId*)comm, config, recordedComm);
  }
  (void)cudaSetDevice(oldDev);
  return res;

fail:
  // A job that never reached ncclAsyncLaunch has no other owner; release it (and its rank list) here.
  if (job && !jobHandedOff) childCommCleanupJob(job);
  if (childComm) {
    if (!comm->shareResources) {
      // Unlink the parent's temporary child abort flags before the buffers are freed; otherwise a
      // later setCommAbortFlags(comm, ...) would store into freed memory.
      if (childComm->abortFlag && comm->childAbortFlag == childComm->abortFlag) {
        comm->childAbortFlag = NULL;
        comm->childAbortFlagDev = NULL;
      }
      if (childComm->abortFlag) free(childComm->abortFlag);
      if (childComm->abortFlagDev) ncclCudaHostFree(childComm->abortFlagDev);
      if (childComm->abortFlagRefCount) free(childComm->abortFlagRefCount);
    }
    // NOTE(review): in the shared-resources case the reference taken on comm->abortFlagRefCount
    // above is not dropped on this path — confirm whether that is intended.
    free(childComm);
  }
  if (newcomm) *newcomm = NULL;
  goto exit;
}
// Creates a new communicator from `comm` without the ranks listed in excludeRanksList.
//
//   comm              parent communicator.
//   excludeRanksList  ranks to exclude (the child-init helper sorts this array in place).
//   excludeRanksCount number of entries in excludeRanksList.
//   newcomm           out: receives the new communicator.
//   config            configuration for the new communicator, or NULL.
//   shrinkFlags       when NCCL_SHRINK_ABORT is set, the abort flags are raised, the shared
//                     device stream is synchronized, and the flags are cleared again before
//                     the child communicator is initialized.
//
// Returns the first error encountered, or the result of ending the internal group.
NCCL_API(ncclResult_t, ncclCommShrink, ncclComm_t comm, int* excludeRanksList, int excludeRanksCount, ncclComm_t* newcomm, ncclConfig_t* config, int shrinkFlags);
ncclResult_t ncclCommShrink_impl(ncclComm_t comm, int* excludeRanksList, int excludeRanksCount, ncclComm_t *newcomm, ncclConfig_t* config, int shrinkFlags) {
  NVTX3_RANGE(NcclNvtxParamsCommShrink)
  ncclResult_t res = ncclSuccess;
  NCCLCHECK(ncclGroupStartInternal());
  // comm is dereferenced below (abort flags, sharedRes, rank) before ncclCommInitChildComm gets a
  // chance to validate it, so reject a NULL/invalid communicator here instead of crashing.
  NCCLCHECKGOTO(CommCheck(comm, __func__, "comm"), res, exit);
  // Handle error mode by setting abort flags and waiting for kernels to complete and unset the flags to avoid bootstrap issues
  if (shrinkFlags & NCCL_SHRINK_ABORT) {
    NCCLCHECKGOTO(setCommAbortFlags(comm, 1), res, exit);
    NCCLCHECKGOTO(ncclStrongStreamSynchronize(&comm->sharedRes->deviceStream), res, exit);
    NCCLCHECKGOTO(setCommAbortFlags(comm, 0), res, exit);
  }
  NCCLCHECKGOTO(ncclCommInitChildComm(comm, newcomm, /*isShrink=*/true, shrinkFlags, /*color=*/0, /*key=*/comm->rank, excludeRanksList, excludeRanksCount, config, __func__), res, exit);
  // newcomm was validated by ncclCommInitChildComm on the success path.
  if (*newcomm) NVTX3_RANGE_ADD_PAYLOAD(CommShrink, NcclNvtxParamsCommShrinkSchema, NVTX3_PAYLOAD(comm->commHash, comm->nRanks, comm->rank, comm->cudaDev, excludeRanksCount));
exit:
  (void)ncclGroupErrCheck(res);
  NCCLCHECK(ncclGroupEndInternal());
  return res;
}
// Splits `comm` into child communicators according to `color`, with `key` forwarded to the
// child-init helper. The work is wrapped in an internal group; `config` may be NULL.
// Returns the first error encountered, or the result of ending the internal group.
NCCL_API(ncclResult_t, ncclCommSplit, ncclComm_t comm, int color, int key, ncclComm_t *newcomm, ncclConfig_t *config);
ncclResult_t ncclCommSplit_impl(ncclComm_t comm, int color, int key, ncclComm_t *newcomm, ncclConfig_t *config) {
  NVTX3_RANGE(NcclNvtxParamsCommSplit)
  ncclResult_t ret = ncclSuccess;

  NCCLCHECK(ncclGroupStartInternal());

  // A split never excludes ranks: no list, zero count, default shrink mode.
  NCCLCHECKGOTO(ncclCommInitChildComm(comm, newcomm, /*isShrink=*/false, /*shrink mode=*/NCCL_SHRINK_DEFAULT, color, key, NULL, 0, config, __func__), ret, exit);

  // Only reached on success, where comm and newcomm have been validated by the helper.
  if (*newcomm)
    NVTX3_RANGE_ADD_PAYLOAD(CommSplit, NcclNvtxParamsCommSplitSchema, NVTX3_PAYLOAD((*newcomm)->commHash, comm->commHash, comm->nRanks, comm->rank, comm->cudaDev, color, key));

exit:
  (void)ncclGroupErrCheck(ret);
  NCCLCHECK(ncclGroupEndInternal());
  return ret;
}
// Maps a result code to a static, human-readable description.
// Codes without a dedicated entry yield "unknown result code".
NCCL_API(const char*, ncclGetErrorString, ncclResult_t code);
const char* ncclGetErrorString_impl(ncclResult_t code) {
  Recorder::instance().record("GetErrorString");

  // Fallback text for any code not handled below.
  const char* text = "unknown result code";
  switch (code) {
    case ncclSuccess:
      text = "no error";
      break;
    case ncclUnhandledCudaError:
      text = "unhandled cuda error (run with NCCL_DEBUG=INFO for details)";
      break;
    case ncclSystemError:
      text = "unhandled system error (run with NCCL_DEBUG=INFO for details)";
      break;
    case ncclInternalError:
      text = "internal error - please report this issue to the NCCL developers";
      break;
    case ncclInvalidArgument:
      text = "invalid argument (run with NCCL_DEBUG=WARN for details)";
      break;
    case ncclInvalidUsage:
      text = "invalid usage (run with NCCL_DEBUG=WARN for details)";
      break;
    case ncclRemoteError:
      text = "remote process exited or there was a network error";
      break;
    case ncclInProgress:
      text = "NCCL operation in progress";
      break;
    default:
      break;
  }
  return text;
}
/* Returns the human-readable message held in ncclLastError, i.e. the last error that occurred.
 * comm is currently unused and may be NULL.
 */
NCCL_API(const char*, ncclGetLastError, const ncclComm_t comm);
const char* ncclGetLastError_impl(ncclComm_t comm) {
  (void)comm;  // intentionally unused
  // NOTE(review): the recorder tag below looks misspelled ("GetLastEror"); it is kept verbatim
  // because recorded traces may be matched on it — confirm before renaming.
  Recorder::instance().record("GetLastEror");
  const char* const lastMessage = ncclLastError;
  return lastMessage;
}
// Reports the communicator's asynchronous error state through *asyncError.
// comm->asyncResult is read first; if it is ncclSuccess and a proxy state exists, the proxy's
// asyncResult is reported instead. When the reported state is ncclSuccess and a group job is
// linked to the communicator, that job is completed and unlinked.
// Returns an error for invalid arguments or when completing the group job fails.
NCCL_API(ncclResult_t, ncclCommGetAsyncError, ncclComm_t comm, ncclResult_t *asyncError);
ncclResult_t ncclCommGetAsyncError_impl(ncclComm_t comm, ncclResult_t *asyncError) {
  Recorder::instance().record("GetAsyncError");
  NCCLCHECK(CommCheck(comm, "ncclGetAsyncError", "comm"));
  NCCLCHECK(PtrCheck(asyncError, "ncclGetAsyncError", "asyncError"));

  ncclResult_t status = __atomic_load_n(&comm->asyncResult, __ATOMIC_ACQUIRE);
  if (status == ncclSuccess && comm->proxyState) {
    status = __atomic_load_n(&comm->proxyState->asyncResult, __ATOMIC_ACQUIRE);
  }
  // Publish the state before touching the group job, so the caller sees it even if completion fails.
  *asyncError = status;

  // Nothing more to do unless everything is healthy and a group job is still linked.
  if (status != ncclSuccess || !comm->groupJob) return ncclSuccess;

  NCCLCHECK(ncclGroupJobComplete(comm->groupJob));
  comm->groupJob = NULL;
  return ncclSuccess;
}
// Stores the number of ranks in `comm` (comm->nRanks) into *count.
// Fails if comm or count is rejected by the argument checks, or if the communicator is not ready.
NCCL_API(ncclResult_t, ncclCommCount, const ncclComm_t comm, int* count);
ncclResult_t ncclCommCount_impl(const ncclComm_t comm, int* count) {
  Recorder::instance().record("CommCount");
  NCCL_NVTX3_FUNC_RANGE;

  const char* const opName = "CommCount";
  NCCLCHECK(CommCheck(comm, opName, "comm"));
  NCCLCHECK(PtrCheck(count, opName, "count"));

  /* The init thread has to be joined before comm's attributes are read. */
  NCCLCHECK(ncclCommEnsureReady(comm));

  *count = comm->nRanks;
  return ncclSuccess;
}
// Stores the device index associated with `comm` (comm->cudaDev) into *devid.
// Fails if comm or devid is rejected by the argument checks, or if the communicator is not ready.
NCCL_API(ncclResult_t, ncclCommCuDevice, const ncclComm_t comm, int* devid);
ncclResult_t ncclCommCuDevice_impl(const ncclComm_t comm, int* devid) {
  Recorder::instance().record("CuDevice");
  NCCL_NVTX3_FUNC_RANGE;

  const char* const opName = "CommCuDevice";
  NCCLCHECK(CommCheck(comm, opName, "comm"));
  NCCLCHECK(PtrCheck(devid, opName, "devid"));

  // The communicator must be ready before its attributes are read.
  NCCLCHECK(ncclCommEnsureReady(comm));

  *devid = comm->cudaDev;
  return ncclSuccess;
}
// Stores the caller's rank within `comm` (comm->rank) into *rank.
// Fails if comm or rank is rejected by the argument checks, or if the communicator is not ready.
NCCL_API(ncclResult_t, ncclCommUserRank, const ncclComm_t comm, int* rank);
ncclResult_t ncclCommUserRank_impl(const ncclComm_t comm, int* rank) {
  Recorder::instance().record("CommUserRank");
  NCCL_NVTX3_FUNC_RANGE;

  const char* const opName = "CommUserRank";
  NCCLCHECK(CommCheck(comm, opName, "comm"));
  NCCLCHECK(PtrCheck(rank, opName, "rank"));

  // The communicator must be ready before its attributes are read.
  NCCLCHECK(ncclCommEnsureReady(comm));

  *rank = comm->rank;
  return ncclSuccess;
}