From 7aa1c46fd551d5514474cfc55848219dc67ca683 Mon Sep 17 00:00:00 2001 From: Sylvain Jeaugey Date: Tue, 3 May 2022 01:30:26 -0700 Subject: [PATCH] 2.12.12-1 Improve allreduce performance when we have more than one network interface per GPU and we need to use PXN to close rings. Add support for PCI Gen5 on 5.4 kernels. Fix crash when setting NCCL_SET_THREAD_NAME. Fix random crash in init due to uninitialized struct. Fix hang on cubemesh topologies. Add P2P_DIRECT_DISABLE parameter to disable direct access to pointers within a process. --- makefiles/version.mk | 2 +- src/bootstrap.cc | 2 +- src/enqueue.cc | 22 +++++++--------------- src/graph/paths.cc | 26 ++++++++++++++++---------- src/graph/topo.cc | 2 +- src/graph/topo.h | 15 +++++++++++++++ src/group.cc | 16 +++++++++------- src/include/channel.h | 31 +++++++++++++++++++++++++++++++ src/init.cc | 15 +++++++-------- src/proxy.cc | 6 ++---- src/transport/net_ib.cc | 2 +- src/transport/p2p.cc | 5 +++-- 12 files changed, 94 insertions(+), 50 deletions(-) diff --git a/makefiles/version.mk b/makefiles/version.mk index 7c9bf0f136..88656d9fa7 100644 --- a/makefiles/version.mk +++ b/makefiles/version.mk @@ -1,6 +1,6 @@ ##### version NCCL_MAJOR := 2 NCCL_MINOR := 12 -NCCL_PATCH := 10 +NCCL_PATCH := 12 NCCL_SUFFIX := PKG_REVISION := 1 diff --git a/src/bootstrap.cc b/src/bootstrap.cc index daaa8cdbb7..4f7f48c6bb 100644 --- a/src/bootstrap.cc +++ b/src/bootstrap.cc @@ -165,8 +165,8 @@ ncclResult_t bootstrapCreateRoot(ncclUniqueId* id, bool idFromEnv) { memcpy(id, &listenSock->addr, sizeof(union ncclSocketAddress)); pthread_t thread; pthread_create(&thread, NULL, bootstrapRoot, (void*)listenSock); - pthread_detach(thread); // will not be pthread_join()'d ncclSetThreadName(thread, "NCCL BootstrapR"); + pthread_detach(thread); // will not be pthread_join()'d return ncclSuccess; } diff --git a/src/enqueue.cc b/src/enqueue.cc index a15c3701cf..349cb2b03a 100644 --- a/src/enqueue.cc +++ b/src/enqueue.cc @@ -9,6 +9,7 @@ #include "coll_net.h" #include "gdrwrap.h" #include "bootstrap.h" +#include "channel.h" #include // std::memcpy @@ -861,20 +862,14 @@ static ncclResult_t ncclSaveP2p(struct ncclInfo* info) { struct ncclComm* comm = info->comm; int peer = info->root; ssize_t nBytes = info->count*ncclTypeSize(info->datatype); - int p2pGroupSize = NCCL_MAX_WORK_ELEMENTS_P2P/2; - int peerNode = comm->rankToNode[peer]; - int peerIndex = comm->rankToLocalRank[peer]; - int nsteps = comm->maxLocalRanks; - int rankIndex = comm->rankToLocalRank[comm->rank]; + int channelBaseId; + NCCLCHECK(ncclChannelComputeBase(comm, peer, info->coll, &channelBaseId)); if (info->coll == ncclFuncSend) { if (peer != comm->rank) { - int step = (nsteps + peerIndex - rankIndex)%nsteps; - int delta = (comm->nNodes + peerNode - comm->node) % comm->nNodes; - if (comm->nNodes == 1) delta = (comm->nRanks + peer - comm->rank) % comm->nRanks; // Mark channels that need pre-connect for (int c=0; cp2pnChannelsPerPeer; c++) { - int shuffle = comm->nNodes > 1 ? delta+(step/p2pGroupSize) : step; - int channelId = (shuffle+comm->p2pChannels[c]) % comm->p2pnChannels; + int channelId; + NCCLCHECK(ncclChannelComputeFromBase(comm, channelBaseId, c, &channelId)); if (comm->channels[channelId].peers[peer].send[1].connected == 0) { // P2P uses only 1 connector comm->connectSend[peer] |= (1<connect = 1; @@ -885,13 +880,10 @@ static ncclResult_t ncclSaveP2p(struct ncclInfo* info) { comm->p2pSendCount++; } else { if (peer != comm->rank) { - int step = (nsteps + rankIndex - peerIndex)%nsteps; - int delta = (comm->nNodes + comm->node - peerNode) % comm->nNodes; - if (comm->nNodes == 1) delta = (comm->nRanks - peer + comm->rank) % comm->nRanks; // Mark channels that need pre-connect for (int c=0; cp2pnChannelsPerPeer; c++) { - int shuffle = comm->nNodes > 1 ? delta+(step/p2pGroupSize) : step; - int channelId = (shuffle+comm->p2pChannels[c]) % comm->p2pnChannels; + int channelId; + NCCLCHECK(ncclChannelComputeFromBase(comm, channelBaseId, c, &channelId)); if (comm->channels[channelId].peers[peer].recv[1].connected == 0) { // P2P uses only 1 connector comm->connectRecv[peer] |= (1<connect = 1; diff --git a/src/graph/paths.cc b/src/graph/paths.cc index 2bd52b0d72..222be70762 100644 --- a/src/graph/paths.cc +++ b/src/graph/paths.cc @@ -228,6 +228,9 @@ ncclResult_t ncclGetLevel(int* level, const char* disableEnv, const char* levelE } } // Old style numbering + // levelsOldToNew to is an array with each index corresponding to the + // "old level" int, and each value mapping to the correct value defined in topo.h + // maxOldLevel is a quick check to handle out of bounds (based on the length of levelsOldToNew) if (l == -1 && str[0] >= '0' && str[0] <= '9') { int oldLevel = strtol(str, NULL, 0); const int maxOldLevel = sizeof(levelsOldToNew)/sizeof(int) - 1; @@ -521,24 +524,27 @@ ncclResult_t ncclTopoComputePaths(struct ncclTopoSystem* system, struct ncclPeer // Check whether we can access the NIC through another NVLink-connected GPU (PXN) struct ncclTopoNode* gpu = system->nodes[GPU].nodes+g; if (ncclPxnDisable() != 1 && gpu->paths[NET][n].type > PATH_PXB) { + int pxnGpu = -1; + for (int p=0; pnodes[GPU].count; p++) { if (p == g) continue; - struct ncclTopoNode* peerNode = system->nodes[GPU].nodes+p; - - // To ensure proper balancing, use only a local GPU which advertised that NIC as its preferred one. - int netDev; - NCCLCHECK(ncclTopoGetLocalNet(system, peerNode->gpu.rank, &netDev)); - // Make sure we can allocate memory on that GPU. - if (netDev != netNode->id) continue; // PXN = PCI + NVLink. - if (netNode->paths[GPU][p].type > PATH_PXB || peerNode->paths[GPU][g].type > PATH_NVL) continue; + struct ncclTopoNode* peerNode = system->nodes[GPU].nodes+p; + if (peerNode->paths[NET][n].type > PATH_PXB || peerNode->paths[GPU][g].type > PATH_NVL) continue; + pxnGpu = p; + + int netDev; + NCCLCHECK(ncclTopoGetLocalNet(system, peerNode->gpu.rank, &netDev)); + // To ensure proper balancing, use preferably a local GPU which advertised that NIC as its preferred one. + if (netDev == netNode->id) break; + } + if (pxnGpu != -1) { // We can use that GPU as relay to communicate with that NIC. // Only enabling it in the GPU->NIC direction for now to favor // receiving locally and sending remotely (consistent with net.cc) - NCCLCHECK(addInterStep(system, GPU, p, GPU, g, NET, n)); - break; + NCCLCHECK(addInterStep(system, GPU, pxnGpu, GPU, g, NET, n)); } } // Update path when we dont want to / can't use GPU Direct RDMA. diff --git a/src/graph/topo.cc b/src/graph/topo.cc index 83f125f22c..53e12e5ee4 100644 --- a/src/graph/topo.cc +++ b/src/graph/topo.cc @@ -371,7 +371,7 @@ ncclResult_t ncclTopoAddGpu(struct ncclXmlNode* xmlGpu, struct ncclTopoSystem* s struct kvDict kvDictPciClass[] = { { "0x060400", PCI }, { "0x068000", NVS }, { "0x068001", CPU }, { "0x03", GPU }, { "0x02", NIC }, { NULL, PCI /* Default fallback value */ } }; struct kvDict kvDictPciGen[] = { - { "2.5 GT/s", 15 }, { "5 GT/s", 30 }, { "8 GT/s", 60 }, { "16 GT/s", 120 }, /* Kernel 5.6 and earlier */ + { "2.5 GT/s", 15 }, { "5 GT/s", 30 }, { "8 GT/s", 60 }, { "16 GT/s", 120 }, { "32 GT/s", 240 }, /* Kernel 5.6 and earlier */ { "2.5 GT/s PCIe", 15 }, { "5.0 GT/s PCIe", 30 }, { "8.0 GT/s PCIe", 60 }, { "16.0 GT/s PCIe", 120 }, { "32.0 GT/s PCIe", 240 }, { "64.0 GT/s PCIe", 480 }, { NULL, 60 /* Default fallback */ } }; // x100 Mbps per lane ncclResult_t ncclTopoAddPci(struct ncclXmlNode* xmlPci, struct ncclTopoSystem* system, struct ncclTopoNode* parent) { diff --git a/src/graph/topo.h b/src/graph/topo.h index ada1732c96..71c1fca469 100644 --- a/src/graph/topo.h +++ b/src/graph/topo.h @@ -49,13 +49,28 @@ extern const char* topoNodeTypeStr[]; #define LINK_NET 8 extern const char* topoLinkTypeStr[]; +// Local (myself) #define PATH_LOC 0 + +// Connection traversing NVLink #define PATH_NVL 1 + +// Connection through NVLink using an intermediate GPU #define PATH_NVB 2 + +// Connection traversing at most a single PCIe bridge #define PATH_PIX 3 + +// Connection traversing multiple PCIe bridges (without traversing the PCIe Host Bridge) #define PATH_PXB 4 + +// Connection between a GPU and a NIC using an intermediate GPU. Used to enable rail-local, aggregated network send/recv operations. #define PATH_PXN 5 + +// Connection traversing PCIe as well as a PCIe Host Bridge (typically the CPU) #define PATH_PHB 6 + +// Connection traversing PCIe as well as the SMP interconnect between NUMA nodes (e.g., QPI/UPI) #define PATH_SYS 7 #define PATH_DIS 7 extern const char* topoPathTypeStr[]; diff --git a/src/group.cc b/src/group.cc index f2b9e37dbd..5f65a5881e 100644 --- a/src/group.cc +++ b/src/group.cc @@ -8,6 +8,7 @@ #include "debug.h" #include "enqueue.h" #include "transport.h" +#include "channel.h" #define MAX_ASYNC_OPS 128 thread_local pthread_t ncclGroupThreads[MAX_ASYNC_OPS]; @@ -101,18 +102,22 @@ ncclResult_t ncclGroupStart() { return ncclSuccess; } -static ncclResult_t scheduleSend(struct ncclComm* comm, int peer, int channelId, size_t count, void* buff) { +static ncclResult_t scheduleSend(struct ncclComm* comm, int peer, int chunk, size_t count, void* buff) { struct ncclInfo info = { ncclFuncSend, "Send", NULL, buff, count, ncclInt8, ncclSum, peer, comm, comm->userStream, /* Args */ 1, 1 }; + int channelId; + NCCLCHECK(ncclChannelCompute(comm, peer, chunk%comm->p2pnChannelsPerPeer, ncclFuncSend, &channelId)); info.channelId = channelId; NCCLCHECK(ncclSetupP2pKernel(&info)); return ncclSuccess; } -static ncclResult_t scheduleRecv(struct ncclComm* comm, int peer, int channelId, size_t count, void* buff) { +static ncclResult_t scheduleRecv(struct ncclComm* comm, int peer, int chunk, size_t count, void* buff) { struct ncclInfo info = { ncclFuncRecv, "Recv", NULL, buff, count, ncclInt8, ncclSum, peer, comm, comm->userStream, /* Args */ 1, 1 }; + int channelId; + NCCLCHECK(ncclChannelCompute(comm, peer, chunk%comm->p2pnChannelsPerPeer, ncclFuncRecv, &channelId)); info.channelId = channelId; NCCLCHECK(ncclSetupP2pKernel(&info)); return ncclSuccess; @@ -208,7 +213,6 @@ ncclResult_t ncclGroupEnd() { int node = comm->node; int nNodes = comm->nNodes; int localRank = comm->localRank; - int p2pGroupSize = NCCL_MAX_WORK_ELEMENTS_P2P/2; // Compute how much to split operations // Natural step size matching buffer steps. @@ -266,8 +270,6 @@ sched_delta: do { // Shuffle channels with s intra-node, and delta inter-node. Inter-node, make sure // to use multiple channels to guarantee progress on all ranks from the same node. - int shuffle = comm->nNodes > 1 ? delta+(s/p2pGroupSize) : s; - int channelId = (shuffle+comm->p2pChannels[chunk%comm->p2pnChannelsPerPeer]) % comm->p2pnChannels; ssize_t recvbytes = totRecvBytes-recvOffset; ssize_t sendbytes = totSendBytes-sendOffset; if (recvbytes > recvChunkSize) { recvbytes = recvChunkSize; } else { recvRemaining = 0; } @@ -277,10 +279,10 @@ sched_delta: if (sendbytes < 0 || (sendbytes == 0 && totSendBytes != 0)) send = NULL; if (recvbytes < 0 || (recvbytes == 0 && totRecvBytes != 0)) recv = NULL; if (recv) { - NCCLCHECKGOTO(scheduleRecv(comm, recvPeer, channelId, recvbytes, ((char*)recvBuff)+recvOffset), ret, group_cleanup); + NCCLCHECKGOTO(scheduleRecv(comm, recvPeer, chunk, recvbytes, ((char*)recvBuff)+recvOffset), ret, group_cleanup); } if (send) { - NCCLCHECKGOTO(scheduleSend(comm, sendPeer, channelId, sendbytes, ((char*)sendBuff)+sendOffset), ret, group_cleanup); + NCCLCHECKGOTO(scheduleSend(comm, sendPeer, chunk, sendbytes, ((char*)sendBuff)+sendOffset), ret, group_cleanup); } recvOffset += recvChunkSize; sendOffset += sendChunkSize; diff --git a/src/include/channel.h b/src/include/channel.h index e2da325233..dc1536aea3 100644 --- a/src/include/channel.h +++ b/src/include/channel.h @@ -10,5 +10,36 @@ ncclResult_t initChannel(struct ncclComm* comm, int channelid); ncclResult_t freeChannel(struct ncclChannel* channel, int nRanks); +static ncclResult_t ncclChannelComputeBase(struct ncclComm* comm, int peer, int coll, int*channelBase) { + int p2pGroupSize = NCCL_MAX_WORK_ELEMENTS_P2P/2; + int peerNode = comm->rankToNode[peer]; + int peerIndex = comm->rankToLocalRank[peer]; + int nsteps = comm->maxLocalRanks; + int rankIndex = comm->rankToLocalRank[comm->rank]; + int step, delta; + if (coll == ncclFuncSend) { + step = (nsteps + peerIndex - rankIndex)%nsteps; + delta = (comm->nNodes + peerNode - comm->node) % comm->nNodes; + } else if (coll == ncclFuncRecv) { + step = (nsteps + rankIndex - peerIndex)%nsteps; + delta = (comm->nNodes + comm->node - peerNode) % comm->nNodes; + } else { + return ncclInternalError; + } + *channelBase = comm->nNodes > 1 ? delta+(step/p2pGroupSize) : step; + return ncclSuccess; +} + +static ncclResult_t ncclChannelComputeFromBase(struct ncclComm* comm, int base, int channelInc, int*channelId) { + *channelId = (base+comm->p2pChannels[channelInc]) % comm->p2pnChannels; + return ncclSuccess; +} + +static ncclResult_t ncclChannelCompute(struct ncclComm* comm, int peer, int channelInc, int coll, int*channelId) { + int base; + NCCLCHECK(ncclChannelComputeBase(comm, peer, coll, &base)); + NCCLCHECK(ncclChannelComputeFromBase(comm, base, channelInc, channelId)); + return ncclSuccess; +} #endif diff --git a/src/init.cc b/src/init.cc index 29bfa01f0f..c6b6e8fcb8 100644 --- a/src/init.cc +++ b/src/init.cc @@ -823,18 +823,17 @@ collnet_cleanup: NCCLCHECK(ncclTopoGetNvbGpus(comm->topo, comm->rank, &nvbNpeers, &nvbPeers)); for (int r=0; rnRanks + (comm->rank-peer)) % comm->nRanks; + int channelId; for (int c=0; cp2pnChannelsPerPeer; c++) { - int channelId = (delta+comm->p2pChannels[c]) % comm->p2pnChannels; - if (comm->channels[channelId].peers[peer].recv[1].connected == 0) { // P2P uses only 1 connector - comm->connectRecv[peer] |= (1<channels[channelId].peers[peer].send[1].connected == 0) { + comm->connectSend[peer] |= (1<nRanks - (comm->rank-peer)) % comm->nRanks; for (int c=0; cp2pnChannelsPerPeer; c++) { - int channelId = (delta+comm->p2pChannels[c]) % comm->p2pnChannels; - if (comm->channels[channelId].peers[peer].send[1].connected == 0) { // P2P uses only 1 connector - comm->connectSend[peer] |= (1<channels[channelId].peers[peer].recv[1].connected == 0) { + comm->connectRecv[peer] |= (1<proxyState.listenSock->fd; pollfds[NCCL_MAX_LOCAL_RANKS].events = POLLIN; @@ -1066,13 +1064,13 @@ void* ncclProxyService(void* _args) { ncclResult_t ncclProxyInit(struct ncclComm* comm, struct ncclSocket* sock, union ncclSocketAddress* peerAddresses) { comm->proxyState.listenSock = sock; comm->proxyState.peerAddresses = peerAddresses; - ncclSetThreadName(comm->proxyState.thread, "NCCL Service %2d", comm->cudaDev); return ncclSuccess; } ncclResult_t ncclProxyCreate(struct ncclComm* comm) { // comm->proxyState.thread is pthread_join()'d by commFree() in init.cc pthread_create(&comm->proxyState.thread, NULL, ncclProxyService, comm); + ncclSetThreadName(comm->proxyState.thread, "NCCL Service %2d", comm->cudaDev); return ncclSuccess; } diff --git a/src/transport/net_ib.cc b/src/transport/net_ib.cc index 26b47be7ad..d3d4f9a426 100644 --- a/src/transport/net_ib.cc +++ b/src/transport/net_ib.cc @@ -223,8 +223,8 @@ ncclResult_t ncclIbInit(ncclDebugLogger_t logFunction) { ncclIbDevs[ncclNIbDevs].mrCache.slots = NULL; pthread_create(&ncclIbAsyncThread, NULL, ncclIbAsyncThreadMain, context); - pthread_detach(ncclIbAsyncThread); // will not be pthread_join()'d ncclSetThreadName(ncclIbAsyncThread, "NCCL IbAsync %2d", ncclNIbDevs); + pthread_detach(ncclIbAsyncThread); // will not be pthread_join()'d ncclNIbDevs++; nPorts++; } diff --git a/src/transport/p2p.cc b/src/transport/p2p.cc index e71e157b1b..9859c87f47 100644 --- a/src/transport/p2p.cc +++ b/src/transport/p2p.cc @@ -127,6 +127,7 @@ ncclResult_t p2pCanConnect(int* ret, struct ncclTopoSystem* topo, struct ncclTop // Setting this to non zero causes P2P to use Reads rather than Writes NCCL_PARAM(P2pReadEnable, "P2P_READ_ENABLE", -2); +NCCL_PARAM(P2pDirectDisable, "P2P_DIRECT_DISABLE", 0); static ncclResult_t p2pGetInfo(struct ncclTopoSystem* topo, struct ncclPeerInfo* info1, struct ncclPeerInfo* info2, int* read, int* intermediateRank) { int p2p; @@ -185,7 +186,7 @@ ncclResult_t p2pSendSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, st if (intermediateRank == -1) { info->rank = myInfo->rank; if (myInfo->pidHash == peerInfo->pidHash) { - send->conn.direct |= info->read ? NCCL_DIRECT_READ : NCCL_DIRECT_WRITE; + if (ncclParamP2pDirectDisable() == 0) send->conn.direct |= info->read ? NCCL_DIRECT_READ : NCCL_DIRECT_WRITE; INFO(NCCL_INIT|NCCL_P2P, "Channel %02d : %d[%lx] -> %d[%lx] via P2P/direct pointer%s", channelId, myInfo->rank, myInfo->busId, peerInfo->rank, peerInfo->busId, useReadStr); } else { @@ -230,7 +231,7 @@ ncclResult_t p2pRecvSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, st if (intermediateRank == -1) { info->rank = myInfo->rank; if (myInfo->pidHash == peerInfo->pidHash) { - recv->conn.direct |= info->read ? NCCL_DIRECT_READ : NCCL_DIRECT_WRITE; + if (ncclParamP2pDirectDisable() == 0) recv->conn.direct |= info->read ? NCCL_DIRECT_READ : NCCL_DIRECT_WRITE; } else { recv->conn.direct |= info->read ? NCCL_IPC_READ : NCCL_IPC_WRITE; }