diff --git a/projects/rccl/tools/RcclReplayer/rcclReplayer.cpp b/projects/rccl/tools/RcclReplayer/rcclReplayer.cpp index 140f733780..37d250a997 100644 --- a/projects/rccl/tools/RcclReplayer/rcclReplayer.cpp +++ b/projects/rccl/tools/RcclReplayer/rcclReplayer.cpp @@ -180,36 +180,31 @@ void dataToCsv(GroupCall const& gc, std::ofstream &datafile, double runTime) void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls& cc) { + bool verbose = isFirstRank && (getenv("VERBOSE") != NULL); cc.globalRankComms.clear(); cc.globalRankComms.resize(cc.numGlobalRanks); cc.groupCalls.clear(); - std::ifstream fp(logFilename, std::ios::binary); + FILE* fp = fopen(logFilename, "r"); if (!fp) { printf("[ERROR] Unable to open file %s\n", logFilename); exit(-1); } - constexpr size_t size = sizeof(LineItem) + 1; - char line[size]; //size of collectivecall struct + char line[2048]; + LineItem li; int lineNum = 0; - while (fp.getline(line, size)) - { - LineItem li = *((LineItem*)line); + while (fgets(line, 2048, fp)) { + ++lineNum; - if (li.coll == 10 || li.coll == 11) - { - continue; - } - // if need hostname will parse together with num of line in output - // Ignore invalid lines and collectives - if (li.nRanks != cc.numGlobalRanks) continue; + //Ignore invalid lines and collectives + if (!ParseLineItem(line, li) || li.nRanks != cc.numGlobalRanks) continue; // Figure out commIdx for this globalrank int commIdx = -1; for (auto i = 0; i < cc.globalRankComms[li.globalRank].size(); i++) { - if (cc.globalRankComms[li.globalRank][i] != li.comm) { + if (!strcmp(cc.globalRankComms[li.globalRank][i].c_str(), li.comm)) { commIdx = i; break; } @@ -220,8 +215,8 @@ void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls } TaskInfo taskInfo; - taskInfo.funcType = (ncclFunc_t) li.coll; - taskInfo.inPlace = li.sendbuff == li.recvbuff; + taskInfo.funcType = GetFuncType(li.opName); + taskInfo.inPlace = !strcmp(li.sendbuff, li.recvbuff); taskInfo.count = li.count; taskInfo.datatype = (ncclDataType_t) li.datatype; taskInfo.op = (ncclRedOp_t) li.op; @@ -234,7 +229,7 @@ void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls if (gc.opCount != li.opCount) continue; if (gc.rankData.count(li.globalRank)) { RankData& rd = gc.rankData[li.globalRank]; - if (rd.commIdx != commIdx || rd.tasks.size() != li.nTasks) + if (rd.commIdx != commIdx || rd.tasks.size() != li.task) continue; rd.tasks.push_back(taskInfo); @@ -242,7 +237,7 @@ void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls break; } // Rank has no tasks - make sure this is task 0 - else if (li.nTasks == 0) { + else if (li.task == 0) { gc.rankData[li.globalRank].lineNum = lineNum; gc.rankData[li.globalRank].commIdx = commIdx; gc.rankData[li.globalRank].tasks.push_back(taskInfo); @@ -253,7 +248,7 @@ void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls // If no collectives were found, create new one if (!found) { - if (li.nTasks != 0) { + if (li.task != 0) { if (isFirstRank) printf("[WARN] Was unable to find corresponding collective for line %d\n", lineNum); } @@ -265,7 +260,7 @@ void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls cc.groupCalls.push_back(gc); } } - fp.close(); + fclose(fp); // Validate group calls // - For non Send/Recv, check that all ranks participate with same parameters count @@ -380,6 +375,18 @@ void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls } } +bool ParseLineItem(char const* line, LineItem& li) +{ + return sscanf(line, + "%[^:]:%d:%d [%d] NCCL INFO %[^:]: opCount %x sendbuff %s " + "recvbuff %s count %lu datatype %d op %d root %d comm %s " + "[nranks=%d] stream %p task %d globalrank %d", + li.hostname, &li.pid, &li.tid, &li.cudaDev, li.opName, + &li.opCount, li.sendbuff, li.recvbuff, + &li.count, &li.datatype, &li.op, &li.root, li.comm, + &li.nRanks, &li.stream, &li.task, &li.globalRank) == 17; +} + double ReplayRccl(CollectiveCalls& cc, int groupIdx, int& numInvalid) { int numLocalRanks = cc.localRankComms.size(); @@ -559,12 +566,6 @@ void ExecuteCollective(TaskInfo& task, ncclComm_t const& comm, hipStream_t strea case ncclCollRecv: NCCL_CALL(ncclRecv(task.outputGpu.ptr, task.count, task.datatype, task.root, comm, stream)); break; - case ncclStartGroup: - NCCL_CALL(ncclGroupStart()); - break; - case ncclEndGroup: - NCCL_CALL(ncclGroupEnd()); - break; default: printf("Error: unsupported collective\n"); exit(1); @@ -754,4 +755,4 @@ void PrepData_Send(TaskInfo& taskInfo, int globalRank) { void PrepData_Recv(TaskInfo& taskInfo, int globalRank) { FillPattern(taskInfo.expected, taskInfo.datatype, taskInfo.count, globalRank); -} +} \ No newline at end of file diff --git a/projects/rccl/tools/RcclReplayer/rcclReplayer.hpp b/projects/rccl/tools/RcclReplayer/rcclReplayer.hpp index 8979389793..486826d94e 100644 --- a/projects/rccl/tools/RcclReplayer/rcclReplayer.hpp +++ b/projects/rccl/tools/RcclReplayer/rcclReplayer.hpp @@ -1,6 +1,5 @@ #pragma once #include -#include #include #include @@ -37,25 +36,23 @@ struct LineItem { - int pid; - int tid; - int cudaDev; - //int graph; - int groupDepth; - - int coll; - uint64_t opCount; - void* sendbuff; - void* recvbuff; - size_t count; - int datatype; - int op; - int root; - void* comm; - int nRanks; - void* stream; - int nTasks; - int globalRank; + char hostname[MPI_MAX_PROCESSOR_NAME]; + int pid; + int tid; + int cudaDev; + char opName[32]; + int opCount; + char sendbuff[32]; + char recvbuff[32]; + size_t count; + int datatype; + int op; + int root; + char comm[32]; + int nRanks; + void* stream; + int task; + int globalRank; }; // Enumeration of all collective functions currently supported @@ -72,9 +69,7 @@ typedef enum ncclCollAllToAllv, ncclCollSend, ncclCollRecv, - ncclNumFuncs, - ncclStartGroup = 10; - ncclEndGroup = 11; + ncclNumFuncs } ncclFunc_t; char const ncclFuncNames[ncclNumFuncs][32] = @@ -149,8 +144,8 @@ struct RankData struct GroupCall { - bool isValid; - uint64_t opCount; + bool isValid; + int opCount; std::map rankData; }; @@ -158,7 +153,7 @@ struct CollectiveCalls { int numGlobalRanks; int numGpusPerMpiRank; - std::vector> globalRankComms; // Set of comms used by each global rank + std::vector> globalRankComms; // Set of comms used by each global rank std::vector groupCalls; // List of group calls for each global rank int localGpuOffset; // First local GPU device idx for this MPI process @@ -389,6 +384,9 @@ bool IsRootUsed(ncclFunc_t funcType) { funcType == ncclCollGather || funcType == ncclCollScatter); } +// parse the logs and assign them into lineItem +bool ParseLineItem(char const* line, LineItem& li); + // this covers grouping the logs based on opCount and task number, // validatation of the groupCalls for both non-send/recv collectives and send/recv void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls& collectiveCalls); @@ -428,4 +426,4 @@ void PrepData_Gather(TaskInfo& taskInfo, int globalRank, int totalRanks, bool is void PrepData_Scatter(TaskInfo& taskInfo, int globalRank, int totalRanks); void PrepData_AlltoAll(TaskInfo& taskInfo, int globalRank, int totalRanks); void PrepData_Send(TaskInfo& taskInfo, int globalRank); -void PrepData_Recv(TaskInfo& taskInfo, int globalRank); +void PrepData_Recv(TaskInfo& taskInfo, int globalRank); \ No newline at end of file