* Add validation to rccl_replayer

[ROCm/rccl commit: cfecce790f]
Αυτή η υποβολή περιλαμβάνεται σε:
Bertan Dogancay
2024-10-22 10:41:08 -04:00
υποβλήθηκε από GitHub
γονέας 64aead445c
υποβολή fcb0b2da3f
3 αρχεία άλλαξαν με 525 προσθήκες και 63 διαγραφές
@@ -4,7 +4,7 @@ MPI_DIR ?= /opt/ompi
MPI_INC_DIR ?= /usr/include/x86_64-linux-gnu/mpi
MPI_LIB_DIR ?= /usr/lib/x86_64-linux-gnu
INCLUDES = -I$(MPI_INC_DIR) -I$(MPI_DIR)/include -I$(RCCL_DIR)/include
INCLUDES = -I$(MPI_INC_DIR) -I$(MPI_DIR)/include -I$(RCCL_DIR)/include -I$(RCCL_DIR)/hipify/src/include
LDFLAGS = -L$(MPI_LIB_DIR) -L$(MPI_DIR)/lib -L$(RCCL_DIR) -lmpi -lrccl
main: rcclReplayer.cpp
@@ -81,6 +81,7 @@ int main(int argc, char **argv)
printf("Rank %d Done setting up communicators\n", mpiRank);
int numSkippedCalls = 0;
int numInvalid = 0;
double runTime;
std::ofstream datafile;
datafile.open("replayer_data.csv");
@@ -98,7 +99,7 @@ int main(int argc, char **argv)
printf("Running Collective Call %lu of %lu\n", i+1, collCalls.groupCalls.size());
PrintGroupCall(collCalls.groupCalls[i]);
}
double runTime = ReplayRccl(collCalls, i);
double runTime = ReplayRccl(collCalls, i, numInvalid);
if (mpiRank == 0) {
dataToCsv(collCalls.groupCalls[i], datafile, runTime);
}
@@ -132,6 +133,9 @@ int main(int argc, char **argv)
if (mpiRank == 0) printf("Executed group calls: %zu\n", collCalls.groupCalls.size() - numSkippedCalls);
if (mpiRank == 0) printf("Skipped group calls: %d\n", numSkippedCalls);
// Data validation failures during group calls
if (mpiRank == 0) printf("Failed group calls: %d\n", numInvalid);
// Time it takes to execute all the group calls
if (mpiRank == 0) printf("Execution Time: %f seconds\n", duration.count());
printf("MPI Rank %d Success\n", mpiRank);
@@ -170,7 +174,7 @@ void dataToCsv(GroupCall const& gc, std::ofstream &datafile, double runTime)
else if (funcName == "ReduceScatter" || funcName == "AllGather") busBw *= ((n-1)/n);
busBw /= (1e9); //in gb/s
std::string dataTypeName = DataTypeToName(ti.datatype);
std::string redOp = getRedOp(ti.op);
std::string redOp = RedOpToName(ti.op);
datafile << gc.opCount << ", " << funcName.c_str() << ", " << ti.inPlace << ", " << ti.count << ", " << dataTypeName << ", " << redOp << ", " << ti.root << ", " << runTime << ", " << busBw << "\n";
}
@@ -323,6 +327,52 @@ void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls
exit(1);
}
}
// Detect and replace scatter patterns
for (auto& gc : cc.groupCalls) {
if (!gc.isValid) continue;
int scatterRoot = -1;
bool isScatter = true;
for (auto& [rank, rankData] : gc.rankData) {
int sendCount = 0, recvCount = 0;
for (const auto& task : rankData.tasks) {
if (task.funcType == ncclCollSend)
sendCount++;
else if (task.funcType == ncclCollRecv)
recvCount++;
}
if (sendCount == cc.numGlobalRanks && recvCount == 1) {
if (scatterRoot == -1) {
// Root is the first rank that matches the condition
scatterRoot = rank;
} else {
isScatter = false;
break;
}
} else if (recvCount != 1 || sendCount != 0) {
// Non-root ranks must only recv and not send
isScatter = false;
break;
}
}
// Replace send/recv calls with scatter call for the group call
if (isScatter) {
TaskInfo scatterTask;
scatterTask.funcType = ncclCollScatter;
scatterTask.count = gc.rankData[scatterRoot].tasks[0].count;
scatterTask.datatype = gc.rankData[scatterRoot].tasks[0].datatype;
scatterTask.root = scatterRoot;
for (auto& [rank, rankData] : gc.rankData) {
rankData.tasks.clear();
rankData.tasks.push_back(scatterTask);
}
if (isFirstRank)
printf("[INFO] Scatter pattern detected and replaced with scatter collective\n");
}
}
}
bool ParseLineItem(char const* line, LineItem& li)
@@ -337,26 +387,20 @@ bool ParseLineItem(char const* line, LineItem& li)
&li.nRanks, &li.stream, &li.task, &li.globalRank) == 17;
}
double ReplayRccl(CollectiveCalls const& cc, int groupIdx)
double ReplayRccl(CollectiveCalls& cc, int groupIdx, int& numInvalid)
{
int numLocalRanks = cc.localRankComms.size();
// Allocate memory for collective
std::vector<std::vector<void*>> sendbuff(numLocalRanks);
std::vector<std::vector<void*>> recvbuff(numLocalRanks);
for (int localIdx = 0; localIdx < numLocalRanks; localIdx++) {
int globalRank = cc.firstGlobalRank + localIdx;
if (cc.groupCalls[groupIdx].rankData.count(globalRank) == 0) continue;
HIP_CALL(hipSetDevice(cc.localGpuOffset + localIdx));
RankData const& rankData = cc.groupCalls[groupIdx].rankData.at(globalRank);
RankData& rankData = cc.groupCalls[groupIdx].rankData.at(globalRank);
int numTasks = rankData.tasks.size();
sendbuff[localIdx].resize(numTasks);
recvbuff[localIdx].resize(numTasks);
for (int taskId = 0; taskId < numTasks; taskId++) {
TaskInfo const& task = rankData.tasks[taskId];
TaskInfo& task = rankData.tasks[taskId];
// Each task has a size based on the type of collective (funcType)
std::pair<size_t, size_t> numBytes = GetSize(task, cc.numGlobalRanks);
@@ -366,17 +410,20 @@ double ReplayRccl(CollectiveCalls const& cc, int groupIdx)
numBytes.second = numBytes.first;
}
// Set the device and allocate send/recv buffers
HIP_CALL(hipMalloc(&sendbuff[localIdx][taskId], numBytes.first));
HIP_CALL(hipMemset(sendbuff[localIdx][taskId], 0, numBytes.first));
// Allocate memory
AllocateMem(task.inputGpu, numBytes.first, true);
AllocateMem(task.outputCpu, numBytes.second);
AllocateMem(task.expected, numBytes.second);
if (!task.inPlace) {
HIP_CALL(hipMalloc(&recvbuff[localIdx][taskId], numBytes.second));
HIP_CALL(hipMemset(recvbuff[localIdx][taskId], 0, numBytes.second));
AllocateMem(task.outputGpu, numBytes.second, true);
} else {
recvbuff[localIdx][taskId] = sendbuff[localIdx][taskId];
task.outputGpu = task.inputGpu;
}
// Prepare input/output for each task based on collective type
PrepareDataFunc(task, globalRank, cc.numGlobalRanks);
HIP_CALL(hipDeviceSynchronize());
}
}
@@ -388,14 +435,12 @@ double ReplayRccl(CollectiveCalls const& cc, int groupIdx)
int globalRank = cc.firstGlobalRank + localIdx;
if (cc.groupCalls[groupIdx].rankData.count(globalRank) == 0) continue;
RankData const& rankData = cc.groupCalls[groupIdx].rankData.at(globalRank);
RankData& rankData = cc.groupCalls[groupIdx].rankData.at(globalRank);
int numTasks = rankData.tasks.size();
int commIdx = rankData.commIdx;
for (int taskId = 0; taskId < numTasks; taskId++) {
TaskInfo const& task = rankData.tasks[taskId];
ExecuteCollective(task, cc.localRankComms[localIdx][commIdx], cc.localRankStreams[localIdx][commIdx],
sendbuff[localIdx][taskId],
recvbuff[localIdx][taskId]);
TaskInfo& task = rankData.tasks[taskId];
ExecuteCollective(task, cc.localRankComms[localIdx][commIdx], cc.localRankStreams[localIdx][commIdx]);
}
}
NCCL_CALL(ncclGroupEnd());
@@ -415,14 +460,46 @@ double ReplayRccl(CollectiveCalls const& cc, int groupIdx)
double runTime = duration.count();
runTime *= 1000; //convering into milliseconds
// Data validation
bool isValid = true;
for (int localIdx = 0; localIdx < numLocalRanks; localIdx++) {
int globalRank = cc.firstGlobalRank + localIdx;
RankData const& rankData = cc.groupCalls[groupIdx].rankData.at(globalRank);
RankData& rankData = cc.groupCalls[groupIdx].rankData.at(globalRank);
int numTasks = rankData.tasks.size();
for (int taskId = 0; taskId < numTasks; taskId++) {
TaskInfo const& task = rankData.tasks[taskId];
HIP_CALL(hipFree(sendbuff[localIdx][taskId]));
if (!task.inPlace) HIP_CALL(hipFree(recvbuff[localIdx][taskId]));
TaskInfo& task = rankData.tasks[taskId];
// Only need Recv to validate
if (task.funcType == ncclCollSend) break;
// Ignore non-root ranks
if (IsRootUsed(task.funcType) && task.root != globalRank) break;
std::pair<size_t, size_t> numBytes = GetSize(task, cc.numGlobalRanks);
if (task.inPlace) {
numBytes.first = std::max(numBytes.first, numBytes.second);
numBytes.second = numBytes.first;
}
HIP_CALL(hipMemcpy(task.outputCpu.ptr, task.outputGpu.ptr, numBytes.second, hipMemcpyDeviceToHost));
if (!IsEqual(task.outputCpu, task.expected, task.datatype, task.count, globalRank)) {
isValid = false;
break; // Check other ranks
}
}
}
if (!isValid) numInvalid++;
// Free memory
for (int localIdx = 0; localIdx < numLocalRanks; localIdx++) {
int globalRank = cc.firstGlobalRank + localIdx;
RankData& rankData = cc.groupCalls[groupIdx].rankData.at(globalRank);
int numTasks = rankData.tasks.size();
for (int taskId = 0; taskId < numTasks; taskId++) {
TaskInfo& task = rankData.tasks[taskId];
FreeMem(task.inputGpu, true);
if (!task.inPlace) FreeMem(task.outputGpu, true);
FreeMem(task.outputCpu);
FreeMem(task.expected);
}
}
return runTime;
@@ -456,41 +533,226 @@ std::pair<size_t, size_t> GetSize(TaskInfo taskInfo, int numGlobalRanks) {
return std::make_pair(sendNumBytes, recvNumBytes);
}
void ExecuteCollective(TaskInfo const& task, ncclComm_t const& comm, hipStream_t stream, const void *sendbuff, void *recvbuff)
void ExecuteCollective(TaskInfo& task, ncclComm_t const& comm, hipStream_t stream)
{
switch (task.funcType) {
case ncclCollAllGather:
NCCL_CALL(ncclAllGather(sendbuff, recvbuff, task.count, task.datatype, comm, stream));
NCCL_CALL(ncclAllGather(task.inputGpu.ptr, task.outputGpu.ptr, task.count, task.datatype, comm, stream));
break;
case ncclCollAllReduce:
NCCL_CALL(ncclAllReduce(sendbuff, recvbuff, task.count, task.datatype, task.op, comm, stream));
NCCL_CALL(ncclAllReduce(task.inputGpu.ptr, task.outputGpu.ptr, task.count, task.datatype, task.op, comm, stream));
break;
case ncclCollBroadcast:
NCCL_CALL(ncclBroadcast(sendbuff, recvbuff, task.count, task.datatype, task.root, comm, stream));
NCCL_CALL(ncclBroadcast(task.inputGpu.ptr, task.outputGpu.ptr, task.count, task.datatype, task.root, comm, stream));
break;
case ncclCollReduce:
NCCL_CALL(ncclReduce(sendbuff, recvbuff, task.count, task.datatype, task.op, task.root, comm, stream));
NCCL_CALL(ncclReduce(task.inputGpu.ptr, task.outputGpu.ptr, task.count, task.datatype, task.op, task.root, comm, stream));
break;
case ncclCollReduceScatter:
NCCL_CALL(ncclReduceScatter(sendbuff, recvbuff, task.count, task.datatype, task.op, comm, stream));
NCCL_CALL(ncclReduceScatter(task.inputGpu.ptr, task.outputGpu.ptr, task.count, task.datatype, task.op, comm, stream));
break;
case ncclCollGather:
NCCL_CALL(ncclGather(sendbuff, recvbuff, task.count, task.datatype, task.root, comm, stream));
NCCL_CALL(ncclGather(task.inputGpu.ptr, task.outputGpu.ptr, task.count, task.datatype, task.root, comm, stream));
break;
case ncclCollScatter:
NCCL_CALL(ncclScatter(sendbuff, recvbuff, task.count, task.datatype, task.root, comm, stream));
NCCL_CALL(ncclScatter(task.inputGpu.ptr, task.outputGpu.ptr, task.count, task.datatype, task.root, comm, stream));
break;
case ncclCollAllToAll:
NCCL_CALL(ncclAllToAll(sendbuff, recvbuff, task.count, task.datatype, comm, stream));
NCCL_CALL(ncclAllToAll(task.inputGpu.ptr, task.outputGpu.ptr, task.count, task.datatype, comm, stream));
break;
case ncclCollSend:
NCCL_CALL(ncclSend(sendbuff, task.count, task.datatype, task.root, comm, stream));
NCCL_CALL(ncclSend(task.inputGpu.ptr, task.count, task.datatype, task.root, comm, stream));
break;
case ncclCollRecv:
NCCL_CALL(ncclRecv(recvbuff, task.count, task.datatype, task.root, comm, stream));
NCCL_CALL(ncclRecv(task.outputGpu.ptr, task.count, task.datatype, task.root, comm, stream));
break;
default:
printf("Error: unsupported collective\n");
exit(1);
}
}
// Allocate a zero-initialized buffer of numBytes and store its address in ptrUnion.ptr.
//   isGpu == true  : device memory (hipMalloc + hipMemset)
//   isGpu == false : host memory (calloc)
// A request for 0 bytes allocates nothing and leaves ptrUnion untouched.
// A failed host allocation is reported and terminates the process, because
// callers write into the buffer without checking it.
void AllocateMem(PtrUnion& ptrUnion, size_t const numBytes, bool isGpu) {
  if (numBytes == 0) return;

  if (isGpu) {
    HIP_CALL(hipMalloc(&ptrUnion.ptr, numBytes));
    HIP_CALL(hipMemset(ptrUnion.ptr, 0, numBytes));
    // Wait on the NULL stream before handing the buffer back
    HIP_CALL(hipStreamSynchronize(NULL));
    return;
  }

  // calloc already returns zeroed memory, so no additional memset is needed
  // (the previous memset also ran before the null check).
  ptrUnion.ptr = calloc(numBytes, 1);
  if (!ptrUnion.ptr) {
    printf("Unable to allocate memory (%zu bytes)\n", numBytes);
    exit(1);
  }
}
// Release the buffer held by ptrUnion (hipFree for GPU buffers, free for host
// buffers) and reset the pointer to nullptr. A null pointer is ignored.
void FreeMem(PtrUnion& ptrUnion, bool isGpu) {
  if (ptrUnion.ptr == nullptr) return;

  if (isGpu) {
    HIP_CALL(hipFree(ptrUnion.ptr));
  } else {
    free(ptrUnion.ptr);
  }
  ptrUnion.ptr = nullptr;
}
// Write the deterministic test pattern for globalRank into ptrUnion.
// Element i receives the integer value (globalRank + i) % 256, or for
// floating-point datatypes 1 / (that value + 1).
// For a GPU destination the pattern is generated in a temporary host buffer
// and then copied to the device; for a host destination it is written in place.
void FillPattern(PtrUnion& ptrUnion, ncclDataType_t const dataType, size_t const numElements, int globalRank, bool isGpu) {
  size_t const numBytes = numElements * DataTypeToBytes(dataType);

  // Host-side buffer the pattern is generated into
  PtrUnion hostBuf;
  if (!isGpu) {
    hostBuf.ptr = ptrUnion.ptr;
  } else {
    AllocateMem(hostBuf, numBytes);
  }

  for (int elem = 0; elem < numElements; elem++) {
    int intValue = (globalRank + elem) % 256;
    double fpValue = 1.0L/((double)intValue+1.0L);
    SetPtr(hostBuf, dataType, elem, intValue, fpValue);
  }

  // Host destinations were filled in place; nothing left to do
  if (!isGpu) return;

  HIP_CALL(hipMemcpy(ptrUnion.ptr, hostBuf.ptr, numBytes, hipMemcpyHostToDevice));
  FreeMem(hostBuf);
}
// Dispatch to the PrepData_* routine matching taskInfo.funcType.
// Reduce/AllReduce and Gather/AllGather share one routine each, selected by
// the trailing boolean. An unknown funcType prints an error and exits.
void PrepareDataFunc(TaskInfo& taskInfo, int globalRank, int totalRanks)
{
  switch (taskInfo.funcType)
  {
  case ncclCollBroadcast:
    PrepData_Broadcast(taskInfo, globalRank);
    break;
  case ncclCollReduce:
    PrepData_Reduce(taskInfo, globalRank, totalRanks, /*isAllReduce=*/false);
    break;
  case ncclCollAllReduce:
    PrepData_Reduce(taskInfo, globalRank, totalRanks, /*isAllReduce=*/true);
    break;
  case ncclCollGather:
    PrepData_Gather(taskInfo, globalRank, totalRanks, /*isAllGather=*/false);
    break;
  case ncclCollAllGather:
    PrepData_Gather(taskInfo, globalRank, totalRanks, /*isAllGather=*/true);
    break;
  case ncclCollReduceScatter:
    PrepData_ReduceScatter(taskInfo, globalRank, totalRanks);
    break;
  case ncclCollScatter:
    PrepData_Scatter(taskInfo, globalRank, totalRanks);
    break;
  case ncclCollAllToAll:
    PrepData_AlltoAll(taskInfo, globalRank, totalRanks);
    break;
  case ncclCollSend:
    PrepData_Send(taskInfo, globalRank);
    break;
  case ncclCollRecv:
    PrepData_Recv(taskInfo, globalRank);
    break;
  default:
    printf("Error: unsupported collective\n");
    exit(1);
  }
}
// Broadcast: the root's GPU input gets the root's pattern, and every rank
// (root included) expects to end up with that same pattern.
void PrepData_Broadcast(TaskInfo& taskInfo, int globalRank) {
  bool const isRoot = (taskInfo.root == globalRank);

  // Input data only matters on the root
  if (isRoot) {
    FillPattern(taskInfo.inputGpu, taskInfo.datatype, taskInfo.count, taskInfo.root, true);
  }

  // Expected output on every rank is the root's input pattern
  FillPattern(taskInfo.expected, taskInfo.datatype, taskInfo.count, taskInfo.root);
}
// Reduce / AllReduce: regenerate each rank's input pattern on the host, upload
// this rank's own pattern to the GPU input, and fold all patterns into
// taskInfo.expected on the ranks that receive a result (every rank for
// AllReduce, only the root for Reduce).
// taskInfo.outputCpu is used as scratch space for the per-rank patterns.
void PrepData_Reduce(TaskInfo& taskInfo, int globalRank, int totalRanks, bool isAllReduce) {
  // Does this rank hold a reduction result that must be validated?
  bool const holdsResult = isAllReduce || (taskInfo.root == globalRank);

  // Average and custom operators are accumulated as a sum on the host
  ncclRedOp_t const hostOp = (taskInfo.op >= ncclAvg) ? ncclSum : taskInfo.op;

  size_t const numBytes = taskInfo.count * DataTypeToBytes(taskInfo.datatype);

  for (int srcRank = 0; srcRank < totalRanks; ++srcRank) {
    FillPattern(taskInfo.outputCpu, taskInfo.datatype, taskInfo.count, srcRank);

    if (srcRank == globalRank) {
      HIP_CALL(hipMemcpy(taskInfo.inputGpu.ptr, taskInfo.outputCpu.ptr, numBytes, hipMemcpyHostToDevice));
    }

    if (!holdsResult) continue;

    if (srcRank == 0) {
      // First contribution seeds the accumulator
      memcpy(taskInfo.expected.ptr, taskInfo.outputCpu.ptr, numBytes);
    } else {
      Reduce(taskInfo.expected, taskInfo.outputCpu, taskInfo.count, taskInfo.datatype, hostOp);
    }
  }

  // Turn the accumulated sum into an average when requested
  if (holdsResult && taskInfo.op == ncclAvg) {
    DivideByInt(taskInfo.expected, taskInfo.datatype, taskInfo.count, totalRanks);
  }
}
// ReduceScatter: every rank contributes count * totalRanks elements. The full
// reduction is computed on the host and this rank's expected output is the
// globalRank-th slice of it (numBytes.second bytes).
// numBytes.first / numBytes.second are the send / receive sizes from GetSize.
void PrepData_ReduceScatter(TaskInfo& taskInfo, int globalRank, int totalRanks) {
  // Kept in size_t (the type FillPattern/Reduce/DivideByInt accept) rather than
  // being narrowed to int.
  size_t const numInputElements = static_cast<size_t>(taskInfo.count) * static_cast<size_t>(totalRanks);
  std::pair<size_t, size_t> numBytes = GetSize(taskInfo, totalRanks);

  // Host scratch buffers: one rank's input, and the running reduction
  PtrUnion tempInputCpu;
  PtrUnion tempResultCpu;
  AllocateMem(tempInputCpu, numBytes.first);
  AllocateMem(tempResultCpu, numBytes.first);

  // If average or custom reduction operator is used, perform a summation instead
  ncclRedOp_t const tempOp = (taskInfo.op >= ncclAvg ? ncclSum : taskInfo.op);

  for (int rank = 0; rank < totalRanks; ++rank) {
    FillPattern(tempInputCpu, taskInfo.datatype, numInputElements, rank);

    if (rank == globalRank)
      HIP_CALL(hipMemcpy(taskInfo.inputGpu.ptr, tempInputCpu.ptr, numBytes.first, hipMemcpyHostToDevice));

    if (rank == 0)
      memcpy(tempResultCpu.ptr, tempInputCpu.ptr, numBytes.first);
    else
      Reduce(tempResultCpu, tempInputCpu, numInputElements, taskInfo.datatype, tempOp);
  }

  if (taskInfo.op == ncclAvg)
    DivideByInt(tempResultCpu, taskInfo.datatype, numInputElements, totalRanks);

  // Byte-wise offset: this rank's slice of the reduced buffer
  memcpy(taskInfo.expected.I1, tempResultCpu.I1 + globalRank * numBytes.second, numBytes.second);

  FreeMem(tempInputCpu);
  FreeMem(tempResultCpu);
}
// Gather / AllGather: each rank contributes `count` elements of its own
// pattern. Ranks that receive the result (every rank for AllGather, only the
// root for Gather) expect the per-rank patterns concatenated in rank order.
// taskInfo.outputCpu is used as scratch space for the per-rank patterns.
// numBytes.first is the per-rank send size returned by GetSize.
void PrepData_Gather(TaskInfo& taskInfo, int globalRank, int totalRanks, bool isAllGather) {
  // Kept in size_t (the type FillPattern accepts) rather than being narrowed to int
  size_t const numInputElements = taskInfo.count;
  std::pair<size_t, size_t> numBytes = GetSize(taskInfo, totalRanks);

  for (int rank = 0; rank < totalRanks; ++rank) {
    FillPattern(taskInfo.outputCpu, taskInfo.datatype, numInputElements, rank);

    if (rank == globalRank)
      HIP_CALL(hipMemcpy(taskInfo.inputGpu.ptr, taskInfo.outputCpu.ptr, numBytes.first, hipMemcpyHostToDevice));

    // Byte-wise offset: slot `rank` of the gathered buffer
    if (isAllGather || taskInfo.root == globalRank)
      memcpy(taskInfo.expected.I1 + (rank * numBytes.first), taskInfo.outputCpu.ptr, numBytes.first);
  }
}
// Scatter: the root sends count * totalRanks elements of its pattern; each
// rank expects the globalRank-th slice (numBytes.second bytes) of that buffer.
// numBytes.first / numBytes.second are the send / receive sizes from GetSize.
void PrepData_Scatter(TaskInfo& taskInfo, int globalRank, int totalRanks) {
  // Kept in size_t (the type FillPattern accepts) rather than being narrowed to int
  size_t const numInputElements = static_cast<size_t>(taskInfo.count) * static_cast<size_t>(totalRanks);
  std::pair<size_t, size_t> numBytes = GetSize(taskInfo, totalRanks);

  // Host copy of the root's full send buffer
  PtrUnion tempInput;
  AllocateMem(tempInput, numBytes.first);
  FillPattern(tempInput, taskInfo.datatype, numInputElements, taskInfo.root);

  // Only the root needs the data on its GPU input
  if (globalRank == taskInfo.root)
    HIP_CALL(hipMemcpy(taskInfo.inputGpu.ptr, tempInput.ptr, numBytes.first, hipMemcpyHostToDevice));

  // Byte-wise offset: this rank's slice of the root's buffer
  memcpy(taskInfo.expected.U1, tempInput.U1 + globalRank * numBytes.second, numBytes.second);

  FreeMem(tempInput);
}
// AllToAll: every rank sends count * totalRanks elements, split into one
// equally sized chunk per destination rank. This rank expects, in slot `rank`
// of its output, chunk `globalRank` of the pattern generated for `rank`.
// taskInfo.outputCpu is used as scratch space for the per-rank patterns.
void PrepData_AlltoAll(TaskInfo& taskInfo, int globalRank, int totalRanks) {
  // Kept in size_t (the type FillPattern accepts) rather than being narrowed to int
  size_t const numInputElements = static_cast<size_t>(taskInfo.count) * static_cast<size_t>(totalRanks);
  std::pair<size_t, size_t> numBytes = GetSize(taskInfo, totalRanks);
  size_t const numBytesPerRank = numBytes.first / totalRanks;

  for (int rank = 0; rank < totalRanks; ++rank) {
    FillPattern(taskInfo.outputCpu, taskInfo.datatype, numInputElements, rank);

    if (rank == globalRank)
      HIP_CALL(hipMemcpy(taskInfo.inputGpu.ptr, taskInfo.outputCpu.ptr, numBytes.first, hipMemcpyHostToDevice));

    memcpy(taskInfo.expected.U1 + numBytesPerRank * rank, taskInfo.outputCpu.U1 + numBytesPerRank * globalRank, numBytesPerRank);
  }
}
// Send: the GPU input buffer is filled with the sending rank's own pattern.
void PrepData_Send(TaskInfo& taskInfo, int globalRank) {
  bool const fillOnGpu = true;
  FillPattern(taskInfo.inputGpu, taskInfo.datatype, taskInfo.count, globalRank, fillOnGpu);
}
// Recv: the expected data is whatever the peer sends.
// PrepData_Send seeds the payload with the *sender's* rank, and
// ExecuteCollective passes taskInfo.root to ncclRecv as the peer, so the
// expected pattern must be seeded with taskInfo.root. Seeding it with the
// receiver's own rank only matches for self-sends.
// NOTE(review): assumes the logged peer rank uses the same numbering as
// globalRank - confirm for sub-communicators.
void PrepData_Recv(TaskInfo& taskInfo, int globalRank) {
  (void)globalRank;  // kept for interface compatibility with PrepareDataFunc
  FillPattern(taskInfo.expected, taskInfo.datatype, taskInfo.count, taskInfo.root);
}
@@ -3,6 +3,9 @@
#include <cstring>
#include <rccl/rccl.h>
#include <hip/hip_bfloat16.h>
#include "hip/hip_fp16.h"
#include "rccl_float8.h"
// NOTE: Parsing is based on this line logging collective information in enqueue.cc
// INFO(NCCL_COLL,"%s: opCount %lx sendbuff %p recvbuff %p count %zi datatype %d op %d \
@@ -99,6 +102,25 @@ char const mscclFuncNames[ncclNumFuncs][32] =
"mscclFuncRecv"
};
// One pointer viewed through every element type the replayer handles.
// All members alias the same address; `ptr` is the untyped view used for
// allocation, copies and frees, and the typed members give indexed element
// access for the datatype named in each comment.
union PtrUnion
{
  // Untyped view
  void*         ptr;

