Merge remote-tracking branch 'nccl-tests/master' into HEAD

[ROCm/rccl-tests commit: 621dde544d]
Этот коммит содержится в:
Wenkai Du
2024-03-01 17:39:14 +00:00
родитель ff97af6529 5d52f0285c
Коммит b49f6da1ec
14 изменённых файлов: 184 добавлений и 335 удалений
+3 -3
Просмотреть файл
@@ -81,9 +81,9 @@ All tests support the same set of arguments :
* `-m,--agg_iters <aggregation count>` number of operations to aggregate together in each iteration. Default : 1.
* `-a,--average <0/1/2/3>` Report performance as an average across all ranks (MPI=1 only). <0=Rank0,1=Avg,2=Min,3=Max>. Default : 1.
* Test operation
* `-p,--parallel_init <0/1>` use threads to initialize RCCL in parallel. Default : 0.
* `-c,--check <0/1>` check correctness of results. This can be quite slow on large numbers of GPUs. Default : 1.
* `-z,--blocking <0/1>` Make RCCL collective blocking, i.e. have CPUs wait and sync after each collective. Default : 0.
* `-p,--parallel_init <0/1>` use threads to initialize NCCL in parallel. Default : 0.
* `-c,--check <check iteration count>` perform count iterations, checking correctness of results on each iteration. This can be quite slow on large numbers of GPUs. Default : 1.
* `-z,--blocking <0/1>` Make NCCL collective blocking, i.e. have CPUs wait and sync after each collective. Default : 0.
* `-G,--cudagraph <num graph launches>` Capture iterations as a CUDA graph and then replay specified number of times. Default : 0.
## Unit tests
+7 -12
Просмотреть файл
@@ -22,21 +22,16 @@ void AllGatherGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *par
testResult_t AllGatherInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) {
size_t sendcount = args->sendBytes / wordSize(type);
size_t recvcount = args->expectedBytes / wordSize(type);
int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks;
int nranks = args->nProcs*args->nThreads*args->nGpus;
int k=0;
for (int i=0; i<args->nGpus; i++) {
HIPCHECK(hipSetDevice(args->gpus[i]));
for (int l=0; l<args->nRanks; l++) {
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l);
HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes));
void* data = in_place ? ((char*)args->recvbuffs[k])+rank*args->sendBytes : args->sendbuffs[k];
TESTCHECK(InitData(data, sendcount, 0, type, ncclSum, 33*rep + rank, 1, 0));
for (int j=0; j<nranks; j++) {
TESTCHECK(InitData(((char*)args->expected[k])+args->sendBytes*j, sendcount, 0, type, ncclSum, 33*rep + j, 1, 0));
}
k++;
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
HIPCHECK(hipMemset(args->recvbuffs[i], 0, args->expectedBytes));
void* data = in_place ? ((char*)args->recvbuffs[i])+rank*args->sendBytes : args->sendbuffs[i];
TESTCHECK(InitData(data, sendcount, 0, type, ncclSum, 33*rep + rank, 1, 0));
for (int j=0; j<nranks; j++) {
TESTCHECK(InitData((char*)args->expected[i] + args->sendBytes*j, sendcount, 0, type, ncclSum, 33*rep + j, 1, 0));
}
HIPCHECK(hipDeviceSynchronize());
}
+6 -11
Просмотреть файл
@@ -19,20 +19,15 @@ void AllReduceGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *par
testResult_t AllReduceInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) {
size_t sendcount = args->sendBytes / wordSize(type);
size_t recvcount = args->expectedBytes / wordSize(type);
int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks;
int nranks = args->nProcs*args->nThreads*args->nGpus;
int k = 0;
for (int i=0; i<args->nGpus; i++) {
HIPCHECK(hipSetDevice(args->gpus[i]));
for (int l=0; l<args->nRanks; l++) {
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l);
HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[k] : args->sendbuffs[k];
TESTCHECK(InitData(data, sendcount, 0, type, op, rep, nranks, rank));
TESTCHECK(InitDataReduce(args->expected[k], recvcount, 0, type, op, rep, nranks));
k++;
}
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
HIPCHECK(hipMemset(args->recvbuffs[i], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i];
TESTCHECK(InitData(data, sendcount, 0, type, op, rep, nranks, rank));
TESTCHECK(InitDataReduce(args->expected[i], recvcount, 0, type, op, rep, nranks));
HIPCHECK(hipDeviceSynchronize());
}
return testSuccess;
+8 -13
Просмотреть файл
@@ -19,22 +19,17 @@ void AlltoAllGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *para
testResult_t AlltoAllInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) {
size_t sendcount = args->sendBytes / wordSize(type);
size_t recvcount = args->expectedBytes / wordSize(type);
int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks;
int nranks = args->nProcs*args->nThreads*args->nGpus;
int k=0;
for (int i=0; i<args->nGpus; i++) {
HIPCHECK(hipSetDevice(args->gpus[i]));
for (int l=0; l<args->nRanks; l++) {
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l);
HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[k] : args->sendbuffs[k];
TESTCHECK(InitData(data, sendcount, 0, type, ncclSum, 33*rep + rank, 1, 0));
for (int j=0; j<nranks; j++) {
size_t partcount = sendcount/nranks;
TESTCHECK(InitData(((char*)args->expected[k])+ j*partcount*wordSize(type), partcount, rank*partcount, type, ncclSum, 33*rep + j, 1, 0));
}
k++;
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
HIPCHECK(hipMemset(args->recvbuffs[i], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i];
TESTCHECK(InitData(data, sendcount, 0, type, ncclSum, 33*rep + rank, 1, 0));
for (int j=0; j<nranks; j++) {
size_t partcount = sendcount/nranks;
TESTCHECK(InitData((char*)args->expected[i] + j*partcount*wordSize(type), partcount, rank*partcount, type, ncclSum, 33*rep + j, 1, 0));
}
HIPCHECK(hipDeviceSynchronize());
}
+30 -35
Просмотреть файл
@@ -29,49 +29,44 @@ void AlltoAllvGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *par
testResult_t AlltoAllvInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) {
size_t sendcount = args->sendBytes / wordSize(type);
size_t recvcount = args->expectedBytes / wordSize(type);
int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks;
int nranks = args->nProcs*args->nThreads*args->nGpus;
int k=0;
for (int i=0; i<args->nGpus; i++) {
HIPCHECK(hipSetDevice(args->gpus[i]));
for (int l=0; l<args->nRanks; l++) {
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l);
HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[k] : args->sendbuffs[k];
TESTCHECK(InitData(data, sendcount, 0, type, ncclSum, 33*rep+rank, 1, 0));
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
HIPCHECK(hipMemset(args->recvbuffs[i], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i];
TESTCHECK(InitData(data, sendcount, 0, type, ncclSum, 33*rep+rank, 1, 0));
#if 0
int *dataHost = (int *)malloc(args->sendBytes);
hipMemcpy(dataHost, data, args->sendBytes, hipMemcpyDeviceToHost);
printf(" Rank [%d] Original: ", rank);
for(int j=0; j<sendcount; j++) {
printf("%d:%d ", j, dataHost[j]);
}
printf("\n");
free(dataHost);
int *dataHost = (int *)malloc(args->sendBytes);
hipMemcpy(dataHost, data, args->sendBytes, hipMemcpyDeviceToHost);
printf(" Rank [%d] Original: ", rank);
for(int j=0; j<sendcount; j++) {
printf("%d:%d ", j, dataHost[j]);
}
printf("\n");
free(dataHost);
#endif
size_t rdisp = 0;
size_t data_count = sendcount*2/nranks;
size_t chunksize = data_count/nranks;
for (int j=0; j<nranks; j++) {
size_t scount = 0, rcount = ((j+rank)%nranks)*chunksize;
if ((j+rank)%nranks == 0)
rcount += (sendcount-chunksize*(nranks-1)*nranks/2);
size_t sdisp = 0;
for (int kk=0; kk<nranks; kk++) {
scount = ((kk+j)%nranks)*chunksize;
if ((kk+j)%nranks == 0)
scount += (sendcount-chunksize*(nranks-1)*nranks/2);
if (kk == rank)
break;
sdisp += scount;
}
TESTCHECK(InitData(((char*)args->expected[k])+rdisp*wordSize(type), rcount, sdisp, type, ncclSum, 33*rep+j, 1, 0));
rdisp += rcount;
size_t rdisp = 0;
size_t data_count = sendcount*2/nranks;
size_t chunksize = data_count/nranks;
for (int j=0; j<nranks; j++) {
size_t scount = 0, rcount = ((j+rank)%nranks)*chunksize;
if ((j+rank)%nranks == 0)
rcount += (sendcount-chunksize*(nranks-1)*nranks/2);
size_t sdisp = 0;
for (int k=0; k<nranks; k++) {
scount = ((k+j)%nranks)*chunksize;
if ((k+j)%nranks == 0)
scount += (sendcount-chunksize*(nranks-1)*nranks/2);
if (k == rank)
break;
sdisp += scount;
}
k++;
TESTCHECK(InitData(((char*)args->expected[i])+rdisp*wordSize(type), rcount, sdisp, type, ncclSum, 33*rep+j, 1, 0));
rdisp += rcount;
}
HIPCHECK(hipDeviceSynchronize());
}
+5 -10
Просмотреть файл
@@ -20,18 +20,13 @@ testResult_t BroadcastInitData(struct threadArgs* args, ncclDataType_t type, ncc
size_t sendcount = args->sendBytes / wordSize(type);
size_t recvcount = args->expectedBytes / wordSize(type);
int k=0;
for (int i=0; i<args->nGpus; i++) {
HIPCHECK(hipSetDevice(args->gpus[i]));
for (int l=0; l<args->nRanks; l++) {
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l);
HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[k] : args->sendbuffs[k];
if (rank == root) TESTCHECK(InitData(data, sendcount, 0, type, ncclSum, rep, 1, 0));
TESTCHECK(InitData(args->expected[k], recvcount, 0, type, ncclSum, rep, 1, 0));
k++;
}
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
HIPCHECK(hipMemset(args->recvbuffs[i], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i];
if (rank == root) TESTCHECK(InitData(data, sendcount, 0, type, ncclSum, rep, 1, 0));
TESTCHECK(InitData(args->expected[i], recvcount, 0, type, ncclSum, rep, 1, 0));
HIPCHECK(hipDeviceSynchronize());
}
return testSuccess;
+84 -174
Просмотреть файл
@@ -91,8 +91,6 @@ static int report_cputime = 0;
// Report average iteration time: (0=RANK0,1=AVG,2=MIN,3=MAX)
static int average = 1;
static int numDevices = 1;
static int ranksPerGpu = 1;
static int enable_multiranks = 0;
static int delay_inout_place = 0;
static int enable_out_of_place = 1;
@@ -275,11 +273,9 @@ testResult_t CheckData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
int64_t *wrongPerGpu = nullptr;
HIPCHECK(hipHostMalloc((void**)&wrongPerGpu, args->nGpus*sizeof(int64_t), hipHostMallocMapped));
for (int i=0; i<args->nGpus*args->nRanks; i++) {
int device;
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i);
NCCLCHECK(ncclCommCuDevice(args->comms[i], &device));
HIPCHECK(hipSetDevice(device));
for (int i=0; i<args->nGpus; i++) {
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
HIPCHECK(hipSetDevice(args->gpus[i]));
void *data = in_place ? ((void *)((uintptr_t)args->recvbuffs[i] + args->recvInplaceOffset*rank)) : args->recvbuffs[i];
TESTCHECK(CheckDelta(data, args->expected[i], count, 0, type, op, 0, nranks, wrongPerGpu+i));
@@ -317,16 +313,16 @@ testResult_t CheckData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
return testSuccess;
}
testResult_t testStreamSynchronize(int nStreams, hipStream_t* streams, ncclComm_t* comms) {
testResult_t testStreamSynchronize(int ngpus, hipStream_t* streams, ncclComm_t* comms) {
hipError_t hipErr;
int remaining = nStreams;
int* done = (int*)malloc(sizeof(int)*nStreams);
memset(done, 0, sizeof(int)*nStreams);
int remaining = ngpus;
int* done = (int*)malloc(sizeof(int)*ngpus);
memset(done, 0, sizeof(int)*ngpus);
timer tim;
while (remaining) {
int idle = 1;
for (int i=0; i<nStreams; i++) {
for (int i=0; i<ngpus; i++) {
if (done[i]) continue;
hipErr = hipStreamQuery(streams[i]);
@@ -346,7 +342,7 @@ testResult_t testStreamSynchronize(int nStreams, hipStream_t* streams, ncclComm_
if (ncclAsyncErr != ncclSuccess) {
// An asynchronous error happened. Stop the operation and destroy
// the communicator
for (int i=0; i<nStreams; i++)
for (int i=0; i<ngpus; i++)
NCCLCHECK(ncclCommAbort(comms[i]));
// Abort the perf test
NCCLCHECK(ncclAsyncErr);
@@ -354,7 +350,7 @@ testResult_t testStreamSynchronize(int nStreams, hipStream_t* streams, ncclComm_
}
double delta = tim.elapsed();
if (delta > timeout && timeout > 0) {
for (int i=0; i<nStreams; i++)
for (int i=0; i<ngpus; i++)
NCCLCHECK(ncclCommAbort(comms[i]));
char hostname[1024];
getHostName(hostname, 1024);
@@ -383,14 +379,12 @@ testResult_t startColl(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
size_t steps = totalnbytes ? args->maxbytes / totalnbytes : 1;
size_t shift = totalnbytes * (iter % steps);
if (args->nGpus> 1 || args->nRanks > 1) NCCLCHECK(ncclGroupStart());
for (int i = 0; i < args->nGpus*args->nRanks; i++) {
if (args->nGpus > 1) NCCLCHECK(ncclGroupStart());
for (int i = 0; i < args->nGpus; i++) {
#ifndef NCCL_MAJOR
int hipDev;
NCCLCHECK(ncclCommCuDevice(args->comms[i], &hipDev));
HIPCHECK(hipSetDevice(hipDev));
HIPCHECK(hipSetDevice(args->gpus[i]));
#endif
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i);
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
char* recvBuff = ((char*)args->recvbuffs[i]) + shift;
char* sendBuff = ((char*)args->sendbuffs[i]) + shift;
ncclRedOp_t op;
@@ -436,11 +430,11 @@ testResult_t startColl(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
}
#endif
}
if (args->nGpus > 1 || args->nRanks > 1) NCCLCHECK(ncclGroupEnd());
if (args->nGpus > 1) NCCLCHECK(ncclGroupEnd());
if (blocking_coll) {
// Complete op before returning
TESTCHECK(testStreamSynchronize(args->nGpus*args->nRanks, args->streams, args->comms));
TESTCHECK(testStreamSynchronize(args->nGpus, args->streams, args->comms));
}
if (blocking_coll) Barrier(args);
return testSuccess;
@@ -449,11 +443,10 @@ testResult_t startColl(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
testResult_t completeColl(struct threadArgs* args) {
if (blocking_coll) return testSuccess;
TESTCHECK(testStreamSynchronize(args->nGpus*args->nRanks, args->streams, args->comms));
TESTCHECK(testStreamSynchronize(args->nGpus, args->streams, args->comms));
return testSuccess;
}
//RCCL: Revisit because of cudaGraphLaunches
testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int in_place) {
size_t count = args->nbytes / wordSize(type);
if (datacheck) {
@@ -470,15 +463,15 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
Barrier(args);
#if HIP_VERSION >= 50221310
hipGraph_t graphs[args->nGpus*args->nRanks];
hipGraphExec_t graphExec[args->nGpus*args->nRanks];
hipGraph_t graphs[args->nGpus];
hipGraphExec_t graphExec[args->nGpus];
if (cudaGraphLaunches >= 1) {
// Begin cuda graph capture
for (int i=0; i<args->nGpus*args->nRanks; i++) {
for (int i=0; i<args->nGpus; i++) {
// Thread local mdoe is needed for:
// - Multi-thread mode: where graph capture and instantiation can happen concurrently across threads
// - P2P pre-connect: when there is no warm-up, P2P pre-connect is done during graph capture.
// Since pre-connect calls cudaMalloc, we cannot use global capture mode
// Since pre-connect calls hipMalloc, we cannot use global capture mode
HIPCHECK(hipStreamBeginCapture(args->streams[i], hipStreamCaptureModeThreadLocal));
}
}
@@ -497,18 +490,18 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
#if HIP_VERSION >= 50221310
if (cudaGraphLaunches >= 1) {
// End cuda graph capture
for (int i=0; i<args->nGpus*args->nRanks; i++) {
for (int i=0; i<args->nGpus; i++) {
HIPCHECK(hipStreamEndCapture(args->streams[i], graphs+i));
}
// Instantiate cuda graph
for (int i=0; i<args->nGpus*args->nRanks; i++) {
for (int i=0; i<args->nGpus; i++) {
HIPCHECK(hipGraphInstantiate(graphExec+i, graphs[i], NULL, NULL, 0));
}
// Resync CPU, restart timing, launch cuda graph
Barrier(args);
tim.reset();
for (int l=0; l<cudaGraphLaunches; l++) {
for (int i=0; i<args->nGpus*args->nRanks; i++) {
for (int i=0; i<args->nGpus; i++) {
HIPCHECK(hipGraphLaunch(graphExec[i], args->streams[i]));
}
}
@@ -526,7 +519,7 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
#if HIP_VERSION >= 50221310
if (cudaGraphLaunches >= 1) {
//destroy cuda graph
for (int i=0; i<args->nGpus*args->nRanks; i++) {
for (int i=0; i<args->nGpus; i++) {
HIPCHECK(hipGraphExecDestroy(graphExec[i]));
HIPCHECK(hipGraphDestroy(graphs[i]));
}
@@ -534,21 +527,21 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
#endif
double algBw, busBw;
args->collTest->getBw(count, wordSize(type), deltaSec, &algBw, &busBw, args->nProcs*args->nThreads*args->nGpus*args->nRanks);
args->collTest->getBw(count, wordSize(type), deltaSec, &algBw, &busBw, args->nProcs*args->nThreads*args->nGpus);
Barrier(args);
int64_t wrongElts = 0;
static __thread int rep = 0;
rep++;
if (datacheck) {
for (int c = 0; c < datacheck; c++) {
// Initialize sendbuffs, recvbuffs and expected
TESTCHECK(args->collTest->initData(args, type, op, root, rep, in_place));
#if HIP_VERSION >= 50221310
if (cudaGraphLaunches >= 1) {
// Begin cuda graph capture for data check
for (int i=0; i<args->nGpus*args->nRanks; i++) {
for (int i=0; i<args->nGpus; i++) {
HIPCHECK(hipStreamBeginCapture(args->streams[i], args->nThreads > 1 ? hipStreamCaptureModeThreadLocal : hipStreamCaptureModeGlobal));
}
}
@@ -560,15 +553,15 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
#if HIP_VERSION >= 50221310
if (cudaGraphLaunches >= 1) {
// End cuda graph capture
for (int i=0; i<args->nGpus*args->nRanks; i++) {
for (int i=0; i<args->nGpus; i++) {
HIPCHECK(hipStreamEndCapture(args->streams[i], graphs+i));
}
// Instantiate cuda graph
for (int i=0; i<args->nGpus*args->nRanks; i++) {
for (int i=0; i<args->nGpus; i++) {
HIPCHECK(hipGraphInstantiate(graphExec+i, graphs[i], NULL, NULL, 0));
}
// Launch cuda graph
for (int i=0; i<args->nGpus*args->nRanks; i++) {
for (int i=0; i<args->nGpus; i++) {
HIPCHECK(hipGraphLaunch(graphExec[i], args->streams[i]));
}
}
@@ -579,7 +572,7 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
#if HIP_VERSION >= 50221310
if (cudaGraphLaunches >= 1) {
//destroy cuda graph
for (int i=0; i<args->nGpus*args->nRanks; i++) {
for (int i=0; i<args->nGpus; i++) {
HIPCHECK(hipGraphExecDestroy(graphExec[i]));
HIPCHECK(hipGraphDestroy(graphs[i]));
}
@@ -590,8 +583,10 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
//aggregate delta from all threads and procs
long long wrongElts1 = wrongElts;
//if (wrongElts) fprintf(stderr, "\nERROR: Data corruption : rank %d size %ld wrongElts %ld\n", args->proc, args->expectedBytes, wrongElts);
Allreduce(args, &wrongElts1, /*sum*/4);
wrongElts = wrongElts1;
if (wrongElts) break;
}
double timeUsec = (report_cputime ? cputimeSec : deltaSec)*1.0E6;
@@ -615,7 +610,7 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
}
void setupArgs(size_t size, ncclDataType_t type, struct threadArgs* args) {
int nranks = args->nProcs*args->nGpus*args->nThreads*args->nRanks;
int nranks = args->nProcs*args->nGpus*args->nThreads;
size_t count, sendCount, recvCount, paramCount, sendInplaceOffset, recvInplaceOffset;
count = size / wordSize(type);
@@ -677,30 +672,22 @@ testResult_t threadRunTests(struct threadArgs* args) {
testResult_t threadInit(struct threadArgs* args) {
char hostname[1024];
getHostName(hostname, 1024);
int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks;
int nranks = args->nProcs*args->nThreads*args->nGpus;
//set main thread again
is_main_thread = (is_main_proc && args->thread == 0) ? 1 : 0;
NCCLCHECK(ncclGroupStart());
for (int i=0; i<args->nGpus; i++) {
int rank = args->proc*args->nThreads*args->nGpus + args->thread*args->nGpus + i;
HIPCHECK(hipSetDevice(args->gpus[i]));
for (int j=0; j<args->nRanks; j++) {
int rank = (args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + j;
if (args->enable_multiranks)
NCCLCHECK(ncclCommInitRank(args->comms+i, nranks, args->ncclId, rank));
#ifdef RCCL_MULTIRANKPERGPU
else
NCCLCHECK(ncclCommInitRankMulti(args->comms+i*args->nRanks+j, nranks, args->ncclId, rank, rank));
#endif
}
NCCLCHECK(ncclCommInitRank(args->comms+i, nranks, args->ncclId, rank));
}
NCCLCHECK(ncclGroupEnd());
TESTCHECK(threadRunTests(args));
for (int i=0; i<args->nGpus*args->nRanks; i++) {
for (int i=0; i<args->nGpus; i++) {
NCCLCHECK(ncclCommDestroy(args->comms[i]));
}
return testSuccess;
@@ -799,10 +786,6 @@ int main(int argc, char* argv[]) {
{"report_cputime", required_argument, 0, 'C'},
{"average", required_argument, 0, 'a'},
{"out_of_place", required_argument, 0, 'O'},
#ifdef RCCL_MULTIRANKPERGPU
{"enable_multiranks", required_argument, 0, 'x'},
{"ranks_per_gpu", required_argument, 0, 'R'},
#endif
{"help", no_argument, 0, 'h'},
{}
};
@@ -810,11 +793,7 @@ int main(int argc, char* argv[]) {
while(1) {
int c;
#ifdef RCCL_MULTIRANKPERGPU
c = getopt_long(argc, argv, "t:g:b:e:i:f:n:m:w:p:c:o:d:r:z:Y:T:G:C:O:a:y:s:u:h:R:x:q:", longopts, &longindex);
#else
c = getopt_long(argc, argv, "t:g:b:e:i:f:n:m:w:p:c:o:d:r:z:Y:T:G:C:O:a:y:s:u:h:q:", longopts, &longindex);
#endif
if (c == -1)
break;
@@ -917,14 +896,6 @@ int main(int argc, char* argv[]) {
case 'a':
average = (int)strtol(optarg, NULL, 0);
break;
#ifdef RCCL_MULTIRANKPERGPU
case 'x':
enable_multiranks = (int)strtol(optarg, NULL, 0);
break;
case 'R':
ranksPerGpu = (int)strtol(optarg, NULL, 0);
break;
#endif
case 'q':
delay_inout_place = (int)strtol(optarg, NULL, 10);
break;
@@ -942,7 +913,7 @@ int main(int argc, char* argv[]) {
"[-m,--agg_iters <aggregated iteration count>] \n\t"
"[-w,--warmup_iters <warmup iteration count>] \n\t"
"[-p,--parallel_init <0/1>] \n\t"
"[-c,--check <0/1>] \n\t"
"[-c,--check <check iteration count>] \n\t"
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,11,0)
"[-o,--op <sum/prod/min/max/avg/mulsum/all>] \n\t"
#elif NCCL_VERSION_CODE >= NCCL_VERSION(2,10,0)
@@ -962,10 +933,6 @@ int main(int argc, char* argv[]) {
"[-C,--report_cputime <0/1>] \n\t"
"[-O,--out_of_place <0/1>] \n\t"
"[-a,--average <0/1/2/3> report average iteration time <0=RANK0/1=AVG/2=MIN/3=MAX>] \n\t"
#ifdef RCCL_MULTIRANKPERGPU
"[-x,--enable_multiranks <0/1> enable using multiple ranks per GPU] \n\t"
"[-R,--ranks_per_gpu] \n\t"
#endif
"[-q,--delay <delay between out-of-place and in-place in microseconds>] \n\t"
"[-h,--help]\n",
basename(argv[0]));
@@ -987,21 +954,6 @@ int main(int argc, char* argv[]) {
(unsigned long long)maxBytes);
return -1;
}
if (!minReqVersion(2, 12, 12) && enable_multiranks) {
fprintf(stderr, "Multiple Ranks per GPU requested, but rccl library found does not support this feature.\n");
fprintf(stderr, "Please check LD_LIBRARY_PATH. Resetting enable_multiranks and ranksPerGpu to default values.\n");
enable_multiranks = 0;
ranksPerGpu = 1;
}
if (enable_multiranks && parallel_init) {
fprintf(stderr, "Cannot use parallel_init when using multiple ranks per GPU.\n");
return -1;
}
if (ranksPerGpu > 1 && !enable_multiranks) {
fprintf(stderr, "Need to enable multiranks option to use multiple ranks per GPU\n");
return -1;
}
#ifdef MPI_SUPPORT
MPI_Init(&argc, &argv);
#endif
@@ -1046,10 +998,10 @@ testResult_t run() {
#endif
is_main_thread = is_main_proc = (proc == 0) ? 1 : 0;
PRINT("# nThreads: %d nGpus: %d nRanks: %d minBytes: %ld maxBytes: %ld step: %ld(%s) warmupIters: %d iters: %d agg iters: %d validation: %d graph: %d\n",
nThreads, nGpus, ranksPerGpu, minBytes, maxBytes,
(stepFactor > 1)?stepFactor:stepBytes, (stepFactor > 1)?"factor":"bytes",
warmup_iters, iters, agg_iters, datacheck, cudaGraphLaunches);
PRINT("# nThread %d nGpus %d minBytes %ld maxBytes %ld step: %ld(%s) warmup iters: %d iters: %d agg iters: %d validation: %d graph: %d\n",
nThreads, nGpus, minBytes, maxBytes,
(stepFactor > 1)?stepFactor:stepBytes, (stepFactor > 1)?"factor":"bytes",
warmup_iters, iters, agg_iters, datacheck, cudaGraphLaunches);
if (blocking_coll) PRINT("# Blocking Enabled: wait for completion and barrier after each collective \n");
if (parallel_init) PRINT("# Parallel Init Enabled: threads call into NcclInitRank concurrently \n");
PRINT("#\n");
@@ -1062,20 +1014,15 @@ testResult_t run() {
char* envstr = getenv("NCCL_TESTS_DEVICE");
int gpu0 = envstr ? atoi(envstr) : -1;
for (int i=0; i<nThreads*nGpus; i++) {
int hipDev = localRank*nThreads*nGpus+i;
if (enable_multiranks)
hipDev = hipDev % numDevices;
int cudaDev = (gpu0 != -1 ? gpu0 : localRank*nThreads*nGpus) + i;
int rank = proc*nThreads*nGpus+i;
hipDeviceProp_t prop;
HIPCHECK(hipGetDeviceProperties(&prop, hipDev));
for (int j=0; j<ranksPerGpu; j++) {
int rank = proc*nThreads*nGpus*ranksPerGpu+i*ranksPerGpu + j;
char busIdStr[] = "00000000:00:00.0";
HIPCHECK(hipDeviceGetPCIBusId(busIdStr, sizeof(busIdStr), hipDev));
len += snprintf(line+len, MAX_LINE>len ? MAX_LINE-len : 0, "# Rank %2d Pid %6d on %10s device %2d [%s] %s\n",
rank, getpid(), hostname, hipDev, busIdStr, prop.name);
maxMem = std::min(maxMem, prop.totalGlobalMem);
}
HIPCHECK(hipGetDeviceProperties(&prop, cudaDev));
char busIdStr[] = "00000000:00:00.0";
HIPCHECK(hipDeviceGetPCIBusId(busIdStr, sizeof(busIdStr), cudaDev));
len += snprintf(line+len, MAX_LINE>len ? MAX_LINE-len : 0, "# Rank %2d Pid %6d on %10s device %2d [%s] %s\n",
rank, getpid(), hostname, cudaDev, busIdStr, prop.name);
maxMem = std::min(maxMem, prop.totalGlobalMem);
}
#if MPI_SUPPORT
char *lines = (proc == 0) ? (char *)malloc(totalProcs*MAX_LINE) : NULL;
@@ -1104,64 +1051,40 @@ testResult_t run() {
}
#ifdef MPI_SUPPORT
MPI_Bcast(&ncclId, sizeof(ncclId), MPI_BYTE, 0, mpi_comm);
MPI_Barrier(MPI_COMM_WORLD); // Ensure Bcast is complete for HCOLL
#endif
int gpus[nGpus*nThreads];
hipStream_t streams[nGpus*nThreads*ranksPerGpu];
void* sendbuffs[nGpus*nThreads*ranksPerGpu];
void* recvbuffs[nGpus*nThreads*ranksPerGpu];
void* expected[nGpus*nThreads*ranksPerGpu];
hipStream_t streams[nGpus*nThreads];
void* sendbuffs[nGpus*nThreads];
void* recvbuffs[nGpus*nThreads];
void* expected[nGpus*nThreads];
size_t sendBytes, recvBytes;
ncclTestEngine.getBuffSize(&sendBytes, &recvBytes, (size_t)maxBytes, (size_t)ncclProcs*nGpus*nThreads*ranksPerGpu);
ncclTestEngine.getBuffSize(&sendBytes, &recvBytes, (size_t)maxBytes, (size_t)ncclProcs*nGpus*nThreads);
envstr = getenv("NCCL_TESTS_DEVICE");
gpu0 = envstr ? atoi(envstr) : -1;
for (int ii=0; ii<nGpus*nThreads; ii++) {
int gpuid = localRank*nThreads*nGpus+ii;
if (enable_multiranks)
gpuid = gpuid % numDevices;
gpus[ii] = gpu0 != -1 ? gpu0+ii : gpuid;
HIPCHECK(hipSetDevice(gpus[ii]));
for (int j=0; j<ranksPerGpu; j++) {
int i = ii*ranksPerGpu+j;
TESTCHECK(AllocateBuffs(sendbuffs+i, sendBytes, recvbuffs+i, recvBytes, expected+i, (size_t)maxBytes));
if (streamnull)
streams[i] = NULL;
else {
if (cumask[0] || cumask[1] || cumask[2] || cumask[3]) {
PRINT("cumask: ");
for (int i = 0; i < 4 ; i++) PRINT("%x,", cumask[i]);
PRINT("\n");
HIPCHECK(hipExtStreamCreateWithCUMask(streams+i, 4, cumask));
} else
HIPCHECK(hipStreamCreateWithFlags(streams+i, hipStreamNonBlocking));
}
}
for (int i=0; i<nGpus*nThreads; i++) {
gpus[i] = (gpu0 != -1 ? gpu0 : localRank*nThreads*nGpus) + i;
HIPCHECK(hipSetDevice(gpus[i]));
TESTCHECK(AllocateBuffs(sendbuffs+i, sendBytes, recvbuffs+i, recvBytes, expected+i, (size_t)maxBytes));
if (streamnull)
streams[i] = NULL;
else
HIPCHECK(hipStreamCreateWithFlags(streams+i, hipStreamNonBlocking));
}
//if parallel init is not selected, use main thread to initialize NCCL
ncclComm_t* comms = (ncclComm_t*)malloc(sizeof(ncclComm_t)*nThreads*nGpus*ranksPerGpu);
ncclComm_t* comms = (ncclComm_t*)malloc(sizeof(ncclComm_t)*nThreads*nGpus);
if (!parallel_init) {
if (ncclProcs == 1 && !enable_multiranks) {
if (ncclProcs == 1) {
NCCLCHECK(ncclCommInitAll(comms, nGpus*nThreads, gpus));
} else {
NCCLCHECK(ncclGroupStart());
for (int ii=0; ii<nGpus*nThreads; ii++) {
HIPCHECK(hipSetDevice(gpus[ii]));
if (!enable_multiranks) {
NCCLCHECK(ncclCommInitRank(comms+ii, ncclProcs*nThreads*nGpus, ncclId, ncclProc*nThreads*nGpus+ii));
}
#ifdef RCCL_MULTIRANKPERGPU
else
for (int j=0; j<ranksPerGpu; j++) {
int i = ii*ranksPerGpu+j;
NCCLCHECK(ncclCommInitRankMulti(comms+i, ncclProcs*nThreads*nGpus*ranksPerGpu, ncclId,
proc*nThreads*nGpus*ranksPerGpu+i, proc*nThreads*nGpus*ranksPerGpu+i));
}
#endif
for (int i=0; i<nGpus*nThreads; i++) {
HIPCHECK(hipSetDevice(gpus[i]));
NCCLCHECK(ncclCommInitRank(comms+i, ncclProcs*nThreads*nGpus, ncclId, ncclProc*nThreads*nGpus+i));
}
NCCLCHECK(ncclGroupEnd());
}
@@ -1206,21 +1129,18 @@ testResult_t run() {
threads[t].args.localRank = localRank;
threads[t].args.totalProcs = totalProcs;
threads[t].args.localNumDevices = numDevices;
threads[t].args.enable_multiranks = enable_multiranks;
threads[t].args.nRanks = ranksPerGpu;
threads[t].args.nProcs=ncclProcs;
threads[t].args.proc=ncclProc;
threads[t].args.nThreads=nThreads;
threads[t].args.thread=t;
threads[t].args.nGpus=nGpus;
threads[t].args.gpus=gpus+t*nGpus;
threads[t].args.sendbuffs = sendbuffs+t*nGpus*ranksPerGpu;
threads[t].args.recvbuffs = recvbuffs+t*nGpus*ranksPerGpu;
threads[t].args.expected = expected+t*nGpus*ranksPerGpu;
threads[t].args.sendbuffs = sendbuffs+t*nGpus;
threads[t].args.recvbuffs = recvbuffs+t*nGpus;
threads[t].args.expected = expected+t*nGpus;
threads[t].args.ncclId = ncclId;
threads[t].args.comms=comms+t*nGpus*ranksPerGpu;
threads[t].args.streams=streams+t*nGpus*ranksPerGpu;
threads[t].args.comms=comms+t*nGpus;
threads[t].args.streams=streams+t*nGpus;
threads[t].args.enable_out_of_place=enable_out_of_place;
threads[t].args.errors=errors+t;
threads[t].args.bw=bw+t;
@@ -1251,27 +1171,16 @@ testResult_t run() {
#endif
if (!parallel_init) {
for(int i=0; i<nGpus*nThreads*ranksPerGpu; ++i)
for(int i=0; i<nGpus*nThreads; ++i)
NCCLCHECK(ncclCommDestroy(comms[i]));
free(comms);
}
for (int i=0; i<nGpus*nThreads*ranksPerGpu; i++) {
HIPCHECK(hipStreamDestroy(streams[i]));
}
// Free off HIP allocated memory
for (int i=0; i<nGpus*nThreads*ranksPerGpu; i++) {
if (memorytype == ncclHost) {
HIPCHECK(hipHostFree(sendbuffs[i]));
HIPCHECK(hipHostFree(recvbuffs[i]));
if (datacheck) HIPCHECK(hipHostFree(expected[i]));
}
else {
HIPCHECK(hipFree(sendbuffs[i]));
HIPCHECK(hipFree(recvbuffs[i]));
if (datacheck) HIPCHECK(hipFree(expected[i]));
}
// Free off CUDA allocated memory
for (int i=0; i<nGpus*nThreads; i++) {
if (sendbuffs[i]) HIPCHECK(hipFree((char*)sendbuffs[i]));
if (recvbuffs[i]) HIPCHECK(hipFree((char*)recvbuffs[i]));
if (datacheck) HIPCHECK(hipFree(expected[i]));
}
HIPCHECK(hipHostFree(delta));
@@ -1284,6 +1193,7 @@ testResult_t run() {
PRINT("# Avg bus bandwidth : %g %s\n", bw[0], check_avg_bw == -1 ? "" : (bw[0] < check_avg_bw*(0.9) ? "FAILED" : "OK"));
PRINT("#\n");
#ifdef MPI_SUPPORT
MPI_Comm_free(&mpi_comm);
MPI_Finalize();
#endif
-3
Просмотреть файл
@@ -125,10 +125,7 @@ struct threadArgs {
int nGpus;
int* gpus;
int localRank;
int localNumDevices;
int enable_multiranks;
int enable_out_of_place;
int nRanks;
void** sendbuffs;
size_t sendBytes;
size_t sendInplaceOffset;
+8 -15
Просмотреть файл
@@ -19,24 +19,17 @@ void GatherGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramc
testResult_t GatherInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) {
size_t sendcount = args->sendBytes / wordSize(type);
size_t recvcount = args->expectedBytes / wordSize(type);
int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks;
int nranks = args->nProcs*args->nThreads*args->nGpus;
int k=0;
for (int i=0; i<args->nGpus; i++) {
HIPCHECK(hipSetDevice(args->gpus[i]));
for (int l=0; l<args->nRanks; l++) {
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l);
HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes));
void* data = in_place ? ((char*)args->recvbuffs[k])+rank*args->sendBytes : args->sendbuffs[k];
TESTCHECK(InitData(data, sendcount, rank*sendcount, type, ncclSum, rep, 1, 0));
HIPCHECK(hipMemcpy(args->expected[k], args->recvbuffs[k], args->expectedBytes, hipMemcpyDefault));
if (rank == root) {
for (int j=0; j<nranks; j++) {
TESTCHECK(InitData(((char*)args->expected[k]), nranks*sendcount, 0, type, ncclSum, rep, 1, 0));
}
}
k++;
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
HIPCHECK(hipMemset(args->recvbuffs[i], 0, args->expectedBytes));
void* data = in_place ? ((char*)args->recvbuffs[i])+rank*args->sendBytes : args->sendbuffs[i];
TESTCHECK(InitData(data, sendcount, rank*sendcount, type, ncclSum, rep, 1, 0));
HIPCHECK(hipMemcpy(args->expected[i], args->recvbuffs[i], args->expectedBytes, hipMemcpyDefault));
if (rank == root) {
TESTCHECK(InitData(args->expected[i], nranks*sendcount, 0, type, ncclSum, rep, 1, 0));
}
HIPCHECK(hipDeviceSynchronize());
}
+7 -12
Просмотреть файл
@@ -22,21 +22,16 @@ void HyperCubeGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *par
testResult_t HyperCubeInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) {
size_t sendcount = args->sendBytes / wordSize(type);
size_t recvcount = args->expectedBytes / wordSize(type);
int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks;
int nranks = args->nProcs*args->nThreads*args->nGpus;
int k=0;
for (int i=0; i<args->nGpus; i++) {
HIPCHECK(hipSetDevice(args->gpus[i]));
for (int l=0; l<args->nRanks; l++) {
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l);
HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes));
void* data = in_place ? ((char*)args->recvbuffs[k])+rank*args->sendBytes : args->sendbuffs[k];
TESTCHECK(InitData(data, sendcount, 0, type, ncclSum, 33*rep + rank, 1, 0));
for (int j=0; j<nranks; j++) {
TESTCHECK(InitData(((char*)args->expected[k])+args->sendBytes*j, sendcount, 0, type, ncclSum, 33*rep + j, 1, 0));
}
k++;
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
HIPCHECK(hipMemset(args->recvbuffs[i], 0, args->expectedBytes));
void* data = in_place ? ((char*)args->recvbuffs[i])+rank*args->sendBytes : args->sendbuffs[i];
TESTCHECK(InitData(data, sendcount, 0, type, ncclSum, 33*rep + rank, 1, 0));
for (int j=0; j<nranks; j++) {
TESTCHECK(InitData((char*)args->expected[i] + args->sendBytes*j, sendcount, 0, type, ncclSum, 33*rep + j, 1, 0));
}
HIPCHECK(hipDeviceSynchronize());
}
+7 -12
Просмотреть файл
@@ -19,21 +19,16 @@ void ReduceGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramc
testResult_t ReduceInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) {
size_t sendcount = args->sendBytes / wordSize(type);
size_t recvcount = args->expectedBytes / wordSize(type);
int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks;
int nranks = args->nProcs*args->nThreads*args->nGpus;
int k=0;
for (int i=0; i<args->nGpus; i++) {
HIPCHECK(hipSetDevice(args->gpus[i]));
for (int l=0; l<args->nRanks; l++) {
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l);
HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[k] : args->sendbuffs[k];
TESTCHECK(InitData(data, sendcount, 0, type, op, rep, nranks, rank));
HIPCHECK(hipMemcpy(args->expected[k], args->recvbuffs[k], args->expectedBytes, hipMemcpyDefault));
if (rank == root) TESTCHECK(InitDataReduce(args->expected[k], recvcount, 0, type, op, rep, nranks));
k++;
}
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
HIPCHECK(hipMemset(args->recvbuffs[i], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i];
TESTCHECK(InitData(data, sendcount, 0, type, op, rep, nranks, rank));
HIPCHECK(hipMemcpy(args->expected[i], args->recvbuffs[i], args->expectedBytes, hipMemcpyDefault));
if (rank == root) TESTCHECK(InitDataReduce(args->expected[i], recvcount, 0, type, op, rep, nranks));
HIPCHECK(hipDeviceSynchronize());
}
return testSuccess;
+7 -12
Просмотреть файл
@@ -22,21 +22,16 @@ void ReduceScatterGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t
testResult_t ReduceScatterInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) {
size_t sendcount = args->sendBytes / wordSize(type);
size_t recvcount = args->expectedBytes / wordSize(type);
int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks;
int nranks = args->nProcs*args->nThreads*args->nGpus;
int k=0;
for (int i=0; i<args->nGpus; i++) {
HIPCHECK(hipSetDevice(args->gpus[i]));
for (int l=0; l<args->nRanks; l++) {
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l);
HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[k] : args->sendbuffs[k];
TESTCHECK(InitData(data, sendcount, 0, type, op, rep, nranks, rank));
HIPCHECK(hipMemcpy(args->expected[k], args->recvbuffs[k], args->expectedBytes, hipMemcpyDefault));
TESTCHECK(InitDataReduce(args->expected[k], recvcount, rank*recvcount, type, op, rep, nranks));
k++;
}
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
HIPCHECK(hipMemset(args->recvbuffs[i], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i];
TESTCHECK(InitData(data, sendcount, 0, type, op, rep, nranks, rank));
HIPCHECK(hipMemcpy(args->expected[i], args->recvbuffs[i], args->expectedBytes, hipMemcpyDefault));
TESTCHECK(InitDataReduce(args->expected[i], recvcount, rank*recvcount, type, op, rep, nranks));
HIPCHECK(hipDeviceSynchronize());
}
return testSuccess;
+5 -11
Просмотреть файл
@@ -20,19 +20,13 @@ testResult_t ScatterInitData(struct threadArgs* args, ncclDataType_t type, ncclR
size_t sendcount = args->sendBytes / wordSize(type);
size_t recvcount = args->expectedBytes / wordSize(type);
int k=0;
for (int i=0; i<args->nGpus; i++) {
HIPCHECK(hipSetDevice(args->gpus[i]));
for (int l=0; l<args->nRanks; l++) {
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l);
HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[k] : args->sendbuffs[k];
if (rank == root) TESTCHECK(InitData(data, sendcount, 0, type, ncclSum, rep, 1, 0));
TESTCHECK(InitData(args->expected[k], recvcount, rank*recvcount, type, ncclSum, rep, 1, 0));
k++;
}
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
HIPCHECK(hipMemset(args->recvbuffs[i], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i];
if (rank == root) TESTCHECK(InitData(data, sendcount, 0, type, ncclSum, rep, 1, 0));
TESTCHECK(InitData(args->expected[i], recvcount, rank*recvcount, type, ncclSum, rep, 1, 0));
HIPCHECK(hipDeviceSynchronize());
}
return testSuccess;
+7 -12
Просмотреть файл
@@ -19,21 +19,16 @@ void SendRecvGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *para
testResult_t SendRecvInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) {
size_t sendcount = args->sendBytes / wordSize(type);
size_t recvcount = args->expectedBytes / wordSize(type);
int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks;
int nranks = args->nProcs*args->nThreads*args->nGpus;
int k=0;
for (int i=0; i<args->nGpus; i++) {
HIPCHECK(hipSetDevice(args->gpus[i]));
for (int l=0; l<args->nRanks; l++) {
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l);
HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[k] : args->sendbuffs[k];
TESTCHECK(InitData(data, sendcount, rank*sendcount, type, ncclSum, rep, 1, 0));
int peer = (rank-1+nranks)%nranks;
TESTCHECK(InitData(args->expected[k], recvcount, peer*recvcount, type, ncclSum, rep, 1, 0));
k++;
}
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
HIPCHECK(hipMemset(args->recvbuffs[i], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i];
TESTCHECK(InitData(data, sendcount, rank*sendcount, type, ncclSum, rep, 1, 0));
int peer = (rank-1+nranks)%nranks;
TESTCHECK(InitData(args->expected[i], recvcount, peer*recvcount, type, ncclSum, rep, 1, 0));
HIPCHECK(hipDeviceSynchronize());
}
// We don't support in-place sendrecv