From e80e29573cb297a8f09bb7e9478926a991e73d81 Mon Sep 17 00:00:00 2001 From: Wenkai Du Date: Wed, 18 Mar 2020 17:03:03 -0700 Subject: [PATCH] Add gather, scatter and alltoall collectives Introducing 3 new APIs: ncclResult_t ncclGather(const void* sendbuff, void* recvbuff, size_t sendcount, ncclDataType_t datatype, int root, ncclComm_t comm, hipStream_t stream); ncclResult_t ncclScatter(const void* sendbuff, void* recvbuff, size_t recvcount, ncclDataType_t datatype, int root, ncclComm_t comm, hipStream_t stream); ncclResult_t ncclAllToAll(const void* sendbuff, void* recvbuff, size_t count, ncclDataType_t datatype, ncclComm_t comm, hipStream_t stream); Only out of place operation is supported. Preprocessor symbol RCCL_GATHER_SCATTER=1 indicates API availibility. By default the APIs launche RCCL kernel implementation, which can be disabled by RCCL_ALLTOALL_KERNEL_DISABLE=1. Then the APIs use wrapper around ncclSend and ncclRecv. --- CMakeLists.txt | 6 +++ src/collectives/all_to_all_api.cc | 33 +++++++++++++ src/collectives/device/all_to_all.cu | 12 +++++ src/collectives/device/all_to_all.h | 64 ++++++++++++++++++++++++ src/collectives/device/common.h | 15 ++++++ src/collectives/device/gather.cu | 12 +++++ src/collectives/device/gather.h | 74 ++++++++++++++++++++++++++++ src/collectives/device/scatter.cu | 12 +++++ src/collectives/device/scatter.h | 74 ++++++++++++++++++++++++++++ src/collectives/gather_api.cc | 37 ++++++++++++++ src/collectives/scatter_api.cc | 37 ++++++++++++++ src/enqueue.cc | 35 ++++++++++--- src/include/collectives.h | 17 +++++-- src/include/comm.h | 3 ++ src/include/devcomm.h | 4 +- src/include/info.h | 3 +- src/include/param.h | 25 ++++++++++ src/include/proxy.h | 1 + src/init.cc | 49 +++++++++++++++++- src/misc/argcheck.cc | 7 ++- src/nccl.h.in | 47 ++++++++++++++++++ src/proxy.cc | 21 ++++++++ tools/topo_expl/topo_expl.cpp | 3 ++ tools/topo_expl/utils.cpp | 41 +++++++++++++++ 24 files changed, 617 insertions(+), 15 deletions(-) create mode 100644 src/collectives/all_to_all_api.cc create mode 100644 src/collectives/device/all_to_all.cu create mode 100644 src/collectives/device/all_to_all.h create mode 100644 src/collectives/device/gather.cu create mode 100644 src/collectives/device/gather.h create mode 100644 src/collectives/device/scatter.cu create mode 100644 src/collectives/device/scatter.h create mode 100644 src/collectives/gather_api.cc create mode 100644 src/collectives/scatter_api.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 2060cb0685..6feb3e2b87 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -91,6 +91,9 @@ set(CU_SOURCES src/collectives/device/broadcast.cu src/collectives/device/reduce_scatter.cu src/collectives/device/sendrecv.cu + src/collectives/device/gather.cu + src/collectives/device/scatter.cu + src/collectives/device/all_to_all.cu src/collectives/device/functions.cu) set(CPP_SOURCES) @@ -119,6 +122,9 @@ set(CC_SOURCES src/collectives/broadcast_api.cc src/collectives/reduce_scatter_api.cc src/collectives/sendrecv_api.cc + src/collectives/gather_api.cc + src/collectives/scatter_api.cc + src/collectives/all_to_all_api.cc src/channel.cc src/misc/argcheck.cc src/misc/nvmlwrap_stub.cc diff --git a/src/collectives/all_to_all_api.cc b/src/collectives/all_to_all_api.cc new file mode 100644 index 0000000000..d4a8eb02ed --- /dev/null +++ b/src/collectives/all_to_all_api.cc @@ -0,0 +1,33 @@ +/************************************************************************* + * Copyright (c) 2015-2019, NVIDIA CORPORATION. All rights reserved. + * Modifications Copyright (c) 2019-2020 Advanced Micro Devices, Inc. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ + +#include "enqueue.h" +#include "collectives.h" + +NCCL_API(ncclResult_t, ncclAllToAll, const void* sendbuff, void* recvbuff, size_t count, ncclDataType_t datatype, + ncclComm_t comm, hipStream_t stream); +ncclResult_t ncclAllToAll(const void* sendbuff, void* recvbuff, size_t count, ncclDataType_t datatype, + ncclComm_t comm, hipStream_t stream) { + if (comm->alltoallDisable) { + int nRanks; + NCCLCHECK(ncclCommCount(comm, &nRanks)); + size_t rankOffset = count * ncclTypeSize(datatype); + if (count == 0) return ncclSuccess; + NCCLCHECK(ncclGroupStart()); + for (int r=0; r +__attribute__((noinline)) +__device__ void ncclAllToAllKernel(struct CollectiveArgs* args) { + const int tid = threadIdx.x; + const int nthreads = args->coll.nThreads; + const int nChannels = args->coll.nChannels; + struct ncclDevComm* comm = args->comm; + struct ncclChannel* channel = comm->channels+blockIdx.x; + struct ncclRing* ring = &channel->ring; + const ssize_t size = args->coll.count; + const int nranks = comm->nRanks; + const int bid = args->coll.bid; + const int rank = ring->devUserRanks[0]; + const int stepSize = comm->buffSizes[NCCL_PROTO_SIMPLE] / (sizeof(T)*NCCL_STEPS); + const int chunkSize = stepSize * ALLTOALL_CHUNKSTEPS; + const int peersPerChan = (nChannels >= nranks ? 1 : DIVUP(nranks, nChannels)); + const ssize_t loopSize = (peersPerChan == 1 ? (nChannels/nranks)*(ssize_t)chunkSize : (ssize_t)chunkSize); + + // Compute pointers + const T * __restrict__ thisInput = (const T*)args->sendbuff; + T * __restrict__ thisOutput = (T*)args->recvbuff; + + for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { + for (int i = 0; i < peersPerChan; i++) { + if ((peersPerChan == 1 && blockIdx.x >= (nChannels/nranks)*nranks) || + (peersPerChan > 1 && blockIdx.x*peersPerChan+i >= nranks)) + continue; + int realChunkSize = min(chunkSize, DIVUP(size-gridOffset, (peersPerChan == 1 ? (nChannels/nranks) : 1))); + ALIGN_SIZE(realChunkSize, nthreads*sizeof(uint64_t)/sizeof(T)); + ssize_t chunkOffset = gridOffset + (peersPerChan == 1 ? (bid/nranks)*realChunkSize : 0); + int nelem = min(realChunkSize, size-chunkOffset); + if ((blockIdx.x*peersPerChan+i)%nranks == 0) { + if (tid < nthreads && thisInput != thisOutput) { + const T* sendbuff = thisInput+chunkOffset+rank*size; + T* recvbuff = thisOutput+chunkOffset+rank*size; + // local copy + ReduceOrCopyMulti(tid, nthreads, 1, &sendbuff, 1, &recvbuff, nelem); + } + } + else { + int peerSend = (rank+(blockIdx.x*peersPerChan)+i)%nranks; + int peerRecv = (2*nranks+rank-((blockIdx.x*peersPerChan)%nranks)-(i%nranks))%nranks; + ncclPrimitives + prims(tid, nthreads, &peerRecv, &peerSend, NULL, stepSize, channel, comm, args->opCount); + + ssize_t send_offset = chunkOffset + peerSend*size; + ssize_t recv_offset = chunkOffset + peerRecv*size; + prims.send(thisInput+send_offset, nelem); + prims.recv(thisOutput+recv_offset, nelem); + } + } + } +} diff --git a/src/collectives/device/common.h b/src/collectives/device/common.h index 713f8f04a1..7f834ddc4e 100644 --- a/src/collectives/device/common.h +++ b/src/collectives/device/common.h @@ -96,6 +96,9 @@ static inline __device__ void exitIfAbortBarrier(int abort) { NCCL_FUNCS2B(ncclAllGather), \ NCCL_FUNCS2A(ncclReduceScatter), \ NCCL_FUNCS2A(ncclAllReduce), \ + NCCL_COLL_NAME(ncclGather, copy, i8), \ + NCCL_COLL_NAME(ncclScatter, copy, i8), \ + NCCL_COLL_NAME(ncclAllToAll, copy, i8), \ NCCL_COLL_NAME(ncclSendRecv, copy, i8) } // Must be consistent with the ncclFuncSet enum @@ -111,6 +114,9 @@ static const __device__ constexpr ncclKernelFunc_t ncclFuncs[]{ NCCL_FUNCS2B(ncclAllGather), NCCL_FUNCS2A(ncclReduceScatter), NCCL_FUNCS2A(ncclAllReduce), + NCCL_COLL_NAME(ncclGather, copy, i8), + NCCL_COLL_NAME(ncclScatter, copy, i8), + NCCL_COLL_NAME(ncclAllToAll, copy, i8), NCCL_COLL_NAME(ncclSendRecv, copy, i8) #endif }; @@ -159,6 +165,15 @@ void NCCL_CALL_FUNCTIONS(struct ncclColl* const c) noexcept { else ncclAllGatherCollNet_copy_i8(&c->args); } else if (c->funcIndex < 1800) Caller<1080, 1800>::call(c); + else if (c->funcIndex == 1800) { + ncclGather_copy_i8(&c->args); + } + else if (c->funcIndex == 1801) { + ncclScatter_copy_i8(&c->args); + } + else if (c->funcIndex == 1802) { + ncclAllToAll_copy_i8(&c->args); + } else ncclSendRecv_copy_i8(&c->args); } diff --git a/src/collectives/device/gather.cu b/src/collectives/device/gather.cu new file mode 100644 index 0000000000..5aca1afdaa --- /dev/null +++ b/src/collectives/device/gather.cu @@ -0,0 +1,12 @@ +/************************************************************************* + * Copyright (c) 2015-2019, NVIDIA CORPORATION. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ + +#include "gather.h" +#include "common.h" +#include "collectives.h" + +IMPL_COLL_FUNC(ncclGather, copy, FuncSum, i8, int8_t); +IMPL_COLL_KERN(ncclGather, copy, FuncSum, i8, int8_t, 0); diff --git a/src/collectives/device/gather.h b/src/collectives/device/gather.h new file mode 100644 index 0000000000..ab3db2eae0 --- /dev/null +++ b/src/collectives/device/gather.h @@ -0,0 +1,74 @@ +/************************************************************************* + * Copyright (c) 2015-2019, NVIDIA CORPORATION. All rights reserved. + * Modifications Copyright (c) 2019-2020 Advanced Micro Devices, Inc. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ + +#include "devcomm.h" +#include "primitives.h" +#include "collectives.h" + +template +__attribute__((noinline)) +__device__ void ncclGatherKernel(struct CollectiveArgs* args) { + const int tid = threadIdx.x; + const int nthreads = args->coll.nThreads; + const int nChannels = args->coll.nChannels; + struct ncclDevComm* comm = args->comm; + struct ncclChannel* channel = comm->channels+blockIdx.x; + struct ncclRing* ring = &channel->ring; + const ssize_t size = args->coll.count; + const int nranks = comm->nRanks; + const int bid = args->coll.bid; + const int rank = ring->devUserRanks[0]; + const int stepSize = comm->buffSizes[NCCL_PROTO_SIMPLE] / (sizeof(T)*NCCL_STEPS); + const int chunkSize = stepSize * GATHER_CHUNKSTEPS; + const int peersPerChan = (nChannels >= nranks ? 1 : DIVUP(nranks, nChannels)); + const ssize_t loopSize = (peersPerChan == 1 ? (nChannels/nranks)*(ssize_t)chunkSize : (ssize_t)chunkSize); + const int root = args->coll.root; + + // Compute pointers + const T * __restrict__ thisInput = (const T*)args->sendbuff; + T * __restrict__ thisOutput = (T*)args->recvbuff; + + for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { + for (int i = 0; i < peersPerChan; i++) { + if ((peersPerChan == 1 && blockIdx.x >= (nChannels/nranks)*nranks) || + (peersPerChan > 1 && blockIdx.x*peersPerChan+i >= nranks)) + continue; + int realChunkSize = min(chunkSize, DIVUP(size-gridOffset, (peersPerChan == 1 ? (nChannels/nranks) : 1))); + ALIGN_SIZE(realChunkSize, nthreads*sizeof(uint64_t)/sizeof(T)); + ssize_t chunkOffset = gridOffset + (peersPerChan == 1 ? (bid/nranks)*realChunkSize : 0); + int nelem = min(realChunkSize, size-chunkOffset); + if ((blockIdx.x*peersPerChan+i)%nranks == 0 && rank == root) { + const T* sendbuff = thisInput+chunkOffset; + T* recvbuff = thisOutput+chunkOffset+rank*size; + if (tid < nthreads && sendbuff != recvbuff) { + // local copy + ReduceOrCopyMulti(tid, nthreads, 1, &sendbuff, 1, &recvbuff, nelem); + } + } + else { + int peerSend = (rank+(blockIdx.x*peersPerChan)+i)%nranks; + int peerRecv = (2*nranks+rank-((blockIdx.x*peersPerChan)%nranks)-(i%nranks))%nranks; + if (rank == root) { + ncclPrimitives + prims(tid, nthreads, &peerRecv, &peerSend, NULL, stepSize, channel, comm, args->opCount); + + ssize_t recv_offset = chunkOffset + peerRecv*size; + prims.recv(thisOutput+recv_offset, nelem); + } + else { + if (peerSend == root) { + ncclPrimitives + prims(tid, nthreads, &peerRecv, &peerSend, NULL, stepSize, channel, comm, args->opCount); + + ssize_t send_offset = chunkOffset; + prims.send(thisInput+send_offset, nelem); + } + } + } + } + } +} diff --git a/src/collectives/device/scatter.cu b/src/collectives/device/scatter.cu new file mode 100644 index 0000000000..6da04434f3 --- /dev/null +++ b/src/collectives/device/scatter.cu @@ -0,0 +1,12 @@ +/************************************************************************* + * Copyright (c) 2015-2019, NVIDIA CORPORATION. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ + +#include "scatter.h" +#include "common.h" +#include "collectives.h" + +IMPL_COLL_FUNC(ncclScatter, copy, FuncSum, i8, int8_t); +IMPL_COLL_KERN(ncclScatter, copy, FuncSum, i8, int8_t, 0); diff --git a/src/collectives/device/scatter.h b/src/collectives/device/scatter.h new file mode 100644 index 0000000000..15235df784 --- /dev/null +++ b/src/collectives/device/scatter.h @@ -0,0 +1,74 @@ +/************************************************************************* + * Copyright (c) 2015-2019, NVIDIA CORPORATION. All rights reserved. + * Modifications Copyright (c) 2019-2020 Advanced Micro Devices, Inc. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ + +#include "devcomm.h" +#include "primitives.h" +#include "collectives.h" + +template +__attribute__((noinline)) +__device__ void ncclScatterKernel(struct CollectiveArgs* args) { + const int tid = threadIdx.x; + const int nthreads = args->coll.nThreads; + const int nChannels = args->coll.nChannels; + struct ncclDevComm* comm = args->comm; + struct ncclChannel* channel = comm->channels+blockIdx.x; + struct ncclRing* ring = &channel->ring; + const ssize_t size = args->coll.count; + const int nranks = comm->nRanks; + const int bid = args->coll.bid; + const int rank = ring->devUserRanks[0]; + const int stepSize = comm->buffSizes[NCCL_PROTO_SIMPLE] / (sizeof(T)*NCCL_STEPS); + const int chunkSize = stepSize * SCATTER_CHUNKSTEPS; + const int peersPerChan = (nChannels >= nranks ? 1 : DIVUP(nranks, nChannels)); + const ssize_t loopSize = (peersPerChan == 1 ? (nChannels/nranks)*(ssize_t)chunkSize : (ssize_t)chunkSize); + const int root = args->coll.root; + + // Compute pointers + const T * __restrict__ thisInput = (const T*)args->sendbuff; + T * __restrict__ thisOutput = (T*)args->recvbuff; + + for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { + for (int i = 0; i < peersPerChan; i++) { + if ((peersPerChan == 1 && blockIdx.x >= (nChannels/nranks)*nranks) || + (peersPerChan > 1 && blockIdx.x*peersPerChan+i >= nranks)) + continue; + int realChunkSize = min(chunkSize, DIVUP(size-gridOffset, (peersPerChan == 1 ? (nChannels/nranks) : 1))); + ALIGN_SIZE(realChunkSize, nthreads*sizeof(uint64_t)/sizeof(T)); + ssize_t chunkOffset = gridOffset + (peersPerChan == 1 ? (bid/nranks)*realChunkSize : 0); + int nelem = min(realChunkSize, size-chunkOffset); + if ((blockIdx.x*peersPerChan+i)%nranks == 0 && rank == root) { + const T* sendbuff = thisInput+chunkOffset+rank*size; + T* recvbuff = thisOutput+chunkOffset; + if (tid < nthreads && sendbuff != recvbuff) { + // local copy + ReduceOrCopyMulti(tid, nthreads, 1, &sendbuff, 1, &recvbuff, nelem); + } + } + else { + int peerSend = (rank+(blockIdx.x*peersPerChan)+i)%nranks; + int peerRecv = (2*nranks+rank-((blockIdx.x*peersPerChan)%nranks)-(i%nranks))%nranks; + if (rank == root) { + ncclPrimitives + prims(tid, nthreads, &peerRecv, &peerSend, NULL, stepSize, channel, comm, args->opCount); + + ssize_t send_offset = chunkOffset + peerSend*size; + prims.send(thisInput+send_offset, nelem); + } + else { + if (peerRecv == root) { + ncclPrimitives + prims(tid, nthreads, &peerRecv, &peerSend, NULL, stepSize, channel, comm, args->opCount); + + ssize_t recv_offset = chunkOffset; + prims.recv(thisOutput+recv_offset, nelem); + } + } + } + } + } +} diff --git a/src/collectives/gather_api.cc b/src/collectives/gather_api.cc new file mode 100644 index 0000000000..b2305897f8 --- /dev/null +++ b/src/collectives/gather_api.cc @@ -0,0 +1,37 @@ +/************************************************************************* + * Copyright (c) 2015-2019, NVIDIA CORPORATION. All rights reserved. + * Modifications Copyright (c) 2019-2020 Advanced Micro Devices, Inc. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ + +#include "enqueue.h" +#include "collectives.h" + +NCCL_API(ncclResult_t, ncclGather, const void* sendbuff, void* recvbuff, size_t sendcount, + ncclDataType_t datatype, int root, ncclComm_t comm, hipStream_t stream); +ncclResult_t ncclGather(const void* sendbuff, void* recvbuff, size_t sendcount, + ncclDataType_t datatype, int root, ncclComm_t comm, hipStream_t stream) { + if (comm->alltoallDisable) { + int nRanks; + NCCLCHECK(ncclCommCount(comm, &nRanks)); + size_t rankOffset = sendcount * ncclTypeSize(datatype); + if (sendcount == 0) return ncclSuccess; + int rank; + NCCLCHECK(ncclCommUserRank(comm, &rank)); + NCCLCHECK(ncclGroupStart()); + if (rank == root) { + for (int r=0; ralltoallDisable) { + int nRanks; + NCCLCHECK(ncclCommCount(comm, &nRanks)); + size_t rankOffset = recvcount * ncclTypeSize(datatype); + if (recvcount == 0) return ncclSuccess; + int rank; + NCCLCHECK(ncclCommUserRank(comm, &rank)); + NCCLCHECK(ncclGroupStart()); + if (rank == root) { + for (int r=0; rcoll == ncclCollAllToAll || info->coll == ncclCollGather || info->coll == ncclCollScatter) { + info->algorithm = NCCL_ALGO_RING; + info->protocol = NCCL_PROTO_SIMPLE; + } if (info->algorithm == -1 || info->protocol == -1) { WARN("Error : no algorithm/protocol available"); return ncclInternalError; @@ -288,7 +295,9 @@ static ncclResult_t getAlgoInfo(struct ncclInfo* info) { int nt = comm->maxThreads[info->algorithm][info->protocol]; int threadThreshold = comm->threadThresholds[info->algorithm][info->protocol]; while (info->nBytes < nc*nt*threadThreshold) { - if (info->algorithm != NCCL_ALGO_COLLNET && nc >= 2) nc--; + // do not reduce channels in case of alltoall + if (info->algorithm != NCCL_ALGO_COLLNET && info->coll != ncclCollAllToAll && + info->coll != ncclCollGather && info->coll != ncclCollScatter && nc >= 2) nc--; #if defined(__HIP_PLATFORM_HCC__) || defined(__HCC__) || defined(__HIPCC__) // do not reduce threads count on VEGA #else @@ -316,6 +325,10 @@ static ncclResult_t getPatternInfo(struct ncclInfo* info) { info->pattern = ncclPatternRing; break; case ncclCollAllReduce: info->pattern = info->algorithm == NCCL_ALGO_COLLNET ? ncclPatternCollTreeUp : info->algorithm == NCCL_ALGO_TREE ? ncclPatternTreeUpDown : ncclPatternRingTwice; break; + case ncclCollGather: + case ncclCollScatter: + case ncclCollAllToAll: + info->pattern = ncclPatternAll; break; default: WARN("Unknown pattern for collective %d algorithm %d", info->coll, info->algorithm); return ncclInternalError; @@ -332,7 +345,11 @@ static ncclResult_t getLoopInfo(struct ncclInfo* info) { case ncclPatternPipelineTo: case ncclPatternCollTreeUp: case ncclPatternCollTreeDown: - info->nstepsPerLoop = info-> nchunksPerLoop = 1; break; + info->nstepsPerLoop = info->nchunksPerLoop = 1; break; + case ncclPatternAll: + info->nstepsPerLoop = 1; + info->nchunksPerLoop = info->comm->nRanks; + break; case ncclPatternRing: info->nstepsPerLoop = info->comm->nRanks-1; info->nchunksPerLoop = info->comm->nRanks; break; case ncclPatternRingTwice: @@ -417,7 +434,11 @@ static ncclResult_t computeColl(struct ncclInfo* info /* input */, struct ncclCo if (info->protocol == NCCL_PROTO_LL) chunkEffectiveSize /= 2; if (info->protocol == NCCL_PROTO_LL128) chunkEffectiveSize = (chunkSize / NCCL_LL128_LINEELEMS) * NCCL_LL128_DATAELEMS; //if (info->comm->rank == 0) printf("Coll %d, size %ld -> %dx%d, chunkSize %d (algo %d proto%d)\n", info->coll, info->nBytes, info->nChannels, info->nThreads, chunkSize, info->algorithm, info->protocol); - int nLoops = (int)(DIVUP(info->nBytes, (((size_t)(info->nChannels))*info->nchunksPerLoop*chunkEffectiveSize))); + int nLoops; + if (info->pattern != ncclPatternAll) + nLoops = (int)(DIVUP(info->nBytes, (((size_t)(info->nChannels))*info->nchunksPerLoop*chunkEffectiveSize))); + else + nLoops = (int)(DIVUP(info->nBytes, (((size_t)((info->nChannels >= info->comm->nRanks ? (info->nChannels/info->comm->nRanks) : 1))))*info->nchunksPerLoop*chunkEffectiveSize)); proxyArgs->nsteps = info->nstepsPerLoop * nLoops * chunkSteps; proxyArgs->sliceSteps = sliceSteps; proxyArgs->chunkSteps = chunkSteps; @@ -425,8 +446,8 @@ static ncclResult_t computeColl(struct ncclInfo* info /* input */, struct ncclCo proxyArgs->opCount = info->comm->opCount; proxyArgs->dtype = info->datatype; proxyArgs->redOp = info->op; - TRACE(NCCL_NET,"opCount %lx slicesteps %d spl %d cpl %d nbytes %zi -> protocol %d nchannels %d nthreads %d, nloops %d nsteps %d comm %p", - coll->args.opCount, proxyArgs->sliceSteps, info->nstepsPerLoop, info->nchunksPerLoop, info->nBytes, info->protocol, info->nChannels, info->nThreads, + TRACE(NCCL_NET,"opCount %lx slicesteps %d spl %d cpl %d ces %d nbytes %zi -> protocol %d nchannels %d nthreads %d, nloops %d nsteps %d comm %p", + coll->args.opCount, proxyArgs->sliceSteps, info->nstepsPerLoop, info->nchunksPerLoop, chunkEffectiveSize, info->nBytes, info->protocol, info->nChannels, info->nThreads, nLoops, proxyArgs->nsteps, info->comm); return ncclSuccess; } @@ -479,6 +500,8 @@ ncclResult_t ncclSaveKernel(struct ncclInfo* info) { if (info->coll == ncclCollSendRecv) { info->comm->myParams->gridDim.x = std::max(info->comm->myParams->gridDim.x, channelId+1); NCCLCHECK(ncclProxySaveP2p(info, channel)); + } else if (info->coll == ncclCollAllToAll || info->coll == ncclCollScatter || info->coll == ncclCollGather) { + NCCLCHECK(ncclProxySaveA2a(&proxyArgs, info)); } else { NCCLCHECK(ncclProxySaveColl(&proxyArgs, info->pattern, info->root, info->comm->nRanks)); } diff --git a/src/include/collectives.h b/src/include/collectives.h index 13509846f1..7f4b30c45d 100644 --- a/src/include/collectives.h +++ b/src/include/collectives.h @@ -8,8 +8,10 @@ #ifndef NCCL_COLLECTIVES_H_ #define NCCL_COLLECTIVES_H_ -#define FUNC_INDEX_P2P 1800 -#define FUNC_INDEX(coll, redop, dtype, al, pr) (((((coll)*ncclNumOps + (redop))*ncclNumTypes) + (dtype))*NCCL_NUM_ALGORITHMS+(al))*NCCL_NUM_PROTOCOLS+(pr) +#define FUNC_INDEX_P2P (3+NCCL_NUM_FUNCTIONS*NCCL_NUM_ALGORITHMS*NCCL_NUM_PROTOCOLS*ncclNumTypes*ncclNumOps) +#define FUNC_INDEX(coll, redop, dtype, al, pr) ((coll >= NCCL_NUM_FUNCTIONS) \ + ? (coll-NCCL_NUM_FUNCTIONS+NCCL_NUM_FUNCTIONS*NCCL_NUM_ALGORITHMS*NCCL_NUM_PROTOCOLS*ncclNumTypes*ncclNumOps) \ + : ((((((coll)*ncclNumOps + (redop))*ncclNumTypes) + (dtype))*NCCL_NUM_ALGORITHMS+(al))*NCCL_NUM_PROTOCOLS+(pr))) #define NCCL_COLL_NAME(coll, op, dtype) \ coll##_##op##_##dtype @@ -56,7 +58,10 @@ DECL_COLL2(ncclAllGather, copy) \ DECL_COLL(ncclReduceScatter) \ DECL_COLL(ncclAllReduce) \ - DECL_COLL5(ncclSendRecv,copy,i8) \ + DECL_COLL5(ncclGather, copy, i8) \ + DECL_COLL5(ncclScatter, copy, i8) \ + DECL_COLL5(ncclAllToAll, copy, i8) \ + DECL_COLL5(ncclSendRecv, copy, i8) \ DECL_ALL_COLLS @@ -78,5 +83,11 @@ DECL_ALL_COLLS #define REDUCE_SLICESTEPS 1 #define REDUCE_CHUNKSTEPS 1 #define SENDRECV_SLICEFACTOR 4 +#define GATHER_SLICESTEPS 4 +#define GATHER_CHUNKSTEPS 4 +#define SCATTER_SLICESTEPS 4 +#define SCATTER_CHUNKSTEPS 4 +#define ALLTOALL_SLICESTEPS 4 +#define ALLTOALL_CHUNKSTEPS 4 #endif diff --git a/src/include/comm.h b/src/include/comm.h index 7500734a82..32f446285a 100644 --- a/src/include/comm.h +++ b/src/include/comm.h @@ -147,6 +147,9 @@ struct ncclComm { int collNetSupport; //list of async p2p operation queued in a group semantics struct ncclP2Plist p2plist; + + // RCCL AllToAll/Scatter/Gather API + bool alltoallDisable; }; #endif diff --git a/src/include/devcomm.h b/src/include/devcomm.h index 66824f6827..eabf190b96 100644 --- a/src/include/devcomm.h +++ b/src/include/devcomm.h @@ -23,8 +23,8 @@ #endif #define NCCL_NUM_FUNCTIONS 5 // SendRecv not included for now -typedef enum { ncclCollBroadcast, ncclCollReduce, ncclCollAllGather, ncclCollReduceScatter, ncclCollAllReduce, ncclCollSendRecv} ncclFunc_t; -extern const char* ncclFuncStr[NCCL_NUM_FUNCTIONS]; +typedef enum { ncclCollBroadcast, ncclCollReduce, ncclCollAllGather, ncclCollReduceScatter, ncclCollAllReduce, ncclCollGather, ncclCollScatter, ncclCollAllToAll, ncclCollSendRecv} ncclFunc_t; +extern const char* ncclFuncStr[NCCL_NUM_FUNCTIONS+3]; #define NCCL_NUM_ALGORITHMS 3 // Tree/Ring/CollNet #define NCCL_ALGO_TREE 0 diff --git a/src/include/info.h b/src/include/info.h index 6b71492bd1..11d9bb6db5 100644 --- a/src/include/info.h +++ b/src/include/info.h @@ -20,7 +20,8 @@ typedef enum { ncclPatternTreeDown, ncclPatternTreeUpDown, ncclPatternCollTreeUp, - ncclPatternCollTreeDown + ncclPatternCollTreeDown, + ncclPatternAll } ncclPattern_t; // Used to pass NCCL call information between functions diff --git a/src/include/param.h b/src/include/param.h index 54317571e7..f3c306d836 100644 --- a/src/include/param.h +++ b/src/include/param.h @@ -77,4 +77,29 @@ int64_t ncclParam##name() { \ return value; \ } +#define RCCL_PARAM(name, env, default_value) \ +pthread_mutex_t rcclParamMutex##name = PTHREAD_MUTEX_INITIALIZER; \ +int64_t rcclParam##name() { \ + static_assert(default_value != -1LL, "default value cannot be -1"); \ + static int64_t value = -1LL; \ + pthread_mutex_lock(&rcclParamMutex##name); \ + char* en = getenv("RCCL_TEST_ENV_VARS"); \ + if (value == -1LL || (en && (strcmp(en, "ENABLE") == 0))) { \ + value = default_value; \ + char* str = getenv("RCCL_" env); \ + if (str && strlen(str) > 0) { \ + errno = 0; \ + int64_t v = strtoll(str, NULL, 0); \ + if (errno) { \ + INFO(NCCL_ALL,"Invalid value %s for %s, using default %lu.", str, "RCCL_" env, value); \ + } else { \ + value = v; \ + INFO(NCCL_ALL,"%s set by environment to %lu.", "RCCL_" env, value); \ + } \ + } \ + } \ + pthread_mutex_unlock(&rcclParamMutex##name); \ + return value; \ +} + #endif diff --git a/src/include/proxy.h b/src/include/proxy.h index 04daa844b1..176382a197 100644 --- a/src/include/proxy.h +++ b/src/include/proxy.h @@ -60,6 +60,7 @@ enum proxyMode { ncclResult_t ncclProxySaveColl(struct ncclProxyArgs* args, int pattern, int root, int nranks); ncclResult_t ncclProxySaveP2p(struct ncclInfo* info, struct ncclChannel* channel); +ncclResult_t ncclProxySaveA2a(struct ncclProxyArgs* args, struct ncclInfo* info); ncclResult_t ncclProxyStart(struct ncclComm* comm); ncclResult_t ncclProxyCreate(struct ncclComm* comm); ncclResult_t ncclProxyDestroy(struct ncclComm* comm); diff --git a/src/init.cc b/src/init.cc index d7e64378a2..ac760bba73 100644 --- a/src/init.cc +++ b/src/init.cc @@ -41,7 +41,7 @@ std::chrono::high_resolution_clock::time_point ncclEpoch; #define NCCL_GROUP_CUDA_STREAM 1 // CGMD: CUDA 9.0,9.1 Need to use an internal CUDA stream #endif -const char* ncclFuncStr[NCCL_NUM_FUNCTIONS] = { "Broadcast", "Reduce", "AllGather", "ReduceScatter", "AllReduce" }; +const char* ncclFuncStr[NCCL_NUM_FUNCTIONS+3] = { "Broadcast", "Reduce", "AllGather", "ReduceScatter", "AllReduce", "Gather", "Scatter", "AllToAll" }; const char* ncclAlgoStr[NCCL_NUM_ALGORITHMS] = { "Tree", "Ring", "CollNet" }; const char* ncclProtoStr[NCCL_NUM_PROTOCOLS] = { "LL", "LL128", "Simple" }; @@ -521,6 +521,10 @@ static ncclResult_t computeBuffSizes(struct ncclComm* comm) { int defaults[NCCL_NUM_PROTOCOLS] = { DEFAULT_LL_BUFFSIZE, DEFAULT_LL128_BUFFSIZE, DEFAULT_BUFFSIZE }; if (cpuArch == NCCL_TOPO_CPU_ARCH_ARM) defaults[NCCL_PROTO_SIMPLE] = DEFAULT_BUFFSIZE_ARM; + if (comm->nRanks >= 32) { + defaults[NCCL_PROTO_SIMPLE] = 524288; + INFO(NCCL_INIT, "Setting DEFAULT_BUFFSIZE to %d for nRanks %d", defaults[NCCL_PROTO_SIMPLE], comm->nRanks); + } for (int p=0; pbuffSizes[p] = comm->hostDevComm.buffSizes[p] = envs[p] != -2 ? envs[p] : defaults[p]; @@ -652,6 +656,7 @@ static ncclResult_t checkCollNetSetup(struct ncclComm* comm, int rank, int collN NCCL_PARAM(CrossNic, "CROSS_NIC", 2); NCCL_PARAM(GraphDumpFileRank, "GRAPH_DUMP_FILE_RANK", 0); +RCCL_PARAM(AllToAllDisable, "ALLTOALL_KERNEL_DISABLE", 0); static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* commId) { // We use 3 AllGathers @@ -909,6 +914,48 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm // Compute nChannels per peer for p2p NCCLCHECK(ncclTopoComputeP2pChannels(comm)); + comm->alltoallDisable = true; + if (rcclParamAllToAllDisable() == 0) { + comm->alltoallDisable = false; + for (int c=0; cnChannels; c++) { + const int peersPerChan = (comm->nChannels >= nranks ? 1 : DIVUP(nranks, comm->nChannels)); + struct ncclP2PConnect* connect = &comm->p2plist.connect; + connect->nrecv[c] = 0; + connect->nsend[c] = 0; + for (int p=0; pchannels[c].peers[peerSend].send.connected == 0) { + connect->send[c*nranks+connect->nsend[c]++] = peerSend; + } + if (comm->channels[c].peers[peerRecv].recv.connected == 0) { + connect->recv[c*nranks+connect->nrecv[c]++] = peerRecv; + } + } + } + + for (int c=0; cnChannels; c++) { + struct ncclChannel* channel = comm->channels+c; + struct ncclP2PConnect* connect = &comm->p2plist.connect; +#if 0 + printf("channel %d recv: ", c); + for (int i=0; inrecv[c]; i++) + printf("%d ", connect->recv[c*nranks+i]); + printf("\n"); + printf("channel %d send: ", c); + for (int i=0; insend[c]; i++) + printf("%d ", connect->send[c*nranks+i]); + printf("\n"); +#endif + NCCLCHECK(ncclTransportP2pSetup(comm, NULL, channel, connect->nrecv[c], connect->recv+c*nranks, connect->nsend[c], connect->send+c*nranks)); + connect->nrecv[c] = 0; + connect->nsend[c] = 0; + } + } + INFO(NCCL_INIT, "RCCL AllToAll/Scatter/Gather kernels %s", comm->alltoallDisable ? "disabled" : "enabled"); // We should have allocated all buffers, collective fifos, ... we can // restore the affinity. affinity_restore: diff --git a/src/misc/argcheck.cc b/src/misc/argcheck.cc index e476582d8c..d3531a2b0e 100644 --- a/src/misc/argcheck.cc +++ b/src/misc/argcheck.cc @@ -46,11 +46,14 @@ ncclResult_t ArgsCheck(struct ncclInfo* info) { } // Type is OK, compute nbytes. Convert Allgather/Broadcast/P2P calls to chars. info->nBytes = info->count * ncclTypeSize(info->datatype); - if (info->coll == ncclCollAllGather || info->coll == ncclCollBroadcast) { + if (info->coll == ncclCollAllGather || info->coll == ncclCollBroadcast + || info->coll == ncclCollGather || info->coll == ncclCollScatter || info->coll == ncclCollAllToAll) { info->count = info->nBytes; info->datatype = ncclInt8; } - if (info->coll == ncclCollAllGather || info->coll == ncclCollReduceScatter) info->nBytes *= info->comm->nRanks; // count is per rank + if (info->coll == ncclCollAllGather || info->coll == ncclCollReduceScatter + || info->coll == ncclCollGather || info->coll == ncclCollScatter || info->coll == ncclCollAllToAll) + info->nBytes *= info->comm->nRanks; // count is per rank if (info->op < 0 || info->op >= ncclNumOps) { WARN("%s : invalid reduction operation %d", info->opName, info->op); diff --git a/src/nccl.h.in b/src/nccl.h.in index 1079478219..51c3d091fd 100644 --- a/src/nccl.h.in +++ b/src/nccl.h.in @@ -20,6 +20,7 @@ #define NCCL_VERSION(X,Y,Z) ((X) * 1000 + (Y) * 100 + (Z)) #define RCCL_BFLOAT16 1 +#define RCCL_GATHER_SCATTER 1 #ifdef __cplusplus extern "C" { @@ -271,6 +272,52 @@ ncclResult_t pncclRecv(void* recvbuff, size_t count, ncclDataType_t datatype, in ncclResult_t ncclRecv(void* recvbuff, size_t count, ncclDataType_t datatype, int peer, ncclComm_t comm, hipStream_t stream); +/* + * Gather + * + * Root device gathers sendcount values from other GPUs into recvbuff, + * receiving data from rank i at offset i*sendcount. + * Assumes recvcount is equal to nranks*sendcount, which means that recvbuff + * should have a size of at least nranks*sendcount elements. + * + * In-place operations will happen if sendbuff == recvbuff + rank * sendcount. + */ +ncclResult_t ncclGather(const void* sendbuff, void* recvbuff, size_t sendcount, + ncclDataType_t datatype, int root, ncclComm_t comm, hipStream_t stream); +ncclResult_t pncclGather(const void* sendbuff, void* recvbuff, size_t sendcount, + ncclDataType_t datatype, int root, ncclComm_t comm, hipStream_t stream); + +/* + * Scatter + * + * Scattered over the devices so that recvbuff on rank i will contain the i-th + * block of the data on root. + * Assumes sendcount is equal to nranks*recvcount, which means that sendbuff + * should have a size of at least nranks*recvcount elements. + * + * In-place operations will happen if recvbuff == sendbuff + rank * recvcount. + */ +ncclResult_t ncclScatter(const void* sendbuff, void* recvbuff, + size_t recvcount, ncclDataType_t datatype, int root, ncclComm_t comm, + hipStream_t stream); +ncclResult_t pncclScatter(const void* sendbuff, void* recvbuff, + size_t recvcount, ncclDataType_t datatype, int root, ncclComm_t comm, + hipStream_t stream); + +/* + * All-To-All + * + * Device (i) send (j)th block of data to device (j) and be placed as (i)th + * block. Each block for sending/receiving has count elements, which means + * that recvbuff and sendbuff should have a size of nranks*count elements. + * + * In-place operation will happen if sendbuff == recvbuff. + */ +ncclResult_t ncclAllToAll(const void* sendbuff, void* recvbuff, size_t count, + ncclDataType_t datatype, ncclComm_t comm, hipStream_t stream); +ncclResult_t pncclAllToAll(const void* sendbuff, void* recvbuff, size_t count, + ncclDataType_t datatype, ncclComm_t comm, hipStream_t stream); + /* * Group semantics * diff --git a/src/proxy.cc b/src/proxy.cc index 706530fb21..9985953938 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -165,6 +165,27 @@ ncclResult_t ncclProxySaveP2p(struct ncclInfo* info, struct ncclChannel* channel return ncclSuccess; } +ncclResult_t ncclProxySaveA2a(struct ncclProxyArgs* args, struct ncclInfo* info) { + const int peersPerChan = (info->nChannels >= info->comm->nRanks ? 1 : DIVUP(info->comm->nRanks, info->nChannels)); + for (int p=0; pchannel->id >= (info->nChannels/info->comm->nRanks)*info->comm->nRanks) || + (peersPerChan > 1 && args->channel->id*peersPerChan+p >= info->comm->nRanks)) + continue; + // first channel is reserved for self copy + if ((args->channel->id*peersPerChan+p)%info->comm->nRanks == 0) + continue; + int peerSend = (info->comm->rank+(args->channel->id*peersPerChan)+p)%info->comm->nRanks; + int peerRecv = (2*info->comm->nRanks+info->comm->rank-(args->channel->id*peersPerChan)%info->comm->nRanks-p%info->comm->nRanks)%info->comm->nRanks; + if (info->coll == ncclCollAllToAll || (info->coll == ncclCollScatter && info->comm->rank == info->root) || + (info->coll == ncclCollGather && peerSend == info->root)) + NCCLCHECK(SaveProxy(peerSend, args)); + if (info->coll == ncclCollAllToAll || (info->coll == ncclCollGather && info->comm->rank == info->root) || + (info->coll == ncclCollScatter && peerRecv == info->root)) + NCCLCHECK(SaveProxy(peerRecv, args)); + } + return ncclSuccess; +} + void* persistentThread(void *comm_) { struct ncclComm* comm = (struct ncclComm*)comm_; struct ncclProxyState* state = &comm->proxyState; diff --git a/tools/topo_expl/topo_expl.cpp b/tools/topo_expl/topo_expl.cpp index ac384a042d..555bb28225 100644 --- a/tools/topo_expl/topo_expl.cpp +++ b/tools/topo_expl/topo_expl.cpp @@ -200,6 +200,9 @@ int main(int argc,char* argv[]) for (int i = 0; i < nranks; i++) { comm[i].rank = i; comm[i].nRanks = nranks; + comm[i].p2plist.count=0; + NCCLCHECK(ncclCalloc(&comm[i].p2plist.connect.recv, MAXCHANNELS*comm->nRanks)); + NCCLCHECK(ncclCalloc(&comm[i].p2plist.connect.send, MAXCHANNELS*comm->nRanks)); node_model = network.GetNode(i); assert(node_model!=0); comm[i].topo = node_model->getSystem(i); diff --git a/tools/topo_expl/utils.cpp b/tools/topo_expl/utils.cpp index faa2a7115f..ac007f10b0 100644 --- a/tools/topo_expl/utils.cpp +++ b/tools/topo_expl/utils.cpp @@ -353,6 +353,8 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph* return ncclSuccess; } +RCCL_PARAM(AllToAllDisable, "ALLTOALL_KERNEL_DISABLE", 0); + ncclResult_t initTransportsRank_3(struct ncclComm* comm, struct allGather3Data_t *allGather3Data, struct ncclTopoGraph& treeGraph, struct ncclTopoGraph& ringGraph, struct ncclTopoGraph& collNetGraph) { int rank = comm->rank; @@ -503,6 +505,45 @@ ncclResult_t initTransportsRank_3(struct ncclComm* comm, struct allGather3Data_t // Compute nChannels per peer for p2p NCCLCHECK(ncclTopoComputeP2pChannels(comm)); + if (rcclParamAllToAllDisable() == 0) { + for (int c=0; cnChannels; c++) { + const int peersPerChan = (comm->nChannels >= nranks ? 1 : DIVUP(nranks, comm->nChannels)); + struct ncclP2PConnect* connect = &comm->p2plist.connect; + connect->nrecv[c] = 0; + connect->nsend[c] = 0; + for (int p=0; pchannels[c].peers[peerSend].send.connected == 0) { + connect->send[c*nranks+connect->nsend[c]++] = peerSend; + } + if (comm->channels[c].peers[peerRecv].recv.connected == 0) { + connect->recv[c*nranks+connect->nrecv[c]++] = peerRecv; + } + } + } + + for (int c=0; cnChannels; c++) { + struct ncclChannel* channel = comm->channels+c; + struct ncclP2PConnect* connect = &comm->p2plist.connect; +#if 0 + printf("channel %d recv: ", c); + for (int i=0; inrecv[c]; i++) + printf("%d ", connect->recv[c*nranks+i]); + printf("\n"); + printf("channel %d send: ", c); + for (int i=0; insend[c]; i++) + printf("%d ", connect->send[c*nranks+i]); + printf("\n"); +#endif + NCCLCHECK(ncclTransportP2pSetup(comm, NULL, channel, connect->nrecv[c], connect->recv+c*nranks, connect->nsend[c], connect->send+c*nranks)); + connect->nrecv[c] = 0; + connect->nsend[c] = 0; + } + } // We should have allocated all buffers, collective fifos, ... we can // restore the affinity. affinity_restore: