reverting change to RcclReplayer (#1657)

[ROCm/rccl commit: 45e1c3f3e2]
Cette révision appartient à :
Tim
2025-04-23 15:36:46 -04:00
révisé par GitHub
Parent fb1fdef8e2
révision 38f91fa2c8
2 fichiers modifiés avec 53 ajouts et 54 suppressions
+28 -27
Voir le fichier
@@ -180,36 +180,31 @@ void dataToCsv(GroupCall const& gc, std::ofstream &datafile, double runTime)
void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls& cc)
{
bool verbose = isFirstRank && (getenv("VERBOSE") != NULL);
cc.globalRankComms.clear();
cc.globalRankComms.resize(cc.numGlobalRanks);
cc.groupCalls.clear();
std::ifstream fp(logFilename, std::ios::binary);
FILE* fp = fopen(logFilename, "r");
if (!fp) {
printf("[ERROR] Unable to open file %s\n", logFilename);
exit(-1);
}
constexpr size_t size = sizeof(LineItem) + 1;
char line[size]; //size of collectivecall struct
char line[2048];
LineItem li;
int lineNum = 0;
while (fp.getline(line, size))
{
LineItem li = *((LineItem*)line);
while (fgets(line, 2048, fp)) {
++lineNum;
if (li.coll == 10 || li.coll == 11)
{
continue;
}
// if need hostname will parse together with num of line in output
// Ignore invalid lines and collectives
if (li.nRanks != cc.numGlobalRanks) continue;
//Ignore invalid lines and collectives
if (!ParseLineItem(line, li) || li.nRanks != cc.numGlobalRanks) continue;
// Figure out commIdx for this globalrank
int commIdx = -1;
for (auto i = 0; i < cc.globalRankComms[li.globalRank].size(); i++) {
if (cc.globalRankComms[li.globalRank][i] != li.comm) {
if (!strcmp(cc.globalRankComms[li.globalRank][i].c_str(), li.comm)) {
commIdx = i;
break;
}
@@ -220,8 +215,8 @@ void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls
}
TaskInfo taskInfo;
taskInfo.funcType = (ncclFunc_t) li.coll;
taskInfo.inPlace = li.sendbuff == li.recvbuff;
taskInfo.funcType = GetFuncType(li.opName);
taskInfo.inPlace = !strcmp(li.sendbuff, li.recvbuff);
taskInfo.count = li.count;
taskInfo.datatype = (ncclDataType_t) li.datatype;
taskInfo.op = (ncclRedOp_t) li.op;
@@ -234,7 +229,7 @@ void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls
if (gc.opCount != li.opCount) continue;
if (gc.rankData.count(li.globalRank)) {
RankData& rd = gc.rankData[li.globalRank];
if (rd.commIdx != commIdx || rd.tasks.size() != li.nTasks)
if (rd.commIdx != commIdx || rd.tasks.size() != li.task)
continue;
rd.tasks.push_back(taskInfo);
@@ -242,7 +237,7 @@ void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls
break;
}
// Rank has no tasks - make sure this is task 0
else if (li.nTasks == 0) {
else if (li.task == 0) {
gc.rankData[li.globalRank].lineNum = lineNum;
gc.rankData[li.globalRank].commIdx = commIdx;
gc.rankData[li.globalRank].tasks.push_back(taskInfo);
@@ -253,7 +248,7 @@ void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls
// If no collectives were found, create new one
if (!found) {
if (li.nTasks != 0) {
if (li.task != 0) {
if (isFirstRank) printf("[WARN] Was unable to find corresponding collective for line %d\n", lineNum);
}
@@ -265,7 +260,7 @@ void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls
cc.groupCalls.push_back(gc);
}
}
fp.close();
fclose(fp);
// Validate group calls
// - For non Send/Recv, check that all ranks participate with same parameters count
@@ -380,6 +375,18 @@ void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls
}
}
bool ParseLineItem(char const* line, LineItem& li)
{
return sscanf(line,
"%[^:]:%d:%d [%d] NCCL INFO %[^:]: opCount %x sendbuff %s "
"recvbuff %s count %lu datatype %d op %d root %d comm %s "
"[nranks=%d] stream %p task %d globalrank %d",
li.hostname, &li.pid, &li.tid, &li.cudaDev, li.opName,
&li.opCount, li.sendbuff, li.recvbuff,
&li.count, &li.datatype, &li.op, &li.root, li.comm,
&li.nRanks, &li.stream, &li.task, &li.globalRank) == 17;
}
double ReplayRccl(CollectiveCalls& cc, int groupIdx, int& numInvalid)
{
int numLocalRanks = cc.localRankComms.size();
@@ -559,12 +566,6 @@ void ExecuteCollective(TaskInfo& task, ncclComm_t const& comm, hipStream_t strea
case ncclCollRecv:
NCCL_CALL(ncclRecv(task.outputGpu.ptr, task.count, task.datatype, task.root, comm, stream));
break;
case ncclStartGroup:
NCCL_CALL(ncclGroupStart());
break;
case ncclEndGroup:
NCCL_CALL(ncclGroupEnd());
break;
default:
printf("Error: unsupported collective\n");
exit(1);
@@ -754,4 +755,4 @@ void PrepData_Send(TaskInfo& taskInfo, int globalRank) {
void PrepData_Recv(TaskInfo& taskInfo, int globalRank) {
FillPattern(taskInfo.expected, taskInfo.datatype, taskInfo.count, globalRank);
}
}
+25 -27
Voir le fichier
@@ -1,6 +1,5 @@
#pragma once
#include <map>
#include <chrono>
#include <cstring>
#include <rccl/rccl.h>
@@ -37,25 +36,23 @@
struct LineItem
{
int pid;
int tid;
int cudaDev;
//int graph;
int groupDepth;
int coll;
uint64_t opCount;
void* sendbuff;
void* recvbuff;
size_t count;
int datatype;
int op;
int root;
void* comm;
int nRanks;
void* stream;
int nTasks;
int globalRank;
char hostname[MPI_MAX_PROCESSOR_NAME];
int pid;
int tid;
int cudaDev;
char opName[32];
int opCount;
char sendbuff[32];
char recvbuff[32];
size_t count;
int datatype;
int op;
int root;
char comm[32];
int nRanks;
void* stream;
int task;
int globalRank;
};
// Enumeration of all collective functions currently supported
@@ -72,9 +69,7 @@ typedef enum
ncclCollAllToAllv,
ncclCollSend,
ncclCollRecv,
ncclNumFuncs,
ncclStartGroup = 10;
ncclEndGroup = 11;
ncclNumFuncs
} ncclFunc_t;
char const ncclFuncNames[ncclNumFuncs][32] =
@@ -149,8 +144,8 @@ struct RankData
struct GroupCall
{
bool isValid;
uint64_t opCount;
bool isValid;
int opCount;
std::map<int, RankData> rankData;
};
@@ -158,7 +153,7 @@ struct CollectiveCalls
{
int numGlobalRanks;
int numGpusPerMpiRank;
std::vector<std::vector<void*>> globalRankComms; // Set of comms used by each global rank
std::vector<std::vector<std::string>> globalRankComms; // Set of comms used by each global rank
std::vector<GroupCall> groupCalls; // List of group calls for each global rank
int localGpuOffset; // First local GPU device idx for this MPI process
@@ -389,6 +384,9 @@ bool IsRootUsed(ncclFunc_t funcType) {
funcType == ncclCollGather || funcType == ncclCollScatter);
}
// parse the logs and assign them into lineItem
bool ParseLineItem(char const* line, LineItem& li);
// this covers grouping the logs based on opCount and task number,
// validatation of the groupCalls for both non-send/recv collectives and send/recv
void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls& collectiveCalls);
@@ -428,4 +426,4 @@ void PrepData_Gather(TaskInfo& taskInfo, int globalRank, int totalRanks, bool is
void PrepData_Scatter(TaskInfo& taskInfo, int globalRank, int totalRanks);
void PrepData_AlltoAll(TaskInfo& taskInfo, int globalRank, int totalRanks);
void PrepData_Send(TaskInfo& taskInfo, int globalRank);
void PrepData_Recv(TaskInfo& taskInfo, int globalRank);
void PrepData_Recv(TaskInfo& taskInfo, int globalRank);