From cf311b71eeea90dffe69ddf22f63f98e86d0fcc6 Mon Sep 17 00:00:00 2001 From: saurabhAMD <160164138+saurabhAMD@users.noreply.github.com> Date: Mon, 22 Jul 2024 10:21:29 -0500 Subject: [PATCH] Adding performance collection feature in rccl_replayer, and updating MSCCL logging and replayer parsing (#1265) * Adding performance collection feature in rccl_replayer, and updating MSCCL logging and replayer parsing * Performance collection feature in rccl_replayer, and updating MSCCL logging and replayer parsing --- src/misc/msccl/msccl_lifecycle.cc | 4 +- tools/rccl_replayer/rcclReplayer.cpp | 51 +++++++++++++++++++---- tools/rccl_replayer/rcclReplayer.hpp | 61 ++++++++++++++++++++++++++-- 3 files changed, 104 insertions(+), 12 deletions(-) diff --git a/src/misc/msccl/msccl_lifecycle.cc b/src/misc/msccl/msccl_lifecycle.cc index d9a54ee4f5..1d2b482f29 100644 --- a/src/misc/msccl/msccl_lifecycle.cc +++ b/src/misc/msccl/msccl_lifecycle.cc @@ -377,9 +377,9 @@ static ncclResult_t mscclSaveCountsAndDispls(struct mscclSavedSchedulerParam* pa static ncclResult_t mscclRunSavedParams() { mscclThreadLocalStatus& threadLocalStatus = mscclGetThreadLocalStatus(); for (auto& param : threadLocalStatus.savedSchedulerParams) { - INFO(NCCL_COLL,"%s: opCount %lx sendbuff %p recvbuff %p count %zi datatype %d op %d root %d comm %p [nranks=%d] stream %p", + INFO(NCCL_COLL,"%s: opCount %lx sendbuff %p recvbuff %p count %zi datatype %d op %d root %d comm %p [nranks=%d] stream %p task %d globalrank %d", mscclFuncNames[param.p.func], param.p.opCount, param.p.sendBuff, param.p.recvBuff, param.p.count, - param.p.dataType, param.p.op, param.p.root, param.comm, param.p.nRanks, param.stream); + param.p.dataType, param.p.op, param.p.root, param.comm, param.p.nRanks, param.stream, param.comm->tasks.nTasksP2p + param.comm->tasks.nTasksColl, param.comm->localRankToRank[param.comm->localRank]); NCCLCHECK(mscclRunAlgo( param.p.sendBuff, param.p.sendCounts, param.p.sDisPls, diff --git 
a/tools/rccl_replayer/rcclReplayer.cpp b/tools/rccl_replayer/rcclReplayer.cpp index df2e6505df..fef808b7ac 100644 --- a/tools/rccl_replayer/rcclReplayer.cpp +++ b/tools/rccl_replayer/rcclReplayer.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include "rcclReplayer.hpp" @@ -80,6 +81,14 @@ int main(int argc, char **argv) printf("Rank %d Done setting up communicators\n", mpiRank); int numSkippedCalls = 0; + double runTime; + std::ofstream datafile; + datafile.open("replayer_data.csv"); + if (!datafile.is_open()) { + printf("[ERROR] Unable to open file replayer_data.csv\n"); + exit(-1); + } + datafile << "callNumber, functionName, inPlace, count(numElements), datatype, op, root, time(msec), groupCallBusBandwidth(GB/s)\n"; auto start = std::chrono::high_resolution_clock::now(); for (size_t i = 0; i < collCalls.groupCalls.size(); i++) { MPI_Barrier(MPI_COMM_WORLD); @@ -89,7 +98,10 @@ int main(int argc, char **argv) printf("Running Collective Call %lu of %lu\n", i+1, collCalls.groupCalls.size()); PrintGroupCall(collCalls.groupCalls[i]); } - ReplayRccl(collCalls, i); + double runTime = ReplayRccl(collCalls, i); + if (mpiRank == 0) { + dataToCsv(collCalls.groupCalls[i], datafile, runTime); + } } else { if (mpiRank == 0) { printf("[ERROR] in group call: (skipping...)\n"); @@ -107,6 +119,7 @@ int main(int argc, char **argv) } auto end = std::chrono::high_resolution_clock::now(); std::chrono::duration duration = end - start; + datafile.close(); // Destroy all communicators for (int commIdx = 0; commIdx < collCalls.numCommsPerRank; commIdx++) { @@ -137,7 +150,6 @@ void PrintGroupCall(GroupCall const& gc) for (int task = 0; task < rd.second.tasks.size(); task++) { TaskInfo ti = rd.second.tasks[task]; std::string funcName = (ti.funcType == ncclCollSend || ti.funcType == ncclCollRecv) ? 
"Send/Recv" : ncclFuncNames[ti.funcType]; - std::tuple key(funcName, ti.count, ti.datatype, ti.op); printf(" - Task %02d: %32s inPlace=%d count=%lu datatype=%d op=%d root=%d\n", task, funcName.c_str(), ti.inPlace, ti.count, ti.datatype, ti.op, ti.root); } @@ -145,6 +157,23 @@ void PrintGroupCall(GroupCall const& gc) } +void dataToCsv(GroupCall const& gc, std::ofstream &datafile, double runTime) +{ + auto rd = *(gc.rankData.begin()); + TaskInfo ti = rd.second.tasks[0]; + std::string funcName = (ti.funcType == ncclCollSend || ti.funcType == ncclCollRecv) ? "Send/Recv" : ncclFuncNames[ti.funcType]; + double n = (double) (ti.count); + double S = (double) (n * (double)DataTypeToBytes(ti.datatype)); + double t = (double) (runTime/1000); //milliseconds to seconds + double busBw = (S/t); + if (funcName == "AllReduce") busBw *= (2*(n- 1)/n); + else if (funcName == "ReduceScatter" || funcName == "AllGather") busBw *= ((n-1)/n); + busBw /= (1e9); //in gb/s + std::string dataTypeName = DataTypeToName(ti.datatype); + std::string redOp = getRedOp(ti.op); + datafile << gc.opCount << ", " << funcName.c_str() << ", " << ti.inPlace << ", " << ti.count << ", " << dataTypeName << ", " << redOp << ", " << ti.root << ", " << runTime << ", " << busBw << "\n"; +} + void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls& cc) { bool verbose = isFirstRank && (getenv("VERBOSE") != NULL); @@ -308,13 +337,10 @@ bool ParseLineItem(char const* line, LineItem& li) &li.nRanks, &li.stream, &li.task, &li.globalRank) == 17; } -void ReplayRccl(CollectiveCalls const& cc, int groupIdx) +double ReplayRccl(CollectiveCalls const& cc, int groupIdx) { int numLocalRanks = cc.localRankComms.size(); - - - // Allocate memory for collective std::vector> sendbuff(numLocalRanks); std::vector> recvbuff(numLocalRanks); @@ -356,6 +382,7 @@ void ReplayRccl(CollectiveCalls const& cc, int groupIdx) } // Execute the collective call (task) + std::chrono::time_point start = 
std::chrono::high_resolution_clock::now(); NCCL_CALL(ncclGroupStart()); for (int localIdx = 0; localIdx < numLocalRanks; localIdx++) { int globalRank = cc.firstGlobalRank + localIdx; @@ -379,16 +406,26 @@ void ReplayRccl(CollectiveCalls const& cc, int groupIdx) if (cc.groupCalls[groupIdx].rankData.count(globalRank) == 0) continue; RankData const& rankData = cc.groupCalls[groupIdx].rankData.at(globalRank); - int numTasks = rankData.tasks.size(); int commIdx = rankData.commIdx; HIP_CALL(hipStreamSynchronize(cc.localRankStreams[localIdx][commIdx])); + } + std::chrono::time_point end = std::chrono::high_resolution_clock::now(); + std::chrono::duration duration = (end - start); + double runTime = duration.count(); + runTime *= 1000; //converting into milliseconds + + for (int localIdx = 0; localIdx < numLocalRanks; localIdx++) { + int globalRank = cc.firstGlobalRank + localIdx; + RankData const& rankData = cc.groupCalls[groupIdx].rankData.at(globalRank); + int numTasks = rankData.tasks.size(); for (int taskId = 0; taskId < numTasks; taskId++) { TaskInfo const& task = rankData.tasks[taskId]; HIP_CALL(hipFree(sendbuff[localIdx][taskId])); if (!task.inPlace) HIP_CALL(hipFree(recvbuff[localIdx][taskId])); } } + return runTime; } // GetSize will return a pair of bytes where first element in pair represents bytesSent and the second bytesRecv diff --git a/tools/rccl_replayer/rcclReplayer.hpp b/tools/rccl_replayer/rcclReplayer.hpp index 3df45659f4..3809a90590 100644 --- a/tools/rccl_replayer/rcclReplayer.hpp +++ b/tools/rccl_replayer/rcclReplayer.hpp @@ -84,6 +84,21 @@ char const ncclFuncNames[ncclNumFuncs][32] = "Recv" }; +char const mscclFuncNames[ncclNumFuncs][32] = +{ + "mscclFuncBroadcast", + "mscclFuncReduce", + "mscclFuncAllGather", + "mscclFuncReduceScatter", + "mscclFuncAllReduce", + "mscclFuncGather", + "mscclFuncScatter", + "mscclFuncAllToAll", + "mscclFuncAllToAllv", + "mscclFuncSend", + "mscclFuncRecv" +}; + struct TaskInfo { ncclFunc_t funcType; @@ -144,11 +159,48 
@@ size_t DataTypeToBytes(ncclDataType_t const dataType) } } +std::string DataTypeToName(ncclDataType_t const dataType) +{ + switch (dataType) { + case ncclInt8: return "Int8"; + case ncclUint8: return "Uint8"; + case ncclInt32: return "Int32"; + case ncclUint32: return "Uint32"; + case ncclInt64: return "Int64"; + case ncclUint64: return "Uint64"; + case ncclFloat16: return "Float16"; + case ncclFloat32: return "Float32"; + case ncclFloat64: return "Float64"; + case ncclBfloat16: return "Bfloat16"; + case ncclFp8E4M3: return "Fp8E4M3"; + case ncclFp8E5M2: return "Fp8E5M2"; + default: + printf("Unsupported datatype (%d)\n", dataType); + exit(0); + } +} + +std::string getRedOp(ncclRedOp_t const op) +{ + switch (op) { + case ncclSum: return "Sum"; + case ncclProd: return "Product"; + case ncclMax: return "Max"; + case ncclMin: return "Min"; + case ncclAvg: return "Average"; + case ncclNumOps: return "Number of built-in reduction ops"; + case ncclMaxRedOp: return "Largest value for ncclRedOp_t"; + default: + printf("Unsupported redOp (%d)\n", op); + exit(0); + } +} + ncclFunc_t GetFuncType(char* func) { for (int i = 0; i < ncclNumFuncs; i++) - if (!strcmp(func, ncclFuncNames[i])) return (ncclFunc_t)i; - printf("[ERROR] Unrecognzied func %s\n", func); + if (!strcmp(func, ncclFuncNames[i]) || !strcmp(func, mscclFuncNames[i])) return (ncclFunc_t)i; + printf("[ERROR] Unrecognized func %s\n", func); exit(1); } @@ -161,11 +213,14 @@ void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls // allocates send/recv buff, sets the device based on which rank the task belongs to, // syncronize devices after executing all the tasks and free device memory. 
-void ReplayRccl(CollectiveCalls const& collCall, int groupIdx); +double ReplayRccl(CollectiveCalls const& collCall, int groupIdx); // Print information about a group call void PrintGroupCall(GroupCall const& gc); +// Records performance data of each group call in a csv file named replayer_data.csv +void dataToCsv(GroupCall const& gc, std::ofstream &datafile, double runTime); + // size differ for each collective call and getSize gives a specific size in bytes depending on type of task, // global rank, element count and data type std::pair GetSize(TaskInfo taskInfo, int numGlobalRanks);