Add HDP flush for gfx940 (#1434)

* Fix collective trace

* Use nontemporal for st_global

* Fix previous commit

* Add HDP flush to data receive path

* Fix previous commit

* Control flushing by NCCL_NET_FORCE_FLUSH and RCCL_NET_HDP_FLUSH

* Introduce RCCL_NET_HDP_FLUSH and RCCL_NET_GDR_FLUSH

Both are on by default. Turning both off skips all flushes and will
likely result in data errors.

* Enable GDR copy by default

* Remove GDR flush env var because it is disabled by GDC flush

* Output kernel collective trace at comm destroy by default

* Limit kernel timeout messages to 100

* Use system relaxed atomic for loadInt

* Refine timeout messages and use atomic for setting offset from CPU

* Add kernel trace for barrier timeout

* Add backup barrier to avoid race in atomicAdd

* Use different counters for different warps

* Rework barrier implementation

* Fix for other GFX

* Use __hip_atomic_store and __hip_atomic_load

* Fix bug in previous commit

* Don't reset barrier values in running kernel

* Update trace format

* Fix typo

* Switch back to hip_atomic_fetch_add

* Use same barrier implementation for all GFX

* Remove extra threadfence

* Turn off HDP flush by default

Please use RCCL_NET_HDP_FLUSH=1 to switch on HDP flush

* Remove unnecessary changes from alternative barrier implementation

* Added back __threadfence_block

* Revert back to threadfence for gfx other than gfx94x

[ROCm/rccl commit: caba0bc049]
Bu işleme şunda yer alıyor:
Wenkai Du
2025-01-31 07:51:10 -08:00
işlemeyi yapan: GitHub
ebeveyn ffe6030ee6
işleme f94af0c9ba
8 değiştirilmiş dosya ile 87 ekleme ve 41 silme
+1 -1
Dosyayı Görüntüle
@@ -23,7 +23,7 @@ inline __device__ int min(int a, ssize_t b) { return (a < b) ? a : b; }
// Atomically read the 32-bit integer at `ptr` with relaxed memory ordering.
//
// The earlier implementation read the value with a 64-bit
// atomicAdd(..., 0) through a pointer cast to `unsigned long long *`. That
// performed an 8-byte read-modify-write on a 4-byte object (touching the
// adjacent 4 bytes and requiring 8-byte alignment), and its result was
// immediately overwritten. A plain relaxed atomic load of the int itself is
// sufficient and accesses only the bytes that belong to `*ptr`.
//
// Returns the value observed at `ptr`. No ordering with respect to other
// memory operations is implied (__ATOMIC_RELAXED).
inline __device__ int loadInt(int* ptr) {
  return __atomic_load_n(ptr, __ATOMIC_RELAXED);
}
+1 -2
Dosyayı Görüntüle
@@ -199,8 +199,7 @@ template<> __device__ __forceinline__ void st_global<0>(uintptr_t addr, BytePack
} \
template<> \
__device__ __forceinline__ void st_##space<bytes>(addr_cxx_ty addr, BytePack<bytes> value) { \
data_cxx_ty tmp = value.native; \
*((data_cxx_ty *)addr) = tmp; \
__builtin_nontemporal_store(value.native, (data_cxx_ty *)addr); \
}
// #if __CUDA_ARCH__ >= 700
+24 -19
Dosyayı Görüntüle
@@ -16,37 +16,42 @@
#define NCCL_SPINS_BEFORE_CHECK_ABORT 1000000
#if defined(__gfx940__) || defined(__gfx941__) || defined(__gfx942__)
#define barrier_by_group() do { \
if (nthreads == NCCL_MAX_NTHREADS) { \
__builtin_amdgcn_s_barrier(); \
} else { \
const int w = threadIdx.x/WARP_SIZE; \
const int wid = threadIdx.x%WARP_SIZE; \
if (wid == 0) { \
barrier_next[w] += nthreads/WARP_SIZE; \
atomicAdd((unsigned long long *)barriers, 1); \
while (atomicAdd((unsigned long long *)barriers, 0) < barrier_next[w]) __builtin_amdgcn_s_sleep(1); \
__asm__ __volatile__("s_wakeup"); \
} \
} \
} while (0)
#define __THREAD_FENCE __threadfence_block()
#else
#define __THREAD_FENCE __threadfence()
#endif
#define barrier_by_group() do { \
if (nthreads == NCCL_MAX_NTHREADS) { \
__threadfence(); __builtin_amdgcn_s_barrier(); \
__THREAD_FENCE; __builtin_amdgcn_s_barrier(); \
} else { \
const int w = threadIdx.x/WARP_SIZE; \
const int wid = threadIdx.x%WARP_SIZE; \
__threadfence(); \
if (wid == 0) { \
barrier_next[w] += nthreads/WARP_SIZE; \
atomicAdd((unsigned long long *)barriers, 1); \
while (atomicAdd((unsigned long long *)barriers, 0) < barrier_next[w]) __builtin_amdgcn_s_sleep(1); \
__hip_atomic_fetch_add(barriers, 1, __ATOMIC_RELEASE, __HIP_MEMORY_SCOPE_WORKGROUP); \
int spins = 0; \
int rate_limit = 50; \
__THREAD_FENCE; \
while (__hip_atomic_load(barriers, __ATOMIC_ACQUIRE, __HIP_MEMORY_SCOPE_WORKGROUP) < barrier_next[w]) { \
spins++; \
if (spins == NCCL_SPINS_BEFORE_CHECK_ABORT) { \
if (__atomic_load_n(ncclShmem.comm.abortFlag, __ATOMIC_SEQ_CST)) { \
ncclShmem.aborted = 1; \
break; \
} \
spins = 0; \
} \
if (spins == 0 && rate_limit > 0) { \
rate_limit --; \
traceData(__LINE__, threadIdx.x, __hip_atomic_load(barriers, __ATOMIC_ACQUIRE, __HIP_MEMORY_SCOPE_WORKGROUP), barrier_next[w]); \
} \
__builtin_amdgcn_s_sleep(1); \
} \
__asm__ __volatile__("s_wakeup"); \
} \
} \
} while (0)
#endif
/* Protocol classes: ProtoSimple, ProtoLL, ProtoLL128
* We use these as template args to the Primitives class instead of integral
+7 -2
Dosyayı Görüntüle
@@ -43,7 +43,7 @@ class Primitives<
Fan fan;
int index; // Peer index I'm responsible for
int flags;
int group;
const int group;
uint64_t step;
struct ncclConnFifo* connFifo = NULL;
T* connEltsFifo;
@@ -55,6 +55,7 @@ class Primitives<
uint32_t* next_hdp_reg;
uint64_t* barriers;
uint64_t* barrier_next;
int repeat;
#if defined(ENABLE_NPKIT)
public:
@@ -113,12 +114,16 @@ private:
if (((flags & (Recv*RoleWaitRecv)) && !noRecvWait) ||
((flags & (Send*RoleWaitSend)) && !noSendWait)) {
int spins = 0;
repeat = 50;
while (connStepCache + (isSendNotRecv ? NCCL_STEPS : 0) < step + StepPerSlice) {
__builtin_amdgcn_s_sleep(1);
connStepCache = loadStepValue(connStepPtr);
if (checkAbort(spins)) break;
//if (spins == 0) printf("r=%d b=%d t=%d SPUN OUT got=%d want=%d\n", ncclShmem.comm.rank, blockIdx.x, threadIdx.x, int(connStepCache + (isSendNotRecv ? NCCL_STEPS : 0)), int(step+StepPerSlice));
if (spins == 0) traceData(__LINE__, threadIdx.x, int(connStepCache + (isSendNotRecv ? NCCL_STEPS : 0)), int(step+StepPerSlice));
if (spins == 0 && repeat > 0) {
repeat --;
traceData(__LINE__, threadIdx.x, int(connStepCache + (isSendNotRecv ? NCCL_STEPS : 0)), int(step+StepPerSlice));
}
}
__asm__ __volatile__("s_wakeup");
}
+1 -1
Dosyayı Görüntüle
@@ -30,7 +30,7 @@ struct ncclKernelMatch {
};
#ifdef ENABLE_COLLTRACE
#define ncclGetKernelIndex(p_comm) ((p_comm)->unroll + ((p_comm)->collTraceThread ? 2 : 0))
#define ncclGetKernelIndex(p_comm) ((p_comm)->unroll + ((p_comm)->collTraceEnabled ? 2 : 0))
static ncclKernelMatch const ncclKerns[4] = {
{(void *)ncclDevKernel_Generic, true},
{(void *)ncclDevKernel_Generic_4, true},
+1
Dosyayı Görüntüle
@@ -580,6 +580,7 @@ struct ncclComm {
union ncclCollTraceTail *collTraceTail;
pthread_t collTraceThread;
volatile bool collTraceExit;
bool collTraceEnabled;
#endif
ncclConfig_t config;
+23 -13
Dosyayı Görüntüle
@@ -123,7 +123,7 @@ static constexpr int64_t defaultEnableMscclpp = 0;
RCCL_PARAM(MscclppEnabled, "MSCCLPP_ENABLE", defaultEnableMscclpp);
// GDRCOPY support: On by default
NCCL_PARAM(GdrCopyEnable, "GDRCOPY_ENABLE", 0);
NCCL_PARAM(GdrCopyEnable, "GDRCOPY_ENABLE", 1);
// GDRCOPY support
gdr_t ncclGdrCopy = NULL;
@@ -217,6 +217,7 @@ void NCCL_NO_OPTIMIZE commPoison(ncclComm_t comm) {
}
RCCL_PARAM(KernelCollTraceEnable, "KERNEL_COLL_TRACE_ENABLE", 0);
RCCL_PARAM(KernelCollTraceThreadEnable, "KERNEL_COLL_TRACE_THREAD_ENABLE", 0);
#ifdef ENABLE_COLLTRACE
// Should be in sync with 'ALL_COLLS' in Generator.cmake
@@ -230,16 +231,14 @@ void *ncclCommThreadMain(void *arg) {
do {
int numActiveChans = MAXCHANNELS;
for (int channel = 0; channel < MAXCHANNELS; channel++) {
int tail = comm->collTraceTail[channel].tail%COLLTRACE_NUM_ITEMS;
int tail = comm->collTraceTail[channel].tail;
int count;
if (head[channel] <= tail)
count = tail - head[channel];
else
count = COLLTRACE_NUM_ITEMS + head[channel] - tail;
count = tail - head[channel];
if (count == 0) {
numActiveChans--;
continue;
}
count = count%COLLTRACE_NUM_ITEMS;
for (int i = 0; i < count; i++) {
volatile struct ncclCollTrace *td = comm->collTrace+COLLTRACE_NUM_ITEMS*channel+head[channel];
uint8_t type = td->type;
@@ -292,14 +291,16 @@ void *ncclCommThreadMain(void *arg) {
INFO(NCCL_COLL, "%s", line);
td->type = ncclCollTraceNotReady;
head[channel] ++;
head[channel] %= COLLTRACE_NUM_ITEMS;
}
}
if (comm->collTraceExit && numActiveChans == 0)
break;
usleep(1000); //sleep 1ms
} while(true);
pthread_exit(NULL);
if (comm->collTraceThread)
pthread_exit(NULL);
else
return 0;
}
#endif
@@ -395,7 +396,12 @@ static ncclResult_t commFree(ncclComm_t comm) {
#ifdef ENABLE_COLLTRACE
comm->collTraceExit = 1;
if (comm->collTraceThread) pthread_join(comm->collTraceThread, NULL);
if (comm->collTraceEnabled) {
if (comm->collTraceThread)
pthread_join(comm->collTraceThread, NULL);
else
ncclCommThreadMain((void *)comm);
}
NCCLCHECK(ncclCudaHostFree((void *)comm->collTrace));
NCCLCHECK(ncclCudaHostFree((void *)comm->collTraceTail));
#endif
@@ -585,10 +591,14 @@ static ncclResult_t commAlloc(struct ncclComm* comm, struct ncclComm* parent, in
NCCLCHECK(ncclCudaHostCalloc(&comm->collTraceTail, MAXCHANNELS));
NCCLCHECK(ncclCudaHostCalloc(&comm->collTrace, COLLTRACE_NUM_ITEMS*MAXCHANNELS));
comm->collTraceExit = 0;
if ((ncclDebugLevel >= NCCL_LOG_INFO) && rcclParamKernelCollTraceEnable())
pthread_create(&comm->collTraceThread, NULL, ncclCommThreadMain, (void *)comm);
else
comm->collTraceThread = 0;
comm->collTraceEnabled = false; // we can enable colltrace without starting a thread
if ((ncclDebugLevel >= NCCL_LOG_INFO) && rcclParamKernelCollTraceEnable()) {
comm->collTraceEnabled = true;
if (rcclParamKernelCollTraceThreadEnable())
pthread_create(&comm->collTraceThread, NULL, ncclCommThreadMain, (void *)comm);
else
comm->collTraceThread = 0;
}
#endif
comm->collNetSupport = 0;
memset(comm->collNetSupportMatrix, 0, sizeof(comm->collNetSupportMatrix));
+29 -3
Dosyayı Görüntüle
@@ -250,7 +250,11 @@ static ncclResult_t recvSetup(struct ncclComm* comm, struct ncclTopoGraph* graph
NCCLCHECK(ncclTopoCheckGdr(comm->topo, myInfo->busId, netId, 0, &req.useGdr));
// Determine whether we need to flush the GDR buffer on recv or not
if (req.useGdr) NCCLCHECK(ncclTopoNeedFlush(comm->topo, myInfo->busId, &req.needFlush));
if (req.useGdr) {
NCCLCHECK(ncclTopoNeedFlush(comm->topo, myInfo->busId, &req.needFlush));
CUDACHECK(hipDeviceGetAttribute((int*)&req.curr_hdp_reg, hipDeviceAttributeHdpMemFlushCntl, myInfo->cudaDev));
recv->conn.curr_hdp_reg = req.curr_hdp_reg;
}
// We don't support PXN on receive yet
tpProxyRank = comm->topParentRanks[myInfo->rank];
@@ -667,6 +671,7 @@ static ncclResult_t recvProxySetup(struct ncclProxyConnection* connection, struc
resources->needFlush = req->needFlush;
resources->channelId = req->channelId;
resources->connIndex = req->connIndex;
resources->curr_hdp_reg = req->curr_hdp_reg;
ncclNetProperties_t props;
NCCLCHECK(proxyState->ncclNet->getProperties(req->netDev, &props));
/* DMA-BUF support */
@@ -1175,7 +1180,7 @@ static ncclResult_t sendProxyProgress(struct ncclProxyState* proxyState, struct
int sharedBuffSlot = sub->posted%maxDepth;
int offset;
NCCLCHECK(sharedBuffersGet(proxyState, sub->channelId, sharedBuffSlot*args->nsubs+s, &offset, NULL));
resources->recvMem->connFifo[buffSlot].offset = offset;
__atomic_store_n(&resources->recvMem->connFifo[buffSlot].offset, offset, __ATOMIC_RELAXED);
__sync_synchronize();
}
volatile uint64_t* sendHead = resources->gdcSync ? resources->gdcSync : &resources->sendMem->head;
@@ -1362,6 +1367,8 @@ static ncclResult_t sendProxyProgress(struct ncclProxyState* proxyState, struct
return ncclSuccess;
}
RCCL_PARAM(NetHdpFlush, "NET_HDP_FLUSH", 0);
static ncclResult_t recvProxyProgress(struct ncclProxyState* proxyState, struct ncclProxyArgs* args) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT)
g_npkit_net_poll_cnt++;
@@ -1441,7 +1448,7 @@ static ncclResult_t recvProxyProgress(struct ncclProxyState* proxyState, struct
int sharedBuffSlot = sub->posted%maxDepth;
int offset;
NCCLCHECK(sharedBuffersGet(proxyState, sub->channelId, sharedBuffSlot*args->nsubs+s+i, &offset, sizes+subCount));
connFifo[buffSlot].offset = offset;
__atomic_store_n(&connFifo[buffSlot].offset, offset, __ATOMIC_RELAXED);
ptrs[subCount] = localBuff+offset;
}
} else {
@@ -1550,16 +1557,31 @@ static ncclResult_t recvProxyProgress(struct ncclProxyState* proxyState, struct
if (totalSize > 0 && p == NCCL_PROTO_SIMPLE && needFlush) {
// GDRCOPY support
struct recvNetResources* resources = (struct recvNetResources*) (subGroup->connection->transportResources);
if (rcclParamNetHdpFlush() && resources->curr_hdp_reg) {
static bool once = true;
*resources->curr_hdp_reg = 0x1;
__sync_synchronize();
if (once) {
once = false;
INFO(NCCL_INIT, "%s: flushed HDP %p", __func__, resources->curr_hdp_reg);
}
}
if (resources->gdcFlush) {
#if defined (__x86_64__)
// Force a PCI-E read from GPU memory
static bool once = true;
asm volatile ("mov (%0), %%eax" :: "l"(resources->gdcFlush) : "%eax");
if (once) {
once = false;
INFO(NCCL_INIT, "%s: issued GDC flush", __func__);
}
#else
WARN("NET: GDR Flush only supported on x86_64");
return ncclInternalError;
#endif
} else {
int subCount = 0;
static bool once = true;
for (int i=0; i<subGroup->groupSize; i++) {
struct ncclProxySubArgs* sub = subGroup + i;
if (step < sub->nsteps) {
@@ -1576,6 +1598,10 @@ static ncclResult_t recvProxyProgress(struct ncclProxyState* proxyState, struct
}
struct recvNetResources* resources = (struct recvNetResources*) (subGroup->connection->transportResources);
NCCLCHECK(proxyState->ncclNet->iflush(resources->netRecvComm, subCount, ptrs, sizes, mhandles, subGroup->requests+(step%NCCL_STEPS)));
if (once) {
once = false;
INFO(NCCL_INIT, "%s: issued GDR flush", __func__);
}
}
}
args->idle = 0;