Enable local sendrecv over network if GDR is available on all GPUs (#324)

[ROCm/rccl commit: c018edf0f2]
This commit is contained in:
Wenkai Du
2021-03-05 19:59:41 -08:00
کامیت شده توسط GitHub
والد 8d90821062
کامیت bcf4ecb0e3
14فایلهای تغییر یافته به همراه163 افزوده شده و 44 حذف شده
@@ -23,6 +23,8 @@ ncclResult_t initChannel(struct ncclComm* comm, int channelid) {
for (size_t i=0; i<comm->nRanks+1; ++i) {
channel->peers[i].send.comm = comm;
channel->peers[i].recv.comm = comm;
channel->peers[i].p2pSend.comm = comm;
channel->peers[i].p2pRecv.comm = comm;
}
// Per-channel operation list.
@@ -46,10 +48,16 @@ ncclResult_t freeChannel(struct ncclChannel* channel, int nRanks) {
for (int r=0; r<nRanks+1; r++) {
struct ncclPeer* peer = channel->peers+r;
if (peer->send.transportResources) NCCLCHECK(peer->send.transportComm->free(peer->send.transportResources));
if (peer->send.transportResources == peer->p2pSend.transportResources) peer->p2pSend.transportResources = NULL;
peer->send.transportResources = NULL;
if (peer->p2pSend.transportResources) NCCLCHECK(peer->p2pSend.transportComm->free(peer->p2pSend.transportResources));
}
for (int r=0; r<nRanks+1; r++) {
struct ncclPeer* peer = channel->peers+r;
if (peer->recv.transportResources) NCCLCHECK(peer->recv.transportComm->free(peer->recv.transportResources));
if (peer->recv.transportResources == peer->p2pRecv.transportResources) peer->p2pRecv.transportResources = NULL;
peer->recv.transportResources = NULL;
if (peer->p2pRecv.transportResources) NCCLCHECK(peer->p2pRecv.transportComm->free(peer->p2pRecv.transportResources));
}
// Free the peer structures.
@@ -71,6 +71,7 @@ class ncclPrimitives {
int peer = -1;
int role = 0;
int group;
const int p2p;
uint64_t step;
T* direct = NULL;
T* buff;
@@ -198,7 +199,7 @@ class ncclPrimitives {
__device__ __forceinline__ void loadRecvConn(struct ncclChannel* channel, T* directBuff) {
if (role & (ROLE_WAIT_RECV|ROLE_POST_RECV)) {
conn = &channel->devPeers[peer].recv.conn;
conn = (LOAD(comm->p2pNet) && p2p) ? &channel->devPeers[peer].p2pRecv.conn : &channel->devPeers[peer].recv.conn;
step = conn->step;
step = ROUNDUP(step, SLICESPERCHUNK*SLICESTEPS);
if (role & ROLE_POST_RECV) {
@@ -221,7 +222,7 @@ class ncclPrimitives {
__device__ __forceinline__ void loadSendConn(struct ncclChannel* channel) {
if (role & (ROLE_WAIT_SEND|ROLE_POST_SEND)) {
conn = &channel->devPeers[peer].send.conn;
conn = (LOAD(comm->p2pNet) && p2p) ? &channel->devPeers[peer].p2pSend.conn : &channel->devPeers[peer].send.conn;
step = conn->step;
step = ROUNDUP(step, SLICESPERCHUNK*SLICESTEPS);
if (role & ROLE_POST_SEND) {
@@ -251,8 +252,8 @@ class ncclPrimitives {
public:
__device__ __forceinline__
ncclPrimitives(const int tid, const int nworkers, int* recvPeers, int* sendPeers, T* directBuff, int stepSize, struct ncclChannel* channel, struct ncclDevComm* comm, struct ncclShmemPtrs* ptrs, int group)
: comm(comm), tid(tid), nworkers(nworkers), stepSize(stepSize), srcs((const T**)ptrs[group].srcs), dsts((T**)ptrs[group].dsts), group(group), barriers(&ptrs[group].barrier), barrier_next(ptrs[group].barrier_next) {
ncclPrimitives(const int tid, const int nworkers, int* recvPeers, int* sendPeers, T* directBuff, int stepSize, struct ncclChannel* channel, struct ncclDevComm* comm, struct ncclShmemPtrs* ptrs, int group, int p2p = 0)
: comm(comm), tid(tid), nworkers(nworkers), stepSize(stepSize), srcs((const T**)ptrs[group].srcs), dsts((T**)ptrs[group].dsts), group(group), barriers(&ptrs[group].barrier), barrier_next(ptrs[group].barrier_next), p2p(p2p) {
nthreads = nworkers;
// For send operations, we need an extra warp to overlap the threadfence and the copy
// int postThreads = NSEND && nworkers >= 64 ? WARP_SIZE : 0;
@@ -56,7 +56,7 @@ class ncclFunction<ncclFuncSendRecv, NCCL_ALGO_RING, NCCL_PROTO_SIMPLE, FUNC, T,
int peer = (comm->rank-delta+comm->nRanks)%comm->nRanks;
int nt = nThreadsSplit;
ncclPrimitives<UNROLL, 1, 1, T, 1, 0, 1, FUNC>
prims(tid, nt, &peer, NULL, recvbuff, stepSize, channel, comm, ncclShmem->ptrs, groupRecv);
prims(tid, nt, &peer, NULL, recvbuff, stepSize, channel, comm, ncclShmem->ptrs, groupRecv, 1);
if (recvCount == 0) {
prims.recv(recvbuff, 0);
@@ -71,7 +71,7 @@ class ncclFunction<ncclFuncSendRecv, NCCL_ALGO_RING, NCCL_PROTO_SIMPLE, FUNC, T,
int peer = (comm->rank+delta)%comm->nRanks;
int nt = nThreads-nThreadsSplit;
ncclPrimitives<UNROLL, 1, 1, T, 0, 1, 1, FUNC>
prims(tid-nThreadsSplit, nt, NULL, &peer, recvbuff, stepSize, channel, comm, ncclShmem->ptrs, groupSend);
prims(tid-nThreadsSplit, nt, NULL, &peer, recvbuff, stepSize, channel, comm, ncclShmem->ptrs, groupSend, 1);
if (sendCount == 0) {
prims.send(sendbuff, 0);
+7 -5
مشاهده پرونده
@@ -391,8 +391,8 @@ static ncclResult_t computeColl(struct ncclInfo* info /* input */, struct ncclWo
NCCLCHECK(getLoopInfo(info));
if ((info->coll == ncclFuncAllToAll || info->coll == ncclFuncAllToAllv)
&& info->comm->topo->nodes[NET].count == 0 && info->comm->topo->type == RCCL_TOPO_4P2H_ROME)
info->nChannels =info->comm->p2pnChannels;
&& info->comm->topo->nodes[GPU].count == info->comm->topo->nRanks && (info->comm->topo->type & RCCL_TOPO_4P2H_ROME))
info->nChannels = info->comm->p2pnChannels;
work->opCount = info->comm->opCount;
work->sendbuff = info->sendbuff;
@@ -617,7 +617,8 @@ static ncclResult_t ncclSaveP2p(struct ncclInfo* info) {
int delta = (comm->nRanks - (comm->rank-peer)) % comm->nRanks;
for (int c=0; c<comm->p2pnChannelsPerPeer; c++) {
int channelId = (delta+comm->p2pChannels[c]) % comm->p2pnChannels;
if (comm->channels[channelId].peers[peer].send.connected == 0) {
if ((LOAD(comm->p2pNet) ? comm->channels[channelId].peers[peer].p2pSend.connected :
comm->channels[channelId].peers[peer].send.connected) == 0) {
comm->connectSend[peer] |= (1<<channelId);
comm->connect = 1;
}
@@ -630,7 +631,8 @@ static ncclResult_t ncclSaveP2p(struct ncclInfo* info) {
int delta = (comm->nRanks + (comm->rank-peer)) % comm->nRanks;
for (int c=0; c<comm->p2pnChannelsPerPeer; c++) {
int channelId = (delta+comm->p2pChannels[c]) % comm->p2pnChannels;
if (comm->channels[channelId].peers[peer].recv.connected == 0) {
if ((LOAD(comm->p2pNet) ? comm->channels[channelId].peers[peer].p2pRecv.connected :
comm->channels[channelId].peers[peer].recv.connected ) == 0) {
comm->connectRecv[peer] |= (1<<channelId);
comm->connect = 1;
}
@@ -643,7 +645,7 @@ static ncclResult_t ncclSaveP2p(struct ncclInfo* info) {
}
static int getSegment(struct ncclInfo* info, struct ncclWork* work) {
const int e = (info->comm->topo->nodes[NET].count == 0 && info->comm->topo->type == RCCL_TOPO_4P2H_ROME)
const int e = (info->comm->topo->nodes[GPU].count == info->comm->topo->nRanks && (info->comm->topo->type & RCCL_TOPO_4P2H_ROME))
? 1 : NCCL_MAX_WORK_ELEMENTS;
for (int s=0; s<e && work->elems[s].p2p.delta != info->delta; s++) {
if (work->elems[s].p2p.nThreads == 0) return s;
@@ -267,9 +267,9 @@ ncclResult_t ncclTopoPostset(struct ncclComm* comm, int* firstRanks, int* treePa
int nc = nChannels*2;
if (gcn == 908) nc = std::max(nc, 4);
if (comm->topo->nodes[NET].count == 0 && comm->topo->type == RCCL_TOPO_CR8G) nc = nChannels*4;
if (comm->topo->nodes[GPU].count == comm->topo->nRanks && (comm->topo->type & RCCL_TOPO_CR8G)) nc = nChannels*4;
if (!nnets) nnets = comm->topo->nodes[NET].count;
if (nnets && comm->topo->type == RCCL_TOPO_4P2H_ROME) nc = (nnets > 3 ? 2 : 4)*nnets;
if (nnets && (comm->topo->type & RCCL_TOPO_4P2H_ROME)) nc = (nnets > 3 ? 2 : 4)*nnets;
int end = std::min((int)ncclMaxNchannels(), std::max(nc, ncclMinNchannels()));
// Duplication should be complete now
@@ -468,8 +468,29 @@ ncclResult_t ncclTopoTrimSystem(struct ncclTopoSystem* system, struct ncclComm*
NCCLCHECK(ncclTopoRemoveNode(system, GPU, g));
}
int remove = 1;
int arch, vendor, model;
NCCLCHECK(ncclTopoCpuType(system, &arch, &vendor, &model));
if (arch == NCCL_TOPO_CPU_ARCH_X86 && vendor == NCCL_TOPO_CPU_VENDOR_AMD
&& model == NCCL_TOPO_CPU_TYPE_ROME) {
int gdr, ret = 1;
int64_t net;
for (int g = 0; g < system->nodes[GPU].count; g++) {
NCCLCHECK(ncclTopoGetLocalNet(system, system->nodes[GPU].nodes[g].gpu.rank, &net, 0));
NCCLCHECK(ncclTopoCheckGdr(system, system->nodes[GPU].nodes[g].id, net, 1, &gdr));
if (!gdr) {
ret = 0;
break;
}
}
if (ret) {
system->type |= RCCL_TOPO_GDR_ALL;
remove = 0;
INFO(NCCL_GRAPH, "GDR is available on all GPUs");
}
}
comm->localRanks = system->nodes[GPU].count;
if (system->nodes[GPU].count == comm->nRanks) {
if (system->nodes[GPU].count == comm->nRanks && remove) {
for (int n=system->nodes[NET].count-1; n>=0; n--)
NCCLCHECK(ncclTopoRemoveNode(system, NET, n));
}
@@ -529,7 +550,7 @@ ncclResult_t ncclTopoComputeP2pChannels(struct ncclComm* comm) {
}
}
if (comm->topo->nodes[NET].count == 0 && comm->topo->type == RCCL_TOPO_4P2H_ROME) {
if (comm->topo->nodes[GPU].count == comm->topo->nRanks && (comm->topo->type & RCCL_TOPO_4P2H_ROME) && !(comm->topo->type & RCCL_TOPO_GDR_ALL)) {
// Adjust P2P channels on Rome
comm->p2pnChannelsPerPeer = 2;
comm->p2pnChannels = 2;
@@ -525,7 +525,7 @@ ncclResult_t ncclTopoSearchRecNet(struct ncclTopoSystem* system, struct ncclTopo
* `--> NET n (or m if crossNic)
*/
ncclResult_t ncclTopoSearchParams(struct ncclTopoSystem* system, int pattern, int* backToNet, int* backToFirstRank) {
if (system->nodes[NET].count) {
if (system->nodes[NET].count && system->nodes[GPU].count != system->nRanks) {
if (pattern == NCCL_TOPO_PATTERN_RING) *backToNet = system->nodes[GPU].count-1;
else if (pattern == NCCL_TOPO_PATTERN_SPLIT_TREE) *backToNet = 1;
else *backToNet = 0;
@@ -541,7 +541,7 @@ ncclResult_t ncclTopoSearchParams(struct ncclTopoSystem* system, int pattern, in
ncclResult_t ncclTopoSearchRec(struct ncclTopoSystem* system, struct ncclTopoGraph* graph, struct ncclTopoGraph* saveGraph, int* time) {
int backToNet, backToFirstRank;
NCCLCHECK(ncclTopoSearchParams(system, graph->pattern, &backToNet, &backToFirstRank));
if (system->nodes[NET].count) {
if (system->nodes[NET].count && system->nodes[GPU].count != system->nRanks) {
// Start from NET
ncclTopoSearchRecNet(system, graph, saveGraph, backToNet, backToFirstRank, time);
} else {
@@ -831,9 +831,9 @@ static ncclResult_t parseChordalRing(struct ncclTopoSystem* system, struct ncclT
dist[m] = dist[n]; dist[n] = temp;
}
// create chordal ring based on reference and remapped ids
system->type = RCCL_TOPO_CR8G;
system->type |= RCCL_TOPO_CR8G;
NCCLCHECK(parseGraph(ringBase, system, graph, id, 0, NULL));
if (system->nodes[NET].count) {
if (system->nodes[NET].count && system->nodes[GPU].count != system->nRanks) {
int *intra, *used;
graph->nChannels = system->nodes[NET].count;
NCCLCHECK(ncclCalloc(&intra, ngpus));
@@ -909,8 +909,7 @@ static ncclResult_t parseRomeSystem(struct ncclTopoSystem* system, struct rcclRo
romeTopo->connMatrix[i*romeTopo->nGpus+n] = 1;
count ++;
}
if (!romeTopo->nLinks) romeTopo->nLinks = count;
else if (romeTopo->nLinks != count) return ncclSuccess;
if (romeTopo->nLinks < count) romeTopo->nLinks = count;
}
// trim ports and create NET map
@@ -1050,10 +1049,10 @@ static ncclResult_t parseRome4P2H(struct ncclTopoSystem* system, struct ncclTopo
struct rcclRomeModel romeTopo;
char pattern[256];
int net_map[MAX_ROME_NICS];
parseRomeSystem(system, &romeTopo, pattern, net_map);
NCCLCHECK(parseRomeSystem(system, &romeTopo, pattern, net_map));
// recognize system as Rome 4P2H even if no matching model
if (ngpus == 8 && romeTopo.nLinks) system->type = RCCL_TOPO_4P2H_ROME;
if (ngpus > 4 && romeTopo.nLinks) system->type |= RCCL_TOPO_4P2H_ROME;
int g[MAX_ROME_GPUS];
int time = 0;
@@ -1135,8 +1134,7 @@ ncclResult_t ncclTopoCompute(ncclTopoSystem* system, struct ncclTopoGraph* graph
// user supplied topo
NCCLCHECK(parseGraph(str, system, graph, NULL, nnets, NULL));
if (graph->nChannels) {
system->type = RCCL_TOPO_4P2H_ROME;
return ncclSuccess;
system->type |= RCCL_TOPO_4P2H_ROME;
}
} else {
// try to match 8P6L
@@ -1144,8 +1142,8 @@ ncclResult_t ncclTopoCompute(ncclTopoSystem* system, struct ncclTopoGraph* graph
if (graph->nChannels) return ncclSuccess;
// try to match Rome 4P2H
NCCLCHECK(parseRome4P2H(system, graph));
if (graph->nChannels) return ncclSuccess;
}
if (graph->nChannels) return ncclSuccess;
if (ngpus == 1) if (graph->pattern != NCCL_TOPO_PATTERN_RING) graph->pattern = NCCL_TOPO_PATTERN_TREE;
@@ -1293,7 +1291,7 @@ ncclResult_t ncclTopoPrintGraph(struct ncclTopoSystem* system, struct ncclTopoGr
for (int c=0; c<graph->nChannels; c++) {
sprintf(line, "%2d :", c);
int offset = strlen(line);
if (system->nodes[NET].count > 0) {
if (system->nodes[NET].count > 0 && system->nodes[GPU].count != system->nRanks) {
sprintf(line+offset, " %s/%d", topoNodeTypeStr[NET], graph->inter[2*c]);
offset = strlen(line);
}
@@ -1301,7 +1299,7 @@ ncclResult_t ncclTopoPrintGraph(struct ncclTopoSystem* system, struct ncclTopoGr
sprintf(line+offset, " %s/%d", topoNodeTypeStr[GPU], graph->intra[ngpus*c+i]);
offset = strlen(line);
}
if (system->nodes[NET].count > 0) {
if (system->nodes[NET].count > 0 && system->nodes[GPU].count != system->nRanks) {
sprintf(line+offset, " %s/%d", topoNodeTypeStr[NET], graph->inter[2*c+1]);
offset = strlen(line);
}
@@ -82,8 +82,9 @@ struct ncclTopoLinkList {
#define NCCL_TOPO_UNDEF (-1)
#define RCCL_TOPO_CR8G 1
#define RCCL_TOPO_CR8G 1
#define RCCL_TOPO_4P2H_ROME 2
#define RCCL_TOPO_GDR_ALL 4
struct ncclTopoNode {
int type;
@@ -132,6 +133,7 @@ struct ncclTopoSystem {
float maxWidth;
float totalWidth;
int type;
int nRanks;
};
ncclResult_t ncclTopoGetNode(struct ncclTopoSystem* system, struct ncclTopoNode** node, int type, uint64_t id);
@@ -121,6 +121,9 @@ struct ncclComm {
// Flag to ask NCCL kernels to abort
volatile uint32_t *abortFlag;
// Flags for enable P2P NET
uint32_t *p2pNet;
// Device side of the communicator
struct ncclDevComm *devComm;
// Host copy of the devComm (to free CUDA allocs)
@@ -150,6 +150,8 @@ struct ncclTree {
struct ncclPeer {
struct ncclConnector send;
struct ncclConnector recv;
struct ncclConnector p2pSend;
struct ncclConnector p2pRecv;
};
struct ncclDevComm;
@@ -332,6 +334,9 @@ struct ncclDevComm {
// Channels, device side
struct ncclChannel* channels;
// Flags for enable P2P NET
uint32_t *p2pNet;
#ifdef ENABLE_PROFILING
// Profiling counters
struct ncclProf* devProf;
+18 -2
مشاهده پرونده
@@ -352,6 +352,7 @@ static ncclResult_t commFree(ncclComm_t comm) {
free(comm->intraCC);
}
NCCLCHECK(ncclCudaHostFree((void *)comm->abortFlag));
NCCLCHECK(ncclCudaHostFree((void *)comm->p2pNet));
// Poison comm to try and catch a double free
commPoison(comm);
@@ -362,6 +363,7 @@ static ncclResult_t commFree(ncclComm_t comm) {
RCCL_PARAM(AllToAllDisable, "ALLTOALL_KERNEL_DISABLE", 1);
RCCL_PARAM(ForceEnableClique, "FORCE_ENABLE_CLIQUE", 0);
RCCL_PARAM(P2pNetDisable, "P2P_NET_DISABLE", 0);
static ncclResult_t commAlloc(ncclComm_t* comret, int ndev, int rank) {
if (ndev < 1) {
@@ -401,6 +403,10 @@ static ncclResult_t commAlloc(ncclComm_t* comret, int ndev, int rank) {
comm->hostDevComm.abortFlag = comm->abortFlag;
STORE(comm->abortFlag, 0);
NCCLCHECK(ncclCudaHostCalloc((uint32_t**)&comm->p2pNet, 1));
comm->hostDevComm.p2pNet = comm->p2pNet;
STORE(comm->p2pNet, 0);
comm->argsptr = &comm->args;
#ifdef ENABLE_PROFILING
NCCLCHECK(ncclCudaCalloc(&comm->hostDevComm.devProf, 1));
@@ -808,6 +814,8 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm
// Topo detection / System graph creation
NCCLCHECK(ncclTopoGetSystem(comm, &comm->topo));
// save nRanks to ncclTopoSystem as indicator of multi-node
comm->topo->nRanks = comm->nRanks;
// Compute paths between GPUs and NICs
NCCLCHECK(ncclTopoComputePaths(comm->topo, comm->peerInfo));
// Remove inaccessible GPUs and unused NICs
@@ -852,7 +860,7 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm
if (!rcclParamForceEnableClique())
{
// Disable clique-kernel support if not on CR8 topology
if (!(comm->topo->nodes[NET].count == 0 && comm->topo->type == RCCL_TOPO_CR8G))
if (!(comm->topo->nodes[GPU].count == comm->topo->nRanks && (comm->topo->type & RCCL_TOPO_CR8G)))
{
INFO(NCCL_INIT, "Disabling clique-based kernels due to topology (force enable with RCCL_FORCE_ENABLE_CLIQUE)");
cliqueMode = CliqueManager::CLIQUE_DISABLED;
@@ -898,6 +906,14 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm
NCCLCHECK(ncclTopoDumpGraphs(comm->topo, 3, graphs));
}
if ((comm->topo->type & RCCL_TOPO_4P2H_ROME) && (comm->topo->type & RCCL_TOPO_GDR_ALL)) {
if (rcclParamP2pNetDisable() == 0) {
STORE(comm->p2pNet, 1);
INFO(NCCL_INIT, "RCCL enabled same node P2P over network");
}
else
INFO(NCCL_INIT, "RCCL force disabled same node P2P over network");
}
// AllGather3 - begin
struct ncclGraphInfo {
int pattern;
@@ -925,7 +941,7 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm
NCCLCHECK(ncclTopoIdToIndex(comm->topo, GPU, myInfo->busId, &idx));
allGather3Data[rank].cudaCompCap = comm->topo->nodes[GPU].nodes[idx].gpu.cudaCompCap;
allGather3Data[rank].gcn = comm->topo->nodes[GPU].nodes[idx].gpu.gcn;
allGather3Data[rank].alltoallDisable = comm->topo->nodes[NET].count? 1 : comm->alltoallDisable;
allGather3Data[rank].alltoallDisable = comm->topo->nodes[GPU].count == comm->topo->nRanks ? 1 : comm->alltoallDisable;
allGather3Data[rank].nChannels = comm->nChannels = treeGraph.nChannels = ringGraph.nChannels =
std::min(treeGraph.nChannels, ringGraph.nChannels);
+7 -5
مشاهده پرونده
@@ -9,7 +9,7 @@
#include "graph.h"
#include "collectives.h"
enum { proxyRecv=0, proxySend=1 };
enum { proxyRecv=0, proxySend=1, p2pProxyRecv=2, p2pProxySend=3 };
static bool NeedProxy(int type, int pattern, int root, struct ncclRing* ring, int nranks) {
if (pattern == ncclPatternRing || pattern == ncclPatternRingTwice) return true;
@@ -162,10 +162,12 @@ static ncclResult_t SaveProxy(int type, int peer, struct ncclProxyArgs* args) {
if (peer < 0) return ncclSuccess;
struct ncclPeer* peerComm = args->channel->peers+peer;
struct ncclConnector* connector = type == proxyRecv ? &peerComm->recv : &peerComm->send;
struct ncclConnector* connector = type < p2pProxyRecv ? (type == proxyRecv ? &peerComm->recv : &peerComm->send)
: (type == p2pProxyRecv ? &peerComm->p2pRecv : &peerComm->p2pSend);
if (connector->transportComm == NULL) {
WARN("[%d] Error no transport for %s peer %d on channel %d", connector->comm->rank,
type == proxyRecv ? "recv" : "send", peer, args->channel->id);
type < p2pProxyRecv ? (type == proxyRecv ? "recv" : "send") : (type == p2pProxyRecv ? "p2pRecv" : "p2pSend"),
peer, args->channel->id);
return ncclInternalError;
}
if (connector->transportComm->proxy == NULL) return ncclSuccess;
@@ -238,7 +240,7 @@ ncclResult_t ncclProxySaveP2p(struct ncclInfo* info, struct ncclChannel* channel
if (args.nsteps == 0) args.nsteps = 1;
args.recvbytes = info->recvbytes;
args.sendbytes = 0;
NCCLCHECK(SaveProxy(proxyRecv, peerrecv, &args));
NCCLCHECK(SaveProxy(LOAD(info->comm->p2pNet) ? p2pProxyRecv : proxyRecv, peerrecv, &args));
}
if (info->delta > 0 && info->sendbytes >= 0) {
int peersend = (info->comm->rank+info->delta)%info->comm->nRanks;
@@ -246,7 +248,7 @@ ncclResult_t ncclProxySaveP2p(struct ncclInfo* info, struct ncclChannel* channel
if (args.nsteps == 0) args.nsteps = 1;
args.sendbytes = info->sendbytes;
args.recvbytes = 0;
NCCLCHECK(SaveProxy(proxySend, peersend, &args));
NCCLCHECK(SaveProxy(LOAD(info->comm->p2pNet) ? p2pProxySend : proxySend, peersend, &args));
}
return ncclSuccess;
}
+68 -8
مشاهده پرونده
@@ -8,6 +8,7 @@
#include "comm.h"
#include "info.h"
#include "bootstrap.h"
#include "../graph/topo.h"
extern struct ncclTransport p2pTransport;
extern struct ncclTransport shmTransport;
@@ -19,6 +20,34 @@ struct ncclTransport ncclTransports[NTRANSPORTS] = {
netTransport,
};
static ncclResult_t connectedByXGMI(int* ret, struct ncclTopoSystem* system, struct ncclPeerInfo* info1, struct ncclPeerInfo* info2) {
*ret = 0;
if (info1->hostHash != info2->hostHash) return ncclSuccess;
int g1, g2;
NCCLCHECK(ncclTopoRankToIndex(system, info1->rank, &g1));
NCCLCHECK(ncclTopoRankToIndex(system, info2->rank, &g2));
if (system->nodes[GPU].nodes[g1].paths[GPU][g2].type == PATH_NVL) *ret = 1;
return ncclSuccess;
}
template <int type>
static ncclResult_t selectTransportN(struct ncclComm* comm, struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peerInfo, struct ncclConnect* connect, struct ncclConnector* connector, int channelId, int n) {
for (int t=n; t<NTRANSPORTS; t++) {
if (t == TRANSPORT_SHM) continue;
struct ncclTransport *transport = ncclTransports+t;
struct ncclTransportComm* transportComm = type == 1 ? &transport->send : &transport->recv;
int ret = 0;
NCCLCHECK(transport->canConnect(&ret, comm->topo, NULL, myInfo, peerInfo));
if (ret) {
connector->transportComm = transportComm;
NCCLCHECK(transportComm->setup(comm, NULL, myInfo, peerInfo, connect, connector, channelId));
return ncclSuccess;
}
}
WARN("No transport found !");
return ncclInternalError;
}
template <int type>
static ncclResult_t selectTransport(struct ncclComm* comm, struct ncclTopoGraph* graph, struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peerInfo, struct ncclConnect* connect, struct ncclConnector* connector, int channelId) {
for (int t=0; t<NTRANSPORTS; t++) {
@@ -63,6 +92,7 @@ void dumpData(struct ncclConnect* data, int ndata) {
ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph* graph) {
struct ncclConnect data[2*MAXCHANNELS];
uint32_t p2pNet = LOAD(comm->p2pNet);
for (int i=1; i<comm->nRanks; i++) {
int recvPeer = (comm->rank - i + comm->nRanks) % comm->nRanks;
int sendPeer = (comm->rank + i) % comm->nRanks;
@@ -73,15 +103,41 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph*
int sendChannels = 0, recvChannels = 0;
for (int c=0; c<MAXCHANNELS; c++) {
if (recvMask & (1<<c)) {
struct ncclConnector* conn = &comm->channels[c].peers[recvPeer].recv;
NCCLCHECK(selectTransport<0>(comm, graph, comm->peerInfo+comm->rank, comm->peerInfo+recvPeer, recvData+recvChannels++, conn, c));
int xgmi = 0;
if (p2pNet && graph == NULL) {
struct ncclConnector* conn = &comm->channels[c].peers[recvPeer].p2pRecv;
NCCLCHECK(connectedByXGMI(&xgmi, comm->topo, comm->peerInfo+comm->rank, comm->peerInfo+recvPeer));
if (xgmi) {
NCCLCHECK(selectTransportN<0>(comm, comm->peerInfo+comm->rank, comm->peerInfo+recvPeer, recvData+recvChannels++, conn, c, TRANSPORT_P2P));
}
else {
NCCLCHECK(selectTransportN<0>(comm, comm->peerInfo+comm->rank, comm->peerInfo+recvPeer, recvData+recvChannels++, conn, c, TRANSPORT_NET));
}
}
else {
struct ncclConnector* conn = &comm->channels[c].peers[recvPeer].recv;
NCCLCHECK(selectTransport<0>(comm, graph, comm->peerInfo+comm->rank, comm->peerInfo+recvPeer, recvData+recvChannels++, conn, c));
}
}
}
struct ncclConnect* sendData = recvData+recvChannels;
for (int c=0; c<MAXCHANNELS; c++) {
if (sendMask & (1<<c)) {
struct ncclConnector* conn = &comm->channels[c].peers[sendPeer].send;
NCCLCHECK(selectTransport<1>(comm, graph, comm->peerInfo+comm->rank, comm->peerInfo+sendPeer, sendData+sendChannels++, conn, c));
int xgmi = 0;
if (p2pNet && graph == NULL) {
struct ncclConnector* conn = &comm->channels[c].peers[sendPeer].p2pSend;
NCCLCHECK(connectedByXGMI(&xgmi, comm->topo, comm->peerInfo+comm->rank, comm->peerInfo+sendPeer));
if (xgmi) {
NCCLCHECK(selectTransportN<1>(comm, comm->peerInfo+comm->rank, comm->peerInfo+sendPeer, sendData+sendChannels++, conn, c, TRANSPORT_P2P));
}
else {
NCCLCHECK(selectTransportN<1>(comm, comm->peerInfo+comm->rank, comm->peerInfo+sendPeer, sendData+sendChannels++, conn, c, TRANSPORT_NET));
}
}
else {
struct ncclConnector* conn = &comm->channels[c].peers[sendPeer].send;
NCCLCHECK(selectTransport<1>(comm, graph, comm->peerInfo+comm->rank, comm->peerInfo+sendPeer, sendData+sendChannels++, conn, c));
}
}
}
@@ -101,18 +157,22 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph*
for (int c=0; c<MAXCHANNELS; c++) {
if (sendMask & (1<<c)) {
struct ncclConnector* conn = &comm->channels[c].peers[sendPeer].send;
struct ncclConnector* conn = (p2pNet && graph == NULL) ? &comm->channels[c].peers[sendPeer].p2pSend
: &comm->channels[c].peers[sendPeer].send;
NCCLCHECK(conn->transportComm->connect(comm, sendData++, 1, comm->rank, conn));
conn->connected = 1;
CUDACHECK(hipMemcpy(&comm->channels[c].devPeers[sendPeer].send, conn, sizeof(struct ncclConnector), hipMemcpyHostToDevice));
if (p2pNet && graph == NULL) CUDACHECK(hipMemcpy(&comm->channels[c].devPeers[sendPeer].p2pSend, conn, sizeof(struct ncclConnector), hipMemcpyHostToDevice));
else CUDACHECK(hipMemcpy(&comm->channels[c].devPeers[sendPeer].send, conn, sizeof(struct ncclConnector), hipMemcpyHostToDevice));
}
}
for (int c=0; c<MAXCHANNELS; c++) {
if (recvMask & (1<<c)) {
struct ncclConnector* conn = &comm->channels[c].peers[recvPeer].recv;
struct ncclConnector* conn = (p2pNet && graph == NULL) ? &comm->channels[c].peers[recvPeer].p2pRecv
: &comm->channels[c].peers[recvPeer].recv;
NCCLCHECK(conn->transportComm->connect(comm, recvData++, 1, comm->rank, conn));
conn->connected = 1;
CUDACHECK(hipMemcpy(&comm->channels[c].devPeers[recvPeer].recv, conn, sizeof(struct ncclConnector), hipMemcpyHostToDevice));
if (p2pNet && graph == NULL) CUDACHECK(hipMemcpy(&comm->channels[c].devPeers[recvPeer].p2pRecv, conn, sizeof(struct ncclConnector), hipMemcpyHostToDevice));
else CUDACHECK(hipMemcpy(&comm->channels[c].devPeers[recvPeer].recv, conn, sizeof(struct ncclConnector), hipMemcpyHostToDevice));
}
}
comm->connectRecv[recvPeer] = comm->connectSend[sendPeer] = 0;
@@ -207,6 +207,7 @@ ncclResult_t initTransportsRank_1(struct ncclComm* comm, struct allGather1Data_t
// Topo detection / System graph creation
//NCCLCHECK(ncclTopoGetSystem(comm, &comm->topo));
comm->topo->nRanks = comm->nRanks;
// Compute paths between GPUs and NICs
NCCLCHECK(ncclTopoComputePaths(comm->topo, comm->peerInfo));
// Remove inaccessible GPUs and unused NICs