Files
rocm-systems/src/device/reduce_scatter.h
T
Sylvain Jeaugey 68b542363f 2.23.4-1
Add scalable init API
 * Add new ncclCommInitRankScalable to allow for passing multiple
   unique IDs to the init function.
 * Spreads the load onto multiple bootstrap roots, allowing for
   constant bootstrap time.
 * Requires multiple ranks to create a unique ID, and the CPU-side
   ID exchange code to call allgather[v] instead of broadcast.

Accelerate init bootstrap operations
 * Reduce the number of calls to allgather.
 * Allow roots to reply early to ranks when information is already
   available.
 * Add an option to use ncclNet instead of sockets to perform
   bootstrap allgather operations.

Add PAT algorithms for Allgather and ReduceScatter
 * Parallel Aggregated Trees, variation of Bruck algorithm.
 * Logarithmic number of network steps for small sizes at scale.
 * Only supports one rank per node at the moment.

Add support for registered buffers for intra-node communication.
 * Allow registered user buffers to be accessed directly intra-node
 * Avoids extra copies in algorithms which permit it, saving
   memory bandwidth and helping with compute overlap.

Add profiler plugin API
 * New plugin API for profiling
 * Supports various levels of profiling, with a hierarchy.

Asynchronous graph allocation
 * Make calls to cudaMalloc and cudaMemcpy during graph allocation
   asynchronous.
 * Significantly speeds up graph capture.

Use fatal IB asynchronous events to stop network operation
 * Avoids many other error messages
 * Only fatal errors are affected; potentially transient errors
   (e.g. port down) do not cause an immediate stop.

Set P2P level to PXB on AMD CPUs when using more than 2 GPUs per node
 * P2P would cause a significant performance degradation when using
   many GPUs, and therefore many interleaved data flows.
 * Disable P2P through the CPU when we have 3+ GPUs per node; keep it
   enabled when we only have 2 GPUs.

Improve the init logs to report the real NCCL function.
 * Make the log report ncclCommInitRank or ncclCommSplit, rather than
   the generic ncclCommInitRankFunc.

Add a parameter to set the location of the user configuration file.
 * Add NCCL_CONF_FILE environment variable to set where the user's
   configuration file resides.

Increase default IB timeout
 * Increase IB timeout value from 18 to 20.
 * Should help avoid fatal errors on large RoCE systems.

Add new check for nvidia peermem
 * On Linux kernels 6.6+, /sys/kernel/mm/memory_peers is no longer
   present; check for /sys/module/nvidia_peermem/version instead.

Fix old performance regression when mixing small and large operations.
 * Improves distribution of work on channels.

Fix crash when NUMA IDs are equal to -1.
 * Can happen when a NIC is a virtual NIC, or when Linux doesn't
   know which NUMA node a device is attached to.
 * Issue NVIDIA/nccl-tests#233

Fix tree graph search when NCCL_CROSS_NIC is set to 1.
 * Would force NCCL to use the balanced_tree pattern, thereby
   disabling LL128 on platforms with 1 GPU+1 NIC per PCI switch.
 * Would also try to use alternate rings even though it was not
   needed.

Compiler tweaks and fixes
 * PR #1177
 * PR #1228

Fix stack smash
 * PR #1325

Fixes for multi-node NVLink + IB operation

Coverity fixes and comments.
2024-09-16 23:41:17 -07:00

350 lines
15 KiB
C++

