Add gather, scatter and alltoall collectives

Introducing 3 new APIs:
ncclResult_t  ncclGather(const void* sendbuff, void* recvbuff, size_t sendcount,
    ncclDataType_t datatype, int root, ncclComm_t comm, hipStream_t stream);
ncclResult_t  ncclScatter(const void* sendbuff, void* recvbuff,
    size_t recvcount, ncclDataType_t datatype, int root, ncclComm_t comm,
    hipStream_t stream);
ncclResult_t  ncclAllToAll(const void* sendbuff, void* recvbuff, size_t count,
    ncclDataType_t datatype, ncclComm_t comm, hipStream_t stream);

Only out of place operation is supported.
Preprocessor symbol RCCL_GATHER_SCATTER=1 indicates API availibility.
By default the APIs launche RCCL kernel implementation, which can be disabled by
RCCL_ALLTOALL_KERNEL_DISABLE=1. Then the APIs use wrapper around ncclSend and ncclRecv.
此提交包含在:
Wenkai Du
2020-03-18 17:03:03 -07:00
父節點 71ec3e09df
當前提交 e80e29573c
共有 24 個檔案被更改,包括 617 行新增15 行删除
+6
查看文件
@@ -91,6 +91,9 @@ set(CU_SOURCES
src/collectives/device/broadcast.cu
src/collectives/device/reduce_scatter.cu
src/collectives/device/sendrecv.cu
src/collectives/device/gather.cu
src/collectives/device/scatter.cu
src/collectives/device/all_to_all.cu
src/collectives/device/functions.cu)
set(CPP_SOURCES)
@@ -119,6 +122,9 @@ set(CC_SOURCES
src/collectives/broadcast_api.cc
src/collectives/reduce_scatter_api.cc
src/collectives/sendrecv_api.cc
src/collectives/gather_api.cc
src/collectives/scatter_api.cc
src/collectives/all_to_all_api.cc
src/channel.cc
src/misc/argcheck.cc
src/misc/nvmlwrap_stub.cc
+33
查看文件
@@ -0,0 +1,33 @@
/*************************************************************************
* Copyright (c) 2015-2019, NVIDIA CORPORATION. All rights reserved.
* Modifications Copyright (c) 2019-2020 Advanced Micro Devices, Inc. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#include "enqueue.h"
#include "collectives.h"
NCCL_API(ncclResult_t, ncclAllToAll, const void* sendbuff, void* recvbuff, size_t count, ncclDataType_t datatype,
ncclComm_t comm, hipStream_t stream);
ncclResult_t ncclAllToAll(const void* sendbuff, void* recvbuff, size_t count, ncclDataType_t datatype,
ncclComm_t comm, hipStream_t stream) {
if (comm->alltoallDisable) {
int nRanks;
NCCLCHECK(ncclCommCount(comm, &nRanks));
size_t rankOffset = count * ncclTypeSize(datatype);
if (count == 0) return ncclSuccess;
NCCLCHECK(ncclGroupStart());
for (int r=0; r<nRanks; r++) {
NCCLCHECK(ncclSend(((char*)sendbuff)+r*rankOffset, count, datatype, r, comm, stream));
NCCLCHECK(ncclRecv(((char*)recvbuff)+r*rankOffset, count, datatype, r, comm, stream));
}
NCCLCHECK(ncclGroupEnd());
return ncclSuccess;
} else {
struct ncclInfo info = { ncclCollAllToAll, "AllToAll",
sendbuff, recvbuff, count, datatype, ncclSum, 0, comm, stream, /* Args */
ALLTOALL_CHUNKSTEPS, ALLTOALL_SLICESTEPS };
return ncclEnqueueCheck(&info);
}
}
+12
查看文件
@@ -0,0 +1,12 @@
/*************************************************************************
* Copyright (c) 2015-2019, NVIDIA CORPORATION. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#include "all_to_all.h"
#include "common.h"
#include "collectives.h"
IMPL_COLL_FUNC(ncclAllToAll, copy, FuncSum, i8, int8_t);
IMPL_COLL_KERN(ncclAllToAll, copy, FuncSum, i8, int8_t, 0);
+64
查看文件
@@ -0,0 +1,64 @@
/*************************************************************************
* Copyright (c) 2015-2019, NVIDIA CORPORATION. All rights reserved.
* Modifications Copyright (c) 2019-2020 Advanced Micro Devices, Inc. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#include "devcomm.h"
#include "primitives.h"
#include "collectives.h"
template<int UNROLL, class FUNC, typename T>
__attribute__((noinline))
__device__ void ncclAllToAllKernel(struct CollectiveArgs* args) {
const int tid = threadIdx.x;
const int nthreads = args->coll.nThreads;
const int nChannels = args->coll.nChannels;
struct ncclDevComm* comm = args->comm;
struct ncclChannel* channel = comm->channels+blockIdx.x;
struct ncclRing* ring = &channel->ring;
const ssize_t size = args->coll.count;
const int nranks = comm->nRanks;
const int bid = args->coll.bid;
const int rank = ring->devUserRanks[0];
const int stepSize = comm->buffSizes[NCCL_PROTO_SIMPLE] / (sizeof(T)*NCCL_STEPS);
const int chunkSize = stepSize * ALLTOALL_CHUNKSTEPS;
const int peersPerChan = (nChannels >= nranks ? 1 : DIVUP(nranks, nChannels));
const ssize_t loopSize = (peersPerChan == 1 ? (nChannels/nranks)*(ssize_t)chunkSize : (ssize_t)chunkSize);
// Compute pointers
const T * __restrict__ thisInput = (const T*)args->sendbuff;
T * __restrict__ thisOutput = (T*)args->recvbuff;
for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) {
for (int i = 0; i < peersPerChan; i++) {
if ((peersPerChan == 1 && blockIdx.x >= (nChannels/nranks)*nranks) ||
(peersPerChan > 1 && blockIdx.x*peersPerChan+i >= nranks))
continue;
int realChunkSize = min(chunkSize, DIVUP(size-gridOffset, (peersPerChan == 1 ? (nChannels/nranks) : 1)));
ALIGN_SIZE(realChunkSize, nthreads*sizeof(uint64_t)/sizeof(T));
ssize_t chunkOffset = gridOffset + (peersPerChan == 1 ? (bid/nranks)*realChunkSize : 0);
int nelem = min(realChunkSize, size-chunkOffset);
if ((blockIdx.x*peersPerChan+i)%nranks == 0) {
if (tid < nthreads && thisInput != thisOutput) {
const T* sendbuff = thisInput+chunkOffset+rank*size;
T* recvbuff = thisOutput+chunkOffset+rank*size;
// local copy
ReduceOrCopyMulti<UNROLL, FUNC, T, 1, 1, 1, 1>(tid, nthreads, 1, &sendbuff, 1, &recvbuff, nelem);
}
}
else {
int peerSend = (rank+(blockIdx.x*peersPerChan)+i)%nranks;
int peerRecv = (2*nranks+rank-((blockIdx.x*peersPerChan)%nranks)-(i%nranks))%nranks;
ncclPrimitives<UNROLL, ALLTOALL_CHUNKSTEPS/ALLTOALL_SLICESTEPS, ALLTOALL_SLICESTEPS, T, 1, 1, 0, FUNC>
prims(tid, nthreads, &peerRecv, &peerSend, NULL, stepSize, channel, comm, args->opCount);
ssize_t send_offset = chunkOffset + peerSend*size;
ssize_t recv_offset = chunkOffset + peerRecv*size;
prims.send(thisInput+send_offset, nelem);
prims.recv(thisOutput+recv_offset, nelem);
}
}
}
}
+15
查看文件
@@ -96,6 +96,9 @@ static inline __device__ void exitIfAbortBarrier(int abort) {
NCCL_FUNCS2B(ncclAllGather), \
NCCL_FUNCS2A(ncclReduceScatter), \
NCCL_FUNCS2A(ncclAllReduce), \
NCCL_COLL_NAME(ncclGather, copy, i8), \
NCCL_COLL_NAME(ncclScatter, copy, i8), \
NCCL_COLL_NAME(ncclAllToAll, copy, i8), \
NCCL_COLL_NAME(ncclSendRecv, copy, i8) }
// Must be consistent with the ncclFuncSet enum
@@ -111,6 +114,9 @@ static const __device__ constexpr ncclKernelFunc_t ncclFuncs[]{
NCCL_FUNCS2B(ncclAllGather),
NCCL_FUNCS2A(ncclReduceScatter),
NCCL_FUNCS2A(ncclAllReduce),
NCCL_COLL_NAME(ncclGather, copy, i8),
NCCL_COLL_NAME(ncclScatter, copy, i8),
NCCL_COLL_NAME(ncclAllToAll, copy, i8),
NCCL_COLL_NAME(ncclSendRecv, copy, i8)
#endif
};
@@ -159,6 +165,15 @@ void NCCL_CALL_FUNCTIONS(struct ncclColl* const c) noexcept {
else ncclAllGatherCollNet_copy_i8(&c->args);
}
else if (c->funcIndex < 1800) Caller<1080, 1800>::call(c);
else if (c->funcIndex == 1800) {
ncclGather_copy_i8(&c->args);
}
else if (c->funcIndex == 1801) {
ncclScatter_copy_i8(&c->args);
}
else if (c->funcIndex == 1802) {
ncclAllToAll_copy_i8(&c->args);
}
else ncclSendRecv_copy_i8(&c->args);
}
+12
查看文件
@@ -0,0 +1,12 @@
/*************************************************************************
* Copyright (c) 2015-2019, NVIDIA CORPORATION. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#include "gather.h"
#include "common.h"
#include "collectives.h"
IMPL_COLL_FUNC(ncclGather, copy, FuncSum, i8, int8_t);
IMPL_COLL_KERN(ncclGather, copy, FuncSum, i8, int8_t, 0);
+74
查看文件
@@ -0,0 +1,74 @@
/*************************************************************************
* Copyright (c) 2015-2019, NVIDIA CORPORATION. All rights reserved.
* Modifications Copyright (c) 2019-2020 Advanced Micro Devices, Inc. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#include "devcomm.h"
#include "primitives.h"
#include "collectives.h"
template<int UNROLL, class FUNC, typename T>
__attribute__((noinline))
__device__ void ncclGatherKernel(struct CollectiveArgs* args) {
const int tid = threadIdx.x;
const int nthreads = args->coll.nThreads;
const int nChannels = args->coll.nChannels;
struct ncclDevComm* comm = args->comm;
struct ncclChannel* channel = comm->channels+blockIdx.x;
struct ncclRing* ring = &channel->ring;
const ssize_t size = args->coll.count;
const int nranks = comm->nRanks;
const int bid = args->coll.bid;
const int rank = ring->devUserRanks[0];
const int stepSize = comm->buffSizes[NCCL_PROTO_SIMPLE] / (sizeof(T)*NCCL_STEPS);
const int chunkSize = stepSize * GATHER_CHUNKSTEPS;
const int peersPerChan = (nChannels >= nranks ? 1 : DIVUP(nranks, nChannels));
const ssize_t loopSize = (peersPerChan == 1 ? (nChannels/nranks)*(ssize_t)chunkSize : (ssize_t)chunkSize);
const int root = args->coll.root;
// Compute pointers
const T * __restrict__ thisInput = (const T*)args->sendbuff;
T * __restrict__ thisOutput = (T*)args->recvbuff;
for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) {
for (int i = 0; i < peersPerChan; i++) {
if ((peersPerChan == 1 && blockIdx.x >= (nChannels/nranks)*nranks) ||
(peersPerChan > 1 && blockIdx.x*peersPerChan+i >= nranks))
continue;
int realChunkSize = min(chunkSize, DIVUP(size-gridOffset, (peersPerChan == 1 ? (nChannels/nranks) : 1)));
ALIGN_SIZE(realChunkSize, nthreads*sizeof(uint64_t)/sizeof(T));
ssize_t chunkOffset = gridOffset + (peersPerChan == 1 ? (bid/nranks)*realChunkSize : 0);
int nelem = min(realChunkSize, size-chunkOffset);
if ((blockIdx.x*peersPerChan+i)%nranks == 0 && rank == root) {
const T* sendbuff = thisInput+chunkOffset;
T* recvbuff = thisOutput+chunkOffset+rank*size;
if (tid < nthreads && sendbuff != recvbuff) {
// local copy
ReduceOrCopyMulti<UNROLL, FUNC, T, 1, 1, 1, 1>(tid, nthreads, 1, &sendbuff, 1, &recvbuff, nelem);
}
}
else {
int peerSend = (rank+(blockIdx.x*peersPerChan)+i)%nranks;
int peerRecv = (2*nranks+rank-((blockIdx.x*peersPerChan)%nranks)-(i%nranks))%nranks;
if (rank == root) {
ncclPrimitives<UNROLL, GATHER_CHUNKSTEPS/GATHER_SLICESTEPS, GATHER_SLICESTEPS, T, 1, 1, 0, FUNC>
prims(tid, nthreads, &peerRecv, &peerSend, NULL, stepSize, channel, comm, args->opCount);
ssize_t recv_offset = chunkOffset + peerRecv*size;
prims.recv(thisOutput+recv_offset, nelem);
}
else {
if (peerSend == root) {
ncclPrimitives<UNROLL, GATHER_CHUNKSTEPS/GATHER_SLICESTEPS, GATHER_SLICESTEPS, T, 1, 1, 0, FUNC>
prims(tid, nthreads, &peerRecv, &peerSend, NULL, stepSize, channel, comm, args->opCount);
ssize_t send_offset = chunkOffset;
prims.send(thisInput+send_offset, nelem);
}
}
}
}
}
}
+12
查看文件
@@ -0,0 +1,12 @@
/*************************************************************************
* Copyright (c) 2015-2019, NVIDIA CORPORATION. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#include "scatter.h"
#include "common.h"
#include "collectives.h"
IMPL_COLL_FUNC(ncclScatter, copy, FuncSum, i8, int8_t);
IMPL_COLL_KERN(ncclScatter, copy, FuncSum, i8, int8_t, 0);
+74
查看文件
@@ -0,0 +1,74 @@
/*************************************************************************
* Copyright (c) 2015-2019, NVIDIA CORPORATION. All rights reserved.
* Modifications Copyright (c) 2019-2020 Advanced Micro Devices, Inc. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#include "devcomm.h"
#include "primitives.h"
#include "collectives.h"
template<int UNROLL, class FUNC, typename T>
__attribute__((noinline))
__device__ void ncclScatterKernel(struct CollectiveArgs* args) {
const int tid = threadIdx.x;
const int nthreads = args->coll.nThreads;
const int nChannels = args->coll.nChannels;
struct ncclDevComm* comm = args->comm;
struct ncclChannel* channel = comm->channels+blockIdx.x;
struct ncclRing* ring = &channel->ring;
const ssize_t size = args->coll.count;
const int nranks = comm->nRanks;
const int bid = args->coll.bid;
const int rank = ring->devUserRanks[0];
const int stepSize = comm->buffSizes[NCCL_PROTO_SIMPLE] / (sizeof(T)*NCCL_STEPS);
const int chunkSize = stepSize * SCATTER_CHUNKSTEPS;
const int peersPerChan = (nChannels >= nranks ? 1 : DIVUP(nranks, nChannels));
const ssize_t loopSize = (peersPerChan == 1 ? (nChannels/nranks)*(ssize_t)chunkSize : (ssize_t)chunkSize);
const int root = args->coll.root;
// Compute pointers
const T * __restrict__ thisInput = (const T*)args->sendbuff;
T * __restrict__ thisOutput = (T*)args->recvbuff;
for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) {
for (int i = 0; i < peersPerChan; i++) {
if ((peersPerChan == 1 && blockIdx.x >= (nChannels/nranks)*nranks) ||
(peersPerChan > 1 && blockIdx.x*peersPerChan+i >= nranks))
continue;
int realChunkSize = min(chunkSize, DIVUP(size-gridOffset, (peersPerChan == 1 ? (nChannels/nranks) : 1)));
ALIGN_SIZE(realChunkSize, nthreads*sizeof(uint64_t)/sizeof(T));
ssize_t chunkOffset = gridOffset + (peersPerChan == 1 ? (bid/nranks)*realChunkSize : 0);
int nelem = min(realChunkSize, size-chunkOffset);
if ((blockIdx.x*peersPerChan+i)%nranks == 0 && rank == root) {
const T* sendbuff = thisInput+chunkOffset+rank*size;
T* recvbuff = thisOutput+chunkOffset;
if (tid < nthreads && sendbuff != recvbuff) {
// local copy
ReduceOrCopyMulti<UNROLL, FUNC, T, 1, 1, 1, 1>(tid, nthreads, 1, &sendbuff, 1, &recvbuff, nelem);
}
}
else {
int peerSend = (rank+(blockIdx.x*peersPerChan)+i)%nranks;
int peerRecv = (2*nranks+rank-((blockIdx.x*peersPerChan)%nranks)-(i%nranks))%nranks;
if (rank == root) {
ncclPrimitives<UNROLL, SCATTER_CHUNKSTEPS/SCATTER_SLICESTEPS, SCATTER_SLICESTEPS, T, 1, 1, 0, FUNC>
prims(tid, nthreads, &peerRecv, &peerSend, NULL, stepSize, channel, comm, args->opCount);
ssize_t send_offset = chunkOffset + peerSend*size;
prims.send(thisInput+send_offset, nelem);
}
else {
if (peerRecv == root) {
ncclPrimitives<UNROLL, SCATTER_CHUNKSTEPS/SCATTER_SLICESTEPS, SCATTER_SLICESTEPS, T, 1, 1, 0, FUNC>
prims(tid, nthreads, &peerRecv, &peerSend, NULL, stepSize, channel, comm, args->opCount);
ssize_t recv_offset = chunkOffset;
prims.recv(thisOutput+recv_offset, nelem);
}
}
}
}
}
}
+37
查看文件
@@ -0,0 +1,37 @@
/*************************************************************************
* Copyright (c) 2015-2019, NVIDIA CORPORATION. All rights reserved.
* Modifications Copyright (c) 2019-2020 Advanced Micro Devices, Inc. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#include "enqueue.h"
#include "collectives.h"
NCCL_API(ncclResult_t, ncclGather, const void* sendbuff, void* recvbuff, size_t sendcount,
ncclDataType_t datatype, int root, ncclComm_t comm, hipStream_t stream);
ncclResult_t ncclGather(const void* sendbuff, void* recvbuff, size_t sendcount,
ncclDataType_t datatype, int root, ncclComm_t comm, hipStream_t stream) {
if (comm->alltoallDisable) {
int nRanks;
NCCLCHECK(ncclCommCount(comm, &nRanks));
size_t rankOffset = sendcount * ncclTypeSize(datatype);
if (sendcount == 0) return ncclSuccess;
int rank;
NCCLCHECK(ncclCommUserRank(comm, &rank));
NCCLCHECK(ncclGroupStart());
if (rank == root) {
for (int r=0; r<nRanks; r++)
NCCLCHECK(ncclRecv(((char*)recvbuff)+r*rankOffset, sendcount, datatype, r, comm, stream));
}
NCCLCHECK(ncclSend(sendbuff, sendcount, datatype, root, comm, stream));
NCCLCHECK(ncclGroupEnd());
return ncclSuccess;
}
else {
struct ncclInfo info = { ncclCollGather, "Gather",
sendbuff, recvbuff, sendcount, datatype, ncclSum, root, comm, stream, /* Args */
GATHER_CHUNKSTEPS, GATHER_SLICESTEPS };
return ncclEnqueueCheck(&info);
}
}
+37
查看文件
@@ -0,0 +1,37 @@
/*************************************************************************
* Copyright (c) 2015-2019, NVIDIA CORPORATION. All rights reserved.
* Modifications Copyright (c) 2019-2020 Advanced Micro Devices, Inc. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#include "enqueue.h"
#include "collectives.h"
NCCL_API(ncclResult_t, ncclScatter, const void* sendbuff, void* recvbuff, size_t recvcount, ncclDataType_t datatype, int root,
ncclComm_t comm, hipStream_t stream);
ncclResult_t ncclScatter(const void* sendbuff, void* recvbuff, size_t recvcount, ncclDataType_t datatype, int root,
ncclComm_t comm, hipStream_t stream) {
if (comm->alltoallDisable) {
int nRanks;
NCCLCHECK(ncclCommCount(comm, &nRanks));
size_t rankOffset = recvcount * ncclTypeSize(datatype);
if (recvcount == 0) return ncclSuccess;
int rank;
NCCLCHECK(ncclCommUserRank(comm, &rank));
NCCLCHECK(ncclGroupStart());
if (rank == root) {
for (int r=0; r<nRanks; r++)
NCCLCHECK(ncclSend(((char*)sendbuff)+r*rankOffset, recvcount, datatype, r, comm, stream));
}
NCCLCHECK(ncclRecv(recvbuff, recvcount, datatype, root, comm, stream));
NCCLCHECK(ncclGroupEnd());
return ncclSuccess;
}
else {
struct ncclInfo info = { ncclCollScatter, "Scatter",
sendbuff, recvbuff, recvcount, datatype, ncclSum, root, comm, stream, /* Args */
SCATTER_CHUNKSTEPS, SCATTER_SLICESTEPS };
return ncclEnqueueCheck(&info);
}
}
+29 -6
查看文件
@@ -58,12 +58,15 @@
typedef void(*ncclKern_t)(struct ncclDevComm*);
// Must be consistent with the ncclFuncSet enum
static ncclKern_t const ncclKerns[1+NCCL_NUM_FUNCTIONS*ncclNumOps*ncclNumTypes*NCCL_NUM_ALGORITHMS*NCCL_NUM_PROTOCOLS] = {
static ncclKern_t const ncclKerns[4+NCCL_NUM_FUNCTIONS*ncclNumOps*ncclNumTypes*NCCL_NUM_ALGORITHMS*NCCL_NUM_PROTOCOLS] = {
NCCL_FUNCS2B(ncclBroadcast),
NCCL_FUNCS2A(ncclReduce),
NCCL_FUNCS2B(ncclAllGather),
NCCL_FUNCS2A(ncclReduceScatter),
NCCL_FUNCS2A(ncclAllReduce),
NCCL_KERN_NAME(ncclGather, copy, i8),
NCCL_KERN_NAME(ncclScatter, copy, i8),
NCCL_KERN_NAME(ncclAllToAll, copy, i8),
NCCL_KERN_NAME(ncclSendRecv, copy, i8)
};
@@ -277,6 +280,10 @@ static ncclResult_t getAlgoInfo(struct ncclInfo* info) {
}
}
}
if (info->coll == ncclCollAllToAll || info->coll == ncclCollGather || info->coll == ncclCollScatter) {
info->algorithm = NCCL_ALGO_RING;
info->protocol = NCCL_PROTO_SIMPLE;
}
if (info->algorithm == -1 || info->protocol == -1) {
WARN("Error : no algorithm/protocol available");
return ncclInternalError;
@@ -288,7 +295,9 @@ static ncclResult_t getAlgoInfo(struct ncclInfo* info) {
int nt = comm->maxThreads[info->algorithm][info->protocol];
int threadThreshold = comm->threadThresholds[info->algorithm][info->protocol];
while (info->nBytes < nc*nt*threadThreshold) {
if (info->algorithm != NCCL_ALGO_COLLNET && nc >= 2) nc--;
// do not reduce channels in case of alltoall
if (info->algorithm != NCCL_ALGO_COLLNET && info->coll != ncclCollAllToAll &&
info->coll != ncclCollGather && info->coll != ncclCollScatter && nc >= 2) nc--;
#if defined(__HIP_PLATFORM_HCC__) || defined(__HCC__) || defined(__HIPCC__)
// do not reduce threads count on VEGA
#else
@@ -316,6 +325,10 @@ static ncclResult_t getPatternInfo(struct ncclInfo* info) {
info->pattern = ncclPatternRing; break;
case ncclCollAllReduce:
info->pattern = info->algorithm == NCCL_ALGO_COLLNET ? ncclPatternCollTreeUp : info->algorithm == NCCL_ALGO_TREE ? ncclPatternTreeUpDown : ncclPatternRingTwice; break;
case ncclCollGather:
case ncclCollScatter:
case ncclCollAllToAll:
info->pattern = ncclPatternAll; break;
default:
WARN("Unknown pattern for collective %d algorithm %d", info->coll, info->algorithm);
return ncclInternalError;
@@ -332,7 +345,11 @@ static ncclResult_t getLoopInfo(struct ncclInfo* info) {
case ncclPatternPipelineTo:
case ncclPatternCollTreeUp:
case ncclPatternCollTreeDown:
info->nstepsPerLoop = info-> nchunksPerLoop = 1; break;
info->nstepsPerLoop = info->nchunksPerLoop = 1; break;
case ncclPatternAll:
info->nstepsPerLoop = 1;
info->nchunksPerLoop = info->comm->nRanks;
break;
case ncclPatternRing:
info->nstepsPerLoop = info->comm->nRanks-1; info->nchunksPerLoop = info->comm->nRanks; break;
case ncclPatternRingTwice:
@@ -417,7 +434,11 @@ static ncclResult_t computeColl(struct ncclInfo* info /* input */, struct ncclCo
if (info->protocol == NCCL_PROTO_LL) chunkEffectiveSize /= 2;
if (info->protocol == NCCL_PROTO_LL128) chunkEffectiveSize = (chunkSize / NCCL_LL128_LINEELEMS) * NCCL_LL128_DATAELEMS;
//if (info->comm->rank == 0) printf("Coll %d, size %ld -> %dx%d, chunkSize %d (algo %d proto%d)\n", info->coll, info->nBytes, info->nChannels, info->nThreads, chunkSize, info->algorithm, info->protocol);
int nLoops = (int)(DIVUP(info->nBytes, (((size_t)(info->nChannels))*info->nchunksPerLoop*chunkEffectiveSize)));
int nLoops;
if (info->pattern != ncclPatternAll)
nLoops = (int)(DIVUP(info->nBytes, (((size_t)(info->nChannels))*info->nchunksPerLoop*chunkEffectiveSize)));
else
nLoops = (int)(DIVUP(info->nBytes, (((size_t)((info->nChannels >= info->comm->nRanks ? (info->nChannels/info->comm->nRanks) : 1))))*info->nchunksPerLoop*chunkEffectiveSize));
proxyArgs->nsteps = info->nstepsPerLoop * nLoops * chunkSteps;
proxyArgs->sliceSteps = sliceSteps;
proxyArgs->chunkSteps = chunkSteps;
@@ -425,8 +446,8 @@ static ncclResult_t computeColl(struct ncclInfo* info /* input */, struct ncclCo
proxyArgs->opCount = info->comm->opCount;
proxyArgs->dtype = info->datatype;
proxyArgs->redOp = info->op;
TRACE(NCCL_NET,"opCount %lx slicesteps %d spl %d cpl %d nbytes %zi -> protocol %d nchannels %d nthreads %d, nloops %d nsteps %d comm %p",
coll->args.opCount, proxyArgs->sliceSteps, info->nstepsPerLoop, info->nchunksPerLoop, info->nBytes, info->protocol, info->nChannels, info->nThreads,
TRACE(NCCL_NET,"opCount %lx slicesteps %d spl %d cpl %d ces %d nbytes %zi -> protocol %d nchannels %d nthreads %d, nloops %d nsteps %d comm %p",
coll->args.opCount, proxyArgs->sliceSteps, info->nstepsPerLoop, info->nchunksPerLoop, chunkEffectiveSize, info->nBytes, info->protocol, info->nChannels, info->nThreads,
nLoops, proxyArgs->nsteps, info->comm);
return ncclSuccess;
}
@@ -479,6 +500,8 @@ ncclResult_t ncclSaveKernel(struct ncclInfo* info) {
if (info->coll == ncclCollSendRecv) {
info->comm->myParams->gridDim.x = std::max<unsigned>(info->comm->myParams->gridDim.x, channelId+1);
NCCLCHECK(ncclProxySaveP2p(info, channel));
} else if (info->coll == ncclCollAllToAll || info->coll == ncclCollScatter || info->coll == ncclCollGather) {
NCCLCHECK(ncclProxySaveA2a(&proxyArgs, info));
} else {
NCCLCHECK(ncclProxySaveColl(&proxyArgs, info->pattern, info->root, info->comm->nRanks));
}
+14 -3
查看文件
@@ -8,8 +8,10 @@
#ifndef NCCL_COLLECTIVES_H_
#define NCCL_COLLECTIVES_H_
#define FUNC_INDEX_P2P 1800
#define FUNC_INDEX(coll, redop, dtype, al, pr) (((((coll)*ncclNumOps + (redop))*ncclNumTypes) + (dtype))*NCCL_NUM_ALGORITHMS+(al))*NCCL_NUM_PROTOCOLS+(pr)
#define FUNC_INDEX_P2P (3+NCCL_NUM_FUNCTIONS*NCCL_NUM_ALGORITHMS*NCCL_NUM_PROTOCOLS*ncclNumTypes*ncclNumOps)
#define FUNC_INDEX(coll, redop, dtype, al, pr) ((coll >= NCCL_NUM_FUNCTIONS) \
? (coll-NCCL_NUM_FUNCTIONS+NCCL_NUM_FUNCTIONS*NCCL_NUM_ALGORITHMS*NCCL_NUM_PROTOCOLS*ncclNumTypes*ncclNumOps) \
: ((((((coll)*ncclNumOps + (redop))*ncclNumTypes) + (dtype))*NCCL_NUM_ALGORITHMS+(al))*NCCL_NUM_PROTOCOLS+(pr)))
#define NCCL_COLL_NAME(coll, op, dtype) \
coll##_##op##_##dtype
@@ -56,7 +58,10 @@
DECL_COLL2(ncclAllGather, copy) \
DECL_COLL(ncclReduceScatter) \
DECL_COLL(ncclAllReduce) \
DECL_COLL5(ncclSendRecv,copy,i8) \
DECL_COLL5(ncclGather, copy, i8) \
DECL_COLL5(ncclScatter, copy, i8) \
DECL_COLL5(ncclAllToAll, copy, i8) \
DECL_COLL5(ncclSendRecv, copy, i8) \
DECL_ALL_COLLS
@@ -78,5 +83,11 @@ DECL_ALL_COLLS
#define REDUCE_SLICESTEPS 1
#define REDUCE_CHUNKSTEPS 1
#define SENDRECV_SLICEFACTOR 4
#define GATHER_SLICESTEPS 4
#define GATHER_CHUNKSTEPS 4
#define SCATTER_SLICESTEPS 4
#define SCATTER_CHUNKSTEPS 4
#define ALLTOALL_SLICESTEPS 4
#define ALLTOALL_CHUNKSTEPS 4
#endif
+3
查看文件
@@ -147,6 +147,9 @@ struct ncclComm {
int collNetSupport;
//list of async p2p operation queued in a group semantics
struct ncclP2Plist p2plist;
// RCCL AllToAll/Scatter/Gather API
bool alltoallDisable;
};
#endif
+2 -2
查看文件
@@ -23,8 +23,8 @@
#endif
#define NCCL_NUM_FUNCTIONS 5 // SendRecv not included for now
typedef enum { ncclCollBroadcast, ncclCollReduce, ncclCollAllGather, ncclCollReduceScatter, ncclCollAllReduce, ncclCollSendRecv} ncclFunc_t;
extern const char* ncclFuncStr[NCCL_NUM_FUNCTIONS];
typedef enum { ncclCollBroadcast, ncclCollReduce, ncclCollAllGather, ncclCollReduceScatter, ncclCollAllReduce, ncclCollGather, ncclCollScatter, ncclCollAllToAll, ncclCollSendRecv} ncclFunc_t;
extern const char* ncclFuncStr[NCCL_NUM_FUNCTIONS+3];
#define NCCL_NUM_ALGORITHMS 3 // Tree/Ring/CollNet
#define NCCL_ALGO_TREE 0
+2 -1
查看文件
@@ -20,7 +20,8 @@ typedef enum {
ncclPatternTreeDown,
ncclPatternTreeUpDown,
ncclPatternCollTreeUp,
ncclPatternCollTreeDown
ncclPatternCollTreeDown,
ncclPatternAll
} ncclPattern_t;
// Used to pass NCCL call information between functions
+25
查看文件
@@ -77,4 +77,29 @@ int64_t ncclParam##name() { \
return value; \
}
#define RCCL_PARAM(name, env, default_value) \
pthread_mutex_t rcclParamMutex##name = PTHREAD_MUTEX_INITIALIZER; \
int64_t rcclParam##name() { \
static_assert(default_value != -1LL, "default value cannot be -1"); \
static int64_t value = -1LL; \
pthread_mutex_lock(&rcclParamMutex##name); \
char* en = getenv("RCCL_TEST_ENV_VARS"); \
if (value == -1LL || (en && (strcmp(en, "ENABLE") == 0))) { \
value = default_value; \
char* str = getenv("RCCL_" env); \
if (str && strlen(str) > 0) { \
errno = 0; \
int64_t v = strtoll(str, NULL, 0); \
if (errno) { \
INFO(NCCL_ALL,"Invalid value %s for %s, using default %lu.", str, "RCCL_" env, value); \
} else { \
value = v; \
INFO(NCCL_ALL,"%s set by environment to %lu.", "RCCL_" env, value); \
} \
} \
} \
pthread_mutex_unlock(&rcclParamMutex##name); \
return value; \
}
#endif
+1
查看文件
@@ -60,6 +60,7 @@ enum proxyMode {
ncclResult_t ncclProxySaveColl(struct ncclProxyArgs* args, int pattern, int root, int nranks);
ncclResult_t ncclProxySaveP2p(struct ncclInfo* info, struct ncclChannel* channel);
ncclResult_t ncclProxySaveA2a(struct ncclProxyArgs* args, struct ncclInfo* info);
ncclResult_t ncclProxyStart(struct ncclComm* comm);
ncclResult_t ncclProxyCreate(struct ncclComm* comm);
ncclResult_t ncclProxyDestroy(struct ncclComm* comm);
+48 -1
查看文件
@@ -41,7 +41,7 @@ std::chrono::high_resolution_clock::time_point ncclEpoch;
#define NCCL_GROUP_CUDA_STREAM 1 // CGMD: CUDA 9.0,9.1 Need to use an internal CUDA stream
#endif
const char* ncclFuncStr[NCCL_NUM_FUNCTIONS] = { "Broadcast", "Reduce", "AllGather", "ReduceScatter", "AllReduce" };
const char* ncclFuncStr[NCCL_NUM_FUNCTIONS+3] = { "Broadcast", "Reduce", "AllGather", "ReduceScatter", "AllReduce", "Gather", "Scatter", "AllToAll" };
const char* ncclAlgoStr[NCCL_NUM_ALGORITHMS] = { "Tree", "Ring", "CollNet" };
const char* ncclProtoStr[NCCL_NUM_PROTOCOLS] = { "LL", "LL128", "Simple" };
@@ -521,6 +521,10 @@ static ncclResult_t computeBuffSizes(struct ncclComm* comm) {
int defaults[NCCL_NUM_PROTOCOLS] = { DEFAULT_LL_BUFFSIZE, DEFAULT_LL128_BUFFSIZE, DEFAULT_BUFFSIZE };
if (cpuArch == NCCL_TOPO_CPU_ARCH_ARM) defaults[NCCL_PROTO_SIMPLE] = DEFAULT_BUFFSIZE_ARM;
if (comm->nRanks >= 32) {
defaults[NCCL_PROTO_SIMPLE] = 524288;
INFO(NCCL_INIT, "Setting DEFAULT_BUFFSIZE to %d for nRanks %d", defaults[NCCL_PROTO_SIMPLE], comm->nRanks);
}
for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) {
comm->buffSizes[p] = comm->hostDevComm.buffSizes[p] = envs[p] != -2 ? envs[p] : defaults[p];
@@ -652,6 +656,7 @@ static ncclResult_t checkCollNetSetup(struct ncclComm* comm, int rank, int collN
NCCL_PARAM(CrossNic, "CROSS_NIC", 2);
NCCL_PARAM(GraphDumpFileRank, "GRAPH_DUMP_FILE_RANK", 0);
RCCL_PARAM(AllToAllDisable, "ALLTOALL_KERNEL_DISABLE", 0);
static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* commId) {
// We use 3 AllGathers
@@ -909,6 +914,48 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm
// Compute nChannels per peer for p2p
NCCLCHECK(ncclTopoComputeP2pChannels(comm));
comm->alltoallDisable = true;
if (rcclParamAllToAllDisable() == 0) {
comm->alltoallDisable = false;
for (int c=0; c<comm->nChannels; c++) {
const int peersPerChan = (comm->nChannels >= nranks ? 1 : DIVUP(nranks, comm->nChannels));
struct ncclP2PConnect* connect = &comm->p2plist.connect;
connect->nrecv[c] = 0;
connect->nsend[c] = 0;
for (int p=0; p<peersPerChan; p++) {
// first channel is reserved for self copy
if ((c*peersPerChan+p)%nranks == 0)
continue;
int peerSend = (rank+c*peersPerChan+p)%nranks;
int peerRecv = (2*nranks+rank-(c*peersPerChan)%nranks-p)%nranks;
if (comm->channels[c].peers[peerSend].send.connected == 0) {
connect->send[c*nranks+connect->nsend[c]++] = peerSend;
}
if (comm->channels[c].peers[peerRecv].recv.connected == 0) {
connect->recv[c*nranks+connect->nrecv[c]++] = peerRecv;
}
}
}
for (int c=0; c<comm->nChannels; c++) {
struct ncclChannel* channel = comm->channels+c;
struct ncclP2PConnect* connect = &comm->p2plist.connect;
#if 0
printf("channel %d recv: ", c);
for (int i=0; i<connect->nrecv[c]; i++)
printf("%d ", connect->recv[c*nranks+i]);
printf("\n");
printf("channel %d send: ", c);
for (int i=0; i<connect->nsend[c]; i++)
printf("%d ", connect->send[c*nranks+i]);
printf("\n");
#endif
NCCLCHECK(ncclTransportP2pSetup(comm, NULL, channel, connect->nrecv[c], connect->recv+c*nranks, connect->nsend[c], connect->send+c*nranks));
connect->nrecv[c] = 0;
connect->nsend[c] = 0;
}
}
INFO(NCCL_INIT, "RCCL AllToAll/Scatter/Gather kernels %s", comm->alltoallDisable ? "disabled" : "enabled");
// We should have allocated all buffers, collective fifos, ... we can
// restore the affinity.
affinity_restore:
+5 -2
查看文件
@@ -46,11 +46,14 @@ ncclResult_t ArgsCheck(struct ncclInfo* info) {
}
// Type is OK, compute nbytes. Convert Allgather/Broadcast/P2P calls to chars.
info->nBytes = info->count * ncclTypeSize(info->datatype);
if (info->coll == ncclCollAllGather || info->coll == ncclCollBroadcast) {
if (info->coll == ncclCollAllGather || info->coll == ncclCollBroadcast
|| info->coll == ncclCollGather || info->coll == ncclCollScatter || info->coll == ncclCollAllToAll) {
info->count = info->nBytes;
info->datatype = ncclInt8;
}
if (info->coll == ncclCollAllGather || info->coll == ncclCollReduceScatter) info->nBytes *= info->comm->nRanks; // count is per rank
if (info->coll == ncclCollAllGather || info->coll == ncclCollReduceScatter
|| info->coll == ncclCollGather || info->coll == ncclCollScatter || info->coll == ncclCollAllToAll)
info->nBytes *= info->comm->nRanks; // count is per rank
if (info->op < 0 || info->op >= ncclNumOps) {
WARN("%s : invalid reduction operation %d", info->opName, info->op);
+47
查看文件
@@ -20,6 +20,7 @@
#define NCCL_VERSION(X,Y,Z) ((X) * 1000 + (Y) * 100 + (Z))
#define RCCL_BFLOAT16 1
#define RCCL_GATHER_SCATTER 1
#ifdef __cplusplus
extern "C" {
@@ -271,6 +272,52 @@ ncclResult_t pncclRecv(void* recvbuff, size_t count, ncclDataType_t datatype, in
ncclResult_t ncclRecv(void* recvbuff, size_t count, ncclDataType_t datatype, int peer,
ncclComm_t comm, hipStream_t stream);
/*
* Gather
*
* Root device gathers sendcount values from other GPUs into recvbuff,
* receiving data from rank i at offset i*sendcount.
* Assumes recvcount is equal to nranks*sendcount, which means that recvbuff
* should have a size of at least nranks*sendcount elements.
*
* In-place operations will happen if sendbuff == recvbuff + rank * sendcount.
*/
ncclResult_t ncclGather(const void* sendbuff, void* recvbuff, size_t sendcount,
ncclDataType_t datatype, int root, ncclComm_t comm, hipStream_t stream);
ncclResult_t pncclGather(const void* sendbuff, void* recvbuff, size_t sendcount,
ncclDataType_t datatype, int root, ncclComm_t comm, hipStream_t stream);
/*
* Scatter
*
* Scattered over the devices so that recvbuff on rank i will contain the i-th
* block of the data on root.
* Assumes sendcount is equal to nranks*recvcount, which means that sendbuff
* should have a size of at least nranks*recvcount elements.
*
* In-place operations will happen if recvbuff == sendbuff + rank * recvcount.
*/
ncclResult_t ncclScatter(const void* sendbuff, void* recvbuff,
size_t recvcount, ncclDataType_t datatype, int root, ncclComm_t comm,
hipStream_t stream);
ncclResult_t pncclScatter(const void* sendbuff, void* recvbuff,
size_t recvcount, ncclDataType_t datatype, int root, ncclComm_t comm,
hipStream_t stream);
/*
* All-To-All
*
* Device (i) send (j)th block of data to device (j) and be placed as (i)th
* block. Each block for sending/receiving has count elements, which means
* that recvbuff and sendbuff should have a size of nranks*count elements.
*
* In-place operation will happen if sendbuff == recvbuff.
*/
ncclResult_t ncclAllToAll(const void* sendbuff, void* recvbuff, size_t count,
ncclDataType_t datatype, ncclComm_t comm, hipStream_t stream);
ncclResult_t pncclAllToAll(const void* sendbuff, void* recvbuff, size_t count,
ncclDataType_t datatype, ncclComm_t comm, hipStream_t stream);
/*
* Group semantics
*
+21
查看文件
@@ -165,6 +165,27 @@ ncclResult_t ncclProxySaveP2p(struct ncclInfo* info, struct ncclChannel* channel
return ncclSuccess;
}
ncclResult_t ncclProxySaveA2a(struct ncclProxyArgs* args, struct ncclInfo* info) {
const int peersPerChan = (info->nChannels >= info->comm->nRanks ? 1 : DIVUP(info->comm->nRanks, info->nChannels));
for (int p=0; p<peersPerChan; p++) {
if ((peersPerChan == 1 && args->channel->id >= (info->nChannels/info->comm->nRanks)*info->comm->nRanks) ||
(peersPerChan > 1 && args->channel->id*peersPerChan+p >= info->comm->nRanks))
continue;
// first channel is reserved for self copy
if ((args->channel->id*peersPerChan+p)%info->comm->nRanks == 0)
continue;
int peerSend = (info->comm->rank+(args->channel->id*peersPerChan)+p)%info->comm->nRanks;
int peerRecv = (2*info->comm->nRanks+info->comm->rank-(args->channel->id*peersPerChan)%info->comm->nRanks-p%info->comm->nRanks)%info->comm->nRanks;
if (info->coll == ncclCollAllToAll || (info->coll == ncclCollScatter && info->comm->rank == info->root) ||
(info->coll == ncclCollGather && peerSend == info->root))
NCCLCHECK(SaveProxy<proxySend>(peerSend, args));
if (info->coll == ncclCollAllToAll || (info->coll == ncclCollGather && info->comm->rank == info->root) ||
(info->coll == ncclCollScatter && peerRecv == info->root))
NCCLCHECK(SaveProxy<proxyRecv>(peerRecv, args));
}
return ncclSuccess;
}
void* persistentThread(void *comm_) {
struct ncclComm* comm = (struct ncclComm*)comm_;
struct ncclProxyState* state = &comm->proxyState;
+3
查看文件
@@ -200,6 +200,9 @@ int main(int argc,char* argv[])
for (int i = 0; i < nranks; i++) {
comm[i].rank = i;
comm[i].nRanks = nranks;
comm[i].p2plist.count=0;
NCCLCHECK(ncclCalloc(&comm[i].p2plist.connect.recv, MAXCHANNELS*comm->nRanks));
NCCLCHECK(ncclCalloc(&comm[i].p2plist.connect.send, MAXCHANNELS*comm->nRanks));
node_model = network.GetNode(i);
assert(node_model!=0);
comm[i].topo = node_model->getSystem(i);
+41
查看文件
@@ -353,6 +353,8 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph*
return ncclSuccess;
}
RCCL_PARAM(AllToAllDisable, "ALLTOALL_KERNEL_DISABLE", 0);
ncclResult_t initTransportsRank_3(struct ncclComm* comm, struct allGather3Data_t *allGather3Data,
struct ncclTopoGraph& treeGraph, struct ncclTopoGraph& ringGraph, struct ncclTopoGraph& collNetGraph) {
int rank = comm->rank;
@@ -503,6 +505,45 @@ ncclResult_t initTransportsRank_3(struct ncclComm* comm, struct allGather3Data_t
// Compute nChannels per peer for p2p
NCCLCHECK(ncclTopoComputeP2pChannels(comm));
if (rcclParamAllToAllDisable() == 0) {
for (int c=0; c<comm->nChannels; c++) {
const int peersPerChan = (comm->nChannels >= nranks ? 1 : DIVUP(nranks, comm->nChannels));
struct ncclP2PConnect* connect = &comm->p2plist.connect;
connect->nrecv[c] = 0;
connect->nsend[c] = 0;
for (int p=0; p<peersPerChan; p++) {
// first channel is reserved for self copy
if ((c*peersPerChan+p)%nranks == 0)
continue;
int peerSend = (rank+c*peersPerChan+p)%nranks;
int peerRecv = (2*nranks+rank-(c*peersPerChan)%nranks-p)%nranks;
if (comm->channels[c].peers[peerSend].send.connected == 0) {
connect->send[c*nranks+connect->nsend[c]++] = peerSend;
}
if (comm->channels[c].peers[peerRecv].recv.connected == 0) {
connect->recv[c*nranks+connect->nrecv[c]++] = peerRecv;
}
}
}
for (int c=0; c<comm->nChannels; c++) {
struct ncclChannel* channel = comm->channels+c;
struct ncclP2PConnect* connect = &comm->p2plist.connect;
#if 0
printf("channel %d recv: ", c);
for (int i=0; i<connect->nrecv[c]; i++)
printf("%d ", connect->recv[c*nranks+i]);
printf("\n");
printf("channel %d send: ", c);
for (int i=0; i<connect->nsend[c]; i++)
printf("%d ", connect->send[c*nranks+i]);
printf("\n");
#endif
NCCLCHECK(ncclTransportP2pSetup(comm, NULL, channel, connect->nrecv[c], connect->recv+c*nranks, connect->nsend[c], connect->send+c*nranks));
connect->nrecv[c] = 0;
connect->nsend[c] = 0;
}
}
// We should have allocated all buffers, collective fifos, ... we can
// restore the affinity.
affinity_restore: