topo_expl: Update channel masks for >64 channels (#1279)

[ROCm/rccl commit: 4bc118336a]
This commit is contained in:
Benjamin Kitor
2024-07-25 17:27:34 -07:00
zatwierdzone przez GitHub
rodzic 496e98a73f
commit d2df042c36
+24 -14
Wyświetl plik
@@ -370,16 +370,20 @@ static ncclResult_t selectTransport(struct ncclComm* comm, struct ncclTopoGraph*
ncclResult_t ncclTransportP2pConnect(struct ncclComm* comm, int channelId, int nrecv, int* peerRecv, int nsend, int* peerSend, int connIndex) {
TRACE(NCCL_INIT, "nsend %d nrecv %d", nsend, nrecv);
struct ncclChannel* channel = &comm->channels[channelId];
uint64_t mask = 1UL << channel->id;
struct channelMasks mask;
for (int i = 0; i < MAXCHANNELS/64; i++)
mask.masks[i] = 0;
mask.masks[channel->id/64] = (1UL << (channel->id%64));
for (int i=0; i<nrecv; i++) {
int peer = peerRecv[i];
if (peer == -1 || peer >= comm->nRanks || peer == comm->rank || channel->peers[peer]->recv[connIndex].connected) continue;
comm->connectRecv[peer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)] |= mask;
comm->connectRecv[peer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)].masks[channel->id/64] |= mask.masks[channel->id/64];
}
for (int i=0; i<nsend; i++) {
int peer = peerSend[i];
if (peer == -1 || peer >= comm->nRanks || peer == comm->rank || channel->peers[peer]->send[connIndex].connected) continue;
comm->connectSend[peer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)] |= mask;
comm->connectSend[peer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)].masks[channel->id/64] |= mask.masks[channel->id/64];
}
return ncclSuccess;
}
@@ -417,8 +421,8 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph*
int bootstrapTag = (i<<8) + (graph ? graph->id+1 : 0);
int recvPeer = (comm->rank - i + comm->nRanks) % comm->nRanks;
int sendPeer = (comm->rank + i) % comm->nRanks;
uint64_t recvMask = comm->connectRecv[recvPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)];
uint64_t sendMask = comm->connectSend[sendPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)];
struct channelMasks recvMask = comm->connectRecv[recvPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)];
struct channelMasks sendMask = comm->connectSend[sendPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)];
// Data[i] contains all ncclConnect information for all send and receive connections with a given send and recv peer
// This data is packed in the array based on the number of sendChannels and recvChannels connected with these peers
@@ -426,14 +430,19 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph*
// The next M entries contain sendData, connection information for send connections
// It's not guaranteed that each entry of data has the same number of total or send/recv specific connections
int p = i-(done+1);
if (recvMask || sendMask) NCCLCHECK(ncclCalloc(data+p, 2*MAXCHANNELS));
uint64_t recvMaskBits = 0, sendMaskBits = 0;
for (int i = 0; i < MAXCHANNELS/64; i++) {
recvMaskBits |= recvMask.masks[i];
sendMaskBits |= sendMask.masks[i];
}
if (recvMaskBits || sendMaskBits) NCCLCHECK(ncclCalloc(data+p, 2*MAXCHANNELS));
recvData[p] = data[p];
int sendChannels = 0, recvChannels = 0;
int type;
bool proxy;
TIME_START(0);
for (int c=0; c<MAXCHANNELS; c++) {
if (recvMask & (1UL<<c)) {
if (recvMask.masks[c/64] & (1UL<<(c%64))) {
NCCLCHECKGOTO(selectTransport<0>(comm, graph, recvData[p]+recvChannels++, c, recvPeer, connIndex, &type, &proxy), ret, fail);
if (type > highestType) highestType = type;
}
@@ -442,7 +451,7 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph*
TIME_START(1);
sendData[p] = recvData[p]+recvChannels;
for (int c=0; c<MAXCHANNELS; c++) {
if (sendMask & (1UL<<c)) {
if (sendMask.masks[c/64] & (1UL<<(c%64))) {
NCCLCHECKGOTO(selectTransport<1>(comm, graph, sendData[p]+sendChannels++, c, sendPeer, connIndex, &type, &proxy), ret, fail);
if (type > highestType) highestType = type;
needsProxyResult |= proxy;
@@ -475,15 +484,15 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph*
for (int j=done+1; j<=i; j++) {
int recvPeer = (comm->rank - j + comm->nRanks) % comm->nRanks;
int sendPeer = (comm->rank + j) % comm->nRanks;
uint64_t recvMask = comm->connectRecv[recvPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)];
uint64_t sendMask = comm->connectSend[sendPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)];
struct channelMasks recvMask = comm->connectRecv[recvPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)];
struct channelMasks sendMask = comm->connectSend[sendPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)];
int p = j-(done+1);
int sendDataOffset = 0;
int recvDataOffset = 0;
for (int c=0; c<MAXCHANNELS; c++) {
TIME_START(3);
if (sendMask & (1UL<<c)) {
if (sendMask.masks[c/64] & (1UL<<(c%64))) {
struct ncclConnector* conn = comm->channels[c].peers[sendPeer]->send + connIndex;
// This connector hasn't completed connection yet
if (conn->connected == 0) {
@@ -501,7 +510,7 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph*
// Start with recv channels
TIME_START(4);
if (recvMask & (1UL<<c)) {
if (recvMask.masks[c/64] & (1UL<<(c%64))) {
struct ncclConnector* conn = comm->channels[c].peers[recvPeer]->recv + connIndex;
// This connector hasn't completed connection yet
if (conn->connected == 0) {
@@ -517,7 +526,7 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph*
}
TIME_STOP(4);
}
if (sendMask || recvMask) {
if (sendMaskBits || recvMaskBits) {
free(data[p]);
data[p] = NULL;
}
@@ -554,7 +563,8 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph*
NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, sendPeer, bootstrapTag, &flag, sizeof(int)), ret, fail);
}
}*/
comm->connectRecv[recvPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)] = comm->connectSend[sendPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)] = 0UL;
for (int j = 0; j<MAXCHANNELS/64; j++)
comm->connectRecv[recvPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)].masks[j] = comm->connectSend[sendPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)].masks[j] = 0UL;
}
free(data);