/*************************************************************************
 * Copyright (c) 2017-2022, NVIDIA CORPORATION. All rights reserved.
 * Modifications Copyright (c) 2019-2023 Advanced Micro Devices, Inc. All rights reserved.
 * Modifications Copyright (c) Microsoft Corporation. Licensed under the MIT License.
 *
 * See LICENSE.txt for license information
 ************************************************************************/

#include "enqueue.h"
#include "argcheck.h"
#include "coll_net.h"
#include "graph/topo.h"
#include <hip/hip_runtime.h>
#include <hip/hip_ext.h>
#include "gdrwrap.h"
#include "bootstrap.h"
#include <unistd.h>
#include "channel.h"
#include "rocmwrap.h"
#include "rccl_vars.h"
#include "profiler.h"
#include "transport.h"
#include "common.h"
#include "api_trace.h"

#include <algorithm> // std::max, std::min
#include <cstring> // std::memcpy
#include <cinttypes> // PRIx64
#include <cassert>

using namespace rccl;

// A launchable device kernel and whether it is specialized.
struct ncclKernelMatch {
  void* kernelFn;
  bool specialized;
};

// Kernel table indexed by ncclGetKernelIndex(comm): the unroll variant, plus
// (when collective tracing is compiled in) an offset of 2 selecting the debug
// kernels when comm->collTraceEnabled is set.
#ifdef ENABLE_COLLTRACE
#define ncclGetKernelIndex(p_comm) ((p_comm)->unroll + ((p_comm)->collTraceEnabled ? 2 : 0))
static ncclKernelMatch const ncclKerns[4] = {
  {(void *)ncclDevKernel_Generic, true},
  {(void *)ncclDevKernel_Generic_4, true},
  {(void *)ncclDevKernelDebug_Generic, true},
  {(void *)ncclDevKernelDebug_Generic_4, true}
};
#else
#define ncclGetKernelIndex(p_comm) ((p_comm)->unroll)
static ncclKernelMatch const ncclKerns[2] = {
  {(void*)ncclDevKernel_Generic, true},
  {(void*)ncclDevKernel_Generic_4, true}
};
#endif

NCCL_PARAM(L1SharedMemoryCarveout, "L1_SHARED_MEMORY_CARVEOUT", 0);

// Applies per-kernel function attributes to every distinct kernel in ncclKerns:
//  - the preferred shared-memory carveout, when L1_SHARED_MEMORY_CARVEOUT != 0;
//  - the maximum dynamic shared memory size, when ncclShmemDynamicSize(cudaArch) != 0.
// When maxStackSize is non-null it receives the largest localSizeBytes over all
// kernels (0 if attributes could not be read).
// A failure to read attributes is only warned about; a failed attribute set is
// reported through the return value (via CUDACHECKGOTO) but does not stop the
// loop from processing the remaining kernels.
ncclResult_t ncclInitKernelsForDevice(int cudaArch, size_t* maxStackSize) {
  constexpr int KernelCount = sizeof(ncclKerns)/sizeof(ncclKerns[0]);
  ncclResult_t result = ncclSuccess;

  if (maxStackSize) *maxStackSize = 0;
  int carveout = ncclParamL1SharedMemoryCarveout();

  // Keep track if we already visited a function pointer (the table may contain
  // the same kernel more than once; only adjacent duplicates are detected).
  void* lru[2] = {nullptr, nullptr};
  for (int i=0; i < KernelCount; i++) {
    void* fn = ncclKerns[i].kernelFn;
    if (fn == lru[0] || fn == lru[1]) goto next_kernel;
    lru[1] = lru[0];
    lru[0] = fn;

    if (maxStackSize) {
      cudaFuncAttributes attr = {0};
      // Best effort: on failure attr stays zeroed and does not raise the maximum.
      if (cudaFuncGetAttributes(&attr, fn) != cudaSuccess) WARN("Failed to get kernel attributes");
      if (attr.localSizeBytes > *maxStackSize) *maxStackSize = attr.localSizeBytes;
    }
    if (carveout) {
      CUDACHECKGOTO(cudaFuncSetAttribute(fn, cudaFuncAttributePreferredSharedMemoryCarveout, carveout), result, ignore1);
    ignore1:;
    }
    if (ncclShmemDynamicSize(cudaArch) != 0) {
      CUDACHECKGOTO(cudaFuncSetAttribute(fn, cudaFuncAttributeMaxDynamicSharedMemorySize, ncclShmemDynamicSize(cudaArch)), result, next_kernel);
    }
  next_kernel:;
  }
  return result;
}

////////////////////////////////////////////////////////////////////////////////
// Data movement metrics.

// Bytes of traffic generated per byte of user data: 2 for AllReduce, nRanks
// for AllGather/ReduceScatter, 1 otherwise.
static inline int ncclFuncTrafficPerByte(ncclFunc_t func, int nRanks) {
  switch (func) {
  case ncclFuncAllReduce: return 2;
  case ncclFuncAllGather: return nRanks;
  case ncclFuncReduceScatter: return nRanks;
  default: return 1;
  }
}

/*****************************************************************************/
/*       Launch system : synchronization and CUDA kernel launch              */
/*****************************************************************************/

// Offers `op` to the proxy via ncclProxySaveOp(); if the proxy reports it is
// still needed, a copy is allocated from comm's proxy-op pool and appended to
// the work-in-progress plan's proxy-op queue for op->channelId.
static ncclResult_t addProxyOpIfNeeded(struct ncclComm* comm, struct ncclKernelPlan* plan, struct ncclProxyOp* op) {
  bool needed = true;
  NCCLCHECK(ncclProxySaveOp(comm, op, &needed));
  if (needed) {
    struct ncclProxyOp* q = ncclMemoryPoolAlloc<struct ncclProxyOp>(&comm->memPool_ncclProxyOp, &comm->memPermanent);
    *q = *op; // C++ struct assignment
    ncclIntruQueueEnqueue(&comm->planner.wipPlan.channels[op->channelId].proxyOpQueue, q);
  }
  return ncclSuccess;
}

// Records one work item of type `workType` (located at byte `workOffset` in the
// plan's work area) in channel `channelId`'s batch list. The item is appended
// to the channel's current batch when compatible; otherwise a new batch, or an
// extension batch (prev->nextExtends=1) when only the offset encoding forces a
// break, is allocated and plan->nWorkBatches is incremented.
// `p2pRound` is only meaningful for ncclDevWorkTypeP2p.
static void addWorkBatchToPlan(
    struct ncclComm* comm, struct ncclKernelPlan* plan, int channelId,
    enum ncclDevWorkType workType, int devFuncId, uint32_t workOffset,
    int p2pRound = -1
  ) {
  ncclKernelPlanner::WipPlan::Channel* chan = &comm->planner.wipPlan.channels[channelId];
  size_t workSize = ncclDevWorkSize(workType);
  // Conditions causing us to create a new blank batch.
  bool newBatch = (chan->workBatchQueue.tail == nullptr);
  struct ncclDevWorkBatch* batch = nullptr;
  if (!newBatch) {
    batch = &chan->workBatchQueue.tail->batch;
    // All of the conditions that prevent us from appending to current batch.
    newBatch |= batch->workType != (uint8_t)workType;
    newBatch |= batch->funcId != devFuncId;
    // The following ensure the device can handle a batch this large. They have to
    // account for all extension batches being fused together which is why
    // wipBatch.workBytes and wipBatch.nP2ps aren't reset to 0 for a new extension
    // batch further down.
    newBatch |= NCCL_MAX_DEV_WORK_BATCH_BYTES < chan->wipBatch.workBytes + workSize;
    if (workType == ncclDevWorkTypeP2p) {
      newBatch |= chan->wipBatch.nP2ps == NCCL_MAX_DEV_WORK_P2P_PER_BATCH;
      for (int i=0; i < chan->wipBatch.nP2ps; i++) {
        newBatch |= p2pRound == chan->wipBatch.p2pRounds[i];
      }
    }
  }
  // Conditions causing us to create an extension batch (prev->nextExtends=1):
  // the item's position relative to offsetBase must be a multiple of workSize
  // and fit in the 64-bit offsetBitset.
  uint32_t offset = newBatch ? 0 : (workOffset - batch->offsetBase);
  bool extendBatch = 63*workSize < offset;
  extendBatch |= 0 != offset%workSize;
  if (newBatch || extendBatch) {
    if (!newBatch) batch->nextExtends = extendBatch; // Extending the previous batch.
    struct ncclWorkBatchList* batchNode = ncclMemoryStackAlloc<struct ncclWorkBatchList>(&comm->memScoped);
    // Coverity thinks that ncclIntruQueueEnqueue will access chan->workBatchQueue->tail, which might
    // be NULL. But that code is guarded by chan->workBatchQueue->head not being NULL, in which
    // case tail won't be NULL either.
    // coverity[var_deref_model:FALSE]
    ncclIntruQueueEnqueue(&chan->workBatchQueue, batchNode);
    batch = &batchNode->batch;
    batch->nextExtends = 0;
    batch->workType = (uint32_t)workType;
    batch->funcId = devFuncId;
    batch->offsetBase = workOffset;
    batch->offsetBitset = 0;
    offset = 0;
    if (newBatch) {
      // Since extension batches are fused together on the device, and these values
      // account for constraints on the fused batch, we only reset the values on
      // a new batch
      chan->wipBatch.workBytes = 0;
      chan->wipBatch.nP2ps = 0;
      // We don't count extension batches since this is used to derive a proxyOpCount,
      // and we want all ops which are fused together to have the same value.
      chan->nWorkBatchesP2p += (workType == ncclDevWorkTypeP2p ? 1 : 0);
    }
    plan->nWorkBatches += 1;
  }
  batch->offsetBitset |= 1ull<<(offset/workSize);
  chan->wipBatch.workBytes += workSize;
  if (workType == ncclDevWorkTypeP2p) {
    // We need to ensure that a single batch doesn't have multiple p2p's
    // of the same round since they would use the same connections.
    chan->wipBatch.p2pRounds[chan->wipBatch.nP2ps++] = p2pRound;
  }
}

// Finalizes `plan` once all work has been added:
//  - decides whether work structs are stored inside the kernel args
//    (ncclDevWorkStorageTypeArgs) when everything fits in comm->workArgsBytes;
//  - allocates kernel args laid out as [ncclDevKernelArgs][batches...] plus
//    room for the work structs when they are stored in the args;
//  - copies the per-channel batches into the args, linking each channel's
//    batches with nextJump;
//  - merges the per-channel proxy-op queues into plan->proxyOpQueue ordered by
//    opCount (collectives before p2p's).
static void finishPlan(struct ncclComm* comm, struct ncclKernelPlan* plan) {
  ncclKernelPlanner::WipPlan::Channel* wipChannels = comm->planner.wipPlan.channels;
  size_t workBytes = plan->workBytes;
  size_t batchBytes = plan->nWorkBatches*sizeof(struct ncclDevWorkBatch);

  plan->threadPerBlock = std::max(plan->threadPerBlock, NCCL_MIN_NTHREADS);

  // If we can fit everything into the kernel args we do so.
  if (sizeof(ncclDevKernelArgs) + batchBytes + workBytes <= comm->workArgsBytes) {
    plan->workStorageType = ncclDevWorkStorageTypeArgs;
  }
  plan->kernelArgsSize = sizeof(struct ncclDevKernelArgs) + batchBytes;
  plan->kernelArgsSize += (plan->workStorageType == ncclDevWorkStorageTypeArgs) ? workBytes : 0;
  plan->kernelArgsSize = alignUp(plan->kernelArgsSize, 16);
  plan->kernelArgs = (struct ncclDevKernelArgs*)ncclMemoryStackAlloc(&comm->memScoped, plan->kernelArgsSize, /*align=*/16);
  plan->kernelArgs->comm = comm->devComm;
  plan->kernelArgs->channelMask = plan->channelMask;
  plan->kernelArgs->workStorageType = plan->workStorageType;

  // Put batches into the kernel arguments. The first batch for each channel
  // must be located at batchZero[blockIdx.x]. To achieve this we round robin
  // over the channels in ascending order until they're exhausted. Each round
  // visits every pending channel across ALL 64-bit masks, so round 0 places
  // exactly one batch per channel, in global ascending channel order, even when
  // MAXCHANNELS > 64.
  struct channelMasks hasBatchMask = plan->channelMask;
  struct ncclDevWorkBatch* batchPrev[MAXCHANNELS] = {}; // {0...}
  struct ncclDevWorkBatch* batchZero = (struct ncclDevWorkBatch*)(plan->kernelArgs+1);
  int batchIx = 0;
  for (;;) {
    struct channelMasks roundMask = hasBatchMask; // channels with a batch for this round.
    bool anyPending = false;
    for (int maskIdx = 0; maskIdx < MAXCHANNELS/64; maskIdx++) {
      uint64_t tmpMask = roundMask.masks[maskIdx];
      anyPending |= (tmpMask != 0);
      while (tmpMask != 0) {
        int c = popFirstOneBit(&tmpMask) + maskIdx * 64;
        if (!ncclIntruQueueEmpty(&wipChannels[c].workBatchQueue)) {
          struct ncclWorkBatchList* batchNode = ncclIntruQueueDequeue(&wipChannels[c].workBatchQueue);
          if (batchPrev[c] != nullptr) {
            batchPrev[c]->nextJump = int(&batchZero[batchIx] - batchPrev[c]);
          }
          batchPrev[c] = &batchZero[batchIx];
          batchZero[batchIx++] = batchNode->batch;
        }
        if (ncclIntruQueueEmpty(&wipChannels[c].workBatchQueue)) {
          hasBatchMask.masks[maskIdx] ^= 1ull<<(c%64);
        }
      }
    }
    if (!anyPending) break;
  }

  // Merge-sort per-channel proxy-op lists by opCount when merging them into plan->proxyOpQueue
  // Phase 1: scan first op of each channel, store opCount in headIds[c].
  uint64_t headIds[MAXCHANNELS];
  int nHeads = 0;
  int channelUbound = 0;
  for (int c=0; c < MAXCHANNELS; c++) {
    struct ncclProxyOp* op = ncclIntruQueueHead(&wipChannels[c].proxyOpQueue);
    headIds[c] = op ? op->opCount : uint64_t(-1);
    if (op) nHeads += 1;
    if (op) plan->hasProxyOps = true;
    if (op) channelUbound = c+1;
  }
  // Phase 2: Dequeue from planner->channels[c], enqueue in merged order to plan
  while (nHeads != 0) {
    int c = -1;
    uint64_t minId = uint64_t(-1);
    // Find channel with least proxy-op id. We store the heads[c]->opCount in
    // headIds[c] to remove indirect loads from this loop.
    for (int c1=0; c1 < channelUbound; c1++) {
      uint64_t id = headIds[c1];
      id = (id>>1 | id<<63); // Move tag bit to order collectives before p2p's
      if (id < minId) { c = c1; minId = id; }
    }
    struct ncclProxyOp* op = ncclIntruQueueDequeue(&wipChannels[c].proxyOpQueue);
    struct ncclProxyOp* opNext = ncclIntruQueueHead(&wipChannels[c].proxyOpQueue);
    headIds[c] = opNext ? opNext->opCount : uint64_t(-1);
    nHeads -= opNext ? 0 : 1;
    ncclIntruQueueEnqueue(&plan->proxyOpQueue, op);
  }
}

NCCL_PARAM(GraphRegister, "GRAPH_REGISTER", 1);

static ncclResult_t getCollNetSupport(struct ncclComm* comm, struct ncclTaskColl* task, int* collNetSupport);
rccl_static ncclResult_t getAlgoInfo(
  struct ncclComm* comm, struct ncclTaskColl* task,
  int collNetSupport, int nvlsSupport, int numPipeOps, ncclSimInfo_t* simInfo = NULL
);
static ncclResult_t calcCollChunking(
  struct ncclComm* comm, struct ncclTaskColl* task, int nChannels, size_t nBytes,
  /*outputs*/uint32_t* outChunkSize, uint32_t* outDirectFlags, struct ncclProxyOp* proxyOp
);

struct ncclKernelPlanBudget {
  ssize_t inArgsBytes; // Space available within kernel args struct
  ssize_t outArgsBytes; // Space available outside of args struct (fifo or persistent buf)
};