  // 1-byte element types
  int8_t*       I1;  // ncclInt8
  uint8_t*      U1;  // ncclUint8
  rccl_float8*  F1;  // ncclFp8E4M3
  rccl_bfloat8* B1;  // ncclFp8E5M2

  // 2-byte element types
  __half*       F2;  // ncclFloat16
  hip_bfloat16* B2;  // ncclBfloat16

  // 4-byte element types
  int32_t*      I4;  // ncclInt32
  uint32_t*     U4;  // ncclUint32
  float*        F4;  // ncclFloat32

  // 8-byte element types
  int64_t*      I8;  // ncclInt64
  uint64_t*     U8;  // ncclUint64
  double*       F8;  // ncclFloat64

  // Starts out as a null pointer
  constexpr PtrUnion() : ptr(nullptr) {}
};
struct TaskInfo
{
ncclFunc_t funcType;
@@ -107,6 +129,10 @@ struct TaskInfo
ncclDataType_t datatype;
ncclRedOp_t op;
int root;
PtrUnion inputGpu;
PtrUnion outputCpu;
PtrUnion outputGpu;
PtrUnion expected;
};
struct RankData
@@ -137,28 +163,6 @@ struct CollectiveCalls
std::vector<std::vector<hipStream_t>> localRankStreams; // streams per local rank
};
// Size in bytes of one element of the given NCCL datatype.
// An unsupported datatype is reported and terminates the process with a
// non-zero status (previously exit(0), which signalled success).
size_t DataTypeToBytes(ncclDataType_t const dataType)
{
  switch (dataType) {
  case ncclInt8:
  case ncclUint8:
  case ncclFp8E4M3:
  case ncclFp8E5M2:
    return 1;
  case ncclFloat16:
  case ncclBfloat16:
    return 2;
  case ncclInt32:
  case ncclUint32:
  case ncclFloat32:
    return 4;
  case ncclInt64:
  case ncclUint64:
  case ncclFloat64:
    return 8;
  default:
    printf("Unsupported datatype (%d)\n", dataType);
    exit(1);
  }
}
std::string DataTypeToName(ncclDataType_t const dataType)
{
switch (dataType) {
@@ -180,7 +184,28 @@ std::string DataTypeToName(ncclDataType_t const dataType)
}
}
std::string getRedOp(ncclRedOp_t const op)
// Size in bytes of one element of the given NCCL datatype.
// An unsupported datatype is reported and terminates the process with a
// non-zero status (previously exit(0), which signalled success).
size_t DataTypeToBytes(ncclDataType_t const dataType)
{
  switch (dataType) {
  case ncclInt8:
  case ncclUint8:
  case ncclFp8E4M3:
  case ncclFp8E5M2:
    return 1;
  case ncclFloat16:
  case ncclBfloat16:
    return 2;
  case ncclInt32:
  case ncclUint32:
  case ncclFloat32:
    return 4;
  case ncclInt64:
  case ncclUint64:
  case ncclFloat64:
    return 8;
  default:
    printf("Unsupported datatype (%s)\n", DataTypeToName(dataType).c_str());
    exit(1);
  }
}
std::string RedOpToName(ncclRedOp_t const op)
{
switch (op) {
case ncclSum: return "Sum";
@@ -204,6 +229,161 @@ ncclFunc_t GetFuncType(char* func)
exit(1);
}
// Set data for ptrUnion (Used during fillPattern)
// Store one element at index idx of ptrUnion, interpreted as dataType.
// Integer datatypes store valueI; floating-point datatypes store valueF
// converted to the element type. Used by FillPattern.
// An unsupported datatype is reported and terminates the process with a
// non-zero status (previously exit(0), which signalled success).
void SetPtr(PtrUnion& ptrUnion, ncclDataType_t const dataType, int const idx, int valueI, double valueF) {
  switch (dataType)
  {
  // Integer element types
  case ncclInt8:     ptrUnion.I1[idx] = valueI; break;
  case ncclUint8:    ptrUnion.U1[idx] = valueI; break;
  case ncclInt32:    ptrUnion.I4[idx] = valueI; break;
  case ncclUint32:   ptrUnion.U4[idx] = valueI; break;
  case ncclInt64:    ptrUnion.I8[idx] = valueI; break;
  case ncclUint64:   ptrUnion.U8[idx] = valueI; break;
  // Floating-point element types
  case ncclFp8E4M3:  ptrUnion.F1[idx] = rccl_float8(valueF); break;
  case ncclFloat16:  ptrUnion.F2[idx] = __float2half(static_cast<float>(valueF)); break;
  case ncclFloat32:  ptrUnion.F4[idx] = valueF; break;
  case ncclFloat64:  ptrUnion.F8[idx] = valueF; break;
  case ncclFp8E5M2:  ptrUnion.B1[idx] = rccl_bfloat8(valueF); break;
  case ncclBfloat16: ptrUnion.B2[idx] = hip_bfloat16(static_cast<float>(valueF)); break;
  default:
    printf("Unsupported datatype (%s)\n", DataTypeToName(dataType).c_str());
    exit(1);
  }
}
// Check if each element in actual equals to expected
// Compare the first numElements elements of actual against expected.
// Integer datatypes must match exactly; floating-point datatypes must agree
// within a fixed absolute tolerance (9e-2 for the 8/16-bit formats, 1e-5 for
// float, 1e-12 for double).
// On the first mismatch the offending values are printed, tagged with
// globalRank, and false is returned. Returns true when every element matches
// (including when numElements is 0).
bool IsEqual(PtrUnion const& actual, PtrUnion const& expected, ncclDataType_t const dataType, size_t const numElements, int const globalRank) {
  for (size_t idx = 0; idx < numElements; ++idx)
  {
    bool same = true;
    switch (dataType)
    {
    case ncclInt8:     same = (actual.I1[idx] == expected.I1[idx]); break;
    case ncclUint8:    same = (actual.U1[idx] == expected.U1[idx]); break;
    case ncclInt32:    same = (actual.I4[idx] == expected.I4[idx]); break;
    case ncclUint32:   same = (actual.U4[idx] == expected.U4[idx]); break;
    case ncclInt64:    same = (actual.I8[idx] == expected.I8[idx]); break;
    case ncclUint64:   same = (actual.U8[idx] == expected.U8[idx]); break;
    case ncclFp8E4M3:  same = (fabs(float(actual.F1[idx]) - float(expected.F1[idx])) < 9e-2); break;
    case ncclFloat16:  same = (fabs(__half2float(actual.F2[idx]) - __half2float(expected.F2[idx])) < 9e-2); break;
    case ncclFloat32:  same = (fabs(actual.F4[idx] - expected.F4[idx]) < 1e-5); break;
    case ncclFloat64:  same = (fabs(actual.F8[idx] - expected.F8[idx]) < 1e-12); break;
    case ncclFp8E5M2:  same = (fabs(float(actual.B1[idx]) - float(expected.B1[idx])) < 9e-2); break;
    case ncclBfloat16: same = (fabs((float)actual.B2[idx] - (float)expected.B2[idx]) < 9e-2); break;
    default:
      printf("Unsupported datatype (%s)\n", DataTypeToName(dataType).c_str());
      same = false;
    }

    if (same) continue;

    // First mismatch: report it and stop
    switch (dataType)
    {
    case ncclInt8:
      printf("[Error Rank = %d] Expected output: %d. Actual output: %d at index %lu\n", globalRank, expected.I1[idx], actual.I1[idx], idx);
      break;
    case ncclUint8:
      printf("[Error Rank = %d] Expected output: %u. Actual output: %u at index %lu\n", globalRank, expected.U1[idx], actual.U1[idx], idx);
      break;
    case ncclInt32:
      printf("[Error Rank = %d] Expected output: %d. Actual output: %d at index %lu\n", globalRank, expected.I4[idx], actual.I4[idx], idx);
      break;
    case ncclUint32:
      printf("[Error Rank = %d] Expected output: %u. Actual output: %u at index %lu\n", globalRank, expected.U4[idx], actual.U4[idx], idx);
      break;
    case ncclInt64:
      printf("[Error Rank = %d] Expected output: %ld. Actual output: %ld at index %lu\n", globalRank, expected.I8[idx], actual.I8[idx], idx);
      break;
    case ncclUint64:
      printf("[Error Rank = %d] Expected output: %lu. Actual output: %lu at index %lu\n", globalRank, expected.U8[idx], actual.U8[idx], idx);
      break;
    case ncclFp8E4M3:
      printf("[Error Rank = %d] Expected output: %f. Actual output: %f at index %lu\n", globalRank, (float)expected.F1[idx], (float)actual.F1[idx], idx);
      break;
    case ncclFloat16:
      printf("[Error Rank = %d] Expected output: %f. Actual output: %f at index %lu\n", globalRank, __half2float(expected.F2[idx]), __half2float(actual.F2[idx]), idx);
      break;
    case ncclFloat32:
      printf("[Error Rank = %d] Expected output: %f. Actual output: %f at index %lu\n", globalRank, expected.F4[idx], actual.F4[idx], idx);
      break;
    case ncclFloat64:
      printf("[Error Rank = %d] Expected output: %lf. Actual output: %lf at index %lu\n", globalRank, expected.F8[idx], actual.F8[idx], idx);
      break;
    case ncclFp8E5M2:
      printf("[Error Rank = %d] Expected output: %f. Actual output: %f at index %lu\n", globalRank, (float)expected.B1[idx], (float)actual.B1[idx], idx);
      break;
    case ncclBfloat16:
      printf("[Error Rank = %d] Expected output: %f. Actual output: %f at index %lu\n", globalRank, (float)expected.B2[idx], (float)actual.B2[idx], idx);
      break;
    default:
      break;
    }
    return false;
  }
  return true;
}
// Performs the various basic reduction operations
// Apply one basic reduction operator (sum, product, max, min) to A and B.
// Any other operator is reported and terminates the process with a non-zero
// status (previously exit(0), which signalled success).
template <typename T>
T ReduceOp(ncclRedOp_t const op, T const A, T const B)
{
  switch (op)
  {
  case ncclSum:  return A + B;
  case ncclProd: return A * B;
  case ncclMax:  return std::max(A, B);
  case ncclMin:  return std::min(A, B);
  default:
    printf("Unsupported reduction operator (%s)\n", RedOpToName(op).c_str());
    exit(1);
  }
}
// Perform various reduction ops to ptrUnion
// Element-wise in-place reduction: ptrUnion[i] = op(ptrUnion[i], otherPtrUnion[i])
// for the first numElements elements of the given datatype.
// The fp8 and half types are reduced in float and converted back.
// An unsupported datatype is reported and terminates the process with a
// non-zero status (previously exit(0), which signalled success).
void Reduce(PtrUnion& ptrUnion, PtrUnion const& otherPtrUnion, size_t const numElements, ncclDataType_t const dataType, ncclRedOp_t const op) {
  for (size_t idx = 0; idx < numElements; ++idx)
  {
    switch (dataType)
    {
    case ncclInt8:     ptrUnion.I1[idx] = ReduceOp(op, ptrUnion.I1[idx], otherPtrUnion.I1[idx]); break;
    case ncclUint8:    ptrUnion.U1[idx] = ReduceOp(op, ptrUnion.U1[idx], otherPtrUnion.U1[idx]); break;
    case ncclInt32:    ptrUnion.I4[idx] = ReduceOp(op, ptrUnion.I4[idx], otherPtrUnion.I4[idx]); break;
    case ncclUint32:   ptrUnion.U4[idx] = ReduceOp(op, ptrUnion.U4[idx], otherPtrUnion.U4[idx]); break;
    case ncclInt64:    ptrUnion.I8[idx] = ReduceOp(op, ptrUnion.I8[idx], otherPtrUnion.I8[idx]); break;
    case ncclUint64:   ptrUnion.U8[idx] = ReduceOp(op, ptrUnion.U8[idx], otherPtrUnion.U8[idx]); break;
    case ncclFp8E4M3:  ptrUnion.F1[idx] = rccl_float8(ReduceOp(op, float(ptrUnion.F1[idx]), float(otherPtrUnion.F1[idx]))); break;
    case ncclFloat16:  ptrUnion.F2[idx] = __float2half(ReduceOp(op, __half2float(ptrUnion.F2[idx]), __half2float(otherPtrUnion.F2[idx]))); break;
    case ncclFloat32:  ptrUnion.F4[idx] = ReduceOp(op, ptrUnion.F4[idx], otherPtrUnion.F4[idx]); break;
    case ncclFloat64:  ptrUnion.F8[idx] = ReduceOp(op, ptrUnion.F8[idx], otherPtrUnion.F8[idx]); break;
    case ncclFp8E5M2:  ptrUnion.B1[idx] = rccl_bfloat8(ReduceOp(op, float(ptrUnion.B1[idx]), float(otherPtrUnion.B1[idx]))); break;
    case ncclBfloat16: ptrUnion.B2[idx] = ReduceOp(op, ptrUnion.B2[idx], otherPtrUnion.B2[idx]); break;
    default:
      printf("Unsupported datatype (%s)\n", DataTypeToName(dataType).c_str());
      exit(1);
    }
  }
}
// Divide each element in ptrUnion by divisor
// Divide each of the first numElements elements of ptrUnion by divisor, in place.
// Integer datatypes use integer division; the fp8, half and bfloat16 types are
// divided in float and converted back.
// An unsupported datatype is reported and terminates the process with a
// non-zero status (previously exit(0), which signalled success).
void DivideByInt(PtrUnion& ptrUnion, ncclDataType_t const dataType, size_t const numElements, int const divisor) {
  for (size_t idx = 0; idx < numElements; ++idx)
  {
    switch (dataType)
    {
    case ncclInt8:     ptrUnion.I1[idx] /= divisor; break;
    case ncclUint8:    ptrUnion.U1[idx] /= divisor; break;
    case ncclInt32:    ptrUnion.I4[idx] /= divisor; break;
    case ncclUint32:   ptrUnion.U4[idx] /= divisor; break;
    case ncclInt64:    ptrUnion.I8[idx] /= divisor; break;
    case ncclUint64:   ptrUnion.U8[idx] /= divisor; break;
    case ncclFp8E4M3:  ptrUnion.F1[idx] = (rccl_float8((float)(ptrUnion.F1[idx]) / divisor)); break;
    case ncclFloat16:  ptrUnion.F2[idx] = __float2half(__half2float(ptrUnion.F2[idx])/divisor); break;
    case ncclFloat32:  ptrUnion.F4[idx] /= divisor; break;
    case ncclFloat64:  ptrUnion.F8[idx] /= divisor; break;
    case ncclFp8E5M2:  ptrUnion.B1[idx] = (rccl_bfloat8((float)(ptrUnion.B1[idx]) / divisor)); break;
    case ncclBfloat16: ptrUnion.B2[idx] = (hip_bfloat16((float)(ptrUnion.B2[idx]) / divisor)); break;
    default:
      printf("Unsupported datatype (%s)\n", DataTypeToName(dataType).c_str());
      exit(1);
    }
  }
}
// Check if a collective uses a root
// True for the collectives that take a root rank:
// Broadcast, Reduce, Gather and Scatter.
bool IsRootUsed(ncclFunc_t funcType) {
  switch (funcType) {
  case ncclCollBroadcast:
  case ncclCollReduce:
  case ncclCollGather:
  case ncclCollScatter:
    return true;
  default:
    return false;
  }
}
// parse the logs and assign them into lineItem
bool ParseLineItem(char const* line, LineItem& li);
@@ -213,7 +393,7 @@ void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls
// allocates send/recv buff, sets the device based on which rank the task belongs to,
// syncronize devices after executing all the tasks and free device memory.
double ReplayRccl(CollectiveCalls const& collCall, int groupIdx);
double ReplayRccl(CollectiveCalls& collCall, int groupIdx, int& numInvalid);
// Print information about a group call
void PrintGroupCall(GroupCall const& gc);
@@ -226,4 +406,24 @@ void dataToCsv(GroupCall const& gc, std::ofstream &datafile, double runTime);
std::pair<size_t, size_t> GetSize(TaskInfo taskInfo, int numGlobalRanks);
// executes the collective call (task)
void ExecuteCollective(TaskInfo const& task, ncclComm_t const& comm, hipStream_t stream, const void *sendbuff, void *recvbuff);
void ExecuteCollective(TaskInfo& task, ncclComm_t const& comm, hipStream_t stream);
// Allocate numBytes of zero-initialized memory and store the address in ptrUnion.
// isGpu selects device memory (hipMalloc/hipMemset) over host memory (calloc).
// Nothing is allocated when numBytes is 0.
void AllocateMem(PtrUnion& ptrUnion, size_t const numBytes, bool isGpu = false);
// Free the memory held by ptrUnion (hipFree when isGpu, free otherwise) and
// reset its pointer to nullptr. A null pointer is ignored.
void FreeMem(PtrUnion& ptrUnion, bool isGpu = false);
// Fill ptrUnion with the test pattern derived from globalRank: element i gets
// (globalRank + i) % 256 for integer datatypes, or 1 / (that value + 1) for
// floating-point datatypes. With isGpu set, the pattern is generated on the
// host and copied to the device buffer.
void FillPattern(PtrUnion& ptrUnion, ncclDataType_t const dataType, size_t const numElements, int globalRank, bool isGpu = false);
// PrepareData functions set up the GPU input and the expected host output for
// the given taskInfo. PrepareDataFunc dispatches on taskInfo.funcType to the
// matching PrepData_* routine below.
void PrepareDataFunc(TaskInfo& taskInfo, int globalRank, int totalRanks);
// Broadcast: root's input and every rank's expected output use the root's pattern
void PrepData_Broadcast(TaskInfo& taskInfo, int globalRank);
// Reduce / AllReduce (isAllReduce selects which ranks get an expected result)
void PrepData_Reduce(TaskInfo& taskInfo, int globalRank, int totalRanks, bool isAllReduce);
// ReduceScatter: expected output is this rank's slice of the full reduction
void PrepData_ReduceScatter(TaskInfo& taskInfo, int globalRank, int totalRanks);
// Gather / AllGather (isAllGather selects which ranks get an expected result)
void PrepData_Gather(TaskInfo& taskInfo, int globalRank, int totalRanks, bool isAllGather);
// Scatter: expected output is this rank's slice of the root's input
void PrepData_Scatter(TaskInfo& taskInfo, int globalRank, int totalRanks);
// AllToAll: expected output slot r holds chunk globalRank of rank r's input
void PrepData_AlltoAll(TaskInfo& taskInfo, int globalRank, int totalRanks);
// Send: fills the GPU input buffer only
void PrepData_Send(TaskInfo& taskInfo, int globalRank);
// Recv: fills the expected host buffer only
void PrepData_Recv(TaskInfo& taskInfo, int globalRank);