From d2fbcfea02a5f761f52c766f7a589e5369bac1e7 Mon Sep 17 00:00:00 2001 From: Wenkai Du Date: Tue, 26 Nov 2019 16:33:13 -0800 Subject: [PATCH] Misc fixes and improvements for 2.5.6 1. Fix RCCL unit test 2. Add ROME detection and tuning 3. Change default P2P level 4. Fix search algorithm for XGMI 5. Remove explicit channel duplication with implicit by using half of link speed 6. Add collective trace support 7. Correct Intel Skylake CPU detection and bandwidth 8. Fix topo connect function 9. Disable GDR read and remove unreachable code 10. Disable LL128 kernels 11. Add tuning parameters 12. Use original clock64() implementation which returns RTC counter value 13. Print out timestamp of collective trace 14. Do not use struct ncclColl in kernel launch parameter 15. Fix abort handling and add tracing 17. Add __launch_bounds__ to kernel functions 18. Remove unused abortCount 19. Unset default MIN_NRINGS and MIN_NCHANNELS 20. Do not allocate shared memory when not using LL128 kernels 21. Correct time print out in tuning log [ROCm/rccl commit: 1e55645d974fde68b78fff966a61f976b68e2b31] --- projects/rccl/CMakeLists.txt | 5 + .../rccl/src/collectives/device/all_reduce.h | 4 +- .../rccl/src/collectives/device/broadcast.h | 4 +- projects/rccl/src/collectives/device/common.h | 93 ++++++++++++++----- .../rccl/src/collectives/device/primitives.h | 16 ++-- projects/rccl/src/enqueue.cc | 18 ++-- projects/rccl/src/graph/connect.cc | 14 +-- projects/rccl/src/graph/search.cc | 3 +- projects/rccl/src/graph/topo.cc | 55 +++++++++-- projects/rccl/src/graph/topo.h | 5 + projects/rccl/src/graph/tuning.cc | 16 ++-- projects/rccl/src/include/collectives.h | 2 +- projects/rccl/src/include/comm.h | 4 +- projects/rccl/src/include/devcomm.h | 29 +++++- projects/rccl/src/init.cc | 66 +++++++++++++ projects/rccl/src/transport/net.cc | 22 +++-- projects/rccl/src/transport/p2p.cc | 6 +- projects/rccl/test/test_AllReduceAbort.cpp | 25 ++--- projects/rccl/test/test_BroadcastAbort.cpp | 25 ++--- .../tools/rccl-prim-test/rccl_prim_test.cpp | 16 +++- 20 files changed, 307 insertions(+), 121 deletions(-) diff --git a/projects/rccl/CMakeLists.txt b/projects/rccl/CMakeLists.txt index d0f6c4f7bd..3e1fec1403 100644 --- a/projects/rccl/CMakeLists.txt +++ b/projects/rccl/CMakeLists.txt @@ -147,6 +147,11 @@ if(PROFILE) add_definitions(-DENABLE_PROFILING) endif() +set(COLLTRACE 1 CACHE BOOL "Collective Trace Option") +if(COLLTRACE) + add_definitions(-DENABLE_COLLTRACE) +endif() + foreach(target ${AMDGPU_TARGETS}) target_link_libraries(rccl PRIVATE --amdgpu-target=${target}) endforeach() diff --git a/projects/rccl/src/collectives/device/all_reduce.h b/projects/rccl/src/collectives/device/all_reduce.h index 55508a82cf..f3973bfd34 100644 --- a/projects/rccl/src/collectives/device/all_reduce.h +++ b/projects/rccl/src/collectives/device/all_reduce.h @@ -26,7 +26,7 @@ __device__ void ncclAllReduceRingKernel(struct CollectiveArgs* args) { #ifdef ENABLE_PROFILING auto devProf = comm->devProf; uint64_t clk, t0 = 0ULL, ws, wr; - if (tid == 0) clk = clock64(); + if (tid == 0) clk = __rtc64(); #endif // Compute pointers @@ -98,7 +98,7 @@ __device__ void ncclAllReduceRingKernel(struct CollectiveArgs* args) { ACCUMULATE_COUNTER(directRecv); } #ifdef ENABLE_PROFILING - if (tid == 0) __atomic_fetch_add(&(devProf->total_cycle), clock64() - clk, __ATOMIC_SEQ_CST); + if (tid == 0) __atomic_fetch_add(&(devProf->total_cycle), __rtc64() - clk, __ATOMIC_SEQ_CST); #endif } diff --git a/projects/rccl/src/collectives/device/broadcast.h b/projects/rccl/src/collectives/device/broadcast.h index 78ea3c3bc8..8399af890f 100644 --- a/projects/rccl/src/collectives/device/broadcast.h +++ b/projects/rccl/src/collectives/device/broadcast.h @@ -28,7 +28,7 @@ __device__ void ncclBroadcastRingKernel(struct CollectiveArgs* args) { #ifdef ENABLE_PROFILING auto devProf = comm->devProf; uint64_t clk, t0 = 0ULL, ws, wr; - if (tid == 0) clk = clock64(); + if (tid == 0) clk = __rtc64(); #endif // Compute pointers @@ -65,7 +65,7 @@ __device__ void ncclBroadcastRingKernel(struct CollectiveArgs* args) { } } #ifdef ENABLE_PROFILING - if (tid == 0) __atomic_fetch_add(&(devProf->total_cycle), clock64() - clk, __ATOMIC_SEQ_CST); + if (tid == 0) __atomic_fetch_add(&(devProf->total_cycle), __rtc64() - clk, __ATOMIC_SEQ_CST); #endif } diff --git a/projects/rccl/src/collectives/device/common.h b/projects/rccl/src/collectives/device/common.h index a16f9c89db..90a2444aa5 100644 --- a/projects/rccl/src/collectives/device/common.h +++ b/projects/rccl/src/collectives/device/common.h @@ -12,6 +12,16 @@ #include "collectives.h" #include "devcomm.h" +__device__ +inline __attribute((always_inline)) +long long int __rtc64() { +#if __HIP__ + return (long long int) __builtin_amdgcn_s_memrealtime(); +#else + return (long long int) __clock_u64(); +#endif +} + // Exit If Abort Barrier across CTA: make sure all threads exit consistently // Each thread sets a predicate to true if abort == 1 // all CTA's threads enter the barrier and do a popc on their predicates being True @@ -20,7 +30,7 @@ #define exitIfAbortBarrier(abort, abortCount) \ if (abort) __atomic_fetch_add(abortCount, 1, __ATOMIC_SEQ_CST); \ __syncthreads(); \ - if (LOAD(abortCount)) { asm volatile ("s_endpgm"); return; } + if (LOAD(abortCount)) { /*asm volatile ("s_endpgm");*/ return false; } #define __syncwarp() #else static inline __device__ void exitIfAbortBarrier(int abort) { @@ -36,7 +46,7 @@ static inline __device__ void exitIfAbortBarrier(int abort) { #define NCCL_FUNC5(coll, op, dtype) \ NCCL_COLL_NAME(coll##LL, op, dtype), \ - NCCL_COLL_NAME(coll##LL128, op, dtype), \ + NCCL_COLL_NAME(coll##LL, op, dtype), \ NCCL_COLL_NAME(coll, op, dtype) #define NCCL_FUNC4(coll, op, dtype) \ @@ -149,17 +159,54 @@ static __device__ void load_parallel(void* dst, void* src, size_t size, int tid, for (int o = tid; o < (size/sizeof(int)); o += blockDim.x) d[o] = s[o]; } -static __device__ void load_coll(struct ncclColl* localColl, struct ncclColl* hostColl, int tid, struct ncclDevComm* comm, uint32_t* abortCount) { +static __device__ bool load_coll(struct ncclColl* localColl, struct ncclColl* hostColl, int tid, struct ncclDevComm* comm, uint32_t* abortCount) { // Check whether the last operation was aborted and make sure all threads exit int abort = tid == 0 ? *(comm->abortFlag) : 0; exitIfAbortBarrier(abort, abortCount); load_parallel(localColl, hostColl, sizeof(struct ncclColl), tid, abortCount); __syncthreads(); if (tid == 0) hostColl->active = 0; + return true; } +#ifdef ENABLE_COLLTRACE +#define traceColl(fIdx) \ + uint32_t pos = __atomic_fetch_add(comm->collTraceTail, 1, __ATOMIC_SEQ_CST)%COLLTRACE_NUM_ITEMS; \ + comm->collTrace[pos].timeStamp = __rtc64(); \ + comm->collTrace[pos].opCount = localColl.args.opCount; \ + comm->collTrace[pos].bid = bid; \ + comm->collTrace[pos].funcIndex = fIdx; +#define traceKernelLaunch(fIdx) { \ + traceColl(fIdx); \ + comm->collTrace[pos].type = ncclCollTraceKernelLaunchType; \ + asm volatile ("s_getreg_b32 %0, hwreg(HW_REG_HW_ID)" : "=s" (comm->collTrace[pos].data_0)); \ + } +#define traceCollEnd(fIdx) { \ + traceColl(fIdx); \ + comm->collTrace[pos].type = ncclCollTraceCollEndType; \ + } +#define traceAbort(fIdx) { \ + traceColl(fIdx); \ + comm->collTrace[pos].type = ncclCollTraceAbortType; \ + } +#else +#define traceKernelLaunch() +#define traceCollEnd() +#define traceAbort() +#endif + extern __device__ volatile uint64_t* ncclShmem; +#ifdef ENABLE_LL128 +#define ALLOCATE_SHMEM \ + __shared__ volatile uint64_t shmem[NCCL_LL128_SHMEM_SIZE]; \ + ncclShmem = shmem; \ + __shared__ uint32_t sync[NCCL_LL128_MAX_NTHREADS/WARP_SIZE]; +#else +#define ALLOCATE_SHMEM \ + uint32_t* sync = 0; +#endif + /* Functions for aggregation case */ #define IMPL_COLL_FUNC(coll, op, ncclFunc, dtype, ctype) \ __device__ void NCCL_COLL_NAME(coll, op, dtype)(struct CollectiveArgs* args) { \ @@ -168,47 +215,45 @@ __device__ void NCCL_COLL_NAME(coll, op, dtype)(struct CollectiveArgs* args) { \ /* Kernels with the first operation inlined */ #define IMPL_COLL_KERN(coll, op, ncclFunc, dtype, ctype, fIndex) \ -__global__ void NCCL_KERN_NAME(coll, op, dtype)(struct ncclColl firstColl) { \ +__launch_bounds__(NCCL_MAX_NTHREADS, 1) \ +__global__ void NCCL_KERN_NAME(coll, op, dtype)(struct ncclDevComm* comm) { \ int tid = threadIdx.x; \ int bid = blockIdx.x; \ - __shared__ volatile uint64_t shmem[NCCL_LL128_SHMEM_SIZE]; \ - ncclShmem = shmem; \ + ALLOCATE_SHMEM; \ __shared__ struct ncclColl localColl; \ __shared__ uint32_t abortCount; \ - __shared__ uint32_t sync[NCCL_LL128_MAX_NTHREADS/WARP_SIZE]; \ if (tid == 0) abortCount = 0; \ __syncthreads(); \ \ - struct ncclDevComm* comm = firstColl.args.comm; \ struct ncclChannel* channel = comm->channels+bid; \ - struct ncclColl* c; \ - channel->abortCount = &abortCount; \ channel->sync = sync; \ - if (bid == 0) { \ - /* To optimize for latency, (only) the first operation is passed as argument.*/ \ - c = &firstColl; \ - } else { \ - c = &localColl; \ - load_coll(c, channel->devCollectives+channel->collFifoHead, tid, comm, &abortCount); \ + if (!load_coll(&localColl, channel->devCollectives+channel->collFifoHead, tid, comm, &abortCount)) { \ + if (tid == 0) traceAbort(-1); \ + return; \ } \ + if (tid == 0) traceKernelLaunch(localColl.funcIndex); \ while (1) { \ - if (tid < c->args.nThreads) { \ - if (c->funcIndex == fIndex) { \ - coll##Kernel, ctype>(&c->args); \ + if (tid < localColl.args.nThreads) { \ + if (localColl.funcIndex == fIndex) { \ + coll##Kernel, ctype>(&localColl.args); \ } else { \ - NCCL_CALL_FUNCTIONS(c); \ + NCCL_CALL_FUNCTIONS(&localColl); \ } \ } \ - int nextIndex = c->nextIndex; \ + int nextIndex = localColl.nextIndex; \ if (tid == 0) channel->collFifoHead = nextIndex; \ \ - if (c->active == 2) { \ + if (localColl.active == 2) { \ + if (tid == 0) traceCollEnd(-1); \ return; \ } \ \ /* Load next collective operation*/ \ - c = &localColl; /* for bid 0 */ \ - load_coll(c, channel->devCollectives+nextIndex, tid, comm, &abortCount); \ + if (!load_coll(&localColl, channel->devCollectives+nextIndex, tid, comm, &abortCount)) { \ + if (tid == 0) traceAbort(-1); \ + break; \ + } \ + if (tid == 0) traceCollEnd(localColl.funcIndex); \ } \ } diff --git a/projects/rccl/src/collectives/device/primitives.h b/projects/rccl/src/collectives/device/primitives.h index 2d34d4bac4..e307ff483a 100644 --- a/projects/rccl/src/collectives/device/primitives.h +++ b/projects/rccl/src/collectives/device/primitives.h @@ -65,8 +65,7 @@ class ncclPrimitives { #endif const T* recvBuff[NRECV]; T* sendBuff[NSEND]; - struct ncclDevComm* comm; - uint32_t* abortCount; + const struct ncclDevComm* comm; inline __device__ int recvOffset(int i) { return (recvStep[i]%NCCL_STEPS)*stepSize; } inline __device__ int sendOffset(int i) { return (sendStep[i]%NCCL_STEPS)*stepSize; } @@ -112,14 +111,14 @@ class ncclPrimitives { if (sendConnHeadPtr) { #ifdef ENABLE_PROFILING auto devProf = comm->devProf; - uint64_t t0 = clock64(); + uint64_t t0 = __rtc64(); #endif while (sendConnHeadCache + NCCL_STEPS < sendConnHead + SLICESTEPS) { sendConnHeadCache = LOAD(sendConnHeadPtr); if (checkAbort(wid, 1)) break; } #ifdef ENABLE_PROFILING - __atomic_fetch_add(&devProf->wait_send_cycle[blockIdx.x], clock64() - t0, __ATOMIC_SEQ_CST); + __atomic_fetch_add(&devProf->wait_send_cycle[blockIdx.x], __rtc64() - t0, __ATOMIC_SEQ_CST); #endif if (sendConnFifoPtr) { STORE(sendConnFifoPtr+sendConnHead%NCCL_STEPS, nbytes); @@ -134,14 +133,14 @@ class ncclPrimitives { if (recvConnTailPtr) { #ifdef ENABLE_PROFILING auto devProf = comm->devProf; - uint64_t t0 = clock64(); + uint64_t t0 = __rtc64(); #endif while (recvConnTailCache < recvConnTail + SLICESTEPS) { recvConnTailCache = LOAD(recvConnTailPtr); if (checkAbort(wid, 0)) break; } #ifdef ENABLE_PROFILING - __atomic_fetch_add(&devProf->wait_recv_cycle[blockIdx.x], clock64() - t0, __ATOMIC_SEQ_CST); + __atomic_fetch_add(&devProf->wait_recv_cycle[blockIdx.x], __rtc64() - t0, __ATOMIC_SEQ_CST); #endif recvConnTail += SLICESTEPS; } @@ -340,7 +339,6 @@ inline __device__ int directSendInc(int i, int directInc, int sliceInc) { ncclPrimitives(const int tid, const int nthreads, int* recvPeers, int* sendPeers, T* directBuff, int stepSize, struct ncclChannel* channel, struct ncclDevComm* comm, const uint64_t opCount) : comm(comm), tid(tid), nthreads(nthreads), wid(tid%WARP_SIZE), stepSize(stepSize), opCount(opCount) { // Make sure step is updated before we read it. - abortCount = channel->abortCount; barrier(); for (int i=0; i= 0; i++) loadRecvConn(&channel->devPeers[recvPeers[i]].recv.conn, i, 0); @@ -417,11 +415,11 @@ inline __device__ int directSendInc(int i, int directInc, int sliceInc) { #ifdef ENABLE_PROFILING #define INIT_COUNTER \ - if (tid == 0) { t0 = clock64(); ws = LOAD(&(devProf->wait_send_cycle[blockIdx.x])); \ + if (tid == 0) { t0 = __rtc64(); ws = LOAD(&(devProf->wait_send_cycle[blockIdx.x])); \ wr = LOAD(&(devProf->wait_recv_cycle[blockIdx.x])); } #define ACCUMULATE_COUNTER(prim) \ - if (tid == 0) { __atomic_fetch_add(&(devProf->prim##_cycle), clock64() - t0 \ + if (tid == 0) { __atomic_fetch_add(&(devProf->prim##_cycle), __rtc64() - t0 \ + ws - LOAD(&(devProf->wait_send_cycle[blockIdx.x])) \ + wr - LOAD(&(devProf->wait_recv_cycle[blockIdx.x])), \ __ATOMIC_SEQ_CST); \ diff --git a/projects/rccl/src/enqueue.cc b/projects/rccl/src/enqueue.cc index e84b5bcc68..92290cade1 100644 --- a/projects/rccl/src/enqueue.cc +++ b/projects/rccl/src/enqueue.cc @@ -54,7 +54,7 @@ NCCL_FUNCS3B(coll, copy), \ NCCL_FUNCS3B(coll, copy) -typedef void(*ncclKern_t)(struct ncclColl); +typedef void(*ncclKern_t)(struct ncclDevComm*); // Must be consistent with the ncclFuncSet enum static ncclKern_t const ncclKerns[NCCL_NUM_FUNCTIONS*ncclNumOps*ncclNumTypes*NCCL_NUM_ALGORITHMS*NCCL_NUM_PROTOCOLS] = { NCCL_FUNCS2B(ncclBroadcast), @@ -80,7 +80,7 @@ ncclResult_t ncclLaunchCooperativeKernelMultiDevice(hipLaunchParams *paramsList, for (int i = 0; i < numDevices; i++) { hipLaunchParams* params = paramsList+i; CUDACHECK(hipSetDevice(cudaDevs[i])); - hipLaunchKernelGGL(((void (*)(struct ncclColl))params->func), params->gridDim, params->blockDim, params->sharedMem, params->stream, **((struct ncclColl **)(params->args))); + hipLaunchKernelGGL(((void (*)(struct ncclDevComm*))params->func), params->gridDim, params->blockDim, params->sharedMem, params->stream, **((struct ncclDevComm ***)(params->args))); } CUDACHECK(hipSetDevice(savedDev)); return ncclSuccess; @@ -98,10 +98,8 @@ ncclResult_t setupLaunch(struct ncclComm* comm, hipLaunchParams* params) { // Find the first operation, choose the kernel accordingly and pass it // as the first argument. struct ncclColl* coll = comm->channels[0].collectives+comm->channels[0].collStart; - memcpy(&comm->args, coll, sizeof(struct ncclColl)); - // As we pass that coll directly, we can free it immediately. - STORE(&coll->active, 0); + comm->args = comm->devComm; params->func = (void *)ncclKerns[coll->funcIndex]; return ncclSuccess; } @@ -194,7 +192,7 @@ ncclResult_t ncclBarrierEnqueueWait(ncclComm_t comm) { hipLaunchParams *params = comm->myParams; if (comm->launchMode == ncclComm::PARALLEL) { - hipLaunchKernelGGL(((void (*)(struct ncclColl))params->func), params->gridDim, params->blockDim, params->sharedMem, params->stream, **((struct ncclColl **)(params->args))); + hipLaunchKernelGGL(((void (*)(struct ncclDevComm*))params->func), params->gridDim, params->blockDim, params->sharedMem, params->stream, **((struct ncclDevComm ***)(params->args))); } // Start the network proxies as soon as the kernel has been launched. We can't // perform any CUDA call between the two or having a hipFree between the CUDA @@ -232,9 +230,9 @@ ncclResult_t ncclEnqueueEvents(ncclComm_t comm) { // Trees are not perfectly sticking to the model for medium sizes. Applying a static correction // factor is not ideal but works quite well. Powers of two, 64 B to 1 GB. static float treeCorrectionFactor[NCCL_NUM_PROTOCOLS][22] = { - { 1.0, 1.0, 1.0, 1.0, .9, .8, .7, .7, .7, .7, .6, .5, .5, .5, .6, .7, .8, .9, .9, 1.0, 1.0, 1.0 }, - { 1.0, 1.0, 1.0, 1.0, 1.0, .9, .8, .8, .8, .8, .7, .7, .7, .6, .6, .7, .7, .8, .8, .9, .9, 1.0 }, - { .9, .9, .9, .9, .9, .9, .9, .8, .7, .6, .6, .5, .5, .5, .5, .5, .5, .6, .6, .7, .8, .9 } + { 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, .84, .49, .42, .60, .75, .87, .94, .94, .99, 1.0, 1.0 , 1.0 , 1.0 , 1.0 , 1.0 }, + { 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, .84, .49, .42, .60, .75, .87, .94, .94, .99, 1.0, 1.0 , 1.0 , 1.0 , 1.0 , 1.0 }, + { 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, .41, .27, .25, .39, .46, .72, .76, .87, .92, .97, 1.0, 1.0 , 1.0 , 1.0 , 1.0 , 1.0 } }; static ncclResult_t getAlgoInfo(struct ncclInfo* info) { @@ -262,7 +260,7 @@ static ncclResult_t getAlgoInfo(struct ncclInfo* info) { return ncclInternalError; } - if (comm->rank == 0) INFO(NCCL_COLL, "%ld Bytes -> Algo %d proto %d time %d", info->nBytes, info->algorithm, info->protocol, minTime); + if (comm->rank == 0) INFO(NCCL_COLL, "%ld Bytes -> Algo %d proto %d time %d", info->nBytes, info->algorithm, info->protocol, (int)minTime); TRACE(NCCL_COLL, "%ld Bytes -> Algo %d proto %d time %f", info->nBytes, info->algorithm, info->protocol, minTime); int nc = comm->nChannels; diff --git a/projects/rccl/src/graph/connect.cc b/projects/rccl/src/graph/connect.cc index af481d28b5..486cd2b14a 100644 --- a/projects/rccl/src/graph/connect.cc +++ b/projects/rccl/src/graph/connect.cc @@ -182,7 +182,7 @@ NCCL_PARAM(MinNchannels, "MIN_NCHANNELS", -2); NCCL_PARAM(MaxNchannels, "MAX_NCHANNELS", -2); int ncclMinNchannels() { - int minNchannels = 0; + int minNchannels = 2; if (ncclParamMinNrings() != -2) minNchannels = ncclParamMinNrings(); if (ncclParamMinNchannels() != -2) minNchannels = ncclParamMinNchannels(); if (minNchannels > MAXCHANNELS) { @@ -234,12 +234,14 @@ ncclResult_t ncclTopoPostset(struct ncclComm* comm, int* firstRanks, struct nccl NCCLCHECK(connectRings(comm, ringRecv, ringSend, ringPrev, ringNext, firstRanks)); NCCLCHECK(connectTrees(comm, treeUpRecv, treeUpSend, treeDnRecv, treeDnSend, firstRanks)); - // Duplicate ringPrev/ringNext for ncclBuildRing - memcpy(ringPrev+nChannels*nranks, ringPrev, nChannels*nranks*sizeof(int)); - memcpy(ringNext+nChannels*nranks, ringNext, nChannels*nranks*sizeof(int)); + if (nChannels == 1) { + // Duplicate ringPrev/ringNext for ncclBuildRing + memcpy(ringPrev+nChannels*nranks, ringPrev, nChannels*nranks*sizeof(int)); + memcpy(ringNext+nChannels*nranks, ringNext, nChannels*nranks*sizeof(int)); - // Duplication should be complete now - nChannels = comm->nChannels = std::min(MAXCHANNELS,nChannels*2); + // Duplication should be complete now + nChannels = comm->nChannels = std::min(MAXCHANNELS,nChannels*2); + } // Honor NCCL_MIN_NRINGS/NCCL_MAX_NRINGS. // We permit combining max, then min, to only use the first channels, then duplicate them. diff --git a/projects/rccl/src/graph/search.cc b/projects/rccl/src/graph/search.cc index 3a8b4e7a9d..752b236c8b 100644 --- a/projects/rccl/src/graph/search.cc +++ b/projects/rccl/src/graph/search.cc @@ -12,6 +12,7 @@ static ncclResult_t ncclTopoFollowPath(struct ncclTopoGraph* graph, struct ncclT if (path->count == 0) return ncclSuccess; *node = NULL; + width /= 2; if (width > 0) { if (path->type > graph->type) return ncclSuccess; graph->type = std::max(graph->type, path->type); @@ -204,7 +205,7 @@ ncclResult_t ncclTopoSearchRecGpu(struct ncclTopoSystem* system, struct ncclTopo NCCLCHECK(ncclTopoCompareGraphs(graph, saveGraph, ©)); if (copy) { memcpy(saveGraph, graph, sizeof(struct ncclTopoGraph)); - if (graph->nChannels*graph->speedIntra == maxSpeed) *time = -1; + if (graph->nChannels*graph->speedIntra/2 == maxSpeed) *time = -1; } if (graph->nChannels < MAXCHANNELS/2) { NCCLCHECK(ncclTopoSearchRec(system, graph, saveGraph, maxSpeed, time)); diff --git a/projects/rccl/src/graph/topo.cc b/projects/rccl/src/graph/topo.cc index 29c83da052..6e2e3382ac 100644 --- a/projects/rccl/src/graph/topo.cc +++ b/projects/rccl/src/graph/topo.cc @@ -13,6 +13,10 @@ #include "net.h" #include #include +#if defined(__HIP_PLATFORM_HCC__) || defined(__HCC__) || defined(__HIPCC__) +#include +#include +#endif #define BUSID_SIZE (sizeof("0000:00:00.0")) #define BUSID_REDUCED_SIZE (sizeof("0000:00")) @@ -96,15 +100,16 @@ ncclResult_t ncclTopoCudaPath(int cudaDev, char** path) { int interCpuWidth = 0; int cpuPciWidth = 0; +int p2pPciWidth = 0; static ncclResult_t getCpuWidths() { // Check if already detected - if (interCpuWidth + cpuPciWidth) return ncclSuccess; + if (interCpuWidth + cpuPciWidth + p2pPciWidth) return ncclSuccess; // Defaults char cpu[256]; sprintf(cpu, "Generic"); - cpuPciWidth = interCpuWidth = PCI_WIDTH; + cpuPciWidth = interCpuWidth = p2pPciWidth = PCI_WIDTH; #ifdef __PPC__ sprintf(cpu, "ppc64"); @@ -124,6 +129,7 @@ static ncclResult_t getCpuWidths() { asm volatile("cpuid" : "=b" (cpuid0.ebx), "=c" (cpuid0.ecx), "=d" (cpuid0.edx) : "a" (0)); if (strncmp(cpuid0.vendor, "GenuineIntel", 12) == 0) sprintf(cpu, "Intel"); + else if (strncmp(cpuid0.vendor, "AuthenticAMD", 12) == 0) sprintf(cpu, "AMD"); if (strcmp(cpu, "Intel") == 0) { union { @@ -133,22 +139,47 @@ static ncclResult_t getCpuWidths() { int familyId:4; int processorType:2; int resv0:2; - int extModelId:4; - int modelId:8; + int extModel:4; + int extFamily:8; int resv1:4; }; uint32_t val; } cpuid1; asm volatile("cpuid" : "=a" (cpuid1.val) : "a" (1)); - if (cpuid1.familyId == 6 && cpuid1.modelId >= 0x55) { // Skylake + if (cpuid1.familyId == 6 && (cpuid1.model + cpuid1.extModel * 16) >= 0x55) { // Skylake sprintf(cpu, "Intel/Skylake (or later)"); interCpuWidth = SKL_QPI_WIDTH; + cpuPciWidth = SKL_CPUPCI_WIDTH; + p2pPciWidth = SKL_PCI_WIDTH; + } else { + interCpuWidth = QPI_WIDTH; + } + } + else if (strcmp(cpu, "AMD") == 0) { + union { + struct { + uint32_t steppingId:4; + uint32_t model:4; + uint32_t family:4; + uint32_t resv0:4; + uint32_t extModel:4; + uint32_t extFamily:8; + uint32_t resv1:4; + }; + uint32_t val; + } cpuid1; + asm volatile("cpuid" : "=a" (cpuid1.val) : "a" (1)); + if ((cpuid1.family + cpuid1.extFamily) == 23 && (cpuid1.model + cpuid1.extModel * 16) >= 49) { + sprintf(cpu, "AMD/Rome (or later)"); + interCpuWidth = ROME_QPI_WIDTH; + cpuPciWidth = ROME_CPUPCI_WIDTH; + p2pPciWidth = ROME_PCI_WIDTH; } else { interCpuWidth = QPI_WIDTH; } } #endif - INFO(NCCL_GRAPH, "%s CPU (PCI %d, InterCpu %d)", cpu, cpuPciWidth, interCpuWidth); + INFO(NCCL_GRAPH, "%s CPU (CPU-PCI %d, PCI/P2P %d, InterCpu %d)", cpu, cpuPciWidth, p2pPciWidth, interCpuWidth); return ncclSuccess; } @@ -163,7 +194,8 @@ static ncclResult_t ncclTopoGetCpuPciP2pWidth(int* width) { return ncclSuccess; } static ncclResult_t ncclTopoGetPciWidth(int* width) { - *width = PCI_WIDTH; + NCCLCHECK(getCpuWidths()); + *width = p2pPciWidth; return ncclSuccess; } static ncclResult_t ncclTopoGetNetWidth(int* width) { @@ -226,8 +258,9 @@ ncclResult_t ncclTopoConnectCpu(struct ncclTopoSystem* system, int numaId, struc #if defined(__HIP_PLATFORM_HCC__) || defined(__HCC__) || defined(__HIPCC__) #define VEGA_XGMI_WIDTH 20 +extern int busIdToCudaDev(int64_t busId); -ncclResult_t ncclTopoConnectXGMI(int num_gpus, struct ncclTopoSystem* system) { +ncclResult_t ncclTopoConnectXGMI(struct ncclComm* comm, struct ncclTopoSystem* system) { struct ncclTopoNode* nvsNode = NULL; int minNvlinks = 2, minWidth = VEGA_XGMI_WIDTH; @@ -237,7 +270,9 @@ ncclResult_t ncclTopoConnectXGMI(int num_gpus, struct ncclTopoSystem* system) { struct ncclTopoNode* gpu1 = system->nodes[GPU].nodes+g1; struct ncclTopoNode* gpu2 = system->nodes[GPU].nodes+g2; uint32_t link_type, hops; - if (hipExtGetLinkTypeAndHopCount(gpu1->rank, gpu2->rank, &link_type, &hops) == hipSuccess) { + int cudaDev1 = busIdToCudaDev(comm->peerInfo[gpu1->rank].busId); + int cudaDev2 = busIdToCudaDev(comm->peerInfo[gpu2->rank].busId); + if (hipExtGetLinkTypeAndHopCount(cudaDev1, cudaDev2, &link_type, &hops) == hipSuccess) { if (link_type == HSA_AMD_LINK_INFO_TYPE_XGMI && hops == 1) { NCCLCHECK(ncclTopoConnectNodes(gpu1, gpu2, LINK_NVL, minWidth)); } @@ -613,7 +648,7 @@ ncclResult_t ncclTopoGetSystem(struct ncclComm* comm, struct ncclTopoSystem** sy } #if defined(__HIP_PLATFORM_HCC__) || defined(__HCC__) || defined(__HIPCC__) - NCCLCHECK(ncclTopoConnectXGMI(g, s)); + NCCLCHECK(ncclTopoConnectXGMI(comm, s)); #else NCCLCHECK(ncclTopoConnectNVLink(nvmlDevs, s)); #endif diff --git a/projects/rccl/src/graph/topo.h b/projects/rccl/src/graph/topo.h index 6b8a2f9f8d..0c59c3176b 100644 --- a/projects/rccl/src/graph/topo.h +++ b/projects/rccl/src/graph/topo.h @@ -16,8 +16,13 @@ #define PCI_WIDTH 12 // PCI Gen3 x16 #define QPI_WIDTH 8 #define SKL_QPI_WIDTH 12 +#define SKL_PCI_WIDTH 14 +#define SKL_CPUPCI_WIDTH 10 #define P9_WIDTH 32 #define NET_WIDTH 12 // 100Gbit +#define ROME_QPI_WIDTH 12 +#define ROME_PCI_WIDTH 22 +#define ROME_CPUPCI_WIDTH 16 // Intel CPU convert GPU P2P traffic into 64B PCI TLPs, to GPU // to GPU traffic consumed more PCI bandwidth. diff --git a/projects/rccl/src/graph/tuning.cc b/projects/rccl/src/graph/tuning.cc index 15e4f9db19..5a8f201804 100644 --- a/projects/rccl/src/graph/tuning.cc +++ b/projects/rccl/src/graph/tuning.cc @@ -58,7 +58,7 @@ static const char* ncclProtoStr[] = { "LL", "LL128", "Simple" }; // Latencies in us, Bandwidths in GB/s // Tree { LL, LL128, Simple } , Ring { LL, LL128, Simple } -static const float baseLat [NCCL_NUM_ALGORITHMS][NCCL_NUM_PROTOCOLS] = { { 4.4, 4.4, 0 }, { 3.6, 3.6, 8.4 } }; +static const float baseLat [NCCL_NUM_ALGORITHMS][NCCL_NUM_PROTOCOLS] = { { 37.9, 37.9, 40.4 }, { 20.5, 20.5, 27.9 } }; // NVLink, PCI, Network #define NCCL_HW_NVLINK 0 @@ -67,11 +67,11 @@ static const float baseLat [NCCL_NUM_ALGORITHMS][NCCL_NUM_PROTOCOLS] = { { 4.4, // Tree/Simple is the latency a 256kB chunk, which is ~ base lat + 256k/12GB/s (+ 256k/12GB/s for the network). static const float hwLat [3][NCCL_NUM_ALGORITHMS][NCCL_NUM_PROTOCOLS] = { /* NVLINK */ - { /* Tree (LL/LL128/Simple)*/ { .5, 1.9, 28 }, /* Ring (LL/LL128/Simple)*/ { .4, 2.5, 5.7 } }, + { /* Tree (LL/LL128/Simple)*/ { 1.2, 1.2, 3.8 }, /* Ring (LL/LL128/Simple)*/ { 2.3, 2.3, 2.7 } }, /* PCI */ - { /* Tree (LL/LL128/Simple)*/ { 1.0, 1.9, 28 }, /* Ring (LL/LL128/Simple)*/ { 1.0, 2.5, 5.7 } }, + { /* Tree (LL/LL128/Simple)*/ { 2.2, 2.2, 5.7 }, /* Ring (LL/LL128/Simple)*/ { 1.3, 1.3, 1.9 } }, /* NET */ - { /* Tree (LL/LL128/Simple)*/ { 5.0, 7.5, 50 }, /* Ring (LL/LL128/Simple)*/ { .9, 2.5, 6.6 } } + { /* Tree (LL/LL128/Simple)*/ { 9.8, 9.8, 19.5 }, /* Ring (LL/LL128/Simple)*/ { 2.0, 2.0, 4.5 } } }; // LL128 max BW for the different collectives @@ -102,13 +102,13 @@ ncclResult_t ncclSetThresholds(struct ncclComm* comm, int minCompCap, int maxCom for (int p=0; pnNodes <= 2 ? graphs[a]->speedIntra : graphs[a]->speedInter; - float busBw = graphs[a]->nChannels * speed * 1.0; + float busBw = graphs[a]->nChannels * speed * 0.6; // Various model refinements - if (a == NCCL_ALGO_RING && p == NCCL_PROTO_LL) busBw *= 1.0/4.0; + if (a == NCCL_ALGO_RING && p == NCCL_PROTO_LL) busBw *= 1.0/5.0; if (a == NCCL_ALGO_RING && p == NCCL_PROTO_LL128) busBw = std::min(busBw*120.0/128.0, ll128MaxBw[coll]); - if (a == NCCL_ALGO_TREE) busBw = std::min(busBw*.9, comm->nNodes > 1 ? 70.0 : 90.0); - if (a == NCCL_ALGO_TREE && p == NCCL_PROTO_LL) busBw *= 1.0/3.0; + if (a == NCCL_ALGO_TREE) busBw = std::min(busBw*.27, comm->nNodes > 1 ? 70.0 : 90.0); + if (a == NCCL_ALGO_TREE && p == NCCL_PROTO_LL) busBw *= 1.0/2.3; if (a == NCCL_ALGO_TREE && p == NCCL_PROTO_LL128) busBw *= 7.0/9.0; // Convert bus BW to algorithm BW diff --git a/projects/rccl/src/include/collectives.h b/projects/rccl/src/include/collectives.h index 589987d79e..ddefef2062 100644 --- a/projects/rccl/src/include/collectives.h +++ b/projects/rccl/src/include/collectives.h @@ -23,7 +23,7 @@ /* Declare all collective operations */ #define DECL_COLL5(coll, op, dtype) \ extern __device__ __attribute__((noinline)) void NCCL_COLL_NAME(coll, op, dtype)(struct CollectiveArgs* args); \ - extern __global__ void NCCL_KERN_NAME(coll, op, dtype)(struct ncclColl c); \ + extern __global__ void NCCL_KERN_NAME(coll, op, dtype)(struct ncclDevComm* comm); \ #define DECL_COLL4(coll, op, dtype) \ DECL_COLL5(coll, op, dtype) \ diff --git a/projects/rccl/src/include/comm.h b/projects/rccl/src/include/comm.h index e81858e0f6..abc3fa10fa 100644 --- a/projects/rccl/src/include/comm.h +++ b/projects/rccl/src/include/comm.h @@ -134,8 +134,8 @@ struct ncclComm { int* intraCudaDevs; int* intraCGMode; // Whether we can use CUDA9 CGMD or not int* intraCC; // Only to check all have the same ComputeCap and disable CGMode if not - struct ncclColl args; - struct ncclColl* argsptr; + struct ncclDevComm* args; + struct ncclDevComm** argsptr; // Global proxy thread pthread_t proxyThread; diff --git a/projects/rccl/src/include/devcomm.h b/projects/rccl/src/include/devcomm.h index d861ab3a1a..ea7e142a6d 100644 --- a/projects/rccl/src/include/devcomm.h +++ b/projects/rccl/src/include/devcomm.h @@ -207,7 +207,6 @@ struct ncclChannel { int collFifoHead; // Only used by GPU int collFifoTail; // Only used by CPU - uint32_t* abortCount; uint32_t* sync; }; int data[0x80]; @@ -255,6 +254,27 @@ struct ncclProf { }; #endif +#ifdef ENABLE_COLLTRACE +typedef enum { + ncclCollTraceKernelLaunchType, + ncclCollTraceCollEndType, + ncclCollTraceAbortType +} ncclCollTraceDataType_t; + +struct ncclCollTrace { + uint8_t type; + uint8_t bid; + int16_t funcIndex; + uint32_t data_0; + uint64_t timeStamp; + uint64_t opCount; + uint64_t data_1; +}; +static_assert(sizeof(struct ncclCollTrace) == 8*sizeof(int), "ncclCollTrace must have a pow2 size"); + +#define COLLTRACE_NUM_ITEMS 1024 +#endif + typedef enum { ncclDevSuccess, ncclDevAssertedMismatch, @@ -276,6 +296,13 @@ struct ncclDevComm { // Profiling counters struct ncclProf* devProf; #endif + +#ifdef ENABLE_COLLTRACE + struct ncclCollTrace* collTrace; + uint32_t collTraceHead, *collTraceTail; + pthread_t collTraceThread; + bool collTraceExit; +#endif }; #endif diff --git a/projects/rccl/src/init.cc b/projects/rccl/src/init.cc index 11dab7b03a..5f724886a9 100644 --- a/projects/rccl/src/init.cc +++ b/projects/rccl/src/init.cc @@ -131,6 +131,54 @@ void __attribute__((optimize("O0"))) commPoison(ncclComm_t comm) { comm->rank = comm->cudaDev = comm->busId = comm->nRanks = -1; } +#ifdef ENABLE_COLLTRACE +void *ncclCommThreadMain(void *arg) { + ncclComm_t comm = (ncclComm_t)arg; + do { + int tail = LOAD(comm->hostDevComm.collTraceTail)%COLLTRACE_NUM_ITEMS; + int head = comm->hostDevComm.collTraceHead; + int count; + if (head <= tail) + count = tail - head; + else + count = COLLTRACE_NUM_ITEMS + head - tail; + usleep(1000); //sleep 1ms + for (int i = 0; i < count; i++) { + char line[1024]; + int offset = 0; + #define VEGA_GPU_RTC_FREQUENCY 2.5E7 + sprintf(line, "## [%12.6f] [%02d:%02d] %06lx", + (double)(comm->hostDevComm.collTrace[head].timeStamp)/VEGA_GPU_RTC_FREQUENCY, comm->rank, comm->hostDevComm.collTrace[head].bid, comm->hostDevComm.collTrace[head].opCount); + offset = strlen(line); + switch (comm->hostDevComm.collTrace[head].type) { + case ncclCollTraceKernelLaunchType: + sprintf(line+offset, " KL hwid %8x funcIndex %d", + comm->hostDevComm.collTrace[head].data_0, comm->hostDevComm.collTrace[head].funcIndex); + break; + case ncclCollTraceCollEndType: + if (comm->hostDevComm.collTrace[head].funcIndex != -1) + sprintf(line+offset, " CE next funcIndex %d", + comm->hostDevComm.collTrace[head].funcIndex); + else + sprintf(line+offset, " KE"); + break; + case ncclCollTraceAbortType: + sprintf(line+offset, " Abort"); + break; + default: + sprintf(line+offset, " unknown collective trace data type"); + break; + } + INFO(NCCL_COLL, "%s", line); + head ++; + head %= COLLTRACE_NUM_ITEMS; + } + comm->hostDevComm.collTraceHead = tail; + } while(!LOAD(&comm->hostDevComm.collTraceExit)); + pthread_exit(NULL); +} +#endif + static ncclResult_t commFree(ncclComm_t comm) { if (comm == NULL) return ncclSuccess; @@ -164,6 +212,13 @@ static ncclResult_t commFree(ncclComm_t comm) { CUDACHECK(hipFree(comm->hostDevComm.devProf)); #endif +#ifdef ENABLE_COLLTRACE + STORE(&comm->hostDevComm.collTraceExit, 1); + if (comm->hostDevComm.collTraceThread) pthread_join(comm->hostDevComm.collTraceThread, NULL); + CUDACHECK(hipHostFree((void *)comm->hostDevComm.collTrace)); + CUDACHECK(hipHostFree((void *)comm->hostDevComm.collTraceTail)); +#endif + free(comm->peerInfo); ncclTopoFree(comm->topo); @@ -248,6 +303,17 @@ static ncclResult_t commAlloc(ncclComm_t* comret, int ndev, int rank) { NCCLCHECK(ncclCudaCalloc(&comm->hostDevComm.devProf, 1)); #endif +#ifdef ENABLE_COLLTRACE + CUDACHECK(hipHostMalloc((void**) &comm->hostDevComm.collTraceTail, sizeof(uint32_t), hipHostMallocMapped)); + CUDACHECK(hipHostMalloc((void**) &comm->hostDevComm.collTrace, sizeof(struct ncclCollTrace) * COLLTRACE_NUM_ITEMS, hipHostMallocMapped)); + memset(comm->hostDevComm.collTrace, 0, sizeof(struct ncclCollTrace) * COLLTRACE_NUM_ITEMS); + comm->hostDevComm.collTraceExit = comm->hostDevComm.collTraceHead = *comm->hostDevComm.collTraceTail = 0; + if ((ncclDebugLevel >= NCCL_LOG_INFO) && (ncclDebugMask & NCCL_COLL)) + pthread_create(&comm->hostDevComm.collTraceThread, NULL, ncclCommThreadMain, (void *)comm); + else + comm->hostDevComm.collTraceThread = 0; +#endif + *comret = comm; return ncclSuccess; } diff --git a/projects/rccl/src/transport/net.cc b/projects/rccl/src/transport/net.cc index f30ca9e411..9cba5e114d 100644 --- a/projects/rccl/src/transport/net.cc +++ b/projects/rccl/src/transport/net.cc @@ -64,11 +64,15 @@ static ncclResult_t netGetGdrSupport(struct ncclTopoSystem* topo, int64_t busId, if (read) { // For reads (sends) only enable under certain conditions int gdrReadParam = ncclParamNetGdrRead(); if (gdrReadParam == 0) return ncclSuccess; +#if defined(__HIP_PLATFORM_HCC__) || defined(__HCC__) || defined(__HIPCC__) + return ncclSuccess; +#else if (gdrReadParam < 0) { int nvlink; NCCLCHECK(ncclTopoHasNvlink(topo, busId, &nvlink)); if (!nvlink) return ncclSuccess; } +#endif } // Check if we are close enough that it makes sense to enable GDR @@ -234,7 +238,7 @@ ncclResult_t netSendProxy(struct ncclProxyArgs* args) { struct netSendResources* resources = (struct netSendResources*) (args->connector->transportResources); if (args->state == ncclProxyOpReady) { // Update opCount - resources->hostRecvMem->opCount = args->opCount; + STORE(&resources->hostRecvMem->opCount, args->opCount); // Round to next multiple of sliceSteps resources->step = ROUNDUP(resources->step, args->chunkSteps); @@ -251,9 +255,9 @@ ncclResult_t netSendProxy(struct ncclProxyArgs* args) { volatile uint64_t* recvTail = &resources->hostRecvMem->tail; if (args->protocol == NCCL_PROTO_LL128) { int stepSize = NCCL_LL128_BUFF_SIZE/NCCL_STEPS; - if (args->tail < *recvTail) { + if (args->tail < LOAD(recvTail)) { int buffSlot = args->tail%NCCL_STEPS; - if (sizesFifo[buffSlot] != -1) { + if (LOAD(sizesFifo+buffSlot) != -1) { struct ncclRecvMem* localMem = resources->useGdr ? resources->devRecvMem : resources->hostRecvMem; char* localBuff = (char*)localMem->ll128Buff; int ready = resources->useGdr; @@ -261,18 +265,18 @@ ncclResult_t netSendProxy(struct ncclProxyArgs* args) { // When data is in sysmem, we need to wait until all flags are correct since the GPU only // called threadfence() uint64_t flag = args->tail + 1; - int nFifoLines = DIVUP(sizesFifo[buffSlot], sizeof(uint64_t)*NCCL_LL128_LINEELEMS); + int nFifoLines = DIVUP(LOAD(sizesFifo+buffSlot), sizeof(uint64_t)*NCCL_LL128_LINEELEMS); volatile uint64_t* lines = (volatile uint64_t*)(localBuff+buffSlot*stepSize); ready = 1; for (int i=0; inetSendComm, localBuff+buffSlot*stepSize, sizesFifo[buffSlot], resources->ll128Mhandle, args->requests+buffSlot)); + NCCLCHECK(ncclNetIsend(resources->netSendComm, localBuff+buffSlot*stepSize, LOAD(sizesFifo+buffSlot), resources->ll128Mhandle, args->requests+buffSlot)); if (args->requests[buffSlot] != NULL) { - sizesFifo[buffSlot] = -1; + STORE(sizesFifo+buffSlot, -1); // Make sure size is reset to zero before we update the head. __sync_synchronize(); args->tail += args->sliceSteps; @@ -311,7 +315,7 @@ ncclResult_t netSendProxy(struct ncclProxyArgs* args) { struct ncclRecvMem* localMem = resources->useGdr ? resources->devRecvMem : resources->hostRecvMem; // Send through network int buffSlot = args->tail%NCCL_STEPS; - if (sizesFifo[buffSlot] != -1) { + if (LOAD(sizesFifo+buffSlot) != -1) { NCCLCHECK(ncclNetIsend(resources->netSendComm, localMem->buff+buffSlot*stepSize, LOAD(sizesFifo+buffSlot), resources->mhandle, args->requests+buffSlot)); if (args->requests[buffSlot] != NULL) { STORE(sizesFifo+buffSlot, -1); @@ -347,7 +351,7 @@ ncclResult_t netRecvProxy(struct ncclProxyArgs* args) { struct netRecvResources* resources = (struct netRecvResources*) (args->connector->transportResources); if (args->state == ncclProxyOpReady) { // Update opCount - resources->hostSendMem->opCount = args->opCount; + STORE(&resources->hostSendMem->opCount, args->opCount); // Round to next multiple of sliceSteps resources->step = ROUNDUP(resources->step, args->chunkSteps); diff --git a/projects/rccl/src/transport/p2p.cc b/projects/rccl/src/transport/p2p.cc index 9c97f721b7..b5a09c711c 100644 --- a/projects/rccl/src/transport/p2p.cc +++ b/projects/rccl/src/transport/p2p.cc @@ -51,7 +51,7 @@ NCCL_PARAM(P2pLevel, "P2P_LEVEL", -2); NCCL_PARAM(P2pDisable, "P2P_DISABLE", -2); /* Convert a PCI busId string into a local cudaDev device index (cf. CUDA_VISIBLE_DEVICES) */ -static int busIdToCudaDev(int64_t busId) { +int busIdToCudaDev(int64_t busId) { int ndev; if (hipGetDeviceCount(&ndev) != hipSuccess) return -1; @@ -71,10 +71,14 @@ static int busIdToCudaDev(int64_t busId) { ncclResult_t p2pCanConnect(int* ret, struct ncclTopoSystem* topo, struct ncclTopoGraph* graph, struct ncclPeerInfo* info1, struct ncclPeerInfo* info2) { int cpuCount; NCCLCHECK(ncclTopoCpuCount(topo, &cpuCount)); +#if defined(__HIP_PLATFORM_HCC__) || defined(__HCC__) || defined(__HIPCC__) + int p2pLevel = PATH_ARRAY_SIZE; +#else // Do not use P2P across sockets by default (provided CUDA permits it). // When we are on a single socket, don't even use P2P through the CPU as // it should be able to sustain two flows to sysmem faster than PCI P2P. int p2pLevel = cpuCount == 1 ? PATH_PHB : PATH_NODE; +#endif if (ncclParamP2pDisable() == 1) p2pLevel = 0; if (ncclParamP2pLevel() != -2) p2pLevel = ncclParamP2pLevel(); diff --git a/projects/rccl/test/test_AllReduceAbort.cpp b/projects/rccl/test/test_AllReduceAbort.cpp index 9400bd84fc..7b329d249d 100644 --- a/projects/rccl/test/test_AllReduceAbort.cpp +++ b/projects/rccl/test/test_AllReduceAbort.cpp @@ -5,7 +5,7 @@ ************************************************************************/ #include "test_AllReduceAbort.hpp" -#include "../include/core.h" +#include "../include/comm.h" #include #define NUM_ITER 8 @@ -41,31 +41,24 @@ namespace CorrectnessTests hipStream_t stream; HIPCHECK(hipStreamCreateWithFlags(&stream, hipStreamNonBlocking)); struct ncclChannel* channel = comm->channels; - struct ncclRing *ring = &channel->ring; - struct ncclConnector* send = &channel->peers[ring->next].send; - size_t op_offset = &(send->conn.opCountRem) - (uint64_t **)channel->peers; - size_t head_offset = &(send->conn.head) - (uint64_t **)channel->peers; - uint64_t **p_dev_opCount = (uint64_t **)(channel->devPeers) + op_offset; - uint64_t **p_dev_head = (uint64_t **)(channel->devPeers) + head_offset; + uint64_t **p_dev_opCount = (uint64_t **)((uint8_t*)(channel->devPeers + channel->ring.next) + offsetof(struct ncclPeer, send.conn.opCountRem)); + uint64_t **p_dev_head = (uint64_t **)((uint8_t*)(channel->devPeers + channel->ring.next) + offsetof(struct ncclPeer, send.conn.head)); uint64_t *real_opCount, *fake_opCount, *fake_o; uint64_t *real_head, *fake_head, *fake_h; // get original opCount and head - HIPCHECK(hipMemcpyAsync(&real_opCount, p_dev_opCount, sizeof(uint64_t*), hipMemcpyDeviceToHost, stream)); - HIPCHECK(hipMemcpyAsync(&real_head, p_dev_head, sizeof(uint64_t*), hipMemcpyDeviceToHost, stream)); - HIPCHECK(hipStreamSynchronize(stream)); + HIPCHECK(hipMemcpy(&real_opCount, p_dev_opCount, sizeof(uint64_t*), hipMemcpyDefault)); + HIPCHECK(hipMemcpy(&real_head, p_dev_head, sizeof(uint64_t*), hipMemcpyDefault)); // allocate and install fakes HIPCHECK(hipHostMalloc(&fake_opCount, sizeof(uint64_t*), hipHostMallocMapped)); - HIPCHECK(hipMemcpyAsync(p_dev_opCount, &fake_opCount, sizeof(uint64_t*), hipMemcpyHostToDevice, stream)); + HIPCHECK(hipMemcpy(p_dev_opCount, &fake_opCount, sizeof(uint64_t*), hipMemcpyDefault)); *fake_opCount = FAKE_OP_COUNT; HIPCHECK(hipHostMalloc(&fake_head, sizeof(uint64_t*), hipHostMallocMapped)); - HIPCHECK(hipMemcpyAsync(p_dev_head, &fake_head, sizeof(uint64_t*), hipMemcpyHostToDevice, stream)); + HIPCHECK(hipMemcpy(p_dev_head, &fake_head, sizeof(uint64_t*), hipMemcpyDefault)); *fake_head = 0; - HIPCHECK(hipStreamSynchronize(stream)); // read back fakes to confirm - HIPCHECK(hipMemcpyAsync(&fake_o, p_dev_opCount, sizeof(uint64_t*), hipMemcpyDeviceToHost, stream)); - HIPCHECK(hipMemcpyAsync(&fake_h, p_dev_head, sizeof(uint64_t*), hipMemcpyDeviceToHost, stream)); - HIPCHECK(hipStreamSynchronize(stream)); + HIPCHECK(hipMemcpy(&fake_o, p_dev_opCount, sizeof(uint64_t*), hipMemcpyDefault)); + HIPCHECK(hipMemcpy(&fake_h, p_dev_head, sizeof(uint64_t*), hipMemcpyDefault)); //std::cerr << "[ ] replaced gpu " << gpu << " real_opCount = " << real_opCount << " to fake_opCount = " << fake_o << std::endl; //std::cerr << "[ ] replaced gpu " << gpu << " real_head = " << real_head << " to fake_head = " << fake_h << std::endl; diff --git a/projects/rccl/test/test_BroadcastAbort.cpp b/projects/rccl/test/test_BroadcastAbort.cpp index 28596cc52a..e49a872e0b 100644 --- a/projects/rccl/test/test_BroadcastAbort.cpp +++ b/projects/rccl/test/test_BroadcastAbort.cpp @@ -5,7 +5,7 @@ ************************************************************************/ #include "test_BroadcastAbort.hpp" -#include "../include/core.h" +#include "../include/comm.h" #include #define NUM_ITER 8 @@ -42,31 +42,24 @@ namespace CorrectnessTests hipStream_t stream; HIPCHECK(hipStreamCreateWithFlags(&stream, hipStreamNonBlocking)); struct ncclChannel* channel = comm->channels; - struct ncclRing *ring = &channel->ring; - struct ncclConnector* send = &channel->peers[ring->next].send; - size_t op_offset = &(send->conn.opCountRem) - (uint64_t **)channel->peers; - size_t head_offset = &(send->conn.head) - (uint64_t **)channel->peers; - uint64_t **p_dev_opCount = (uint64_t **)(channel->devPeers) + op_offset; - uint64_t **p_dev_head = (uint64_t **)(channel->devPeers) + head_offset; + uint64_t **p_dev_opCount = (uint64_t **)((uint8_t*)(channel->devPeers + channel->ring.next) + offsetof(struct ncclPeer, send.conn.opCountRem)); + uint64_t **p_dev_head = (uint64_t **)((uint8_t*)(channel->devPeers + channel->ring.next) + offsetof(struct ncclPeer, send.conn.head)); uint64_t *real_opCount, *fake_opCount, *fake_o; uint64_t *real_head, *fake_head, *fake_h; // get original opCount and head - HIPCHECK(hipMemcpyAsync(&real_opCount, p_dev_opCount, sizeof(uint64_t*), hipMemcpyDeviceToHost, stream)); - HIPCHECK(hipMemcpyAsync(&real_head, p_dev_head, sizeof(uint64_t*), hipMemcpyDeviceToHost, stream)); - HIPCHECK(hipStreamSynchronize(stream)); + HIPCHECK(hipMemcpy(&real_opCount, p_dev_opCount, sizeof(uint64_t*), hipMemcpyDefault)); + HIPCHECK(hipMemcpy(&real_head, p_dev_head, sizeof(uint64_t*), hipMemcpyDefault)); // allocate and install fakes HIPCHECK(hipHostMalloc(&fake_opCount, sizeof(uint64_t*), hipHostMallocMapped)); - HIPCHECK(hipMemcpyAsync(p_dev_opCount, &fake_opCount, sizeof(uint64_t*), hipMemcpyHostToDevice, stream)); + HIPCHECK(hipMemcpy(p_dev_opCount, &fake_opCount, sizeof(uint64_t*), hipMemcpyDefault)); *fake_opCount = FAKE_OP_COUNT; HIPCHECK(hipHostMalloc(&fake_head, sizeof(uint64_t*), hipHostMallocMapped)); - HIPCHECK(hipMemcpyAsync(p_dev_head, &fake_head, sizeof(uint64_t*), hipMemcpyHostToDevice, stream)); + HIPCHECK(hipMemcpy(p_dev_head, &fake_head, sizeof(uint64_t*), hipMemcpyDefault)); *fake_head = 0; - HIPCHECK(hipStreamSynchronize(stream)); // read back fakes to confirm - HIPCHECK(hipMemcpyAsync(&fake_o, p_dev_opCount, sizeof(uint64_t*), hipMemcpyDeviceToHost, stream)); - HIPCHECK(hipMemcpyAsync(&fake_h, p_dev_head, sizeof(uint64_t*), hipMemcpyDeviceToHost, stream)); - HIPCHECK(hipStreamSynchronize(stream)); + HIPCHECK(hipMemcpy(&fake_o, p_dev_opCount, sizeof(uint64_t*), hipMemcpyDefault)); + HIPCHECK(hipMemcpy(&fake_h, p_dev_head, sizeof(uint64_t*), hipMemcpyDefault)); //std::cerr << "[ ] replaced gpu " << gpu << " real_opCount = " << real_opCount << " to fake_opCount = " << fake_o << std::endl; //std::cerr << "[ ] replaced gpu " << gpu << " real_head = " << real_head << " to fake_head = " << fake_h << std::endl; diff --git a/projects/rccl/tools/rccl-prim-test/rccl_prim_test.cpp b/projects/rccl/tools/rccl-prim-test/rccl_prim_test.cpp index 49d98394b9..87e1df00e9 100644 --- a/projects/rccl/tools/rccl-prim-test/rccl_prim_test.cpp +++ b/projects/rccl/tools/rccl-prim-test/rccl_prim_test.cpp @@ -34,7 +34,7 @@ THE SOFTWARE. #include "copy_kernel.h" #define MAX_GPU 8 -#define MAX_WORKGROUPS 16 +#define MAX_WORKGROUPS 32 #define THREADS 256 #define COPY_UNROLL 4 @@ -56,6 +56,16 @@ THE SOFTWARE. #define RTC_CLOCK_FREQ_MI100 2.5E07 #define RTC_CLOCK_FREQ_DEFAULT 2.7E07 +__device__ +inline __attribute((always_inline)) +long long int __rtc64() { +#if __HIP__ + return (long long int) __builtin_amdgcn_s_memrealtime(); +#else + return (long long int) __clock_u64(); +#endif +} + struct transfer_data_t { float *dest0[MAX_WORKGROUPS]; //remote fine grain float *src0[MAX_WORKGROUPS]; //local fine grain @@ -116,7 +126,7 @@ __global__ void flag_sync_kernel(struct transfer_data_t* transfer_data, struct p __syncthreads(); if (idx == 0) { - curr_time = clock64(); + curr_time = __rtc64(); } if (op == OP_COPY) Copy(transfer_data->dest0[bid], transfer_data->src0[bid], n); @@ -129,7 +139,7 @@ __global__ void flag_sync_kernel(struct transfer_data_t* transfer_data, struct p if (op == OP_READ) Copy(transfer_data->src0[bid],transfer_data->dest0[bid], n); __syncthreads(); if (idx == 0) { - next_time = clock64(); + next_time = __rtc64(); __atomic_fetch_add(&(profiling_data->write_cycles[bid]), next_time - curr_time, __ATOMIC_SEQ_CST); __atomic_fetch_add(&(profiling_data->bytes_transferred[bid]), n * sizeof(float), __ATOMIC_SEQ_CST); }