diff --git a/src/all_gather.cu b/src/all_gather.cu index 0ca428dbed..bc1c59969c 100644 --- a/src/all_gather.cu +++ b/src/all_gather.cu @@ -31,17 +31,24 @@ void AllGatherGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *par testResult_t AllGatherInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) { size_t sendcount = args->sendBytes / wordSize(type); size_t recvcount = args->expectedBytes / wordSize(type); - int nranks = args->nProcs*args->nThreads*args->nGpus; + int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks; + int k=0; for (int i=0; inGpus; i++) { int gpuid = args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i; + if (args->enable_multiranks) + gpuid = gpuid % args->localNumDevices; HIPCHECK(hipSetDevice(gpuid)); - int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i); - HIPCHECK(hipMemset(args->recvbuffs[i], 0, args->expectedBytes)); - void* data = in_place ? ((char*)args->recvbuffs[i])+rank*args->sendBytes : args->sendbuffs[i]; - TESTCHECK(InitData(data, sendcount, type, rep, rank)); - for (int j=0; jexpected[i])+args->sendBytes*j, sendcount, type, rep, j)); + + for (int l=0; lnRanks; l++) { + int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l); + HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes)); + void* data = in_place ? ((char*)args->recvbuffs[k])+rank*args->sendBytes : args->sendbuffs[k]; + TESTCHECK(InitData(data, sendcount, type, rep, rank)); + for (int j=0; jexpected[k])+args->sendBytes*j, sendcount, type, rep, j)); + } + k++; } HIPCHECK(hipDeviceSynchronize()); } @@ -99,4 +106,4 @@ testResult_t AllGatherRunTest(struct threadArgs* args, int root, ncclDataType_t struct testEngine ncclTestEngine = { AllGatherGetBuffSize, AllGatherRunTest -}; \ No newline at end of file +}; diff --git a/src/all_reduce.cu b/src/all_reduce.cu index 1c1d73a9d2..e76ee38dff 100644 --- a/src/all_reduce.cu +++ b/src/all_reduce.cu @@ -31,16 +31,23 @@ void AllReduceGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *par testResult_t AllReduceInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) { size_t sendcount = args->sendBytes / wordSize(type); size_t recvcount = args->expectedBytes / wordSize(type); - int nranks = args->nProcs*args->nThreads*args->nGpus; + int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks; + int k = 0; for (int i=0; inGpus; i++) { int gpuid = args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i; + if (args->enable_multiranks) + gpuid = gpuid % args->localNumDevices; HIPCHECK(hipSetDevice(gpuid)); - int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i); - HIPCHECK(hipMemset(args->recvbuffs[i], 0, args->expectedBytes)); - void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i]; - TESTCHECK(InitData(data, sendcount, type, rep, rank)); - TESTCHECK(InitDataReduce(args->expected[i], recvcount, 0, type, op, rep, nranks)); + + for (int l=0; lnRanks; l++) { + int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l); + HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes)); + void* data = in_place ? args->recvbuffs[k] : args->sendbuffs[k]; + TESTCHECK(InitData(data, sendcount, type, rep, rank)); + TESTCHECK(InitDataReduce(args->expected[k], recvcount, 0, type, op, rep, nranks)); + k++; + } HIPCHECK(hipDeviceSynchronize()); } return testSuccess; @@ -110,4 +117,4 @@ testResult_t AllReduceRunTest(struct threadArgs* args, int root, ncclDataType_t struct testEngine ncclTestEngine = { AllReduceGetBuffSize, AllReduceRunTest -}; \ No newline at end of file +}; diff --git a/src/alltoall.cu b/src/alltoall.cu index 4b8e66d5a2..48020e4fa3 100644 --- a/src/alltoall.cu +++ b/src/alltoall.cu @@ -31,18 +31,25 @@ void AlltoAllGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *para testResult_t AlltoAllInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) { size_t sendcount = args->sendBytes / wordSize(type); size_t recvcount = args->expectedBytes / wordSize(type); - int nranks = args->nProcs*args->nThreads*args->nGpus; + int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks; + int k=0; for (int i=0; inGpus; i++) { char* str = getenv("NCCL_TESTS_DEVICE"); int gpuid = str ? atoi(str) : args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i; + if (args->enable_multiranks) + gpuid = gpuid % args->localNumDevices; HIPCHECK(hipSetDevice(gpuid)); - int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i); - HIPCHECK(hipMemset(args->recvbuffs[i], 0, args->expectedBytes)); - void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i]; - TESTCHECK(InitData(data, sendcount, type, rep, rank)); - for (int j=0; jexpected[i])+args->sendBytes/nranks*j, sendcount/nranks, type, rep+rank*sendcount/nranks, j)); + + for (int l=0; lnRanks; l++) { + int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l); + HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes)); + void* data = in_place ? args->recvbuffs[k] : args->sendbuffs[k]; + TESTCHECK(InitData(data, sendcount, type, rep, rank)); + for (int j=0; jexpected[k])+args->sendBytes/nranks*j, sendcount/nranks, type, rep+rank*sendcount/nranks, j)); + } + k++; } HIPCHECK(hipDeviceSynchronize()); } diff --git a/src/alltoallv.cu b/src/alltoallv.cu index fb6d0acde8..7a39bcce7b 100644 --- a/src/alltoallv.cu +++ b/src/alltoallv.cu @@ -41,44 +41,51 @@ void AlltoAllvGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *par testResult_t AlltoAllvInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) { size_t sendcount = args->sendBytes / wordSize(type); size_t recvcount = args->expectedBytes / wordSize(type); - int nranks = args->nProcs*args->nThreads*args->nGpus; + int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks; + int k=0; for (int i=0; inGpus; i++) { char* str = getenv("NCCL_TESTS_DEVICE"); int gpuid = str ? atoi(str) : args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i; + if (args->enable_multiranks) + gpuid = gpuid % args->localNumDevices; HIPCHECK(hipSetDevice(gpuid)); - int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i); - HIPCHECK(hipMemset(args->recvbuffs[i], 0, args->expectedBytes)); - void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i]; - TESTCHECK(InitData(data, sendcount, type, rep, rank)); + + for (int l=0; lnRanks; l++) { + int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l); + HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes)); + void* data = in_place ? args->recvbuffs[k] : args->sendbuffs[k]; + TESTCHECK(InitData(data, sendcount, type, rep, rank)); #if 0 - int *dataHost = (int *)malloc(args->sendBytes); - hipMemcpy(dataHost, data, args->sendBytes, hipMemcpyDeviceToHost); - printf(" Rank [%d] Original: ", rank); - for(int j=0; jsendBytes); + hipMemcpy(dataHost, data, args->sendBytes, hipMemcpyDeviceToHost); + printf(" Rank [%d] Original: ", rank); + for(int j=0; jexpected[i])+rdisp*wordSize(type), rcount, type, rep+sdisp, j)); - rdisp += rcount; + printf("\n"); + free(dataHost); +#endif + size_t rdisp = 0; + size_t data_count = sendcount*2/nranks; + size_t chunksize = data_count/nranks; + for (int j=0; jexpected[k])+rdisp*wordSize(type), rcount, type, rep+sdisp, j)); + rdisp += rcount; + } + k++; } HIPCHECK(hipDeviceSynchronize()); } diff --git a/src/broadcast.cu b/src/broadcast.cu index 61f0a9952a..dffb6b6256 100644 --- a/src/broadcast.cu +++ b/src/broadcast.cu @@ -32,14 +32,21 @@ testResult_t BroadcastInitData(struct threadArgs* args, ncclDataType_t type, ncc size_t sendcount = args->sendBytes / wordSize(type); size_t recvcount = args->expectedBytes / wordSize(type); + int k=0; for (int i=0; inGpus; i++) { int gpuid = args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i; + if (args->enable_multiranks) + gpuid = gpuid % args->localNumDevices; HIPCHECK(hipSetDevice(gpuid)); - int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i); - HIPCHECK(hipMemset(args->recvbuffs[i], 0, args->expectedBytes)); - void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i]; - if (rank == root) TESTCHECK(InitData(data, sendcount, type, rep, rank)); - TESTCHECK(InitData(args->expected[i], recvcount, type, rep, root)); + + for (int l=0; lnRanks; l++) { + int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l); + HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes)); + void* data = in_place ? args->recvbuffs[k] : args->sendbuffs[k]; + if (rank == root) TESTCHECK(InitData(data, sendcount, type, rep, rank)); + TESTCHECK(InitData(args->expected[k], recvcount, type, rep, root)); + k++; + } HIPCHECK(hipDeviceSynchronize()); } return testSuccess; @@ -116,4 +123,4 @@ testResult_t BroadcastRunTest(struct threadArgs* args, int root, ncclDataType_t struct testEngine ncclTestEngine = { BroadcastGetBuffSize, BroadcastRunTest -}; \ No newline at end of file +}; diff --git a/src/common.cu b/src/common.cu index 45225ff10a..c31cff308e 100644 --- a/src/common.cu +++ b/src/common.cu @@ -78,6 +78,9 @@ static uint32_t cumask[4]; static int cudaGraphLaunches = 0; // Report average iteration time: (0=RANK0,1=AVG,2=MIN,3=MAX) static int average = 1; +static int numDevices = 1; +static int ranksPerGpu = 1; +static int enable_multiranks = 0; #define NUM_BLOCKS 32 @@ -117,6 +120,38 @@ static double parsesize(const char *value) { return size * units; } +static bool minReqVersion(int rmajor, int rminor, int rpatch) +{ + int version; + int major, minor, patch, rem; + ncclGetVersion(&version); + + if (version < 10000) { + major = version/1000; + rem = version%1000; + minor = rem/100; + patch = rem%100; + } + else { + major = version/10000; + rem = version%10000; + minor = rem/100; + patch = rem%100; + } + + if (major < rmajor) return false; + else if (major > rmajor) return true; + + // major == rmajor + if (minor < rminor) return false; + else if (minor > rminor) return true; + + // major == rmajor && minor == rminor + if (patch < rpatch) return false; + + return true; +} + double DeltaMaxValue(ncclDataType_t type) { switch(type) { case ncclHalf: return 1e-2; @@ -437,9 +472,9 @@ void Allreduce(struct threadArgs* args, double* value, int average) { testResult_t CheckData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int in_place, double *delta) { size_t count = args->expectedBytes/wordSize(type); double maxDelta = 0.0; - for (int i=0; inGpus; i++) { + for (int i=0; inGpus*args->nRanks; i++) { int device; - int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i); + int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i); NCCLCHECK(ncclCommCuDevice(args->comms[i], &device)); HIPCHECK(hipSetDevice(device)); void *data = in_place ? ((void *)((uintptr_t)args->recvbuffs[i] + args->recvInplaceOffset*rank)) : args->recvbuffs[i]; @@ -474,20 +509,20 @@ testResult_t CheckData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t //} #endif } - double nranks = args->nProcs*args->nThreads*args->nGpus; + double nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks; if (args->reportErrors && maxDelta > DeltaMaxValue(type)*(nranks - 1)) args->errors[0]++; *delta = maxDelta; return testSuccess; } -testResult_t testStreamSynchronize(int ngpus, hipStream_t* streams, ncclComm_t* comms) { +testResult_t testStreamSynchronize(int nStreams, hipStream_t* streams, ncclComm_t* comms) { hipError_t hipErr; - int remaining = ngpus; - int* done = (int*)malloc(sizeof(int)*ngpus); - memset(done, 0, sizeof(int)*ngpus); + int remaining = nStreams; + int* done = (int*)malloc(sizeof(int)*nStreams); + memset(done, 0, sizeof(int)*nStreams); while (remaining) { int idle = 1; - for (int i=0; imaxbytes / totalnbytes : 1; size_t shift = totalnbytes * (iter % steps); - if (args->nGpus > 1) NCCLCHECK(ncclGroupStart()); - for (int i = 0; i < args->nGpus; i++) { + if (args->nGpus> 1 || args->nRanks > 1) NCCLCHECK(ncclGroupStart()); + for (int i = 0; i < args->nGpus*args->nRanks; i++) { #ifndef NCCL_MAJOR int hipDev; NCCLCHECK(ncclCommCuDevice(args->comms[i], &hipDev)); HIPCHECK(hipSetDevice(hipDev)); #endif - int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i); + int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i); char* recvBuff = ((char*)args->recvbuffs[i]) + shift; char* sendBuff = ((char*)args->sendbuffs[i]) + shift; ncclRedOp_t op; @@ -585,11 +620,11 @@ testResult_t startColl(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t } #endif } - if (args->nGpus > 1) NCCLCHECK(ncclGroupEnd()); + if (args->nGpus > 1 || args->nRanks > 1) NCCLCHECK(ncclGroupEnd()); if (blocking_coll) { // Complete op before returning - TESTCHECK(testStreamSynchronize(args->nGpus, args->streams, args->comms)); + TESTCHECK(testStreamSynchronize(args->nGpus*args->nRanks, args->streams, args->comms)); } if (blocking_coll) Barrier(args); return testSuccess; @@ -598,10 +633,11 @@ testResult_t startColl(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t testResult_t completeColl(struct threadArgs* args) { if (blocking_coll) return testSuccess; - TESTCHECK(testStreamSynchronize(args->nGpus, args->streams, args->comms)); + TESTCHECK(testStreamSynchronize(args->nGpus*args->nRanks, args->streams, args->comms)); return testSuccess; } +//EDGAR: Revisit because of cudaGraphLaunches testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int in_place) { size_t count = args->nbytes / wordSize(type); if (datacheck) { @@ -616,11 +652,11 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t Barrier(args); #if CUDART_VERSION >= 11030 - hipGraph_t graphs[args->nGpus]; - hipGraphExec_t graphExec[args->nGpus]; + hipGraph_t graphs[args->nGpus*args->nRanks]; + hipGraphExec_t graphExec[args->nGpus*args->nRanks]; if (cudaGraphLaunches >= 1) { // Begin cuda graph capture - for (int i=0; inGpus; i++) { + for (int i=0; inGpus*args->nRanks; i++) { // Thread local mode is needed for: // - Multi-thread mode // - P2P pre-connect @@ -642,18 +678,18 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t #if CUDART_VERSION >= 11030 if (cudaGraphLaunches >= 1) { // End cuda graph capture - for (int i=0; inGpus; i++) { + for (int i=0; inGpus*args->nRanks; i++) { HIPCHECK(hipStreamEndCapture(args->streams[i], graphs+i)); } // Instantiate cuda graph - for (int i=0; inGpus; i++) { + for (int i=0; inGpus*args->nRanks; i++) { HIPCHECK(hipGraphInstantiate(graphExec+i, graphs[i], NULL, NULL, 0)); } // Resync CPU, restart timing, launch cuda graph Barrier(args); start = std::chrono::high_resolution_clock::now(); for (int l=0; lnGpus; i++) { + for (int i=0; inGpus*args->nRanks; i++) { HIPCHECK(hipGraphLaunch(graphExec[i], args->streams[i])); } } @@ -671,7 +707,7 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t #if CUDART_VERSION >= 11030 if (cudaGraphLaunches >= 1) { //destroy cuda graph - for (int i=0; inGpus; i++) { + for (int i=0; inGpus*args->nRanks; i++) { HIPCHECK(hipGraphExecDestroy(graphExec[i])); HIPCHECK(hipGraphDestroy(graphs[i])); } @@ -679,7 +715,7 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t #endif double algBw, busBw; - args->collTest->getBw(count, wordSize(type), deltaSec, &algBw, &busBw, args->nProcs*args->nThreads*args->nGpus); + args->collTest->getBw(count, wordSize(type), deltaSec, &algBw, &busBw, args->nProcs*args->nThreads*args->nGpus*args->nRanks); Barrier(args); @@ -694,7 +730,7 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t #if CUDART_VERSION >= 11030 if (cudaGraphLaunches >= 1) { // Begin cuda graph capture for data check - for (int i=0; inGpus; i++) { + for (int i=0; inGpus*args->nRanks; i++) { HIPCHECK(chiptreamBeginCapture(args->streams[i], args->nThreads > 1 ? hipStreamCaptureModeThreadLocal : hipStreamCaptureModeGlobal)); } } @@ -706,15 +742,15 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t #if CUDART_VERSION >= 11030 if (cudaGraphLaunches >= 1) { // End cuda graph capture - for (int i=0; inGpus; i++) { + for (int i=0; inGpus*args->nRanks; i++) { HIPCHECK(hipStreamEndCapture(args->streams[i], graphs+i)); } // Instantiate cuda graph - for (int i=0; inGpus; i++) { + for (int i=0; inGpus*args->nRanks; i++) { HIPCHECK(hipGraphInstantiate(graphExec+i, graphs[i], NULL, NULL, 0)); } // Launch cuda graph - for (int i=0; inGpus; i++) { + for (int i=0; inGpus*args->nRanks; i++) { HIPCHECK(hipGraphLaunch(graphExec[i], args->streams[i])); } } @@ -725,7 +761,7 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t #if CUDART_VERSION >= 11030 if (cudaGraphLaunches >= 1) { //destroy cuda graph - for (int i=0; inGpus; i++) { + for (int i=0; inGpus*args->nRanks; i++) { HIPCHECK(hipGraphExecDestroy(graphExec[i])); HIPCHECK(hipGraphDestroy(graphs[i])); } @@ -759,7 +795,7 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t } void setupArgs(size_t size, ncclDataType_t type, struct threadArgs* args) { - int nranks = args->nProcs*args->nGpus*args->nThreads; + int nranks = args->nProcs*args->nGpus*args->nThreads*args->nRanks; size_t count, sendCount, recvCount, paramCount, sendInplaceOffset, recvInplaceOffset; count = size / wordSize(type); @@ -806,6 +842,8 @@ testResult_t threadRunTests(struct threadArgs* args) { // will be done on the current GPU (by default : 0) and if the GPUs are in // exclusive mode those operations will fail. int gpuid = args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus; + if (enable_multiranks) + gpuid = gpuid % numDevices; HIPCHECK(hipSetDevice(gpuid)); TESTCHECK(ncclTestEngine.runTest(args, ncclroot, (ncclDataType_t)nccltype, test_typenames[nccltype], (ncclRedOp_t)ncclop, test_opnames[ncclop])); return testSuccess; @@ -814,23 +852,33 @@ testResult_t threadRunTests(struct threadArgs* args) { testResult_t threadInit(struct threadArgs* args) { char hostname[1024]; getHostName(hostname, 1024); - int nranks = args->nProcs*args->nThreads*args->nGpus; + int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks; //set main thread again is_main_thread = (args->proc == 0 && args->thread == 0) ? 1 : 0; NCCLCHECK(ncclGroupStart()); for (int i=0; inGpus; i++) { - int rank = args->proc*args->nThreads*args->nGpus + args->thread*args->nGpus + i; int gpuid = args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i; + if (enable_multiranks) + gpuid = gpuid % numDevices; HIPCHECK(hipSetDevice(gpuid)); - NCCLCHECK(ncclCommInitRank(args->comms+i, nranks, args->ncclId, rank)); + + for (int j=0; jnRanks; j++) { + int rank = (args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + j; + if (args->enable_multiranks) + NCCLCHECK(ncclCommInitRank(args->comms+i, nranks, args->ncclId, rank)); +#ifdef RCCL_MULTIRANKPERGPU + else + NCCLCHECK(ncclCommInitRankMulti(args->comms+i*args->nRanks+j, nranks, args->ncclId, rank, rank)); +#endif + } } NCCLCHECK(ncclGroupEnd()); TESTCHECK(threadRunTests(args)); - for (int i=0; inGpus; i++) { + for (int i=0; inGpus*args->nRanks; i++) { NCCLCHECK(ncclCommDestroy(args->comms[i])); } return testSuccess; @@ -925,13 +973,21 @@ int main(int argc, char* argv[]) { {"cumask", required_argument, 0, 'u'}, {"cudagraph", required_argument, 0, 'G'}, {"average", required_argument, 0, 'a'}, +#ifdef RCCL_MULTIRANKPERGPU + {"enable_multiranks", required_argument, 0, 'x'}, + {"ranks_per_gpu", required_argument, 0, 'R'}, +#endif {"help", no_argument, 0, 'h'}, {} }; while(1) { int c; +#ifdef RCCL_MULTIRANKPERGPU + c = getopt_long(argc, argv, "t:g:b:e:i:f:n:m:w:p:c:o:d:r:z:G:a:y:s:u:h:R:x:", longopts, &longindex); +#else c = getopt_long(argc, argv, "t:g:b:e:i:f:n:m:w:p:c:o:d:r:z:G:a:y:s:u:h:", longopts, &longindex); +#endif if (c == -1) break; @@ -1022,6 +1078,14 @@ int main(int argc, char* argv[]) { case 'a': average = (int)strtol(optarg, NULL, 0); break; +#ifdef RCCL_MULTIRANKPERGPU + case 'x': + enable_multiranks = (int)strtol(optarg, NULL, 0); + break; + case 'R': + ranksPerGpu = (int)strtol(optarg, NULL, 0); + break; +#endif case 'h': default: if (c != 'h') printf("invalid option '%c'\n", c); @@ -1052,26 +1116,43 @@ int main(int argc, char* argv[]) { "[-u,--cumask ] \n\t" "[-G,--cudagraph ] \n\t" "[-a,--average <0/1/2/3> report average iteration time <0=RANK0/1=AVG/2=MIN/3=MAX>] \n\t" +#ifdef RCCL_MULTIRANKPERGPU + "[-x,--enable_multiranks <0/1> enable using multiple ranks per GPU] \n\t" + "[-R,--ranks_per_gpu] \n\t" +#endif "[-h,--help]\n", basename(argv[0])); return 0; } } - int numDevices; HIPCHECK(hipGetDeviceCount(&numDevices)); if (nGpus > numDevices) { fprintf(stderr, "[ERROR] The number of requested GPUs (%d) is greater than the number of GPUs available (%d)\n", nGpus, numDevices); return testNcclError; } - if (minBytes > maxBytes) { fprintf(stderr, "invalid sizes for 'minbytes' and 'maxbytes': %llu > %llu\n", (unsigned long long)minBytes, (unsigned long long)maxBytes); return -1; } + if (!minReqVersion(2, 12, 12) && enable_multiranks) { + fprintf(stderr, "Multiple Ranks per GPU requested, but rccl library found does not support this feature.\n"); + fprintf(stderr, "Please check LD_LIBRARY_PATH. Resetting enable_multiranks and ranksPerGpu to default values.\n"); + enable_multiranks = 0; + ranksPerGpu = 1; + } + + if (enable_multiranks && parallel_init) { + fprintf(stderr, "Cannot use parallel_init when using multiple ranks per GPU.\n"); + return -1; + } + if (ranksPerGpu > 1 && !enable_multiranks) { + fprintf(stderr, "Need to enable multiranks option to use multiple ranks per GPU\n"); + return -1; + } #ifdef MPI_SUPPORT MPI_Init(&argc, &argv); #endif @@ -1098,7 +1179,7 @@ testResult_t run() { #endif is_main_thread = (proc == 0) ? 1 : 0; - PRINT("# nThread: %d nGpus: %d minBytes: %ld maxBytes: %ld step: %ld(%s) warmupIters: %d iters: %d validation: %d \n", nThreads, nGpus, minBytes, maxBytes, + PRINT("# nThreads: %d nGpus: %d nRanks: %d minBytes: %ld maxBytes: %ld step: %ld(%s) warmupIters: %d iters: %d validation: %d \n", nThreads, nGpus, ranksPerGpu, minBytes, maxBytes, (stepFactor > 1)?stepFactor:stepBytes, (stepFactor > 1)?"factor":"bytes", warmup_iters, iters, datacheck); if (blocking_coll) PRINT("# Blocking Enabled: wait for completion and barrier after each collective \n"); if (parallel_init) PRINT("# Parallel Init Enabled: threads call into NcclInitRank concurrently \n"); @@ -1111,18 +1192,20 @@ testResult_t run() { size_t maxMem = ~0; for (int i=0; ilen ? MAX_LINE-len : 0, "# Rank %2d Pid %6d on %10s device %2d [%s] %s\n", - rank, getpid(), hostname, hipDev, busIdStr, prop.name); - len += snprintf(line+len, MAX_LINE>len ? MAX_LINE-len : 0, "# Rank %2d Pid %6d on %10s device %2d [0x%02x] %s\n", - rank, getpid(), hostname, hipDev, prop.pciBusID, prop.name); - maxMem = std::min(maxMem, prop.totalGlobalMem); - } + for (int j=0; jlen ? MAX_LINE-len : 0, "# Rank %2d Pid %6d on %10s device %2d [%s] %s\n", + rank, getpid(), hostname, hipDev, busIdStr, prop.name); + maxMem = std::min(maxMem, prop.totalGlobalMem); + } + } #if MPI_SUPPORT char *lines = (proc == 0) ? (char *)malloc(nProcs*MAX_LINE) : NULL; // Gather all output in rank order to root (0) @@ -1152,42 +1235,61 @@ testResult_t run() { MPI_Bcast(&ncclId, sizeof(ncclId), MPI_BYTE, 0, MPI_COMM_WORLD); MPI_Barrier(MPI_COMM_WORLD); #endif - hipStream_t streams[nGpus*nThreads]; - void* sendbuffs[nGpus*nThreads]; - void* recvbuffs[nGpus*nThreads]; - void* expected[nGpus*nThreads]; + hipStream_t streams[nGpus*nThreads*ranksPerGpu]; + void* sendbuffs[nGpus*nThreads*ranksPerGpu]; + void* recvbuffs[nGpus*nThreads*ranksPerGpu]; + void* expected[nGpus*nThreads*ranksPerGpu]; size_t sendBytes, recvBytes; - ncclTestEngine.getBuffSize(&sendBytes, &recvBytes, (size_t)maxBytes, (size_t)nProcs*nGpus*nThreads); + ncclTestEngine.getBuffSize(&sendBytes, &recvBytes, (size_t)maxBytes, (size_t)nProcs*nGpus*nThreads*ranksPerGpu); - for (int i=0; isendBytes / wordSize(type); size_t recvcount = args->expectedBytes / wordSize(type); - int nranks = args->nProcs*args->nThreads*args->nGpus; + int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks; + int k=0; for (int i=0; inGpus; i++) { int gpuid = args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i; + if (args->enable_multiranks) + gpuid = gpuid % args->localNumDevices; HIPCHECK(hipSetDevice(gpuid)); - int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i); - HIPCHECK(hipMemset(args->recvbuffs[i], 0, args->expectedBytes)); - void* data = in_place ? ((char*)args->recvbuffs[i])+rank*args->sendBytes : args->sendbuffs[i]; - TESTCHECK(InitData(data, sendcount, type, rep, rank)); - HIPCHECK(hipMemcpy(args->expected[i], args->recvbuffs[i], args->expectedBytes, hipMemcpyDefault)); - if (rank == root) { - for (int j=0; jexpected[i])+args->sendBytes*j, sendcount, type, rep, j)); + + for (int l=0; lnRanks; l++) { + int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l); + HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes)); + void* data = in_place ? ((char*)args->recvbuffs[k])+rank*args->sendBytes : args->sendbuffs[k]; + TESTCHECK(InitData(data, sendcount, type, rep, rank)); + HIPCHECK(hipMemcpy(args->expected[k], args->recvbuffs[k], args->expectedBytes, hipMemcpyDefault)); + if (rank == root) { + for (int j=0; jexpected[k])+args->sendBytes*j, sendcount, type, rep, j)); + } } + k++; } HIPCHECK(hipDeviceSynchronize()); } diff --git a/src/hypercube.cu b/src/hypercube.cu index 946c9c670b..d654617ccd 100644 --- a/src/hypercube.cu +++ b/src/hypercube.cu @@ -33,17 +33,24 @@ void HyperCubeGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *par testResult_t HyperCubeInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) { size_t sendcount = args->sendBytes / wordSize(type); size_t recvcount = args->expectedBytes / wordSize(type); - int nranks = args->nProcs*args->nThreads*args->nGpus; + int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks; + int k=0; for (int i=0; inGpus; i++) { int gpuid = args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i; + if (args->enable_multiranks) + gpuid = gpuid % args->localNumDevices; HIPCHECK(hipSetDevice(gpuid)); - int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i); - HIPCHECK(hipMemset(args->recvbuffs[i], 0, args->expectedBytes)); - void* data = in_place ? ((char*)args->recvbuffs[i])+rank*args->sendBytes : args->sendbuffs[i]; - TESTCHECK(InitData(data, sendcount, type, rep, rank)); - for (int j=0; jexpected[i])+args->sendBytes*j, sendcount, type, rep, j)); + + for (int l=0; lnRanks; l++) { + int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l); + HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes)); + void* data = in_place ? ((char*)args->recvbuffs[k])+rank*args->sendBytes : args->sendbuffs[k]; + TESTCHECK(InitData(data, sendcount, type, rep, rank)); + for (int j=0; jexpected[k])+args->sendBytes*j, sendcount, type, rep, j)); + } + k++; } HIPCHECK(hipDeviceSynchronize()); } @@ -66,7 +73,6 @@ testResult_t HyperCubeRunColl(void* sendbuff, void* recvbuff, size_t count, nccl int rank; NCCLCHECK(ncclCommUserRank(comm, &rank)); size_t rankSize = count * wordSize(type); - if (rbuff+rank*rankSize != sbuff) HIPCHECK(hipMemcpyAsync(rbuff+rank*rankSize, sbuff, rankSize, hipMemcpyDeviceToDevice, stream)); // Hypercube AllGather diff --git a/src/reduce.cu b/src/reduce.cu index d0792a49f9..7ea7b0f726 100644 --- a/src/reduce.cu +++ b/src/reduce.cu @@ -31,17 +31,24 @@ void ReduceGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramc testResult_t ReduceInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) { size_t sendcount = args->sendBytes / wordSize(type); size_t recvcount = args->expectedBytes / wordSize(type); - int nranks = args->nProcs*args->nThreads*args->nGpus; + int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks; + int k=0; for (int i=0; inGpus; i++) { int gpuid = args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i; + if (args->enable_multiranks) + gpuid = gpuid % args->localNumDevices; HIPCHECK(hipSetDevice(gpuid)); - int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i); - HIPCHECK(hipMemset(args->recvbuffs[i], 0, args->expectedBytes)); - void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i]; - TESTCHECK(InitData(data, sendcount, type, rep, rank)); - HIPCHECK(hipMemcpy(args->expected[i], args->recvbuffs[i], args->expectedBytes, hipMemcpyDefault)); - if (rank == root) TESTCHECK(InitDataReduce(args->expected[i], recvcount, 0, type, op, rep, nranks)); + + for (int l=0; lnRanks; l++) { + int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l); + HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes)); + void* data = in_place ? args->recvbuffs[k] : args->sendbuffs[k]; + TESTCHECK(InitData(data, sendcount, type, rep, rank)); + HIPCHECK(hipMemcpy(args->expected[k], args->recvbuffs[k], args->expectedBytes, hipMemcpyDefault)); + if (rank == root) TESTCHECK(InitDataReduce(args->expected[k], recvcount, 0, type, op, rep, nranks)); + k++; + } HIPCHECK(hipDeviceSynchronize()); } return testSuccess; @@ -119,4 +126,4 @@ testResult_t ReduceRunTest(struct threadArgs* args, int root, ncclDataType_t typ struct testEngine ncclTestEngine = { ReduceGetBuffSize, ReduceRunTest -}; \ No newline at end of file +}; diff --git a/src/reduce_scatter.cu b/src/reduce_scatter.cu index bf5cbede8d..23b99de35b 100644 --- a/src/reduce_scatter.cu +++ b/src/reduce_scatter.cu @@ -31,17 +31,24 @@ void ReduceScatterGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t testResult_t ReduceScatterInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) { size_t sendcount = args->sendBytes / wordSize(type); size_t recvcount = args->expectedBytes / wordSize(type); - int nranks = args->nProcs*args->nThreads*args->nGpus; + int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks; + int k=0; for (int i=0; inGpus; i++) { int gpuid = args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i; + if (args->enable_multiranks) + gpuid = gpuid % args->localNumDevices; HIPCHECK(hipSetDevice(gpuid)); - int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i); - HIPCHECK(hipMemset(args->recvbuffs[i], 0, args->expectedBytes)); - void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i]; - TESTCHECK(InitData(data, sendcount, type, rep, rank)); - HIPCHECK(hipMemcpy(args->expected[i], args->recvbuffs[i], args->expectedBytes, hipMemcpyDefault)); - TESTCHECK(InitDataReduce(args->expected[i], recvcount, rank*recvcount, type, op, rep, nranks)); + + for (int l=0; lnRanks; l++) { + int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l); + HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes)); + void* data = in_place ? args->recvbuffs[k] : args->sendbuffs[k]; + TESTCHECK(InitData(data, sendcount, type, rep, rank)); + HIPCHECK(hipMemcpy(args->expected[k], args->recvbuffs[k], args->expectedBytes, hipMemcpyDefault)); + TESTCHECK(InitDataReduce(args->expected[k], recvcount, rank*recvcount, type, op, rep, nranks)); + k++; + } HIPCHECK(hipDeviceSynchronize()); } return testSuccess; @@ -111,4 +118,4 @@ testResult_t ReduceScatterRunTest(struct threadArgs* args, int root, ncclDataTyp struct testEngine ncclTestEngine = { ReduceScatterGetBuffSize, ReduceScatterRunTest -}; \ No newline at end of file +}; diff --git a/src/scatter.cu b/src/scatter.cu index 884ec96a46..ec8c06b092 100644 --- a/src/scatter.cu +++ b/src/scatter.cu @@ -32,14 +32,22 @@ testResult_t ScatterInitData(struct threadArgs* args, ncclDataType_t type, ncclR size_t sendcount = args->sendBytes / wordSize(type); size_t recvcount = args->expectedBytes / wordSize(type); + int k=0; for (int i=0; inGpus; i++) { int gpuid = args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i; + if (args->enable_multiranks) + gpuid = gpuid % args->localNumDevices; HIPCHECK(hipSetDevice(gpuid)); - int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i); - HIPCHECK(hipMemset(args->recvbuffs[i], 0, args->expectedBytes)); - void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i]; - if (rank == root) TESTCHECK(InitData(data, sendcount, type, rep, rank)); - TESTCHECK(InitData(args->expected[i], recvcount, type, rep+rank*recvcount, root)); + + for (int l=0; lnRanks; l++) { + int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l); + HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes)); + void* data = in_place ? args->recvbuffs[k] : args->sendbuffs[k]; + if (rank == root) TESTCHECK(InitData(data, sendcount, type, rep, rank)); + TESTCHECK(InitData(args->expected[k], recvcount, type, rep+rank*recvcount, root)); + k++; + + } HIPCHECK(hipDeviceSynchronize()); } return testSuccess; diff --git a/src/sendrecv.cu b/src/sendrecv.cu index 6ded375678..84d7398e42 100644 --- a/src/sendrecv.cu +++ b/src/sendrecv.cu @@ -31,17 +31,24 @@ void SendRecvGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *para testResult_t SendRecvInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) { size_t sendcount = args->sendBytes / wordSize(type); size_t recvcount = args->expectedBytes / wordSize(type); - int nranks = args->nProcs*args->nThreads*args->nGpus; + int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks; + int k=0; for (int i=0; inGpus; i++) { int gpuid = args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i; + if (args->enable_multiranks) + gpuid = gpuid % args->localNumDevices; HIPCHECK(hipSetDevice(gpuid)); - int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i); - HIPCHECK(hipMemset(args->recvbuffs[i], 0, args->expectedBytes)); - void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i]; - TESTCHECK(InitData(data, sendcount, type, rep, rank)); - int peer = (rank-1+nranks)%nranks; - TESTCHECK(InitData(args->expected[i], recvcount, type, rep, peer)); + + for (int l=0; lnRanks; l++) { + int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l); + HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes)); + void* data = in_place ? args->recvbuffs[k] : args->sendbuffs[k]; + TESTCHECK(InitData(data, sendcount, type, rep, rank)); + int peer = (rank-1+nranks)%nranks; + TESTCHECK(InitData(args->expected[k], recvcount, type, rep, peer)); + k++; + } HIPCHECK(hipDeviceSynchronize()); } // We don't support in-place sendrecv