Merge remote-tracking branch 'nccl/master' into develop

[ROCm/rccl commit: 9ff53eeeae]
Этот коммит содержится в:
BertanDogancay
2024-01-30 14:43:43 -08:00
родитель a2b3e1ab2d 382ae08419
Коммит c2c9ed2acb
11 изменённых файлов: 183 добавлений и 187 удалений
+1 -1
Просмотреть файл
@@ -1,6 +1,6 @@
##### version
NCCL_MAJOR := 2
NCCL_MINOR := 19
NCCL_PATCH := 3
NCCL_PATCH := 4
NCCL_SUFFIX :=
PKG_REVISION := 1
+3
Просмотреть файл
@@ -139,6 +139,8 @@ void ncclDebugInit() {
pthread_mutex_unlock(&ncclDebugLock);
}
NCCL_PARAM(WarnSetDebugInfo, "WARN_ENABLE_DEBUG_INFO", 0);
/* Common logging function used by the INFO, WARN and TRACE macros
* Also exported to the dynamically loadable Net transport modules so
* they can share the debugging mechanisms and output files
@@ -172,6 +174,7 @@ void ncclDebugLog(ncclDebugLogLevel level, unsigned long flags, const char *file
if (level == NCCL_LOG_WARN) {
len = snprintf(buffer, sizeof(buffer), "\n%s:%d:%d [%d] %s:%d NCCL WARN ",
hostname, pid, tid, cudaDev, filefunc, line);
if (ncclParamWarnSetDebugInfo()) ncclDebugLevel = NCCL_LOG_INFO;
} else if (level == NCCL_LOG_INFO) {
len = snprintf(buffer, sizeof(buffer), "%s:%d:%d [%d] NCCL INFO ", hostname, pid, tid, cudaDev);
} else if (level == NCCL_LOG_TRACE && flags == NCCL_CALL) {
+19 -4
Просмотреть файл
@@ -662,13 +662,12 @@ static ncclResult_t scheduleP2pTasksToPlan(
while (nChannelsMax*nRanks > comm->p2pnChannels*4 && nChannelsMax > 1) nChannelsMax /= 2;
}
bool fuseOk;
bool fuseOk = false;
// We can perform 8 send/recv per round per CTA. Make sure we jump between fused blocks at node boundaries.
while (tasks->nTasksP2p != 0) {
for (int i=0; i < tasks->p2pOrderSteps; i++) {
int sendPeer = sendOrder[i];
int recvPeer = recvOrder[i];
if ((i % (NCCL_MAX_WORK_ELEMENTS_P2P/2)) == 0) fuseOk = false;
struct ncclTaskP2p* send = sendPeer != -1 ? ncclIntruQueueHead(&peers[sendPeer].sendQueue) : NULL;
struct ncclTaskP2p* recv = recvPeer != -1 ? ncclIntruQueueHead(&peers[recvPeer].recvQueue) : NULL;
if (sendPeer == comm->rank) {
@@ -710,6 +709,7 @@ static ncclResult_t scheduleP2pTasksToPlan(
recvIdx = NCCL_CONN_IDX_P2P_NET;
do {
if ((i % (NCCL_MAX_WORK_ELEMENTS_P2P/2)) == 0) fuseOk = false;
ssize_t recvChunkBytes = std::min(recvBytes, recvChunkBytesMax); // -1 preserved
ssize_t sendChunkBytes = std::min(sendBytes, sendChunkBytesMax);
if (recvChunkBytes != 0) {
@@ -920,6 +920,14 @@ static ncclResult_t reclaimPlan(struct ncclComm* comm, struct ncclCommCallback*
if (plan->persistent) {
comm->persistentRefs -= 1;
NCCLCHECK(ncclCudaFree(plan->workHead));
for (int c=0; c < plan->channelUbound; c++) {
struct ncclProxyOp* q = ncclIntruQueueHead(&plan->channels[c].proxyOpQueue);
while (q != nullptr) {
struct ncclProxyOp* q1 = q->enqNext;
ncclMemoryPoolFree(&plan->memPool_ncclProxyOp, q);
q = q1;
}
}
while (!ncclIntruQueueEmpty(&plan->ipcMemQueue)) {
struct ncclPointerList* q = ncclIntruQueueDequeue(&plan->ipcMemQueue);
CUDACHECKIGNORE(cudaIpcCloseMemHandle(q->ptr));
@@ -1144,9 +1152,16 @@ ncclResult_t ncclLaunchKernel(struct ncclComm* comm, struct ncclKernelPlan* plan
ncclResult_t ncclLaunchKernelAfter_NoCuda(struct ncclComm* comm, struct ncclKernelPlan* plan) {
if (!(plan->persistent || comm->persistentRefs != 0 || ncclCudaLaunchBlocking)) {
// If this isn't being captured and there aren't any CUDA graphs alive
// then we don't need to do our proxyOp pushing on the host stream.
// We are not using the host stream for proxy ops and reclaimation submission.
NCCLCHECK(hostStreamPlanTask(comm, plan));
} else {
// We are using the host stream for proxy ops and reclaimation submission.
// Only plans with proxy ops have a callback pushed by ncclLaunchPrepare.
// Since non-persistent plans also require reclaimation, we have to do it
// here.
if (!plan->persistent && !plan->hasProxyOps) {
ncclIntruQueueMpscEnqueue(&comm->callbackQueue, &plan->reclaimer);
}
}
return ncclSuccess;
}
+10 -4
Просмотреть файл
@@ -10,11 +10,15 @@
#include <unistd.h>
#include <fcntl.h>
#include <ctype.h>
#include <float.h>
#include "core.h"
#include "nvmlwrap.h"
#include "xml.h"
#include "rocm_smi_wrap.h"
#include "archinfo.h"
#if defined(__x86_64__)
#include <cpuid.h>
#endif
/*******************/
/* XML File Parser */
@@ -419,7 +423,8 @@ ncclResult_t ncclTopoGetXmlFromCpu(struct ncclXmlNode* cpuNode, struct ncclXml*
char vendor[12];
} cpuid0;
asm volatile("cpuid" : "=b" (cpuid0.ebx), "=c" (cpuid0.ecx), "=d" (cpuid0.edx) : "a" (0) : "memory");
unsigned unused;
__cpuid(0, unused, cpuid0.ebx, cpuid0.ecx, cpuid0.edx);
char vendor[13];
strncpy(vendor, cpuid0.vendor, 12);
vendor[12] = '\0';
@@ -441,7 +446,8 @@ ncclResult_t ncclTopoGetXmlFromCpu(struct ncclXmlNode* cpuNode, struct ncclXml*
};
uint32_t val;
} cpuid1;
asm volatile("cpuid" : "=a" (cpuid1.val) : "a" (1) : "memory");
unsigned unused;
__cpuid(1, cpuid1.val, unused, unused, unused);
int familyId = cpuid1.familyId + (cpuid1.extFamilyId << 4);
int modelId = cpuid1.modelId + (cpuid1.extModelId << 4);
NCCLCHECK(xmlSetAttrInt(cpuNode, "familyid", familyId));
@@ -507,11 +513,11 @@ ncclResult_t ncclTopoGetXmlFromSys(struct ncclXmlNode* pciNode, struct ncclXml*
if (index == -1) {
if (path) {
char deviceSpeedStr[MAX_STR_LEN];
float deviceSpeed;
float deviceSpeed = FLT_MAX;
NCCLCHECK(ncclTopoGetStrFromSys(path, "max_link_speed", deviceSpeedStr));
sscanf(deviceSpeedStr, "%f GT/s", &deviceSpeed);
char portSpeedStr[MAX_STR_LEN];
float portSpeed;
float portSpeed = FLT_MAX;
NCCLCHECK(ncclTopoGetStrFromSys(path, "../max_link_speed", portSpeedStr));
if (portSpeedStr[0])
sscanf(portSpeedStr, "%f GT/s", &portSpeed);
+1 -3
Просмотреть файл
@@ -305,7 +305,7 @@ struct ncclComm {
// Flag to ask NCCL kernels to abort
volatile uint32_t *abortFlag;
volatile uint32_t *childAbortFlag;
volatile uint32_t *abortFlagRefCount;
uint32_t *abortFlagRefCount;
// Flags for enable P2P NET
uint32_t p2pNet;
@@ -353,8 +353,6 @@ struct ncclComm {
int nvlsRegSupport;
/* sharable NVLS resource. */
struct ncclNvlsSharedRes* nvlsResources;
struct ncclShmemCollBuff nvlsShmem;
void *nvlsShmemHandle;
ssize_t channelSize; // User requested work size (bytes) for channel partitions
+2 -5
Просмотреть файл
@@ -206,7 +206,6 @@ struct ncclProxyRpcResponseHeader {
};
struct ncclProxyState {
int internalRefCount;
int refCount;
int tpRank;
int tpnRanks;
@@ -221,11 +220,10 @@ struct ncclProxyState {
ncclNet_t* ncclNet;
ncclCollNet_t* ncclCollNet;
volatile uint32_t* abortFlag;
volatile uint32_t* abortFlagRefCount;
// Service thread
pthread_t thread;
struct ncclSocket* listenSock;
volatile int stop;
int stop;
CUcontext cudaCtx;
ncclResult_t asyncResult;
@@ -306,8 +304,7 @@ ncclResult_t ncclProxyClientGetFdBlocking(struct ncclComm* comm, struct ncclProx
ncclResult_t ncclProxyStop(struct ncclComm* comm);
ncclResult_t ncclProxyShmUnlink(struct ncclComm* comm);
ncclResult_t ncclProxyDestroy(struct ncclProxyState *proxyState);
ncclResult_t ncclProxyTryDetach(struct ncclProxyState *proxyState);
ncclResult_t ncclProxyDestroy(struct ncclComm* comm);
ncclResult_t mscclSaveProxy(struct ncclComm* comm, struct ncclChannel* channel, int type, int peer, struct ncclProxyOp* op, int connIndex);
#endif
+2
Просмотреть файл
@@ -68,6 +68,8 @@ struct ncclNvlsSharedRes {
char shareableHandle[NVLS_HANDLE_SIZE];
size_t ucGran;
int nChannels;
struct ncclShmemCollBuff nvlsShmem;
void *nvlsShmemHandle;
};
#endif /* CUDART_VERSION >= 12010 */
+5 -11
Просмотреть файл
@@ -325,13 +325,7 @@ static ncclResult_t commFree(ncclComm_t comm) {
* free all intra-process communicators; therefore, we only need to focus on local
* resource cleanup in commFree(). */
if (comm->proxyState && comm->proxyRefCountOld == 0 && comm->proxyState->thread) {
if (*comm->abortFlag == 0) {
/* regular thread join */
pthread_join(comm->proxyState->thread, nullptr);
} else {
/* try to detach thread due to abort */
ncclProxyTryDetach(comm->proxyState);
}
pthread_join(comm->proxyState->thread, nullptr);
}
delete[] comm->userRedOps;
@@ -392,7 +386,7 @@ static ncclResult_t commFree(ncclComm_t comm) {
free(comm->sharedRes->tpRankToLocalRank);
NCCLCHECK(ncclStrongStreamDestruct(&comm->sharedRes->hostStream));
NCCLCHECK(ncclStrongStreamDestruct(&comm->sharedRes->deviceStream));
NCCLCHECK(ncclProxyDestroy(comm->sharedRes->proxyState));
NCCLCHECK(ncclProxyDestroy(comm));
free(comm->sharedRes);
}
}
@@ -413,7 +407,7 @@ static ncclResult_t commFree(ncclComm_t comm) {
if (ncclAtomicRefCountDecrement(comm->abortFlagRefCount) == 0) {
NCCLCHECK(ncclCudaHostFree((void *)comm->abortFlag));
free((void*)comm->abortFlagRefCount);
free(comm->abortFlagRefCount);
}
free((void*)comm->config.netName);
@@ -2004,7 +1998,7 @@ exit:
fail:
if (comm) {
if (comm->abortFlag) ncclCudaHostFree((void *)comm->abortFlag);
if (comm->abortFlagRefCount) free((void*)comm->abortFlagRefCount);
if (comm->abortFlagRefCount) free(comm->abortFlagRefCount);
free(comm);
}
if (newcomm) *newcomm = NULL;
@@ -2461,7 +2455,7 @@ fail:
if (childComm) {
if (comm && !comm->config.splitShare) {
if (childComm->abortFlag) ncclCudaHostFree((void*)childComm->abortFlag);
if (childComm->abortFlagRefCount) free((void*)childComm->abortFlagRefCount);
if (childComm->abortFlagRefCount) free(childComm->abortFlagRefCount);
}
free(childComm);
}
+16 -81
Просмотреть файл
@@ -20,14 +20,6 @@
#include <unistd.h>
#include <sys/time.h>
#define PROGRESS_RUNNING 0
#define PROGRESS_REQUEST_STOP 1
#define PROGRESS_ABORT 2
#define PROGRESS_COMPLETE 3
#define SERVICE_RUNNING 0
#define SERVICE_COMPLETE 1
static bool NeedProxy(int type, int pattern, int root, struct ncclRing* ring, int nranks) {
if (pattern == ncclPatternRing || pattern == ncclPatternRingTwice) return true;
@@ -725,13 +717,13 @@ static ncclResult_t ncclProxyGetPostedOps(struct ncclProxyState* proxyState, int
if (state->active == NULL) {
pthread_mutex_lock(&pool->mutex);
while (pool->nextOps == -1 && state->stop == PROGRESS_RUNNING) {
while (pool->nextOps == -1 && !state->stop) {
struct ncclProxyArgs profArgs; // Only used for profiling purposes
ncclProfilingRecord(&profArgs, 0, 0, ncclProxyProfileSleep);
pthread_cond_wait(&pool->cond, &pool->mutex);
ncclProfilingRecord(&profArgs, 0, 0, ncclProxyProfileWakeup);
}
if (state->stop != PROGRESS_RUNNING) { // We might have been woken up to stop.
if (state->stop) { // We might have been woken up to stop.
pthread_mutex_unlock(&pool->mutex);
return ncclSuccess;
}
@@ -869,7 +861,7 @@ void* ncclProxyProgress(void *proxyState_) {
* frequency of calling ncclProxyGetPostedOps() and reduce the perf impact. */
int proxyOpAppendCounter = 0;
struct ncclProxyArgs profArgs; // Only used for profiling purposes
while (state->stop == PROGRESS_RUNNING || (state->stop == PROGRESS_REQUEST_STOP && state->active)) {
while ((state->stop == 0 || (state->stop == 1 && state->active)) && *proxyState->abortFlag == 0) {
int idle = 1;
ncclResult_t ret = progressOps(proxyState, state, state->active, &idle);
if (ret != ncclSuccess) {
@@ -883,7 +875,7 @@ void* ncclProxyProgress(void *proxyState_) {
int added = 0;
proxyOpAppendCounter = 0;
TIME_START(3);
if (state->stop == PROGRESS_RUNNING)
if (state->stop == 0)
ret = ncclProxyGetPostedOps(proxyState, &added);
if (added) { TIME_STOP(3); } else { TIME_CANCEL(3); }
if (ret != ncclSuccess) {
@@ -896,9 +888,6 @@ void* ncclProxyProgress(void *proxyState_) {
}
lastIdle = idle;
}
/* progress serive thread should be waiting for me, I need to notify it. */
__atomic_store_n(&state->stop, PROGRESS_COMPLETE, __ATOMIC_RELEASE);
return NULL;
}
@@ -921,11 +910,7 @@ ncclResult_t ncclProxyStart(struct ncclComm* comm) {
static ncclResult_t ncclProxyProgressCreate(struct ncclProxyState* proxyState) {
struct ncclProxyProgressState* state = &proxyState->progressState;
if (!state->thread) {
pthread_attr_t attr;
SYSCHECK(pthread_attr_init(&attr), "pthread_attr_init");
SYSCHECK(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED), "pthread_attr_setdetachstate");
SYSCHECK(pthread_create(&state->thread, &attr, ncclProxyProgress, proxyState), "pthread_create");
SYSCHECK(pthread_attr_destroy(&attr), "pthread_attr_destroy");
pthread_create(&state->thread, NULL, ncclProxyProgress, proxyState);
ncclSetThreadName(state->thread, "NCCL Progress%2d", proxyState->tpLocalnRanks);
}
return ncclSuccess;
@@ -937,17 +922,10 @@ ncclResult_t ncclProxyProgressDestroy(struct ncclProxyState* proxyState) {
// Request the proxy to stop and then wake it
if (state->opsPool) {
pthread_mutex_lock(&state->opsPool->mutex);
if (*proxyState->abortFlag == 0)
state->stop = PROGRESS_REQUEST_STOP;
else
state->stop = PROGRESS_ABORT;
state->stop = 1;
pthread_cond_signal(&state->opsPool->cond);
pthread_mutex_unlock(&state->opsPool->mutex);
/* progress thread is always detached, wait for it to exit. */
uint64_t t0 = clockNano();
while (__atomic_load_n(&state->stop, __ATOMIC_ACQUIRE) != PROGRESS_COMPLETE) {
if (clockNano() - t0 >= 1000) sched_yield();
}
pthread_join(state->thread, NULL);
}
// Free off any memory allocated for the proxy arg pools
@@ -1587,19 +1565,6 @@ void* ncclProxyService(void* _args) {
ncclSocketClose(proxyState->listenSock);
free(proxyState->listenSock);
proxyOpsFree(proxyState);
if (*proxyState->abortFlag) {
/* abort happened, need to notify main thread I am done. */
__atomic_store_n(&proxyState->stop, SERVICE_COMPLETE, __ATOMIC_RELEASE);
}
if (ncclAtomicRefCountDecrement(proxyState->abortFlagRefCount) == 0) {
ncclCudaHostFree((void *)proxyState->abortFlag);
free((void*)proxyState->abortFlagRefCount);
}
/* proxy itself holds one internal ref count, needs to call ncclProxyDestroy */
ncclProxyDestroy(proxyState);
return NULL;
}
@@ -1608,8 +1573,6 @@ ncclResult_t ncclProxyInit(struct ncclComm* comm, struct ncclSocket* sock, union
NCCLCHECK(ncclCalloc(&comm->sharedRes->proxyState, 1));
comm->proxyState = comm->sharedRes->proxyState;
comm->proxyState->refCount = 1;
/* ref count for communicator and proxy service thread. */
comm->proxyState->internalRefCount = 2;
comm->proxyState->listenSock = sock;
comm->proxyState->peerAddresses = peerAddresses;
// Seed the random number generator for UDS filename generation
@@ -1632,8 +1595,6 @@ ncclResult_t ncclProxyCreate(struct ncclComm* comm) {
proxyState->tpLocalnRanks = comm->localRanks;
proxyState->cudaDev = comm->cudaDev;
proxyState->abortFlag = comm->abortFlag;
proxyState->abortFlagRefCount = comm->abortFlagRefCount;
ncclAtomicRefCountIncrement(comm->abortFlagRefCount);
proxyState->p2pnChannels = comm->p2pnChannels;
proxyState->p2pChunkSize = comm->p2pChunkSize;
proxyState->nChannels = comm->nChannels;
@@ -1691,41 +1652,15 @@ ncclResult_t ncclProxyStop(struct ncclComm* comm) {
return ncclSuccess;
}
ncclResult_t ncclProxyDestroy(struct ncclProxyState *proxyState) {
if (__atomic_sub_fetch(&proxyState->internalRefCount, 1, __ATOMIC_ACQ_REL) == 0) {
free(proxyState->peerAddresses);
free(proxyState->peerSocks);
free(proxyState->proxyOps);
free(proxyState->sharedDevMems);
expectedProxyResponseFree(proxyState);
free(proxyState);
}
return ncclSuccess;
}
ncclResult_t ncclProxyDestroy(struct ncclComm* comm) {
struct ncclProxyState* sharedProxyState = comm->sharedRes->proxyState;
/* detach all proxy threads in case of abort */
ncclResult_t ncclProxyTryDetach(struct ncclProxyState *proxyState) {
if (proxyState && proxyState->thread) {
/* proxy service thread can call cudaFreeHost to free pinned host mem, but
* it can cause a hang if main thread is issuing other cuda calls. To solution
* should be allocate/free pinned host mem using cuMem* driver API, this waiting
* 5 secs is just a workaround for now. */
bool join = false;
struct timespec start, now;
clock_gettime(CLOCK_MONOTONIC, &start);
do {
clock_gettime(CLOCK_MONOTONIC, &now);
if (__atomic_load_n(&proxyState->stop, __ATOMIC_ACQUIRE) == SERVICE_COMPLETE) {
/* proxy thread is done, join it. */
pthread_join(proxyState->thread, NULL);
join = true;
break;
}
} while(now.tv_sec - start.tv_sec < 5);
if (join == false) {
pthread_detach(proxyState->thread);
}
}
assert(sharedProxyState->refCount == 0);
free(sharedProxyState->peerAddresses);
free(sharedProxyState->peerSocks);
free(sharedProxyState->proxyOps);
free(sharedProxyState->sharedDevMems);
expectedProxyResponseFree(sharedProxyState);
free(sharedProxyState);
return ncclSuccess;
}
+114 -69
Просмотреть файл
@@ -78,14 +78,29 @@ void dumpData(struct ncclConnect* data, int ndata) {
}
}
NCCL_PARAM(ConnectRoundMaxPeers, "CONNECT_ROUND_MAX_PEERS", 128);
NCCL_PARAM(ReportConnectProgress, "REPORT_CONNECT_PROGRESS", 0);
#include <sys/time.h>
ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, int connIndex, int* highestTransportType/*=NULL*/, bool* needsProxy/*=NULL*/) {
// Stream used during transport setup; need for P2P pre-connect + CUDA Graph
ncclResult_t ret = ncclSuccess;
int highestType = TRANSPORT_P2P; // track highest transport type
bool needsProxyResult = false;
struct ncclConnect** data = (ncclConnect**) malloc(sizeof(ncclConnect*) * comm->nRanks); // Store intermediate send/recvData structs for connect
struct ncclConnect** recvData = (ncclConnect**) malloc(sizeof(ncclConnect*) * comm->nRanks); // Points to entries inside data for given recv connection within a channel
struct ncclConnect** sendData = (ncclConnect**) malloc(sizeof(ncclConnect*) * comm->nRanks); // Points to entries inside data for given send connection within a channel
struct ncclConnect** data; // Store intermediate send/recvData structs for connect
struct ncclConnect** recvData; // Points to entries inside data for given recv connection within a channel
struct ncclConnect** sendData; // Points to entries inside data for given send connection within a channel
int done = 0;
int maxPeers = ncclParamConnectRoundMaxPeers();
NCCLCHECK(ncclCalloc(&data, maxPeers));
NCCLCHECK(ncclCalloc(&recvData, maxPeers));
NCCLCHECK(ncclCalloc(&sendData, maxPeers));
struct timeval timeStart, timeLast;
gettimeofday(&timeStart, NULL);
timeLast = timeStart; // struct copy
bool timeReported = false;
NCCLCHECKGOTO(ncclStrongStreamAcquireUncaptured(&comm->sharedRes->hostStream), ret, fail);
// First time initialization
@@ -93,32 +108,33 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph*
int bootstrapTag = (i<<8) + (graph ? graph->id+1 : 0);
int recvPeer = (comm->rank - i + comm->nRanks) % comm->nRanks;
int sendPeer = (comm->rank + i) % comm->nRanks;
uint64_t recvMask = comm->connectRecv[recvPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)];
uint64_t sendMask = comm->connectSend[sendPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)];
uint64_t recvMask = comm->connectRecv[recvPeer];
uint64_t sendMask = comm->connectSend[sendPeer];
// Data[i] contains all ncclConnect information for all send and receive connections with a given send and recv peer
// This data is packed in the array based on the number of sendChannels and recvChannels connected with these peers
// The first N entries contain recvData, connection information for recv connections
// The next M entries contain sendData, connection information for send connections
// It's not guaranteed that each entry of data has the same number of total or send/recv specific connections
data[i] = (ncclConnect*) malloc(sizeof(ncclConnect) * 2*MAXCHANNELS);
recvData[i] = data[i];
int p = i-(done+1);
if (recvMask || sendMask) NCCLCHECK(ncclCalloc(data+p, 2*MAXCHANNELS));
recvData[p] = data[p];
int sendChannels = 0, recvChannels = 0;
int type;
bool proxy;
TIME_START(0);
for (int c=0; c<MAXCHANNELS; c++) {
if (recvMask & (1UL<<c)) {
NCCLCHECKGOTO(selectTransport<0>(comm, graph, recvData[i]+recvChannels++, c, recvPeer, connIndex, &type, &proxy), ret, fail);
NCCLCHECKGOTO(selectTransport<0>(comm, graph, recvData[p]+recvChannels++, c, recvPeer, connIndex, &type, &proxy), ret, fail);
if (type > highestType) highestType = type;
}
}
TIME_STOP(0);
TIME_START(1);
sendData[i] = recvData[i]+recvChannels;
sendData[p] = recvData[p]+recvChannels;
for (int c=0; c<MAXCHANNELS; c++) {
if (sendMask & (1UL<<c)) {
NCCLCHECKGOTO(selectTransport<1>(comm, graph, sendData[i]+sendChannels++, c, sendPeer, connIndex, &type, &proxy), ret, fail);
NCCLCHECKGOTO(selectTransport<1>(comm, graph, sendData[p]+sendChannels++, c, sendPeer, connIndex, &type, &proxy), ret, fail);
if (type > highestType) highestType = type;
needsProxyResult |= proxy;
}
@@ -128,70 +144,100 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph*
TIME_START(2);
if (sendPeer == recvPeer) {
if (recvChannels+sendChannels) {
NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, recvPeer, bootstrapTag, data[i], sizeof(struct ncclConnect)*(recvChannels+sendChannels)), ret, fail);
NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, recvPeer, bootstrapTag, data[i], sizeof(struct ncclConnect)*(recvChannels+sendChannels)), ret, fail);
sendData[i] = data[i];
recvData[i] = data[i]+sendChannels;
NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, recvPeer, bootstrapTag, data[p], sizeof(struct ncclConnect)*(recvChannels+sendChannels)), ret, fail);
NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, recvPeer, bootstrapTag, data[p], sizeof(struct ncclConnect)*(recvChannels+sendChannels)), ret, fail);
sendData[p] = data[p];
recvData[p] = data[p]+sendChannels;
}
} else {
if (recvChannels) NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, recvPeer, bootstrapTag, recvData[i], sizeof(struct ncclConnect)*recvChannels), ret, fail);
if (sendChannels) NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, sendPeer, bootstrapTag, sendData[i], sizeof(struct ncclConnect)*sendChannels), ret, fail);
if (sendChannels) NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, sendPeer, bootstrapTag, sendData[i], sizeof(struct ncclConnect)*sendChannels), ret, fail);
if (recvChannels) NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, recvPeer, bootstrapTag, recvData[i], sizeof(struct ncclConnect)*recvChannels), ret, fail);
if (recvChannels) NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, recvPeer, bootstrapTag, recvData[p], sizeof(struct ncclConnect)*recvChannels), ret, fail);
if (sendChannels) NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, sendPeer, bootstrapTag, sendData[p], sizeof(struct ncclConnect)*sendChannels), ret, fail);
if (sendChannels) NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, sendPeer, bootstrapTag, sendData[p], sizeof(struct ncclConnect)*sendChannels), ret, fail);
if (recvChannels) NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, recvPeer, bootstrapTag, recvData[p], sizeof(struct ncclConnect)*recvChannels), ret, fail);
}
TIME_STOP(2);
if (i-done == maxPeers || i == comm->nRanks-1) {
// Loop until all channels with all ranks have been connected
bool allChannelsConnected;
allChannelsConnected = false;
while (!allChannelsConnected) {
allChannelsConnected = true;
for (int j=done+1; j<=i; j++) {
int recvPeer = (comm->rank - j + comm->nRanks) % comm->nRanks;
int sendPeer = (comm->rank + j) % comm->nRanks;
uint64_t recvMask = comm->connectRecv[recvPeer];
uint64_t sendMask = comm->connectSend[sendPeer];
int p = j-(done+1);
int sendDataOffset = 0;
int recvDataOffset = 0;
for (int c=0; c<MAXCHANNELS; c++) {
TIME_START(3);
if (sendMask & (1UL<<c)) {
struct ncclConnector* conn = comm->channels[c].peers[sendPeer]->send + connIndex;
// This connector hasn't completed connection yet
if (conn->connected == 0) {
NCCLCHECKGOTO(conn->transportComm->connect(comm, sendData[p] + sendDataOffset++, 1, comm->rank, conn), ret, fail);
if (ret == ncclSuccess) {
conn->connected = 1;
/* comm->channels[c].devPeers[sendPeer]->send[connIndex] is a device memory access. */
CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[sendPeer]->send[connIndex], &conn->conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), ret, fail);
} else if (ret == ncclInProgress) {
allChannelsConnected = false;
}
}
}
TIME_STOP(3);
// Start with recv channels
TIME_START(4);
if (recvMask & (1UL<<c)) {
struct ncclConnector* conn = comm->channels[c].peers[recvPeer]->recv + connIndex;
// This connector hasn't completed connection yet
if (conn->connected == 0) {
NCCLCHECKGOTO(conn->transportComm->connect(comm, recvData[p] + recvDataOffset++, 1, comm->rank, conn), ret, fail);
if (ret == ncclSuccess) {
conn->connected = 1;
/* comm->channels[c].devPeers[recvPeer]->recv[connIndex] is a device memory access. */
CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[recvPeer]->recv[connIndex], &conn->conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), ret, fail);
} else if (ret == ncclInProgress) {
allChannelsConnected = false;
}
}
}
TIME_STOP(4);
}
if (sendMask || recvMask) {
free(data[p]);
data[p] = NULL;
}
}
if (ncclParamReportConnectProgress() && comm->rank == 0) {
struct timeval now;
gettimeofday(&now, NULL);
if (((now.tv_sec - timeLast.tv_sec)*1.0 + (now.tv_usec-timeLast.tv_usec)*1e-6) > 1) {
float elapsed = (now.tv_sec - timeStart.tv_sec)*1.0 + (now.tv_usec-timeStart.tv_usec)*1e-6;
float remaining = elapsed*(comm->nRanks-done)/done;
printf("%sP2p connect: %g%% Elapsed %d:%02d Remaining %d:%02d ",
timeReported ? "\r" : "", done*100.0/comm->nRanks, ((int)elapsed)/60, ((int)elapsed)%60, ((int)remaining)/60, ((int)remaining)%60);
fflush(stdout);
timeReported = true;
timeLast = now; // struct copy;
}
}
}
done = i;
}
}
// Loop until all channels with all ranks have been connected
bool allChannelsConnected;
allChannelsConnected = false;
while (!allChannelsConnected) {
allChannelsConnected = true;
for (int i=1; i<comm->nRanks; i++) {
int recvPeer = (comm->rank - i + comm->nRanks) % comm->nRanks;
int sendPeer = (comm->rank + i) % comm->nRanks;
uint64_t recvMask = comm->connectRecv[recvPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)];
uint64_t sendMask = comm->connectSend[sendPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)];
int sendDataOffset = 0;
int recvDataOffset = 0;
for (int c=0; c<MAXCHANNELS; c++) {
TIME_START(3);
if (sendMask & (1UL<<c)) {
struct ncclConnector* conn = comm->channels[c].peers[sendPeer]->send + connIndex;
// This connector hasn't completed connection yet
if (conn->connected == 0) {
NCCLCHECKGOTO(conn->transportComm->connect(comm, sendData[i] + sendDataOffset++, 1, comm->rank, conn), ret, fail);
if (ret == ncclSuccess) {
conn->connected = 1;
/* comm->channels[c].devPeers[sendPeer]->send[connIndex] is a device memory access. */
CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[sendPeer]->send[connIndex], &conn->conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), ret, fail);
} else if (ret == ncclInProgress) {
allChannelsConnected = false;
}
}
}
TIME_STOP(3);
// Start with recv channels
TIME_START(4);
if (recvMask & (1UL<<c)) {
struct ncclConnector* conn = comm->channels[c].peers[recvPeer]->recv + connIndex;
// This connector hasn't completed connection yet
if (conn->connected == 0) {
NCCLCHECKGOTO(conn->transportComm->connect(comm, recvData[i] + recvDataOffset++, 1, comm->rank, conn), ret, fail);
if (ret == ncclSuccess) {
conn->connected = 1;
/* comm->channels[c].devPeers[recvPeer]->recv[connIndex] is a device memory access. */
CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[recvPeer]->recv[connIndex], &conn->conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), ret, fail);
} else if (ret == ncclInProgress) {
allChannelsConnected = false;
}
}
}
TIME_STOP(4);
}
}
if (timeReported) {
struct timeval now;
gettimeofday(&now, NULL);
float elapsed = (now.tv_sec - timeStart.tv_sec)*1.0 + (now.tv_usec-timeStart.tv_usec)*1e-6;
printf("\rP2p connect done in %d:%02d \n",
((int)elapsed)/60, ((int)elapsed)%60);
fflush(stdout);
}
/* We need to sync ranks here since some ranks might run too fast after connection setup
@@ -221,7 +267,6 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph*
}
}
comm->connectRecv[recvPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)] = comm->connectSend[sendPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)] = 0UL;
free(data[i]);
}
free(data);
+10 -9
Просмотреть файл
@@ -393,18 +393,18 @@ ncclResult_t ncclNvlsSetup(struct ncclComm* comm, struct ncclComm* parent) {
typeSize = sizeof(struct localRegData);
if (comm->localRank == 0) {
shmPath[0] = '\0';
NCCLCHECKGOTO(ncclShmOpen(shmPath, (sizeof(size_t) + typeSize * comm->localRanks) * 2, (void**)&nvlsShmem, NULL, comm->localRanks - 1, &comm->nvlsShmemHandle), res, cleanup);
NCCLCHECKGOTO(ncclShmOpen(shmPath, (sizeof(size_t) + typeSize * comm->localRanks) * 2, (void**)&nvlsShmem, NULL, comm->localRanks - 1, &comm->nvlsResources->nvlsShmemHandle), res, cleanup);
NCCLCHECKGOTO(bootstrapIntraNodeBroadcast(comm->bootstrap, comm->localRankToRank, comm->localRank, comm->localRanks, 0, shmPath, sizeof(shmPath)), res, cleanup);
} else {
NCCLCHECKGOTO(bootstrapIntraNodeBroadcast(comm->bootstrap, comm->localRankToRank, comm->localRank, comm->localRanks, 0, shmPath, sizeof(shmPath)), res, cleanup);
NCCLCHECKGOTO(ncclShmOpen(shmPath, (sizeof(size_t) + typeSize * comm->localRanks) * 2, (void**)&nvlsShmem, NULL, -1, &comm->nvlsShmemHandle), res, cleanup);
NCCLCHECKGOTO(ncclShmOpen(shmPath, (sizeof(size_t) + typeSize * comm->localRanks) * 2, (void**)&nvlsShmem, NULL, -1, &comm->nvlsResources->nvlsShmemHandle), res, cleanup);
}
/* need 2 pools and a shared counter for shmem-based collectives */
comm->nvlsShmem.cnt[0] = (size_t*)nvlsShmem;
comm->nvlsShmem.ptr[0] = (void*)((char*)comm->nvlsShmem.cnt[0] + sizeof(size_t));
comm->nvlsShmem.cnt[1] = (size_t*)((char*)comm->nvlsShmem.ptr[0] + typeSize * comm->localRanks);
comm->nvlsShmem.ptr[1] = (void*)((char*)comm->nvlsShmem.cnt[1] + sizeof(size_t));
comm->nvlsShmem.round = 0;
comm->nvlsResources->nvlsShmem.cnt[0] = (size_t*)nvlsShmem;
comm->nvlsResources->nvlsShmem.ptr[0] = (void*)((char*)comm->nvlsResources->nvlsShmem.cnt[0] + sizeof(size_t));
comm->nvlsResources->nvlsShmem.cnt[1] = (size_t*)((char*)comm->nvlsResources->nvlsShmem.ptr[0] + typeSize * comm->localRanks);
comm->nvlsResources->nvlsShmem.ptr[1] = (void*)((char*)comm->nvlsResources->nvlsShmem.cnt[1] + sizeof(size_t));
comm->nvlsResources->nvlsShmem.round = 0;
return res;
@@ -418,6 +418,7 @@ ncclResult_t ncclNvlsFree(struct ncclComm* comm) {
if (resources == NULL) return ncclSuccess;
if (ncclAtomicRefCountDecrement(&resources->refCount) == 0) {
NCCLCHECK(ncclShmClose(resources->nvlsShmemHandle));
NCCLCHECK(nvlsGroupUnbind(comm, resources));
NCCLCHECK(nvlsGroupUnmapMem(comm, resources));
free(resources);
@@ -476,7 +477,7 @@ ncclResult_t tryRegisterBuffer(struct ncclComm *comm, struct localRequestData *r
/* get all buffer addresses */
NCCLCHECKGOTO(ncclCalloc(&regRecord->addrs, comm->localRanks), ret, fail);
regRecord->addrs[comm->localRank] = regRecord->buff;
NCCLCHECKGOTO(ncclShmemAllgather(comm, &comm->nvlsShmem, regRecord->addrs + comm->localRank, regRecord->addrs, sizeof(uintptr_t)), ret, fail);
NCCLCHECKGOTO(ncclShmemAllgather(comm, &comm->nvlsResources->nvlsShmem, regRecord->addrs + comm->localRank, regRecord->addrs, sizeof(uintptr_t)), ret, fail);
/* enqueue record */
ncclIntruQueueEnqueue(&comm->regRecordQueue, regRecord);
@@ -551,7 +552,7 @@ ncclResult_t ncclNvlsLocalRegisterBuffer(struct ncclComm *comm, const void *send
regRequestHead = regRequestHead->next;
}
NCCLCHECKGOTO(ncclShmemAllgather(comm, &comm->nvlsShmem, regData + comm->localRank, regData, sizeof(struct localRegData)), ret, fail);
NCCLCHECKGOTO(ncclShmemAllgather(comm, &comm->nvlsResources->nvlsShmem, regData + comm->localRank, regData, sizeof(struct localRegData)), ret, fail);
/* first check whether all local ranks find their registered buffer */
for (int i = 0; i < comm->localRanks; ++i) {