// True when `nWorkBatches` batches plus `workBytes` of work fit the budget:
// either everything fits inside the kernel args, or the batches fit inside the
// args and the work fits in the out-of-args storage.
static bool testBudget(
    struct ncclKernelPlanBudget* budget, int nWorkBatches, ssize_t workBytes
  ) {
  ssize_t batchBytes = nWorkBatches*sizeof(struct ncclDevWorkBatch);
  bool ok = false;
  ok |= (batchBytes + workBytes <= budget->inArgsBytes);
  ok |= (batchBytes <= budget->inArgsBytes) && (workBytes <= budget->outArgsBytes);
  return ok;
}

// Fills the ncclDevWorkColl fields that come directly from `task` and `comm`.
// Used by both the NVLS path (ncclPrepareTasks) and the generic path
// (ncclTasksRegAndEnqueue) so every collective gets the same set of fields.
// Must be called after buffer registration, since it reads task->regBufType.
static void fillDevWorkCollFromTask(struct ncclComm* comm, struct ncclTaskColl* task, struct ncclDevWorkColl* devWork) {
  devWork->sendbuff = (void*)task->sendbuff;
  devWork->recvbuff = (void*)task->recvbuff;
  devWork->sendbuffOffset = task->sendbuffOffset;
  devWork->recvbuffOffset = task->recvbuffOffset;
  devWork->sendbuffRmtAddrs = task->sendbuffRmtAddrs;
  devWork->recvbuffRmtAddrs = task->recvbuffRmtAddrs;
  devWork->root = task->root;
  devWork->nWarps = task->nWarps;
  devWork->redOpArg = task->opDev.scalarArg;
  devWork->redOpArgIsPtr = task->opDev.scalarArgIsPtr;
  devWork->oneNode = (comm->nNodes == 1);
  devWork->isOneRPN = comm->isOneRPN;
  devWork->netRegUsed = devWork->regUsed = 0;
  if (task->regBufType & NCCL_NET_REG_BUFFER) devWork->netRegUsed = 1;
  if (task->regBufType & (NCCL_IPC_REG_BUFFER | NCCL_NVLS_REG_BUFFER)) devWork->regUsed = 1;
  devWork->pivotA2ANumBiRings = comm->topo->pivotA2ANumBiRings;
  devWork->opCount = task->opCount;
}

// Allocates (in comm->memScoped) a ncclWorkList node with the device work
// struct stored inline right after it. Tasks with NVLS-registered buffers get a
// ncclDevWorkCollReg carrying the first registered send/recv buffer; all other
// tasks get a plain ncclDevWorkColl copy of `devWork`.
static struct ncclWorkList* allocDevWorkCollNode(
    struct ncclComm* comm, struct ncclTaskColl* task, struct ncclDevWorkColl const* devWork,
    void* const* regBufSend, void* const* regBufRecv
  ) {
  struct ncclWorkList* workNode;
  if (task->regBufType & NCCL_NVLS_REG_BUFFER) {
    struct ncclDevWorkCollReg workReg = {};
    workReg.coll = *devWork; // C++ struct assignment
    /* NVLS only has one send and recv buffer registered */
    workReg.dnInputs[0] = regBufSend[0];
    workReg.dnOutputs[0] = regBufRecv[0];
    workNode = ncclMemoryStackAllocInlineArray<ncclWorkList, ncclDevWorkCollReg>(&comm->memScoped, 1);
    workNode->workType = ncclDevWorkTypeCollReg;
    workNode->size = sizeof(struct ncclDevWorkCollReg);
    memcpy((void*)(workNode+1), (const void*)&workReg, workNode->size);
  } else {
    workNode = ncclMemoryStackAllocInlineArray<ncclWorkList, ncclDevWorkColl>(&comm->memScoped, 1);
    workNode->workType = ncclDevWorkTypeColl;
    workNode->size = sizeof(struct ncclDevWorkColl);
    memcpy((void*)(workNode+1), (const void*)devWork, workNode->size);
  }
  return workNode;
}

// Builds one work node per task in planner->collTaskQueue (in queue order) and
// appends it to planner->collWorkQueue. NVLS / NVLS_TREE work nodes were
// already built, in the same order, by ncclPrepareTasks() into
// planner->tmpCollWorkQueue and are simply moved over; all other tasks have
// their buffers registered here before their work struct is built.
ncclResult_t ncclTasksRegAndEnqueue(struct ncclComm* comm) {
  struct ncclKernelPlanner* planner = &comm->planner;
  struct ncclTaskColl *task;
  task = ncclIntruQueueHead(&planner->collTaskQueue);
  while (task != nullptr) {
    struct ncclWorkList* workNode = NULL;
    if (task->algorithm == NCCL_ALGO_NVLS_TREE || task->algorithm == NCCL_ALGO_NVLS) {
      workNode = ncclIntruQueueDequeue(&planner->tmpCollWorkQueue);
    } else {
      // Build a ncclDevWorkColl[Reg?] struct for each task.
      void* regBufSend[NCCL_MAX_LOCAL_RANKS];
      void* regBufRecv[NCCL_MAX_LOCAL_RANKS];
      bool regNeedConnect = true;
      struct ncclDevWorkColl devWork = {};
      // Registration is best effort: the return value is intentionally not checked.
      ncclRegisterCollBuffers(comm, task, regBufSend, regBufRecv, &planner->collCleanupQueue, &regNeedConnect);
      fillDevWorkCollFromTask(comm, task, &devWork);
      workNode = allocDevWorkCollNode(comm, task, &devWork, regBufSend, regBufRecv);
    }
    ncclIntruQueueEnqueue(&planner->collWorkQueue, workNode);
    task = task->next;
  }
  assert(ncclIntruQueueEmpty(&planner->tmpCollWorkQueue));
  return ncclSuccess;
}

// Called once per ncclGroup to organize the user submitted tasks in
// comm->planner so that they can be peeled off into plans.
// Steps: bin tasks by (fn,op,ty); pick algorithm/protocol per aggregate of
// similar-size tasks; order planner->collTaskQueue by (collnet, nvls); then
// register NVLS buffers, record which algorithms need runtime connection setup
// (algoNeedConnect/needConnect) and pre-build NVLS work nodes.
ncclResult_t ncclPrepareTasks(struct ncclComm* comm, bool* algoNeedConnect, bool* needConnect, ncclSimInfo_t* simInfo) {
  struct ncclKernelPlanner* planner = &comm->planner;
  planner->persistent = ncclCudaGraphValid(planner->capturingGraph);
  // Tasks from the sorter come out ordered size descending.
  struct ncclTaskColl* task = ncclTaskCollSorterDequeueAll(&planner->collSorter);
  // Tasks are assembled by (fn,op,ty) size ascending.
  struct ncclTaskColl* tasksByFnOpTy[ncclNumFuncs*ncclNumDevRedOps*ncclNumTypes];
  memset(tasksByFnOpTy, 0, sizeof(tasksByFnOpTy));
  int fnOpTyIndices[ncclNumFuncs*ncclNumDevRedOps*ncclNumTypes];
  int fnOpTyCount = 0;

  // Walk the size sorted tasks, binning them by (fn,op,ty).
  while (task != nullptr) {
    struct ncclTaskColl* next = task->next;
    int index = ((int)task->func*ncclNumDevRedOps + (int)task->opDev.op)*ncclNumTypes + (int)task->datatype;
    // Add to set of (fn,op,ty) indices on first occurrence
    if (tasksByFnOpTy[index] == nullptr) fnOpTyIndices[fnOpTyCount++] = index;
    // Add to LIFO for this (fn,op,ty)
    task->next = tasksByFnOpTy[index];
    tasksByFnOpTy[index] = task;
    // Next task
    task = next;
  }

  // Walk (fn,op,ty) bins, compute algo and proto etc. Then bin them by their
  // scheduling constraints (collnet x nvls).
  struct ncclIntruQueue<struct ncclTaskColl, &ncclTaskColl::next> collBins[2][2] = {};
  for (int cursor=0; cursor < fnOpTyCount; cursor++) {
    struct ncclTaskColl* aggBeg = tasksByFnOpTy[fnOpTyIndices[cursor]];
    int collNetSupport = 0;
    NCCLCHECK(getCollNetSupport(comm, aggBeg, &collNetSupport));
    int nvlsSupport = comm->nvlsSupport && (ncclNvlsSupported(aggBeg->opDev.op, aggBeg->datatype) || aggBeg->func == ncclFuncAllGather);
    // Crudely estimate number of tasks per channel. This is using the wrong number
    // of channels for NVLS algos, but knowing the algo requires having this value,
    // so either be crude or iterate until fixed point; we chose the former.
    int nTasksPerChannel = divUp(comm->planner.nTasksColl, comm->nChannels);
    do {
      struct ncclTaskColl* aggEnd = aggBeg->next;
      struct ncclTaskColl agg = *aggBeg;
      // We aggregate operations that are within 4X size of each other.
      while (aggEnd != nullptr && aggEnd->trafficBytes < 4*aggBeg->trafficBytes) {
        agg.count += aggEnd->count;
        agg.trafficBytes += aggEnd->trafficBytes;
        aggEnd = aggEnd->next;
      }

      NCCLCHECK(getAlgoInfo(comm, &agg, collNetSupport, nvlsSupport, nTasksPerChannel, simInfo));
      agg.devFuncId = ncclDevFuncId(agg.func, agg.opDev.op, agg.datatype, agg.algorithm, agg.protocol);
      if (agg.devFuncId < 0) {
        WARN("%s: unsupported collective. Please ensure the collective has been enabled in build.", __func__);
        return ncclInvalidUsage;
      }

      int isCollnet=0, isNvls=0;
      switch (agg.algorithm) {
      case NCCL_ALGO_NVLS:
      case NCCL_ALGO_NVLS_TREE:
        isNvls = 1;
        isCollnet = agg.algorithm == NCCL_ALGO_NVLS && comm->nNodes > 1;
        break;
      case NCCL_ALGO_COLLNET_CHAIN:
      case NCCL_ALGO_COLLNET_DIRECT:
        isCollnet = 1;
        break;
      }
      // Update the aggregated tasks with the computed values.
      do {
        struct ncclTaskColl* next = aggBeg->next;
        aggBeg->algorithm = agg.algorithm;
        aggBeg->protocol = agg.protocol;
        aggBeg->nMaxChannels = agg.nMaxChannels;
        aggBeg->nWarps = agg.nWarps;
        aggBeg->devFuncId = agg.devFuncId;
        aggBeg->isCollnet = isCollnet;
        aggBeg->isNvls = isNvls;
        ncclIntruQueueEnqueue(&collBins[isCollnet][isNvls], aggBeg);
        aggBeg = next;
      } while (aggBeg != aggEnd);
    } while (aggBeg != nullptr);
  }

  // Concatenate `collBins[*][*]` together into final list `planner->collTaskQueue`.
  // Collnet is the outer dimension since that affects how we divide over the
  // channels.
  for (int isCollnet=0; isCollnet <= 1; isCollnet++) {
    for (int isNvls=0; isNvls <= 1; isNvls++) {
      ncclIntruQueueTransfer(&planner->collTaskQueue, &collBins[isCollnet][isNvls]);
    }
  }

  // Walk tasks again to:
  // 1. Possibly register buffers.
  // 2. Build ncclDevWorkColl structs (NVLS algorithms only; the others are
  //    built later by ncclTasksRegAndEnqueue()).
  // 3. Record which algorithms still need their channels connected.
  task = ncclIntruQueueHead(&planner->collTaskQueue);
  while (task != nullptr) {
    void* regBufSend[NCCL_MAX_LOCAL_RANKS];
    void* regBufRecv[NCCL_MAX_LOCAL_RANKS];
    bool regNeedConnect = true;
    ncclRegisterCollNvlsBuffers(comm, task, regBufSend, regBufRecv, &planner->collCleanupQueue, &regNeedConnect);

    if (comm->runtimeConn && comm->initAlgoChannels[task->algorithm] == false) {
      if (task->algorithm == NCCL_ALGO_NVLS_TREE && comm->initAlgoChannels[NCCL_ALGO_NVLS] == false && regNeedConnect == true) {
        comm->initAlgoChannels[NCCL_ALGO_NVLS] = true;
        algoNeedConnect[NCCL_ALGO_NVLS] = true;
      }
      if (task->algorithm != NCCL_ALGO_NVLS || regNeedConnect == true) {
        comm->initAlgoChannels[task->algorithm] = true;
        algoNeedConnect[task->algorithm] = true;
        *needConnect = true;
      }
    }

    if (task->algorithm == NCCL_ALGO_NVLS_TREE || task->algorithm == NCCL_ALGO_NVLS) {
      // Build a ncclDevWorkColl[Reg?] struct for this task; ncclTasksRegAndEnqueue()
      // consumes these in the same queue order.
      struct ncclDevWorkColl devWork = {};
      fillDevWorkCollFromTask(comm, task, &devWork);
      struct ncclWorkList* workNode = allocDevWorkCollNode(comm, task, &devWork, regBufSend, regBufRecv);
      ncclIntruQueueEnqueue(&planner->tmpCollWorkQueue, workNode);
    }
    task = task->next;
  }

  return ncclSuccess;
}

RCCL_PARAM(IntraNetThreshold, "INTRANET_THRESHOLD", 8388608);

