Use batched P2P to enhance alltoall small message performance (#1902)

* Batch P2P operations (2 per CU/channel) and update channel-part mapping

- Revert bitreversal and fix channel mapping to be compatible with P2P batching and avoid hangs

- P2P batching is only used for more than 2 nodes to avoid aggregating intra-node traffic when it is dominant for less than 2 nodes

* Address single node regression and channel per net peer

* Add batching threshold

* Add enable switch for batching

* Update CHANGELOG.md

* Add minor comment change

* Update CHANGELOG.md

Co-authored-by: Jeffrey Novotny <jnovotny@amd.com>

* Update CHANGELOG.md

Co-authored-by: Jeffrey Novotny <jnovotny@amd.com>

[ROCm/rccl commit: c1e1f2faeb]
Этот коммит содержится в:
Mustafa Abduljabbar
2025-09-22 16:25:10 -04:00
коммит произвёл GitHub
родитель bb2e12c831
Коммит a075779dcd
6 изменённых файлов: 35 добавлений и 29 удалений
+3
Просмотреть файл
@@ -5,6 +5,8 @@ Full documentation for RCCL is available at [https://rccl.readthedocs.io](https:
## Unreleased - RCCL 2.27.3 for ROCm 7.1.0
### Added
* Added `RCCL_P2P_BATCH_THRESHOLD` to set the message size limit for batching P2P operations. This mainly affects small message performance for alltoall at a large scale but also applies to alltoallv.
* Added `RCCL_P2P_BATCH_ENABLE` to enable batching P2P operations to receive performance gains for smaller messages up to 4MB for alltoall when the workload requires it. This is to avoid performance dips for larger messages.
### Changed
@@ -12,6 +14,7 @@ Full documentation for RCCL is available at [https://rccl.readthedocs.io](https:
* Compatibility with NCCL 2.27.3
### Resolved issues
* Improve small message performance for alltoall by enabling and optimizing batched P2P operations.
## RCCL 2.26.6 for ROCm 7.0.0
+1 -1
Просмотреть файл
@@ -22,7 +22,7 @@ struct RunWorkBatch<ncclFuncSendRecv, T, RedOp, NCCL_ALGO_RING, NCCL_PROTO_SIMPL
bool useLargeChunk = (work->sendIpcReg && ncclShmem.comm.isAllNvlink) || work->sendNetReg;
int chunkSize = useLargeChunk ? NCCL_MAX_NET_SIZE : u32fp8Decode(work->sendChunkSize_u32fp8);
int stepSize = useLargeChunk ? NCCL_MAX_NET_SIZE : ncclShmem.comm.p2pChunkSize;
#if defined(ENABLE_NPKIT)
bool isNpKitThread = (tid == 0);
int npKitCtxIdx = blockIdx.x * NCCL_MAX_DEV_WORK_P2P_ELEMENTS + group;
+23 -7
Просмотреть файл
@@ -175,7 +175,7 @@ static ncclResult_t addProxyOpIfNeeded(struct ncclComm* comm, struct ncclKernelP
static void addWorkBatchToPlan(
struct ncclComm* comm, struct ncclKernelPlan* plan, int channelId,
enum ncclDevWorkType workType, int devFuncId, uint32_t workOffset,
int p2pRound = -1
int p2pRound = -1, bool batchP2P = false
) {
ncclKernelPlanner::WipPlan::Channel* chan = &comm->planner.wipPlan.channels[channelId];
size_t workSize = ncclDevWorkSize(workType);
@@ -193,7 +193,7 @@ static void addWorkBatchToPlan(
// batch further down.
newBatch |= NCCL_MAX_DEV_WORK_BATCH_BYTES < chan->wipBatch.workBytes + workSize;
if (workType == ncclDevWorkTypeP2p) {
newBatch |= chan->wipBatch.nP2ps == NCCL_MAX_DEV_WORK_P2P_PER_BATCH;
newBatch |= (comm->nNodes > 2 && batchP2P)? (chan->wipBatch.nP2ps == NCCL_MAX_DEV_WORK_P2P_PER_BATCH) : (chan->wipBatch.nP2ps == 1);
for (int i=0; i < chan->wipBatch.nP2ps; i++) {
newBatch |= p2pRound == chan->wipBatch.p2pRounds[i];
}
@@ -388,7 +388,7 @@ ncclResult_t ncclTasksRegAndEnqueue(struct ncclComm* comm) {
devWork.redOpArgIsPtr = task->opDev.scalarArgIsPtr;
devWork.oneNode = (comm->nNodes == 1);
devWork.rcclUseOneSlice = comm->rcclUseOneSlice;
devWork.isOneRPN = comm->isOneRPN;
devWork.netRegUsed = devWork.regUsed = 0;
devWork.gfx942CheapFenceOff = gfx942CheapFenceOff(devWork, comm->gfx942CheapFenceOff);
@@ -913,6 +913,21 @@ NCCL_PARAM(P2pLLThreshold, "P2P_LL_THRESHOLD", 16384);
RCCL_PARAM(P2pNetThreshold, "P2P_NET_THRESHOLD", 131072);
NCCL_PARAM(ChunkSize, "CHUNK_SIZE", 0);
// This is the maximum P2P message size that can be batched with others
// Below this message size, NCCL_MAX_DEV_WORK_P2P_PER_BATCH will be applicable
// For alltoall, this can be mutiplied by number of ranks to match Size (B) in rccl-tests
// Without a threshold, RCCL will suffer large message regression due to limitation at a larger scale
// when more batches are needed to saturate the NIC BW in RCCL.
// The threshold can be set to a higher value to experiment on other platforms.
// This value has been tested on MI300.
RCCL_PARAM(P2pBatchThreshold, "P2P_BATCH_THRESHOLD", 1 << 16); // 64k
// Need this temporary parameter to disable p2p batching to avoid some dips at 4MB - 32 MB message size at large scale
// This parameter must be removed after further investigation,
// Note that NCCL enables batching by default and it is needed to achieve perf for with smaller messages <= 4MB
RCCL_PARAM(P2pBatchEnable, "P2P_BATCH_ENABLE", 0); // 64k
// Put p2p op in plan assuming there is sizeof(ncclDevWorkBatch) in batch budget
// and sizeof(ncclDevWorkP2p) in work budget. "sendRank" and "recvRank" must
// match the corresponding values for this round of the p2p schedule (no -1's).
@@ -934,8 +949,9 @@ static ncclResult_t addP2pToPlan(
bool network[2] = {false, false};
bool proxySameProcess[2] = {true, true};
void** handles[2] = {NULL, NULL};
uint8_t base = ncclP2pChannelBaseForRound(comm, p2pRound);
auto batchP2PEnableEnv = rcclParamP2pBatchEnable();
bool batchP2P = batchP2PEnableEnv && ((sendBytes == -1)? recvBytes <= rcclParamP2pBatchThreshold() : sendBytes <= rcclParamP2pBatchThreshold());
uint8_t base = ncclP2pChannelBaseForRound(comm, p2pRound, batchP2PEnableEnv);
if (comm->p2pNet) {
for (int dir = 0; dir <= 1; dir++) {
if (bytes[dir] > rcclParamP2pNetThreshold())
@@ -1108,7 +1124,7 @@ static ncclResult_t addP2pToPlan(
plan->channelMask.masks[channelId/64] |= uint64_t(1)<<(channelId%64);
// Add batch first.
int funcIdx = ncclDevFuncId_P2p();
addWorkBatchToPlan(comm, plan, channelId, ncclDevWorkTypeP2p, funcIdx, workOffset, p2pRound);
addWorkBatchToPlan(comm, plan, channelId, ncclDevWorkTypeP2p, funcIdx, workOffset, p2pRound, batchP2P);
if (funcIdx < 0) {
WARN("%s: unsupported collective. Please ensure the collective has been enabled in build.", __func__);
return ncclInvalidUsage;
@@ -2566,7 +2582,7 @@ static ncclResult_t taskAppend(struct ncclComm* comm, struct ncclInfo* info) {
: comm->p2pSchedule[round].recvRank)) {
round += 1;
}
uint8_t base = ncclP2pChannelBaseForRound(comm, round);
uint8_t base = ncclP2pChannelBaseForRound(comm, round, rcclParamP2pBatchEnable());
for (int c=0; c < comm->p2pnChannelsPerPeer; c++) {
int channelId = ncclP2pChannelForPart(comm->p2pnChannels, base, c, comm->p2pnChannelsPerPeer, comm->nNodes);
if (isSendNotRecv) {
+1 -1
Просмотреть файл
@@ -956,7 +956,7 @@ static ncclResult_t ncclTopoGetNchannels(struct ncclComm* comm, int g /*local gp
int netCountByBw = 1, nChannelsMax = nNetChannels;
NCCLCHECK(getLocalNetCountByBw(system, g, &netCountByBw));
// Avoid overloading channels with 8+ operations as we loose the sync warp, hence a bit of bandwidth.
while (nChannelsMax*comm->nRanks > comm->p2pnChannels*4 && nChannelsMax > 1) nChannelsMax /= 2;
while (nChannelsMax*comm->nRanks > comm->p2pnChannels && nChannelsMax > 1) nChannelsMax /= 2;
//allow upto channels requires to drive the NICs
nNetChannels = std::max(netCountByBw, nChannelsMax);
+4 -3
Просмотреть файл
@@ -16,12 +16,13 @@ ncclResult_t initNvlsChannel(struct ncclComm* comm, int channelId, struct ncclCo
ncclResult_t initCollnetChannel(struct ncclComm* comm, int channelId, struct ncclComm* parent, bool share);
ncclResult_t freeChannel(struct ncclChannel* channel, int nRanks, int collnetNRanks, int nvlsNRanks);
inline uint8_t ncclP2pChannelBaseForRound(struct ncclComm* comm, int p2pRound) {
inline uint8_t ncclP2pChannelBaseForRound(struct ncclComm* comm, int p2pRound, int p2pBatchEnable = 0) {
if (comm->nNodes > 1) {
int nodeDelta = p2pRound/comm->maxLocalRanks;
int localDelta = p2pRound%comm->maxLocalRanks;
int base = nodeDelta*divUp(comm->maxLocalRanks, NCCL_MAX_DEV_WORK_P2P_PER_BATCH);
base += localDelta/NCCL_MAX_DEV_WORK_P2P_PER_BATCH;
int batchSize = (comm->nNodes > 2 && p2pBatchEnable) ? NCCL_MAX_DEV_WORK_P2P_PER_BATCH : 1;
int base = nodeDelta*divUp(comm->maxLocalRanks, batchSize);
base += localDelta/batchSize;
return base & 0xff;
} else {
return p2pRound & 0xff;
+3 -17
Просмотреть файл
@@ -311,29 +311,15 @@ inline __host__ __device__ void ncclP2pPartBounds(int nParts, int part, size_t b
}
// implemented in channel.h
inline __host__ uint8_t ncclP2pChannelBaseForRound(struct ncclComm* comm, int p2pRound);
inline __host__ uint8_t ncclP2pChannelBaseForRound(struct ncclComm* comm, int p2pRound, int p2pBatchEnable);
// ncclP2pChannelToPart and ncclP2pChannelForPart are inverses. The device code
// uses ncclP2pChannelToPart to determine which part "this" channel is responsible for.
inline __host__ int ncclP2pChannelForPart(int nP2pChannels, int base, int part, int nParts, int nNodes) {
if (nNodes > 2) {
// Only works because nP2pChannels is pow2
int nChannelsLog2 = countOneBits(nP2pChannels-1);
int delta = reverseBits(part, nChannelsLog2);
return (base + delta) & (nP2pChannels-1);
} else {
return (base * nParts + part) & (nP2pChannels-1);
}
}
inline __device__ int ncclP2pChannelToPart(int nP2pChannels, int base, int channel, int nParts, int nNodes) {
if (nNodes > 2) {
// Only works because nP2pChannels is pow2
int nChannelsLog2 = countOneBits(nP2pChannels-1);
int delta = (channel-base) & (nP2pChannels-1);
return reverseBits(delta, nChannelsLog2);
} else {
return (channel - base * nParts) & (nParts-1);
}
return (channel - base * nParts) & (nP2pChannels-1);
}
struct alignas(16) ncclDevWorkColl {
@@ -425,7 +411,7 @@ constexpr size_t ncclDevWorkSize(enum ncclDevWorkType type) {
#define NCCL_MAX_DEV_WORK_BATCH_BYTES 128
#define NCCL_MAX_DEV_WORK_BATCH_COLLS (NCCL_MAX_DEV_WORK_BATCH_BYTES/sizeof(ncclDevWorkColl))
#define NCCL_MAX_DEV_WORK_P2P_PER_BATCH 1
#define NCCL_MAX_DEV_WORK_P2P_PER_BATCH 2
#define NCCL_MAX_DEV_WORK_P2P_ELEMENTS 2
struct alignas(16) ncclDevWorkBatch {
union {