Adding performance collection feature in rccl_replayer, and updating MSCCL logging and replayer parsing (#1265)

* Adding performance collection feature in rccl_replayer, and updating MSCCL logging and replayer parsing

* Performance collection feature in rccl_replayer, and updating MSCCL logging and replayer parsing
This commit is contained in:
saurabhAMD
2024-07-22 10:21:29 -05:00
committed by GitHub
parent b31b4082dd
commit cf311b71ee
3 changed files with 104 additions and 12 deletions
+2 -2
View File
@@ -377,9 +377,9 @@ static ncclResult_t mscclSaveCountsAndDispls(struct mscclSavedSchedulerParam* pa
static ncclResult_t mscclRunSavedParams() {
mscclThreadLocalStatus& threadLocalStatus = mscclGetThreadLocalStatus();
for (auto& param : threadLocalStatus.savedSchedulerParams) {
INFO(NCCL_COLL,"%s: opCount %lx sendbuff %p recvbuff %p count %zi datatype %d op %d root %d comm %p [nranks=%d] stream %p",
INFO(NCCL_COLL,"%s: opCount %lx sendbuff %p recvbuff %p count %zi datatype %d op %d root %d comm %p [nranks=%d] stream %p task %d globalrank %d",
mscclFuncNames[param.p.func], param.p.opCount, param.p.sendBuff, param.p.recvBuff, param.p.count,
param.p.dataType, param.p.op, param.p.root, param.comm, param.p.nRanks, param.stream);
param.p.dataType, param.p.op, param.p.root, param.comm, param.p.nRanks, param.stream, param.comm->tasks.nTasksP2p + param.comm->tasks.nTasksColl, param.comm->localRankToRank[param.comm->localRank]);
NCCLCHECK(mscclRunAlgo(
param.p.sendBuff, param.p.sendCounts, param.p.sDisPls,
+44 -7
View File
@@ -4,6 +4,7 @@
#include <algorithm>
#include <chrono>
#include <mpi.h>
#include <fstream>
#include "rcclReplayer.hpp"
@@ -80,6 +81,14 @@ int main(int argc, char **argv)
printf("Rank %d Done setting up communicators\n", mpiRank);
int numSkippedCalls = 0;
double runTime;
std::ofstream datafile;
datafile.open("replayer_data.csv");
if (!datafile.is_open()) {
printf("[ERROR] Unable to open file replayer_data.csv\n");
exit(-1);
}
datafile << "callNumber, functionName, inPlace, count(numElements), datatype, op, root, time(msec), groupCallBusBandwidth(GB/s)\n";
auto start = std::chrono::high_resolution_clock::now();
for (size_t i = 0; i < collCalls.groupCalls.size(); i++) {
MPI_Barrier(MPI_COMM_WORLD);
@@ -89,7 +98,10 @@ int main(int argc, char **argv)
printf("Running Collective Call %lu of %lu\n", i+1, collCalls.groupCalls.size());
PrintGroupCall(collCalls.groupCalls[i]);
}
ReplayRccl(collCalls, i);
double runTime = ReplayRccl(collCalls, i);
if (mpiRank == 0) {
dataToCsv(collCalls.groupCalls[i], datafile, runTime);
}
} else {
if (mpiRank == 0) {
printf("[ERROR] in group call: (skipping...)\n");
@@ -107,6 +119,7 @@ int main(int argc, char **argv)
}
auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> duration = end - start;
datafile.close();
// Destroy all communicators
for (int commIdx = 0; commIdx < collCalls.numCommsPerRank; commIdx++) {
@@ -137,7 +150,6 @@ void PrintGroupCall(GroupCall const& gc)
for (int task = 0; task < rd.second.tasks.size(); task++) {
TaskInfo ti = rd.second.tasks[task];
std::string funcName = (ti.funcType == ncclCollSend || ti.funcType == ncclCollRecv) ? "Send/Recv" : ncclFuncNames[ti.funcType];
std::tuple<std::string, size_t, int, int> key(funcName, ti.count, ti.datatype, ti.op);
printf(" - Task %02d: %32s inPlace=%d count=%lu datatype=%d op=%d root=%d\n",
task, funcName.c_str(), ti.inPlace, ti.count, ti.datatype, ti.op, ti.root);
}
@@ -145,6 +157,23 @@ void PrintGroupCall(GroupCall const& gc)
}
void dataToCsv(GroupCall const& gc, std::ofstream &datafile, double runTime)
{
auto rd = *(gc.rankData.begin());
TaskInfo ti = rd.second.tasks[0];
std::string funcName = (ti.funcType == ncclCollSend || ti.funcType == ncclCollRecv) ? "Send/Recv" : ncclFuncNames[ti.funcType];
double n = (double) (ti.count);
double S = (double) (n * (double)DataTypeToBytes(ti.datatype));
double t = (double) (runTime/1000); //milliseconds to seconds
double busBw = (S/t);
if (funcName == "AllReduce") busBw *= (2*(n- 1)/n);
else if (funcName == "ReduceScatter" || funcName == "AllGather") busBw *= ((n-1)/n);
busBw /= (1e9); //in gb/s
std::string dataTypeName = DataTypeToName(ti.datatype);
std::string redOp = getRedOp(ti.op);
datafile << gc.opCount << ", " << funcName.c_str() << ", " << ti.inPlace << ", " << ti.count << ", " << dataTypeName << ", " << redOp << ", " << ti.root << ", " << runTime << ", " << busBw << "\n";
}
void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls& cc)
{
bool verbose = isFirstRank && (getenv("VERBOSE") != NULL);
@@ -308,13 +337,10 @@ bool ParseLineItem(char const* line, LineItem& li)
&li.nRanks, &li.stream, &li.task, &li.globalRank) == 17;
}
void ReplayRccl(CollectiveCalls const& cc, int groupIdx)
double ReplayRccl(CollectiveCalls const& cc, int groupIdx)
{
int numLocalRanks = cc.localRankComms.size();
// Allocate memory for collective
std::vector<std::vector<void*>> sendbuff(numLocalRanks);
std::vector<std::vector<void*>> recvbuff(numLocalRanks);
@@ -356,6 +382,7 @@ void ReplayRccl(CollectiveCalls const& cc, int groupIdx)
}
// Execute the collective call (task)
std::chrono::time_point start = std::chrono::high_resolution_clock::now();
NCCL_CALL(ncclGroupStart());
for (int localIdx = 0; localIdx < numLocalRanks; localIdx++) {
int globalRank = cc.firstGlobalRank + localIdx;
@@ -379,16 +406,26 @@ void ReplayRccl(CollectiveCalls const& cc, int groupIdx)
if (cc.groupCalls[groupIdx].rankData.count(globalRank) == 0) continue;
RankData const& rankData = cc.groupCalls[groupIdx].rankData.at(globalRank);
int numTasks = rankData.tasks.size();
int commIdx = rankData.commIdx;
HIP_CALL(hipStreamSynchronize(cc.localRankStreams[localIdx][commIdx]));
}
std::chrono::time_point end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> duration = (end - start);
double runTime = duration.count();
runTime *= 1000; //convering into milliseconds
for (int localIdx = 0; localIdx < numLocalRanks; localIdx++) {
int globalRank = cc.firstGlobalRank + localIdx;
RankData const& rankData = cc.groupCalls[groupIdx].rankData.at(globalRank);
int numTasks = rankData.tasks.size();
for (int taskId = 0; taskId < numTasks; taskId++) {
TaskInfo const& task = rankData.tasks[taskId];
HIP_CALL(hipFree(sendbuff[localIdx][taskId]));
if (!task.inPlace) HIP_CALL(hipFree(recvbuff[localIdx][taskId]));
}
}
return runTime;
}
// GetSize will return a pair of bytes where first element in pair represents bytesSent and the second bytesRecv
+58 -3
View File
@@ -84,6 +84,21 @@ char const ncclFuncNames[ncclNumFuncs][32] =
"Recv"
};
char const mscclFuncNames[ncclNumFuncs][32] =
{
"mscclFuncBroadcast",
"mscclFuncReduce",
"mscclFuncAllGather",
"mscclFuncReduceScatter",
"mscclFuncAllReduce",
"mscclFuncGather",
"mscclFuncScatter",
"mscclFuncAllToAll",
"mscclFuncAllToAllv",
"mscclFuncSend",
"mscclFuncRecv"
};
struct TaskInfo
{
ncclFunc_t funcType;
@@ -144,11 +159,48 @@ size_t DataTypeToBytes(ncclDataType_t const dataType)
}
}
std::string DataTypeToName(ncclDataType_t const dataType)
{
switch (dataType) {
case ncclInt8: return "Int8";
case ncclUint8: return "Uint8";
case ncclInt32: return "Int32";
case ncclUint32: return "Uint32";
case ncclInt64: return "Int64";
case ncclUint64: return "Uint64";
case ncclFloat16: return "Float16";
case ncclFloat32: return "Float32";
case ncclFloat64: return "Float64";
case ncclBfloat16: return "Bfloat16";
case ncclFp8E4M3: return "Fp8E4M3";
case ncclFp8E5M2: return "Fp8E5M2";
default:
printf("Unsupported datatype (%d)\n", dataType);
exit(0);
}
}
std::string getRedOp(ncclRedOp_t const op)
{
switch (op) {
case ncclSum: return "Sum";
case ncclProd: return "Product";
case ncclMax: return "Max";
case ncclMin: return "Min";
case ncclAvg: return "Average";
case ncclNumOps: return "Number of built-in reduction ops";
case ncclMaxRedOp: return "Largest value for ncclRedOp_t";
default:
printf("Unsupported redOp (%d)\n", op);
exit(0);
}
}
ncclFunc_t GetFuncType(char* func)
{
for (int i = 0; i < ncclNumFuncs; i++)
if (!strcmp(func, ncclFuncNames[i])) return (ncclFunc_t)i;
printf("[ERROR] Unrecognzied func %s\n", func);
if (!strcmp(func, ncclFuncNames[i]) || !strcmp(func, mscclFuncNames[i])) return (ncclFunc_t)i;
printf("[ERROR] Unrecognized func %s\n", func);
exit(1);
}
@@ -161,11 +213,14 @@ void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls
// allocates send/recv buff, sets the device based on which rank the task belongs to,
// syncronize devices after executing all the tasks and free device memory.
void ReplayRccl(CollectiveCalls const& collCall, int groupIdx);
double ReplayRccl(CollectiveCalls const& collCall, int groupIdx);
// Print information about a group call
void PrintGroupCall(GroupCall const& gc);
// Records performance data of each group call in a csv file named replayer_data.csv
void dataToCsv(GroupCall const& gc, std::ofstream &datafile, double runTime);
// size differ for each collective call and getSize gives a specific size in bytes depending on type of task,
// global rank, element count and data type
std::pair<size_t, size_t> GetSize(TaskInfo taskInfo, int numGlobalRanks);