From 5e7937effbb37c012bf64445655d3d7b0e601cba Mon Sep 17 00:00:00 2001 From: Nusrat Islam Date: Mon, 25 Aug 2025 07:55:10 -0500 Subject: [PATCH] Add direct allgather algorithm (#1868) * add direct allgather algorithm * minor fix * add debug print for memory allocation tracker * add message size threshold for direct allgather * scatter transfers across ranks * update changelog * minor fix * Update CHANGELOG.md Co-authored-by: Jeffrey Novotny * enable direct AG when pxn is ON on MI300X or MI350 --------- Co-authored-by: Jeffrey Novotny --- CHANGELOG.md | 2 +- src/collectives.cc | 37 +++++++++++++++++++++++++++++++++++-- src/include/comm.h | 3 +++ src/init.cc | 1 + src/rccl_wrap.cc | 4 ++++ 5 files changed, 44 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3f09e86568..41e2ac31ae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,7 +24,7 @@ Full documentation for RCCL is available at [https://rccl.readthedocs.io](https: * Two new APIs are exposed as part of an initiative to separate RCCL code. These APIs are `rcclGetAlgoInfo` and `rcclFuncMaxSendRecvCount`. However, user-level invocation requires that RCCL be built with `RCCL_EXPOSE_STATIC` enabled. * Enabled double-buffering in `reduceCopyPacks` to trigger pipelining, especially to overlap bf16 arithmetic. * Added `--force-reduce-pipeline` as an option that can be passed to the `install.sh` script. Passing this option will enable software-triggered pipelining `bfloat16` reductions (i.e. `all_reduce`, `reduce_scatter` and `reduce`). - +* Added a direct allgather algorithm. This is enabled by default for multi-node if there are 16 nodes or fewer. The message size threshold is 4MB. ### Changed diff --git a/src/collectives.cc b/src/collectives.cc index 3da5eb2f3f..3e3fdb5c9c 100644 --- a/src/collectives.cc +++ b/src/collectives.cc @@ -79,6 +79,8 @@ const char* ncclProtoToString(int proto) { } } +RCCL_PARAM(DirectAllGatherThreshold, "DIRECT_ALLGATHER_THRESHOLD", 4194304); + NCCL_API(ncclResult_t, ncclAllGather, const void* sendbuff, void* recvbuff, size_t sendcount, ncclDataType_t datatype, ncclComm_t comm, cudaStream_t stream); @@ -91,7 +93,13 @@ ncclResult_t ncclAllGather_impl(const void* sendbuff, void* recvbuff, size_t sen sendbuff, recvbuff, sendcount, datatype, ncclSum, 0, comm, stream, /* Args */ ALLGATHER_CHUNKSTEPS, comm -> rcclUseOneSlice ? ALLGATHER_SLICESTEPS_SINGLE_NODE : ALLGATHER_SLICESTEPS, nullptr }; - if (!mscclIsCaller()) // when msccl falls back to + int nRanks; + const void* srcbuff; + int in_place = 0; + NCCLCHECK(ncclCommCount(comm, &nRanks)); + size_t msgSize = sendcount * ncclTypeSize(datatype) * nRanks; + + if (!mscclIsCaller()) { NCCLCHECK(Recorder::instance().record(rrAllGather, info)); } @@ -102,7 +110,32 @@ ncclResult_t ncclAllGather_impl(const void* sendbuff, void* recvbuff, size_t sen sendcount, datatype, 0, 0, ncclSum, mscclFuncAllGather, comm, stream); } - return ncclEnqueueCheck(&info); + if (comm->enableCustColl && (comm->nNodes > 1 && comm->nNodes <= 16) && (msgSize <= rcclParamDirectAllGatherThreshold() && + rcclParamDirectAllGatherThreshold() > -1)) { + // use direct allgather + if (sendcount == 0) return ncclSuccess; + size_t rankOffset = sendcount * ncclTypeSize(datatype); + if (((char*)recvbuff) != (((char*)sendbuff) + comm->rank * rankOffset)) { + srcbuff = sendbuff; + } else { + srcbuff = ((char*)recvbuff) + comm->rank * rankOffset; + in_place = 1;; + } + NCCLCHECK(ncclGroupStart()); + for (int r = 0; r < nRanks; r++) { + int peer = (comm->rank + r) % nRanks; + if (in_place && (peer == comm->rank)) { + continue; + } + NCCLCHECK(ncclSend(((char*)srcbuff), sendcount, datatype, peer, comm, stream)); + NCCLCHECK(ncclRecv(((char*)recvbuff) + peer * rankOffset, sendcount, datatype, peer, comm, stream)); + } + NCCLCHECK(ncclGroupEnd()); + return ncclSuccess; + } else { + // use ring allgather + return ncclEnqueueCheck(&info); + } } NCCL_API(ncclResult_t, ncclAllReduce, const void* sendbuff, void* recvbuff, size_t count, diff --git a/src/include/comm.h b/src/include/comm.h index d0ead1c8d3..42690b7a59 100644 --- a/src/include/comm.h +++ b/src/include/comm.h @@ -690,6 +690,9 @@ struct ncclComm { // Unroll factor for comm [RCCL] int unroll; + // custom collective + bool enableCustColl; + uint64_t endMagic; }; diff --git a/src/init.cc b/src/init.cc index 9a97ecde5a..e7aab0f1c8 100644 --- a/src/init.cc +++ b/src/init.cc @@ -2695,6 +2695,7 @@ ncclResult_t ncclCommDestroy_impl(ncclComm_t comm) { NVTX3_FUNC_RANGE_IN(nccl_domain); return ncclSuccess; } + INFO(NCCL_INIT, "Memory used = %ld", allocTracker[comm->cudaDev].totalAllocSize); #ifdef ENABLE_MSCCLPP if (comm->mscclppCompatible) { diff --git a/src/rccl_wrap.cc b/src/rccl_wrap.cc index d1efc30bde..2a38b5fc88 100644 --- a/src/rccl_wrap.cc +++ b/src/rccl_wrap.cc @@ -122,10 +122,13 @@ ncclResult_t rcclGetAlgoInfo(struct ncclComm* comm, ncclFunc_t coll, uint64_t co void rcclSetPxn(struct ncclComm* comm, int& rcclPxnDisable) { static int pxnDisable = RCCL_VALUE_UNSET; + comm->enableCustColl = false; if(pxnDisable == RCCL_VALUE_UNSET) { const char *inputStr = getenv("NCCL_PXN_DISABLE"); const bool archGfx942 = IsArchMatch(comm->topo->nodes[GPU].nodes[0].gpu.gcn, "gfx942"); const bool archGfx950 = IsArchMatch(comm->topo->nodes[GPU].nodes[0].gpu.gcn, "gfx950"); + comm->enableCustColl = (archGfx942 || archGfx950) && (inputStr && !atoi(inputStr)); + if((!archGfx942 && !archGfx950) || inputStr) { rcclPxnDisable = pxnDisable = RCCL_VALUE_INVALID; return; @@ -135,6 +138,7 @@ void rcclSetPxn(struct ncclComm* comm, int& rcclPxnDisable) { INFO(NCCL_INIT, "RCCL PXN set as %s", !pxnDisable? "enabled" : "disabled"); } rcclPxnDisable = pxnDisable; + comm->enableCustColl = !pxnDisable; } void rcclSetP2pNetChunkSize(struct ncclComm* comm, int& rcclP2pNetChunkSize) {