RCCL Replayer - multi communicator support (#1176)
This commit is contained in:
@@ -1,21 +1,14 @@
|
||||
ROCM_DIR ?= /opt/rocm
|
||||
RCCL_DIR ?= ../../build/release
|
||||
MPI_DIR ?= /opt/ompi
|
||||
MPI_INC_DIR ?= /usr/include/x86_64-linux-gnu/mpi
|
||||
MPI_LIB_DIR ?= /usr/lib/x86_64-linux-gnu
|
||||
|
||||
INCLUDES = -I$(MPI_INC_DIR) -I$(MPI_DIR)/include -I$(RCCL_DIR)/include
|
||||
LDFLAGS = -L$(MPI_LIB_DIR) -L$(MPI_DIR)/lib -L$(RCCL_DIR) -lmpi -lrccl
|
||||
|
||||
ifdef MPI_DIR
|
||||
main: rcclReplayer.cpp
|
||||
/opt/rocm/bin/hipcc rcclReplayer.cpp \
|
||||
-g \
|
||||
-o rcclReplayer \
|
||||
-I$(MPI_DIR)/ \
|
||||
-I$(RCCL_DIR) \
|
||||
-I$(RCCL_DIR)/include/rccl \
|
||||
-I/opt/rocm/include/hip \
|
||||
-L$(MPI_DIR)/lib \
|
||||
-L$(RCCL_DIR) -lmpich -lrccl
|
||||
else
|
||||
main:
|
||||
@echo "Error: MPI_DIR was not specified."
|
||||
@exit 1
|
||||
endif
|
||||
$(ROCM_DIR)/bin/hipcc rcclReplayer.cpp -O1 -g -o rcclReplayer $(INCLUDES) $(LDFLAGS)
|
||||
|
||||
clean:
|
||||
rm -f ./rcclReplayer
|
||||
rm -f ./rcclReplayer
|
||||
|
||||
@@ -1,387 +1,459 @@
|
||||
#include <cstdio>
|
||||
#include <cstring>
|
||||
#include <vector>
|
||||
#include <iostream>
|
||||
#include <algorithm>
|
||||
#include <chrono>
|
||||
#include <mpi.h>
|
||||
|
||||
#include "rcclReplayer.hpp"
|
||||
|
||||
bool ParseLineItem(char const* line, LineItem& li)
|
||||
int main(int argc, char **argv)
|
||||
{
|
||||
return sscanf(line,
|
||||
"%[^:]:%d:%d [%d] NCCL INFO %[^:]: opCount %d sendbuff %s "
|
||||
"recvbuff %s count %lu datatype %d op %d root %d comm %s "
|
||||
"[nranks=%d] stream %p task %d globalrank %d",
|
||||
li.hostname, &li.pid, &li.tid, &li.cudaDev, li.opName,
|
||||
&li.opCount, li.sendbuff, li.recvbuff,
|
||||
&li.count, &li.datatype, &li.op, &li.root, li.comm,
|
||||
&li.nRanks, &li.stream, &li.task, &li.globalRank) == 17;
|
||||
}
|
||||
MPI_Init(&argc, &argv);
|
||||
if (argc <= 1) {
|
||||
printf("Usage: %s logfile [numGpusPerMpiRank = 1]\n", argv[0]);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
void ParseCollectives(char const* logFilename, int const numGlobalRanks, std::vector<GroupCall>& groupCalls) {
|
||||
int mpiRank;
|
||||
MPI_Comm_rank(MPI_COMM_WORLD, &mpiRank);
|
||||
// Parse rank information
|
||||
int mpiRank, numMpiRanks;
|
||||
MPI_Comm_rank(MPI_COMM_WORLD, &mpiRank);
|
||||
MPI_Comm_size(MPI_COMM_WORLD, &numMpiRanks);
|
||||
|
||||
groupCalls.clear();
|
||||
// Parse command line arguments
|
||||
char* logFilename = argv[1];
|
||||
int numGpusPerMpiRank = (argc > 2 ? atoi(argv[2]) : 1);
|
||||
int parseOnly = (argc > 3 ? atoi(argv[3]) : 0);
|
||||
|
||||
FILE *fp = fopen(logFilename, "r");
|
||||
if (!fp) {
|
||||
printf("[ERROR] Unable to open file %s\n", logFilename);
|
||||
exit(-1);
|
||||
}
|
||||
CollectiveCalls collCalls;
|
||||
collCalls.firstGlobalRank = mpiRank * numGpusPerMpiRank;
|
||||
collCalls.numGlobalRanks = numMpiRanks * numGpusPerMpiRank;
|
||||
|
||||
char line[1000];
|
||||
LineItem li;
|
||||
int lineNum = 0;
|
||||
while (fgets(line, 1000, fp)) {
|
||||
++lineNum;
|
||||
// Figure out starting GPU index to use based on hostname
|
||||
int nameLen;
|
||||
char name[MPI_MAX_PROCESSOR_NAME];
|
||||
std::vector<char> allnames(numMpiRanks * MPI_MAX_PROCESSOR_NAME, 0);
|
||||
MPI_Get_processor_name(name, &nameLen);
|
||||
MPI_Allgather(name, MPI_MAX_PROCESSOR_NAME, MPI_CHAR,
|
||||
allnames.data(), MPI_MAX_PROCESSOR_NAME, MPI_CHAR, MPI_COMM_WORLD);
|
||||
|
||||
//Ignore invalid lines and collectives
|
||||
if (!ParseLineItem(line, li) || li.nRanks != numGlobalRanks) continue;
|
||||
// Offset local gpu device index based on number of previous ranks on the same host
|
||||
collCalls.localGpuOffset = 0;
|
||||
for (int rank = 0; rank < mpiRank; rank++) {
|
||||
if (!strcmp(name, allnames.data() + (rank * MPI_MAX_PROCESSOR_NAME)))
|
||||
collCalls.localGpuOffset += numGpusPerMpiRank;
|
||||
}
|
||||
if (mpiRank == 0)
|
||||
printf("RCCL Replayer: %d x %d = %d total ranks\n", numMpiRanks, numGpusPerMpiRank, collCalls.numGlobalRanks);
|
||||
printf("Rank %d [%s] LocalGpuOffset: %d GlobalRankFirst %d GlobalRankLast %d\n",
|
||||
mpiRank, name, collCalls.localGpuOffset, collCalls.firstGlobalRank, collCalls.firstGlobalRank + numGpusPerMpiRank - 1);
|
||||
|
||||
TaskInfo taskInfo;
|
||||
taskInfo.funcType = GetFuncType(li.opName);
|
||||
taskInfo.inPlace = !strcmp(li.sendbuff, li.recvbuff);
|
||||
taskInfo.count = li.count;
|
||||
taskInfo.datatype = (ncclDataType_t) li.datatype;
|
||||
taskInfo.op = (ncclRedOp_t) li.op;
|
||||
taskInfo.root = li.root;
|
||||
// Parse collectives from logfile
|
||||
if (parseOnly) collCalls.numGlobalRanks = parseOnly;
|
||||
ParseCollectives(logFilename, mpiRank == 0, collCalls);
|
||||
if (collCalls.groupCalls.size() == 0) {
|
||||
MPI_Finalize();
|
||||
return 0;
|
||||
}
|
||||
if (parseOnly) return 0;
|
||||
|
||||
// Find the appropriate GroupCall that this task belongs to
|
||||
// If it doesn't exist yet, then create it
|
||||
bool found = false;
|
||||
for (auto& gc : groupCalls) {
|
||||
if (gc.rankData.count(li.globalRank)) {
|
||||
RankData& rd = gc.rankData[li.globalRank];
|
||||
if (rd.comm != li.comm || rd.tasks.size() != li.task)
|
||||
continue;
|
||||
|
||||
rd.tasks.push_back(taskInfo);
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
// Rank has no tasks - make sure this is task 0
|
||||
else if (li.task == 0) {
|
||||
gc.rankData[li.globalRank].comm = li.comm;
|
||||
gc.rankData[li.globalRank].lineNum = lineNum;
|
||||
gc.rankData[li.globalRank].tasks.push_back(taskInfo);
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
// Setup all communicators
|
||||
if (mpiRank == 0) printf("Preparing %d communicator(s) per rank\n", collCalls.numCommsPerRank);
|
||||
collCalls.localRankComms.resize(numGpusPerMpiRank, std::vector<ncclComm_t>(collCalls.numCommsPerRank));
|
||||
collCalls.localRankStreams.resize(numGpusPerMpiRank, std::vector<hipStream_t>(collCalls.numCommsPerRank));
|
||||
|
||||
// If no collectives were found, create new one
|
||||
if (!found) {
|
||||
if (li.task != 0) {
|
||||
if (mpiRank == 0) printf("[WARN] Was unable to find corresponding collective for line %d\n", lineNum);
|
||||
}
|
||||
|
||||
groupCalls.resize(groupCalls.size() + 1);
|
||||
GroupCall& gc = groupCalls.back();
|
||||
gc.opCount = li.opCount;
|
||||
gc.rankData[li.globalRank].comm = li.comm;
|
||||
gc.rankData[li.globalRank].lineNum = lineNum;
|
||||
gc.rankData[li.globalRank].tasks.push_back(taskInfo);
|
||||
}
|
||||
}
|
||||
|
||||
// - For non Send/Recv, check that all ranks participate with same parameters count
|
||||
// - For Send/Recv, check that pairs of Send/Recv calls exist
|
||||
if (mpiRank == 0) printf("Found %lu groupCalls\n", groupCalls.size());
|
||||
for (int i = 0; i < groupCalls.size(); i++) {
|
||||
GroupCall& gc = groupCalls[i];
|
||||
std::map<std::tuple<const char*, size_t, int, int>, std::vector<int>> arrivalCounter;
|
||||
|
||||
gc.isValid = true;
|
||||
|
||||
if (mpiRank == 0) {
|
||||
printf("GroupCall %d\n", i);
|
||||
printf(" - OpCount: %d\n", gc.opCount);
|
||||
}
|
||||
|
||||
for (auto rd : gc.rankData) {
|
||||
if (mpiRank == 0) {
|
||||
printf(" - Rank %02d: comm %s\n", rd.first, rd.second.comm.c_str());
|
||||
}
|
||||
|
||||
for (int task = 0; task < rd.second.tasks.size(); task++) {
|
||||
TaskInfo ti = rd.second.tasks[task];
|
||||
const char* funcName;
|
||||
|
||||
if (ti.funcType == ncclCollSend || ti.funcType == ncclCollRecv)
|
||||
funcName = "Send/Recv";
|
||||
else
|
||||
funcName = ncclFuncNames[ti.funcType];
|
||||
|
||||
std::tuple<const char*, size_t, int, int> key(funcName, ti.count, ti.datatype, ti.op);
|
||||
|
||||
if (mpiRank == 0) {
|
||||
printf(" - Task %02d: %32s inPlace=%d count=%lu datatype=%d op=%d root=%d\n",
|
||||
task, funcName, ti.inPlace, ti.count, ti.datatype, ti.op, ti.root);
|
||||
}
|
||||
|
||||
auto& rankVector = arrivalCounter[key];
|
||||
if (rankVector.size() < numGlobalRanks) {
|
||||
rankVector.resize(numGlobalRanks);
|
||||
}
|
||||
|
||||
// rankVector<int> in arrivalCount represents the rank information
|
||||
// Count the number of tasks that are going to be executed by each rank. This is to validate the group call later on.
|
||||
// Nom-Send/Recv rank counts (rankVector<int> elements) should be equal at the end, and for Send/Recv, all the elements of rankVector<int> should be equal to 0
|
||||
if (ti.funcType == ncclCollRecv) {
|
||||
rankVector[ti.root]--;
|
||||
} else {
|
||||
rankVector[rd.first]++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Iterate through the map variable and report/validate the results
|
||||
for (const auto& e : arrivalCounter) {
|
||||
int maxVal;
|
||||
const char* funcName = std::get<0>(e.first);
|
||||
size_t count = std::get<1>(e.first);
|
||||
int datatype = std::get<2>(e.first);
|
||||
int op = std::get<3>(e.first);
|
||||
|
||||
bool isp2p = (strcmp(std::get<0>(e.first), "Send/Recv") == 0);
|
||||
if (!isp2p) maxVal = *std::max_element(e.second.begin(), e.second.end());
|
||||
|
||||
// Validate all the ranks have required amount of collective call (task)
|
||||
for (int i = 0; i < e.second.size(); i++) {
|
||||
if (e.second[i] != (isp2p ? 0 : maxVal)) {
|
||||
std::string warning = (isp2p ? (e.second[i] > 0 ? "[WARN] Missing Recv" : "[WARN] Missing Send") : "[WARN] Missing " + std::string(funcName))
|
||||
+ " count=" + std::to_string(count) + " datatype=" + std::to_string(datatype) + " op=" + std::to_string(op) + " at rank [" + std::to_string(i) + "]";
|
||||
if(mpiRank == 0) printf("%s\n", warning.c_str());
|
||||
|
||||
gc.isValid = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// GetSize will return a pair of bytes where first element in pair represents bytesSent and the second bytesRecv
|
||||
std::pair<size_t, size_t> GetSize(TaskInfo taskInfo, int numGlobalRanks) {
|
||||
size_t sendNumBytes;
|
||||
size_t recvNumBytes;
|
||||
|
||||
if (taskInfo.funcType == ncclCollBroadcast || taskInfo.funcType == ncclCollReduce || taskInfo.funcType == ncclCollAllReduce) {
|
||||
sendNumBytes = taskInfo.count * DataTypeToBytes(taskInfo.datatype);
|
||||
recvNumBytes = sendNumBytes;
|
||||
} else if (taskInfo.funcType == ncclCollAllGather || taskInfo.funcType == ncclCollGather) {
|
||||
sendNumBytes = taskInfo.count * DataTypeToBytes(taskInfo.datatype);
|
||||
recvNumBytes = numGlobalRanks * sendNumBytes;
|
||||
} else if (taskInfo.funcType == ncclCollReduceScatter || taskInfo.funcType == ncclCollScatter) {
|
||||
recvNumBytes = taskInfo.count * DataTypeToBytes(taskInfo.datatype);
|
||||
sendNumBytes = numGlobalRanks * recvNumBytes;
|
||||
} else if (taskInfo.funcType == ncclCollAllToAll) {
|
||||
sendNumBytes = numGlobalRanks * taskInfo.count * DataTypeToBytes(taskInfo.datatype);
|
||||
recvNumBytes = sendNumBytes;
|
||||
} else {
|
||||
sendNumBytes = taskInfo.count * DataTypeToBytes(taskInfo.datatype);
|
||||
recvNumBytes = sendNumBytes;
|
||||
}
|
||||
return std::make_pair(sendNumBytes, recvNumBytes);
|
||||
}
|
||||
|
||||
void ExecuteCollective(TaskInfo task, ncclComm_t comm, hipStream_t stream, const void *sendbuff, void *recvbuff) {
|
||||
|
||||
int funcTypeValue = (int)task.funcType;
|
||||
|
||||
switch (funcTypeValue) {
|
||||
case ncclCollAllGather:
|
||||
NCCLCHECK(ncclAllGather(sendbuff, recvbuff, task.count, task.datatype, comm, stream));
|
||||
break;
|
||||
case ncclCollAllReduce:
|
||||
NCCLCHECK(ncclAllReduce(sendbuff, recvbuff, task.count, task.datatype, task.op, comm, stream));
|
||||
break;
|
||||
case ncclCollBroadcast:
|
||||
NCCLCHECK(ncclBroadcast(sendbuff, recvbuff, task.count, task.datatype, task.root, comm, stream));
|
||||
break;
|
||||
case ncclCollReduce:
|
||||
NCCLCHECK(ncclReduce(sendbuff, recvbuff, task.count, task.datatype, task.op, task.root, comm, stream));
|
||||
break;
|
||||
case ncclCollReduceScatter:
|
||||
NCCLCHECK(ncclReduceScatter(sendbuff, recvbuff, task.count, task.datatype, task.op, comm, stream));
|
||||
break;
|
||||
case ncclCollGather:
|
||||
NCCLCHECK(ncclGather(sendbuff, recvbuff, task.count, task.datatype, task.root, comm, stream));
|
||||
break;
|
||||
case ncclCollScatter:
|
||||
NCCLCHECK(ncclScatter(sendbuff, recvbuff, task.count, task.datatype, task.root, comm, stream));
|
||||
break;
|
||||
case ncclCollAllToAll:
|
||||
NCCLCHECK(ncclAllToAll(sendbuff, recvbuff, task.count, task.datatype, comm, stream));
|
||||
break;
|
||||
case ncclCollSend:
|
||||
NCCLCHECK(ncclSend(sendbuff, task.count, task.datatype, task.root, comm, stream));
|
||||
break;
|
||||
case ncclCollRecv:
|
||||
NCCLCHECK(ncclRecv(recvbuff, task.count, task.datatype, task.root, comm, stream));
|
||||
break;
|
||||
default:
|
||||
printf("Error: unsupported collective\n");
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
void ReplayRccl(GroupCall& groupCall, std::vector<ncclComm_t> comms, std::vector<hipStream_t> streams,
|
||||
int const localGpuOffset, int const numGpusPerMpiRank, int const firstGlobalRank, int const numGlobalRanks) {
|
||||
|
||||
std::vector<std::vector<void*>> sendbuff(numGpusPerMpiRank);
|
||||
std::vector<std::vector<void*>> recvbuff(numGpusPerMpiRank);
|
||||
|
||||
NCCLCHECK(ncclGroupStart());
|
||||
for (int localIdx = 0; localIdx < numGpusPerMpiRank; localIdx++) {
|
||||
int globalRank = firstGlobalRank + localIdx;
|
||||
RankData& rankData = groupCall.rankData[globalRank];
|
||||
|
||||
for (auto task : rankData.tasks) {
|
||||
void* sendBuffer;
|
||||
void* recvBuffer;
|
||||
|
||||
// Each task has a size based on the type of collective (funcType)
|
||||
std::pair<size_t, size_t> numBytes = GetSize(task, numGlobalRanks);
|
||||
|
||||
if (task.inPlace) {
|
||||
numBytes.first = std::max(numBytes.first, numBytes.second);
|
||||
numBytes.second = numBytes.first;
|
||||
}
|
||||
|
||||
// Set the device and allocate send/recv buffers
|
||||
HIPCALL(hipSetDevice(localGpuOffset + localIdx));
|
||||
HIPCALL(hipMalloc(&sendBuffer, numBytes.first));
|
||||
HIPCALL(hipMalloc(&recvBuffer, numBytes.second));
|
||||
HIPCALL(hipMemset(sendBuffer, 0, numBytes.first));
|
||||
HIPCALL(hipMemset(recvBuffer, 0, numBytes.second));
|
||||
HIPCALL(hipDeviceSynchronize());
|
||||
|
||||
// Add the send and receive buffers to their respective vectors
|
||||
sendbuff[localIdx].push_back(sendBuffer);
|
||||
recvbuff[localIdx].push_back(recvBuffer);
|
||||
|
||||
// Execute the collective call (task)
|
||||
ExecuteCollective(task, comms[localIdx], streams[localIdx], sendBuffer, recvBuffer);
|
||||
}
|
||||
}
|
||||
NCCLCHECK(ncclGroupEnd());
|
||||
|
||||
// Synchronize devices
|
||||
for (int i = 0; i < numGpusPerMpiRank; i++) {
|
||||
HIPCALL(hipStreamSynchronize(streams[i]));
|
||||
}
|
||||
|
||||
// Free device memory for each task on each GPU
|
||||
for (int i = 0; i < numGpusPerMpiRank; i++) {
|
||||
for (auto& sendBuffer : sendbuff[i]) HIPCALL(hipFree(sendBuffer));
|
||||
for (auto& recvBuffer : recvbuff[i]) HIPCALL(hipFree(recvBuffer));
|
||||
}
|
||||
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
MPI_Init(&argc, &argv);
|
||||
if (argc <= 1) {
|
||||
printf("Usage: %s logfile [numGpusPerMpiRank = 1]\n", argv[0]);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
// Parse rank information
|
||||
int mpiRank, numMpiRanks;
|
||||
MPI_Comm_rank(MPI_COMM_WORLD, &mpiRank);
|
||||
MPI_Comm_size(MPI_COMM_WORLD, &numMpiRanks);
|
||||
|
||||
// Default value for numGpusPerMpiRank is 1
|
||||
char* logFilename = argv[1];
|
||||
int numGpusPerMpiRank = (argc > 2 ? atoi(argv[2]) : 1);
|
||||
int numGlobalRanks = numMpiRanks * numGpusPerMpiRank;
|
||||
|
||||
if (mpiRank == 0)
|
||||
printf("RCCL Replayer: %d x %d = %d total ranks\n", numMpiRanks, numGpusPerMpiRank, numGlobalRanks);
|
||||
|
||||
// Parse logfile for Collectives
|
||||
std::vector<GroupCall> groupCalls;
|
||||
ParseCollectives(logFilename, numGlobalRanks, groupCalls);
|
||||
|
||||
int localGpuOffset = 0;
|
||||
int firstGlobalRank = mpiRank * numGpusPerMpiRank;
|
||||
int lastGlobalRank = firstGlobalRank + numGpusPerMpiRank - 1;
|
||||
|
||||
// Figure out the host and get the localGpuOffset
|
||||
int nameLen;
|
||||
char name[MPI_MAX_PROCESSOR_NAME];
|
||||
std::vector<char> allnames(numMpiRanks * MPI_MAX_PROCESSOR_NAME, 0);
|
||||
|
||||
MPI_Get_processor_name(name, &nameLen);
|
||||
MPI_Allgather(name, MPI_MAX_PROCESSOR_NAME, MPI_CHAR,
|
||||
allnames.data(), MPI_MAX_PROCESSOR_NAME, MPI_CHAR, MPI_COMM_WORLD);
|
||||
|
||||
for (int rank = 0; rank < mpiRank; rank++)
|
||||
{
|
||||
if (!strcmp(name, allnames.data() + (rank * MPI_MAX_PROCESSOR_NAME)))
|
||||
localGpuOffset += numGpusPerMpiRank;
|
||||
}
|
||||
|
||||
printf("Rank %d [%s] LocalGpuOffset: %d GlobalRankFirst %d GlobalRankLast %d\n",
|
||||
mpiRank, name, localGpuOffset, firstGlobalRank, lastGlobalRank);
|
||||
|
||||
for (int commIdx = 0; commIdx < collCalls.numCommsPerRank; commIdx++) {
|
||||
// Create a unique ID and broadcast it to all ranks
|
||||
ncclUniqueId uniqueId;
|
||||
if (mpiRank == 0) ncclGetUniqueId(&uniqueId);
|
||||
MPI_Bcast(&uniqueId, sizeof(ncclUniqueId), MPI_BYTE, 0, MPI_COMM_WORLD);
|
||||
|
||||
// Each rank has it's own comm and stream
|
||||
std::vector<ncclComm_t> comms(numGpusPerMpiRank);
|
||||
std::vector<hipStream_t> streams(numGpusPerMpiRank);
|
||||
|
||||
// Initialize comms and strams
|
||||
NCCLCHECK(ncclGroupStart());
|
||||
NCCL_CALL(ncclGroupStart());
|
||||
for (int i = 0; i < numGpusPerMpiRank; i++) {
|
||||
HIPCALL(hipSetDevice(localGpuOffset + i));
|
||||
NCCLCHECK(ncclCommInitRank(&(comms[i]), numGlobalRanks, uniqueId, firstGlobalRank + i));
|
||||
HIPCALL(hipStreamCreate(&(streams[i])));
|
||||
HIP_CALL(hipSetDevice(collCalls.localGpuOffset + i));
|
||||
NCCL_CALL(ncclCommInitRank(&collCalls.localRankComms[i][commIdx], collCalls.numGlobalRanks, uniqueId, collCalls.firstGlobalRank + i));
|
||||
HIP_CALL(hipStreamCreate(&collCalls.localRankStreams[i][commIdx]));
|
||||
}
|
||||
NCCLCHECK(ncclGroupEnd());
|
||||
|
||||
int numSkippedCalls = 0;
|
||||
auto start = std::chrono::high_resolution_clock::now();
|
||||
for (auto groupCall : groupCalls)
|
||||
if (groupCall.isValid)
|
||||
ReplayRccl(groupCall, comms, streams, localGpuOffset, numGpusPerMpiRank, firstGlobalRank, numGlobalRanks);
|
||||
else {
|
||||
if (mpiRank == 0) printf("[ERROR] in group call: (skipping...)\n");
|
||||
for (auto rd : groupCall.rankData) {
|
||||
if (mpiRank == 0) printf(" - Rank %02d: comm %s in line %d\n", rd.first, rd.second.comm.c_str(), rd.second.lineNum);
|
||||
for (int task = 0; task < rd.second.tasks.size(); task++) {
|
||||
TaskInfo ti = rd.second.tasks[task];
|
||||
if (mpiRank == 0)
|
||||
printf(" - Task %02d: %32s inPlace=%d count=%lu datatype=%d op=%d root=%d\n",
|
||||
task, ncclFuncNames[ti.funcType], ti.inPlace, ti.count, ti.datatype, ti.op, ti.root);
|
||||
}
|
||||
}
|
||||
numSkippedCalls++;
|
||||
NCCL_CALL(ncclGroupEnd());
|
||||
}
|
||||
printf("Rank %d Done setting up communicators\n", mpiRank);
|
||||
|
||||
int numSkippedCalls = 0;
|
||||
auto start = std::chrono::high_resolution_clock::now();
|
||||
for (size_t i = 0; i < collCalls.groupCalls.size(); i++) {
|
||||
MPI_Barrier(MPI_COMM_WORLD);
|
||||
if (collCalls.groupCalls[i].isValid) {
|
||||
if (mpiRank == 0)
|
||||
{
|
||||
printf("Running Collective Call %lu of %lu\n", i+1, collCalls.groupCalls.size());
|
||||
PrintGroupCall(collCalls.groupCalls[i]);
|
||||
}
|
||||
ReplayRccl(collCalls, i);
|
||||
} else {
|
||||
if (mpiRank == 0) {
|
||||
printf("[ERROR] in group call: (skipping...)\n");
|
||||
for (auto const& rd : collCalls.groupCalls[i].rankData) {
|
||||
printf(" - Rank %02d: comm %d in line %d\n", rd.first, rd.second.commIdx, rd.second.lineNum);
|
||||
for (int task = 0; task < rd.second.tasks.size(); task++) {
|
||||
TaskInfo ti = rd.second.tasks[task];
|
||||
printf(" - Task %02d: %32s inPlace=%d count=%lu datatype=%d op=%d root=%d\n",
|
||||
task, ncclFuncNames[ti.funcType], ti.inPlace, ti.count, ti.datatype, ti.op, ti.root);
|
||||
}
|
||||
}
|
||||
auto end = std::chrono::high_resolution_clock::now();
|
||||
std::chrono::duration<double> duration = end - start;
|
||||
}
|
||||
numSkippedCalls++;
|
||||
}
|
||||
}
|
||||
auto end = std::chrono::high_resolution_clock::now();
|
||||
std::chrono::duration<double> duration = end - start;
|
||||
|
||||
// Need to destroy comms and streams after collective execution is done
|
||||
for (int i = 0; i < numGpusPerMpiRank; ++i) {
|
||||
ncclCommDestroy(comms[i]);
|
||||
HIPCALL(hipStreamDestroy(streams[i]));
|
||||
// Destroy all communicators
|
||||
for (int commIdx = 0; commIdx < collCalls.numCommsPerRank; commIdx++) {
|
||||
for (int i = 0; i < numGpusPerMpiRank; i++) {
|
||||
NCCL_CALL(ncclCommDestroy(collCalls.localRankComms[i][commIdx]));
|
||||
HIP_CALL(hipStreamDestroy(collCalls.localRankStreams[i][commIdx]));
|
||||
}
|
||||
}
|
||||
|
||||
if (mpiRank == 0) printf("Executed group calls: %zu\n", collCalls.groupCalls.size() - numSkippedCalls);
|
||||
if (mpiRank == 0) printf("Skipped group calls: %d\n", numSkippedCalls);
|
||||
|
||||
// Time it takes to execute all the group calls
|
||||
if (mpiRank == 0) printf("Execution Time: %f seconds\n", duration.count());
|
||||
printf("MPI Rank %d Success\n", mpiRank);
|
||||
|
||||
MPI_Finalize();
|
||||
return 0;
|
||||
}
|
||||
|
||||
void PrintGroupCall(GroupCall const& gc)
|
||||
{
|
||||
printf("OpCount: %d\n", gc.opCount);
|
||||
|
||||
for (auto rd : gc.rankData) {
|
||||
printf(" - Rank %02d: comm %d\n", rd.first, rd.second.commIdx);
|
||||
|
||||
for (int task = 0; task < rd.second.tasks.size(); task++) {
|
||||
TaskInfo ti = rd.second.tasks[task];
|
||||
std::string funcName = (ti.funcType == ncclCollSend || ti.funcType == ncclCollRecv) ? "Send/Recv" : ncclFuncNames[ti.funcType];
|
||||
std::tuple<std::string, size_t, int, int> key(funcName, ti.count, ti.datatype, ti.op);
|
||||
printf(" - Task %02d: %32s inPlace=%d count=%lu datatype=%d op=%d root=%d\n",
|
||||
task, funcName.c_str(), ti.inPlace, ti.count, ti.datatype, ti.op, ti.root);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls& cc)
|
||||
{
|
||||
bool verbose = isFirstRank && (getenv("VERBOSE") != NULL);
|
||||
cc.globalRankComms.clear();
|
||||
cc.globalRankComms.resize(cc.numGlobalRanks);
|
||||
cc.groupCalls.clear();
|
||||
|
||||
FILE* fp = fopen(logFilename, "r");
|
||||
if (!fp) {
|
||||
printf("[ERROR] Unable to open file %s\n", logFilename);
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
char line[2048];
|
||||
LineItem li;
|
||||
int lineNum = 0;
|
||||
|
||||
while (fgets(line, 2048, fp)) {
|
||||
++lineNum;
|
||||
|
||||
//Ignore invalid lines and collectives
|
||||
if (!ParseLineItem(line, li) || li.nRanks != cc.numGlobalRanks) continue;
|
||||
|
||||
// Figure out commIdx for this globalrank
|
||||
int commIdx = -1;
|
||||
for (auto i = 0; i < cc.globalRankComms[li.globalRank].size(); i++) {
|
||||
if (!strcmp(cc.globalRankComms[li.globalRank][i].c_str(), li.comm)) {
|
||||
commIdx = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (commIdx == -1) {
|
||||
commIdx = cc.globalRankComms[li.globalRank].size();
|
||||
cc.globalRankComms[li.globalRank].push_back(li.comm);
|
||||
}
|
||||
|
||||
MPI_Finalize();
|
||||
TaskInfo taskInfo;
|
||||
taskInfo.funcType = GetFuncType(li.opName);
|
||||
taskInfo.inPlace = !strcmp(li.sendbuff, li.recvbuff);
|
||||
taskInfo.count = li.count;
|
||||
taskInfo.datatype = (ncclDataType_t) li.datatype;
|
||||
taskInfo.op = (ncclRedOp_t) li.op;
|
||||
taskInfo.root = li.root;
|
||||
|
||||
if (mpiRank == 0) printf("Executed group calls: %zu\n", groupCalls.size() - numSkippedCalls);
|
||||
if (mpiRank == 0) printf("Skipped group calls: %d\n", numSkippedCalls);
|
||||
// Find the appropriate GroupCall that this task belongs to
|
||||
// If it doesn't exist yet, then create it
|
||||
bool found = false;
|
||||
for (auto& gc : cc.groupCalls) {
|
||||
if (gc.opCount != li.opCount) continue;
|
||||
if (gc.rankData.count(li.globalRank)) {
|
||||
RankData& rd = gc.rankData[li.globalRank];
|
||||
if (rd.commIdx != commIdx || rd.tasks.size() != li.task)
|
||||
continue;
|
||||
|
||||
// Time it takes to execute all the group calls
|
||||
if (mpiRank == 0) printf("Execution Time: %f seconds\n", duration.count());
|
||||
rd.tasks.push_back(taskInfo);
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
// Rank has no tasks - make sure this is task 0
|
||||
else if (li.task == 0) {
|
||||
gc.rankData[li.globalRank].lineNum = lineNum;
|
||||
gc.rankData[li.globalRank].commIdx = commIdx;
|
||||
gc.rankData[li.globalRank].tasks.push_back(taskInfo);
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Means no hang
|
||||
printf("MPI Rank %d Success\n", mpiRank);
|
||||
|
||||
return 0;
|
||||
// If no collectives were found, create new one
|
||||
if (!found) {
|
||||
if (li.task != 0) {
|
||||
if (isFirstRank) printf("[WARN] Was unable to find corresponding collective for line %d\n", lineNum);
|
||||
}
|
||||
|
||||
GroupCall gc;
|
||||
gc.opCount = li.opCount;
|
||||
gc.rankData[li.globalRank].commIdx = commIdx;
|
||||
gc.rankData[li.globalRank].lineNum = lineNum;
|
||||
gc.rankData[li.globalRank].tasks.push_back(taskInfo);
|
||||
cc.groupCalls.push_back(gc);
|
||||
}
|
||||
}
|
||||
fclose(fp);
|
||||
|
||||
// Validate group calls
|
||||
// - For non Send/Recv, check that all ranks participate with same parameters count
|
||||
// - For Send/Recv, check that pairs of Send/Recv calls exist
|
||||
if (isFirstRank) printf("Found %lu groupCalls\n", cc.groupCalls.size());
|
||||
for (int i = 0; i < cc.groupCalls.size(); i++) {
|
||||
GroupCall& gc = cc.groupCalls[i];
|
||||
std::map<std::tuple<std::string, size_t, int, int>, std::vector<int>> arrivalCounter;
|
||||
|
||||
gc.isValid = true;
|
||||
|
||||
for (auto rd : gc.rankData) {
|
||||
for (int task = 0; task < rd.second.tasks.size(); task++) {
|
||||
TaskInfo ti = rd.second.tasks[task];
|
||||
|
||||
std::string funcName = (ti.funcType == ncclCollSend || ti.funcType == ncclCollRecv) ? "Send/Recv" : ncclFuncNames[ti.funcType];
|
||||
std::tuple<std::string, size_t, int, int> key(funcName, ti.count, ti.datatype, ti.op);
|
||||
|
||||
auto& rankVector = arrivalCounter[key];
|
||||
if (rankVector.size() < cc.numGlobalRanks)
|
||||
rankVector.resize(cc.numGlobalRanks);
|
||||
|
||||
// rankVector<int> in arrivalCount represents the rank information
|
||||
// Count the number of tasks that are going to be executed by each rank. This is to validate the group call later on.
|
||||
// Nom-Send/Recv rank counts (rankVector<int> elements) should be equal at the end, and for Send/Recv, all the elements of rankVector<int> should be equal to 0
|
||||
if (ti.funcType == ncclCollRecv) {
|
||||
rankVector[ti.root]--;
|
||||
} else {
|
||||
rankVector[rd.first]++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Iterate through the map variable and report/validate the results
|
||||
for (const auto& e : arrivalCounter) {
|
||||
int maxVal;
|
||||
std::string funcName = std::get<0>(e.first);
|
||||
size_t count = std::get<1>(e.first);
|
||||
int const datatype = std::get<2>(e.first);
|
||||
int const op = std::get<3>(e.first);
|
||||
|
||||
bool isp2p = (funcName == "Send/Recv");
|
||||
if (!isp2p) maxVal = *std::max_element(e.second.begin(), e.second.end());
|
||||
|
||||
// Validate all the ranks have required amount of collective call (task)
|
||||
for (int i = 0; i < e.second.size(); i++) {
|
||||
if (e.second[i] != (isp2p ? 0 : maxVal)) {
|
||||
std::string warning = (isp2p ? (e.second[i] > 0 ? "[WARN] Missing Recv" : "[WARN] Missing Send") : "[WARN] Missing " + std::string(funcName))
|
||||
+ " count=" + std::to_string(count) + " datatype=" + std::to_string(datatype) + " op=" + std::to_string(op) + " at rank [" + std::to_string(i) + "]";
|
||||
if(isFirstRank) printf("%s\n", warning.c_str());
|
||||
|
||||
gc.isValid = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check number of comms per rank
|
||||
cc.numCommsPerRank = cc.globalRankComms[0].size();
|
||||
for (int i = 1; i < cc.numGlobalRanks; i++) {
|
||||
if (cc.numCommsPerRank != cc.globalRankComms[i].size()) {
|
||||
printf("[ERROR] Replayer currently only supports identical number of communicators across all ranks\n");
|
||||
printf("[ERROR] Rank %d has %lu communicators (expecting %d)\n", i, cc.globalRankComms[i].size(), cc.numCommsPerRank);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool ParseLineItem(char const* line, LineItem& li)
|
||||
{
|
||||
return sscanf(line,
|
||||
"%[^:]:%d:%d [%d] NCCL INFO %[^:]: opCount %x sendbuff %s "
|
||||
"recvbuff %s count %lu datatype %d op %d root %d comm %s "
|
||||
"[nranks=%d] stream %p task %d globalrank %d",
|
||||
li.hostname, &li.pid, &li.tid, &li.cudaDev, li.opName,
|
||||
&li.opCount, li.sendbuff, li.recvbuff,
|
||||
&li.count, &li.datatype, &li.op, &li.root, li.comm,
|
||||
&li.nRanks, &li.stream, &li.task, &li.globalRank) == 17;
|
||||
}
|
||||
|
||||
void ReplayRccl(CollectiveCalls const& cc, int groupIdx)
|
||||
{
|
||||
int numLocalRanks = cc.localRankComms.size();
|
||||
|
||||
|
||||
|
||||
|
||||
// Allocate memory for collective
|
||||
std::vector<std::vector<void*>> sendbuff(numLocalRanks);
|
||||
std::vector<std::vector<void*>> recvbuff(numLocalRanks);
|
||||
|
||||
for (int localIdx = 0; localIdx < numLocalRanks; localIdx++) {
|
||||
int globalRank = cc.firstGlobalRank + localIdx;
|
||||
if (cc.groupCalls[groupIdx].rankData.count(globalRank) == 0) continue;
|
||||
HIP_CALL(hipSetDevice(cc.localGpuOffset + localIdx));
|
||||
|
||||
RankData const& rankData = cc.groupCalls[groupIdx].rankData.at(globalRank);
|
||||
int numTasks = rankData.tasks.size();
|
||||
sendbuff[localIdx].resize(numTasks);
|
||||
recvbuff[localIdx].resize(numTasks);
|
||||
|
||||
for (int taskId = 0; taskId < numTasks; taskId++) {
|
||||
TaskInfo const& task = rankData.tasks[taskId];
|
||||
|
||||
// Each task has a size based on the type of collective (funcType)
|
||||
std::pair<size_t, size_t> numBytes = GetSize(task, cc.numGlobalRanks);
|
||||
|
||||
if (task.inPlace) {
|
||||
numBytes.first = std::max(numBytes.first, numBytes.second);
|
||||
numBytes.second = numBytes.first;
|
||||
}
|
||||
|
||||
// Set the device and allocate send/recv buffers
|
||||
HIP_CALL(hipMalloc(&sendbuff[localIdx][taskId], numBytes.first));
|
||||
HIP_CALL(hipMemset(sendbuff[localIdx][taskId], 0, numBytes.first));
|
||||
|
||||
if (!task.inPlace) {
|
||||
HIP_CALL(hipMalloc(&recvbuff[localIdx][taskId], numBytes.second));
|
||||
HIP_CALL(hipMemset(recvbuff[localIdx][taskId], 0, numBytes.second));
|
||||
} else {
|
||||
recvbuff[localIdx][taskId] = sendbuff[localIdx][taskId];
|
||||
}
|
||||
|
||||
HIP_CALL(hipDeviceSynchronize());
|
||||
}
|
||||
}
|
||||
|
||||
// Execute the collective call (task)
|
||||
NCCL_CALL(ncclGroupStart());
|
||||
for (int localIdx = 0; localIdx < numLocalRanks; localIdx++) {
|
||||
int globalRank = cc.firstGlobalRank + localIdx;
|
||||
if (cc.groupCalls[groupIdx].rankData.count(globalRank) == 0) continue;
|
||||
|
||||
RankData const& rankData = cc.groupCalls[groupIdx].rankData.at(globalRank);
|
||||
int numTasks = rankData.tasks.size();
|
||||
int commIdx = rankData.commIdx;
|
||||
for (int taskId = 0; taskId < numTasks; taskId++) {
|
||||
TaskInfo const& task = rankData.tasks[taskId];
|
||||
ExecuteCollective(task, cc.localRankComms[localIdx][commIdx], cc.localRankStreams[localIdx][commIdx],
|
||||
sendbuff[localIdx][taskId],
|
||||
recvbuff[localIdx][taskId]);
|
||||
}
|
||||
}
|
||||
NCCL_CALL(ncclGroupEnd());
|
||||
|
||||
// Synchronize devices and free memory
|
||||
for (int localIdx = 0; localIdx < numLocalRanks; localIdx++) {
|
||||
int globalRank = cc.firstGlobalRank + localIdx;
|
||||
if (cc.groupCalls[groupIdx].rankData.count(globalRank) == 0) continue;
|
||||
|
||||
RankData const& rankData = cc.groupCalls[groupIdx].rankData.at(globalRank);
|
||||
int numTasks = rankData.tasks.size();
|
||||
int commIdx = rankData.commIdx;
|
||||
HIP_CALL(hipStreamSynchronize(cc.localRankStreams[localIdx][commIdx]));
|
||||
|
||||
for (int taskId = 0; taskId < numTasks; taskId++) {
|
||||
TaskInfo const& task = rankData.tasks[taskId];
|
||||
HIP_CALL(hipFree(sendbuff[localIdx][taskId]));
|
||||
if (!task.inPlace) HIP_CALL(hipFree(recvbuff[localIdx][taskId]));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// GetSize will return a pair of bytes where first element in pair represents bytesSent and the second bytesRecv
|
||||
std::pair<size_t, size_t> GetSize(TaskInfo taskInfo, int numGlobalRanks) {
|
||||
size_t sendNumBytes, recvNumBytes;
|
||||
|
||||
switch (taskInfo.funcType) {
|
||||
case ncclCollBroadcast: case ncclCollReduce: case ncclCollAllReduce:
|
||||
sendNumBytes = taskInfo.count * DataTypeToBytes(taskInfo.datatype);
|
||||
recvNumBytes = sendNumBytes;
|
||||
break;
|
||||
case ncclCollAllGather: case ncclCollGather:
|
||||
sendNumBytes = taskInfo.count * DataTypeToBytes(taskInfo.datatype);
|
||||
recvNumBytes = numGlobalRanks * sendNumBytes;
|
||||
break;
|
||||
case ncclCollReduceScatter: case ncclCollScatter:
|
||||
recvNumBytes = taskInfo.count * DataTypeToBytes(taskInfo.datatype);
|
||||
sendNumBytes = numGlobalRanks * recvNumBytes;
|
||||
break;
|
||||
case ncclCollAllToAll:
|
||||
sendNumBytes = numGlobalRanks * taskInfo.count * DataTypeToBytes(taskInfo.datatype);
|
||||
recvNumBytes = sendNumBytes;
|
||||
break;
|
||||
default:
|
||||
sendNumBytes = taskInfo.count * DataTypeToBytes(taskInfo.datatype);
|
||||
recvNumBytes = sendNumBytes;
|
||||
}
|
||||
return std::make_pair(sendNumBytes, recvNumBytes);
|
||||
}
|
||||
|
||||
void ExecuteCollective(TaskInfo const& task, ncclComm_t const& comm, hipStream_t stream, const void *sendbuff, void *recvbuff)
|
||||
{
|
||||
switch (task.funcType) {
|
||||
case ncclCollAllGather:
|
||||
NCCL_CALL(ncclAllGather(sendbuff, recvbuff, task.count, task.datatype, comm, stream));
|
||||
break;
|
||||
case ncclCollAllReduce:
|
||||
NCCL_CALL(ncclAllReduce(sendbuff, recvbuff, task.count, task.datatype, task.op, comm, stream));
|
||||
break;
|
||||
case ncclCollBroadcast:
|
||||
NCCL_CALL(ncclBroadcast(sendbuff, recvbuff, task.count, task.datatype, task.root, comm, stream));
|
||||
break;
|
||||
case ncclCollReduce:
|
||||
NCCL_CALL(ncclReduce(sendbuff, recvbuff, task.count, task.datatype, task.op, task.root, comm, stream));
|
||||
break;
|
||||
case ncclCollReduceScatter:
|
||||
NCCL_CALL(ncclReduceScatter(sendbuff, recvbuff, task.count, task.datatype, task.op, comm, stream));
|
||||
break;
|
||||
case ncclCollGather:
|
||||
NCCL_CALL(ncclGather(sendbuff, recvbuff, task.count, task.datatype, task.root, comm, stream));
|
||||
break;
|
||||
case ncclCollScatter:
|
||||
NCCL_CALL(ncclScatter(sendbuff, recvbuff, task.count, task.datatype, task.root, comm, stream));
|
||||
break;
|
||||
case ncclCollAllToAll:
|
||||
NCCL_CALL(ncclAllToAll(sendbuff, recvbuff, task.count, task.datatype, comm, stream));
|
||||
break;
|
||||
case ncclCollSend:
|
||||
NCCL_CALL(ncclSend(sendbuff, task.count, task.datatype, task.root, comm, stream));
|
||||
break;
|
||||
case ncclCollRecv:
|
||||
NCCL_CALL(ncclRecv(recvbuff, task.count, task.datatype, task.root, comm, stream));
|
||||
break;
|
||||
default:
|
||||
printf("Error: unsupported collective\n");
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,39 +6,30 @@
|
||||
|
||||
// NOTE: Parsing is based on this line logging collective information in enqueue.cc
|
||||
// INFO(NCCL_COLL,"%s: opCount %lx sendbuff %p recvbuff %p count %zi datatype %d op %d \
|
||||
root %d comm %p [nranks=%d] stream %p task %d globalrank %d",
|
||||
root %d comm %p [nranks=%d] stream %p task %d globalrank %d",
|
||||
// info->opName, info->comm->opCount, info->sendbuff, info->recvbuff, info->count,
|
||||
// info->datatype, info->op, info->root, info->comm, info->comm->nRanks, info->stream,
|
||||
// info->comm->tasks.nTasksP2p + info->comm->tasks.nTasksColl,
|
||||
// info->comm->localRankToRank[info->comm->localRank]);
|
||||
|
||||
#define MPICHECK(cmd) do { \
|
||||
int e = cmd; \
|
||||
if( e != MPI_SUCCESS ) { \
|
||||
printf("Failed: MPI error %s:%d '%d'\n", \
|
||||
__FILE__,__LINE__, e); \
|
||||
exit(EXIT_FAILURE); \
|
||||
} \
|
||||
} while(0)
|
||||
#define HIP_CALL(cmd) \
|
||||
do { \
|
||||
hipError_t error = (cmd); \
|
||||
if (error != hipSuccess) { \
|
||||
printf("Encountered HIP error (%s) at line %d in file %s\n", \
|
||||
hipGetErrorString(error), __LINE__, __FILE__); \
|
||||
exit(-1); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define HIPCALL(cmd) \
|
||||
do { \
|
||||
hipError_t error = (cmd); \
|
||||
if (error != hipSuccess) \
|
||||
{ \
|
||||
printf("Encountered HIP error (%s) at line %d in file %s\n", \
|
||||
hipGetErrorString(error), __LINE__, __FILE__); \
|
||||
exit(-1); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define NCCLCHECK(cmd) do { \
|
||||
#define NCCL_CALL(cmd) \
|
||||
do { \
|
||||
ncclResult_t res = cmd; \
|
||||
if (res != ncclSuccess) { \
|
||||
printf("NCCL failure %s:%d '%s'\n", \
|
||||
__FILE__,__LINE__,ncclGetErrorString(res)); \
|
||||
printf("NCCL failure %s:%d '%s'\n", \
|
||||
__FILE__,__LINE__,ncclGetErrorString(res)); \
|
||||
} \
|
||||
} while(0)
|
||||
} while(0)
|
||||
|
||||
struct LineItem
|
||||
{
|
||||
@@ -106,7 +97,7 @@ struct TaskInfo
|
||||
struct RankData
|
||||
{
|
||||
int lineNum;
|
||||
std::string comm;
|
||||
int commIdx;
|
||||
std::vector<TaskInfo> tasks;
|
||||
};
|
||||
|
||||
@@ -114,21 +105,36 @@ struct GroupCall
|
||||
{
|
||||
bool isValid;
|
||||
int opCount;
|
||||
std::map<int, RankData> rankData; // Indexed by globalRank
|
||||
std::map<int, RankData> rankData;
|
||||
};
|
||||
|
||||
struct CollectiveCalls
|
||||
{
|
||||
int numGlobalRanks;
|
||||
int numGpusPerMpiRank;
|
||||
std::vector<std::vector<std::string>> globalRankComms; // Set of comms used by each global rank
|
||||
std::vector<GroupCall> groupCalls; // List of group calls for each global rank
|
||||
|
||||
int localGpuOffset; // First local GPU device idx for this MPI process
|
||||
int firstGlobalRank; // First global rank for this MPI process
|
||||
int numCommsPerRank; // Number of communicators per rank
|
||||
std::vector<std::vector<ncclComm_t>> localRankComms; // comms per local rank
|
||||
std::vector<std::vector<hipStream_t>> localRankStreams; // streams per local rank
|
||||
};
|
||||
|
||||
|
||||
size_t DataTypeToBytes(ncclDataType_t const dataType)
|
||||
{
|
||||
switch (dataType) {
|
||||
case ncclInt8: return 1;
|
||||
case ncclUint8: return 1;
|
||||
case ncclInt32: return 4;
|
||||
case ncclUint32: return 4;
|
||||
case ncclInt64: return 8;
|
||||
case ncclUint64: return 8;
|
||||
case ncclFloat16: return 2;
|
||||
case ncclFloat32: return 4;
|
||||
case ncclFloat64: return 8;
|
||||
case ncclInt8: return 1;
|
||||
case ncclUint8: return 1;
|
||||
case ncclInt32: return 4;
|
||||
case ncclUint32: return 4;
|
||||
case ncclInt64: return 8;
|
||||
case ncclUint64: return 8;
|
||||
case ncclFloat16: return 2;
|
||||
case ncclFloat32: return 4;
|
||||
case ncclFloat64: return 8;
|
||||
case ncclBfloat16: return 2;
|
||||
case ncclFp8E4M3: return 1;
|
||||
case ncclFp8E5M2: return 1;
|
||||
@@ -149,24 +155,20 @@ ncclFunc_t GetFuncType(char* func)
|
||||
// parse the logs and assign them into lineItem
|
||||
bool ParseLineItem(char const* line, LineItem& li);
|
||||
|
||||
// this covers grouping the logs based on opCount and task number,
|
||||
// this covers grouping the logs based on opCount and task number,
|
||||
// validatation of the groupCalls for both non-send/recv collectives and send/recv
|
||||
void ParseCollectives(char const* logFilename,
|
||||
int const numGlobalRanks,
|
||||
std::vector<GroupCall>& groupCalls);
|
||||
void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls& collectiveCalls);
|
||||
|
||||
// size differ for each collective call and getSize gives a specific size in bytes depending on type of task,
|
||||
// allocates send/recv buff, sets the device based on which rank the task belongs to,
|
||||
// syncronize devices after executing all the tasks and free device memory.
|
||||
void ReplayRccl(CollectiveCalls const& collCall, int groupIdx);
|
||||
|
||||
// Print information about a group call
|
||||
void PrintGroupCall(GroupCall const& gc);
|
||||
|
||||
// size differ for each collective call and getSize gives a specific size in bytes depending on type of task,
|
||||
// global rank, element count and data type
|
||||
std::pair<size_t, size_t> GetSize(TaskInfo taskInfo,
|
||||
int numGlobalRanks);
|
||||
std::pair<size_t, size_t> GetSize(TaskInfo taskInfo, int numGlobalRanks);
|
||||
|
||||
// executes the collective call (task)
|
||||
void ExecuteCollective(TaskInfo task, ncclComm_t comm, hipStream_t stream, const void *sendbuff, void *recvbuff);
|
||||
|
||||
// allocates send/recv buff, sets the device based on which rank the task belongs to,
|
||||
// syncronize devices after executing all the tasks and free device memory.
|
||||
void ReplayRccl(GroupCall& groupCall, std::vector<ncclComm_t> comms, std::vector<hipStream_t> streams,
|
||||
int const localGpuOffset,
|
||||
int const numGpusPerMpiRank,
|
||||
int const firstGlobalRank,
|
||||
int const numGlobalRanks);
|
||||
// executes the collective call (task)
|
||||
void ExecuteCollective(TaskInfo const& task, ncclComm_t const& comm, hipStream_t stream, const void *sendbuff, void *recvbuff);
|
||||
|
||||
Viittaa uudesa ongelmassa
Block a user