@@ -4,7 +4,7 @@ MPI_DIR ?= /opt/ompi
|
||||
MPI_INC_DIR ?= /usr/include/x86_64-linux-gnu/mpi
|
||||
MPI_LIB_DIR ?= /usr/lib/x86_64-linux-gnu
|
||||
|
||||
INCLUDES = -I$(MPI_INC_DIR) -I$(MPI_DIR)/include -I$(RCCL_DIR)/include
|
||||
INCLUDES = -I$(MPI_INC_DIR) -I$(MPI_DIR)/include -I$(RCCL_DIR)/include -I$(RCCL_DIR)/hipify/src/include
|
||||
LDFLAGS = -L$(MPI_LIB_DIR) -L$(MPI_DIR)/lib -L$(RCCL_DIR) -lmpi -lrccl
|
||||
|
||||
main: rcclReplayer.cpp
|
||||
|
||||
+299
-37
@@ -81,6 +81,7 @@ int main(int argc, char **argv)
|
||||
printf("Rank %d Done setting up communicators\n", mpiRank);
|
||||
|
||||
int numSkippedCalls = 0;
|
||||
int numInvalid = 0;
|
||||
double runTime;
|
||||
std::ofstream datafile;
|
||||
datafile.open("replayer_data.csv");
|
||||
@@ -98,7 +99,7 @@ int main(int argc, char **argv)
|
||||
printf("Running Collective Call %lu of %lu\n", i+1, collCalls.groupCalls.size());
|
||||
PrintGroupCall(collCalls.groupCalls[i]);
|
||||
}
|
||||
double runTime = ReplayRccl(collCalls, i);
|
||||
double runTime = ReplayRccl(collCalls, i, numInvalid);
|
||||
if (mpiRank == 0) {
|
||||
dataToCsv(collCalls.groupCalls[i], datafile, runTime);
|
||||
}
|
||||
@@ -132,6 +133,9 @@ int main(int argc, char **argv)
|
||||
if (mpiRank == 0) printf("Executed group calls: %zu\n", collCalls.groupCalls.size() - numSkippedCalls);
|
||||
if (mpiRank == 0) printf("Skipped group calls: %d\n", numSkippedCalls);
|
||||
|
||||
// Data validation failures during group calls
|
||||
if (mpiRank == 0) printf("Failed group calls: %d\n", numInvalid);
|
||||
|
||||
// Time it takes to execute all the group calls
|
||||
if (mpiRank == 0) printf("Execution Time: %f seconds\n", duration.count());
|
||||
printf("MPI Rank %d Success\n", mpiRank);
|
||||
@@ -170,7 +174,7 @@ void dataToCsv(GroupCall const& gc, std::ofstream &datafile, double runTime)
|
||||
else if (funcName == "ReduceScatter" || funcName == "AllGather") busBw *= ((n-1)/n);
|
||||
busBw /= (1e9); //in gb/s
|
||||
std::string dataTypeName = DataTypeToName(ti.datatype);
|
||||
std::string redOp = getRedOp(ti.op);
|
||||
std::string redOp = RedOpToName(ti.op);
|
||||
datafile << gc.opCount << ", " << funcName.c_str() << ", " << ti.inPlace << ", " << ti.count << ", " << dataTypeName << ", " << redOp << ", " << ti.root << ", " << runTime << ", " << busBw << "\n";
|
||||
}
|
||||
|
||||
@@ -323,6 +327,52 @@ void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
// Detect and replace scatter patterns
|
||||
for (auto& gc : cc.groupCalls) {
|
||||
if (!gc.isValid) continue;
|
||||
int scatterRoot = -1;
|
||||
bool isScatter = true;
|
||||
for (auto& [rank, rankData] : gc.rankData) {
|
||||
int sendCount = 0, recvCount = 0;
|
||||
for (const auto& task : rankData.tasks) {
|
||||
if (task.funcType == ncclCollSend)
|
||||
sendCount++;
|
||||
else if (task.funcType == ncclCollRecv)
|
||||
recvCount++;
|
||||
}
|
||||
if (sendCount == cc.numGlobalRanks && recvCount == 1) {
|
||||
if (scatterRoot == -1) {
|
||||
// Root is the first rank that matches the condition
|
||||
scatterRoot = rank;
|
||||
} else {
|
||||
isScatter = false;
|
||||
break;
|
||||
}
|
||||
} else if (recvCount != 1 || sendCount != 0) {
|
||||
// Non-root ranks must only recv and not send
|
||||
isScatter = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Replace send/recv calls with scatter call for the group call
|
||||
if (isScatter) {
|
||||
TaskInfo scatterTask;
|
||||
scatterTask.funcType = ncclCollScatter;
|
||||
scatterTask.count = gc.rankData[scatterRoot].tasks[0].count;
|
||||
scatterTask.datatype = gc.rankData[scatterRoot].tasks[0].datatype;
|
||||
scatterTask.root = scatterRoot;
|
||||
|
||||
for (auto& [rank, rankData] : gc.rankData) {
|
||||
rankData.tasks.clear();
|
||||
rankData.tasks.push_back(scatterTask);
|
||||
}
|
||||
|
||||
if (isFirstRank)
|
||||
printf("[INFO] Scatter pattern detected and replaced with scatter collective\n");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool ParseLineItem(char const* line, LineItem& li)
|
||||
@@ -337,26 +387,20 @@ bool ParseLineItem(char const* line, LineItem& li)
|
||||
&li.nRanks, &li.stream, &li.task, &li.globalRank) == 17;
|
||||
}
|
||||
|
||||
double ReplayRccl(CollectiveCalls const& cc, int groupIdx)
|
||||
double ReplayRccl(CollectiveCalls& cc, int groupIdx, int& numInvalid)
|
||||
{
|
||||
int numLocalRanks = cc.localRankComms.size();
|
||||
|
||||
// Allocate memory for collective
|
||||
std::vector<std::vector<void*>> sendbuff(numLocalRanks);
|
||||
std::vector<std::vector<void*>> recvbuff(numLocalRanks);
|
||||
|
||||
for (int localIdx = 0; localIdx < numLocalRanks; localIdx++) {
|
||||
int globalRank = cc.firstGlobalRank + localIdx;
|
||||
if (cc.groupCalls[groupIdx].rankData.count(globalRank) == 0) continue;
|
||||
HIP_CALL(hipSetDevice(cc.localGpuOffset + localIdx));
|
||||
|
||||
RankData const& rankData = cc.groupCalls[groupIdx].rankData.at(globalRank);
|
||||
RankData& rankData = cc.groupCalls[groupIdx].rankData.at(globalRank);
|
||||
int numTasks = rankData.tasks.size();
|
||||
sendbuff[localIdx].resize(numTasks);
|
||||
recvbuff[localIdx].resize(numTasks);
|
||||
|
||||
for (int taskId = 0; taskId < numTasks; taskId++) {
|
||||
TaskInfo const& task = rankData.tasks[taskId];
|
||||
TaskInfo& task = rankData.tasks[taskId];
|
||||
|
||||
// Each task has a size based on the type of collective (funcType)
|
||||
std::pair<size_t, size_t> numBytes = GetSize(task, cc.numGlobalRanks);
|
||||
@@ -366,17 +410,20 @@ double ReplayRccl(CollectiveCalls const& cc, int groupIdx)
|
||||
numBytes.second = numBytes.first;
|
||||
}
|
||||
|
||||
// Set the device and allocate send/recv buffers
|
||||
HIP_CALL(hipMalloc(&sendbuff[localIdx][taskId], numBytes.first));
|
||||
HIP_CALL(hipMemset(sendbuff[localIdx][taskId], 0, numBytes.first));
|
||||
// Allocate memory
|
||||
AllocateMem(task.inputGpu, numBytes.first, true);
|
||||
AllocateMem(task.outputCpu, numBytes.second);
|
||||
AllocateMem(task.expected, numBytes.second);
|
||||
|
||||
if (!task.inPlace) {
|
||||
HIP_CALL(hipMalloc(&recvbuff[localIdx][taskId], numBytes.second));
|
||||
HIP_CALL(hipMemset(recvbuff[localIdx][taskId], 0, numBytes.second));
|
||||
AllocateMem(task.outputGpu, numBytes.second, true);
|
||||
} else {
|
||||
recvbuff[localIdx][taskId] = sendbuff[localIdx][taskId];
|
||||
task.outputGpu = task.inputGpu;
|
||||
}
|
||||
|
||||
// Prepare input/output for each task based on collective type
|
||||
PrepareDataFunc(task, globalRank, cc.numGlobalRanks);
|
||||
|
||||
HIP_CALL(hipDeviceSynchronize());
|
||||
}
|
||||
}
|
||||
@@ -388,14 +435,12 @@ double ReplayRccl(CollectiveCalls const& cc, int groupIdx)
|
||||
int globalRank = cc.firstGlobalRank + localIdx;
|
||||
if (cc.groupCalls[groupIdx].rankData.count(globalRank) == 0) continue;
|
||||
|
||||
RankData const& rankData = cc.groupCalls[groupIdx].rankData.at(globalRank);
|
||||
RankData& rankData = cc.groupCalls[groupIdx].rankData.at(globalRank);
|
||||
int numTasks = rankData.tasks.size();
|
||||
int commIdx = rankData.commIdx;
|
||||
for (int taskId = 0; taskId < numTasks; taskId++) {
|
||||
TaskInfo const& task = rankData.tasks[taskId];
|
||||
ExecuteCollective(task, cc.localRankComms[localIdx][commIdx], cc.localRankStreams[localIdx][commIdx],
|
||||
sendbuff[localIdx][taskId],
|
||||
recvbuff[localIdx][taskId]);
|
||||
TaskInfo& task = rankData.tasks[taskId];
|
||||
ExecuteCollective(task, cc.localRankComms[localIdx][commIdx], cc.localRankStreams[localIdx][commIdx]);
|
||||
}
|
||||
}
|
||||
NCCL_CALL(ncclGroupEnd());
|
||||
@@ -415,14 +460,46 @@ double ReplayRccl(CollectiveCalls const& cc, int groupIdx)
|
||||
double runTime = duration.count();
|
||||
runTime *= 1000; //convering into milliseconds
|
||||
|
||||
// Data validation
|
||||
bool isValid = true;
|
||||
for (int localIdx = 0; localIdx < numLocalRanks; localIdx++) {
|
||||
int globalRank = cc.firstGlobalRank + localIdx;
|
||||
RankData const& rankData = cc.groupCalls[groupIdx].rankData.at(globalRank);
|
||||
RankData& rankData = cc.groupCalls[groupIdx].rankData.at(globalRank);
|
||||
int numTasks = rankData.tasks.size();
|
||||
for (int taskId = 0; taskId < numTasks; taskId++) {
|
||||
TaskInfo const& task = rankData.tasks[taskId];
|
||||
HIP_CALL(hipFree(sendbuff[localIdx][taskId]));
|
||||
if (!task.inPlace) HIP_CALL(hipFree(recvbuff[localIdx][taskId]));
|
||||
TaskInfo& task = rankData.tasks[taskId];
|
||||
|
||||
// Only need Recv to validate
|
||||
if (task.funcType == ncclCollSend) break;
|
||||
// Ignore non-root ranks
|
||||
if (IsRootUsed(task.funcType) && task.root != globalRank) break;
|
||||
|
||||
std::pair<size_t, size_t> numBytes = GetSize(task, cc.numGlobalRanks);
|
||||
if (task.inPlace) {
|
||||
numBytes.first = std::max(numBytes.first, numBytes.second);
|
||||
numBytes.second = numBytes.first;
|
||||
}
|
||||
HIP_CALL(hipMemcpy(task.outputCpu.ptr, task.outputGpu.ptr, numBytes.second, hipMemcpyDeviceToHost));
|
||||
if (!IsEqual(task.outputCpu, task.expected, task.datatype, task.count, globalRank)) {
|
||||
isValid = false;
|
||||
break; // Check other ranks
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!isValid) numInvalid++;
|
||||
|
||||
// Free memory
|
||||
for (int localIdx = 0; localIdx < numLocalRanks; localIdx++) {
|
||||
int globalRank = cc.firstGlobalRank + localIdx;
|
||||
RankData& rankData = cc.groupCalls[groupIdx].rankData.at(globalRank);
|
||||
int numTasks = rankData.tasks.size();
|
||||
for (int taskId = 0; taskId < numTasks; taskId++) {
|
||||
TaskInfo& task = rankData.tasks[taskId];
|
||||
FreeMem(task.inputGpu, true);
|
||||
if (!task.inPlace) FreeMem(task.outputGpu, true);
|
||||
FreeMem(task.outputCpu);
|
||||
FreeMem(task.expected);
|
||||
}
|
||||
}
|
||||
return runTime;
|
||||
@@ -456,41 +533,226 @@ std::pair<size_t, size_t> GetSize(TaskInfo taskInfo, int numGlobalRanks) {
|
||||
return std::make_pair(sendNumBytes, recvNumBytes);
|
||||
}
|
||||
|
||||
void ExecuteCollective(TaskInfo const& task, ncclComm_t const& comm, hipStream_t stream, const void *sendbuff, void *recvbuff)
|
||||
void ExecuteCollective(TaskInfo& task, ncclComm_t const& comm, hipStream_t stream)
|
||||
{
|
||||
switch (task.funcType) {
|
||||
case ncclCollAllGather:
|
||||
NCCL_CALL(ncclAllGather(sendbuff, recvbuff, task.count, task.datatype, comm, stream));
|
||||
NCCL_CALL(ncclAllGather(task.inputGpu.ptr, task.outputGpu.ptr, task.count, task.datatype, comm, stream));
|
||||
break;
|
||||
case ncclCollAllReduce:
|
||||
NCCL_CALL(ncclAllReduce(sendbuff, recvbuff, task.count, task.datatype, task.op, comm, stream));
|
||||
NCCL_CALL(ncclAllReduce(task.inputGpu.ptr, task.outputGpu.ptr, task.count, task.datatype, task.op, comm, stream));
|
||||
break;
|
||||
case ncclCollBroadcast:
|
||||
NCCL_CALL(ncclBroadcast(sendbuff, recvbuff, task.count, task.datatype, task.root, comm, stream));
|
||||
NCCL_CALL(ncclBroadcast(task.inputGpu.ptr, task.outputGpu.ptr, task.count, task.datatype, task.root, comm, stream));
|
||||
break;
|
||||
case ncclCollReduce:
|
||||
NCCL_CALL(ncclReduce(sendbuff, recvbuff, task.count, task.datatype, task.op, task.root, comm, stream));
|
||||
NCCL_CALL(ncclReduce(task.inputGpu.ptr, task.outputGpu.ptr, task.count, task.datatype, task.op, task.root, comm, stream));
|
||||
break;
|
||||
case ncclCollReduceScatter:
|
||||
NCCL_CALL(ncclReduceScatter(sendbuff, recvbuff, task.count, task.datatype, task.op, comm, stream));
|
||||
NCCL_CALL(ncclReduceScatter(task.inputGpu.ptr, task.outputGpu.ptr, task.count, task.datatype, task.op, comm, stream));
|
||||
break;
|
||||
case ncclCollGather:
|
||||
NCCL_CALL(ncclGather(sendbuff, recvbuff, task.count, task.datatype, task.root, comm, stream));
|
||||
NCCL_CALL(ncclGather(task.inputGpu.ptr, task.outputGpu.ptr, task.count, task.datatype, task.root, comm, stream));
|
||||
break;
|
||||
case ncclCollScatter:
|
||||
NCCL_CALL(ncclScatter(sendbuff, recvbuff, task.count, task.datatype, task.root, comm, stream));
|
||||
NCCL_CALL(ncclScatter(task.inputGpu.ptr, task.outputGpu.ptr, task.count, task.datatype, task.root, comm, stream));
|
||||
break;
|
||||
case ncclCollAllToAll:
|
||||
NCCL_CALL(ncclAllToAll(sendbuff, recvbuff, task.count, task.datatype, comm, stream));
|
||||
NCCL_CALL(ncclAllToAll(task.inputGpu.ptr, task.outputGpu.ptr, task.count, task.datatype, comm, stream));
|
||||
break;
|
||||
case ncclCollSend:
|
||||
NCCL_CALL(ncclSend(sendbuff, task.count, task.datatype, task.root, comm, stream));
|
||||
NCCL_CALL(ncclSend(task.inputGpu.ptr, task.count, task.datatype, task.root, comm, stream));
|
||||
break;
|
||||
case ncclCollRecv:
|
||||
NCCL_CALL(ncclRecv(recvbuff, task.count, task.datatype, task.root, comm, stream));
|
||||
NCCL_CALL(ncclRecv(task.outputGpu.ptr, task.count, task.datatype, task.root, comm, stream));
|
||||
break;
|
||||
default:
|
||||
printf("Error: unsupported collective\n");
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
void AllocateMem(PtrUnion& ptrUnion, size_t const numBytes, bool isGpu) {
|
||||
if (numBytes) {
|
||||
if (isGpu) {
|
||||
HIP_CALL(hipMalloc(&ptrUnion.ptr, numBytes));
|
||||
HIP_CALL(hipMemset(ptrUnion.ptr, 0, numBytes));
|
||||
HIP_CALL(hipStreamSynchronize(NULL));
|
||||
} else {
|
||||
ptrUnion.ptr = calloc(numBytes, 1);
|
||||
memset(ptrUnion.ptr, 0, numBytes);
|
||||
if (!ptrUnion.ptr) {
|
||||
printf("Unable to allocate memory (%lu bytes)\n", numBytes);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void FreeMem(PtrUnion& ptrUnion, bool isGpu) {
|
||||
if (ptrUnion.ptr != nullptr) {
|
||||
if (isGpu)
|
||||
HIP_CALL(hipFree(ptrUnion.ptr));
|
||||
else
|
||||
free(ptrUnion.ptr);
|
||||
ptrUnion.ptr = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
void FillPattern(PtrUnion& ptrUnion, ncclDataType_t const dataType, size_t const numElements, int globalRank, bool isGpu) {
|
||||
PtrUnion temp;
|
||||
size_t const numBytes = numElements * DataTypeToBytes(dataType);
|
||||
|
||||
if (isGpu)
|
||||
AllocateMem(temp, numBytes);
|
||||
else
|
||||
temp.ptr = ptrUnion.ptr;
|
||||
|
||||
for (int i = 0; i < numElements; i++) {
|
||||
int valueI = (globalRank + i) % 256;
|
||||
double valueF = 1.0L/((double)valueI+1.0L);
|
||||
SetPtr(temp, dataType, i, valueI, valueF);
|
||||
}
|
||||
|
||||
if (isGpu) {
|
||||
HIP_CALL(hipMemcpy(ptrUnion.ptr, temp.ptr, numBytes, hipMemcpyHostToDevice));
|
||||
FreeMem(temp);
|
||||
}
|
||||
}
|
||||
|
||||
void PrepareDataFunc(TaskInfo& taskInfo, int globalRank, int totalRanks)
|
||||
{
|
||||
switch (taskInfo.funcType)
|
||||
{
|
||||
case ncclCollBroadcast: PrepData_Broadcast(taskInfo, globalRank); break;
|
||||
case ncclCollReduce: PrepData_Reduce(taskInfo, globalRank, totalRanks, false); break;
|
||||
case ncclCollAllGather: PrepData_Gather(taskInfo, globalRank, totalRanks, true); break;
|
||||
case ncclCollReduceScatter: PrepData_ReduceScatter(taskInfo, globalRank, totalRanks); break;
|
||||
case ncclCollAllReduce: PrepData_Reduce(taskInfo, globalRank, totalRanks, true); break;
|
||||
case ncclCollGather: PrepData_Gather(taskInfo, globalRank, totalRanks, false); break;
|
||||
case ncclCollScatter: PrepData_Scatter(taskInfo, globalRank, totalRanks); break;
|
||||
case ncclCollAllToAll: PrepData_AlltoAll(taskInfo, globalRank, totalRanks); break;
|
||||
case ncclCollSend: PrepData_Send(taskInfo, globalRank); break;
|
||||
case ncclCollRecv: PrepData_Recv(taskInfo, globalRank); break;
|
||||
default:
|
||||
printf("Error: unsupported collective\n");
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
void PrepData_Broadcast(TaskInfo& taskInfo, int globalRank) {
|
||||
// Only root needs input pattern
|
||||
if (globalRank == taskInfo.root)
|
||||
FillPattern(taskInfo.inputGpu, taskInfo.datatype, taskInfo.count, taskInfo.root, true);
|
||||
|
||||
// Otherwise all other ranks expected output is the same as input of root
|
||||
FillPattern(taskInfo.expected, taskInfo.datatype, taskInfo.count, taskInfo.root);
|
||||
}
|
||||
|
||||
void PrepData_Reduce(TaskInfo& taskInfo, int globalRank, int totalRanks, bool isAllReduce) {
|
||||
size_t const numBytes = taskInfo.count * DataTypeToBytes(taskInfo.datatype);
|
||||
|
||||
// If average or custom reduction operator is used, perform a summation instead
|
||||
ncclRedOp_t const tempOp = (taskInfo.op >= ncclAvg ? ncclSum : taskInfo.op);
|
||||
|
||||
for (int rank = 0; rank < totalRanks; ++rank) {
|
||||
FillPattern(taskInfo.outputCpu, taskInfo.datatype, taskInfo.count, rank);
|
||||
if (rank == globalRank)
|
||||
HIP_CALL(hipMemcpy(taskInfo.inputGpu.ptr, taskInfo.outputCpu.ptr, numBytes, hipMemcpyHostToDevice));
|
||||
if (isAllReduce || taskInfo.root == globalRank) {
|
||||
if (rank == 0)
|
||||
memcpy(taskInfo.expected.ptr, taskInfo.outputCpu.ptr, numBytes);
|
||||
else
|
||||
Reduce(taskInfo.expected, taskInfo.outputCpu, taskInfo.count, taskInfo.datatype, tempOp);
|
||||
}
|
||||
}
|
||||
|
||||
if (taskInfo.op == ncclAvg && (isAllReduce || taskInfo.root == globalRank))
|
||||
DivideByInt(taskInfo.expected, taskInfo.datatype, taskInfo.count, totalRanks);
|
||||
}
|
||||
|
||||
void PrepData_ReduceScatter(TaskInfo& taskInfo, int globalRank, int totalRanks) {
|
||||
int const numInputElements = taskInfo.count * totalRanks;
|
||||
int const numOutputElements = taskInfo.count;
|
||||
std::pair<size_t, size_t> numBytes = GetSize(taskInfo, totalRanks);
|
||||
|
||||
PtrUnion tempInputCpu;
|
||||
PtrUnion tempResultCpu;
|
||||
AllocateMem(tempInputCpu, numBytes.first);
|
||||
AllocateMem(tempResultCpu, numBytes.first);
|
||||
|
||||
// If average or custom reduction operator is used, perform a summation instead
|
||||
ncclRedOp_t const tempOp = (taskInfo.op >= ncclAvg ? ncclSum : taskInfo.op);
|
||||
|
||||
for (int rank = 0; rank < totalRanks; ++rank) {
|
||||
FillPattern(tempInputCpu, taskInfo.datatype, numInputElements, rank);
|
||||
if (rank == globalRank)
|
||||
HIP_CALL(hipMemcpy(taskInfo.inputGpu.ptr, tempInputCpu.ptr, numBytes.first, hipMemcpyHostToDevice));
|
||||
if (rank == 0)
|
||||
memcpy(tempResultCpu.ptr, tempInputCpu.ptr, numBytes.first);
|
||||
else
|
||||
Reduce(tempResultCpu, tempInputCpu, numInputElements, taskInfo.datatype, tempOp);
|
||||
}
|
||||
|
||||
if (taskInfo.op == ncclAvg)
|
||||
DivideByInt(tempResultCpu, taskInfo.datatype, numInputElements, totalRanks);
|
||||
|
||||
memcpy(taskInfo.expected.I1, tempResultCpu.I1 + globalRank * numBytes.second, numBytes.second);
|
||||
FreeMem(tempInputCpu);
|
||||
FreeMem(tempResultCpu);
|
||||
}
|
||||
|
||||
void PrepData_Gather(TaskInfo& taskInfo, int globalRank, int totalRanks, bool isAllGather) {
|
||||
int numInputElements = taskInfo.count;
|
||||
int numOutputElements = totalRanks * taskInfo.count;
|
||||
std::pair<size_t, size_t> numBytes = GetSize(taskInfo, totalRanks);
|
||||
|
||||
for (int rank = 0; rank < totalRanks; ++rank) {
|
||||
FillPattern(taskInfo.outputCpu, taskInfo.datatype, numInputElements, rank);
|
||||
if (rank == globalRank)
|
||||
HIP_CALL(hipMemcpy(taskInfo.inputGpu.ptr, taskInfo.outputCpu.ptr, numBytes.first, hipMemcpyHostToDevice));
|
||||
if (isAllGather || taskInfo.root == globalRank)
|
||||
memcpy(taskInfo.expected.I1 + (rank * numBytes.first), taskInfo.outputCpu.ptr, numBytes.first);
|
||||
}
|
||||
}
|
||||
|
||||
void PrepData_Scatter(TaskInfo& taskInfo, int globalRank, int totalRanks) {
|
||||
int const numInputElements = taskInfo.count * totalRanks;
|
||||
int const numOutputElements = taskInfo.count;
|
||||
std::pair<size_t, size_t> numBytes = GetSize(taskInfo, totalRanks);
|
||||
|
||||
PtrUnion tempInput;
|
||||
AllocateMem(tempInput, numBytes.first);
|
||||
|
||||
FillPattern(tempInput, taskInfo.datatype, numInputElements, taskInfo.root);
|
||||
|
||||
if (globalRank == taskInfo.root)
|
||||
HIP_CALL(hipMemcpy(taskInfo.inputGpu.ptr, tempInput.ptr, numBytes.first, hipMemcpyHostToDevice));
|
||||
|
||||
memcpy(taskInfo.expected.U1, tempInput.U1 + globalRank * numBytes.second, numBytes.second);
|
||||
|
||||
FreeMem(tempInput);
|
||||
}
|
||||
|
||||
void PrepData_AlltoAll(TaskInfo& taskInfo, int globalRank, int totalRanks) {
|
||||
int const numInputElements = taskInfo.count * totalRanks;
|
||||
int const numOutputElements = numInputElements;
|
||||
std::pair<size_t, size_t> numBytes = GetSize(taskInfo, totalRanks);
|
||||
size_t const numBytesPerRank = numBytes.first / totalRanks;
|
||||
|
||||
for (int rank = 0; rank < totalRanks; ++rank) {
|
||||
FillPattern(taskInfo.outputCpu, taskInfo.datatype, numInputElements, rank);
|
||||
|
||||
if (rank == globalRank)
|
||||
HIP_CALL(hipMemcpy(taskInfo.inputGpu.ptr, taskInfo.outputCpu.ptr, numBytes.first, hipMemcpyHostToDevice));
|
||||
|
||||
memcpy(taskInfo.expected.U1 + numBytesPerRank * rank, taskInfo.outputCpu.U1 + numBytesPerRank * globalRank, numBytesPerRank);
|
||||
}
|
||||
}
|
||||
|
||||
void PrepData_Send(TaskInfo& taskInfo, int globalRank) {
|
||||
FillPattern(taskInfo.inputGpu, taskInfo.datatype, taskInfo.count, globalRank, true);
|
||||
}
|
||||
|
||||
void PrepData_Recv(TaskInfo& taskInfo, int globalRank) {
|
||||
FillPattern(taskInfo.expected, taskInfo.datatype, taskInfo.count, globalRank);
|
||||
}
|
||||
+225
-25
@@ -3,6 +3,9 @@
|
||||
#include <cstring>
|
||||
|
||||
#include <rccl/rccl.h>
|
||||
#include <hip/hip_bfloat16.h>
|
||||
#include "hip/hip_fp16.h"
|
||||
#include "rccl_float8.h"
|
||||
|
||||
// NOTE: Parsing is based on this line logging collective information in enqueue.cc
|
||||
// INFO(NCCL_COLL,"%s: opCount %lx sendbuff %p recvbuff %p count %zi datatype %d op %d \
|
||||
@@ -99,6 +102,25 @@ char const mscclFuncNames[ncclNumFuncs][32] =
|
||||
"mscclFuncRecv"
|
||||
};
|
||||
|
||||
union PtrUnion
|
||||
{
|
||||
void* ptr;
|
||||
int8_t* I1; // ncclInt8
|
||||
uint8_t* U1; // ncclUint8
|
||||
int32_t* I4; // ncclInt32
|
||||
uint32_t* U4; // ncclUint32
|
||||
int64_t* I8; // ncclInt64
|
||||
uint64_t* U8; // ncclUint64
|
||||
__half* F2; // ncclFloat16
|
||||
rccl_float8* F1; // ncclFp8E4M3
|
||||
float* F4; // ncclFloat32
|
||||
double* F8; // ncclFloat64
|
||||
rccl_bfloat8* B1; // ncclFp8E5M2
|
||||
hip_bfloat16* B2; // ncclBfloat16
|
||||
|
||||
constexpr PtrUnion() : ptr(nullptr) {}
|
||||
};
|
||||
|
||||
struct TaskInfo
|
||||
{
|
||||
ncclFunc_t funcType;
|
||||
@@ -107,6 +129,10 @@ struct TaskInfo
|
||||
ncclDataType_t datatype;
|
||||
ncclRedOp_t op;
|
||||
int root;
|
||||
PtrUnion inputGpu;
|
||||
PtrUnion outputCpu;
|
||||
PtrUnion outputGpu;
|
||||
PtrUnion expected;
|
||||
};
|
||||
|
||||
struct RankData
|
||||
@@ -137,28 +163,6 @@ struct CollectiveCalls
|
||||
std::vector<std::vector<hipStream_t>> localRankStreams; // streams per local rank
|
||||
};
|
||||
|
||||
|
||||
size_t DataTypeToBytes(ncclDataType_t const dataType)
|
||||
{
|
||||
switch (dataType) {
|
||||
case ncclInt8: return 1;
|
||||
case ncclUint8: return 1;
|
||||
case ncclInt32: return 4;
|
||||
case ncclUint32: return 4;
|
||||
case ncclInt64: return 8;
|
||||
case ncclUint64: return 8;
|
||||
case ncclFloat16: return 2;
|
||||
case ncclFloat32: return 4;
|
||||
case ncclFloat64: return 8;
|
||||
case ncclBfloat16: return 2;
|
||||
case ncclFp8E4M3: return 1;
|
||||
case ncclFp8E5M2: return 1;
|
||||
default:
|
||||
printf("Unsupported datatype (%d)\n", dataType);
|
||||
exit(0);
|
||||
}
|
||||
}
|
||||
|
||||
std::string DataTypeToName(ncclDataType_t const dataType)
|
||||
{
|
||||
switch (dataType) {
|
||||
@@ -180,7 +184,28 @@ std::string DataTypeToName(ncclDataType_t const dataType)
|
||||
}
|
||||
}
|
||||
|
||||
std::string getRedOp(ncclRedOp_t const op)
|
||||
size_t DataTypeToBytes(ncclDataType_t const dataType)
|
||||
{
|
||||
switch (dataType) {
|
||||
case ncclInt8: return 1;
|
||||
case ncclUint8: return 1;
|
||||
case ncclInt32: return 4;
|
||||
case ncclUint32: return 4;
|
||||
case ncclInt64: return 8;
|
||||
case ncclUint64: return 8;
|
||||
case ncclFloat16: return 2;
|
||||
case ncclFloat32: return 4;
|
||||
case ncclFloat64: return 8;
|
||||
case ncclBfloat16: return 2;
|
||||
case ncclFp8E4M3: return 1;
|
||||
case ncclFp8E5M2: return 1;
|
||||
default:
|
||||
printf("Unsupported datatype (%s)\n", DataTypeToName(dataType).c_str());
|
||||
exit(0);
|
||||
}
|
||||
}
|
||||
|
||||
std::string RedOpToName(ncclRedOp_t const op)
|
||||
{
|
||||
switch (op) {
|
||||
case ncclSum: return "Sum";
|
||||
@@ -204,6 +229,161 @@ ncclFunc_t GetFuncType(char* func)
|
||||
exit(1);
|
||||
}
|
||||
|
||||
// Set data for ptrUnion (Used during fillPattern)
|
||||
void SetPtr(PtrUnion& ptrUnion, ncclDataType_t const dataType, int const idx, int valueI, double valueF) {
|
||||
switch (dataType)
|
||||
{
|
||||
case ncclInt8: ptrUnion.I1[idx] = valueI; break;
|
||||
case ncclUint8: ptrUnion.U1[idx] = valueI; break;
|
||||
case ncclInt32: ptrUnion.I4[idx] = valueI; break;
|
||||
case ncclUint32: ptrUnion.U4[idx] = valueI; break;
|
||||
case ncclInt64: ptrUnion.I8[idx] = valueI; break;
|
||||
case ncclUint64: ptrUnion.U8[idx] = valueI; break;
|
||||
case ncclFp8E4M3: ptrUnion.F1[idx] = rccl_float8(valueF); break;
|
||||
case ncclFloat16: ptrUnion.F2[idx] = __float2half(static_cast<float>(valueF)); break;
|
||||
case ncclFloat32: ptrUnion.F4[idx] = valueF; break;
|
||||
case ncclFloat64: ptrUnion.F8[idx] = valueF; break;
|
||||
case ncclFp8E5M2: ptrUnion.B1[idx] = rccl_bfloat8(valueF); break;
|
||||
case ncclBfloat16: ptrUnion.B2[idx] = hip_bfloat16(static_cast<float>(valueF)); break;
|
||||
default:
|
||||
printf("Unsupported datatype (%s)\n", DataTypeToName(dataType).c_str());
|
||||
exit(0);
|
||||
}
|
||||
}
|
||||
|
||||
// Check if each element in actual equals to expected
|
||||
bool IsEqual(PtrUnion const& actual, PtrUnion const& expected, ncclDataType_t const dataType, size_t const numElements, int const globalRank) {
|
||||
bool isMatch = true;
|
||||
size_t idx = 0;
|
||||
for (idx = 0; idx < numElements; ++idx)
|
||||
{
|
||||
switch (dataType)
|
||||
{
|
||||
case ncclInt8: isMatch = (actual.I1[idx] == expected.I1[idx]); break;
|
||||
case ncclUint8: isMatch = (actual.U1[idx] == expected.U1[idx]); break;
|
||||
case ncclInt32: isMatch = (actual.I4[idx] == expected.I4[idx]); break;
|
||||
case ncclUint32: isMatch = (actual.U4[idx] == expected.U4[idx]); break;
|
||||
case ncclInt64: isMatch = (actual.I8[idx] == expected.I8[idx]); break;
|
||||
case ncclUint64: isMatch = (actual.U8[idx] == expected.U8[idx]); break;
|
||||
case ncclFp8E4M3: isMatch = (fabs(float(actual.F1[idx]) - float(expected.F1[idx])) < 9e-2); break;
|
||||
case ncclFloat16: isMatch = (fabs(__half2float(actual.F2[idx]) - __half2float(expected.F2[idx])) < 9e-2); break;
|
||||
case ncclFloat32: isMatch = (fabs(actual.F4[idx] - expected.F4[idx]) < 1e-5); break;
|
||||
case ncclFloat64: isMatch = (fabs(actual.F8[idx] - expected.F8[idx]) < 1e-12); break;
|
||||
case ncclFp8E5M2: isMatch = (fabs(float(actual.B1[idx]) - float(expected.B1[idx])) < 9e-2); break;
|
||||
case ncclBfloat16: isMatch = (fabs((float)actual.B2[idx] - (float)expected.B2[idx]) < 9e-2); break;
|
||||
default:
|
||||
printf("Unsupported datatype (%s)\n", DataTypeToName(dataType).c_str());
|
||||
isMatch = false;
|
||||
}
|
||||
if (!isMatch) {
|
||||
switch (dataType)
|
||||
{
|
||||
case ncclInt8:
|
||||
printf("[Error Rank = %d] Expected output: %d. Actual output: %d at index %lu\n", globalRank, expected.I1[idx], actual.I1[idx], idx); break;
|
||||
case ncclUint8:
|
||||
printf("[Error Rank = %d] Expected output: %u. Actual output: %u at index %lu\n", globalRank, expected.U1[idx], actual.U1[idx], idx); break;
|
||||
case ncclInt32:
|
||||
printf("[Error Rank = %d] Expected output: %d. Actual output: %d at index %lu\n", globalRank, expected.I4[idx], actual.I4[idx], idx); break;
|
||||
case ncclUint32:
|
||||
printf("[Error Rank = %d] Expected output: %u. Actual output: %u at index %lu\n", globalRank, expected.U4[idx], actual.U4[idx], idx); break;
|
||||
case ncclInt64:
|
||||
printf("[Error Rank = %d] Expected output: %ld. Actual output: %ld at index %lu\n", globalRank, expected.I8[idx], actual.I8[idx], idx); break;
|
||||
case ncclUint64:
|
||||
printf("[Error Rank = %d] Expected output: %lu. Actual output: %lu at index %lu\n", globalRank, expected.U8[idx], actual.U8[idx], idx); break;
|
||||
case ncclFp8E4M3:
|
||||
printf("[Error Rank = %d] Expected output: %f. Actual output: %f at index %lu\n", globalRank, (float)expected.F1[idx], (float)actual.F1[idx], idx); break;
|
||||
case ncclFloat16:
|
||||
printf("[Error Rank = %d] Expected output: %f. Actual output: %f at index %lu\n", globalRank, __half2float(expected.F2[idx]), __half2float(actual.F2[idx]), idx); break;
|
||||
case ncclFloat32:
|
||||
printf("[Error Rank = %d] Expected output: %f. Actual output: %f at index %lu\n", globalRank, expected.F4[idx], actual.F4[idx], idx); break;
|
||||
case ncclFloat64:
|
||||
printf("[Error Rank = %d] Expected output: %lf. Actual output: %lf at index %lu\n", globalRank, expected.F8[idx], actual.F8[idx], idx); break;
|
||||
case ncclFp8E5M2:
|
||||
printf("[Error Rank = %d] Expected output: %f. Actual output: %f at index %lu\n", globalRank, (float)expected.B1[idx], (float)actual.B1[idx], idx); break;
|
||||
case ncclBfloat16:
|
||||
printf("[Error Rank = %d] Expected output: %f. Actual output: %f at index %lu\n", globalRank, (float)expected.B2[idx], (float)actual.B2[idx], idx); break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return isMatch;
|
||||
}
|
||||
}
|
||||
|
||||
return isMatch;
|
||||
}
|
||||
|
||||
// Performs the various basic reduction operations
|
||||
template <typename T>
|
||||
T ReduceOp(ncclRedOp_t const op, T const A, T const B)
|
||||
{
|
||||
switch (op)
|
||||
{
|
||||
case ncclSum: return A + B;
|
||||
case ncclProd: return A * B;
|
||||
case ncclMax: return std::max(A, B);
|
||||
case ncclMin: return std::min(A, B);
|
||||
default:
|
||||
printf("Unsupported reduction operator (%s)\n", RedOpToName(op).c_str());
|
||||
exit(0);
|
||||
}
|
||||
}
|
||||
|
||||
// Perform various reduction ops to ptrUnion
|
||||
void Reduce(PtrUnion& ptrUnion, PtrUnion const& otherPtrUnion, size_t const numElements, ncclDataType_t const dataType, ncclRedOp_t const op) {
|
||||
for (size_t idx = 0; idx < numElements; ++idx)
|
||||
{
|
||||
switch (dataType)
|
||||
{
|
||||
case ncclInt8: ptrUnion.I1[idx] = ReduceOp(op, ptrUnion.I1[idx], otherPtrUnion.I1[idx]); break;
|
||||
case ncclUint8: ptrUnion.U1[idx] = ReduceOp(op, ptrUnion.U1[idx], otherPtrUnion.U1[idx]); break;
|
||||
case ncclInt32: ptrUnion.I4[idx] = ReduceOp(op, ptrUnion.I4[idx], otherPtrUnion.I4[idx]); break;
|
||||
case ncclUint32: ptrUnion.U4[idx] = ReduceOp(op, ptrUnion.U4[idx], otherPtrUnion.U4[idx]); break;
|
||||
case ncclInt64: ptrUnion.I8[idx] = ReduceOp(op, ptrUnion.I8[idx], otherPtrUnion.I8[idx]); break;
|
||||
case ncclUint64: ptrUnion.U8[idx] = ReduceOp(op, ptrUnion.U8[idx], otherPtrUnion.U8[idx]); break;
|
||||
case ncclFp8E4M3: ptrUnion.F1[idx] = rccl_float8(ReduceOp(op, float(ptrUnion.F1[idx]), float(otherPtrUnion.F1[idx]))); break;
|
||||
case ncclFloat16: ptrUnion.F2[idx] = __float2half(ReduceOp(op, __half2float(ptrUnion.F2[idx]), __half2float(otherPtrUnion.F2[idx]))); break;
|
||||
case ncclFloat32: ptrUnion.F4[idx] = ReduceOp(op, ptrUnion.F4[idx], otherPtrUnion.F4[idx]); break;
|
||||
case ncclFloat64: ptrUnion.F8[idx] = ReduceOp(op, ptrUnion.F8[idx], otherPtrUnion.F8[idx]); break;
|
||||
case ncclFp8E5M2: ptrUnion.B1[idx] = rccl_bfloat8(ReduceOp(op, float(ptrUnion.B1[idx]), float(otherPtrUnion.B1[idx]))); break;
|
||||
case ncclBfloat16: ptrUnion.B2[idx] = ReduceOp(op, ptrUnion.B2[idx], otherPtrUnion.B2[idx]); break;
|
||||
default:
|
||||
printf("Unsupported datatype (%s)\n", DataTypeToName(dataType).c_str());
|
||||
exit(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Divide each element in ptrUnion by divisor
|
||||
void DivideByInt(PtrUnion& ptrUnion, ncclDataType_t const dataType, size_t const numElements, int const divisor) {
|
||||
for (size_t idx = 0; idx < numElements; ++idx)
|
||||
{
|
||||
switch (dataType)
|
||||
{
|
||||
case ncclInt8: ptrUnion.I1[idx] /= divisor; break;
|
||||
case ncclUint8: ptrUnion.U1[idx] /= divisor; break;
|
||||
case ncclInt32: ptrUnion.I4[idx] /= divisor; break;
|
||||
case ncclUint32: ptrUnion.U4[idx] /= divisor; break;
|
||||
case ncclInt64: ptrUnion.I8[idx] /= divisor; break;
|
||||
case ncclUint64: ptrUnion.U8[idx] /= divisor; break;
|
||||
case ncclFp8E4M3: ptrUnion.F1[idx] = (rccl_float8((float)(ptrUnion.F1[idx]) / divisor)); break;
|
||||
case ncclFloat16: ptrUnion.F2[idx] = __float2half(__half2float(ptrUnion.F2[idx])/divisor); break;
|
||||
case ncclFloat32: ptrUnion.F4[idx] /= divisor; break;
|
||||
case ncclFloat64: ptrUnion.F8[idx] /= divisor; break;
|
||||
case ncclFp8E5M2: ptrUnion.B1[idx] = (rccl_bfloat8((float)(ptrUnion.B1[idx]) / divisor)); break;
|
||||
case ncclBfloat16: ptrUnion.B2[idx] = (hip_bfloat16((float)(ptrUnion.B2[idx]) / divisor)); break;
|
||||
default:
|
||||
printf("Unsupported datatype (%s)\n", DataTypeToName(dataType).c_str());
|
||||
exit(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check if a collective uses a root
|
||||
bool IsRootUsed(ncclFunc_t funcType) {
|
||||
return (funcType == ncclCollBroadcast || funcType == ncclCollReduce ||
|
||||
funcType == ncclCollGather || funcType == ncclCollScatter);
|
||||
}
|
||||
|
||||
// parse the logs and assign them into lineItem
|
||||
bool ParseLineItem(char const* line, LineItem& li);
|
||||
|
||||
@@ -213,7 +393,7 @@ void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls
|
||||
|
||||
// allocates send/recv buff, sets the device based on which rank the task belongs to,
|
||||
// syncronize devices after executing all the tasks and free device memory.
|
||||
double ReplayRccl(CollectiveCalls const& collCall, int groupIdx);
|
||||
double ReplayRccl(CollectiveCalls& collCall, int groupIdx, int& numInvalid);
|
||||
|
||||
// Print information about a group call
|
||||
void PrintGroupCall(GroupCall const& gc);
|
||||
@@ -226,4 +406,24 @@ void dataToCsv(GroupCall const& gc, std::ofstream &datafile, double runTime);
|
||||
std::pair<size_t, size_t> GetSize(TaskInfo taskInfo, int numGlobalRanks);
|
||||
|
||||
// executes the collective call (task)
|
||||
void ExecuteCollective(TaskInfo const& task, ncclComm_t const& comm, hipStream_t stream, const void *sendbuff, void *recvbuff);
|
||||
void ExecuteCollective(TaskInfo& task, ncclComm_t const& comm, hipStream_t stream);
|
||||
|
||||
// Allocate CPU/GPU memory for ptrUnion
|
||||
void AllocateMem(PtrUnion& ptrUnion, size_t const numBytes, bool isGpu = false);
|
||||
|
||||
// Free CPU/GPU memory for ptrUnion
|
||||
void FreeMem(PtrUnion& ptrUnion, bool isGpu = false);
|
||||
|
||||
// Fill buffers based on pattern using globalRank
|
||||
void FillPattern(PtrUnion& ptrUnion, ncclDataType_t const dataType, size_t const numElements, int globalRank, bool isGpu = false);
|
||||
|
||||
// PrepareData functions are responsible for setting up input / expected for the given taskInfo
|
||||
void PrepareDataFunc(TaskInfo& taskInfo, int globalRank, int totalRanks);
|
||||
void PrepData_Broadcast(TaskInfo& taskInfo, int globalRank);
|
||||
void PrepData_Reduce(TaskInfo& taskInfo, int globalRank, int totalRanks, bool isAllReduce);
|
||||
void PrepData_ReduceScatter(TaskInfo& taskInfo, int globalRank, int totalRanks);
|
||||
void PrepData_Gather(TaskInfo& taskInfo, int globalRank, int totalRanks, bool isAllGather);
|
||||
void PrepData_Scatter(TaskInfo& taskInfo, int globalRank, int totalRanks);
|
||||
void PrepData_AlltoAll(TaskInfo& taskInfo, int globalRank, int totalRanks);
|
||||
void PrepData_Send(TaskInfo& taskInfo, int globalRank);
|
||||
void PrepData_Recv(TaskInfo& taskInfo, int globalRank);
|
||||
新增問題並參考
封鎖使用者