diff --git a/src/debug.cc b/src/debug.cc index cf68e28c8d..dccea7035f 100644 --- a/src/debug.cc +++ b/src/debug.cc @@ -169,7 +169,7 @@ void ncclDebugLog(ncclDebugLogLevel level, unsigned long flags, const char *file cudaGetDevice(&cudaDev); } - char buffer[2048]; + char buffer[4096]; size_t len = 0; if (level == NCCL_LOG_WARN) { len = snprintf(buffer, sizeof(buffer), "\n%s:%d:%d [%d] %s:%d NCCL WARN ", diff --git a/src/device/common.cu b/src/device/common.cu index 174934cd22..0022b6c233 100644 --- a/src/device/common.cu +++ b/src/device/common.cu @@ -17,11 +17,11 @@ struct RunWorkNop { __device__ void run(ncclWork *w) {} }; -__launch_bounds__(NCCL_MAX_NTHREADS, 1) __global__ void ncclDevKernel_Generic(struct ncclDevComm* comm, uint64_t channelMask, struct ncclWork* workHead) { +__launch_bounds__(NCCL_MAX_NTHREADS, 1) __global__ void ncclDevKernel_Generic(struct ncclDevComm* comm, struct channelMasks channelMask, struct ncclWork* workHead) { ncclKernelMain<-1, RunWorkNop, false>(comm, channelMask, workHead); } #ifdef ENABLE_COLLTRACE -__launch_bounds__(NCCL_MAX_NTHREADS, 1) __global__ void ncclDevKernelDebug_Generic(struct ncclDevComm* comm, uint64_t channelMask, struct ncclWork* workHead) { +__launch_bounds__(NCCL_MAX_NTHREADS, 1) __global__ void ncclDevKernelDebug_Generic(struct ncclDevComm* comm, struct channelMasks channelMask, struct ncclWork* workHead) { ncclKernelMain<-1, RunWorkNop, true>(comm, channelMask, workHead); } #endif diff --git a/src/device/common.h b/src/device/common.h index 7ebab718ad..acb05c40ac 100644 --- a/src/device/common.h +++ b/src/device/common.h @@ -13,6 +13,7 @@ #include "op128.h" #include "device_table.h" #include "network/unpack/unpack_defs.h" +#include "comm.h" #if defined(__gfx908__) || defined(__gfx940__) || defined(__gfx941__) || defined(__gfx942__) #define COLL_UNROLL 2 @@ -226,21 +227,36 @@ static __forceinline__ __device__ void ncclRedopPtrDeref(struct ncclWorkElem* we } template -__forceinline__ __device__ void ncclKernelMain(struct ncclDevComm* comm, uint64_t channelMask, struct ncclWork* workHead) { +__forceinline__ __device__ void ncclKernelMain(struct ncclDevComm* comm, struct channelMasks channelMask, struct ncclWork* workHead) { const int tid = threadIdx.x; int x = tid; + int total = 0, y; + int num = MAXCHANNELS/64 > 0 ? MAXCHANNELS/64 : 1; + switch (tid/WARP_SIZE) { case 0: - if (channelMask & (1ull<, algo, proto>, false>(comm, channelMask, workHead); \ } \ \ - __global__ void ncclDevKernelDebug_##suffix(struct ncclDevComm* comm, uint64_t channelMask, struct ncclWork* workHead) { \ + __global__ void ncclDevKernelDebug_##suffix(struct ncclDevComm* comm, struct channelMasks channelMask, struct ncclWork* workHead) { \ ncclKernelMain, algo, proto>, true>(comm, channelMask, workHead); \ } #else #define DEFINE_ncclDevKernel(suffix, coll, redop, ty, algo, proto, specializedFnId) \ - __global__ void ncclDevKernel_##suffix(struct ncclDevComm* comm, uint64_t channelMask, struct ncclWork* workHead) { \ + __global__ void ncclDevKernel_##suffix(struct ncclDevComm* comm, struct channelMasks channelMask, struct ncclWork* workHead) { \ ncclKernelMain, algo, proto>, false>(comm, channelMask, workHead); \ } #endif @@ -394,4 +410,4 @@ __global__ void ncclDevKernelDebug_Generic(struct ncclDevComm* comm, uint64_t ch } #endif -#endif \ No newline at end of file +#endif diff --git a/src/enqueue.cc b/src/enqueue.cc index 8c6755f8f9..3d8464fa4c 100644 --- a/src/enqueue.cc +++ b/src/enqueue.cc @@ -616,14 +616,19 @@ static ncclResult_t addP2pToPlan( static void finishPlan(struct ncclKernelPlan* plan) { int channelUbound = 0; int channelCount = 0; - uint64_t channelMask = 0; + //uint64_t channelMask = 0; + struct channelMasks channelMask; + for (int i =0; i < MAXCHANNELS/64; i++) { + channelMask.masks[i] = 0; + } bool hasProxyOps = false; for (int c=0; c < MAXCHANNELS; c++) { struct ncclWorkList* tail = ncclIntruQueueTail(&plan->channels[c].workQueue); if (tail != nullptr) { channelUbound = c+1; channelCount += 1; - channelMask |= 1ull<work.header.isLast = 1; finishWork(&tail->work, plan->comm->WarpSize); } @@ -2033,20 +2038,24 @@ static ncclResult_t taskAppend(struct ncclComm* comm, struct ncclInfo* info) { NCCLCHECK(ncclChannelComputeFromBase(comm, channelBaseId, c, &channelId)); if (isSendNotRecv) { if (comm->channels[channelId].peers[peer]->send[1].connected == 0) { // P2P uses only 1 connector - comm->connectSend[peer] |= (1UL<connectSend[peer] |= (1UL<connectSend[peer].masks[channelId/64] |= (1UL<<(channelId%64)); ncclGroupCommPreconnect(comm); } if (comm->p2pNet && comm->channels[channelId].peers[peer]->send[NCCL_CONN_IDX_P2P_NET].connected == 0) { - comm->connectSend[peer+comm->nRanks*NCCL_CONN_IDX_P2P_NET] |= (1UL<connectSend[peer+comm->nRanks*NCCL_CONN_IDX_P2P_NET] |= (1UL<connectSend[peer+comm->nRanks*NCCL_CONN_IDX_P2P_NET].masks[channelId/64] |= (1UL<<(channelId%64)); ncclGroupCommPreconnect(comm); } } else { if (comm->channels[channelId].peers[peer]->recv[1].connected == 0) { // P2P uses only 1 connector - comm->connectRecv[peer] |= (1UL<connectRecv[peer] |= (1UL<connectRecv[peer].masks[channelId/64] |= (1UL<<(channelId%64)); ncclGroupCommPreconnect(comm); } if (comm->p2pNet && comm->channels[channelId].peers[peer]->recv[NCCL_CONN_IDX_P2P_NET].connected == 0) { - comm->connectRecv[peer+comm->nRanks*NCCL_CONN_IDX_P2P_NET] |= (1UL<connectRecv[peer+comm->nRanks*NCCL_CONN_IDX_P2P_NET] |= (1UL<connectRecv[peer+comm->nRanks*NCCL_CONN_IDX_P2P_NET].masks[channelId/64] |= (1UL<<(channelId%64)); ncclGroupCommPreconnect(comm); } } diff --git a/src/graph/connect.cc b/src/graph/connect.cc index a87fc92749..f00a34fd8c 100644 --- a/src/graph/connect.cc +++ b/src/graph/connect.cc @@ -282,7 +282,7 @@ static ncclResult_t setTreeDown(struct ncclTree* tree, int* indexes, int d) { static ncclResult_t connectTrees(struct ncclComm* comm, int* treeToParent, int* treeToChild0, int* treeToChild1, int* treePatterns) { - const int channelLimit = IsArchMatch(comm->topo->nodes[GPU].nodes[0].gpu.gcn, "gfx94") ? MAXCHANNELS/2 : 16; + const int channelLimit = IsArchMatch(comm->topo->nodes[GPU].nodes[0].gpu.gcn, "gfx94") ? 2*CHANNEL_LIMIT : CHANNEL_LIMIT; const int nChannels = (comm->nChannels > channelLimit) ? comm->nChannels / 2 : comm->nChannels; const int nNodes = comm->nNodes, node = comm->node; @@ -625,7 +625,7 @@ ncclResult_t ncclTopoPostset(struct ncclComm* comm, int* firstRanks, int* treePa NCCLCHECK(connectTrees(comm, treeToParent, treeToChild0, treeToChild1, treePatterns)); // Only use full MAXCHANNELS for gfx94x - int maxChannels = IsArchMatch(comm->topo->nodes[GPU].nodes[0].gpu.gcn, "gfx94") ? MAXCHANNELS : (MAXCHANNELS/2); + int maxChannels = IsArchMatch(comm->topo->nodes[GPU].nodes[0].gpu.gcn, "gfx94") ? MAXCHANNELS : 2*CHANNEL_LIMIT; // Duplicate ringPrev/ringNext for ncclBuildRing if (nChannels <= maxChannels/2) memcpy(ringPrev+nChannels*nranks, ringPrev, nChannels*nranks*sizeof(int)); @@ -668,7 +668,12 @@ ncclResult_t ncclTopoPostset(struct ncclComm* comm, int* firstRanks, int* treePa nChannels = comm->nChannels = copyChannels(comm, nChannels, 2*nChannels, ringPrev, ringNext); } - int minNchannels = ncclMinNchannels(); + int minNchannels = 64; + if (comm->nNodes == 1) { + minNchannels = ncclMinNchannels(); + } else { + minNchannels = std::min(64,ncclMinNchannels()); + } if (mscclEnabled() && (comm->topo->mscclEnabled || mscclForceEnabled())) { int mscclNumChannelsRequired = 0; diff --git a/src/graph/paths.cc b/src/graph/paths.cc index 0b197cab67..b4f474f465 100644 --- a/src/graph/paths.cc +++ b/src/graph/paths.cc @@ -908,7 +908,7 @@ ncclResult_t ncclTopoComputeP2pChannels(struct ncclComm* comm) { comm->p2pnChannelsPerPeer = (ncclParamNChannelsPerPeer() == -2 ? nextPow2(minChannels) : ncclParamNChannelsPerPeer()); // Doubling P2P channels per peer on single node if (comm->topo->nodes[GPU].count == comm->topo->nRanks && IsArchMatch(comm->topo->nodes[GPU].nodes[0].gpu.gcn, "gfx94")) comm->p2pnChannelsPerPeer *= 2; - comm->p2pnChannels = nextPow2(comm->p2pnChannels); + comm->p2pnChannels = std::min(nextPow2(comm->p2pnChannels), 4*CHANNEL_LIMIT); } // Init channels that weren't used so far @@ -918,7 +918,7 @@ ncclResult_t ncclTopoComputeP2pChannels(struct ncclComm* comm) { // fill the whole space of nChannels. To do so we mirror the bits in the // nChannels space. for (int c=0; cp2pnChannels; c++) { - comm->p2pChannels[c] = mirrorBits(c, comm->p2pnChannels); + comm->p2pChannels[c] = mirrorBits(c, comm->p2pnChannels); } return ncclSuccess; } @@ -950,4 +950,4 @@ int ncclTopoPathAllNVLink(struct ncclTopoSystem* system) { } } return minPath >= PATH_PIX ? 0 : 1; -} \ No newline at end of file +} diff --git a/src/group.cc b/src/group.cc index 708d43e7a2..6200aab189 100644 --- a/src/group.cc +++ b/src/group.cc @@ -225,8 +225,10 @@ static void groupCleanup(struct ncclComm** groupCommHeadPtr, struct ncclComm** g for (int i = 0; i < comm->nRanks; i++) { comm->tasks.peers[i].sendSeen = false; comm->tasks.peers[i].recvSeen = false; - comm->connectSend[i] = 0UL; - comm->connectRecv[i] = 0UL; + for (int j = 0; j < MAXCHANNELS/64; j++) { + comm->connectSend[i].masks[j] = 0UL; + comm->connectRecv[i].masks[j] = 0UL; + } } comm->unlaunchedPlansHead = nullptr; // Reclaim abandoned kernel plan memory. Note ncclWork structs were already diff --git a/src/include/comm.h b/src/include/comm.h index dfabb70632..83973c7605 100644 --- a/src/include/comm.h +++ b/src/include/comm.h @@ -172,6 +172,10 @@ struct ncclNvlsMcHandleList { size_t size; }; +struct channelMasks { + uint64_t masks[MAXCHANNELS/64]; +}; + struct ncclKernelPlan { // A kernel plan is also a callback that reclaims itself. Hence this must // be the first member. @@ -185,7 +189,7 @@ struct ncclKernelPlan { void *kernelFn; int channelUbound; // only channels c < channelUbound are present int channelCount; // number of channels present - uint64_t channelMask; // which channels are present, channelCount == popcount(channelMask) + struct channelMasks channelMask; bool hasProxyOps; // does any channel have a non-empty proxyOpQueue int threadPerBlock; // workHeap fields are null until uploadWorkFifo() or preparePersistentKernel() @@ -226,8 +230,8 @@ struct ncclComm { ncclCollNet_t* ncclCollNet; void* bootstrap; // Bitmasks for ncclTransportP2pSetup - uint64_t* connectSend; - uint64_t* connectRecv; + struct channelMasks* connectSend; + struct channelMasks* connectRecv; uint64_t magic; // Magic number for all network communication. Not a security key -- only goal is to detect mismatches. @@ -481,4 +485,4 @@ static inline ncclRedOp_t ncclUserRedOpMangle(ncclComm *comm, ncclRedOp_t op) { ncclResult_t ncclCommEnsureReady(ncclComm_t comm); ncclResult_t ncclCommSetAsyncError(ncclComm_t comm, ncclResult_t nextState); -#endif \ No newline at end of file +#endif diff --git a/src/include/device.h b/src/include/device.h index 3cf7cdf57a..4f0cec9490 100644 --- a/src/include/device.h +++ b/src/include/device.h @@ -61,7 +61,10 @@ union ncclLLFifoLine { }; #define WARP_SIZE warpSize -#define MAXCHANNELS 64 + +#define MAXCHANNELS 128 +#define CHANNEL_LIMIT 16 + #define NCCL_MAX_NTHREADS 256 #define NCCL_SIMPLE_MAX_NTHREADS NCCL_MAX_NTHREADS #define NCCL_LL_MAX_NTHREADS NCCL_MAX_NTHREADS @@ -586,4 +589,4 @@ inline int ncclDevFuncId(int coll, int devRedOp, int type, int algo, int proto) inline int ncclDevFuncId_P2p() { return ncclDevFuncRowToId[FUNC_INDEX_TOTAL - NCCL_NUM_ONERANK - 1]; } -#endif \ No newline at end of file +#endif diff --git a/src/include/proxy.h b/src/include/proxy.h index fa748fd534..4ad426518c 100644 --- a/src/include/proxy.h +++ b/src/include/proxy.h @@ -143,7 +143,7 @@ struct ncclProxyOps { struct ncclProxySharedP2p { int refcount; - int size; + int64_t size; char* cudaBuff; char* hostBuff; // CUDA IPC @@ -331,4 +331,4 @@ ncclResult_t ncclProxyShmUnlink(struct ncclComm* comm); ncclResult_t ncclProxyDestroy(struct ncclComm* comm); ncclResult_t mscclSaveProxy(struct ncclComm* comm, struct ncclChannel* channel, int type, int peer, struct ncclProxyOp* op, int connIndex); -#endif \ No newline at end of file +#endif diff --git a/src/init.cc b/src/init.cc index b68639ada8..162e1a6f45 100644 --- a/src/init.cc +++ b/src/init.cc @@ -1532,7 +1532,7 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, struct ncclComm* p TRACE(NCCL_INIT, "rank %d nranks %d - BUILT %d TREES/RINGS", rank, nranks, comm->nChannels); - char line[2048]; + char line[4096]; line[0]='\0'; for (int c=0; cnChannels; c++) { struct ncclTree* tree = &comm->channels[c].tree; @@ -1541,7 +1541,7 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, struct ncclComm* p INFO(NCCL_GRAPH, "Ring %d : %d -> %d -> %d comm %p nRanks %02d busId %lx", c, comm->channels[c].ring.prev, comm->rank, comm->channels[c].ring.next, comm, comm->nRanks, comm->busId); } - line[2047] = '\0'; + line[4095] = '\0'; INFO(NCCL_INIT, "Trees%s comm %p nRanks %02d busId %lx", line, comm, comm->nRanks, comm->busId); NCCLCHECKGOTO(computeBuffSizes(comm), ret, fail); @@ -1685,13 +1685,13 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, struct ncclComm* p for (int c=0; cp2pnChannelsPerPeer; c++) { NCCLCHECKGOTO(ncclChannelCompute(comm, peer, c, ncclFuncSend, &channelId), ret, fail); if (comm->channels[channelId].peers[peer]->send[1].connected == 0) { - comm->connectSend[peer] |= (1UL<connectSend[peer].masks[channelId/64] |= (1UL<<(channelId%64)); } } for (int c=0; cp2pnChannelsPerPeer; c++) { NCCLCHECKGOTO(ncclChannelCompute(comm, peer, c, ncclFuncRecv, &channelId), ret, fail); if (comm->channels[channelId].peers[peer]->recv[1].connected == 0) { - comm->connectRecv[peer] |= (1UL<connectRecv[peer].masks[channelId/64] |= (1UL<<(channelId%64)); } } } diff --git a/src/proxy.cc b/src/proxy.cc index e7e7feb99f..a69fc9f39e 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -1362,7 +1362,8 @@ static ncclResult_t proxyProgressAsync(struct ncclProxyAsyncOp* op, struct ncclP TRACE(NCCL_PROXY, "proxyProgressAsync::proxyConnect() opId=%p op.reqBuff=%p", op->opId, op->reqBuff); res = op->connection->tcomm->proxyConnect(op->connection, proxyState, op->reqBuff, op->reqSize, op->respBuff, op->respSize, &done); } else if (op->type == ncclProxyMsgSharedInit) { - int nChannels = (int) *op->reqBuff; + int nChannels = *((int*) op->reqBuff); + TRACE(NCCL_PROXY, "proxyProgressAsync::ncclProxyMsgSharedInit opId=%p op.reqBuff=%p nChannels=%d", op->opId, op->reqBuff, nChannels); if (op->connection->tcomm->proxySharedInit) res = op->connection->tcomm->proxySharedInit(op->connection, proxyState, nChannels); __atomic_store_n(&op->connection->state, connSharedInitialized, __ATOMIC_RELEASE); diff --git a/src/transport.cc b/src/transport.cc index 2731d2088d..cc09bb0515 100644 --- a/src/transport.cc +++ b/src/transport.cc @@ -55,16 +55,20 @@ static ncclResult_t selectTransport(struct ncclComm* comm, struct ncclTopoGraph* ncclResult_t ncclTransportP2pConnect(struct ncclComm* comm, int channelId, int nrecv, int* peerRecv, int nsend, int* peerSend, int connIndex) { TRACE(NCCL_INIT, "nsend %d nrecv %d", nsend, nrecv); struct ncclChannel* channel = &comm->channels[channelId]; - uint64_t mask = 1UL << channel->id; + //uint64_t mask = 1UL << channel->id; + uint64_t mask = 1UL << (channel->id % 64); + for (int i=0; i= comm->nRanks || peer == comm->rank || channel->peers[peer]->recv[connIndex].connected) continue; - comm->connectRecv[peer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)] |= mask; + //comm->connectRecv[peer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)] |= mask; + comm->connectRecv[peer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)].masks[channel->id / 64] |= mask; } for (int i=0; i= comm->nRanks || peer == comm->rank || channel->peers[peer]->send[connIndex].connected) continue; - comm->connectSend[peer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)] |= mask; + //comm->connectSend[peer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)] |= mask; + comm->connectSend[peer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)].masks[channel->id / 64] |= mask; } return ncclSuccess; } @@ -102,14 +106,19 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph* timeLast = timeStart; // struct copy bool timeReported = false; + int count = 0; + int num = MAXCHANNELS/64; + NCCLCHECKGOTO(ncclStrongStreamAcquireUncaptured(&comm->sharedRes->hostStream), ret, fail); // First time initialization for (int i=1; inRanks; i++) { int bootstrapTag = (i<<8) + (graph ? graph->id+1 : 0); int recvPeer = (comm->rank - i + comm->nRanks) % comm->nRanks; int sendPeer = (comm->rank + i) % comm->nRanks; - uint64_t recvMask = comm->connectRecv[recvPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)]; - uint64_t sendMask = comm->connectSend[sendPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)]; + /*uint64_t recvMask = comm->connectRecv[recvPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)]; + uint64_t sendMask = comm->connectSend[sendPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)];*/ + struct channelMasks recvMask = comm->connectRecv[recvPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)]; + struct channelMasks sendMask = comm->connectSend[sendPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)]; // Data[i] contains all ncclConnect information for all send and receive connections with a given send and recv peer // This data is packed in the array based on the number of sendChannels and recvChannels connected with these peers @@ -117,14 +126,24 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph* // The next M entries contain sendData, connection information for send connections // It's not guaranteed that each entry of data has the same number of total or send/recv specific connections int p = i-(done+1); - if (recvMask || sendMask) NCCLCHECK(ncclCalloc(data+p, 2*MAXCHANNELS)); + count = 0; + for (int j = 0; j < num; j++) { + if ((recvMask.masks[j]) || (sendMask.masks[j])) { + count++; + } + } + + //if ((recvMask.masks[0]) || (sendMask.masks[0])) NCCLCHECK(ncclCalloc(data+p, 2*MAXCHANNELS)); + if (count) NCCLCHECK(ncclCalloc(data+p, 2*MAXCHANNELS)); + recvData[p] = data[p]; int sendChannels = 0, recvChannels = 0; int type; bool proxy; TIME_START(0); for (int c=0; c(comm, graph, recvData[p]+recvChannels++, c, recvPeer, connIndex, &type, &proxy), ret, fail); if (type > highestType) highestType = type; } @@ -133,7 +152,8 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph* TIME_START(1); sendData[p] = recvData[p]+recvChannels; for (int c=0; c(comm, graph, sendData[p]+sendChannels++, c, sendPeer, connIndex, &type, &proxy), ret, fail); if (type > highestType) highestType = type; needsProxyResult |= proxy; @@ -166,15 +186,18 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph* for (int j=done+1; j<=i; j++) { int recvPeer = (comm->rank - j + comm->nRanks) % comm->nRanks; int sendPeer = (comm->rank + j) % comm->nRanks; - uint64_t recvMask = comm->connectRecv[recvPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)]; - uint64_t sendMask = comm->connectSend[sendPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)]; + /*uint64_t recvMask = comm->connectRecv[recvPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)]; + uint64_t sendMask = comm->connectSend[sendPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)];*/ + struct channelMasks recvMask = comm->connectRecv[recvPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)]; + struct channelMasks sendMask = comm->connectSend[sendPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)]; int p = j-(done+1); int sendDataOffset = 0; int recvDataOffset = 0; for (int c=0; cchannels[c].peers[sendPeer]->send + connIndex; // This connector hasn't completed connection yet if (conn->connected == 0) { @@ -192,7 +215,8 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph* // Start with recv channels TIME_START(4); - if (recvMask & (1UL<channels[c].peers[recvPeer]->recv + connIndex; // This connector hasn't completed connection yet if (conn->connected == 0) { @@ -208,7 +232,15 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph* } TIME_STOP(4); } - if (sendMask || recvMask) { + + count = 0; + for (int j = 0; j < num; j++) { + if ((recvMask.masks[j]) || (sendMask.masks[j])) { + count++; + } + } + //if (sendMask.masks[0] || recvMask.masks[0]) { + if (count) { free(data[p]); data[p] = NULL; } @@ -250,23 +282,25 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph* int sendPeer = (comm->rank + i) % comm->nRanks; int flag = 0; + for (int j = 0; j < MAXCHANNELS/64; j++) { if (recvPeer != sendPeer) { - if (comm->connectSend[sendPeer] != 0UL) + if (comm->connectSend[sendPeer].masks[j] != 0UL) NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, sendPeer, bootstrapTag, &flag, sizeof(int)), ret, fail); - if (comm->connectRecv[recvPeer] != 0UL) + if (comm->connectRecv[recvPeer].masks[j] != 0UL) NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, recvPeer, bootstrapTag, &flag, sizeof(int)), ret, fail); - if (comm->connectSend[sendPeer] != 0UL) + if (comm->connectSend[sendPeer].masks[j] != 0UL) NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, sendPeer, bootstrapTag, &flag, sizeof(int)), ret, fail); - if (comm->connectRecv[recvPeer] != 0UL) + if (comm->connectRecv[recvPeer].masks[j] != 0UL) NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, recvPeer, bootstrapTag, &flag, sizeof(int)), ret, fail); } else { - if (comm->connectSend[sendPeer] != 0UL || comm->connectRecv[recvPeer] != 0UL) { + if (comm->connectSend[sendPeer].masks[j] != 0UL || comm->connectRecv[recvPeer].masks[j] != 0UL) { NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, sendPeer, bootstrapTag, &flag, sizeof(int)), ret, fail); NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, sendPeer, bootstrapTag, &flag, sizeof(int)), ret, fail); } } - comm->connectRecv[recvPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)] = comm->connectSend[sendPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)] = 0UL; + comm->connectRecv[recvPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)].masks[j] = comm->connectSend[sendPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)].masks[j] = 0UL; + } } free(data); diff --git a/src/transport/net.cc b/src/transport/net.cc index d0a5943090..3fdc1dda7a 100644 --- a/src/transport/net.cc +++ b/src/transport/net.cc @@ -521,7 +521,8 @@ static ncclResult_t sharedNetBuffersInit(struct ncclProxyState* proxyState, int struct ncclProxySharedP2p* state = type == 0 ? &peer->send : &peer->recv; state->refcount++; if (state->size == 0) { - state->size = nChannels * NCCL_SHARED_STEPS * proxyState->p2pChunkSize; + state->size = (int64_t)nChannels * NCCL_SHARED_STEPS * proxyState->p2pChunkSize; + } if (size) *size = state->size; diff --git a/tools/topo_expl/utils.cpp b/tools/topo_expl/utils.cpp index 19bb504555..9aa3c81a3d 100644 --- a/tools/topo_expl/utils.cpp +++ b/tools/topo_expl/utils.cpp @@ -1089,7 +1089,7 @@ ncclResult_t initTransportsRank_3(struct ncclComm* comm, struct allGatherInfo *a TRACE(NCCL_INIT, "rank %d nranks %d - BUILT %d TREES/RINGS", rank, nranks, comm->nChannels); - char line[2048]; + char line[4096]; line[0]='\0'; for (int c=0; cnChannels; c++) { struct ncclTree* tree = &comm->channels[c].tree; @@ -1098,7 +1098,7 @@ ncclResult_t initTransportsRank_3(struct ncclComm* comm, struct allGatherInfo *a INFO(NCCL_GRAPH, "Ring %d : %d -> %d -> %d comm %p nRanks %02d busId %lx", c, comm->channels[c].ring.prev, comm->rank, comm->channels[c].ring.next, comm, comm->nRanks, comm->busId); } - line[2047] = '\0'; + line[4095] = '\0'; INFO(NCCL_INIT, "Trees%s comm %p nRanks %02d busId %lx", line, comm, comm->nRanks, comm->busId); //NCCLCHECKGOTO(computeBuffSizes(comm), ret, fail);