diff --git a/src/device/common_kernel.h b/src/device/common_kernel.h index 4161bb6b10..4e198e0f7e 100644 --- a/src/device/common_kernel.h +++ b/src/device/common_kernel.h @@ -23,7 +23,7 @@ inline __device__ int min(int a, ssize_t b) { return (a < b) ? a : b; } inline __device__ int loadInt(int* ptr) { int v; - v = atomicAdd((unsigned long long *)ptr, 0); + v = __atomic_load_n(ptr, __ATOMIC_RELAXED); return v; } diff --git a/src/device/op128.h b/src/device/op128.h index 2bf7d07396..72f90d0c14 100644 --- a/src/device/op128.h +++ b/src/device/op128.h @@ -199,8 +199,7 @@ template<> __device__ __forceinline__ void st_global<0>(uintptr_t addr, BytePack } \ template<> \ __device__ __forceinline__ void st_##space(addr_cxx_ty addr, BytePack value) { \ - data_cxx_ty tmp = value.native; \ - *((data_cxx_ty *)addr) = tmp; \ + __builtin_nontemporal_store(value.native, (data_cxx_ty *)addr); \ } // #if __CUDA_ARCH__ >= 700 diff --git a/src/device/primitives.h b/src/device/primitives.h index c9f8245357..0ecd48ff42 100644 --- a/src/device/primitives.h +++ b/src/device/primitives.h @@ -16,37 +16,42 @@ #define NCCL_SPINS_BEFORE_CHECK_ABORT 1000000 #if defined(__gfx940__) || defined(__gfx941__) || defined(__gfx942__) -#define barrier_by_group() do { \ - if (nthreads == NCCL_MAX_NTHREADS) { \ - __builtin_amdgcn_s_barrier(); \ - } else { \ - const int w = threadIdx.x/WARP_SIZE; \ - const int wid = threadIdx.x%WARP_SIZE; \ - if (wid == 0) { \ - barrier_next[w] += nthreads/WARP_SIZE; \ - atomicAdd((unsigned long long *)barriers, 1); \ - while (atomicAdd((unsigned long long *)barriers, 0) < barrier_next[w]) __builtin_amdgcn_s_sleep(1); \ - __asm__ __volatile__("s_wakeup"); \ - } \ - } \ -} while (0) +#define __THREAD_FENCE __threadfence_block() #else +#define __THREAD_FENCE __threadfence() +#endif + #define barrier_by_group() do { \ if (nthreads == NCCL_MAX_NTHREADS) { \ - __threadfence(); __builtin_amdgcn_s_barrier(); \ + __THREAD_FENCE; __builtin_amdgcn_s_barrier(); \ } else { \ const int w = threadIdx.x/WARP_SIZE; \ const int wid = threadIdx.x%WARP_SIZE; \ - __threadfence(); \ if (wid == 0) { \ barrier_next[w] += nthreads/WARP_SIZE; \ - atomicAdd((unsigned long long *)barriers, 1); \ - while (atomicAdd((unsigned long long *)barriers, 0) < barrier_next[w]) __builtin_amdgcn_s_sleep(1); \ + __hip_atomic_fetch_add(barriers, 1, __ATOMIC_RELEASE, __HIP_MEMORY_SCOPE_WORKGROUP); \ + int spins = 0; \ + int rate_limit = 50; \ + __THREAD_FENCE; \ + while (__hip_atomic_load(barriers, __ATOMIC_ACQUIRE, __HIP_MEMORY_SCOPE_WORKGROUP) < barrier_next[w]) { \ + spins++; \ + if (spins == NCCL_SPINS_BEFORE_CHECK_ABORT) { \ + if (__atomic_load_n(ncclShmem.comm.abortFlag, __ATOMIC_SEQ_CST)) { \ + ncclShmem.aborted = 1; \ + break; \ + } \ + spins = 0; \ + } \ + if (spins == 0 && rate_limit > 0) { \ + rate_limit --; \ + traceData(__LINE__, threadIdx.x, __hip_atomic_load(barriers, __ATOMIC_ACQUIRE, __HIP_MEMORY_SCOPE_WORKGROUP), barrier_next[w]); \ + } \ + __builtin_amdgcn_s_sleep(1); \ + } \ __asm__ __volatile__("s_wakeup"); \ } \ } \ } while (0) -#endif /* Protocol classes: ProtoSimple, ProtoLL, ProtoLL128 * We use these as template args to the Primtiives class instead of integral diff --git a/src/device/prims_simple.h b/src/device/prims_simple.h index e76c28a9fc..1887ea154b 100644 --- a/src/device/prims_simple.h +++ b/src/device/prims_simple.h @@ -43,7 +43,7 @@ class Primitives< Fan fan; int index; // Peer index I'm responsible for int flags; - int group; + const int group; uint64_t step; struct ncclConnFifo* connFifo = NULL; T* connEltsFifo; @@ -55,6 +55,7 @@ class Primitives< uint32_t* next_hdp_reg; uint64_t* barriers; uint64_t* barrier_next; + int repeat; #if defined(ENABLE_NPKIT) public: @@ -113,12 +114,16 @@ private: if (((flags & (Recv*RoleWaitRecv)) && !noRecvWait) || ((flags & (Send*RoleWaitSend)) && !noSendWait)) { int spins = 0; + repeat = 50; while (connStepCache + (isSendNotRecv ? NCCL_STEPS : 0) < step + StepPerSlice) { __builtin_amdgcn_s_sleep(1); connStepCache = loadStepValue(connStepPtr); if (checkAbort(spins)) break; //if (spins == 0) printf("r=%d b=%d t=%d SPUN OUT got=%d want=%d\n", ncclShmem.comm.rank, blockIdx.x, threadIdx.x, int(connStepCache + (isSendNotRecv ? NCCL_STEPS : 0)), int(step+StepPerSlice)); - if (spins == 0) traceData(__LINE__, threadIdx.x, int(connStepCache + (isSendNotRecv ? NCCL_STEPS : 0)), int(step+StepPerSlice)); + if (spins == 0 && repeat > 0) { + repeat --; + traceData(__LINE__, threadIdx.x, int(connStepCache + (isSendNotRecv ? NCCL_STEPS : 0)), int(step+StepPerSlice)); + } } __asm__ __volatile__("s_wakeup"); } diff --git a/src/enqueue.cc b/src/enqueue.cc index 9b52608992..f622309be0 100644 --- a/src/enqueue.cc +++ b/src/enqueue.cc @@ -30,7 +30,7 @@ struct ncclKernelMatch { }; #ifdef ENABLE_COLLTRACE -#define ncclGetKernelIndex(p_comm) ((p_comm)->unroll + ((p_comm)->collTraceThread ? 2 : 0)) +#define ncclGetKernelIndex(p_comm) ((p_comm)->unroll + ((p_comm)->collTraceEnabled ? 2 : 0)) static ncclKernelMatch const ncclKerns[4] = { {(void *)ncclDevKernel_Generic, true}, {(void *)ncclDevKernel_Generic_4, true}, diff --git a/src/include/comm.h b/src/include/comm.h index 48f83427b9..94b8b4457e 100644 --- a/src/include/comm.h +++ b/src/include/comm.h @@ -580,6 +580,7 @@ struct ncclComm { union ncclCollTraceTail *collTraceTail; pthread_t collTraceThread; volatile bool collTraceExit; + bool collTraceEnabled; #endif ncclConfig_t config; diff --git a/src/init.cc b/src/init.cc index e4e22c1e27..586102c291 100644 --- a/src/init.cc +++ b/src/init.cc @@ -123,7 +123,7 @@ static constexpr int64_t defaultEnableMscclpp = 0; RCCL_PARAM(MscclppEnabled, "MSCCLPP_ENABLE", defaultEnableMscclpp); // GDRCOPY support: Off by default -NCCL_PARAM(GdrCopyEnable, "GDRCOPY_ENABLE", 0); +NCCL_PARAM(GdrCopyEnable, "GDRCOPY_ENABLE", 1); // GDRCOPY support gdr_t ncclGdrCopy = NULL; @@ -217,6 +217,7 @@ void NCCL_NO_OPTIMIZE commPoison(ncclComm_t comm) { } RCCL_PARAM(KernelCollTraceEnable, "KERNEL_COLL_TRACE_ENABLE", 0); +RCCL_PARAM(KernelCollTraceThreadEnable, "KERNEL_COLL_TRACE_THREAD_ENABLE", 0); #ifdef ENABLE_COLLTRACE // Should be in sync with 'ALL_COLLS' in Generator.cmake @@ -230,16 +231,14 @@ void *ncclCommThreadMain(void *arg) { do { int numActiveChans = MAXCHANNELS; for (int channel = 0; channel < MAXCHANNELS; channel++) { - int tail = comm->collTraceTail[channel].tail%COLLTRACE_NUM_ITEMS; + int tail = comm->collTraceTail[channel].tail; int count; - if (head[channel] <= tail) - count = tail - head[channel]; - else - count = COLLTRACE_NUM_ITEMS + head[channel] - tail; + count = tail - head[channel]; if (count == 0) { numActiveChans--; continue; } + count = count%COLLTRACE_NUM_ITEMS; for (int i = 0; i < count; i++) { volatile struct ncclCollTrace *td = comm->collTrace+COLLTRACE_NUM_ITEMS*channel+head[channel]; uint8_t type = td->type; @@ -292,14 +291,16 @@ void *ncclCommThreadMain(void *arg) { INFO(NCCL_COLL, "%s", line); td->type = ncclCollTraceNotReady; head[channel] ++; - head[channel] %= COLLTRACE_NUM_ITEMS; } } if (comm->collTraceExit && numActiveChans == 0) break; usleep(1000); //sleep 1ms } while(true); - pthread_exit(NULL); + if (comm->collTraceThread) + pthread_exit(NULL); + else + return 0; } #endif @@ -395,7 +396,12 @@ static ncclResult_t commFree(ncclComm_t comm) { #ifdef ENABLE_COLLTRACE comm->collTraceExit = 1; - if (comm->collTraceThread) pthread_join(comm->collTraceThread, NULL); + if (comm->collTraceEnabled) { + if (comm->collTraceThread) + pthread_join(comm->collTraceThread, NULL); + else + ncclCommThreadMain((void *)comm); + } NCCLCHECK(ncclCudaHostFree((void *)comm->collTrace)); NCCLCHECK(ncclCudaHostFree((void *)comm->collTraceTail)); #endif @@ -585,10 +591,14 @@ static ncclResult_t commAlloc(struct ncclComm* comm, struct ncclComm* parent, in NCCLCHECK(ncclCudaHostCalloc(&comm->collTraceTail, MAXCHANNELS)); NCCLCHECK(ncclCudaHostCalloc(&comm->collTrace, COLLTRACE_NUM_ITEMS*MAXCHANNELS)); comm->collTraceExit = 0; - if ((ncclDebugLevel >= NCCL_LOG_INFO) && rcclParamKernelCollTraceEnable()) - pthread_create(&comm->collTraceThread, NULL, ncclCommThreadMain, (void *)comm); - else - comm->collTraceThread = 0; + comm->collTraceEnabled = false; // we can enable colltrace without starting a thread + if ((ncclDebugLevel >= NCCL_LOG_INFO) && rcclParamKernelCollTraceEnable()) { + comm->collTraceEnabled = true; + if (rcclParamKernelCollTraceThreadEnable()) + pthread_create(&comm->collTraceThread, NULL, ncclCommThreadMain, (void *)comm); + else + comm->collTraceThread = 0; + } #endif comm->collNetSupport = 0; memset(comm->collNetSupportMatrix, 0, sizeof(comm->collNetSupportMatrix)); diff --git a/src/transport/net.cc b/src/transport/net.cc index 899a8e03ac..3b8f04fcf6 100644 --- a/src/transport/net.cc +++ b/src/transport/net.cc @@ -250,7 +250,11 @@ static ncclResult_t recvSetup(struct ncclComm* comm, struct ncclTopoGraph* graph NCCLCHECK(ncclTopoCheckGdr(comm->topo, myInfo->busId, netId, 0, &req.useGdr)); // Determine whether we need to flush the GDR buffer on recv or not - if (req.useGdr) NCCLCHECK(ncclTopoNeedFlush(comm->topo, myInfo->busId, &req.needFlush)); + if (req.useGdr) { + NCCLCHECK(ncclTopoNeedFlush(comm->topo, myInfo->busId, &req.needFlush)); + CUDACHECK(hipDeviceGetAttribute((int*)&req.curr_hdp_reg, hipDeviceAttributeHdpMemFlushCntl, myInfo->cudaDev)); + recv->conn.curr_hdp_reg = req.curr_hdp_reg; + } // We don't support PXN on receive yet tpProxyRank = comm->topParentRanks[myInfo->rank]; @@ -667,6 +671,7 @@ static ncclResult_t recvProxySetup(struct ncclProxyConnection* connection, struc resources->needFlush = req->needFlush; resources->channelId = req->channelId; resources->connIndex = req->connIndex; + resources->curr_hdp_reg = req->curr_hdp_reg; ncclNetProperties_t props; NCCLCHECK(proxyState->ncclNet->getProperties(req->netDev, &props)); /* DMA-BUF support */ @@ -1175,7 +1180,7 @@ static ncclResult_t sendProxyProgress(struct ncclProxyState* proxyState, struct int sharedBuffSlot = sub->posted%maxDepth; int offset; NCCLCHECK(sharedBuffersGet(proxyState, sub->channelId, sharedBuffSlot*args->nsubs+s, &offset, NULL)); - resources->recvMem->connFifo[buffSlot].offset = offset; + __atomic_store_n(&resources->recvMem->connFifo[buffSlot].offset, offset, __ATOMIC_RELAXED); __sync_synchronize(); } volatile uint64_t* sendHead = resources->gdcSync ? resources->gdcSync : &resources->sendMem->head; @@ -1362,6 +1367,8 @@ static ncclResult_t sendProxyProgress(struct ncclProxyState* proxyState, struct return ncclSuccess; } +RCCL_PARAM(NetHdpFlush, "NET_HDP_FLUSH", 0); + static ncclResult_t recvProxyProgress(struct ncclProxyState* proxyState, struct ncclProxyArgs* args) { #if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT) g_npkit_net_poll_cnt++; @@ -1441,7 +1448,7 @@ static ncclResult_t recvProxyProgress(struct ncclProxyState* proxyState, struct int sharedBuffSlot = sub->posted%maxDepth; int offset; NCCLCHECK(sharedBuffersGet(proxyState, sub->channelId, sharedBuffSlot*args->nsubs+s+i, &offset, sizes+subCount)); - connFifo[buffSlot].offset = offset; + __atomic_store_n(&connFifo[buffSlot].offset, offset, __ATOMIC_RELAXED); ptrs[subCount] = localBuff+offset; } } else { @@ -1550,16 +1557,31 @@ static ncclResult_t recvProxyProgress(struct ncclProxyState* proxyState, struct if (totalSize > 0 && p == NCCL_PROTO_SIMPLE && needFlush) { // GDRCOPY support struct recvNetResources* resources = (struct recvNetResources*) (subGroup->connection->transportResources); + if (rcclParamNetHdpFlush() && resources->curr_hdp_reg) { + static bool once = true; + *resources->curr_hdp_reg = 0x1; + __sync_synchronize(); + if (once) { + once = false; + INFO(NCCL_INIT, "%s: flushed HDP %p", __func__, resources->curr_hdp_reg); + } + } if (resources->gdcFlush) { #if defined (__x86_64__) // Force a PCI-E read from GPU memory + static bool once = true; asm volatile ("mov (%0), %%eax" :: "l"(resources->gdcFlush) : "%eax"); + if (once) { + once = false; + INFO(NCCL_INIT, "%s: issued GDC flush", __func__); + } #else WARN("NET: GDR Flush only supported on x86_64"); return ncclInternalError; #endif } else { int subCount = 0; + static bool once = true; for (int i=0; igroupSize; i++) { struct ncclProxySubArgs* sub = subGroup + i; if (step < sub->nsteps) { @@ -1576,6 +1598,10 @@ static ncclResult_t recvProxyProgress(struct ncclProxyState* proxyState, struct } struct recvNetResources* resources = (struct recvNetResources*) (subGroup->connection->transportResources); NCCLCHECK(proxyState->ncclNet->iflush(resources->netRecvComm, subCount, ptrs, sizes, mhandles, subGroup->requests+(step%NCCL_STEPS))); + if (once) { + once = false; + INFO(NCCL_INIT, "%s: issued GDR flush", __func__); + } } } args->idle = 0;