From b16882a024990b9e53be3bcf3e15098981c62a66 Mon Sep 17 00:00:00 2001 From: Sylvain Jeaugey Date: Mon, 13 Nov 2023 10:26:55 -0800 Subject: [PATCH 1/2] 2.19.4-1 Split transport connect phase into multiple steps to avoid port exhaustion when connecting alltoall at large scale. Defaults to 128 peers per round. Fix memory leaks on CUDA graph capture. Fix alltoallv crash on self-sendrecv. Make topology detection more deterministic when PCI speeds are not available (fix issue #1020). Properly close shared memory in NVLS resources. Revert proxy detach after 5 seconds. Add option to print progress during transport connect. Add option to set NCCL_DEBUG to INFO on first WARN. [ROCm/rccl commit: 88d44d777f6970bdbf6610badcbd7e25a05380f0] --- projects/rccl/makefiles/version.mk | 2 +- projects/rccl/src/debug.cc | 3 + projects/rccl/src/enqueue.cc | 23 +++- projects/rccl/src/graph/xml.cc | 5 +- projects/rccl/src/include/comm.h | 4 +- projects/rccl/src/include/proxy.h | 7 +- projects/rccl/src/include/transport.h | 2 + projects/rccl/src/init.cc | 16 +-- projects/rccl/src/proxy.cc | 97 +++----------- projects/rccl/src/transport.cc | 179 ++++++++++++++++---------- projects/rccl/src/transport/nvls.cc | 19 +-- 11 files changed, 174 insertions(+), 183 deletions(-) diff --git a/projects/rccl/makefiles/version.mk b/projects/rccl/makefiles/version.mk index 5e32150b1c..b383eebe80 100644 --- a/projects/rccl/makefiles/version.mk +++ b/projects/rccl/makefiles/version.mk @@ -1,6 +1,6 @@ ##### version NCCL_MAJOR := 2 NCCL_MINOR := 19 -NCCL_PATCH := 3 +NCCL_PATCH := 4 NCCL_SUFFIX := PKG_REVISION := 1 diff --git a/projects/rccl/src/debug.cc b/projects/rccl/src/debug.cc index 21cec22faa..63b3e5bc08 100644 --- a/projects/rccl/src/debug.cc +++ b/projects/rccl/src/debug.cc @@ -139,6 +139,8 @@ void ncclDebugInit() { pthread_mutex_unlock(&ncclDebugLock); } +NCCL_PARAM(WarnSetDebugInfo, "WARN_ENABLE_DEBUG_INFO", 0); + /* Common logging function used by the INFO, WARN and TRACE macros * Also exported to the dynamically loadable Net transport modules so * they can share the debugging mechanisms and output files @@ -172,6 +174,7 @@ void ncclDebugLog(ncclDebugLogLevel level, unsigned long flags, const char *file if (level == NCCL_LOG_WARN) { len = snprintf(buffer, sizeof(buffer), "\n%s:%d:%d [%d] %s:%d NCCL WARN ", hostname, pid, tid, cudaDev, filefunc, line); + if (ncclParamWarnSetDebugInfo()) ncclDebugLevel = NCCL_LOG_INFO; } else if (level == NCCL_LOG_INFO) { len = snprintf(buffer, sizeof(buffer), "%s:%d:%d [%d] NCCL INFO ", hostname, pid, tid, cudaDev); } else if (level == NCCL_LOG_TRACE && flags == NCCL_CALL) { diff --git a/projects/rccl/src/enqueue.cc b/projects/rccl/src/enqueue.cc index dbb9865bcf..ae56decd94 100644 --- a/projects/rccl/src/enqueue.cc +++ b/projects/rccl/src/enqueue.cc @@ -627,13 +627,12 @@ static ncclResult_t scheduleP2pTasksToPlan( while (nChannelsMax*nRanks > comm->p2pnChannels*4 && nChannelsMax > 1) nChannelsMax /= 2; } - bool fuseOk; + bool fuseOk = false; // We can perform 8 send/recv per round per CTA. Make sure we jump between fused blocks at node boundaries. while (tasks->nTasksP2p != 0) { for (int i=0; i < tasks->p2pOrderSteps; i++) { int sendPeer = sendOrder[i]; int recvPeer = recvOrder[i]; - if ((i % (NCCL_MAX_WORK_ELEMENTS_P2P/2)) == 0) fuseOk = false; struct ncclTaskP2p* send = sendPeer != -1 ? ncclIntruQueueHead(&peers[sendPeer].sendQueue) : NULL; struct ncclTaskP2p* recv = recvPeer != -1 ? ncclIntruQueueHead(&peers[recvPeer].recvQueue) : NULL; if (sendPeer == comm->rank) { @@ -669,6 +668,7 @@ static ncclResult_t scheduleP2pTasksToPlan( if (send) sendBytes -= send->chunk*sendChunkBytesMax; do { + if ((i % (NCCL_MAX_WORK_ELEMENTS_P2P/2)) == 0) fuseOk = false; ssize_t recvChunkBytes = std::min(recvBytes, recvChunkBytesMax); // -1 preserved ssize_t sendChunkBytes = std::min(sendBytes, sendChunkBytesMax); if (recvChunkBytes != 0) { @@ -879,6 +879,14 @@ static ncclResult_t reclaimPlan(struct ncclComm* comm, struct ncclCommCallback* if (plan->persistent) { comm->persistentRefs -= 1; NCCLCHECK(ncclCudaFree(plan->workHead)); + for (int c=0; c < plan->channelUbound; c++) { + struct ncclProxyOp* q = ncclIntruQueueHead(&plan->channels[c].proxyOpQueue); + while (q != nullptr) { + struct ncclProxyOp* q1 = q->enqNext; + ncclMemoryPoolFree(&plan->memPool_ncclProxyOp, q); + q = q1; + } + } while (!ncclIntruQueueEmpty(&plan->ipcMemQueue)) { struct ncclPointerList* q = ncclIntruQueueDequeue(&plan->ipcMemQueue); CUDACHECKIGNORE(cudaIpcCloseMemHandle(q->ptr)); @@ -1093,9 +1101,16 @@ ncclResult_t ncclLaunchKernel(struct ncclComm* comm, struct ncclKernelPlan* plan ncclResult_t ncclLaunchKernelAfter_NoCuda(struct ncclComm* comm, struct ncclKernelPlan* plan) { if (!(plan->persistent || comm->persistentRefs != 0 || ncclCudaLaunchBlocking)) { - // If this isn't being captured and there aren't any CUDA graphs alive - // then we don't need to do our proxyOp pushing on the host stream. + // We are not using the host stream for proxy ops and reclaimation submission. NCCLCHECK(hostStreamPlanTask(comm, plan)); + } else { + // We are using the host stream for proxy ops and reclaimation submission. + // Only plans with proxy ops have a callback pushed by ncclLaunchPrepare. + // Since non-persistent plans also require reclaimation, we have to do it + // here. + if (!plan->persistent && !plan->hasProxyOps) { + ncclIntruQueueMpscEnqueue(&comm->callbackQueue, &plan->reclaimer); + } } return ncclSuccess; } diff --git a/projects/rccl/src/graph/xml.cc b/projects/rccl/src/graph/xml.cc index 47fda1f851..f06f65dfd6 100644 --- a/projects/rccl/src/graph/xml.cc +++ b/projects/rccl/src/graph/xml.cc @@ -9,6 +9,7 @@ #include #include #include +#include #include "core.h" #include "nvmlwrap.h" #include "xml.h" @@ -500,11 +501,11 @@ ncclResult_t ncclTopoGetXmlFromSys(struct ncclXmlNode* pciNode, struct ncclXml* if (index == -1) { if (path) { char deviceSpeedStr[MAX_STR_LEN]; - float deviceSpeed; + float deviceSpeed = FLT_MAX; NCCLCHECK(ncclTopoGetStrFromSys(path, "max_link_speed", deviceSpeedStr)); sscanf(deviceSpeedStr, "%f GT/s", &deviceSpeed); char portSpeedStr[MAX_STR_LEN]; - float portSpeed; + float portSpeed = FLT_MAX; NCCLCHECK(ncclTopoGetStrFromSys(path, "../max_link_speed", portSpeedStr)); sscanf(portSpeedStr, "%f GT/s", &portSpeed); NCCLCHECK(xmlSetAttr(pciNode, "link_speed", portSpeed < deviceSpeed ? portSpeedStr : deviceSpeedStr)); diff --git a/projects/rccl/src/include/comm.h b/projects/rccl/src/include/comm.h index bc5a9c5683..328ffef3b5 100644 --- a/projects/rccl/src/include/comm.h +++ b/projects/rccl/src/include/comm.h @@ -299,7 +299,7 @@ struct ncclComm { // Flag to ask NCCL kernels to abort volatile uint32_t *abortFlag; volatile uint32_t *childAbortFlag; - volatile uint32_t *abortFlagRefCount; + uint32_t *abortFlagRefCount; // Device side of the communicator (for cudaFree's) struct ncclDevComm* devComm; // actually = &ncclDevCommAndChannels::comm @@ -342,8 +342,6 @@ struct ncclComm { int nvlsRegSupport; /* sharable NVLS resource. */ struct ncclNvlsSharedRes* nvlsResources; - struct ncclShmemCollBuff nvlsShmem; - void *nvlsShmemHandle; ssize_t channelSize; // User requested work size (bytes) for channel partitions diff --git a/projects/rccl/src/include/proxy.h b/projects/rccl/src/include/proxy.h index daf3885829..8093c0ce63 100644 --- a/projects/rccl/src/include/proxy.h +++ b/projects/rccl/src/include/proxy.h @@ -194,7 +194,6 @@ struct ncclProxyRpcResponseHeader { }; struct ncclProxyState { - int internalRefCount; int refCount; int tpRank; int tpnRanks; @@ -209,11 +208,10 @@ struct ncclProxyState { ncclNet_t* ncclNet; ncclCollNet_t* ncclCollNet; volatile uint32_t* abortFlag; - volatile uint32_t* abortFlagRefCount; // Service thread pthread_t thread; struct ncclSocket* listenSock; - volatile int stop; + int stop; CUcontext cudaCtx; ncclResult_t asyncResult; @@ -294,6 +292,5 @@ ncclResult_t ncclProxyClientGetFdBlocking(struct ncclComm* comm, struct ncclProx ncclResult_t ncclProxyStop(struct ncclComm* comm); ncclResult_t ncclProxyShmUnlink(struct ncclComm* comm); -ncclResult_t ncclProxyDestroy(struct ncclProxyState *proxyState); -ncclResult_t ncclProxyTryDetach(struct ncclProxyState *proxyState); +ncclResult_t ncclProxyDestroy(struct ncclComm* comm); #endif diff --git a/projects/rccl/src/include/transport.h b/projects/rccl/src/include/transport.h index d0cd9747e2..27529df5e6 100644 --- a/projects/rccl/src/include/transport.h +++ b/projects/rccl/src/include/transport.h @@ -67,6 +67,8 @@ struct ncclNvlsSharedRes { char shareableHandle[NVLS_HANDLE_SIZE]; size_t ucGran; int nChannels; + struct ncclShmemCollBuff nvlsShmem; + void *nvlsShmemHandle; }; #endif /* CUDART_VERSION >= 12010 */ diff --git a/projects/rccl/src/init.cc b/projects/rccl/src/init.cc index c681f2afa8..e82e64e148 100644 --- a/projects/rccl/src/init.cc +++ b/projects/rccl/src/init.cc @@ -179,13 +179,7 @@ static ncclResult_t commFree(ncclComm_t comm) { * free all intra-process communicators; therefore, we only need to focus on local * resource cleanup in commFree(). */ if (comm->proxyState && comm->proxyRefCountOld == 0 && comm->proxyState->thread) { - if (*comm->abortFlag == 0) { - /* regular thread join */ - pthread_join(comm->proxyState->thread, nullptr); - } else { - /* try to detach thread due to abort */ - ncclProxyTryDetach(comm->proxyState); - } + pthread_join(comm->proxyState->thread, nullptr); } delete[] comm->userRedOps; @@ -219,7 +213,7 @@ static ncclResult_t commFree(ncclComm_t comm) { free(comm->sharedRes->tpRankToLocalRank); NCCLCHECK(ncclStrongStreamDestruct(&comm->sharedRes->hostStream)); NCCLCHECK(ncclStrongStreamDestruct(&comm->sharedRes->deviceStream)); - NCCLCHECK(ncclProxyDestroy(comm->sharedRes->proxyState)); + NCCLCHECK(ncclProxyDestroy(comm)); free(comm->sharedRes); } } @@ -237,7 +231,7 @@ static ncclResult_t commFree(ncclComm_t comm) { if (ncclAtomicRefCountDecrement(comm->abortFlagRefCount) == 0) { NCCLCHECK(ncclCudaHostFree((void *)comm->abortFlag)); - free((void*)comm->abortFlagRefCount); + free(comm->abortFlagRefCount); } free((void*)comm->config.netName); @@ -1645,7 +1639,7 @@ exit: fail: if (comm) { if (comm->abortFlag) ncclCudaHostFree((void *)comm->abortFlag); - if (comm->abortFlagRefCount) free((void*)comm->abortFlagRefCount); + if (comm->abortFlagRefCount) free(comm->abortFlagRefCount); free(comm); } if (newcomm) *newcomm = NULL; @@ -2086,7 +2080,7 @@ fail: if (childComm) { if (comm && !comm->config.splitShare) { if (childComm->abortFlag) ncclCudaHostFree((void*)childComm->abortFlag); - if (childComm->abortFlagRefCount) free((void*)childComm->abortFlagRefCount); + if (childComm->abortFlagRefCount) free(childComm->abortFlagRefCount); } free(childComm); } diff --git a/projects/rccl/src/proxy.cc b/projects/rccl/src/proxy.cc index 976b1d3ba5..db36a1573e 100644 --- a/projects/rccl/src/proxy.cc +++ b/projects/rccl/src/proxy.cc @@ -18,14 +18,6 @@ #include #include -#define PROGRESS_RUNNING 0 -#define PROGRESS_REQUEST_STOP 1 -#define PROGRESS_ABORT 2 -#define PROGRESS_COMPLETE 3 - -#define SERVICE_RUNNING 0 -#define SERVICE_COMPLETE 1 - enum { proxyRecv=0, proxySend=1 }; static bool NeedProxy(int type, int pattern, int root, struct ncclRing* ring, int nranks) { @@ -720,13 +712,13 @@ static ncclResult_t ncclProxyGetPostedOps(struct ncclProxyState* proxyState, int if (state->active == NULL) { pthread_mutex_lock(&pool->mutex); - while (pool->nextOps == -1 && state->stop == PROGRESS_RUNNING) { + while (pool->nextOps == -1 && !state->stop) { struct ncclProxyArgs profArgs; // Only used for profiling purposes ncclProfilingRecord(&profArgs, 0, 0, ncclProxyProfileSleep); pthread_cond_wait(&pool->cond, &pool->mutex); ncclProfilingRecord(&profArgs, 0, 0, ncclProxyProfileWakeup); } - if (state->stop != PROGRESS_RUNNING) { // We might have been woken up to stop. + if (state->stop) { // We might have been woken up to stop. pthread_mutex_unlock(&pool->mutex); return ncclSuccess; } @@ -864,7 +856,7 @@ void* ncclProxyProgress(void *proxyState_) { * frequency of calling ncclProxyGetPostedOps() and reduce the perf impact. */ int proxyOpAppendCounter = 0; struct ncclProxyArgs profArgs; // Only used for profiling purposes - while (state->stop == PROGRESS_RUNNING || (state->stop == PROGRESS_REQUEST_STOP && state->active)) { + while ((state->stop == 0 || (state->stop == 1 && state->active)) && *proxyState->abortFlag == 0) { int idle = 1; ncclResult_t ret = progressOps(proxyState, state, state->active, &idle); if (ret != ncclSuccess) { @@ -878,7 +870,7 @@ void* ncclProxyProgress(void *proxyState_) { int added = 0; proxyOpAppendCounter = 0; TIME_START(3); - if (state->stop == PROGRESS_RUNNING) + if (state->stop == 0) ret = ncclProxyGetPostedOps(proxyState, &added); if (added) { TIME_STOP(3); } else { TIME_CANCEL(3); } if (ret != ncclSuccess) { @@ -891,9 +883,6 @@ void* ncclProxyProgress(void *proxyState_) { } lastIdle = idle; } - - /* progress serive thread should be waiting for me, I need to notify it. */ - __atomic_store_n(&state->stop, PROGRESS_COMPLETE, __ATOMIC_RELEASE); return NULL; } @@ -916,11 +905,7 @@ ncclResult_t ncclProxyStart(struct ncclComm* comm) { static ncclResult_t ncclProxyProgressCreate(struct ncclProxyState* proxyState) { struct ncclProxyProgressState* state = &proxyState->progressState; if (!state->thread) { - pthread_attr_t attr; - SYSCHECK(pthread_attr_init(&attr), "pthread_attr_init"); - SYSCHECK(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED), "pthread_attr_setdetachstate"); - SYSCHECK(pthread_create(&state->thread, &attr, ncclProxyProgress, proxyState), "pthread_create"); - SYSCHECK(pthread_attr_destroy(&attr), "pthread_attr_destroy"); + pthread_create(&state->thread, NULL, ncclProxyProgress, proxyState); ncclSetThreadName(state->thread, "NCCL Progress%2d", proxyState->tpLocalnRanks); } return ncclSuccess; @@ -932,17 +917,10 @@ ncclResult_t ncclProxyProgressDestroy(struct ncclProxyState* proxyState) { // Request the proxy to stop and then wake it if (state->opsPool) { pthread_mutex_lock(&state->opsPool->mutex); - if (*proxyState->abortFlag == 0) - state->stop = PROGRESS_REQUEST_STOP; - else - state->stop = PROGRESS_ABORT; + state->stop = 1; pthread_cond_signal(&state->opsPool->cond); pthread_mutex_unlock(&state->opsPool->mutex); - /* progress thread is always detached, wait for it to exit. */ - uint64_t t0 = clockNano(); - while (__atomic_load_n(&state->stop, __ATOMIC_ACQUIRE) != PROGRESS_COMPLETE) { - if (clockNano() - t0 >= 1000) sched_yield(); - } + pthread_join(state->thread, NULL); } // Free off any memory allocated for the proxy arg pools @@ -1582,19 +1560,6 @@ void* ncclProxyService(void* _args) { ncclSocketClose(proxyState->listenSock); free(proxyState->listenSock); proxyOpsFree(proxyState); - - if (*proxyState->abortFlag) { - /* abort happened, need to notify main thread I am done. */ - __atomic_store_n(&proxyState->stop, SERVICE_COMPLETE, __ATOMIC_RELEASE); - } - - if (ncclAtomicRefCountDecrement(proxyState->abortFlagRefCount) == 0) { - ncclCudaHostFree((void *)proxyState->abortFlag); - free((void*)proxyState->abortFlagRefCount); - } - - /* proxy itself holds one internal ref count, needs to call ncclProxyDestroy */ - ncclProxyDestroy(proxyState); return NULL; } @@ -1603,8 +1568,6 @@ ncclResult_t ncclProxyInit(struct ncclComm* comm, struct ncclSocket* sock, union NCCLCHECK(ncclCalloc(&comm->sharedRes->proxyState, 1)); comm->proxyState = comm->sharedRes->proxyState; comm->proxyState->refCount = 1; - /* ref count for communicator and proxy service thread. */ - comm->proxyState->internalRefCount = 2; comm->proxyState->listenSock = sock; comm->proxyState->peerAddresses = peerAddresses; // Seed the random number generator for UDS filename generation @@ -1627,8 +1590,6 @@ ncclResult_t ncclProxyCreate(struct ncclComm* comm) { proxyState->tpLocalnRanks = comm->localRanks; proxyState->cudaDev = comm->cudaDev; proxyState->abortFlag = comm->abortFlag; - proxyState->abortFlagRefCount = comm->abortFlagRefCount; - ncclAtomicRefCountIncrement(comm->abortFlagRefCount); proxyState->p2pnChannels = comm->p2pnChannels; proxyState->p2pChunkSize = comm->p2pChunkSize; proxyState->nChannels = comm->nChannels; @@ -1686,41 +1647,15 @@ ncclResult_t ncclProxyStop(struct ncclComm* comm) { return ncclSuccess; } -ncclResult_t ncclProxyDestroy(struct ncclProxyState *proxyState) { - if (__atomic_sub_fetch(&proxyState->internalRefCount, 1, __ATOMIC_ACQ_REL) == 0) { - free(proxyState->peerAddresses); - free(proxyState->peerSocks); - free(proxyState->proxyOps); - free(proxyState->sharedDevMems); - expectedProxyResponseFree(proxyState); - free(proxyState); - } - return ncclSuccess; -} +ncclResult_t ncclProxyDestroy(struct ncclComm* comm) { + struct ncclProxyState* sharedProxyState = comm->sharedRes->proxyState; -/* detach all proxy threads in case of abort */ -ncclResult_t ncclProxyTryDetach(struct ncclProxyState *proxyState) { - if (proxyState && proxyState->thread) { - /* proxy service thread can call cudaFreeHost to free pinned host mem, but - * it can cause a hang if main thread is issuing other cuda calls. To solution - * should be allocate/free pinned host mem using cuMem* driver API, this waiting - * 5 secs is just a workaround for now. */ - bool join = false; - struct timespec start, now; - clock_gettime(CLOCK_MONOTONIC, &start); - do { - clock_gettime(CLOCK_MONOTONIC, &now); - if (__atomic_load_n(&proxyState->stop, __ATOMIC_ACQUIRE) == SERVICE_COMPLETE) { - /* proxy thread is done, join it. */ - pthread_join(proxyState->thread, NULL); - join = true; - break; - } - } while(now.tv_sec - start.tv_sec < 5); - - if (join == false) { - pthread_detach(proxyState->thread); - } - } + assert(sharedProxyState->refCount == 0); + free(sharedProxyState->peerAddresses); + free(sharedProxyState->peerSocks); + free(sharedProxyState->proxyOps); + free(sharedProxyState->sharedDevMems); + expectedProxyResponseFree(sharedProxyState); + free(sharedProxyState); return ncclSuccess; } diff --git a/projects/rccl/src/transport.cc b/projects/rccl/src/transport.cc index c66a81ed7f..a465d6b5cf 100644 --- a/projects/rccl/src/transport.cc +++ b/projects/rccl/src/transport.cc @@ -65,13 +65,28 @@ void dumpData(struct ncclConnect* data, int ndata) { } } +NCCL_PARAM(ConnectRoundMaxPeers, "CONNECT_ROUND_MAX_PEERS", 128); +NCCL_PARAM(ReportConnectProgress, "REPORT_CONNECT_PROGRESS", 0); +#include + ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, int connIndex, int* highestTransportType/*=NULL*/) { // Stream used during transport setup; need for P2P pre-connect + CUDA Graph ncclResult_t ret = ncclSuccess; int highestType = TRANSPORT_P2P; // track highest transport type - struct ncclConnect** data = (ncclConnect**) malloc(sizeof(ncclConnect*) * comm->nRanks); // Store intermediate send/recvData structs for connect - struct ncclConnect** recvData = (ncclConnect**) malloc(sizeof(ncclConnect*) * comm->nRanks); // Points to entries inside data for given recv connection within a channel - struct ncclConnect** sendData = (ncclConnect**) malloc(sizeof(ncclConnect*) * comm->nRanks); // Points to entries inside data for given send connection within a channel + struct ncclConnect** data; // Store intermediate send/recvData structs for connect + struct ncclConnect** recvData; // Points to entries inside data for given recv connection within a channel + struct ncclConnect** sendData; // Points to entries inside data for given send connection within a channel + int done = 0; + + int maxPeers = ncclParamConnectRoundMaxPeers(); + NCCLCHECK(ncclCalloc(&data, maxPeers)); + NCCLCHECK(ncclCalloc(&recvData, maxPeers)); + NCCLCHECK(ncclCalloc(&sendData, maxPeers)); + + struct timeval timeStart, timeLast; + gettimeofday(&timeStart, NULL); + timeLast = timeStart; // struct copy + bool timeReported = false; NCCLCHECKGOTO(ncclStrongStreamAcquireUncaptured(&comm->sharedRes->hostStream), ret, fail); // First time initialization @@ -87,23 +102,24 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph* // The first N entries contain recvData, connection information for recv connections // The next M entries contain sendData, connection information for send connections // It's not guaranteed that each entry of data has the same number of total or send/recv specific connections - data[i] = (ncclConnect*) malloc(sizeof(ncclConnect) * 2*MAXCHANNELS); - recvData[i] = data[i]; + int p = i-(done+1); + if (recvMask || sendMask) NCCLCHECK(ncclCalloc(data+p, 2*MAXCHANNELS)); + recvData[p] = data[p]; int sendChannels = 0, recvChannels = 0; int type; TIME_START(0); for (int c=0; c(comm, graph, recvData[i]+recvChannels++, c, recvPeer, connIndex, &type), ret, fail); + NCCLCHECKGOTO(selectTransport<0>(comm, graph, recvData[p]+recvChannels++, c, recvPeer, connIndex, &type), ret, fail); if (type > highestType) highestType = type; } } TIME_STOP(0); TIME_START(1); - sendData[i] = recvData[i]+recvChannels; + sendData[p] = recvData[p]+recvChannels; for (int c=0; c(comm, graph, sendData[i]+sendChannels++, c, sendPeer, connIndex, &type), ret, fail); + NCCLCHECKGOTO(selectTransport<1>(comm, graph, sendData[p]+sendChannels++, c, sendPeer, connIndex, &type), ret, fail); if (type > highestType) highestType = type; } } @@ -112,70 +128,100 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph* TIME_START(2); if (sendPeer == recvPeer) { if (recvChannels+sendChannels) { - NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, recvPeer, bootstrapTag, data[i], sizeof(struct ncclConnect)*(recvChannels+sendChannels)), ret, fail); - NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, recvPeer, bootstrapTag, data[i], sizeof(struct ncclConnect)*(recvChannels+sendChannels)), ret, fail); - sendData[i] = data[i]; - recvData[i] = data[i]+sendChannels; + NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, recvPeer, bootstrapTag, data[p], sizeof(struct ncclConnect)*(recvChannels+sendChannels)), ret, fail); + NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, recvPeer, bootstrapTag, data[p], sizeof(struct ncclConnect)*(recvChannels+sendChannels)), ret, fail); + sendData[p] = data[p]; + recvData[p] = data[p]+sendChannels; } } else { - if (recvChannels) NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, recvPeer, bootstrapTag, recvData[i], sizeof(struct ncclConnect)*recvChannels), ret, fail); - if (sendChannels) NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, sendPeer, bootstrapTag, sendData[i], sizeof(struct ncclConnect)*sendChannels), ret, fail); - if (sendChannels) NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, sendPeer, bootstrapTag, sendData[i], sizeof(struct ncclConnect)*sendChannels), ret, fail); - if (recvChannels) NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, recvPeer, bootstrapTag, recvData[i], sizeof(struct ncclConnect)*recvChannels), ret, fail); + if (recvChannels) NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, recvPeer, bootstrapTag, recvData[p], sizeof(struct ncclConnect)*recvChannels), ret, fail); + if (sendChannels) NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, sendPeer, bootstrapTag, sendData[p], sizeof(struct ncclConnect)*sendChannels), ret, fail); + if (sendChannels) NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, sendPeer, bootstrapTag, sendData[p], sizeof(struct ncclConnect)*sendChannels), ret, fail); + if (recvChannels) NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, recvPeer, bootstrapTag, recvData[p], sizeof(struct ncclConnect)*recvChannels), ret, fail); } TIME_STOP(2); + + if (i-done == maxPeers || i == comm->nRanks-1) { + // Loop until all channels with all ranks have been connected + bool allChannelsConnected; + allChannelsConnected = false; + while (!allChannelsConnected) { + allChannelsConnected = true; + for (int j=done+1; j<=i; j++) { + int recvPeer = (comm->rank - j + comm->nRanks) % comm->nRanks; + int sendPeer = (comm->rank + j) % comm->nRanks; + uint64_t recvMask = comm->connectRecv[recvPeer]; + uint64_t sendMask = comm->connectSend[sendPeer]; + + int p = j-(done+1); + int sendDataOffset = 0; + int recvDataOffset = 0; + for (int c=0; cchannels[c].peers[sendPeer]->send + connIndex; + // This connector hasn't completed connection yet + if (conn->connected == 0) { + NCCLCHECKGOTO(conn->transportComm->connect(comm, sendData[p] + sendDataOffset++, 1, comm->rank, conn), ret, fail); + if (ret == ncclSuccess) { + conn->connected = 1; + /* comm->channels[c].devPeers[sendPeer]->send[connIndex] is a device memory access. */ + CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[sendPeer]->send[connIndex], &conn->conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), ret, fail); + } else if (ret == ncclInProgress) { + allChannelsConnected = false; + } + } + } + TIME_STOP(3); + + // Start with recv channels + TIME_START(4); + if (recvMask & (1UL<channels[c].peers[recvPeer]->recv + connIndex; + // This connector hasn't completed connection yet + if (conn->connected == 0) { + NCCLCHECKGOTO(conn->transportComm->connect(comm, recvData[p] + recvDataOffset++, 1, comm->rank, conn), ret, fail); + if (ret == ncclSuccess) { + conn->connected = 1; + /* comm->channels[c].devPeers[recvPeer]->recv[connIndex] is a device memory access. */ + CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[recvPeer]->recv[connIndex], &conn->conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), ret, fail); + } else if (ret == ncclInProgress) { + allChannelsConnected = false; + } + } + } + TIME_STOP(4); + } + if (sendMask || recvMask) { + free(data[p]); + data[p] = NULL; + } + } + if (ncclParamReportConnectProgress() && comm->rank == 0) { + struct timeval now; + gettimeofday(&now, NULL); + if (((now.tv_sec - timeLast.tv_sec)*1.0 + (now.tv_usec-timeLast.tv_usec)*1e-6) > 1) { + float elapsed = (now.tv_sec - timeStart.tv_sec)*1.0 + (now.tv_usec-timeStart.tv_usec)*1e-6; + float remaining = elapsed*(comm->nRanks-done)/done; + printf("%sP2p connect: %g%% Elapsed %d:%02d Remaining %d:%02d ", + timeReported ? "\r" : "", done*100.0/comm->nRanks, ((int)elapsed)/60, ((int)elapsed)%60, ((int)remaining)/60, ((int)remaining)%60); + fflush(stdout); + timeReported = true; + timeLast = now; // struct copy; + } + } + } + done = i; + } } - // Loop until all channels with all ranks have been connected - bool allChannelsConnected; - allChannelsConnected = false; - while (!allChannelsConnected) { - allChannelsConnected = true; - for (int i=1; inRanks; i++) { - int recvPeer = (comm->rank - i + comm->nRanks) % comm->nRanks; - int sendPeer = (comm->rank + i) % comm->nRanks; - uint64_t recvMask = comm->connectRecv[recvPeer]; - uint64_t sendMask = comm->connectSend[sendPeer]; - - int sendDataOffset = 0; - int recvDataOffset = 0; - for (int c=0; cchannels[c].peers[sendPeer]->send + connIndex; - // This connector hasn't completed connection yet - if (conn->connected == 0) { - NCCLCHECKGOTO(conn->transportComm->connect(comm, sendData[i] + sendDataOffset++, 1, comm->rank, conn), ret, fail); - if (ret == ncclSuccess) { - conn->connected = 1; - /* comm->channels[c].devPeers[sendPeer]->send[connIndex] is a device memory access. */ - CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[sendPeer]->send[connIndex], &conn->conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), ret, fail); - } else if (ret == ncclInProgress) { - allChannelsConnected = false; - } - } - } - TIME_STOP(3); - - // Start with recv channels - TIME_START(4); - if (recvMask & (1UL<channels[c].peers[recvPeer]->recv + connIndex; - // This connector hasn't completed connection yet - if (conn->connected == 0) { - NCCLCHECKGOTO(conn->transportComm->connect(comm, recvData[i] + recvDataOffset++, 1, comm->rank, conn), ret, fail); - if (ret == ncclSuccess) { - conn->connected = 1; - /* comm->channels[c].devPeers[recvPeer]->recv[connIndex] is a device memory access. */ - CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[recvPeer]->recv[connIndex], &conn->conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), ret, fail); - } else if (ret == ncclInProgress) { - allChannelsConnected = false; - } - } - } - TIME_STOP(4); - } - } + if (timeReported) { + struct timeval now; + gettimeofday(&now, NULL); + float elapsed = (now.tv_sec - timeStart.tv_sec)*1.0 + (now.tv_usec-timeStart.tv_usec)*1e-6; + printf("\rP2p connect done in %d:%02d \n", + ((int)elapsed)/60, ((int)elapsed)%60); + fflush(stdout); } /* We need to sync ranks here since some ranks might run too fast after connection setup @@ -205,7 +251,6 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph* } } comm->connectRecv[recvPeer] = comm->connectSend[sendPeer] = 0UL; - free(data[i]); } free(data); diff --git a/projects/rccl/src/transport/nvls.cc b/projects/rccl/src/transport/nvls.cc index c9a3bbc289..4dfae51cfe 100644 --- a/projects/rccl/src/transport/nvls.cc +++ b/projects/rccl/src/transport/nvls.cc @@ -393,18 +393,18 @@ ncclResult_t ncclNvlsSetup(struct ncclComm* comm, struct ncclComm* parent) { typeSize = sizeof(struct localRegData); if (comm->localRank == 0) { shmPath[0] = '\0'; - NCCLCHECKGOTO(ncclShmOpen(shmPath, (sizeof(size_t) + typeSize * comm->localRanks) * 2, (void**)&nvlsShmem, NULL, comm->localRanks - 1, &comm->nvlsShmemHandle), res, cleanup); + NCCLCHECKGOTO(ncclShmOpen(shmPath, (sizeof(size_t) + typeSize * comm->localRanks) * 2, (void**)&nvlsShmem, NULL, comm->localRanks - 1, &comm->nvlsResources->nvlsShmemHandle), res, cleanup); NCCLCHECKGOTO(bootstrapIntraNodeBroadcast(comm->bootstrap, comm->localRankToRank, comm->localRank, comm->localRanks, 0, shmPath, sizeof(shmPath)), res, cleanup); } else { NCCLCHECKGOTO(bootstrapIntraNodeBroadcast(comm->bootstrap, comm->localRankToRank, comm->localRank, comm->localRanks, 0, shmPath, sizeof(shmPath)), res, cleanup); - NCCLCHECKGOTO(ncclShmOpen(shmPath, (sizeof(size_t) + typeSize * comm->localRanks) * 2, (void**)&nvlsShmem, NULL, -1, &comm->nvlsShmemHandle), res, cleanup); + NCCLCHECKGOTO(ncclShmOpen(shmPath, (sizeof(size_t) + typeSize * comm->localRanks) * 2, (void**)&nvlsShmem, NULL, -1, &comm->nvlsResources->nvlsShmemHandle), res, cleanup); } /* need 2 pools and a shared counter for shmem-based collectives */ - comm->nvlsShmem.cnt[0] = (size_t*)nvlsShmem; - comm->nvlsShmem.ptr[0] = (void*)((char*)comm->nvlsShmem.cnt[0] + sizeof(size_t)); - comm->nvlsShmem.cnt[1] = (size_t*)((char*)comm->nvlsShmem.ptr[0] + typeSize * comm->localRanks); - comm->nvlsShmem.ptr[1] = (void*)((char*)comm->nvlsShmem.cnt[1] + sizeof(size_t)); - comm->nvlsShmem.round = 0; + comm->nvlsResources->nvlsShmem.cnt[0] = (size_t*)nvlsShmem; + comm->nvlsResources->nvlsShmem.ptr[0] = (void*)((char*)comm->nvlsResources->nvlsShmem.cnt[0] + sizeof(size_t)); + comm->nvlsResources->nvlsShmem.cnt[1] = (size_t*)((char*)comm->nvlsResources->nvlsShmem.ptr[0] + typeSize * comm->localRanks); + comm->nvlsResources->nvlsShmem.ptr[1] = (void*)((char*)comm->nvlsResources->nvlsShmem.cnt[1] + sizeof(size_t)); + comm->nvlsResources->nvlsShmem.round = 0; return res; @@ -418,6 +418,7 @@ ncclResult_t ncclNvlsFree(struct ncclComm* comm) { if (resources == NULL) return ncclSuccess; if (ncclAtomicRefCountDecrement(&resources->refCount) == 0) { + NCCLCHECK(ncclShmClose(resources->nvlsShmemHandle)); NCCLCHECK(nvlsGroupUnbind(comm, resources)); NCCLCHECK(nvlsGroupUnmapMem(comm, resources)); free(resources); @@ -476,7 +477,7 @@ ncclResult_t tryRegisterBuffer(struct ncclComm *comm, struct localRequestData *r /* get all buffer addresses */ NCCLCHECKGOTO(ncclCalloc(®Record->addrs, comm->localRanks), ret, fail); regRecord->addrs[comm->localRank] = regRecord->buff; - NCCLCHECKGOTO(ncclShmemAllgather(comm, &comm->nvlsShmem, regRecord->addrs + comm->localRank, regRecord->addrs, sizeof(uintptr_t)), ret, fail); + NCCLCHECKGOTO(ncclShmemAllgather(comm, &comm->nvlsResources->nvlsShmem, regRecord->addrs + comm->localRank, regRecord->addrs, sizeof(uintptr_t)), ret, fail); /* enqueue record */ ncclIntruQueueEnqueue(&comm->regRecordQueue, regRecord); @@ -551,7 +552,7 @@ ncclResult_t ncclNvlsLocalRegisterBuffer(struct ncclComm *comm, const void *send regRequestHead = regRequestHead->next; } - NCCLCHECKGOTO(ncclShmemAllgather(comm, &comm->nvlsShmem, regData + comm->localRank, regData, sizeof(struct localRegData)), ret, fail); + NCCLCHECKGOTO(ncclShmemAllgather(comm, &comm->nvlsResources->nvlsShmem, regData + comm->localRank, regData, sizeof(struct localRegData)), ret, fail); /* first check whether all local ranks find their registered buffer */ for (int i = 0; i < comm->localRanks; ++i) { From c78787e0899277a6264270cf47f164b9363b7d3c Mon Sep 17 00:00:00 2001 From: Alexander Grund Date: Tue, 14 Nov 2023 12:38:02 +0100 Subject: [PATCH 2/2] Fix use of CPUID overwriting registers in use. CPUID writes to EAX, EBX, ECX, and EDX so the inline-asm must state that. Otherwise currently in-use register might get overwritten which may cause all kinds of failures like segfaults or wrong results. Alternatively `__cpuid` can be used which avoids this and related issues. So do that as suggested in the GCC issue https://gcc.gnu.org/bugzilla/show_bug.cgi?id=112513 [ROCm/rccl commit: cece6415b043704932ae70bb215988cf6719a812] --- projects/rccl/src/graph/xml.cc | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/projects/rccl/src/graph/xml.cc b/projects/rccl/src/graph/xml.cc index 47fda1f851..6147c74bce 100644 --- a/projects/rccl/src/graph/xml.cc +++ b/projects/rccl/src/graph/xml.cc @@ -12,6 +12,9 @@ #include "core.h" #include "nvmlwrap.h" #include "xml.h" +#if defined(__x86_64__) +#include +#endif /*******************/ /* XML File Parser */ @@ -412,7 +415,8 @@ ncclResult_t ncclTopoGetXmlFromCpu(struct ncclXmlNode* cpuNode, struct ncclXml* char vendor[12]; } cpuid0; - asm volatile("cpuid" : "=b" (cpuid0.ebx), "=c" (cpuid0.ecx), "=d" (cpuid0.edx) : "a" (0) : "memory"); + unsigned unused; + __cpuid(0, unused, cpuid0.ebx, cpuid0.ecx, cpuid0.edx); char vendor[13]; strncpy(vendor, cpuid0.vendor, 12); vendor[12] = '\0'; @@ -434,7 +438,8 @@ ncclResult_t ncclTopoGetXmlFromCpu(struct ncclXmlNode* cpuNode, struct ncclXml* }; uint32_t val; } cpuid1; - asm volatile("cpuid" : "=a" (cpuid1.val) : "a" (1) : "memory"); + unsigned unused; + __cpuid(1, cpuid1.val, unused, unused, unused); int familyId = cpuid1.familyId + (cpuid1.extFamilyId << 4); int modelId = cpuid1.modelId + (cpuid1.extModelId << 4); NCCLCHECK(xmlSetAttrInt(cpuNode, "familyid", familyId));