/*************************************************************************
* Copyright (c) 2015-2022, NVIDIA CORPORATION. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#include "device.h"
#include "collectives.h"
#include "primitives.h"
namespace {
// Ring ReduceScatter body shared by the Simple / LL / LL128 protocol wrappers.
//
//   tid, nthreads : forwarded unchanged to the Primitives constructor.
//   work          : supplies sendbuff, recvbuff and redOpArg, and is handed to
//                   ncclCollCbdPart() to obtain this channel's partition.
//
// The channel's elements are processed chunkCount at a time. Every chunk goes
// through one send, (nranks-2) recvReduceSend calls and a final recvReduceCopy.
template<typename T, typename RedOp, typename Proto>
__device__ __forceinline__ void runRing(int tid, int nthreads, struct ncclDevWorkColl* work) {
  ncclRing *ring = &ncclShmem.channel.ring;
  int const *userRanks = ring->userRanks;
  const int nranks = ncclShmem.comm.nRanks;

  // Partition description filled in by ncclCollCbdPart().
  size_t count, channelBase, channelCount, chunkCount;
  ncclCollCbdPart(work, ncclShmem.channelId, Proto::Id, sizeof(T), &count, &channelBase, &channelCount, &chunkCount);

  // Coverity reports that the callee treats &ring->next as an array. However, due to the use of
  // FanSymmetric<1>, only the first element is ever accessed, so it's fine.
  // coverity[callee_ptr_arith:FALSE]
  Primitives<T, RedOp, FanSymmetric<1>, 0, Proto, 0>
    prims(tid, nthreads, &ring->prev, &ring->next, work->sendbuff, work->recvbuff, work->redOpArg);

  size_t done = 0;
  while (done < channelCount) {
    // The last chunk may be shorter than chunkCount.
    const uint32_t nelem = min(chunkCount, channelCount - done);
    const size_t chunkBase = channelBase + done;
    size_t offset;
    int dest;

    // First step: push the slice indexed by userRanks[nranks-1] to the next GPU.
    dest = userRanks[nranks-1];
    offset = chunkBase + dest * count;
    prims.send(offset, nelem);

    // Middle steps (nranks-2 of them): receive, reduce, and forward to the next GPU.
    for (int step = 2; step < nranks; ++step) {
      dest = userRanks[nranks-step];
      offset = chunkBase + dest * count;
      prims.recvReduceSend(offset, nelem);
    }

    // Last step: reduce the received data with the slice indexed by
    // userRanks[0] and copy the final result to output offset chunkBase.
    dest = userRanks[0];
    offset = chunkBase + dest * count;
    prims.recvReduceCopy(offset, chunkBase, nelem, /*postOp=*/true);

    done += chunkCount;
  }
}
}
// ReduceScatter, ring algorithm, Simple protocol: delegates to runRing() with a
// ProtoSimple built from the REDUCESCATTER_* step constants.
template<typename T, typename RedOp>
struct RunWorkColl<ncclFuncReduceScatter, T, RedOp, NCCL_ALGO_RING, NCCL_PROTO_SIMPLE> {
  __device__ __forceinline__ void run(int tid, int nthreads, struct ncclDevWorkColl* work) {
    // The two ProtoSimple template arguments, named for readability.
    constexpr int kFirstArg = REDUCESCATTER_CHUNKSTEPS/REDUCESCATTER_SLICESTEPS;
    constexpr int kSecondArg = REDUCESCATTER_SLICESTEPS;
    runRing<T, RedOp, ProtoSimple<kFirstArg, kSecondArg>>(tid, nthreads, work);
  }
};
// ReduceScatter, ring algorithm, LL protocol: delegates to runRing() with ProtoLL.
template<typename T, typename RedOp>
struct RunWorkColl<ncclFuncReduceScatter, T, RedOp, NCCL_ALGO_RING, NCCL_PROTO_LL> {
  __device__ __forceinline__ void run(int tid, int nthreads, struct ncclDevWorkColl* work) {
    using Proto = ProtoLL;
    runRing<T, RedOp, Proto>(tid, nthreads, work);
  }
};
// ReduceScatter, ring algorithm, LL128 protocol: delegates to runRing() with ProtoLL128.
template<typename T, typename RedOp>
struct RunWorkColl<ncclFuncReduceScatter, T, RedOp, NCCL_ALGO_RING, NCCL_PROTO_LL128> {
  __device__ __forceinline__ void run(int tid, int nthreads, struct ncclDevWorkColl* work) {
    using Proto = ProtoLL128;
    runRing<T, RedOp, Proto>(tid, nthreads, work);
  }
};
// ReduceScatter, PAT algorithm, Simple protocol.
//
// A PatRSAlgorithm object generates a sequence of operations; each one is
// executed through Primitives::patReduce until the generator sets `last`.
template<typename T, typename RedOp>
struct RunWorkColl<ncclFuncReduceScatter, T, RedOp, NCCL_ALGO_PAT, NCCL_PROTO_SIMPLE> {
  __device__ __forceinline__ void run(int tid, int nthreads, struct ncclDevWorkColl* work) {
    using Proto = ProtoSimple<1, 1>;
    using Prims = Primitives<T, RedOp, FanSymmetric<1>, 0, Proto, 0>;

    const int nranks = ncclShmem.comm.nRanks;
    const int rank = ncclShmem.comm.rank;

    // Partition description filled in by ncclCollCbdPart().
    size_t count, channelOffset, channelCount, chunkCount;
    ncclCollCbdPart(work, ncclShmem.channelId, Proto::Id, sizeof(T), &count, &channelOffset, &channelCount, &chunkCount);

    T *inputBuf = (T*)work->sendbuff;
    T *outputBuf = (T*)work->recvbuff;

    // No explicit peer lists are passed (both NULL); the primitives are
    // constructed with the primsModePatRs mode argument.
    Prims prims(tid, nthreads, NULL, NULL, inputBuf, outputBuf, work->redOpArg,
                0*Proto::MaxGroupWidth, 0, 0, nullptr, false, false, 0, primsModePatRs);

    PatRSAlgorithm<T> patAlgo(chunkCount*sizeof(T), NCCL_STEPS, channelOffset, channelOffset + channelCount,
                              count, chunkCount, rank, nranks);

    // Fetch-and-execute loop: at least one operation is always processed,
    // and the loop ends after the operation for which `last` is set.
    int last = 0;
    do {
      int recvDim, sendDim, recvOffset, sendOffset, sendStepOffset, postRecv, postSend, nelem;
      size_t inpIx, outIx;
      patAlgo.getNextOp(recvDim, sendDim, inpIx, outIx, recvOffset, sendOffset, sendStepOffset, nelem, postRecv, postSend, last);
      prims.patReduce(recvDim, sendDim, inpIx, outIx, recvOffset, sendOffset, sendStepOffset, nelem, postRecv, postSend);
    } while (!last);
  }
};
// ReduceScatter, NVLS algorithm, Simple protocol.
//
// Threads are divided into a "scatter" group (tid in [0, nThreadsScatter)) and
// a "reduce" group (the next nThreadsReduce threads). Which primitives each
// group runs depends on work->regUsed.
template<typename T, typename RedOp>
struct RunWorkColl<ncclFuncReduceScatter, T, RedOp, NCCL_ALGO_NVLS, NCCL_PROTO_SIMPLE> {
  __device__ __forceinline__ void run(int tid, int/*nthreads*/, struct ncclDevWorkColl* work) {
    struct ncclNvls* nvls = &ncclShmem.channel.nvls;

    // Partition description filled in by ncclCollCbdPart().
    size_t count, gridOffset, channelCount, chunkCount;
    ncclCollCbdPart(work, ncclShmem.channelId, NCCL_PROTO_SIMPLE, sizeof(T), &count, &gridOffset, &channelCount, &chunkCount);

    const int rank = ncclShmem.comm.rank;
    const int nranks = ncclShmem.comm.nRanks;

    /* With direct NVLS (regUsed) a single warp is enough for the scatter
     * side, which only synchronizes. Otherwise 7 or 5 warps (depending on
     * the number of ranks) reduce, to saturate bandwidth, and the remaining
     * threads scatter. */
    int nThreadsReduce;
    int nThreadsScatter;
    if (work->regUsed) {
      nThreadsReduce = NCCL_MAX_NTHREADS - WARP_SIZE;
      nThreadsScatter = WARP_SIZE;
    } else {
      nThreadsReduce = (nranks <= 6 ? 7 : 5) * WARP_SIZE;
      nThreadsScatter = NCCL_MAX_NTHREADS - nThreadsReduce;
    }
    const int scatterEnd = nThreadsScatter;
    const int reduceEnd = scatterEnd + nThreadsReduce;
    const bool inScatterGroup = tid < scatterEnd;
    const bool inReduceGroup = !inScatterGroup && tid < reduceEnd;

    if (work->regUsed) {
      if (inScatterGroup) {
        // Scatter
        using Proto = ProtoSimple<1, 1, COLL_UNROLL>;
        Primitives<T, RedOp, FanSymmetric<NCCL_MAX_NVLS_ARITY>, /*Direct=*/0, Proto, 0>
          prims(tid, nThreadsScatter, nvls->up, nvls->up, NULL, NULL,
                work->redOpArg, 0 * Proto::MaxGroupWidth, 1, 1);
        // One zero-sized scatter per chunk.
        for (size_t pos = 0; pos < channelCount; pos += chunkCount) {
          prims.scatter(0, 0, 0, 0, -1, 0);
        }
        /* gather used as sync */
        prims.gather(0, 0, 0, 0, -1, 0);
      } else if (inReduceGroup) {
        // Reduce through NVLS
        using Proto = ProtoSimple<1, 1, COLL_UNROLL, 1, 0>;
        Primitives<T, RedOp, FanSymmetric<1>, /*Direct=*/1, Proto, 0>
          prims(tid - scatterEnd, nThreadsReduce, &nvls->down, &nvls->down, NULL, work->recvbuff,
                work->redOpArg, 3 * Proto::MaxGroupWidth, 0, 0, work);
        for (size_t pos = 0; pos < channelCount; pos += chunkCount) {
          const size_t outOffset = gridOffset + pos;
          // The input offset is shifted by this rank's slice (rank * count).
          const size_t inpOffset = outOffset + rank * count;
          const int nelem = min(chunkCount, channelCount - pos);
          // Coverity complains about a possible overrun inside the method invoked below, but that's actually
          // a false positive.
          // coverity[overrun-call:FALSE]
          prims.directRecvCopy(inpOffset, outOffset, nelem);
        }
        /* send for sync */
        prims.send(0, 0);
      }
    } else {
      if (inScatterGroup) {
        // Scatter
        using Proto = ProtoSimple<1, 1, COLL_UNROLL>;
        Primitives<T, RedOp, FanAsymmetric<0, NCCL_MAX_NVLS_ARITY>, /*Direct=*/0, Proto, 0>
          prims(tid, nThreadsScatter, NULL, nvls->up, work->sendbuff, NULL,
                work->redOpArg, 0 * Proto::MaxGroupWidth, 1, 1);
        for (size_t pos = 0; pos < channelCount; pos += chunkCount) {
          const size_t offset = gridOffset + pos;
          const int nelem = min(chunkCount, channelCount - pos);
          prims.scatter(offset, nvls->nHeads * count, nelem, count, -1, 0);
        }
      } else if (inReduceGroup) {
        // Reduce through NVLS
        using Proto = ProtoSimple<1, 1, COLL_UNROLL, 1, 0>;
        Primitives<T, RedOp, FanAsymmetric<1, 0>, /*Direct=*/0, Proto, 0>
          prims(tid - scatterEnd, nThreadsReduce, &nvls->down, NULL, NULL, work->recvbuff,
                work->redOpArg, 3 * Proto::MaxGroupWidth, 0, 0);
        for (size_t pos = 0; pos < channelCount; pos += chunkCount) {
          const size_t offset = gridOffset + pos;
          const int nelem = min(chunkCount, channelCount - pos);
          prims.recv(offset, nelem);
        }
      }
    }
  }
};
// ReduceScatter, COLLNET_DIRECT algorithm, Simple protocol.
//
// run() partitions the block's warps into up to three groups, each of which
// returns once its phase is finished:
//   Phase 1: scatter inputs to peers,
//   Phase 2: reduce from peers + local input and send to the network,
//   Phase 3: receive from the network into recvbuff.
template<typename T, typename RedOp>
struct RunWorkColl<ncclFuncReduceScatter, T, RedOp, NCCL_ALGO_COLLNET_DIRECT, NCCL_PROTO_SIMPLE> {
// Callable handed to prims.process() in phases 1 and 2 (presumably invoked by
// it once per step -- confirm against Primitives::process).
// ReduceSendNotRecv is true in phase 1 and false in phase 2.
// work, chunkSize and railGridOffset must be set by the caller before use.
template<bool ReduceSendNotRecv>
struct Scatterer {
struct ncclDevWorkColl* work;
int chunkSize;
ssize_t railGridOffset;
// Copies/reduces this channel's window of the input into each destination.
//   tid, tn            : forwarded to reduceCopy.
//   nSrcs, srcPtrs     : extra sources reduced together with the local input.
//   nDsts, dstPtrs     : destinations; nothing is copied when nDsts == 0.
//   dstSizes           : threads with tid < nDsts write the window size here.
//   recvDirectFlag     : when it has NCCL_DIRECT_READ set and work->regUsed is
//                        non-zero, extra sources are indexed like the user buffer.
// slice, maxSliceSize and sendDirectFlag are not read in this body.
template<int SlicePerChunk, int MinSrcs, int MaxSrcs, int MinDsts, int MaxDsts>
__device__ __forceinline__ void operator()(
int tid, int tn, int slice, int maxSliceSize,
int nSrcs, void** srcPtrs, int nDsts, void** dstPtrs, int32_t* dstSizes, uint32_t sendDirectFlag, uint32_t recvDirectFlag
) {
static_assert(SlicePerChunk==1, "require: SlicePerChunk==1");
static_assert(MaxDsts<=1 || MaxSrcs<=1, "require: MaxDsts<=1 || MaxSrcs<=1");
struct ncclDirect* direct = &ncclShmem.channel.collnetDirect;
int nNodes = ncclShmem.comm.nNodes;
int nRails = direct->nHeads;
// Index of this channel within the channels assigned to this work item.
int part = ncclShmem.channelId - work->channelLo;
void* inbuf = (void*)work->sendbuff;
ssize_t sizePerRank = work->collnet.count;
// [railAllBeg, railAllEnd) is this channel's window (in elements) for the
// current step, clamped to the total of nNodes*sizePerRank elements.
ssize_t railAllBeg = min(railGridOffset + part*chunkSize, nNodes*sizePerRank);
ssize_t railAllEnd = min(railAllBeg + chunkSize, nNodes*sizePerRank);
int railAllSize = railAllEnd - railAllBeg;
if (tid < nDsts) dstSizes[tid] = railAllSize;
int dst = 0;
int rail;
// Starting rail: our own head rank when ReduceSendNotRecv is false,
// otherwise the next rail (wrapping around at nRails).
if (!ReduceSendNotRecv) {
rail = direct->headRank;
} else {
rail = direct->headRank+1;
if (rail == nRails) rail = 0;
}
// One iteration per destination. When ReduceSendNotRecv is false the loop
// body runs exactly once; otherwise it repeats while dst < nRails-1.
do {
int node = railAllBeg/sizePerRank;
int railAllOffset = 0;
// Walk the window node by node; each pass handles the part of the window
// that falls inside the current node's [railOneBeg, railOneEnd) range.
while (railAllOffset < railAllSize) {
ssize_t railOneBeg = node*sizePerRank;
ssize_t railOneEnd = railOneBeg + sizePerRank;
ssize_t railOneOffset = (railAllBeg+railAllOffset) - railOneBeg;
// Number of elements handled in this pass.
int delta = min(railAllEnd, railOneEnd) - (railAllBeg+railAllOffset);
// User rank looked up for (node, rail); its slice of the input starts at
// rank*sizePerRank.
int rank = ncclShmem.comm.collNetDenseToUserRank[node*nRails + rail];
ssize_t userOneBeg = rank*sizePerRank + railOneOffset;
if (nDsts != 0) {
// Source 0 is the local input at userOneBeg; sources 1..nSrcs come from
// srcPtrs, indexed at userOneBeg for registered direct reads and at
// railAllOffset otherwise. The single destination is dstPtrs[dst] at
// railAllOffset.
reduceCopy<ncclCollUnroll(), RedOp, T,
/*MultimemSrcs=*/0, 1+MinSrcs, 1+MaxSrcs,
/*MultimemDsts,MinDsts,MaxDsts=*/0,1,1,
/*PreOpSrcs=*/1>
(tid, tn, work->redOpArg, &work->redOpArg, false,
/*nSrcs=*/1+nSrcs, [=]__device__(int s) {
return s==0 ? (T*)inbuf + userOneBeg
: work->regUsed && (recvDirectFlag & NCCL_DIRECT_READ)
? (T*)srcPtrs[s-1] + userOneBeg
: (T*)srcPtrs[s-1] + railAllOffset;
},
/*nDsts=*/1, [=]__device__(int d/*==0*/) {
return (T*)dstPtrs[dst] + railAllOffset;
},
delta);
}
railAllOffset += delta;
node += 1;
}
dst += 1;
rail += 1;
if (rail == nRails) rail = 0;
} while (ReduceSendNotRecv && dst < nRails-1);
}
};
// Entry point. tid is rebased (tid -= tn) as each thread group is skipped, so
// every phase sees thread ids starting at 0.
__device__ __forceinline__ void run(int tid, int nthreads, struct ncclDevWorkColl* work) {
const int part = ncclShmem.channelId - work->channelLo;
const int nChannels = work->channelHi - work->channelLo + 1;
struct ncclDirect* direct = &ncclShmem.channel.collnetDirect;
int const &nNodes = ncclShmem.comm.nNodes;
ssize_t chunkSize = int(work->collnet.chunkCount);
ssize_t sizePerRank = work->collnet.count;
// Abort the kernel if this channel has no network output peer.
if (direct->out == -1) __trap();
bool isMultiRail = (direct->nHeads > 1);
// Relative warp weights for phases 1/2/3: 2:2:1 with multiple rails, 0:1:1
// with a single rail.
int nWarps1 = (isMultiRail ? 2 : 0);
int nWarps2 = (isMultiRail ? 2 : 1);
int nWarps3 = 1;
// Scale the weights to work->nWarps; phases 2 and 3 are truncated to whole
// warps and phase 1 receives whatever remains.
float denom = float(work->nWarps)/float(nWarps1+nWarps2+nWarps3);
nWarps3 = int(denom*nWarps3);
nWarps2 = int(denom*nWarps2);
nWarps1 = work->nWarps - (nWarps2+nWarps3);
using Proto = ProtoSimple<1, 1>;
int tn = nWarps1*WARP_SIZE;
if (tid < tn) {
// Phase 1: Scatter inputs to peers
Primitives<T, RedOp, FanAsymmetric<0, NCCL_MAX_DIRECT_ARITY>, /*Direct=*/1, Proto, 0>
prims(tid, tn, nullptr, direct->heads+1, work->sendbuff, nullptr,
work->redOpArg, 0*Proto::MaxGroupWidth, 1, 1, work);
// Each iteration covers nChannels*chunkSize elements across all channels.
for (ssize_t railGridOffset=0; railGridOffset < nNodes*sizePerRank; railGridOffset += nChannels*chunkSize) {
Scatterer</*ReduceSendNotRecv=*/true> scat;
scat.work = work;
scat.chunkSize = chunkSize;
scat.railGridOffset = railGridOffset;
prims.template process</*Recv=*/0, /*Send=*/1>(scat, NCCL_DIRECT_READ, 0);
}
return;
}
tid -= tn;
tn = nWarps2*WARP_SIZE;
if (tid < tn) {
if (work->regUsed == NCCL_COLLNET_REG_BUFFER) {
// Registered-buffer path: no data is moved by this group. Only thread 0
// notifies the peer; steps is the total byte size divided (rounding up)
// by NCCL_MAX_COLLNET_SIZE.
if (tid == 0) {
int steps = (int)divUp(nNodes * sizePerRank * sizeof(T), NCCL_MAX_COLLNET_SIZE);
Primitives<T, RedOp, FanAsymmetric<NCCL_MAX_DIRECT_ARITY, 1>, /*Direct=*/0, Proto, 0>::sendPeerNotify(direct->out, 1, steps);
}
__syncwarp();
} else {
// Phase 2: Reduce from peers + local input -> send to network
Primitives<T, RedOp, FanAsymmetric<NCCL_MAX_DIRECT_ARITY, 1>, /*Direct=*/1, Proto, 0>
prims(tid, tn, direct->heads + 1, &direct->out, nullptr, nullptr,
work->redOpArg, 1 * Proto::MaxGroupWidth, 1, 1, work);
for (ssize_t railGridOffset = 0; railGridOffset < nNodes * sizePerRank; railGridOffset += nChannels * chunkSize) {
Scatterer</*ReduceSendNotRecv=*/false> scat;
scat.work = work;
scat.chunkSize = chunkSize;
scat.railGridOffset = railGridOffset;
prims.template process</*Recv=*/1, /*Send=*/1>(scat, 0, NCCL_DIRECT_READ);
}
}
return;
}
tid -= tn;
tn = nWarps3*WARP_SIZE;
if (tid < tn) {
if (work->regUsed == NCCL_COLLNET_REG_BUFFER) {
// Registered-buffer path: thread 0 issues the receive-side notification
// with the same step count as the send side.
if (tid == 0) {
int steps = (int)divUp(nNodes * sizePerRank * sizeof(T), NCCL_MAX_COLLNET_SIZE);
Primitives<T, RedOp, FanAsymmetric<1, 0>, /*Direct=*/0, Proto, 0>::recvPeerNotify(direct->out, 0, steps);
}
__syncwarp();
} else {
// Phase 3: recv from network
Primitives<T, RedOp, FanAsymmetric<1, 0>, /*Direct=*/0, Proto, 0>
prims(tid, tn, &direct->out, nullptr, nullptr, work->recvbuff,
work->redOpArg, 2 * Proto::MaxGroupWidth, 0, 0);
for (ssize_t railGridOffset = 0; railGridOffset < nNodes * sizePerRank; railGridOffset += nChannels * chunkSize) {
// This channel's window for the current step...
ssize_t railAllBeg = railGridOffset + part * chunkSize;
ssize_t railAllEnd = min(railAllBeg + chunkSize, nNodes * sizePerRank);
// ...intersected with the range belonging to this node.
ssize_t railOneBeg = ncclShmem.comm.node * sizePerRank;
ssize_t railOneEnd = railOneBeg + sizePerRank;
ssize_t beg = max(railAllBeg, railOneBeg);
ssize_t end = min(railAllEnd, railOneEnd);
// recv() is called on every step, with a zero element count when the
// window does not overlap this node's range.
prims.recv(beg - railOneBeg, max(ssize_t(0), end - beg), /*postOp=*/true);
}
}
return;
}
}
};