implementation of multi-rank support in rccl-tests.
Этот коммит содержится в:
@@ -31,17 +31,24 @@ void AllGatherGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *par
|
||||
testResult_t AllGatherInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) {
|
||||
size_t sendcount = args->sendBytes / wordSize(type);
|
||||
size_t recvcount = args->expectedBytes / wordSize(type);
|
||||
int nranks = args->nProcs*args->nThreads*args->nGpus;
|
||||
int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks;
|
||||
|
||||
int k=0;
|
||||
for (int i=0; i<args->nGpus; i++) {
|
||||
int gpuid = args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i;
|
||||
if (args->enable_multiranks)
|
||||
gpuid = gpuid % args->localNumDevices;
|
||||
HIPCHECK(hipSetDevice(gpuid));
|
||||
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
|
||||
HIPCHECK(hipMemset(args->recvbuffs[i], 0, args->expectedBytes));
|
||||
void* data = in_place ? ((char*)args->recvbuffs[i])+rank*args->sendBytes : args->sendbuffs[i];
|
||||
TESTCHECK(InitData(data, sendcount, type, rep, rank));
|
||||
for (int j=0; j<nranks; j++) {
|
||||
TESTCHECK(InitData(((char*)args->expected[i])+args->sendBytes*j, sendcount, type, rep, j));
|
||||
|
||||
for (int l=0; l<args->nRanks; l++) {
|
||||
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l);
|
||||
HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes));
|
||||
void* data = in_place ? ((char*)args->recvbuffs[k])+rank*args->sendBytes : args->sendbuffs[k];
|
||||
TESTCHECK(InitData(data, sendcount, type, rep, rank));
|
||||
for (int j=0; j<nranks; j++) {
|
||||
TESTCHECK(InitData(((char*)args->expected[k])+args->sendBytes*j, sendcount, type, rep, j));
|
||||
}
|
||||
k++;
|
||||
}
|
||||
HIPCHECK(hipDeviceSynchronize());
|
||||
}
|
||||
@@ -99,4 +106,4 @@ testResult_t AllGatherRunTest(struct threadArgs* args, int root, ncclDataType_t
|
||||
struct testEngine ncclTestEngine = {
|
||||
AllGatherGetBuffSize,
|
||||
AllGatherRunTest
|
||||
};
|
||||
};
|
||||
|
||||
@@ -31,16 +31,23 @@ void AllReduceGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *par
|
||||
testResult_t AllReduceInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) {
|
||||
size_t sendcount = args->sendBytes / wordSize(type);
|
||||
size_t recvcount = args->expectedBytes / wordSize(type);
|
||||
int nranks = args->nProcs*args->nThreads*args->nGpus;
|
||||
int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks;
|
||||
|
||||
int k = 0;
|
||||
for (int i=0; i<args->nGpus; i++) {
|
||||
int gpuid = args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i;
|
||||
if (args->enable_multiranks)
|
||||
gpuid = gpuid % args->localNumDevices;
|
||||
HIPCHECK(hipSetDevice(gpuid));
|
||||
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
|
||||
HIPCHECK(hipMemset(args->recvbuffs[i], 0, args->expectedBytes));
|
||||
void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i];
|
||||
TESTCHECK(InitData(data, sendcount, type, rep, rank));
|
||||
TESTCHECK(InitDataReduce(args->expected[i], recvcount, 0, type, op, rep, nranks));
|
||||
|
||||
for (int l=0; l<args->nRanks; l++) {
|
||||
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l);
|
||||
HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes));
|
||||
void* data = in_place ? args->recvbuffs[k] : args->sendbuffs[k];
|
||||
TESTCHECK(InitData(data, sendcount, type, rep, rank));
|
||||
TESTCHECK(InitDataReduce(args->expected[k], recvcount, 0, type, op, rep, nranks));
|
||||
k++;
|
||||
}
|
||||
HIPCHECK(hipDeviceSynchronize());
|
||||
}
|
||||
return testSuccess;
|
||||
@@ -110,4 +117,4 @@ testResult_t AllReduceRunTest(struct threadArgs* args, int root, ncclDataType_t
|
||||
struct testEngine ncclTestEngine = {
|
||||
AllReduceGetBuffSize,
|
||||
AllReduceRunTest
|
||||
};
|
||||
};
|
||||
|
||||
+14
-7
@@ -31,18 +31,25 @@ void AlltoAllGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *para
|
||||
testResult_t AlltoAllInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) {
|
||||
size_t sendcount = args->sendBytes / wordSize(type);
|
||||
size_t recvcount = args->expectedBytes / wordSize(type);
|
||||
int nranks = args->nProcs*args->nThreads*args->nGpus;
|
||||
int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks;
|
||||
|
||||
int k=0;
|
||||
for (int i=0; i<args->nGpus; i++) {
|
||||
char* str = getenv("NCCL_TESTS_DEVICE");
|
||||
int gpuid = str ? atoi(str) : args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i;
|
||||
if (args->enable_multiranks)
|
||||
gpuid = gpuid % args->localNumDevices;
|
||||
HIPCHECK(hipSetDevice(gpuid));
|
||||
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
|
||||
HIPCHECK(hipMemset(args->recvbuffs[i], 0, args->expectedBytes));
|
||||
void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i];
|
||||
TESTCHECK(InitData(data, sendcount, type, rep, rank));
|
||||
for (int j=0; j<nranks; j++) {
|
||||
TESTCHECK(InitData(((char*)args->expected[i])+args->sendBytes/nranks*j, sendcount/nranks, type, rep+rank*sendcount/nranks, j));
|
||||
|
||||
for (int l=0; l<args->nRanks; l++) {
|
||||
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l);
|
||||
HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes));
|
||||
void* data = in_place ? args->recvbuffs[k] : args->sendbuffs[k];
|
||||
TESTCHECK(InitData(data, sendcount, type, rep, rank));
|
||||
for (int j=0; j<nranks; j++) {
|
||||
TESTCHECK(InitData(((char*)args->expected[k])+args->sendBytes/nranks*j, sendcount/nranks, type, rep+rank*sendcount/nranks, j));
|
||||
}
|
||||
k++;
|
||||
}
|
||||
HIPCHECK(hipDeviceSynchronize());
|
||||
}
|
||||
|
||||
+38
-31
@@ -41,44 +41,51 @@ void AlltoAllvGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *par
|
||||
testResult_t AlltoAllvInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) {
|
||||
size_t sendcount = args->sendBytes / wordSize(type);
|
||||
size_t recvcount = args->expectedBytes / wordSize(type);
|
||||
int nranks = args->nProcs*args->nThreads*args->nGpus;
|
||||
int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks;
|
||||
|
||||
int k=0;
|
||||
for (int i=0; i<args->nGpus; i++) {
|
||||
char* str = getenv("NCCL_TESTS_DEVICE");
|
||||
int gpuid = str ? atoi(str) : args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i;
|
||||
if (args->enable_multiranks)
|
||||
gpuid = gpuid % args->localNumDevices;
|
||||
HIPCHECK(hipSetDevice(gpuid));
|
||||
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
|
||||
HIPCHECK(hipMemset(args->recvbuffs[i], 0, args->expectedBytes));
|
||||
void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i];
|
||||
TESTCHECK(InitData(data, sendcount, type, rep, rank));
|
||||
|
||||
for (int l=0; l<args->nRanks; l++) {
|
||||
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l);
|
||||
HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes));
|
||||
void* data = in_place ? args->recvbuffs[k] : args->sendbuffs[k];
|
||||
TESTCHECK(InitData(data, sendcount, type, rep, rank));
|
||||
#if 0
|
||||
int *dataHost = (int *)malloc(args->sendBytes);
|
||||
hipMemcpy(dataHost, data, args->sendBytes, hipMemcpyDeviceToHost);
|
||||
printf(" Rank [%d] Original: ", rank);
|
||||
for(int j=0; j<sendcount; j++) {
|
||||
printf("%d:%d ", j, dataHost[j]);
|
||||
}
|
||||
printf("\n");
|
||||
free(dataHost);
|
||||
#endif
|
||||
size_t rdisp = 0;
|
||||
size_t data_count = sendcount*2/nranks;
|
||||
size_t chunksize = data_count/nranks;
|
||||
for (int j=0; j<nranks; j++) {
|
||||
size_t scount = 0, rcount = ((j+rank)%nranks)*chunksize;
|
||||
if (j+rank == nranks-1)
|
||||
rcount += (sendcount-chunksize*(nranks-1)*nranks/2);
|
||||
size_t sdisp = 0;
|
||||
for (int k=0; k<nranks; k++) {
|
||||
scount = ((k+j)%nranks)*chunksize;
|
||||
if (k+j == nranks-1)
|
||||
scount += (sendcount-chunksize*(nranks-1)*nranks/2);
|
||||
if (k == rank)
|
||||
break;
|
||||
sdisp += scount;
|
||||
int *dataHost = (int *)malloc(args->sendBytes);
|
||||
hipMemcpy(dataHost, data, args->sendBytes, hipMemcpyDeviceToHost);
|
||||
printf(" Rank [%d] Original: ", rank);
|
||||
for(int j=0; j<sendcount; j++) {
|
||||
printf("%d:%d ", j, dataHost[j]);
|
||||
}
|
||||
TESTCHECK(InitData(((char*)args->expected[i])+rdisp*wordSize(type), rcount, type, rep+sdisp, j));
|
||||
rdisp += rcount;
|
||||
printf("\n");
|
||||
free(dataHost);
|
||||
#endif
|
||||
size_t rdisp = 0;
|
||||
size_t data_count = sendcount*2/nranks;
|
||||
size_t chunksize = data_count/nranks;
|
||||
for (int j=0; j<nranks; j++) {
|
||||
size_t scount = 0, rcount = ((j+rank)%nranks)*chunksize;
|
||||
if (j+rank == nranks-1)
|
||||
rcount += (sendcount-chunksize*(nranks-1)*nranks/2);
|
||||
size_t sdisp = 0;
|
||||
for (int k=0; k<nranks; k++) {
|
||||
scount = ((k+j)%nranks)*chunksize;
|
||||
if (k+j == nranks-1)
|
||||
scount += (sendcount-chunksize*(nranks-1)*nranks/2);
|
||||
if (k == rank)
|
||||
break;
|
||||
sdisp += scount;
|
||||
}
|
||||
TESTCHECK(InitData(((char*)args->expected[k])+rdisp*wordSize(type), rcount, type, rep+sdisp, j));
|
||||
rdisp += rcount;
|
||||
}
|
||||
k++;
|
||||
}
|
||||
HIPCHECK(hipDeviceSynchronize());
|
||||
}
|
||||
|
||||
+13
-6
@@ -32,14 +32,21 @@ testResult_t BroadcastInitData(struct threadArgs* args, ncclDataType_t type, ncc
|
||||
size_t sendcount = args->sendBytes / wordSize(type);
|
||||
size_t recvcount = args->expectedBytes / wordSize(type);
|
||||
|
||||
int k=0;
|
||||
for (int i=0; i<args->nGpus; i++) {
|
||||
int gpuid = args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i;
|
||||
if (args->enable_multiranks)
|
||||
gpuid = gpuid % args->localNumDevices;
|
||||
HIPCHECK(hipSetDevice(gpuid));
|
||||
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
|
||||
HIPCHECK(hipMemset(args->recvbuffs[i], 0, args->expectedBytes));
|
||||
void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i];
|
||||
if (rank == root) TESTCHECK(InitData(data, sendcount, type, rep, rank));
|
||||
TESTCHECK(InitData(args->expected[i], recvcount, type, rep, root));
|
||||
|
||||
for (int l=0; l<args->nRanks; l++) {
|
||||
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l);
|
||||
HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes));
|
||||
void* data = in_place ? args->recvbuffs[k] : args->sendbuffs[k];
|
||||
if (rank == root) TESTCHECK(InitData(data, sendcount, type, rep, rank));
|
||||
TESTCHECK(InitData(args->expected[k], recvcount, type, rep, root));
|
||||
k++;
|
||||
}
|
||||
HIPCHECK(hipDeviceSynchronize());
|
||||
}
|
||||
return testSuccess;
|
||||
@@ -116,4 +123,4 @@ testResult_t BroadcastRunTest(struct threadArgs* args, int root, ncclDataType_t
|
||||
struct testEngine ncclTestEngine = {
|
||||
BroadcastGetBuffSize,
|
||||
BroadcastRunTest
|
||||
};
|
||||
};
|
||||
|
||||
+181
-77
@@ -78,6 +78,9 @@ static uint32_t cumask[4];
|
||||
static int cudaGraphLaunches = 0;
|
||||
// Report average iteration time: (0=RANK0,1=AVG,2=MIN,3=MAX)
|
||||
static int average = 1;
|
||||
static int numDevices = 1;
|
||||
static int ranksPerGpu = 1;
|
||||
static int enable_multiranks = 0;
|
||||
|
||||
#define NUM_BLOCKS 32
|
||||
|
||||
@@ -117,6 +120,38 @@ static double parsesize(const char *value) {
|
||||
return size * units;
|
||||
}
|
||||
|
||||
static bool minReqVersion(int rmajor, int rminor, int rpatch)
|
||||
{
|
||||
int version;
|
||||
int major, minor, patch, rem;
|
||||
ncclGetVersion(&version);
|
||||
|
||||
if (version < 10000) {
|
||||
major = version/1000;
|
||||
rem = version%1000;
|
||||
minor = rem/100;
|
||||
patch = rem%100;
|
||||
}
|
||||
else {
|
||||
major = version/10000;
|
||||
rem = version%10000;
|
||||
minor = rem/100;
|
||||
patch = rem%100;
|
||||
}
|
||||
|
||||
if (major < rmajor) return false;
|
||||
else if (major > rmajor) return true;
|
||||
|
||||
// major == rmajor
|
||||
if (minor < rminor) return false;
|
||||
else if (minor > rminor) return true;
|
||||
|
||||
// major == rmajor && minor == rminor
|
||||
if (patch < rpatch) return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
double DeltaMaxValue(ncclDataType_t type) {
|
||||
switch(type) {
|
||||
case ncclHalf: return 1e-2;
|
||||
@@ -437,9 +472,9 @@ void Allreduce(struct threadArgs* args, double* value, int average) {
|
||||
testResult_t CheckData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int in_place, double *delta) {
|
||||
size_t count = args->expectedBytes/wordSize(type);
|
||||
double maxDelta = 0.0;
|
||||
for (int i=0; i<args->nGpus; i++) {
|
||||
for (int i=0; i<args->nGpus*args->nRanks; i++) {
|
||||
int device;
|
||||
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
|
||||
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i);
|
||||
NCCLCHECK(ncclCommCuDevice(args->comms[i], &device));
|
||||
HIPCHECK(hipSetDevice(device));
|
||||
void *data = in_place ? ((void *)((uintptr_t)args->recvbuffs[i] + args->recvInplaceOffset*rank)) : args->recvbuffs[i];
|
||||
@@ -474,20 +509,20 @@ testResult_t CheckData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
|
||||
//}
|
||||
#endif
|
||||
}
|
||||
double nranks = args->nProcs*args->nThreads*args->nGpus;
|
||||
double nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks;
|
||||
if (args->reportErrors && maxDelta > DeltaMaxValue(type)*(nranks - 1)) args->errors[0]++;
|
||||
*delta = maxDelta;
|
||||
return testSuccess;
|
||||
}
|
||||
|
||||
testResult_t testStreamSynchronize(int ngpus, hipStream_t* streams, ncclComm_t* comms) {
|
||||
testResult_t testStreamSynchronize(int nStreams, hipStream_t* streams, ncclComm_t* comms) {
|
||||
hipError_t hipErr;
|
||||
int remaining = ngpus;
|
||||
int* done = (int*)malloc(sizeof(int)*ngpus);
|
||||
memset(done, 0, sizeof(int)*ngpus);
|
||||
int remaining = nStreams;
|
||||
int* done = (int*)malloc(sizeof(int)*nStreams);
|
||||
memset(done, 0, sizeof(int)*nStreams);
|
||||
while (remaining) {
|
||||
int idle = 1;
|
||||
for (int i=0; i<ngpus; i++) {
|
||||
for (int i=0; i<nStreams; i++) {
|
||||
if (done[i]) continue;
|
||||
|
||||
hipErr = hipStreamQuery(streams[i]);
|
||||
@@ -507,7 +542,7 @@ testResult_t testStreamSynchronize(int ngpus, hipStream_t* streams, ncclComm_t*
|
||||
if (ncclAsyncErr != ncclSuccess) {
|
||||
// An asynchronous error happened. Stop the operation and destroy
|
||||
// the communicator
|
||||
for (int i=0; i<ngpus; i++)
|
||||
for (int i=0; i<nStreams; i++)
|
||||
NCCLCHECK(ncclCommAbort(comms[i]));
|
||||
// Abort the perf test
|
||||
NCCLCHECK(ncclAsyncErr);
|
||||
@@ -531,14 +566,14 @@ testResult_t startColl(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
|
||||
size_t steps = totalnbytes ? args->maxbytes / totalnbytes : 1;
|
||||
size_t shift = totalnbytes * (iter % steps);
|
||||
|
||||
if (args->nGpus > 1) NCCLCHECK(ncclGroupStart());
|
||||
for (int i = 0; i < args->nGpus; i++) {
|
||||
if (args->nGpus> 1 || args->nRanks > 1) NCCLCHECK(ncclGroupStart());
|
||||
for (int i = 0; i < args->nGpus*args->nRanks; i++) {
|
||||
#ifndef NCCL_MAJOR
|
||||
int hipDev;
|
||||
NCCLCHECK(ncclCommCuDevice(args->comms[i], &hipDev));
|
||||
HIPCHECK(hipSetDevice(hipDev));
|
||||
#endif
|
||||
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
|
||||
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i);
|
||||
char* recvBuff = ((char*)args->recvbuffs[i]) + shift;
|
||||
char* sendBuff = ((char*)args->sendbuffs[i]) + shift;
|
||||
ncclRedOp_t op;
|
||||
@@ -585,11 +620,11 @@ testResult_t startColl(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
|
||||
}
|
||||
#endif
|
||||
}
|
||||
if (args->nGpus > 1) NCCLCHECK(ncclGroupEnd());
|
||||
if (args->nGpus > 1 || args->nRanks > 1) NCCLCHECK(ncclGroupEnd());
|
||||
|
||||
if (blocking_coll) {
|
||||
// Complete op before returning
|
||||
TESTCHECK(testStreamSynchronize(args->nGpus, args->streams, args->comms));
|
||||
TESTCHECK(testStreamSynchronize(args->nGpus*args->nRanks, args->streams, args->comms));
|
||||
}
|
||||
if (blocking_coll) Barrier(args);
|
||||
return testSuccess;
|
||||
@@ -598,10 +633,11 @@ testResult_t startColl(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
|
||||
testResult_t completeColl(struct threadArgs* args) {
|
||||
if (blocking_coll) return testSuccess;
|
||||
|
||||
TESTCHECK(testStreamSynchronize(args->nGpus, args->streams, args->comms));
|
||||
TESTCHECK(testStreamSynchronize(args->nGpus*args->nRanks, args->streams, args->comms));
|
||||
return testSuccess;
|
||||
}
|
||||
|
||||
//EDGAR: Revisit because of cudaGraphLaunches
|
||||
testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int in_place) {
|
||||
size_t count = args->nbytes / wordSize(type);
|
||||
if (datacheck) {
|
||||
@@ -616,11 +652,11 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
|
||||
Barrier(args);
|
||||
|
||||
#if CUDART_VERSION >= 11030
|
||||
hipGraph_t graphs[args->nGpus];
|
||||
hipGraphExec_t graphExec[args->nGpus];
|
||||
hipGraph_t graphs[args->nGpus*args->nRanks];
|
||||
hipGraphExec_t graphExec[args->nGpus*args->nRanks];
|
||||
if (cudaGraphLaunches >= 1) {
|
||||
// Begin cuda graph capture
|
||||
for (int i=0; i<args->nGpus; i++) {
|
||||
for (int i=0; i<args->nGpus*args->nRanks; i++) {
|
||||
// Thread local mode is needed for:
|
||||
// - Multi-thread mode
|
||||
// - P2P pre-connect
|
||||
@@ -642,18 +678,18 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
|
||||
#if CUDART_VERSION >= 11030
|
||||
if (cudaGraphLaunches >= 1) {
|
||||
// End cuda graph capture
|
||||
for (int i=0; i<args->nGpus; i++) {
|
||||
for (int i=0; i<args->nGpus*args->nRanks; i++) {
|
||||
HIPCHECK(hipStreamEndCapture(args->streams[i], graphs+i));
|
||||
}
|
||||
// Instantiate cuda graph
|
||||
for (int i=0; i<args->nGpus; i++) {
|
||||
for (int i=0; i<args->nGpus*args->nRanks; i++) {
|
||||
HIPCHECK(hipGraphInstantiate(graphExec+i, graphs[i], NULL, NULL, 0));
|
||||
}
|
||||
// Resync CPU, restart timing, launch cuda graph
|
||||
Barrier(args);
|
||||
start = std::chrono::high_resolution_clock::now();
|
||||
for (int l=0; l<cudaGraphLaunches; l++) {
|
||||
for (int i=0; i<args->nGpus; i++) {
|
||||
for (int i=0; i<args->nGpus*args->nRanks; i++) {
|
||||
HIPCHECK(hipGraphLaunch(graphExec[i], args->streams[i]));
|
||||
}
|
||||
}
|
||||
@@ -671,7 +707,7 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
|
||||
#if CUDART_VERSION >= 11030
|
||||
if (cudaGraphLaunches >= 1) {
|
||||
//destroy cuda graph
|
||||
for (int i=0; i<args->nGpus; i++) {
|
||||
for (int i=0; i<args->nGpus*args->nRanks; i++) {
|
||||
HIPCHECK(hipGraphExecDestroy(graphExec[i]));
|
||||
HIPCHECK(hipGraphDestroy(graphs[i]));
|
||||
}
|
||||
@@ -679,7 +715,7 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
|
||||
#endif
|
||||
|
||||
double algBw, busBw;
|
||||
args->collTest->getBw(count, wordSize(type), deltaSec, &algBw, &busBw, args->nProcs*args->nThreads*args->nGpus);
|
||||
args->collTest->getBw(count, wordSize(type), deltaSec, &algBw, &busBw, args->nProcs*args->nThreads*args->nGpus*args->nRanks);
|
||||
|
||||
Barrier(args);
|
||||
|
||||
@@ -694,7 +730,7 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
|
||||
#if CUDART_VERSION >= 11030
|
||||
if (cudaGraphLaunches >= 1) {
|
||||
// Begin cuda graph capture for data check
|
||||
for (int i=0; i<args->nGpus; i++) {
|
||||
for (int i=0; i<args->nGpus*args->nRanks; i++) {
|
||||
HIPCHECK(chiptreamBeginCapture(args->streams[i], args->nThreads > 1 ? hipStreamCaptureModeThreadLocal : hipStreamCaptureModeGlobal));
|
||||
}
|
||||
}
|
||||
@@ -706,15 +742,15 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
|
||||
#if CUDART_VERSION >= 11030
|
||||
if (cudaGraphLaunches >= 1) {
|
||||
// End cuda graph capture
|
||||
for (int i=0; i<args->nGpus; i++) {
|
||||
for (int i=0; i<args->nGpus*args->nRanks; i++) {
|
||||
HIPCHECK(hipStreamEndCapture(args->streams[i], graphs+i));
|
||||
}
|
||||
// Instantiate cuda graph
|
||||
for (int i=0; i<args->nGpus; i++) {
|
||||
for (int i=0; i<args->nGpus*args->nRanks; i++) {
|
||||
HIPCHECK(hipGraphInstantiate(graphExec+i, graphs[i], NULL, NULL, 0));
|
||||
}
|
||||
// Launch cuda graph
|
||||
for (int i=0; i<args->nGpus; i++) {
|
||||
for (int i=0; i<args->nGpus*args->nRanks; i++) {
|
||||
HIPCHECK(hipGraphLaunch(graphExec[i], args->streams[i]));
|
||||
}
|
||||
}
|
||||
@@ -725,7 +761,7 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
|
||||
#if CUDART_VERSION >= 11030
|
||||
if (cudaGraphLaunches >= 1) {
|
||||
//destroy cuda graph
|
||||
for (int i=0; i<args->nGpus; i++) {
|
||||
for (int i=0; i<args->nGpus*args->nRanks; i++) {
|
||||
HIPCHECK(hipGraphExecDestroy(graphExec[i]));
|
||||
HIPCHECK(hipGraphDestroy(graphs[i]));
|
||||
}
|
||||
@@ -759,7 +795,7 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
|
||||
}
|
||||
|
||||
void setupArgs(size_t size, ncclDataType_t type, struct threadArgs* args) {
|
||||
int nranks = args->nProcs*args->nGpus*args->nThreads;
|
||||
int nranks = args->nProcs*args->nGpus*args->nThreads*args->nRanks;
|
||||
size_t count, sendCount, recvCount, paramCount, sendInplaceOffset, recvInplaceOffset;
|
||||
|
||||
count = size / wordSize(type);
|
||||
@@ -806,6 +842,8 @@ testResult_t threadRunTests(struct threadArgs* args) {
|
||||
// will be done on the current GPU (by default : 0) and if the GPUs are in
|
||||
// exclusive mode those operations will fail.
|
||||
int gpuid = args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus;
|
||||
if (enable_multiranks)
|
||||
gpuid = gpuid % numDevices;
|
||||
HIPCHECK(hipSetDevice(gpuid));
|
||||
TESTCHECK(ncclTestEngine.runTest(args, ncclroot, (ncclDataType_t)nccltype, test_typenames[nccltype], (ncclRedOp_t)ncclop, test_opnames[ncclop]));
|
||||
return testSuccess;
|
||||
@@ -814,23 +852,33 @@ testResult_t threadRunTests(struct threadArgs* args) {
|
||||
testResult_t threadInit(struct threadArgs* args) {
|
||||
char hostname[1024];
|
||||
getHostName(hostname, 1024);
|
||||
int nranks = args->nProcs*args->nThreads*args->nGpus;
|
||||
int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks;
|
||||
|
||||
//set main thread again
|
||||
is_main_thread = (args->proc == 0 && args->thread == 0) ? 1 : 0;
|
||||
|
||||
NCCLCHECK(ncclGroupStart());
|
||||
for (int i=0; i<args->nGpus; i++) {
|
||||
int rank = args->proc*args->nThreads*args->nGpus + args->thread*args->nGpus + i;
|
||||
int gpuid = args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i;
|
||||
if (enable_multiranks)
|
||||
gpuid = gpuid % numDevices;
|
||||
HIPCHECK(hipSetDevice(gpuid));
|
||||
NCCLCHECK(ncclCommInitRank(args->comms+i, nranks, args->ncclId, rank));
|
||||
|
||||
for (int j=0; j<args->nRanks; j++) {
|
||||
int rank = (args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + j;
|
||||
if (args->enable_multiranks)
|
||||
NCCLCHECK(ncclCommInitRank(args->comms+i, nranks, args->ncclId, rank));
|
||||
#ifdef RCCL_MULTIRANKPERGPU
|
||||
else
|
||||
NCCLCHECK(ncclCommInitRankMulti(args->comms+i*args->nRanks+j, nranks, args->ncclId, rank, rank));
|
||||
#endif
|
||||
}
|
||||
}
|
||||
NCCLCHECK(ncclGroupEnd());
|
||||
|
||||
TESTCHECK(threadRunTests(args));
|
||||
|
||||
for (int i=0; i<args->nGpus; i++) {
|
||||
for (int i=0; i<args->nGpus*args->nRanks; i++) {
|
||||
NCCLCHECK(ncclCommDestroy(args->comms[i]));
|
||||
}
|
||||
return testSuccess;
|
||||
@@ -925,13 +973,21 @@ int main(int argc, char* argv[]) {
|
||||
{"cumask", required_argument, 0, 'u'},
|
||||
{"cudagraph", required_argument, 0, 'G'},
|
||||
{"average", required_argument, 0, 'a'},
|
||||
#ifdef RCCL_MULTIRANKPERGPU
|
||||
{"enable_multiranks", required_argument, 0, 'x'},
|
||||
{"ranks_per_gpu", required_argument, 0, 'R'},
|
||||
#endif
|
||||
{"help", no_argument, 0, 'h'},
|
||||
{}
|
||||
};
|
||||
|
||||
while(1) {
|
||||
int c;
|
||||
#ifdef RCCL_MULTIRANKPERGPU
|
||||
c = getopt_long(argc, argv, "t:g:b:e:i:f:n:m:w:p:c:o:d:r:z:G:a:y:s:u:h:R:x:", longopts, &longindex);
|
||||
#else
|
||||
c = getopt_long(argc, argv, "t:g:b:e:i:f:n:m:w:p:c:o:d:r:z:G:a:y:s:u:h:", longopts, &longindex);
|
||||
#endif
|
||||
|
||||
if (c == -1)
|
||||
break;
|
||||
@@ -1022,6 +1078,14 @@ int main(int argc, char* argv[]) {
|
||||
case 'a':
|
||||
average = (int)strtol(optarg, NULL, 0);
|
||||
break;
|
||||
#ifdef RCCL_MULTIRANKPERGPU
|
||||
case 'x':
|
||||
enable_multiranks = (int)strtol(optarg, NULL, 0);
|
||||
break;
|
||||
case 'R':
|
||||
ranksPerGpu = (int)strtol(optarg, NULL, 0);
|
||||
break;
|
||||
#endif
|
||||
case 'h':
|
||||
default:
|
||||
if (c != 'h') printf("invalid option '%c'\n", c);
|
||||
@@ -1052,26 +1116,43 @@ int main(int argc, char* argv[]) {
|
||||
"[-u,--cumask <d0,d1,d2,d3>] \n\t"
|
||||
"[-G,--cudagraph <num graph launches>] \n\t"
|
||||
"[-a,--average <0/1/2/3> report average iteration time <0=RANK0/1=AVG/2=MIN/3=MAX>] \n\t"
|
||||
#ifdef RCCL_MULTIRANKPERGPU
|
||||
"[-x,--enable_multiranks <0/1> enable using multiple ranks per GPU] \n\t"
|
||||
"[-R,--ranks_per_gpu] \n\t"
|
||||
#endif
|
||||
"[-h,--help]\n",
|
||||
basename(argv[0]));
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
int numDevices;
|
||||
HIPCHECK(hipGetDeviceCount(&numDevices));
|
||||
if (nGpus > numDevices)
|
||||
{
|
||||
fprintf(stderr, "[ERROR] The number of requested GPUs (%d) is greater than the number of GPUs available (%d)\n", nGpus, numDevices);
|
||||
return testNcclError;
|
||||
}
|
||||
|
||||
if (minBytes > maxBytes) {
|
||||
fprintf(stderr, "invalid sizes for 'minbytes' and 'maxbytes': %llu > %llu\n",
|
||||
(unsigned long long)minBytes,
|
||||
(unsigned long long)maxBytes);
|
||||
return -1;
|
||||
}
|
||||
if (!minReqVersion(2, 12, 12) && enable_multiranks) {
|
||||
fprintf(stderr, "Multiple Ranks per GPU requested, but rccl library found does not support this feature.\n");
|
||||
fprintf(stderr, "Please check LD_LIBRARY_PATH. Resetting enable_multiranks and ranksPerGpu to default values.\n");
|
||||
enable_multiranks = 0;
|
||||
ranksPerGpu = 1;
|
||||
}
|
||||
|
||||
if (enable_multiranks && parallel_init) {
|
||||
fprintf(stderr, "Cannot use parallel_init when using multiple ranks per GPU.\n");
|
||||
return -1;
|
||||
}
|
||||
if (ranksPerGpu > 1 && !enable_multiranks) {
|
||||
fprintf(stderr, "Need to enable multiranks option to use multiple ranks per GPU\n");
|
||||
return -1;
|
||||
}
|
||||
#ifdef MPI_SUPPORT
|
||||
MPI_Init(&argc, &argv);
|
||||
#endif
|
||||
@@ -1098,7 +1179,7 @@ testResult_t run() {
|
||||
#endif
|
||||
is_main_thread = (proc == 0) ? 1 : 0;
|
||||
|
||||
PRINT("# nThread: %d nGpus: %d minBytes: %ld maxBytes: %ld step: %ld(%s) warmupIters: %d iters: %d validation: %d \n", nThreads, nGpus, minBytes, maxBytes,
|
||||
PRINT("# nThreads: %d nGpus: %d nRanks: %d minBytes: %ld maxBytes: %ld step: %ld(%s) warmupIters: %d iters: %d validation: %d \n", nThreads, nGpus, ranksPerGpu, minBytes, maxBytes,
|
||||
(stepFactor > 1)?stepFactor:stepBytes, (stepFactor > 1)?"factor":"bytes", warmup_iters, iters, datacheck);
|
||||
if (blocking_coll) PRINT("# Blocking Enabled: wait for completion and barrier after each collective \n");
|
||||
if (parallel_init) PRINT("# Parallel Init Enabled: threads call into NcclInitRank concurrently \n");
|
||||
@@ -1111,18 +1192,20 @@ testResult_t run() {
|
||||
size_t maxMem = ~0;
|
||||
for (int i=0; i<nThreads*nGpus; i++) {
|
||||
int hipDev = localRank*nThreads*nGpus+i;
|
||||
int rank = proc*nThreads*nGpus+i;
|
||||
if (enable_multiranks)
|
||||
hipDev = hipDev % numDevices;
|
||||
hipDeviceProp_t prop;
|
||||
HIPCHECK(hipGetDeviceProperties(&prop, hipDev));
|
||||
char busIdStr[] = "00000000:00:00.0";
|
||||
HIPCHECK(hipDeviceGetPCIBusId(busIdStr, sizeof(busIdStr), hipDev));
|
||||
len += snprintf(line+len, MAX_LINE>len ? MAX_LINE-len : 0, "# Rank %2d Pid %6d on %10s device %2d [%s] %s\n",
|
||||
rank, getpid(), hostname, hipDev, busIdStr, prop.name);
|
||||
len += snprintf(line+len, MAX_LINE>len ? MAX_LINE-len : 0, "# Rank %2d Pid %6d on %10s device %2d [0x%02x] %s\n",
|
||||
rank, getpid(), hostname, hipDev, prop.pciBusID, prop.name);
|
||||
maxMem = std::min(maxMem, prop.totalGlobalMem);
|
||||
}
|
||||
|
||||
for (int j=0; j<ranksPerGpu; j++) {
|
||||
int rank = proc*nThreads*nGpus*ranksPerGpu+i*ranksPerGpu + j;
|
||||
char busIdStr[] = "00000000:00:00.0";
|
||||
HIPCHECK(hipDeviceGetPCIBusId(busIdStr, sizeof(busIdStr), hipDev));
|
||||
len += snprintf(line+len, MAX_LINE>len ? MAX_LINE-len : 0, "# Rank %2d Pid %6d on %10s device %2d [%s] %s\n",
|
||||
rank, getpid(), hostname, hipDev, busIdStr, prop.name);
|
||||
maxMem = std::min(maxMem, prop.totalGlobalMem);
|
||||
}
|
||||
}
|
||||
#if MPI_SUPPORT
|
||||
char *lines = (proc == 0) ? (char *)malloc(nProcs*MAX_LINE) : NULL;
|
||||
// Gather all output in rank order to root (0)
|
||||
@@ -1152,42 +1235,61 @@ testResult_t run() {
|
||||
MPI_Bcast(&ncclId, sizeof(ncclId), MPI_BYTE, 0, MPI_COMM_WORLD);
|
||||
MPI_Barrier(MPI_COMM_WORLD);
|
||||
#endif
|
||||
hipStream_t streams[nGpus*nThreads];
|
||||
void* sendbuffs[nGpus*nThreads];
|
||||
void* recvbuffs[nGpus*nThreads];
|
||||
void* expected[nGpus*nThreads];
|
||||
hipStream_t streams[nGpus*nThreads*ranksPerGpu];
|
||||
void* sendbuffs[nGpus*nThreads*ranksPerGpu];
|
||||
void* recvbuffs[nGpus*nThreads*ranksPerGpu];
|
||||
void* expected[nGpus*nThreads*ranksPerGpu];
|
||||
size_t sendBytes, recvBytes;
|
||||
|
||||
ncclTestEngine.getBuffSize(&sendBytes, &recvBytes, (size_t)maxBytes, (size_t)nProcs*nGpus*nThreads);
|
||||
ncclTestEngine.getBuffSize(&sendBytes, &recvBytes, (size_t)maxBytes, (size_t)nProcs*nGpus*nThreads*ranksPerGpu);
|
||||
|
||||
for (int i=0; i<nGpus*nThreads; i++) {
|
||||
HIPCHECK(hipSetDevice(localRank*nThreads*nGpus+i));
|
||||
AllocateBuffs(sendbuffs+i, sendBytes, recvbuffs+i, recvBytes, expected+i, (size_t)maxBytes, nProcs*nThreads*nGpus);
|
||||
//PRINT("sendbuffs[%d]=%p(size=%lu) recvbuffs[%d]=%p(size=%lu)\n", i, sendbuffs[i], sendBytes, i, recvbuffs[i], recvBytes);
|
||||
if (cumask[0] || cumask[1] || cumask[2] || cumask[3]) {
|
||||
PRINT("cumask: ");
|
||||
for (int i = 0; i < 4 ; i++) PRINT("%x,", cumask[i]);
|
||||
PRINT("\n");
|
||||
HIPCHECK(hipExtStreamCreateWithCUMask(streams+i, 4, cumask));
|
||||
} else
|
||||
HIPCHECK(hipStreamCreateWithFlags(streams+i, hipStreamNonBlocking));
|
||||
// initialize data buffer to avoid all zero data
|
||||
TESTCHECK(InitData(sendbuffs[i], sendBytes, ncclUint8, 0, i));
|
||||
for (int ii=0; ii<nGpus*nThreads; ii++) {
|
||||
int gpuid = localRank*nThreads*nGpus+ii;
|
||||
if (enable_multiranks)
|
||||
gpuid = gpuid % numDevices;
|
||||
HIPCHECK(hipSetDevice(gpuid));
|
||||
for (int j=0; j<ranksPerGpu; j++) {
|
||||
int i = ii*ranksPerGpu+j;
|
||||
AllocateBuffs(sendbuffs+i, sendBytes, recvbuffs+i, recvBytes, expected+i, (size_t)maxBytes, nProcs*nThreads*nGpus*ranksPerGpu);
|
||||
//PRINT("sendbuffs[%d]=%p(size=%lu) recvbuffs[%d]=%p(size=%lu)\n", i, sendbuffs[i], sendBytes, i, recvbuffs[i], recvBytes);
|
||||
if (cumask[0] || cumask[1] || cumask[2] || cumask[3]) {
|
||||
PRINT("cumask: ");
|
||||
for (int i = 0; i < 4 ; i++) PRINT("%x,", cumask[i]);
|
||||
PRINT("\n");
|
||||
HIPCHECK(hipExtStreamCreateWithCUMask(streams+i, 4, cumask));
|
||||
} else
|
||||
HIPCHECK(hipStreamCreateWithFlags(streams+i, hipStreamNonBlocking));
|
||||
// initialize data buffer to avoid all zero data
|
||||
TESTCHECK(InitData(sendbuffs[i], sendBytes, ncclUint8, 0, i));
|
||||
}
|
||||
HIPCHECK(hipDeviceSynchronize());
|
||||
}
|
||||
|
||||
//if parallel init is not selected, use main thread to initialize NCCL
|
||||
ncclComm_t* comms = (ncclComm_t*)malloc(sizeof(ncclComm_t)*nThreads*nGpus);
|
||||
ncclComm_t* comms = (ncclComm_t*)malloc(sizeof(ncclComm_t)*nThreads*nGpus*ranksPerGpu);
|
||||
if (!parallel_init) {
|
||||
if (nProcs == 1) {
|
||||
if (nProcs == 1 && !enable_multiranks) {
|
||||
int gpuArray[nGpus*nThreads];
|
||||
for (int i=0; i<nGpus*nThreads; i++) gpuArray[i] = i;
|
||||
NCCLCHECK(ncclCommInitAll(comms, nGpus*nThreads, gpuArray));
|
||||
} else {
|
||||
NCCLCHECK(ncclGroupStart());
|
||||
for (int i=0; i<nGpus*nThreads; i++) {
|
||||
HIPCHECK(hipSetDevice(localRank*nThreads*nGpus+i));
|
||||
NCCLCHECK(ncclCommInitRank(comms+i, nProcs*nThreads*nGpus, ncclId, proc*nThreads*nGpus+i));
|
||||
for (int ii=0; ii<nGpus*nThreads; ii++) {
|
||||
int gpuid = localRank*nThreads*nGpus+ii;
|
||||
if (enable_multiranks) {
|
||||
gpuid = gpuid % numDevices;
|
||||
}
|
||||
HIPCHECK(hipSetDevice(gpuid));
|
||||
if (!enable_multiranks) {
|
||||
NCCLCHECK(ncclCommInitRank(comms+ii, nProcs*nThreads*nGpus, ncclId, proc*nThreads*nGpus+ii));
|
||||
}
|
||||
#ifdef RCCL_MULTIRANKPERGPU
|
||||
else
|
||||
for (int j=0; j<ranksPerGpu; j++) {
|
||||
int i = ii*ranksPerGpu+j;
|
||||
NCCLCHECK(ncclCommInitRankMulti(comms+i, nProcs*nThreads*nGpus*ranksPerGpu, ncclId, proc*nThreads*nGpus*ranksPerGpu+i, proc*nThreads*nGpus*ranksPerGpu+i));
|
||||
}
|
||||
#endif
|
||||
}
|
||||
NCCLCHECK(ncclGroupEnd());
|
||||
}
|
||||
@@ -1219,18 +1321,20 @@ testResult_t run() {
|
||||
threads[t].args.stepbytes=stepBytes;
|
||||
threads[t].args.stepfactor=stepFactor;
|
||||
threads[t].args.localRank = localRank;
|
||||
|
||||
threads[t].args.localNumDevices = numDevices;
|
||||
threads[t].args.enable_multiranks = enable_multiranks;
|
||||
threads[t].args.nRanks = ranksPerGpu;
|
||||
threads[t].args.nProcs=nProcs;
|
||||
threads[t].args.proc=proc;
|
||||
threads[t].args.nThreads=nThreads;
|
||||
threads[t].args.thread=t;
|
||||
threads[t].args.nGpus=nGpus;
|
||||
threads[t].args.sendbuffs = sendbuffs+t*nGpus;
|
||||
threads[t].args.recvbuffs = recvbuffs+t*nGpus;
|
||||
threads[t].args.expected = expected+t*nGpus;
|
||||
threads[t].args.sendbuffs = sendbuffs+t*nGpus*ranksPerGpu;
|
||||
threads[t].args.recvbuffs = recvbuffs+t*nGpus*ranksPerGpu;
|
||||
threads[t].args.expected = expected+t*nGpus*ranksPerGpu;
|
||||
threads[t].args.ncclId = ncclId;
|
||||
threads[t].args.comms=comms+t*nGpus;
|
||||
threads[t].args.streams=streams+t*nGpus;
|
||||
threads[t].args.comms=comms+t*nGpus*ranksPerGpu;
|
||||
threads[t].args.streams=streams+t*nGpus*ranksPerGpu;
|
||||
|
||||
threads[t].args.barrier = (volatile int*)barrier;
|
||||
threads[t].args.barrier_idx = 0;
|
||||
@@ -1267,17 +1371,17 @@ testResult_t run() {
|
||||
#endif
|
||||
|
||||
if (!parallel_init) {
|
||||
for(int i=0; i<nGpus*nThreads; ++i)
|
||||
for(int i=0; i<nGpus*nThreads*ranksPerGpu; ++i)
|
||||
NCCLCHECK(ncclCommDestroy(comms[i]));
|
||||
free(comms);
|
||||
}
|
||||
|
||||
for (int i=0; i<nGpus*nThreads; i++) {
|
||||
for (int i=0; i<nGpus*nThreads*ranksPerGpu; i++) {
|
||||
HIPCHECK(hipStreamDestroy(streams[i]));
|
||||
}
|
||||
|
||||
// Free off HIP allocated memory
|
||||
for (int i=0; i<nGpus*nThreads; i++) {
|
||||
for (int i=0; i<nGpus*nThreads*ranksPerGpu; i++) {
|
||||
if (memorytype == ncclHost) {
|
||||
HIPCHECK(hipHostFree(sendbuffs[i]));
|
||||
HIPCHECK(hipHostFree(recvbuffs[i]));
|
||||
|
||||
@@ -102,6 +102,9 @@ struct threadArgs {
|
||||
int thread;
|
||||
int nGpus;
|
||||
int localRank;
|
||||
int localNumDevices;
|
||||
int enable_multiranks;
|
||||
int nRanks;
|
||||
void** sendbuffs;
|
||||
size_t sendBytes;
|
||||
size_t sendInplaceOffset;
|
||||
|
||||
+16
-9
@@ -31,20 +31,27 @@ void GatherGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramc
|
||||
testResult_t GatherInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) {
|
||||
size_t sendcount = args->sendBytes / wordSize(type);
|
||||
size_t recvcount = args->expectedBytes / wordSize(type);
|
||||
int nranks = args->nProcs*args->nThreads*args->nGpus;
|
||||
int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks;
|
||||
|
||||
int k=0;
|
||||
for (int i=0; i<args->nGpus; i++) {
|
||||
int gpuid = args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i;
|
||||
if (args->enable_multiranks)
|
||||
gpuid = gpuid % args->localNumDevices;
|
||||
HIPCHECK(hipSetDevice(gpuid));
|
||||
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
|
||||
HIPCHECK(hipMemset(args->recvbuffs[i], 0, args->expectedBytes));
|
||||
void* data = in_place ? ((char*)args->recvbuffs[i])+rank*args->sendBytes : args->sendbuffs[i];
|
||||
TESTCHECK(InitData(data, sendcount, type, rep, rank));
|
||||
HIPCHECK(hipMemcpy(args->expected[i], args->recvbuffs[i], args->expectedBytes, hipMemcpyDefault));
|
||||
if (rank == root) {
|
||||
for (int j=0; j<nranks; j++) {
|
||||
TESTCHECK(InitData(((char*)args->expected[i])+args->sendBytes*j, sendcount, type, rep, j));
|
||||
|
||||
for (int l=0; l<args->nRanks; l++) {
|
||||
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l);
|
||||
HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes));
|
||||
void* data = in_place ? ((char*)args->recvbuffs[k])+rank*args->sendBytes : args->sendbuffs[k];
|
||||
TESTCHECK(InitData(data, sendcount, type, rep, rank));
|
||||
HIPCHECK(hipMemcpy(args->expected[k], args->recvbuffs[k], args->expectedBytes, hipMemcpyDefault));
|
||||
if (rank == root) {
|
||||
for (int j=0; j<nranks; j++) {
|
||||
TESTCHECK(InitData(((char*)args->expected[k])+args->sendBytes*j, sendcount, type, rep, j));
|
||||
}
|
||||
}
|
||||
k++;
|
||||
}
|
||||
HIPCHECK(hipDeviceSynchronize());
|
||||
}
|
||||
|
||||
+14
-8
@@ -33,17 +33,24 @@ void HyperCubeGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *par
|
||||
testResult_t HyperCubeInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) {
|
||||
size_t sendcount = args->sendBytes / wordSize(type);
|
||||
size_t recvcount = args->expectedBytes / wordSize(type);
|
||||
int nranks = args->nProcs*args->nThreads*args->nGpus;
|
||||
int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks;
|
||||
|
||||
int k=0;
|
||||
for (int i=0; i<args->nGpus; i++) {
|
||||
int gpuid = args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i;
|
||||
if (args->enable_multiranks)
|
||||
gpuid = gpuid % args->localNumDevices;
|
||||
HIPCHECK(hipSetDevice(gpuid));
|
||||
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
|
||||
HIPCHECK(hipMemset(args->recvbuffs[i], 0, args->expectedBytes));
|
||||
void* data = in_place ? ((char*)args->recvbuffs[i])+rank*args->sendBytes : args->sendbuffs[i];
|
||||
TESTCHECK(InitData(data, sendcount, type, rep, rank));
|
||||
for (int j=0; j<nranks; j++) {
|
||||
TESTCHECK(InitData(((char*)args->expected[i])+args->sendBytes*j, sendcount, type, rep, j));
|
||||
|
||||
for (int l=0; l<args->nRanks; l++) {
|
||||
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l);
|
||||
HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes));
|
||||
void* data = in_place ? ((char*)args->recvbuffs[k])+rank*args->sendBytes : args->sendbuffs[k];
|
||||
TESTCHECK(InitData(data, sendcount, type, rep, rank));
|
||||
for (int j=0; j<nranks; j++) {
|
||||
TESTCHECK(InitData(((char*)args->expected[k])+args->sendBytes*j, sendcount, type, rep, j));
|
||||
}
|
||||
k++;
|
||||
}
|
||||
HIPCHECK(hipDeviceSynchronize());
|
||||
}
|
||||
@@ -66,7 +73,6 @@ testResult_t HyperCubeRunColl(void* sendbuff, void* recvbuff, size_t count, nccl
|
||||
int rank;
|
||||
NCCLCHECK(ncclCommUserRank(comm, &rank));
|
||||
size_t rankSize = count * wordSize(type);
|
||||
|
||||
if (rbuff+rank*rankSize != sbuff) HIPCHECK(hipMemcpyAsync(rbuff+rank*rankSize, sbuff, rankSize, hipMemcpyDeviceToDevice, stream));
|
||||
|
||||
// Hypercube AllGather
|
||||
|
||||
+15
-8
@@ -31,17 +31,24 @@ void ReduceGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramc
|
||||
testResult_t ReduceInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) {
|
||||
size_t sendcount = args->sendBytes / wordSize(type);
|
||||
size_t recvcount = args->expectedBytes / wordSize(type);
|
||||
int nranks = args->nProcs*args->nThreads*args->nGpus;
|
||||
int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks;
|
||||
|
||||
int k=0;
|
||||
for (int i=0; i<args->nGpus; i++) {
|
||||
int gpuid = args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i;
|
||||
if (args->enable_multiranks)
|
||||
gpuid = gpuid % args->localNumDevices;
|
||||
HIPCHECK(hipSetDevice(gpuid));
|
||||
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
|
||||
HIPCHECK(hipMemset(args->recvbuffs[i], 0, args->expectedBytes));
|
||||
void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i];
|
||||
TESTCHECK(InitData(data, sendcount, type, rep, rank));
|
||||
HIPCHECK(hipMemcpy(args->expected[i], args->recvbuffs[i], args->expectedBytes, hipMemcpyDefault));
|
||||
if (rank == root) TESTCHECK(InitDataReduce(args->expected[i], recvcount, 0, type, op, rep, nranks));
|
||||
|
||||
for (int l=0; l<args->nRanks; l++) {
|
||||
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l);
|
||||
HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes));
|
||||
void* data = in_place ? args->recvbuffs[k] : args->sendbuffs[k];
|
||||
TESTCHECK(InitData(data, sendcount, type, rep, rank));
|
||||
HIPCHECK(hipMemcpy(args->expected[k], args->recvbuffs[k], args->expectedBytes, hipMemcpyDefault));
|
||||
if (rank == root) TESTCHECK(InitDataReduce(args->expected[k], recvcount, 0, type, op, rep, nranks));
|
||||
k++;
|
||||
}
|
||||
HIPCHECK(hipDeviceSynchronize());
|
||||
}
|
||||
return testSuccess;
|
||||
@@ -119,4 +126,4 @@ testResult_t ReduceRunTest(struct threadArgs* args, int root, ncclDataType_t typ
|
||||
struct testEngine ncclTestEngine = {
|
||||
ReduceGetBuffSize,
|
||||
ReduceRunTest
|
||||
};
|
||||
};
|
||||
|
||||
@@ -31,17 +31,24 @@ void ReduceScatterGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t
|
||||
testResult_t ReduceScatterInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) {
|
||||
size_t sendcount = args->sendBytes / wordSize(type);
|
||||
size_t recvcount = args->expectedBytes / wordSize(type);
|
||||
int nranks = args->nProcs*args->nThreads*args->nGpus;
|
||||
int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks;
|
||||
|
||||
int k=0;
|
||||
for (int i=0; i<args->nGpus; i++) {
|
||||
int gpuid = args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i;
|
||||
if (args->enable_multiranks)
|
||||
gpuid = gpuid % args->localNumDevices;
|
||||
HIPCHECK(hipSetDevice(gpuid));
|
||||
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
|
||||
HIPCHECK(hipMemset(args->recvbuffs[i], 0, args->expectedBytes));
|
||||
void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i];
|
||||
TESTCHECK(InitData(data, sendcount, type, rep, rank));
|
||||
HIPCHECK(hipMemcpy(args->expected[i], args->recvbuffs[i], args->expectedBytes, hipMemcpyDefault));
|
||||
TESTCHECK(InitDataReduce(args->expected[i], recvcount, rank*recvcount, type, op, rep, nranks));
|
||||
|
||||
for (int l=0; l<args->nRanks; l++) {
|
||||
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l);
|
||||
HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes));
|
||||
void* data = in_place ? args->recvbuffs[k] : args->sendbuffs[k];
|
||||
TESTCHECK(InitData(data, sendcount, type, rep, rank));
|
||||
HIPCHECK(hipMemcpy(args->expected[k], args->recvbuffs[k], args->expectedBytes, hipMemcpyDefault));
|
||||
TESTCHECK(InitDataReduce(args->expected[k], recvcount, rank*recvcount, type, op, rep, nranks));
|
||||
k++;
|
||||
}
|
||||
HIPCHECK(hipDeviceSynchronize());
|
||||
}
|
||||
return testSuccess;
|
||||
@@ -111,4 +118,4 @@ testResult_t ReduceScatterRunTest(struct threadArgs* args, int root, ncclDataTyp
|
||||
struct testEngine ncclTestEngine = {
|
||||
ReduceScatterGetBuffSize,
|
||||
ReduceScatterRunTest
|
||||
};
|
||||
};
|
||||
|
||||
+13
-5
@@ -32,14 +32,22 @@ testResult_t ScatterInitData(struct threadArgs* args, ncclDataType_t type, ncclR
|
||||
size_t sendcount = args->sendBytes / wordSize(type);
|
||||
size_t recvcount = args->expectedBytes / wordSize(type);
|
||||
|
||||
int k=0;
|
||||
for (int i=0; i<args->nGpus; i++) {
|
||||
int gpuid = args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i;
|
||||
if (args->enable_multiranks)
|
||||
gpuid = gpuid % args->localNumDevices;
|
||||
HIPCHECK(hipSetDevice(gpuid));
|
||||
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
|
||||
HIPCHECK(hipMemset(args->recvbuffs[i], 0, args->expectedBytes));
|
||||
void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i];
|
||||
if (rank == root) TESTCHECK(InitData(data, sendcount, type, rep, rank));
|
||||
TESTCHECK(InitData(args->expected[i], recvcount, type, rep+rank*recvcount, root));
|
||||
|
||||
for (int l=0; l<args->nRanks; l++) {
|
||||
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l);
|
||||
HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes));
|
||||
void* data = in_place ? args->recvbuffs[k] : args->sendbuffs[k];
|
||||
if (rank == root) TESTCHECK(InitData(data, sendcount, type, rep, rank));
|
||||
TESTCHECK(InitData(args->expected[k], recvcount, type, rep+rank*recvcount, root));
|
||||
k++;
|
||||
|
||||
}
|
||||
HIPCHECK(hipDeviceSynchronize());
|
||||
}
|
||||
return testSuccess;
|
||||
|
||||
+14
-7
@@ -31,17 +31,24 @@ void SendRecvGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *para
|
||||
testResult_t SendRecvInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) {
|
||||
size_t sendcount = args->sendBytes / wordSize(type);
|
||||
size_t recvcount = args->expectedBytes / wordSize(type);
|
||||
int nranks = args->nProcs*args->nThreads*args->nGpus;
|
||||
int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks;
|
||||
|
||||
int k=0;
|
||||
for (int i=0; i<args->nGpus; i++) {
|
||||
int gpuid = args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i;
|
||||
if (args->enable_multiranks)
|
||||
gpuid = gpuid % args->localNumDevices;
|
||||
HIPCHECK(hipSetDevice(gpuid));
|
||||
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
|
||||
HIPCHECK(hipMemset(args->recvbuffs[i], 0, args->expectedBytes));
|
||||
void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i];
|
||||
TESTCHECK(InitData(data, sendcount, type, rep, rank));
|
||||
int peer = (rank-1+nranks)%nranks;
|
||||
TESTCHECK(InitData(args->expected[i], recvcount, type, rep, peer));
|
||||
|
||||
for (int l=0; l<args->nRanks; l++) {
|
||||
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l);
|
||||
HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes));
|
||||
void* data = in_place ? args->recvbuffs[k] : args->sendbuffs[k];
|
||||
TESTCHECK(InitData(data, sendcount, type, rep, rank));
|
||||
int peer = (rank-1+nranks)%nranks;
|
||||
TESTCHECK(InitData(args->expected[k], recvcount, type, rep, peer));
|
||||
k++;
|
||||
}
|
||||
HIPCHECK(hipDeviceSynchronize());
|
||||
}
|
||||
// We don't support in-place sendrecv
|
||||
|
||||
Ссылка в новой задаче
Block a user