Improve collective trace

[ROCm/rccl commit: 2ddbe6646b]
Esse commit está contido em:
Wenkai Du
2020-12-22 13:28:21 -05:00
commit d4382de267
5 arquivos alterados com 83 adições e 22 exclusões
+18 -6
Ver Arquivo
@@ -117,6 +117,8 @@ struct Caller<f, f + 1>{
void call(struct ncclWorkElem* const c) noexcept { ncclFuncs[f](c); }
};
static_assert(FUNC_INDEX_P2P == 1800, "Wrong P2P function index");
inline
__device__
void NCCL_CALL_FUNCTIONS(struct ncclWorkElem* const c) noexcept {
@@ -177,7 +179,19 @@ class ncclFunction {
comm->collTrace[pos].timeStamp = __builtin_amdgcn_s_memrealtime(); \
comm->collTrace[pos].opCount = w->opCount; \
comm->collTrace[pos].bid = bid; \
comm->collTrace[pos].funcIndex = fIdx;
comm->collTrace[pos].funcIndex = fIdx; \
if (fIdx == FUNC_INDEX_P2P) { \
comm->collTrace[pos].p2p.nThreads = w->p2p.nThreads; \
comm->collTrace[pos].p2p.delta = (uint16_t)(w->p2p.delta); \
} else if (fIdx == FUNC_INDEX_A2AV) { \
comm->collTrace[pos].coll.nThreads = w->nThreads; \
comm->collTrace[pos].coll.bid = w->a2av.bid; \
comm->collTrace[pos].coll.nChannels = w->a2av.nChannels; \
} else { \
comm->collTrace[pos].coll.nThreads = w->nThreads; \
comm->collTrace[pos].coll.bid = w->coll.bid; \
comm->collTrace[pos].coll.nChannels = w->coll.nChannels; \
}
#define traceKernelLaunch(fIdx) { \
traceColl(fIdx); \
asm volatile ("s_getreg_b32 %0, hwreg(HW_REG_HW_ID)" : "=s" (comm->collTrace[pos].data_0)); \
@@ -262,7 +276,7 @@ __device__ void ncclKernel(struct ncclWorkElem first) {
if (w == NULL) {
w = shmem.localWork.elems;
if (!load_coll(&shmem.localWork, channel->workFifo+index, tid, comm, &abortCount)) {
if (COLLTRACE && tid == 0) traceAbort(-1);
if (COLLTRACE && tid == 0) traceAbort(0xffff);
return;
}
if (COLLTRACE && tid == 0) {
@@ -270,11 +284,9 @@ __device__ void ncclKernel(struct ncclWorkElem first) {
if (!firstLaunch) traceCollEnd(w->funcIndex);
firstLaunch = false;
}
} else {
if (COLLTRACE && tid == 0) {
} else if (COLLTRACE && tid == 0) {
traceKernelLaunch(w->funcIndex);
firstLaunch = false;
}
}
if (tid < w->nThreads) {
if (w->funcIndex == FINDEX) {
@@ -285,7 +297,7 @@ __device__ void ncclKernel(struct ncclWorkElem first) {
}
index = (index+1) % NCCL_MAX_OPS;
if (w->active == 2) {
if (COLLTRACE && tid == 0) traceCollEnd(-1);
if (COLLTRACE && tid == 0) traceCollEnd(0xffff);
return;
}
w = NULL;
+3
Ver Arquivo
@@ -9,6 +9,9 @@
#define NCCL_COLLECTIVES_H_
#define FUNC_INDEX_P2P (NCCL_NUM_FUNCTIONS*NCCL_NUM_ALGORITHMS*NCCL_NUM_PROTOCOLS*ncclNumTypes*ncclNumOps)
#define FUNC_INDEX_A2A (FUNC_INDEX_P2P+1)
#define FUNC_INDEX_A2AV (FUNC_INDEX_P2P+2)
#define FUNC_INDEX(func, redop, ncclType, al, pr) ((func >= NCCL_NUM_FUNCTIONS) \
? (func-NCCL_NUM_FUNCTIONS+NCCL_NUM_FUNCTIONS*NCCL_NUM_ALGORITHMS*NCCL_NUM_PROTOCOLS*ncclNumTypes*ncclNumOps) \
: ((((((func)*ncclNumOps + (redop))*ncclNumTypes) + (ncclType))*NCCL_NUM_ALGORITHMS+(al))*NCCL_NUM_PROTOCOLS+(pr)))
+12 -1
Ver Arquivo
@@ -286,7 +286,18 @@ struct ncclCollTrace {
uint32_t data_0;
uint64_t timeStamp;
uint64_t opCount;
uint64_t data_1;
union {
uint64_t data_1;
struct {
uint16_t nThreads;
uint8_t bid;
uint8_t nChannels;
} coll;
struct {
uint16_t nThreads;
uint16_t delta;
} p2p;
};
};
static_assert(sizeof(struct ncclCollTrace) == 8*sizeof(int), "ncclCollTrace must have a pow2 size");
+49 -14
Ver Arquivo
@@ -41,9 +41,11 @@ std::chrono::high_resolution_clock::time_point ncclEpoch;
#define NCCL_GROUP_CUDA_STREAM 1 // CGMD: CUDA 9.0,9.1 Need to use an internal CUDA stream
#endif
const char* ncclFuncStr[NCCL_NUM_FUNCTIONS+2] = { "Broadcast", "Reduce", "AllGather", "ReduceScatter", "AllReduce", "AllToAll", "AllToAllv" };
const char* ncclFuncStr[NCCL_NUM_FUNCTIONS+3] = { "Broadcast", "Reduce", "AllGather", "ReduceScatter", "AllReduce", "SendRecv", "AllToAll", "AllToAllv" };
const char* ncclAlgoStr[NCCL_NUM_ALGORITHMS] = { "Tree", "Ring", "CollNet" };
const char* ncclProtoStr[NCCL_NUM_PROTOCOLS] = { "LL", "LL128", "Simple" };
const char* ncclRedOpStr[ncclNumOps] = { "Sum", "Prod", "Max", "Min" };
const char *ncclTypeStr[ncclNumTypes] = {"_i8", "_u8", "_i32", "_u32", "_i64", "_u64", "_f16", "_f32", "_f64", "_b16"};
NCCL_PARAM(GroupCudaStream, "GROUP_CUDA_STREAM", NCCL_GROUP_CUDA_STREAM);
@@ -161,6 +163,25 @@ void NCCL_NO_OPTIMIZE commPoison(ncclComm_t comm) {
void *ncclCommThreadMain(void *arg) {
ncclComm_t comm = (ncclComm_t)arg;
int head = comm->hostDevComm.collTraceHead;
#define MAX_NAME_LENGTH 32
char* func_names = (char *)malloc(MAX_NAME_LENGTH*(FUNC_INDEX_A2AV+1));
for (int func = 0; func < NCCL_NUM_FUNCTIONS; func++) {
for (int al = 0; al < NCCL_NUM_ALGORITHMS; al++) {
for (int type = 0; type < ncclNumTypes; type++) {
for (int pr = 0; pr < NCCL_NUM_PROTOCOLS; pr++) {
for (int redop = 0; redop < ncclNumOps; redop++) {
char* line = func_names+MAX_NAME_LENGTH*FUNC_INDEX(func, redop, type, al, pr);
sprintf(line, "%s%s%s%s%s", ncclFuncStr[func], ncclAlgoStr[al], ncclProtoStr[pr],
ncclRedOpStr[redop], ncclTypeStr[type]);
}
}
}
}
}
for (int func = NCCL_NUM_FUNCTIONS; func < NCCL_NUM_FUNCTIONS+3; func++) {
char* line = func_names+MAX_NAME_LENGTH*(FUNC_INDEX_P2P+func-NCCL_NUM_FUNCTIONS);
sprintf(line, "%s", ncclFuncStr[func]);
}
do {
int tail = LOAD(comm->hostDevComm.collTraceTail)%COLLTRACE_NUM_ITEMS;
int count;
@@ -177,32 +198,45 @@ void *ncclCommThreadMain(void *arg) {
}
}
for (int i = 0; i < count; i++) {
uint8_t type = LOAD(&(comm->hostDevComm.collTrace[head].type));
struct ncclCollTrace *td = comm->hostDevComm.collTrace+head;
uint8_t type = LOAD(&(td->type));
if (type == ncclCollTraceNotReady)
break;
char line[1024];
int offset = 0;
uint16_t fIdx = td->funcIndex;
#define VEGA_GPU_RTC_FREQUENCY 2.5E7
if (type == ncclCollTraceDataType) {
sprintf(line, "## [%12.6f] [%02d:%02d] L:%04d DT %08x %016lx %016lx",
(double)(comm->hostDevComm.collTrace[head].timeStamp)/VEGA_GPU_RTC_FREQUENCY, comm->rank, comm->hostDevComm.collTrace[head].bid,
comm->hostDevComm.collTrace[head].funcIndex,
comm->hostDevComm.collTrace[head].data_0,
comm->hostDevComm.collTrace[head].opCount,
comm->hostDevComm.collTrace[head].data_1);
(double)(td->timeStamp)/VEGA_GPU_RTC_FREQUENCY, comm->rank, td->bid,
fIdx, td->data_0, td->opCount, td->data_1);
} else {
sprintf(line, "## [%12.6f] [%02d:%02d] %06lx",
(double)(comm->hostDevComm.collTrace[head].timeStamp)/VEGA_GPU_RTC_FREQUENCY, comm->rank, comm->hostDevComm.collTrace[head].bid, comm->hostDevComm.collTrace[head].opCount);
(double)(td->timeStamp)/VEGA_GPU_RTC_FREQUENCY, comm->rank, td->bid, td->opCount);
offset = strlen(line);
switch (type) {
case ncclCollTraceKernelLaunchType:
sprintf(line+offset, " KL hwid %8x funcIndex %d",
comm->hostDevComm.collTrace[head].data_0, comm->hostDevComm.collTrace[head].funcIndex);
sprintf(line+offset, " KL HWID %8x %s ",
td->data_0, func_names+MAX_NAME_LENGTH*fIdx);
offset = strlen(line);
if (fIdx > FUNC_INDEX_A2AV)
sprintf(line+offset, "ERROR bad function index %d", fIdx);
else if (fIdx == FUNC_INDEX_P2P)
sprintf(line+offset, "nt %d dt %d", td->p2p.nThreads, td->p2p.delta);
else
sprintf(line+offset, "nt %d bi %d nc %d", td->coll.nThreads, td->coll.bid, td->coll.nChannels);
break;
case ncclCollTraceCollEndType:
if (comm->hostDevComm.collTrace[head].funcIndex != -1)
sprintf(line+offset, " CE next funcIndex %d",
comm->hostDevComm.collTrace[head].funcIndex);
if (fIdx != 0xffff) {
sprintf(line+offset, " CE %s ", func_names+MAX_NAME_LENGTH*fIdx);
offset = strlen(line);
if (fIdx > FUNC_INDEX_A2AV)
sprintf(line+offset, "ERROR bad function index %d", fIdx);
else if (fIdx == FUNC_INDEX_P2P)
sprintf(line+offset, "nt %d dt %d", td->p2p.nThreads, td->p2p.delta);
else
sprintf(line+offset, "nt %d bi %d nc %d", td->coll.nThreads, td->coll.bid, td->coll.nChannels);
}
else
sprintf(line+offset, " KE");
break;
@@ -215,11 +249,12 @@ void *ncclCommThreadMain(void *arg) {
}
}
INFO(NCCL_COLL, "%s", line);
STORE(&(comm->hostDevComm.collTrace[head].type), ncclCollTraceNotReady);
STORE(&(td->type), ncclCollTraceNotReady);
head ++;
head %= COLLTRACE_NUM_ITEMS;
}
} while(1);
free(func_names);
comm->hostDevComm.collTraceHead = head;
pthread_exit(NULL);
}
+1 -1
Ver Arquivo
@@ -30,7 +30,7 @@
#include "model.h"
#include "utils.h"
const char* ncclFuncStr[NCCL_NUM_FUNCTIONS+1] = { "Broadcast", "Reduce", "AllGather", "ReduceScatter", "AllReduce", "AllToAll" };
const char* ncclFuncStr[NCCL_NUM_FUNCTIONS+3] = { "Broadcast", "Reduce", "AllGather", "ReduceScatter", "AllReduce", "SendRecv", "AllToAll", "AllToAllv" };
const char* ncclAlgoStr[NCCL_NUM_ALGORITHMS] = { "Tree", "Ring", "CollNet" };
const char* ncclProtoStr[NCCL_NUM_PROTOCOLS] = { "LL", "LL128", "Simple" };