From 4cb47ccb21440cf949c95feb31ef00f60bcb68dc Mon Sep 17 00:00:00 2001 From: Sylvain Jeaugey Date: Tue, 8 Aug 2017 16:18:34 -0700 Subject: [PATCH] Initial commit [ROCm/rccl-tests commit: b188a152997740a84b3ce0da864bb4f1423eb35a] --- projects/rccl-tests/LICENSE.txt | 27 + projects/rccl-tests/Makefile | 20 + projects/rccl-tests/README.md | 62 ++ projects/rccl-tests/src/Makefile | 78 ++ projects/rccl-tests/src/all_gather.cu | 106 +++ projects/rccl-tests/src/all_reduce.cu | 130 +++ projects/rccl-tests/src/broadcast.cu | 121 +++ projects/rccl-tests/src/common.cu | 1036 +++++++++++++++++++++ projects/rccl-tests/src/common.h | 158 ++++ projects/rccl-tests/src/nccl1_compat.h | 47 + projects/rccl-tests/src/reduce.cu | 159 ++++ projects/rccl-tests/src/reduce_scatter.cu | 139 +++ 12 files changed, 2083 insertions(+) create mode 100644 projects/rccl-tests/LICENSE.txt create mode 100644 projects/rccl-tests/Makefile create mode 100644 projects/rccl-tests/README.md create mode 100644 projects/rccl-tests/src/Makefile create mode 100644 projects/rccl-tests/src/all_gather.cu create mode 100644 projects/rccl-tests/src/all_reduce.cu create mode 100644 projects/rccl-tests/src/broadcast.cu create mode 100644 projects/rccl-tests/src/common.cu create mode 100644 projects/rccl-tests/src/common.h create mode 100644 projects/rccl-tests/src/nccl1_compat.h create mode 100644 projects/rccl-tests/src/reduce.cu create mode 100644 projects/rccl-tests/src/reduce_scatter.cu diff --git a/projects/rccl-tests/LICENSE.txt b/projects/rccl-tests/LICENSE.txt new file mode 100644 index 0000000000..4573c07c44 --- /dev/null +++ b/projects/rccl-tests/LICENSE.txt @@ -0,0 +1,27 @@ + + Copyright (c) 2016-2017, NVIDIA CORPORATION. All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions + are met: + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + * Neither the name of NVIDIA CORPORATION, nor the names of their + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY + EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + diff --git a/projects/rccl-tests/Makefile b/projects/rccl-tests/Makefile new file mode 100644 index 0000000000..29409a8422 --- /dev/null +++ b/projects/rccl-tests/Makefile @@ -0,0 +1,20 @@ +# +# Copyright (c) 2017, NVIDIA CORPORATION. All rights reserved. +# +# See LICENCE.txt for license information +# + +.PHONY : all clean + +default : src.build + +TARGETS=src + +all: ${TARGETS:%=%.build} +clean: ${TARGETS:%=%.clean} + +%.build: + ${MAKE} -C $* build + +%.clean: + ${MAKE} -C $* clean diff --git a/projects/rccl-tests/README.md b/projects/rccl-tests/README.md new file mode 100644 index 0000000000..d70bb1f54c --- /dev/null +++ b/projects/rccl-tests/README.md @@ -0,0 +1,62 @@ +# NCCL Tests + +These tests check both the performance and the correctness of NCCL operations. They can be compiled against [NCCL 1](http://github.com/nvidia/nccl) and [NCCL 2](http://developer.nvidia.com/nccl). + +## Build + +To build the tests, just type `make`. + +If CUDA is not installed in /usr/local/cuda, you may specify CUDA\_HOME. Similarly, if NCCL is not installed in /usr, you may specify NCCL\_HOME. + +```shell +$ make CUDA_HOME=/path/to/cuda NCCL_HOME=/path/to/nccl +``` + +NCCL tests rely on MPI to work on multiple processes, hence multiple nodes. If you want to compile the tests with MPI support, you need to set MPI=1 and set MPI\_HOME to the path where MPI is installed. + +```shell +$ make MPI=1 MPI_HOME=/path/to/mpi CUDA_HOME=/path/to/cuda NCCL_HOME=/path/to/nccl +``` + +## Usage + +NCCL tests can run on multiple processes, multiple threads, and multiple CUDA devices per thread. The number of process is managed by MPI and is therefore not passed to the tests as argument. The total number of ranks (=CUDA devices) will be equal to (number of processes)\*(number of threads)\*(number of gpus per thread). + +### Quick examples + +Run on 8 GPUs (`-g 8`), scanning from 8 Bytes to 128MBytes : +```shell +$ ./build/all_reduce_perf -b 8 -e 128M -f 2 -g 8 +``` + +Run with MPI on 40 processes (potentially on multiple nodes) with 4 GPUs each, disabling checks : +```shell +$ mpirun -np 40 ./build/all_reduce_perf -b 8 -e 128M -f 2 -g 4 -c 0 +``` + +All tests support the same arguments : +* Number of GPUs + * `-t,--nthreads ` number of threads per process. Default : 1. + * `-g,--ngpus ` number of gpus per process. Default : 1. +* Sizes to scan + * `-b,--minbytes ` minimum size to start with. Default : 32M. + * `-e,--maxbytes ` maximum size to end at. Default : 32M. + * Increments can be either fixes of a multiplication factor. Only one of those should be used + * `-i,--stepbytes ` fixed increment between sizes. Default : (max-min)/10. + * `-f,--stepfactor ` multiplication factor between sizes. Default : disabled. +* Performance + * `-n,--iters ` number of iterations. Default : 20. + * `-w,--warmup_iters ` number of warmup iterations (not timed). Default : 5. +* `-s,--swap_args <0/1>` when used with multiple threads, have threads manage different GPUs for each iteration. Default : 0. +* `-p,--parallel_init <0/1>` use threads to initialize NCCL in parallel. +* `-c,--check <0/1>` check correctness of results. This can be quite slow on large numbers of GPUs. Default : 1. +* NCCL operations arguments + * `-o,--op ` Specify which reduction operation to perform. Only relevant for reduction operations. Default : Sum. + * `-d,--datatype ` Specify which datatype to use. Default : Float. + * `-r,--root ` Specify which root to use. Only for operations with a root like broadcast or reduce. + * `-z,--blocking <0/1>` Make NCCL collective blocking, i.e. have CPUs wait and sync after each collective. Default : 0. + +## Copyright + +NCCL tests are provided under the BSD licence. All source code and accompanying documentation is copyright (c) 2016-2017, NVIDIA CORPORATION. All rights reserved. + diff --git a/projects/rccl-tests/src/Makefile b/projects/rccl-tests/src/Makefile new file mode 100644 index 0000000000..6188d01424 --- /dev/null +++ b/projects/rccl-tests/src/Makefile @@ -0,0 +1,78 @@ +# +# Copyright (c) 2015-2017, NVIDIA CORPORATION. All rights reserved. +# +# See LICENCE.txt for license information +# + +CUDA_HOME ?= /usr/local/cuda +PREFIX ?= /usr/local +VERBOSE ?= 0 +DEBUG ?= 0 + +CUDA_LIB ?= $(CUDA_HOME)/lib64 +CUDA_INC ?= $(CUDA_HOME)/include +NVCC = $(CUDA_HOME)/bin/nvcc + +# Better define NVCC_GENCODE in your environment to the minimal set +# of archs to reduce compile time. +NVCC_GENCODE ?= -gencode=arch=compute_30,code=sm_30 \ + -gencode=arch=compute_35,code=sm_35 \ + -gencode=arch=compute_50,code=sm_50 \ + -gencode=arch=compute_52,code=sm_52 \ + -gencode=arch=compute_60,code=sm_60 \ + -gencode=arch=compute_61,code=sm_61 \ + -gencode=arch=compute_61,code=compute_61 + +NVCUFLAGS := -ccbin $(CXX) $(NVCC_GENCODE) -std=c++11 + +LDFLAGS := -L${CUDA_LIB} -lcudart -lrt +NVLDFLAGS := -L${CUDA_LIB} -lcudart -lrt + +ifeq ($(DEBUG), 0) +NVCUFLAGS += -O3 +CXXFLAGS += -O3 +else +NVCUFLAGS += -O0 -G -g +CXXFLAGS += -O0 -g -ggdb3 +endif + +ifeq ($(VERBOSE), 0) +.SILENT: +endif + +.PHONY: build clean + +BUILDDIR ?= ../build +ifneq ($(NCCLDIR), "") +NVCUFLAGS += -I$(NCCLDIR)/include/ +NVLDFLAGS += -L$(NCCLDIR)/lib +endif + +ifeq ($(MPI), 1) +NVCUFLAGS += -DMPI_SUPPORT -I$(MPI_HOME)/include +NVLDFLAGS += -L$(MPI_HOME)/lib -lmpi +endif +LIBRARIES += curand nccl nvToolsExt +NVLDFLAGS += $(LIBRARIES:%=-l%) + +DST_DIR := $(BUILDDIR) +SRC_FILES := $(wildcard *.cu) +OBJ_FILES := $(SRC_FILES:%.cu=${DST_DIR}/%.o) +BIN_FILES_LIST := all_reduce all_gather broadcast reduce_scatter reduce +BIN_FILES := $(BIN_FILES_LIST:%=${DST_DIR}/%_perf) + +build: ${BIN_FILES} + +clean: + rm -rf ${DST_DIR} + +${DST_DIR}/%.o: %.cu + @printf "Compiling %-35s > %s\n" $< $@ + @mkdir -p ${DST_DIR} + $(NVCC) -o $@ $(NVCUFLAGS) -c $< + +${DST_DIR}/%_perf:${DST_DIR}/%.o ${DST_DIR}/common.o + @printf "Linking %-35s > %s\n" $< $@ + @mkdir -p ${DST_DIR} + $(NVCC) -o $@ $(NVCUFLAGS) $^ ${NVLDFLAGS} + diff --git a/projects/rccl-tests/src/all_gather.cu b/projects/rccl-tests/src/all_gather.cu new file mode 100644 index 0000000000..2386842cdd --- /dev/null +++ b/projects/rccl-tests/src/all_gather.cu @@ -0,0 +1,106 @@ +/************************************************************************* + * Copyright (c) 2016-2017, NVIDIA CORPORATION. All rights reserved. + * + * See LICENCE.txt for license information + ************************************************************************/ + +#include "cuda_runtime.h" +#include "common.h" + + +void print_header() { + PRINT("# %10s %12s %6s %6s out-of-place in-place\n", "", "", "", ""); + PRINT("# %10s %12s %6s %7s %5s %5s %7s %7s %5s %5s %7s\n", "bytes", "N", "type", + "time", "algbw", "busbw", "res", "time", "algbw", "busbw", "res"); +} + +void print_line_header (size_t size, size_t count, const char *typeName, const char *opName, int root) { + PRINT("%12li %12li %6s", size, count, typeName); +} + +void getCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramcount, size_t *sendInplaceOffset, size_t *recvInplaceOffset, size_t *procSharedCount, int *sameExpected, size_t count, int nranks) { + *sendcount = count/nranks; + *recvcount = (count/nranks)*nranks; + *sameExpected = 1; + *procSharedCount = 0; + *sendInplaceOffset = count/nranks; + *recvInplaceOffset = 0; + *paramcount = *sendcount; +} + +void InitRecvResult(struct threadArgs_t* args, ncclDataType_t type, ncclRedOp_t op, int root, int in_place, int is_first) { + size_t nBytes = args->nbytes; + size_t count = nBytes / wordSize(type); + int proc = args->proc; + int nThreads = args->nThreads; + int t = args->thread; + int nGpus = args->nGpus; + + while (args->sync[args->sync_idx] != t) pthread_yield(); + + for (int i=0; iproc*args->nThreads + args->thread)*args->nGpus + i); + NCCLCHECK(ncclCommCuDevice(args->comms[i], &device)); + CUDACHECK(cudaSetDevice(device)); + + void* data = in_place ? (void *)((uintptr_t)args->recvbuffs[i] + args->sendInplaceOffset*rank) : args->sendbuffs[i]; + + CUDACHECK(cudaMemcpy((void *)((uintptr_t)args->expectedHost[0] + ((proc*nThreads + t)*nGpus + i)*nBytes), + data, + nBytes, cudaMemcpyDeviceToHost)); + + if (in_place == 0) { + CUDACHECK(cudaMemset(args->recvbuffs[i], 0, args->expectedBytes)); + } + CUDACHECK(cudaDeviceSynchronize()); + } + + args->sync[args->sync_idx] = t + 1; + + if (t+1 == nThreads) { +#ifdef MPI_SUPPORT + // Last thread does the MPI allgather + MPI_Allgather(MPI_IN_PLACE, nBytes*nThreads*nGpus, MPI_BYTE, + args->expectedHost[0], + nBytes*nThreads*nGpus, MPI_BYTE, MPI_COMM_WORLD); +#endif + args->sync[args->sync_idx] = 0; + } else { + while (args->sync[args->sync_idx]) pthread_yield(); + } + + args->sync_idx=!args->sync_idx; +} + +void GetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks) { + double baseBw = (double)(count * typesize * (nranks - 1)) / 1.0E9 / sec; + + *algBw = baseBw; + double factor = 1; + *busBw = baseBw * factor; +} + +void RunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) { + NCCLCHECK(ncclAllGather(sendbuff, recvbuff, count, type, comm, stream)); +} + +void RunTest(struct threadArgs_t* args, int root, ncclDataType_t type, const char* typeName, ncclRedOp_t op, const char* opName) { + ncclDataType_t *run_types; + const char **run_typenames; + int type_count; + + if ((int)type != -1) { + type_count = 1; + run_types = &type; + run_typenames = &typeName; + } else { + type_count = ncclNumTypes; + run_types = test_types; + run_typenames = test_typenames; + } + + for (int i=0; inbytes / wordSize(type); + + while (args->sync[args->sync_idx] != args->thread) pthread_yield(); + + for (int i=0; inGpus; i++) { + int device; + NCCLCHECK(ncclCommCuDevice(args->comms[i], &device)); + CUDACHECK(cudaSetDevice(device)); + void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i]; + + if (is_first && i == 0) { + CUDACHECK(cudaMemcpy(args->expected[0], data, count*wordSize(type), cudaMemcpyDeviceToHost)); + } else { + Accumulate(args->expected[0], data, count, type, op); + } + + if (in_place == 0) { + CUDACHECK(cudaMemset(args->recvbuffs[i], 0, args->nbytes)); + } + CUDACHECK(cudaDeviceSynchronize()); + } + + args->sync[args->sync_idx] = args->thread + 1; + + if (args->thread+1 == args->nThreads) { +#ifdef MPI_SUPPORT + // Last thread does the MPI reduction + if (args->nbytes > 0) { + void* remote, *remoteHost = malloc(args->nbytes); + void* myInitialData = malloc(args->nbytes); + memcpy(myInitialData, args->expectedHost[0], args->nbytes); + CUDACHECK(cudaHostRegister(remoteHost, args->nbytes, cudaHostRegisterPortable | cudaHostRegisterMapped)); + CUDACHECK(cudaHostGetDevicePointer(&remote, remoteHost, 0)); + for (int i=0; inProcs; i++) { + if (i == args->proc) { + MPI_Bcast(myInitialData, args->nbytes, MPI_BYTE, i, MPI_COMM_WORLD); + free(myInitialData); + } else { + MPI_Bcast(remoteHost, args->nbytes, MPI_BYTE, i, MPI_COMM_WORLD); + Accumulate(args->expected[0], remote, count, type, op); + cudaDeviceSynchronize(); + } + } + CUDACHECK(cudaHostUnregister(remoteHost)); + free(remoteHost); + } +#endif + args->sync[args->sync_idx] = 0; + } else { + while (args->sync[args->sync_idx]) pthread_yield(); + } + + args->sync_idx = !args->sync_idx; +} + +void GetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks) { + double baseBw = (double)(count * typesize) / 1.0E9 / sec; + + *algBw = baseBw; + double factor = ((double)(2*(nranks - 1)))/((double)nranks); + *busBw = baseBw * factor; +} + +void RunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) { + NCCLCHECK(ncclAllReduce(sendbuff, recvbuff, count, type, op, comm, stream)); +} + + +void RunTest(struct threadArgs_t* args, int root, ncclDataType_t type, const char* typeName, ncclRedOp_t op, const char* opName) { + ncclDataType_t *run_types; + ncclRedOp_t *run_ops; + const char **run_typenames, **run_opnames; + int type_count, op_count; + + if ((int)type != -1) { + type_count = 1; + run_types = &type; + run_typenames = &typeName; + } else { + type_count = ncclNumTypes; + run_types = test_types; + run_typenames = test_typenames; + } + + if ((int)op != -1) { + op_count = 1; + run_ops = &op; + run_opnames = &opName; + } else { + op_count = ncclNumOps; + run_ops = test_ops; + run_opnames = test_opnames; + } + + for (int i=0; i + +void print_header() { + PRINT("# %10s %12s %6s %6s out-of-place\n", "", "", "", ""); + PRINT("# %10s %12s %6s %6s %7s %5s %5s %7s\n", "bytes", "N", "type", "root", + "time", "algbw", "busbw", "res"); +} + +void print_line_header (size_t size, size_t count, const char *typeName, const char *opName, int root) { + PRINT("%12li %12li %6s %6i", size, count, typeName, root); +} + +void getCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramcount, size_t *sendInplaceOffset, size_t *recvInplaceOffset, size_t *procSharedCount, int *sameExpected, size_t count, int nranks) { + *sendcount = count; + *recvcount = count; + *sameExpected = 0; + *procSharedCount = count; + *sendInplaceOffset = 0; + *recvInplaceOffset = 0; + *paramcount = *sendcount; +} + +void InitRecvResult(struct threadArgs_t* args, ncclDataType_t type, ncclRedOp_t op, int root, int in_place, int is_first) { + int root_proc = root/(args->nThreads*args->nGpus); + int root_thread = (root/args->nGpus)%(args->nThreads); + int root_gpu = root%args->nGpus; + + assert(args->expectedBytes == args->nbytes); + + if (root_thread == args->thread) { + if (root_proc == args->proc) { + CUDACHECK(cudaMemcpy(args->procSharedHost, + args->sendbuffs[root_gpu], + args->nbytes, cudaMemcpyDeviceToHost)); + } +#ifdef MPI_SUPPORT + MPI_Bcast(args->procSharedHost, args->nbytes, MPI_BYTE, root_proc, MPI_COMM_WORLD); +#endif + + args->sync[0] = 0; + } + + Barrier(args); + + for (int i=0; inGpus; i++) { + int device; + NCCLCHECK(ncclCommCuDevice(args->comms[i], &device)); + CUDACHECK(cudaSetDevice(device)); + + //set expected buf to zero at root, copy over source data at others + if ((root_proc == args->proc) + && (root_thread == args->thread) + && (root_gpu == i)) { + memset(args->expectedHost[i], 0, args->nbytes); + } else { + memcpy(args->expectedHost[i], args->procSharedHost, args->nbytes); + } + + //reset recvbufs to zero + CUDACHECK(cudaMemset(args->recvbuffs[i], 0, args->nbytes)); + CUDACHECK(cudaDeviceSynchronize()); + } + + Barrier(args); +} + +void GetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks) { + double baseBw = (double)(count * typesize) / 1.0E9 / sec; + + *algBw = baseBw; + double factor = 1; + *busBw = baseBw * factor; +} + +void RunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) { + int rank; + NCCLCHECK(ncclCommUserRank(comm, &rank)); + if (rank == root) { + NCCLCHECK(ncclBcast(sendbuff, count, type, root, comm, stream)); + } else { + NCCLCHECK(ncclBcast(recvbuff, count, type, root, comm, stream)); + } +} + +void RunTest(struct threadArgs_t* args, int root, ncclDataType_t type, const char* typeName, ncclRedOp_t op, const char* opName) { + ncclDataType_t *run_types; + const char **run_typenames; + int type_count; + int begin_root, end_root; + + if ((int)type != -1) { + type_count = 1; + run_types = &type; + run_typenames = &typeName; + } else { + type_count = ncclNumTypes; + run_types = test_types; + run_typenames = test_typenames; + } + + if (root != -1) { + begin_root = end_root = root; + } else { + begin_root = 0; + end_root = args->nProcs*args->nThreads*args->nGpus-1; + } + + for (int i=0; i +#include +#include +#include "cuda.h" + +#if NCCL_MAJOR >= 2 +ncclDataType_t test_types[ncclNumTypes] = {ncclInt8, ncclUint8, ncclInt32, ncclUint32, ncclInt64, ncclUint64, ncclHalf, ncclFloat, ncclDouble}; +const char *test_typenames[ncclNumTypes] = {"int8", "uint8", "int32", "uint32", "int64", "uint64", "half", "float", "double"}; +#else +ncclDataType_t test_types[ncclNumTypes] = {ncclChar, ncclInt, ncclHalf, ncclFloat, ncclDouble, ncclInt64, ncclUint64}; +const char *test_typenames[ncclNumTypes] = {"char", "int", "half", "float", "double", "int64", "uint64"}; +#endif +ncclRedOp_t test_ops[ncclNumOps] = {ncclSum, ncclProd, ncclMax, ncclMin}; +const char *test_opnames[ncclNumOps] = {"sum", "prod", "max", "min"}; + +thread_local int is_main_thread = 0; + +static int datacheck = 1; +static int warmup_iters = 5; +static int iters = 20; +static int ncclop = ncclSum; +static int nccltype = ncclFloat; +static int ncclroot = 0; +static int swap_args = 0; +static int parallel_init = 0; +static int blocking_coll = 0; + +double parsesize(char *value) { + long long int units; + double size; + + if (strchr(value, 'G') != NULL) { + units=1024*1024*1024; + } else if (strchr(value, 'M') != NULL) { + units=1024*1024; + } else if (strchr(value, 'K') != NULL) { + units=1024; + } else { + units=1; + } + + size = atof(value)*units; + return size; +} + +double DeltaMaxValue(ncclDataType_t type) { + switch(type) { + case ncclHalf: return 1e-2; + case ncclFloat: return 1e-5; + case ncclDouble: return 1e-12; + case ncclInt: +#if NCCL_MAJOR >= 2 + case ncclUint8: + //case ncclInt32: + case ncclUint32: +#endif + case ncclInt64: + case ncclUint64: return 1e-200; + } + return 1e-200; +} + +template __device__ +double absDiff(T a, T b) { + return fabs((double)(b - a)); +} + +template<> __device__ +double absDiff(half a, half b) { + float x = __half2float(a); + float y = __half2float(b); + return fabs((double)(y-x)); +} + +template __device__ +float toFloat(T a) { + return (float)a; +} +template<> __device__ +float toFloat(half a) { + return __half2float(a); +} + + +template __global__ +void deltaKern(void* A_, void* B_, size_t count, double* max) { + const T* A = (const T*)A_; + const T* B = (const T*)B_; + __shared__ double temp[BSIZE]; + int tid = threadIdx.x; + double locmax = 0.0; + for(int i=tid; i locmax ) { + locmax = delta; +#ifdef DEBUG_PRINT + if (delta > .1) printf("Error at %d/%d : %f != %f\n", i, count, toFloat(A[i]), toFloat(B[i])); +#endif + } + } + + temp[tid] = locmax; + for(int stride = BSIZE/2; stride > 1; stride>>=1) { + __syncthreads(); + if( tid < stride ) + temp[tid] = temp[tid] > temp[tid+stride] ? temp[tid] : temp[tid+stride]; + } + __syncthreads(); + if( threadIdx.x == 0) + *max = temp[0] > temp[1] ? temp[0] : temp[1]; +} + + +void CheckDelta(void* expected, void* results, size_t count, ncclDataType_t type, double* devmax) { + switch (type) { + case ncclHalf: + deltaKern<<<1, 512>>>(results, expected, count, devmax); break; + case ncclFloat: + deltaKern<<<1, 512>>>(results, expected, count, devmax); break; + case ncclDouble: + deltaKern<<<1, 512>>>(results, expected, count, devmax); break; + + case ncclChar: +#if NCCL_MAJOR >= 2 + case ncclUint8: +#endif + deltaKern<<<1, 512>>>(results, expected, count, devmax); break; + case ncclInt: +#if NCCL_MAJOR >= 2 + case ncclUint32: +#endif + deltaKern<<<1, 512>>>(results, expected, count, devmax); break; + case ncclInt64: + case ncclUint64: + deltaKern<<<1, 512>>>(results, expected, count, devmax); break; + } +} + +#define CURAND_CHK(cmd) \ + do { \ + curandStatus_t error = (cmd); \ + if (error != CURAND_STATUS_SUCCESS) { \ + printf("CuRAND error %i at %s:%i\n", error, __FILE__ , __LINE__); \ + exit(EXIT_FAILURE); \ + } \ + } while (false) + + +template +void GenerateRandom(curandGenerator_t generator, T * const dest, + const size_t N); + +template<> +void GenerateRandom(curandGenerator_t generator, int8_t * const dest, + const size_t N) { + size_t align = (4 - (((size_t)dest) & 3)) % 4; + CURAND_CHK(curandGenerate(generator, (unsigned int*)(dest+align), + N * sizeof(int8_t) / sizeof(int))); + CUDACHECK(cudaMemcpy(dest, dest+4, align, cudaMemcpyDeviceToDevice)); +} +template<> +void GenerateRandom(curandGenerator_t generator, uint8_t * const dest, + const size_t N) { + size_t align = (4 - (((size_t)dest) & 3)) % 4; + CURAND_CHK(curandGenerate(generator, (unsigned int*)(dest+align), + N * sizeof(uint8_t) / sizeof(int))); + CUDACHECK(cudaMemcpy(dest, dest+4, align, cudaMemcpyDeviceToDevice)); +} + +template<> +void GenerateRandom(curandGenerator_t generator, int32_t * const dest, + const size_t N) { + CURAND_CHK(curandGenerate(generator, (unsigned int*)dest, N)); +} + +template<> +void GenerateRandom(curandGenerator_t generator, uint32_t * const dest, + const size_t N) { + CURAND_CHK(curandGenerate(generator, (unsigned int*)dest, N)); +} + +template<> +void GenerateRandom(curandGenerator_t generator, float * const dest, + const size_t N) { + CURAND_CHK(curandGenerateUniform(generator, dest, N)); +} + +template<> +void GenerateRandom(curandGenerator_t generator, double * const dest, + const size_t N) { + CURAND_CHK(curandGenerateUniformDouble(generator, dest, N)); +} + +template<> +void GenerateRandom(curandGenerator_t generator, uint64_t * const dest, + const size_t N) { + CURAND_CHK(curandGenerate(generator, (unsigned int *)dest, N*2)); +} + +template<> +void GenerateRandom(curandGenerator_t generator, int64_t * const dest, + const size_t N) { + CURAND_CHK(curandGenerate(generator, (unsigned int *)dest, N*2)); +} + +template +void RandomizeType(void* dest, const size_t N, const int randomSeed) { + T* ptr = (T*)dest; + curandGenerator_t gen; + CURAND_CHK(curandCreateGenerator(&gen, CURAND_RNG_PSEUDO_MTGP32)); + CURAND_CHK(curandSetPseudoRandomGeneratorSeed(gen, randomSeed)); + GenerateRandom(gen, ptr, N); + CURAND_CHK(curandDestroyGenerator(gen)); + CUDACHECK(cudaDeviceSynchronize()); +} + +__global__ void halve(const float * src, half* dest, size_t N) { + for(int tid = threadIdx.x + blockIdx.x*blockDim.x; + tid < N; tid += blockDim.x * gridDim.x) + dest[tid] = __float2half(src[tid]); +} + +void RandomizeHalf(void* dest, const size_t N, const int randomSeed) { + half* ptr = (half*)dest; + curandGenerator_t gen; + CURAND_CHK(curandCreateGenerator(&gen, CURAND_RNG_PSEUDO_MTGP32)); + CURAND_CHK(curandSetPseudoRandomGeneratorSeed(gen, randomSeed)); + + float* temp; + CUDACHECK(cudaMalloc(&temp, N*sizeof(float))); + GenerateRandom(gen, temp, N); + halve<<<128, 512>>>(temp, ptr, N); + CURAND_CHK(curandDestroyGenerator(gen)); + CUDACHECK(cudaFree(temp)); + CUDACHECK(cudaDeviceSynchronize()); +} + +void Randomize(void* ptr, const size_t count, ncclDataType_t type, const int seed) { + switch (type) { + case ncclChar: RandomizeType (ptr, count, seed); break; +#if NCCL_MAJOR >= 2 + case ncclUint8: RandomizeType (ptr, count, seed); break; +#endif + case ncclInt: RandomizeType (ptr, count, seed); break; +#if NCCL_MAJOR >= 2 + case ncclUint32: RandomizeType(ptr, count, seed); break; +#endif + case ncclInt64: RandomizeType (ptr, count, seed); break; + case ncclUint64: RandomizeType(ptr, count, seed); break; + case ncclHalf: RandomizeHalf (ptr, count, seed); break; + case ncclFloat: RandomizeType (ptr, count, seed); break; + case ncclDouble: RandomizeType (ptr, count, seed); break; + } +} + +template __global__ static +void accumKern(T* acum, const T* contrib, size_t N) { + int tid = threadIdx.x + blockIdx.x*blockDim.x; + int offset = blockDim.x*gridDim.x; + for(int i=tid; i c) ? a : c; + } else if(OP == ncclMin) { + acum[i] = (a < c) ? a : c; + } + } +} + +template<> __global__ +void accumKern(half* acum, const half* contrib, size_t N) { + int tid = threadIdx.x + blockIdx.x*blockDim.x; + int offset = blockDim.x*gridDim.x; + for(int i=tid; i __global__ +void accumKern(half* acum, const half* contrib, size_t N) { + int tid = threadIdx.x + blockIdx.x*blockDim.x; + int offset = blockDim.x*gridDim.x; + for(int i=tid; i __global__ +void accumKern(half* acum, const half* contrib, size_t N) { + int tid = threadIdx.x + blockIdx.x*blockDim.x; + int offset = blockDim.x*gridDim.x; + for(int i=tid; ic) ? a : c ); + } +} + +template<> __global__ +void accumKern(half* acum, const half* contrib, size_t N) { + int tid = threadIdx.x + blockIdx.x*blockDim.x; + int offset = blockDim.x*gridDim.x; + for(int i=tid; i +void accVecType(void* out, void* in, size_t n, ncclRedOp_t op) { + switch(op) { + case ncclSum: accumKern <<<256,256>>>((T*)out, (T*)in, n); break; + case ncclProd: accumKern<<<256,256>>>((T*)out, (T*)in, n); break; + case ncclMax: accumKern <<<256,256>>>((T*)out, (T*)in, n); break; + case ncclMin: accumKern <<<256,256>>>((T*)out, (T*)in, n); break; + default: + printf("Unknown reduction operation.\n"); + exit(EXIT_FAILURE); + } +} + +void Accumulate(void* out, void* in, size_t n, ncclDataType_t type, ncclRedOp_t op) { + switch (type) { + case ncclChar: accVecType (out, in, n, op); break; +#if NCCL_MAJOR >= 2 + case ncclUint8: accVecType (out, in, n, op); break; +#endif + case ncclInt: accVecType (out, in, n, op); break; +#if NCCL_MAJOR >= 2 + case ncclUint32: accVecType (out, in, n, op); break; +#endif + case ncclInt64: accVecType (out, in, n, op); break; + case ncclUint64: accVecType (out, in, n, op); break; + case ncclHalf: accVecType (out, in, n, op); break; + case ncclFloat: accVecType (out, in, n, op); break; + case ncclDouble: accVecType (out, in, n, op); break; + default: + printf("Unknown reduction type.\n"); + exit(EXIT_FAILURE); + } +} + +void Barrier(struct threadArgs_t* args) +{ + while (args->barrier[args->barrier_idx] != args->thread) pthread_yield(); + + args->barrier[args->barrier_idx] = args->thread + 1; + + if (args->thread+1 == args->nThreads) { +#ifdef MPI_SUPPORT + MPI_Barrier(MPI_COMM_WORLD); +#endif + args->barrier[args->barrier_idx] = 0; + } else { + while (args->barrier[args->barrier_idx]) pthread_yield(); + } + + args->barrier_idx=!args->barrier_idx; +} + +void RandomizeAccumulate(void* data, void* accum, size_t count, ncclDataType_t type, ncclRedOp_t op, int seed, int rank) { + Randomize(data, count, type, seed); + if (rank == 0) { + CUDACHECK(cudaMemcpy(accum, data, count*wordSize(type), cudaMemcpyDeviceToHost)); + } else { + Accumulate(accum, data, count, type, op); + } +} + +double CheckData(struct threadArgs_t* args, ncclDataType_t type, ncclRedOp_t op, int root, int in_place) { + size_t count = args->expectedBytes/wordSize(type); + double maxDelta = 0.0; + for (int i=0; inGpus; i++) { + int device; + int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i); + NCCLCHECK(ncclCommCuDevice(args->comms[i], &device)); + CUDACHECK(cudaSetDevice(device)); + void *data = in_place ? ((void *)((uintptr_t)args->recvbuffs[i] + args->recvInplaceOffset*rank)) : args->recvbuffs[i]; + CheckDelta(data , args->expected[i], count, type, args->delta); + cudaDeviceSynchronize(); + maxDelta = std::max(*(args->deltaHost), maxDelta); + +#ifdef DEBUG_PRINT + if (rank == 0) { + int *temp = (int *)malloc(args->expectedBytes); + + printf("\n Expected: "); + for(int j=0; jexpectedBytes/sizeof(int); j++) { + printf("%d:%d ", j, *((int *)args->expectedHost[0] + j)); + } + printf("\n"); + + cudaMemcpy(temp, data, args->expectedBytes, cudaMemcpyDeviceToHost); + printf("\n Actual: "); + for (int j=0; jexpectedBytes/sizeof(int); j++) { + printf("%d:%d ", j, *((int *)temp + j)); + } + printf("\n"); + free(temp); + } +#endif + } + double nranks = args->nProcs*args->nThreads*args->nGpus; + if (maxDelta > DeltaMaxValue(type)*(nranks - 1)) args->errors[0]++; + return maxDelta; +} + +void InitSend(struct threadArgs_t* args, ncclDataType_t type, ncclRedOp_t op, int root, int in_place, int is_first) { + size_t count = args->sendBytes / wordSize(type); + static int rep = 1; + for (int i=0; inGpus; i++) { + int device; + int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i); + NCCLCHECK(ncclCommCuDevice(args->comms[i], &device)); + CUDACHECK(cudaSetDevice(device)); + void* data = in_place ? (void *)((uintptr_t)args->recvbuffs[i] + args->sendInplaceOffset*rank) : args->sendbuffs[i]; + int seed = rank+count+rep+in_place; + Randomize(data, count, type, seed); + +#ifdef DEBUG_PRINT + if (rank == 2) { + int *temp = (int *)malloc(args->sendBytes); + cudaMemcpy(temp, data, args->sendBytes, cudaMemcpyDeviceToHost); + printf("\n Send Data at rank %d:", rank); + for (int i=0; isendBytes/sizeof(int); i++) { + printf("%d:%d ", i, *((int *)temp + i)); + } + printf("\n"); + free(temp); + } +#endif + + cudaDeviceSynchronize(); + } + rep++; +} + +#define CHECK 1 + +void startColl(struct threadArgs_t* args, ncclDataType_t type, ncclRedOp_t op, int root, int in_place, int thread_offset) { + size_t count = args->nbytes / wordSize(type); + + if (swap_args) { + args = (struct threadArgs_t*)args->proc_args + (args->thread + thread_offset)%args->nThreads; + } + + if (args->nGpus == 1) { + int rank = args->proc*args->nThreads + args->thread; + RunColl((void*)(in_place ? ((void *)((uintptr_t)args->recvbuffs[0] + args->sendInplaceOffset*rank)) : args->sendbuffs[0]), + (void*)(in_place ? (void*)((uintptr_t)args->recvbuffs[0] + args->recvInplaceOffset*rank) : args->recvbuffs[0]), + count, type, op, root, args->comms[0], args->streams[0]); + } else { + NCCLCHECK(ncclGroupStart()); + for (int i = 0; i < args->nGpus; i++) { +#ifndef NCCL_MAJOR + int cudaDev; + NCCLCHECK(ncclCommCuDevice(args->comms[i], &cudaDev)); + CUDACHECK(cudaSetDevice(cudaDev)); +#endif + int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i); + RunColl((void*)(in_place ? ((void *)((uintptr_t)args->recvbuffs[i] + args->sendInplaceOffset*rank)) : args->sendbuffs[i]), + (void*)(in_place ? (void*)((uintptr_t)args->recvbuffs[i] + args->recvInplaceOffset*rank) : args->recvbuffs[i]), + count, type, op, root, args->comms[i], args->streams[i]); + } + NCCLCHECK(ncclGroupEnd()); + } + + if (swap_args || blocking_coll) { + //if args have been swapped, complete op before returning + for (int i = 0; i < args->nGpus; ++i) { + cudaError_t err = cudaErrorNotReady; + while (err == cudaErrorNotReady) { + err = cudaStreamQuery(args->streams[i]); + pthread_yield(); + } + CUDACHECK(err); + } + } + if (blocking_coll) Barrier(args); +} + +void completeColl(struct threadArgs_t* args) { + //it swap_args was enabled, op would have been completed immediately + if (swap_args || blocking_coll) return; + + for (int i = 0; i < args->nGpus; ++i) { + cudaError_t err = cudaErrorNotReady; + while (err == cudaErrorNotReady) { + err = cudaStreamQuery(args->streams[i]); + pthread_yield(); + } + CUDACHECK(err); + } +} + +void BenchTime(struct threadArgs_t* args, ncclDataType_t type, ncclRedOp_t op, int root, int in_place) { + size_t count = args->nbytes / wordSize(type); + + // Sync + startColl(args, type, op, root, in_place, 0); + completeColl(args); + + Barrier(args); + + // Performance Benchmark + auto start = std::chrono::high_resolution_clock::now(); + for (int iter = 0; iter < iters; iter++) { + startColl(args, type, op, root, in_place, iter); + } + completeColl(args); + + auto delta = std::chrono::high_resolution_clock::now() - start; + double deltaSec = std::chrono::duration_cast>(delta).count(); + deltaSec = deltaSec/iters; + + double algBw, busBw; + GetBw(count, wordSize(type), deltaSec, &algBw, &busBw, args->nProcs*args->nThreads*args->nGpus); + + Barrier(args); + + if (datacheck) { + InitSend(args, type, op, root, in_place, args->thread == 0 ? 1 : 0); + InitRecvResult(args, type, op, root, in_place, args->thread == 0 ? 1 : 0); + cudaDeviceSynchronize(); + } + + //test validation in single itertion, should ideally be included into the multi-iteration run + startColl(args, type, op, root, in_place, 0); + completeColl(args); + + double maxDelta = 0; +#ifdef CHECK + if (datacheck) { + maxDelta = CheckData(args, type, op, root, in_place); + } else { + maxDelta = -1.0; + } +#else + maxDelta = -1.0; +#endif + + //aggregate delta from all threads and procs + Barrier(args); + if (args->thread == 0) { + for (int i=1; inThreads; i++) { + maxDelta += args->deltaThreads[i]; + } +#ifdef MPI_SUPPORT + MPI_Allreduce(MPI_IN_PLACE, &maxDelta, 1, MPI_DOUBLE, MPI_MAX, MPI_COMM_WORLD); +#endif + } + Barrier(args); + + if (datacheck) { + PRINT(" %7.3f %5.2f %5.2f %7.0le", deltaSec * 1.0E3, algBw, busBw, + maxDelta); + } else { + PRINT(" %7.3f %5.2f %5.2f \tN/A", deltaSec * 1.0E3, algBw, busBw); + } + + args->bw[0] += busBw; + args->bw_count[0]++; +} + +void setupArgs(size_t size, ncclDataType_t type, struct threadArgs_t* args) { + int nranks = args->nProcs*args->nGpus*args->nThreads; + size_t count, sendCount, recvCount, paramCount, sendInplaceOffset, recvInplaceOffset, procSharedCount; + int sameExpected; + + count = size / wordSize(type); + getCollByteCount(&sendCount, &recvCount, ¶mCount, &sendInplaceOffset, &recvInplaceOffset, &procSharedCount, &sameExpected, (size_t)count, (size_t)nranks); + + args->nbytes = paramCount * wordSize(type); + args->sendBytes = sendCount * wordSize(type); + args->expectedBytes = recvCount * wordSize(type); + args->sendInplaceOffset = sendInplaceOffset * wordSize(type); + args->recvInplaceOffset = recvInplaceOffset * wordSize(type); +} + +void TimeTest(struct threadArgs_t* args, ncclDataType_t type, const char* typeName, ncclRedOp_t op, const char* opName, int root, int inPlace) { + // Warm-up + setupArgs(args->maxbytes, type, args); + for (int iter = 0; iter < warmup_iters; iter++) { + startColl(args, type, op, root, 0, iter); + } + completeColl(args); + + // Benchmark + for (size_t size = args->minbytes; size<=args->maxbytes; size = ((args->stepfactor > 1) ? size*args->stepfactor : size+args->stepbytes)) { + setupArgs(size, type, args); + print_line_header(max(args->sendBytes, args->expectedBytes), args->nbytes / wordSize(type), typeName, opName, root); + BenchTime(args, type, op, root, 0); + if (inPlace) BenchTime(args, type, op, root, 1); + PRINT("\n"); + } +} + + +void* threadRunTests(void* args) { + struct threadArgs_t* targs = (struct threadArgs_t*)args; + // Set device to the first of our GPUs. If we don't do that, some operations + // will be done on the current GPU (by default : 0) and if the GPUs are in + // exclusive mode those operations will fail. + int gpuid = targs->localRank*targs->nThreads*targs->nGpus + targs->thread*targs->nGpus; + CUDACHECK(cudaSetDevice(gpuid)); + + RunTest(targs, ncclroot, (ncclDataType_t)nccltype, test_typenames[nccltype], (ncclRedOp_t)ncclop, test_opnames[ncclop]); + + return NULL; +} + +void* threadInit(void* args) { + struct threadArgs_t* targs = (struct threadArgs_t*)args; + char hostname[1024]; + getHostName(hostname, 1024); + int nranks = targs->nProcs*targs->nThreads*targs->nGpus; + + //set main thread again + is_main_thread = (targs->proc == 0 && targs->thread == 0) ? 1 : 0; + + NCCLCHECK(ncclGroupStart()); + for (int i=0; inGpus; i++) { + int rank = targs->proc*targs->nThreads*targs->nGpus + targs->thread*targs->nGpus + i; + int gpuid = targs->localRank*targs->nThreads*targs->nGpus + targs->thread*targs->nGpus + i; + CUDACHECK(cudaSetDevice(gpuid)); + NCCLCHECK(ncclCommInitRank(targs->comms+i, nranks, targs->ncclId, rank)); + } + NCCLCHECK(ncclGroupEnd()); + + PRINT("# Using devices\n"); + for (int p=0; pnProcs; p++) { + if (p == targs->proc) { + for (int t=0; tnThreads; t++) { + if (t == targs->thread) { + for (int i=0; inGpus; i++) { + int cudaDev; + int rank; + cudaDeviceProp prop; + NCCLCHECK(ncclCommCuDevice(targs->comms[i], &cudaDev)); + NCCLCHECK(ncclCommUserRank(targs->comms[i], &rank)); + CUDACHECK(cudaGetDeviceProperties(&prop, cudaDev)); + printf("# Rank %2d on %10s device %2d [0x%02x] %s\n", rank, hostname, cudaDev, + prop.pciBusID, prop.name); + fflush(stdout); + } + Barrier(targs); + fflush(stdout); + } + } + } + } + + threadRunTests(args); + + return NULL; +} + +void AllocateBuffs(void **sendbuff, size_t sendBytes, void **recvbuff, size_t recvBytes, void **expected, void **expectedHost, size_t nbytes, int nranks, int sameExpected) { + static int is_first = 1; + static void *cached_ptr = NULL; + static void *cached_hostptr = NULL; + + CUDACHECK(cudaMalloc(sendbuff, sendBytes)); + //work around for inline reduce scatter where recv count is smaller that send count + CUDACHECK(cudaMalloc(recvbuff, (sendBytes > recvBytes) ? sendBytes : recvBytes)); + + if (is_first || !sameExpected) { + *expectedHost = malloc(recvBytes); + CUDACHECK(cudaHostRegister(*expectedHost, recvBytes, cudaHostRegisterPortable | cudaHostRegisterMapped)); + CUDACHECK(cudaHostGetDevicePointer(expected, *expectedHost, 0)); + cached_ptr = *expected; + cached_hostptr = *expectedHost; + is_first = 0; + } else { + *expected = cached_ptr; + *expectedHost = cached_hostptr; + } +} + +int ncclstringtotype(char *str) { + for (int t=0; t] \n\t " + "[-g,--ngpus ] \n\t " + "[-b,--minbytes ] \n\t " + "[-e,--maxbytes ] \n\t " + "[-i,--stepbytes ] \n\t " + "[-f,--stepfactor ] \n\t " + "[-n,--iters ] \n\t " + "[-w,--warmup_iters ] \n\t" + "[-s,--swap_args <0/1>] \n\t " + "[-p,--parallel_init <0/1>] \n\t " + "[-c,--check <0/1>] \n\t " + "[-o,--op ] \n\t " + "[-d,--datatype ] \n\t " + "[-r,--root ] \n\t " + "[-z,--blocking <0/1>] \n\t " + "[-h,--help]\n"); + return 0; + default: + printf("invalid option \n"); + printf("USAGE: ./test \n\t" + "[-t,--nthreads ] \n\t " + "[-g,--ngpus ] \n\t " + "[-b,--minbytes ] \n\t " + "[-e,--maxbytes ] \n\t " + "[-i,--stepbytes ] \n\t " + "[-f,--stepfactor ] \n\t " + "[-n,--iters ] \n\t " + "[-w,--warmup_iters ] \n\t" + "[-s,--swap_args <0/1>] \n\t " + "[-p,--parallel_init <0/1>] \n\t " + "[-c,--check <0/1>] \n\t " + "[-o,--op ] \n\t " + "[-d,--datatype ] \n\t " + "[-r,--root ] \n\t " + "[-z,--blocking <0/1>] \n\t " + "[-h,--help]\n"); + return 0; + } + } + + // Make sure everyline is flushed so that we see the progress of the test + setlinebuf(stdout); + +#ifdef MPI_SUPPORT + MPI_Init(&argc, &argv); + MPI_Comm_size(MPI_COMM_WORLD, &nProcs); + MPI_Comm_rank(MPI_COMM_WORLD, &proc); + uint64_t hostHashs[nProcs]; + hostHashs[proc] = getHostHash(hostname); + MPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, hostHashs, sizeof(uint64_t), MPI_BYTE, MPI_COMM_WORLD); + for (int p=0; p 1)?stepFactor:stepBytes, (stepFactor > 1)?"factor":"bytes", warmup_iters, iters, datacheck); + if (swap_args) printf("Swap Comms Enabled: swapping communicators among threads for each iteration \n"); + if (blocking_coll) printf("Blocking Enabled: wait for completion and barrier after each collective \n"); + if (parallel_init) printf("Parallel Init Enabled: threads call into NcclInitRank concurrently \n"); + } + + ncclUniqueId ncclId; + if (proc == 0) { + NCCLCHECK(ncclGetUniqueId(&ncclId)); + } +#ifdef MPI_SUPPORT + MPI_Bcast(&ncclId, sizeof(ncclId), MPI_BYTE, 0, MPI_COMM_WORLD); +#endif + cudaStream_t streams[nGpus*nThreads]; + void* sendbuffs[nGpus*nThreads]; + void* recvbuffs[nGpus*nThreads]; + void* expected[nGpus*nThreads]; + void* expectedHost[nGpus*nThreads]; + void *procSharedHost, *procShared; + size_t sendBytes, recvBytes, paramBytes, procSharedBytes, sendInplaceOffset, recvInplaceOffset; + int sameExpected; + + getCollByteCount(&sendBytes, &recvBytes, ¶mBytes, &sendInplaceOffset, &recvInplaceOffset, &procSharedBytes, &sameExpected, (size_t)maxBytes, (size_t)nProcs*nGpus*nThreads); + + for (int i=0; i 0) { + procSharedHost = malloc(procSharedBytes); + CUDACHECK(cudaHostRegister(procSharedHost, procSharedBytes, cudaHostRegisterPortable | cudaHostRegisterMapped)); + CUDACHECK(cudaHostGetDevicePointer(&procShared, procSharedHost, 0)); + } + + //if parallel init is not selected, use main thread to initialize NCCL + ncclComm_t* comms = (ncclComm_t*)malloc(sizeof(ncclComm_t)*nThreads*nGpus); + if (!parallel_init) { + if (nProcs == 1) { + int gpuArray[nGpus*nThreads]; + for (int i=0; i=0; t--) { + args[t].proc_args = (void *)args; + args[t].minbytes=minBytes; + args[t].maxbytes=maxBytes; + args[t].stepbytes=stepBytes; + args[t].stepfactor=stepFactor; + args[t].localRank = localRank; + + args[t].nProcs=nProcs; + args[t].proc=proc; + args[t].nThreads=nThreads; + args[t].thread=t; + args[t].nGpus=nGpus; + args[t].sendbuffs = sendbuffs+t*nGpus; + args[t].recvbuffs = recvbuffs+t*nGpus; + args[t].ncclId = ncclId; + args[t].comms=comms+t*nGpus; + args[t].streams=streams+t*nGpus; + + args[t].expectedHost = expectedHost + t*nGpus; + args[t].expected = expected + t*nGpus; + args[t].procSharedHost = procSharedHost; + args[t].procShared = procShared; + args[t].barrier = (volatile int*)barrier; + args[t].barrier_idx = 0; + args[t].sync = (volatile int*)sync; + args[t].sync_idx = 0; + args[t].deltaThreads = delta; + args[t].deltaHost = (delta + t); + CUDACHECK(cudaHostRegister(args[t].deltaHost, sizeof(double), cudaHostRegisterPortable|cudaHostRegisterMapped)); + CUDACHECK(cudaHostGetDevicePointer(&args[t].delta, args[t].deltaHost, 0)); + args[t].errors=errors+t; + args[t].bw=bw+t; + args[t].bw_count=bw_count+t; + + if (!parallel_init) { + if (t) + pthread_create(threads+t, NULL, threadRunTests, args+t); + else + threadRunTests(args); + } else { + if (t || (parallel_init && (proc == 0))) + pthread_create(threads+t, NULL, threadInit, args+t); + else + threadInit(args); + } + } + + // Wait for other threads + for (int t=nThreads-1; t>=0; t--) { + if (t || (parallel_init && (proc == 0))) pthread_join(threads[t], NULL); + errors[0] += errors[t]; + bw[0] += bw[t]; + bw_count[0] += bw_count[t]; + } + +#ifdef MPI_SUPPORT + MPI_Allreduce(MPI_IN_PLACE, &errors[0], 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD); +#endif + + for(int i=0; i +#include +#include +#ifdef MPI_SUPPORT +#include "mpi.h" +#endif +#include +#include "nccl1_compat.h" + +#define CUDACHECK(cmd) do { \ + cudaError_t e = cmd; \ + if( e != cudaSuccess ) { \ + printf("Cuda failure %s:%d '%s'\n", \ + __FILE__,__LINE__,cudaGetErrorString(e)); \ + exit(EXIT_FAILURE); \ + } \ +} while(0) + +#define NCCLCHECK(cmd) do { \ + ncclResult_t r = cmd; \ + if (r!= ncclSuccess) { \ + printf("NCCL failure %s:%d '%s'\n", \ + __FILE__,__LINE__,ncclGetErrorString(r)); \ + exit(EXIT_FAILURE); \ + } \ +} while(0) + +struct threadArgs_t { + void *proc_args; + size_t nbytes; + size_t minbytes; + size_t maxbytes; + size_t stepbytes; + size_t stepfactor; + + int nProcs; + int proc; + int nThreads; + int thread; + int nGpus; + int localRank; + void** sendbuffs; + size_t sendBytes; + size_t sendInplaceOffset; + void** recvbuffs; + size_t recvInplaceOffset; + ncclUniqueId ncclId; + ncclComm_t* comms; + cudaStream_t* streams; + + void** expectedHost; + void** expected; + size_t expectedBytes; + void* procSharedHost; + void* procShared; + volatile int* sync; + int sync_idx; + volatile int* barrier; + int barrier_idx; + int syncRank; + int syncNranks; + double* deltaThreads; + double* deltaHost; + double* delta; + int* errors; + double* bw; + int* bw_count; +}; + +#include + +// Provided by common.cu +extern void Barrier(struct threadArgs_t* args); +extern void TimeTest(struct threadArgs_t* args, ncclDataType_t type, const char* typeName, ncclRedOp_t op, const char* opName, int root, int inPlace); +extern void Randomize(void* ptr, size_t count, ncclDataType_t type, int seed); +extern void Accumulate(void* out, void* in, size_t n, ncclDataType_t type, ncclRedOp_t op); +extern void CheckDelta(void* expected, void* results, size_t count, ncclDataType_t type, double* devmax); +extern double DeltaMaxValue(ncclDataType_t type); + +// Provided by each coll +void RunTest(struct threadArgs_t* args, int root, ncclDataType_t type, const char* typeName, ncclRedOp_t op, const char* opName); +extern void GetBw(size_t count, int typeSize, double sec, double* algBw, double* busBw, int nranks); +extern void RunColl(void* sendbuf, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream); +extern void InitData(struct threadArgs_t* args, ncclDataType_t type, ncclRedOp_t op, int in_place, int is_first); +extern double CheckData(struct threadArgs_t* args, ncclDataType_t type, ncclRedOp_t op); +extern void AllocateBuffs(void **sendbuff, void **recvbuff, void **expected, void **expectedHost, size_t nbytes, int nranks); +extern void InitRecvResult(struct threadArgs_t* args, ncclDataType_t type, ncclRedOp_t op, int root, int in_place, int is_first); +extern void getCollByteCount(size_t *sendbytes, size_t *recvbytes, size_t *parambytes, size_t *sendInlineOffset, size_t *recvInlineOffset, size_t *procSharedBytes, int *sameexpected, size_t nbytes, int nranks); +extern void print_line_header (size_t size, size_t count, const char *typeName, const char *opName, int root); +extern void print_header(); + +#include + +static void getHostName(char* hostname, int maxlen) { + gethostname(hostname, maxlen); + for (int i=0; i< maxlen; i++) { + if (hostname[i] == '.') { + hostname[i] = '\0'; + return; + } + } +} + +#include + +static uint64_t getHostHash(const char* string) { + // Based on DJB2, result = result * 33 + char + uint64_t result = 5381; + for (int c = 0; string[c] != '\0'; c++){ + result = ((result << 5) + result) + string[c]; + } + return result; +} + +static size_t wordSize(ncclDataType_t type) { + switch(type) { + case ncclChar: +#if NCCL_MAJOR >= 2 + //case ncclInt8: + case ncclUint8: +#endif + return 1; + case ncclHalf: + //case ncclFloat16: + return 2; + case ncclInt: + case ncclFloat: +#if NCCL_MAJOR >= 2 + //case ncclInt32: + case ncclUint32: + //case ncclFloat32: +#endif + return 4; + case ncclInt64: + case ncclUint64: + case ncclDouble: + //case ncclFloat64: + return 8; + default: return 0; + } +} + +extern ncclDataType_t test_types[ncclNumTypes]; +extern const char *test_typenames[ncclNumTypes]; +extern ncclRedOp_t test_ops[ncclNumOps]; +extern const char *test_opnames[ncclNumOps]; + +extern thread_local int is_main_thread; +#define PRINT if (is_main_thread) printf + + diff --git a/projects/rccl-tests/src/nccl1_compat.h b/projects/rccl-tests/src/nccl1_compat.h new file mode 100644 index 0000000000..4279789af6 --- /dev/null +++ b/projects/rccl-tests/src/nccl1_compat.h @@ -0,0 +1,47 @@ +/************************************************************************* + * Copyright (c) 2017, NVIDIA CORPORATION. All rights reserved. + * + * See LICENCE.txt for license information + ************************************************************************/ + +#ifndef NCCL1_COMPAT_H +#define NCCL1_COMPAT_H + +#ifndef NCCL_MAJOR // NCCL 1.x +#define ncclNumOps nccl_NUM_OPS +#define ncclNumTypes nccl_NUM_TYPES + +static ncclResult_t ncclGroupStart() { return ncclSuccess; } +static ncclResult_t ncclGroupEnd() { return ncclSuccess; } + +#define CHECKCOUNT(count) if (count > INT_MAX) return ncclInvalidArgument; + +static ncclResult_t ncclReduce(const void* sendbuff, void* recvbuff, size_t count, ncclDataType_t datatype, + ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) { + CHECKCOUNT(count); + return ncclReduce(sendbuff, recvbuff, (int)count, datatype, op, root, comm, stream); +} +static ncclResult_t ncclAllReduce(const void* sendbuff, void* recvbuff, size_t count, + ncclDataType_t datatype, ncclRedOp_t op, ncclComm_t comm, cudaStream_t stream) { + CHECKCOUNT(count); + return ncclAllReduce(sendbuff, recvbuff, (int)count, datatype, op, comm, stream); +} +static ncclResult_t ncclBcast(void* buff, size_t count, ncclDataType_t datatype, int root, + ncclComm_t comm, cudaStream_t stream) { + CHECKCOUNT(count); + return ncclBcast(buff, (int)count, datatype, root, comm, stream); +} +static ncclResult_t ncclReduceScatter(const void* sendbuff, void* recvbuff, + size_t recvcount, ncclDataType_t datatype, ncclRedOp_t op, ncclComm_t comm, + cudaStream_t stream) { + CHECKCOUNT(recvcount); + return ncclReduceScatter(sendbuff, recvbuff, (int)recvcount, datatype, op, comm, stream); +} +static ncclResult_t ncclAllGather(const void* sendbuff, void* recvbuff, size_t sendcount, + ncclDataType_t datatype, ncclComm_t comm, cudaStream_t stream) { + CHECKCOUNT(sendcount); + return ncclAllGather(sendbuff, (int)sendcount, datatype, recvbuff, comm, stream); +} +#endif + +#endif diff --git a/projects/rccl-tests/src/reduce.cu b/projects/rccl-tests/src/reduce.cu new file mode 100644 index 0000000000..0bc9a7db83 --- /dev/null +++ b/projects/rccl-tests/src/reduce.cu @@ -0,0 +1,159 @@ +/************************************************************************* + * Copyright (c) 2016-2017, NVIDIA CORPORATION. All rights reserved. + * + * See LICENCE.txt for license information + ************************************************************************/ + +#include +#include "cuda_runtime.h" +#include "common.h" + +void print_header() { + PRINT("# %10s %12s %6s %6s out-of-place in-place\n", "", "", "", ""); + PRINT("# %10s %12s %6s %6s %6s %7s %5s %5s %7s %7s %5s %5s %7s\n", "bytes", "N", "type", "op", "root", + "time", "algbw", "busbw", "res", "time", "algbw", "busbw", "res"); +} + +void print_line_header (size_t size, size_t count, const char *typeName, const char *opName, int root) { + PRINT("%12li %12li %6s %6s %6i", size, count, typeName, opName, root); +} + +void getCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramcount, size_t *sendInplaceOffset, size_t *recvInplaceOffset, size_t *procSharedCount, int *sameExpected, size_t count, int nranks) { + *sendcount = count; + *recvcount = count; + *sameExpected = 0; + *procSharedCount = count; + *sendInplaceOffset = 0; + *recvInplaceOffset = 0; + *paramcount = *sendcount; + } + +void InitRecvResult(struct threadArgs_t* args, ncclDataType_t type, ncclRedOp_t op, int root, int in_place, int is_first) { + size_t count = args->expectedBytes / wordSize(type); + int root_gpu = root%args->nGpus; + + assert(args->expectedBytes == args->nbytes); + + while (args->sync[args->sync_idx] != args->thread) pthread_yield(); + + for (int i=0; inGpus; i++) { + int device; + NCCLCHECK(ncclCommCuDevice(args->comms[i], &device)); + CUDACHECK(cudaSetDevice(device)); + void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i]; + + if (is_first && i == 0) { + CUDACHECK(cudaMemcpy(args->procSharedHost, data, count*wordSize(type), cudaMemcpyDeviceToHost)); + } else { + Accumulate(args->procShared, data, count, type, op); + } + + if (in_place == 0) { + CUDACHECK(cudaMemset(args->recvbuffs[i], 0, args->expectedBytes)); + } + CUDACHECK(cudaDeviceSynchronize()); + } + + args->sync[args->sync_idx] = args->thread + 1; + + if (args->thread+1 == args->nThreads) { +#ifdef MPI_SUPPORT + int root_proc = root/(args->nThreads*args->nGpus); + if (args->expectedBytes) { + // Last thread does the MPI reduction + if (root_proc == args->proc) { + void* temp, *tempHost = malloc(args->expectedBytes); + CUDACHECK(cudaHostRegister(tempHost, args->expectedBytes, 0)); + CUDACHECK(cudaHostGetDevicePointer(&temp, tempHost, 0)); + + for (int i=0; inProcs; i++) { + if (i == args->proc) continue; + MPI_Recv(tempHost, args->expectedBytes, MPI_BYTE, i, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + + Accumulate(args->procShared, temp, count, type, op); + CUDACHECK(cudaDeviceSynchronize()); + } + + CUDACHECK(cudaHostUnregister(tempHost)); + free(tempHost); + } else { + MPI_Send(args->procSharedHost, args->expectedBytes, MPI_BYTE, root_proc, 0, MPI_COMM_WORLD); + } + } +#endif + args->sync[args->sync_idx] = 0; + } else { + while (args->sync[args->sync_idx]) pthread_yield(); + } + + //if root fill expected bytes with reduced data + // else if in_place, leave fill it with original data, else set to zero + for (int i=0; inGpus; i++) { + int rank = (args->proc*args->nThreads + args->thread)*args->nGpus + i; + if (rank == root) { + memcpy(args->expectedHost[root_gpu], args->procSharedHost, args->expectedBytes); + } else { + if (in_place == 1) { + CUDACHECK(cudaMemcpy(args->expectedHost[i], args->recvbuffs[i], args->expectedBytes, cudaMemcpyDeviceToHost)); + } else { + memset(args->expectedHost[i], 0, args->expectedBytes); + } + } + } + + args->sync_idx = !args->sync_idx; +} + +void GetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks) { + double baseBw = (double)(count * typesize) / 1.0E9 / sec; + *algBw = baseBw; + *busBw = baseBw; +} + +void RunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) { + NCCLCHECK(ncclReduce(sendbuff, recvbuff, count, type, op, root, comm, stream)); +} + + +void RunTest(struct threadArgs_t* args, int root, ncclDataType_t type, const char* typeName, ncclRedOp_t op, const char* opName) { + ncclDataType_t *run_types; + ncclRedOp_t *run_ops; + const char **run_typenames, **run_opnames; + int type_count, op_count; + int begin_root, end_root; + + if ((int)type != -1) { + type_count = 1; + run_types = &type; + run_typenames = &typeName; + } else { + type_count = ncclNumTypes; + run_types = test_types; + run_typenames = test_typenames; + } + + if ((int)op != -1) { + op_count = 1; + run_ops = &op; + run_opnames = &opName; + } else { + op_count = ncclNumOps; + run_ops = test_ops; + run_opnames = test_opnames; + } + + if (root != -1) { + begin_root = end_root = root; + } else { + begin_root = 0; + end_root = args->nProcs*args->nThreads*args->nGpus-1; + } + + for (int i=0; iexpectedBytes; + size_t recvcount = args->expectedBytes / wordSize(type); + size_t sendbytes = args->sendBytes; + size_t sendcount = args->sendBytes / wordSize(type); + + while (args->sync[args->sync_idx] != args->thread) pthread_yield(); + + for (int i=0; inGpus; i++) { + int device; + NCCLCHECK(ncclCommCuDevice(args->comms[i], &device)); + CUDACHECK(cudaSetDevice(device)); + void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i]; + + if (is_first && i == 0) { + CUDACHECK(cudaMemcpy(args->procSharedHost, data, sendbytes, cudaMemcpyDeviceToHost)); + } else { + Accumulate(args->procShared, data, sendcount, type, op); + } + + CUDACHECK(cudaDeviceSynchronize()); + if (in_place == 0) { + CUDACHECK(cudaMemset(args->recvbuffs[i], 0, recvbytes)); + } + CUDACHECK(cudaDeviceSynchronize()); + } + + args->sync[args->sync_idx] = args->thread + 1; + + if (args->thread+1 == args->nThreads) { +#ifdef MPI_SUPPORT + if (sendbytes > 0) { + // Last thread does the MPI reduction + void* remote, *remoteHost = malloc(sendbytes); + void* myInitialData = malloc(sendbytes); + memcpy(myInitialData, args->procSharedHost, sendbytes); + CUDACHECK(cudaHostRegister(remoteHost, sendbytes, 0)); + CUDACHECK(cudaHostGetDevicePointer(&remote, remoteHost, 0)); + + for (int i=0; inProcs; i++) { + if (i == args->proc) { + MPI_Bcast(myInitialData, sendbytes, MPI_BYTE, i, MPI_COMM_WORLD); + free(myInitialData); + } else { + MPI_Bcast(remoteHost, sendbytes, MPI_BYTE, i, MPI_COMM_WORLD); + Accumulate(args->procShared, remote, sendcount, type, op); + cudaDeviceSynchronize(); + } + } + CUDACHECK(cudaHostUnregister(remoteHost)); + free(remoteHost); + } +#endif + args->sync[args->sync_idx] = 0; + } else { + while (args->sync[args->sync_idx]) pthread_yield(); + } + + for (int i=0; inGpus; i++) { + int offset = ((args->proc*args->nThreads + args->thread)*args->nGpus + i)*recvbytes; + memcpy(args->expectedHost[i], (void *)((uintptr_t)args->procSharedHost + offset), recvbytes); + } + + args->sync_idx = !args->sync_idx; +} + +void GetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks) { + double baseBw = (double)(count * typesize * (nranks - 1)) / 1.0E9 / sec; + + *algBw = baseBw; + double factor = 1; + *busBw = baseBw * factor; +} + +void RunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) { + NCCLCHECK(ncclReduceScatter(sendbuff, recvbuff, count, type, op, comm, stream)); +} + +void RunTest(struct threadArgs_t* args, int root, ncclDataType_t type, const char* typeName, ncclRedOp_t op, const char* opName) { + ncclDataType_t *run_types; + ncclRedOp_t *run_ops; + const char **run_typenames, **run_opnames; + int type_count, op_count; + + if ((int)type != -1) { + type_count = 1; + run_types = &type; + run_typenames = &typeName; + } else { + type_count = ncclNumTypes; + run_types = test_types; + run_typenames = test_typenames; + } + + if ((int)op != -1) { + run_ops = &op; + run_opnames = &opName; + op_count = 1; + } else { + op_count = sizeof(test_ops)/sizeof(test_ops[0]); + run_ops = test_ops; + run_opnames = test_opnames; + } + + for (int i=0; i