// Moves as many collective tasks from planner->collTaskQueue (together with the
// matching nodes of planner->collWorkQueue) into `plan` as `budget` allows.
// Each task is assigned a contiguous channel range [channelLo, channelHi]:
// collnet tasks use channels [0, nMaxChannels-1]; other tasks are split into
// lo/mid/hi chunks packed so that each channel carries roughly the same traffic.
// Returns ncclSuccess early (leaving the remaining tasks queued) once the
// budget cannot hold the next task.
static ncclResult_t scheduleCollTasksToPlan(
    struct ncclComm* comm, struct ncclKernelPlan* plan, struct ncclKernelPlanBudget* budget
  ) {
  struct ncclKernelPlanner* planner = &comm->planner;
  // Estimate number of tasks that will fit in this plan.
  int nPlanColls = 0;
  size_t trafficBytes[2*2] = {0, 0, 0, 0}; // [collnet][nvls]
  int nChannels[2*2] = {0, 0, 0, 0}; // [collnet][nvls]
  int const nMaxChannels[2*2] = {comm->nChannels, comm->nvlsChannels, // [collnet][nvls]
                                 comm->nChannels, comm->nvlsChannels};
  constexpr size_t MinTrafficPerChannel = 512;  // Traffic as minimal
  do {
    size_t workBytes = 0;
    struct ncclTaskColl* task = ncclIntruQueueHead(&planner->collTaskQueue);
    struct ncclWorkList* workNode = ncclIntruQueueHead(&planner->collWorkQueue);
    while (task != nullptr) {
      int nBatches = divUp(nPlanColls, 4); // Rough guess: 4 colls per batch.
      if (!testBudget(budget, nBatches, workBytes + workNode->size)) goto plan_full;

      nPlanColls += 1;
      workBytes += workNode->size;
      int kind = 2*task->isCollnet + task->isNvls;
      trafficBytes[kind] += std::max(MinTrafficPerChannel, task->trafficBytes);
      nChannels[kind] += task->nMaxChannels;
      nChannels[kind] = std::min(nChannels[kind], nMaxChannels[kind]);
      task = task->next;
      workNode = workNode->next;
    }
  plan_full:;
  } while (0);

  int kindPrev = -1;
  size_t trafficPerChannel = 0;
  int channelId = 0;
  size_t currentTraffic = 0;
  while (nPlanColls!=0 && !ncclIntruQueueEmpty(&planner->collTaskQueue)) {
    struct ncclTaskColl* task = ncclIntruQueueHead(&planner->collTaskQueue);
    struct ncclWorkList* workNode = ncclIntruQueueHead(&planner->collWorkQueue);
    struct ncclDevWorkColl* devWork = (struct ncclDevWorkColl*)(workNode+1);
    size_t elementSize = ncclTypeSize(task->datatype);

    int kind = 2*task->isCollnet + task->isNvls;
    if (kind != kindPrev) {
      trafficPerChannel = std::max(MinTrafficPerChannel, trafficBytes[kind]/nChannels[kind]);
      kindPrev = kind;
      channelId = 0;
      currentTraffic = 0;
    }

    if (task->isCollnet) {
      int nChannels = task->nMaxChannels;
      // Ensure room for worst case of one new batch per channel
      if (!testBudget(budget, plan->nWorkBatches + nChannels, plan->workBytes + workNode->size)) {
        return ncclSuccess;
      }

      size_t globalBytesPerElement = elementSize*ncclFuncMaxSendRecvCount(task->func, comm->nRanks, 1);
      struct ncclProxyOp proxyOp;
      uint32_t chunkSize, directFlags=0;
      size_t nBytes = globalBytesPerElement*task->count;
      NCCLCHECK(calcCollChunking(comm, task, nChannels, nBytes, &chunkSize, &directFlags, &proxyOp));
      devWork->channelLo = 0;
      devWork->channelHi = nChannels-1;
      devWork->collnet.count = task->count;
      devWork->collnet.chunkCount = chunkSize/ncclTypeSize(task->datatype);
      devWork->direct = directFlags;

      // Low bit 0 tags this as a collective (see the merge in finishPlan()).
      uint64_t proxyOpId = uint64_t(plan->collOpCount++)<<1 | 0;
      for (int c=devWork->channelLo; c <= (int)devWork->channelHi; c++) {
        proxyOp.channelId = c;
        proxyOp.opCount = proxyOpId;
        proxyOp.task.coll = task;
        proxyOp.rank = comm->rank;
        addWorkBatchToPlan(comm, plan, c, workNode->workType, task->devFuncId, plan->workBytes);
        NCCLCHECK(addProxyOpIfNeeded(comm, plan, &proxyOp));
      }
    } else { // not task->isCollnet
      int trafficPerByte = ncclFuncTrafficPerByte(task->func, comm->nRanks);
      size_t cellSize = divUp(divUp(MinTrafficPerChannel, (size_t)trafficPerByte), 16) * 16;
      int elementsPerCell = cellSize/elementSize;
      size_t cells = divUp(task->count*elementSize, cellSize);
      size_t trafficPerElement = elementSize*trafficPerByte;
      size_t trafficPerCell = cellSize*trafficPerByte;
      size_t cellsPerChannel = std::min(cells, divUp(trafficPerChannel, trafficPerCell));
      size_t cellsLo;
      if (channelId+1 == nMaxChannels[kind]) { // On last channel everything goes to "lo"
        cellsLo = cells;
      } else {
        cellsLo = std::min(cells, divUp((trafficPerChannel-currentTraffic),trafficPerCell));
      }
      int nMidChannels = (cells-cellsLo)/cellsPerChannel;
      size_t cellsHi = (cells-cellsLo)%cellsPerChannel;
      int nChannels = (cellsLo!=0 ? 1 : 0) + nMidChannels + (cellsHi!=0 ? 1 : 0);
      if (nMaxChannels[kind] < channelId + nChannels) { // Overflowed available channels
        nMidChannels = nMaxChannels[kind] - channelId - 2;
        cellsPerChannel = (cells-cellsLo)/(nMidChannels+1);
        cellsHi = cellsPerChannel + (cells-cellsLo)%(nMidChannels+1);
      }
      if (cellsHi == 0 && nMidChannels != 0) {
        cellsHi = cellsPerChannel;
        nMidChannels -= 1;
      }
      if (cellsLo == 0) { // Least channel skipped. Make the next channel the new least.
        channelId += 1;
        if (nMidChannels == 0) { cellsLo = cellsHi; cellsHi = 0; }
        else { cellsLo = cellsPerChannel; nMidChannels -= 1; }
      }
      size_t countMid = nMidChannels!=0 ? cellsPerChannel*elementsPerCell : 0;
      size_t countLo = cellsLo*elementsPerCell;
      size_t countHi = cellsHi*elementsPerCell;
      // Remove the padding of the last (partial) cell from the last non-empty chunk.
      (countHi != 0 ? countHi : countLo) -= cells*elementsPerCell - task->count;

      nChannels = (countLo!=0 ? 1 : 0) + nMidChannels + (cellsHi!=0 ? 1 : 0);
      // Ensure room for worst case of one new batch per channel
      if (!testBudget(budget, plan->nWorkBatches + nChannels, plan->workBytes + workNode->size)) {
        return ncclSuccess;
      }

      devWork->channelLo = channelId;
      devWork->channelHi = channelId + nChannels-1;
      devWork->cbd.countLo = countLo;
      devWork->cbd.countMid = countMid;
      devWork->cbd.countHi = countHi;

      // calcCollChunking() uses global bytes instead of traffic which differs
      // in that allreduce isn't multiplied by 2.
      size_t globalBytesPerElement = elementSize*ncclFuncMaxSendRecvCount(task->func, comm->nRanks, 1);
      struct ncclProxyOp proxyOpLo, proxyOpMid, proxyOpHi;
      size_t nBytes = globalBytesPerElement*task->count;

      // Large simple-protocol ring collectives may use the intra-node network connection.
      devWork->connIndex = 0;
      if (task->protocol == NCCL_PROTO_SIMPLE && task->algorithm == NCCL_ALGO_RING) {
        if (comm->useIntraNet && nBytes > rcclParamIntraNetThreshold()) {
          devWork->connIndex = NCCL_CONN_IDX_P2P_NET;
        }
      }

      uint32_t chunkSize, directFlags=0;
      size_t grainSize = ncclProtoGrainSize(task->protocol);
      if (countLo != 0) {
        NCCLCHECK(calcCollChunking(comm, task, /*nChannels=*/1, globalBytesPerElement*countLo, &chunkSize, &directFlags, &proxyOpLo));
        devWork->cbd.chunkGrainsLo = chunkSize/grainSize;
      }
      if (countHi != 0) {
        NCCLCHECK(calcCollChunking(comm, task, /*nChannels=*/1, globalBytesPerElement*countHi, &chunkSize, &directFlags, &proxyOpHi));
        devWork->cbd.chunkGrainsHi = chunkSize/grainSize;
      }
      if (nMidChannels != 0) {
        NCCLCHECK(calcCollChunking(comm, task, /*nChannels=*/1, globalBytesPerElement*countMid, &chunkSize, &directFlags, &proxyOpMid));
        devWork->cbd.chunkGrainsMid = chunkSize/grainSize;
      }
      devWork->direct = directFlags;

      // Update the current channel and vacant traffic budget.
      if (countHi != 0) {
        channelId += nChannels-1;
        currentTraffic = cellsHi*elementsPerCell*trafficPerElement;
      } else if (nMidChannels != 0) {
        channelId += nChannels;
        currentTraffic = 0;
      } else {
        currentTraffic += cellsLo*elementsPerCell*trafficPerElement;
      }

      if (currentTraffic >= trafficPerChannel && channelId+1 != nMaxChannels[kind]) {
        channelId += 1;
        currentTraffic = 0;
      }

      // Low bit 0 tags this as a collective (see the merge in finishPlan()).
      uint64_t proxyOpId = uint64_t(plan->collOpCount++)<<1 | 0;
      for (int c=devWork->channelLo; c <= (int)devWork->channelHi; c++) {
        struct ncclProxyOp* proxyOp;
        if (c == (int)devWork->channelLo) {
          proxyOp = &proxyOpLo;
          proxyOp->loopOffset = 0;
          proxyOp->channelSize = countLo * elementSize;
        } else if (c == (int)devWork->channelHi) {
          proxyOp = &proxyOpHi;
          proxyOp->loopOffset = (countLo + nMidChannels * countMid) * elementSize;
          proxyOp->channelSize = countHi * elementSize;
        } else {
          proxyOp = &proxyOpMid;
          proxyOp->loopOffset = (countLo + (c - devWork->channelLo - 1) * countMid) * elementSize;
          proxyOp->channelSize = countMid * elementSize;
        }
        proxyOp->channelId = c;
        proxyOp->opCount = proxyOpId;
        proxyOp->task.coll = task;
        proxyOp->rank = comm->rank;
        proxyOp->ringAlgo = NULL;
        if (proxyOp->reg && task->algorithm == NCCL_ALGO_RING && (task->recvNetHandles[c] || task->sendNetHandles[c])) {
          if (task->func == ncclFuncAllGather) {
            proxyOp->ringAlgo = new RingAGAlgorithm(task->sendbuff, task->recvbuff, comm->nRanks, comm->channels[c].ring.userRanks, proxyOp->chunkSteps, proxyOp->sliceSteps, proxyOp->chunkSize, proxyOp->sliceSize, proxyOp->loopOffset, proxyOp->channelSize, elementSize, task->count * elementSize, task->sendNetHandles[c], task->recvNetHandles[c], task->srecvNetHandles[c]);
          } else if (task->func == ncclFuncAllReduce) {
            proxyOp->ringAlgo = new RingARAlgorithm(task->sendbuff, task->recvbuff, comm->nRanks, comm->channels[c].ring.index, proxyOp->chunkSteps, proxyOp->sliceSteps, proxyOp->chunkSize, proxyOp->sliceSize, proxyOp->loopOffset, proxyOp->channelSize, elementSize, task->sendNetHandles[c], task->recvNetHandles[c], task->srecvNetHandles[c]);
          } else if (task->func == ncclFuncBroadcast) {
            proxyOp->ringAlgo = new RingBCAlgorithm(task->sendbuff, task->recvbuff, comm->rank, task->root, comm->nRanks, comm->channels[c].ring.userRanks, proxyOp->chunkSteps, proxyOp->sliceSteps, proxyOp->chunkSize, proxyOp->sliceSize, proxyOp->loopOffset, proxyOp->channelSize, task->sendNetHandles[c], task->recvNetHandles[c], task->srecvNetHandles[c]);
          }
          // Only the functions above have a registered-ring algorithm; never
          // dereference a null ringAlgo for any other function.
          if (proxyOp->ringAlgo != NULL) proxyOp->ringAlgo->incRefCount();
        }
        proxyOp->connIndex = 0;
        if (task->protocol == NCCL_PROTO_SIMPLE && task->algorithm == NCCL_ALGO_RING) {
          if (comm->useIntraNet && nBytes > rcclParamIntraNetThreshold()) {
            proxyOp->connIndex = NCCL_CONN_IDX_P2P_NET;
          }
        }
        addWorkBatchToPlan(comm, plan, c, workNode->workType, task->devFuncId, plan->workBytes);
        // Coverity reports "proxyOp->connection" as being possibly uninitialized. It's hard to
        // determine if that's actually true but it's also not clear if that would be an issue.
        // coverity[uninit_use_in_call:FALSE]
        NCCLCHECK(addProxyOpIfNeeded(comm, plan, proxyOp));
      }
    }

    // Mark every channel in [channelLo, channelHi] as used; channels are spread
    // over several 64-bit masks, so set one bit at a time.
    for (int c = devWork->channelLo; c <= devWork->channelHi; ++c) {
      int maskIdx = c / 64;
      int relativeIdx = c % 64;
      plan->channelMask.masks[maskIdx] |= (1ull<<relativeIdx);
    }
    plan->threadPerBlock = std::max(plan->threadPerBlock, 3*plan->comm->WarpSize);
    if (!plan->kernelSpecialized) {
      plan->kernelFn = ncclKerns[ncclGetKernelIndex(comm)].kernelFn;
      plan->kernelSpecialized = ncclKerns[ncclGetKernelIndex(comm)].specialized;
    }

    if (comm->rank == 0) {
      INFO(NCCL_TUNING, "%s: %ld Bytes -> Algo %s proto %s channel{Lo..Hi}={%d..%d}",
        ncclFuncToString(task->func), (long)(task->count * ncclTypeSize(task->datatype)),
        ncclAlgoToString(task->algorithm), ncclProtoToString(task->protocol),
        devWork->channelLo, devWork->channelHi);

      if (task->isCollnet) {
        TRACE(NCCL_COLL, "Collective %s(%s, %s, %s, %s) count=%ld devFuncId=%d channel{Lo..Hi}={%d..%d} count=%ld chunkCount=%d",
          ncclFuncToString(task->func), ncclDevRedOpToString(task->opDev.op),
          ncclDatatypeToString(task->datatype), ncclAlgoToString(task->algorithm),
          ncclProtoToString(task->protocol),
          (long)task->count, task->devFuncId, devWork->channelLo, devWork->channelHi,
          (long)devWork->collnet.count, devWork->collnet.chunkCount);
      } else {
        TRACE(NCCL_COLL, "Collective %s(%s, %s, %s, %s) count=%ld devFuncId=%d channel{Lo..Hi}={%d..%d} count{Lo,Mid,Hi}={%ld,%ld,%ld} chunkBytes{Lo,Mid,Hi}={%d,%d,%d}",
          ncclFuncToString(task->func), ncclDevRedOpToString(task->opDev.op),
          ncclDatatypeToString(task->datatype), ncclAlgoToString(task->algorithm),
          ncclProtoToString(task->protocol),
          (long)task->count, task->devFuncId, devWork->channelLo, devWork->channelHi,
          (long)devWork->cbd.countLo, (long)devWork->cbd.countMid, (long)devWork->cbd.countHi,
          int(devWork->cbd.chunkGrainsLo*ncclProtoGrainSize(task->protocol)),
          int(devWork->cbd.chunkGrainsMid*ncclProtoGrainSize(task->protocol)),
          int(devWork->cbd.chunkGrainsHi*ncclProtoGrainSize(task->protocol)));
      }
    }

    // Hand this task's cleanup entries (e.g. from buffer registration) to the plan.
    for (int i=0; i < task->nCleanupQueueElts; i++) {
      ncclIntruQueueEnqueue(&plan->cleanupQueue, ncclIntruQueueDequeue(&planner->collCleanupQueue));
    }
    ncclIntruQueueDequeue(&planner->collTaskQueue);
    ncclIntruQueueDequeue(&planner->collWorkQueue);
    nPlanColls -= 1;
    planner->nTasksColl -= 1;
    ncclIntruQueueEnqueue(&plan->collTaskQueue, task);
    ncclIntruQueueEnqueue(&plan->workQueue, workNode);
    plan->workBytes += workNode->size;
  }
  return ncclSuccess;
}

NCCL_PARAM(P2pLLThreshold, "P2P_LL_THRESHOLD", 16384);
RCCL_PARAM(P2pNetThreshold, "P2P_NET_THRESHOLD", 131072);
NCCL_PARAM(ChunkSize, "CHUNK_SIZE", 0);

