2
0

Add direct allgather algorithm (#1868)

* add direct allgather algorithm

* minor fix

* add debug print for memory allocation tracker

* add message size threshold for direct allgather

* scatter transfers across ranks

* update changelog

* minor fix

* Update CHANGELOG.md

Co-authored-by: Jeffrey Novotny <jnovotny@amd.com>

* enable direct AG when pxn is ON on MI300X or MI350

---------

Co-authored-by: Jeffrey Novotny <jnovotny@amd.com>
Este cometimento está contido em:
Nusrat Islam
2025-08-25 07:55:10 -05:00
cometido por GitHub
ascendente b88c134874
cometimento 5e7937effb
5 ficheiros modificados com 44 adições e 3 eliminações
+1 -1
Ver ficheiro
@@ -24,7 +24,7 @@ Full documentation for RCCL is available at [https://rccl.readthedocs.io](https:
* Two new APIs are exposed as part of an initiative to separate RCCL code. These APIs are `rcclGetAlgoInfo` and `rcclFuncMaxSendRecvCount`. However, user-level invocation requires that RCCL be built with `RCCL_EXPOSE_STATIC` enabled.
* Enabled double-buffering in `reduceCopyPacks` to trigger pipelining, especially to overlap bf16 arithmetic.
* Added `--force-reduce-pipeline` as an option that can be passed to the `install.sh` script. Passing this option will enable software-triggered pipelining `bfloat16` reductions (i.e. `all_reduce`, `reduce_scatter` and `reduce`).
* Added a direct allgather algorithm. This is enabled by default for multi-node if there are 16 nodes or fewer. The message size threshold is 4MB.
### Changed
+35 -2
Ver ficheiro
@@ -79,6 +79,8 @@ const char* ncclProtoToString(int proto) {
}
}
RCCL_PARAM(DirectAllGatherThreshold, "DIRECT_ALLGATHER_THRESHOLD", 4194304);
NCCL_API(ncclResult_t, ncclAllGather, const void* sendbuff, void* recvbuff, size_t sendcount,
ncclDataType_t datatype, ncclComm_t comm, cudaStream_t stream);
@@ -91,7 +93,13 @@ ncclResult_t ncclAllGather_impl(const void* sendbuff, void* recvbuff, size_t sen
sendbuff, recvbuff, sendcount, datatype, ncclSum, 0, comm, stream, /* Args */
ALLGATHER_CHUNKSTEPS, comm -> rcclUseOneSlice ? ALLGATHER_SLICESTEPS_SINGLE_NODE : ALLGATHER_SLICESTEPS, nullptr };
if (!mscclIsCaller()) // when msccl falls back to
int nRanks;
const void* srcbuff;
int in_place = 0;
NCCLCHECK(ncclCommCount(comm, &nRanks));
size_t msgSize = sendcount * ncclTypeSize(datatype) * nRanks;
if (!mscclIsCaller())
{
NCCLCHECK(Recorder::instance().record(rrAllGather, info));
}
@@ -102,7 +110,32 @@ ncclResult_t ncclAllGather_impl(const void* sendbuff, void* recvbuff, size_t sen
sendcount, datatype, 0, 0, ncclSum, mscclFuncAllGather, comm, stream);
}
return ncclEnqueueCheck(&info);
if (comm->enableCustColl && (comm->nNodes > 1 && comm->nNodes <= 16) && (msgSize <= rcclParamDirectAllGatherThreshold() &&
rcclParamDirectAllGatherThreshold() > -1)) {
// use direct allgather
if (sendcount == 0) return ncclSuccess;
size_t rankOffset = sendcount * ncclTypeSize(datatype);
if (((char*)recvbuff) != (((char*)sendbuff) + comm->rank * rankOffset)) {
srcbuff = sendbuff;
} else {
srcbuff = ((char*)recvbuff) + comm->rank * rankOffset;
in_place = 1;;
}
NCCLCHECK(ncclGroupStart());
for (int r = 0; r < nRanks; r++) {
int peer = (comm->rank + r) % nRanks;
if (in_place && (peer == comm->rank)) {
continue;
}
NCCLCHECK(ncclSend(((char*)srcbuff), sendcount, datatype, peer, comm, stream));
NCCLCHECK(ncclRecv(((char*)recvbuff) + peer * rankOffset, sendcount, datatype, peer, comm, stream));
}
NCCLCHECK(ncclGroupEnd());
return ncclSuccess;
} else {
// use ring allgather
return ncclEnqueueCheck(&info);
}
}
NCCL_API(ncclResult_t, ncclAllReduce, const void* sendbuff, void* recvbuff, size_t count,
+3
Ver ficheiro
@@ -690,6 +690,9 @@ struct ncclComm {
// Unroll factor for comm [RCCL]
int unroll;
// custom collective
bool enableCustColl;
uint64_t endMagic;
};
+1
Ver ficheiro
@@ -2695,6 +2695,7 @@ ncclResult_t ncclCommDestroy_impl(ncclComm_t comm) {
NVTX3_FUNC_RANGE_IN(nccl_domain);
return ncclSuccess;
}
INFO(NCCL_INIT, "Memory used = %ld", allocTracker[comm->cudaDev].totalAllocSize);
#ifdef ENABLE_MSCCLPP
if (comm->mscclppCompatible) {
+4
Ver ficheiro
@@ -122,10 +122,13 @@ ncclResult_t rcclGetAlgoInfo(struct ncclComm* comm, ncclFunc_t coll, uint64_t co
void rcclSetPxn(struct ncclComm* comm, int& rcclPxnDisable) {
static int pxnDisable = RCCL_VALUE_UNSET;
comm->enableCustColl = false;
if(pxnDisable == RCCL_VALUE_UNSET) {
const char *inputStr = getenv("NCCL_PXN_DISABLE");
const bool archGfx942 = IsArchMatch(comm->topo->nodes[GPU].nodes[0].gpu.gcn, "gfx942");
const bool archGfx950 = IsArchMatch(comm->topo->nodes[GPU].nodes[0].gpu.gcn, "gfx950");
comm->enableCustColl = (archGfx942 || archGfx950) && (inputStr && !atoi(inputStr));
if((!archGfx942 && !archGfx950) || inputStr) {
rcclPxnDisable = pxnDisable = RCCL_VALUE_INVALID;
return;
@@ -135,6 +138,7 @@ void rcclSetPxn(struct ncclComm* comm, int& rcclPxnDisable) {
INFO(NCCL_INIT, "RCCL PXN set as %s", !pxnDisable? "enabled" : "disabled");
}
rcclPxnDisable = pxnDisable;
comm->enableCustColl = !pxnDisable;
}
void rcclSetP2pNetChunkSize(struct ncclComm* comm, int& rcclP2pNetChunkSize) {