Improve collective trace (#835)

[ROCm/rccl commit: c8085eb704]
This commit is contained in:
Wenkai Du
2023-08-03 07:16:12 -07:00
committato da GitHub
parent 74dd9c4807
commit c0729f28c8
4 ha cambiato i file con 114 aggiunte e 102 eliminazioni
@@ -284,11 +284,16 @@ class ncclFunction {
asm volatile ("s_getreg_b32 %0, hwreg(HW_REG_HW_ID)" : "=s" (collTrace->data_0));
#endif
#ifdef ENABLE_COLLTRACE
#define traceColl(launch_type) { \
uint32_t pos = atomicAdd_system((uint32_t*)ncclShmem.comm.collTraceTail, (uint32_t)1)%COLLTRACE_NUM_ITEMS; \
struct ncclCollTrace* collTrace = ncclShmem.comm.collTrace+pos; \
#define INC_COLL_TRACE \
uint32_t pos = atomicAdd(&ncclShmem.collTraceTail->tail, 1)%COLLTRACE_NUM_ITEMS; \
struct ncclCollTrace* collTrace = ncclShmem.collTrace+pos; \
collTrace->timeStamp = wall_clock64(); \
collTrace->bid = blockIdx.x; \
collTrace->bid = blockIdx.x;
// TODO: switch to atomicInc after llvm crash is fixed
// uint32_t pos = atomicInc(&ncclShmem.collTraceTail->tail, COLLTRACE_NUM_ITEMS)
#define traceKernelLaunch(launch_type) { \
INC_COLL_TRACE \
collTrace->funcIndex = ncclShmem.work.header.funcIndex; \
__trace_hwreg()\
if (ncclShmem.work.header.type == ncclWorkTypeP2p) { \
@@ -315,29 +320,20 @@ class ncclFunction {
collTrace->type = (launch_type) | ncclCollTraceCollElemType; \
} \
}
// #endif
#define traceKernelLaunch(firstLaunch) { \
traceColl(firstLaunch?ncclCollTraceKernelLaunchType:ncclCollTraceCollLaunchType); \
}
#define traceKernelEnd() { \
uint32_t pos = atomicAdd_system((uint32_t*)ncclShmem.comm.collTraceTail, (uint32_t)1)%COLLTRACE_NUM_ITEMS; \
struct ncclCollTrace* collTrace = ncclShmem.comm.collTrace+pos; \
collTrace->timeStamp = wall_clock64(); \
collTrace->bid = blockIdx.x; \
collTrace->type = ncclCollTraceKernelEndType; \
}
#define traceAbort() { \
uint32_t pos = atomicAdd_system((uint32_t*)ncclShmem.comm.collTraceTail, (uint32_t)1)%COLLTRACE_NUM_ITEMS; \
struct ncclCollTrace* collTrace = ncclShmem.comm.collTrace+pos; \
collTrace->timeStamp = wall_clock64(); \
collTrace->bid = blockIdx.x; \
collTrace->type = ncclCollTraceAbortType; \
#define traceKernelEnd(end_type) { \
INC_COLL_TRACE \
if (ncclShmem.work.header.type == ncclWorkTypeP2p) { \
struct ncclWorkElemP2p *p2pElems = ncclShmem.work.p2pElems; \
collTrace->p2pOpCount[0] = p2pElems[0].opCount; \
collTrace->p2pOpCount[1] = p2pElems[1].opCount; \
} else if (ncclShmem.work.header.type == ncclWorkTypeColl) { \
struct ncclWorkElem *elems = ncclShmem.work.elems; \
collTrace->opCount = elems[0].opCount; \
} \
collTrace->type = end_type; \
}
#define traceData(data2, data4, data8_0, data8_1) { \
uint32_t pos = atomicAdd_system((uint32_t*)ncclShmem.comm.collTraceTail, (uint32_t)1)%COLLTRACE_NUM_ITEMS; \
struct ncclCollTrace* collTrace = ncclShmem.comm.collTrace+pos; \
collTrace->bid = blockIdx.x; \
collTrace->timeStamp = wall_clock64(); \
INC_COLL_TRACE \
collTrace->funcIndex = data2; \
collTrace->data_0 = data4; \
collTrace->opCount = data8_0; \
@@ -345,10 +341,8 @@ class ncclFunction {
collTrace->type = ncclCollTraceDataType; \
}
#else
#define traceColl(launch_type)
#define traceKernelLaunch(firstLaunch)
#define traceKernelEnd()
#define traceAbort()
#define traceKernelLaunch(launch_type)
#define traceKernelEnd(end_type)
#define traceData(data2, data4, data8_0, data8_1)
#endif
@@ -369,6 +363,10 @@ struct ncclShmemData {
alignas(16) struct ncclDevComm comm;
alignas(16) struct ncclDevChannel channel;
alignas(16) struct ncclWork work;
#ifdef ENABLE_COLLTRACE
struct ncclCollTrace* collTrace;
union ncclCollTraceTail* collTraceTail;
#endif
#ifdef ENABLE_PROFILING
struct ncclProf prof;
#endif
@@ -518,6 +516,12 @@ __forceinline__ __device__ void ncclKernel(
}
copyToShmem16(tid%WARP_SIZE, dst, src, bytes);
}
#ifdef ENABLE_COLLTRACE
if (tid == 0) {
ncclShmem.collTrace = comm->collTrace + COLLTRACE_NUM_ITEMS*ncclShmem.channelId;
ncclShmem.collTraceTail = comm->collTraceTail + ncclShmem.channelId;
}
#endif
__synclds(); // publish shmem
#ifdef ENABLE_PROFILING
if (tid == 0) {
@@ -526,7 +530,7 @@ __forceinline__ __device__ void ncclKernel(
}
#endif
if (tid == 0) __insert_timestamp(__LINE__);
if (COLLTRACE && tid == 0) traceKernelLaunch(true);
if (COLLTRACE && tid == 0) traceKernelLaunch(ncclCollTraceKernelLaunchType);
while (true) {
// Notify host that all fifo reads are complete.
@@ -566,13 +570,13 @@ __forceinline__ __device__ void ncclKernel(
{ // Check whether the last operation was aborted and make sure all threads exit
int aborted = tid == 0 ? *comm->abortFlag : 0;
if (__any(aborted)) { // publish ncclShmem.work
traceAbort();
traceKernelEnd(ncclCollTraceAbortType);
break;
}
}
if (COLLTRACE && tid == 0) traceColl(false);
if (COLLTRACE && tid == 0) traceKernelLaunch(ncclCollTraceCollLaunchType);
}
if (COLLTRACE && tid == 0) traceKernelEnd();
if (COLLTRACE && tid == 0) traceKernelEnd(ncclCollTraceKernelEndType);
#ifdef ENABLE_PROFILING
if (ncclShmem.comm.devProf->seq < PROFILE_NUM_LAUNCHES) {
+1 -1
Vedi File
@@ -355,7 +355,7 @@ struct ncclComm {
#ifdef ENABLE_COLLTRACE
struct ncclCollTrace* collTrace;
volatile uint32_t *collTraceTail;
union ncclCollTraceTail *collTraceTail;
pthread_t collTraceThread;
volatile bool collTraceExit;
#endif
+6 -1
Vedi File
@@ -367,6 +367,11 @@ struct ncclCollTrace {
};
static_assert(sizeof(struct ncclCollTrace) == 8*sizeof(int), "ncclCollTrace must have a pow2 size");
union ncclCollTraceTail{
uint32_t tail;
char padding[4096];
};
#define COLLTRACE_NUM_ITEMS 8192
#endif
@@ -403,7 +408,7 @@ struct ncclDevComm {
#ifdef ENABLE_COLLTRACE
struct ncclCollTrace* collTrace;
volatile uint32_t *collTraceTail;
union ncclCollTraceTail *collTraceTail;
pthread_t collTraceThread;
#endif
+70 -67
Vedi File
@@ -170,9 +170,11 @@ RCCL_PARAM(KernelCollTraceEnable, "KERNEL_COLL_TRACE_ENABLE", 0);
#ifdef ENABLE_COLLTRACE
void *ncclCommThreadMain(void *arg) {
ncclComm_t comm = (ncclComm_t)arg;
int head = 0;
int head[MAXCHANNELS];
hipDeviceProp_t devProp;
double vega_gpu_rtc_freq;
memset(head, 0, sizeof(int)*MAXCHANNELS);
hipError_t status = hipGetDeviceProperties(&devProp, comm->cudaDev);
if (devProp.gcnArch/10 == 94 && status == hipSuccess)
vega_gpu_rtc_freq = 1.0E8;
@@ -202,72 +204,74 @@ void *ncclCommThreadMain(void *arg) {
line += MAX_NAME_LENGTH;
sprintf(line, "AllToAllPivotRingSimpleSum_i8");
do {
int tail = (*comm->collTraceTail)%COLLTRACE_NUM_ITEMS;
int count;
if (head <= tail)
count = tail - head;
else
count = COLLTRACE_NUM_ITEMS + head - tail;
if (!count) {
usleep(1000); //sleep 1ms
continue;
}
for (int i = 0; i < count; i++) {
volatile struct ncclCollTrace *td = comm->collTrace+head;
uint8_t type = td->type;
if (type == ncclCollTraceNotReady)
break;
char line[1024];
int offset = 0;
uint16_t fIdx = td->funcIndex;
if (type == ncclCollTraceDataType) {
sprintf(line, "## [%012.6f] [%02d:%02d] L:%04d DT %08x %016lx %016lx",
(double)(td->timeStamp)/vega_gpu_rtc_freq, comm->rank, td->bid,
fIdx, td->data_0, td->opCount, td->data_1);
} else {
if (fIdx == FUNC_INDEX_P2P || type == ncclCollTraceP2pElemType)
sprintf(line, "## [%012.6f] [%02d:%02d] %06x-%06x", (double)(td->timeStamp)/vega_gpu_rtc_freq, comm->rank, td->bid, td->p2pOpCount[0], td->p2pOpCount[1]);
else
sprintf(line, "## [%012.6f] [%02d:%02d] %06lx", (double)(td->timeStamp)/vega_gpu_rtc_freq, comm->rank, td->bid, td->opCount);
offset = strlen(line);
if (type == ncclCollTraceCollElemType) {
sprintf(line+offset, " CE %s nw %d bi %d nc %d busId %lx nRanks %d", func_names+MAX_NAME_LENGTH*fIdx, td->coll.nWarps, td->coll.bid, td->coll.nChannels, comm->busId, comm->nRanks);
} else if (type == ncclCollTraceP2pElemType) {
sprintf(line+offset, " PE %s %d -> %d/%d/%d/%d conn/nw/ws/ng %d/%d/%d/%d -> %d busId %lx nRanks %d", func_names+MAX_NAME_LENGTH*fIdx,
td->p2p[0].peer, td->p2p[0].connIndex, td->p2p[0].nWarps, td->p2p[0].warpStart, td->p2p[0].ngroups,
td->p2p[1].connIndex, td->p2p[1].nWarps, td->p2p[1].warpStart, td->p2p[1].ngroups, td->p2p[1].peer, comm->busId, comm->nRanks);
for (int channel = 0; channel < MAXCHANNELS; channel++) {
int tail = comm->collTraceTail[channel].tail%COLLTRACE_NUM_ITEMS;
int count;
if (head[channel] <= tail)
count = tail - head[channel];
else
count = COLLTRACE_NUM_ITEMS + head[channel] - tail;
if (!count) {
usleep(1000); //sleep 1ms
continue;
}
for (int i = 0; i < count; i++) {
volatile struct ncclCollTrace *td = comm->collTrace+COLLTRACE_NUM_ITEMS*channel+head[channel];
uint8_t type = td->type;
if (type == ncclCollTraceNotReady)
break;
char line[1024];
int offset = 0;
uint16_t fIdx = td->funcIndex;
if (type == ncclCollTraceDataType) {
sprintf(line, "## [%012.6f] [%02d:%02d] L:%04d DT %08x %016lx %016lx",
(double)(td->timeStamp)/vega_gpu_rtc_freq, comm->rank, td->bid,
fIdx, td->data_0, td->opCount, td->data_1);
} else {
switch (type&0xf) {
case ncclCollTraceKernelLaunchType:
case ncclCollTraceCollLaunchType:
if ((type&0xf) == ncclCollTraceKernelLaunchType)
sprintf(line+offset, " KL HWID %8x %s", td->data_0, func_names+MAX_NAME_LENGTH*fIdx);
else if ((type&0xf) == ncclCollTraceCollLaunchType)
sprintf(line+offset, " CL %s", func_names+MAX_NAME_LENGTH*fIdx);
offset = strlen(line);
if ((type&0xf0) == ncclCollTraceCollElemType)
sprintf(line+offset, " nw %d bi %d nc %d busId %lx nRanks %d", td->coll.nWarps, td->coll.bid, td->coll.nChannels, comm->busId, comm->nRanks);
else if ((type&0xf0) == ncclCollTraceP2pElemType)
sprintf(line+offset, " %d -> %d/%d/%d/%d conn/nw/ws/ng %d/%d/%d/%d -> %d busId %lx nRanks %d",
td->p2p[0].peer, td->p2p[0].connIndex, td->p2p[0].nWarps, td->p2p[0].warpStart, td->p2p[0].ngroups,
td->p2p[1].connIndex, td->p2p[1].nWarps, td->p2p[1].warpStart, td->p2p[1].ngroups, td->p2p[1].peer, comm->busId, comm->nRanks);
break;
case ncclCollTraceKernelEndType:
sprintf(line+offset, " KE busId %lx nRanks %d", comm->busId, comm->nRanks);
break;
case ncclCollTraceAbortType:
sprintf(line+offset, " Abort");
break;
default:
sprintf(line+offset, " unknown collective trace data type");
break;
if (fIdx == FUNC_INDEX_P2P || type == ncclCollTraceP2pElemType)
sprintf(line, "## [%012.6f] [%02d:%02d] %06x-%06x", (double)(td->timeStamp)/vega_gpu_rtc_freq, comm->rank, td->bid, td->p2pOpCount[0], td->p2pOpCount[1]);
else
sprintf(line, "## [%012.6f] [%02d:%02d] %06lx", (double)(td->timeStamp)/vega_gpu_rtc_freq, comm->rank, td->bid, td->opCount);
offset = strlen(line);
if (type == ncclCollTraceCollElemType) {
sprintf(line+offset, " CE %s nw %d bi %d nc %d busId %lx nRanks %d", func_names+MAX_NAME_LENGTH*fIdx, td->coll.nWarps, td->coll.bid, td->coll.nChannels, comm->busId, comm->nRanks);
} else if (type == ncclCollTraceP2pElemType) {
sprintf(line+offset, " PE %s %d -> %d/%d/%d/%d conn/nw/ws/ng %d/%d/%d/%d -> %d busId %lx nRanks %d", func_names+MAX_NAME_LENGTH*fIdx,
td->p2p[0].peer, td->p2p[0].connIndex, td->p2p[0].nWarps, td->p2p[0].warpStart, td->p2p[0].ngroups,
td->p2p[1].connIndex, td->p2p[1].nWarps, td->p2p[1].warpStart, td->p2p[1].ngroups, td->p2p[1].peer, comm->busId, comm->nRanks);
} else {
switch (type&0xf) {
case ncclCollTraceKernelLaunchType:
case ncclCollTraceCollLaunchType:
if ((type&0xf) == ncclCollTraceKernelLaunchType)
sprintf(line+offset, " KL HWID %8x %s", td->data_0, func_names+MAX_NAME_LENGTH*fIdx);
else if ((type&0xf) == ncclCollTraceCollLaunchType)
sprintf(line+offset, " CL %s", func_names+MAX_NAME_LENGTH*fIdx);
offset = strlen(line);
if ((type&0xf0) == ncclCollTraceCollElemType)
sprintf(line+offset, " nw %d bi %d nc %d busId %lx nRanks %d", td->coll.nWarps, td->coll.bid, td->coll.nChannels, comm->busId, comm->nRanks);
else if ((type&0xf0) == ncclCollTraceP2pElemType)
sprintf(line+offset, " %d -> %d/%d/%d/%d conn/nw/ws/ng %d/%d/%d/%d -> %d busId %lx nRanks %d",
td->p2p[0].peer, td->p2p[0].connIndex, td->p2p[0].nWarps, td->p2p[0].warpStart, td->p2p[0].ngroups,
td->p2p[1].connIndex, td->p2p[1].nWarps, td->p2p[1].warpStart, td->p2p[1].ngroups, td->p2p[1].peer, comm->busId, comm->nRanks);
break;
case ncclCollTraceKernelEndType:
sprintf(line+offset, " KE busId %lx nRanks %d", comm->busId, comm->nRanks);
break;
case ncclCollTraceAbortType:
sprintf(line+offset, " Abort");
break;
default:
sprintf(line+offset, " unknown collective trace data type");
break;
}
}
}
INFO(NCCL_COLL, "%s", line);
td->type = ncclCollTraceNotReady;
head[channel] ++;
head[channel] %= COLLTRACE_NUM_ITEMS;
}
INFO(NCCL_COLL, "%s", line);
td->type = ncclCollTraceNotReady;
head ++;
head %= COLLTRACE_NUM_ITEMS;
}
} while(!comm->collTraceExit);
free(func_names);
@@ -533,10 +537,9 @@ static ncclResult_t commAlloc(struct ncclComm* comm, struct ncclComm* parent, in
comm->dmaBufSupport = (dmaBufSupported(comm) == ncclSuccess) ? true : false;
#ifdef ENABLE_COLLTRACE
NCCLCHECK(ncclCudaHostCalloc((uint32_t **)&comm->collTraceTail, 1));
NCCLCHECK(ncclCudaHostCalloc(&comm->collTrace, COLLTRACE_NUM_ITEMS));
memset(comm->collTrace, 0, sizeof(struct ncclCollTrace) * COLLTRACE_NUM_ITEMS);
comm->collTraceExit = *comm->collTraceTail = 0;
NCCLCHECK(ncclCudaHostCalloc(&comm->collTraceTail, MAXCHANNELS));
NCCLCHECK(ncclCudaHostCalloc(&comm->collTrace, COLLTRACE_NUM_ITEMS*MAXCHANNELS));
comm->collTraceExit = 0;
if ((ncclDebugLevel >= NCCL_LOG_INFO) && rcclParamKernelCollTraceEnable())
pthread_create(&comm->collTraceThread, NULL, ncclCommThreadMain, (void *)comm);
else