From 52fa5d11785682d21ff2fd6e1163db4f941e008c Mon Sep 17 00:00:00 2001 From: gilbertlee-amd <44450918+gilbertlee-amd@users.noreply.github.com> Date: Mon, 13 May 2024 10:56:32 -0600 Subject: [PATCH] RCCL Replayer - multi communicator support (#1176) --- tools/rccl_replayer/Makefile | 25 +- tools/rccl_replayer/rcclReplayer.cpp | 778 +++++++++++++++------------ tools/rccl_replayer/rcclReplayer.hpp | 106 ++-- 3 files changed, 488 insertions(+), 421 deletions(-) diff --git a/tools/rccl_replayer/Makefile b/tools/rccl_replayer/Makefile index e21d4a7390..fe4ebc68dc 100644 --- a/tools/rccl_replayer/Makefile +++ b/tools/rccl_replayer/Makefile @@ -1,21 +1,14 @@ +ROCM_DIR ?= /opt/rocm RCCL_DIR ?= ../../build/release +MPI_DIR ?= /opt/ompi +MPI_INC_DIR ?= /usr/include/x86_64-linux-gnu/mpi +MPI_LIB_DIR ?= /usr/lib/x86_64-linux-gnu + +INCLUDES = -I$(MPI_INC_DIR) -I$(MPI_DIR)/include -I$(RCCL_DIR)/include +LDFLAGS = -L$(MPI_LIB_DIR) -L$(MPI_DIR)/lib -L$(RCCL_DIR) -lmpi -lrccl -ifdef MPI_DIR main: rcclReplayer.cpp - /opt/rocm/bin/hipcc rcclReplayer.cpp \ - -g \ - -o rcclReplayer \ - -I$(MPI_DIR)/ \ - -I$(RCCL_DIR) \ - -I$(RCCL_DIR)/include/rccl \ - -I/opt/rocm/include/hip \ - -L$(MPI_DIR)/lib \ - -L$(RCCL_DIR) -lmpich -lrccl -else -main: - @echo "Error: MPI_DIR was not specified." - @exit 1 -endif + $(ROCM_DIR)/bin/hipcc rcclReplayer.cpp -O1 -g -o rcclReplayer $(INCLUDES) $(LDFLAGS) clean: - rm -f ./rcclReplayer \ No newline at end of file + rm -f ./rcclReplayer diff --git a/tools/rccl_replayer/rcclReplayer.cpp b/tools/rccl_replayer/rcclReplayer.cpp index 0629a4d9b2..df2e6505df 100644 --- a/tools/rccl_replayer/rcclReplayer.cpp +++ b/tools/rccl_replayer/rcclReplayer.cpp @@ -1,387 +1,459 @@ #include #include #include -#include #include #include #include #include "rcclReplayer.hpp" -bool ParseLineItem(char const* line, LineItem& li) +int main(int argc, char **argv) { - return sscanf(line, - "%[^:]:%d:%d [%d] NCCL INFO %[^:]: opCount %d sendbuff %s " - "recvbuff %s count %lu datatype %d op %d root %d comm %s " - "[nranks=%d] stream %p task %d globalrank %d", - li.hostname, &li.pid, &li.tid, &li.cudaDev, li.opName, - &li.opCount, li.sendbuff, li.recvbuff, - &li.count, &li.datatype, &li.op, &li.root, li.comm, - &li.nRanks, &li.stream, &li.task, &li.globalRank) == 17; -} + MPI_Init(&argc, &argv); + if (argc <= 1) { + printf("Usage: %s logfile [numGpusPerMpiRank = 1]\n", argv[0]); + exit(1); + } -void ParseCollectives(char const* logFilename, int const numGlobalRanks, std::vector& groupCalls) { - int mpiRank; - MPI_Comm_rank(MPI_COMM_WORLD, &mpiRank); + // Parse rank information + int mpiRank, numMpiRanks; + MPI_Comm_rank(MPI_COMM_WORLD, &mpiRank); + MPI_Comm_size(MPI_COMM_WORLD, &numMpiRanks); - groupCalls.clear(); + // Parse command line arguments + char* logFilename = argv[1]; + int numGpusPerMpiRank = (argc > 2 ? atoi(argv[2]) : 1); + int parseOnly = (argc > 3 ? atoi(argv[3]) : 0); - FILE *fp = fopen(logFilename, "r"); - if (!fp) { - printf("[ERROR] Unable to open file %s\n", logFilename); - exit(-1); - } + CollectiveCalls collCalls; + collCalls.firstGlobalRank = mpiRank * numGpusPerMpiRank; + collCalls.numGlobalRanks = numMpiRanks * numGpusPerMpiRank; - char line[1000]; - LineItem li; - int lineNum = 0; - while (fgets(line, 1000, fp)) { - ++lineNum; + // Figure out starting GPU index to use based on hostname + int nameLen; + char name[MPI_MAX_PROCESSOR_NAME]; + std::vector allnames(numMpiRanks * MPI_MAX_PROCESSOR_NAME, 0); + MPI_Get_processor_name(name, &nameLen); + MPI_Allgather(name, MPI_MAX_PROCESSOR_NAME, MPI_CHAR, + allnames.data(), MPI_MAX_PROCESSOR_NAME, MPI_CHAR, MPI_COMM_WORLD); - //Ignore invalid lines and collectives - if (!ParseLineItem(line, li) || li.nRanks != numGlobalRanks) continue; + // Offset local gpu device index based on number of previous ranks on the same host + collCalls.localGpuOffset = 0; + for (int rank = 0; rank < mpiRank; rank++) { + if (!strcmp(name, allnames.data() + (rank * MPI_MAX_PROCESSOR_NAME))) + collCalls.localGpuOffset += numGpusPerMpiRank; + } + if (mpiRank == 0) + printf("RCCL Replayer: %d x %d = %d total ranks\n", numMpiRanks, numGpusPerMpiRank, collCalls.numGlobalRanks); + printf("Rank %d [%s] LocalGpuOffset: %d GlobalRankFirst %d GlobalRankLast %d\n", + mpiRank, name, collCalls.localGpuOffset, collCalls.firstGlobalRank, collCalls.firstGlobalRank + numGpusPerMpiRank - 1); - TaskInfo taskInfo; - taskInfo.funcType = GetFuncType(li.opName); - taskInfo.inPlace = !strcmp(li.sendbuff, li.recvbuff); - taskInfo.count = li.count; - taskInfo.datatype = (ncclDataType_t) li.datatype; - taskInfo.op = (ncclRedOp_t) li.op; - taskInfo.root = li.root; + // Parse collectives from logfile + if (parseOnly) collCalls.numGlobalRanks = parseOnly; + ParseCollectives(logFilename, mpiRank == 0, collCalls); + if (collCalls.groupCalls.size() == 0) { + MPI_Finalize(); + return 0; + } + if (parseOnly) return 0; - // Find the appropriate GroupCall that this task belongs to - // If it doesn't exist yet, then create it - bool found = false; - for (auto& gc : groupCalls) { - if (gc.rankData.count(li.globalRank)) { - RankData& rd = gc.rankData[li.globalRank]; - if (rd.comm != li.comm || rd.tasks.size() != li.task) - continue; - - rd.tasks.push_back(taskInfo); - found = true; - break; - } - // Rank has no tasks - make sure this is task 0 - else if (li.task == 0) { - gc.rankData[li.globalRank].comm = li.comm; - gc.rankData[li.globalRank].lineNum = lineNum; - gc.rankData[li.globalRank].tasks.push_back(taskInfo); - found = true; - break; - } - } + // Setup all communicators + if (mpiRank == 0) printf("Preparing %d communicator(s) per rank\n", collCalls.numCommsPerRank); + collCalls.localRankComms.resize(numGpusPerMpiRank, std::vector(collCalls.numCommsPerRank)); + collCalls.localRankStreams.resize(numGpusPerMpiRank, std::vector(collCalls.numCommsPerRank)); - // If no collectives were found, create new one - if (!found) { - if (li.task != 0) { - if (mpiRank == 0) printf("[WARN] Was unable to find corresponding collective for line %d\n", lineNum); - } - - groupCalls.resize(groupCalls.size() + 1); - GroupCall& gc = groupCalls.back(); - gc.opCount = li.opCount; - gc.rankData[li.globalRank].comm = li.comm; - gc.rankData[li.globalRank].lineNum = lineNum; - gc.rankData[li.globalRank].tasks.push_back(taskInfo); - } - } - - // - For non Send/Recv, check that all ranks participate with same parameters count - // - For Send/Recv, check that pairs of Send/Recv calls exist - if (mpiRank == 0) printf("Found %lu groupCalls\n", groupCalls.size()); - for (int i = 0; i < groupCalls.size(); i++) { - GroupCall& gc = groupCalls[i]; - std::map, std::vector> arrivalCounter; - - gc.isValid = true; - - if (mpiRank == 0) { - printf("GroupCall %d\n", i); - printf(" - OpCount: %d\n", gc.opCount); - } - - for (auto rd : gc.rankData) { - if (mpiRank == 0) { - printf(" - Rank %02d: comm %s\n", rd.first, rd.second.comm.c_str()); - } - - for (int task = 0; task < rd.second.tasks.size(); task++) { - TaskInfo ti = rd.second.tasks[task]; - const char* funcName; - - if (ti.funcType == ncclCollSend || ti.funcType == ncclCollRecv) - funcName = "Send/Recv"; - else - funcName = ncclFuncNames[ti.funcType]; - - std::tuple key(funcName, ti.count, ti.datatype, ti.op); - - if (mpiRank == 0) { - printf(" - Task %02d: %32s inPlace=%d count=%lu datatype=%d op=%d root=%d\n", - task, funcName, ti.inPlace, ti.count, ti.datatype, ti.op, ti.root); - } - - auto& rankVector = arrivalCounter[key]; - if (rankVector.size() < numGlobalRanks) { - rankVector.resize(numGlobalRanks); - } - - // rankVector in arrivalCount represents the rank information - // Count the number of tasks that are going to be executed by each rank. This is to validate the group call later on. - // Nom-Send/Recv rank counts (rankVector elements) should be equal at the end, and for Send/Recv, all the elements of rankVector should be equal to 0 - if (ti.funcType == ncclCollRecv) { - rankVector[ti.root]--; - } else { - rankVector[rd.first]++; - } - } - } - - // Iterate through the map variable and report/validate the results - for (const auto& e : arrivalCounter) { - int maxVal; - const char* funcName = std::get<0>(e.first); - size_t count = std::get<1>(e.first); - int datatype = std::get<2>(e.first); - int op = std::get<3>(e.first); - - bool isp2p = (strcmp(std::get<0>(e.first), "Send/Recv") == 0); - if (!isp2p) maxVal = *std::max_element(e.second.begin(), e.second.end()); - - // Validate all the ranks have required amount of collective call (task) - for (int i = 0; i < e.second.size(); i++) { - if (e.second[i] != (isp2p ? 0 : maxVal)) { - std::string warning = (isp2p ? (e.second[i] > 0 ? "[WARN] Missing Recv" : "[WARN] Missing Send") : "[WARN] Missing " + std::string(funcName)) - + " count=" + std::to_string(count) + " datatype=" + std::to_string(datatype) + " op=" + std::to_string(op) + " at rank [" + std::to_string(i) + "]"; - if(mpiRank == 0) printf("%s\n", warning.c_str()); - - gc.isValid = false; - } - } - } - } -} - -// GetSize will return a pair of bytes where first element in pair represents bytesSent and the second bytesRecv -std::pair GetSize(TaskInfo taskInfo, int numGlobalRanks) { - size_t sendNumBytes; - size_t recvNumBytes; - - if (taskInfo.funcType == ncclCollBroadcast || taskInfo.funcType == ncclCollReduce || taskInfo.funcType == ncclCollAllReduce) { - sendNumBytes = taskInfo.count * DataTypeToBytes(taskInfo.datatype); - recvNumBytes = sendNumBytes; - } else if (taskInfo.funcType == ncclCollAllGather || taskInfo.funcType == ncclCollGather) { - sendNumBytes = taskInfo.count * DataTypeToBytes(taskInfo.datatype); - recvNumBytes = numGlobalRanks * sendNumBytes; - } else if (taskInfo.funcType == ncclCollReduceScatter || taskInfo.funcType == ncclCollScatter) { - recvNumBytes = taskInfo.count * DataTypeToBytes(taskInfo.datatype); - sendNumBytes = numGlobalRanks * recvNumBytes; - } else if (taskInfo.funcType == ncclCollAllToAll) { - sendNumBytes = numGlobalRanks * taskInfo.count * DataTypeToBytes(taskInfo.datatype); - recvNumBytes = sendNumBytes; - } else { - sendNumBytes = taskInfo.count * DataTypeToBytes(taskInfo.datatype); - recvNumBytes = sendNumBytes; - } - return std::make_pair(sendNumBytes, recvNumBytes); -} - -void ExecuteCollective(TaskInfo task, ncclComm_t comm, hipStream_t stream, const void *sendbuff, void *recvbuff) { - - int funcTypeValue = (int)task.funcType; - - switch (funcTypeValue) { - case ncclCollAllGather: - NCCLCHECK(ncclAllGather(sendbuff, recvbuff, task.count, task.datatype, comm, stream)); - break; - case ncclCollAllReduce: - NCCLCHECK(ncclAllReduce(sendbuff, recvbuff, task.count, task.datatype, task.op, comm, stream)); - break; - case ncclCollBroadcast: - NCCLCHECK(ncclBroadcast(sendbuff, recvbuff, task.count, task.datatype, task.root, comm, stream)); - break; - case ncclCollReduce: - NCCLCHECK(ncclReduce(sendbuff, recvbuff, task.count, task.datatype, task.op, task.root, comm, stream)); - break; - case ncclCollReduceScatter: - NCCLCHECK(ncclReduceScatter(sendbuff, recvbuff, task.count, task.datatype, task.op, comm, stream)); - break; - case ncclCollGather: - NCCLCHECK(ncclGather(sendbuff, recvbuff, task.count, task.datatype, task.root, comm, stream)); - break; - case ncclCollScatter: - NCCLCHECK(ncclScatter(sendbuff, recvbuff, task.count, task.datatype, task.root, comm, stream)); - break; - case ncclCollAllToAll: - NCCLCHECK(ncclAllToAll(sendbuff, recvbuff, task.count, task.datatype, comm, stream)); - break; - case ncclCollSend: - NCCLCHECK(ncclSend(sendbuff, task.count, task.datatype, task.root, comm, stream)); - break; - case ncclCollRecv: - NCCLCHECK(ncclRecv(recvbuff, task.count, task.datatype, task.root, comm, stream)); - break; - default: - printf("Error: unsupported collective\n"); - exit(1); - } -} - -void ReplayRccl(GroupCall& groupCall, std::vector comms, std::vector streams, - int const localGpuOffset, int const numGpusPerMpiRank, int const firstGlobalRank, int const numGlobalRanks) { - - std::vector> sendbuff(numGpusPerMpiRank); - std::vector> recvbuff(numGpusPerMpiRank); - - NCCLCHECK(ncclGroupStart()); - for (int localIdx = 0; localIdx < numGpusPerMpiRank; localIdx++) { - int globalRank = firstGlobalRank + localIdx; - RankData& rankData = groupCall.rankData[globalRank]; - - for (auto task : rankData.tasks) { - void* sendBuffer; - void* recvBuffer; - - // Each task has a size based on the type of collective (funcType) - std::pair numBytes = GetSize(task, numGlobalRanks); - - if (task.inPlace) { - numBytes.first = std::max(numBytes.first, numBytes.second); - numBytes.second = numBytes.first; - } - - // Set the device and allocate send/recv buffers - HIPCALL(hipSetDevice(localGpuOffset + localIdx)); - HIPCALL(hipMalloc(&sendBuffer, numBytes.first)); - HIPCALL(hipMalloc(&recvBuffer, numBytes.second)); - HIPCALL(hipMemset(sendBuffer, 0, numBytes.first)); - HIPCALL(hipMemset(recvBuffer, 0, numBytes.second)); - HIPCALL(hipDeviceSynchronize()); - - // Add the send and receive buffers to their respective vectors - sendbuff[localIdx].push_back(sendBuffer); - recvbuff[localIdx].push_back(recvBuffer); - - // Execute the collective call (task) - ExecuteCollective(task, comms[localIdx], streams[localIdx], sendBuffer, recvBuffer); - } - } - NCCLCHECK(ncclGroupEnd()); - - // Synchronize devices - for (int i = 0; i < numGpusPerMpiRank; i++) { - HIPCALL(hipStreamSynchronize(streams[i])); - } - - // Free device memory for each task on each GPU - for (int i = 0; i < numGpusPerMpiRank; i++) { - for (auto& sendBuffer : sendbuff[i]) HIPCALL(hipFree(sendBuffer)); - for (auto& recvBuffer : recvbuff[i]) HIPCALL(hipFree(recvBuffer)); - } -} - -int main(int argc, char **argv) { - MPI_Init(&argc, &argv); - if (argc <= 1) { - printf("Usage: %s logfile [numGpusPerMpiRank = 1]\n", argv[0]); - exit(1); - } - - // Parse rank information - int mpiRank, numMpiRanks; - MPI_Comm_rank(MPI_COMM_WORLD, &mpiRank); - MPI_Comm_size(MPI_COMM_WORLD, &numMpiRanks); - - // Default value for numGpusPerMpiRank is 1 - char* logFilename = argv[1]; - int numGpusPerMpiRank = (argc > 2 ? atoi(argv[2]) : 1); - int numGlobalRanks = numMpiRanks * numGpusPerMpiRank; - - if (mpiRank == 0) - printf("RCCL Replayer: %d x %d = %d total ranks\n", numMpiRanks, numGpusPerMpiRank, numGlobalRanks); - - // Parse logfile for Collectives - std::vector groupCalls; - ParseCollectives(logFilename, numGlobalRanks, groupCalls); - - int localGpuOffset = 0; - int firstGlobalRank = mpiRank * numGpusPerMpiRank; - int lastGlobalRank = firstGlobalRank + numGpusPerMpiRank - 1; - - // Figure out the host and get the localGpuOffset - int nameLen; - char name[MPI_MAX_PROCESSOR_NAME]; - std::vector allnames(numMpiRanks * MPI_MAX_PROCESSOR_NAME, 0); - - MPI_Get_processor_name(name, &nameLen); - MPI_Allgather(name, MPI_MAX_PROCESSOR_NAME, MPI_CHAR, - allnames.data(), MPI_MAX_PROCESSOR_NAME, MPI_CHAR, MPI_COMM_WORLD); - - for (int rank = 0; rank < mpiRank; rank++) - { - if (!strcmp(name, allnames.data() + (rank * MPI_MAX_PROCESSOR_NAME))) - localGpuOffset += numGpusPerMpiRank; - } - - printf("Rank %d [%s] LocalGpuOffset: %d GlobalRankFirst %d GlobalRankLast %d\n", - mpiRank, name, localGpuOffset, firstGlobalRank, lastGlobalRank); - + for (int commIdx = 0; commIdx < collCalls.numCommsPerRank; commIdx++) { // Create a unique ID and broadcast it to all ranks ncclUniqueId uniqueId; if (mpiRank == 0) ncclGetUniqueId(&uniqueId); MPI_Bcast(&uniqueId, sizeof(ncclUniqueId), MPI_BYTE, 0, MPI_COMM_WORLD); - // Each rank has it's own comm and stream - std::vector comms(numGpusPerMpiRank); - std::vector streams(numGpusPerMpiRank); - // Initialize comms and strams - NCCLCHECK(ncclGroupStart()); + NCCL_CALL(ncclGroupStart()); for (int i = 0; i < numGpusPerMpiRank; i++) { - HIPCALL(hipSetDevice(localGpuOffset + i)); - NCCLCHECK(ncclCommInitRank(&(comms[i]), numGlobalRanks, uniqueId, firstGlobalRank + i)); - HIPCALL(hipStreamCreate(&(streams[i]))); + HIP_CALL(hipSetDevice(collCalls.localGpuOffset + i)); + NCCL_CALL(ncclCommInitRank(&collCalls.localRankComms[i][commIdx], collCalls.numGlobalRanks, uniqueId, collCalls.firstGlobalRank + i)); + HIP_CALL(hipStreamCreate(&collCalls.localRankStreams[i][commIdx])); } - NCCLCHECK(ncclGroupEnd()); - - int numSkippedCalls = 0; - auto start = std::chrono::high_resolution_clock::now(); - for (auto groupCall : groupCalls) - if (groupCall.isValid) - ReplayRccl(groupCall, comms, streams, localGpuOffset, numGpusPerMpiRank, firstGlobalRank, numGlobalRanks); - else { - if (mpiRank == 0) printf("[ERROR] in group call: (skipping...)\n"); - for (auto rd : groupCall.rankData) { - if (mpiRank == 0) printf(" - Rank %02d: comm %s in line %d\n", rd.first, rd.second.comm.c_str(), rd.second.lineNum); - for (int task = 0; task < rd.second.tasks.size(); task++) { - TaskInfo ti = rd.second.tasks[task]; - if (mpiRank == 0) - printf(" - Task %02d: %32s inPlace=%d count=%lu datatype=%d op=%d root=%d\n", - task, ncclFuncNames[ti.funcType], ti.inPlace, ti.count, ti.datatype, ti.op, ti.root); - } - } - numSkippedCalls++; + NCCL_CALL(ncclGroupEnd()); + } + printf("Rank %d Done setting up communicators\n", mpiRank); + + int numSkippedCalls = 0; + auto start = std::chrono::high_resolution_clock::now(); + for (size_t i = 0; i < collCalls.groupCalls.size(); i++) { + MPI_Barrier(MPI_COMM_WORLD); + if (collCalls.groupCalls[i].isValid) { + if (mpiRank == 0) + { + printf("Running Collective Call %lu of %lu\n", i+1, collCalls.groupCalls.size()); + PrintGroupCall(collCalls.groupCalls[i]); + } + ReplayRccl(collCalls, i); + } else { + if (mpiRank == 0) { + printf("[ERROR] in group call: (skipping...)\n"); + for (auto const& rd : collCalls.groupCalls[i].rankData) { + printf(" - Rank %02d: comm %d in line %d\n", rd.first, rd.second.commIdx, rd.second.lineNum); + for (int task = 0; task < rd.second.tasks.size(); task++) { + TaskInfo ti = rd.second.tasks[task]; + printf(" - Task %02d: %32s inPlace=%d count=%lu datatype=%d op=%d root=%d\n", + task, ncclFuncNames[ti.funcType], ti.inPlace, ti.count, ti.datatype, ti.op, ti.root); + } } - auto end = std::chrono::high_resolution_clock::now(); - std::chrono::duration duration = end - start; + } + numSkippedCalls++; + } + } + auto end = std::chrono::high_resolution_clock::now(); + std::chrono::duration duration = end - start; - // Need to destroy comms and streams after collective execution is done - for (int i = 0; i < numGpusPerMpiRank; ++i) { - ncclCommDestroy(comms[i]); - HIPCALL(hipStreamDestroy(streams[i])); + // Destroy all communicators + for (int commIdx = 0; commIdx < collCalls.numCommsPerRank; commIdx++) { + for (int i = 0; i < numGpusPerMpiRank; i++) { + NCCL_CALL(ncclCommDestroy(collCalls.localRankComms[i][commIdx])); + HIP_CALL(hipStreamDestroy(collCalls.localRankStreams[i][commIdx])); + } + } + + if (mpiRank == 0) printf("Executed group calls: %zu\n", collCalls.groupCalls.size() - numSkippedCalls); + if (mpiRank == 0) printf("Skipped group calls: %d\n", numSkippedCalls); + + // Time it takes to execute all the group calls + if (mpiRank == 0) printf("Execution Time: %f seconds\n", duration.count()); + printf("MPI Rank %d Success\n", mpiRank); + + MPI_Finalize(); + return 0; +} + +void PrintGroupCall(GroupCall const& gc) +{ + printf("OpCount: %d\n", gc.opCount); + + for (auto rd : gc.rankData) { + printf(" - Rank %02d: comm %d\n", rd.first, rd.second.commIdx); + + for (int task = 0; task < rd.second.tasks.size(); task++) { + TaskInfo ti = rd.second.tasks[task]; + std::string funcName = (ti.funcType == ncclCollSend || ti.funcType == ncclCollRecv) ? "Send/Recv" : ncclFuncNames[ti.funcType]; + std::tuple key(funcName, ti.count, ti.datatype, ti.op); + printf(" - Task %02d: %32s inPlace=%d count=%lu datatype=%d op=%d root=%d\n", + task, funcName.c_str(), ti.inPlace, ti.count, ti.datatype, ti.op, ti.root); + } + } +} + + +void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls& cc) +{ + bool verbose = isFirstRank && (getenv("VERBOSE") != NULL); + cc.globalRankComms.clear(); + cc.globalRankComms.resize(cc.numGlobalRanks); + cc.groupCalls.clear(); + + FILE* fp = fopen(logFilename, "r"); + if (!fp) { + printf("[ERROR] Unable to open file %s\n", logFilename); + exit(-1); + } + + char line[2048]; + LineItem li; + int lineNum = 0; + + while (fgets(line, 2048, fp)) { + ++lineNum; + + //Ignore invalid lines and collectives + if (!ParseLineItem(line, li) || li.nRanks != cc.numGlobalRanks) continue; + + // Figure out commIdx for this globalrank + int commIdx = -1; + for (auto i = 0; i < cc.globalRankComms[li.globalRank].size(); i++) { + if (!strcmp(cc.globalRankComms[li.globalRank][i].c_str(), li.comm)) { + commIdx = i; + break; + } + } + if (commIdx == -1) { + commIdx = cc.globalRankComms[li.globalRank].size(); + cc.globalRankComms[li.globalRank].push_back(li.comm); } - MPI_Finalize(); + TaskInfo taskInfo; + taskInfo.funcType = GetFuncType(li.opName); + taskInfo.inPlace = !strcmp(li.sendbuff, li.recvbuff); + taskInfo.count = li.count; + taskInfo.datatype = (ncclDataType_t) li.datatype; + taskInfo.op = (ncclRedOp_t) li.op; + taskInfo.root = li.root; - if (mpiRank == 0) printf("Executed group calls: %zu\n", groupCalls.size() - numSkippedCalls); - if (mpiRank == 0) printf("Skipped group calls: %d\n", numSkippedCalls); + // Find the appropriate GroupCall that this task belongs to + // If it doesn't exist yet, then create it + bool found = false; + for (auto& gc : cc.groupCalls) { + if (gc.opCount != li.opCount) continue; + if (gc.rankData.count(li.globalRank)) { + RankData& rd = gc.rankData[li.globalRank]; + if (rd.commIdx != commIdx || rd.tasks.size() != li.task) + continue; - // Time it takes to execute all the group calls - if (mpiRank == 0) printf("Execution Time: %f seconds\n", duration.count()); + rd.tasks.push_back(taskInfo); + found = true; + break; + } + // Rank has no tasks - make sure this is task 0 + else if (li.task == 0) { + gc.rankData[li.globalRank].lineNum = lineNum; + gc.rankData[li.globalRank].commIdx = commIdx; + gc.rankData[li.globalRank].tasks.push_back(taskInfo); + found = true; + break; + } + } - // Means no hang - printf("MPI Rank %d Success\n", mpiRank); - - return 0; + // If no collectives were found, create new one + if (!found) { + if (li.task != 0) { + if (isFirstRank) printf("[WARN] Was unable to find corresponding collective for line %d\n", lineNum); + } + + GroupCall gc; + gc.opCount = li.opCount; + gc.rankData[li.globalRank].commIdx = commIdx; + gc.rankData[li.globalRank].lineNum = lineNum; + gc.rankData[li.globalRank].tasks.push_back(taskInfo); + cc.groupCalls.push_back(gc); + } + } + fclose(fp); + + // Validate group calls + // - For non Send/Recv, check that all ranks participate with same parameters count + // - For Send/Recv, check that pairs of Send/Recv calls exist + if (isFirstRank) printf("Found %lu groupCalls\n", cc.groupCalls.size()); + for (int i = 0; i < cc.groupCalls.size(); i++) { + GroupCall& gc = cc.groupCalls[i]; + std::map, std::vector> arrivalCounter; + + gc.isValid = true; + + for (auto rd : gc.rankData) { + for (int task = 0; task < rd.second.tasks.size(); task++) { + TaskInfo ti = rd.second.tasks[task]; + + std::string funcName = (ti.funcType == ncclCollSend || ti.funcType == ncclCollRecv) ? "Send/Recv" : ncclFuncNames[ti.funcType]; + std::tuple key(funcName, ti.count, ti.datatype, ti.op); + + auto& rankVector = arrivalCounter[key]; + if (rankVector.size() < cc.numGlobalRanks) + rankVector.resize(cc.numGlobalRanks); + + // rankVector in arrivalCount represents the rank information + // Count the number of tasks that are going to be executed by each rank. This is to validate the group call later on. + // Nom-Send/Recv rank counts (rankVector elements) should be equal at the end, and for Send/Recv, all the elements of rankVector should be equal to 0 + if (ti.funcType == ncclCollRecv) { + rankVector[ti.root]--; + } else { + rankVector[rd.first]++; + } + } + } + + // Iterate through the map variable and report/validate the results + for (const auto& e : arrivalCounter) { + int maxVal; + std::string funcName = std::get<0>(e.first); + size_t count = std::get<1>(e.first); + int const datatype = std::get<2>(e.first); + int const op = std::get<3>(e.first); + + bool isp2p = (funcName == "Send/Recv"); + if (!isp2p) maxVal = *std::max_element(e.second.begin(), e.second.end()); + + // Validate all the ranks have required amount of collective call (task) + for (int i = 0; i < e.second.size(); i++) { + if (e.second[i] != (isp2p ? 0 : maxVal)) { + std::string warning = (isp2p ? (e.second[i] > 0 ? "[WARN] Missing Recv" : "[WARN] Missing Send") : "[WARN] Missing " + std::string(funcName)) + + " count=" + std::to_string(count) + " datatype=" + std::to_string(datatype) + " op=" + std::to_string(op) + " at rank [" + std::to_string(i) + "]"; + if(isFirstRank) printf("%s\n", warning.c_str()); + + gc.isValid = false; + } + } + } + } + + // Check number of comms per rank + cc.numCommsPerRank = cc.globalRankComms[0].size(); + for (int i = 1; i < cc.numGlobalRanks; i++) { + if (cc.numCommsPerRank != cc.globalRankComms[i].size()) { + printf("[ERROR] Replayer currently only supports identical number of communicators across all ranks\n"); + printf("[ERROR] Rank %d has %lu communicators (expecting %d)\n", i, cc.globalRankComms[i].size(), cc.numCommsPerRank); + exit(1); + } + } +} + +bool ParseLineItem(char const* line, LineItem& li) +{ + return sscanf(line, + "%[^:]:%d:%d [%d] NCCL INFO %[^:]: opCount %x sendbuff %s " + "recvbuff %s count %lu datatype %d op %d root %d comm %s " + "[nranks=%d] stream %p task %d globalrank %d", + li.hostname, &li.pid, &li.tid, &li.cudaDev, li.opName, + &li.opCount, li.sendbuff, li.recvbuff, + &li.count, &li.datatype, &li.op, &li.root, li.comm, + &li.nRanks, &li.stream, &li.task, &li.globalRank) == 17; +} + +void ReplayRccl(CollectiveCalls const& cc, int groupIdx) +{ + int numLocalRanks = cc.localRankComms.size(); + + + + + // Allocate memory for collective + std::vector> sendbuff(numLocalRanks); + std::vector> recvbuff(numLocalRanks); + + for (int localIdx = 0; localIdx < numLocalRanks; localIdx++) { + int globalRank = cc.firstGlobalRank + localIdx; + if (cc.groupCalls[groupIdx].rankData.count(globalRank) == 0) continue; + HIP_CALL(hipSetDevice(cc.localGpuOffset + localIdx)); + + RankData const& rankData = cc.groupCalls[groupIdx].rankData.at(globalRank); + int numTasks = rankData.tasks.size(); + sendbuff[localIdx].resize(numTasks); + recvbuff[localIdx].resize(numTasks); + + for (int taskId = 0; taskId < numTasks; taskId++) { + TaskInfo const& task = rankData.tasks[taskId]; + + // Each task has a size based on the type of collective (funcType) + std::pair numBytes = GetSize(task, cc.numGlobalRanks); + + if (task.inPlace) { + numBytes.first = std::max(numBytes.first, numBytes.second); + numBytes.second = numBytes.first; + } + + // Set the device and allocate send/recv buffers + HIP_CALL(hipMalloc(&sendbuff[localIdx][taskId], numBytes.first)); + HIP_CALL(hipMemset(sendbuff[localIdx][taskId], 0, numBytes.first)); + + if (!task.inPlace) { + HIP_CALL(hipMalloc(&recvbuff[localIdx][taskId], numBytes.second)); + HIP_CALL(hipMemset(recvbuff[localIdx][taskId], 0, numBytes.second)); + } else { + recvbuff[localIdx][taskId] = sendbuff[localIdx][taskId]; + } + + HIP_CALL(hipDeviceSynchronize()); + } + } + + // Execute the collective call (task) + NCCL_CALL(ncclGroupStart()); + for (int localIdx = 0; localIdx < numLocalRanks; localIdx++) { + int globalRank = cc.firstGlobalRank + localIdx; + if (cc.groupCalls[groupIdx].rankData.count(globalRank) == 0) continue; + + RankData const& rankData = cc.groupCalls[groupIdx].rankData.at(globalRank); + int numTasks = rankData.tasks.size(); + int commIdx = rankData.commIdx; + for (int taskId = 0; taskId < numTasks; taskId++) { + TaskInfo const& task = rankData.tasks[taskId]; + ExecuteCollective(task, cc.localRankComms[localIdx][commIdx], cc.localRankStreams[localIdx][commIdx], + sendbuff[localIdx][taskId], + recvbuff[localIdx][taskId]); + } + } + NCCL_CALL(ncclGroupEnd()); + + // Synchronize devices and free memory + for (int localIdx = 0; localIdx < numLocalRanks; localIdx++) { + int globalRank = cc.firstGlobalRank + localIdx; + if (cc.groupCalls[groupIdx].rankData.count(globalRank) == 0) continue; + + RankData const& rankData = cc.groupCalls[groupIdx].rankData.at(globalRank); + int numTasks = rankData.tasks.size(); + int commIdx = rankData.commIdx; + HIP_CALL(hipStreamSynchronize(cc.localRankStreams[localIdx][commIdx])); + + for (int taskId = 0; taskId < numTasks; taskId++) { + TaskInfo const& task = rankData.tasks[taskId]; + HIP_CALL(hipFree(sendbuff[localIdx][taskId])); + if (!task.inPlace) HIP_CALL(hipFree(recvbuff[localIdx][taskId])); + } + } +} + +// GetSize will return a pair of bytes where first element in pair represents bytesSent and the second bytesRecv +std::pair GetSize(TaskInfo taskInfo, int numGlobalRanks) { + size_t sendNumBytes, recvNumBytes; + + switch (taskInfo.funcType) { + case ncclCollBroadcast: case ncclCollReduce: case ncclCollAllReduce: + sendNumBytes = taskInfo.count * DataTypeToBytes(taskInfo.datatype); + recvNumBytes = sendNumBytes; + break; + case ncclCollAllGather: case ncclCollGather: + sendNumBytes = taskInfo.count * DataTypeToBytes(taskInfo.datatype); + recvNumBytes = numGlobalRanks * sendNumBytes; + break; + case ncclCollReduceScatter: case ncclCollScatter: + recvNumBytes = taskInfo.count * DataTypeToBytes(taskInfo.datatype); + sendNumBytes = numGlobalRanks * recvNumBytes; + break; + case ncclCollAllToAll: + sendNumBytes = numGlobalRanks * taskInfo.count * DataTypeToBytes(taskInfo.datatype); + recvNumBytes = sendNumBytes; + break; + default: + sendNumBytes = taskInfo.count * DataTypeToBytes(taskInfo.datatype); + recvNumBytes = sendNumBytes; + } + return std::make_pair(sendNumBytes, recvNumBytes); +} + +void ExecuteCollective(TaskInfo const& task, ncclComm_t const& comm, hipStream_t stream, const void *sendbuff, void *recvbuff) +{ + switch (task.funcType) { + case ncclCollAllGather: + NCCL_CALL(ncclAllGather(sendbuff, recvbuff, task.count, task.datatype, comm, stream)); + break; + case ncclCollAllReduce: + NCCL_CALL(ncclAllReduce(sendbuff, recvbuff, task.count, task.datatype, task.op, comm, stream)); + break; + case ncclCollBroadcast: + NCCL_CALL(ncclBroadcast(sendbuff, recvbuff, task.count, task.datatype, task.root, comm, stream)); + break; + case ncclCollReduce: + NCCL_CALL(ncclReduce(sendbuff, recvbuff, task.count, task.datatype, task.op, task.root, comm, stream)); + break; + case ncclCollReduceScatter: + NCCL_CALL(ncclReduceScatter(sendbuff, recvbuff, task.count, task.datatype, task.op, comm, stream)); + break; + case ncclCollGather: + NCCL_CALL(ncclGather(sendbuff, recvbuff, task.count, task.datatype, task.root, comm, stream)); + break; + case ncclCollScatter: + NCCL_CALL(ncclScatter(sendbuff, recvbuff, task.count, task.datatype, task.root, comm, stream)); + break; + case ncclCollAllToAll: + NCCL_CALL(ncclAllToAll(sendbuff, recvbuff, task.count, task.datatype, comm, stream)); + break; + case ncclCollSend: + NCCL_CALL(ncclSend(sendbuff, task.count, task.datatype, task.root, comm, stream)); + break; + case ncclCollRecv: + NCCL_CALL(ncclRecv(recvbuff, task.count, task.datatype, task.root, comm, stream)); + break; + default: + printf("Error: unsupported collective\n"); + exit(1); + } } diff --git a/tools/rccl_replayer/rcclReplayer.hpp b/tools/rccl_replayer/rcclReplayer.hpp index f031f84a8e..3df45659f4 100644 --- a/tools/rccl_replayer/rcclReplayer.hpp +++ b/tools/rccl_replayer/rcclReplayer.hpp @@ -6,39 +6,30 @@ // NOTE: Parsing is based on this line logging collective information in enqueue.cc // INFO(NCCL_COLL,"%s: opCount %lx sendbuff %p recvbuff %p count %zi datatype %d op %d \ - root %d comm %p [nranks=%d] stream %p task %d globalrank %d", + root %d comm %p [nranks=%d] stream %p task %d globalrank %d", // info->opName, info->comm->opCount, info->sendbuff, info->recvbuff, info->count, // info->datatype, info->op, info->root, info->comm, info->comm->nRanks, info->stream, // info->comm->tasks.nTasksP2p + info->comm->tasks.nTasksColl, // info->comm->localRankToRank[info->comm->localRank]); -#define MPICHECK(cmd) do { \ - int e = cmd; \ - if( e != MPI_SUCCESS ) { \ - printf("Failed: MPI error %s:%d '%d'\n", \ - __FILE__,__LINE__, e); \ - exit(EXIT_FAILURE); \ - } \ -} while(0) +#define HIP_CALL(cmd) \ + do { \ + hipError_t error = (cmd); \ + if (error != hipSuccess) { \ + printf("Encountered HIP error (%s) at line %d in file %s\n", \ + hipGetErrorString(error), __LINE__, __FILE__); \ + exit(-1); \ + } \ + } while (0) -#define HIPCALL(cmd) \ - do { \ - hipError_t error = (cmd); \ - if (error != hipSuccess) \ - { \ - printf("Encountered HIP error (%s) at line %d in file %s\n", \ - hipGetErrorString(error), __LINE__, __FILE__); \ - exit(-1); \ - } \ - } while (0) - -#define NCCLCHECK(cmd) do { \ +#define NCCL_CALL(cmd) \ + do { \ ncclResult_t res = cmd; \ if (res != ncclSuccess) { \ - printf("NCCL failure %s:%d '%s'\n", \ - __FILE__,__LINE__,ncclGetErrorString(res)); \ + printf("NCCL failure %s:%d '%s'\n", \ + __FILE__,__LINE__,ncclGetErrorString(res)); \ } \ -} while(0) + } while(0) struct LineItem { @@ -106,7 +97,7 @@ struct TaskInfo struct RankData { int lineNum; - std::string comm; + int commIdx; std::vector tasks; }; @@ -114,21 +105,36 @@ struct GroupCall { bool isValid; int opCount; - std::map rankData; // Indexed by globalRank + std::map rankData; }; +struct CollectiveCalls +{ + int numGlobalRanks; + int numGpusPerMpiRank; + std::vector> globalRankComms; // Set of comms used by each global rank + std::vector groupCalls; // List of group calls for each global rank + + int localGpuOffset; // First local GPU device idx for this MPI process + int firstGlobalRank; // First global rank for this MPI process + int numCommsPerRank; // Number of communicators per rank + std::vector> localRankComms; // comms per local rank + std::vector> localRankStreams; // streams per local rank +}; + + size_t DataTypeToBytes(ncclDataType_t const dataType) { switch (dataType) { - case ncclInt8: return 1; - case ncclUint8: return 1; - case ncclInt32: return 4; - case ncclUint32: return 4; - case ncclInt64: return 8; - case ncclUint64: return 8; - case ncclFloat16: return 2; - case ncclFloat32: return 4; - case ncclFloat64: return 8; + case ncclInt8: return 1; + case ncclUint8: return 1; + case ncclInt32: return 4; + case ncclUint32: return 4; + case ncclInt64: return 8; + case ncclUint64: return 8; + case ncclFloat16: return 2; + case ncclFloat32: return 4; + case ncclFloat64: return 8; case ncclBfloat16: return 2; case ncclFp8E4M3: return 1; case ncclFp8E5M2: return 1; @@ -149,24 +155,20 @@ ncclFunc_t GetFuncType(char* func) // parse the logs and assign them into lineItem bool ParseLineItem(char const* line, LineItem& li); -// this covers grouping the logs based on opCount and task number, +// this covers grouping the logs based on opCount and task number, // validatation of the groupCalls for both non-send/recv collectives and send/recv -void ParseCollectives(char const* logFilename, - int const numGlobalRanks, - std::vector& groupCalls); +void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls& collectiveCalls); -// size differ for each collective call and getSize gives a specific size in bytes depending on type of task, +// allocates send/recv buff, sets the device based on which rank the task belongs to, +// syncronize devices after executing all the tasks and free device memory. +void ReplayRccl(CollectiveCalls const& collCall, int groupIdx); + +// Print information about a group call +void PrintGroupCall(GroupCall const& gc); + +// size differ for each collective call and getSize gives a specific size in bytes depending on type of task, // global rank, element count and data type -std::pair GetSize(TaskInfo taskInfo, - int numGlobalRanks); +std::pair GetSize(TaskInfo taskInfo, int numGlobalRanks); -// executes the collective call (task) -void ExecuteCollective(TaskInfo task, ncclComm_t comm, hipStream_t stream, const void *sendbuff, void *recvbuff); - -// allocates send/recv buff, sets the device based on which rank the task belongs to, -// syncronize devices after executing all the tasks and free device memory. -void ReplayRccl(GroupCall& groupCall, std::vector comms, std::vector streams, - int const localGpuOffset, - int const numGpusPerMpiRank, - int const firstGlobalRank, - int const numGlobalRanks); \ No newline at end of file +// executes the collective call (task) +void ExecuteCollective(TaskInfo const& task, ncclComm_t const& comm, hipStream_t stream, const void *sendbuff, void *recvbuff);