From f3d51667838f7542df8ea32ea4e144d812b3ed7c Mon Sep 17 00:00:00 2001 From: Sylvain Jeaugey Date: Thu, 2 Feb 2023 12:52:47 -0800 Subject: [PATCH] 2.16.5-1 Add support for 400Gbit NDR network adapters (CX7) Handle EINTR in socket poll() function Add NCCL_PROGRESS_APPENDOP_FREQ to control op append overhead Resource cleanup fixes Fix double free in case of init failure Fix crash in ncclCommAbort Revert AMD speed commit --- makefiles/version.mk | 2 +- src/debug.cc | 2 +- src/graph/search.cc | 15 ++++++++++----- src/graph/topo.cc | 3 --- src/graph/topo.h | 1 - src/init.cc | 29 ++++++----------------------- src/misc/param.cc | 2 +- src/misc/shmutils.cc | 2 +- src/misc/socket.cc | 13 ++++++++++--- src/proxy.cc | 32 +++++++++++++++++++++----------- src/transport/net.cc | 17 ++++++++++------- src/transport/net_ib.cc | 11 ++++++++++- 12 files changed, 71 insertions(+), 58 deletions(-) diff --git a/makefiles/version.mk b/makefiles/version.mk index f4a0d8d1df..e8e7b7a952 100644 --- a/makefiles/version.mk +++ b/makefiles/version.mk @@ -1,6 +1,6 @@ ##### version NCCL_MAJOR := 2 NCCL_MINOR := 16 -NCCL_PATCH := 2 +NCCL_PATCH := 5 NCCL_SUFFIX := PKG_REVISION := 1 diff --git a/src/debug.cc b/src/debug.cc index e2d6f4725d..5955a6eb3f 100644 --- a/src/debug.cc +++ b/src/debug.cc @@ -15,7 +15,7 @@ static int pid = -1; static char hostname[1024]; thread_local int ncclDebugNoWarn = 0; char ncclLastError[1024] = ""; // Global string for the last error in human readable form -uint64_t ncclDebugMask = NCCL_INIT; // Default debug sub-system mask is INIT +uint64_t ncclDebugMask = NCCL_INIT|NCCL_ENV; // Default debug sub-system mask is INIT and ENV FILE *ncclDebugFile = stdout; pthread_mutex_t ncclDebugLock = PTHREAD_MUTEX_INITIALIZER; std::chrono::steady_clock::time_point ncclEpoch; diff --git a/src/graph/search.cc b/src/graph/search.cc index eb0b7dd419..534d401ae1 100644 --- a/src/graph/search.cc +++ b/src/graph/search.cc @@ -731,6 +731,11 @@ float speedArrayInter[] = { 48.0, 30.0, 28.0, 24.0, 22.0, 18.0, 15.0, 12.0, 10.0 #define NSPEEDSINTRA (sizeof(speedArrayIntra)/sizeof(float)) #define NSPEEDSINTER (sizeof(speedArrayInter)/sizeof(float)) +float sm90SpeedArrayIntra[] = { 66.0, 33.0, 24.0, 20.0, 15.0, 12.0, 6.0, 3.0 }; +float sm90SpeedArrayInter[] = { 48.0, 45.0, 30.0, 24.0, 15.0, 12.0, 6.0, 3.0, 2.4, 1.2, 0.24, 0.12 }; +#define NSPEEDSINTRA_SM90 (sizeof(sm90SpeedArrayIntra)/sizeof(float)) +#define NSPEEDSINTER_SM90 (sizeof(sm90SpeedArrayInter)/sizeof(float)) + NCCL_PARAM(CrossNic, "CROSS_NIC", 2); ncclResult_t ncclTopoCompute(ncclTopoSystem* system, struct ncclTopoGraph* graph) { @@ -771,11 +776,11 @@ ncclResult_t ncclTopoCompute(ncclTopoSystem* system, struct ncclTopoGraph* graph int nspeeds = 0; float* speedArray = NULL; if (system->nodes[NET].count == 0) { - nspeeds = NSPEEDSINTRA; - speedArray = speedArrayIntra; + nspeeds = ccMin >= 90 ? NSPEEDSINTRA_SM90 : NSPEEDSINTRA; + speedArray = ccMin >= 90 ? sm90SpeedArrayIntra : speedArrayIntra; } else { - nspeeds = NSPEEDSINTER; - speedArray = speedArrayInter; + nspeeds = ccMin >= 90 ? NSPEEDSINTER_SM90 : NSPEEDSINTER; + speedArray = ccMin >= 90 ? sm90SpeedArrayInter : speedArrayInter; } int pass = 1; int speedIndex = 0; @@ -890,7 +895,7 @@ done: graph->nChannels = 1; } - if (graph->bwIntra >= 25.0) { + if ((ccMin <= 80 && graph->bwIntra >= 25.0) || (ccMin <= 90 && graph->bwIntra >= 50.0)) { int dupChannels = std::min(graph->nChannels*2, graph->maxChannels); memcpy(graph->intra+graph->nChannels*ngpus, graph->intra, (dupChannels-graph->nChannels)*ngpus*sizeof(int)); memcpy(graph->inter+graph->nChannels*2,graph->inter, (dupChannels-graph->nChannels)*2*sizeof(int)); diff --git a/src/graph/topo.cc b/src/graph/topo.cc index d91aa634ff..9e4c978606 100644 --- a/src/graph/topo.cc +++ b/src/graph/topo.cc @@ -72,9 +72,6 @@ static ncclResult_t ncclTopoGetInterCpuBw(struct ncclTopoNode* cpu, float* bw) { if (cpu->cpu.arch == NCCL_TOPO_CPU_ARCH_X86 && cpu->cpu.vendor == NCCL_TOPO_CPU_VENDOR_INTEL) { *bw = cpu->cpu.model == NCCL_TOPO_CPU_TYPE_SKL ? SKL_QPI_BW : QPI_BW; } - if (cpu->cpu.arch == NCCL_TOPO_CPU_ARCH_X86 && cpu->cpu.vendor == NCCL_TOPO_CPU_VENDOR_AMD) { - *bw = AMD_BW; - } if (cpu->cpu.arch == NCCL_TOPO_CPU_ARCH_X86 && cpu->cpu.vendor == NCCL_TOPO_CPU_VENDOR_ZHAOXIN) { *bw = cpu->cpu.model == NCCL_TOPO_CPU_TYPE_YONGFENG ? YONGFENG_ZPI_BW : ZPI_BW; } diff --git a/src/graph/topo.h b/src/graph/topo.h index 1a1a04cdec..20a3e9d680 100644 --- a/src/graph/topo.h +++ b/src/graph/topo.h @@ -18,7 +18,6 @@ #define PCI_BW 12.0 // PCI Gen3 x16 #define QPI_BW 6.0 #define SKL_QPI_BW 9.0 -#define AMD_BW 16.0 #define ZPI_BW 6.0 #define YONGFENG_ZPI_BW 9.0 #define P9_BW 32.0 diff --git a/src/init.cc b/src/init.cc index 91a87937ba..6a5f3c3d56 100644 --- a/src/init.cc +++ b/src/init.cc @@ -498,7 +498,7 @@ static ncclResult_t computeBuffSizes(struct ncclComm* comm) { NCCL_PARAM(GraphDumpFileRank, "GRAPH_DUMP_FILE_RANK", 0); NCCL_PARAM(CollNetNodeThreshold, "COLLNET_NODE_THRESHOLD", 2); NCCL_PARAM(NvbPreconnect, "NVB_PRECONNECT", 1); -NCCL_PARAM(AllocP2pNetLLBuffers, "NCCL_ALLOC_P2P_NET_LL_BUFFERS", 0); +NCCL_PARAM(AllocP2pNetLLBuffers, "ALLOC_P2P_NET_LL_BUFFERS", 0); static ncclResult_t collNetTrySetup(ncclComm_t comm, struct ncclTopoGraph* collNetGraph) { ncclResult_t ret = ncclSuccess; @@ -1102,6 +1102,10 @@ static ncclResult_t ncclCommInitRankFunc(struct ncclAsyncJob* job_) { // update communicator state comm->initState = ncclSuccess; + // Trace this call for replay tool + TRACE_CALL("ncclCommInitRank(%p, %d, 0x%llx, %d, %d)", + *newcomm, nranks, (unsigned long long)hashUniqueId(commId), myrank, (*newcomm)->cudaDev); + INFO(NCCL_INIT,"comm %p rank %d nranks %d cudaDev %d busId %lx commId 0x%llx - Init COMPLETE", *newcomm, myrank, nranks, (*newcomm)->cudaDev, (*newcomm)->busId, (unsigned long long)hashUniqueId(commId)); exit: return res; @@ -1171,7 +1175,6 @@ static ncclResult_t ncclCommInitRankDev(ncclComm_t* newcomm, int nranks, ncclUni exit: return ncclGroupErrCheck(res); fail: - if (job) free(job); if (comm) { if (comm->abortFlag) ncclCudaHostFree((void *)comm->abortFlag); free(comm); @@ -1396,7 +1399,6 @@ static ncclResult_t commFinalize(ncclComm_t comm, bool userCalled) { exit: return ncclGroupErrCheck(ret); fail: - if (job) free(job); goto exit; } @@ -1443,26 +1445,7 @@ static ncclResult_t commReclaim(ncclComm_t comm) { NCCLCHECKGOTO(commFinalize(comm, false), ret, fail); } - if (comm->intraComm0 == NULL) { - /* if init errors happen and comm->intraComm0 == NULL, no proxy connection is built up, and no finalize thread - * have been launched. Main thread can reclaim everything since no NCCL kernel was issued. */ - struct ncclCommFinalizeAsyncJob job; - - job.comm = comm; - curRank = comm->rank; - /* comm aborts, commDestroySync should not be blocked. */ - if ((ret = commDestroySync((struct ncclAsyncJob*) &job)) != ncclSuccess) { - WARN("commReclaim: comm %p (rank = %d) in abort, error %d", comm, curRank, ret); - } - - if ((ret = ncclProxyDestroy(comm)) != ncclSuccess) { - WARN("commReclaim: comm %p (rank = %d) destroys proxy resource error %d", comm, curRank, ret); - } - - if ((ret = commCleanup(comm)) != ncclSuccess) { - WARN("commReclaim: cleanup comm %p rank %d failed in destroy/abort, error %d", comm, curRank, ret); - } - } else { + if (comm->intraComm0 != NULL) { int curRankCnt; int intraRanks = comm->intraRanks; ncclComm_t intracomm0 = comm->intraComm0; diff --git a/src/misc/param.cc b/src/misc/param.cc index 48099b3b7e..bf7aa00871 100644 --- a/src/misc/param.cc +++ b/src/misc/param.cc @@ -72,7 +72,7 @@ void ncclLoadParam(char const* env, int64_t deftVal, int64_t uninitialized, int6 value = deftVal; INFO(NCCL_ALL,"Invalid value %s for %s, using default %lld.", str, env, (long long)deftVal); } else { - INFO(NCCL_ALL,"%s set by environment to %lld.", env, (long long)value); + INFO(NCCL_ENV,"%s set by environment to %lld.", env, (long long)value); } } __atomic_store_n(cache, value, __ATOMIC_RELAXED); diff --git a/src/misc/shmutils.cc b/src/misc/shmutils.cc index 9f17903d49..69f7b1bde0 100644 --- a/src/misc/shmutils.cc +++ b/src/misc/shmutils.cc @@ -54,7 +54,7 @@ ncclResult_t ncclShmOpen(char* shmPath, size_t shmSize, void** shmPtr, void** de const size_t realShmSize = shmSize + refSize; *handle = *shmPtr = NULL; /* assume shmPtr and handle always set correctly by users. */ - EQCHECKGOTO(tmphandle = (struct shmHandleInternal*)malloc(sizeof(struct shmHandleInternal)), NULL, ret, fail); + EQCHECKGOTO(tmphandle = (struct shmHandleInternal*)calloc(1, sizeof(struct shmHandleInternal)), NULL, ret, fail); if (create) { /* refcount > 0 means the caller tries to allocate a shared memory. This shared memory segment will have * refcount references; when the peer attaches, it should pass -1 to reduce one reference count. When it diff --git a/src/misc/socket.cc b/src/misc/socket.cc index e861480b1b..9f93e26044 100644 --- a/src/misc/socket.cc +++ b/src/misc/socket.cc @@ -490,11 +490,18 @@ static ncclResult_t socketPollConnect(struct ncclSocket* sock) { memset(&pfd, 0, sizeof(struct pollfd)); pfd.fd = sock->fd; pfd.events = POLLOUT; - SYSCHECK(ret = poll(&pfd, 1, timeout), "poll"); - if (ret == 0) return ncclSuccess; + ret = poll(&pfd, 1, timeout); + + if (ret == 0 || (ret < 0 && errno == EINTR)) { + return ncclSuccess; + } else if (ret < 0) { + WARN("socketPollConnect poll() failed with error %s", strerror(errno)); + return ncclRemoteError; + } else { + EQCHECK(ret == 1 && (pfd.revents & POLLOUT), 0); + } /* check socket status */ - EQCHECK(ret == 1 && (pfd.revents & POLLOUT), 0); SYSCHECK(getsockopt(sock->fd, SOL_SOCKET, SO_ERROR, (void*)&ret, &rlen), "getsockopt"); if (ret == 0) { diff --git a/src/proxy.cc b/src/proxy.cc index 2103b7a469..cfaa266fc9 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -663,6 +663,7 @@ ncclResult_t ncclSetThreadContext(struct ncclComm* comm) { // Set to SIGUSR1 or SIGUSR2 to help debug proxy state during hangs NCCL_PARAM(ProxyDumpSignal, "PROXY_DUMP_SIGNAL", -1); +NCCL_PARAM(ProgressAppendOpFreq, "PROGRESS_APPENDOP_FREQ", 8); void* ncclProxyProgress(void *comm_) { struct ncclComm* comm = (struct ncclComm*)comm_; @@ -683,6 +684,12 @@ void* ncclProxyProgress(void *comm_) { nvtxNameOsThreadA(syscall(SYS_gettid), threadName); int lastIdle = 0; + /* Too frequent call of ncclProxyGetPostedOps() will result in perf regression for small message + * communication. proxyOpAppendCounter is a counter that helps us decide if we need to append proxy ops. + * After each progress, proxyOpAppendCounter will increase by 1 and compare with environment variable + * ncclParamProgressAppendOpFreq(). If they are equal, we will append proxy ops. This will decrease the + * frequency of calling ncclProxyGetPostedOps() and reduce the perf impact. */ + int proxyOpAppendCounter = 0; struct ncclProxyArgs profArgs; // Only used for profiling purposes while ((state->stop == false || (state->stop == true && state->active)) && *comm->abortFlag == 0) { int idle = 1; @@ -694,17 +701,20 @@ void* ncclProxyProgress(void *comm_) { } if (lastIdle == 0 && idle == 1) ncclProfilingRecord(&profArgs, 0, 0, ncclProxyProfileIdle); if (lastIdle == 1 && idle == 0) ncclProfilingRecord(&profArgs, 0, 0, ncclProxyProfileActive); - int added = 0; - TIME_START(3); - if (state->stop == false) - ret = ncclProxyGetPostedOps(comm, &added); - if (added) { TIME_STOP(3); } else { TIME_CANCEL(3); } - if (ret != ncclSuccess) { - (void) ncclCommSetAsyncError(comm, ret); - INFO(NCCL_ALL,"%s:%d -> %d [Proxy Thread]", __FILE__, __LINE__, ret); - } - if (added == 0) { - sched_yield(); // No request progressed. Let others run. + if (idle || (++proxyOpAppendCounter == ncclParamProgressAppendOpFreq())) { + int added = 0; + proxyOpAppendCounter = 0; + TIME_START(3); + if (state->stop == false) + ret = ncclProxyGetPostedOps(comm, &added); + if (added) { TIME_STOP(3); } else { TIME_CANCEL(3); } + if (ret != ncclSuccess) { + (void) ncclCommSetAsyncError(comm, ret); + INFO(NCCL_ALL,"%s:%d -> %d [Proxy Thread]", __FILE__, __LINE__, ret); + } + if (added == 0) { + sched_yield(); // No request progressed. Let others run. + } } lastIdle = idle; } diff --git a/src/transport/net.cc b/src/transport/net.cc index bdb2e2d1b4..b358ad6032 100644 --- a/src/transport/net.cc +++ b/src/transport/net.cc @@ -63,7 +63,8 @@ struct connectMapMem{ char shmPath[PATH_MAX]; cudaIpcMemHandle_t ipc; }; - ncclShmHandle_t handle; + ncclShmHandle_t attachHandle; + ncclShmHandle_t createHandle; }; struct connectMap { @@ -225,12 +226,12 @@ static ncclResult_t recvSetup(struct ncclComm* comm, struct ncclTopoGraph* graph } static ncclResult_t netMapShm(struct connectMapMem* mem) { - NCCLCHECK(ncclShmOpen(mem->shmPath, mem->size, (void**)&mem->cpuPtr, (void**)&mem->gpuPtr, -1, &mem->handle)); + NCCLCHECK(ncclShmOpen(mem->shmPath, mem->size, (void**)&mem->cpuPtr, (void**)&mem->gpuPtr, -1, &mem->attachHandle)); return ncclSuccess; } static ncclResult_t netCreateShm(struct connectMapMem* mem) { mem->shmPath[0] = '\0'; // Let ncclShmOpen create a tmp file - NCCLCHECK(ncclShmOpen(mem->shmPath, mem->size, (void**)&mem->cpuPtr, NULL, 1, &mem->handle)); + NCCLCHECK(ncclShmOpen(mem->shmPath, mem->size, (void**)&mem->cpuPtr, NULL, 1, &mem->createHandle)); return ncclSuccess; } @@ -339,17 +340,19 @@ static ncclResult_t sendFree(struct ncclConnector* send) { struct connectMap* map = (struct connectMap*)(send->transportResources); if (map) { if (map->sameProcess == 0) { - NCCLCHECK(ncclShmClose(map->mems[NCCL_NET_MAP_HOSTMEM].handle)); + NCCLCHECK(ncclShmClose(map->mems[NCCL_NET_MAP_HOSTMEM].attachHandle)); if (map->mems[NCCL_NET_MAP_DEVMEM].size) { CUDACHECK(cudaIpcCloseMemHandle(map->mems[NCCL_NET_MAP_DEVMEM].gpuPtr)); } } + free(map); } return ncclSuccess; } static ncclResult_t recvFree(struct ncclConnector* recv) { + if (recv->transportResources) free(recv->transportResources); return ncclSuccess; } @@ -763,7 +766,7 @@ static ncclResult_t sendProxyFree(struct ncclProxyConnection* connection, struct if (resources->map.sameProcess) { NCCLCHECK(ncclCudaHostFree(mems[NCCL_NET_MAP_HOSTMEM].cpuPtr)); } else { - NCCLCHECK(ncclShmClose(mems[NCCL_NET_MAP_HOSTMEM].handle)); + NCCLCHECK(ncclShmClose(mems[NCCL_NET_MAP_HOSTMEM].createHandle)); } CUDACHECK(cudaFree(mems[NCCL_NET_MAP_DEVMEM].cpuPtr)); if (mems[NCCL_NET_MAP_GDCMEM].cpuPtr) NCCLCHECK(ncclGdrCudaFree(resources->gdrDesc)); @@ -781,7 +784,7 @@ static ncclResult_t sendProxyFree(struct ncclProxyConnection* connection, struct } } - if (connection->state == connSetupDone) free(resources); + if (resources) free(resources); return ncclSuccess; } @@ -816,7 +819,7 @@ static ncclResult_t recvProxyFree(struct ncclProxyConnection* connection, struct } } - if (connection->state == connSetupDone) free(resources); + if (resources) free(resources); return ncclSuccess; } diff --git a/src/transport/net_ib.cc b/src/transport/net_ib.cc index 8818554206..064500571f 100644 --- a/src/transport/net_ib.cc +++ b/src/transport/net_ib.cc @@ -122,7 +122,16 @@ static ncclResult_t ncclIbGetPciPath(char* devName, char** path, int* realPort) } static int ibvWidths[] = { 1, 4, 8, 12, 2 }; -static int ibvSpeeds[] = { 2500, 5000, 10000, 10000, 14000, 25000, 50000 }; +static int ibvSpeeds[] = { + 2500, /* SDR */ + 5000, /* DDR */ + 10000, /* QDR */ + 10000, /* QDR */ + 14000, /* FDR */ + 25000, /* EDR */ + 50000, /* HDR */ + 100000 /* NDR */ }; + static int firstBitSet(int val, int max) { int i = 0; while (i