/************************************************************************* * Copyright (c) 2017-2020, NVIDIA CORPORATION. All rights reserved. * Modifications Copyright (c) 2019-2020 Advanced Micro Devices, Inc. All rights reserved. * * See LICENSE.txt for license information ************************************************************************/ #include "enqueue.h" #include "argcheck.h" #include "coll_net.h" #include "graph/topo.h" // Only generate inline kernels for LL #define NCCL_FUNC5(func, algo, redop, dtype) \ NCCL_KERN_NAME(func, algo, LL, redop, dtype), \ NCCL_KERN_NAME(func, algo, LL, redop, dtype), \ NCCL_KERN_NAME(func, algo, LL, redop, dtype) #define NCCL_FUNC4(func, redop, type) \ NCCL_FUNC5(func, TREE, redop, type), \ NCCL_FUNC5(func, RING, redop, type), \ NCCL_FUNC5(func, COLLNET, redop, type) // Must be consistent with ncclDataType_t #define NCCL_FUNCS3A(func, redop) \ NCCL_FUNC4(func, redop, int8_t), \ NCCL_FUNC4(func, redop, uint8_t), \ NCCL_FUNC4(func, redop, int32_t), \ NCCL_FUNC4(func, redop, uint32_t), \ NCCL_FUNC4(func, redop, int64_t), \ NCCL_FUNC4(func, redop, uint64_t), \ NCCL_FUNC4(func, redop, half), \ NCCL_FUNC4(func, redop, float), \ NCCL_FUNC4(func, redop, double), \ NCCL_FUNC4(func, redop, rccl_bfloat16) #define NCCL_FUNCS3B(func, redop) \ NCCL_FUNC4(func, redop, int8_t), \ NCCL_FUNC4(func, redop, int8_t), \ NCCL_FUNC4(func, redop, int8_t), \ NCCL_FUNC4(func, redop, int8_t), \ NCCL_FUNC4(func, redop, int8_t), \ NCCL_FUNC4(func, redop, int8_t), \ NCCL_FUNC4(func, redop, int8_t), \ NCCL_FUNC4(func, redop, int8_t), \ NCCL_FUNC4(func, redop, int8_t), \ NCCL_FUNC4(func, redop, int8_t) // Must be consistent with ncclRedOp_t -- but we only generate kernel for sums. #define NCCL_FUNCS2A(func) \ NCCL_FUNCS3A(func, Sum), \ NCCL_FUNCS3A(func, Sum), \ NCCL_FUNCS3A(func, Sum), \ NCCL_FUNCS3A(func, Sum) #define NCCL_FUNCS2B(func) \ NCCL_FUNCS3B(func, Sum), \ NCCL_FUNCS3B(func, Sum), \ NCCL_FUNCS3B(func, Sum), \ NCCL_FUNCS3B(func, Sum) typedef void(*ncclKern_t)(struct ncclWorkElem first); // Must be consistent with the ncclFuncSet enum static ncclKern_t const ncclKerns[1] = { NCCL_KERN_NAME(SendRecv, RING, SIMPLE, Sum, int8_t), }; /*****************************************************************************/ /* Launch system : synchronization and CUDA kernel launch */ /*****************************************************************************/ ncclResult_t ncclLaunchCooperativeKernelMultiDevice(hipLaunchParams *paramsList, int* cudaDevs, int numDevices, int cgMode) { if (cgMode & 0x01) { CUDACHECK(hipExtLaunchMultiKernelMultiDevice(paramsList, numDevices, // These flags are to reduce the latency of using this API hipCooperativeLaunchMultiDeviceNoPreSync|hipCooperativeLaunchMultiDeviceNoPostSync)); return ncclSuccess; } int savedDev; CUDACHECK(hipGetDevice(&savedDev)); for (int i = 0; i < numDevices; i++) { hipLaunchParams* params = paramsList+i; CUDACHECK(hipSetDevice(cudaDevs[i])); hipLaunchKernelGGL(((void (*)(struct ncclWorkElem))params->func), params->gridDim, params->blockDim, params->sharedMem, params->stream, **((struct ncclWorkElem**)params->args)); } CUDACHECK(hipSetDevice(savedDev)); return ncclSuccess; } static ncclResult_t getNextOp(struct ncclChannel* channel, struct ncclWork** work, struct ncclWorkElem* base) { if (channel->workCount == NCCL_MAX_OPS) { WARN("Too many aggregated operations on channel %d (%d max)", channel->id, NCCL_MAX_OPS); return ncclInvalidUsage; } int opIndex = channel->workFifoTail%NCCL_MAX_OPS; struct ncclWork* w = channel->workFifo+opIndex; struct ncclWorkElem* e = w->elems; volatile uint8_t* activePtr = (volatile uint8_t*)&e->active; while (LOAD(activePtr) != 0) sched_yield(); memset(w, 0, sizeof(struct ncclWork)); // Initialize with work elem if provided if (base) memcpy(e, base, sizeof(struct ncclWorkElem)); STORE(&e->active, 1); e->index = opIndex; channel->workFifoTail++; channel->workCount++; if (work) *work = w; return ncclSuccess; } static ncclResult_t setupLaunch(struct ncclComm* comm, hipLaunchParams* params) { // Only launch blocks where we have work to do. for (int c=0; cnChannels, comm->p2pnChannels); c++) { if (comm->channels[c].workCount) params->gridDim.x = c+1; } // Set active = 2 for the last operation and add a no-op on empty channels (p2p case). for (int c=0; cgridDim.x; c++) { struct ncclChannel* channel = comm->channels+c; if (channel->workCount == 0) { struct ncclWork* w; NCCLCHECK(getNextOp(channel, &w, NULL)); struct ncclWorkElem* e = w->elems; e->comm = comm->devComm; e->funcIndex = FUNC_INDEX_P2P; e->p2p.nThreads = 0; } STORE(&channel->workFifo[(channel->workFifoTail-1)%NCCL_MAX_OPS].elems[0].active, 2); } // Find the first operation, choose the kernel accordingly and pass it // as the first argument. struct ncclChannel* c0 = comm->channels; struct ncclWork* work = c0->workFifo+((c0->workFifoTail-c0->workCount)%NCCL_MAX_OPS); struct ncclWorkElem* elem = work->elems; memcpy(&comm->args, elem, sizeof(struct ncclWorkElem)); // As we inline the first coll directly, we can free it immediately. if (elem->funcIndex != FUNC_INDEX_P2P) elem->active = 0; params->func = (void *)ncclKerns[0]; return ncclSuccess; } ncclResult_t ncclCpuBarrierIn(struct ncclComm* comm, int* isLast) { volatile int* ptr = (volatile int*)(comm->intraBarrier+comm->intraPhase); int val = LOAD(ptr); bool done = false; while (done == false) { if (val >= comm->intraRanks) { WARN("Trying to launch too many work elements, max is %d", NCCL_MAX_OPS); return ncclInvalidUsage; } if (val+1 == comm->intraRanks) { // Reset the barrier. comm->intraBarrier[comm->intraPhase^1] = 0; *isLast = 1; return ncclSuccess; } done = __sync_bool_compare_and_swap(ptr, val, val+1); val++; } *isLast = 0; return ncclSuccess; } ncclResult_t ncclCpuBarrierLast(struct ncclComm* comm) { volatile int* ptr = (volatile int*)(comm->intraBarrier+comm->intraPhase); int val = LOAD(ptr); if (__sync_bool_compare_and_swap(ptr, val, val+1) != true) { WARN("Trying to launch too many work elements, max is %d", NCCL_MAX_OPS); return ncclInternalError; } return ncclSuccess; } ncclResult_t ncclCpuBarrierOut(struct ncclComm* comm) { volatile int* ptr = (volatile int*)(comm->intraBarrier+comm->intraPhase); while (LOAD(ptr) < comm->intraRanks) pthread_yield(); comm->intraPhase ^= 1; return ncclSuccess; } ncclResult_t ncclBarrierEnqueue(struct ncclComm* comm) { hipLaunchParams* params = comm->myParams; if (params->gridDim.x == 0) return ncclSuccess; NCCLCHECK(setupLaunch(comm, params)); // Use internal NCCL stream for CGMD/GROUP launch if required or if the user stream is NULL if (comm->launchMode == ncclComm::GROUP && (comm->groupCudaStream || comm->userStream == NULL)) { // Enqueue event in user stream CUDACHECK(hipEventRecord(comm->doneEvent, comm->userStream)); // Create dependency between user stream and internal NCCL stream CUDACHECK(hipStreamWaitEvent(comm->groupStream, comm->doneEvent, 0)); params->stream = comm->groupStream; } else { if (comm->userStream != params->stream) { // Stream changed from last call, create dependency against last NCCL kernel launch CUDACHECK(hipStreamWaitEvent(comm->userStream, comm->doneEvent, 0)); } params->stream = comm->userStream; } if (comm->launchMode == ncclComm::GROUP) { int isLast = 0; NCCLCHECK(ncclCpuBarrierIn(comm, &isLast)); if (isLast) { // I'm the last. Launch all operations. NCCLCHECK(ncclLaunchCooperativeKernelMultiDevice(comm->intraParams, comm->intraCudaDevs, comm->intraRanks, *comm->intraCGMode)); NCCLCHECK(ncclCpuBarrierLast(comm)); } } return ncclSuccess; } ncclResult_t ncclBarrierEnqueueWait(ncclComm_t comm) { hipLaunchParams *params = comm->myParams; if (params->gridDim.x == 0) return ncclSuccess; // We can't print the CG mode before the first barrier happened. if (comm->rank == 0 && *comm->intraCGMode & 0x10) { *comm->intraCGMode ^= 0x10; INFO(NCCL_INIT,"Launch mode %s%s%s", comm->launchMode == ncclComm::GROUP ? "Group" : "Parallel", *comm->intraCGMode ? "/CGMD" : "", (comm->launchMode == ncclComm::GROUP && comm->groupCudaStream) ? "/Stream" : ""); } if (comm->launchMode == ncclComm::PARALLEL) { hipLaunchKernelGGL(((void (*)(struct ncclWorkElem))params->func), params->gridDim, params->blockDim, params->sharedMem, params->stream, **((struct ncclWorkElem**)params->args)); } else { NCCLCHECK(ncclCpuBarrierOut(comm)); } // Start the network proxies as soon as the kernel has been launched. We can't // perform any CUDA call between the two or having a cudaFree between the CUDA // launch and the ncclProxyStart call could cause a deadlock. // Also, starting the proxies after the CUDA launch seems to be better for // performance (latency). uint64_t max = 0ULL; for (int r=0; rgridDim.x; r++) { struct ncclChannel* channel = comm->channels+r; max = std::max(max, channel->workFifoTail); channel->workCount = 0; } for (int r=0; rnChannels, comm->p2pnChannels); r++) { struct ncclChannel* channel = comm->channels+r; channel->workFifoTail = max; } params->gridDim.x = params->blockDim.x = 0; comm->lastOpCount = max; NCCLCHECK(ncclProxyStart(comm)); return ncclSuccess; } ncclResult_t ncclEnqueueEvents(ncclComm_t comm) { hipLaunchParams *params = comm->myParams; // Enqueue event after NCCL kernel CUDACHECK(hipEventRecord(comm->doneEvent, params->stream)); // Use internal NCCL stream for CGMD/GROUP launch if required or if the user stream is NULL if (comm->launchMode == ncclComm::GROUP && (comm->groupCudaStream || comm->userStream == NULL)) { // Create dependency between NCCL internal stream and user stream CUDACHECK(hipStreamWaitEvent(comm->userStream, comm->doneEvent, 0)); } comm->userStreamSet = false; return ncclSuccess; } /*****************************************************************************/ /* Enqueueing system : computation of kernel and proxy operations parameters */ /*****************************************************************************/ static ncclResult_t getAlgoInfo(struct ncclInfo* info) { struct ncclComm* comm = info->comm; float minTime = 3600000000.0; // Hopefully no operation will take an hour to complete. // Find algorithm / protocol. if (info->coll == ncclFuncAllToAll || info->coll == ncclFuncAllToAllv) { info->algorithm = NCCL_ALGO_RING; info->protocol = NCCL_PROTO_SIMPLE; info->nChannels = comm->nChannels; info->nThreads = NCCL_MAX_NTHREADS; return ncclSuccess; } info->algorithm = -1; info->protocol = -1; int nAlgos = NCCL_NUM_ALGORITHMS; // Check collNet support int collNetTypeSupport = 0; if (info->comm->collNetSupport) NCCLCHECK(collNetReduceSupport(info->datatype, info->op, &collNetTypeSupport)); if (collNetTypeSupport != 1) nAlgos--; for (int a=0; a= 0 && time < minTime) { info->algorithm = a; info->protocol = p; minTime = time; } } } if (info->algorithm == -1 || info->protocol == -1) { WARN("Error : no algorithm/protocol available"); return ncclInternalError; } //if (comm->rank == 0) INFO(NCCL_TUNING, "%ld Bytes -> Algo %d proto %d time %f", info->nBytes, info->algorithm, info->protocol, minTime); TRACE(NCCL_COLL, "%ld Bytes -> Algo %d proto %d time %f", info->nBytes, info->algorithm, info->protocol, minTime); int nc = (info->nChannels > 0) ? info->nChannels : (info->algorithm == NCCL_ALGO_COLLNET) ? comm->nChannels/2 : comm->nChannels; // CollNet uses one channel for up and one channel for down int nt = comm->maxThreads[info->algorithm][info->protocol]; int threadThreshold = comm->threadThresholds[info->algorithm][info->protocol]; while (info->nBytes < nc*nt*threadThreshold) { if (info->algorithm != NCCL_ALGO_COLLNET && nc >= 2) nc--; #if defined(__HIP_PLATFORM_HCC__) || defined(__HCC__) || defined(__HIPCC__) // do not reduce threads count on VEGA #else else if ((nt % 128) == 0) nt/=2; #endif else break; } #if defined(__HIP_PLATFORM_HCC__) || defined(__HCC__) || defined(__HIPCC__) #else if (info->protocol == NCCL_PROTO_SIMPLE) nt += WARP_SIZE; // Extra warp for sync if (info->protocol == NCCL_PROTO_SIMPLE && info->algorithm == NCCL_ALGO_TREE) nt += WARP_SIZE; #endif info->nChannels = nc; info->nThreads = nt; return ncclSuccess; } static ncclResult_t getPatternInfo(struct ncclInfo* info) { switch (info->coll) { case ncclFuncBroadcast: info->pattern = info->algorithm == NCCL_ALGO_TREE ? ncclPatternTreeDown : ncclPatternPipelineFrom; break; case ncclFuncReduce: info->pattern = info->algorithm == NCCL_ALGO_TREE ? ncclPatternTreeUp : ncclPatternPipelineTo; break; case ncclFuncReduceScatter: case ncclFuncAllGather: info->pattern = ncclPatternRing; break; case ncclFuncAllReduce: info->pattern = info->algorithm == NCCL_ALGO_COLLNET ? ncclPatternCollTreeUp : info->algorithm == NCCL_ALGO_TREE ? ncclPatternTreeUpDown : ncclPatternRingTwice; break; case ncclFuncAllToAll: case ncclFuncAllToAllv: info->pattern = ncclPatternAll; break; default: WARN("Unknown pattern for collective %d algorithm %d", info->coll, info->algorithm); return ncclInternalError; } return ncclSuccess; } static ncclResult_t getLoopInfo(struct ncclInfo* info) { switch (info->pattern) { case ncclPatternTreeUp: case ncclPatternTreeDown: case ncclPatternTreeUpDown: case ncclPatternPipelineFrom: case ncclPatternPipelineTo: case ncclPatternCollTreeUp: case ncclPatternCollTreeDown: info->nstepsPerLoop = info-> nchunksPerLoop = 1; break; case ncclPatternRing: info->nstepsPerLoop = info->comm->nRanks-1; info->nchunksPerLoop = info->comm->nRanks; break; case ncclPatternRingTwice: info->nstepsPerLoop = 2*(info->comm->nRanks-1); info->nchunksPerLoop = info->comm->nRanks; break; case ncclPatternAll: info->nstepsPerLoop = 1; info->nchunksPerLoop = info->comm->nRanks; break; default: WARN("Unknown pattern %d\n", info->pattern); return ncclInternalError; } return ncclSuccess; } static ncclResult_t computeColl(struct ncclInfo* info /* input */, struct ncclWorkElem* work, struct ncclProxyArgs* proxyArgs /* output */) { work->comm = info->comm->devComm; // Set nstepsPerLoop and nchunksPerLoop NCCLCHECK(getAlgoInfo(info)); NCCLCHECK(getPatternInfo(info)); NCCLCHECK(getLoopInfo(info)); if ((info->coll == ncclFuncAllToAll || info->coll == ncclFuncAllToAllv) && info->comm->topo->nodes[NET].count == 0 && info->comm->topo->type == RCCL_TOPO_4P2H_ROME) info->nChannels =info->comm->p2pnChannels; work->opCount = info->comm->opCount; work->sendbuff = info->sendbuff; work->recvbuff = info->recvbuff; if (info->coll == ncclFuncAllToAllv) { work->a2av.count = info->count; work->a2av.nChannels = info->nChannels; } else { work->coll.root = info->root; work->coll.count = info->count; work->coll.nChannels = info->nChannels; } work->nThreads = info->nThreads; work->funcIndex = FUNC_INDEX(info->coll, info->op, info->datatype, info->algorithm, info->protocol); int stepSize = info->comm->buffSizes[info->protocol]/NCCL_STEPS; int chunkSteps = (info->protocol == NCCL_PROTO_SIMPLE && info->algorithm == NCCL_ALGO_RING) ? info->chunkSteps : 1; int sliceSteps = (info->protocol == NCCL_PROTO_SIMPLE && info->algorithm == NCCL_ALGO_RING) ? info->sliceSteps : 1; int chunkSize = stepSize*chunkSteps; // Compute lastChunkSize if (info->algorithm == NCCL_ALGO_TREE && info->protocol == NCCL_PROTO_SIMPLE) { if (info->pattern == ncclPatternTreeUpDown) { // Optimize chunkSize / nSteps while (info->nBytes / (info->nChannels*chunkSize) < info->comm->channels[0].tree.depth*8 && chunkSize > 131072) chunkSize /= 2; while (info->nBytes / (info->nChannels*chunkSize) < info->comm->channels[0].tree.depth*4 && chunkSize > 65536) chunkSize /= 2; while (info->nBytes / (info->nChannels*chunkSize) < info->comm->channels[0].tree.depth && chunkSize > 32768) chunkSize /= 2; } // Use lastChunkSize as chunkSize work->coll.lastChunkSize = chunkSize / ncclTypeSize(info->datatype); } else if (info->algorithm == NCCL_ALGO_COLLNET && info->protocol == NCCL_PROTO_SIMPLE) { // Optimize chunkSize / nSteps while (info->nBytes / (info->nChannels*chunkSize) < info->comm->channels[0].collTree.depth*16 && chunkSize > 131072) chunkSize /= 2; while (info->nBytes / (info->nChannels*chunkSize) < info->comm->channels[0].collTree.depth*4 && chunkSize > 65536) chunkSize /= 2; while (info->nBytes / (info->nChannels*chunkSize) < info->comm->channels[0].collTree.depth && chunkSize > 32768) chunkSize /= 2; // Use lastChunkSize as chunkSize work->coll.lastChunkSize = chunkSize / ncclTypeSize(info->datatype); } else if (info->protocol == NCCL_PROTO_LL) { const ssize_t sliceSize = stepSize*sizeof(uint64_t)/sizeof(union ncclLLFifoLine); const ssize_t loopSize = info->nChannels*info->nchunksPerLoop*(ssize_t)sliceSize; work->coll.lastChunkSize = DIVUP((info->nBytes-(info->nBytes/loopSize)*loopSize), info->nChannels*info->nchunksPerLoop); ALIGN_SIZE(work->coll.lastChunkSize, info->nThreads*sizeof(uint64_t)); work->coll.lastChunkSize /= ncclTypeSize(info->datatype); } else if (info->algorithm == NCCL_ALGO_TREE && info->protocol == NCCL_PROTO_LL128) { int nNodes = info->comm->nNodes; float ppn = info->comm->nRanks / (float)nNodes; float nstepsLL128 = 1+log2i(nNodes) + 0.1*ppn; while (info->nBytes / (info->nChannels*chunkSize) < nstepsLL128*64/ppn && chunkSize > 131072) chunkSize /= 2; while (info->nBytes / (info->nChannels*chunkSize) < nstepsLL128*16/ppn && chunkSize > 32768) chunkSize /= 2; // Use lastChunkSize as chunkSize work->coll.lastChunkSize = chunkSize*NCCL_LL128_DATAELEMS/(NCCL_LL128_LINEELEMS*ncclTypeSize(info->datatype)); } // Compute nSteps for proxies int chunkEffectiveSize = chunkSize; if (info->protocol == NCCL_PROTO_LL) chunkEffectiveSize /= 2; if (info->protocol == NCCL_PROTO_LL128) chunkEffectiveSize = (chunkSize / NCCL_LL128_LINEELEMS) * NCCL_LL128_DATAELEMS; //if (info->comm->rank == 0) printf("Coll %d, size %ld -> %dx%d, chunkSize %d (algo %d proto%d)\n", info->coll, info->nBytes, info->nChannels, info->nThreads, chunkSize, info->algorithm, info->protocol); int nLoops = (int)(DIVUP(info->nBytes, (((size_t)(info->nChannels))*info->nchunksPerLoop*chunkEffectiveSize))); proxyArgs->nsteps = info->nstepsPerLoop * nLoops * chunkSteps; proxyArgs->sliceSteps = sliceSteps; proxyArgs->chunkSteps = chunkSteps; proxyArgs->protocol = info->protocol; proxyArgs->dtype = info->datatype; proxyArgs->redOp = info->op; // This is used by P2P to reduce the receive buffer size. We don't use it in collectives // because some protocols need to transmit more than the total size, plus they sometimes // round up proxyArgs->recvbytes = stepSize*proxyArgs->sliceSteps; TRACE(NCCL_NET,"opCount %lx slicesteps %d spl %d cpl %d nbytes %zi -> protocol %d nchannels %d nthreads %d, nloops %d nsteps %d comm %p", proxyArgs->opCount, proxyArgs->sliceSteps, info->nstepsPerLoop, info->nchunksPerLoop, info->nBytes, info->protocol, info->nChannels, info->nThreads, nLoops, proxyArgs->nsteps, info->comm); return ncclSuccess; } static ncclResult_t checkSetStream(struct ncclInfo* info) { if (info->comm->userStreamSet == false) { info->comm->userStream = info->stream; info->comm->userStreamSet = true; } else if (info->stream != info->comm->userStream) { WARN("Error : mixing different streams within a group call is not supported."); return ncclInvalidUsage; } return ncclSuccess; } ncclResult_t ncclSaveKernel(struct ncclInfo* info) { if (info->comm->nRanks == 1) { if (info->sendbuff != info->recvbuff) CUDACHECK(hipMemcpyAsync(info->recvbuff, info->sendbuff, info->nBytes, hipMemcpyDeviceToDevice, info->stream)); return ncclSuccess; } struct ncclWorkElem work; struct ncclProxyArgs proxyArgs; memset(&proxyArgs, 0, sizeof(struct ncclProxyArgs)); NCCLCHECK(computeColl(info, &work, &proxyArgs)); info->comm->myParams->blockDim.x = std::max(info->comm->myParams->blockDim.x, info->nThreads); int nChannels = work.coll.nChannels; int nSubChannels = (info->pattern == ncclPatternCollTreeUp || info->pattern == ncclPatternCollTreeDown) ? 2 : 1; for (int bid=0; bidcomm->myParams->gridDim.x % info->comm->nChannels; struct ncclChannel* channel = info->comm->channels+channelId; // Proxy proxyArgs.channel = channel; // Adjust pattern for CollNet based on channel index if (nSubChannels == 2) { info->pattern = (channelId < info->comm->nChannels/nSubChannels) ? ncclPatternCollTreeUp : ncclPatternCollTreeDown; } if (proxyArgs.nsteps) NCCLCHECK(ncclProxySaveColl(&proxyArgs, info->pattern, info->root, info->comm->nRanks)); info->comm->myParams->gridDim.x++; if (info->coll == ncclFuncAllToAllv) { work.a2av.bid = bid % work.a2av.nChannels; } else { work.coll.bid = bid % nChannels; } struct ncclWork* w; NCCLCHECK(getNextOp(channel, &w, &work)); if (info->coll == ncclFuncAllToAllv) { struct ncclWorkElem* e = w->elems; size_t* params = channel->a2avParams + info->comm->nRanks*4*e->index; memcpy(params, info->sendcounts, sizeof(size_t*)*(info->comm->nRanks)); memcpy(params+info->comm->nRanks, info->sdispls, sizeof(size_t*)*(info->comm->nRanks)); memcpy(params+info->comm->nRanks*2, info->recvcounts, sizeof(size_t*)*(info->comm->nRanks)); memcpy(params+info->comm->nRanks*3, info->rdispls, sizeof(size_t*)*(info->comm->nRanks)); } } info->comm->opCount++; return ncclSuccess; } #define NCCL_MIN_CHANNEL_SIZE (NCCL_LL_THREAD_THRESHOLD*64) #define NCCL_AGG_CHANNEL_SIZE (1LL << 21) /* 2 MiB, ideal per-channel size to fully utilize bandwidth */ ncclResult_t ncclSaveCommKernels(ncclComm_t comm) { if (comm->asyncOpCount == 0) { return ncclSuccess; } else if (comm->asyncOpCount == 1) { // No aggregation struct ncclInfo* info = comm->asyncOps; info->nChannels = 0; NCCLCHECK(ncclSaveKernel(info)); } else { // Aggregation size_t channelSize = NCCL_AGG_CHANNEL_SIZE * comm->nRanks; // scale channel size based on nranks as latency increases // Reduce the per-channel size if we cannot fully utilize the channels while (comm->asyncTotalSize < channelSize * comm->nChannels && channelSize > NCCL_MIN_CHANNEL_SIZE) channelSize /= 2; for (int c = 0; c < comm->asyncOpCount; c++) { struct ncclInfo* info = comm->asyncOps+c; info->nChannels = std::min((int)DIVUP(info->nBytes, channelSize), comm->nChannels); // assign number of channels NCCLCHECK(ncclSaveKernel(info)); } } // Reset counters comm->asyncOpCount = 0; comm->asyncTotalSize = 0; return ncclSuccess; } static ncclResult_t ncclSaveAsyncColl(struct ncclInfo* info) { ncclComm_t comm = info->comm; if (comm->asyncOpCount >= NCCL_MAX_OPS) { WARN("Too many async operations in progress, max is %d", NCCL_MAX_OPS); return ncclInvalidUsage; } memcpy(comm->asyncOps+comm->asyncOpCount, info, sizeof(struct ncclInfo)); comm->asyncOpCount++; comm->asyncTotalSize += info->nBytes; return ncclSuccess; } // Save p2p operations in comm->p2pSends and p2pRecvs. Operations will be posted to channels // during ncclGroupEnd() static ncclResult_t ncclSaveP2p(struct ncclInfo* info) { struct ncclComm* comm = info->comm; int peer = info->root; ssize_t nBytes = info->count*ncclTypeSize(info->datatype); if (info->opName[0] == 'S') { // Send if (peer != comm->rank) { int delta = (comm->nRanks - (comm->rank-peer)) % comm->nRanks; for (int c=0; cp2pnChannelsPerPeer; c++) { int channelId = (delta+comm->p2pChannels[c]) % comm->p2pnChannels; if (comm->channels[channelId].peers[peer].send.connected == 0) { comm->connectSend[peer] |= (1<connect = 1; } } } NCCLCHECK(enqueueP2pInfo(comm->p2pSends+info->root, (void*)info->sendbuff, nBytes)); comm->p2pSendCount++; } else { if (peer != comm->rank) { int delta = (comm->nRanks + (comm->rank-peer)) % comm->nRanks; for (int c=0; cp2pnChannelsPerPeer; c++) { int channelId = (delta+comm->p2pChannels[c]) % comm->p2pnChannels; if (comm->channels[channelId].peers[peer].recv.connected == 0) { comm->connectRecv[peer] |= (1<connect = 1; } } } NCCLCHECK(enqueueP2pInfo(comm->p2pRecvs+info->root, info->recvbuff, nBytes)); comm->p2pRecvCount++; } return ncclSuccess; } static int getSegment(struct ncclInfo* info, struct ncclWork* work) { const int e = (info->comm->topo->nodes[NET].count == 0 && info->comm->topo->type == RCCL_TOPO_4P2H_ROME) ? 1 : NCCL_MAX_WORK_ELEMENTS; for (int s=0; selems[s].p2p.delta != info->delta; s++) { if (work->elems[s].p2p.nThreads == 0) return s; } return -1; } static ncclResult_t saveP2pOp(struct ncclInfo* info /* input */, struct ncclWork* work, int s) { struct ncclWorkElem* elem = work->elems+s; elem->comm = info->comm->devComm; elem->funcIndex = FUNC_INDEX_P2P; elem->nThreads = info->nThreads = NCCL_MAX_NTHREADS; elem->sendbuff = info->sendbuff; elem->recvbuff = info->recvbuff; elem->opCount = info->comm->lastOpCount; elem->p2p.sendCount = info->sendbytes; elem->p2p.recvCount = info->recvbytes; elem->p2p.delta = info->delta; const int nsegments = s+1; int nThreads = 512; while (nsegments*nThreads > 256) nThreads /= 2; //if (nThreads >= 128) nThreads += WARP_SIZE; for (int i=0; ielems[i].p2p.nThreads = nThreads; return ncclSuccess; } ncclResult_t ncclSaveP2pKernel(struct ncclInfo* info) { int channelId = info->channelId; struct ncclChannel* channel = info->comm->channels+channelId; // Try to reuse last p2p operation if not full yet int opIndex = (channel->workFifoTail-1+NCCL_MAX_OPS)%NCCL_MAX_OPS; struct ncclWork* w = channel->workFifo+opIndex; int segment = -1; if (channel->workCount && w->elems[0].funcIndex == FUNC_INDEX_P2P && w->elems[NCCL_MAX_WORK_ELEMENTS-1].p2p.nThreads == 0) { // Try to pack more segments into a single operation segment = getSegment(info, w); } if (segment == -1) { NCCLCHECK(getNextOp(channel, &w, NULL)); segment = 0; } NCCLCHECK(ncclProxySaveP2p(info, channel, segment)); NCCLCHECK(saveP2pOp(info, w, segment)); info->comm->myParams->gridDim.x = std::max(info->comm->myParams->gridDim.x, channelId+1); info->comm->myParams->blockDim.x = std::max(info->comm->myParams->blockDim.x, info->nThreads); return ncclSuccess; } ncclResult_t ncclEnqueueCheck(struct ncclInfo* info) { // Launch asynchronously if needed if (ncclAsyncMode()) { ncclResult_t ret = ncclSuccess; int savedDev = -1; // Check arguments NCCLCHECK(PtrCheck(info->comm, info->opName, "comm")); if (info->comm->checkPointers) { CUDACHECKGOTO(hipGetDevice(&savedDev), ret, end); CUDACHECKGOTO(hipSetDevice(info->comm->cudaDev), ret, end); } NCCLCHECKGOTO(ArgsCheck(info), ret, end); // Always register comm even in case of error to make sure ncclGroupEnd // cleans it up. NCCLCHECKGOTO(ncclAsyncColl(info->comm), ret, end); NCCLCHECKGOTO(checkSetStream(info), ret, end); if (info->coll == ncclFuncAllToAllv) INFO(NCCL_COLL,"%s: opCount %lx sendbuff %p sendcounts %p sdispls %p recvbuff %p recvcounts %p rdispls %p datatype %d typesize %zi op %d root %d comm %p [nranks=%d] stream %p", info->opName, info->comm->opCount, info->sendbuff, info->sendcounts, info->sdispls, info->recvbuff, info->recvcounts, info->rdispls, info->datatype, info->count, info->op, info->root, info->comm, info->comm->nRanks, info->stream); else if (info->coll != ncclFuncSendRecv) INFO(NCCL_COLL,"%s: opCount %lx sendbuff %p recvbuff %p count %zi datatype %d op %d root %d comm %p [nranks=%d] stream %p", info->opName, info->comm->opCount, info->sendbuff, info->recvbuff, info->count, info->datatype, info->op, info->root, info->comm, info->comm->nRanks, info->stream); if (info->coll == ncclFuncSendRecv) { //p2p stored separately INFO(NCCL_COLL,"%s: opCount %lx sendbuff %p recvbuff %p count %zi datatype %d op %d root %d comm %p [nranks=%d] stream %p", info->opName, info->comm->lastOpCount, info->sendbuff, info->recvbuff, info->count, info->datatype, info->op, info->root, info->comm, info->comm->nRanks, info->stream); NCCLCHECKGOTO(ncclSaveP2p(info), ret, end); } else { NCCLCHECKGOTO(ncclSaveAsyncColl(info), ret, end); } end: if (savedDev != -1) CUDACHECK(hipSetDevice(savedDev)); ncclAsyncErrCheck(ret); return ret; } else { NCCLCHECK(PtrCheck(info->comm, info->opName, "comm")); NCCLCHECK(ArgsCheck(info)); NCCLCHECK(checkSetStream(info)); if (info->coll == ncclFuncAllToAllv) INFO(NCCL_COLL,"%s: opCount %lx sendbuff %p sendcounts %p sdispls %p recvbuff %p recvcounts %p rdispls %p datatype %d typesize %zi op %d root %d comm %p [nranks=%d] stream %p", info->opName, info->comm->opCount, info->sendbuff, info->sendcounts, info->sdispls, info->recvbuff, info->recvcounts, info->rdispls, info->datatype, info->count, info->op, info->root, info->comm, info->comm->nRanks, info->stream); else INFO(NCCL_COLL,"%s: opCount %lx sendbuff %p recvbuff %p count %zi datatype %d op %d root %d comm %p [nranks=%d] stream %p", info->opName, info->comm->opCount, info->sendbuff, info->recvbuff, info->count, info->datatype, info->op, info->root, info->comm, info->comm->nRanks, info->stream); NCCLCHECK(ncclSaveKernel(info)); NCCLCHECK(ncclBarrierEnqueue(info->comm)); NCCLCHECK(ncclBarrierEnqueueWait(info->comm)); NCCLCHECK(ncclEnqueueEvents(info->comm)); return ncclSuccess; } }