implementation of multi-rank support in rccl-tests.

[ROCm/rccl-tests commit: 0500f2f132]
Этот коммит содержится в:
Edgar
2022-03-18 11:42:15 -04:00
родитель b199d173a2
Коммит dad6d819d0
13 изменённых файлов: 365 добавлений и 181 удалений
+15 -8
Просмотреть файл
@@ -31,17 +31,24 @@ void AllGatherGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *par
testResult_t AllGatherInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) {
size_t sendcount = args->sendBytes / wordSize(type);
size_t recvcount = args->expectedBytes / wordSize(type);
int nranks = args->nProcs*args->nThreads*args->nGpus;
int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks;
int k=0;
for (int i=0; i<args->nGpus; i++) {
int gpuid = args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i;
if (args->enable_multiranks)
gpuid = gpuid % args->localNumDevices;
HIPCHECK(hipSetDevice(gpuid));
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
HIPCHECK(hipMemset(args->recvbuffs[i], 0, args->expectedBytes));
void* data = in_place ? ((char*)args->recvbuffs[i])+rank*args->sendBytes : args->sendbuffs[i];
TESTCHECK(InitData(data, sendcount, type, rep, rank));
for (int j=0; j<nranks; j++) {
TESTCHECK(InitData(((char*)args->expected[i])+args->sendBytes*j, sendcount, type, rep, j));
for (int l=0; l<args->nRanks; l++) {
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l);
HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes));
void* data = in_place ? ((char*)args->recvbuffs[k])+rank*args->sendBytes : args->sendbuffs[k];
TESTCHECK(InitData(data, sendcount, type, rep, rank));
for (int j=0; j<nranks; j++) {
TESTCHECK(InitData(((char*)args->expected[k])+args->sendBytes*j, sendcount, type, rep, j));
}
k++;
}
HIPCHECK(hipDeviceSynchronize());
}
@@ -99,4 +106,4 @@ testResult_t AllGatherRunTest(struct threadArgs* args, int root, ncclDataType_t
struct testEngine ncclTestEngine = {
AllGatherGetBuffSize,
AllGatherRunTest
};
};
+14 -7
Просмотреть файл
@@ -31,16 +31,23 @@ void AllReduceGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *par
testResult_t AllReduceInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) {
size_t sendcount = args->sendBytes / wordSize(type);
size_t recvcount = args->expectedBytes / wordSize(type);
int nranks = args->nProcs*args->nThreads*args->nGpus;
int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks;
int k = 0;
for (int i=0; i<args->nGpus; i++) {
int gpuid = args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i;
if (args->enable_multiranks)
gpuid = gpuid % args->localNumDevices;
HIPCHECK(hipSetDevice(gpuid));
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
HIPCHECK(hipMemset(args->recvbuffs[i], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i];
TESTCHECK(InitData(data, sendcount, type, rep, rank));
TESTCHECK(InitDataReduce(args->expected[i], recvcount, 0, type, op, rep, nranks));
for (int l=0; l<args->nRanks; l++) {
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l);
HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[k] : args->sendbuffs[k];
TESTCHECK(InitData(data, sendcount, type, rep, rank));
TESTCHECK(InitDataReduce(args->expected[k], recvcount, 0, type, op, rep, nranks));
k++;
}
HIPCHECK(hipDeviceSynchronize());
}
return testSuccess;
@@ -110,4 +117,4 @@ testResult_t AllReduceRunTest(struct threadArgs* args, int root, ncclDataType_t
struct testEngine ncclTestEngine = {
AllReduceGetBuffSize,
AllReduceRunTest
};
};
+14 -7
Просмотреть файл
@@ -31,18 +31,25 @@ void AlltoAllGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *para
testResult_t AlltoAllInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) {
size_t sendcount = args->sendBytes / wordSize(type);
size_t recvcount = args->expectedBytes / wordSize(type);
int nranks = args->nProcs*args->nThreads*args->nGpus;
int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks;
int k=0;
for (int i=0; i<args->nGpus; i++) {
char* str = getenv("NCCL_TESTS_DEVICE");
int gpuid = str ? atoi(str) : args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i;
if (args->enable_multiranks)
gpuid = gpuid % args->localNumDevices;
HIPCHECK(hipSetDevice(gpuid));
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
HIPCHECK(hipMemset(args->recvbuffs[i], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i];
TESTCHECK(InitData(data, sendcount, type, rep, rank));
for (int j=0; j<nranks; j++) {
TESTCHECK(InitData(((char*)args->expected[i])+args->sendBytes/nranks*j, sendcount/nranks, type, rep+rank*sendcount/nranks, j));
for (int l=0; l<args->nRanks; l++) {
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l);
HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[k] : args->sendbuffs[k];
TESTCHECK(InitData(data, sendcount, type, rep, rank));
for (int j=0; j<nranks; j++) {
TESTCHECK(InitData(((char*)args->expected[k])+args->sendBytes/nranks*j, sendcount/nranks, type, rep+rank*sendcount/nranks, j));
}
k++;
}
HIPCHECK(hipDeviceSynchronize());
}
+38 -31
Просмотреть файл
@@ -41,44 +41,51 @@ void AlltoAllvGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *par
testResult_t AlltoAllvInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) {
size_t sendcount = args->sendBytes / wordSize(type);
size_t recvcount = args->expectedBytes / wordSize(type);
int nranks = args->nProcs*args->nThreads*args->nGpus;
int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks;
int k=0;
for (int i=0; i<args->nGpus; i++) {
char* str = getenv("NCCL_TESTS_DEVICE");
int gpuid = str ? atoi(str) : args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i;
if (args->enable_multiranks)
gpuid = gpuid % args->localNumDevices;
HIPCHECK(hipSetDevice(gpuid));
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
HIPCHECK(hipMemset(args->recvbuffs[i], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i];
TESTCHECK(InitData(data, sendcount, type, rep, rank));
for (int l=0; l<args->nRanks; l++) {
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l);
HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[k] : args->sendbuffs[k];
TESTCHECK(InitData(data, sendcount, type, rep, rank));
#if 0
int *dataHost = (int *)malloc(args->sendBytes);
hipMemcpy(dataHost, data, args->sendBytes, hipMemcpyDeviceToHost);
printf(" Rank [%d] Original: ", rank);
for(int j=0; j<sendcount; j++) {
printf("%d:%d ", j, dataHost[j]);
}
printf("\n");
free(dataHost);
#endif
size_t rdisp = 0;
size_t data_count = sendcount*2/nranks;
size_t chunksize = data_count/nranks;
for (int j=0; j<nranks; j++) {
size_t scount = 0, rcount = ((j+rank)%nranks)*chunksize;
if (j+rank == nranks-1)
rcount += (sendcount-chunksize*(nranks-1)*nranks/2);
size_t sdisp = 0;
for (int k=0; k<nranks; k++) {
scount = ((k+j)%nranks)*chunksize;
if (k+j == nranks-1)
scount += (sendcount-chunksize*(nranks-1)*nranks/2);
if (k == rank)
break;
sdisp += scount;
int *dataHost = (int *)malloc(args->sendBytes);
hipMemcpy(dataHost, data, args->sendBytes, hipMemcpyDeviceToHost);
printf(" Rank [%d] Original: ", rank);
for(int j=0; j<sendcount; j++) {
printf("%d:%d ", j, dataHost[j]);
}
TESTCHECK(InitData(((char*)args->expected[i])+rdisp*wordSize(type), rcount, type, rep+sdisp, j));
rdisp += rcount;
printf("\n");
free(dataHost);
#endif
size_t rdisp = 0;
size_t data_count = sendcount*2/nranks;
size_t chunksize = data_count/nranks;
for (int j=0; j<nranks; j++) {
size_t scount = 0, rcount = ((j+rank)%nranks)*chunksize;
if (j+rank == nranks-1)
rcount += (sendcount-chunksize*(nranks-1)*nranks/2);
size_t sdisp = 0;
for (int k=0; k<nranks; k++) {
scount = ((k+j)%nranks)*chunksize;
if (k+j == nranks-1)
scount += (sendcount-chunksize*(nranks-1)*nranks/2);
if (k == rank)
break;
sdisp += scount;
}
TESTCHECK(InitData(((char*)args->expected[k])+rdisp*wordSize(type), rcount, type, rep+sdisp, j));
rdisp += rcount;
}
k++;
}
HIPCHECK(hipDeviceSynchronize());
}
+13 -6
Просмотреть файл
@@ -32,14 +32,21 @@ testResult_t BroadcastInitData(struct threadArgs* args, ncclDataType_t type, ncc
size_t sendcount = args->sendBytes / wordSize(type);
size_t recvcount = args->expectedBytes / wordSize(type);
int k=0;
for (int i=0; i<args->nGpus; i++) {
int gpuid = args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i;
if (args->enable_multiranks)
gpuid = gpuid % args->localNumDevices;
HIPCHECK(hipSetDevice(gpuid));
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
HIPCHECK(hipMemset(args->recvbuffs[i], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i];
if (rank == root) TESTCHECK(InitData(data, sendcount, type, rep, rank));
TESTCHECK(InitData(args->expected[i], recvcount, type, rep, root));
for (int l=0; l<args->nRanks; l++) {
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l);
HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[k] : args->sendbuffs[k];
if (rank == root) TESTCHECK(InitData(data, sendcount, type, rep, rank));
TESTCHECK(InitData(args->expected[k], recvcount, type, rep, root));
k++;
}
HIPCHECK(hipDeviceSynchronize());
}
return testSuccess;
@@ -116,4 +123,4 @@ testResult_t BroadcastRunTest(struct threadArgs* args, int root, ncclDataType_t
struct testEngine ncclTestEngine = {
BroadcastGetBuffSize,
BroadcastRunTest
};
};
+181 -77
Просмотреть файл
@@ -78,6 +78,9 @@ static uint32_t cumask[4];
static int cudaGraphLaunches = 0;
// Report average iteration time: (0=RANK0,1=AVG,2=MIN,3=MAX)
static int average = 1;
static int numDevices = 1;
static int ranksPerGpu = 1;
static int enable_multiranks = 0;
#define NUM_BLOCKS 32
@@ -117,6 +120,38 @@ static double parsesize(const char *value) {
return size * units;
}
static bool minReqVersion(int rmajor, int rminor, int rpatch)
{
int version;
int major, minor, patch, rem;
ncclGetVersion(&version);
if (version < 10000) {
major = version/1000;
rem = version%1000;
minor = rem/100;
patch = rem%100;
}
else {
major = version/10000;
rem = version%10000;
minor = rem/100;
patch = rem%100;
}
if (major < rmajor) return false;
else if (major > rmajor) return true;
// major == rmajor
if (minor < rminor) return false;
else if (minor > rminor) return true;
// major == rmajor && minor == rminor
if (patch < rpatch) return false;
return true;
}
double DeltaMaxValue(ncclDataType_t type) {
switch(type) {
case ncclHalf: return 1e-2;
@@ -437,9 +472,9 @@ void Allreduce(struct threadArgs* args, double* value, int average) {
testResult_t CheckData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int in_place, double *delta) {
size_t count = args->expectedBytes/wordSize(type);
double maxDelta = 0.0;
for (int i=0; i<args->nGpus; i++) {
for (int i=0; i<args->nGpus*args->nRanks; i++) {
int device;
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i);
NCCLCHECK(ncclCommCuDevice(args->comms[i], &device));
HIPCHECK(hipSetDevice(device));
void *data = in_place ? ((void *)((uintptr_t)args->recvbuffs[i] + args->recvInplaceOffset*rank)) : args->recvbuffs[i];
@@ -474,20 +509,20 @@ testResult_t CheckData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
//}
#endif
}
double nranks = args->nProcs*args->nThreads*args->nGpus;
double nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks;
if (args->reportErrors && maxDelta > DeltaMaxValue(type)*(nranks - 1)) args->errors[0]++;
*delta = maxDelta;
return testSuccess;
}
testResult_t testStreamSynchronize(int ngpus, hipStream_t* streams, ncclComm_t* comms) {
testResult_t testStreamSynchronize(int nStreams, hipStream_t* streams, ncclComm_t* comms) {
hipError_t hipErr;
int remaining = ngpus;
int* done = (int*)malloc(sizeof(int)*ngpus);
memset(done, 0, sizeof(int)*ngpus);
int remaining = nStreams;
int* done = (int*)malloc(sizeof(int)*nStreams);
memset(done, 0, sizeof(int)*nStreams);
while (remaining) {
int idle = 1;
for (int i=0; i<ngpus; i++) {
for (int i=0; i<nStreams; i++) {
if (done[i]) continue;
hipErr = hipStreamQuery(streams[i]);
@@ -507,7 +542,7 @@ testResult_t testStreamSynchronize(int ngpus, hipStream_t* streams, ncclComm_t*
if (ncclAsyncErr != ncclSuccess) {
// An asynchronous error happened. Stop the operation and destroy
// the communicator
for (int i=0; i<ngpus; i++)
for (int i=0; i<nStreams; i++)
NCCLCHECK(ncclCommAbort(comms[i]));
// Abort the perf test
NCCLCHECK(ncclAsyncErr);
@@ -531,14 +566,14 @@ testResult_t startColl(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
size_t steps = totalnbytes ? args->maxbytes / totalnbytes : 1;
size_t shift = totalnbytes * (iter % steps);
if (args->nGpus > 1) NCCLCHECK(ncclGroupStart());
for (int i = 0; i < args->nGpus; i++) {
if (args->nGpus> 1 || args->nRanks > 1) NCCLCHECK(ncclGroupStart());
for (int i = 0; i < args->nGpus*args->nRanks; i++) {
#ifndef NCCL_MAJOR
int hipDev;
NCCLCHECK(ncclCommCuDevice(args->comms[i], &hipDev));
HIPCHECK(hipSetDevice(hipDev));
#endif
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i);
char* recvBuff = ((char*)args->recvbuffs[i]) + shift;
char* sendBuff = ((char*)args->sendbuffs[i]) + shift;
ncclRedOp_t op;
@@ -585,11 +620,11 @@ testResult_t startColl(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
}
#endif
}
if (args->nGpus > 1) NCCLCHECK(ncclGroupEnd());
if (args->nGpus > 1 || args->nRanks > 1) NCCLCHECK(ncclGroupEnd());
if (blocking_coll) {
// Complete op before returning
TESTCHECK(testStreamSynchronize(args->nGpus, args->streams, args->comms));
TESTCHECK(testStreamSynchronize(args->nGpus*args->nRanks, args->streams, args->comms));
}
if (blocking_coll) Barrier(args);
return testSuccess;
@@ -598,10 +633,11 @@ testResult_t startColl(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
testResult_t completeColl(struct threadArgs* args) {
if (blocking_coll) return testSuccess;
TESTCHECK(testStreamSynchronize(args->nGpus, args->streams, args->comms));
TESTCHECK(testStreamSynchronize(args->nGpus*args->nRanks, args->streams, args->comms));
return testSuccess;
}
//EDGAR: Revisit because of cudaGraphLaunches
testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int in_place) {
size_t count = args->nbytes / wordSize(type);
if (datacheck) {
@@ -616,11 +652,11 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
Barrier(args);
#if CUDART_VERSION >= 11030
hipGraph_t graphs[args->nGpus];
hipGraphExec_t graphExec[args->nGpus];
hipGraph_t graphs[args->nGpus*args->nRanks];
hipGraphExec_t graphExec[args->nGpus*args->nRanks];
if (cudaGraphLaunches >= 1) {
// Begin cuda graph capture
for (int i=0; i<args->nGpus; i++) {
for (int i=0; i<args->nGpus*args->nRanks; i++) {
// Thread local mode is needed for:
// - Multi-thread mode
// - P2P pre-connect
@@ -642,18 +678,18 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
#if CUDART_VERSION >= 11030
if (cudaGraphLaunches >= 1) {
// End cuda graph capture
for (int i=0; i<args->nGpus; i++) {
for (int i=0; i<args->nGpus*args->nRanks; i++) {
HIPCHECK(hipStreamEndCapture(args->streams[i], graphs+i));
}
// Instantiate cuda graph
for (int i=0; i<args->nGpus; i++) {
for (int i=0; i<args->nGpus*args->nRanks; i++) {
HIPCHECK(hipGraphInstantiate(graphExec+i, graphs[i], NULL, NULL, 0));
}
// Resync CPU, restart timing, launch cuda graph
Barrier(args);
start = std::chrono::high_resolution_clock::now();
for (int l=0; l<cudaGraphLaunches; l++) {
for (int i=0; i<args->nGpus; i++) {
for (int i=0; i<args->nGpus*args->nRanks; i++) {
HIPCHECK(hipGraphLaunch(graphExec[i], args->streams[i]));
}
}
@@ -671,7 +707,7 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
#if CUDART_VERSION >= 11030
if (cudaGraphLaunches >= 1) {
//destroy cuda graph
for (int i=0; i<args->nGpus; i++) {
for (int i=0; i<args->nGpus*args->nRanks; i++) {
HIPCHECK(hipGraphExecDestroy(graphExec[i]));
HIPCHECK(hipGraphDestroy(graphs[i]));
}
@@ -679,7 +715,7 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
#endif
double algBw, busBw;
args->collTest->getBw(count, wordSize(type), deltaSec, &algBw, &busBw, args->nProcs*args->nThreads*args->nGpus);
args->collTest->getBw(count, wordSize(type), deltaSec, &algBw, &busBw, args->nProcs*args->nThreads*args->nGpus*args->nRanks);
Barrier(args);
@@ -694,7 +730,7 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
#if CUDART_VERSION >= 11030
if (cudaGraphLaunches >= 1) {
// Begin cuda graph capture for data check
for (int i=0; i<args->nGpus; i++) {
for (int i=0; i<args->nGpus*args->nRanks; i++) {
HIPCHECK(chiptreamBeginCapture(args->streams[i], args->nThreads > 1 ? hipStreamCaptureModeThreadLocal : hipStreamCaptureModeGlobal));
}
}
@@ -706,15 +742,15 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
#if CUDART_VERSION >= 11030
if (cudaGraphLaunches >= 1) {
// End cuda graph capture
for (int i=0; i<args->nGpus; i++) {
for (int i=0; i<args->nGpus*args->nRanks; i++) {
HIPCHECK(hipStreamEndCapture(args->streams[i], graphs+i));
}
// Instantiate cuda graph
for (int i=0; i<args->nGpus; i++) {
for (int i=0; i<args->nGpus*args->nRanks; i++) {
HIPCHECK(hipGraphInstantiate(graphExec+i, graphs[i], NULL, NULL, 0));
}
// Launch cuda graph
for (int i=0; i<args->nGpus; i++) {
for (int i=0; i<args->nGpus*args->nRanks; i++) {
HIPCHECK(hipGraphLaunch(graphExec[i], args->streams[i]));
}
}
@@ -725,7 +761,7 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
#if CUDART_VERSION >= 11030
if (cudaGraphLaunches >= 1) {
//destroy cuda graph
for (int i=0; i<args->nGpus; i++) {
for (int i=0; i<args->nGpus*args->nRanks; i++) {
HIPCHECK(hipGraphExecDestroy(graphExec[i]));
HIPCHECK(hipGraphDestroy(graphs[i]));
}
@@ -759,7 +795,7 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
}
void setupArgs(size_t size, ncclDataType_t type, struct threadArgs* args) {
int nranks = args->nProcs*args->nGpus*args->nThreads;
int nranks = args->nProcs*args->nGpus*args->nThreads*args->nRanks;
size_t count, sendCount, recvCount, paramCount, sendInplaceOffset, recvInplaceOffset;
count = size / wordSize(type);
@@ -806,6 +842,8 @@ testResult_t threadRunTests(struct threadArgs* args) {
// will be done on the current GPU (by default : 0) and if the GPUs are in
// exclusive mode those operations will fail.
int gpuid = args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus;
if (enable_multiranks)
gpuid = gpuid % numDevices;
HIPCHECK(hipSetDevice(gpuid));
TESTCHECK(ncclTestEngine.runTest(args, ncclroot, (ncclDataType_t)nccltype, test_typenames[nccltype], (ncclRedOp_t)ncclop, test_opnames[ncclop]));
return testSuccess;
@@ -814,23 +852,33 @@ testResult_t threadRunTests(struct threadArgs* args) {
testResult_t threadInit(struct threadArgs* args) {
char hostname[1024];
getHostName(hostname, 1024);
int nranks = args->nProcs*args->nThreads*args->nGpus;
int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks;
//set main thread again
is_main_thread = (args->proc == 0 && args->thread == 0) ? 1 : 0;
NCCLCHECK(ncclGroupStart());
for (int i=0; i<args->nGpus; i++) {
int rank = args->proc*args->nThreads*args->nGpus + args->thread*args->nGpus + i;
int gpuid = args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i;
if (enable_multiranks)
gpuid = gpuid % numDevices;
HIPCHECK(hipSetDevice(gpuid));
NCCLCHECK(ncclCommInitRank(args->comms+i, nranks, args->ncclId, rank));
for (int j=0; j<args->nRanks; j++) {
int rank = (args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + j;
if (args->enable_multiranks)
NCCLCHECK(ncclCommInitRank(args->comms+i, nranks, args->ncclId, rank));
#ifdef RCCL_MULTIRANKPERGPU
else
NCCLCHECK(ncclCommInitRankMulti(args->comms+i*args->nRanks+j, nranks, args->ncclId, rank, rank));
#endif
}
}
NCCLCHECK(ncclGroupEnd());
TESTCHECK(threadRunTests(args));
for (int i=0; i<args->nGpus; i++) {
for (int i=0; i<args->nGpus*args->nRanks; i++) {
NCCLCHECK(ncclCommDestroy(args->comms[i]));
}
return testSuccess;
@@ -925,13 +973,21 @@ int main(int argc, char* argv[]) {
{"cumask", required_argument, 0, 'u'},
{"cudagraph", required_argument, 0, 'G'},
{"average", required_argument, 0, 'a'},
#ifdef RCCL_MULTIRANKPERGPU
{"enable_multiranks", required_argument, 0, 'x'},
{"ranks_per_gpu", required_argument, 0, 'R'},
#endif
{"help", no_argument, 0, 'h'},
{}
};
while(1) {
int c;
#ifdef RCCL_MULTIRANKPERGPU
c = getopt_long(argc, argv, "t:g:b:e:i:f:n:m:w:p:c:o:d:r:z:G:a:y:s:u:h:R:x:", longopts, &longindex);
#else
c = getopt_long(argc, argv, "t:g:b:e:i:f:n:m:w:p:c:o:d:r:z:G:a:y:s:u:h:", longopts, &longindex);
#endif
if (c == -1)
break;
@@ -1022,6 +1078,14 @@ int main(int argc, char* argv[]) {
case 'a':
average = (int)strtol(optarg, NULL, 0);
break;
#ifdef RCCL_MULTIRANKPERGPU
case 'x':
enable_multiranks = (int)strtol(optarg, NULL, 0);
break;
case 'R':
ranksPerGpu = (int)strtol(optarg, NULL, 0);
break;
#endif
case 'h':
default:
if (c != 'h') printf("invalid option '%c'\n", c);
@@ -1052,26 +1116,43 @@ int main(int argc, char* argv[]) {
"[-u,--cumask <d0,d1,d2,d3>] \n\t"
"[-G,--cudagraph <num graph launches>] \n\t"
"[-a,--average <0/1/2/3> report average iteration time <0=RANK0/1=AVG/2=MIN/3=MAX>] \n\t"
#ifdef RCCL_MULTIRANKPERGPU
"[-x,--enable_multiranks <0/1> enable using multiple ranks per GPU] \n\t"
"[-R,--ranks_per_gpu] \n\t"
#endif
"[-h,--help]\n",
basename(argv[0]));
return 0;
}
}
int numDevices;
HIPCHECK(hipGetDeviceCount(&numDevices));
if (nGpus > numDevices)
{
fprintf(stderr, "[ERROR] The number of requested GPUs (%d) is greater than the number of GPUs available (%d)\n", nGpus, numDevices);
return testNcclError;
}
if (minBytes > maxBytes) {
fprintf(stderr, "invalid sizes for 'minbytes' and 'maxbytes': %llu > %llu\n",
(unsigned long long)minBytes,
(unsigned long long)maxBytes);
return -1;
}
if (!minReqVersion(2, 12, 12) && enable_multiranks) {
fprintf(stderr, "Multiple Ranks per GPU requested, but rccl library found does not support this feature.\n");
fprintf(stderr, "Please check LD_LIBRARY_PATH. Resetting enable_multiranks and ranksPerGpu to default values.\n");
enable_multiranks = 0;
ranksPerGpu = 1;
}
if (enable_multiranks && parallel_init) {
fprintf(stderr, "Cannot use parallel_init when using multiple ranks per GPU.\n");
return -1;
}
if (ranksPerGpu > 1 && !enable_multiranks) {
fprintf(stderr, "Need to enable multiranks option to use multiple ranks per GPU\n");
return -1;
}
#ifdef MPI_SUPPORT
MPI_Init(&argc, &argv);
#endif
@@ -1098,7 +1179,7 @@ testResult_t run() {
#endif
is_main_thread = (proc == 0) ? 1 : 0;
PRINT("# nThread: %d nGpus: %d minBytes: %ld maxBytes: %ld step: %ld(%s) warmupIters: %d iters: %d validation: %d \n", nThreads, nGpus, minBytes, maxBytes,
PRINT("# nThreads: %d nGpus: %d nRanks: %d minBytes: %ld maxBytes: %ld step: %ld(%s) warmupIters: %d iters: %d validation: %d \n", nThreads, nGpus, ranksPerGpu, minBytes, maxBytes,
(stepFactor > 1)?stepFactor:stepBytes, (stepFactor > 1)?"factor":"bytes", warmup_iters, iters, datacheck);
if (blocking_coll) PRINT("# Blocking Enabled: wait for completion and barrier after each collective \n");
if (parallel_init) PRINT("# Parallel Init Enabled: threads call into NcclInitRank concurrently \n");
@@ -1111,18 +1192,20 @@ testResult_t run() {
size_t maxMem = ~0;
for (int i=0; i<nThreads*nGpus; i++) {
int hipDev = localRank*nThreads*nGpus+i;
int rank = proc*nThreads*nGpus+i;
if (enable_multiranks)
hipDev = hipDev % numDevices;
hipDeviceProp_t prop;
HIPCHECK(hipGetDeviceProperties(&prop, hipDev));
char busIdStr[] = "00000000:00:00.0";
HIPCHECK(hipDeviceGetPCIBusId(busIdStr, sizeof(busIdStr), hipDev));
len += snprintf(line+len, MAX_LINE>len ? MAX_LINE-len : 0, "# Rank %2d Pid %6d on %10s device %2d [%s] %s\n",
rank, getpid(), hostname, hipDev, busIdStr, prop.name);
len += snprintf(line+len, MAX_LINE>len ? MAX_LINE-len : 0, "# Rank %2d Pid %6d on %10s device %2d [0x%02x] %s\n",
rank, getpid(), hostname, hipDev, prop.pciBusID, prop.name);
maxMem = std::min(maxMem, prop.totalGlobalMem);
}
for (int j=0; j<ranksPerGpu; j++) {
int rank = proc*nThreads*nGpus*ranksPerGpu+i*ranksPerGpu + j;
char busIdStr[] = "00000000:00:00.0";
HIPCHECK(hipDeviceGetPCIBusId(busIdStr, sizeof(busIdStr), hipDev));
len += snprintf(line+len, MAX_LINE>len ? MAX_LINE-len : 0, "# Rank %2d Pid %6d on %10s device %2d [%s] %s\n",
rank, getpid(), hostname, hipDev, busIdStr, prop.name);
maxMem = std::min(maxMem, prop.totalGlobalMem);
}
}
#if MPI_SUPPORT
char *lines = (proc == 0) ? (char *)malloc(nProcs*MAX_LINE) : NULL;
// Gather all output in rank order to root (0)
@@ -1152,42 +1235,61 @@ testResult_t run() {
MPI_Bcast(&ncclId, sizeof(ncclId), MPI_BYTE, 0, MPI_COMM_WORLD);
MPI_Barrier(MPI_COMM_WORLD);
#endif
hipStream_t streams[nGpus*nThreads];
void* sendbuffs[nGpus*nThreads];
void* recvbuffs[nGpus*nThreads];
void* expected[nGpus*nThreads];
hipStream_t streams[nGpus*nThreads*ranksPerGpu];
void* sendbuffs[nGpus*nThreads*ranksPerGpu];
void* recvbuffs[nGpus*nThreads*ranksPerGpu];
void* expected[nGpus*nThreads*ranksPerGpu];
size_t sendBytes, recvBytes;
ncclTestEngine.getBuffSize(&sendBytes, &recvBytes, (size_t)maxBytes, (size_t)nProcs*nGpus*nThreads);
ncclTestEngine.getBuffSize(&sendBytes, &recvBytes, (size_t)maxBytes, (size_t)nProcs*nGpus*nThreads*ranksPerGpu);
for (int i=0; i<nGpus*nThreads; i++) {
HIPCHECK(hipSetDevice(localRank*nThreads*nGpus+i));
AllocateBuffs(sendbuffs+i, sendBytes, recvbuffs+i, recvBytes, expected+i, (size_t)maxBytes, nProcs*nThreads*nGpus);
//PRINT("sendbuffs[%d]=%p(size=%lu) recvbuffs[%d]=%p(size=%lu)\n", i, sendbuffs[i], sendBytes, i, recvbuffs[i], recvBytes);
if (cumask[0] || cumask[1] || cumask[2] || cumask[3]) {
PRINT("cumask: ");
for (int i = 0; i < 4 ; i++) PRINT("%x,", cumask[i]);
PRINT("\n");
HIPCHECK(hipExtStreamCreateWithCUMask(streams+i, 4, cumask));
} else
HIPCHECK(hipStreamCreateWithFlags(streams+i, hipStreamNonBlocking));
// initialize data buffer to avoid all zero data
TESTCHECK(InitData(sendbuffs[i], sendBytes, ncclUint8, 0, i));
for (int ii=0; ii<nGpus*nThreads; ii++) {
int gpuid = localRank*nThreads*nGpus+ii;
if (enable_multiranks)
gpuid = gpuid % numDevices;
HIPCHECK(hipSetDevice(gpuid));
for (int j=0; j<ranksPerGpu; j++) {
int i = ii*ranksPerGpu+j;
AllocateBuffs(sendbuffs+i, sendBytes, recvbuffs+i, recvBytes, expected+i, (size_t)maxBytes, nProcs*nThreads*nGpus*ranksPerGpu);
//PRINT("sendbuffs[%d]=%p(size=%lu) recvbuffs[%d]=%p(size=%lu)\n", i, sendbuffs[i], sendBytes, i, recvbuffs[i], recvBytes);
if (cumask[0] || cumask[1] || cumask[2] || cumask[3]) {
PRINT("cumask: ");
for (int i = 0; i < 4 ; i++) PRINT("%x,", cumask[i]);
PRINT("\n");
HIPCHECK(hipExtStreamCreateWithCUMask(streams+i, 4, cumask));
} else
HIPCHECK(hipStreamCreateWithFlags(streams+i, hipStreamNonBlocking));
// initialize data buffer to avoid all zero data
TESTCHECK(InitData(sendbuffs[i], sendBytes, ncclUint8, 0, i));
}
HIPCHECK(hipDeviceSynchronize());
}
//if parallel init is not selected, use main thread to initialize NCCL
ncclComm_t* comms = (ncclComm_t*)malloc(sizeof(ncclComm_t)*nThreads*nGpus);
ncclComm_t* comms = (ncclComm_t*)malloc(sizeof(ncclComm_t)*nThreads*nGpus*ranksPerGpu);
if (!parallel_init) {
if (nProcs == 1) {
if (nProcs == 1 && !enable_multiranks) {
int gpuArray[nGpus*nThreads];
for (int i=0; i<nGpus*nThreads; i++) gpuArray[i] = i;
NCCLCHECK(ncclCommInitAll(comms, nGpus*nThreads, gpuArray));
} else {
NCCLCHECK(ncclGroupStart());
for (int i=0; i<nGpus*nThreads; i++) {
HIPCHECK(hipSetDevice(localRank*nThreads*nGpus+i));
NCCLCHECK(ncclCommInitRank(comms+i, nProcs*nThreads*nGpus, ncclId, proc*nThreads*nGpus+i));
for (int ii=0; ii<nGpus*nThreads; ii++) {
int gpuid = localRank*nThreads*nGpus+ii;
if (enable_multiranks) {
gpuid = gpuid % numDevices;
}
HIPCHECK(hipSetDevice(gpuid));
if (!enable_multiranks) {
NCCLCHECK(ncclCommInitRank(comms+ii, nProcs*nThreads*nGpus, ncclId, proc*nThreads*nGpus+ii));
}
#ifdef RCCL_MULTIRANKPERGPU
else
for (int j=0; j<ranksPerGpu; j++) {
int i = ii*ranksPerGpu+j;
NCCLCHECK(ncclCommInitRankMulti(comms+i, nProcs*nThreads*nGpus*ranksPerGpu, ncclId, proc*nThreads*nGpus*ranksPerGpu+i, proc*nThreads*nGpus*ranksPerGpu+i));
}
#endif
}
NCCLCHECK(ncclGroupEnd());
}
@@ -1219,18 +1321,20 @@ testResult_t run() {
threads[t].args.stepbytes=stepBytes;
threads[t].args.stepfactor=stepFactor;
threads[t].args.localRank = localRank;
threads[t].args.localNumDevices = numDevices;
threads[t].args.enable_multiranks = enable_multiranks;
threads[t].args.nRanks = ranksPerGpu;
threads[t].args.nProcs=nProcs;
threads[t].args.proc=proc;
threads[t].args.nThreads=nThreads;
threads[t].args.thread=t;
threads[t].args.nGpus=nGpus;
threads[t].args.sendbuffs = sendbuffs+t*nGpus;
threads[t].args.recvbuffs = recvbuffs+t*nGpus;
threads[t].args.expected = expected+t*nGpus;
threads[t].args.sendbuffs = sendbuffs+t*nGpus*ranksPerGpu;
threads[t].args.recvbuffs = recvbuffs+t*nGpus*ranksPerGpu;
threads[t].args.expected = expected+t*nGpus*ranksPerGpu;
threads[t].args.ncclId = ncclId;
threads[t].args.comms=comms+t*nGpus;
threads[t].args.streams=streams+t*nGpus;
threads[t].args.comms=comms+t*nGpus*ranksPerGpu;
threads[t].args.streams=streams+t*nGpus*ranksPerGpu;
threads[t].args.barrier = (volatile int*)barrier;
threads[t].args.barrier_idx = 0;
@@ -1267,17 +1371,17 @@ testResult_t run() {
#endif
if (!parallel_init) {
for(int i=0; i<nGpus*nThreads; ++i)
for(int i=0; i<nGpus*nThreads*ranksPerGpu; ++i)
NCCLCHECK(ncclCommDestroy(comms[i]));
free(comms);
}
for (int i=0; i<nGpus*nThreads; i++) {
for (int i=0; i<nGpus*nThreads*ranksPerGpu; i++) {
HIPCHECK(hipStreamDestroy(streams[i]));
}
// Free off HIP allocated memory
for (int i=0; i<nGpus*nThreads; i++) {
for (int i=0; i<nGpus*nThreads*ranksPerGpu; i++) {
if (memorytype == ncclHost) {
HIPCHECK(hipHostFree(sendbuffs[i]));
HIPCHECK(hipHostFree(recvbuffs[i]));
+3
Просмотреть файл
@@ -102,6 +102,9 @@ struct threadArgs {
int thread;
int nGpus;
int localRank;
int localNumDevices;
int enable_multiranks;
int nRanks;
void** sendbuffs;
size_t sendBytes;
size_t sendInplaceOffset;
+16 -9
Просмотреть файл
@@ -31,20 +31,27 @@ void GatherGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramc
testResult_t GatherInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) {
size_t sendcount = args->sendBytes / wordSize(type);
size_t recvcount = args->expectedBytes / wordSize(type);
int nranks = args->nProcs*args->nThreads*args->nGpus;
int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks;
int k=0;
for (int i=0; i<args->nGpus; i++) {
int gpuid = args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i;
if (args->enable_multiranks)
gpuid = gpuid % args->localNumDevices;
HIPCHECK(hipSetDevice(gpuid));
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
HIPCHECK(hipMemset(args->recvbuffs[i], 0, args->expectedBytes));
void* data = in_place ? ((char*)args->recvbuffs[i])+rank*args->sendBytes : args->sendbuffs[i];
TESTCHECK(InitData(data, sendcount, type, rep, rank));
HIPCHECK(hipMemcpy(args->expected[i], args->recvbuffs[i], args->expectedBytes, hipMemcpyDefault));
if (rank == root) {
for (int j=0; j<nranks; j++) {
TESTCHECK(InitData(((char*)args->expected[i])+args->sendBytes*j, sendcount, type, rep, j));
for (int l=0; l<args->nRanks; l++) {
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l);
HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes));
void* data = in_place ? ((char*)args->recvbuffs[k])+rank*args->sendBytes : args->sendbuffs[k];
TESTCHECK(InitData(data, sendcount, type, rep, rank));
HIPCHECK(hipMemcpy(args->expected[k], args->recvbuffs[k], args->expectedBytes, hipMemcpyDefault));
if (rank == root) {
for (int j=0; j<nranks; j++) {
TESTCHECK(InitData(((char*)args->expected[k])+args->sendBytes*j, sendcount, type, rep, j));
}
}
k++;
}
HIPCHECK(hipDeviceSynchronize());
}
+14 -8
Просмотреть файл
@@ -33,17 +33,24 @@ void HyperCubeGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *par
testResult_t HyperCubeInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) {
size_t sendcount = args->sendBytes / wordSize(type);
size_t recvcount = args->expectedBytes / wordSize(type);
int nranks = args->nProcs*args->nThreads*args->nGpus;
int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks;
int k=0;
for (int i=0; i<args->nGpus; i++) {
int gpuid = args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i;
if (args->enable_multiranks)
gpuid = gpuid % args->localNumDevices;
HIPCHECK(hipSetDevice(gpuid));
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
HIPCHECK(hipMemset(args->recvbuffs[i], 0, args->expectedBytes));
void* data = in_place ? ((char*)args->recvbuffs[i])+rank*args->sendBytes : args->sendbuffs[i];
TESTCHECK(InitData(data, sendcount, type, rep, rank));
for (int j=0; j<nranks; j++) {
TESTCHECK(InitData(((char*)args->expected[i])+args->sendBytes*j, sendcount, type, rep, j));
for (int l=0; l<args->nRanks; l++) {
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l);
HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes));
void* data = in_place ? ((char*)args->recvbuffs[k])+rank*args->sendBytes : args->sendbuffs[k];
TESTCHECK(InitData(data, sendcount, type, rep, rank));
for (int j=0; j<nranks; j++) {
TESTCHECK(InitData(((char*)args->expected[k])+args->sendBytes*j, sendcount, type, rep, j));
}
k++;
}
HIPCHECK(hipDeviceSynchronize());
}
@@ -66,7 +73,6 @@ testResult_t HyperCubeRunColl(void* sendbuff, void* recvbuff, size_t count, nccl
int rank;
NCCLCHECK(ncclCommUserRank(comm, &rank));
size_t rankSize = count * wordSize(type);
if (rbuff+rank*rankSize != sbuff) HIPCHECK(hipMemcpyAsync(rbuff+rank*rankSize, sbuff, rankSize, hipMemcpyDeviceToDevice, stream));
// Hypercube AllGather
+15 -8
Просмотреть файл
@@ -31,17 +31,24 @@ void ReduceGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramc
testResult_t ReduceInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) {
size_t sendcount = args->sendBytes / wordSize(type);
size_t recvcount = args->expectedBytes / wordSize(type);
int nranks = args->nProcs*args->nThreads*args->nGpus;
int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks;
int k=0;
for (int i=0; i<args->nGpus; i++) {
int gpuid = args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i;
if (args->enable_multiranks)
gpuid = gpuid % args->localNumDevices;
HIPCHECK(hipSetDevice(gpuid));
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
HIPCHECK(hipMemset(args->recvbuffs[i], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i];
TESTCHECK(InitData(data, sendcount, type, rep, rank));
HIPCHECK(hipMemcpy(args->expected[i], args->recvbuffs[i], args->expectedBytes, hipMemcpyDefault));
if (rank == root) TESTCHECK(InitDataReduce(args->expected[i], recvcount, 0, type, op, rep, nranks));
for (int l=0; l<args->nRanks; l++) {
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l);
HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[k] : args->sendbuffs[k];
TESTCHECK(InitData(data, sendcount, type, rep, rank));
HIPCHECK(hipMemcpy(args->expected[k], args->recvbuffs[k], args->expectedBytes, hipMemcpyDefault));
if (rank == root) TESTCHECK(InitDataReduce(args->expected[k], recvcount, 0, type, op, rep, nranks));
k++;
}
HIPCHECK(hipDeviceSynchronize());
}
return testSuccess;
@@ -119,4 +126,4 @@ testResult_t ReduceRunTest(struct threadArgs* args, int root, ncclDataType_t typ
struct testEngine ncclTestEngine = {
ReduceGetBuffSize,
ReduceRunTest
};
};
+15 -8
Просмотреть файл
@@ -31,17 +31,24 @@ void ReduceScatterGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t
testResult_t ReduceScatterInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) {
size_t sendcount = args->sendBytes / wordSize(type);
size_t recvcount = args->expectedBytes / wordSize(type);
int nranks = args->nProcs*args->nThreads*args->nGpus;
int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks;
int k=0;
for (int i=0; i<args->nGpus; i++) {
int gpuid = args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i;
if (args->enable_multiranks)
gpuid = gpuid % args->localNumDevices;
HIPCHECK(hipSetDevice(gpuid));
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
HIPCHECK(hipMemset(args->recvbuffs[i], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i];
TESTCHECK(InitData(data, sendcount, type, rep, rank));
HIPCHECK(hipMemcpy(args->expected[i], args->recvbuffs[i], args->expectedBytes, hipMemcpyDefault));
TESTCHECK(InitDataReduce(args->expected[i], recvcount, rank*recvcount, type, op, rep, nranks));
for (int l=0; l<args->nRanks; l++) {
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l);
HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[k] : args->sendbuffs[k];
TESTCHECK(InitData(data, sendcount, type, rep, rank));
HIPCHECK(hipMemcpy(args->expected[k], args->recvbuffs[k], args->expectedBytes, hipMemcpyDefault));
TESTCHECK(InitDataReduce(args->expected[k], recvcount, rank*recvcount, type, op, rep, nranks));
k++;
}
HIPCHECK(hipDeviceSynchronize());
}
return testSuccess;
@@ -111,4 +118,4 @@ testResult_t ReduceScatterRunTest(struct threadArgs* args, int root, ncclDataTyp
struct testEngine ncclTestEngine = {
ReduceScatterGetBuffSize,
ReduceScatterRunTest
};
};
+13 -5
Просмотреть файл
@@ -32,14 +32,22 @@ testResult_t ScatterInitData(struct threadArgs* args, ncclDataType_t type, ncclR
size_t sendcount = args->sendBytes / wordSize(type);
size_t recvcount = args->expectedBytes / wordSize(type);
int k=0;
for (int i=0; i<args->nGpus; i++) {
int gpuid = args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i;
if (args->enable_multiranks)
gpuid = gpuid % args->localNumDevices;
HIPCHECK(hipSetDevice(gpuid));
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
HIPCHECK(hipMemset(args->recvbuffs[i], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i];
if (rank == root) TESTCHECK(InitData(data, sendcount, type, rep, rank));
TESTCHECK(InitData(args->expected[i], recvcount, type, rep+rank*recvcount, root));
for (int l=0; l<args->nRanks; l++) {
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l);
HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[k] : args->sendbuffs[k];
if (rank == root) TESTCHECK(InitData(data, sendcount, type, rep, rank));
TESTCHECK(InitData(args->expected[k], recvcount, type, rep+rank*recvcount, root));
k++;
}
HIPCHECK(hipDeviceSynchronize());
}
return testSuccess;
+14 -7
Просмотреть файл
@@ -31,17 +31,24 @@ void SendRecvGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *para
testResult_t SendRecvInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) {
size_t sendcount = args->sendBytes / wordSize(type);
size_t recvcount = args->expectedBytes / wordSize(type);
int nranks = args->nProcs*args->nThreads*args->nGpus;
int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks;
int k=0;
for (int i=0; i<args->nGpus; i++) {
int gpuid = args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i;
if (args->enable_multiranks)
gpuid = gpuid % args->localNumDevices;
HIPCHECK(hipSetDevice(gpuid));
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
HIPCHECK(hipMemset(args->recvbuffs[i], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i];
TESTCHECK(InitData(data, sendcount, type, rep, rank));
int peer = (rank-1+nranks)%nranks;
TESTCHECK(InitData(args->expected[i], recvcount, type, rep, peer));
for (int l=0; l<args->nRanks; l++) {
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l);
HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[k] : args->sendbuffs[k];
TESTCHECK(InitData(data, sendcount, type, rep, rank));
int peer = (rank-1+nranks)%nranks;
TESTCHECK(InitData(args->expected[k], recvcount, type, rep, peer));
k++;
}
HIPCHECK(hipDeviceSynchronize());
}
// We don't support in-place sendrecv