Add support for 400Gbit NDR network adapters (CX7)
Handle EINTR in socket poll() function
Add NCCL_PROGRESS_APPENDOP_FREQ to control op append overhead
Resource cleanup fixes
Fix double free in case of init failure
Fix crash in ncclCommAbort
Revert AMD speed commit
This commit is contained in:
Sylvain Jeaugey
2023-02-02 12:52:47 -08:00
orang tua 93840e7476
melakukan f3d5166783
12 mengubah file dengan 71 tambahan dan 58 penghapusan
+1 -1
Melihat File
@@ -1,6 +1,6 @@
##### version
NCCL_MAJOR := 2
NCCL_MINOR := 16
NCCL_PATCH := 2
NCCL_PATCH := 5
NCCL_SUFFIX :=
PKG_REVISION := 1
+1 -1
Melihat File
@@ -15,7 +15,7 @@ static int pid = -1;
static char hostname[1024];
thread_local int ncclDebugNoWarn = 0;
char ncclLastError[1024] = ""; // Global string for the last error in human readable form
uint64_t ncclDebugMask = NCCL_INIT; // Default debug sub-system mask is INIT
uint64_t ncclDebugMask = NCCL_INIT|NCCL_ENV; // Default debug sub-system mask is INIT and ENV
FILE *ncclDebugFile = stdout;
pthread_mutex_t ncclDebugLock = PTHREAD_MUTEX_INITIALIZER;
std::chrono::steady_clock::time_point ncclEpoch;
+10 -5
Melihat File
@@ -731,6 +731,11 @@ float speedArrayInter[] = { 48.0, 30.0, 28.0, 24.0, 22.0, 18.0, 15.0, 12.0, 10.0
#define NSPEEDSINTRA (sizeof(speedArrayIntra)/sizeof(float))
#define NSPEEDSINTER (sizeof(speedArrayInter)/sizeof(float))
float sm90SpeedArrayIntra[] = { 66.0, 33.0, 24.0, 20.0, 15.0, 12.0, 6.0, 3.0 };
float sm90SpeedArrayInter[] = { 48.0, 45.0, 30.0, 24.0, 15.0, 12.0, 6.0, 3.0, 2.4, 1.2, 0.24, 0.12 };
#define NSPEEDSINTRA_SM90 (sizeof(sm90SpeedArrayIntra)/sizeof(float))
#define NSPEEDSINTER_SM90 (sizeof(sm90SpeedArrayInter)/sizeof(float))
NCCL_PARAM(CrossNic, "CROSS_NIC", 2);
ncclResult_t ncclTopoCompute(ncclTopoSystem* system, struct ncclTopoGraph* graph) {
@@ -771,11 +776,11 @@ ncclResult_t ncclTopoCompute(ncclTopoSystem* system, struct ncclTopoGraph* graph
int nspeeds = 0;
float* speedArray = NULL;
if (system->nodes[NET].count == 0) {
nspeeds = NSPEEDSINTRA;
speedArray = speedArrayIntra;
nspeeds = ccMin >= 90 ? NSPEEDSINTRA_SM90 : NSPEEDSINTRA;
speedArray = ccMin >= 90 ? sm90SpeedArrayIntra : speedArrayIntra;
} else {
nspeeds = NSPEEDSINTER;
speedArray = speedArrayInter;
nspeeds = ccMin >= 90 ? NSPEEDSINTER_SM90 : NSPEEDSINTER;
speedArray = ccMin >= 90 ? sm90SpeedArrayInter : speedArrayInter;
}
int pass = 1;
int speedIndex = 0;
@@ -890,7 +895,7 @@ done:
graph->nChannels = 1;
}
if (graph->bwIntra >= 25.0) {
if ((ccMin <= 80 && graph->bwIntra >= 25.0) || (ccMin <= 90 && graph->bwIntra >= 50.0)) {
int dupChannels = std::min(graph->nChannels*2, graph->maxChannels);
memcpy(graph->intra+graph->nChannels*ngpus, graph->intra, (dupChannels-graph->nChannels)*ngpus*sizeof(int));
memcpy(graph->inter+graph->nChannels*2,graph->inter, (dupChannels-graph->nChannels)*2*sizeof(int));
-3
Melihat File
@@ -72,9 +72,6 @@ static ncclResult_t ncclTopoGetInterCpuBw(struct ncclTopoNode* cpu, float* bw) {
if (cpu->cpu.arch == NCCL_TOPO_CPU_ARCH_X86 && cpu->cpu.vendor == NCCL_TOPO_CPU_VENDOR_INTEL) {
*bw = cpu->cpu.model == NCCL_TOPO_CPU_TYPE_SKL ? SKL_QPI_BW : QPI_BW;
}
if (cpu->cpu.arch == NCCL_TOPO_CPU_ARCH_X86 && cpu->cpu.vendor == NCCL_TOPO_CPU_VENDOR_AMD) {
*bw = AMD_BW;
}
if (cpu->cpu.arch == NCCL_TOPO_CPU_ARCH_X86 && cpu->cpu.vendor == NCCL_TOPO_CPU_VENDOR_ZHAOXIN) {
*bw = cpu->cpu.model == NCCL_TOPO_CPU_TYPE_YONGFENG ? YONGFENG_ZPI_BW : ZPI_BW;
}
-1
Melihat File
@@ -18,7 +18,6 @@
#define PCI_BW 12.0 // PCI Gen3 x16
#define QPI_BW 6.0
#define SKL_QPI_BW 9.0
#define AMD_BW 16.0
#define ZPI_BW 6.0
#define YONGFENG_ZPI_BW 9.0
#define P9_BW 32.0
+6 -23
Melihat File
@@ -498,7 +498,7 @@ static ncclResult_t computeBuffSizes(struct ncclComm* comm) {
NCCL_PARAM(GraphDumpFileRank, "GRAPH_DUMP_FILE_RANK", 0);
NCCL_PARAM(CollNetNodeThreshold, "COLLNET_NODE_THRESHOLD", 2);
NCCL_PARAM(NvbPreconnect, "NVB_PRECONNECT", 1);
NCCL_PARAM(AllocP2pNetLLBuffers, "NCCL_ALLOC_P2P_NET_LL_BUFFERS", 0);
NCCL_PARAM(AllocP2pNetLLBuffers, "ALLOC_P2P_NET_LL_BUFFERS", 0);
static ncclResult_t collNetTrySetup(ncclComm_t comm, struct ncclTopoGraph* collNetGraph) {
ncclResult_t ret = ncclSuccess;
@@ -1102,6 +1102,10 @@ static ncclResult_t ncclCommInitRankFunc(struct ncclAsyncJob* job_) {
// update communicator state
comm->initState = ncclSuccess;
// Trace this call for replay tool
TRACE_CALL("ncclCommInitRank(%p, %d, 0x%llx, %d, %d)",
*newcomm, nranks, (unsigned long long)hashUniqueId(commId), myrank, (*newcomm)->cudaDev);
INFO(NCCL_INIT,"comm %p rank %d nranks %d cudaDev %d busId %lx commId 0x%llx - Init COMPLETE", *newcomm, myrank, nranks, (*newcomm)->cudaDev, (*newcomm)->busId, (unsigned long long)hashUniqueId(commId));
exit:
return res;
@@ -1171,7 +1175,6 @@ static ncclResult_t ncclCommInitRankDev(ncclComm_t* newcomm, int nranks, ncclUni
exit:
return ncclGroupErrCheck(res);
fail:
if (job) free(job);
if (comm) {
if (comm->abortFlag) ncclCudaHostFree((void *)comm->abortFlag);
free(comm);
@@ -1396,7 +1399,6 @@ static ncclResult_t commFinalize(ncclComm_t comm, bool userCalled) {
exit:
return ncclGroupErrCheck(ret);
fail:
if (job) free(job);
goto exit;
}
@@ -1443,26 +1445,7 @@ static ncclResult_t commReclaim(ncclComm_t comm) {
NCCLCHECKGOTO(commFinalize(comm, false), ret, fail);
}
if (comm->intraComm0 == NULL) {
/* if init errors happen and comm->intraComm0 == NULL, no proxy connection is built up, and no finalize thread
* have been launched. Main thread can reclaim everything since no NCCL kernel was issued. */
struct ncclCommFinalizeAsyncJob job;
job.comm = comm;
curRank = comm->rank;
/* comm aborts, commDestroySync should not be blocked. */
if ((ret = commDestroySync((struct ncclAsyncJob*) &job)) != ncclSuccess) {
WARN("commReclaim: comm %p (rank = %d) in abort, error %d", comm, curRank, ret);
}
if ((ret = ncclProxyDestroy(comm)) != ncclSuccess) {
WARN("commReclaim: comm %p (rank = %d) destroys proxy resource error %d", comm, curRank, ret);
}
if ((ret = commCleanup(comm)) != ncclSuccess) {
WARN("commReclaim: cleanup comm %p rank %d failed in destroy/abort, error %d", comm, curRank, ret);
}
} else {
if (comm->intraComm0 != NULL) {
int curRankCnt;
int intraRanks = comm->intraRanks;
ncclComm_t intracomm0 = comm->intraComm0;
+1 -1
Melihat File
@@ -72,7 +72,7 @@ void ncclLoadParam(char const* env, int64_t deftVal, int64_t uninitialized, int6
value = deftVal;
INFO(NCCL_ALL,"Invalid value %s for %s, using default %lld.", str, env, (long long)deftVal);
} else {
INFO(NCCL_ALL,"%s set by environment to %lld.", env, (long long)value);
INFO(NCCL_ENV,"%s set by environment to %lld.", env, (long long)value);
}
}
__atomic_store_n(cache, value, __ATOMIC_RELAXED);
+1 -1
Melihat File
@@ -54,7 +54,7 @@ ncclResult_t ncclShmOpen(char* shmPath, size_t shmSize, void** shmPtr, void** de
const size_t realShmSize = shmSize + refSize;
*handle = *shmPtr = NULL; /* assume shmPtr and handle always set correctly by users. */
EQCHECKGOTO(tmphandle = (struct shmHandleInternal*)malloc(sizeof(struct shmHandleInternal)), NULL, ret, fail);
EQCHECKGOTO(tmphandle = (struct shmHandleInternal*)calloc(1, sizeof(struct shmHandleInternal)), NULL, ret, fail);
if (create) {
/* refcount > 0 means the caller tries to allocate a shared memory. This shared memory segment will have
* refcount references; when the peer attaches, it should pass -1 to reduce one reference count. When it
+10 -3
Melihat File
@@ -490,11 +490,18 @@ static ncclResult_t socketPollConnect(struct ncclSocket* sock) {
memset(&pfd, 0, sizeof(struct pollfd));
pfd.fd = sock->fd;
pfd.events = POLLOUT;
SYSCHECK(ret = poll(&pfd, 1, timeout), "poll");
if (ret == 0) return ncclSuccess;
ret = poll(&pfd, 1, timeout);
if (ret == 0 || (ret < 0 && errno == EINTR)) {
return ncclSuccess;
} else if (ret < 0) {
WARN("socketPollConnect poll() failed with error %s", strerror(errno));
return ncclRemoteError;
} else {
EQCHECK(ret == 1 && (pfd.revents & POLLOUT), 0);
}
/* check socket status */
EQCHECK(ret == 1 && (pfd.revents & POLLOUT), 0);
SYSCHECK(getsockopt(sock->fd, SOL_SOCKET, SO_ERROR, (void*)&ret, &rlen), "getsockopt");
if (ret == 0) {
+21 -11
Melihat File
@@ -663,6 +663,7 @@ ncclResult_t ncclSetThreadContext(struct ncclComm* comm) {
// Set to SIGUSR1 or SIGUSR2 to help debug proxy state during hangs
NCCL_PARAM(ProxyDumpSignal, "PROXY_DUMP_SIGNAL", -1);
NCCL_PARAM(ProgressAppendOpFreq, "PROGRESS_APPENDOP_FREQ", 8);
void* ncclProxyProgress(void *comm_) {
struct ncclComm* comm = (struct ncclComm*)comm_;
@@ -683,6 +684,12 @@ void* ncclProxyProgress(void *comm_) {
nvtxNameOsThreadA(syscall(SYS_gettid), threadName);
int lastIdle = 0;
/* Too frequent call of ncclProxyGetPostedOps() will result in perf regression for small message
* communication. proxyOpAppendCounter is a counter that helps us decide if we need to append proxy ops.
* After each progress, proxyOpAppendCounter will increase by 1 and compare with environment variable
* ncclParamProgressAppendOpFreq(). If they are equal, we will append proxy ops. This will decrease the
* frequency of calling ncclProxyGetPostedOps() and reduce the perf impact. */
int proxyOpAppendCounter = 0;
struct ncclProxyArgs profArgs; // Only used for profiling purposes
while ((state->stop == false || (state->stop == true && state->active)) && *comm->abortFlag == 0) {
int idle = 1;
@@ -694,17 +701,20 @@ void* ncclProxyProgress(void *comm_) {
}
if (lastIdle == 0 && idle == 1) ncclProfilingRecord(&profArgs, 0, 0, ncclProxyProfileIdle);
if (lastIdle == 1 && idle == 0) ncclProfilingRecord(&profArgs, 0, 0, ncclProxyProfileActive);
int added = 0;
TIME_START(3);
if (state->stop == false)
ret = ncclProxyGetPostedOps(comm, &added);
if (added) { TIME_STOP(3); } else { TIME_CANCEL(3); }
if (ret != ncclSuccess) {
(void) ncclCommSetAsyncError(comm, ret);
INFO(NCCL_ALL,"%s:%d -> %d [Proxy Thread]", __FILE__, __LINE__, ret);
}
if (added == 0) {
sched_yield(); // No request progressed. Let others run.
if (idle || (++proxyOpAppendCounter == ncclParamProgressAppendOpFreq())) {
int added = 0;
proxyOpAppendCounter = 0;
TIME_START(3);
if (state->stop == false)
ret = ncclProxyGetPostedOps(comm, &added);
if (added) { TIME_STOP(3); } else { TIME_CANCEL(3); }
if (ret != ncclSuccess) {
(void) ncclCommSetAsyncError(comm, ret);
INFO(NCCL_ALL,"%s:%d -> %d [Proxy Thread]", __FILE__, __LINE__, ret);
}
if (added == 0) {
sched_yield(); // No request progressed. Let others run.
}
}
lastIdle = idle;
}
+10 -7
Melihat File
@@ -63,7 +63,8 @@ struct connectMapMem{
char shmPath[PATH_MAX];
cudaIpcMemHandle_t ipc;
};
ncclShmHandle_t handle;
ncclShmHandle_t attachHandle;
ncclShmHandle_t createHandle;
};
struct connectMap {
@@ -225,12 +226,12 @@ static ncclResult_t recvSetup(struct ncclComm* comm, struct ncclTopoGraph* graph
}
static ncclResult_t netMapShm(struct connectMapMem* mem) {
NCCLCHECK(ncclShmOpen(mem->shmPath, mem->size, (void**)&mem->cpuPtr, (void**)&mem->gpuPtr, -1, &mem->handle));
NCCLCHECK(ncclShmOpen(mem->shmPath, mem->size, (void**)&mem->cpuPtr, (void**)&mem->gpuPtr, -1, &mem->attachHandle));
return ncclSuccess;
}
static ncclResult_t netCreateShm(struct connectMapMem* mem) {
mem->shmPath[0] = '\0'; // Let ncclShmOpen create a tmp file
NCCLCHECK(ncclShmOpen(mem->shmPath, mem->size, (void**)&mem->cpuPtr, NULL, 1, &mem->handle));
NCCLCHECK(ncclShmOpen(mem->shmPath, mem->size, (void**)&mem->cpuPtr, NULL, 1, &mem->createHandle));
return ncclSuccess;
}
@@ -339,17 +340,19 @@ static ncclResult_t sendFree(struct ncclConnector* send) {
struct connectMap* map = (struct connectMap*)(send->transportResources);
if (map) {
if (map->sameProcess == 0) {
NCCLCHECK(ncclShmClose(map->mems[NCCL_NET_MAP_HOSTMEM].handle));
NCCLCHECK(ncclShmClose(map->mems[NCCL_NET_MAP_HOSTMEM].attachHandle));
if (map->mems[NCCL_NET_MAP_DEVMEM].size) {
CUDACHECK(cudaIpcCloseMemHandle(map->mems[NCCL_NET_MAP_DEVMEM].gpuPtr));
}
}
free(map);
}
return ncclSuccess;
}
static ncclResult_t recvFree(struct ncclConnector* recv) {
if (recv->transportResources) free(recv->transportResources);
return ncclSuccess;
}
@@ -763,7 +766,7 @@ static ncclResult_t sendProxyFree(struct ncclProxyConnection* connection, struct
if (resources->map.sameProcess) {
NCCLCHECK(ncclCudaHostFree(mems[NCCL_NET_MAP_HOSTMEM].cpuPtr));
} else {
NCCLCHECK(ncclShmClose(mems[NCCL_NET_MAP_HOSTMEM].handle));
NCCLCHECK(ncclShmClose(mems[NCCL_NET_MAP_HOSTMEM].createHandle));
}
CUDACHECK(cudaFree(mems[NCCL_NET_MAP_DEVMEM].cpuPtr));
if (mems[NCCL_NET_MAP_GDCMEM].cpuPtr) NCCLCHECK(ncclGdrCudaFree(resources->gdrDesc));
@@ -781,7 +784,7 @@ static ncclResult_t sendProxyFree(struct ncclProxyConnection* connection, struct
}
}
if (connection->state == connSetupDone) free(resources);
if (resources) free(resources);
return ncclSuccess;
}
@@ -816,7 +819,7 @@ static ncclResult_t recvProxyFree(struct ncclProxyConnection* connection, struct
}
}
if (connection->state == connSetupDone) free(resources);
if (resources) free(resources);
return ncclSuccess;
}
+10 -1
Melihat File
@@ -122,7 +122,16 @@ static ncclResult_t ncclIbGetPciPath(char* devName, char** path, int* realPort)
}
static int ibvWidths[] = { 1, 4, 8, 12, 2 };
static int ibvSpeeds[] = { 2500, 5000, 10000, 10000, 14000, 25000, 50000 };
static int ibvSpeeds[] = {
2500, /* SDR */
5000, /* DDR */
10000, /* QDR */
10000, /* QDR */
14000, /* FDR */
25000, /* EDR */
50000, /* HDR */
100000 /* NDR */ };
static int firstBitSet(int val, int max) {
int i = 0;
while (i<max && ((val & (1<<i)) == 0)) i++;