diff --git a/README.md b/README.md index 0fd7a24bc3..7a4bbbc6ca 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # NCCL Tests -These tests check both the performance and the correctness of NCCL operations. They can be compiled against [NCCL 1](http://github.com/nvidia/nccl) and [NCCL 2](http://developer.nvidia.com/nccl). +These tests check both the performance and the correctness of NCCL operations. They can be compiled against [NCCL](http://github.com/nvidia/nccl) ## Build @@ -20,7 +20,7 @@ $ make MPI=1 MPI_HOME=/path/to/mpi CUDA_HOME=/path/to/cuda NCCL_HOME=/path/to/nc ## Usage -NCCL tests can run on multiple processes, multiple threads, and multiple CUDA devices per thread. The number of process is managed by MPI and is therefore not passed to the tests as argument. The total number of ranks (=CUDA devices) will be equal to (number of processes)\*(number of threads)\*(number of gpus per thread). +NCCL tests can run on multiple processes, multiple threads, and multiple CUDA devices per thread. The number of process is managed by MPI and is therefore not passed to the tests as argument. The total number of ranks (=CUDA devices) will be equal to (number of processes)\*(number of threads)\*(number of GPUs per thread). ### Quick examples @@ -44,7 +44,7 @@ All tests support the same set of arguments : * Number of GPUs * `-t,--nthreads ` number of threads per process. Default : 1. - * `-g,--ngpus ` number of gpus per thread. Default : 1. + * `-g,--ngpus ` number of gpus per thread. Default : 1. * Sizes to scan * `-b,--minbytes ` minimum size to start with. Default : 32M. * `-e,--maxbytes ` maximum size to end at. Default : 32M. @@ -55,16 +55,16 @@ All tests support the same set of arguments : * `-o,--op ` Specify which reduction operation to perform. Only relevant for reduction operations like Allreduce, Reduce or ReduceScatter. Default : Sum. * `-d,--datatype ` Specify which datatype to use. Default : Float. * `-r,--root ` Specify which root to use. Only for operations with a root like broadcast or reduce. Default : 0. -* Performance +* Performance * `-n,--iters ` number of iterations. Default : 20. * `-w,--warmup_iters ` number of warmup iterations (not timed). Default : 5. + * `-m,--agg_iters ` number of operations to aggregate together in each iteration. Default : 1. * Test operation - * `-s,--swap_args <0/1>` when used with multiple threads, have threads manage different GPUs for each iteration. Default : 0. * `-p,--parallel_init <0/1>` use threads to initialize NCCL in parallel. Default : 0. * `-c,--check <0/1>` check correctness of results. This can be quite slow on large numbers of GPUs. Default : 1. * `-z,--blocking <0/1>` Make NCCL collective blocking, i.e. have CPUs wait and sync after each collective. Default : 0. ## Copyright -NCCL tests are provided under the BSD licence. All source code and accompanying documentation is copyright (c) 2016-2017, NVIDIA CORPORATION. All rights reserved. +NCCL tests are provided under the BSD license. All source code and accompanying documentation is copyright (c) 2016-2019, NVIDIA CORPORATION. All rights reserved. diff --git a/src/Makefile b/src/Makefile index 45d31d54b0..034cc672fa 100644 --- a/src/Makefile +++ b/src/Makefile @@ -1,7 +1,7 @@ # -# Copyright (c) 2015-2017, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2015-2019, NVIDIA CORPORATION. All rights reserved. # -# See LICENCE.txt for license information +# See LICENSE.txt for license information # CUDA_HOME ?= /usr/local/cuda @@ -18,10 +18,10 @@ NVCC = $(CUDA_HOME)/bin/nvcc NVCC_GENCODE ?= -gencode=arch=compute_30,code=sm_30 \ -gencode=arch=compute_35,code=sm_35 \ -gencode=arch=compute_50,code=sm_50 \ - -gencode=arch=compute_52,code=sm_52 \ - -gencode=arch=compute_60,code=sm_60 \ + -gencode=arch=compute_60,code=sm_60 \ -gencode=arch=compute_61,code=sm_61 \ - -gencode=arch=compute_61,code=compute_61 + -gencode=arch=compute_70,code=compute_70 \ + -gencode=arch=compute_70,code=sm_70 NVCUFLAGS := -ccbin $(CXX) $(NVCC_GENCODE) -std=c++11 @@ -29,14 +29,16 @@ LDFLAGS := -L${CUDA_LIB} -lcudart -lrt NVLDFLAGS := -L${CUDA_LIB} -lcudart -lrt ifeq ($(DEBUG), 0) -NVCUFLAGS += -O3 -CXXFLAGS += -O3 +NVCUFLAGS += -O3 -g +CXXFLAGS += -O3 -g else NVCUFLAGS += -O0 -G -g CXXFLAGS += -O0 -g -ggdb3 endif -ifeq ($(VERBOSE), 0) +ifneq ($(VERBOSE), 0) +NVCUFLAGS += -Xcompiler -Wall,-Wextra,-Wno-unused-parameter +else .SILENT: endif @@ -45,7 +47,7 @@ endif BUILDDIR ?= ../build ifneq ($(NCCL_HOME), "") NVCUFLAGS += -I$(NCCL_HOME)/include/ -NVLDFLAGS += -L$(NCCL_HOME)/lib +NVLDFLAGS += -L$(NCCL_HOME)/lib endif ifeq ($(MPI), 1) @@ -53,7 +55,7 @@ NVCUFLAGS += -DMPI_SUPPORT -I$(MPI_HOME)/include NVLDFLAGS += -L$(MPI_HOME)/lib -lmpi endif LIBRARIES += curand nccl nvToolsExt -NVLDFLAGS += $(LIBRARIES:%=-l%) +NVLDFLAGS += $(LIBRARIES:%=-l%) DST_DIR := $(BUILDDIR) SRC_FILES := $(wildcard *.cu) @@ -66,7 +68,7 @@ build: ${BIN_FILES} clean: rm -rf ${DST_DIR} -${DST_DIR}/%.o: %.cu +${DST_DIR}/%.o: %.cu common.h @printf "Compiling %-35s > %s\n" $< $@ @mkdir -p ${DST_DIR} $(NVCC) -o $@ $(NVCUFLAGS) -c $< diff --git a/src/all_gather.cu b/src/all_gather.cu index 2386842cdd..cfb2ec356b 100644 --- a/src/all_gather.cu +++ b/src/all_gather.cu @@ -1,79 +1,53 @@ /************************************************************************* - * Copyright (c) 2016-2017, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2016-2019, NVIDIA CORPORATION. All rights reserved. * - * See LICENCE.txt for license information + * See LICENSE.txt for license information ************************************************************************/ #include "cuda_runtime.h" #include "common.h" - void print_header() { - PRINT("# %10s %12s %6s %6s out-of-place in-place\n", "", "", "", ""); - PRINT("# %10s %12s %6s %7s %5s %5s %7s %7s %5s %5s %7s\n", "bytes", "N", "type", - "time", "algbw", "busbw", "res", "time", "algbw", "busbw", "res"); + PRINT("# %10s %12s %6s out-of-place in-place \n", "", "", ""); + PRINT("# %10s %12s %6s %7s %6s %6s %5s %7s %6s %6s %5s\n", "size", "count", "type", + "time", "algbw", "busbw", "error", "time", "algbw", "busbw", "error"); + PRINT("# %10s %12s %6s %7s %6s %6s %5s %7s %6s %6s %5s\n", "(B)", "(elements)", "", + "(us)", "(GB/s)", "(GB/s)", "", "(us)", "(GB/s)", "(GB/s)", ""); } void print_line_header (size_t size, size_t count, const char *typeName, const char *opName, int root) { PRINT("%12li %12li %6s", size, count, typeName); } -void getCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramcount, size_t *sendInplaceOffset, size_t *recvInplaceOffset, size_t *procSharedCount, int *sameExpected, size_t count, int nranks) { - *sendcount = count/nranks; - *recvcount = (count/nranks)*nranks; - *sameExpected = 1; - *procSharedCount = 0; - *sendInplaceOffset = count/nranks; - *recvInplaceOffset = 0; - *paramcount = *sendcount; +void AllGatherGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramcount, size_t *sendInplaceOffset, size_t *recvInplaceOffset, size_t count, int nranks) { + *sendcount = count/nranks; + *recvcount = (count/nranks)*nranks; + *sendInplaceOffset = count/nranks; + *recvInplaceOffset = 0; + *paramcount = *sendcount; } -void InitRecvResult(struct threadArgs_t* args, ncclDataType_t type, ncclRedOp_t op, int root, int in_place, int is_first) { - size_t nBytes = args->nbytes; - size_t count = nBytes / wordSize(type); - int proc = args->proc; - int nThreads = args->nThreads; - int t = args->thread; - int nGpus = args->nGpus; +testResult_t AllGatherInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) { + size_t sendcount = args->sendBytes / wordSize(type); + size_t recvcount = args->expectedBytes / wordSize(type); + int nranks = args->nProcs*args->nThreads*args->nGpus; - while (args->sync[args->sync_idx] != t) pthread_yield(); - - for (int i=0; inGpus; i++) { + int gpuid = args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i; + CUDACHECK(cudaSetDevice(gpuid)); int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i); - NCCLCHECK(ncclCommCuDevice(args->comms[i], &device)); - CUDACHECK(cudaSetDevice(device)); - - void* data = in_place ? (void *)((uintptr_t)args->recvbuffs[i] + args->sendInplaceOffset*rank) : args->sendbuffs[i]; - - CUDACHECK(cudaMemcpy((void *)((uintptr_t)args->expectedHost[0] + ((proc*nThreads + t)*nGpus + i)*nBytes), - data, - nBytes, cudaMemcpyDeviceToHost)); - - if (in_place == 0) { - CUDACHECK(cudaMemset(args->recvbuffs[i], 0, args->expectedBytes)); + CUDACHECK(cudaMemset(args->recvbuffs[i], 0, args->expectedBytes)); + void* data = in_place ? ((char*)args->recvbuffs[i])+rank*args->sendBytes : args->sendbuffs[i]; + TESTCHECK(InitData(data, sendcount, type, rep, rank)); + for (int j=0; jexpected[i])+args->sendBytes*j, sendcount, type, rep, j)); } CUDACHECK(cudaDeviceSynchronize()); } - - args->sync[args->sync_idx] = t + 1; - - if (t+1 == nThreads) { -#ifdef MPI_SUPPORT - // Last thread does the MPI allgather - MPI_Allgather(MPI_IN_PLACE, nBytes*nThreads*nGpus, MPI_BYTE, - args->expectedHost[0], - nBytes*nThreads*nGpus, MPI_BYTE, MPI_COMM_WORLD); -#endif - args->sync[args->sync_idx] = 0; - } else { - while (args->sync[args->sync_idx]) pthread_yield(); - } - - args->sync_idx=!args->sync_idx; + return testSuccess; } -void GetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks) { +void AllGatherGetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks) { double baseBw = (double)(count * typesize * (nranks - 1)) / 1.0E9 / sec; *algBw = baseBw; @@ -81,26 +55,49 @@ void GetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, *busBw = baseBw * factor; } -void RunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) { +testResult_t AllGatherRunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) { NCCLCHECK(ncclAllGather(sendbuff, recvbuff, count, type, comm, stream)); + return testSuccess; } -void RunTest(struct threadArgs_t* args, int root, ncclDataType_t type, const char* typeName, ncclRedOp_t op, const char* opName) { +struct testColl allGatherTest = { + "AllGather", + AllGatherGetCollByteCount, + AllGatherInitData, + AllGatherGetBw, + AllGatherRunColl +}; + +void AllGatherGetBuffSize(size_t *sendcount, size_t *recvcount, size_t count, int nranks) { + size_t paramcount, sendInplaceOffset, recvInplaceOffset; + AllGatherGetCollByteCount(sendcount, recvcount, ¶mcount, &sendInplaceOffset, &recvInplaceOffset, count, nranks); +} + +testResult_t AllGatherRunTest(struct threadArgs* args, int root, ncclDataType_t type, const char* typeName, ncclRedOp_t op, const char* opName) { + args->collTest = &allGatherTest; ncclDataType_t *run_types; const char **run_typenames; int type_count; - if ((int)type != -1) { + if ((int)type != -1) { type_count = 1; run_types = &type; run_typenames = &typeName; - } else { + } else { type_count = ncclNumTypes; run_types = test_types; run_typenames = test_typenames; } - for (int i=0; inbytes / wordSize(type); - - while (args->sync[args->sync_idx] != args->thread) pthread_yield(); - - for (int i=0; inGpus; i++) { - int device; - NCCLCHECK(ncclCommCuDevice(args->comms[i], &device)); - CUDACHECK(cudaSetDevice(device)); - void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i]; - - if (is_first && i == 0) { - CUDACHECK(cudaMemcpy(args->expected[0], data, count*wordSize(type), cudaMemcpyDeviceToHost)); - } else { - Accumulate(args->expected[0], data, count, type, op); - } - - if (in_place == 0) { - CUDACHECK(cudaMemset(args->recvbuffs[i], 0, args->nbytes)); - } - CUDACHECK(cudaDeviceSynchronize()); - } - - args->sync[args->sync_idx] = args->thread + 1; - - if (args->thread+1 == args->nThreads) { -#ifdef MPI_SUPPORT - // Last thread does the MPI reduction - if (args->nbytes > 0) { - void* remote, *remoteHost = malloc(args->nbytes); - void* myInitialData = malloc(args->nbytes); - memcpy(myInitialData, args->expectedHost[0], args->nbytes); - CUDACHECK(cudaHostRegister(remoteHost, args->nbytes, cudaHostRegisterPortable | cudaHostRegisterMapped)); - CUDACHECK(cudaHostGetDevicePointer(&remote, remoteHost, 0)); - for (int i=0; inProcs; i++) { - if (i == args->proc) { - MPI_Bcast(myInitialData, args->nbytes, MPI_BYTE, i, MPI_COMM_WORLD); - free(myInitialData); - } else { - MPI_Bcast(remoteHost, args->nbytes, MPI_BYTE, i, MPI_COMM_WORLD); - Accumulate(args->expected[0], remote, count, type, op); - cudaDeviceSynchronize(); - } - } - CUDACHECK(cudaHostUnregister(remoteHost)); - free(remoteHost); - } -#endif - args->sync[args->sync_idx] = 0; - } else { - while (args->sync[args->sync_idx]) pthread_yield(); - } - - args->sync_idx = !args->sync_idx; +void AllReduceGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramcount, size_t *sendInplaceOffset, size_t *recvInplaceOffset, size_t count, int nranks) { + *sendcount = count; + *recvcount = count; + *sendInplaceOffset = 0; + *recvInplaceOffset = 0; + *paramcount = *sendcount; } -void GetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks) { +testResult_t AllReduceInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) { + size_t sendcount = args->sendBytes / wordSize(type); + size_t recvcount = args->expectedBytes / wordSize(type); + int nranks = args->nProcs*args->nThreads*args->nGpus; + + for (int i=0; inGpus; i++) { + int gpuid = args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i; + CUDACHECK(cudaSetDevice(gpuid)); + int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i); + CUDACHECK(cudaMemset(args->recvbuffs[i], 0, args->expectedBytes)); + void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i]; + TESTCHECK(InitData(data, sendcount, type, rep, rank)); + TESTCHECK(InitDataReduce(args->expected[i], recvcount, 0, type, op, rep, nranks)); + CUDACHECK(cudaDeviceSynchronize()); + } + return testSuccess; +} + +void AllReduceGetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks) { double baseBw = (double)(count * typesize) / 1.0E9 / sec; *algBw = baseBw; @@ -91,40 +53,62 @@ void GetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, *busBw = baseBw * factor; } -void RunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) { +testResult_t AllReduceRunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) { NCCLCHECK(ncclAllReduce(sendbuff, recvbuff, count, type, op, comm, stream)); + return testSuccess; } +struct testColl allReduceTest = { + "AllReduce", + AllReduceGetCollByteCount, + AllReduceInitData, + AllReduceGetBw, + AllReduceRunColl +}; -void RunTest(struct threadArgs_t* args, int root, ncclDataType_t type, const char* typeName, ncclRedOp_t op, const char* opName) { +void AllReduceGetBuffSize(size_t *sendcount, size_t *recvcount, size_t count, int nranks) { + size_t paramcount, sendInplaceOffset, recvInplaceOffset; + AllReduceGetCollByteCount(sendcount, recvcount, ¶mcount, &sendInplaceOffset, &recvInplaceOffset, count, nranks); +} + +testResult_t AllReduceRunTest(struct threadArgs* args, int root, ncclDataType_t type, const char* typeName, ncclRedOp_t op, const char* opName) { + args->collTest = &allReduceTest; ncclDataType_t *run_types; ncclRedOp_t *run_ops; const char **run_typenames, **run_opnames; int type_count, op_count; - if ((int)type != -1) { + if ((int)type != -1) { type_count = 1; run_types = &type; run_typenames = &typeName; - } else { + } else { type_count = ncclNumTypes; run_types = test_types; run_typenames = test_typenames; } - if ((int)op != -1) { + if ((int)op != -1) { op_count = 1; run_ops = &op; run_opnames = &opName; - } else { + } else { op_count = ncclNumOps; run_ops = test_ops; run_opnames = test_opnames; } - for (int i=0; i void print_header() { - PRINT("# %10s %12s %6s %6s out-of-place\n", "", "", "", ""); - PRINT("# %10s %12s %6s %6s %7s %5s %5s %7s\n", "bytes", "N", "type", "root", - "time", "algbw", "busbw", "res"); + PRINT("# %10s %12s %6s %6s out-of-place in-place \n", "", "", "", ""); + PRINT("# %10s %12s %6s %6s %7s %6s %6s %5s %7s %6s %6s %5s\n", "size", "count", "type", "root", + "time", "algbw", "busbw", "error", "time", "algbw", "busbw", "error"); + PRINT("# %10s %12s %6s %6s %7s %6s %6s %5s %7s %6s %6s %5s\n", "(B)", "(elements)", "", "", + "(us)", "(GB/s)", "(GB/s)", "", "(us)", "(GB/s)", "(GB/s)", ""); } void print_line_header (size_t size, size_t count, const char *typeName, const char *opName, int root) { PRINT("%12li %12li %6s %6i", size, count, typeName, root); } -void getCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramcount, size_t *sendInplaceOffset, size_t *recvInplaceOffset, size_t *procSharedCount, int *sameExpected, size_t count, int nranks) { - *sendcount = count; - *recvcount = count; - *sameExpected = 0; - *procSharedCount = count; - *sendInplaceOffset = 0; - *recvInplaceOffset = 0; - *paramcount = *sendcount; +void BroadcastGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramcount, size_t *sendInplaceOffset, size_t *recvInplaceOffset, size_t count, int nranks) { + *sendcount = count; + *recvcount = count; + *sendInplaceOffset = 0; + *recvInplaceOffset = 0; + *paramcount = *sendcount; } -void InitRecvResult(struct threadArgs_t* args, ncclDataType_t type, ncclRedOp_t op, int root, int in_place, int is_first) { - int root_proc = root/(args->nThreads*args->nGpus); - int root_thread = (root/args->nGpus)%(args->nThreads); - int root_gpu = root%args->nGpus; - - assert(args->expectedBytes == args->nbytes); - - if (root_thread == args->thread) { - if (root_proc == args->proc) { - CUDACHECK(cudaMemcpy(args->procSharedHost, - args->sendbuffs[root_gpu], - args->nbytes, cudaMemcpyDeviceToHost)); - } -#ifdef MPI_SUPPORT - MPI_Bcast(args->procSharedHost, args->nbytes, MPI_BYTE, root_proc, MPI_COMM_WORLD); -#endif - - args->sync[0] = 0; - } - - Barrier(args); +testResult_t BroadcastInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) { + size_t sendcount = args->sendBytes / wordSize(type); + size_t recvcount = args->expectedBytes / wordSize(type); for (int i=0; inGpus; i++) { - int device; - NCCLCHECK(ncclCommCuDevice(args->comms[i], &device)); - CUDACHECK(cudaSetDevice(device)); - - //set expected buf to zero at root, copy over source data at others - if ((root_proc == args->proc) - && (root_thread == args->thread) - && (root_gpu == i)) { - memset(args->expectedHost[i], 0, args->nbytes); - } else { - memcpy(args->expectedHost[i], args->procSharedHost, args->nbytes); - } - - //reset recvbufs to zero - CUDACHECK(cudaMemset(args->recvbuffs[i], 0, args->nbytes)); - CUDACHECK(cudaDeviceSynchronize()); + int gpuid = args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i; + CUDACHECK(cudaSetDevice(gpuid)); + int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i); + CUDACHECK(cudaMemset(args->recvbuffs[i], 0, args->expectedBytes)); + void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i]; + if (rank == root) TESTCHECK(InitData(data, sendcount, type, rep, rank)); + TESTCHECK(InitData(args->expected[i], recvcount, type, rep, root)); + CUDACHECK(cudaDeviceSynchronize()); } - - Barrier(args); + return testSuccess; } -void GetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks) { +void BroadcastGetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks) { double baseBw = (double)(count * typesize) / 1.0E9 / sec; *algBw = baseBw; @@ -80,42 +52,69 @@ void GetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, *busBw = baseBw * factor; } -void RunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) { - int rank; +testResult_t BroadcastRunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) { + int rank; NCCLCHECK(ncclCommUserRank(comm, &rank)); - if (rank == root) { +#if NCCL_MAJOR >= 2 && NCCL_MINOR >= 2 + NCCLCHECK(ncclBroadcast(sendbuff, recvbuff, count, type, root, comm, stream)); +#else + if (rank == root) { NCCLCHECK(ncclBcast(sendbuff, count, type, root, comm, stream)); - } else { + } else { NCCLCHECK(ncclBcast(recvbuff, count, type, root, comm, stream)); - } + } +#endif + return testSuccess; } -void RunTest(struct threadArgs_t* args, int root, ncclDataType_t type, const char* typeName, ncclRedOp_t op, const char* opName) { +struct testColl broadcastTest = { + "Broadcast", + BroadcastGetCollByteCount, + BroadcastInitData, + BroadcastGetBw, + BroadcastRunColl +}; + +void BroadcastGetBuffSize(size_t *sendcount, size_t *recvcount, size_t count, int nranks) { + size_t paramcount, sendInplaceOffset, recvInplaceOffset; + BroadcastGetCollByteCount(sendcount, recvcount, ¶mcount, &sendInplaceOffset, &recvInplaceOffset, count, nranks); +} + +testResult_t BroadcastRunTest(struct threadArgs* args, int root, ncclDataType_t type, const char* typeName, ncclRedOp_t op, const char* opName) { + args->collTest = &broadcastTest; ncclDataType_t *run_types; const char **run_typenames; int type_count; - int begin_root, end_root; + int begin_root, end_root; - if ((int)type != -1) { + if ((int)type != -1) { type_count = 1; run_types = &type; run_typenames = &typeName; - } else { + } else { type_count = ncclNumTypes; run_types = test_types; run_typenames = test_typenames; } - if (root != -1) { - begin_root = end_root = root; - } else { - begin_root = 0; - end_root = args->nProcs*args->nThreads*args->nGpus-1; + if (root != -1) { + begin_root = end_root = root; + } else { + begin_root = 0; + end_root = args->nProcs*args->nThreads*args->nGpus-1; } - for (int i=0; i #include #include +#include #include "cuda.h" #if NCCL_MAJOR >= 2 @@ -22,13 +23,20 @@ const char *test_opnames[ncclNumOps] = {"sum", "prod", "max", "min"}; thread_local int is_main_thread = 0; +// Command line parameter defaults +static int nThreads = 1; +static int nGpus = 1; +static size_t minBytes = 32*1024*1024; +static size_t maxBytes = 32*1024*1024; +static size_t stepBytes = 1*1024*1024; +static size_t stepFactor = 1; static int datacheck = 1; static int warmup_iters = 5; static int iters = 20; +static int agg_iters = 1; static int ncclop = ncclSum; static int nccltype = ncclFloat; static int ncclroot = 0; -static int swap_args = 0; static int parallel_init = 0; static int blocking_coll = 0; @@ -83,12 +91,11 @@ template __device__ float toFloat(T a) { return (float)a; } -template<> __device__ +template<> __device__ float toFloat(half a) { return __half2float(a); } - template __global__ void deltaKern(void* A_, void* B_, size_t count, double* max) { const T* A = (const T*)A_; @@ -102,7 +109,7 @@ void deltaKern(void* A_, void* B_, size_t count, double* max) { if( delta > locmax ) { locmax = delta; #ifdef DEBUG_PRINT - if (delta > .1) printf("Error at %d/%d : %f != %f\n", i, count, toFloat(A[i]), toFloat(B[i])); + if (delta > .1) printf("Error at %d/%ld : %f != %f\n", i, count, toFloat(A[i]), toFloat(B[i])); #endif } } @@ -119,7 +126,7 @@ void deltaKern(void* A_, void* B_, size_t count, double* max) { } -void CheckDelta(void* expected, void* results, size_t count, ncclDataType_t type, double* devmax) { +testResult_t CheckDelta(void* expected, void* results, size_t count, ncclDataType_t type, double* devmax) { switch (type) { case ncclHalf: deltaKern<<<1, 512>>>(results, expected, count, devmax); break; @@ -142,223 +149,112 @@ void CheckDelta(void* expected, void* results, size_t count, ncclDataType_t type case ncclUint64: deltaKern<<<1, 512>>>(results, expected, count, devmax); break; } -} - -#define CURAND_CHK(cmd) \ - do { \ - curandStatus_t error = (cmd); \ - if (error != CURAND_STATUS_SUCCESS) { \ - printf("CuRAND error %i at %s:%i\n", error, __FILE__ , __LINE__); \ - exit(EXIT_FAILURE); \ - } \ - } while (false) - - -template -void GenerateRandom(curandGenerator_t generator, T * const dest, - const size_t N); - -template<> -void GenerateRandom(curandGenerator_t generator, int8_t * const dest, - const size_t N) { - size_t align = (4 - (((size_t)dest) & 3)) % 4; - CURAND_CHK(curandGenerate(generator, (unsigned int*)(dest+align), - N * sizeof(int8_t) / sizeof(int))); - CUDACHECK(cudaMemcpy(dest, dest+4, align, cudaMemcpyDeviceToDevice)); -} -template<> -void GenerateRandom(curandGenerator_t generator, uint8_t * const dest, - const size_t N) { - size_t align = (4 - (((size_t)dest) & 3)) % 4; - CURAND_CHK(curandGenerate(generator, (unsigned int*)(dest+align), - N * sizeof(uint8_t) / sizeof(int))); - CUDACHECK(cudaMemcpy(dest, dest+4, align, cudaMemcpyDeviceToDevice)); -} - -template<> -void GenerateRandom(curandGenerator_t generator, int32_t * const dest, - const size_t N) { - CURAND_CHK(curandGenerate(generator, (unsigned int*)dest, N)); -} - -template<> -void GenerateRandom(curandGenerator_t generator, uint32_t * const dest, - const size_t N) { - CURAND_CHK(curandGenerate(generator, (unsigned int*)dest, N)); -} - -template<> -void GenerateRandom(curandGenerator_t generator, float * const dest, - const size_t N) { - CURAND_CHK(curandGenerateUniform(generator, dest, N)); -} - -template<> -void GenerateRandom(curandGenerator_t generator, double * const dest, - const size_t N) { - CURAND_CHK(curandGenerateUniformDouble(generator, dest, N)); -} - -template<> -void GenerateRandom(curandGenerator_t generator, uint64_t * const dest, - const size_t N) { - CURAND_CHK(curandGenerate(generator, (unsigned int *)dest, N*2)); -} - -template<> -void GenerateRandom(curandGenerator_t generator, int64_t * const dest, - const size_t N) { - CURAND_CHK(curandGenerate(generator, (unsigned int *)dest, N*2)); -} - -template -void RandomizeType(void* dest, const size_t N, const int randomSeed) { - T* ptr = (T*)dest; - curandGenerator_t gen; - CURAND_CHK(curandCreateGenerator(&gen, CURAND_RNG_PSEUDO_MTGP32)); - CURAND_CHK(curandSetPseudoRandomGeneratorSeed(gen, randomSeed)); - GenerateRandom(gen, ptr, N); - CURAND_CHK(curandDestroyGenerator(gen)); CUDACHECK(cudaDeviceSynchronize()); + return testSuccess; } -__global__ void halve(const float * src, half* dest, size_t N) { - for(int tid = threadIdx.x + blockIdx.x*blockDim.x; - tid < N; tid += blockDim.x * gridDim.x) - dest[tid] = __float2half(src[tid]); +// For integer values, we use values between 0 and 255 +template +__device__ T testValue(const size_t offset, const int rep, const int rank) { + uint8_t v = (rep+rank+offset) % 256; + return (T)v; } -void RandomizeHalf(void* dest, const size_t N, const int randomSeed) { - half* ptr = (half*)dest; - curandGenerator_t gen; - CURAND_CHK(curandCreateGenerator(&gen, CURAND_RNG_PSEUDO_MTGP32)); - CURAND_CHK(curandSetPseudoRandomGeneratorSeed(gen, randomSeed)); - - float* temp; - CUDACHECK(cudaMalloc(&temp, N*sizeof(float))); - GenerateRandom(gen, temp, N); - halve<<<128, 512>>>(temp, ptr, N); - CURAND_CHK(curandDestroyGenerator(gen)); - CUDACHECK(cudaFree(temp)); - CUDACHECK(cudaDeviceSynchronize()); +// For floating point datatype, we use values between 0 and 1 otherwise the +// Product operation will produce NaNs. +template<> +__device__ double testValue(const size_t offset, const int rep, const int rank) { + return 1.0/(1.0+(double)testValue(offset, rep, rank)); +} +template<> +__device__ float testValue(const size_t offset, const int rep, const int rank) { + return 1.0/(1.0+(float)testValue(offset, rep, rank)); +} +template<> +__device__ half testValue(const size_t offset, const int rep, const int rank) { + return __float2half(testValue(offset, rep, rank)); } -void Randomize(void* ptr, const size_t count, ncclDataType_t type, const int seed) { - switch (type) { - case ncclChar: RandomizeType (ptr, count, seed); break; -#if NCCL_MAJOR >= 2 - case ncclUint8: RandomizeType (ptr, count, seed); break; -#endif - case ncclInt: RandomizeType (ptr, count, seed); break; -#if NCCL_MAJOR >= 2 - case ncclUint32: RandomizeType(ptr, count, seed); break; -#endif - case ncclInt64: RandomizeType (ptr, count, seed); break; - case ncclUint64: RandomizeType(ptr, count, seed); break; - case ncclHalf: RandomizeHalf (ptr, count, seed); break; - case ncclFloat: RandomizeType (ptr, count, seed); break; - case ncclDouble: RandomizeType (ptr, count, seed); break; - } -} +// Operations +template +__device__ T ncclOpSum(T a, T b) { return a+b; } +template +__device__ T ncclOpProd(T a, T b) { return a*b; } +template +__device__ T ncclOpMax(T a, T b) { return a>b ? a : b; } +template +__device__ T ncclOpMin(T a, T b) { return a __global__ static -void accumKern(T* acum, const T* contrib, size_t N) { - int tid = threadIdx.x + blockIdx.x*blockDim.x; - int offset = blockDim.x*gridDim.x; - for(int i=tid; i c) ? a : c; - } else if(OP == ncclMin) { - acum[i] = (a < c) ? a : c; +// Definitions for half +template<> +__device__ half ncclOpSum(half a, half b) { return __float2half(__half2float(a)+__half2float(b)); } +template<> +__device__ half ncclOpProd(half a, half b) { return __float2half(__half2float(a)*__half2float(b)); } +template<> +__device__ half ncclOpMax(half a, half b) { return __half2float(a)>__half2float(b) ? a : b; } +template<> +__device__ half ncclOpMin(half a, half b) { return __half2float(a)<__half2float(b) ? a : b; } + +template +__global__ void InitDataReduceKernel(T* data, const size_t N, const size_t offset, const int rep, const int nranks) { + for (size_t o=blockIdx.x*blockDim.x+threadIdx.x; o(o+offset, rep, 0); + for (int i=1; i(o+offset, rep, i)); } + data[o] = val; } } -template<> __global__ -void accumKern(half* acum, const half* contrib, size_t N) { - int tid = threadIdx.x + blockIdx.x*blockDim.x; - int offset = blockDim.x*gridDim.x; - for(int i=tid; i> +#define OPS(type) KERN(type, ncclOpSum), KERN(type, ncclOpProd), KERN(type, ncclOpMax), KERN(type, ncclOpMin) -template<> __global__ -void accumKern(half* acum, const half* contrib, size_t N) { - int tid = threadIdx.x + blockIdx.x*blockDim.x; - int offset = blockDim.x*gridDim.x; - for(int i=tid; i __global__ -void accumKern(half* acum, const half* contrib, size_t N) { - int tid = threadIdx.x + blockIdx.x*blockDim.x; - int offset = blockDim.x*gridDim.x; - for(int i=tid; ic) ? a : c ); - } -} - -template<> __global__ -void accumKern(half* acum, const half* contrib, size_t N) { - int tid = threadIdx.x + blockIdx.x*blockDim.x; - int offset = blockDim.x*gridDim.x; - for(int i=tid; i -void accVecType(void* out, void* in, size_t n, ncclRedOp_t op) { - switch(op) { - case ncclSum: accumKern <<<256,256>>>((T*)out, (T*)in, n); break; - case ncclProd: accumKern<<<256,256>>>((T*)out, (T*)in, n); break; - case ncclMax: accumKern <<<256,256>>>((T*)out, (T*)in, n); break; - case ncclMin: accumKern <<<256,256>>>((T*)out, (T*)in, n); break; - default: - printf("Unknown reduction operation.\n"); - exit(EXIT_FAILURE); - } +__global__ void InitDataKernel(T* data, const size_t N, const int rep, const int rank) { + for (size_t o=blockIdx.x*blockDim.x+threadIdx.x; o(o, rep, rank); } -void Accumulate(void* out, void* in, size_t n, ncclDataType_t type, ncclRedOp_t op) { - switch (type) { - case ncclChar: accVecType (out, in, n, op); break; -#if NCCL_MAJOR >= 2 - case ncclUint8: accVecType (out, in, n, op); break; -#endif - case ncclInt: accVecType (out, in, n, op); break; -#if NCCL_MAJOR >= 2 - case ncclUint32: accVecType (out, in, n, op); break; -#endif - case ncclInt64: accVecType (out, in, n, op); break; - case ncclUint64: accVecType (out, in, n, op); break; - case ncclHalf: accVecType (out, in, n, op); break; - case ncclFloat: accVecType (out, in, n, op); break; - case ncclDouble: accVecType (out, in, n, op); break; - default: - printf("Unknown reduction type.\n"); - exit(EXIT_FAILURE); - } +static void* const initDataKerns[ncclNumTypes] = { + (void*)InitDataKernel< int8_t>, + (void*)InitDataKernel< uint8_t>, + (void*)InitDataKernel< int32_t>, + (void*)InitDataKernel, + (void*)InitDataKernel< int64_t>, + (void*)InitDataKernel, + (void*)InitDataKernel< half>, + (void*)InitDataKernel< float>, + (void*)InitDataKernel< double> +}; + +template +testResult_t InitDataType(void* dest, const size_t N, const int rep, const int rank) { + T* ptr = (T*)dest; + InitDataKernel<<<16, 512>>>(ptr, N, rep, rank); + return testSuccess; } -void Barrier(struct threadArgs_t* args) +testResult_t InitData(void* data, const size_t count, ncclDataType_t type, const int rep, const int rank) { + dim3 grid = { 32, 1, 1 }; + dim3 block = { 256, 1, 1 }; + void* args[4] = { (void*)&data, (void*)&count, (void*)&rep, (void*)&rank }; + CUDACHECK(cudaLaunchKernel(initDataKerns[type], grid, block, args, 0, cudaStreamDefault)); + return testSuccess; +} + +void Barrier(struct threadArgs* args) { while (args->barrier[args->barrier_idx] != args->thread) pthread_yield(); @@ -376,16 +272,7 @@ void Barrier(struct threadArgs_t* args) args->barrier_idx=!args->barrier_idx; } -void RandomizeAccumulate(void* data, void* accum, size_t count, ncclDataType_t type, ncclRedOp_t op, int seed, int rank) { - Randomize(data, count, type, seed); - if (rank == 0) { - CUDACHECK(cudaMemcpy(accum, data, count*wordSize(type), cudaMemcpyDeviceToHost)); - } else { - Accumulate(accum, data, count, type, op); - } -} - -double CheckData(struct threadArgs_t* args, ncclDataType_t type, ncclRedOp_t op, int root, int in_place) { +testResult_t CheckData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int in_place, double *delta) { size_t count = args->expectedBytes/wordSize(type); double maxDelta = 0.0; for (int i=0; inGpus; i++) { @@ -394,24 +281,25 @@ double CheckData(struct threadArgs_t* args, ncclDataType_t type, ncclRedOp_t op, NCCLCHECK(ncclCommCuDevice(args->comms[i], &device)); CUDACHECK(cudaSetDevice(device)); void *data = in_place ? ((void *)((uintptr_t)args->recvbuffs[i] + args->recvInplaceOffset*rank)) : args->recvbuffs[i]; - CheckDelta(data , args->expected[i], count, type, args->delta); - cudaDeviceSynchronize(); + TESTCHECK(CheckDelta(data , args->expected[i], count, type, args->delta)); maxDelta = std::max(*(args->deltaHost), maxDelta); #ifdef DEBUG_PRINT - if (rank == 0) { - int *temp = (int *)malloc(args->expectedBytes); + if (rank == 0) { + int *expectedHost = (int *)malloc(args->expectedBytes); + int *dataHost = (int *)malloc(args->expectedBytes); + cudaMemcpy(expectedHost, args->expected[0], args->expectedBytes, cudaMemcpyDeviceToHost); printf("\n Expected: "); - for(int j=0; jexpectedBytes/sizeof(int); j++) { - printf("%d:%d ", j, *((int *)args->expectedHost[0] + j)); + for(int j=0; jexpectedBytes/sizeof(int); j++) { + printf("%d:%d ", j, expectedHost[j]); } printf("\n"); - cudaMemcpy(temp, data, args->expectedBytes, cudaMemcpyDeviceToHost); + cudaMemcpy(dataHost, data, args->expectedBytes, cudaMemcpyDeviceToHost); printf("\n Actual: "); - for (int j=0; jexpectedBytes/sizeof(int); j++) { - printf("%d:%d ", j, *((int *)temp + j)); + for (int j=0; jexpectedBytes/sizeof(int); j++) { + printf("%d:%d ", j, dataHost[j]); } printf("\n"); free(temp); @@ -420,173 +308,173 @@ double CheckData(struct threadArgs_t* args, ncclDataType_t type, ncclRedOp_t op, } double nranks = args->nProcs*args->nThreads*args->nGpus; if (maxDelta > DeltaMaxValue(type)*(nranks - 1)) args->errors[0]++; - return maxDelta; + *delta = maxDelta; + return testSuccess; } -void InitSend(struct threadArgs_t* args, ncclDataType_t type, ncclRedOp_t op, int root, int in_place, int is_first) { - size_t count = args->sendBytes / wordSize(type); - static int rep = 1; - for (int i=0; inGpus; i++) { - int device; - int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i); - NCCLCHECK(ncclCommCuDevice(args->comms[i], &device)); - CUDACHECK(cudaSetDevice(device)); - void* data = in_place ? (void *)((uintptr_t)args->recvbuffs[i] + args->sendInplaceOffset*rank) : args->sendbuffs[i]; - int seed = rank+count+rep+in_place; - Randomize(data, count, type, seed); +testResult_t testStreamSynchronize(int ngpus, cudaStream_t* streams, ncclComm_t* comms) { + cudaError_t cudaErr; + int remaining = ngpus; + int* done = (int*)malloc(sizeof(int)*ngpus); + memset(done, 0, sizeof(int)*ngpus); + while (remaining) { + int idle = 1; + for (int i=0; isendBytes); - cudaMemcpy(temp, data, args->sendBytes, cudaMemcpyDeviceToHost); - printf("\n Send Data at rank %d:", rank); - for (int i=0; isendBytes/sizeof(int); i++) { - printf("%d:%d ", i, *((int *)temp + i)); + cudaErr = cudaStreamQuery(streams[i]); + if (cudaErr == cudaSuccess) { + done[i] = 1; + remaining--; + idle = 0; + continue; + } + + if (cudaErr != cudaErrorNotReady) CUDACHECK(cudaErr); + +#if NCCL_VERSION_CODE >= NCCL_VERSION(2,4,0) + if (comms) { + ncclResult_t ncclAsyncErr; + NCCLCHECK(ncclCommGetAsyncError(comms[i], &ncclAsyncErr)); + if (ncclAsyncErr != ncclSuccess) { + // An asynchronous error happened. Stop the operation and destroy + // the communicator + for (int i=0; inbytes / wordSize(type); - if (swap_args) { - args = (struct threadArgs_t*)args->proc_args + (args->thread + thread_offset)%args->nThreads; - } + // Try to change offset for each iteration so that we avoid cache effects and catch race conditions in ptrExchange + size_t totalnbytes = max(args->sendBytes, args->expectedBytes); + size_t shift = (totalnbytes * iter) % args->maxbytes; + if (shift + totalnbytes > args->maxbytes) shift = 0; - if (args->nGpus == 1) { - int rank = args->proc*args->nThreads + args->thread; - RunColl((void*)(in_place ? ((void *)((uintptr_t)args->recvbuffs[0] + args->sendInplaceOffset*rank)) : args->sendbuffs[0]), - (void*)(in_place ? (void*)((uintptr_t)args->recvbuffs[0] + args->recvInplaceOffset*rank) : args->recvbuffs[0]), - count, type, op, root, args->comms[0], args->streams[0]); - } else { - NCCLCHECK(ncclGroupStart()); - for (int i = 0; i < args->nGpus; i++) { + if (args->nGpus > 1) NCCLCHECK(ncclGroupStart()); + for (int i = 0; i < args->nGpus; i++) { #ifndef NCCL_MAJOR - int cudaDev; - NCCLCHECK(ncclCommCuDevice(args->comms[i], &cudaDev)); - CUDACHECK(cudaSetDevice(cudaDev)); + int cudaDev; + NCCLCHECK(ncclCommCuDevice(args->comms[i], &cudaDev)); + CUDACHECK(cudaSetDevice(cudaDev)); #endif - int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i); - RunColl((void*)(in_place ? ((void *)((uintptr_t)args->recvbuffs[i] + args->sendInplaceOffset*rank)) : args->sendbuffs[i]), - (void*)(in_place ? (void*)((uintptr_t)args->recvbuffs[i] + args->recvInplaceOffset*rank) : args->recvbuffs[i]), - count, type, op, root, args->comms[i], args->streams[i]); - } - NCCLCHECK(ncclGroupEnd()); + int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i); + char* recvBuff = ((char*)args->recvbuffs[i]) + shift; + char* sendBuff = ((char*)args->sendbuffs[i]) + shift; + TESTCHECK(args->collTest->runColl( + (void*)(in_place ? recvBuff + args->sendInplaceOffset*rank : sendBuff), + (void*)(in_place ? recvBuff + args->recvInplaceOffset*rank : recvBuff), + count, type, op, root, args->comms[i], args->streams[i])); } + if (args->nGpus > 1) NCCLCHECK(ncclGroupEnd()); - if (swap_args || blocking_coll) { - //if args have been swapped, complete op before returning - for (int i = 0; i < args->nGpus; ++i) { - cudaError_t err = cudaErrorNotReady; - while (err == cudaErrorNotReady) { - err = cudaStreamQuery(args->streams[i]); - pthread_yield(); - } - CUDACHECK(err); - } + if (blocking_coll) { + // Complete op before returning + TESTCHECK(testStreamSynchronize(args->nGpus, args->streams, args->comms)); } if (blocking_coll) Barrier(args); + return testSuccess; } -void completeColl(struct threadArgs_t* args) { - //it swap_args was enabled, op would have been completed immediately - if (swap_args || blocking_coll) return; +testResult_t completeColl(struct threadArgs* args) { + if (blocking_coll) return testSuccess; - for (int i = 0; i < args->nGpus; ++i) { - cudaError_t err = cudaErrorNotReady; - while (err == cudaErrorNotReady) { - err = cudaStreamQuery(args->streams[i]); - pthread_yield(); - } - CUDACHECK(err); - } + TESTCHECK(testStreamSynchronize(args->nGpus, args->streams, args->comms)); + return testSuccess; } -void BenchTime(struct threadArgs_t* args, ncclDataType_t type, ncclRedOp_t op, int root, int in_place) { +testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int in_place) { size_t count = args->nbytes / wordSize(type); - + // Sync - startColl(args, type, op, root, in_place, 0); - completeColl(args); + TESTCHECK(startColl(args, type, op, root, in_place, 0)); + TESTCHECK(completeColl(args)); Barrier(args); // Performance Benchmark auto start = std::chrono::high_resolution_clock::now(); for (int iter = 0; iter < iters; iter++) { - startColl(args, type, op, root, in_place, iter); + if (agg_iters>1) NCCLCHECK(ncclGroupStart()); + for (int aiter = 0; aiter < agg_iters; aiter++) { + TESTCHECK(startColl(args, type, op, root, in_place, iter*agg_iters+aiter)); + } + if (agg_iters>1) NCCLCHECK(ncclGroupEnd()); } - completeColl(args); + TESTCHECK(completeColl(args)); auto delta = std::chrono::high_resolution_clock::now() - start; double deltaSec = std::chrono::duration_cast>(delta).count(); - deltaSec = deltaSec/iters; + deltaSec = deltaSec/(iters*agg_iters); double algBw, busBw; - GetBw(count, wordSize(type), deltaSec, &algBw, &busBw, args->nProcs*args->nThreads*args->nGpus); + args->collTest->getBw(count, wordSize(type), deltaSec, &algBw, &busBw, args->nProcs*args->nThreads*args->nGpus); Barrier(args); - if (datacheck) { - InitSend(args, type, op, root, in_place, args->thread == 0 ? 1 : 0); - InitRecvResult(args, type, op, root, in_place, args->thread == 0 ? 1 : 0); - cudaDeviceSynchronize(); - } - - //test validation in single itertion, should ideally be included into the multi-iteration run - startColl(args, type, op, root, in_place, 0); - completeColl(args); - double maxDelta = 0; -#ifdef CHECK - if (datacheck) { - maxDelta = CheckData(args, type, op, root, in_place); - } else { - maxDelta = -1.0; - } -#else - maxDelta = -1.0; -#endif + static __thread int rep = 0; + rep++; + if (datacheck) { + // Initialize sendbuffs, recvbuffs and expected + TESTCHECK(args->collTest->initData(args, type, op, root, rep, in_place)); - //aggregate delta from all threads and procs - Barrier(args); - if (args->thread == 0) { - for (int i=1; inThreads; i++) { + //test validation in single itertion, should ideally be included into the multi-iteration run + TESTCHECK(startColl(args, type, op, root, in_place, 0)); + TESTCHECK(completeColl(args)); + + TESTCHECK(CheckData(args, type, op, root, in_place, &maxDelta)); + + //aggregate delta from all threads and procs + Barrier(args); + if (args->thread == 0) { + for (int i=1; inThreads; i++) { maxDelta += args->deltaThreads[i]; - } + } #ifdef MPI_SUPPORT - MPI_Allreduce(MPI_IN_PLACE, &maxDelta, 1, MPI_DOUBLE, MPI_MAX, MPI_COMM_WORLD); + MPI_Allreduce(MPI_IN_PLACE, &maxDelta, 1, MPI_DOUBLE, MPI_MAX, MPI_COMM_WORLD); #endif + } + Barrier(args); } - Barrier(args); - if (datacheck) { - PRINT(" %7.3f %5.2f %5.2f %7.0le", deltaSec * 1.0E3, algBw, busBw, - maxDelta); + double timeUsec = deltaSec*1.0E6; + char timeStr[10]; + if (timeUsec > 10000.0) { + sprintf(timeStr, "%7.0f", timeUsec); + } else if (timeUsec > 100.0) { + sprintf(timeStr, "%7.1f", timeUsec); } else { - PRINT(" %7.3f %5.2f %5.2f \tN/A", deltaSec * 1.0E3, algBw, busBw); + sprintf(timeStr, "%7.2f", timeUsec); + } + if (datacheck) { + PRINT(" %7s %6.2f %6.2f %5.0le", timeStr, algBw, busBw, maxDelta); + } else { + PRINT(" %7s %6.2f %6.2f %5s", timeStr, algBw, busBw, "N/A"); } args->bw[0] += busBw; args->bw_count[0]++; + return testSuccess; } -void setupArgs(size_t size, ncclDataType_t type, struct threadArgs_t* args) { +void setupArgs(size_t size, ncclDataType_t type, struct threadArgs* args) { int nranks = args->nProcs*args->nGpus*args->nThreads; - size_t count, sendCount, recvCount, paramCount, sendInplaceOffset, recvInplaceOffset, procSharedCount; - int sameExpected; - + size_t count, sendCount, recvCount, paramCount, sendInplaceOffset, recvInplaceOffset; + count = size / wordSize(type); - getCollByteCount(&sendCount, &recvCount, ¶mCount, &sendInplaceOffset, &recvInplaceOffset, &procSharedCount, &sameExpected, (size_t)count, (size_t)nranks); + args->collTest->getCollByteCount(&sendCount, &recvCount, ¶mCount, &sendInplaceOffset, &recvInplaceOffset, (size_t)count, (size_t)nranks); args->nbytes = paramCount * wordSize(type); args->sendBytes = sendCount * wordSize(type); @@ -595,260 +483,224 @@ void setupArgs(size_t size, ncclDataType_t type, struct threadArgs_t* args) { args->recvInplaceOffset = recvInplaceOffset * wordSize(type); } -void TimeTest(struct threadArgs_t* args, ncclDataType_t type, const char* typeName, ncclRedOp_t op, const char* opName, int root, int inPlace) { - // Warm-up +testResult_t TimeTest(struct threadArgs* args, ncclDataType_t type, const char* typeName, ncclRedOp_t op, const char* opName, int root) { + // Warm-up for large size setupArgs(args->maxbytes, type, args); for (int iter = 0; iter < warmup_iters; iter++) { - startColl(args, type, op, root, 0, iter); + TESTCHECK(startColl(args, type, op, root, 0, iter)); } - completeColl(args); + TESTCHECK(completeColl(args)); + + // Warm-up for small size + setupArgs(args->minbytes, type, args); + for (int iter = 0; iter < warmup_iters; iter++) { + TESTCHECK(startColl(args, type, op, root, 0, iter)); + } + TESTCHECK(completeColl(args)); // Benchmark for (size_t size = args->minbytes; size<=args->maxbytes; size = ((args->stepfactor > 1) ? size*args->stepfactor : size+args->stepbytes)) { setupArgs(size, type, args); print_line_header(max(args->sendBytes, args->expectedBytes), args->nbytes / wordSize(type), typeName, opName, root); - BenchTime(args, type, op, root, 0); - if (inPlace) BenchTime(args, type, op, root, 1); + TESTCHECK(BenchTime(args, type, op, root, 0)); + TESTCHECK(BenchTime(args, type, op, root, 1)); PRINT("\n"); } + return testSuccess; } - -void* threadRunTests(void* args) { - struct threadArgs_t* targs = (struct threadArgs_t*)args; +testResult_t threadRunTests(struct threadArgs* args) { // Set device to the first of our GPUs. If we don't do that, some operations // will be done on the current GPU (by default : 0) and if the GPUs are in // exclusive mode those operations will fail. - int gpuid = targs->localRank*targs->nThreads*targs->nGpus + targs->thread*targs->nGpus; + int gpuid = args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus; CUDACHECK(cudaSetDevice(gpuid)); - - RunTest(targs, ncclroot, (ncclDataType_t)nccltype, test_typenames[nccltype], (ncclRedOp_t)ncclop, test_opnames[ncclop]); - - return NULL; + TESTCHECK(ncclTestEngine.runTest(args, ncclroot, (ncclDataType_t)nccltype, test_typenames[nccltype], (ncclRedOp_t)ncclop, test_opnames[ncclop])); + return testSuccess; } -void* threadInit(void* args) { - struct threadArgs_t* targs = (struct threadArgs_t*)args; +testResult_t threadInit(struct threadArgs* args) { char hostname[1024]; getHostName(hostname, 1024); - int nranks = targs->nProcs*targs->nThreads*targs->nGpus; + int nranks = args->nProcs*args->nThreads*args->nGpus; //set main thread again - is_main_thread = (targs->proc == 0 && targs->thread == 0) ? 1 : 0; + is_main_thread = (args->proc == 0 && args->thread == 0) ? 1 : 0; NCCLCHECK(ncclGroupStart()); - for (int i=0; inGpus; i++) { - int rank = targs->proc*targs->nThreads*targs->nGpus + targs->thread*targs->nGpus + i; - int gpuid = targs->localRank*targs->nThreads*targs->nGpus + targs->thread*targs->nGpus + i; + for (int i=0; inGpus; i++) { + int rank = args->proc*args->nThreads*args->nGpus + args->thread*args->nGpus + i; + int gpuid = args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i; CUDACHECK(cudaSetDevice(gpuid)); - NCCLCHECK(ncclCommInitRank(targs->comms+i, nranks, targs->ncclId, rank)); + NCCLCHECK(ncclCommInitRank(args->comms+i, nranks, args->ncclId, rank)); } NCCLCHECK(ncclGroupEnd()); - PRINT("# Using devices\n"); - for (int p=0; pnProcs; p++) { - if (p == targs->proc) { - for (int t=0; tnThreads; t++) { - if (t == targs->thread) { - for (int i=0; inGpus; i++) { - int cudaDev; - int rank; - cudaDeviceProp prop; - NCCLCHECK(ncclCommCuDevice(targs->comms[i], &cudaDev)); - NCCLCHECK(ncclCommUserRank(targs->comms[i], &rank)); - CUDACHECK(cudaGetDeviceProperties(&prop, cudaDev)); - printf("# Rank %2d on %10s device %2d [0x%02x] %s\n", rank, hostname, cudaDev, - prop.pciBusID, prop.name); - fflush(stdout); - } - Barrier(targs); - fflush(stdout); - } - } - } + TESTCHECK(threadRunTests(args)); + + for (int i=0; inGpus; i++) { + NCCLCHECK(ncclCommDestroy(args->comms[i])); } + return testSuccess; +} - threadRunTests(args); - +void* threadLauncher(void* thread_) { + struct testThread* thread = (struct testThread*)thread_; + thread->ret = thread->func(&thread->args); return NULL; } - -void AllocateBuffs(void **sendbuff, size_t sendBytes, void **recvbuff, size_t recvBytes, void **expected, void **expectedHost, size_t nbytes, int nranks, int sameExpected) { - static int is_first = 1; - static void *cached_ptr = NULL; - static void *cached_hostptr = NULL; - - CUDACHECK(cudaMalloc(sendbuff, sendBytes)); - //work around for inline reduce scatter where recv count is smaller that send count - CUDACHECK(cudaMalloc(recvbuff, (sendBytes > recvBytes) ? sendBytes : recvBytes)); - - if (is_first || !sameExpected) { - *expectedHost = malloc(recvBytes); - CUDACHECK(cudaHostRegister(*expectedHost, recvBytes, cudaHostRegisterPortable | cudaHostRegisterMapped)); - CUDACHECK(cudaHostGetDevicePointer(expected, *expectedHost, 0)); - cached_ptr = *expected; - cached_hostptr = *expectedHost; - is_first = 0; - } else { - *expected = cached_ptr; - *expectedHost = cached_hostptr; - } -} - -int ncclstringtotype(char *str) { - for (int t=0; tthread, NULL, threadLauncher, thread); + return testSuccess; } -int ncclstringtoop (char *str) { - for (int o=0; o] \n\t " - "[-g,--ngpus ] \n\t " - "[-b,--minbytes ] \n\t " - "[-e,--maxbytes ] \n\t " - "[-i,--stepbytes ] \n\t " - "[-f,--stepfactor ] \n\t " - "[-n,--iters ] \n\t " - "[-w,--warmup_iters ] \n\t" - "[-s,--swap_args <0/1>] \n\t " - "[-p,--parallel_init <0/1>] \n\t " - "[-c,--check <0/1>] \n\t " - "[-o,--op ] \n\t " - "[-d,--datatype ] \n\t " - "[-r,--root ] \n\t " - "[-z,--blocking <0/1>] \n\t " - "[-h,--help]\n"); - return 0; - default: - printf("invalid option \n"); - printf("USAGE: ./test \n\t" - "[-t,--nthreads ] \n\t " - "[-g,--ngpus ] \n\t " - "[-b,--minbytes ] \n\t " - "[-e,--maxbytes ] \n\t " - "[-i,--stepbytes ] \n\t " - "[-f,--stepfactor ] \n\t " - "[-n,--iters ] \n\t " - "[-w,--warmup_iters ] \n\t" - "[-s,--swap_args <0/1>] \n\t " - "[-p,--parallel_init <0/1>] \n\t " - "[-c,--check <0/1>] \n\t " - "[-o,--op ] \n\t " - "[-d,--datatype ] \n\t " - "[-r,--root ] \n\t " - "[-z,--blocking <0/1>] \n\t " - "[-h,--help]\n"); - return 0; - } - } - // Make sure everyline is flushed so that we see the progress of the test setlinebuf(stdout); + // Parse args + int longindex; + static struct option longopts[] = { + {"nthreads", required_argument, 0, 't'}, + {"ngpus", required_argument, 0, 'g'}, + {"minbytes", required_argument, 0, 'b'}, + {"maxbytes", required_argument, 0, 'e'}, + {"stepbytes", required_argument, 0, 'i'}, + {"stepfactor", required_argument, 0, 'f'}, + {"iters", required_argument, 0, 'n'}, + {"agg_iters", required_argument, 0, 'm'}, + {"warmup_iters", required_argument, 0, 'w'}, + {"parallel_init", required_argument, 0, 'p'}, + {"check", required_argument, 0, 'c'}, + {"op", required_argument, 0, 'o'}, + {"datatype", required_argument, 0, 'd'}, + {"root", required_argument, 0, 'r'}, + {"blocking", required_argument, 0, 'z'}, + {"help", no_argument, 0, 'h'} + }; + + while(1) { + int c; + c = getopt_long(argc, argv, "t:g:b:e:i:f:n:m:w:p:c:o:d:r:z:h", longopts, &longindex); + + if (c == -1) + break; + + switch(c) { + case 't': + nThreads = strtol(optarg, NULL, 0); + break; + case 'g': + nGpus = strtol(optarg, NULL, 0); + break; + case 'b': + minBytes = (size_t)parsesize(optarg); + break; + case 'e': + maxBytes = (size_t)parsesize(optarg); + break; + case 'i': + stepBytes = strtol(optarg, NULL, 0); + break; + case 'f': + stepFactor = strtol(optarg, NULL, 0); + break; + case 'n': + iters = (int)strtol(optarg, NULL, 0); + break; + case 'm': +#if NCCL_MAJOR >= 2 && NCCL_MINOR >= 2 + agg_iters = (int)strtol(optarg, NULL, 0); +#else + printf("Option -m not supported before NCCL 2.2. Ignoring\n"); +#endif + break; + case 'w': + warmup_iters = (int)strtol(optarg, NULL, 0); + break; + case 'c': + datacheck = (int)strtol(optarg, NULL, 0); + break; + case 'p': + parallel_init = (int)strtol(optarg, NULL, 0); + break; + case 'o': + ncclop = ncclstringtoop(optarg); + break; + case 'd': + nccltype = ncclstringtotype(optarg); + break; + case 'r': + ncclroot = strtol(optarg, NULL, 0); + break; + case 'z': + blocking_coll = strtol(optarg, NULL, 0); + break; + case 'h': + printf("USAGE: %s \n\t" + "[-t,--nthreads ] \n\t" + "[-g,--ngpus ] \n\t" + "[-b,--minbytes ] \n\t" + "[-e,--maxbytes ] \n\t" + "[-i,--stepbytes ] \n\t" + "[-f,--stepfactor ] \n\t" + "[-n,--iters ] \n\t" + "[-m,--agg_iters ] \n\t" + "[-w,--warmup_iters ] \n\t" + "[-p,--parallel_init <0/1>] \n\t" + "[-c,--check <0/1>] \n\t" + "[-o,--op ] \n\t" + "[-d,--datatype ] \n\t" + "[-r,--root ] \n\t" + "[-z,--blocking <0/1>] \n\t" + "[-h,--help]\n", + basename(argv[0])); + return 0; + default: + printf("invalid option \n"); + printf("USAGE: %s \n\t" + "[-t,--nthreads ] \n\t" + "[-g,--ngpus ] \n\t" + "[-b,--minbytes ] \n\t" + "[-e,--maxbytes ] \n\t" + "[-i,--stepbytes ] \n\t" + "[-f,--stepfactor ] \n\t" + "[-n,--iters ] \n\t" + "[-m,--agg_iters ] \n\t" + "[-w,--warmup_iters ] \n\t" + "[-p,--parallel_init <0/1>] \n\t" + "[-c,--check <0/1>] \n\t" + "[-o,--op ] \n\t" + "[-d,--datatype ] \n\t" + "[-r,--root ] \n\t" + "[-z,--blocking <0/1>] \n\t" + "[-h,--help]\n", + basename(argv[0])); + return 0; + } + } #ifdef MPI_SUPPORT MPI_Init(&argc, &argv); +#endif + return run(); +} + +testResult_t run() { + int nProcs = 1, proc = 0; + int localRank = 0; + char hostname[1024]; + getHostName(hostname, 1024); + +#ifdef MPI_SUPPORT MPI_Comm_size(MPI_COMM_WORLD, &nProcs); MPI_Comm_rank(MPI_COMM_WORLD, &proc); uint64_t hostHashs[nProcs]; @@ -861,14 +713,38 @@ int main(int argc, char* argv[]) { #endif is_main_thread = (proc == 0) ? 1 : 0; - if (proc == 0) { - printf("nThread %d nGpus %d minBytes %ld maxBytes %ld step: %ld(%s) warmup iters: %d iters: %d validation: %d \n", nThreads, nGpus, minBytes, maxBytes, - (stepFactor > 1)?stepFactor:stepBytes, (stepFactor > 1)?"factor":"bytes", warmup_iters, iters, datacheck); - if (swap_args) printf("Swap Comms Enabled: swapping communicators among threads for each iteration \n"); - if (blocking_coll) printf("Blocking Enabled: wait for completion and barrier after each collective \n"); - if (parallel_init) printf("Parallel Init Enabled: threads call into NcclInitRank concurrently \n"); + PRINT("# nThread %d nGpus %d minBytes %ld maxBytes %ld step: %ld(%s) warmup iters: %d iters: %d validation: %d \n", nThreads, nGpus, minBytes, maxBytes, + (stepFactor > 1)?stepFactor:stepBytes, (stepFactor > 1)?"factor":"bytes", warmup_iters, iters, datacheck); + if (blocking_coll) PRINT("# Blocking Enabled: wait for completion and barrier after each collective \n"); + if (parallel_init) PRINT("# Parallel Init Enabled: threads call into NcclInitRank concurrently \n"); + PRINT("#\n"); + + PRINT("# Using devices\n"); +#define MAX_LINE 2048 + char line[MAX_LINE]; + int len = 0; + for (int i=0; i 0) { - procSharedHost = malloc(procSharedBytes); - CUDACHECK(cudaHostRegister(procSharedHost, procSharedBytes, cudaHostRegisterPortable | cudaHostRegisterMapped)); - CUDACHECK(cudaHostGetDevicePointer(&procShared, procSharedHost, 0)); + AllocateBuffs(sendbuffs+i, sendBytes, recvbuffs+i, recvBytes, expected+i, (size_t)maxBytes, nProcs*nThreads*nGpus); + CUDACHECK(cudaStreamCreateWithFlags(streams+i, cudaStreamNonBlocking)); } //if parallel init is not selected, use main thread to initialize NCCL @@ -910,128 +777,113 @@ int main(int argc, char* argv[]) { NCCLCHECK(ncclGroupStart()); for (int i=0; i=0; t--) { - args[t].proc_args = (void *)args; - args[t].minbytes=minBytes; - args[t].maxbytes=maxBytes; - args[t].stepbytes=stepBytes; - args[t].stepfactor=stepFactor; - args[t].localRank = localRank; + threads[t].args.minbytes=minBytes; + threads[t].args.maxbytes=maxBytes; + threads[t].args.stepbytes=stepBytes; + threads[t].args.stepfactor=stepFactor; + threads[t].args.localRank = localRank; - args[t].nProcs=nProcs; - args[t].proc=proc; - args[t].nThreads=nThreads; - args[t].thread=t; - args[t].nGpus=nGpus; - args[t].sendbuffs = sendbuffs+t*nGpus; - args[t].recvbuffs = recvbuffs+t*nGpus; - args[t].ncclId = ncclId; - args[t].comms=comms+t*nGpus; - args[t].streams=streams+t*nGpus; + threads[t].args.nProcs=nProcs; + threads[t].args.proc=proc; + threads[t].args.nThreads=nThreads; + threads[t].args.thread=t; + threads[t].args.nGpus=nGpus; + threads[t].args.sendbuffs = sendbuffs+t*nGpus; + threads[t].args.recvbuffs = recvbuffs+t*nGpus; + threads[t].args.expected = expected+t*nGpus; + threads[t].args.ncclId = ncclId; + threads[t].args.comms=comms+t*nGpus; + threads[t].args.streams=streams+t*nGpus; - args[t].expectedHost = expectedHost + t*nGpus; - args[t].expected = expected + t*nGpus; - args[t].procSharedHost = procSharedHost; - args[t].procShared = procShared; - args[t].barrier = (volatile int*)barrier; - args[t].barrier_idx = 0; - args[t].sync = (volatile int*)sync; - args[t].sync_idx = 0; - args[t].deltaThreads = delta; - args[t].deltaHost = (delta + t); - CUDACHECK(cudaHostRegister(args[t].deltaHost, sizeof(double), cudaHostRegisterPortable|cudaHostRegisterMapped)); - CUDACHECK(cudaHostGetDevicePointer(&args[t].delta, args[t].deltaHost, 0)); - args[t].errors=errors+t; - args[t].bw=bw+t; - args[t].bw_count=bw_count+t; + threads[t].args.barrier = (volatile int*)barrier; + threads[t].args.barrier_idx = 0; + threads[t].args.sync = (volatile int*)sync; + threads[t].args.sync_idx = 0; + threads[t].args.deltaThreads = delta; + threads[t].args.deltaHost = (delta + t); + threads[t].args.delta = delta; + threads[t].args.errors=errors+t; + threads[t].args.bw=bw+t; + threads[t].args.bw_count=bw_count+t; - if (!parallel_init) { - if (t) - pthread_create(threads+t, NULL, threadRunTests, args+t); - else - threadRunTests(args); - } else { - if (t || (parallel_init && (proc == 0))) - pthread_create(threads+t, NULL, threadInit, args+t); - else - threadInit(args); + threads[t].func = parallel_init ? threadInit : threadRunTests; + if (t) + TESTCHECK(threadLaunch(threads+t)); + else + TESTCHECK(threads[t].func(&threads[t].args)); + } + + // Wait for other threads and accumulate stats and errors + for (int t=nThreads-1; t>=0; t--) { + if (t) pthread_join(threads[t].thread, NULL); + TESTCHECK(threads[t].ret); + if (t) { + errors[0] += errors[t]; + bw[0] += bw[t]; + bw_count[0] += bw_count[t]; } } - // Wait for other threads - for (int t=nThreads-1; t>=0; t--) { - if (t || (parallel_init && (proc == 0))) pthread_join(threads[t], NULL); - errors[0] += errors[t]; - bw[0] += bw[t]; - bw_count[0] += bw_count[t]; - } - #ifdef MPI_SUPPORT - MPI_Allreduce(MPI_IN_PLACE, &errors[0], 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD); + MPI_Allreduce(MPI_IN_PLACE, &errors[0], 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD); #endif - for(int i=0; i @@ -17,23 +19,75 @@ #define CUDACHECK(cmd) do { \ cudaError_t e = cmd; \ if( e != cudaSuccess ) { \ - printf("Cuda failure %s:%d '%s'\n", \ + char hostname[1024]; \ + getHostName(hostname, 1024); \ + printf("%s: Test CUDA failure %s:%d '%s'\n", \ + hostname, \ __FILE__,__LINE__,cudaGetErrorString(e)); \ - exit(EXIT_FAILURE); \ + return testCudaError; \ } \ } while(0) #define NCCLCHECK(cmd) do { \ ncclResult_t r = cmd; \ if (r!= ncclSuccess) { \ - printf("NCCL failure %s:%d '%s'\n", \ + char hostname[1024]; \ + getHostName(hostname, 1024); \ + printf("%s: Test NCCL failure %s:%d '%s'\n", \ + hostname, \ __FILE__,__LINE__,ncclGetErrorString(r)); \ - exit(EXIT_FAILURE); \ + return testNcclError; \ } \ } while(0) -struct threadArgs_t { - void *proc_args; +typedef enum { + testSuccess = 0, + testInternalError = 1, + testCudaError = 2, + testNcclError = 3, + testCuRandError = 4 +} testResult_t; + +// Relay errors up and trace +#define TESTCHECK(cmd) do { \ + testResult_t r = cmd; \ + if (r!= testSuccess) { \ + char hostname[1024]; \ + getHostName(hostname, 1024); \ + printf(" .. %s: Test failure %s:%d\n", \ + hostname, \ + __FILE__,__LINE__); \ + return r; \ + } \ +} while(0) + +struct testColl { + const char name[20]; + void (*getCollByteCount)( + size_t *sendcount, size_t *recvcount, size_t *paramcount, + size_t *sendInplaceOffset, size_t *recvInplaceOffset, + size_t count, int nranks); + testResult_t (*initData)(struct threadArgs* args, ncclDataType_t type, + ncclRedOp_t op, int root, int rep, int in_place); + void (*getBw)(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks); + testResult_t (*runColl)(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, + ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream); +}; +extern struct testColl allReduceTest; +extern struct testColl allGatherTest; +extern struct testColl reduceScatterTest; +extern struct testColl broadcastTest; +extern struct testColl reduceTest; + +struct testEngine { + void (*getBuffSize)(size_t *sendcount, size_t *recvcount, size_t count, int nranks); + testResult_t (*runTest)(struct threadArgs* args, int root, ncclDataType_t type, + const char* typeName, ncclRedOp_t op, const char* opName); +}; + +extern struct testEngine ncclTestEngine; + +struct threadArgs { size_t nbytes; size_t minbytes; size_t maxbytes; @@ -55,11 +109,8 @@ struct threadArgs_t { ncclComm_t* comms; cudaStream_t* streams; - void** expectedHost; void** expected; size_t expectedBytes; - void* procSharedHost; - void* procShared; volatile int* sync; int sync_idx; volatile int* barrier; @@ -72,27 +123,28 @@ struct threadArgs_t { int* errors; double* bw; int* bw_count; + + struct testColl* collTest; +}; + +typedef testResult_t (*threadFunc_t)(struct threadArgs* args); +struct testThread { + pthread_t thread; + threadFunc_t func; + struct threadArgs args; + testResult_t ret; }; #include // Provided by common.cu -extern void Barrier(struct threadArgs_t* args); -extern void TimeTest(struct threadArgs_t* args, ncclDataType_t type, const char* typeName, ncclRedOp_t op, const char* opName, int root, int inPlace); -extern void Randomize(void* ptr, size_t count, ncclDataType_t type, int seed); -extern void Accumulate(void* out, void* in, size_t n, ncclDataType_t type, ncclRedOp_t op); -extern void CheckDelta(void* expected, void* results, size_t count, ncclDataType_t type, double* devmax); -extern double DeltaMaxValue(ncclDataType_t type); +extern void Barrier(struct threadArgs* args); +extern testResult_t TimeTest(struct threadArgs* args, ncclDataType_t type, const char* typeName, ncclRedOp_t op, const char* opName, int root); +extern testResult_t InitDataReduce(void* data, const size_t count, const size_t offset, ncclDataType_t type, ncclRedOp_t op, const int rep, const int nranks); +extern testResult_t InitData(void* data, const size_t count, ncclDataType_t type, const int rep, const int rank); +extern void AllocateBuffs(void **sendbuff, void **recvbuff, void **expected, void **expectedHost, size_t nbytes, int nranks); // Provided by each coll -void RunTest(struct threadArgs_t* args, int root, ncclDataType_t type, const char* typeName, ncclRedOp_t op, const char* opName); -extern void GetBw(size_t count, int typeSize, double sec, double* algBw, double* busBw, int nranks); -extern void RunColl(void* sendbuf, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream); -extern void InitData(struct threadArgs_t* args, ncclDataType_t type, ncclRedOp_t op, int in_place, int is_first); -extern double CheckData(struct threadArgs_t* args, ncclDataType_t type, ncclRedOp_t op); -extern void AllocateBuffs(void **sendbuff, void **recvbuff, void **expected, void **expectedHost, size_t nbytes, int nranks); -extern void InitRecvResult(struct threadArgs_t* args, ncclDataType_t type, ncclRedOp_t op, int root, int in_place, int is_first); -extern void getCollByteCount(size_t *sendbytes, size_t *recvbytes, size_t *parambytes, size_t *sendInlineOffset, size_t *recvInlineOffset, size_t *procSharedBytes, int *sameexpected, size_t nbytes, int nranks); extern void print_line_header (size_t size, size_t count, const char *typeName, const char *opName, int root); extern void print_header(); @@ -152,7 +204,33 @@ extern const char *test_typenames[ncclNumTypes]; extern ncclRedOp_t test_ops[ncclNumOps]; extern const char *test_opnames[ncclNumOps]; +static int ncclstringtotype(char *str) { + for (int t=0; t #include "cuda_runtime.h" #include "common.h" void print_header() { - PRINT("# %10s %12s %6s %6s out-of-place in-place\n", "", "", "", ""); - PRINT("# %10s %12s %6s %6s %6s %7s %5s %5s %7s %7s %5s %5s %7s\n", "bytes", "N", "type", "op", "root", - "time", "algbw", "busbw", "res", "time", "algbw", "busbw", "res"); + PRINT("# %10s %12s %6s %6s out-of-place in-place \n", "", "", "", ""); + PRINT("# %10s %12s %6s %6s %6s %7s %6s %6s %5s %7s %6s %6s %5s\n", "size", "count", "type", "redop", "root", + "time", "algbw", "busbw", "error", "time", "algbw", "busbw", "error"); + PRINT("# %10s %12s %6s %6s %6s %7s %6s %6s %5s %7s %6s %6s %5s\n", "(B)", "(elements)", "", "", "", + "(us)", "(GB/s)", "(GB/s)", "", "(us)", "(GB/s)", "(GB/s)", ""); } void print_line_header (size_t size, size_t count, const char *typeName, const char *opName, int root) { PRINT("%12li %12li %6s %6s %6i", size, count, typeName, opName, root); } -void getCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramcount, size_t *sendInplaceOffset, size_t *recvInplaceOffset, size_t *procSharedCount, int *sameExpected, size_t count, int nranks) { - *sendcount = count; - *recvcount = count; - *sameExpected = 0; - *procSharedCount = count; - *sendInplaceOffset = 0; - *recvInplaceOffset = 0; - *paramcount = *sendcount; - } - -void InitRecvResult(struct threadArgs_t* args, ncclDataType_t type, ncclRedOp_t op, int root, int in_place, int is_first) { - size_t count = args->expectedBytes / wordSize(type); - int root_gpu = root%args->nGpus; - - assert(args->expectedBytes == args->nbytes); - - while (args->sync[args->sync_idx] != args->thread) pthread_yield(); - - for (int i=0; inGpus; i++) { - int device; - NCCLCHECK(ncclCommCuDevice(args->comms[i], &device)); - CUDACHECK(cudaSetDevice(device)); - void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i]; - - if (is_first && i == 0) { - CUDACHECK(cudaMemcpy(args->procSharedHost, data, count*wordSize(type), cudaMemcpyDeviceToHost)); - } else { - Accumulate(args->procShared, data, count, type, op); - } - - if (in_place == 0) { - CUDACHECK(cudaMemset(args->recvbuffs[i], 0, args->expectedBytes)); - } - CUDACHECK(cudaDeviceSynchronize()); - } - - args->sync[args->sync_idx] = args->thread + 1; - - if (args->thread+1 == args->nThreads) { -#ifdef MPI_SUPPORT - int root_proc = root/(args->nThreads*args->nGpus); - if (args->expectedBytes) { - // Last thread does the MPI reduction - if (root_proc == args->proc) { - void* temp, *tempHost = malloc(args->expectedBytes); - CUDACHECK(cudaHostRegister(tempHost, args->expectedBytes, 0)); - CUDACHECK(cudaHostGetDevicePointer(&temp, tempHost, 0)); - - for (int i=0; inProcs; i++) { - if (i == args->proc) continue; - MPI_Recv(tempHost, args->expectedBytes, MPI_BYTE, i, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); - - Accumulate(args->procShared, temp, count, type, op); - CUDACHECK(cudaDeviceSynchronize()); - } - - CUDACHECK(cudaHostUnregister(tempHost)); - free(tempHost); - } else { - MPI_Send(args->procSharedHost, args->expectedBytes, MPI_BYTE, root_proc, 0, MPI_COMM_WORLD); - } - } -#endif - args->sync[args->sync_idx] = 0; - } else { - while (args->sync[args->sync_idx]) pthread_yield(); - } - - //if root fill expected bytes with reduced data - // else if in_place, leave fill it with original data, else set to zero - for (int i=0; inGpus; i++) { - int rank = (args->proc*args->nThreads + args->thread)*args->nGpus + i; - if (rank == root) { - memcpy(args->expectedHost[root_gpu], args->procSharedHost, args->expectedBytes); - } else { - if (in_place == 1) { - CUDACHECK(cudaMemcpy(args->expectedHost[i], args->recvbuffs[i], args->expectedBytes, cudaMemcpyDeviceToHost)); - } else { - memset(args->expectedHost[i], 0, args->expectedBytes); - } - } - } - - args->sync_idx = !args->sync_idx; +void ReduceGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramcount, size_t *sendInplaceOffset, size_t *recvInplaceOffset, size_t count, int nranks) { + *sendcount = count; + *recvcount = count; + *sendInplaceOffset = 0; + *recvInplaceOffset = 0; + *paramcount = *sendcount; } -void GetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks) { +testResult_t ReduceInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) { + size_t sendcount = args->sendBytes / wordSize(type); + size_t recvcount = args->expectedBytes / wordSize(type); + int nranks = args->nProcs*args->nThreads*args->nGpus; + + for (int i=0; inGpus; i++) { + int gpuid = args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i; + CUDACHECK(cudaSetDevice(gpuid)); + int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i); + CUDACHECK(cudaMemset(args->recvbuffs[i], 0, args->expectedBytes)); + void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i]; + TESTCHECK(InitData(data, sendcount, type, rep, rank)); + CUDACHECK(cudaMemcpy(args->expected[i], args->recvbuffs[i], args->expectedBytes, cudaMemcpyDefault)); + if (rank == root) TESTCHECK(InitDataReduce(args->expected[i], recvcount, 0, type, op, rep, nranks)); + CUDACHECK(cudaDeviceSynchronize()); + } + return testSuccess; +} + +void ReduceGetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks) { double baseBw = (double)(count * typesize) / 1.0E9 / sec; *algBw = baseBw; *busBw = baseBw; } -void RunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) { +testResult_t ReduceRunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) { NCCLCHECK(ncclReduce(sendbuff, recvbuff, count, type, op, root, comm, stream)); + return testSuccess; } +struct testColl reduceTest = { + "Reduce", + ReduceGetCollByteCount, + ReduceInitData, + ReduceGetBw, + ReduceRunColl +}; -void RunTest(struct threadArgs_t* args, int root, ncclDataType_t type, const char* typeName, ncclRedOp_t op, const char* opName) { +void ReduceGetBuffSize(size_t *sendcount, size_t *recvcount, size_t count, int nranks) { + size_t paramcount, sendInplaceOffset, recvInplaceOffset; + ReduceGetCollByteCount(sendcount, recvcount, ¶mcount, &sendInplaceOffset, &recvInplaceOffset, count, nranks); +} + +testResult_t ReduceRunTest(struct threadArgs* args, int root, ncclDataType_t type, const char* typeName, ncclRedOp_t op, const char* opName) { + args->collTest = &reduceTest; ncclDataType_t *run_types; ncclRedOp_t *run_ops; const char **run_typenames, **run_opnames; int type_count, op_count; int begin_root, end_root; - if ((int)type != -1) { + if ((int)type != -1) { type_count = 1; run_types = &type; run_typenames = &typeName; - } else { + } else { type_count = ncclNumTypes; run_types = test_types; run_typenames = test_typenames; } - if ((int)op != -1) { + if ((int)op != -1) { op_count = 1; run_ops = &op; run_opnames = &opName; - } else { + } else { op_count = ncclNumOps; run_ops = test_ops; run_opnames = test_opnames; } - if (root != -1) { - begin_root = end_root = root; - } else { - begin_root = 0; - end_root = args->nProcs*args->nThreads*args->nGpus-1; + if (root != -1) { + begin_root = end_root = root; + } else { + begin_root = 0; + end_root = args->nProcs*args->nThreads*args->nGpus-1; } - for (int i=0; iexpectedBytes; - size_t recvcount = args->expectedBytes / wordSize(type); - size_t sendbytes = args->sendBytes; +testResult_t ReduceScatterInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) { size_t sendcount = args->sendBytes / wordSize(type); - - while (args->sync[args->sync_idx] != args->thread) pthread_yield(); + size_t recvcount = args->expectedBytes / wordSize(type); + int nranks = args->nProcs*args->nThreads*args->nGpus; for (int i=0; inGpus; i++) { - int device; - NCCLCHECK(ncclCommCuDevice(args->comms[i], &device)); - CUDACHECK(cudaSetDevice(device)); + int gpuid = args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i; + CUDACHECK(cudaSetDevice(gpuid)); + int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i); + CUDACHECK(cudaMemset(args->recvbuffs[i], 0, args->expectedBytes)); void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i]; - - if (is_first && i == 0) { - CUDACHECK(cudaMemcpy(args->procSharedHost, data, sendbytes, cudaMemcpyDeviceToHost)); - } else { - Accumulate(args->procShared, data, sendcount, type, op); - } - - CUDACHECK(cudaDeviceSynchronize()); - if (in_place == 0) { - CUDACHECK(cudaMemset(args->recvbuffs[i], 0, recvbytes)); - } + TESTCHECK(InitData(data, sendcount, type, rep, rank)); + CUDACHECK(cudaMemcpy(args->expected[i], args->recvbuffs[i], args->expectedBytes, cudaMemcpyDefault)); + TESTCHECK(InitDataReduce(args->expected[i], recvcount, rank*recvcount, type, op, rep, nranks)); CUDACHECK(cudaDeviceSynchronize()); } - - args->sync[args->sync_idx] = args->thread + 1; - - if (args->thread+1 == args->nThreads) { -#ifdef MPI_SUPPORT - if (sendbytes > 0) { - // Last thread does the MPI reduction - void* remote, *remoteHost = malloc(sendbytes); - void* myInitialData = malloc(sendbytes); - memcpy(myInitialData, args->procSharedHost, sendbytes); - CUDACHECK(cudaHostRegister(remoteHost, sendbytes, 0)); - CUDACHECK(cudaHostGetDevicePointer(&remote, remoteHost, 0)); - - for (int i=0; inProcs; i++) { - if (i == args->proc) { - MPI_Bcast(myInitialData, sendbytes, MPI_BYTE, i, MPI_COMM_WORLD); - free(myInitialData); - } else { - MPI_Bcast(remoteHost, sendbytes, MPI_BYTE, i, MPI_COMM_WORLD); - Accumulate(args->procShared, remote, sendcount, type, op); - cudaDeviceSynchronize(); - } - } - CUDACHECK(cudaHostUnregister(remoteHost)); - free(remoteHost); - } -#endif - args->sync[args->sync_idx] = 0; - } else { - while (args->sync[args->sync_idx]) pthread_yield(); - } - - for (int i=0; inGpus; i++) { - int offset = ((args->proc*args->nThreads + args->thread)*args->nGpus + i)*recvbytes; - memcpy(args->expectedHost[i], (void *)((uintptr_t)args->procSharedHost + offset), recvbytes); - } - - args->sync_idx = !args->sync_idx; + return testSuccess; } -void GetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks) { +void ReduceScatterGetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks) { double baseBw = (double)(count * typesize * (nranks - 1)) / 1.0E9 / sec; *algBw = baseBw; @@ -101,17 +54,32 @@ void GetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, *busBw = baseBw * factor; } -void RunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) { +testResult_t ReduceScatterRunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) { NCCLCHECK(ncclReduceScatter(sendbuff, recvbuff, count, type, op, comm, stream)); + return testSuccess; } -void RunTest(struct threadArgs_t* args, int root, ncclDataType_t type, const char* typeName, ncclRedOp_t op, const char* opName) { +struct testColl reduceScatterTest = { + "ReduceScatter", + ReduceScatterGetCollByteCount, + ReduceScatterInitData, + ReduceScatterGetBw, + ReduceScatterRunColl +}; + +void ReduceScatterGetBuffSize(size_t *sendcount, size_t *recvcount, size_t count, int nranks) { + size_t paramcount, sendInplaceOffset, recvInplaceOffset; + ReduceScatterGetCollByteCount(sendcount, recvcount, ¶mcount, &sendInplaceOffset, &recvInplaceOffset, count, nranks); +} + +testResult_t ReduceScatterRunTest(struct threadArgs* args, int root, ncclDataType_t type, const char* typeName, ncclRedOp_t op, const char* opName) { + args->collTest = &reduceScatterTest; ncclDataType_t *run_types; ncclRedOp_t *run_ops; const char **run_typenames, **run_opnames; int type_count, op_count; - if ((int)type != -1) { + if ((int)type != -1) { type_count = 1; run_types = &type; run_typenames = &typeName; @@ -121,19 +89,27 @@ void RunTest(struct threadArgs_t* args, int root, ncclDataType_t type, const cha run_typenames = test_typenames; } - if ((int)op != -1) { + if ((int)op != -1) { run_ops = &op; run_opnames = &opName; op_count = 1; - } else { + } else { op_count = sizeof(test_ops)/sizeof(test_ops[0]); run_ops = test_ops; run_opnames = test_opnames; } - for (int i=0; i