From cfecce790fe66892f21200ee2711c89cde7f5227 Mon Sep 17 00:00:00 2001 From: Bertan Dogancay <111835151+BertanDogancay@users.noreply.github.com> Date: Tue, 22 Oct 2024 10:41:08 -0400 Subject: [PATCH] [Replayer] Add validation (#1387) * Add validation to rccl_replayer --- tools/rccl_replayer/Makefile | 2 +- tools/rccl_replayer/rcclReplayer.cpp | 336 ++++++++++++++++++++++++--- tools/rccl_replayer/rcclReplayer.hpp | 250 ++++++++++++++++++-- 3 files changed, 525 insertions(+), 63 deletions(-) diff --git a/tools/rccl_replayer/Makefile b/tools/rccl_replayer/Makefile index fe4ebc68dc..5dac92b692 100644 --- a/tools/rccl_replayer/Makefile +++ b/tools/rccl_replayer/Makefile @@ -4,7 +4,7 @@ MPI_DIR ?= /opt/ompi MPI_INC_DIR ?= /usr/include/x86_64-linux-gnu/mpi MPI_LIB_DIR ?= /usr/lib/x86_64-linux-gnu -INCLUDES = -I$(MPI_INC_DIR) -I$(MPI_DIR)/include -I$(RCCL_DIR)/include +INCLUDES = -I$(MPI_INC_DIR) -I$(MPI_DIR)/include -I$(RCCL_DIR)/include -I$(RCCL_DIR)/hipify/src/include LDFLAGS = -L$(MPI_LIB_DIR) -L$(MPI_DIR)/lib -L$(RCCL_DIR) -lmpi -lrccl main: rcclReplayer.cpp diff --git a/tools/rccl_replayer/rcclReplayer.cpp b/tools/rccl_replayer/rcclReplayer.cpp index fef808b7ac..37d250a997 100644 --- a/tools/rccl_replayer/rcclReplayer.cpp +++ b/tools/rccl_replayer/rcclReplayer.cpp @@ -81,6 +81,7 @@ int main(int argc, char **argv) printf("Rank %d Done setting up communicators\n", mpiRank); int numSkippedCalls = 0; + int numInvalid = 0; double runTime; std::ofstream datafile; datafile.open("replayer_data.csv"); @@ -98,7 +99,7 @@ int main(int argc, char **argv) printf("Running Collective Call %lu of %lu\n", i+1, collCalls.groupCalls.size()); PrintGroupCall(collCalls.groupCalls[i]); } - double runTime = ReplayRccl(collCalls, i); + double runTime = ReplayRccl(collCalls, i, numInvalid); if (mpiRank == 0) { dataToCsv(collCalls.groupCalls[i], datafile, runTime); } @@ -132,6 +133,9 @@ int main(int argc, char **argv) if (mpiRank == 0) printf("Executed group calls: %zu\n", collCalls.groupCalls.size() - numSkippedCalls); if (mpiRank == 0) printf("Skipped group calls: %d\n", numSkippedCalls); + // Data validation failures during group calls + if (mpiRank == 0) printf("Failed group calls: %d\n", numInvalid); + // Time it takes to execute all the group calls if (mpiRank == 0) printf("Execution Time: %f seconds\n", duration.count()); printf("MPI Rank %d Success\n", mpiRank); @@ -170,7 +174,7 @@ void dataToCsv(GroupCall const& gc, std::ofstream &datafile, double runTime) else if (funcName == "ReduceScatter" || funcName == "AllGather") busBw *= ((n-1)/n); busBw /= (1e9); //in gb/s std::string dataTypeName = DataTypeToName(ti.datatype); - std::string redOp = getRedOp(ti.op); + std::string redOp = RedOpToName(ti.op); datafile << gc.opCount << ", " << funcName.c_str() << ", " << ti.inPlace << ", " << ti.count << ", " << dataTypeName << ", " << redOp << ", " << ti.root << ", " << runTime << ", " << busBw << "\n"; } @@ -323,6 +327,52 @@ void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls exit(1); } } + + // Detect and replace scatter patterns + for (auto& gc : cc.groupCalls) { + if (!gc.isValid) continue; + int scatterRoot = -1; + bool isScatter = true; + for (auto& [rank, rankData] : gc.rankData) { + int sendCount = 0, recvCount = 0; + for (const auto& task : rankData.tasks) { + if (task.funcType == ncclCollSend) + sendCount++; + else if (task.funcType == ncclCollRecv) + recvCount++; + } + if (sendCount == cc.numGlobalRanks && recvCount == 1) { + if (scatterRoot == -1) { + // Root is the first rank that matches the condition + scatterRoot = rank; + } else { + isScatter = false; + break; + } + } else if (recvCount != 1 || sendCount != 0) { + // Non-root ranks must only recv and not send + isScatter = false; + break; + } + } + + // Replace send/recv calls with scatter call for the group call + if (isScatter) { + TaskInfo scatterTask; + scatterTask.funcType = ncclCollScatter; + scatterTask.count = gc.rankData[scatterRoot].tasks[0].count; + scatterTask.datatype = gc.rankData[scatterRoot].tasks[0].datatype; + scatterTask.root = scatterRoot; + + for (auto& [rank, rankData] : gc.rankData) { + rankData.tasks.clear(); + rankData.tasks.push_back(scatterTask); + } + + if (isFirstRank) + printf("[INFO] Scatter pattern detected and replaced with scatter collective\n"); + } + } } bool ParseLineItem(char const* line, LineItem& li) @@ -337,26 +387,20 @@ bool ParseLineItem(char const* line, LineItem& li) &li.nRanks, &li.stream, &li.task, &li.globalRank) == 17; } -double ReplayRccl(CollectiveCalls const& cc, int groupIdx) +double ReplayRccl(CollectiveCalls& cc, int groupIdx, int& numInvalid) { int numLocalRanks = cc.localRankComms.size(); - // Allocate memory for collective - std::vector> sendbuff(numLocalRanks); - std::vector> recvbuff(numLocalRanks); - for (int localIdx = 0; localIdx < numLocalRanks; localIdx++) { int globalRank = cc.firstGlobalRank + localIdx; if (cc.groupCalls[groupIdx].rankData.count(globalRank) == 0) continue; HIP_CALL(hipSetDevice(cc.localGpuOffset + localIdx)); - RankData const& rankData = cc.groupCalls[groupIdx].rankData.at(globalRank); + RankData& rankData = cc.groupCalls[groupIdx].rankData.at(globalRank); int numTasks = rankData.tasks.size(); - sendbuff[localIdx].resize(numTasks); - recvbuff[localIdx].resize(numTasks); for (int taskId = 0; taskId < numTasks; taskId++) { - TaskInfo const& task = rankData.tasks[taskId]; + TaskInfo& task = rankData.tasks[taskId]; // Each task has a size based on the type of collective (funcType) std::pair numBytes = GetSize(task, cc.numGlobalRanks); @@ -366,17 +410,20 @@ double ReplayRccl(CollectiveCalls const& cc, int groupIdx) numBytes.second = numBytes.first; } - // Set the device and allocate send/recv buffers - HIP_CALL(hipMalloc(&sendbuff[localIdx][taskId], numBytes.first)); - HIP_CALL(hipMemset(sendbuff[localIdx][taskId], 0, numBytes.first)); + // Allocate memory + AllocateMem(task.inputGpu, numBytes.first, true); + AllocateMem(task.outputCpu, numBytes.second); + AllocateMem(task.expected, numBytes.second); if (!task.inPlace) { - HIP_CALL(hipMalloc(&recvbuff[localIdx][taskId], numBytes.second)); - HIP_CALL(hipMemset(recvbuff[localIdx][taskId], 0, numBytes.second)); + AllocateMem(task.outputGpu, numBytes.second, true); } else { - recvbuff[localIdx][taskId] = sendbuff[localIdx][taskId]; + task.outputGpu = task.inputGpu; } + // Prepare input/output for each task based on collective type + PrepareDataFunc(task, globalRank, cc.numGlobalRanks); + HIP_CALL(hipDeviceSynchronize()); } } @@ -388,14 +435,12 @@ double ReplayRccl(CollectiveCalls const& cc, int groupIdx) int globalRank = cc.firstGlobalRank + localIdx; if (cc.groupCalls[groupIdx].rankData.count(globalRank) == 0) continue; - RankData const& rankData = cc.groupCalls[groupIdx].rankData.at(globalRank); + RankData& rankData = cc.groupCalls[groupIdx].rankData.at(globalRank); int numTasks = rankData.tasks.size(); int commIdx = rankData.commIdx; for (int taskId = 0; taskId < numTasks; taskId++) { - TaskInfo const& task = rankData.tasks[taskId]; - ExecuteCollective(task, cc.localRankComms[localIdx][commIdx], cc.localRankStreams[localIdx][commIdx], - sendbuff[localIdx][taskId], - recvbuff[localIdx][taskId]); + TaskInfo& task = rankData.tasks[taskId]; + ExecuteCollective(task, cc.localRankComms[localIdx][commIdx], cc.localRankStreams[localIdx][commIdx]); } } NCCL_CALL(ncclGroupEnd()); @@ -415,14 +460,46 @@ double ReplayRccl(CollectiveCalls const& cc, int groupIdx) double runTime = duration.count(); runTime *= 1000; //convering into milliseconds + // Data validation + bool isValid = true; for (int localIdx = 0; localIdx < numLocalRanks; localIdx++) { int globalRank = cc.firstGlobalRank + localIdx; - RankData const& rankData = cc.groupCalls[groupIdx].rankData.at(globalRank); + RankData& rankData = cc.groupCalls[groupIdx].rankData.at(globalRank); int numTasks = rankData.tasks.size(); for (int taskId = 0; taskId < numTasks; taskId++) { - TaskInfo const& task = rankData.tasks[taskId]; - HIP_CALL(hipFree(sendbuff[localIdx][taskId])); - if (!task.inPlace) HIP_CALL(hipFree(recvbuff[localIdx][taskId])); + TaskInfo& task = rankData.tasks[taskId]; + + // Only need Recv to validate + if (task.funcType == ncclCollSend) break; + // Ignore non-root ranks + if (IsRootUsed(task.funcType) && task.root != globalRank) break; + + std::pair numBytes = GetSize(task, cc.numGlobalRanks); + if (task.inPlace) { + numBytes.first = std::max(numBytes.first, numBytes.second); + numBytes.second = numBytes.first; + } + HIP_CALL(hipMemcpy(task.outputCpu.ptr, task.outputGpu.ptr, numBytes.second, hipMemcpyDeviceToHost)); + if (!IsEqual(task.outputCpu, task.expected, task.datatype, task.count, globalRank)) { + isValid = false; + break; // Check other ranks + } + } + } + + if (!isValid) numInvalid++; + + // Free memory + for (int localIdx = 0; localIdx < numLocalRanks; localIdx++) { + int globalRank = cc.firstGlobalRank + localIdx; + RankData& rankData = cc.groupCalls[groupIdx].rankData.at(globalRank); + int numTasks = rankData.tasks.size(); + for (int taskId = 0; taskId < numTasks; taskId++) { + TaskInfo& task = rankData.tasks[taskId]; + FreeMem(task.inputGpu, true); + if (!task.inPlace) FreeMem(task.outputGpu, true); + FreeMem(task.outputCpu); + FreeMem(task.expected); } } return runTime; @@ -456,41 +533,226 @@ std::pair GetSize(TaskInfo taskInfo, int numGlobalRanks) { return std::make_pair(sendNumBytes, recvNumBytes); } -void ExecuteCollective(TaskInfo const& task, ncclComm_t const& comm, hipStream_t stream, const void *sendbuff, void *recvbuff) +void ExecuteCollective(TaskInfo& task, ncclComm_t const& comm, hipStream_t stream) { switch (task.funcType) { case ncclCollAllGather: - NCCL_CALL(ncclAllGather(sendbuff, recvbuff, task.count, task.datatype, comm, stream)); + NCCL_CALL(ncclAllGather(task.inputGpu.ptr, task.outputGpu.ptr, task.count, task.datatype, comm, stream)); break; case ncclCollAllReduce: - NCCL_CALL(ncclAllReduce(sendbuff, recvbuff, task.count, task.datatype, task.op, comm, stream)); + NCCL_CALL(ncclAllReduce(task.inputGpu.ptr, task.outputGpu.ptr, task.count, task.datatype, task.op, comm, stream)); break; case ncclCollBroadcast: - NCCL_CALL(ncclBroadcast(sendbuff, recvbuff, task.count, task.datatype, task.root, comm, stream)); + NCCL_CALL(ncclBroadcast(task.inputGpu.ptr, task.outputGpu.ptr, task.count, task.datatype, task.root, comm, stream)); break; case ncclCollReduce: - NCCL_CALL(ncclReduce(sendbuff, recvbuff, task.count, task.datatype, task.op, task.root, comm, stream)); + NCCL_CALL(ncclReduce(task.inputGpu.ptr, task.outputGpu.ptr, task.count, task.datatype, task.op, task.root, comm, stream)); break; case ncclCollReduceScatter: - NCCL_CALL(ncclReduceScatter(sendbuff, recvbuff, task.count, task.datatype, task.op, comm, stream)); + NCCL_CALL(ncclReduceScatter(task.inputGpu.ptr, task.outputGpu.ptr, task.count, task.datatype, task.op, comm, stream)); break; case ncclCollGather: - NCCL_CALL(ncclGather(sendbuff, recvbuff, task.count, task.datatype, task.root, comm, stream)); + NCCL_CALL(ncclGather(task.inputGpu.ptr, task.outputGpu.ptr, task.count, task.datatype, task.root, comm, stream)); break; case ncclCollScatter: - NCCL_CALL(ncclScatter(sendbuff, recvbuff, task.count, task.datatype, task.root, comm, stream)); + NCCL_CALL(ncclScatter(task.inputGpu.ptr, task.outputGpu.ptr, task.count, task.datatype, task.root, comm, stream)); break; case ncclCollAllToAll: - NCCL_CALL(ncclAllToAll(sendbuff, recvbuff, task.count, task.datatype, comm, stream)); + NCCL_CALL(ncclAllToAll(task.inputGpu.ptr, task.outputGpu.ptr, task.count, task.datatype, comm, stream)); break; case ncclCollSend: - NCCL_CALL(ncclSend(sendbuff, task.count, task.datatype, task.root, comm, stream)); + NCCL_CALL(ncclSend(task.inputGpu.ptr, task.count, task.datatype, task.root, comm, stream)); break; case ncclCollRecv: - NCCL_CALL(ncclRecv(recvbuff, task.count, task.datatype, task.root, comm, stream)); + NCCL_CALL(ncclRecv(task.outputGpu.ptr, task.count, task.datatype, task.root, comm, stream)); break; default: printf("Error: unsupported collective\n"); exit(1); } } + +void AllocateMem(PtrUnion& ptrUnion, size_t const numBytes, bool isGpu) { + if (numBytes) { + if (isGpu) { + HIP_CALL(hipMalloc(&ptrUnion.ptr, numBytes)); + HIP_CALL(hipMemset(ptrUnion.ptr, 0, numBytes)); + HIP_CALL(hipStreamSynchronize(NULL)); + } else { + ptrUnion.ptr = calloc(numBytes, 1); + memset(ptrUnion.ptr, 0, numBytes); + if (!ptrUnion.ptr) { + printf("Unable to allocate memory (%lu bytes)\n", numBytes); + } + } + } +} + +void FreeMem(PtrUnion& ptrUnion, bool isGpu) { + if (ptrUnion.ptr != nullptr) { + if (isGpu) + HIP_CALL(hipFree(ptrUnion.ptr)); + else + free(ptrUnion.ptr); + ptrUnion.ptr = nullptr; + } +} + +void FillPattern(PtrUnion& ptrUnion, ncclDataType_t const dataType, size_t const numElements, int globalRank, bool isGpu) { + PtrUnion temp; + size_t const numBytes = numElements * DataTypeToBytes(dataType); + + if (isGpu) + AllocateMem(temp, numBytes); + else + temp.ptr = ptrUnion.ptr; + + for (int i = 0; i < numElements; i++) { + int valueI = (globalRank + i) % 256; + double valueF = 1.0L/((double)valueI+1.0L); + SetPtr(temp, dataType, i, valueI, valueF); + } + + if (isGpu) { + HIP_CALL(hipMemcpy(ptrUnion.ptr, temp.ptr, numBytes, hipMemcpyHostToDevice)); + FreeMem(temp); + } +} + +void PrepareDataFunc(TaskInfo& taskInfo, int globalRank, int totalRanks) +{ + switch (taskInfo.funcType) + { + case ncclCollBroadcast: PrepData_Broadcast(taskInfo, globalRank); break; + case ncclCollReduce: PrepData_Reduce(taskInfo, globalRank, totalRanks, false); break; + case ncclCollAllGather: PrepData_Gather(taskInfo, globalRank, totalRanks, true); break; + case ncclCollReduceScatter: PrepData_ReduceScatter(taskInfo, globalRank, totalRanks); break; + case ncclCollAllReduce: PrepData_Reduce(taskInfo, globalRank, totalRanks, true); break; + case ncclCollGather: PrepData_Gather(taskInfo, globalRank, totalRanks, false); break; + case ncclCollScatter: PrepData_Scatter(taskInfo, globalRank, totalRanks); break; + case ncclCollAllToAll: PrepData_AlltoAll(taskInfo, globalRank, totalRanks); break; + case ncclCollSend: PrepData_Send(taskInfo, globalRank); break; + case ncclCollRecv: PrepData_Recv(taskInfo, globalRank); break; + default: + printf("Error: unsupported collective\n"); + exit(1); + } +} + +void PrepData_Broadcast(TaskInfo& taskInfo, int globalRank) { + // Only root needs input pattern + if (globalRank == taskInfo.root) + FillPattern(taskInfo.inputGpu, taskInfo.datatype, taskInfo.count, taskInfo.root, true); + + // Otherwise all other ranks expected output is the same as input of root + FillPattern(taskInfo.expected, taskInfo.datatype, taskInfo.count, taskInfo.root); +} + +void PrepData_Reduce(TaskInfo& taskInfo, int globalRank, int totalRanks, bool isAllReduce) { + size_t const numBytes = taskInfo.count * DataTypeToBytes(taskInfo.datatype); + + // If average or custom reduction operator is used, perform a summation instead + ncclRedOp_t const tempOp = (taskInfo.op >= ncclAvg ? ncclSum : taskInfo.op); + + for (int rank = 0; rank < totalRanks; ++rank) { + FillPattern(taskInfo.outputCpu, taskInfo.datatype, taskInfo.count, rank); + if (rank == globalRank) + HIP_CALL(hipMemcpy(taskInfo.inputGpu.ptr, taskInfo.outputCpu.ptr, numBytes, hipMemcpyHostToDevice)); + if (isAllReduce || taskInfo.root == globalRank) { + if (rank == 0) + memcpy(taskInfo.expected.ptr, taskInfo.outputCpu.ptr, numBytes); + else + Reduce(taskInfo.expected, taskInfo.outputCpu, taskInfo.count, taskInfo.datatype, tempOp); + } + } + + if (taskInfo.op == ncclAvg && (isAllReduce || taskInfo.root == globalRank)) + DivideByInt(taskInfo.expected, taskInfo.datatype, taskInfo.count, totalRanks); +} + +void PrepData_ReduceScatter(TaskInfo& taskInfo, int globalRank, int totalRanks) { + int const numInputElements = taskInfo.count * totalRanks; + int const numOutputElements = taskInfo.count; + std::pair numBytes = GetSize(taskInfo, totalRanks); + + PtrUnion tempInputCpu; + PtrUnion tempResultCpu; + AllocateMem(tempInputCpu, numBytes.first); + AllocateMem(tempResultCpu, numBytes.first); + + // If average or custom reduction operator is used, perform a summation instead + ncclRedOp_t const tempOp = (taskInfo.op >= ncclAvg ? ncclSum : taskInfo.op); + + for (int rank = 0; rank < totalRanks; ++rank) { + FillPattern(tempInputCpu, taskInfo.datatype, numInputElements, rank); + if (rank == globalRank) + HIP_CALL(hipMemcpy(taskInfo.inputGpu.ptr, tempInputCpu.ptr, numBytes.first, hipMemcpyHostToDevice)); + if (rank == 0) + memcpy(tempResultCpu.ptr, tempInputCpu.ptr, numBytes.first); + else + Reduce(tempResultCpu, tempInputCpu, numInputElements, taskInfo.datatype, tempOp); + } + + if (taskInfo.op == ncclAvg) + DivideByInt(tempResultCpu, taskInfo.datatype, numInputElements, totalRanks); + + memcpy(taskInfo.expected.I1, tempResultCpu.I1 + globalRank * numBytes.second, numBytes.second); + FreeMem(tempInputCpu); + FreeMem(tempResultCpu); +} + +void PrepData_Gather(TaskInfo& taskInfo, int globalRank, int totalRanks, bool isAllGather) { + int numInputElements = taskInfo.count; + int numOutputElements = totalRanks * taskInfo.count; + std::pair numBytes = GetSize(taskInfo, totalRanks); + + for (int rank = 0; rank < totalRanks; ++rank) { + FillPattern(taskInfo.outputCpu, taskInfo.datatype, numInputElements, rank); + if (rank == globalRank) + HIP_CALL(hipMemcpy(taskInfo.inputGpu.ptr, taskInfo.outputCpu.ptr, numBytes.first, hipMemcpyHostToDevice)); + if (isAllGather || taskInfo.root == globalRank) + memcpy(taskInfo.expected.I1 + (rank * numBytes.first), taskInfo.outputCpu.ptr, numBytes.first); + } +} + +void PrepData_Scatter(TaskInfo& taskInfo, int globalRank, int totalRanks) { + int const numInputElements = taskInfo.count * totalRanks; + int const numOutputElements = taskInfo.count; + std::pair numBytes = GetSize(taskInfo, totalRanks); + + PtrUnion tempInput; + AllocateMem(tempInput, numBytes.first); + + FillPattern(tempInput, taskInfo.datatype, numInputElements, taskInfo.root); + + if (globalRank == taskInfo.root) + HIP_CALL(hipMemcpy(taskInfo.inputGpu.ptr, tempInput.ptr, numBytes.first, hipMemcpyHostToDevice)); + + memcpy(taskInfo.expected.U1, tempInput.U1 + globalRank * numBytes.second, numBytes.second); + + FreeMem(tempInput); +} + +void PrepData_AlltoAll(TaskInfo& taskInfo, int globalRank, int totalRanks) { + int const numInputElements = taskInfo.count * totalRanks; + int const numOutputElements = numInputElements; + std::pair numBytes = GetSize(taskInfo, totalRanks); + size_t const numBytesPerRank = numBytes.first / totalRanks; + + for (int rank = 0; rank < totalRanks; ++rank) { + FillPattern(taskInfo.outputCpu, taskInfo.datatype, numInputElements, rank); + + if (rank == globalRank) + HIP_CALL(hipMemcpy(taskInfo.inputGpu.ptr, taskInfo.outputCpu.ptr, numBytes.first, hipMemcpyHostToDevice)); + + memcpy(taskInfo.expected.U1 + numBytesPerRank * rank, taskInfo.outputCpu.U1 + numBytesPerRank * globalRank, numBytesPerRank); + } +} + +void PrepData_Send(TaskInfo& taskInfo, int globalRank) { + FillPattern(taskInfo.inputGpu, taskInfo.datatype, taskInfo.count, globalRank, true); +} + +void PrepData_Recv(TaskInfo& taskInfo, int globalRank) { + FillPattern(taskInfo.expected, taskInfo.datatype, taskInfo.count, globalRank); +} \ No newline at end of file diff --git a/tools/rccl_replayer/rcclReplayer.hpp b/tools/rccl_replayer/rcclReplayer.hpp index 3809a90590..486826d94e 100644 --- a/tools/rccl_replayer/rcclReplayer.hpp +++ b/tools/rccl_replayer/rcclReplayer.hpp @@ -3,6 +3,9 @@ #include #include +#include +#include "hip/hip_fp16.h" +#include "rccl_float8.h" // NOTE: Parsing is based on this line logging collective information in enqueue.cc // INFO(NCCL_COLL,"%s: opCount %lx sendbuff %p recvbuff %p count %zi datatype %d op %d \ @@ -99,6 +102,25 @@ char const mscclFuncNames[ncclNumFuncs][32] = "mscclFuncRecv" }; +union PtrUnion +{ + void* ptr; + int8_t* I1; // ncclInt8 + uint8_t* U1; // ncclUint8 + int32_t* I4; // ncclInt32 + uint32_t* U4; // ncclUint32 + int64_t* I8; // ncclInt64 + uint64_t* U8; // ncclUint64 + __half* F2; // ncclFloat16 + rccl_float8* F1; // ncclFp8E4M3 + float* F4; // ncclFloat32 + double* F8; // ncclFloat64 + rccl_bfloat8* B1; // ncclFp8E5M2 + hip_bfloat16* B2; // ncclBfloat16 + + constexpr PtrUnion() : ptr(nullptr) {} +}; + struct TaskInfo { ncclFunc_t funcType; @@ -107,6 +129,10 @@ struct TaskInfo ncclDataType_t datatype; ncclRedOp_t op; int root; + PtrUnion inputGpu; + PtrUnion outputCpu; + PtrUnion outputGpu; + PtrUnion expected; }; struct RankData @@ -137,28 +163,6 @@ struct CollectiveCalls std::vector> localRankStreams; // streams per local rank }; - -size_t DataTypeToBytes(ncclDataType_t const dataType) -{ - switch (dataType) { - case ncclInt8: return 1; - case ncclUint8: return 1; - case ncclInt32: return 4; - case ncclUint32: return 4; - case ncclInt64: return 8; - case ncclUint64: return 8; - case ncclFloat16: return 2; - case ncclFloat32: return 4; - case ncclFloat64: return 8; - case ncclBfloat16: return 2; - case ncclFp8E4M3: return 1; - case ncclFp8E5M2: return 1; - default: - printf("Unsupported datatype (%d)\n", dataType); - exit(0); - } -} - std::string DataTypeToName(ncclDataType_t const dataType) { switch (dataType) { @@ -180,7 +184,28 @@ std::string DataTypeToName(ncclDataType_t const dataType) } } -std::string getRedOp(ncclRedOp_t const op) +size_t DataTypeToBytes(ncclDataType_t const dataType) +{ + switch (dataType) { + case ncclInt8: return 1; + case ncclUint8: return 1; + case ncclInt32: return 4; + case ncclUint32: return 4; + case ncclInt64: return 8; + case ncclUint64: return 8; + case ncclFloat16: return 2; + case ncclFloat32: return 4; + case ncclFloat64: return 8; + case ncclBfloat16: return 2; + case ncclFp8E4M3: return 1; + case ncclFp8E5M2: return 1; + default: + printf("Unsupported datatype (%s)\n", DataTypeToName(dataType).c_str()); + exit(0); + } +} + +std::string RedOpToName(ncclRedOp_t const op) { switch (op) { case ncclSum: return "Sum"; @@ -204,6 +229,161 @@ ncclFunc_t GetFuncType(char* func) exit(1); } +// Set data for ptrUnion (Used during fillPattern) +void SetPtr(PtrUnion& ptrUnion, ncclDataType_t const dataType, int const idx, int valueI, double valueF) { + switch (dataType) + { + case ncclInt8: ptrUnion.I1[idx] = valueI; break; + case ncclUint8: ptrUnion.U1[idx] = valueI; break; + case ncclInt32: ptrUnion.I4[idx] = valueI; break; + case ncclUint32: ptrUnion.U4[idx] = valueI; break; + case ncclInt64: ptrUnion.I8[idx] = valueI; break; + case ncclUint64: ptrUnion.U8[idx] = valueI; break; + case ncclFp8E4M3: ptrUnion.F1[idx] = rccl_float8(valueF); break; + case ncclFloat16: ptrUnion.F2[idx] = __float2half(static_cast(valueF)); break; + case ncclFloat32: ptrUnion.F4[idx] = valueF; break; + case ncclFloat64: ptrUnion.F8[idx] = valueF; break; + case ncclFp8E5M2: ptrUnion.B1[idx] = rccl_bfloat8(valueF); break; + case ncclBfloat16: ptrUnion.B2[idx] = hip_bfloat16(static_cast(valueF)); break; + default: + printf("Unsupported datatype (%s)\n", DataTypeToName(dataType).c_str()); + exit(0); + } +} + +// Check if each element in actual equals to expected +bool IsEqual(PtrUnion const& actual, PtrUnion const& expected, ncclDataType_t const dataType, size_t const numElements, int const globalRank) { + bool isMatch = true; + size_t idx = 0; + for (idx = 0; idx < numElements; ++idx) + { + switch (dataType) + { + case ncclInt8: isMatch = (actual.I1[idx] == expected.I1[idx]); break; + case ncclUint8: isMatch = (actual.U1[idx] == expected.U1[idx]); break; + case ncclInt32: isMatch = (actual.I4[idx] == expected.I4[idx]); break; + case ncclUint32: isMatch = (actual.U4[idx] == expected.U4[idx]); break; + case ncclInt64: isMatch = (actual.I8[idx] == expected.I8[idx]); break; + case ncclUint64: isMatch = (actual.U8[idx] == expected.U8[idx]); break; + case ncclFp8E4M3: isMatch = (fabs(float(actual.F1[idx]) - float(expected.F1[idx])) < 9e-2); break; + case ncclFloat16: isMatch = (fabs(__half2float(actual.F2[idx]) - __half2float(expected.F2[idx])) < 9e-2); break; + case ncclFloat32: isMatch = (fabs(actual.F4[idx] - expected.F4[idx]) < 1e-5); break; + case ncclFloat64: isMatch = (fabs(actual.F8[idx] - expected.F8[idx]) < 1e-12); break; + case ncclFp8E5M2: isMatch = (fabs(float(actual.B1[idx]) - float(expected.B1[idx])) < 9e-2); break; + case ncclBfloat16: isMatch = (fabs((float)actual.B2[idx] - (float)expected.B2[idx]) < 9e-2); break; + default: + printf("Unsupported datatype (%s)\n", DataTypeToName(dataType).c_str()); + isMatch = false; + } + if (!isMatch) { + switch (dataType) + { + case ncclInt8: + printf("[Error Rank = %d] Expected output: %d. Actual output: %d at index %lu\n", globalRank, expected.I1[idx], actual.I1[idx], idx); break; + case ncclUint8: + printf("[Error Rank = %d] Expected output: %u. Actual output: %u at index %lu\n", globalRank, expected.U1[idx], actual.U1[idx], idx); break; + case ncclInt32: + printf("[Error Rank = %d] Expected output: %d. Actual output: %d at index %lu\n", globalRank, expected.I4[idx], actual.I4[idx], idx); break; + case ncclUint32: + printf("[Error Rank = %d] Expected output: %u. Actual output: %u at index %lu\n", globalRank, expected.U4[idx], actual.U4[idx], idx); break; + case ncclInt64: + printf("[Error Rank = %d] Expected output: %ld. Actual output: %ld at index %lu\n", globalRank, expected.I8[idx], actual.I8[idx], idx); break; + case ncclUint64: + printf("[Error Rank = %d] Expected output: %lu. Actual output: %lu at index %lu\n", globalRank, expected.U8[idx], actual.U8[idx], idx); break; + case ncclFp8E4M3: + printf("[Error Rank = %d] Expected output: %f. Actual output: %f at index %lu\n", globalRank, (float)expected.F1[idx], (float)actual.F1[idx], idx); break; + case ncclFloat16: + printf("[Error Rank = %d] Expected output: %f. Actual output: %f at index %lu\n", globalRank, __half2float(expected.F2[idx]), __half2float(actual.F2[idx]), idx); break; + case ncclFloat32: + printf("[Error Rank = %d] Expected output: %f. Actual output: %f at index %lu\n", globalRank, expected.F4[idx], actual.F4[idx], idx); break; + case ncclFloat64: + printf("[Error Rank = %d] Expected output: %lf. Actual output: %lf at index %lu\n", globalRank, expected.F8[idx], actual.F8[idx], idx); break; + case ncclFp8E5M2: + printf("[Error Rank = %d] Expected output: %f. Actual output: %f at index %lu\n", globalRank, (float)expected.B1[idx], (float)actual.B1[idx], idx); break; + case ncclBfloat16: + printf("[Error Rank = %d] Expected output: %f. Actual output: %f at index %lu\n", globalRank, (float)expected.B2[idx], (float)actual.B2[idx], idx); break; + default: + break; + } + return isMatch; + } + } + + return isMatch; +} + +// Performs the various basic reduction operations +template +T ReduceOp(ncclRedOp_t const op, T const A, T const B) +{ + switch (op) + { + case ncclSum: return A + B; + case ncclProd: return A * B; + case ncclMax: return std::max(A, B); + case ncclMin: return std::min(A, B); + default: + printf("Unsupported reduction operator (%s)\n", RedOpToName(op).c_str()); + exit(0); + } +} + +// Perform various reduction ops to ptrUnion +void Reduce(PtrUnion& ptrUnion, PtrUnion const& otherPtrUnion, size_t const numElements, ncclDataType_t const dataType, ncclRedOp_t const op) { + for (size_t idx = 0; idx < numElements; ++idx) + { + switch (dataType) + { + case ncclInt8: ptrUnion.I1[idx] = ReduceOp(op, ptrUnion.I1[idx], otherPtrUnion.I1[idx]); break; + case ncclUint8: ptrUnion.U1[idx] = ReduceOp(op, ptrUnion.U1[idx], otherPtrUnion.U1[idx]); break; + case ncclInt32: ptrUnion.I4[idx] = ReduceOp(op, ptrUnion.I4[idx], otherPtrUnion.I4[idx]); break; + case ncclUint32: ptrUnion.U4[idx] = ReduceOp(op, ptrUnion.U4[idx], otherPtrUnion.U4[idx]); break; + case ncclInt64: ptrUnion.I8[idx] = ReduceOp(op, ptrUnion.I8[idx], otherPtrUnion.I8[idx]); break; + case ncclUint64: ptrUnion.U8[idx] = ReduceOp(op, ptrUnion.U8[idx], otherPtrUnion.U8[idx]); break; + case ncclFp8E4M3: ptrUnion.F1[idx] = rccl_float8(ReduceOp(op, float(ptrUnion.F1[idx]), float(otherPtrUnion.F1[idx]))); break; + case ncclFloat16: ptrUnion.F2[idx] = __float2half(ReduceOp(op, __half2float(ptrUnion.F2[idx]), __half2float(otherPtrUnion.F2[idx]))); break; + case ncclFloat32: ptrUnion.F4[idx] = ReduceOp(op, ptrUnion.F4[idx], otherPtrUnion.F4[idx]); break; + case ncclFloat64: ptrUnion.F8[idx] = ReduceOp(op, ptrUnion.F8[idx], otherPtrUnion.F8[idx]); break; + case ncclFp8E5M2: ptrUnion.B1[idx] = rccl_bfloat8(ReduceOp(op, float(ptrUnion.B1[idx]), float(otherPtrUnion.B1[idx]))); break; + case ncclBfloat16: ptrUnion.B2[idx] = ReduceOp(op, ptrUnion.B2[idx], otherPtrUnion.B2[idx]); break; + default: + printf("Unsupported datatype (%s)\n", DataTypeToName(dataType).c_str()); + exit(0); + } + } +} + +// Divide each element in ptrUnion by divisor +void DivideByInt(PtrUnion& ptrUnion, ncclDataType_t const dataType, size_t const numElements, int const divisor) { + for (size_t idx = 0; idx < numElements; ++idx) + { + switch (dataType) + { + case ncclInt8: ptrUnion.I1[idx] /= divisor; break; + case ncclUint8: ptrUnion.U1[idx] /= divisor; break; + case ncclInt32: ptrUnion.I4[idx] /= divisor; break; + case ncclUint32: ptrUnion.U4[idx] /= divisor; break; + case ncclInt64: ptrUnion.I8[idx] /= divisor; break; + case ncclUint64: ptrUnion.U8[idx] /= divisor; break; + case ncclFp8E4M3: ptrUnion.F1[idx] = (rccl_float8((float)(ptrUnion.F1[idx]) / divisor)); break; + case ncclFloat16: ptrUnion.F2[idx] = __float2half(__half2float(ptrUnion.F2[idx])/divisor); break; + case ncclFloat32: ptrUnion.F4[idx] /= divisor; break; + case ncclFloat64: ptrUnion.F8[idx] /= divisor; break; + case ncclFp8E5M2: ptrUnion.B1[idx] = (rccl_bfloat8((float)(ptrUnion.B1[idx]) / divisor)); break; + case ncclBfloat16: ptrUnion.B2[idx] = (hip_bfloat16((float)(ptrUnion.B2[idx]) / divisor)); break; + default: + printf("Unsupported datatype (%s)\n", DataTypeToName(dataType).c_str()); + exit(0); + } + } +} + +// Check if a collective uses a root +bool IsRootUsed(ncclFunc_t funcType) { + return (funcType == ncclCollBroadcast || funcType == ncclCollReduce || + funcType == ncclCollGather || funcType == ncclCollScatter); +} + // parse the logs and assign them into lineItem bool ParseLineItem(char const* line, LineItem& li); @@ -213,7 +393,7 @@ void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls // allocates send/recv buff, sets the device based on which rank the task belongs to, // syncronize devices after executing all the tasks and free device memory. -double ReplayRccl(CollectiveCalls const& collCall, int groupIdx); +double ReplayRccl(CollectiveCalls& collCall, int groupIdx, int& numInvalid); // Print information about a group call void PrintGroupCall(GroupCall const& gc); @@ -226,4 +406,24 @@ void dataToCsv(GroupCall const& gc, std::ofstream &datafile, double runTime); std::pair GetSize(TaskInfo taskInfo, int numGlobalRanks); // executes the collective call (task) -void ExecuteCollective(TaskInfo const& task, ncclComm_t const& comm, hipStream_t stream, const void *sendbuff, void *recvbuff); +void ExecuteCollective(TaskInfo& task, ncclComm_t const& comm, hipStream_t stream); + +// Allocate CPU/GPU memory for ptrUnion +void AllocateMem(PtrUnion& ptrUnion, size_t const numBytes, bool isGpu = false); + +// Free CPU/GPU memory for ptrUnion +void FreeMem(PtrUnion& ptrUnion, bool isGpu = false); + +// Fill buffers based on pattern using globalRank +void FillPattern(PtrUnion& ptrUnion, ncclDataType_t const dataType, size_t const numElements, int globalRank, bool isGpu = false); + +// PrepareData functions are responsible for setting up input / expected for the given taskInfo +void PrepareDataFunc(TaskInfo& taskInfo, int globalRank, int totalRanks); +void PrepData_Broadcast(TaskInfo& taskInfo, int globalRank); +void PrepData_Reduce(TaskInfo& taskInfo, int globalRank, int totalRanks, bool isAllReduce); +void PrepData_ReduceScatter(TaskInfo& taskInfo, int globalRank, int totalRanks); +void PrepData_Gather(TaskInfo& taskInfo, int globalRank, int totalRanks, bool isAllGather); +void PrepData_Scatter(TaskInfo& taskInfo, int globalRank, int totalRanks); +void PrepData_AlltoAll(TaskInfo& taskInfo, int globalRank, int totalRanks); +void PrepData_Send(TaskInfo& taskInfo, int globalRank); +void PrepData_Recv(TaskInfo& taskInfo, int globalRank); \ No newline at end of file