From a075779dcdb053c14fe02a89b4f196b180f87f0f Mon Sep 17 00:00:00 2001 From: Mustafa Abduljabbar Date: Mon, 22 Sep 2025 16:25:10 -0400 Subject: [PATCH] Use batched P2P to enhance alltoall small message performance (#1902) * Batch P2P operations (2 per CU/channel) and update channel-part mapping - Revert bitreversal and fix channel mapping to be compatible with P2P batching and avoid hangs - P2P batching is only used for more than 2 nodes to avoid aggregating intra-node traffic when it is dominant for 2 or fewer nodes * Address single node regression and channel per net peer * Add batching threshold * Add enable switch for batching * Update CHANGELOG.md * Add minor comment change * Update CHANGELOG.md Co-authored-by: Jeffrey Novotny * Update CHANGELOG.md Co-authored-by: Jeffrey Novotny [ROCm/rccl commit: c1e1f2faeb4fad73679603e3c200560d653849a2] --- projects/rccl/CHANGELOG.md | 3 +++ projects/rccl/src/device/sendrecv.h | 2 +- projects/rccl/src/enqueue.cc | 30 ++++++++++++++++++++++------- projects/rccl/src/graph/paths.cc | 2 +- projects/rccl/src/include/channel.h | 7 ++++--- projects/rccl/src/include/device.h | 20 +++---------------- 6 files changed, 35 insertions(+), 29 deletions(-) diff --git a/projects/rccl/CHANGELOG.md b/projects/rccl/CHANGELOG.md index 3d31ae5be0..a888fb6028 100644 --- a/projects/rccl/CHANGELOG.md +++ b/projects/rccl/CHANGELOG.md @@ -5,6 +5,8 @@ Full documentation for RCCL is available at [https://rccl.readthedocs.io](https: ## Unreleased - RCCL 2.27.3 for ROCm 7.1.0 ### Added +* Added `RCCL_P2P_BATCH_THRESHOLD` to set the message size limit for batching P2P operations. This mainly affects small message performance for alltoall at a large scale but also applies to alltoallv. +* Added `RCCL_P2P_BATCH_ENABLE` to enable batching P2P operations to receive performance gains for smaller messages up to 4MB for alltoall when the workload requires it. Batching is disabled by default to avoid performance dips for larger messages. 
### Changed @@ -12,6 +14,7 @@ Full documentation for RCCL is available at [https://rccl.readthedocs.io](https: * Compatibility with NCCL 2.27.3 ### Resolved issues +* Improve small message performance for alltoall by enabling and optimizing batched P2P operations. ## RCCL 2.26.6 for ROCm 7.0.0 diff --git a/projects/rccl/src/device/sendrecv.h b/projects/rccl/src/device/sendrecv.h index 7bf63e1f49..5d2445dec1 100644 --- a/projects/rccl/src/device/sendrecv.h +++ b/projects/rccl/src/device/sendrecv.h @@ -22,7 +22,7 @@ struct RunWorkBatchsendIpcReg && ncclShmem.comm.isAllNvlink) || work->sendNetReg; int chunkSize = useLargeChunk ? NCCL_MAX_NET_SIZE : u32fp8Decode(work->sendChunkSize_u32fp8); int stepSize = useLargeChunk ? NCCL_MAX_NET_SIZE : ncclShmem.comm.p2pChunkSize; - + #if defined(ENABLE_NPKIT) bool isNpKitThread = (tid == 0); int npKitCtxIdx = blockIdx.x * NCCL_MAX_DEV_WORK_P2P_ELEMENTS + group; diff --git a/projects/rccl/src/enqueue.cc b/projects/rccl/src/enqueue.cc index 3f2088193e..0c762ee6cf 100644 --- a/projects/rccl/src/enqueue.cc +++ b/projects/rccl/src/enqueue.cc @@ -175,7 +175,7 @@ static ncclResult_t addProxyOpIfNeeded(struct ncclComm* comm, struct ncclKernelP static void addWorkBatchToPlan( struct ncclComm* comm, struct ncclKernelPlan* plan, int channelId, enum ncclDevWorkType workType, int devFuncId, uint32_t workOffset, - int p2pRound = -1 + int p2pRound = -1, bool batchP2P = false ) { ncclKernelPlanner::WipPlan::Channel* chan = &comm->planner.wipPlan.channels[channelId]; size_t workSize = ncclDevWorkSize(workType); @@ -193,7 +193,7 @@ static void addWorkBatchToPlan( // batch further down. newBatch |= NCCL_MAX_DEV_WORK_BATCH_BYTES < chan->wipBatch.workBytes + workSize; if (workType == ncclDevWorkTypeP2p) { - newBatch |= chan->wipBatch.nP2ps == NCCL_MAX_DEV_WORK_P2P_PER_BATCH; + newBatch |= (comm->nNodes > 2 && batchP2P)? 
(chan->wipBatch.nP2ps == NCCL_MAX_DEV_WORK_P2P_PER_BATCH) : (chan->wipBatch.nP2ps == 1); for (int i=0; i < chan->wipBatch.nP2ps; i++) { newBatch |= p2pRound == chan->wipBatch.p2pRounds[i]; } @@ -388,7 +388,7 @@ ncclResult_t ncclTasksRegAndEnqueue(struct ncclComm* comm) { devWork.redOpArgIsPtr = task->opDev.scalarArgIsPtr; devWork.oneNode = (comm->nNodes == 1); devWork.rcclUseOneSlice = comm->rcclUseOneSlice; - + devWork.isOneRPN = comm->isOneRPN; devWork.netRegUsed = devWork.regUsed = 0; devWork.gfx942CheapFenceOff = gfx942CheapFenceOff(devWork, comm->gfx942CheapFenceOff); @@ -913,6 +913,21 @@ NCCL_PARAM(P2pLLThreshold, "P2P_LL_THRESHOLD", 16384); RCCL_PARAM(P2pNetThreshold, "P2P_NET_THRESHOLD", 131072); NCCL_PARAM(ChunkSize, "CHUNK_SIZE", 0); +// This is the maximum P2P message size that can be batched with others +// Below this message size, NCCL_MAX_DEV_WORK_P2P_PER_BATCH will be applicable +// For alltoall, this can be multiplied by the number of ranks to match Size (B) in rccl-tests +// Without a threshold, RCCL will suffer large message regression due to limitation at a larger scale +// when more batches are needed to saturate the NIC BW in RCCL. +// The threshold can be set to a higher value to experiment on other platforms. +// This value has been tested on MI300. +RCCL_PARAM(P2pBatchThreshold, "P2P_BATCH_THRESHOLD", 1 << 16); // 64k + + +// Need this temporary parameter to disable p2p batching to avoid some dips at 4MB - 32 MB message size at large scale +// This parameter must be removed after further investigation. +// Note that NCCL enables batching by default and it is needed to achieve good performance for smaller messages <= 4MB +RCCL_PARAM(P2pBatchEnable, "P2P_BATCH_ENABLE", 0); // disabled by default + // Put p2p op in plan assuming there is sizeof(ncclDevWorkBatch) in batch budget // and sizeof(ncclDevWorkP2p) in work budget. "sendRank" and "recvRank" must // match the corresponding values for this round of the p2p schedule (no -1's). 
@@ -934,8 +949,9 @@ static ncclResult_t addP2pToPlan( bool network[2] = {false, false}; bool proxySameProcess[2] = {true, true}; void** handles[2] = {NULL, NULL}; - uint8_t base = ncclP2pChannelBaseForRound(comm, p2pRound); - + auto batchP2PEnableEnv = rcclParamP2pBatchEnable(); + bool batchP2P = batchP2PEnableEnv && ((sendBytes == -1)? recvBytes <= rcclParamP2pBatchThreshold() : sendBytes <= rcclParamP2pBatchThreshold()); + uint8_t base = ncclP2pChannelBaseForRound(comm, p2pRound, batchP2PEnableEnv); if (comm->p2pNet) { for (int dir = 0; dir <= 1; dir++) { if (bytes[dir] > rcclParamP2pNetThreshold()) @@ -1108,7 +1124,7 @@ static ncclResult_t addP2pToPlan( plan->channelMask.masks[channelId/64] |= uint64_t(1)<<(channelId%64); // Add batch first. int funcIdx = ncclDevFuncId_P2p(); - addWorkBatchToPlan(comm, plan, channelId, ncclDevWorkTypeP2p, funcIdx, workOffset, p2pRound); + addWorkBatchToPlan(comm, plan, channelId, ncclDevWorkTypeP2p, funcIdx, workOffset, p2pRound, batchP2P); if (funcIdx < 0) { WARN("%s: unsupported collective. 
Please ensure the collective has been enabled in build.", __func__); return ncclInvalidUsage; @@ -2566,7 +2582,7 @@ static ncclResult_t taskAppend(struct ncclComm* comm, struct ncclInfo* info) { : comm->p2pSchedule[round].recvRank)) { round += 1; } - uint8_t base = ncclP2pChannelBaseForRound(comm, round); + uint8_t base = ncclP2pChannelBaseForRound(comm, round, rcclParamP2pBatchEnable()); for (int c=0; c < comm->p2pnChannelsPerPeer; c++) { int channelId = ncclP2pChannelForPart(comm->p2pnChannels, base, c, comm->p2pnChannelsPerPeer, comm->nNodes); if (isSendNotRecv) { diff --git a/projects/rccl/src/graph/paths.cc b/projects/rccl/src/graph/paths.cc index f921e5bf6a..56907800a3 100644 --- a/projects/rccl/src/graph/paths.cc +++ b/projects/rccl/src/graph/paths.cc @@ -956,7 +956,7 @@ static ncclResult_t ncclTopoGetNchannels(struct ncclComm* comm, int g /*local gp int netCountByBw = 1, nChannelsMax = nNetChannels; NCCLCHECK(getLocalNetCountByBw(system, g, &netCountByBw)); // Avoid overloading channels with 8+ operations as we loose the sync warp, hence a bit of bandwidth. 
- while (nChannelsMax*comm->nRanks > comm->p2pnChannels*4 && nChannelsMax > 1) nChannelsMax /= 2; + while (nChannelsMax*comm->nRanks > comm->p2pnChannels && nChannelsMax > 1) nChannelsMax /= 2; //allow upto channels requires to drive the NICs nNetChannels = std::max(netCountByBw, nChannelsMax); diff --git a/projects/rccl/src/include/channel.h b/projects/rccl/src/include/channel.h index ee9aa6d0b8..f4cf4fe7f7 100644 --- a/projects/rccl/src/include/channel.h +++ b/projects/rccl/src/include/channel.h @@ -16,12 +16,13 @@ ncclResult_t initNvlsChannel(struct ncclComm* comm, int channelId, struct ncclCo ncclResult_t initCollnetChannel(struct ncclComm* comm, int channelId, struct ncclComm* parent, bool share); ncclResult_t freeChannel(struct ncclChannel* channel, int nRanks, int collnetNRanks, int nvlsNRanks); -inline uint8_t ncclP2pChannelBaseForRound(struct ncclComm* comm, int p2pRound) { +inline uint8_t ncclP2pChannelBaseForRound(struct ncclComm* comm, int p2pRound, int p2pBatchEnable = 0) { if (comm->nNodes > 1) { int nodeDelta = p2pRound/comm->maxLocalRanks; int localDelta = p2pRound%comm->maxLocalRanks; - int base = nodeDelta*divUp(comm->maxLocalRanks, NCCL_MAX_DEV_WORK_P2P_PER_BATCH); - base += localDelta/NCCL_MAX_DEV_WORK_P2P_PER_BATCH; + int batchSize = (comm->nNodes > 2 && p2pBatchEnable) ? 
NCCL_MAX_DEV_WORK_P2P_PER_BATCH : 1; + int base = nodeDelta*divUp(comm->maxLocalRanks, batchSize); + base += localDelta/batchSize; return base & 0xff; } else { return p2pRound & 0xff; diff --git a/projects/rccl/src/include/device.h b/projects/rccl/src/include/device.h index c62ccd9f47..d65ffd3664 100644 --- a/projects/rccl/src/include/device.h +++ b/projects/rccl/src/include/device.h @@ -311,29 +311,15 @@ inline __host__ __device__ void ncclP2pPartBounds(int nParts, int part, size_t b } // implemented in channel.h -inline __host__ uint8_t ncclP2pChannelBaseForRound(struct ncclComm* comm, int p2pRound); +inline __host__ uint8_t ncclP2pChannelBaseForRound(struct ncclComm* comm, int p2pRound, int p2pBatchEnable); // ncclP2pChannelToPart and ncclP2pChannelForPart are inverses. The device code // uses ncclP2pChannelToPart to determine which part "this" channel is responsible for. inline __host__ int ncclP2pChannelForPart(int nP2pChannels, int base, int part, int nParts, int nNodes) { - if (nNodes > 2) { - // Only works because nP2pChannels is pow2 - int nChannelsLog2 = countOneBits(nP2pChannels-1); - int delta = reverseBits(part, nChannelsLog2); - return (base + delta) & (nP2pChannels-1); - } else { return (base * nParts + part) & (nP2pChannels-1); - } } inline __device__ int ncclP2pChannelToPart(int nP2pChannels, int base, int channel, int nParts, int nNodes) { - if (nNodes > 2) { - // Only works because nP2pChannels is pow2 - int nChannelsLog2 = countOneBits(nP2pChannels-1); - int delta = (channel-base) & (nP2pChannels-1); - return reverseBits(delta, nChannelsLog2); - } else { - return (channel - base * nParts) & (nParts-1); - } + return (channel - base * nParts) & (nP2pChannels-1); } struct alignas(16) ncclDevWorkColl { @@ -425,7 +411,7 @@ constexpr size_t ncclDevWorkSize(enum ncclDevWorkType type) { #define NCCL_MAX_DEV_WORK_BATCH_BYTES 128 #define NCCL_MAX_DEV_WORK_BATCH_COLLS (NCCL_MAX_DEV_WORK_BATCH_BYTES/sizeof(ncclDevWorkColl)) -#define 
NCCL_MAX_DEV_WORK_P2P_PER_BATCH 1 +#define NCCL_MAX_DEV_WORK_P2P_PER_BATCH 2 #define NCCL_MAX_DEV_WORK_P2P_ELEMENTS 2 struct alignas(16) ncclDevWorkBatch { union {