// Put p2p op in plan assuming there is sizeof(ncclDevWorkBatch) in batch budget
// and sizeof(ncclDevWorkP2p) in work budget. "sendRank" and "recvRank" must
// match the corresponding values for this round of the p2p schedule (no -1's).
// No-op's are encoded with a -1 size.
static ncclResult_t addP2pToPlan( struct ncclComm* comm, struct ncclKernelPlan* plan, int nChannelsMin, int nChannelsMax, int p2pRound, int sendRank, void* sendAddr, ssize_t sendBytes, int recvRank, void* recvAddr, ssize_t recvBytes, uint64_t sendOpCount, uint64_t recvOpCount, struct ncclTaskP2p** p2pTasks ) { int connIndex[2] = {1, 1}; bool selfSend = (sendRank == comm->rank); // recv: dir=0, send: dir=1 void* addrs[2] = {recvAddr, sendAddr}; ssize_t bytes[2] = {recvBytes, sendBytes}; bool protoLL[2] = {!selfSend, !selfSend}; bool network[2] = {false, false}; bool proxySameProcess[2] = {true, true}; void** handles[2] = {NULL, NULL}; uint8_t base = ncclP2pChannelBaseForRound(comm, p2pRound); if (comm->p2pNet) { for (int dir = 0; dir <= 1; dir++) { if (bytes[dir] > rcclParamP2pNetThreshold()) connIndex[dir] = NCCL_CONN_IDX_P2P_NET; } } if (!selfSend) { for (int part=0; part < nChannelsMax; part++) { int channelId = ncclP2pChannelForPart(comm->p2pnChannels, base, part, nChannelsMax, comm->nNodes); struct ncclChannelPeer** channelPeers = comm->channels[channelId].peers; for (int dir=0; dir <= 1; dir++) { int peerRank = dir ? sendRank : recvRank; struct ncclConnector* conn = dir ? &channelPeers[peerRank]->send[connIndex[dir]] : &channelPeers[peerRank]->recv[connIndex[dir]]; protoLL[dir] &= conn->conn.buffs[NCCL_PROTO_LL] != nullptr && !IsArchMatch(comm->topo->nodes[GPU].nodes[0].gpu.gcn, "gfx12"); network[dir] |= conn->transportComm == (dir ? 
&netTransport.send : &netTransport.recv); proxySameProcess[dir] &= conn->proxyConn.sameProcess; } } } ssize_t thresholdLL = nChannelsMax*ncclParamP2pLLThreshold(); ssize_t paramChunkSize = ncclParamChunkSize(); // Arrays indexed by dir where recv=0, send=1: int nChannels[2]; int protocol[2]; int stepSize[2]; int chunkSize[2]; int chunkDataSize[2]; int chunkDataSize_u32fp8[2]; bool netRegistered[2] = {false, false}; bool ipcRegistered[2] = {false, false}; for (int dir=0; dir < 2; dir++) { // 0=recv, 1=send if (bytes[dir] != -1) protoLL[dir] &= bytes[dir] <= thresholdLL; protocol[dir] = protoLL[dir] ? NCCL_PROTO_LL : NCCL_PROTO_SIMPLE; stepSize[dir] = comm->buffSizes[protocol[dir]]/NCCL_STEPS; if (protocol[dir] == NCCL_PROTO_SIMPLE) stepSize[dir] = comm->p2pChunkSize; chunkSize[dir] = stepSize[dir]; if (paramChunkSize != 0) { chunkSize[dir] = paramChunkSize; } else if (network[dir]) { // Tune chunk size for the network if (protocol[dir] == NCCL_PROTO_SIMPLE && bytes[dir] < stepSize[dir]) chunkSize[dir] /= 4; else if (bytes[dir] < 8*stepSize[dir]) chunkSize[dir] /= 2; } chunkDataSize[dir] = chunkSize[dir]; if (protocol[dir] == NCCL_PROTO_LL) chunkDataSize[dir] /= 2; chunkDataSize_u32fp8[dir] = u32fp8Encode(chunkDataSize[dir]); chunkDataSize[dir] = u32fp8Decode(chunkDataSize_u32fp8[dir]); chunkSize[dir] = chunkDataSize[dir]; if (protocol[dir] == NCCL_PROTO_LL) chunkSize[dir] *= 2; if (network[dir]) { if (bytes[dir] > 0 && proxySameProcess[dir] && protocol[dir] == NCCL_PROTO_SIMPLE && (ncclPxnDisable(comm) || !comm->isAllNvlink)) { int regFlag = 0; NCCLCHECK(ncclCalloc(&handles[dir], nChannelsMax)); for (int part = 0; part < nChannelsMax; part++) { int channelId = ncclP2pChannelForPart(comm->p2pnChannels, base, part, nChannelsMax, comm->nNodes); struct ncclChannelPeer** channelPeers = comm->channels[channelId].peers; int peerRank = dir ? sendRank : recvRank; struct ncclConnector* conn = dir ? 
&channelPeers[peerRank]->send[connIndex[dir]] : &channelPeers[peerRank]->recv[connIndex[dir]]; if (conn->conn.flags & NCCL_DIRECT_NIC) ncclRegisterP2pNetBuffer(comm, addrs[dir], bytes[dir], conn, ®Flag, &handles[dir][part], &plan->cleanupQueue); if (!regFlag) break; } netRegistered[dir] = regFlag ? true : false; } } else if (bytes[dir] > 0 && addrs[dir] && protocol[dir] == NCCL_PROTO_SIMPLE && !selfSend) { int peerRank = dir ? sendRank : recvRank; int regFlag = 0; int channelId = ncclP2pChannelForPart(comm->p2pnChannels, base, 0, nChannelsMax, comm->nNodes); struct ncclChannelPeer** channelPeers = comm->channels[channelId].peers; struct ncclConnector* conn = dir ? &channelPeers[peerRank]->send[connIndex[dir]] : &channelPeers[peerRank]->recv[connIndex[dir]]; void* regAddr = NULL; if (conn->conn.flags & (NCCL_P2P_WRITE | NCCL_P2P_READ)) { // We require users registering buffers on both sides NCCLCHECK(ncclRegisterP2pIpcBuffer(comm, addrs[dir], bytes[dir], peerRank, ®Flag, ®Addr, &plan->cleanupQueue)); if (regFlag) { if (dir == 0 && (conn->conn.flags & NCCL_P2P_WRITE)) recvAddr = regAddr; else if (dir == 1 && (conn->conn.flags & NCCL_P2P_READ)) sendAddr = regAddr; } } ipcRegistered[dir] = regFlag ? true : false; } if (bytes[dir] == -1) nChannels[dir] = 0; else if (bytes[dir] == 0) nChannels[dir] = 1; else { ssize_t minPartSize = comm->nNodes > 1 ? stepSize[dir]/2 : stepSize[dir]/8; ssize_t maxPartSize = comm->nNodes > 1 ? 
stepSize[dir] : stepSize[dir]*32; nChannels[dir] = std::min(nChannelsMin, divUp(bytes[dir], minPartSize)); size_t partSize = std::max(minPartSize, divUp(bytes[dir], nChannels[dir])); while (partSize > maxPartSize && nChannels[dir] <= nChannelsMax/2) { nChannels[dir] *= 2; partSize = divUp(bytes[dir], nChannels[dir]); } } } struct ncclWorkList* workNode = ncclMemoryStackAllocInlineArray(&comm->memScoped, 1); workNode->workType = ncclDevWorkTypeP2p; workNode->size = sizeof(struct ncclDevWorkP2p); ncclIntruQueueEnqueue(&plan->workQueue, workNode); uint32_t workOffset = plan->workBytes; plan->workBytes += sizeof(struct ncclDevWorkP2p); struct ncclDevWorkP2p* work = (struct ncclDevWorkP2p*)(workNode+1); work->nP2pChannels = comm->p2pnChannels; work->channelBase = base; work->nSendChannels = nChannels[1]; work->sendProtoLL = protoLL[1]; work->sendNetReg = netRegistered[1]; work->sendIpcReg = ipcRegistered[1]; work->sendChunkSize_u32fp8 = chunkDataSize_u32fp8[1]; work->sendRank = sendRank; work->sendAddr = sendAddr; work->sendBytes = sendBytes==-1 ? 0 : sendBytes; work->sendConnIndex = connIndex[1]; work->sendOpCount = sendOpCount; work->nRecvChannels = nChannels[0]; work->recvProtoLL = protoLL[0]; work->recvNetReg = netRegistered[0]; work->recvIpcReg = ipcRegistered[0]; work->recvChunkSize_u32fp8 = chunkDataSize_u32fp8[0]; work->recvRank = recvRank; work->recvAddr = recvAddr; work->recvBytes = recvBytes==-1 ? 0 : recvBytes; work->recvConnIndex = connIndex[0]; work->recvOpCount = recvOpCount; struct ncclProxyOp proxyOps[2] = {}; int nProxyOps = selfSend ? 0 : 2; for (int dir=0; dir < nProxyOps; dir++) { struct ncclProxyOp* op = &proxyOps[dir]; op->root = dir ? sendRank : recvRank; op->sliceSteps = 1; op->chunkSteps = 1; op->dtype = ncclInt8; op->redOp = ncclSum; op->protocol = protocol[dir]; op->pattern = dir ? ncclPatternSend : ncclPatternRecv; op->chunkSize = chunkSize[dir]; op->reg = netRegistered[dir]; op->coll = p2pTasks[dir] ? 
p2pTasks[dir]->func : 0; op->task.p2p = p2pTasks[dir]; op->rank = comm->rank; op->connIndex = connIndex[dir]; // The following are modified per channel part in addWorkToChannels(): // op->buffer, op->nbytes, op->nsteps = ...; } nChannelsMax = std::max(nChannels[0], nChannels[1]); for (int part=0; part < nChannelsMax; part++) { int channelId = ncclP2pChannelForPart(comm->p2pnChannels, base, part, comm->p2pnChannelsPerPeer, comm->nNodes); plan->channelMask.masks[channelId/64] |= uint64_t(1)<<(channelId%64); // Add batch first. int funcIdx = ncclDevFuncId_P2p(); addWorkBatchToPlan(comm, plan, channelId, ncclDevWorkTypeP2p, funcIdx, workOffset, p2pRound); if (funcIdx < 0) { WARN("%s: unsupported collective. Please ensure the collective has been enabled in build.", __func__); return ncclInvalidUsage; } // Add proxy ops. for (int dir=0; dir < nProxyOps; dir++) { // Partition steps across channels. int nParts = dir ? work->nSendChannels : work->nRecvChannels; void* addr = dir ? work->sendAddr : work->recvAddr; size_t bytes = dir ? work->sendBytes : work->recvBytes; proxyOps[dir].recvbuff = nullptr; if (nParts <= part) { proxyOps[dir].nsteps = 0; } else if (bytes == 0) { proxyOps[dir].nsteps = 1; proxyOps[dir].nbytes = 0; } else { size_t chunkDataSize = u32fp8Decode(dir ? work->sendChunkSize_u32fp8 : work->recvChunkSize_u32fp8); size_t partBeg, partEnd; ncclP2pPartBounds(nParts, part, bytes, &partBeg, &partEnd); if (proxyOps[dir].reg) { (dir ? proxyOps[dir].sendbuff : proxyOps[dir].recvbuff) = (uint8_t*)addr + partBeg; (dir ? 
proxyOps[dir].sendMhandle : proxyOps[dir].recvMhandle) = handles[dir][part]; proxyOps[dir].nbytes = partEnd - partBeg; proxyOps[dir].nsteps = DIVUP(proxyOps[dir].nbytes, NCCL_MAX_NET_SIZE); } else { proxyOps[dir].nsteps = divUp(partEnd-partBeg, chunkDataSize); proxyOps[dir].nbytes = std::min(partEnd-partBeg, chunkDataSize); } if (proxyOps[dir].protocol == NCCL_PROTO_LL) { proxyOps[dir].nbytes *= 2; proxyOps[dir].nbytes = roundUp(proxyOps[dir].nbytes, sizeof(union ncclLLFifoLine)); } } if (proxyOps[dir].nsteps != 0) { // Calculate the opCount after adding batch since then the batch count will // equal one plus the batch index this p2p settled in. proxyOps[dir].channelId = channelId; proxyOps[dir].opCount = uint64_t(comm->planner.wipPlan.channels[channelId].nWorkBatchesP2p)<<1 | 1; NCCLCHECK(addProxyOpIfNeeded(comm, plan, &proxyOps[dir])); } } } return ncclSuccess; } static int calcP2pChannelCount(size_t totalSize, int minChannels, int maxChannels, size_t minSize, size_t maxSize) { size_t size = std::max(minSize, divUp(totalSize, minChannels)); int nChannels = minChannels; while (size > maxSize && nChannels <= maxChannels/2) { nChannels *= 2; size = divUp(totalSize, nChannels); } return nChannels; } static ncclResult_t scheduleP2pTasksToPlan( struct ncclComm* comm, struct ncclKernelPlan* plan, struct ncclKernelPlanBudget* budget ) { int nRanks = comm->nRanks; struct ncclKernelPlanner::Peer* peers = comm->planner.peers; plan->threadPerBlock = std::max(plan->threadPerBlock, NCCL_MAX_NTHREADS); if (!plan->kernelSpecialized) { plan->kernelFn = ncclKerns[ncclGetKernelIndex(comm)].kernelFn; plan->kernelSpecialized = ncclKerns[ncclGetKernelIndex(comm)].specialized; } // Compute how much to split operations // Try to use all channels int nChannelsMax = comm->p2pnChannelsPerPeer; int nChannelsMin = nChannelsMax; // Try to use all channels, but one channel per operation. 
while (nChannelsMin*nRanks > comm->p2pnChannels && nChannelsMin > 1) nChannelsMin /= 2; while (comm->planner.nTasksP2p != 0) { for (int round=0; round < nRanks; round++) { int sendRank = comm->p2pSchedule[round].sendRank; int recvRank = comm->p2pSchedule[round].recvRank; struct ncclTaskP2p* send = ncclIntruQueueHead(&peers[sendRank].sendQueue); struct ncclTaskP2p* recv = ncclIntruQueueHead(&peers[recvRank].recvQueue); if (send == nullptr && recv == nullptr) continue; if (sendRank == comm->rank) { if (send != nullptr && recv == nullptr) { WARN("Trying to send to self without a matching recv"); return ncclInvalidUsage; } if (send == nullptr && recv != nullptr) { WARN("Trying to recv to self without a matching send"); return ncclInvalidUsage; } } ssize_t sendBytes = send ? send->bytes : -1; ssize_t recvBytes = recv ? recv->bytes : -1; void* sendBuff = send ? send->buff : nullptr; void* recvBuff = recv ? recv->buff : nullptr; if (sendRank == comm->rank && send->buff == recv->buff) { // Skip send to self in-place (we don't need to support this). ncclIntruQueueDequeue(&peers[sendRank].sendQueue); ncclIntruQueueDequeue(&peers[recvRank].recvQueue); ncclMemoryPoolFree(&comm->memPool_ncclTaskP2p, send); ncclMemoryPoolFree(&comm->memPool_ncclTaskP2p, recv); comm->planner.nTasksP2p -= 2; } else { // Ensure room for worst case of one new batch per channel. if (!testBudget(budget, plan->nWorkBatches+nChannelsMax, plan->workBytes + sizeof(struct ncclDevWorkP2p))) { return ncclSuccess; } struct ncclTaskP2p* p2pTasks[2] = { recv, send }; NCCLCHECK(addP2pToPlan(comm, plan, nChannelsMin, nChannelsMax, round, sendRank, sendBuff, sendBytes, recvRank, recvBuff, recvBytes, send ? send->opCount : 0, recv ? 
recv->opCount : 0, p2pTasks)); if (send != nullptr) { ncclIntruQueueDequeue(&peers[sendRank].sendQueue); ncclIntruQueueEnqueue(&plan->p2pTaskQueue, send); comm->planner.nTasksP2p -= 1; } if (recv != nullptr) { ncclIntruQueueDequeue(&peers[recvRank].recvQueue); ncclIntruQueueEnqueue(&plan->p2pTaskQueue, recv); comm->planner.nTasksP2p -= 1; } } } } return ncclSuccess; } // Spin until its safe to increase comm->workFifoProduced to desiredProduced. static void waitWorkFifoAvailable(struct ncclComm* comm, uint32_t desiredProduced) { bool hasRoom = (desiredProduced - comm->workFifoConsumedLeast) <= comm->workFifoBytes; if (hasRoom) return; while (true) { // We have to poll for notifications from device. uint32_t* consumedLive = comm->workFifoConsumed; uint32_t consumed[MAXCHANNELS]; for (int c=0; c < MAXCHANNELS; c++) { consumed[c] = __atomic_load_n(&consumedLive[c], __ATOMIC_RELAXED); } // Compiler-only fence to prevent fusion of loops to encourage dense loads. __atomic_signal_fence(__ATOMIC_SEQ_CST); uint32_t produced = comm->workFifoProduced; uint32_t consumedLeast = produced; for (int c=0; c < MAXCHANNELS; c++) { // consumedLeast is min over all non-quiesced channels if (consumed[c] != comm->channels[c].workFifoProduced) { if ((produced - consumedLeast) < (produced - consumed[c])) { consumedLeast = consumed[c]; } } } // Compiler only fence to prevent fusion of loops to encourage dense stores. __atomic_signal_fence(__ATOMIC_SEQ_CST); for (int c=0; c < MAXCHANNELS; c++) { // Advance counter on quiesced channels so they don't lag behind // too far where they could get lost in 32-bit wraparound. 
if (consumed[c] == comm->channels[c].workFifoProduced) { comm->channels[c].workFifoProduced = consumedLeast; __atomic_store_n(&consumedLive[c], consumedLeast, __ATOMIC_RELAXED); } } comm->workFifoConsumedLeast = consumedLeast; hasRoom = (desiredProduced - comm->workFifoConsumedLeast) <= comm->workFifoBytes; if (hasRoom) break; sched_yield(); } } namespace { struct uploadWork_cleanup_t { struct ncclCommEventCallback base; void *hostBuf; }; ncclResult_t uploadWork_cleanup_fn( struct ncclComm* comm, struct ncclCommEventCallback* cb ) { struct uploadWork_cleanup_t* me = (struct uploadWork_cleanup_t*)cb; free(me->hostBuf); CUDACHECK(cudaEventDestroy(me->base.event)); return ncclSuccess; } } static ncclResult_t uploadWork(struct ncclComm* comm, struct ncclKernelPlan* plan) { size_t workBytes = plan->workBytes; size_t batchBytes = plan->nWorkBatches*sizeof(struct ncclDevWorkBatch); void* fifoBufHost; uint32_t fifoCursor, fifoMask; switch (plan->workStorageType) { case ncclDevWorkStorageTypeArgs: plan->kernelArgs->workBuf = nullptr; fifoBufHost = (void*)plan->kernelArgs; fifoCursor = sizeof(ncclDevKernelArgs) + batchBytes; fifoMask = ~0u; break; case ncclDevWorkStorageTypeFifo: fifoBufHost = comm->workFifoBuf; fifoCursor = comm->workFifoProduced; fifoMask = comm->workFifoBytes-1; waitWorkFifoAvailable(comm, fifoCursor + workBytes); plan->kernelArgs->workBuf = comm->workFifoBufDev; break; case ncclDevWorkStorageTypePersistent: // We rely on 16-byte alignment #if __cplusplus >= 201103L fifoBufHost = aligned_alloc(16, ROUNDUP(workBytes, 16)); #else static_assert(16 <= alignof(max_align_t), "We rely on 16-byte alignment."); fifoBufHost = malloc(workBytes); #endif fifoCursor = 0; fifoMask = ~0u; break; default: return ncclInternalError; } plan->kernelArgs->workMask = fifoMask; // Batches were placed after kernelArgs by finishPlan(). 
Only thing left to // do is translate the work offset from zero based (in plan) to: // ncclDevWorkStorageTypeArgs: offset from beginning of kernel args // ncclDevWorkStorageTypeFifo: offset from base of fifo // ncclDevWorkStorageTypePersistent: no translation since our dedicated buffer will also begin at zero. struct ncclDevWorkBatch* batchZero = (struct ncclDevWorkBatch*)(plan->kernelArgs+1); for (int b=0; b < plan->nWorkBatches; b++) { batchZero[b].offsetBase += fifoCursor; } // Write the channel-shared work structs. struct ncclWorkList* workNode = ncclIntruQueueHead(&plan->workQueue); while (workNode != nullptr) { char* dst = (char*)fifoBufHost; char* src = (char*)(workNode+1); for (int n = workNode->size; n != 0; n -= 16) { memcpy( __builtin_assume_aligned(dst + (fifoCursor & fifoMask), 16), __builtin_assume_aligned(src, 16), 16 ); fifoCursor += 16; src += 16; } workNode = workNode->next; } switch (plan->workStorageType) { case ncclDevWorkStorageTypeFifo: comm->workFifoProduced = fifoCursor; if (comm->workFifoBufGdrHandle != nullptr) wc_store_fence(); break; case ncclDevWorkStorageTypePersistent: { ncclResult_t result = ncclSuccess; struct uploadWork_cleanup_t* cleanup = nullptr; cudaStreamCaptureMode mode = cudaStreamCaptureModeRelaxed; void* fifoBufDev = nullptr; CUDACHECKGOTO(cudaThreadExchangeStreamCaptureMode(&mode), result, fail); // Acquire deviceStream to gain access to deviceStream.cudaStream. Since the // user's graph will be launched later, and it also acquires the deviceStream, // it will observe this upload. 
NCCLCHECKGOTO(ncclStrongStreamAcquireUncaptured(&comm->sharedRes->deviceStream), result, fail); CUDACHECKGOTO(cudaMallocAsync(&fifoBufDev, workBytes, comm->memPool, comm->sharedRes->deviceStream.cudaStream), result, fail); plan->workBufPersistent = fifoBufDev; plan->kernelArgs->workBuf = fifoBufDev; // coverity[uninit_use_in_call:FALSE] => fifoBufHost is never NULL CUDACHECKGOTO(cudaMemcpyAsync(fifoBufDev, fifoBufHost, workBytes, cudaMemcpyDefault, comm->sharedRes->deviceStream.cudaStream), result, fail); cudaEvent_t memcpyDone; CUDACHECKGOTO(cudaEventCreateWithFlags(&memcpyDone, cudaEventDisableTiming), result, fail); CUDACHECKGOTO(cudaEventRecord(memcpyDone, comm->sharedRes->deviceStream.cudaStream), result, fail); NCCLCHECKGOTO(ncclCalloc(&cleanup, 1), result, fail); cleanup->base.fn = uploadWork_cleanup_fn; cleanup->base.event = memcpyDone; cleanup->hostBuf = fifoBufHost; ncclIntruQueueEnqueue(&comm->eventCallbackQueue, (struct ncclCommEventCallback *)cleanup); NCCLCHECKGOTO(ncclStrongStreamRelease(ncclCudaGraphNone(), &comm->sharedRes->deviceStream), result, fail); NCCLCHECKGOTO(ncclCommPollEventCallbacks(comm), result, fail); finish_scope: if (mode != cudaStreamCaptureModeRelaxed) (void)cudaThreadExchangeStreamCaptureMode(&mode); return result; fail: if (!cleanup) free(fifoBufHost); goto finish_scope; } break; default: break; } return ncclSuccess; } static ncclResult_t uploadProxyOps(struct ncclComm* comm, struct ncclKernelPlan* plan) { uint64_t collOpCount = comm->sharedRes->collOpCount; uint64_t p2pOpBump[MAXCHANNELS] = {/*0...*/}; // Advance comm's collOpCount by number of colls in this plan. comm->sharedRes->collOpCount += plan->collOpCount; comm->collOpCount += plan->collOpCount; struct ncclProxyOp* op = ncclIntruQueueHead(&plan->proxyOpQueue); while (op != nullptr) { op->profilerContext = comm->profilerContext; op->eActivationMask = op->coll <= ncclFuncAllReduce ? 
op->task.coll->eActivationMask : op->task.p2p->eActivationMask; op->taskEventHandle = op->coll <= ncclFuncAllReduce ? op->task.coll->eventHandle : op->task.p2p->eventHandle; ncclProfilerAddPidToProxyOp(op); uint64_t oldId = op->opCount; // Ignoring the bottom tag bit, opCount's are zero-based within plan so // translate them to the tip of the comm's history. if (oldId & 1) { // p2p // opCount is monotonic increasing within a plan's channel so just // remember last value to compute max. p2pOpBump[op->channelId] = (oldId>>1) + 1; // +1 to ensure next plan doesn't collide op->opCount = (comm->sharedRes->p2pOpCount[op->channelId]<<1) + oldId; } else { // coll op->opCount = (collOpCount<<1) + oldId; } NCCLCHECK(ncclProxySaveOp(comm, op, nullptr)); op->opCount = oldId; // Restore for next uploadProxyOps() op = op->enqNext; } for (int c=0; c < MAXCHANNELS; c++) { // Advance channel's p2pOpCount by number of p2p's in this plan channel. comm->sharedRes->p2pOpCount[c] += p2pOpBump[c]; } return ncclSuccess; } static ncclResult_t hostStreamPlanTask(struct ncclComm* comm, struct ncclKernelPlan* plan) { NCCLCHECK(ncclProfilerStartGroupEvent(plan)); NCCLCHECK(ncclProfilerStartTaskEvents(plan)); NCCLCHECK(uploadProxyOps(comm, plan)); NCCLCHECK(ncclProxyStart(comm)); NCCLCHECK(ncclProfilerStopTaskEvents(plan)); NCCLCHECK(ncclProfilerStopGroupEvent(plan)); if (!plan->persistent) { // Notify main thread of our reclaiming. This will reclaim plan concurrently. 
ncclIntruQueueMpscEnqueue(&comm->callbackQueue, &plan->reclaimer); } return ncclSuccess; } static void HIPRT_CB hostStreamPlanCallback(void *plan_) { NVTX3_FUNC_RANGE_IN(nccl_domain); struct ncclKernelPlan* plan = (struct ncclKernelPlan*)plan_; ncclResult_t result = hostStreamPlanTask(plan->comm, plan); if (result != ncclSuccess) { WARN("hostStreamPlanCallback() failed : %s", ncclGetErrorString(result)); } if (!plan->persistent) ncclAtomicRefCountDecrement(&plan->comm->noncapturedRefs); return; } static ncclResult_t reclaimPlan(struct ncclComm* comm, struct ncclCommCallback* me) { struct ncclKernelPlan* plan = (struct ncclKernelPlan*)me; // cast from first member `reclaim` if (plan->persistent) { comm->persistentRefs -= 1; NCCLCHECK(ncclCudaFree(plan->workBufPersistent)); } // Free proxy ops struct ncclProxyOp* q = ncclIntruQueueHead(&plan->proxyOpQueue); while (q != nullptr) { struct ncclProxyOp* q1 = q->enqNext; if (q->ringAlgo && q->ringAlgo->decRefCount() == 0) delete q->ringAlgo; ncclMemoryPoolFree(&comm->memPool_ncclProxyOp, q); q = q1; } // Run other free callbacks ncclResult_t result = ncclSuccess; while (!ncclIntruQueueEmpty(&plan->cleanupQueue)) { struct ncclCommCallback* cb = ncclIntruQueueDequeue(&plan->cleanupQueue); ncclResult_t res1 = cb->fn(comm, cb); // Expect to reclaim memory of cb if (res1 != ncclSuccess) result = res1; } NCCLCHECK(result); // Free plan struct ncclMemoryPoolFree(&comm->memPool_ncclKernelPlan, plan); return ncclSuccess; } static void persistentDestructor(void* plans_) { struct ncclKernelPlan* plan = (struct ncclKernelPlan*)plans_; struct ncclComm* comm = plan->comm; while (plan != nullptr) { struct ncclKernelPlan* next = plan->next; ncclIntruQueueMpscEnqueue(&comm->callbackQueue, &plan->reclaimer); plan = next; } } ncclResult_t ncclLaunchPrepare(struct ncclComm* comm) { ncclResult_t result = ncclSuccess; struct ncclKernelPlanner* planner = &comm->planner; bool persistent = ncclCudaGraphValid(planner->capturingGraph); 
planner->persistent = persistent; int nPlans = 0; if (planner->nTasksColl + planner->nTasksP2p != 0) { do { memset(&planner->wipPlan, 0, sizeof(planner->wipPlan)); struct ncclKernelPlan* plan = ncclMemoryPoolAlloc(&comm->memPool_ncclKernelPlan, &comm->memPermanent); plan->comm = comm; plan->reclaimer.fn = reclaimPlan; plan->persistent = persistent; // finishPlan() promotes ncclDevWorkStorageType[Fifo|Persistent]->Args if the work can fit. plan->workStorageType = persistent ? ncclDevWorkStorageTypePersistent : ncclDevWorkStorageTypeFifo; struct ncclKernelPlanBudget budget; budget.inArgsBytes = comm->workArgsBytes - sizeof(struct ncclDevKernelArgs); // Non-persistent kernels fill up at most half of our fifo per kernel. budget.outArgsBytes = plan->persistent ? (1<<30) : comm->workFifoBytes/2; // Drain coll tasks first. This is essential since we partition tasks based // on the work budget and p2p work isn't collective. If we were to drain p2p // first, the place where we cut the kernel could vary by rank which would // cause the "shortest channel first" channel picker to have divergent results. if (planner->nTasksColl != 0) { NCCLCHECKGOTO(scheduleCollTasksToPlan(comm, plan, &budget), result, failure); } // And only drain p2p tasks once colls are depleted. if (planner->nTasksColl == 0 && planner->nTasksP2p != 0) { NCCLCHECKGOTO(scheduleP2pTasksToPlan(comm, plan, &budget), result, failure); } finishPlan(comm, plan); if (plan->workBytes != 0) { ncclIntruQueueEnqueue(&planner->planQueue, plan); nPlans += 1; } } while (planner->nTasksColl + planner->nTasksP2p != 0); struct ncclKernelPlan* planHead = ncclIntruQueueHead(&planner->planQueue); planner->unlaunchedPlansHead = planHead; if (nPlans == 0) return ncclSuccess; // Semantically we want these dependencies for the kernels launched: // 1. Launch host task on hostStream. // 2. Launch kernel, depends on all of {deviceStream, hostStream, userStream[i]...} // 3. {deviceStream, userStream[i]...} depend on kernel. 
// We achieve this by: // 1. userStream[0] waits on deviceStream // 2. deviceStream waits on each of userStream[1...] // 3. host task launch on hostStream // 4. userStream[0] waits on hostStream // 5. kernel launch on userStream[0] // 6. deviceStream waits on userStream[0] // 7. userStream[1...] each waits on deviceStream // The two-level fan-in fan-out is because ncclStrongStreamWaitStream() requires // at least one of the two streams to be strong-stream. cudaStream_t launchStream = planner->streams->stream; NCCLCHECKGOTO(ncclStrongStreamAcquire(planner->capturingGraph, &comm->sharedRes->deviceStream), result, failure); if (planner->numStreams != 1 || persistent) { // Create dependency for device stream on user streams. First from extra user // streams to deviceStream. Then deviceStream to first user stream. for (struct ncclCudaStreamList* l=planner->streams->next; l != nullptr; l = l->next) { NCCLCHECKGOTO(ncclStrongStreamWaitStream(planner->capturingGraph, &comm->sharedRes->deviceStream, l->stream), result, failure); } NCCLCHECKGOTO(ncclStrongStreamWaitStream(planner->capturingGraph, launchStream, &comm->sharedRes->deviceStream), result, failure); } else if (planner->streams->stream != comm->lastStream && comm->lastStream != nullptr && !persistent) { // Stream changed from last call, create dependency against last NCCL kernel launch CUDACHECK(hipStreamWaitEvent(planner->streams->stream, comm->doneEvent, 0)); } if (persistent || comm->persistentRefs != 0 || ncclCudaLaunchBlocking || __atomic_load_n(&comm->noncapturedRefs, __ATOMIC_ACQUIRE)) { // We have to launch host tasks to push proxy args. We are careful to only // do this if necessary since host tasks impose a high performance cost in CUDA. 
bool acquired = false; for (struct ncclKernelPlan* plan=planHead; plan != nullptr; plan = plan->next) { if (plan->hasProxyOps) { if (!acquired) { acquired = true; NCCLCHECKGOTO(ncclStrongStreamAcquire(planner->capturingGraph, &comm->sharedRes->hostStream), result, failure); } if (!persistent) ncclAtomicRefCountIncrement(&comm->noncapturedRefs); plan->isHostCbEnq = true; NCCLCHECKGOTO(ncclStrongStreamLaunchHost(planner->capturingGraph, &comm->sharedRes->hostStream, hostStreamPlanCallback, plan), result, failure); } } if (acquired) { // Make to-be-launched kernels dependent on just-launched host stream tasks. NCCLCHECKGOTO(ncclStrongStreamWaitStream(planner->capturingGraph, launchStream, &comm->sharedRes->hostStream), result, failure); NCCLCHECKGOTO(ncclStrongStreamRelease(planner->capturingGraph, &comm->sharedRes->hostStream), result, failure); } } if (persistent) { comm->persistentRefs += nPlans; NCCLCHECKGOTO(ncclCudaGraphAddDestructor(planner->capturingGraph, persistentDestructor, (void*)planHead), result, failure); } } failure: return result; } ncclResult_t ncclLaunchKernelBefore_NoUncapturedCuda(struct ncclComm* comm, struct ncclKernelPlan* plan) { // This code is called after we've checked in to the intra-process barrier // but before launching the kernel. We are not allowed to call CUDA unless the // kernel launch is captured. 
NCCLCHECK(uploadWork(comm, plan));
  return ncclSuccess;
}

#if CUDART_VERSION >= 12000
// NCCL uses the "Remote" Mem Sync domain by default
NCCL_PARAM(MemSyncDomain, "MEM_SYNC_DOMAIN", cudaLaunchMemSyncDomainRemote);
#endif

// Launches the device kernel for `plan` on the first stream in comm->planner.streams.
//
// Launch geometry: one block per channel whose bit is set in plan->channelMask
// (popcount over the MAXCHANNELS/64 mask words) and plan->threadPerBlock threads
// per block. `extra` is passed to the launch API as its kernel-argument array:
// extra[0] points at plan->kernelArgs, extra[1] at plan->kernelArgsSize.
// NOTE(review): presumably the kernel takes a single by-value argument struct, so
// only extra[0] is consumed -- confirm against the ncclDevKernel_* signatures.
//
// Launch paths, tried in order:
//  1. Exactly one user stream and a non-persistent plan: hipExtLaunchKernel, with
//     comm->doneEvent in the stop-event slot of its signature; comm->lastStream is
//     updated.
//  2. CUDART_VERSION >= 11080 and driver >= 11.8: cuLaunchKernelEx with optional
//     cluster and mem-sync-domain launch attributes.
//  3. Otherwise: cudaLaunchKernel with `smem` bytes of dynamic shared memory.
// Returns ncclSuccess; the *CHECK macros return early with an error code on failure.
ncclResult_t ncclLaunchKernel(struct ncclComm* comm, struct ncclKernelPlan* plan) {
  struct ncclKernelPlanner* planner = &comm->planner;
  int nChannels = 0;
  for (int i = 0; i < MAXCHANNELS/64; i++) nChannels += countOneBits(plan->channelMask.masks[i]);
  void* sym = plan->kernelFn;
  dim3 grid = {(unsigned)nChannels, 1, 1};
  dim3 block = {(unsigned)plan->threadPerBlock, 1, 1};
  int smem = ncclShmemDynamicSize(comm->cudaArch);
  cudaStream_t launchStream = planner->streams->stream;
  void* extra[] = {plan->kernelArgs, &plan->kernelArgsSize};

  // Fast path: a single user stream and a non-persistent plan.
  // NOTE(review): this path passes 0 bytes of dynamic shared memory, while the
  // fallback below passes `smem`; confirm ncclShmemDynamicSize() is 0 for every
  // architecture that reaches this branch.
  if (planner->numStreams == 1 && !plan->persistent) {
    CUDACHECK(hipExtLaunchKernel(plan->kernelFn, grid, block, extra, 0, launchStream, NULL, comm->doneEvent, 0));
    comm->lastStream = planner->streams->stream;
    return ncclSuccess;
  }

  // CUfunction fn;
  // CUDACHECK(cudaGetFuncBySymbol(&fn, sym));

#if CUDART_VERSION >= 11080
  // NOTE(review): `fn`, used by cuLaunchKernelEx below, is only declared in the
  // commented-out lines above, so this branch does not compile as written;
  // presumably CUDART_VERSION stays below 11080 in HIP builds -- confirm.
  int driverVersion;
  NCCLCHECK(ncclCudaDriverVersion(&driverVersion));
  if (driverVersion >= 11080) {
    int compCap = comm->compCap;
    unsigned int clusterSize = (compCap == 90) ? comm->config.cgaClusterSize : 0;

    CUlaunchConfig launchConfig = {0};
    CUlaunchAttribute launchAttrs[3];  // at most: cluster dim, cluster policy, mem sync domain
    int attrs = 0;
    /* Cooperative Group Array (CGA)
     * On sm90 and later we have an extra level of hierarchy where we
     * can group together several blocks within the Grid, called
     * Thread Block Clusters.
     * Clusters enable multiple thread blocks running concurrently
     * across multiple SMs to synchronize and collaboratively fetch
     * and exchange data. A cluster of blocks are guaranteed to be
     * concurrently scheduled onto a group of SMs.
     * The maximum value is 8 and it must be divisible into the grid dimensions
     */
    if (clusterSize) {
      // Grid dimension must be divisible by clusterSize
      if (grid.x % clusterSize) clusterSize = 1;
      launchAttrs[attrs].id = CU_LAUNCH_ATTRIBUTE_CLUSTER_DIMENSION;
      launchAttrs[attrs++].value.clusterDim = {clusterSize, 1, 1};
      launchAttrs[attrs].id = CU_LAUNCH_ATTRIBUTE_CLUSTER_SCHEDULING_POLICY_PREFERENCE;
      launchAttrs[attrs++].value.clusterSchedulingPolicyPreference = CU_CLUSTER_SCHEDULING_POLICY_SPREAD;
    }
#if CUDART_VERSION >= 12000
    if (compCap >= 90 && driverVersion >= 12000) {
      // Set the NCCL Mem Sync domain on CUDA 12.0 and later (sm90)
      launchAttrs[attrs].id = CU_LAUNCH_ATTRIBUTE_MEM_SYNC_DOMAIN;
      launchAttrs[attrs++].value.memSyncDomain = (CUlaunchMemSyncDomain) ncclParamMemSyncDomain();
    }
#endif
    launchConfig.gridDimX = grid.x;
    launchConfig.gridDimY = grid.y;
    launchConfig.gridDimZ = grid.z;
    launchConfig.blockDimX = block.x;
    launchConfig.blockDimY = block.y;
    launchConfig.blockDimZ = block.z;
    launchConfig.sharedMemBytes = smem;
    launchConfig.attrs = launchAttrs;
    launchConfig.numAttrs = attrs;
    launchConfig.hStream = launchStream;

    //CUDACHECK(cudaLaunchKernelExC(&launchConfig, fnAddr, args));
    CUCHECK(cuLaunchKernelEx(&launchConfig, fn, nullptr, extra));
    return ncclSuccess;
  }
#endif
  // Standard kernel launch
  //cuLaunchKernel(sym, grid.x, grid.y, grid.z, block.x, block.y, block.z, smem, launchStream, nullptr, extra);
  CUDACHECK(cudaLaunchKernel(sym, grid, block, extra, smem, launchStream));
  return ncclSuccess;
}

// Arranges proxy-op and reclamation submission for `plan` after its kernel launch.
// If the plan is not persistent, ncclCudaLaunchBlocking is off and plan->isHostCbEnq
// is false, the plan is handed to hostStreamPlanTask(). Otherwise a non-persistent
// plan without proxy ops has its reclaimer pushed straight onto comm->callbackQueue.
ncclResult_t ncclLaunchKernelAfter_NoCuda(struct ncclComm* comm, struct ncclKernelPlan* plan) {
  if (!(plan->persistent || ncclCudaLaunchBlocking || plan->isHostCbEnq)) {
    // We are not using the host stream for proxy ops and reclamation submission.
    NCCLCHECK(hostStreamPlanTask(comm, plan));
  } else {
    // We are using the host stream for proxy ops and reclamation submission.
    // Only plans with proxy ops have a callback pushed by ncclLaunchPrepare.
    // Since non-persistent plans also require reclamation, we have to do it
    // here.
    if (!plan->persistent && !plan->hasProxyOps) {
      ncclIntruQueueMpscEnqueue(&comm->callbackQueue, &plan->reclaimer);
    }
  }
  return ncclSuccess;
}

// Completes a launch round. If any plans were queued it:
//  - empties planner->planQueue without destroying the plans (they come back for
//    reclaiming through callbackQueue);
//  - when persistent or when more than one user stream is involved, makes
//    deviceStream wait on the launch stream and every other user stream wait on
//    deviceStream;
//  - resets planner->streams / planner->numStreams;
//  - releases deviceStream.
// Each step reports failure through NCCLCHECKGOTO into `result` and then jumps to
// the resume label that immediately follows it, so later steps still run.
ncclResult_t ncclLaunchFinish(struct ncclComm* comm) {
  ncclResult_t result = ncclSuccess;
  struct ncclKernelPlanner* planner = &comm->planner;
  bool persistent = ncclCudaGraphValid(planner->capturingGraph);

  if (!ncclIntruQueueEmpty(&planner->planQueue)) {
    // Reset queue to empty without destroying plans since those will be sent
    // back to us for reclaiming via callbackQueue.
    ncclIntruQueueConstruct(&planner->planQueue);

    cudaStream_t launchStream = planner->streams->stream; // First user stream gets launch
    // Create dependency for deviceStream on launchStream. We know that deviceStream
    // hasn't been modified since launchStream waited on it (in ncclLaunchPrepare),
    // so we can say that launchStream subsumes it.
    if (persistent || planner->numStreams != 1)
      NCCLCHECKGOTO(ncclStrongStreamWaitStream(planner->capturingGraph, &comm->sharedRes->deviceStream, launchStream, /*b_subsumes_a=*/true), result, resume1);
  resume1:
    // Create dependency for other user streams (skip launch stream) on deviceStream.
    // Again, the user streams haven't been touched since deviceStream waited on them
    // so we can say they are subsumed by deviceStream.
    struct ncclCudaStreamList* sl = planner->streams->next;
    planner->streams = nullptr; // Reset comm->planner.streams to empty.
    while (sl != nullptr && (planner->numStreams != 1 || persistent)) {
      NCCLCHECKGOTO(ncclStrongStreamWaitStream(planner->capturingGraph, sl->stream, &comm->sharedRes->deviceStream, /*b_subsumes_a=*/true), result, resume2);
    resume2:
      sl = sl->next;
    }
    planner->numStreams = 0;
    // Release device stream as acquired in ncclLaunchPrepare()
    NCCLCHECKGOTO(ncclStrongStreamRelease(planner->capturingGraph, &comm->sharedRes->deviceStream), result, resume3);
  resume3:;
  }
  return result;
}

/*****************************************************************************/
/* Enqueueing system : computation of kernel and proxy operations parameters */
/*****************************************************************************/

// Computes whether CollNet can serve collective `info`.
// *collNetSupport starts from comm->collNetSupport; for AllReduce, Reduce and
// ReduceScatter it is additionally ANDed with comm->collNetSupportMatrix for the
// (reduction op, datatype) pair. Device ops PreMulSum and SumPostDiv (which back
// ncclAvg and user PreMulSum ops) are looked up as ncclSum, the op the network sees.
// Always returns ncclSuccess.
static inline ncclResult_t getCollNetSupport(
    struct ncclComm* comm, struct ncclTaskColl* info, int* collNetSupport
  ) {
  // Translate ncclAvg and PreMulSum
  ncclRedOp_t netOp = info->opHost;
  if (info->opDev.op == ncclDevPreMulSum || info->opDev.op == ncclDevSumPostDiv) {
    netOp = ncclSum;
  }
  *collNetSupport = comm->collNetSupport;
  switch (info->func) {
  case ncclFuncAllReduce:
  case ncclFuncReduce:
  case ncclFuncReduceScatter:
    *collNetSupport &= comm->collNetSupportMatrix[netOp][info->datatype];
    break;
  default:
    break;
  }
  return ncclSuccess;
}

// Marks every (algorithm, protocol) entry of the cost table as NCCL_ALGO_PROTO_IGNORE.
// `collCostTable` is reinterpreted as float[NCCL_NUM_ALGORITHMS][NCCL_NUM_PROTOCOLS];
// the float** parameter type is only a carrier for that contiguous 2-D array.
static void initCollCostTable(float** collCostTable) {
  float (*table)[NCCL_NUM_PROTOCOLS] = (float (*)[NCCL_NUM_PROTOCOLS])collCostTable;
  for (int a = 0; a < NCCL_NUM_ALGORITHMS; a++) {
    for (int p = 0; p < NCCL_NUM_PROTOCOLS; p++) {
      table[a][p] = NCCL_ALGO_PROTO_IGNORE;
    }
  }
}

// numPipeOps: number of pipelined ops. Can be greater than 1 in aggregation mode. Used to adjust latency.
static ncclResult_t updateCollCostTable( struct ncclComm* comm, struct ncclTaskColl* info, size_t nBytes, int collNetSupport, int nvlsSupport, int numPipeOps, float** collCostTable) { float (*table)[NCCL_NUM_PROTOCOLS] = (float (*)[NCCL_NUM_PROTOCOLS])collCostTable; if (comm->nRanks == 1 || info->func == ncclFuncAllToAllPivot) { table[NCCL_ALGO_RING][NCCL_PROTO_SIMPLE] = 0.0; return ncclSuccess; } for (int a=0; amaxLocalRanks > NCCL_MAX_DIRECT_ARITY+1) continue; if ((a == NCCL_ALGO_NVLS || a == NCCL_ALGO_NVLS_TREE) && nvlsSupport != 1 && info->func != ncclFuncAllGather) continue; if (a == NCCL_ALGO_NVLS && collNetSupport != 1 && comm->nNodes > 1) continue; /* now we only support single-node NVLS allgather and reducescatter */ if (a == NCCL_ALGO_NVLS && (info->func == ncclFuncAllGather || info->func == ncclFuncReduceScatter) && comm->nNodes > 1) continue; /* Tree reduceScatter doesn't support scaling yet */ if (a == NCCL_ALGO_PAT && info->func == ncclFuncReduceScatter && (info->opDev.op == ncclDevPreMulSum || info->opDev.op == ncclDevSumPostDiv)) continue; for (int p=0; ptopo->type & RCCL_TOPO_XGMI_ALL)) { table[a][p] = NCCL_ALGO_PROTO_IGNORE; continue; } NCCLCHECK(ncclTopoGetAlgoTime(comm, info->func, a, p, nBytes, numPipeOps, &table[a][p])); // Relegate fp8 reduction trees of sufficient depth that they incur precision loss // to be least preferred. if (info->datatype == ncclFloat8e4m3 || info->datatype == ncclFloat8e5m2) { if (a == NCCL_ALGO_RING && comm->nRanks > 8) { table[a][p] *= 1024.0; // Any factor large enough to act as a partition between lossy and non-lossy algos. 
} } } } return ncclSuccess; } static ncclResult_t topoGetAlgoInfo( struct ncclComm* comm, struct ncclTaskColl* info, size_t nBytes, float** collCostTable, ncclSimInfo_t* simInfo ) { float (*table)[NCCL_NUM_PROTOCOLS] = (float (*)[NCCL_NUM_PROTOCOLS])collCostTable; float minTime = 3600000000.0; int algorithm = info->algorithm = NCCL_ALGO_UNDEF; int protocol = info->protocol = NCCL_PROTO_UNDEF; for (int a=0; a= 0.0 && table[a][p] < minTime) { algorithm = a; protocol = p; minTime = table[a][p]; } } } info->algorithm = algorithm; info->protocol = protocol; float time = minTime; // Yes, we are first assigning and then testing if protocol is sane, but that's OK in this case. // coverity[check_after_sink] if (info->algorithm == NCCL_ALGO_UNDEF || info->protocol == NCCL_PROTO_UNDEF) { char ncclAlgoEnvStr[1024] = ""; char ncclProtoEnvStr[1024] = ""; char* algoEnv = getenv("NCCL_ALGO"); if (algoEnv) { snprintf(ncclAlgoEnvStr, 1023, " NCCL_ALGO was set to %s.", algoEnv); } char* protoEnv = getenv("NCCL_PROTO"); if (protoEnv) { snprintf(ncclProtoEnvStr, 1023, " NCCL_PROTO was set to %s.", protoEnv); } WARN("Error : no algorithm/protocol available for function %s with datatype %s.%s%s", ncclFuncToString(info->func), ncclDatatypeToString(info->datatype), ncclAlgoEnvStr, ncclProtoEnvStr); return (algoEnv || protoEnv) ? 
ncclInvalidUsage : ncclInternalError; } rcclUpdateCollectiveProtocol(comm, nBytes, info); if (simInfo) simInfo->estimatedTime = time; TRACE(NCCL_COLL, "%ld Bytes -> Algo %d proto %d time %f", nBytes, info->algorithm, info->protocol, time); int nc = comm->nChannels; int nt = comm->maxThreads[info->algorithm][info->protocol]; int threadThreshold = comm->threadThresholds[info->algorithm][info->protocol]; if (info->algorithm == NCCL_ALGO_COLLNET_DIRECT) { // CollNet channel tuning int ncSwitch = 16; bool flag = true; while (ncSwitch >= 1 && flag) { while ((flag = nBytes < nc*nt*comm->channels[0].collnetDirect.nHeads*threadThreshold) && nc > ncSwitch) { if (nc == ncSwitch+ncSwitch/2) threadThreshold /= 2; nc--; } ncSwitch /= 2; } } else if (info->algorithm == NCCL_ALGO_NVLS || info->algorithm == NCCL_ALGO_NVLS_TREE) { // NVLS should not need more than 16 channels to get peak BW. nc = comm->nvlsChannels; } else { // Ring/Tree channel tuning while (nBytes < nc * nt * threadThreshold) { if (nc >= 2) nc--; else break; } } #if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__) #else if (info->algorithm != NCCL_ALGO_NVLS && info->algorithm != NCCL_ALGO_NVLS_TREE && info->algorithm != NCCL_ALGO_COLLNET_DIRECT) { while (nBytes < nc * nt * threadThreshold) { if (nt % 128 == 0) nt /= 2; else break; } } if (info->protocol == NCCL_PROTO_SIMPLE) { if (info->algorithm == NCCL_ALGO_RING) nt += WARP_SIZE; // Extra warp for sync // More threads or sync warps needed due to split thread model if (info->algorithm == NCCL_ALGO_TREE) nt += 4*WARP_SIZE; } nt = nt/WARP_SIZE < 3 ? 
3*WARP_SIZE : nt; #endif if (info->func == ncclFuncAllReduce && comm->topo->pivotA2ANumBiRings == 3) { static int userTuneInput = -2; if (userTuneInput == -2) { const char *protoStr = getenv("NCCL_PROTO"); const char *algoStr = getenv("NCCL_ALGO"); if (!protoStr && !algoStr) userTuneInput = 0; else userTuneInput = 1; } info->nMaxChannels = nc; if (!userTuneInput) { // always respect user settings if (nBytes <= 2200008) { info->protocol = NCCL_PROTO_LL; info->algorithm = NCCL_ALGO_TREE; info->nMaxChannels = std::min(24, comm->nChannels); } else { info->protocol = NCCL_PROTO_SIMPLE; info->algorithm = NCCL_ALGO_RING; } } } else if (info->func == ncclFuncAllReduce && comm->topo->treeDefined == 1) { info->algorithm = NCCL_ALGO_TREE; info->nMaxChannels = nc; } else { info->nMaxChannels = nc; } if (info->algorithm == NCCL_ALGO_TREE) nt = NCCL_MAX_NTHREADS; // Tree now uses all threads always. if (info->algorithm == NCCL_ALGO_PAT) nt = NCCL_MAX_NTHREADS; info->nWarps = nt/WARP_SIZE; return ncclSuccess; } // Use the default topo-based tuner if tuner plugin is not successful. // Call the plugin first. Let it set algo+proto, and/or nChannels. // Then, topoGetAlgoInfo will set algo/proto if not set, then nChannels and nThreads based on algo/proto. // Finally, nChannels will be overriden by the plugin setting. 
rccl_static ncclResult_t getAlgoInfo( struct ncclComm* comm, struct ncclTaskColl* info, int collNetSupport, int nvlsSupport, int numPipeOps, ncclSimInfo_t* simInfo/* = NULL*/ ) { size_t nBytes = ncclTypeSize(info->datatype)*ncclFuncMaxSendRecvCount(info->func, comm->nRanks, info->count); info->algorithm = NCCL_ALGO_UNDEF; info->protocol = NCCL_PROTO_UNDEF; int nMaxChannels = 0; float collCostTable[NCCL_NUM_ALGORITHMS][NCCL_NUM_PROTOCOLS]; initCollCostTable((float **)collCostTable); NCCLCHECK(updateCollCostTable(comm, info, nBytes, collNetSupport, nvlsSupport, numPipeOps, (float **)collCostTable)); if (comm->tuner != NULL) { size_t elementSize = ncclTypeSize(info->datatype); size_t sendbuffSize = elementSize*ncclFuncSendCount(info->func, comm->nRanks, info->count); size_t recvbuffSize = elementSize*ncclFuncRecvCount(info->func, comm->nRanks, info->count); struct ncclReg* regSendBuf; struct ncclReg* regRecvBuf; NCCLCHECK(ncclRegFind(comm, info->sendbuff, sendbuffSize, ®SendBuf)); NCCLCHECK(ncclRegFind(comm, info->recvbuff, recvbuffSize, ®RecvBuf)); int regBuff = ((regSendBuf && regRecvBuf) || (ncclCudaGraphValid(comm->planner.capturingGraph) && ncclParamGraphRegister())); NCCLCHECK(comm->tuner->getCollInfo( comm->tunerContext, info->func, nBytes, numPipeOps, (float **)collCostTable, NCCL_NUM_ALGORITHMS, NCCL_NUM_PROTOCOLS, regBuff, &nMaxChannels)); } NCCLCHECK(topoGetAlgoInfo(comm, info, nBytes, (float **)collCostTable, simInfo)); info->nMaxChannels = nMaxChannels == 0 ? info->nMaxChannels : nMaxChannels; return ncclSuccess; } NCCL_PARAM(NvlsTreeMaxChunkSize, "NVLSTREE_MAX_CHUNKSIZE", -2); static ncclResult_t calcCollChunking( struct ncclComm* comm, struct ncclTaskColl* info, int nChannels, size_t nBytes, /*outputs*/uint32_t* outChunkSize, uint32_t* outDirectFlags, struct ncclProxyOp* proxyOp ) { ncclPattern_t pattern; size_t grainSize = ncclProtoGrainSize(info->protocol); switch (info->func) { case ncclFuncBroadcast: pattern = info->algorithm == NCCL_ALGO_TREE ? 
ncclPatternTreeDown : ncclPatternPipelineFrom; break; case ncclFuncReduce: pattern = info->algorithm == NCCL_ALGO_TREE ? ncclPatternTreeUp : ncclPatternPipelineTo; break; case ncclFuncReduceScatter: pattern = info->algorithm == NCCL_ALGO_PAT ? ncclPatternPatUp : info->algorithm == NCCL_ALGO_NVLS ? ncclPatternNvls : info->algorithm == NCCL_ALGO_COLLNET_DIRECT ? ncclPatternCollnetDirect : ncclPatternRing; break; case ncclFuncAllGather: pattern = info->algorithm == NCCL_ALGO_PAT ? ncclPatternPatDown : info->algorithm == NCCL_ALGO_NVLS ? ncclPatternNvls : info->algorithm == NCCL_ALGO_COLLNET_DIRECT ? ncclPatternCollnetDirect : ncclPatternRing; break; case ncclFuncAllToAllPivot: pattern = ncclPatternRing; break; case ncclFuncAllReduce: pattern = info->algorithm == NCCL_ALGO_NVLS ? ncclPatternNvls : info->algorithm == NCCL_ALGO_NVLS_TREE ? ncclPatternNvlsTree : info->algorithm == NCCL_ALGO_COLLNET_DIRECT ? ncclPatternCollnetDirect : info->algorithm == NCCL_ALGO_COLLNET_CHAIN ? ncclPatternCollnetChain : info->algorithm == NCCL_ALGO_TREE ? ncclPatternTreeUpDown : ncclPatternRingTwice; break; default: WARN("Unknown pattern for collective %d algorithm %d", info->func, info->algorithm); return ncclInternalError; } int nstepsPerLoop, nchunksPerLoop; size_t loopOffset = 0; int stepSize = comm->buffSizes[info->protocol]/NCCL_STEPS; int chunkSteps = (info->protocol == NCCL_PROTO_SIMPLE && info->algorithm == NCCL_ALGO_RING) ? info->chunkSteps : 1; int sliceSteps = (info->protocol == NCCL_PROTO_SIMPLE && info->algorithm == NCCL_ALGO_RING) ? 
info->sliceSteps : 1; int chunkSize = stepSize*chunkSteps; if (info->protocol == NCCL_PROTO_LL) chunkSize /= 2; if (info->protocol == NCCL_PROTO_LL128) chunkSize = (chunkSize / NCCL_LL128_LINEELEMS) * NCCL_LL128_DATAELEMS; if (info->algorithm == NCCL_ALGO_TREE && info->protocol == NCCL_PROTO_SIMPLE) { if (pattern == ncclPatternTreeUpDown) { // Optimize chunkSize / nSteps while(nBytes / (nChannels * chunkSize) < comm->channels[0].tree.depth*8 && chunkSize > 131072) chunkSize /=2; while(nBytes / (nChannels * chunkSize) < comm->channels[0].tree.depth*4 && chunkSize > 65536) chunkSize /=2; while(nBytes / (nChannels * chunkSize) < comm->channels[0].tree.depth && chunkSize > 32768) chunkSize /=2; } } else if (info->algorithm == NCCL_ALGO_RING && info->protocol == NCCL_PROTO_SIMPLE) { if (pattern == ncclPatternPipelineFrom || pattern == ncclPatternPipelineTo) { // Optimize chunkSize / nSteps while(nBytes / (nChannels * chunkSize) < 64 && chunkSize > 262144) chunkSize /= 2; while(nBytes / (nChannels * chunkSize) < 32 && chunkSize > 131072) chunkSize /= 2; while(nBytes / (nChannels * chunkSize) < 16 && chunkSize > 65536) chunkSize /= 2; while(nBytes / (nChannels * chunkSize) < 8 && chunkSize > 32768) chunkSize /= 2; } } else if (info->algorithm == NCCL_ALGO_COLLNET_DIRECT) { // Optimize chunkSize / nSteps while (nBytes / (nChannels * comm->channels[0].collnetDirect.nHeads * chunkSize) < comm->channels[0].collnetDirect.depth * 64 && chunkSize > 131072) chunkSize /= 2; while (nBytes / (nChannels * comm->channels[0].collnetDirect.nHeads * chunkSize) < comm->channels[0].collnetDirect.depth * 8 && chunkSize > 65536) chunkSize /= 2; while (nBytes / (nChannels * comm->channels[0].collnetDirect.nHeads * chunkSize) < comm->channels[0].collnetDirect.depth * 8 && chunkSize > 32768) chunkSize /= 2; } else if (info->algorithm == NCCL_ALGO_COLLNET_CHAIN) { stepSize = comm->buffSizes[NCCL_PROTO_SIMPLE] / NCCL_STEPS; chunkSize = std::min(256 * 1024, stepSize * chunkSteps); while (nBytes / 
(nChannels * chunkSize) < comm->channels[0].collnetChain.depth * 64 && chunkSize > 131072) chunkSize /= 2; while (nBytes / (nChannels * chunkSize) < comm->channels[0].collnetChain.depth * 8 && chunkSize > 65536) chunkSize /= 2; while (nBytes / (nChannels * chunkSize) < comm->channels[0].collnetChain.depth && chunkSize > 32768) chunkSize /= 2; } else if (info->algorithm == NCCL_ALGO_NVLS) { int maxChunkSize = comm->nvlsChunkSize; if (comm->nNodes > 1 && comm->bandwidths[ncclFuncAllReduce][NCCL_ALGO_NVLS][NCCL_PROTO_SIMPLE] < 150) maxChunkSize = 32768; if (chunkSize > maxChunkSize) chunkSize = maxChunkSize; // Use uint64_t so that concurrentOps*chunkSize*X does not overflow. // However, nChannels * comm->channels[0].nvls.nHeads should easily fit in 32 bits. // coverity[overflow_before_widen] uint64_t concurrentOps = nChannels * comm->channels[0].nvls.nHeads; if ((nBytes < (64 * (concurrentOps * chunkSize))) && (chunkSize > 65536)) chunkSize = 65536; if ((nBytes < (8 * (concurrentOps * chunkSize))) && (chunkSize > 32768)) chunkSize = 32768; if ((nBytes < (2 * (concurrentOps * chunkSize))) && (chunkSize > 16384)) chunkSize = 16384; } else if (info->algorithm == NCCL_ALGO_NVLS_TREE) { // Use uint64_t so that concurrentOps*chunkSize*X does not overflow. // However, nChannels * comm->channels[0].nvls.nHeads should easily fit in 32 bits. // coverity[overflow_before_widen] uint64_t concurrentOps = nChannels * comm->channels[0].nvls.nHeads; chunkSize = comm->nvlsChunkSize; int maxChunkSize = (int)ncclParamNvlsTreeMaxChunkSize(); if (maxChunkSize == -2) maxChunkSize = comm->nNodes >= 4 ? 
65536 : chunkSize; chunkSize = std::min(chunkSize, maxChunkSize); if ((nBytes < (32 * (concurrentOps * chunkSize))) && (chunkSize > 262144)) chunkSize = 262144; if ((nBytes < (16 * (concurrentOps * chunkSize))) && (chunkSize > 131072)) chunkSize = 131072; if ((nBytes < (4 * (concurrentOps * chunkSize))) && (chunkSize > 65536)) chunkSize = 65536; if ((nBytes < (1 * (concurrentOps * chunkSize))) && (chunkSize > 32768)) chunkSize = 32768; } else if (info->algorithm == NCCL_ALGO_TREE && info->protocol == NCCL_PROTO_LL128) { int nNodes = comm->nNodes; float ppn = comm->nRanks / (float)nNodes; float nstepsLL128 = 1+log2i(nNodes) + 0.1*ppn; // Yes, we are OK with the division on the left side of the < operand being integer. // coverity[integer_division] while (nBytes / (nChannels*chunkSize) < nstepsLL128*64/ppn && chunkSize > 131072) chunkSize /= 2; // coverity[integer_division] while (nBytes / (nChannels*chunkSize) < nstepsLL128*16/ppn && chunkSize > 32768) chunkSize /= 2; } else if (info->func == ncclFuncAllGather && info->algorithm == NCCL_ALGO_PAT) { while (chunkSize*nChannels*32 > nBytes && chunkSize > 65536) chunkSize /= 2; } else if (info->func == ncclFuncReduceScatter && info->algorithm == NCCL_ALGO_PAT) { while (chunkSize*nChannels*16 > nBytes && chunkSize > 65536) chunkSize /= 2; } // Compute directFlags of work struct. if (info->algorithm == NCCL_ALGO_COLLNET_DIRECT) { // Set direct direction for broadcast-gather (read or write) *outDirectFlags = (nBytes/nChannels <= 1024 * 4) ? 
NCCL_P2P_READ : NCCL_P2P_WRITE; } else { *outDirectFlags = 0; } // Compute nSteps for proxies chunkSize = chunkSize / grainSize * grainSize; // align chunkSize to multiple grainSize switch (pattern) { case ncclPatternTreeUp: case ncclPatternTreeDown: case ncclPatternTreeUpDown: case ncclPatternPatUp: case ncclPatternPatDown: case ncclPatternPipelineFrom: case ncclPatternPipelineTo: case ncclPatternCollnetChain: nstepsPerLoop = nchunksPerLoop = 1; break; case ncclPatternNvls: nstepsPerLoop = 1; nchunksPerLoop = comm->channels[0].nvls.nHeads; loopOffset = nChannels * chunkSize * comm->channels[0].nvls.headRank; break; case ncclPatternCollnetDirect: nstepsPerLoop = 1; nchunksPerLoop = comm->channels[0].collnetDirect.nHeads; loopOffset = nChannels * chunkSize * comm->channels[0].collnetDirect.headRank; break; case ncclPatternRing: nstepsPerLoop = comm->nRanks-1; nchunksPerLoop = comm->nRanks; break; case ncclPatternRingTwice: nstepsPerLoop = 2*(comm->nRanks-1); nchunksPerLoop = comm->nRanks; break; case ncclPatternNvlsTree: nstepsPerLoop = 1; nchunksPerLoop = comm->channels[0].nvls.nHeads; break; default: WARN("Unknown pattern %d", pattern); return ncclInternalError; } // Compute nSteps for proxies size_t loopSize = size_t(nChannels)*nchunksPerLoop*chunkSize; int nLoops = (int)DIVUP(nBytes, loopSize); memset(proxyOp, 0, sizeof(*proxyOp)); proxyOp->nsteps = nstepsPerLoop * nLoops * chunkSteps; proxyOp->sliceSteps = sliceSteps; proxyOp->chunkSteps = chunkSteps; proxyOp->chunkSize = chunkSize; proxyOp->sliceSize = chunkSize / chunkSteps * sliceSteps; proxyOp->loopSize = loopSize; proxyOp->loopOffset = loopOffset; proxyOp->protocol = info->protocol; proxyOp->dtype = info->datatype; proxyOp->algorithm = info->algorithm; if (info->opDev.op == ncclDevPreMulSum || info->opDev.op == ncclDevSumPostDiv) { proxyOp->redOp = ncclSum; // Network sees avg as sum } else { proxyOp->redOp = info->opHost; } proxyOp->pattern = pattern; proxyOp->coll = info->func; proxyOp->root = 
info->root; proxyOp->isOneRPN = comm->isOneRPN; // This is used by P2P to reduce the receive buffer size. We don't use it in collectives // because some protocols need to transmit more than the total size, plus they sometimes // round up proxyOp->nbytes = stepSize*sliceSteps; if (info->regBufType & NCCL_NET_REG_BUFFER) { proxyOp->reg = 1; if (info->algorithm == NCCL_ALGO_COLLNET_DIRECT || info->algorithm == NCCL_ALGO_NVLS || info->algorithm == NCCL_ALGO_COLLNET_CHAIN) { if (proxyOp->isOneRPN) { proxyOp->nsteps = 1; proxyOp->loopOffset = 0; proxyOp->sendbuff = (uint8_t*)info->sendbuff; proxyOp->sendMhandle = info->sendMhandle; } else { if (info->func == ncclFuncAllGather || info->func == ncclFuncReduceScatter) { proxyOp->nbytes = nBytes / nchunksPerLoop; proxyOp->loopSize = proxyOp->loopSize / nchunksPerLoop; proxyOp->loopOffset = 0; if (info->func == ncclFuncAllGather) { proxyOp->sendbuff = (uint8_t*)info->sendbuff; proxyOp->sendMhandle = info->sendMhandle; } } else { proxyOp->sendbuff = (uint8_t*)info->recvbuff; proxyOp->sendMhandle = info->recvMhandle; } } } else if (info->algorithm == NCCL_ALGO_RING) { if (proxyOp->isOneRPN && info->func == ncclFuncAllGather) { proxyOp->chunkSize = NCCL_MAX_NET_SIZE; proxyOp->sliceSize = NCCL_MAX_NET_SIZE; proxyOp->chunkSteps = 1; proxyOp->sliceSteps = 1; proxyOp->loopSize = size_t(nChannels) * nchunksPerLoop * proxyOp->chunkSize; proxyOp->nsteps = DIVUP(nBytes, proxyOp->loopSize) * nstepsPerLoop; proxyOp->loopOffset = 0; } } else { WARN("Net registration invalid algorithm %s", ncclAlgoToString(info->algorithm)); return ncclInternalError; } proxyOp->recvMhandle = info->recvMhandle; proxyOp->recvbuff = (uint8_t*)info->recvbuff; proxyOp->nbytes = nBytes; } else { proxyOp->reg = 0; } if (pattern == ncclPatternCollnetDirect) { proxyOp->specifics.collnetDirect.nNodes = comm->nNodes; proxyOp->specifics.collnetDirect.node = comm->node; if (info->func == ncclFuncAllGather || info->func == ncclFuncReduceScatter) { 
proxyOp->specifics.collnetDirect.sizePerRank = info->count*ncclTypeSize(info->datatype); } } if (pattern == ncclPatternPatUp || pattern == ncclPatternPatDown) { proxyOp->nbytes = DIVUP(nBytes, nChannels); } *outChunkSize = proxyOp->chunkSize; return ncclSuccess; } static ncclResult_t hostToDevRedOp( ncclDevRedOpFull *opFull, ncclRedOp_t op, ncclDataType_t datatype, ncclComm *comm ) { union { int8_t i8; uint8_t u8; int32_t i32; uint32_t u32; int64_t i64; uint64_t u64; __half f16; float f32; double f64; #if defined(RCCL_BFLOAT16) hip_bfloat16 bf16; #endif #if defined(RCCL_FLOAT8) rccl_float8 f8; rccl_bfloat8 bf8; #endif void *ptr; }; u64 = 0; opFull->scalarArgIsPtr = false; opFull->proxyOp = op; int nbits = 8*ncclTypeSize(datatype); if (nbits <= 0) return ncclInvalidArgument; uint64_t allBits = uint64_t(-1)>>(64-nbits); uint64_t signBit = allBits^(allBits>>1); bool datatype_signed = false; switch (int(op)) { case ncclSum: opFull->op = ncclDevSum; break; case ncclProd: opFull->op = ncclDevProd; break; case ncclMin: case ncclMax: opFull->op = ncclDevMinMax; opFull->scalarArg = 0; // The xormask used by ncclFuncMinMax<[u]int> is the XOR of the sign bit // for signed (opposed to unsigned) types and all the bits for max (opposed to min). if (datatype==ncclInt8 || datatype==ncclInt32 || datatype==ncclInt64) { opFull->scalarArg ^= signBit; } opFull->scalarArg ^= (op == ncclMax) ? allBits : 0; break; case ncclAvg: switch ((int)datatype) { case ncclInt8: case ncclInt32: case ncclInt64: datatype_signed = true; // no break, we want to fall through... 
case ncclUint8: case ncclUint32: case ncclUint64: opFull->op = ncclDevSumPostDiv; u64 = comm->nRanks<<1 | datatype_signed; break; #if defined(RCCL_FLOAT8) case ncclFloat8e4m3: opFull->op = ncclDevPreMulSum; f8 = static_cast(float(1.0/comm->nRanks)); break; case ncclFloat8e5m2: opFull->op = ncclDevPreMulSum; bf8 = static_cast(float(1.0/comm->nRanks)); break; #endif case ncclFloat16: opFull->op = ncclDevPreMulSum; f16 = __float2half(float(1.0/comm->nRanks)); // __double2half not supported pre CUDA 11.x break; #if defined(RCCL_BFLOAT16) case ncclBfloat16: opFull->op = ncclDevPreMulSum; bf16 = (hip_bfloat16)(float(1.0/comm->nRanks)); break; #endif case ncclFloat32: opFull->op = ncclDevPreMulSum; f32 = float(1.0/comm->nRanks); break; case ncclFloat64: opFull->op = ncclDevPreMulSum; f64 = 1.0/comm->nRanks; break; } opFull->scalarArgIsPtr = false; opFull->scalarArg = u64; break; default: // user created int ix = int(ncclUserRedOpMangle(comm, op)) - int(ncclNumOps); ncclUserRedOp *user = &comm->userRedOps[ix]; if (datatype != user->datatype) { WARN("Data type supplied to user-created ncclRedOp_t does not match type " "given to reduction operation"); return ncclInvalidArgument; } *opFull = user->opFull; break; } return ncclSuccess; } // Converts `info` to a task and adds it to `comm->planner`. The exception is with // single rank communicators, collectives are issued as `ncclMemcpyAsync`s and // thus don't need a task. static ncclResult_t taskAppend(struct ncclComm* comm, struct ncclInfo* info) { struct ncclKernelPlanner *planner = &comm->planner; if (info->coll == ncclFuncSend || info->coll == ncclFuncRecv) { int peer = info->root; ssize_t nBytes = info->count*ncclTypeSize(info->datatype); bool isSendNotRecv = info->coll == ncclFuncSend; // Must be in thread local group before tasks can be alloc'd in `comm->memScoped`. 
ncclGroupCommJoin(info->comm); struct ncclTaskP2p* p2p = ncclMemoryStackAlloc(&comm->memScoped); p2p->buff = (void*)info->recvbuff; p2p->count = info->count; p2p->datatype = info->datatype; p2p->root = info->root; p2p->bytes = nBytes; p2p->opCount = comm->opCount; ncclIntruQueueEnqueue( isSendNotRecv ? &planner->peers[peer].sendQueue : &planner->peers[peer].recvQueue, p2p); planner->nTasksP2p += 1; // Mark channels that need pre-connect if (comm->rank != peer) { if (!(isSendNotRecv ? planner->peers[peer].sendSeen : planner->peers[peer].recvSeen)) { (isSendNotRecv ? planner->peers[peer].sendSeen : planner->peers[peer].recvSeen) = true; int round = 0; while (peer != (isSendNotRecv ? comm->p2pSchedule[round].sendRank : comm->p2pSchedule[round].recvRank)) { round += 1; } uint8_t base = ncclP2pChannelBaseForRound(comm, round); for (int c=0; c < comm->p2pnChannelsPerPeer; c++) { int channelId = ncclP2pChannelForPart(comm->p2pnChannels, base, c, comm->p2pnChannelsPerPeer, comm->nNodes); if (isSendNotRecv) { if (comm->channels[channelId].peers[peer]->send[1].connected == 0) { // P2P uses only 1 connector //comm->connectSend[peer] |= (1UL<connectSend[peer].masks[channelId/64] |= (1UL<<(channelId%64)); ncclGroupCommPreconnect(comm); } if (comm->p2pNet && comm->channels[channelId].peers[peer]->send[NCCL_CONN_IDX_P2P_NET].connected == 0) { //comm->connectSend[peer+comm->nRanks*NCCL_CONN_IDX_P2P_NET] |= (1UL<connectSend[peer+comm->nRanks*NCCL_CONN_IDX_P2P_NET].masks[channelId/64] |= (1UL<<(channelId%64)); ncclGroupCommPreconnect(comm); } } else { if (comm->channels[channelId].peers[peer]->recv[1].connected == 0) { // P2P uses only 1 connector //comm->connectRecv[peer] |= (1UL<connectRecv[peer].masks[channelId/64] |= (1UL<<(channelId%64)); ncclGroupCommPreconnect(comm); } if (comm->p2pNet && comm->channels[channelId].peers[peer]->recv[NCCL_CONN_IDX_P2P_NET].connected == 0) { //comm->connectRecv[peer+comm->nRanks*NCCL_CONN_IDX_P2P_NET] |= 
(1UL<connectRecv[peer+comm->nRanks*NCCL_CONN_IDX_P2P_NET].masks[channelId/64] |= (1UL<<(channelId%64)); ncclGroupCommPreconnect(comm); } } } } } } else { // Empty collectives can be discarded. if (info->count == 0) return ncclSuccess; if (info->datatype == ncclFloat8e4m3 || info->datatype == ncclFloat8e5m2) { if (comm->minCompCap < 90) { WARN("FP8 reduction support begins with sm90 capable devices."); return ncclInvalidArgument; } } // Copy reduction op state from op handle into info struct here since the // op handle may be destroyed before ncclGroupEnd(). struct ncclDevRedOpFull opDev; NCCLCHECK(hostToDevRedOp(&opDev, info->op, info->datatype, comm)); if (comm->nRanks == 1) { NCCLCHECK(ncclLaunchOneRank(info->recvbuff, info->sendbuff, info->count, opDev, info->datatype, info->stream)); return ncclSuccess; } else { // Must be in thread local group before tasks can be alloc'd in `comm->memScoped`. ncclGroupCommJoin(info->comm); struct ncclTaskColl* t = ncclMemoryStackAlloc(&comm->memScoped); t->func = info->coll; t->sendbuff = info->sendbuff; t->recvbuff = info->recvbuff; t->count = info->count; t->root = info->root; t->datatype = info->datatype; size_t elementSize = ncclTypeSize(t->datatype); if (t->func == ncclFuncAllGather || t->func == ncclFuncBroadcast || t->func == ncclFuncAllToAllPivot) { t->count *= elementSize; t->datatype = ncclInt8; elementSize = 1; } t->trafficBytes = t->count*elementSize*ncclFuncTrafficPerByte(t->func, comm->nRanks); t->opHost = info->op; t->opDev = opDev; // C++ struct assignment t->chunkSteps = info->chunkSteps; t->sliceSteps = info->sliceSteps; t->opCount = comm->opCount; planner->nTasksColl += 1; ncclTaskCollSorterInsert(&planner->collSorter, t, t->trafficBytes); } } if (info->stream != planner->streamRecent || planner->streams == nullptr) { planner->streamRecent = info->stream; struct ncclCudaStreamList* l = planner->streams; while (true) { if (l == nullptr) { // Got to the end, this must be a new stream. 
struct ncclCudaGraph graph; NCCLCHECK(ncclCudaGetCapturingGraph(&graph, info->stream)); if (planner->streams != nullptr && !ncclCudaGraphSame(planner->capturingGraph, graph)) { WARN("Streams given to a communicator within a NCCL group must either be all uncaptured or all captured by the same graph."); return ncclInvalidUsage; } planner->capturingGraph = graph; // C++ struct assignment // Add stream to list l = ncclMemoryStackAlloc(&comm->memScoped); l->stream = info->stream; l->next = planner->streams; planner->streams = l; planner->numStreams++; break; } if (l->stream == info->stream) break; // Already seen stream. l = l->next; } } return ncclSuccess; } ncclResult_t ncclEnqueueCheck(struct ncclInfo* info) { NCCLCHECK(ncclGroupStartInternal()); ncclResult_t ret = ncclSuccess; int devOld = -1; NCCLCHECKGOTO(CommCheck(info->comm, info->opName, "comm"), ret, fail); // Check whether communicator is ready to communicate NCCLCHECKGOTO(ncclCommEnsureReady(info->comm), ret, fail); if (info->comm->checkPointers) { CUDACHECKGOTO(cudaGetDevice(&devOld), ret, fail); CUDACHECKGOTO(cudaSetDevice(info->comm->cudaDev), ret, fail); } NCCLCHECKGOTO(ArgsCheck(info), ret, fail); INFO(NCCL_COLL,"%s: opCount %lx sendbuff %p recvbuff %p count %zu datatype %d op %d root %d comm %p [nranks=%d] stream %p task %d globalrank %d", info->opName, info->comm->opCount, info->sendbuff, info->recvbuff, info->count, info->datatype, info->op, info->root, info->comm, info->comm->nRanks, info->stream, info->comm->planner.nTasksP2p + info->comm->planner.nTasksColl, info->comm->localRankToRank[info->comm->localRank]); TRACE_CALL("nccl%s(%" PRIx64 ",%" PRIx64 ",%zu,%d,%d,%d,%p,%p)", info->opName, reinterpret_cast(info->sendbuff), reinterpret_cast(info->recvbuff), info->count, info->datatype, info->op, info->root, info->comm, info->stream); NCCLCHECKGOTO(taskAppend(info->comm, info), ret, fail); exit: if (devOld != -1) CUDACHECK(cudaSetDevice(devOld)); ncclGroupErrCheck(ret); 
NCCLCHECK(ncclGroupEndInternal()); /* if depth is 1, ncclGroupEndInternal() will trigger group ops. The state can change * so we have to check state here. */ if (info->comm && !info->comm->config.blocking) { NCCLCHECK(ncclCommGetAsyncError(info->comm, &ret)); } return ret; fail: if (info->comm && !info->comm->config.blocking) (void) ncclCommSetAsyncError(info->comm, ret); goto exit; } NCCL_API(ncclResult_t, ncclRedOpCreatePreMulSum, ncclRedOp_t *op, void *scalar, ncclDataType_t datatype, ncclScalarResidence_t residence, ncclComm_t comm); ncclResult_t ncclRedOpCreatePreMulSum_impl(ncclRedOp_t *op, void *scalar, ncclDataType_t datatype, ncclScalarResidence_t residence, ncclComm_t comm) { NCCLCHECK(CommCheck(comm, "ncclRedOpCreatePreMulSum", "comm")); /* join init thread before creating PreMulSum op. */ NCCLCHECK(ncclCommEnsureReady(comm)); if (comm->userRedOpFreeHead == comm->userRedOpCapacity) { // double capacity and resize int cap = 2*comm->userRedOpCapacity; if (cap < 4) cap = 4; ncclUserRedOp *ops = new ncclUserRedOp[cap]; if (comm->userRedOpCapacity > 0) std::memcpy(ops, comm->userRedOps, comm->userRedOpCapacity*sizeof(ncclUserRedOp)); for(int ix=comm->userRedOpCapacity; ix < cap; ix++) ops[ix].freeNext = ix + 1; delete[] comm->userRedOps; comm->userRedOps = ops; comm->userRedOpCapacity = cap; } // pop from free list int ix = comm->userRedOpFreeHead; ncclUserRedOp *user = &comm->userRedOps[ix]; comm->userRedOpFreeHead = user->freeNext; user->freeNext = -1; // allocated user->datatype = datatype; user->opFull.op = ncclDevPreMulSum; if (residence == ncclScalarHostImmediate) { int size = ncclTypeSize(datatype); if (size < 1) return ncclInternalError; user->opFull.scalarArgIsPtr = false; std::memcpy(&user->opFull.scalarArg, scalar, size); } else { user->opFull.scalarArgIsPtr = true; user->opFull.scalarArg = reinterpret_cast(scalar); } *op = ncclRedOp_t(int(ncclNumOps) + ix); *op = ncclUserRedOpMangle(comm, *op); // ! 
recording at sink NCCLCHECK(Recorder::instance().record(rrRedOpCreatePreMulSum, *op, comm, datatype, residence, scalar)); TRACE_CALL("ncclRedOpCreatePreMulSum(%d,%p,%d,%d,%p)", *op, scalar, datatype, residence, comm); return ncclSuccess; } NCCL_API(ncclResult_t, ncclRedOpDestroy, ncclRedOp_t op, ncclComm_t comm); ncclResult_t ncclRedOpDestroy_impl(ncclRedOp_t op, ncclComm_t comm) { NCCLCHECK(Recorder::instance().record(rrRedOpDestroy, op, comm)); if (0 <= int(op) && int(op) < int(ncclNumOps)) { WARN("ncclRedOpDestroy : operator is a NCCL builtin."); return ncclInvalidArgument; } // int(ncclMaxRedOp) < int(op) will always be false due to the sizes of // the datatypes involved, and that's by design. We keep the check though // just as a reminder. // coverity[result_independent_of_operands] if (int(op) < 0 || int(ncclMaxRedOp) < int(op)) { WARN("ncclRedOpDestroy : operator is garbage."); return ncclInvalidArgument; } if (comm == NULL) { WARN("ncclRedOpDestroy : invalid communicator passed."); return ncclInvalidArgument; } int ix = int(ncclUserRedOpMangle(comm, op)) - int(ncclNumOps); if (comm->userRedOpCapacity <= ix || comm->userRedOps[ix].freeNext != -1) { WARN("ncclRedOpDestroy : operator unknown to this communicator."); return ncclInvalidArgument; } // push to free list comm->userRedOps[ix].freeNext = comm->userRedOpFreeHead; comm->userRedOpFreeHead = ix; TRACE_CALL("ncclRedOpDestroy(%d,%p)", op, comm); return ncclSuccess; }