Improve RCCL kernel coll trace (#2061)

Αυτή η υποβολή περιλαμβάνεται σε:
Wenkai Du
2026-01-08 16:07:18 -08:00
υποβλήθηκε από GitHub
γονέας de931f4c53
υποβολή 1d22c87167
4 αρχεία άλλαξαν με 63 προσθήκες και 26 διαγραφές
+24 -10
Προβολή Αρχείου
@@ -27,17 +27,30 @@
#endif
#if defined(__gfx1100__) || defined(__gfx1101__) || defined(__gfx1102__) || defined(__gfx1200__) || defined(__gfx1201__)
#define __trace_hwreg()
#define __trace_hwreg() \
collTrace->data_0 = 0;
#else
#define __trace_hwreg() \
asm volatile ("s_getreg_b32 %0, hwreg(HW_REG_HW_ID)" : "=s" (collTrace->data_0));
{ int32_t hwid; \
asm volatile ("s_getreg_b32 %0, hwreg(HW_REG_HW_ID)" : "=s" (hwid)); \
collTrace->data_0 = hwid >> 4; }
#endif
#if defined(__gfx942__) || defined(__gfx950__)
#define __trace_xccid() \
{ int32_t xccId; \
asm volatile ("s_getreg_b32 %0, hwreg(HW_REG_XCC_ID)" : "=s" (xccId)); \
collTrace->xccId = xccId; }
#else
#define __trace_xccid() \
collTrace->xccId = 0;
#endif
#ifdef ENABLE_COLLTRACE
#define INC_COLL_TRACE \
uint32_t pos = __hip_atomic_fetch_add(&ncclShmem.collTraceTail->tail, 1, __ATOMIC_SEQ_CST, __HIP_MEMORY_SCOPE_WORKGROUP)%COLLTRACE_NUM_ITEMS; \
struct ncclCollTrace* collTrace = ncclShmem.collTrace+pos; \
collTrace->timeStamp = wall_clock64(); \
collTrace->bid = blockIdx.x; \
collTrace->tid = threadIdx.x; \
collTrace->channelId = ncclShmem.channelId;
// TODO: switch to atomicInc after llvm crash is fixed
@@ -46,7 +59,8 @@
#define traceKernelLaunch(launch_type, ix) { \
INC_COLL_TRACE \
collTrace->funcIndex = ncclShmem.funcId; \
__trace_hwreg()\
__trace_hwreg() \
__trace_xccid() \
collTrace->batchIx = ix; \
if (ncclShmem.workType == ncclDevWorkTypeP2p) { \
struct ncclDevWorkP2p *p2pWork = (struct ncclDevWorkP2p*)ncclShmem.workStorage; \
@@ -63,7 +77,7 @@
collTrace->p2p.recvRegistered = p2pWork->recvNetReg; \
collTrace->p2pOpCount[0] = p2pWork->sendOpCount; \
collTrace->p2pOpCount[1] = p2pWork->recvOpCount; \
collTrace->type = (launch_type) | ncclCollTraceP2pElemType; \
__hip_atomic_store(&collTrace->type, (launch_type) | ncclCollTraceP2pElemType, __ATOMIC_RELEASE, __HIP_MEMORY_SCOPE_WORKGROUP); \
} else if (ncclShmem.workType == ncclDevWorkTypeColl) { \
struct ncclDevWorkColl *collWork = (struct ncclDevWorkColl*)ncclShmem.workStorage; \
collTrace->coll.nWarps = collWork->nWarps; \
@@ -71,7 +85,7 @@
collTrace->coll.bid = ncclShmem.channelId - collWork->channelLo; \
collTrace->coll.root = collWork->root; \
collTrace->opCount = collWork->opCount; \
collTrace->type = (launch_type) | ncclCollTraceCollElemType; \
__hip_atomic_store(&collTrace->type, (launch_type) | ncclCollTraceCollElemType, __ATOMIC_RELEASE, __HIP_MEMORY_SCOPE_WORKGROUP); \
} \
}
#define traceKernelEnd(end_type) { \
@@ -81,11 +95,11 @@
struct ncclDevWorkP2p *p2pWork = (struct ncclDevWorkP2p*)ncclShmem.workStorage; \
collTrace->p2pOpCount[0] = p2pWork->sendOpCount; \
collTrace->p2pOpCount[1] = p2pWork->recvOpCount; \
collTrace->type = (end_type) | ncclCollTraceP2pElemType; \
__hip_atomic_store(&collTrace->type, (end_type) | ncclCollTraceP2pElemType, __ATOMIC_RELEASE, __HIP_MEMORY_SCOPE_WORKGROUP); \
} else if (ncclShmem.workType == ncclDevWorkTypeColl) { \
struct ncclDevWorkColl *collWork = (struct ncclDevWorkColl*)ncclShmem.workStorage; \
collTrace->opCount = collWork->opCount; \
collTrace->type = (end_type) | ncclCollTraceCollElemType; \
__hip_atomic_store(&collTrace->type, (end_type) | ncclCollTraceCollElemType, __ATOMIC_RELEASE, __HIP_MEMORY_SCOPE_WORKGROUP); \
} \
}
#define traceData(data2, data4, data8_0, data8_1) { \
@@ -94,12 +108,12 @@
collTrace->data_0 = data4; \
collTrace->opCount = data8_0; \
collTrace->data_1 = data8_1; \
collTrace->type = ncclCollTraceDataType; \
__hip_atomic_store(&collTrace->type, ncclCollTraceDataType, __ATOMIC_RELEASE, __HIP_MEMORY_SCOPE_WORKGROUP); \
}
#define traceAbort(){\
INC_COLL_TRACE\
collTrace->funcIndex = ncclShmem.funcId;\
collTrace->type = ncclCollTraceAbortType;\
__hip_atomic_store(&collTrace->type, ncclCollTraceAbortType, __ATOMIC_RELEASE, __HIP_MEMORY_SCOPE_WORKGROUP); \
}
#else
#define traceKernelLaunch(launch_type, batchIx)
+18 -1
Προβολή Αρχείου
@@ -684,6 +684,10 @@ static ncclResult_t scheduleCollTasksToPlan(
size_t trafficPerChannel = 0;
int channelId = 0;
size_t currentTraffic = 0;
size_t channelCounts[MAXCHANNELS];
for (int c=0; c<MAXCHANNELS; c++) channelCounts[c] = 0;
while (nPlanColls!=0 && !ncclIntruQueueEmpty(&planner->collTaskQueue)) {
struct ncclTaskColl* task = ncclIntruQueueHead(&planner->collTaskQueue);
struct ncclWorkList* workNode = ncclIntruQueueHead(&planner->collWorkQueue);
@@ -916,6 +920,10 @@ static ncclResult_t scheduleCollTasksToPlan(
int(devWork->cbd.chunkGrainsLo*rcclProtoGrainSize(task->protocol, comm)),
int(devWork->cbd.chunkGrainsMid*rcclProtoGrainSize(task->protocol, comm)),
int(devWork->cbd.chunkGrainsHi*rcclProtoGrainSize(task->protocol, comm)));
// channel traffic counter
channelCounts[devWork->channelLo] += (long)devWork->cbd.countLo;
if (devWork->channelLo != devWork->channelHi) channelCounts[devWork->channelHi] += (long)devWork->cbd.countHi;
for (int c=devWork->channelLo+1; c<devWork->channelHi; c++) channelCounts[c] += (long)devWork->cbd.countMid;
}
}
@@ -930,6 +938,15 @@ static ncclResult_t scheduleCollTasksToPlan(
ncclIntruQueueEnqueue(&plan->workQueue, workNode);
plan->workBytes += workNode->size;
}
char line[1024];
int offset = 0;
for (int c=0; c<MAXCHANNELS; c++) {
sprintf(line+offset, "%ld ", channelCounts[c]);
offset = strlen(line);
}
TRACE(NCCL_COLL, "Channel traffic counts: %s", line);
return ncclSuccess;
}
@@ -2794,7 +2811,7 @@ ncclResult_t ncclEnqueueCheck(struct ncclInfo* info) {
}
NCCLCHECKGOTO(ArgsCheck(info), ret, fail);
INFO(NCCL_COLL,"%s: opCount %lx sendbuff %p recvbuff %p acc %p count %zu datatype %d op %d root %d comm %p [nranks=%d] stream %p task %d globalrank %d",
INFO(NCCL_COLL,"%s: opCount %lx sendbuff %p recvbuff %p acc %p count %u datatype %d op %d root %d comm %p [nranks=%d] stream %p task %d globalrank %d",
info->opName, info->comm->opCount, info->sendbuff, info->recvbuff, info->acc, info->count,
info->datatype, info->op, info->root, info->comm, info->comm->nRanks, info->stream,
info->comm->planner.nTasksP2p + info->comm->planner.nTasksColl,
+3 -3
Προβολή Αρχείου
@@ -507,10 +507,10 @@ typedef enum {
} ncclCollTraceDataType_t;
struct ncclCollTrace {
uint8_t type;
uint8_t bid;
int16_t funcIndex;
uint16_t data_0;
uint8_t xccId:4;
uint16_t data_0:12;
uint8_t type;
uint8_t batchIx;
uint8_t tid;
uint8_t channelId;
+18 -12
Προβολή Αρχείου
@@ -297,21 +297,21 @@ void *ncclCommThreadMain(void *arg) {
}
for (int i = 0; i < count; i++) {
volatile struct ncclCollTrace *td = comm->collTrace+COLLTRACE_NUM_ITEMS*channel+head[channel]%COLLTRACE_NUM_ITEMS;
head[channel] ++;
const uint8_t type = td->type;
if (type == ncclCollTraceNotReady)
continue;
break;
head[channel] ++;
char line[1024];
int offset = 0;
const uint16_t fIdx = td->funcIndex;
if (type == ncclCollTraceDataType) {
sprintf(line, "## [%012.6f] [%02d:%02d-%02d:%02x] L:%04d DT %08x %016lx %016lx",
(double)(td->timeStamp)/vega_gpu_rtc_freq, comm->rank, td->bid, td->channelId, td->tid, fIdx, td->data_0, td->opCount, td->data_1);
(double)(td->timeStamp)/vega_gpu_rtc_freq, comm->rank, channel, td->channelId, td->tid, fIdx, td->data_0, td->opCount, td->data_1);
} else {
if (type & ncclCollTraceP2pElemType)
sprintf(line, "## [%012.6f] [%02d:%02d-%02d:%02x] %06x-%06x", (double)(td->timeStamp)/vega_gpu_rtc_freq, comm->rank, td->bid, td->channelId, td->tid, td->p2pOpCount[0], td->p2pOpCount[1]);
sprintf(line, "## [%012.6f] [%02d:%02d-%02d:%02x] %06x-%06x", (double)(td->timeStamp)/vega_gpu_rtc_freq, comm->rank, channel, td->channelId, td->tid, td->p2pOpCount[0], td->p2pOpCount[1]);
else
sprintf(line, "## [%012.6f] [%02d:%02d-%02d:%02x] %06lx", (double)(td->timeStamp)/vega_gpu_rtc_freq, comm->rank, td->bid, td->channelId, td->tid, td->opCount);
sprintf(line, "## [%012.6f] [%02d:%02d-%02d:%02x] %06lx", (double)(td->timeStamp)/vega_gpu_rtc_freq, comm->rank, channel, td->channelId, td->tid, td->opCount);
offset = strlen(line);
if (type == ncclCollTraceCollElemType) {
sprintf(line+offset, " CE %s nw %d bi %d nc %d root %d busId %lx nRanks %d", funcNames[fIdx], td->coll.nWarps, td->coll.bid, td->coll.nChannels, td->coll.root, comm->busId, comm->nRanks);
@@ -324,9 +324,9 @@ void *ncclCommThreadMain(void *arg) {
case ncclCollTraceKernelLaunchType:
case ncclCollTraceCollLaunchType:
if ((type&0xf) == ncclCollTraceKernelLaunchType)
sprintf(line+offset, " KL %s [%02d:%02d-%02d:%02x] HWID %8x ", funcNames[fIdx], comm->rank, td->bid, td->channelId, td->tid, td->data_0);
sprintf(line+offset, " KL %s [%02d:%02d-%02d:%02x] HWID %d:%x ", funcNames[fIdx], comm->rank, channel, td->channelId, td->tid, td->xccId, td->data_0);
else if ((type&0xf) == ncclCollTraceCollLaunchType)
sprintf(line+offset, " CL %s [%02d:%02d-%02d:%02x] %d ", funcNames[fIdx], comm->rank, td->bid, td->channelId, td->tid, td->batchIx);
sprintf(line+offset, " CL %s [%02d:%02d-%02d:%02x] %d ", funcNames[fIdx], comm->rank, channel, td->channelId, td->tid, td->batchIx);
offset = strlen(line);
if ((type&0xf0) == ncclCollTraceCollElemType)
sprintf(line+offset, " nw %d bi %d nc %d root %d busId %lx nRanks %d", td->coll.nWarps, td->coll.bid, td->coll.nChannels, td->coll.root, comm->busId, comm->nRanks);
@@ -336,10 +336,10 @@ void *ncclCommThreadMain(void *arg) {
comm->busId, comm->nRanks);
break;
case ncclCollTraceKernelEndType:
sprintf(line+offset, " KE %s [%02d:%02d-%02d:%02x] busId %lx nRanks %d", funcNames[fIdx], comm->rank, td->bid, td->channelId, td->tid, comm->busId, comm->nRanks);
sprintf(line+offset, " KE %s [%02d:%02d-%02d:%02x] busId %lx nRanks %d", funcNames[fIdx], comm->rank, channel, td->channelId, td->tid, comm->busId, comm->nRanks);
break;
case ncclCollTraceAbortType:
sprintf(line+offset, " KA %s [%02d:%02d-%02d:%02x]", funcNames[fIdx], comm->rank, td->bid, td->channelId, td->tid);
sprintf(line+offset, " KA %s [%02d:%02d-%02d:%02x]", funcNames[fIdx], comm->rank, channel, td->channelId, td->tid);
break;
default:
sprintf(line+offset, " unknown collective trace data type");
@@ -348,7 +348,9 @@ void *ncclCommThreadMain(void *arg) {
}
}
INFO(NCCL_COLL, "%s td->type:%d", line, type);
td->type = ncclCollTraceNotReady;
volatile uint8_t *tdtype = &td->type;
*tdtype = ncclCollTraceNotReady;
(*tdtype); // read back for flushing
}
}
if (comm->collTraceExit && numActiveChans == 0)
@@ -477,7 +479,7 @@ static ncclResult_t commFree(ncclComm_t comm) {
ncclCommThreadMain((void *)comm);
}
NCCLCHECK(ncclCudaFree((void *)comm->collTrace));
NCCLCHECK(ncclCudaFree((void *)comm->collTraceTail));
NCCLCHECK(ncclCudaHostFree((void *)comm->collTraceTail));
#endif
free(comm->peerInfo);
@@ -678,8 +680,12 @@ static ncclResult_t commAlloc(struct ncclComm* comm, struct ncclComm* parent, in
comm->dmaBufSupport = (dmaBufSupported(comm) == ncclSuccess) ? true : false;
#ifdef ENABLE_COLLTRACE
NCCLCHECK(ncclCudaCalloc(&comm->collTraceTail, MAXCHANNELS));
NCCLCHECK(ncclCudaHostCalloc(&comm->collTraceTail, MAXCHANNELS));
#if defined(HIP_UNCACHED_MEMORY)
NCCLCHECK(ncclCudaCalloc(&comm->collTrace, COLLTRACE_NUM_ITEMS*MAXCHANNELS, nullptr, hipDeviceMallocUncached));
#else
NCCLCHECK(ncclCudaCalloc(&comm->collTrace, COLLTRACE_NUM_ITEMS*MAXCHANNELS));
#endif
comm->collTraceExit = 0;
comm->collTraceEnabled = false; // we can enable colltrace without starting a thread
if ((ncclDebugLevel >= NCCL_LOG_INFO) && rcclParamKernelCollTraceEnable()) {