From f4d5d3d62017cabbcbd4327cf5e4d0482707db0b Mon Sep 17 00:00:00 2001 From: Wenkai Du Date: Fri, 4 Dec 2020 18:52:32 -0500 Subject: [PATCH] Port alltoall[v] --- CMakeLists.txt | 2 + src/channel.cc | 2 + src/collectives/all_to_all_api.cc | 9 +- src/collectives/all_to_allv_api.cc | 9 +- src/collectives/device/all_to_all.cu | 3 +- src/collectives/device/all_to_all.h | 135 +++++++++++------------ src/collectives/device/all_to_allv.cu | 3 +- src/collectives/device/all_to_allv.h | 150 +++++++++++++------------- src/collectives/device/common.h | 10 +- src/collectives/device/gather.cu | 12 --- src/collectives/device/gather.h | 74 ------------- src/collectives/device/scatter.cu | 12 --- src/collectives/device/scatter.h | 74 ------------- src/collectives/gather_api.cc | 9 -- src/collectives/scatter_api.cc | 9 -- src/enqueue.cc | 68 ++++++++++-- src/graph/paths.cc | 15 ++- src/group.cc | 4 +- src/include/collectives.h | 16 ++- src/include/devcomm.h | 10 +- src/include/info.h | 8 +- src/init.cc | 35 ++++-- src/misc/argcheck.cc | 9 +- src/proxy.cc | 10 +- tools/topo_expl/utils.cpp | 27 ++++- 25 files changed, 332 insertions(+), 383 deletions(-) mode change 100755 => 100644 src/collectives/device/all_to_allv.cu mode change 100755 => 100644 src/collectives/device/all_to_allv.h delete mode 100644 src/collectives/device/gather.cu delete mode 100644 src/collectives/device/gather.h delete mode 100644 src/collectives/device/scatter.cu delete mode 100644 src/collectives/device/scatter.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 328185535c..c37283c129 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -89,6 +89,8 @@ set(CU_SOURCES src/collectives/device/broadcast.cu src/collectives/device/reduce_scatter.cu src/collectives/device/sendrecv.cu + src/collectives/device/all_to_all.cu + src/collectives/device/all_to_allv.cu src/collectives/device/functions.cu) set(CPP_SOURCES) diff --git a/src/channel.cc b/src/channel.cc index 2a4be474bb..f4c969898c 100644 --- a/src/channel.cc +++ b/src/channel.cc @@ -27,12 +27,14 @@ ncclResult_t initChannel(struct ncclComm* comm, int channelid) { // Per-channel operation list. NCCLCHECK(ncclCudaHostCalloc(&channel->workFifo, NCCL_MAX_OPS)); + NCCLCHECK(ncclCudaHostCalloc(&channel->a2avParams, comm->nRanks*NCCL_MAX_OPS*4)); return ncclSuccess; } ncclResult_t freeChannel(struct ncclChannel* channel, int nRanks) { if (channel->id == -1) return ncclSuccess; // Operation list + NCCLCHECK(ncclCudaHostFree(channel->a2avParams)); NCCLCHECK(ncclCudaHostFree(channel->workFifo)); // Free Ring index to rank tables diff --git a/src/collectives/all_to_all_api.cc b/src/collectives/all_to_all_api.cc index 5320ff5efd..9ecc1c47e9 100644 --- a/src/collectives/all_to_all_api.cc +++ b/src/collectives/all_to_all_api.cc @@ -25,10 +25,9 @@ ncclResult_t ncclAllToAll(const void* sendbuff, void* recvbuff, size_t count, nc NCCLCHECK(ncclGroupEnd()); return ncclSuccess; } else { - //struct ncclInfo info = { ncclCollAllToAll, "AllToAll", - // sendbuff, recvbuff, count, datatype, ncclSum, 0, comm, stream, /* Args */ - // ALLTOALL_CHUNKSTEPS, ALLTOALL_SLICESTEPS }; - //return ncclEnqueueCheck(&info); - return ncclInternalError; + struct ncclInfo info = { ncclFuncAllToAll, "AllToAll", + sendbuff, recvbuff, count, datatype, ncclSum, 0, comm, stream, /* Args */ + ALLTOALL_CHUNKSTEPS, ALLTOALL_SLICESTEPS }; + return ncclEnqueueCheck(&info); } } diff --git a/src/collectives/all_to_allv_api.cc b/src/collectives/all_to_allv_api.cc index cd7151f9e8..ca405fbf5b 100755 --- a/src/collectives/all_to_allv_api.cc +++ b/src/collectives/all_to_allv_api.cc @@ -37,10 +37,9 @@ ncclResult_t ncclAllToAllv(const void *sendbuff, const size_t sendcounts[], cons NCCLCHECK(ncclGroupEnd()); return ncclSuccess; } else { - //struct ncclInfo info = { ncclCollAllToAllv, "AllToAllv", - // sendbuff, recvbuff, 0, datatype, ncclSum, 0, comm, stream, /* Args */ - // ALLTOALLV_CHUNKSTEPS, ALLTOALLV_SLICESTEPS, sendcounts, sdispls, recvcounts, rdispls }; - //return ncclEnqueueCheck(&info); - return ncclInternalError; + struct ncclInfo info = { ncclFuncAllToAllv, "AllToAllv", + sendbuff, recvbuff, 0, datatype, ncclSum, 0, comm, stream, /* Args */ + ALLTOALLV_CHUNKSTEPS, ALLTOALLV_SLICESTEPS, sendcounts, sdispls, recvcounts, rdispls }; + return ncclEnqueueCheck(&info); } } diff --git a/src/collectives/device/all_to_all.cu b/src/collectives/device/all_to_all.cu index 5a1314276a..46d0486d67 100644 --- a/src/collectives/device/all_to_all.cu +++ b/src/collectives/device/all_to_all.cu @@ -8,5 +8,4 @@ #include "common.h" #include "collectives.h" -IMPL_COLL_FUNC(ncclAllToAll, copy, FuncSum, i8, int8_t); -IMPL_COLL_KERN(ncclAllToAll, copy, FuncSum, i8, int8_t, 0); +IMPL_COLL_FUNC(AllToAll, RING, SIMPLE, Sum, int8_t); diff --git a/src/collectives/device/all_to_all.h b/src/collectives/device/all_to_all.h index 9b986cdc13..05927a2e8a 100644 --- a/src/collectives/device/all_to_all.h +++ b/src/collectives/device/all_to_all.h @@ -9,80 +9,81 @@ #include "primitives.h" #include "collectives.h" -template -__attribute__((noinline)) -__device__ void ncclAllToAllKernel(struct CollectiveArgs* args) { - const int tid = threadIdx.x; - const int nthreads = args->coll.nThreads; - const int nChannels = args->coll.nChannels; - struct ncclDevComm* comm = args->comm; - struct ncclChannel* channel = comm->channels+blockIdx.x; - struct ncclRing* ring = &channel->ring; - const ssize_t size = args->coll.count; - const int nranks = comm->nRanks; - const int bid = args->coll.bid; - const int rank = ring->devUserRanks[0]; - const int stepSize = comm->buffSizes[NCCL_PROTO_SIMPLE] / (sizeof(T)*NCCL_STEPS); - const int chunkSize = stepSize * ALLTOALL_CHUNKSTEPS; - const int peersPerChan = DIVUP(nranks, nChannels); - const ssize_t loopSize = (peersPerChan == 1 ? (nChannels/nranks)*(ssize_t)chunkSize : (ssize_t)chunkSize); +template +class ncclFunction { + public: + __device__ __attribute__((noinline)) void run(struct ncclWorkElem* args) { + const int tid = threadIdx.x; + const int nthreads = args->nThreads; + const int nChannels = args->coll.nChannels; + struct ncclDevComm* comm = args->comm; + struct ncclChannel* channel = comm->channels+blockIdx.x; + struct ncclRing* ring = &channel->ring; + const ssize_t size = args->coll.count; + const int nranks = comm->nRanks; + const int bid = args->coll.bid; + const int rank = ring->devUserRanks[0]; + const int stepSize = comm->buffSizes[NCCL_PROTO_SIMPLE] / (sizeof(T)*NCCL_STEPS); + const int chunkSize = stepSize * ALLTOALL_CHUNKSTEPS; + const int peersPerChan = DIVUP(nranks, nChannels); + const ssize_t loopSize = (peersPerChan == 1 ? (nChannels/nranks)*(ssize_t)chunkSize : (ssize_t)chunkSize); - // Compute pointers - const T * __restrict__ thisInput = (const T*)args->sendbuff; - T * __restrict__ thisOutput = (T*)args->recvbuff; + // Compute pointers + const T * __restrict__ thisInput = (const T*)args->sendbuff; + T * __restrict__ thisOutput = (T*)args->recvbuff; + + for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { + for (int i = 0; i < peersPerChan; i++) { + if ((peersPerChan == 1 && blockIdx.x >= (nChannels/nranks)*nranks) || + (peersPerChan > 1 && blockIdx.x*peersPerChan+i >= nranks)) + continue; + int realChunkSize = min(chunkSize, DIVUP(size-gridOffset, (peersPerChan == 1 ? (nChannels/nranks) : 1))); + ALIGN_SIZE(realChunkSize, nthreads*sizeof(uint64_t)/sizeof(T)); + ssize_t chunkOffset = gridOffset + (peersPerChan == 1 ? (bid/nranks)*realChunkSize : 0); + int nelem = min(realChunkSize, size-chunkOffset); + if ((blockIdx.x*peersPerChan+i)%nranks == 0) { + if (tid < nthreads && thisInput != thisOutput) { + const T* sendbuff = thisInput+chunkOffset+rank*size; + T* recvbuff = thisOutput+chunkOffset+rank*size; + // local copy + ReduceOrCopyMulti(tid, nthreads, 1, &sendbuff, 1, &recvbuff, nelem); + } + } + } + } - for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { for (int i = 0; i < peersPerChan; i++) { if ((peersPerChan == 1 && blockIdx.x >= (nChannels/nranks)*nranks) || (peersPerChan > 1 && blockIdx.x*peersPerChan+i >= nranks)) continue; - int realChunkSize = min(chunkSize, DIVUP(size-gridOffset, (peersPerChan == 1 ? (nChannels/nranks) : 1))); - ALIGN_SIZE(realChunkSize, nthreads*sizeof(uint64_t)/sizeof(T)); - ssize_t chunkOffset = gridOffset + (peersPerChan == 1 ? (bid/nranks)*realChunkSize : 0); - int nelem = min(realChunkSize, size-chunkOffset); - if ((blockIdx.x*peersPerChan+i)%nranks == 0) { - if (tid < nthreads && thisInput != thisOutput) { - const T* sendbuff = thisInput+chunkOffset+rank*size; - T* recvbuff = thisOutput+chunkOffset+rank*size; - // local copy - ReduceOrCopyMulti(tid, nthreads, 1, &sendbuff, 1, &recvbuff, nelem); + if ((blockIdx.x*peersPerChan+i)%nranks != 0) { + int nthreadsSplit = nthreads/2; + if (tid < nthreadsSplit ) { + int peerSend = (rank+(blockIdx.x*peersPerChan)+i)%nranks; + ncclPrimitives + prims(tid, nthreadsSplit, NULL, &peerSend, NULL, stepSize, channel, comm, ncclShmem->ptrs, 0); + for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { + int realChunkSize = min(chunkSize, DIVUP(size-gridOffset, (peersPerChan == 1 ? (nChannels/nranks) : 1))); + ALIGN_SIZE(realChunkSize, nthreads*sizeof(uint64_t)/sizeof(T)); + ssize_t chunkOffset = gridOffset + (peersPerChan == 1 ? (bid/nranks)*realChunkSize : 0); + int nelem = min(realChunkSize, size-chunkOffset); + ssize_t send_offset = chunkOffset + peerSend*size; + prims.send(thisInput+send_offset, nelem); + } + } else { + int peerRecv = (2*nranks+rank-((blockIdx.x*peersPerChan)%nranks)-(i%nranks))%nranks; + ncclPrimitives + prims(tid-nthreadsSplit, nthreads-nthreadsSplit, &peerRecv, NULL, NULL, stepSize, channel, comm, ncclShmem->ptrs, 1); + for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { + int realChunkSize = min(chunkSize, DIVUP(size-gridOffset, (peersPerChan == 1 ? (nChannels/nranks) : 1))); + ALIGN_SIZE(realChunkSize, nthreads*sizeof(uint64_t)/sizeof(T)); + ssize_t chunkOffset = gridOffset + (peersPerChan == 1 ? (bid/nranks)*realChunkSize : 0); + int nelem = min(realChunkSize, size-chunkOffset); + ssize_t recv_offset = chunkOffset + peerRecv*size; + prims.recv(thisOutput+recv_offset, nelem); + } } } } } - - for (int i = 0; i < peersPerChan; i++) { - if ((peersPerChan == 1 && blockIdx.x >= (nChannels/nranks)*nranks) || - (peersPerChan > 1 && blockIdx.x*peersPerChan+i >= nranks)) - continue; - if ((blockIdx.x*peersPerChan+i)%nranks != 0) { - int nthreadsSplit = nthreads/2; - int peerNone[2] = {-1,-1}; - if (tid < nthreadsSplit ) { - int peerSend = (rank+(blockIdx.x*peersPerChan)+i)%nranks; - ncclPrimitives - prims(tid, nthreadsSplit, peerNone, &peerSend, NULL, stepSize, channel, comm); - for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { - int realChunkSize = min(chunkSize, DIVUP(size-gridOffset, (peersPerChan == 1 ? (nChannels/nranks) : 1))); - ALIGN_SIZE(realChunkSize, nthreads*sizeof(uint64_t)/sizeof(T)); - ssize_t chunkOffset = gridOffset + (peersPerChan == 1 ? (bid/nranks)*realChunkSize : 0); - int nelem = min(realChunkSize, size-chunkOffset); - ssize_t send_offset = chunkOffset + peerSend*size; - prims.send(thisInput+send_offset, nelem); - } - } else { - int peerRecv = (2*nranks+rank-((blockIdx.x*peersPerChan)%nranks)-(i%nranks))%nranks; - ncclPrimitives - prims(tid-nthreadsSplit, nthreads-nthreadsSplit, &peerRecv, peerNone, NULL, stepSize, channel, comm); - for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { - int realChunkSize = min(chunkSize, DIVUP(size-gridOffset, (peersPerChan == 1 ? (nChannels/nranks) : 1))); - ALIGN_SIZE(realChunkSize, nthreads*sizeof(uint64_t)/sizeof(T)); - ssize_t chunkOffset = gridOffset + (peersPerChan == 1 ? (bid/nranks)*realChunkSize : 0); - int nelem = min(realChunkSize, size-chunkOffset); - ssize_t recv_offset = chunkOffset + peerRecv*size; - prims.recv(thisOutput+recv_offset, nelem); - } - } - } - } -} +}; \ No newline at end of file diff --git a/src/collectives/device/all_to_allv.cu b/src/collectives/device/all_to_allv.cu old mode 100755 new mode 100644 index f78042c939..49d883c530 --- a/src/collectives/device/all_to_allv.cu +++ b/src/collectives/device/all_to_allv.cu @@ -8,5 +8,4 @@ #include "common.h" #include "collectives.h" -IMPL_COLL_FUNC(ncclAllToAllv, copy, FuncSum, i8, int8_t); -IMPL_COLL_KERN(ncclAllToAllv, copy, FuncSum, i8, int8_t, 0); +IMPL_COLL_FUNC(AllToAllv, RING, SIMPLE, Sum, int8_t); diff --git a/src/collectives/device/all_to_allv.h b/src/collectives/device/all_to_allv.h old mode 100755 new mode 100644 index c1bc3cf0d2..dffa2e1d60 --- a/src/collectives/device/all_to_allv.h +++ b/src/collectives/device/all_to_allv.h @@ -9,87 +9,89 @@ #include "primitives.h" #include "collectives.h" -template -__attribute__((noinline)) -__device__ void ncclAllToAllvKernel(struct CollectiveArgs* args) { - const int tid = threadIdx.x; - const int nthreads = args->a2av.nThreads; - const int nChannels = args->a2av.nChannels; - struct ncclDevComm* comm = args->comm; - struct ncclChannel* channel = comm->channels+blockIdx.x; - struct ncclRing* ring = &channel->ring; - const ssize_t typesize = args->a2av.count; - const int nranks = comm->nRanks; - const int bid = args->a2av.bid; - const int rank = ring->devUserRanks[0]; - const int stepSize = comm->buffSizes[NCCL_PROTO_SIMPLE] / (sizeof(T)*NCCL_STEPS); - const int chunkSize = stepSize * ALLTOALLV_CHUNKSTEPS; - const int peersPerChan = DIVUP(nranks, nChannels); - const ssize_t loopSize = (peersPerChan == 1 ? (nChannels/nranks)*(ssize_t)chunkSize : (ssize_t)chunkSize); +template +class ncclFunction { + public: + __device__ __attribute__((noinline)) void run(struct ncclWorkElem* args) { + const int tid = threadIdx.x; + const int nthreads = args->nThreads; + const int nChannels = args->a2av.nChannels; + struct ncclDevComm* comm = args->comm; + struct ncclChannel* channel = comm->channels+blockIdx.x; + struct ncclRing* ring = &channel->ring; + const ssize_t typesize = args->a2av.count; + const int nranks = comm->nRanks; + const int bid = args->a2av.bid; + const int rank = ring->devUserRanks[0]; + const int stepSize = comm->buffSizes[NCCL_PROTO_SIMPLE] / (sizeof(T)*NCCL_STEPS); + const int chunkSize = stepSize * ALLTOALLV_CHUNKSTEPS; + const int peersPerChan = DIVUP(nranks, nChannels); + const ssize_t loopSize = (peersPerChan == 1 ? (nChannels/nranks)*(ssize_t)chunkSize : (ssize_t)chunkSize); - // Compute pointers - const T * __restrict__ thisInput = (const T*)args->sendbuff; - T * __restrict__ thisOutput = (T*)args->recvbuff; + // Compute pointers + const T * __restrict__ thisInput = (const T*)args->sendbuff; + T * __restrict__ thisOutput = (T*)args->recvbuff; + + size_t* params = channel->a2avParams + nranks*4*args->index; + size_t *sendcounts = params; + size_t *sdispls = params + nranks; + size_t *recvcounts = params + nranks*2; + size_t *rdispls = params + nranks*3; + ssize_t size = sendcounts[rank]*typesize; + for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { + for (int i = 0; i < peersPerChan; i++) { + if ((peersPerChan == 1 && blockIdx.x >= (nChannels/nranks)*nranks) || + (peersPerChan > 1 && blockIdx.x*peersPerChan+i >= nranks)) + continue; + int realChunkSize = min(chunkSize, DIVUP(size-gridOffset, (peersPerChan == 1 ? (nChannels/nranks) : 1))); + ALIGN_SIZE(realChunkSize, nthreads*sizeof(uint64_t)/sizeof(T)); + ssize_t chunkOffset = gridOffset + (peersPerChan == 1 ? (bid/nranks)*realChunkSize : 0); + int nelem = min(realChunkSize, size-chunkOffset); + if ((blockIdx.x*peersPerChan+i)%nranks == 0) { + if (tid < nthreads && thisInput != thisOutput) { + const T* sendbuff = thisInput+chunkOffset+sdispls[rank]*typesize; + T* recvbuff = thisOutput+chunkOffset+rdispls[rank]*typesize; + // local copy + ReduceOrCopyMulti(tid, nthreads, 1, &sendbuff, 1, &recvbuff, nelem); + } + } + } + } - size_t *sendcounts = args->a2av.extra; - size_t *sdispls = args->a2av.extra + nranks; - size_t *recvcounts = args->a2av.extra + nranks*2; - size_t *rdispls = args->a2av.extra + nranks*3; - ssize_t size = sendcounts[rank]*typesize; - for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { for (int i = 0; i < peersPerChan; i++) { if ((peersPerChan == 1 && blockIdx.x >= (nChannels/nranks)*nranks) || (peersPerChan > 1 && blockIdx.x*peersPerChan+i >= nranks)) continue; - int realChunkSize = min(chunkSize, DIVUP(size-gridOffset, (peersPerChan == 1 ? (nChannels/nranks) : 1))); - ALIGN_SIZE(realChunkSize, nthreads*sizeof(uint64_t)/sizeof(T)); - ssize_t chunkOffset = gridOffset + (peersPerChan == 1 ? (bid/nranks)*realChunkSize : 0); - int nelem = min(realChunkSize, size-chunkOffset); - if ((blockIdx.x*peersPerChan+i)%nranks == 0) { - if (tid < nthreads && thisInput != thisOutput) { - const T* sendbuff = thisInput+chunkOffset+sdispls[rank]*typesize; - T* recvbuff = thisOutput+chunkOffset+rdispls[rank]*typesize; - // local copy - ReduceOrCopyMulti(tid, nthreads, 1, &sendbuff, 1, &recvbuff, nelem); + if ((blockIdx.x*peersPerChan+i)%nranks != 0) { + int nthreadsSplit = nthreads/2; + if (tid < nthreadsSplit ) { + int peerSend = (rank+(blockIdx.x*peersPerChan)+i)%nranks; + ncclPrimitives + prims(tid, nthreadsSplit, NULL, &peerSend, NULL, stepSize, channel, comm, ncclShmem->ptrs, 0); + size = sendcounts[peerSend]*typesize; + for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { + int realChunkSize = min(chunkSize, DIVUP(size-gridOffset, (peersPerChan == 1 ? (nChannels/nranks) : 1))); + ALIGN_SIZE(realChunkSize, nthreadsSplit*sizeof(uint64_t)/sizeof(T)); + ssize_t chunkOffset = gridOffset + (peersPerChan == 1 ? (bid/nranks)*realChunkSize : 0); + int nelem = min(realChunkSize, size-chunkOffset); + ssize_t send_offset = chunkOffset + sdispls[peerSend]*typesize; + prims.send(thisInput+send_offset, nelem); + } + } else { + int peerRecv = (2*nranks+rank-((blockIdx.x*peersPerChan)%nranks)-(i%nranks))%nranks; + ncclPrimitives + prims(tid-nthreadsSplit, nthreads-nthreadsSplit, &peerRecv, NULL, NULL, stepSize, channel, comm, ncclShmem->ptrs, 1); + size = recvcounts[peerRecv]*typesize; + for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { + int realChunkSize = min(chunkSize, DIVUP(size-gridOffset, (peersPerChan == 1 ? (nChannels/nranks) : 1))); + ALIGN_SIZE(realChunkSize, (nthreads-nthreadsSplit)*sizeof(uint64_t)/sizeof(T)); + ssize_t chunkOffset = gridOffset + (peersPerChan == 1 ? (bid/nranks)*realChunkSize : 0); + int nelem = min(realChunkSize, size-chunkOffset); + ssize_t recv_offset = chunkOffset + rdispls[peerRecv]*typesize; + prims.recv(thisOutput+recv_offset, nelem); + } } } } } - - for (int i = 0; i < peersPerChan; i++) { - if ((peersPerChan == 1 && blockIdx.x >= (nChannels/nranks)*nranks) || - (peersPerChan > 1 && blockIdx.x*peersPerChan+i >= nranks)) - continue; - if ((blockIdx.x*peersPerChan+i)%nranks != 0) { - int nthreadsSplit = nthreads/2; - int peerNone[2] = {-1,-1}; - if (tid < nthreadsSplit ) { - int peerSend = (rank+(blockIdx.x*peersPerChan)+i)%nranks; - ncclPrimitives - prims(tid, nthreadsSplit, peerNone, &peerSend, NULL, stepSize, channel, comm); - size = sendcounts[peerSend]*typesize; - for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { - int realChunkSize = min(chunkSize, DIVUP(size-gridOffset, (peersPerChan == 1 ? (nChannels/nranks) : 1))); - ALIGN_SIZE(realChunkSize, nthreadsSplit*sizeof(uint64_t)/sizeof(T)); - ssize_t chunkOffset = gridOffset + (peersPerChan == 1 ? (bid/nranks)*realChunkSize : 0); - int nelem = min(realChunkSize, size-chunkOffset); - ssize_t send_offset = chunkOffset + sdispls[peerSend]*typesize; - prims.send(thisInput+send_offset, nelem); - } - } else { - int peerRecv = (2*nranks+rank-((blockIdx.x*peersPerChan)%nranks)-(i%nranks))%nranks; - ncclPrimitives - prims(tid-nthreadsSplit, nthreads-nthreadsSplit, &peerRecv, peerNone, NULL, stepSize, channel, comm); - size = recvcounts[peerRecv]*typesize; - for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { - int realChunkSize = min(chunkSize, DIVUP(size-gridOffset, (peersPerChan == 1 ? (nChannels/nranks) : 1))); - ALIGN_SIZE(realChunkSize, (nthreads-nthreadsSplit)*sizeof(uint64_t)/sizeof(T)); - ssize_t chunkOffset = gridOffset + (peersPerChan == 1 ? (bid/nranks)*realChunkSize : 0); - int nelem = min(realChunkSize, size-chunkOffset); - ssize_t recv_offset = chunkOffset + rdispls[peerRecv]*typesize; - prims.recv(thisOutput+recv_offset, nelem); - } - } - } - } -} +}; \ No newline at end of file diff --git a/src/collectives/device/common.h b/src/collectives/device/common.h index 9c54eae4ef..9ce25fa656 100644 --- a/src/collectives/device/common.h +++ b/src/collectives/device/common.h @@ -77,7 +77,9 @@ NCCL_FUNCS2B(AllGather), \ NCCL_FUNCS2A(ReduceScatter), \ NCCL_FUNCS2A(AllReduce), \ - NCCL_FUNC_NAME(SendRecv, RING, SIMPLE, Sum, int8_t) } + NCCL_FUNC_NAME(SendRecv, RING, SIMPLE, Sum, int8_t), \ + NCCL_FUNC_NAME(AllToAll, RING, SIMPLE, Sum, int8_t), \ + NCCL_FUNC_NAME(AllToAllv, RING, SIMPLE, Sum, int8_t) } // Must be consistent with the ncclFuncSet enum using ncclKernelFunc_t = void (*)(struct ncclWorkElem* args); @@ -93,6 +95,8 @@ static const __device__ constexpr ncclKernelFunc_t ncclFuncs[]{ NCCL_FUNCS2A(ReduceScatter), NCCL_FUNCS2A(AllReduce), NCCL_FUNC_NAME(SendRecv, RING, SIMPLE, Sum, int8_t), + NCCL_FUNC_NAME(AllToAll, RING, SIMPLE, Sum, int8_t), + NCCL_FUNC_NAME(AllToAllv, RING, SIMPLE, Sum, int8_t), #endif }; @@ -140,7 +144,9 @@ void NCCL_CALL_FUNCTIONS(struct ncclWorkElem* const c) noexcept { else ncclFunction_AllGather_COLLNET_SIMPLE_Sum_int8_t(c); } else if (c->funcIndex < 1800) Caller<1080, 1800>::call(c); - else ncclFunction_SendRecv_RING_SIMPLE_Sum_int8_t(c); + else if (c->funcIndex == 1800) ncclFunction_SendRecv_RING_SIMPLE_Sum_int8_t(c); + else if (c->funcIndex == 1801) ncclFunction_AllToAll_RING_SIMPLE_Sum_int8_t(c); + else if (c->funcIndex == 1802) ncclFunction_AllToAllv_RING_SIMPLE_Sum_int8_t(c); } static __device__ void load_parallel(void* dst, void* src, size_t size, int tid) { diff --git a/src/collectives/device/gather.cu b/src/collectives/device/gather.cu deleted file mode 100644 index 5aca1afdaa..0000000000 --- a/src/collectives/device/gather.cu +++ /dev/null @@ -1,12 +0,0 @@ -/************************************************************************* - * Copyright (c) 2015-2019, NVIDIA CORPORATION. All rights reserved. - * - * See LICENSE.txt for license information - ************************************************************************/ - -#include "gather.h" -#include "common.h" -#include "collectives.h" - -IMPL_COLL_FUNC(ncclGather, copy, FuncSum, i8, int8_t); -IMPL_COLL_KERN(ncclGather, copy, FuncSum, i8, int8_t, 0); diff --git a/src/collectives/device/gather.h b/src/collectives/device/gather.h deleted file mode 100644 index c704e39421..0000000000 --- a/src/collectives/device/gather.h +++ /dev/null @@ -1,74 +0,0 @@ -/************************************************************************* - * Copyright (c) 2015-2019, NVIDIA CORPORATION. All rights reserved. - * Modifications Copyright (c) 2019-2020 Advanced Micro Devices, Inc. All rights reserved. - * - * See LICENSE.txt for license information - ************************************************************************/ - -#include "devcomm.h" -#include "primitives.h" -#include "collectives.h" - -template -__attribute__((noinline)) -__device__ void ncclGatherKernel(struct CollectiveArgs* args) { - const int tid = threadIdx.x; - const int nthreads = args->coll.nThreads; - const int nChannels = args->coll.nChannels; - struct ncclDevComm* comm = args->comm; - struct ncclChannel* channel = comm->channels+blockIdx.x; - struct ncclRing* ring = &channel->ring; - const ssize_t size = args->coll.count; - const int nranks = comm->nRanks; - const int bid = args->coll.bid; - const int rank = ring->devUserRanks[0]; - const int stepSize = comm->buffSizes[NCCL_PROTO_SIMPLE] / (sizeof(T)*NCCL_STEPS); - const int chunkSize = stepSize * GATHER_CHUNKSTEPS; - const int peersPerChan = DIVUP(nranks, nChannels); - const ssize_t loopSize = (peersPerChan == 1 ? (nChannels/nranks)*(ssize_t)chunkSize : (ssize_t)chunkSize); - const int root = args->coll.root; - - // Compute pointers - const T * __restrict__ thisInput = (const T*)args->sendbuff; - T * __restrict__ thisOutput = (T*)args->recvbuff; - - for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { - for (int i = 0; i < peersPerChan; i++) { - if ((peersPerChan == 1 && blockIdx.x >= (nChannels/nranks)*nranks) || - (peersPerChan > 1 && blockIdx.x*peersPerChan+i >= nranks)) - continue; - int realChunkSize = min(chunkSize, DIVUP(size-gridOffset, (peersPerChan == 1 ? (nChannels/nranks) : 1))); - ALIGN_SIZE(realChunkSize, nthreads*sizeof(uint64_t)/sizeof(T)); - ssize_t chunkOffset = gridOffset + (peersPerChan == 1 ? (bid/nranks)*realChunkSize : 0); - int nelem = min(realChunkSize, size-chunkOffset); - if ((blockIdx.x*peersPerChan+i)%nranks == 0 && rank == root) { - const T* sendbuff = thisInput+chunkOffset; - T* recvbuff = thisOutput+chunkOffset+rank*size; - if (tid < nthreads && sendbuff != recvbuff) { - // local copy - ReduceOrCopyMulti(tid, nthreads, 1, &sendbuff, 1, &recvbuff, nelem); - } - } - else { - int peerSend = (rank+(blockIdx.x*peersPerChan)+i)%nranks; - int peerRecv = (2*nranks+rank-((blockIdx.x*peersPerChan)%nranks)-(i%nranks))%nranks; - if (rank == root) { - ncclPrimitives - prims(tid, nthreads, &peerRecv, &peerSend, NULL, stepSize, channel, comm); - - ssize_t recv_offset = chunkOffset + peerRecv*size; - prims.recv(thisOutput+recv_offset, nelem); - } - else { - if (peerSend == root) { - ncclPrimitives - prims(tid, nthreads, &peerRecv, &peerSend, NULL, stepSize, channel, comm); - - ssize_t send_offset = chunkOffset; - prims.send(thisInput+send_offset, nelem); - } - } - } - } - } -} diff --git a/src/collectives/device/scatter.cu b/src/collectives/device/scatter.cu deleted file mode 100644 index 6da04434f3..0000000000 --- a/src/collectives/device/scatter.cu +++ /dev/null @@ -1,12 +0,0 @@ -/************************************************************************* - * Copyright (c) 2015-2019, NVIDIA CORPORATION. All rights reserved. - * - * See LICENSE.txt for license information - ************************************************************************/ - -#include "scatter.h" -#include "common.h" -#include "collectives.h" - -IMPL_COLL_FUNC(ncclScatter, copy, FuncSum, i8, int8_t); -IMPL_COLL_KERN(ncclScatter, copy, FuncSum, i8, int8_t, 0); diff --git a/src/collectives/device/scatter.h b/src/collectives/device/scatter.h deleted file mode 100644 index f08544068a..0000000000 --- a/src/collectives/device/scatter.h +++ /dev/null @@ -1,74 +0,0 @@ -/************************************************************************* - * Copyright (c) 2015-2019, NVIDIA CORPORATION. All rights reserved. - * Modifications Copyright (c) 2019-2020 Advanced Micro Devices, Inc. All rights reserved. - * - * See LICENSE.txt for license information - ************************************************************************/ - -#include "devcomm.h" -#include "primitives.h" -#include "collectives.h" - -template -__attribute__((noinline)) -__device__ void ncclScatterKernel(struct CollectiveArgs* args) { - const int tid = threadIdx.x; - const int nthreads = args->coll.nThreads; - const int nChannels = args->coll.nChannels; - struct ncclDevComm* comm = args->comm; - struct ncclChannel* channel = comm->channels+blockIdx.x; - struct ncclRing* ring = &channel->ring; - const ssize_t size = args->coll.count; - const int nranks = comm->nRanks; - const int bid = args->coll.bid; - const int rank = ring->devUserRanks[0]; - const int stepSize = comm->buffSizes[NCCL_PROTO_SIMPLE] / (sizeof(T)*NCCL_STEPS); - const int chunkSize = stepSize * SCATTER_CHUNKSTEPS; - const int peersPerChan = DIVUP(nranks, nChannels); - const ssize_t loopSize = (peersPerChan == 1 ? (nChannels/nranks)*(ssize_t)chunkSize : (ssize_t)chunkSize); - const int root = args->coll.root; - - // Compute pointers - const T * __restrict__ thisInput = (const T*)args->sendbuff; - T * __restrict__ thisOutput = (T*)args->recvbuff; - - for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { - for (int i = 0; i < peersPerChan; i++) { - if ((peersPerChan == 1 && blockIdx.x >= (nChannels/nranks)*nranks) || - (peersPerChan > 1 && blockIdx.x*peersPerChan+i >= nranks)) - continue; - int realChunkSize = min(chunkSize, DIVUP(size-gridOffset, (peersPerChan == 1 ? (nChannels/nranks) : 1))); - ALIGN_SIZE(realChunkSize, nthreads*sizeof(uint64_t)/sizeof(T)); - ssize_t chunkOffset = gridOffset + (peersPerChan == 1 ? (bid/nranks)*realChunkSize : 0); - int nelem = min(realChunkSize, size-chunkOffset); - if ((blockIdx.x*peersPerChan+i)%nranks == 0 && rank == root) { - const T* sendbuff = thisInput+chunkOffset+rank*size; - T* recvbuff = thisOutput+chunkOffset; - if (tid < nthreads && sendbuff != recvbuff) { - // local copy - ReduceOrCopyMulti(tid, nthreads, 1, &sendbuff, 1, &recvbuff, nelem); - } - } - else { - int peerSend = (rank+(blockIdx.x*peersPerChan)+i)%nranks; - int peerRecv = (2*nranks+rank-((blockIdx.x*peersPerChan)%nranks)-(i%nranks))%nranks; - if (rank == root) { - ncclPrimitives - prims(tid, nthreads, &peerRecv, &peerSend, NULL, stepSize, channel, comm); - - ssize_t send_offset = chunkOffset + peerSend*size; - prims.send(thisInput+send_offset, nelem); - } - else { - if (peerRecv == root) { - ncclPrimitives - prims(tid, nthreads, &peerRecv, &peerSend, NULL, stepSize, channel, comm); - - ssize_t recv_offset = chunkOffset; - prims.recv(thisOutput+recv_offset, nelem); - } - } - } - } - } -} diff --git a/src/collectives/gather_api.cc b/src/collectives/gather_api.cc index f24e38b887..8b53538933 100644 --- a/src/collectives/gather_api.cc +++ b/src/collectives/gather_api.cc @@ -12,7 +12,6 @@ NCCL_API(ncclResult_t, ncclGather, const void* sendbuff, void* recvbuff, size_t ncclDataType_t datatype, int root, ncclComm_t comm, hipStream_t stream); ncclResult_t ncclGather(const void* sendbuff, void* recvbuff, size_t sendcount, ncclDataType_t datatype, int root, ncclComm_t comm, hipStream_t stream) { - if (comm->alltoallDisable) { int nRanks; NCCLCHECK(ncclCommCount(comm, &nRanks)); size_t rankOffset = sendcount * ncclTypeSize(datatype); @@ -27,12 +26,4 @@ ncclResult_t ncclGather(const void* sendbuff, void* recvbuff, size_t sendcount, NCCLCHECK(ncclSend(sendbuff, sendcount, datatype, root, comm, stream)); NCCLCHECK(ncclGroupEnd()); return ncclSuccess; - } - else { - //struct ncclInfo info = { ncclCollGather, "Gather", - // sendbuff, recvbuff, sendcount, datatype, ncclSum, root, comm, stream, /* Args */ - // GATHER_CHUNKSTEPS, GATHER_SLICESTEPS }; - //return ncclEnqueueCheck(&info); - return ncclInternalError; - } } diff --git a/src/collectives/scatter_api.cc b/src/collectives/scatter_api.cc index b573294b00..2a0bc20df4 100644 --- a/src/collectives/scatter_api.cc +++ b/src/collectives/scatter_api.cc @@ -12,7 +12,6 @@ NCCL_API(ncclResult_t, ncclScatter, const void* sendbuff, void* recvbuff, size_t ncclComm_t comm, hipStream_t stream); ncclResult_t ncclScatter(const void* sendbuff, void* recvbuff, size_t recvcount, ncclDataType_t datatype, int root, ncclComm_t comm, hipStream_t stream) { - if (comm->alltoallDisable) { int nRanks; NCCLCHECK(ncclCommCount(comm, &nRanks)); size_t rankOffset = recvcount * ncclTypeSize(datatype); @@ -27,12 +26,4 @@ ncclResult_t ncclScatter(const void* sendbuff, void* recvbuff, size_t recvcount, NCCLCHECK(ncclRecv(recvbuff, recvcount, datatype, root, comm, stream)); NCCLCHECK(ncclGroupEnd()); return ncclSuccess; - } - else { - //struct ncclInfo info = { ncclCollScatter, "Scatter", - // sendbuff, recvbuff, recvcount, datatype, ncclSum, root, comm, stream, /* Args */ - // SCATTER_CHUNKSTEPS, SCATTER_SLICESTEPS }; - //return ncclEnqueueCheck(&info); - return ncclInternalError; - } } diff --git a/src/enqueue.cc b/src/enqueue.cc index 937d1a8f92..e32b519974 100644 --- a/src/enqueue.cc +++ b/src/enqueue.cc @@ -8,6 +8,7 @@ #include "enqueue.h" #include "argcheck.h" #include "coll_net.h" +#include "graph/topo.h" // Only generate inline kernels for LL #define NCCL_FUNC5(func, algo, redop, dtype) \ @@ -107,7 +108,7 @@ static ncclResult_t getNextOp(struct ncclChannel* channel, struct ncclWork** wor static ncclResult_t setupLaunch(struct ncclComm* comm, hipLaunchParams* params) { // Only launch blocks where we have work to do. - for (int c=0; cp2pnChannels; c++) { + for (int c=0; cnChannels, comm->p2pnChannels); c++) { if (comm->channels[c].workCount) params->gridDim.x = c+1; } @@ -241,7 +242,7 @@ ncclResult_t ncclBarrierEnqueueWait(ncclComm_t comm) { max = std::max(max, channel->workFifoTail); channel->workCount = 0; } - for (int r=0; rp2pnChannels; r++) { + for (int r=0; rnChannels, comm->p2pnChannels); r++) { struct ncclChannel* channel = comm->channels+r; channel->workFifoTail = max; } @@ -272,6 +273,13 @@ static ncclResult_t getAlgoInfo(struct ncclInfo* info) { struct ncclComm* comm = info->comm; float minTime = 3600000000.0; // Hopefully no operation will take an hour to complete. // Find algorithm / protocol. + if (info->coll == ncclFuncAllToAll || info->coll == ncclFuncAllToAllv) { + info->algorithm = NCCL_ALGO_RING; + info->protocol = NCCL_PROTO_SIMPLE; + info->nChannels = comm->nChannels; + info->nThreads = NCCL_MAX_NTHREADS; + return ncclSuccess; + } info->algorithm = -1; info->protocol = -1; int nAlgos = NCCL_NUM_ALGORITHMS; @@ -332,6 +340,9 @@ static ncclResult_t getPatternInfo(struct ncclInfo* info) { info->pattern = ncclPatternRing; break; case ncclFuncAllReduce: info->pattern = info->algorithm == NCCL_ALGO_COLLNET ? ncclPatternCollTreeUp : info->algorithm == NCCL_ALGO_TREE ? ncclPatternTreeUpDown : ncclPatternRingTwice; break; + case ncclFuncAllToAll: + case ncclFuncAllToAllv: + info->pattern = ncclPatternAll; break; default: WARN("Unknown pattern for collective %d algorithm %d", info->coll, info->algorithm); return ncclInternalError; @@ -353,6 +364,9 @@ static ncclResult_t getLoopInfo(struct ncclInfo* info) { info->nstepsPerLoop = info->comm->nRanks-1; info->nchunksPerLoop = info->comm->nRanks; break; case ncclPatternRingTwice: info->nstepsPerLoop = 2*(info->comm->nRanks-1); info->nchunksPerLoop = info->comm->nRanks; break; + case ncclPatternAll: + info->nstepsPerLoop = 1; + info->nchunksPerLoop = info->comm->nRanks; break; default: WARN("Unknown pattern %d\n", info->pattern); return ncclInternalError; @@ -368,12 +382,21 @@ static ncclResult_t computeColl(struct ncclInfo* info /* input */, struct ncclWo NCCLCHECK(getPatternInfo(info)); NCCLCHECK(getLoopInfo(info)); + if ((info->coll == ncclFuncAllToAll || info->coll == ncclFuncAllToAllv) + && info->comm->topo->nodes[NET].count == 0 && info->comm->topo->type == RCCL_TOPO_4P2H_ROME) + info->nChannels =info->comm->p2pnChannels; + work->opCount = info->comm->opCount; work->sendbuff = info->sendbuff; work->recvbuff = info->recvbuff; - work->coll.root = info->root; - work->coll.count = info->count; - work->coll.nChannels = info->nChannels; + if (info->coll == ncclFuncAllToAllv) { + work->a2av.count = info->count; + work->a2av.nChannels = info->nChannels; + } else { + work->coll.root = info->root; + work->coll.count = info->count; + work->coll.nChannels = info->nChannels; + } work->nThreads = info->nThreads; work->funcIndex = FUNC_INDEX(info->coll, info->op, info->datatype, info->algorithm, info->protocol); @@ -481,8 +504,21 @@ ncclResult_t ncclSaveKernel(struct ncclInfo* info) { if (proxyArgs.nsteps) NCCLCHECK(ncclProxySaveColl(&proxyArgs, info->pattern, info->root, info->comm->nRanks)); info->comm->myParams->gridDim.x++; - work.coll.bid = bid % nChannels; - NCCLCHECK(getNextOp(channel, NULL, &work)); + if (info->coll == ncclFuncAllToAllv) { + work.a2av.bid = bid % work.a2av.nChannels; + } else { + work.coll.bid = bid % nChannels; + } + struct ncclWork* w; + NCCLCHECK(getNextOp(channel, &w, &work)); + if (info->coll == ncclFuncAllToAllv) { + struct ncclWorkElem* e = w->elems; + size_t* params = channel->a2avParams + info->comm->nRanks*4*e->index; + memcpy(params, info->sendcounts, sizeof(size_t*)*(info->comm->nRanks)); + memcpy(params+info->comm->nRanks, info->sdispls, sizeof(size_t*)*(info->comm->nRanks)); + memcpy(params+info->comm->nRanks*2, info->recvcounts, sizeof(size_t*)*(info->comm->nRanks)); + memcpy(params+info->comm->nRanks*3, info->rdispls, sizeof(size_t*)*(info->comm->nRanks)); + } } info->comm->opCount++; return ncclSuccess; @@ -565,7 +601,9 @@ static ncclResult_t ncclSaveP2p(struct ncclInfo* info) { } static int getSegment(struct ncclInfo* info, struct ncclWork* work) { - for (int s=0; selems[s].p2p.delta != info->delta; s++) { + const int e = (info->comm->topo->nodes[NET].count == 0 && info->comm->topo->type == RCCL_TOPO_4P2H_ROME) + ? 1 : NCCL_MAX_WORK_ELEMENTS; + for (int s=0; selems[s].p2p.delta != info->delta; s++) { if (work->elems[s].p2p.nThreads == 0) return s; } return -1; @@ -632,7 +670,12 @@ ncclResult_t ncclEnqueueCheck(struct ncclInfo* info) { NCCLCHECKGOTO(ncclAsyncColl(info->comm), ret, end); NCCLCHECKGOTO(checkSetStream(info), ret, end); - INFO(NCCL_COLL,"%s: opCount %lx sendbuff %p recvbuff %p count %zi datatype %d op %d root %d comm %p [nranks=%d] stream %p", + if (info->coll == ncclFuncAllToAllv) + INFO(NCCL_COLL,"%s: opCount %lx sendbuff %p sendcounts %p sdispls %p recvbuff %p recvcounts %p rdispls %p datatype %d typesize %zi op %d root %d comm %p [nranks=%d] stream %p", + info->opName, info->comm->opCount, info->sendbuff, info->sendcounts, info->sdispls, info->recvbuff, info->recvcounts, info->rdispls, + info->datatype, info->count, info->op, info->root, info->comm, info->comm->nRanks, info->stream); + else if (info->coll != ncclFuncSendRecv) + INFO(NCCL_COLL,"%s: opCount %lx sendbuff %p recvbuff %p count %zi datatype %d op %d root %d comm %p [nranks=%d] stream %p", info->opName, info->comm->opCount, info->sendbuff, info->recvbuff, info->count, info->datatype, info->op, info->root, info->comm, info->comm->nRanks, info->stream); @@ -653,7 +696,12 @@ end: NCCLCHECK(ArgsCheck(info)); NCCLCHECK(checkSetStream(info)); - INFO(NCCL_COLL,"%s: opCount %lx sendbuff %p recvbuff %p count %zi datatype %d op %d root %d comm %p [nranks=%d] stream %p", + if (info->coll == ncclFuncAllToAllv) + INFO(NCCL_COLL,"%s: opCount %lx sendbuff %p sendcounts %p sdispls %p recvbuff %p recvcounts %p rdispls %p datatype %d typesize %zi op %d root %d comm %p [nranks=%d] stream %p", + info->opName, info->comm->opCount, info->sendbuff, info->sendcounts, info->sdispls, info->recvbuff, info->recvcounts, info->rdispls, + info->datatype, info->count, info->op, info->root, info->comm, info->comm->nRanks, info->stream); + else + INFO(NCCL_COLL,"%s: opCount %lx sendbuff %p recvbuff %p count %zi datatype %d op %d root %d comm %p [nranks=%d] stream %p", info->opName, info->comm->opCount, info->sendbuff, info->recvbuff, info->count, info->datatype, info->op, info->root, info->comm, info->comm->nRanks, info->stream); diff --git a/src/graph/paths.cc b/src/graph/paths.cc index 381d263b44..567043d4f6 100644 --- a/src/graph/paths.cc +++ b/src/graph/paths.cc @@ -519,12 +519,19 @@ ncclResult_t ncclTopoComputeP2pChannels(struct ncclComm* comm) { } } - // Round to next pow2 nChannelsPerPeer and nChannels - comm->p2pnChannelsPerPeer = nextPow2(minChannels); - comm->p2pnChannels = nextPow2(comm->p2pnChannels); + if (comm->topo->nodes[NET].count == 0 && comm->topo->type == RCCL_TOPO_4P2H_ROME) { + // Adjust P2P channels on Rome + comm->p2pnChannelsPerPeer = 2; + comm->p2pnChannels = 2; + } + else { + // Round to next pow2 nChannelsPerPeer and nChannels + comm->p2pnChannelsPerPeer = nextPow2(minChannels); + comm->p2pnChannels = nextPow2(comm->p2pnChannels); + } // Init channels that weren't used so far - for (int c=comm->nChannels; cp2pnChannels; c++) NCCLCHECK(initChannel(comm, c)); + for (int c=comm->nChannels; cnChannels, comm->p2pnChannels); c++) NCCLCHECK(initChannel(comm, c)); // We want to spread channels used when there aren't many and progressively // fill the whole space of nChannels. To do so we mirror the bits in the diff --git a/src/group.cc b/src/group.cc index 8572cda7a7..df9cd85ddd 100644 --- a/src/group.cc +++ b/src/group.cc @@ -226,9 +226,9 @@ ncclResult_t ncclGroupEnd() { int nChannelsMax = comm->p2pnChannelsPerPeer; int nChannelsMin = nChannelsMax; // Try to use all channels, but one channel per operation. - while (nChannelsMin*comm->nRanks > comm->p2pnChannels && nChannelsMin > 1) nChannelsMin /= 2; + while (nChannelsMin*comm->nRanks > std::max(comm->nChannels, comm->p2pnChannels) && nChannelsMin > 1) nChannelsMin /= 2; // Avoid overloading channels with 8+ operations as we loose the sync warp, hence a bit of bandwidth. - while (nChannelsMax*comm->nRanks > comm->p2pnChannels*4 && nChannelsMax > 1) nChannelsMax /= 2; + while (nChannelsMax*comm->nRanks > std::max(comm->nChannels, comm->p2pnChannels)*4 && nChannelsMax > 1) nChannelsMax /= 2; while (comm->p2pSendCount > 0 || comm->p2pRecvCount > 0) { // schedule delta 0, +1, -1, +2, -2, ... diff --git a/src/include/collectives.h b/src/include/collectives.h index 05e8b9ee5f..6b905385fd 100644 --- a/src/include/collectives.h +++ b/src/include/collectives.h @@ -8,8 +8,10 @@ #ifndef NCCL_COLLECTIVES_H_ #define NCCL_COLLECTIVES_H_ -#define FUNC_INDEX_P2P 1800 -#define FUNC_INDEX(func, redop, ncclType, al, pr) ((((((func)*ncclNumOps + (redop))*ncclNumTypes) + (ncclType))*NCCL_NUM_ALGORITHMS+(al))*NCCL_NUM_PROTOCOLS+(pr)) +#define FUNC_INDEX_P2P (NCCL_NUM_FUNCTIONS*NCCL_NUM_ALGORITHMS*NCCL_NUM_PROTOCOLS*ncclNumTypes*ncclNumOps) +#define FUNC_INDEX(func, redop, ncclType, al, pr) ((func >= NCCL_NUM_FUNCTIONS) \ + ? (func-NCCL_NUM_FUNCTIONS+NCCL_NUM_FUNCTIONS*NCCL_NUM_ALGORITHMS*NCCL_NUM_PROTOCOLS*ncclNumTypes*ncclNumOps) \ + : ((((((func)*ncclNumOps + (redop))*ncclNumTypes) + (ncclType))*NCCL_NUM_ALGORITHMS+(al))*NCCL_NUM_PROTOCOLS+(pr))) #define NCCL_FUNC_NAME(func, algo, proto, redop, type) \ ncclFunction_##func##_##algo##_##proto##_##redop##_##type @@ -60,6 +62,8 @@ DECL(ReduceScatter) \ DECL(AllReduce) \ DECL5(SendRecv, RING, SIMPLE, Sum, int8_t) \ + DECL5(AllToAll, RING, SIMPLE, Sum, int8_t) \ + DECL5(AllToAllv, RING, SIMPLE, Sum, int8_t) \ DECL_ALL @@ -81,4 +85,12 @@ DECL_ALL #define REDUCE_SLICESTEPS 1 #define REDUCE_CHUNKSTEPS 1 #define SENDRECV_SLICEFACTOR 1 +#define GATHER_SLICESTEPS 4 +#define GATHER_CHUNKSTEPS 4 +#define SCATTER_SLICESTEPS 4 +#define SCATTER_CHUNKSTEPS 4 +#define ALLTOALL_SLICESTEPS 4 +#define ALLTOALL_CHUNKSTEPS 4 +#define ALLTOALLV_SLICESTEPS 4 +#define ALLTOALLV_CHUNKSTEPS 4 #endif diff --git a/src/include/devcomm.h b/src/include/devcomm.h index f64e3094f0..341ddb2862 100644 --- a/src/include/devcomm.h +++ b/src/include/devcomm.h @@ -23,8 +23,8 @@ #endif #define NCCL_NUM_FUNCTIONS 5 // SendRecv not included for now -typedef enum { ncclFuncBroadcast, ncclFuncReduce, ncclFuncAllGather, ncclFuncReduceScatter, ncclFuncAllReduce, ncclFuncSendRecv} ncclFunc_t; -extern const char* ncclFuncStr[NCCL_NUM_FUNCTIONS]; +typedef enum { ncclFuncBroadcast, ncclFuncReduce, ncclFuncAllGather, ncclFuncReduceScatter, ncclFuncAllReduce, ncclFuncSendRecv, ncclFuncAllToAll, ncclFuncAllToAllv } ncclFunc_t; +extern const char* ncclFuncStr[]; #define NCCL_NUM_ALGORITHMS 3 // Tree/Ring/CollNet #define NCCL_ALGO_TREE 0 @@ -183,6 +183,11 @@ struct ncclWorkElem { int32_t delta; uint16_t nThreads; } p2p; + struct { + size_t count; + uint8_t bid; + uint8_t nChannels; + } a2av; uint64_t align[3]; }; }; @@ -208,6 +213,7 @@ struct ncclChannel { struct ncclWork* workFifo; int workCount; uint64_t workFifoTail; // Only used by CPU + size_t* a2avParams; #ifdef ENABLE_PROFILING struct timeval tvs; diff --git a/src/include/info.h b/src/include/info.h index 6b71492bd1..fa9402f350 100644 --- a/src/include/info.h +++ b/src/include/info.h @@ -20,7 +20,8 @@ typedef enum { ncclPatternTreeDown, ncclPatternTreeUpDown, ncclPatternCollTreeUp, - ncclPatternCollTreeDown + ncclPatternCollTreeDown, + ncclPatternAll } ncclPattern_t; // Used to pass NCCL call information between functions @@ -39,6 +40,11 @@ struct ncclInfo { // Algorithm details int chunkSteps; int sliceSteps; + // For alltoallv + const size_t *sendcounts; + const size_t *sdispls; + const size_t *recvcounts; + const size_t *rdispls; // Computed later int algorithm; int protocol; diff --git a/src/init.cc b/src/init.cc index 9a8a53618a..734b992f89 100644 --- a/src/init.cc +++ b/src/init.cc @@ -41,7 +41,7 @@ std::chrono::high_resolution_clock::time_point ncclEpoch; #define NCCL_GROUP_CUDA_STREAM 1 // CGMD: CUDA 9.0,9.1 Need to use an internal CUDA stream #endif -const char* ncclFuncStr[NCCL_NUM_FUNCTIONS] = { "Broadcast", "Reduce", "AllGather", "ReduceScatter", "AllReduce" }; +const char* ncclFuncStr[NCCL_NUM_FUNCTIONS+2] = { "Broadcast", "Reduce", "AllGather", "ReduceScatter", "AllReduce", "AllToAll", "AllToAllv" }; const char* ncclAlgoStr[NCCL_NUM_ALGORITHMS] = { "Tree", "Ring", "CollNet" }; const char* ncclProtoStr[NCCL_NUM_PROTOCOLS] = { "LL", "LL128", "Simple" }; @@ -395,7 +395,7 @@ static ncclResult_t commAlloc(ncclComm_t* comret, int ndev, int rank) { for (int c=0; cchannels[c].id = -1; comm->alltoallDisable = true; - //if (rcclParamAllToAllDisable() == 0) comm->alltoallDisable = false; + if (rcclParamAllToAllDisable() == 0) comm->alltoallDisable = false; *comret = comm; return ncclSuccess; @@ -403,11 +403,11 @@ static ncclResult_t commAlloc(ncclComm_t* comret, int ndev, int rank) { static ncclResult_t devCommSetup(ncclComm_t comm) { // Duplicate the channels on the device - NCCLCHECK(ncclCudaCalloc(&comm->hostDevComm.channels, comm->p2pnChannels)); - NCCLCHECK(ncclCudaMemcpy(comm->hostDevComm.channels, comm->channels, comm->p2pnChannels)); + NCCLCHECK(ncclCudaCalloc(&comm->hostDevComm.channels, std::max(comm->nChannels, comm->p2pnChannels))); + NCCLCHECK(ncclCudaMemcpy(comm->hostDevComm.channels, comm->channels, std::max(comm->nChannels, comm->p2pnChannels))); // Copy userRanks and peers - for (int r=0; rp2pnChannels; r++) { + for (int r=0; rnChannels, comm->p2pnChannels); r++) { NCCLCHECK(ncclCudaMemcpy(comm->channels[r].ring.devUserRanks, comm->channels[r].ring.userRanks, comm->nRanks)); } @@ -838,7 +838,7 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm NCCLCHECK(ncclTopoIdToIndex(comm->topo, GPU, myInfo->busId, &idx)); allGather3Data[rank].cudaCompCap = comm->topo->nodes[GPU].nodes[idx].gpu.cudaCompCap; allGather3Data[rank].gcn = comm->topo->nodes[GPU].nodes[idx].gpu.gcn; - allGather3Data[rank].alltoallDisable = comm->alltoallDisable; + allGather3Data[rank].alltoallDisable = comm->topo->nodes[NET].count? 1 : comm->alltoallDisable; allGather3Data[rank].nChannels = comm->nChannels = treeGraph.nChannels = ringGraph.nChannels = std::min(treeGraph.nChannels, ringGraph.nChannels); @@ -1031,6 +1031,29 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm // Compute nChannels per peer for p2p NCCLCHECK(ncclTopoComputeP2pChannels(comm)); + if (!alltoallDisable) { + int nc = comm->p2pnChannels; + for (int c=0; cchannels[c].peers[peerSend].send.connected == 0) { + comm->connectSend[peerSend] |= (1<connect = 1; + } + if (comm->channels[c].peers[peerRecv].recv.connected == 0) { + comm->connectRecv[peerRecv] |= (1<connect = 1; + } + } + } + NCCLCHECK(ncclTransportP2pSetup(comm, NULL)); + } + NCCLCHECK(ncclCommSetIntra(comm, intraRank, intraRanks, intraRank0Comm)); if (comm->nNodes) NCCLCHECK(ncclProxyCreate(comm)); diff --git a/src/misc/argcheck.cc b/src/misc/argcheck.cc index 716cd53408..741a3d6141 100644 --- a/src/misc/argcheck.cc +++ b/src/misc/argcheck.cc @@ -46,11 +46,16 @@ ncclResult_t ArgsCheck(struct ncclInfo* info) { } // Type is OK, compute nbytes. Convert Allgather/Broadcast/P2P calls to chars. info->nBytes = info->count * ncclTypeSize(info->datatype); - if (info->coll == ncclFuncAllGather || info->coll == ncclFuncBroadcast) { + if (info->coll == ncclFuncAllGather || info->coll == ncclFuncBroadcast || info->coll == ncclFuncAllToAll) { info->count = info->nBytes; info->datatype = ncclInt8; } - if (info->coll == ncclFuncAllGather || info->coll == ncclFuncReduceScatter) info->nBytes *= info->comm->nRanks; // count is per rank + if (info->coll == ncclFuncAllToAllv) { + // Use count to store data type size for alltoallv + info->count = ncclTypeSize(info->datatype); + info->datatype = ncclInt8; + } + if (info->coll == ncclFuncAllGather || info->coll == ncclFuncReduceScatter || info->coll == ncclFuncAllToAll) info->nBytes *= info->comm->nRanks; // count is per rank if (info->op < 0 || info->op >= ncclNumOps) { WARN("%s : invalid reduction operation %d", info->opName, info->op); diff --git a/src/proxy.cc b/src/proxy.cc index b2434b19c7..2ba7aabfb4 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -425,19 +425,19 @@ ncclResult_t ncclProxySharedBuffersInit(struct ncclComm* comm, int cuda, int* si char* buff; int* used; - *size = 2*comm->p2pnChannels*state->slotSize*state->nslots; + *size = 2*std::max(comm->nChannels, comm->p2pnChannels)*state->slotSize*state->nslots; if (cuda && state->cudaBuff[0] == NULL) { NCCLCHECK(ncclCudaCalloc(&buff, *size, cuda)); - NCCLCHECK(ncclCalloc(&used, 2*comm->p2pnChannels*state->nslots)); - for (int i=0; i<2*comm->p2pnChannels; i++) { + NCCLCHECK(ncclCalloc(&used, 2*std::max(comm->nChannels, comm->p2pnChannels)*state->nslots)); + for (int i=0; i<2*std::max(comm->nChannels, comm->p2pnChannels); i++) { state->cudaBuff[i] = buff + state->nslots*state->slotSize*i; state->cudaUsed[i] = used + state->nslots*i; } } else if (state->hostBuff[0] == NULL) { NCCLCHECK(ncclCudaHostCalloc(&buff, *size)); - NCCLCHECK(ncclCalloc(&used, 2*comm->p2pnChannels*state->nslots)); - for (int i=0; i<2*comm->p2pnChannels; i++) { + NCCLCHECK(ncclCalloc(&used, 2*std::max(comm->nChannels, comm->p2pnChannels)*state->nslots)); + for (int i=0; i<2*std::max(comm->nChannels, comm->p2pnChannels); i++) { state->hostBuff[i] = buff + state->nslots*state->slotSize*i; state->hostUsed[i] = used + state->nslots*i; } diff --git a/tools/topo_expl/utils.cpp b/tools/topo_expl/utils.cpp index 45d2723475..ca216eee81 100644 --- a/tools/topo_expl/utils.cpp +++ b/tools/topo_expl/utils.cpp @@ -30,7 +30,7 @@ #include "model.h" #include "utils.h" -const char* ncclFuncStr[NCCL_NUM_FUNCTIONS] = { "Broadcast", "Reduce", "AllGather", "ReduceScatter", "AllReduce" }; +const char* ncclFuncStr[NCCL_NUM_FUNCTIONS+1] = { "Broadcast", "Reduce", "AllGather", "ReduceScatter", "AllReduce", "AllToAll" }; const char* ncclAlgoStr[NCCL_NUM_ALGORITHMS] = { "Tree", "Ring", "CollNet" }; const char* ncclProtoStr[NCCL_NUM_PROTOCOLS] = { "LL", "LL128", "Simple" }; @@ -282,7 +282,7 @@ ncclResult_t initTransportsRank_1(struct ncclComm* comm, struct allGather1Data_t NCCLCHECK(ncclTopoIdToIndex(comm->topo, GPU, myInfo->busId, &idx)); allGather3Data[rank].cudaCompCap = comm->topo->nodes[GPU].nodes[idx].gpu.cudaCompCap; allGather3Data[rank].gcn = comm->topo->nodes[GPU].nodes[idx].gpu.gcn; - allGather3Data[rank].alltoallDisable = comm->alltoallDisable; + allGather3Data[rank].alltoallDisable = comm->topo->nodes[NET].count? 1 : comm->alltoallDisable; allGather3Data[rank].nChannels = comm->nChannels = treeGraph.nChannels = ringGraph.nChannels = std::min(treeGraph.nChannels, ringGraph.nChannels); @@ -624,6 +624,29 @@ ncclResult_t initTransportsRank_3(struct ncclComm* comm, struct allGather3Data_t // Compute nChannels per peer for p2p NCCLCHECK(ncclTopoComputeP2pChannels(comm)); + if (!alltoallDisable) { + int nc = comm->nChannels; + for (int c=0; cchannels[c].peers[peerSend].send.connected == 0) { + comm->connectSend[peerSend] |= (1<connect = 1; + } + if (comm->channels[c].peers[peerRecv].recv.connected == 0) { + comm->connectRecv[peerRecv] |= (1<connect = 1; + } + } + } + NCCLCHECK(ncclTransportP2pSetup(comm, NULL)); + } + //NCCLCHECK(ncclCommSetIntra(comm, intraRank, intraRanks, intraRank0Comm)); //if (comm->nNodes) NCCLCHECK(ncclProxyCreate(comm));