From bcf4ecb0e3ed8b5a93994ba82c19b2b16af0b54a Mon Sep 17 00:00:00 2001 From: Wenkai Du <43822138+wenkaidu@users.noreply.github.com> Date: Fri, 5 Mar 2021 19:59:41 -0800 Subject: [PATCH] Enable local sendrecv over network if GDR is available on all GPUs (#324) [ROCm/rccl commit: c018edf0f28aaaae862b97943d0a76b79423d409] --- projects/rccl/src/channel.cc | 8 ++ .../rccl/src/collectives/device/primitives.h | 9 ++- .../rccl/src/collectives/device/sendrecv.h | 4 +- projects/rccl/src/enqueue.cc | 12 +-- projects/rccl/src/graph/connect.cc | 4 +- projects/rccl/src/graph/paths.cc | 25 +++++- projects/rccl/src/graph/search.cc | 24 +++--- projects/rccl/src/graph/topo.h | 4 +- projects/rccl/src/include/comm.h | 3 + projects/rccl/src/include/devcomm.h | 5 ++ projects/rccl/src/init.cc | 20 ++++- projects/rccl/src/proxy.cc | 12 +-- projects/rccl/src/transport.cc | 76 +++++++++++++++++-- projects/rccl/tools/topo_expl/utils.cpp | 1 + 14 files changed, 163 insertions(+), 44 deletions(-) diff --git a/projects/rccl/src/channel.cc b/projects/rccl/src/channel.cc index f4c969898c..635f07869b 100644 --- a/projects/rccl/src/channel.cc +++ b/projects/rccl/src/channel.cc @@ -23,6 +23,8 @@ ncclResult_t initChannel(struct ncclComm* comm, int channelid) { for (size_t i=0; inRanks+1; ++i) { channel->peers[i].send.comm = comm; channel->peers[i].recv.comm = comm; + channel->peers[i].p2pSend.comm = comm; + channel->peers[i].p2pRecv.comm = comm; } // Per-channel operation list. @@ -46,10 +48,16 @@ ncclResult_t freeChannel(struct ncclChannel* channel, int nRanks) { for (int r=0; rpeers+r; if (peer->send.transportResources) NCCLCHECK(peer->send.transportComm->free(peer->send.transportResources)); + if (peer->send.transportResources == peer->p2pSend.transportResources) peer->p2pSend.transportResources = NULL; + peer->send.transportResources = NULL; + if (peer->p2pSend.transportResources) NCCLCHECK(peer->p2pSend.transportComm->free(peer->p2pSend.transportResources)); } for (int r=0; rpeers+r; if (peer->recv.transportResources) NCCLCHECK(peer->recv.transportComm->free(peer->recv.transportResources)); + if (peer->recv.transportResources == peer->p2pRecv.transportResources) peer->p2pRecv.transportResources = NULL; + peer->recv.transportResources = NULL; + if (peer->p2pRecv.transportResources) NCCLCHECK(peer->p2pRecv.transportComm->free(peer->p2pRecv.transportResources)); } // Free the peer structures. diff --git a/projects/rccl/src/collectives/device/primitives.h b/projects/rccl/src/collectives/device/primitives.h index 5ffb02601c..ad5593c20c 100644 --- a/projects/rccl/src/collectives/device/primitives.h +++ b/projects/rccl/src/collectives/device/primitives.h @@ -71,6 +71,7 @@ class ncclPrimitives { int peer = -1; int role = 0; int group; + const int p2p; uint64_t step; T* direct = NULL; T* buff; @@ -198,7 +199,7 @@ class ncclPrimitives { __device__ __forceinline__ void loadRecvConn(struct ncclChannel* channel, T* directBuff) { if (role & (ROLE_WAIT_RECV|ROLE_POST_RECV)) { - conn = &channel->devPeers[peer].recv.conn; + conn = (LOAD(comm->p2pNet) && p2p) ? &channel->devPeers[peer].p2pRecv.conn : &channel->devPeers[peer].recv.conn; step = conn->step; step = ROUNDUP(step, SLICESPERCHUNK*SLICESTEPS); if (role & ROLE_POST_RECV) { @@ -221,7 +222,7 @@ class ncclPrimitives { __device__ __forceinline__ void loadSendConn(struct ncclChannel* channel) { if (role & (ROLE_WAIT_SEND|ROLE_POST_SEND)) { - conn = &channel->devPeers[peer].send.conn; + conn = (LOAD(comm->p2pNet) && p2p) ? &channel->devPeers[peer].p2pSend.conn : &channel->devPeers[peer].send.conn; step = conn->step; step = ROUNDUP(step, SLICESPERCHUNK*SLICESTEPS); if (role & ROLE_POST_SEND) { @@ -251,8 +252,8 @@ class ncclPrimitives { public: __device__ __forceinline__ - ncclPrimitives(const int tid, const int nworkers, int* recvPeers, int* sendPeers, T* directBuff, int stepSize, struct ncclChannel* channel, struct ncclDevComm* comm, struct ncclShmemPtrs* ptrs, int group) - : comm(comm), tid(tid), nworkers(nworkers), stepSize(stepSize), srcs((const T**)ptrs[group].srcs), dsts((T**)ptrs[group].dsts), group(group), barriers(&ptrs[group].barrier), barrier_next(ptrs[group].barrier_next) { + ncclPrimitives(const int tid, const int nworkers, int* recvPeers, int* sendPeers, T* directBuff, int stepSize, struct ncclChannel* channel, struct ncclDevComm* comm, struct ncclShmemPtrs* ptrs, int group, int p2p = 0) + : comm(comm), tid(tid), nworkers(nworkers), stepSize(stepSize), srcs((const T**)ptrs[group].srcs), dsts((T**)ptrs[group].dsts), group(group), barriers(&ptrs[group].barrier), barrier_next(ptrs[group].barrier_next), p2p(p2p) { nthreads = nworkers; // For send operations, we need an extra warp to overlap the threadfence and the copy // int postThreads = NSEND && nworkers >= 64 ? WARP_SIZE : 0; diff --git a/projects/rccl/src/collectives/device/sendrecv.h b/projects/rccl/src/collectives/device/sendrecv.h index d9947bf050..abc6368ae3 100644 --- a/projects/rccl/src/collectives/device/sendrecv.h +++ b/projects/rccl/src/collectives/device/sendrecv.h @@ -56,7 +56,7 @@ class ncclFunctionrank-delta+comm->nRanks)%comm->nRanks; int nt = nThreadsSplit; ncclPrimitives - prims(tid, nt, &peer, NULL, recvbuff, stepSize, channel, comm, ncclShmem->ptrs, groupRecv); + prims(tid, nt, &peer, NULL, recvbuff, stepSize, channel, comm, ncclShmem->ptrs, groupRecv, 1); if (recvCount == 0) { prims.recv(recvbuff, 0); @@ -71,7 +71,7 @@ class ncclFunctionrank+delta)%comm->nRanks; int nt = nThreads-nThreadsSplit; ncclPrimitives - prims(tid-nThreadsSplit, nt, NULL, &peer, recvbuff, stepSize, channel, comm, ncclShmem->ptrs, groupSend); + prims(tid-nThreadsSplit, nt, NULL, &peer, recvbuff, stepSize, channel, comm, ncclShmem->ptrs, groupSend, 1); if (sendCount == 0) { prims.send(sendbuff, 0); diff --git a/projects/rccl/src/enqueue.cc b/projects/rccl/src/enqueue.cc index 43848017f6..e331a86b47 100644 --- a/projects/rccl/src/enqueue.cc +++ b/projects/rccl/src/enqueue.cc @@ -391,8 +391,8 @@ static ncclResult_t computeColl(struct ncclInfo* info /* input */, struct ncclWo NCCLCHECK(getLoopInfo(info)); if ((info->coll == ncclFuncAllToAll || info->coll == ncclFuncAllToAllv) - && info->comm->topo->nodes[NET].count == 0 && info->comm->topo->type == RCCL_TOPO_4P2H_ROME) - info->nChannels =info->comm->p2pnChannels; + && info->comm->topo->nodes[GPU].count == info->comm->topo->nRanks && (info->comm->topo->type & RCCL_TOPO_4P2H_ROME)) + info->nChannels = info->comm->p2pnChannels; work->opCount = info->comm->opCount; work->sendbuff = info->sendbuff; @@ -617,7 +617,8 @@ static ncclResult_t ncclSaveP2p(struct ncclInfo* info) { int delta = (comm->nRanks - (comm->rank-peer)) % comm->nRanks; for (int c=0; cp2pnChannelsPerPeer; c++) { int channelId = (delta+comm->p2pChannels[c]) % comm->p2pnChannels; - if (comm->channels[channelId].peers[peer].send.connected == 0) { + if ((LOAD(comm->p2pNet) ? comm->channels[channelId].peers[peer].p2pSend.connected : + comm->channels[channelId].peers[peer].send.connected) == 0) { comm->connectSend[peer] |= (1<connect = 1; } @@ -630,7 +631,8 @@ static ncclResult_t ncclSaveP2p(struct ncclInfo* info) { int delta = (comm->nRanks + (comm->rank-peer)) % comm->nRanks; for (int c=0; cp2pnChannelsPerPeer; c++) { int channelId = (delta+comm->p2pChannels[c]) % comm->p2pnChannels; - if (comm->channels[channelId].peers[peer].recv.connected == 0) { + if ((LOAD(comm->p2pNet) ? comm->channels[channelId].peers[peer].p2pRecv.connected : + comm->channels[channelId].peers[peer].recv.connected ) == 0) { comm->connectRecv[peer] |= (1<connect = 1; } @@ -643,7 +645,7 @@ static ncclResult_t ncclSaveP2p(struct ncclInfo* info) { } static int getSegment(struct ncclInfo* info, struct ncclWork* work) { - const int e = (info->comm->topo->nodes[NET].count == 0 && info->comm->topo->type == RCCL_TOPO_4P2H_ROME) + const int e = (info->comm->topo->nodes[GPU].count == info->comm->topo->nRanks && (info->comm->topo->type & RCCL_TOPO_4P2H_ROME)) ? 1 : NCCL_MAX_WORK_ELEMENTS; for (int s=0; selems[s].p2p.delta != info->delta; s++) { if (work->elems[s].p2p.nThreads == 0) return s; diff --git a/projects/rccl/src/graph/connect.cc b/projects/rccl/src/graph/connect.cc index ce60f41121..653e560ab6 100644 --- a/projects/rccl/src/graph/connect.cc +++ b/projects/rccl/src/graph/connect.cc @@ -267,9 +267,9 @@ ncclResult_t ncclTopoPostset(struct ncclComm* comm, int* firstRanks, int* treePa int nc = nChannels*2; if (gcn == 908) nc = std::max(nc, 4); - if (comm->topo->nodes[NET].count == 0 && comm->topo->type == RCCL_TOPO_CR8G) nc = nChannels*4; + if (comm->topo->nodes[GPU].count == comm->topo->nRanks && (comm->topo->type & RCCL_TOPO_CR8G)) nc = nChannels*4; if (!nnets) nnets = comm->topo->nodes[NET].count; - if (nnets && comm->topo->type == RCCL_TOPO_4P2H_ROME) nc = (nnets > 3 ? 2 : 4)*nnets; + if (nnets && (comm->topo->type & RCCL_TOPO_4P2H_ROME)) nc = (nnets > 3 ? 2 : 4)*nnets; int end = std::min((int)ncclMaxNchannels(), std::max(nc, ncclMinNchannels())); // Duplication should be complete now diff --git a/projects/rccl/src/graph/paths.cc b/projects/rccl/src/graph/paths.cc index 8475bf1ef0..92ed788b67 100644 --- a/projects/rccl/src/graph/paths.cc +++ b/projects/rccl/src/graph/paths.cc @@ -468,8 +468,29 @@ ncclResult_t ncclTopoTrimSystem(struct ncclTopoSystem* system, struct ncclComm* NCCLCHECK(ncclTopoRemoveNode(system, GPU, g)); } + int remove = 1; + int arch, vendor, model; + NCCLCHECK(ncclTopoCpuType(system, &arch, &vendor, &model)); + if (arch == NCCL_TOPO_CPU_ARCH_X86 && vendor == NCCL_TOPO_CPU_VENDOR_AMD + && model == NCCL_TOPO_CPU_TYPE_ROME) { + int gdr, ret = 1; + int64_t net; + for (int g = 0; g < system->nodes[GPU].count; g++) { + NCCLCHECK(ncclTopoGetLocalNet(system, system->nodes[GPU].nodes[g].gpu.rank, &net, 0)); + NCCLCHECK(ncclTopoCheckGdr(system, system->nodes[GPU].nodes[g].id, net, 1, &gdr)); + if (!gdr) { + ret = 0; + break; + } + } + if (ret) { + system->type |= RCCL_TOPO_GDR_ALL; + remove = 0; + INFO(NCCL_GRAPH, "GDR is available on all GPUs"); + } + } comm->localRanks = system->nodes[GPU].count; - if (system->nodes[GPU].count == comm->nRanks) { + if (system->nodes[GPU].count == comm->nRanks && remove) { for (int n=system->nodes[NET].count-1; n>=0; n--) NCCLCHECK(ncclTopoRemoveNode(system, NET, n)); } @@ -529,7 +550,7 @@ ncclResult_t ncclTopoComputeP2pChannels(struct ncclComm* comm) { } } - if (comm->topo->nodes[NET].count == 0 && comm->topo->type == RCCL_TOPO_4P2H_ROME) { + if (comm->topo->nodes[GPU].count == comm->topo->nRanks && (comm->topo->type & RCCL_TOPO_4P2H_ROME) && !(comm->topo->type & RCCL_TOPO_GDR_ALL)) { // Adjust P2P channels on Rome comm->p2pnChannelsPerPeer = 2; comm->p2pnChannels = 2; diff --git a/projects/rccl/src/graph/search.cc b/projects/rccl/src/graph/search.cc index cb34d515b6..d52dbe1315 100644 --- a/projects/rccl/src/graph/search.cc +++ b/projects/rccl/src/graph/search.cc @@ -525,7 +525,7 @@ ncclResult_t ncclTopoSearchRecNet(struct ncclTopoSystem* system, struct ncclTopo * `--> NET n (or m if crossNic) */ ncclResult_t ncclTopoSearchParams(struct ncclTopoSystem* system, int pattern, int* backToNet, int* backToFirstRank) { - if (system->nodes[NET].count) { + if (system->nodes[NET].count && system->nodes[GPU].count != system->nRanks) { if (pattern == NCCL_TOPO_PATTERN_RING) *backToNet = system->nodes[GPU].count-1; else if (pattern == NCCL_TOPO_PATTERN_SPLIT_TREE) *backToNet = 1; else *backToNet = 0; @@ -541,7 +541,7 @@ ncclResult_t ncclTopoSearchParams(struct ncclTopoSystem* system, int pattern, in ncclResult_t ncclTopoSearchRec(struct ncclTopoSystem* system, struct ncclTopoGraph* graph, struct ncclTopoGraph* saveGraph, int* time) { int backToNet, backToFirstRank; NCCLCHECK(ncclTopoSearchParams(system, graph->pattern, &backToNet, &backToFirstRank)); - if (system->nodes[NET].count) { + if (system->nodes[NET].count && system->nodes[GPU].count != system->nRanks) { // Start from NET ncclTopoSearchRecNet(system, graph, saveGraph, backToNet, backToFirstRank, time); } else { @@ -831,9 +831,9 @@ static ncclResult_t parseChordalRing(struct ncclTopoSystem* system, struct ncclT dist[m] = dist[n]; dist[n] = temp; } // create chordal ring based on reference and remapped ids - system->type = RCCL_TOPO_CR8G; + system->type |= RCCL_TOPO_CR8G; NCCLCHECK(parseGraph(ringBase, system, graph, id, 0, NULL)); - if (system->nodes[NET].count) { + if (system->nodes[NET].count && system->nodes[GPU].count != system->nRanks) { int *intra, *used; graph->nChannels = system->nodes[NET].count; NCCLCHECK(ncclCalloc(&intra, ngpus)); @@ -909,8 +909,7 @@ static ncclResult_t parseRomeSystem(struct ncclTopoSystem* system, struct rcclRo romeTopo->connMatrix[i*romeTopo->nGpus+n] = 1; count ++; } - if (!romeTopo->nLinks) romeTopo->nLinks = count; - else if (romeTopo->nLinks != count) return ncclSuccess; + if (romeTopo->nLinks < count) romeTopo->nLinks = count; } // trim ports and create NET map @@ -1050,10 +1049,10 @@ static ncclResult_t parseRome4P2H(struct ncclTopoSystem* system, struct ncclTopo struct rcclRomeModel romeTopo; char pattern[256]; int net_map[MAX_ROME_NICS]; - parseRomeSystem(system, &romeTopo, pattern, net_map); + NCCLCHECK(parseRomeSystem(system, &romeTopo, pattern, net_map)); // recognize system as Rome 4P2H even if no matching model - if (ngpus == 8 && romeTopo.nLinks) system->type = RCCL_TOPO_4P2H_ROME; + if (ngpus > 4 && romeTopo.nLinks) system->type |= RCCL_TOPO_4P2H_ROME; int g[MAX_ROME_GPUS]; int time = 0; @@ -1135,8 +1134,7 @@ ncclResult_t ncclTopoCompute(ncclTopoSystem* system, struct ncclTopoGraph* graph // user supplied topo NCCLCHECK(parseGraph(str, system, graph, NULL, nnets, NULL)); if (graph->nChannels) { - system->type = RCCL_TOPO_4P2H_ROME; - return ncclSuccess; + system->type |= RCCL_TOPO_4P2H_ROME; } } else { // try to match 8P6L @@ -1144,8 +1142,8 @@ ncclResult_t ncclTopoCompute(ncclTopoSystem* system, struct ncclTopoGraph* graph if (graph->nChannels) return ncclSuccess; // try to match Rome 4P2H NCCLCHECK(parseRome4P2H(system, graph)); - if (graph->nChannels) return ncclSuccess; } + if (graph->nChannels) return ncclSuccess; if (ngpus == 1) if (graph->pattern != NCCL_TOPO_PATTERN_RING) graph->pattern = NCCL_TOPO_PATTERN_TREE; @@ -1293,7 +1291,7 @@ ncclResult_t ncclTopoPrintGraph(struct ncclTopoSystem* system, struct ncclTopoGr for (int c=0; cnChannels; c++) { sprintf(line, "%2d :", c); int offset = strlen(line); - if (system->nodes[NET].count > 0) { + if (system->nodes[NET].count > 0 && system->nodes[GPU].count != system->nRanks) { sprintf(line+offset, " %s/%d", topoNodeTypeStr[NET], graph->inter[2*c]); offset = strlen(line); } @@ -1301,7 +1299,7 @@ ncclResult_t ncclTopoPrintGraph(struct ncclTopoSystem* system, struct ncclTopoGr sprintf(line+offset, " %s/%d", topoNodeTypeStr[GPU], graph->intra[ngpus*c+i]); offset = strlen(line); } - if (system->nodes[NET].count > 0) { + if (system->nodes[NET].count > 0 && system->nodes[GPU].count != system->nRanks) { sprintf(line+offset, " %s/%d", topoNodeTypeStr[NET], graph->inter[2*c+1]); offset = strlen(line); } diff --git a/projects/rccl/src/graph/topo.h b/projects/rccl/src/graph/topo.h index 379b5a3351..f4054c23f3 100644 --- a/projects/rccl/src/graph/topo.h +++ b/projects/rccl/src/graph/topo.h @@ -82,8 +82,9 @@ struct ncclTopoLinkList { #define NCCL_TOPO_UNDEF (-1) -#define RCCL_TOPO_CR8G 1 +#define RCCL_TOPO_CR8G 1 #define RCCL_TOPO_4P2H_ROME 2 +#define RCCL_TOPO_GDR_ALL 4 struct ncclTopoNode { int type; @@ -132,6 +133,7 @@ struct ncclTopoSystem { float maxWidth; float totalWidth; int type; + int nRanks; }; ncclResult_t ncclTopoGetNode(struct ncclTopoSystem* system, struct ncclTopoNode** node, int type, uint64_t id); diff --git a/projects/rccl/src/include/comm.h b/projects/rccl/src/include/comm.h index 4817162ba6..a507a5616f 100644 --- a/projects/rccl/src/include/comm.h +++ b/projects/rccl/src/include/comm.h @@ -121,6 +121,9 @@ struct ncclComm { // Flag to ask NCCL kernels to abort volatile uint32_t *abortFlag; + // Flags for enable P2P NET + uint32_t *p2pNet; + // Device side of the communicator struct ncclDevComm *devComm; // Host copy of the devComm (to free CUDA allocs) diff --git a/projects/rccl/src/include/devcomm.h b/projects/rccl/src/include/devcomm.h index bbccb3ac84..06a2742acf 100644 --- a/projects/rccl/src/include/devcomm.h +++ b/projects/rccl/src/include/devcomm.h @@ -150,6 +150,8 @@ struct ncclTree { struct ncclPeer { struct ncclConnector send; struct ncclConnector recv; + struct ncclConnector p2pSend; + struct ncclConnector p2pRecv; }; struct ncclDevComm; @@ -332,6 +334,9 @@ struct ncclDevComm { // Channels, device side struct ncclChannel* channels; + // Flags for enable P2P NET + uint32_t *p2pNet; + #ifdef ENABLE_PROFILING // Profiling counters struct ncclProf* devProf; diff --git a/projects/rccl/src/init.cc b/projects/rccl/src/init.cc index 76237e7b22..1c7c81f7b8 100644 --- a/projects/rccl/src/init.cc +++ b/projects/rccl/src/init.cc @@ -352,6 +352,7 @@ static ncclResult_t commFree(ncclComm_t comm) { free(comm->intraCC); } NCCLCHECK(ncclCudaHostFree((void *)comm->abortFlag)); + NCCLCHECK(ncclCudaHostFree((void *)comm->p2pNet)); // Poison comm to try and catch a double free commPoison(comm); @@ -362,6 +363,7 @@ static ncclResult_t commFree(ncclComm_t comm) { RCCL_PARAM(AllToAllDisable, "ALLTOALL_KERNEL_DISABLE", 1); RCCL_PARAM(ForceEnableClique, "FORCE_ENABLE_CLIQUE", 0); +RCCL_PARAM(P2pNetDisable, "P2P_NET_DISABLE", 0); static ncclResult_t commAlloc(ncclComm_t* comret, int ndev, int rank) { if (ndev < 1) { @@ -401,6 +403,10 @@ static ncclResult_t commAlloc(ncclComm_t* comret, int ndev, int rank) { comm->hostDevComm.abortFlag = comm->abortFlag; STORE(comm->abortFlag, 0); + NCCLCHECK(ncclCudaHostCalloc((uint32_t**)&comm->p2pNet, 1)); + comm->hostDevComm.p2pNet = comm->p2pNet; + STORE(comm->p2pNet, 0); + comm->argsptr = &comm->args; #ifdef ENABLE_PROFILING NCCLCHECK(ncclCudaCalloc(&comm->hostDevComm.devProf, 1)); @@ -808,6 +814,8 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm // Topo detection / System graph creation NCCLCHECK(ncclTopoGetSystem(comm, &comm->topo)); + // save nRanks to ncclTopoSystem as indicator of multi-node + comm->topo->nRanks = comm->nRanks; // Compute paths between GPUs and NICs NCCLCHECK(ncclTopoComputePaths(comm->topo, comm->peerInfo)); // Remove inaccessible GPUs and unused NICs @@ -852,7 +860,7 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm if (!rcclParamForceEnableClique()) { // Disable clique-kernel support if not on CR8 topology - if (!(comm->topo->nodes[NET].count == 0 && comm->topo->type == RCCL_TOPO_CR8G)) + if (!(comm->topo->nodes[GPU].count == comm->topo->nRanks && (comm->topo->type & RCCL_TOPO_CR8G))) { INFO(NCCL_INIT, "Disabling clique-based kernels due to topology (force enable with RCCL_FORCE_ENABLE_CLIQUE)"); cliqueMode = CliqueManager::CLIQUE_DISABLED; @@ -898,6 +906,14 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm NCCLCHECK(ncclTopoDumpGraphs(comm->topo, 3, graphs)); } + if ((comm->topo->type & RCCL_TOPO_4P2H_ROME) && (comm->topo->type & RCCL_TOPO_GDR_ALL)) { + if (rcclParamP2pNetDisable() == 0) { + STORE(comm->p2pNet, 1); + INFO(NCCL_INIT, "RCCL enabled same node P2P over network"); + } + else + INFO(NCCL_INIT, "RCCL force disabled same node P2P over network"); + } // AllGather3 - begin struct ncclGraphInfo { int pattern; @@ -925,7 +941,7 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm NCCLCHECK(ncclTopoIdToIndex(comm->topo, GPU, myInfo->busId, &idx)); allGather3Data[rank].cudaCompCap = comm->topo->nodes[GPU].nodes[idx].gpu.cudaCompCap; allGather3Data[rank].gcn = comm->topo->nodes[GPU].nodes[idx].gpu.gcn; - allGather3Data[rank].alltoallDisable = comm->topo->nodes[NET].count? 1 : comm->alltoallDisable; + allGather3Data[rank].alltoallDisable = comm->topo->nodes[GPU].count == comm->topo->nRanks ? 1 : comm->alltoallDisable; allGather3Data[rank].nChannels = comm->nChannels = treeGraph.nChannels = ringGraph.nChannels = std::min(treeGraph.nChannels, ringGraph.nChannels); diff --git a/projects/rccl/src/proxy.cc b/projects/rccl/src/proxy.cc index 29cf1fda39..84a632e724 100644 --- a/projects/rccl/src/proxy.cc +++ b/projects/rccl/src/proxy.cc @@ -9,7 +9,7 @@ #include "graph.h" #include "collectives.h" -enum { proxyRecv=0, proxySend=1 }; +enum { proxyRecv=0, proxySend=1, p2pProxyRecv=2, p2pProxySend=3 }; static bool NeedProxy(int type, int pattern, int root, struct ncclRing* ring, int nranks) { if (pattern == ncclPatternRing || pattern == ncclPatternRingTwice) return true; @@ -162,10 +162,12 @@ static ncclResult_t SaveProxy(int type, int peer, struct ncclProxyArgs* args) { if (peer < 0) return ncclSuccess; struct ncclPeer* peerComm = args->channel->peers+peer; - struct ncclConnector* connector = type == proxyRecv ? &peerComm->recv : &peerComm->send; + struct ncclConnector* connector = type < p2pProxyRecv ? (type == proxyRecv ? &peerComm->recv : &peerComm->send) + : (type == p2pProxyRecv ? &peerComm->p2pRecv : &peerComm->p2pSend); if (connector->transportComm == NULL) { WARN("[%d] Error no transport for %s peer %d on channel %d", connector->comm->rank, - type == proxyRecv ? "recv" : "send", peer, args->channel->id); + type < p2pProxyRecv ? (type == proxyRecv ? "recv" : "send") : (type == p2pProxyRecv ? "p2pRecv" : "p2pSend"), + peer, args->channel->id); return ncclInternalError; } if (connector->transportComm->proxy == NULL) return ncclSuccess; @@ -238,7 +240,7 @@ ncclResult_t ncclProxySaveP2p(struct ncclInfo* info, struct ncclChannel* channel if (args.nsteps == 0) args.nsteps = 1; args.recvbytes = info->recvbytes; args.sendbytes = 0; - NCCLCHECK(SaveProxy(proxyRecv, peerrecv, &args)); + NCCLCHECK(SaveProxy(LOAD(info->comm->p2pNet) ? p2pProxyRecv : proxyRecv, peerrecv, &args)); } if (info->delta > 0 && info->sendbytes >= 0) { int peersend = (info->comm->rank+info->delta)%info->comm->nRanks; @@ -246,7 +248,7 @@ ncclResult_t ncclProxySaveP2p(struct ncclInfo* info, struct ncclChannel* channel if (args.nsteps == 0) args.nsteps = 1; args.sendbytes = info->sendbytes; args.recvbytes = 0; - NCCLCHECK(SaveProxy(proxySend, peersend, &args)); + NCCLCHECK(SaveProxy(LOAD(info->comm->p2pNet) ? p2pProxySend : proxySend, peersend, &args)); } return ncclSuccess; } diff --git a/projects/rccl/src/transport.cc b/projects/rccl/src/transport.cc index 80800258e0..c81f9b9c48 100644 --- a/projects/rccl/src/transport.cc +++ b/projects/rccl/src/transport.cc @@ -8,6 +8,7 @@ #include "comm.h" #include "info.h" #include "bootstrap.h" +#include "../graph/topo.h" extern struct ncclTransport p2pTransport; extern struct ncclTransport shmTransport; @@ -19,6 +20,34 @@ struct ncclTransport ncclTransports[NTRANSPORTS] = { netTransport, }; +static ncclResult_t connectedByXGMI(int* ret, struct ncclTopoSystem* system, struct ncclPeerInfo* info1, struct ncclPeerInfo* info2) { + *ret = 0; + if (info1->hostHash != info2->hostHash) return ncclSuccess; + int g1, g2; + NCCLCHECK(ncclTopoRankToIndex(system, info1->rank, &g1)); + NCCLCHECK(ncclTopoRankToIndex(system, info2->rank, &g2)); + if (system->nodes[GPU].nodes[g1].paths[GPU][g2].type == PATH_NVL) *ret = 1; + return ncclSuccess; +} + +template +static ncclResult_t selectTransportN(struct ncclComm* comm, struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peerInfo, struct ncclConnect* connect, struct ncclConnector* connector, int channelId, int n) { + for (int t=n; tsend : &transport->recv; + int ret = 0; + NCCLCHECK(transport->canConnect(&ret, comm->topo, NULL, myInfo, peerInfo)); + if (ret) { + connector->transportComm = transportComm; + NCCLCHECK(transportComm->setup(comm, NULL, myInfo, peerInfo, connect, connector, channelId)); + return ncclSuccess; + } + } + WARN("No transport found !"); + return ncclInternalError; +} + template static ncclResult_t selectTransport(struct ncclComm* comm, struct ncclTopoGraph* graph, struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peerInfo, struct ncclConnect* connect, struct ncclConnector* connector, int channelId) { for (int t=0; tp2pNet); for (int i=1; inRanks; i++) { int recvPeer = (comm->rank - i + comm->nRanks) % comm->nRanks; int sendPeer = (comm->rank + i) % comm->nRanks; @@ -73,15 +103,41 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph* int sendChannels = 0, recvChannels = 0; for (int c=0; cchannels[c].peers[recvPeer].recv; - NCCLCHECK(selectTransport<0>(comm, graph, comm->peerInfo+comm->rank, comm->peerInfo+recvPeer, recvData+recvChannels++, conn, c)); + int xgmi = 0; + if (p2pNet && graph == NULL) { + struct ncclConnector* conn = &comm->channels[c].peers[recvPeer].p2pRecv; + NCCLCHECK(connectedByXGMI(&xgmi, comm->topo, comm->peerInfo+comm->rank, comm->peerInfo+recvPeer)); + if (xgmi) { + NCCLCHECK(selectTransportN<0>(comm, comm->peerInfo+comm->rank, comm->peerInfo+recvPeer, recvData+recvChannels++, conn, c, TRANSPORT_P2P)); + } + else { + NCCLCHECK(selectTransportN<0>(comm, comm->peerInfo+comm->rank, comm->peerInfo+recvPeer, recvData+recvChannels++, conn, c, TRANSPORT_NET)); + } + } + else { + struct ncclConnector* conn = &comm->channels[c].peers[recvPeer].recv; + NCCLCHECK(selectTransport<0>(comm, graph, comm->peerInfo+comm->rank, comm->peerInfo+recvPeer, recvData+recvChannels++, conn, c)); + } } } struct ncclConnect* sendData = recvData+recvChannels; for (int c=0; cchannels[c].peers[sendPeer].send; - NCCLCHECK(selectTransport<1>(comm, graph, comm->peerInfo+comm->rank, comm->peerInfo+sendPeer, sendData+sendChannels++, conn, c)); + int xgmi = 0; + if (p2pNet && graph == NULL) { + struct ncclConnector* conn = &comm->channels[c].peers[sendPeer].p2pSend; + NCCLCHECK(connectedByXGMI(&xgmi, comm->topo, comm->peerInfo+comm->rank, comm->peerInfo+sendPeer)); + if (xgmi) { + NCCLCHECK(selectTransportN<1>(comm, comm->peerInfo+comm->rank, comm->peerInfo+sendPeer, sendData+sendChannels++, conn, c, TRANSPORT_P2P)); + } + else { + NCCLCHECK(selectTransportN<1>(comm, comm->peerInfo+comm->rank, comm->peerInfo+sendPeer, sendData+sendChannels++, conn, c, TRANSPORT_NET)); + } + } + else { + struct ncclConnector* conn = &comm->channels[c].peers[sendPeer].send; + NCCLCHECK(selectTransport<1>(comm, graph, comm->peerInfo+comm->rank, comm->peerInfo+sendPeer, sendData+sendChannels++, conn, c)); + } } } @@ -101,18 +157,22 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph* for (int c=0; cchannels[c].peers[sendPeer].send; + struct ncclConnector* conn = (p2pNet && graph == NULL) ? &comm->channels[c].peers[sendPeer].p2pSend + : &comm->channels[c].peers[sendPeer].send; NCCLCHECK(conn->transportComm->connect(comm, sendData++, 1, comm->rank, conn)); conn->connected = 1; - CUDACHECK(hipMemcpy(&comm->channels[c].devPeers[sendPeer].send, conn, sizeof(struct ncclConnector), hipMemcpyHostToDevice)); + if (p2pNet && graph == NULL) CUDACHECK(hipMemcpy(&comm->channels[c].devPeers[sendPeer].p2pSend, conn, sizeof(struct ncclConnector), hipMemcpyHostToDevice)); + else CUDACHECK(hipMemcpy(&comm->channels[c].devPeers[sendPeer].send, conn, sizeof(struct ncclConnector), hipMemcpyHostToDevice)); } } for (int c=0; cchannels[c].peers[recvPeer].recv; + struct ncclConnector* conn = (p2pNet && graph == NULL) ? &comm->channels[c].peers[recvPeer].p2pRecv + : &comm->channels[c].peers[recvPeer].recv; NCCLCHECK(conn->transportComm->connect(comm, recvData++, 1, comm->rank, conn)); conn->connected = 1; - CUDACHECK(hipMemcpy(&comm->channels[c].devPeers[recvPeer].recv, conn, sizeof(struct ncclConnector), hipMemcpyHostToDevice)); + if (p2pNet && graph == NULL) CUDACHECK(hipMemcpy(&comm->channels[c].devPeers[recvPeer].p2pRecv, conn, sizeof(struct ncclConnector), hipMemcpyHostToDevice)); + else CUDACHECK(hipMemcpy(&comm->channels[c].devPeers[recvPeer].recv, conn, sizeof(struct ncclConnector), hipMemcpyHostToDevice)); } } comm->connectRecv[recvPeer] = comm->connectSend[sendPeer] = 0; diff --git a/projects/rccl/tools/topo_expl/utils.cpp b/projects/rccl/tools/topo_expl/utils.cpp index 3d2a70dbbc..0a82949eaf 100644 --- a/projects/rccl/tools/topo_expl/utils.cpp +++ b/projects/rccl/tools/topo_expl/utils.cpp @@ -207,6 +207,7 @@ ncclResult_t initTransportsRank_1(struct ncclComm* comm, struct allGather1Data_t // Topo detection / System graph creation //NCCLCHECK(ncclTopoGetSystem(comm, &comm->topo)); + comm->topo->nRanks = comm->nRanks; // Compute paths between GPUs and NICs NCCLCHECK(ncclTopoComputePaths(comm->topo, comm->peerInfo)); // Remove inaccessible GPUs and unused NICs