diff --git a/Makefile b/Makefile index 4025f10e06..8e0154aa31 100644 --- a/Makefile +++ b/Makefile @@ -4,6 +4,9 @@ # See LICENCE.txt for license information # +BUILDDIR ?= build +override BUILDDIR := $(abspath $(BUILDDIR)) + .PHONY : all clean default : src.build @@ -14,7 +17,7 @@ all: ${TARGETS:%=%.build} clean: ${TARGETS:%=%.clean} %.build: - ${MAKE} -C $* build + ${MAKE} -C $* build BUILDDIR=${BUILDDIR} %.clean: - ${MAKE} -C $* clean + ${MAKE} -C $* clean BUILDDIR=${BUILDDIR} diff --git a/src/Makefile b/src/Makefile index 3dbd41ff9a..0c3c424616 100644 --- a/src/Makefile +++ b/src/Makefile @@ -1,6 +1,6 @@ # -# Copyright (c) 2015-2021, NVIDIA CORPORATION. All rights reserved. -# Modifications are Copyright (c) 2019 Advanced Micro Devices, Inc. All rights reserved. +# Copyright (c) 2015-2022, NVIDIA CORPORATION. All rights reserved. +# Modifications are Copyright (c) 2019-2022 Advanced Micro Devices, Inc. All rights reserved. # # See LICENSE.txt for license information # @@ -65,13 +65,22 @@ build: ${BIN_FILES} clean: rm -rf ${DST_DIR} -${DST_DIR}/%.o: %.cu common.h +TEST_VERIFIABLE_SRCDIR := ../verifiable +TEST_VERIFIABLE_BUILDDIR := $(BUILDDIR)/verifiable +include ../verifiable/verifiable.mk + +${DST_DIR}/%.o: %.cu common.h $(TEST_VERIFIABLE_HDRS) @printf "Compiling %-35s > %s\n" $< $@ @mkdir -p ${DST_DIR} echo "$(HIPCC) -o $@ $(HIPCUFLAGS) -c $<" $(HIPCC) -o $@ $(HIPCUFLAGS) -c $< -${DST_DIR}/%_perf:${DST_DIR}/%.o ${DST_DIR}/common.o +${DST_DIR}/timer.o: timer.cc timer.h + @printf "Compiling %-35s > %s\n" $< $@ + @mkdir -p ${DST_DIR} + $(CXX) $(CXXFLAGS) -o $@ -c timer.cc + +${DST_DIR}/%_perf:${DST_DIR}/%.o ${DST_DIR}/common.o ${DST_DIR}/timer.o $(TEST_VERIFIABLE_OBJS) @printf "Linking %-35s > %s\n" $< $@ @mkdir -p ${DST_DIR} echo "$(HIPCC) -o $@ $(HIPCUFLAGS) $^ ${HIPLDFLAGS}" diff --git a/src/all_gather.cu b/src/all_gather.cu index bc1c59969c..759f347d98 100644 --- a/src/all_gather.cu +++ b/src/all_gather.cu @@ -1,6 +1,6 @@ /************************************************************************* - * Copyright (c) 2016-2019, NVIDIA CORPORATION. All rights reserved. - * Modifications Copyright (c) 2019 Advanced Micro Devices, Inc. All rights reserved. + * Copyright (c) 2016-2022, NVIDIA CORPORATION. All rights reserved. + * Modifications Copyright (c) 2019-2022 Advanced Micro Devices, Inc. All rights reserved. * * See LICENSE.txt for license information ************************************************************************/ @@ -8,24 +8,15 @@ #include #include "common.h" -void print_header() { - PRINT("# %10s %12s %8s out-of-place in-place \n", "", "", ""); - PRINT("# %10s %12s %8s %7s %6s %6s %5s %7s %6s %6s %5s\n", "size", "count", "type", - "time", "algbw", "busbw", "error", "time", "algbw", "busbw", "error"); - PRINT("# %10s %12s %8s %7s %6s %6s %5s %7s %6s %6s %5s\n", "(B)", "(elements)", "", - "(us)", "(GB/s)", "(GB/s)", "", "(us)", "(GB/s)", "(GB/s)", ""); -} - -void print_line_header (size_t size, size_t count, const char *typeName, const char *opName, int root) { - PRINT("%12li %12li %8s", size, count, typeName); -} +#define ALIGN 4 void AllGatherGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramcount, size_t *sendInplaceOffset, size_t *recvInplaceOffset, size_t count, int nranks) { - *sendcount = count/nranks; - *recvcount = (count/nranks)*nranks; - *sendInplaceOffset = count/nranks; + size_t base = (count/(ALIGN*nranks))*ALIGN; + *sendcount = base; + *recvcount = base*nranks; + *sendInplaceOffset = base; *recvInplaceOffset = 0; - *paramcount = *sendcount; + *paramcount = base; } testResult_t AllGatherInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) { @@ -35,18 +26,15 @@ testResult_t AllGatherInitData(struct threadArgs* args, ncclDataType_t type, ncc int k=0; for (int i=0; inGpus; i++) { - int gpuid = args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i; - if (args->enable_multiranks) - gpuid = gpuid % args->localNumDevices; - HIPCHECK(hipSetDevice(gpuid)); + HIPCHECK(hipSetDevice(args->gpus[i])); for (int l=0; lnRanks; l++) { int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l); HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes)); void* data = in_place ? ((char*)args->recvbuffs[k])+rank*args->sendBytes : args->sendbuffs[k]; - TESTCHECK(InitData(data, sendcount, type, rep, rank)); + TESTCHECK(InitData(data, sendcount, 0, type, ncclSum, 33*rep + rank, 1, 0)); for (int j=0; jexpected[k])+args->sendBytes*j, sendcount, type, rep, j)); + TESTCHECK(InitData(((char*)args->expected[k])+args->sendBytes*j, sendcount, 0, type, ncclSum, 33*rep + j, 1, 0)); } k++; } @@ -98,7 +86,7 @@ testResult_t AllGatherRunTest(struct threadArgs* args, int root, ncclDataType_t } for (int i=0; i #include "common.h" -void print_header() { - PRINT("# %10s %12s %8s %6s out-of-place in-place \n", "", "", "", ""); - PRINT("# %10s %12s %8s %6s %7s %6s %6s %5s %7s %6s %6s %5s\n", "size", "count", "type", "redop", - "time", "algbw", "busbw", "error", "time", "algbw", "busbw", "error"); - PRINT("# %10s %12s %8s %6s %7s %6s %6s %5s %7s %6s %6s %5s\n", "(B)", "(elements)", "", "", - "(us)", "(GB/s)", "(GB/s)", "", "(us)", "(GB/s)", "(GB/s)", ""); -} - -void print_line_header (size_t size, size_t count, const char *typeName, const char *opName, int root) { - PRINT("%12li %12li %8s %6s", size, count, typeName, opName); -} - void AllReduceGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramcount, size_t *sendInplaceOffset, size_t *recvInplaceOffset, size_t count, int nranks) { *sendcount = count; *recvcount = count; @@ -35,16 +23,13 @@ testResult_t AllReduceInitData(struct threadArgs* args, ncclDataType_t type, ncc int k = 0; for (int i=0; inGpus; i++) { - int gpuid = args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i; - if (args->enable_multiranks) - gpuid = gpuid % args->localNumDevices; - HIPCHECK(hipSetDevice(gpuid)); + HIPCHECK(hipSetDevice(args->gpus[i])); for (int l=0; lnRanks; l++) { int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l); HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes)); void* data = in_place ? args->recvbuffs[k] : args->sendbuffs[k]; - TESTCHECK(InitData(data, sendcount, type, rep, rank)); + TESTCHECK(InitData(data, sendcount, 0, type, op, rep, nranks, rank)); TESTCHECK(InitDataReduce(args->expected[k], recvcount, 0, type, op, rep, nranks)); k++; } diff --git a/src/alltoall.cu b/src/alltoall.cu index 48020e4fa3..77546f4eb7 100644 --- a/src/alltoall.cu +++ b/src/alltoall.cu @@ -1,6 +1,6 @@ /************************************************************************* - * Copyright (c) 2016-2020, NVIDIA CORPORATION. All rights reserved. - * Modifications Copyright (c) 2019 Advanced Micro Devices, Inc. All rights reserved. + * Copyright (c) 2016-2022, NVIDIA CORPORATION. All rights reserved. + * Modifications Copyright (c) 2019-2022 Advanced Micro Devices, Inc. All rights reserved. * * See LICENSE.txt for license information ************************************************************************/ @@ -8,18 +8,6 @@ #include #include "common.h" -void print_header() { - PRINT("# %10s %12s %8s %6s out-of-place in-place \n", "", "", "", ""); - PRINT("# %10s %12s %8s %6s %7s %6s %6s %5s %7s %6s %6s %5s\n", "size", "count", "type", "redop", - "time", "algbw", "busbw", "error", "time", "algbw", "busbw", "error"); - PRINT("# %10s %12s %8s %6s %7s %6s %6s %5s %7s %6s %6s %5s\n", "(B)", "(elements)", "", "", - "(us)", "(GB/s)", "(GB/s)", "", "(us)", "(GB/s)", "(GB/s)", ""); -} - -void print_line_header (size_t size, size_t count, const char *typeName, const char *opName, int root) { - PRINT("%12li %12li %8s %6s", size, count, typeName, opName); -} - void AlltoAllGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramcount, size_t *sendInplaceOffset, size_t *recvInplaceOffset, size_t count, int nranks) { *sendcount = (count/nranks)*nranks; *recvcount = (count/nranks)*nranks; @@ -35,19 +23,16 @@ testResult_t AlltoAllInitData(struct threadArgs* args, ncclDataType_t type, nccl int k=0; for (int i=0; inGpus; i++) { - char* str = getenv("NCCL_TESTS_DEVICE"); - int gpuid = str ? atoi(str) : args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i; - if (args->enable_multiranks) - gpuid = gpuid % args->localNumDevices; - HIPCHECK(hipSetDevice(gpuid)); + HIPCHECK(hipSetDevice(args->gpus[i])); for (int l=0; lnRanks; l++) { int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l); HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes)); void* data = in_place ? args->recvbuffs[k] : args->sendbuffs[k]; - TESTCHECK(InitData(data, sendcount, type, rep, rank)); + TESTCHECK(InitData(data, sendcount, 0, type, ncclSum, 33*rep + rank, 1, 0)); for (int j=0; jexpected[k])+args->sendBytes/nranks*j, sendcount/nranks, type, rep+rank*sendcount/nranks, j)); + size_t partcount = sendcount/nranks; + TESTCHECK(InitData(((char*)args->expected[k])+ j*partcount*wordSize(type), partcount, rank*partcount, type, ncclSum, 33*rep + j, 1, 0)); } k++; } @@ -101,7 +86,7 @@ testResult_t AlltoAllRunTest(struct threadArgs* args, int root, ncclDataType_t t } for (int i=0; i #include "common.h" -void print_header() { - PRINT("# %10s %12s %8s %6s out-of-place in-place \n", "", "", "", ""); - PRINT("# %10s %12s %8s %6s %7s %6s %6s %5s %7s %6s %6s %5s\n", "size", "count", "type", "root", - "time", "algbw", "busbw", "error", "time", "algbw", "busbw", "error"); - PRINT("# %10s %12s %8s %6s %7s %6s %6s %5s %7s %6s %6s %5s\n", "(B)", "(elements)", "", "", - "(us)", "(GB/s)", "(GB/s)", "", "(us)", "(GB/s)", "(GB/s)", ""); -} - -void print_line_header (size_t size, size_t count, const char *typeName, const char *opName, int root) { - PRINT("%12li %12li %8s %6i", size, count, typeName, root); -} - void BroadcastGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramcount, size_t *sendInplaceOffset, size_t *recvInplaceOffset, size_t count, int nranks) { *sendcount = count; *recvcount = count; @@ -34,17 +22,14 @@ testResult_t BroadcastInitData(struct threadArgs* args, ncclDataType_t type, ncc int k=0; for (int i=0; inGpus; i++) { - int gpuid = args->localRank*args->nThreads*args->nGpus + args->thread*args->nGpus + i; - if (args->enable_multiranks) - gpuid = gpuid % args->localNumDevices; - HIPCHECK(hipSetDevice(gpuid)); + HIPCHECK(hipSetDevice(args->gpus[i])); for (int l=0; lnRanks; l++) { int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l); HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes)); void* data = in_place ? args->recvbuffs[k] : args->sendbuffs[k]; - if (rank == root) TESTCHECK(InitData(data, sendcount, type, rep, rank)); - TESTCHECK(InitData(args->expected[k], recvcount, type, rep, root)); + if (rank == root) TESTCHECK(InitData(data, sendcount, 0, type, ncclSum, rep, 1, 0)); + TESTCHECK(InitData(args->expected[k], recvcount, 0, type, ncclSum, rep, 1, 0)); k++; } HIPCHECK(hipDeviceSynchronize()); @@ -114,7 +99,7 @@ testResult_t BroadcastRunTest(struct threadArgs* args, int root, ncclDataType_t for (int i=0; i #include +#include #include #include //#define DEBUG_PRINT +#include "../verifiable/verifiable.h" + int test_ncclVersion = 0; // init'd with ncclGetVersion() #if NCCL_MAJOR >= 2 @@ -54,6 +57,12 @@ int test_ncclVersion = 0; // init'd with ncclGetVersion() const char *test_memorytypes[nccl_NUM_MTYPES] = {"coarse", "fine", "host", "managed"}; +// For libnccl's < 2.13 +extern "C" __attribute__((weak)) char const* ncclGetLastError(ncclComm_t comm) { + return ""; +} + +int is_main_proc = 0; thread_local int is_main_thread = 0; // Command line parameter defaults @@ -75,7 +84,10 @@ static int blocking_coll = 0; static int memorytype = 0; static int stress_cycles = 1; static uint32_t cumask[4]; +static int streamnull = 0; +static int timeout = 0; static int cudaGraphLaunches = 0; +static int report_cputime = 0; // Report average iteration time: (0=RANK0,1=AVG,2=MIN,3=MAX) static int average = 1; static int numDevices = 1; @@ -152,374 +164,164 @@ static bool minReqVersion(int rmajor, int rminor, int rpatch) return true; } -double DeltaMaxValue(ncclDataType_t type) { - switch(type) { - case ncclHalf: return 1e-2; -#if NCCL_MAJOR >= 2 && RCCL_BFLOAT16 == 1 - case ncclBfloat16: return 1e-2; -#endif - case ncclFloat: return 1e-5; - case ncclDouble: return 1e-12; - case ncclInt: -#if NCCL_MAJOR >= 2 - case ncclUint8: - //case ncclInt32: - case ncclUint32: -#endif - case ncclInt64: - case ncclUint64: return 1e-200; - } - return 1e-200; -} - -template __device__ -double absDiff(T a, T b) { - return fabs((double)(b - a)); -} - -template<> __device__ -double absDiff(half a, half b) { - float x = __half2float(a); - float y = __half2float(b); - return fabs((double)(y-x)); -} - -template __device__ -float toFloat(T a) { - return (float)a; -} -template<> __device__ -float toFloat(half a) { - return __half2float(a); -} -#if defined(RCCL_BFLOAT16) -template<> __device__ -float toFloat(rccl_bfloat16 a) { - return (float)(a); -} -#endif - -template __global__ -void deltaKern(void* A_, void* B_, size_t count, double* max) { - const T* A = (const T*)A_; - const T* B = (const T*)B_; - __shared__ double temp[BSIZE]; - int tid = blockIdx.x*blockDim.x + threadIdx.x; - double locmax = 0.0; - for(size_t i=tid; i locmax ) { - locmax = delta; -#ifdef DEBUG_PRINT - if (delta > .1) printf("Error at %ld/%ld(%p) : %f != %f\n", i, count, B+i, toFloat(A[i]), toFloat(B[i])); -#endif - } - } - - tid = threadIdx.x; - temp[tid] = locmax; - for(int stride = BSIZE/2; stride > 1; stride>>=1) { - __syncthreads(); - if( tid < stride ) - temp[tid] = temp[tid] > temp[tid+stride] ? temp[tid] : temp[tid+stride]; - } - __syncthreads(); - if( threadIdx.x == 0) - max[blockIdx.x] = temp[0] > temp[1] ? temp[0] : temp[1]; -} - -testResult_t CheckDelta(void* results, void* expected, size_t count, ncclDataType_t type, double* devmax) { - switch (type) { -#if NCCL_MAJOR >= 2 && RCCL_BFLOAT16 == 1 - case ncclBfloat16: - hipLaunchKernelGGL((deltaKern), dim3(1), dim3(512), 0, 0, results, expected, count, devmax); break; -#endif - case ncclHalf: - hipLaunchKernelGGL((deltaKern), dim3(1), dim3(512), 0, 0, results, expected, count, devmax); break; - case ncclFloat: - hipLaunchKernelGGL((deltaKern), dim3(1), dim3(512), 0, 0, results, expected, count, devmax); break; - case ncclDouble: - hipLaunchKernelGGL((deltaKern), dim3(1), dim3(512), 0, 0, results, expected, count, devmax); break; - case ncclChar: -#if NCCL_MAJOR >= 2 - case ncclUint8: -#endif - hipLaunchKernelGGL((deltaKern), dim3(1), dim3(512), 0, 0, results, expected, count, devmax); break; - case ncclInt: -#if NCCL_MAJOR >= 2 - case ncclUint32: -#endif - hipLaunchKernelGGL((deltaKern), dim3(1), dim3(512), 0, 0, results, expected, count, devmax); break; - case ncclInt64: - case ncclUint64: - hipLaunchKernelGGL((deltaKern), dim3(1), dim3(512), 0, 0, results, expected, count, devmax); break; - } - HIPCHECK(hipDeviceSynchronize()); - for (int i=1; i -__device__ T testValue(const size_t offset, const int rep, const int rank) { - uint8_t v = (rep+rank+offset) % 256; - return (T)v; +testResult_t InitDataReduce(void* data, const size_t count, const size_t offset, ncclDataType_t type, ncclRedOp_t op, uint64_t seed, int nranks) { + ncclVerifiablePrepareExpected(data, count, (int)type, (int)op, nranks, seed, offset, cudaStreamDefault); + return testSuccess; } -// For floating point datatype, we use values between 0 and 1 otherwise the -// Product operation will produce NaNs. -template<> -__device__ double testValue(const size_t offset, const int rep, const int rank) { - return 1.0/(1.0+(double)testValue(offset, rep, rank)); -} -template<> -__device__ float testValue(const size_t offset, const int rep, const int rank) { - return 1.0/(1.0+(float)testValue(offset, rep, rank)); -} -template<> -__device__ half testValue(const size_t offset, const int rep, const int rank) { - return __float2half(testValue(offset, rep, rank)); -} -#if NCCL_MAJOR >= 2 && RCCL_BFLOAT16 == 1 -template<> -__device__ rccl_bfloat16 testValue(const size_t offset, const int rep, const int rank) { - return rccl_bfloat16(testValue(offset, rep, rank)); -} -#endif - -// Operations -template -__device__ T ncclOpSum(T a, T b) { return a+b; } -template -__device__ T ncclOpProd(T a, T b) { return a*b; } -template -__device__ T ncclOpMax(T a, T b) { return a>b ? a : b; } -template -__device__ T ncclOpMin(T a, T b) { return a -__device__ half ncclOpSum(half a, half b) { return __float2half(__half2float(a)+__half2float(b)); } -template<> -__device__ half ncclOpProd(half a, half b) { return __float2half(__half2float(a)*__half2float(b)); } -template<> -__device__ half ncclOpMax(half a, half b) { return __half2float(a)>__half2float(b) ? a : b; } -template<> -__device__ half ncclOpMin(half a, half b) { return __half2float(a)<__half2float(b) ? a : b; } - -template -__device__ T ncclPPOpIdent(T x, int arg) { return x; } -template -__device__ T ncclPPOpMul(T x, int arg) { return x*T(arg); } -template -__device__ T ncclPPOpDiv(T x, int arg) { return x/T(arg); } -template<> -__device__ half ncclPPOpMul(half x, int arg) { - return __float2half(__half2float(x)*float(arg)); -} -template<> -__device__ half ncclPPOpDiv(half x, int n) { - return __float2half(__half2float(x)/n); -} -#if RCCL_BFLOAT16 == 1 -template<> -__device__ rccl_bfloat16 ncclPPOpMul(rccl_bfloat16 x, int arg) { - return (rccl_bfloat16)((float)(x)*float(arg)); -} -template<> -__device__ rccl_bfloat16 ncclPPOpDiv(rccl_bfloat16 x, int n) { - return (rccl_bfloat16)((float)(x)/(float)(n));; -} -#endif - -__host__ __device__ int preMulScalar(int rank) { - return 1 + rank%2; +testResult_t InitData(void* data, const size_t count, size_t offset, ncclDataType_t type, ncclRedOp_t op, uint64_t seed, int nranks, int rank) { + ncclVerifiablePrepareInput(data, count, (int)type, (int)op, nranks, rank, seed, offset, cudaStreamDefault); + return testSuccess; } -template -__global__ void InitDataReduceKernel(T* data, const size_t N, const size_t offset, const int rep, const int nranks) { - for (size_t o=blockIdx.x*blockDim.x+threadIdx.x; o(o+offset, rep, 0); - val = PreOp(val, preMulScalar(0)); - for (int i=1; i(o+offset, rep, i); - val1 = PreOp(val1, preMulScalar(i)); - val = Op(val, val1); - } - data[o] = PostOp(val, nranks); +void Barrier(struct threadArgs *args) { + thread_local int epoch = 0; + static pthread_mutex_t lock[2] = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_MUTEX_INITIALIZER}; + static pthread_cond_t cond[2] = {PTHREAD_COND_INITIALIZER, PTHREAD_COND_INITIALIZER}; + static int counter[2] = {0, 0}; + + pthread_mutex_lock(&lock[epoch]); + if(++counter[epoch] == args->nThreads) + pthread_cond_broadcast(&cond[epoch]); + + if(args->thread+1 == args->nThreads) { + while(counter[epoch] != args->nThreads) + pthread_cond_wait(&cond[epoch], &lock[epoch]); + #ifdef MPI_SUPPORT + MPI_Barrier(MPI_COMM_WORLD); + #endif + counter[epoch] = 0; + pthread_cond_broadcast(&cond[epoch]); } + else { + while(counter[epoch] != 0) + pthread_cond_wait(&cond[epoch], &lock[epoch]); + } + pthread_mutex_unlock(&lock[epoch]); + epoch ^= 1; } -#define KERN(type, op, preop, postop) (void*)InitDataReduceKernel, preop, postop > -#if NCCL_VERSION_CODE >= NCCL_VERSION(2,11,0) - #define OPS(type) \ - KERN(type, ncclOpSum, ncclPPOpIdent, ncclPPOpIdent), \ - KERN(type, ncclOpProd, ncclPPOpIdent, ncclPPOpIdent), \ - KERN(type, ncclOpMax, ncclPPOpIdent, ncclPPOpIdent), \ - KERN(type, ncclOpMin, ncclPPOpIdent, ncclPPOpIdent), \ - KERN(type, ncclOpSum/*Avg*/, ncclPPOpIdent, ncclPPOpDiv), \ - KERN(type, ncclOpSum/*PreMulSum*/, ncclPPOpMul, ncclPPOpIdent) -#elif NCCL_VERSION_CODE >= NCCL_VERSION(2,10,0) - #define OPS(type) \ - KERN(type, ncclOpSum, ncclPPOpIdent, ncclPPOpIdent), \ - KERN(type, ncclOpProd, ncclPPOpIdent, ncclPPOpIdent), \ - KERN(type, ncclOpMax, ncclPPOpIdent, ncclPPOpIdent), \ - KERN(type, ncclOpMin, ncclPPOpIdent, ncclPPOpIdent), \ - KERN(type, ncclOpSum/*Avg*/, ncclPPOpIdent, ncclPPOpDiv) -#else - #define OPS(type) \ - KERN(type, ncclOpSum, ncclPPOpIdent, ncclPPOpIdent), \ - KERN(type, ncclOpProd, ncclPPOpIdent, ncclPPOpIdent), \ - KERN(type, ncclOpMax, ncclPPOpIdent, ncclPPOpIdent), \ - KERN(type, ncclOpMin, ncclPPOpIdent, ncclPPOpIdent) -#endif - -static void* const redInitDataKerns[test_opNumMax*ncclNumTypes] = { - OPS(int8_t), OPS(uint8_t), OPS(int32_t), OPS(uint32_t), OPS(int64_t), OPS(uint64_t), OPS(half), OPS(float), OPS(double), -#if NCCL_MAJOR >= 2 && RCCL_BFLOAT16 == 1 - OPS(rccl_bfloat16) -#endif -}; - -testResult_t InitDataReduce(void* data, const size_t count, const size_t offset, ncclDataType_t type, ncclRedOp_t op, const int rep, const int nranks) { - dim3 grid = { 32, 1, 1 }; - dim3 block = { 256, 1, 1 }; - void* args[5] = { (void*)&data, (void*)&count, (void*)&offset, (void*)&rep, (void*)&nranks }; - HIPCHECK(hipLaunchKernel(redInitDataKerns[type*test_opNumMax+op], grid, block, args, 0, hipStreamDefault)); - return testSuccess; -} - +// Inter-thread/process barrier+allreduce. The quality of the return value +// for average=0 (which means broadcast from rank=0) is dubious. The returned +// value will actually be the result of process-local broadcast from the local thread=0. template -__global__ void InitDataKernel(T* data, const size_t N, const int rep, const int rank) { - for (size_t o=blockIdx.x*blockDim.x+threadIdx.x; o(o, rep, rank); -} +void Allreduce(struct threadArgs* args, T* value, int average) { + thread_local int epoch = 0; + static pthread_mutex_t lock[2] = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_MUTEX_INITIALIZER}; + static pthread_cond_t cond[2] = {PTHREAD_COND_INITIALIZER, PTHREAD_COND_INITIALIZER}; + static T accumulator[2]; + static int counter[2] = {0, 0}; -static void* const initDataKerns[ncclNumTypes] = { - (void*)InitDataKernel< int8_t>, - (void*)InitDataKernel< uint8_t>, - (void*)InitDataKernel< int32_t>, - (void*)InitDataKernel, - (void*)InitDataKernel< int64_t>, - (void*)InitDataKernel, - (void*)InitDataKernel< half>, - (void*)InitDataKernel< float>, - (void*)InitDataKernel< double>, -#if RCCL_BFLOAT16 == 1 && NCCL_MAJOR >= 2 - (void*)InitDataKernel -#endif -}; - -template -testResult_t InitDataType(void* dest, const size_t N, const int rep, const int rank) { - T* ptr = (T*)dest; - hipLaunchKernelGGL((InitDataKernel), dim3(16), dim3(512), 0, 0, ptr, N, rep, rank); - return testSuccess; -} - -testResult_t InitData(void* data, const size_t count, ncclDataType_t type, const int rep, const int rank) { - dim3 grid = { 32, 1, 1 }; - dim3 block = { 256, 1, 1 }; - void* args[4] = { (void*)&data, (void*)&count, (void*)&rep, (void*)&rank }; - HIPCHECK(hipLaunchKernel(initDataKerns[type], grid, block, args, 0, hipStreamDefault)); - return testSuccess; -} - -void Barrier(struct threadArgs* args) { - while (args->barrier[args->barrier_idx] != args->thread) pthread_yield(); - args->barrier[args->barrier_idx] = args->thread + 1; - if (args->thread+1 == args->nThreads) { -#ifdef MPI_SUPPORT - MPI_Barrier(MPI_COMM_WORLD); -#endif - args->barrier[args->barrier_idx] = 0; + pthread_mutex_lock(&lock[epoch]); + if(counter[epoch] == 0) { + if(average != 0 || args->thread == 0) accumulator[epoch] = *value; } else { - while (args->barrier[args->barrier_idx]) pthread_yield(); - } - args->barrier_idx=!args->barrier_idx; -} - -// Inter-thread/process barrier+allreduce -void Allreduce(struct threadArgs* args, double* value, int average) { - while (args->barrier[args->barrier_idx] != args->thread) pthread_yield(); - double val = *value; - if (args->thread > 0) { - double val2 = args->reduce[args->barrier_idx]; - if (average == 1) val += val2; - if (average == 2) val = std::min(val, val2); - if (average == 3) val = std::max(val, val2); - } - if (average || args->thread == 0) args->reduce[args->barrier_idx] = val; - args->barrier[args->barrier_idx] = args->thread + 1; - if (args->thread+1 == args->nThreads) { -#ifdef MPI_SUPPORT - if (average != 0) { - MPI_Op op = average == 1 ? MPI_SUM : average == 2 ? MPI_MIN : MPI_MAX; - MPI_Allreduce(MPI_IN_PLACE, (void*)&args->reduce[args->barrier_idx], 1, MPI_DOUBLE, op, MPI_COMM_WORLD); + switch(average) { + case /*r0*/ 0: if(args->thread == 0) accumulator[epoch] = *value; break; + case /*avg*/1: accumulator[epoch] += *value; break; + case /*min*/2: accumulator[epoch] = std::min(accumulator[epoch], *value); break; + case /*max*/3: accumulator[epoch] = std::max(accumulator[epoch], *value); break; + case /*sum*/4: accumulator[epoch] += *value; break; } -#endif - if (average == 1) args->reduce[args->barrier_idx] /= args->nProcs*args->nThreads; - args->reduce[1-args->barrier_idx] = 0; - args->barrier[args->barrier_idx] = 0; - } else { - while (args->barrier[args->barrier_idx]) pthread_yield(); } - *value = args->reduce[args->barrier_idx]; - args->barrier_idx=!args->barrier_idx; + + if(++counter[epoch] == args->nThreads) + pthread_cond_broadcast(&cond[epoch]); + + if(args->thread+1 == args->nThreads) { + while(counter[epoch] != args->nThreads) + pthread_cond_wait(&cond[epoch], &lock[epoch]); + + #ifdef MPI_SUPPORT + if(average != 0) { + static_assert(std::is_same::value || std::is_same::value, "Allreduce only for T in {long long, double}"); + MPI_Datatype ty = std::is_same::value ? MPI_LONG_LONG : + std::is_same::value ? MPI_DOUBLE : + MPI_Datatype(); + MPI_Op op = average == 1 ? MPI_SUM : + average == 2 ? MPI_MIN : + average == 3 ? MPI_MAX : + average == 4 ? MPI_SUM : MPI_Op(); + MPI_Allreduce(MPI_IN_PLACE, (void*)&accumulator[epoch], 1, ty, op, MPI_COMM_WORLD); + } + #endif + + if(average == 1) accumulator[epoch] /= args->totalProcs*args->nThreads; + counter[epoch] = 0; + pthread_cond_broadcast(&cond[epoch]); + } + else { + while(counter[epoch] != 0) + pthread_cond_wait(&cond[epoch], &lock[epoch]); + } + pthread_mutex_unlock(&lock[epoch]); + + *value = accumulator[epoch]; + epoch ^= 1; } -testResult_t CheckData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int in_place, double *delta) { +testResult_t CheckData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int in_place, int64_t *wrongElts) { + int nranks = args->nProcs*args->nGpus*args->nThreads; size_t count = args->expectedBytes/wordSize(type); - double maxDelta = 0.0; + + int64_t *wrongPerGpu = nullptr; + CUDACHECK(hipHostAlloc((void**)&wrongPerGpu, args->nGpus*sizeof(int64_t), hipHostAllocMapped)); + for (int i=0; inGpus*args->nRanks; i++) { int device; int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i); NCCLCHECK(ncclCommCuDevice(args->comms[i], &device)); HIPCHECK(hipSetDevice(device)); void *data = in_place ? ((void *)((uintptr_t)args->recvbuffs[i] + args->recvInplaceOffset*rank)) : args->recvbuffs[i]; - TESTCHECK(CheckDelta(data , args->expected[i], count, type, args->deltaHost)); - maxDelta = std::max(*(args->deltaHost), maxDelta); -#ifdef DEBUG_PRINT - //if (rank == 0) { - int *expectedHost = (int *)malloc(args->expectedBytes); - int *dataHost = (int *)malloc(args->expectedBytes); + TESTCHECK(CheckDelta(data, args->expected[i], count, 0, type, op, 0, nranks, wrongPerGpu+i)); - hipMemcpy(expectedHost, args->expected[rank], args->expectedBytes, hipMemcpyDeviceToHost); +#if 1 && DEBUG_PRINT + if (args->reportErrors && wrongPerGpu[i] != 0) { + printf("rank=%d #wrong=%d\n", rank, (int)wrongPerGpu[i]); + char *expectedHost = (char*)malloc(args->expectedBytes); + char *dataHost = (char*)malloc(args->expectedBytes); + int eltsz = wordSize(type); + hipMemcpy(expectedHost, args->expected[i], args->expectedBytes, hipMemcpyDeviceToHost); hipMemcpy(dataHost, data, args->expectedBytes, hipMemcpyDeviceToHost); - int j, k, l; - for (j=0; jexpectedBytes/sizeof(int); j++) - if (expectedHost[j] != dataHost[j]) break; - k = j; - for (; jexpectedBytes/sizeof(int); j++) - if (expectedHost[j] == dataHost[j]) break; - l = j; - printf("\n Rank [%d] Expected: ", rank); - for (j=k; jexpectedBytes/sizeof(int) && jexpectedBytes/eltsz; j++) { + unsigned long long want, got; + want = 0; + memcpy(&want, expectedHost + j*eltsz, eltsz); + got = 0; + memcpy(&got, dataHost + j*eltsz, eltsz); + if(want != got) { + printf(" rank=%d elt[%d]: want=0x%llx got=0x%llx\n", rank, j, want, got); + } } - printf("\n Rank [%d] Actual : ", rank); - for (j=k; jexpectedBytes/sizeof(int) && jnProcs*args->nThreads*args->nGpus*args->nRanks; - if (args->reportErrors && maxDelta > DeltaMaxValue(type)*(nranks - 1)) args->errors[0]++; - *delta = maxDelta; + + *wrongElts = 0; + for (int i=0; i < args->nGpus; i++) *wrongElts += wrongPerGpu[i]; + hipFree(wrongPerGpu); + + if (args->reportErrors && *wrongElts) args->errors[0]++; return testSuccess; } - + testResult_t testStreamSynchronize(int nStreams, hipStream_t* streams, ncclComm_t* comms) { hipError_t hipErr; int remaining = nStreams; int* done = (int*)malloc(sizeof(int)*nStreams); memset(done, 0, sizeof(int)*nStreams); + timer tim; + while (remaining) { int idle = 1; for (int i=0; i timeout && timeout > 0) { + for (int i=0; icomms[i], &hipDev)); HIPCHECK(hipSetDevice(hipDev)); + //CUDACHECK(cudaSetDevice(args->gpus[i])); EDGAR CHECK LATER #endif int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i); char* recvBuff = ((char*)args->recvbuffs[i]) + shift; @@ -590,19 +406,18 @@ testResult_t startColl(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t rccl_bfloat16 bf16; #endif }; - int scalar = preMulScalar(rank); switch(type) { - case ncclInt8: i8 = int8_t(scalar); break; - case ncclUint8: u8 = uint8_t(scalar); break; - case ncclInt32: i32 = int32_t(scalar); break; - case ncclUint32: u32 = uint32_t(scalar); break; - case ncclInt64: i64 = int32_t(scalar); break; - case ncclUint64: u64 = uint32_t(scalar); break; - case ncclFloat16: f16 = __float2half(float(scalar)); break; - case ncclFloat32: f32 = float(scalar); break; - case ncclFloat64: f64 = double(scalar); break; + case ncclInt8: i8 = ncclVerifiablePremulScalar(rank); break; + case ncclUint8: u8 = ncclVerifiablePremulScalar(rank); break; + case ncclInt32: i32 = ncclVerifiablePremulScalar(rank); break; + case ncclUint32: u32 = ncclVerifiablePremulScalar(rank); break; + case ncclInt64: i64 = ncclVerifiablePremulScalar(rank); break; + case ncclUint64: u64 = ncclVerifiablePremulScalar(rank); break; + case ncclFloat16: f16 = ncclVerifiablePremulScalar(rank); break; + case ncclFloat32: f32 = ncclVerifiablePremulScalar(rank); break; + case ncclFloat64: f64 = ncclVerifiablePremulScalar(rank); break; #if defined(RCCL_BFLOAT16) - case ncclBfloat16: bf16 = (rccl_bfloat16)(float(scalar)); break; + case ncclBfloat16: bf16 = ncclVerifiablePremulScalar<__nv_bfloat16>(rank); break; #endif } NCCLCHECK(ncclRedOpCreatePreMulSum(&op, &u64, type, ncclScalarHostImmediate, args->comms[i])); @@ -657,16 +472,17 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t if (cudaGraphLaunches >= 1) { // Begin cuda graph capture for (int i=0; inGpus*args->nRanks; i++) { - // Thread local mode is needed for: - // - Multi-thread mode - // - P2P pre-connect + // Thread local mdoe is needed for: + // - Multi-thread mode: where graph capture and instantiation can happen concurrently across threads + // - P2P pre-connect: when there is no warm-up, P2P pre-connect is done during graph capture. + // Since pre-connect calls cudaMalloc, we cannot use global capture mode HIPCHECK(hipStreamBeginCapture(args->streams[i], hipStreamCaptureModeThreadLocal)); } } #endif // Performance Benchmark - auto start = std::chrono::high_resolution_clock::now(); + timer tim; for (int iter = 0; iter < iters; iter++) { if (agg_iters>1) NCCLCHECK(ncclGroupStart()); for (int aiter = 0; aiter < agg_iters; aiter++) { @@ -687,7 +503,7 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t } // Resync CPU, restart timing, launch cuda graph Barrier(args); - start = std::chrono::high_resolution_clock::now(); + tim.reset(); for (int l=0; lnGpus*args->nRanks; i++) { HIPCHECK(hipGraphLaunch(graphExec[i], args->streams[i])); @@ -696,10 +512,10 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t } #endif + double cputimeSec = tim.elapsed()/(iters*agg_iters); TESTCHECK(completeColl(args)); - auto delta = std::chrono::high_resolution_clock::now() - start; - double deltaSec = std::chrono::duration_cast>(delta).count(); + double deltaSec = tim.elapsed(); deltaSec = deltaSec/(iters*agg_iters); if (cudaGraphLaunches >= 1) deltaSec = deltaSec/cudaGraphLaunches; Allreduce(args, &deltaSec, average); @@ -719,8 +535,7 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t Barrier(args); - double maxDelta = 0; - bool error = false; + int64_t wrongElts = 0; static __thread int rep = 0; rep++; if (datacheck) { @@ -768,13 +583,15 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t } #endif - TESTCHECK(CheckData(args, type, op, root, in_place, &maxDelta)); + TESTCHECK(CheckData(args, type, op, root, in_place, &wrongElts)); //aggregate delta from all threads and procs - Allreduce(args, &maxDelta, 3); + long long wrongElts1 = wrongElts; + Allreduce(args, &wrongElts1, /*sum*/4); + wrongElts = wrongElts1; } - double timeUsec = deltaSec*1.0E6; + double timeUsec = (report_cputime ? cputimeSec : deltaSec)*1.0E6; char timeStr[100]; if (timeUsec >= 10000.0) { sprintf(timeStr, "%7.0f", timeUsec); @@ -783,10 +600,10 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t } else { sprintf(timeStr, "%7.2f", timeUsec); } - if (datacheck) { - PRINT(" %7s %6.2f %6.2f %5.0le%s", timeStr, algBw, busBw, maxDelta, error ? "*" : ""); + if (args->reportErrors) { + PRINT(" %7s %6.2f %6.2f %5g", timeStr, algBw, busBw, (double)wrongElts); } else { - PRINT(" %7s %6.2f %6.2f %5s", timeStr, algBw, busBw, "N/A"); + PRINT(" %7s %6.2f %6.2f %5s", timeStr, algBw, busBw, "N/A"); } args->bw[0] += busBw; @@ -809,6 +626,9 @@ void setupArgs(size_t size, ncclDataType_t type, struct threadArgs* args) { } testResult_t TimeTest(struct threadArgs* args, ncclDataType_t type, const char* typeName, ncclRedOp_t op, const char* opName, int root) { + // Sync to avoid first-call timeout + Barrier(args); + // Warm-up for large size setupArgs(args->maxbytes, type, args); for (int iter = 0; iter < warmup_iters; iter++) { @@ -855,7 +675,7 @@ testResult_t threadInit(struct threadArgs* args) { int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks; //set main thread again - is_main_thread = (args->proc == 0 && args->thread == 0) ? 1 : 0; + is_main_thread = (is_main_proc && args->thread == 0) ? 1 : 0; NCCLCHECK(ncclGroupStart()); for (int i=0; inGpus; i++) { @@ -863,6 +683,7 @@ testResult_t threadInit(struct threadArgs* args) { if (enable_multiranks) gpuid = gpuid % numDevices; HIPCHECK(hipSetDevice(gpuid)); + //CUDACHECK(cudaSetDevice(args->gpus[i])); for (int j=0; jnRanks; j++) { int rank = (args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + j; @@ -968,10 +789,13 @@ int main(int argc, char* argv[]) { {"datatype", required_argument, 0, 'd'}, {"root", required_argument, 0, 'r'}, {"blocking", required_argument, 0, 'z'}, - {"memory_type", required_argument, 0, 'y'}, - {"stress_cycles", required_argument, 0, 's'}, - {"cumask", required_argument, 0, 'u'}, + {"memory_type", required_argument, 0, 'y'}, //RCCL + {"stress_cycles", required_argument, 0, 's'}, //RCCL + {"cumask", required_argument, 0, 'u'}, //RCCL + {"stream_null", required_argument, 0, 'y'}, //NCCL + {"timeout", required_argument, 0, 'T'}, //NCCL {"cudagraph", required_argument, 0, 'G'}, + {"report_cputime", required_argument, 0, 'C'}, {"average", required_argument, 0, 'a'}, #ifdef RCCL_MULTIRANKPERGPU {"enable_multiranks", required_argument, 0, 'x'}, @@ -983,10 +807,12 @@ int main(int argc, char* argv[]) { while(1) { int c; -#ifdef RCCL_MULTIRANKPERGPU - c = getopt_long(argc, argv, "t:g:b:e:i:f:n:m:w:p:c:o:d:r:z:G:a:y:s:u:h:R:x:", longopts, &longindex); + // EDGAR NOTE: y is used by 'memory_type' (a RCCL argument) and 'stream_null' (a NCCL argument) + // also not sure about G vs. hG (we had G, they have hG) +#ifdef RCCL_MULTIRANKPERGPU + c = getopt_long(argc, argv, "t:g:b:e:i:f:n:m:w:p:c:o:d:r:z :y :T:G:C:a :y :s:u:h:R:x:", longopts, &longindex); #else - c = getopt_long(argc, argv, "t:g:b:e:i:f:n:m:w:p:c:o:d:r:z:G:a:y:s:u:h:", longopts, &longindex); + c = getopt_long(argc, argv, "t:g:b:e:i:f:n:m:w:p:c:o:d:r:z :y :T:G:C:a :y :s:u:h:", longopts, &longindex); #endif if (c == -1) @@ -1067,6 +893,12 @@ int main(int argc, char* argv[]) { mask = strtok(NULL, ","); }; } + break; + case 'y': + streamnull = strtol(optarg, NULL, 0); + break; + case 'T': + timeout = strtol(optarg, NULL, 0); break; case 'G': #if (NCCL_MAJOR > 2 || (NCCL_MAJOR >= 2 && NCCL_MINOR >= 9)) && HIP_VERSION >= 50221310 @@ -1075,6 +907,9 @@ int main(int argc, char* argv[]) { printf("Option -G (HIP graph) not supported before NCCL 2.9 + ROCm 5.2 Ignoring\n"); #endif break; + case 'C': + report_cputime = strtol(optarg, NULL, 0); + break; case 'a': average = (int)strtol(optarg, NULL, 0); break; @@ -1114,15 +949,18 @@ int main(int argc, char* argv[]) { "[-y,--memory_type ] \n\t" "[-s,--stress_cycles ] \n\t" "[-u,--cumask ] \n\t" + "[-y,--stream_null <0/1>] \n\t" + "[-T,--timeout