Merge pull request #1184 from nusislam/256-channel-2

add 256 channels support
This commit is contained in:
Nusrat Islam
2024-06-04 08:25:34 -05:00
gecommit door GitHub
bovenliggende 6475da2ed9 9746d8ca3f
commit 955347bab4
15 gewijzigde bestanden met toevoegingen van 143 en 68 verwijderingen
+1 -1
Bestand weergeven
@@ -169,7 +169,7 @@ void ncclDebugLog(ncclDebugLogLevel level, unsigned long flags, const char *file
cudaGetDevice(&cudaDev);
}
char buffer[2048];
char buffer[4096];
size_t len = 0;
if (level == NCCL_LOG_WARN) {
len = snprintf(buffer, sizeof(buffer), "\n%s:%d:%d [%d] %s:%d NCCL WARN ",
+2 -2
Bestand weergeven
@@ -17,11 +17,11 @@ struct RunWorkNop {
__device__ void run(ncclWork *w) {}
};
__launch_bounds__(NCCL_MAX_NTHREADS, 1) __global__ void ncclDevKernel_Generic(struct ncclDevComm* comm, uint64_t channelMask, struct ncclWork* workHead) {
__launch_bounds__(NCCL_MAX_NTHREADS, 1) __global__ void ncclDevKernel_Generic(struct ncclDevComm* comm, struct channelMasks channelMask, struct ncclWork* workHead) {
ncclKernelMain<-1, RunWorkNop, false>(comm, channelMask, workHead);
}
#ifdef ENABLE_COLLTRACE
__launch_bounds__(NCCL_MAX_NTHREADS, 1) __global__ void ncclDevKernelDebug_Generic(struct ncclDevComm* comm, uint64_t channelMask, struct ncclWork* workHead) {
__launch_bounds__(NCCL_MAX_NTHREADS, 1) __global__ void ncclDevKernelDebug_Generic(struct ncclDevComm* comm, struct channelMasks channelMask, struct ncclWork* workHead) {
ncclKernelMain<-1, RunWorkNop, true>(comm, channelMask, workHead);
}
#endif
+32 -16
Bestand weergeven
@@ -13,6 +13,7 @@
#include "op128.h"
#include "device_table.h"
#include "network/unpack/unpack_defs.h"
#include "comm.h"
#if defined(__gfx908__) || defined(__gfx940__) || defined(__gfx941__) || defined(__gfx942__)
#define COLL_UNROLL 2
@@ -226,21 +227,36 @@ static __forceinline__ __device__ void ncclRedopPtrDeref(struct ncclWorkElem* we
}
template<int SpecializedFnId, typename SpecializedRunWork, bool COLLTRACE>
__forceinline__ __device__ void ncclKernelMain(struct ncclDevComm* comm, uint64_t channelMask, struct ncclWork* workHead) {
__forceinline__ __device__ void ncclKernelMain(struct ncclDevComm* comm, struct channelMasks channelMask, struct ncclWork* workHead) {
const int tid = threadIdx.x;
int x = tid;
int total = 0, y;
int num = MAXCHANNELS/64 > 0 ? MAXCHANNELS/64 : 1;
switch (tid/WARP_SIZE) {
case 0:
if (channelMask & (1ull<<x)) {
int y = __popcll(channelMask & ((1ull<<x)-1));
if (blockIdx.x == y) ncclShmem.channelId = x;
}
if (WARP_SIZE < MAXCHANNELS) {
x = WARP_SIZE + tid;
if (channelMask & (1ull<<x)) {
int y = __popcll(channelMask & ((1ull<<x)-1));
if (blockIdx.x == y) ncclShmem.channelId = x;
//ncclShmem.channelId = blockIdx.x;
for (int i = 0; i < num; i++) {
if (channelMask.masks[i] & (1ull<<x)) {
y = __popcll(channelMask.masks[i] & ((1ull<<x)-1));
y = total + y;
if (blockIdx.x == y) {
ncclShmem.channelId = x + total;
break;
}
}
if (WARP_SIZE < 64) {
x = WARP_SIZE + tid;
if (channelMask.masks[i] & (1ull<<x)) {
y = __popcll(channelMask.masks[i] & ((1ull<<x)-1));
y = y + total;
if (blockIdx.x == y) {
ncclShmem.channelId = x + total;
break;
}
}
}
total = total + __popcll(channelMask.masks[i]);
}
break;
case 1:
@@ -361,23 +377,23 @@ __forceinline__ __device__ void ncclKernelMain(struct ncclDevComm* comm, uint64_
#endif
}
__global__ void ncclDevKernel_Generic(struct ncclDevComm* comm, uint64_t channelMask, struct ncclWork* workHead);
__global__ void ncclDevKernel_Generic(struct ncclDevComm* comm, struct channelMasks channelMask, struct ncclWork* workHead);
#ifdef ENABLE_COLLTRACE
__global__ void ncclDevKernelDebug_Generic(struct ncclDevComm* comm, uint64_t channelMask, struct ncclWork* workHead);
__global__ void ncclDevKernelDebug_Generic(struct ncclDevComm* comm, struct channelMasks channelMask, struct ncclWork* workHead);
#endif
#ifdef ENABLE_COLLTRACE
#define DEFINE_ncclDevKernel(suffix, coll, redop, ty, algo, proto, specializedFnId) \
__global__ void ncclDevKernel_##suffix(struct ncclDevComm* comm, uint64_t channelMask, struct ncclWork* workHead) { \
__global__ void ncclDevKernel_##suffix(struct ncclDevComm* comm, struct channelMasks channelMask, struct ncclWork* workHead) { \
ncclKernelMain<specializedFnId, RunWork<coll, ty, redop<ty>, algo, proto>, false>(comm, channelMask, workHead); \
} \
\
__global__ void ncclDevKernelDebug_##suffix(struct ncclDevComm* comm, uint64_t channelMask, struct ncclWork* workHead) { \
__global__ void ncclDevKernelDebug_##suffix(struct ncclDevComm* comm, struct channelMasks channelMask, struct ncclWork* workHead) { \
ncclKernelMain<specializedFnId, RunWork<coll, ty, redop<ty>, algo, proto>, true>(comm, channelMask, workHead); \
}
#else
#define DEFINE_ncclDevKernel(suffix, coll, redop, ty, algo, proto, specializedFnId) \
__global__ void ncclDevKernel_##suffix(struct ncclDevComm* comm, uint64_t channelMask, struct ncclWork* workHead) { \
__global__ void ncclDevKernel_##suffix(struct ncclDevComm* comm, struct channelMasks channelMask, struct ncclWork* workHead) { \
ncclKernelMain<specializedFnId, RunWork<coll, ty, redop<ty>, algo, proto>, false>(comm, channelMask, workHead); \
}
#endif
@@ -394,4 +410,4 @@ __global__ void ncclDevKernelDebug_Generic(struct ncclDevComm* comm, uint64_t ch
}
#endif
#endif
#endif
+15 -6
Bestand weergeven
@@ -616,14 +616,19 @@ static ncclResult_t addP2pToPlan(
static void finishPlan(struct ncclKernelPlan* plan) {
int channelUbound = 0;
int channelCount = 0;
uint64_t channelMask = 0;
//uint64_t channelMask = 0;
struct channelMasks channelMask;
for (int i =0; i < MAXCHANNELS/64; i++) {
channelMask.masks[i] = 0;
}
bool hasProxyOps = false;
for (int c=0; c < MAXCHANNELS; c++) {
struct ncclWorkList* tail = ncclIntruQueueTail(&plan->channels[c].workQueue);
if (tail != nullptr) {
channelUbound = c+1;
channelCount += 1;
channelMask |= 1ull<<c;
//channelMask |= 1ull<<c;
channelMask.masks[c/64] |= 1ull<<(c%64);
tail->work.header.isLast = 1;
finishWork(&tail->work, plan->comm->WarpSize);
}
@@ -2033,20 +2038,24 @@ static ncclResult_t taskAppend(struct ncclComm* comm, struct ncclInfo* info) {
NCCLCHECK(ncclChannelComputeFromBase(comm, channelBaseId, c, &channelId));
if (isSendNotRecv) {
if (comm->channels[channelId].peers[peer]->send[1].connected == 0) { // P2P uses only 1 connector
comm->connectSend[peer] |= (1UL<<channelId);
//comm->connectSend[peer] |= (1UL<<channelId);
comm->connectSend[peer].masks[channelId/64] |= (1UL<<(channelId%64));
ncclGroupCommPreconnect(comm);
}
if (comm->p2pNet && comm->channels[channelId].peers[peer]->send[NCCL_CONN_IDX_P2P_NET].connected == 0) {
comm->connectSend[peer+comm->nRanks*NCCL_CONN_IDX_P2P_NET] |= (1UL<<channelId);
//comm->connectSend[peer+comm->nRanks*NCCL_CONN_IDX_P2P_NET] |= (1UL<<channelId);
comm->connectSend[peer+comm->nRanks*NCCL_CONN_IDX_P2P_NET].masks[channelId/64] |= (1UL<<(channelId%64));
ncclGroupCommPreconnect(comm);
}
} else {
if (comm->channels[channelId].peers[peer]->recv[1].connected == 0) { // P2P uses only 1 connector
comm->connectRecv[peer] |= (1UL<<channelId);
//comm->connectRecv[peer] |= (1UL<<channelId);
comm->connectRecv[peer].masks[channelId/64] |= (1UL<<(channelId%64));
ncclGroupCommPreconnect(comm);
}
if (comm->p2pNet && comm->channels[channelId].peers[peer]->recv[NCCL_CONN_IDX_P2P_NET].connected == 0) {
comm->connectRecv[peer+comm->nRanks*NCCL_CONN_IDX_P2P_NET] |= (1UL<<channelId);
//comm->connectRecv[peer+comm->nRanks*NCCL_CONN_IDX_P2P_NET] |= (1UL<<channelId);
comm->connectRecv[peer+comm->nRanks*NCCL_CONN_IDX_P2P_NET].masks[channelId/64] |= (1UL<<(channelId%64));
ncclGroupCommPreconnect(comm);
}
}
+8 -3
Bestand weergeven
@@ -282,7 +282,7 @@ static ncclResult_t setTreeDown(struct ncclTree* tree, int* indexes, int d) {
static ncclResult_t connectTrees(struct ncclComm* comm, int* treeToParent, int* treeToChild0, int* treeToChild1, int* treePatterns) {
const int channelLimit = IsArchMatch(comm->topo->nodes[GPU].nodes[0].gpu.gcn, "gfx94") ? MAXCHANNELS/2 : 16;
const int channelLimit = IsArchMatch(comm->topo->nodes[GPU].nodes[0].gpu.gcn, "gfx94") ? 2*CHANNEL_LIMIT : CHANNEL_LIMIT;
const int nChannels = (comm->nChannels > channelLimit) ? comm->nChannels / 2 : comm->nChannels;
const int nNodes = comm->nNodes, node = comm->node;
@@ -625,7 +625,7 @@ ncclResult_t ncclTopoPostset(struct ncclComm* comm, int* firstRanks, int* treePa
NCCLCHECK(connectTrees(comm, treeToParent, treeToChild0, treeToChild1, treePatterns));
// Only use full MAXCHANNELS for gfx94x
int maxChannels = IsArchMatch(comm->topo->nodes[GPU].nodes[0].gpu.gcn, "gfx94") ? MAXCHANNELS : (MAXCHANNELS/2);
int maxChannels = IsArchMatch(comm->topo->nodes[GPU].nodes[0].gpu.gcn, "gfx94") ? MAXCHANNELS : 2*CHANNEL_LIMIT;
// Duplicate ringPrev/ringNext for ncclBuildRing
if (nChannels <= maxChannels/2) memcpy(ringPrev+nChannels*nranks, ringPrev, nChannels*nranks*sizeof(int));
@@ -668,7 +668,12 @@ ncclResult_t ncclTopoPostset(struct ncclComm* comm, int* firstRanks, int* treePa
nChannels = comm->nChannels = copyChannels(comm, nChannels, 2*nChannels, ringPrev, ringNext);
}
int minNchannels = ncclMinNchannels();
int minNchannels = 64;
if (comm->nNodes == 1) {
minNchannels = ncclMinNchannels();
} else {
minNchannels = std::min(64,ncclMinNchannels());
}
if (mscclEnabled() && (comm->topo->mscclEnabled || mscclForceEnabled())) {
int mscclNumChannelsRequired = 0;
+3 -3
Bestand weergeven
@@ -908,7 +908,7 @@ ncclResult_t ncclTopoComputeP2pChannels(struct ncclComm* comm) {
comm->p2pnChannelsPerPeer = (ncclParamNChannelsPerPeer() == -2 ? nextPow2(minChannels) : ncclParamNChannelsPerPeer());
// Doubling P2P channels per peer on single node
if (comm->topo->nodes[GPU].count == comm->topo->nRanks && IsArchMatch(comm->topo->nodes[GPU].nodes[0].gpu.gcn, "gfx94")) comm->p2pnChannelsPerPeer *= 2;
comm->p2pnChannels = nextPow2(comm->p2pnChannels);
comm->p2pnChannels = std::min(nextPow2(comm->p2pnChannels), 4*CHANNEL_LIMIT);
}
// Init channels that weren't used so far
@@ -918,7 +918,7 @@ ncclResult_t ncclTopoComputeP2pChannels(struct ncclComm* comm) {
// fill the whole space of nChannels. To do so we mirror the bits in the
// nChannels space.
for (int c=0; c<comm->p2pnChannels; c++) {
comm->p2pChannels[c] = mirrorBits(c, comm->p2pnChannels);
comm->p2pChannels[c] = mirrorBits(c, comm->p2pnChannels);
}
return ncclSuccess;
}
@@ -950,4 +950,4 @@ int ncclTopoPathAllNVLink(struct ncclTopoSystem* system) {
}
}
return minPath >= PATH_PIX ? 0 : 1;
}
}
+4 -2
Bestand weergeven
@@ -225,8 +225,10 @@ static void groupCleanup(struct ncclComm** groupCommHeadPtr, struct ncclComm** g
for (int i = 0; i < comm->nRanks; i++) {
comm->tasks.peers[i].sendSeen = false;
comm->tasks.peers[i].recvSeen = false;
comm->connectSend[i] = 0UL;
comm->connectRecv[i] = 0UL;
for (int j = 0; j < MAXCHANNELS/64; j++) {
comm->connectSend[i].masks[j] = 0UL;
comm->connectRecv[i].masks[j] = 0UL;
}
}
comm->unlaunchedPlansHead = nullptr;
// Reclaim abandoned kernel plan memory. Note ncclWork structs were already
+8 -4
Bestand weergeven
@@ -172,6 +172,10 @@ struct ncclNvlsMcHandleList {
size_t size;
};
struct channelMasks {
uint64_t masks[MAXCHANNELS/64];
};
struct ncclKernelPlan {
// A kernel plan is also a callback that reclaims itself. Hence this must
// be the first member.
@@ -185,7 +189,7 @@ struct ncclKernelPlan {
void *kernelFn;
int channelUbound; // only channels c < channelUbound are present
int channelCount; // number of channels present
uint64_t channelMask; // which channels are present, channelCount == popcount(channelMask)
struct channelMasks channelMask;
bool hasProxyOps; // does any channel have a non-empty proxyOpQueue
int threadPerBlock;
// workHeap fields are null until uploadWorkFifo() or preparePersistentKernel()
@@ -226,8 +230,8 @@ struct ncclComm {
ncclCollNet_t* ncclCollNet;
void* bootstrap;
// Bitmasks for ncclTransportP2pSetup
uint64_t* connectSend;
uint64_t* connectRecv;
struct channelMasks* connectSend;
struct channelMasks* connectRecv;
uint64_t magic; // Magic number for all network communication. Not a security key -- only goal is to detect mismatches.
@@ -481,4 +485,4 @@ static inline ncclRedOp_t ncclUserRedOpMangle(ncclComm *comm, ncclRedOp_t op) {
ncclResult_t ncclCommEnsureReady(ncclComm_t comm);
ncclResult_t ncclCommSetAsyncError(ncclComm_t comm, ncclResult_t nextState);
#endif
#endif
+5 -2
Bestand weergeven
@@ -61,7 +61,10 @@ union ncclLLFifoLine {
};
#define WARP_SIZE warpSize
#define MAXCHANNELS 64
#define MAXCHANNELS 128
#define CHANNEL_LIMIT 16
#define NCCL_MAX_NTHREADS 256
#define NCCL_SIMPLE_MAX_NTHREADS NCCL_MAX_NTHREADS
#define NCCL_LL_MAX_NTHREADS NCCL_MAX_NTHREADS
@@ -586,4 +589,4 @@ inline int ncclDevFuncId(int coll, int devRedOp, int type, int algo, int proto)
inline int ncclDevFuncId_P2p() { return ncclDevFuncRowToId[FUNC_INDEX_TOTAL - NCCL_NUM_ONERANK - 1]; }
#endif
#endif
+2 -2
Bestand weergeven
@@ -143,7 +143,7 @@ struct ncclProxyOps {
struct ncclProxySharedP2p {
int refcount;
int size;
int64_t size;
char* cudaBuff;
char* hostBuff;
// CUDA IPC
@@ -331,4 +331,4 @@ ncclResult_t ncclProxyShmUnlink(struct ncclComm* comm);
ncclResult_t ncclProxyDestroy(struct ncclComm* comm);
ncclResult_t mscclSaveProxy(struct ncclComm* comm, struct ncclChannel* channel, int type, int peer, struct ncclProxyOp* op, int connIndex);
#endif
#endif
+4 -4
Bestand weergeven
@@ -1532,7 +1532,7 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, struct ncclComm* p
TRACE(NCCL_INIT, "rank %d nranks %d - BUILT %d TREES/RINGS", rank, nranks, comm->nChannels);
char line[2048];
char line[4096];
line[0]='\0';
for (int c=0; c<comm->nChannels; c++) {
struct ncclTree* tree = &comm->channels[c].tree;
@@ -1541,7 +1541,7 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, struct ncclComm* p
INFO(NCCL_GRAPH, "Ring %d : %d -> %d -> %d comm %p nRanks %02d busId %lx", c, comm->channels[c].ring.prev,
comm->rank, comm->channels[c].ring.next, comm, comm->nRanks, comm->busId);
}
line[2047] = '\0';
line[4095] = '\0';
INFO(NCCL_INIT, "Trees%s comm %p nRanks %02d busId %lx", line, comm, comm->nRanks, comm->busId);
NCCLCHECKGOTO(computeBuffSizes(comm), ret, fail);
@@ -1685,13 +1685,13 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, struct ncclComm* p
for (int c=0; c<comm->p2pnChannelsPerPeer; c++) {
NCCLCHECKGOTO(ncclChannelCompute(comm, peer, c, ncclFuncSend, &channelId), ret, fail);
if (comm->channels[channelId].peers[peer]->send[1].connected == 0) {
comm->connectSend[peer] |= (1UL<<channelId);
comm->connectSend[peer].masks[channelId/64] |= (1UL<<(channelId%64));
}
}
for (int c=0; c<comm->p2pnChannelsPerPeer; c++) {
NCCLCHECKGOTO(ncclChannelCompute(comm, peer, c, ncclFuncRecv, &channelId), ret, fail);
if (comm->channels[channelId].peers[peer]->recv[1].connected == 0) {
comm->connectRecv[peer] |= (1UL<<channelId);
comm->connectRecv[peer].masks[channelId/64] |= (1UL<<(channelId%64));
}
}
}
+2 -1
Bestand weergeven
@@ -1362,7 +1362,8 @@ static ncclResult_t proxyProgressAsync(struct ncclProxyAsyncOp* op, struct ncclP
TRACE(NCCL_PROXY, "proxyProgressAsync::proxyConnect() opId=%p op.reqBuff=%p", op->opId, op->reqBuff);
res = op->connection->tcomm->proxyConnect(op->connection, proxyState, op->reqBuff, op->reqSize, op->respBuff, op->respSize, &done);
} else if (op->type == ncclProxyMsgSharedInit) {
int nChannels = (int) *op->reqBuff;
int nChannels = *((int*) op->reqBuff);
TRACE(NCCL_PROXY, "proxyProgressAsync::ncclProxyMsgSharedInit opId=%p op.reqBuff=%p nChannels=%d", op->opId, op->reqBuff, nChannels);
if (op->connection->tcomm->proxySharedInit) res = op->connection->tcomm->proxySharedInit(op->connection, proxyState, nChannels);
__atomic_store_n(&op->connection->state, connSharedInitialized, __ATOMIC_RELEASE);
+53 -19
Bestand weergeven
@@ -55,16 +55,20 @@ static ncclResult_t selectTransport(struct ncclComm* comm, struct ncclTopoGraph*
ncclResult_t ncclTransportP2pConnect(struct ncclComm* comm, int channelId, int nrecv, int* peerRecv, int nsend, int* peerSend, int connIndex) {
TRACE(NCCL_INIT, "nsend %d nrecv %d", nsend, nrecv);
struct ncclChannel* channel = &comm->channels[channelId];
uint64_t mask = 1UL << channel->id;
//uint64_t mask = 1UL << channel->id;
uint64_t mask = 1UL << (channel->id % 64);
for (int i=0; i<nrecv; i++) {
int peer = peerRecv[i];
if (peer == -1 || peer >= comm->nRanks || peer == comm->rank || channel->peers[peer]->recv[connIndex].connected) continue;
comm->connectRecv[peer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)] |= mask;
//comm->connectRecv[peer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)] |= mask;
comm->connectRecv[peer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)].masks[channel->id / 64] |= mask;
}
for (int i=0; i<nsend; i++) {
int peer = peerSend[i];
if (peer == -1 || peer >= comm->nRanks || peer == comm->rank || channel->peers[peer]->send[connIndex].connected) continue;
comm->connectSend[peer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)] |= mask;
//comm->connectSend[peer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)] |= mask;
comm->connectSend[peer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)].masks[channel->id / 64] |= mask;
}
return ncclSuccess;
}
@@ -102,14 +106,19 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph*
timeLast = timeStart; // struct copy
bool timeReported = false;
int count = 0;
int num = MAXCHANNELS/64;
NCCLCHECKGOTO(ncclStrongStreamAcquireUncaptured(&comm->sharedRes->hostStream), ret, fail);
// First time initialization
for (int i=1; i<comm->nRanks; i++) {
int bootstrapTag = (i<<8) + (graph ? graph->id+1 : 0);
int recvPeer = (comm->rank - i + comm->nRanks) % comm->nRanks;
int sendPeer = (comm->rank + i) % comm->nRanks;
uint64_t recvMask = comm->connectRecv[recvPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)];
uint64_t sendMask = comm->connectSend[sendPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)];
/*uint64_t recvMask = comm->connectRecv[recvPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)];
uint64_t sendMask = comm->connectSend[sendPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)];*/
struct channelMasks recvMask = comm->connectRecv[recvPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)];
struct channelMasks sendMask = comm->connectSend[sendPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)];
// Data[i] contains all ncclConnect information for all send and receive connections with a given send and recv peer
// This data is packed in the array based on the number of sendChannels and recvChannels connected with these peers
@@ -117,14 +126,24 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph*
// The next M entries contain sendData, connection information for send connections
// It's not guaranteed that each entry of data has the same number of total or send/recv specific connections
int p = i-(done+1);
if (recvMask || sendMask) NCCLCHECK(ncclCalloc(data+p, 2*MAXCHANNELS));
count = 0;
for (int j = 0; j < num; j++) {
if ((recvMask.masks[j]) || (sendMask.masks[j])) {
count++;
}
}
//if ((recvMask.masks[0]) || (sendMask.masks[0])) NCCLCHECK(ncclCalloc(data+p, 2*MAXCHANNELS));
if (count) NCCLCHECK(ncclCalloc(data+p, 2*MAXCHANNELS));
recvData[p] = data[p];
int sendChannels = 0, recvChannels = 0;
int type;
bool proxy;
TIME_START(0);
for (int c=0; c<MAXCHANNELS; c++) {
if (recvMask & (1UL<<c)) {
//if (recvMask & (1UL<<c)) {
if (recvMask.masks[c/64] & (1UL<<(c%64))) {
NCCLCHECKGOTO(selectTransport<0>(comm, graph, recvData[p]+recvChannels++, c, recvPeer, connIndex, &type, &proxy), ret, fail);
if (type > highestType) highestType = type;
}
@@ -133,7 +152,8 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph*
TIME_START(1);
sendData[p] = recvData[p]+recvChannels;
for (int c=0; c<MAXCHANNELS; c++) {
if (sendMask & (1UL<<c)) {
//if (sendMask & (1UL<<c)) {
if (sendMask.masks[c/64] & (1UL<<(c%64))) {
NCCLCHECKGOTO(selectTransport<1>(comm, graph, sendData[p]+sendChannels++, c, sendPeer, connIndex, &type, &proxy), ret, fail);
if (type > highestType) highestType = type;
needsProxyResult |= proxy;
@@ -166,15 +186,18 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph*
for (int j=done+1; j<=i; j++) {
int recvPeer = (comm->rank - j + comm->nRanks) % comm->nRanks;
int sendPeer = (comm->rank + j) % comm->nRanks;
uint64_t recvMask = comm->connectRecv[recvPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)];
uint64_t sendMask = comm->connectSend[sendPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)];
/*uint64_t recvMask = comm->connectRecv[recvPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)];
uint64_t sendMask = comm->connectSend[sendPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)];*/
struct channelMasks recvMask = comm->connectRecv[recvPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)];
struct channelMasks sendMask = comm->connectSend[sendPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)];
int p = j-(done+1);
int sendDataOffset = 0;
int recvDataOffset = 0;
for (int c=0; c<MAXCHANNELS; c++) {
TIME_START(3);
if (sendMask & (1UL<<c)) {
//if (sendMask & (1UL<<c)) {
if (sendMask.masks[c/64] & (1UL<<(c%64))) {
struct ncclConnector* conn = comm->channels[c].peers[sendPeer]->send + connIndex;
// This connector hasn't completed connection yet
if (conn->connected == 0) {
@@ -192,7 +215,8 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph*
// Start with recv channels
TIME_START(4);
if (recvMask & (1UL<<c)) {
//if (recvMask & (1UL<<c)) {
if (recvMask.masks[c/64] & (1UL<<(c%64))) {
struct ncclConnector* conn = comm->channels[c].peers[recvPeer]->recv + connIndex;
// This connector hasn't completed connection yet
if (conn->connected == 0) {
@@ -208,7 +232,15 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph*
}
TIME_STOP(4);
}
if (sendMask || recvMask) {
count = 0;
for (int j = 0; j < num; j++) {
if ((recvMask.masks[j]) || (sendMask.masks[j])) {
count++;
}
}
//if (sendMask.masks[0] || recvMask.masks[0]) {
if (count) {
free(data[p]);
data[p] = NULL;
}
@@ -250,23 +282,25 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph*
int sendPeer = (comm->rank + i) % comm->nRanks;
int flag = 0;
for (int j = 0; j < MAXCHANNELS/64; j++) {
if (recvPeer != sendPeer) {
if (comm->connectSend[sendPeer] != 0UL)
if (comm->connectSend[sendPeer].masks[j] != 0UL)
NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, sendPeer, bootstrapTag, &flag, sizeof(int)), ret, fail);
if (comm->connectRecv[recvPeer] != 0UL)
if (comm->connectRecv[recvPeer].masks[j] != 0UL)
NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, recvPeer, bootstrapTag, &flag, sizeof(int)), ret, fail);
if (comm->connectSend[sendPeer] != 0UL)
if (comm->connectSend[sendPeer].masks[j] != 0UL)
NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, sendPeer, bootstrapTag, &flag, sizeof(int)), ret, fail);
if (comm->connectRecv[recvPeer] != 0UL)
if (comm->connectRecv[recvPeer].masks[j] != 0UL)
NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, recvPeer, bootstrapTag, &flag, sizeof(int)), ret, fail);
} else {
if (comm->connectSend[sendPeer] != 0UL || comm->connectRecv[recvPeer] != 0UL) {
if (comm->connectSend[sendPeer].masks[j] != 0UL || comm->connectRecv[recvPeer].masks[j] != 0UL) {
NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, sendPeer, bootstrapTag, &flag, sizeof(int)), ret, fail);
NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, sendPeer, bootstrapTag, &flag, sizeof(int)), ret, fail);
}
}
comm->connectRecv[recvPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)] = comm->connectSend[sendPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)] = 0UL;
comm->connectRecv[recvPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)].masks[j] = comm->connectSend[sendPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)].masks[j] = 0UL;
}
}
free(data);
+2 -1
Bestand weergeven
@@ -521,7 +521,8 @@ static ncclResult_t sharedNetBuffersInit(struct ncclProxyState* proxyState, int
struct ncclProxySharedP2p* state = type == 0 ? &peer->send : &peer->recv;
state->refcount++;
if (state->size == 0) {
state->size = nChannels * NCCL_SHARED_STEPS * proxyState->p2pChunkSize;
state->size = (int64_t)nChannels * NCCL_SHARED_STEPS * proxyState->p2pChunkSize;
}
if (size) *size = state->size;
+2 -2
Bestand weergeven
@@ -1089,7 +1089,7 @@ ncclResult_t initTransportsRank_3(struct ncclComm* comm, struct allGatherInfo *a
TRACE(NCCL_INIT, "rank %d nranks %d - BUILT %d TREES/RINGS", rank, nranks, comm->nChannels);
char line[2048];
char line[4096];
line[0]='\0';
for (int c=0; c<comm->nChannels; c++) {
struct ncclTree* tree = &comm->channels[c].tree;
@@ -1098,7 +1098,7 @@ ncclResult_t initTransportsRank_3(struct ncclComm* comm, struct allGatherInfo *a
INFO(NCCL_GRAPH, "Ring %d : %d -> %d -> %d comm %p nRanks %02d busId %lx", c, comm->channels[c].ring.prev,
comm->rank, comm->channels[c].ring.next, comm, comm->nRanks, comm->busId);
}
line[2047] = '\0';
line[4095] = '\0';
INFO(NCCL_INIT, "Trees%s comm %p nRanks %02d busId %lx", line, comm, comm->nRanks, comm->busId);
//NCCLCHECKGOTO(computeBuffSizes(comm), ret, fail);