RCCL recorder w/ suggested change and UT
Этот коммит содержится в:
Tim
2025-04-19 00:21:27 -04:00
коммит произвёл GitHub
родитель 52bfdf05dc
Коммит 9a55ff60a9
18 изменённых файлов: 1123 добавлений и 93 удалений
+2
Просмотреть файл
@@ -506,6 +506,7 @@ set(SRC_FILES
src/include/rocm_smi_wrap.h
src/include/rocmwrap.h
src/include/roctx.h
src/include/recorder.h
src/include/shm.h
src/include/shmutils.h
src/include/signals.h
@@ -577,6 +578,7 @@ set(SRC_FILES
src/misc/rocm_smi_wrap.cc
src/misc/rocmwrap.cc
src/misc/roctx.cc
src/misc/recorder.cc
src/misc/shmutils.cc
src/misc/signals.cc
src/misc/socket.cc
+89 -22
Просмотреть файл
@@ -13,6 +13,8 @@
#include "msccl/msccl_lifecycle.h"
using namespace rccl;
const char* ncclFuncToString(ncclFunc_t fn) {
switch (fn) {
case ncclFuncAllGather: return "AllGather";
@@ -99,15 +101,21 @@ ncclResult_t ncclAllGather_impl(const void* sendbuff, void* recvbuff, size_t sen
NvtxParamsAllGather payload{sendcount * ncclTypeSize(datatype), datatype};
NVTX3_FUNC_WITH_PARAMS(AllGather, AllGatherSchema, payload)
struct ncclInfo info = { ncclFuncAllGather, "AllGather",
sendbuff, recvbuff, sendcount, datatype, ncclSum, 0, comm, stream, /* Args */
ALLGATHER_CHUNKSTEPS, ALLGATHER_SLICESTEPS };
if (!mscclIsCaller()) // when msccl falls back to
{
NCCLCHECK(Recorder::instance().record(rrAllGather, info));
}
if (mscclAvailable(comm->rank) && !mscclIsCaller()) {
return mscclEnqueueCheck(
sendbuff, nullptr, nullptr, recvbuff, nullptr, nullptr,
sendcount, datatype, 0, 0, ncclSum, mscclFuncAllGather, comm, stream);
}
struct ncclInfo info = { ncclFuncAllGather, "AllGather",
sendbuff, recvbuff, sendcount, datatype, ncclSum, 0, comm, stream, /* Args */
ALLGATHER_CHUNKSTEPS, ALLGATHER_SLICESTEPS };
NCCLCHECK(ncclEnqueueCheck(&info));
return ncclSuccess;
}
@@ -133,15 +141,21 @@ ncclResult_t ncclAllReduce_impl(const void* sendbuff, void* recvbuff, size_t cou
NvtxParamsAllReduce payload{count * ncclTypeSize(datatype), op, datatype};
NVTX3_FUNC_WITH_PARAMS(AllReduce, AllReduceSchema, payload)
struct ncclInfo info = { ncclFuncAllReduce, "AllReduce",
sendbuff, recvbuff, count, datatype, op, 0, comm, stream, /* Args */
ALLREDUCE_CHUNKSTEPS, ALLREDUCE_SLICESTEPS };
if (!mscclIsCaller()) // when msccl falls back to
{
NCCLCHECK(Recorder::instance().record(rrAllReduce, info));
}
if (mscclAvailable(comm->rank) && !mscclIsCaller()) {
return mscclEnqueueCheck(
sendbuff, nullptr, nullptr, recvbuff, nullptr, nullptr,
count, datatype, 0, 0, op, mscclFuncAllReduce, comm, stream);
}
struct ncclInfo info = { ncclFuncAllReduce, "AllReduce",
sendbuff, recvbuff, count, datatype, op, 0, comm, stream, /* Args */
ALLREDUCE_CHUNKSTEPS, ALLREDUCE_SLICESTEPS };
NCCLCHECK(ncclEnqueueCheck(&info));
return ncclSuccess;
}
@@ -154,6 +168,12 @@ NCCL_API(ncclResult_t, ncclAllToAll, const void* sendbuff, void* recvbuff, size_
ncclResult_t ncclAllToAll_impl(const void* sendbuff, void* recvbuff, size_t count, ncclDataType_t datatype,
ncclComm_t comm, hipStream_t stream) {
if (!mscclIsCaller()) // when msccl falls back to
{
NCCLCHECK(Recorder::instance().record(rrAllToAll, sendbuff, recvbuff, count, datatype, comm, stream));
}
struct NvtxParamsAllToAll {
size_t bytes;
ncclDataType_t datatype;
@@ -204,6 +224,12 @@ NCCL_API(ncclResult_t, ncclAllToAllv, const void *sendbuff, const size_t sendcou
ncclResult_t ncclAllToAllv_impl(const void *sendbuff, const size_t sendcounts[], const size_t sdispls[],
void *recvbuff, const size_t recvcounts[], const size_t rdispls[],
ncclDataType_t datatype, ncclComm_t comm, hipStream_t stream) {
if (!mscclIsCaller()) // when msccl falls back to
{
NCCLCHECK(Recorder::instance().record(rrAllToAllv, sendbuff, recvbuff, 0, datatype, comm, stream, -1, sendcounts, sdispls, recvcounts, rdispls));
}
struct NvtxParamsAllToAllv {
size_t sendbytes;
size_t recvbytes;
@@ -268,15 +294,21 @@ ncclResult_t ncclBroadcast_impl(const void* sendbuff, void* recvbuff, size_t cou
NvtxParamsBroadcast payload{count * ncclTypeSize(datatype), root, datatype};
NVTX3_FUNC_WITH_PARAMS(Broadcast, BroadcastSchema, payload)
struct ncclInfo info = { ncclFuncBroadcast, "Broadcast",
sendbuff, recvbuff, count, datatype, ncclSum, root, comm, stream, /* Args */
BROADCAST_CHUNKSTEPS, BROADCAST_SLICESTEPS };
if (!mscclIsCaller()) // when msccl falls back to
{
NCCLCHECK(Recorder::instance().record(rrBroadcast, info));
}
if (mscclAvailable(comm->rank) && !mscclIsCaller()) {
return mscclEnqueueCheck(
sendbuff, nullptr, nullptr, recvbuff, nullptr, nullptr,
count, datatype, root, 0, ncclSum, mscclFuncBroadcast, comm, stream);
}
struct ncclInfo info = { ncclFuncBroadcast, "Broadcast",
sendbuff, recvbuff, count, datatype, ncclSum, root, comm, stream, /* Args */
BROADCAST_CHUNKSTEPS, BROADCAST_SLICESTEPS };
NCCLCHECK(ncclEnqueueCheck(&info));
return ncclSuccess;
}
@@ -285,6 +317,7 @@ NCCL_API(ncclResult_t, ncclBcast, void* buff, size_t count, ncclDataType_t datat
ncclComm_t comm, cudaStream_t stream);
ncclResult_t ncclBcast(void* buff, size_t count, ncclDataType_t datatype, int root,
                       ncclComm_t comm, cudaStream_t stream) {
  // In-place broadcast entry point: the same buffer is passed as both the send
  // and the receive side, first to the recorder and then to ncclBroadcast.
  Recorder& recorder = Recorder::instance();
  NCCLCHECK(recorder.record(rrBcast, buff, buff, count, datatype, comm, stream, root));

  NCCLCHECK(ncclBroadcast(buff, buff, count, datatype, root, comm, stream));
  return ncclSuccess;
}
@@ -308,6 +341,11 @@ ncclResult_t ncclGather_impl(const void* sendbuff, void* recvbuff, size_t sendco
NvtxParamsGather payload{sendcount * ncclTypeSize(datatype), root, datatype};
NVTX3_FUNC_WITH_PARAMS(Gather, GatherSchema, payload)
if (!mscclIsCaller()) // when msccl falls back to
{
NCCLCHECK(Recorder::instance().record(rrGather, sendbuff, recvbuff, sendcount, datatype, comm, stream, root));
}
if (mscclAvailable(comm->rank) && !mscclIsCaller()) {
return mscclEnqueueCheck(
sendbuff, nullptr, nullptr, recvbuff, nullptr, nullptr,
@@ -352,15 +390,21 @@ ncclResult_t ncclReduce_impl(const void* sendbuff, void* recvbuff, size_t count,
NvtxParamsReduce payload{count * ncclTypeSize(datatype), root, op, datatype};
NVTX3_FUNC_WITH_PARAMS(Reduce, ReduceSchema, payload)
struct ncclInfo info = { ncclFuncReduce, "Reduce",
sendbuff, recvbuff, count, datatype, op, root, comm, stream, /* Args */
REDUCE_CHUNKSTEPS, REDUCE_SLICESTEPS };
if (!mscclIsCaller()) // when msccl falls back to
{
NCCLCHECK(Recorder::instance().record(rrReduce, info));
}
if (mscclAvailable(comm->rank) && !mscclIsCaller()) {
return mscclEnqueueCheck(
sendbuff, nullptr, nullptr, recvbuff, nullptr, nullptr,
count, datatype, root, 0, op, mscclFuncReduce, comm, stream);
}
struct ncclInfo info = { ncclFuncReduce, "Reduce",
sendbuff, recvbuff, count, datatype, op, root, comm, stream, /* Args */
REDUCE_CHUNKSTEPS, REDUCE_SLICESTEPS };
NCCLCHECK(ncclEnqueueCheck(&info));
return ncclSuccess;
}
@@ -386,15 +430,21 @@ ncclResult_t ncclReduceScatter_impl(const void* sendbuff, void* recvbuff, size_t
NvtxParamsReduceScatter payload{recvcount * ncclTypeSize(datatype), op, datatype};
NVTX3_FUNC_WITH_PARAMS(ReduceScatter, ReduceScatterSchema, payload)
struct ncclInfo info = { ncclFuncReduceScatter, "ReduceScatter",
sendbuff, recvbuff, recvcount, datatype, op, 0, comm, stream, /* Args */
REDUCESCATTER_CHUNKSTEPS, REDUCESCATTER_SLICESTEPS };
if (!mscclIsCaller()) // when msccl falls back to
{
NCCLCHECK(Recorder::instance().record(rrReduceScatter, info));
}
if (mscclAvailable(comm->rank) && !mscclIsCaller()) {
return mscclEnqueueCheck(
sendbuff, nullptr, nullptr, recvbuff, nullptr, nullptr,
recvcount, datatype, 0, 0, op, mscclFuncReduceScatter, comm, stream);
}
struct ncclInfo info = { ncclFuncReduceScatter, "ReduceScatter",
sendbuff, recvbuff, recvcount, datatype, op, 0, comm, stream, /* Args */
REDUCESCATTER_CHUNKSTEPS, REDUCESCATTER_SLICESTEPS };
NCCLCHECK(ncclEnqueueCheck(&info));
return ncclSuccess;
}
@@ -418,7 +468,12 @@ ncclResult_t ncclScatter_impl(const void* sendbuff, void* recvbuff, size_t recvc
};
NvtxParamsScatter payload{recvcount * ncclTypeSize(datatype), root, datatype};
NVTX3_FUNC_WITH_PARAMS(Scatter, ScatterSchema, payload)
if (!mscclIsCaller()) // when msccl falls back to
{
NCCLCHECK(Recorder::instance().record(rrScatter, sendbuff, recvbuff, recvcount, datatype, comm, stream, root));
}
if (mscclAvailable(comm->rank) && !mscclIsCaller()) {
return mscclEnqueueCheck(
sendbuff, nullptr, nullptr, recvbuff, nullptr, nullptr,
@@ -462,15 +517,21 @@ ncclResult_t ncclSend_impl(const void* sendbuff, size_t count, ncclDataType_t da
NvtxParamsSendRecv payload{count * ncclTypeSize(datatype), peer, datatype};
NVTX3_FUNC_WITH_PARAMS(Send, SendRecvSchema, payload)
struct ncclInfo info = { ncclFuncSend, "Send",
NULL, (void*)sendbuff, count, datatype, ncclSum, peer, comm, stream, /* Args */
1, 1 };
if (!mscclIsCaller()) // when msccl falls back to
{
NCCLCHECK(Recorder::instance().record(rrSend, info));
}
if (mscclAvailable(comm->rank) && !mscclIsCaller()) {
return mscclEnqueueCheck(
sendbuff, nullptr, nullptr, nullptr, nullptr, nullptr,
count, datatype, 0, peer, ncclSum, mscclFuncSend, comm, stream);
}
struct ncclInfo info = { ncclFuncSend, "Send",
NULL, (void*)sendbuff, count, datatype, ncclSum, peer, comm, stream, /* Args */
1, 1 };
ncclResult_t ret;
NCCLCHECK(ncclGroupStart());
NCCLCHECKGOTO(ncclEnqueueCheck(&info), ret, exit);
@@ -487,15 +548,21 @@ ncclResult_t ncclRecv_impl(void* recvbuff, size_t count, ncclDataType_t datatype
NvtxParamsSendRecv payload{count * ncclTypeSize(datatype), peer, datatype};
NVTX3_FUNC_WITH_PARAMS(Recv, SendRecvSchema, payload)
struct ncclInfo info = { ncclFuncRecv, "Recv",
NULL, recvbuff, count, datatype, ncclSum, peer, comm, stream, /* Args */
1, 1 };
if (!mscclIsCaller()) // when msccl falls back to
{
NCCLCHECK(Recorder::instance().record(rrRecv, info));
}
if (mscclAvailable(comm->rank) && !mscclIsCaller()) {
return mscclEnqueueCheck(
nullptr, nullptr, nullptr, recvbuff, nullptr, nullptr,
count, datatype, 0, peer, ncclSum, mscclFuncRecv, comm, stream);
}
struct ncclInfo info = { ncclFuncRecv, "Recv",
NULL, recvbuff, count, datatype, ncclSum, peer, comm, stream, /* Args */
1, 1 };
ncclResult_t ret;
NCCLCHECK(ncclGroupStart());
NCCLCHECKGOTO(ncclEnqueueCheck(&info), ret, exit);
+6
Просмотреть файл
@@ -25,6 +25,8 @@
#include <cstring> // std::memcpy
#include <cinttypes> // PRIx64
using namespace rccl;
struct ncclKernelMatch {
void* kernelFn;
bool specialized;
@@ -2560,12 +2562,16 @@ ncclResult_t ncclRedOpCreatePreMulSum_impl(ncclRedOp_t *op, void *scalar, ncclDa
}
*op = ncclRedOp_t(int(ncclNumOps) + ix);
*op = ncclUserRedOpMangle(comm, *op);
// ! recording at sink
NCCLCHECK(Recorder::instance().record(rrRedOpCreatePreMulSum, *op, comm, datatype, residence, scalar));
TRACE_CALL("ncclRedOpCreatePreMulSum(%d,%p,%d,%d,%p)", *op, scalar, datatype, residence, comm);
return ncclSuccess;
}
NCCL_API(ncclResult_t, ncclRedOpDestroy, ncclRedOp_t op, ncclComm_t comm);
ncclResult_t ncclRedOpDestroy_impl(ncclRedOp_t op, ncclComm_t comm) {
NCCLCHECK(Recorder::instance().record(rrRedOpDestroy, op, comm));
if (0 <= int(op) && int(op) < int(ncclNumOps)) {
WARN("ncclRedOpDestroy : operator is a NCCL builtin.");
return ncclInvalidArgument;
+5
Просмотреть файл
@@ -17,6 +17,8 @@
#include "msccl/msccl_lifecycle.h"
using namespace rccl;
__thread int ncclGroupDepth = 0; // depth of ncclGroupStart nesting
__thread ncclResult_t ncclGroupError = ncclSuccess;
__thread struct ncclComm* ncclGroupCommHead = nullptr;
@@ -96,6 +98,7 @@ ncclResult_t ncclAsyncJobComplete(struct ncclAsyncJob* job) {
NCCL_API(ncclResult_t, ncclGroupStart);
ncclResult_t ncclGroupStart_impl() {
NCCLCHECK(Recorder::instance().record(rrGroupStart, ncclGroupDepth));
ncclResult_t ret = ncclSuccess;
NVTX3_FUNC_RANGE_IN(nccl_domain);
@@ -114,6 +117,7 @@ ncclResult_t ncclGroupStartInternal() {
NCCL_API(ncclResult_t, ncclGroupEnd);
ncclResult_t ncclGroupEnd_impl() {
NCCLCHECK(Recorder::instance().record(rrGroupEnd, ncclGroupDepth));
ncclResult_t ret = ncclSuccess;
NVTX3_FUNC_RANGE_IN(nccl_domain);
NCCLCHECKGOTO(ncclGroupEndInternal(), ret, exit);
@@ -124,6 +128,7 @@ exit:
NCCL_API(ncclResult_t, ncclGroupSimulateEnd, ncclSimInfo_t* simInfo);
ncclResult_t ncclGroupSimulateEnd(ncclSimInfo_t* simInfo) {
Recorder::instance().record(ncclGroupDepth, simInfo);
ncclResult_t ret = ncclSuccess;
NVTX3_FUNC_RANGE_IN(nccl_domain);
NCCLCHECKGOTO(ncclGroupEndInternal(simInfo), ret, exit);
+1
Просмотреть файл
@@ -20,6 +20,7 @@
#include "nvmlwrap.h"
#include "profiler.h"
#include "rccl_common.h"
#include "recorder.h"
#if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__)
#define HIPRT_CB
+162
Просмотреть файл
@@ -0,0 +1,162 @@
#include <fstream>
#include <sstream>
#include <vector>
#include <mutex>
#include <chrono>
#include "debug.h"
namespace rccl
{
// API opcodes covered by the rccl replayer.
// The enumerator values are used directly as indices into rcclCallStr, so the
// two lists must be kept in the same order; rrOtherCall must stay last.
typedef enum {
  rrBroadcast,
  rrReduce,
  rrAllGather,
  rrReduceScatter,
  rrAllReduce,
  rrSend,
  rrRecv,
  rrAllToAll,
  rrAllToAllv,
  rrGather,
  rrScatter,
  rrBcast,
  rrGroupStart,
  rrGroupEnd,
  rrGroupSimulatedEnd,
  rrGetUniqueId,
  rrCommInitDev,
  rrCommInitRank,
  rrCommInitAll,
  rrCommInitRankConfig,
  rrCommSplit,
  rrCommFinalize,
  rrCommDestroy,
  rrCommAbort,
  rrCommRegister,
  rrCommDeregister,
  rrMemAlloc,
  rrMemFree,
  rrRedOpCreatePreMulSum,
  rrRedOpDestroy,
  rrOtherCall  // calls that are only logged by name
} rcclCall_t;

// Printable name of every rcclCall_t value, indexed by the enumerator.
constexpr const char* rcclCallStr[]
{
  "Broadcast",
  "Reduce",
  "AllGather",
  "ReduceScatter",
  "AllReduce",
  "Send",
  "Recv",
  "AllToAll",
  "AllToAllv",
  "Gather",
  "Scatter",
  "Bcast",
  "GroupStart",
  "GroupEnd",
  "GroupSimulatedEnd",
  "GetUniqueId",
  "CommInitDev",
  "CommInitRank",
  "CommInitAll",
  "CommInitRankConfig",
  "CommSplit",
  "CommFinalize",
  "CommDestroy",
  "CommAbort",
  "CommRegister",
  "CommDeregister",
  "MemAlloc",
  "MemFree",
  "RedOpCreatePreMulSum",
  "RedOpDestroy",
  "OtherCall"
};

// rcclCallStr is indexed with an rcclCall_t without any bounds check, so fail
// the build if an opcode is added without its name (or the other way round).
static_assert(sizeof(rcclCallStr) / sizeof(rcclCallStr[0]) == static_cast<unsigned>(rrOtherCall) + 1u,
              "rcclCallStr must have exactly one entry per rcclCall_t enumerator");
// One recorded RCCL API call.
//
// The struct is written byte-for-byte to the binary log and its fields are
// printed by the JSON writer, so every member carries a default value: a
// record built through the default or type-only constructor must never expose
// indeterminate memory.
//
// Several fields are reused by non-collective calls (see the per-type cases in
// Recorder::write), e.g. `root` also carries a device index / device count /
// scalar residence and `recvbuff` also carries a handle or allocation pointer.
struct rcclApiCall {
  // implicit data (captured by the recorder, not passed by the caller)
  int pid = -1;
  int tid = -1;
  int hipDev = -1;
  int groupDepth = -1;
  double timestamp = -1;            // milliseconds, see Recorder::captureGpuContext
  unsigned long long graphID = 0;
  int graphCaptured = -1;           // -1 unknown, 0 not captured, 1 captured in a graph

  // explicit data from the API call
  rcclCall_t type = rrOtherCall;    // selects the entry of rcclCallStr used as the op name
  uint64_t opCount = 0;
  const void* sendbuff = nullptr;
  void* recvbuff = nullptr;
  size_t count = 0;
  ncclDataType_t datatype = ncclInt8;
  ncclRedOp_t op = ncclSum;
  int root = -1;
  int nRanks = -1;
  ncclComm_t comm = nullptr;
  hipStream_t stream = nullptr;
  int nTasks = -1;
  int globalRank = -1;
  uint64_t commId = 0;

  rcclApiCall(){}
  // Builds a record for a collective described by `info`.
  rcclApiCall(rcclCall_t type, const ncclInfo& info);
  // Builds a record that only carries its type; all other fields keep their defaults.
  rcclApiCall(rcclCall_t type);
};
// Process-wide recorder of RCCL API calls.
//
// Obtained through instance(); construction, copy and destruction are private
// or deleted. Recording is enabled only when the constructor finds a non-empty
// RCCL_REPLAY_FILE; the record() overloads visible in recorder.cc return early
// when `filename` is empty.
class Recorder {
private:
std::ofstream outputFile; // one output file per process (the pid is part of the file name)
int output_json = 0; // 0: binary dump of rcclApiCall records, 1: JSON-like text
std::string filename; // value of RCCL_REPLAY_FILE; empty means recording is disabled
int logLevel = -1; // from RCCL_LOG_LEVEL (1 when unset); name-only calls are logged when > 1
//std::string hostname;
int pid = -1;
int numCall = 0; // for debugging only
static __thread int rcclReplayThreadIdx; // per-thread cache of the OS thread id, -1 until first use
static int depth; // group nesting depth used for indentation; not thread safe yet
std::mutex writemtx; // taken by write() and record(const char*) around output
std::vector<rcclApiCall> calls; // calls seen during stream capture, looked up by index in recordLater()
// Fills timestamp, pid, thread id and current HIP device of `call`.
void captureGpuContext(rcclApiCall& call) const;
// Formats `call` (JSON) or dumps it (binary) to outputFile.
void write(const rcclApiCall &call);
// Graph host-node callback; `idx` is an index into `calls` passed as a pointer value.
static void recordLater(void* idx);
Recorder();
Recorder(const Recorder&) = delete;
Recorder& operator=(const Recorder&) = delete;
~Recorder();
public:
// Returns the process-wide recorder.
static Recorder& instance();
void record(const char* name); // non-replayable calls, logged by name only
// Common sink: adds context, handles graph capture for collectives, then writes.
ncclResult_t record(rcclApiCall& call);
ncclResult_t record(rcclCall_t type, const ncclInfo& info); // collective described by an ncclInfo
ncclResult_t record(rcclCall_t type, const void* sendbuff, void* recvbuff, size_t count, // sendrecv based
ncclDataType_t datatype, ncclComm_t comm, hipStream_t stream, int root = -1,
const size_t sendcounts[] = NULL, const size_t sdispls[] = NULL, const size_t recvcounts[] = NULL,
const size_t rdispls[] = NULL); // counts/displacements are only used for alltoallv
ncclResult_t record(rcclCall_t type, ncclRedOp_t op, ncclComm_t comm,
ncclDataType_t datatype = ncclInt8, ncclScalarResidence_t residence = ncclScalarDevice,
void* scalar = NULL); // redop create / destroy
ncclResult_t record(rcclCall_t type, int groupDepth); // group start / end
ncclResult_t record(rcclCall_t type, int size, int rank, ncclUniqueId* commId,
ncclComm_t comm = NULL, int device = 0); // GetUniqueId and communicator init
void record(ncclComm_t* comms, int ndev, const int* devlist); // CommInitAll
ncclResult_t record(rcclCall_t type, ncclComm_t comm); // comm finalize / destroy / abort
void record(rcclCall_t type, int size, int rank, ncclUniqueId* commId, ncclConfig_t* config,
ncclComm_t comm = NULL); // CommInitRankConfig OR split
ncclResult_t record(rcclCall_t type, void* ptr, size_t size = 0); // mem alloc / free
ncclResult_t record(rcclCall_t type, ncclComm_t comm, void* handle,
void* userBuffer = NULL, size_t size = 0); // user buffer registration (UBR)
void record(int groupDepth, ncclSimInfo_t* siminfo); // SimulatedGroupEnd
};
void parseJsonEntry(const char* entry, std::vector<rcclApiCall>& calls);
void parseBinLog();
} // namespace rccl
+25
Просмотреть файл
@@ -67,6 +67,8 @@
#define NCCL_GROUP_CUDA_STREAM 1 // CGMD: CUDA 9.0,9.1 Need to use an internal CUDA stream
#endif
using namespace rccl;
const char* ncclFuncStr[NCCL_NUM_FUNCTIONS+2] = { "AllGather", "AllReduce", "AllToAllPivot", "Broadcast", "Reduce", "ReduceScatter", "SendRecv"};
const char* ncclAlgoStr[NCCL_NUM_ALGORITHMS] = { "Tree", "Ring", "CollNetDirect", "CollNetChain", "NVLS", "NVLSTree", "PAT" };
const char* ncclProtoStr[NCCL_NUM_PROTOCOLS] = { "LL", "LL128", "Simple" };
@@ -188,6 +190,7 @@ static ncclResult_t ncclInit() {
NCCL_API(ncclResult_t, ncclGetVersion, int* version);
ncclResult_t ncclGetVersion_impl(int* version) {
Recorder::instance().record("GetVersion");
if (version == NULL) return ncclInvalidArgument;
*version = NCCL_VERSION_CODE;
return ncclSuccess;
@@ -204,6 +207,7 @@ ncclResult_t ncclGetUniqueId_impl(ncclUniqueId* out) {
memset(out, 0, sizeof(*out));
// copy to avoid alignment mismatch
memcpy(out, &handle, sizeof(handle));
Recorder::instance().record(rrGetUniqueId, -1, -1, out);
TRACE_CALL("ncclGetUniqueId(0x%llx)", (unsigned long long)hashUniqueId(*out));
return ncclSuccess;
}
@@ -2328,6 +2332,9 @@ static ncclResult_t ncclCommInitRankDev(ncclComm_t* newcomm, int nranks, int nId
NCCLCHECKGOTO(ncclAsyncLaunch((struct ncclAsyncJob*)job, ncclCommInitRankFunc, NULL, ncclCommInitJobFree, comm), res, fail);
exit:
// for logging only, not ready for replaying
// !recording at sink
NCCLCHECK(Recorder::instance().record(rrCommInitDev, nranks, myrank, commId, comm, cudaDev));
return ncclGroupErrCheck(res);
fail:
if (comm) {
@@ -2354,6 +2361,7 @@ constexpr nvtxPayloadSchemaEntry_t CommInitRankSchema[] = {
NCCL_API(ncclResult_t, ncclCommInitRank, ncclComm_t* newcomm, int nranks, ncclUniqueId commId, int myrank);
ncclResult_t ncclCommInitRank_impl(ncclComm_t* newcomm, int nranks, ncclUniqueId commId, int myrank) {
NCCLCHECK(Recorder::instance().record(rrCommInitRank, nranks, myrank, &commId));
// Load the CUDA driver and dlsym hooks (can fail on old drivers)
rocmLibraryInit();
@@ -2370,6 +2378,7 @@ ncclResult_t ncclCommInitRank_impl(ncclComm_t* newcomm, int nranks, ncclUniqueId
NCCL_API(ncclResult_t, ncclCommInitAll, ncclComm_t* comms, int ndev, const int* devlist);
ncclResult_t ncclCommInitAll_impl(ncclComm_t* comms, int ndev, const int* devlist) {
Recorder::instance().record(comms, ndev, devlist);
ncclResult_t ret = ncclSuccess;
int totalnDev;
int *gpuFlags = NULL;
@@ -2446,6 +2455,7 @@ ncclResult_t ncclCommSetAsyncError(ncclComm_t comm, ncclResult_t nextState) {
NCCL_API(ncclResult_t, ncclCommInitRankConfig, ncclComm_t* comm, int nranks, ncclUniqueId commId, int myrank, ncclConfig_t *config);
ncclResult_t ncclCommInitRankConfig_impl(ncclComm_t *newcomm, int nranks, ncclUniqueId commId, int myrank, ncclConfig_t *config) {
Recorder::instance().record(rrCommInitRankConfig, nranks, myrank, &commId, config);
int cudaDev;
ncclResult_t ret = ncclSuccess;
ncclConfig_t internalConfig = NCCL_CONFIG_INITIALIZER;
@@ -2595,6 +2605,7 @@ static ncclResult_t commCleanup(ncclComm_t comm) {
NCCL_API(ncclResult_t, ncclCommFinalize, ncclComm_t comm);
ncclResult_t ncclCommFinalize_impl(ncclComm_t comm) {
NCCLCHECK(Recorder::instance().record(rrCommFinalize, comm));
NVTX3_FUNC_RANGE_IN(nccl_domain);
ncclResult_t ret = ncclSuccess;
struct ncclCommFinalizeAsyncJob *job = NULL;
@@ -2683,6 +2694,7 @@ static ncclResult_t commReclaim(struct ncclAsyncJob* job_) {
NCCL_API(ncclResult_t, ncclCommDestroy, ncclComm_t comm);
ncclResult_t ncclCommDestroy_impl(ncclComm_t comm) {
NCCLCHECK(Recorder::instance().record(rrCommDestroy, comm));
if (comm == NULL) {
NVTX3_FUNC_RANGE_IN(nccl_domain);
return ncclSuccess;
@@ -2740,6 +2752,7 @@ fail:
NCCL_API(ncclResult_t, ncclCommAbort, ncclComm_t comm);
ncclResult_t ncclCommAbort_impl(ncclComm_t comm) {
NCCLCHECK(Recorder::instance().record(rrCommAbort, comm));
if (comm == NULL) {
NVTX3_FUNC_RANGE_IN(nccl_domain);
return ncclSuccess;
@@ -2853,6 +2866,10 @@ ncclResult_t ncclCommSplit_impl(ncclComm_t comm, int color, int key, ncclComm_t
NCCLCHECKGOTO(ncclAsyncLaunch((struct ncclAsyncJob*)job, ncclCommInitRankFunc, NULL, free, comm), res, fail);
exit:
// for logging only, not ready for replaying
// TODO: further integrate overloaded record header
// !recording at sink
Recorder::instance().record(rrCommSplit, color, key, (ncclUniqueId*)comm, config, *newcomm);
cudaSetDevice(oldDev);
(void)ncclGroupErrCheck(res);
NCCLCHECK(ncclGroupEndInternal());
@@ -2872,6 +2889,7 @@ fail:
NCCL_API(const char*, ncclGetErrorString, ncclResult_t code);
const char* ncclGetErrorString_impl(ncclResult_t code) {
Recorder::instance().record("GetErrorString");
switch (code) {
case ncclSuccess : return "no error";
case ncclUnhandledCudaError : return "unhandled cuda error (run with NCCL_DEBUG=INFO for details)";
@@ -2890,11 +2908,13 @@ const char* ncclGetErrorString_impl(ncclResult_t code) {
*/
NCCL_API(const char*, ncclGetLastError, const ncclComm_t comm);
const char* ncclGetLastError_impl(ncclComm_t comm) {
  // The string is the name this call is logged under; it was misspelled
  // ("GetLastEror"), which made the entry impossible to find by API name.
  Recorder::instance().record("GetLastError");
  return ncclLastError;
}
NCCL_API(ncclResult_t, ncclCommGetAsyncError, ncclComm_t comm, ncclResult_t *asyncError);
ncclResult_t ncclCommGetAsyncError_impl(ncclComm_t comm, ncclResult_t *asyncError) {
Recorder::instance().record("GetAsyncError");
NCCLCHECK(CommCheck(comm, "ncclGetAsyncError", "comm"));
NCCLCHECK(PtrCheck(asyncError, "ncclGetAsyncError", "asyncError"));
@@ -2905,6 +2925,7 @@ ncclResult_t ncclCommGetAsyncError_impl(ncclComm_t comm, ncclResult_t *asyncErro
NCCL_API(ncclResult_t, ncclCommCount, const ncclComm_t comm, int* count);
ncclResult_t ncclCommCount_impl(const ncclComm_t comm, int* count) {
Recorder::instance().record("CommCount");
NVTX3_FUNC_RANGE_IN(nccl_domain);
NCCLCHECK(CommCheck(comm, "CommCount", "comm"));
@@ -2919,6 +2940,7 @@ ncclResult_t ncclCommCount_impl(const ncclComm_t comm, int* count) {
NCCL_API(ncclResult_t, ncclCommCuDevice, const ncclComm_t comm, int* devid);
ncclResult_t ncclCommCuDevice_impl(const ncclComm_t comm, int* devid) {
Recorder::instance().record("CuDevice");
NVTX3_FUNC_RANGE_IN(nccl_domain);
NCCLCHECK(CommCheck(comm, "CommCuDevice", "comm"));
@@ -2932,6 +2954,7 @@ ncclResult_t ncclCommCuDevice_impl(const ncclComm_t comm, int* devid) {
NCCL_API(ncclResult_t, ncclCommUserRank, const ncclComm_t comm, int* rank);
ncclResult_t ncclCommUserRank_impl(const ncclComm_t comm, int* rank) {
Recorder::instance().record("CommUserRank");
NVTX3_FUNC_RANGE_IN(nccl_domain);
NCCLCHECK(CommCheck(comm, "CommUserRank", "comm"));
@@ -3035,6 +3058,7 @@ fallback:
CUDACHECKGOTO(cudaMalloc(ptr, size), ret, fail);
exit:
NCCLCHECK(Recorder::instance().record(rrMemAlloc, *ptr, size));
return ret;
fail:
goto exit;
@@ -3042,6 +3066,7 @@ fail:
NCCL_API(ncclResult_t, ncclMemFree, void *ptr);
ncclResult_t ncclMemFree_impl(void *ptr) {
NCCLCHECK(Recorder::instance().record(rrMemFree, ptr));
NVTX3_FUNC_RANGE_IN(nccl_domain);
ncclResult_t ret = ncclSuccess;
int saveDevice;
+669
Просмотреть файл
@@ -0,0 +1,669 @@
#include "bootstrap.h"
#include "group.h"
#include "utils.h"
#include <cstring>
#include <string>
#include <iomanip>
#include <sys/syscall.h>
using namespace std::chrono;
namespace rccl
{
// Per-thread cache of the OS thread id; -1 until captureGpuContext fills it in.
__thread int Recorder::rcclReplayThreadIdx = -1;
// Current group nesting level of the JSON output (raised on GroupStart, lowered on GroupEnd).
int Recorder::depth = 0;
// Scratch buffer for snprintf-formatted JSON entries; used by Recorder::write under writemtx.
static char buffer[4096];
// Type of the last JSON entry written. Decides whether a separating comma is
// emitted before the next entry and is used to drop the Broadcast nested in a
// Bcast; shared by all threads, so it needs rework for the multithreaded case.
static rcclCall_t lastcall = rrGroupStart;
// Writes one " " to `o` for every level in `depth`; nothing is written when
// `depth` is zero or negative.
void indent(int depth, std::ofstream& o) {
  int remaining = depth;
  while (remaining > 0) {
    o << " ";
    --remaining;
  }
}
// Ends the current JSON entry: writes a separating "," unless the previous
// entry opened a group (lastcall == rrGroupStart), then a flushed line break.
void newLine(std::ofstream& o) {
  const bool needsComma = (lastcall != rrGroupStart);
  if (needsComma) {
    o << ",";
  }
  o << std::endl;
}
// Folds every byte of `id` into a 64-bit value (shift-xor, multiply, add per
// byte) so a unique id can be printed as a single number.
static uint64_t hashUniqueId(ncclUniqueId const &id) {
  const char* cursor = reinterpret_cast<const char*>(&id);
  const char* const last = cursor + sizeof(ncclUniqueId);
  uint64_t hash = 0xdeadbeef;
  for (; cursor != last; ++cursor) {
    hash ^= hash >> 32;
    hash *= 0x8db3db47fa2994ad;
    hash += *cursor;  // plain char on purpose: keeps the original promotion/sign behaviour
  }
  return hash;
}
// Record for a collective: copies the call arguments from `info` and reads the
// op counter, rank count, pending task count and global rank from info.comm.
// The initializers are listed in member declaration order, which is the order
// in which they run.
rcclApiCall::rcclApiCall(rcclCall_t type, const ncclInfo& info)
  : type(type),
    opCount(info.comm->opCount),
    sendbuff(info.sendbuff),
    recvbuff(info.recvbuff),
    count(info.count),
    datatype(info.datatype),
    op(info.op),
    root(info.root),
    nRanks(info.comm->nRanks),
    comm(info.comm),
    stream(info.stream),
    nTasks(info.comm->planner.nTasksP2p + info.comm->planner.nTasksColl),
    globalRank(info.comm->localRankToRank[info.comm->localRank])
{
}

// Record that only knows its type; the caller fills in whatever else it needs.
rcclApiCall::rcclApiCall(rcclCall_t type)
  : type(type)
{
}
// printf-style templates for the JSON output. Except for the first three,
// each template starts with "%s" for the call name (rcclCallStr[type]) and
// ends by opening the "context : [" section that ctxt_fmt completes.
// siminfo_fmt and config_fmt are not used in the part of the file shown here;
// presumably they serve the SimulatedGroupEnd / config overloads - TODO confirm.
std::string siminfo_fmt = "[size : %zu, magic : %u, version : %u, estimated time : %f, timestamp : %f]";
std::string config_fmt = ", ncclConfig : [size : %zu, magic : %u, version : %u, blocking : %d, cgaClusterSize : %d, minCTA : %d, maxCTA : %d, netname : %s, splitshare : %d]";
// Implicit context of a call; closes both the context section and the entry.
std::string ctxt_fmt = "time : %lf, thread : %d, device : %d, captured : %d, graphID : %llu ]]"; // implicit context info
// rrCommRegister
std::string ubr_fmt = "%s : [comm : %p, buff : %p, returned handle : %p, size : %zu, context : [";
// rrGetUniqueId
std::string getId_fmt = "%s : [uniqueID : %llu, context : [";
// rrCommDeregister
std::string ubDereg_fmt = "%s : [comm : %p, handle : %p, context : [";
// rrCommInitRank / rrCommInitRankConfig
std::string rank_fmt = "%s : [size : %d, uniqueID : %llu, rank : %d, context : [";
// rrCommInitDev
std::string init_fmt = "%s : [comm : %p, size : %d, uniqueID : %llu, rank : %d, dev : %d, context : [";
// rrCommInitAll
std::string all_fmt = "%s : [# of device : %d, context : [";
// rrCommFinalize / rrCommDestroy / rrCommAbort
std::string destroy_fmt = "%s : [comm : %p, context : [";
// rrCommSplit
std::string split_fmt = "%s : [comm : %p, color : %d, key : %d, newcomm : %p, context : [";
// rrMemAlloc
std::string alloc_fmt = "%s : [returned ptr : %p, size : %zu, context : [";
// rrMemFree
std::string free_fmt = "%s : [ptr : %p, context : [";
// rrRedOpCreatePreMulSum
std::string redop_fmt = "%s : [scalar : %p, datatype : %d, op : %d, residence : %d, comm : %p, context : [";
// rrRedOpDestroy
std::string redopdestroy_fmt = "%s : [op : %d, comm : %p, context : [";
// every remaining call type (the collectives)
std::string coll_fmt = "%s : [opCount : %lx, sendbuff : %p, recvbuff : %p, count : %zu, datatype : %d, op : %d, root : %d, comm : %p, nranks : %d, stream : %p, task : %d, globalrank : %d, context : [";
// Reads the recorder configuration from the environment and opens the log.
//
//  - RCCL_REPLAY_FILE: log file name. When unset or empty, recording stays
//    disabled and nothing else is done.
//  - RCCL_LOG_LEVEL:   integer log level, 1 when unset or not a number.
//
// The pid is inserted between the file name and its extension, and a ".json"
// extension selects the text format instead of the binary dump. The extension
// is the part after the last '.' of the last path component, so names such as
// "./trace.json" or "/tmp/run.1/trace.json" are split correctly.
Recorder::Recorder()
{
  const char* replayFile = getenv("RCCL_REPLAY_FILE");
  filename = replayFile ? replayFile : "";
  if (filename.empty())
  {
    return;
  }

  // Parsed with strtol rather than std::stoi: a malformed value must not throw
  // out of the singleton's constructor in the middle of an API call.
  logLevel = 1;
  if (const char* levelText = getenv("RCCL_LOG_LEVEL"))
  {
    char* parseEnd = nullptr;
    const long parsed = strtol(levelText, &parseEnd, 10);
    if (parseEnd != levelText)
    {
      logLevel = static_cast<int>(parsed);
    }
  }

  char hostname[1024] = "";
  getHostName(hostname, sizeof(hostname), '.');
  pid = getpid();

  // Split "<name><.ext>", ignoring dots that belong to a directory component.
  const size_t lastSlash = filename.find_last_of('/');
  const size_t lastDot = filename.find_last_of('.');
  const bool hasExtension = lastDot != std::string::npos &&
                            (lastSlash == std::string::npos || lastDot > lastSlash);
  std::string output_name = filename;
  std::string output_extension;
  if (hasExtension)
  {
    output_name = filename.substr(0, lastDot);
    output_extension = filename.substr(lastDot);
  }
  output_json = (output_extension == ".json") ? 1 : 0;

  const std::string outputPath = output_name + std::to_string(pid) + output_extension;
  outputFile.open(outputPath, output_json ? std::ofstream::out : std::ofstream::binary);
  if (!outputFile.is_open())
  {
    // Without a usable file every later write would be dropped silently;
    // report it once and switch recording off.
    WARN("RCCL recorder: unable to open %s, recording disabled", outputPath.c_str());
    filename.clear();
    return;
  }

  if (output_json)
  {
    outputFile << "{" << std::endl;
    indent(2, outputFile);
    outputFile << "hostname : " << hostname << ", version : 0,";
  }
}
// Returns the single Recorder of this process; it is constructed on the first
// call as a function-local static.
Recorder& Recorder::instance()
{
  static Recorder singleton;
  return singleton;
}
// Stamps `call` with the implicit context of the calling thread: the current
// time in milliseconds, the process id, the OS thread id (looked up once per
// thread and cached) and the current HIP device.
void Recorder::captureGpuContext(rcclApiCall& call) const
{
  call.timestamp = duration_cast<duration<double>>(high_resolution_clock::now().time_since_epoch()).count() * 1000;

  if (rcclReplayThreadIdx == -1)
  {
    rcclReplayThreadIdx = static_cast<int>(syscall(SYS_gettid));
  }

  // need later change to copy from comm
  // hipGetDevice leaves its output untouched on failure, so start from -1 and
  // keep -1 ("unknown") instead of logging an uninitialized value.
  int hipDev = -1;
  if (hipGetDevice(&hipDev) != hipSuccess)
  {
    hipDev = -1;
  }

  call.pid = pid;
  call.tid = rcclReplayThreadIdx;
  call.hipDev = hipDev;
}
// for single process use only, for now
// TODO: potentially need async logging for performance
void Recorder::write(const rcclApiCall &call)
{
if (!filename.size())
{
return ;
}
std::unique_lock<std::mutex> lock(writemtx);
if (lastcall == rrBcast && call.type == rrBroadcast)
{
return;
}
int len = -1;
if (output_json)
{
if (call.type == rrGroupEnd || call.type == rrGroupSimulatedEnd)
{
depth--;
outputFile << std::endl;
indent(2 + 2 * depth, outputFile);
outputFile << "}";
return ;
}
newLine(outputFile);
indent(2 + 2 * depth, outputFile);
lastcall = call.type;
switch (call.type) {
case rrGroupStart:
{
outputFile << "{";
depth++;
return ;
}
case rrCommRegister:
{
len = snprintf(buffer, 4096, ubr_fmt.c_str(),
rcclCallStr[call.type], call.comm, call.sendbuff, call.recvbuff, call.count);
break;
}
case rrCommDeregister:
{
len = snprintf(buffer, 4096, ubDereg_fmt.c_str(),
rcclCallStr[call.type], call.comm, call.recvbuff);
break;
}
case rrGetUniqueId:
{
len = snprintf(buffer, 4096, getId_fmt.c_str(), rcclCallStr[call.type], call.commId);
break;
}
case rrCommInitDev:
{
len = snprintf(buffer, 4096, init_fmt.c_str(), rcclCallStr[call.type], call.comm, call.nRanks, call.commId, call.globalRank, call.root);
break;
}
case rrCommInitRank:
case rrCommInitRankConfig:
{
len = snprintf(buffer, 4096, rank_fmt.c_str(), rcclCallStr[call.type], call.nRanks, call.commId, call.globalRank);
break; // detail to be provided by init dev or config info
}
case rrCommInitAll:
{
len = snprintf(buffer, 4096, all_fmt.c_str(), rcclCallStr[call.type], call.root);
break;
}
case rrCommFinalize:
case rrCommDestroy:
case rrCommAbort:
{
len = snprintf(buffer, 4096, destroy_fmt.c_str(), rcclCallStr[call.type], call.comm);
break;
}
case rrCommSplit:
{
len = snprintf(buffer, 4096, split_fmt.c_str(),
rcclCallStr[call.type], (void*)(call.commId), call.comm, call.nRanks, call.globalRank, call.comm);
break;
}
case rrMemAlloc:
{
len = snprintf(buffer, 4096, alloc_fmt.c_str(),
rcclCallStr[call.type], call.recvbuff, call.count);
break;
}
case rrMemFree:
{
len = snprintf(buffer, 4096, free_fmt.c_str(),
rcclCallStr[call.type], call.recvbuff);
break;
}
case rrRedOpCreatePreMulSum:
{
len = snprintf(buffer, 4096, redop_fmt.c_str(),
rcclCallStr[call.type], call.sendbuff, call.datatype, call.op, call.root, call.comm);
break;
}
case rrRedOpDestroy:
{
len = snprintf(buffer, 4096, redopdestroy_fmt.c_str(),
rcclCallStr[call.type], call.op, call.comm);
break;
}
default: // collectives
len = snprintf(buffer, 4096, coll_fmt.c_str(),
rcclCallStr[call.type], call.opCount, call.sendbuff, call.recvbuff, call.count, call.datatype,
call.op, call.root, call.comm, call.nRanks, call.stream, call.nTasks, call.globalRank);
}
outputFile.write(buffer, len);
len = snprintf(buffer, 4096, ctxt_fmt.c_str(), call.timestamp, call.tid, call.hipDev, call.graphCaptured, call.graphID);
outputFile.write(buffer, len);
} else {
outputFile.write((char*)&call, sizeof(rcclApiCall));
outputFile << std::endl;
}
outputFile.flush();
return ;
}
// Wrapper function for the graph launch callback.
// `idx` is not a real pointer: it carries the index of the captured call inside
// Recorder::calls (see the userData set up when the host node is created).
void Recorder::recordLater(void* idx)
{
  Recorder& recorder = Recorder::instance();
  const size_t callIdx = static_cast<size_t>(reinterpret_cast<uintptr_t>(idx));

  rcclApiCall call;
  {
    // Copy the entry out under the recorder mutex: this callback does not run
    // on the thread that filled `calls`. The lock is dropped before record()
    // because write() takes the same (non-recursive) mutex.
    std::unique_lock<std::mutex> lock(recorder.writemtx);
    if (callIdx >= recorder.calls.size())
    {
      return; // unknown index: nothing that could be logged
    }
    call = recorder.calls[callIdx];
  }
  recorder.record(call);
}
// Logs a non-replayable call by name together with a timestamp (milliseconds).
// Only active when recording is enabled, the log level is above 1 and the
// output is JSON; the binary replay log never contains these entries.
void Recorder::record(const char* name)
{
  if (filename.empty() || logLevel <= 1 || !output_json)
  {
    return;
  }
  const double ts = duration_cast<duration<double>>(high_resolution_clock::now().time_since_epoch()).count() * 1000;

  // Everything that touches the shared stream, including the flush, stays
  // under the mutex that write() uses as well.
  std::unique_lock<std::mutex> lock(writemtx);
  newLine(outputFile);
  lastcall = rrOtherCall;
  indent(2 + 2 * depth, outputFile);
  outputFile << name << " : [time : " << std::fixed << std::setprecision(6) << ts << "]";
  outputFile.flush();
}
// Common sink of all record() overloads: stamps `call` with the implicit
// context, takes care of stream capture for collectives and writes the entry.
//
// Communicator, group, memory, registration and reduction-op calls are written
// directly. Every other type (the collectives) is checked for an active stream
// capture when built against ROCm >= 6.1: a captured call is stored in `calls`
// and a host node is added to the capturing graph so that recordLater() logs
// the call again when the graph runs.
//
// Returns ncclSuccess, or the error produced by CUDACHECK when a HIP call fails.
ncclResult_t Recorder::record(rcclApiCall& call)
{
  if (filename.empty())
  {
    return ncclSuccess; // recording disabled: skip the device queries as well
  }

  captureGpuContext(call);
  switch (call.type) {
    case rrGroupStart:
    case rrGroupEnd:
    case rrGroupSimulatedEnd:
    case rrGetUniqueId:
    case rrCommInitDev:
    case rrCommInitRank:
    case rrCommInitAll:
    case rrCommInitRankConfig:
    case rrCommSplit:
    case rrCommFinalize:
    case rrCommDestroy:
    case rrCommAbort: // communicator ops just exit and write
    case rrCommRegister: // same with UBR
    case rrCommDeregister:
    case rrMemAlloc:
    case rrMemFree:
    case rrRedOpCreatePreMulSum:
    case rrRedOpDestroy:
      break;
    // collectives, which may be registered with graph
    case rrAllToAll: // should work with rest of the coll with nested sendrecv
    default: // collective other than a2a/a2av
    {
#if ROCM_VERSION >= 60100
      hipStreamCaptureStatus status = hipStreamCaptureStatusNone;
      hipGraph_t graphCaptured = nullptr;
      CUDACHECK(hipStreamGetCaptureInfo_v2(call.stream, &status, &(call.graphID), &graphCaptured)); // shouldnt we need dependency?
      if (status == hipStreamCaptureStatusActive) // when graph launched this should be disabled
      {
        call.graphCaptured = 1;
        size_t callIdx = 0;
        {
          // `calls` is read from the graph callback as well; append and take
          // the index of the new entry under the recorder mutex.
          std::unique_lock<std::mutex> lock(writemtx);
          calls.push_back(call);
          callIdx = calls.size() - 1;
        }
        hipGraphNode_t logNode;
        hipHostNodeParams p;
        p.fn = &(Recorder::recordLater);
        p.userData = reinterpret_cast<void*>(callIdx); // index smuggled through the pointer value
        CUDACHECK(hipGraphAddHostNode(&logNode, graphCaptured, nullptr, 0, &p));
      } else {
        call.graphCaptured = 0;
      }
#endif
      // Keeps the label followed by a statement when the block above is compiled out.
      break;
    }
  }
  write(call); // write immediately
  return ncclSuccess;
}
/**
 * Records a call described by an ncclInfo structure.
 *
 * @param type recorder call type to log
 * @param info argument bundle the call entry is constructed from
 * @return ncclSuccess when recording is disabled (no file name set), otherwise the
 *         result of record(rcclApiCall&)
 */
ncclResult_t Recorder::record(rcclCall_t type, const ncclInfo& info)
{
  const bool recording = filename.size() != 0;
  if (recording)
  {
    rcclApiCall entry(type, info);
    return record(entry);
  }
  return ncclSuccess;
}
/**
 * Records a call given as individual buffer/count arguments and, for rrAllToAllv in
 * JSON mode, appends the per-rank count and displacement arrays to the entry.
 *
 * @param type        recorder call type
 * @param sendbuff    send buffer
 * @param recvbuff    receive buffer
 * @param count       element count
 * @param datatype    element type
 * @param comm        communicator
 * @param stream      stream the call is enqueued on
 * @param root        root/peer; -1 leaves the value set by the rcclApiCall constructor
 * @param sendcounts  per-rank send counts (read only for rrAllToAllv)
 * @param sdispls     per-rank send displacements (read only for rrAllToAllv)
 * @param recvcounts  per-rank receive counts (read only for rrAllToAllv)
 * @param rdispls     per-rank receive displacements (read only for rrAllToAllv)
 * @return ncclSuccess when recording is disabled, otherwise the result of record(rcclApiCall&)
 */
ncclResult_t Recorder::record(rcclCall_t type, const void* sendbuff, void* recvbuff,
    size_t count, ncclDataType_t datatype, ncclComm_t comm, hipStream_t stream, int root,
    const size_t sendcounts[], const size_t sdispls[], const size_t recvcounts[], const size_t rdispls[])
{
  if (!filename.size())
  {
    return ncclSuccess;
  }
  rcclApiCall call(type, {.sendbuff = sendbuff, .recvbuff = recvbuff, .count = count,
                          .datatype = datatype, .comm = comm, .stream = stream});
  if (root != -1)
  {
    call.root = root;
  }
  ncclResult_t ret = record(call);

  // The arrays are only dereferenced when all four were supplied.
  const bool haveArrays = sendcounts && sdispls && recvcounts && rdispls;
  if (type == rrAllToAllv && output_json && haveArrays)
  {
    const int nRanks = call.nRanks;
    // Writes "<label>v0, v1, ..., vN-1]"; a non-positive rank count yields an empty list
    // instead of reading index -1.
    const auto writeArray = [this, nRanks](const char* label, const size_t* values)
    {
      outputFile << label;
      for (int i = 0; i < nRanks; i++)
      {
        if (i != 0) outputFile << ", ";
        outputFile << values[i];
      }
      outputFile << "]";
    };
    writeArray(", sendcounts : [", sendcounts);
    writeArray(", sdispls : [", sdispls);
    writeArray(", recvcounts : [", recvcounts);
    writeArray(", rdispls : [", rdispls);
    outputFile.flush();
  }
  // else export to binary
  return ret;
}
/**
 * Records creation or destruction of a user-defined reduction operator.
 *
 * For rrRedOpCreatePreMulSum the scalar pointer, datatype and scalar residence are stored
 * in the call's sendbuff, datatype and root fields respectively; other types only carry
 * the op and communicator.
 *
 * @param type      recorder call type
 * @param op        reduction operator
 * @param comm      communicator
 * @param datatype  scalar datatype (used for rrRedOpCreatePreMulSum)
 * @param residence scalar residence (used for rrRedOpCreatePreMulSum)
 * @param scalar    pointer to the scalar (used for rrRedOpCreatePreMulSum)
 * @return ncclSuccess when recording is disabled, otherwise the result of record(rcclApiCall&)
 */
ncclResult_t Recorder::record(rcclCall_t type, ncclRedOp_t op, ncclComm_t comm, ncclDataType_t datatype, ncclScalarResidence_t residence, void* scalar)
{
  if (!filename.size())
  {
    return ncclSuccess;
  }
  rcclApiCall call(type, {.op = op, .comm = comm});
  if (type == rrRedOpCreatePreMulSum)
  {
    call.sendbuff = scalar;
    call.datatype = datatype;
    call.root = residence;
  }
  //TODO: printout scalar
  return record(call);
}
/**
 * Records a group call together with the group nesting depth.
 *
 * @param type       recorder call type
 * @param groupDepth depth stored in the entry's groupDepth field
 * @return ncclSuccess when recording is disabled, otherwise the result of record(rcclApiCall&)
 */
ncclResult_t Recorder::record(rcclCall_t type, int groupDepth)
{
  if (filename.size() == 0) return ncclSuccess;

  rcclApiCall groupEntry(type);
  groupEntry.groupDepth = groupDepth;
  return record(groupEntry);
}
/**
 * Records a communicator creation call.
 *
 * - rrCommSplit: the communicator is stored and the commId pointer value itself is kept
 *   as the entry's commId.
 * - any other type: commId is dereferenced and hashed with hashUniqueId().
 * - rrCommInitDev additionally stores the device in the root field and the communicator.
 *
 * @param type   recorder call type
 * @param size   value stored as nRanks
 * @param rank   value stored as globalRank
 * @param commId unique id (dereferenced unless type is rrCommSplit)
 * @param comm   communicator (used for rrCommSplit and rrCommInitDev)
 * @param device device index (used for rrCommInitDev)
 * @return ncclSuccess when recording is disabled, otherwise the result of record(rcclApiCall&)
 */
ncclResult_t Recorder::record(rcclCall_t type, int size, int rank, ncclUniqueId* commId, ncclComm_t comm, int device)
{
  if (filename.size() == 0) return ncclSuccess;

  rcclApiCall entry(type);
  entry.nRanks = size;
  entry.globalRank = rank;

  if (type != rrCommSplit)
  {
    entry.commId = hashUniqueId(*commId);
  }
  else
  {
    entry.comm = comm;
    entry.commId = (uint64_t)commId;
  }

  if (type == rrCommInitDev)
  {
    entry.root = device;
    entry.comm = comm;
  }
  return record(entry);
}
/**
 * Records a call whose only argument is a communicator (the original note on this
 * overload reads "comm destroy").
 *
 * @param type recorder call type
 * @param comm communicator stored in the entry
 * @return ncclSuccess when recording is disabled, otherwise the result of record(rcclApiCall&)
 */
ncclResult_t Recorder::record(rcclCall_t type, ncclComm_t comm)
{
  if (filename.size() == 0) return ncclSuccess;

  rcclApiCall entry(type);
  entry.comm = comm;
  return record(entry);
}
/**
 * Records a user-buffer registration or deregistration.
 *
 * The registration handle is stored in the entry's recvbuff field. For rrCommRegister the
 * user buffer and its size are stored in sendbuff and count as well.
 *
 * @param type       recorder call type
 * @param comm       communicator
 * @param handle     registration handle
 * @param userBuffer buffer being registered (used for rrCommRegister)
 * @param size       buffer size (used for rrCommRegister)
 * @return ncclSuccess when recording is disabled, otherwise the result of record(rcclApiCall&)
 */
ncclResult_t Recorder::record(rcclCall_t type, ncclComm_t comm, void* handle, void* userBuffer, size_t size)
{
  if (filename.size() == 0) return ncclSuccess;

  rcclApiCall entry(type);
  entry.comm = comm;
  entry.recvbuff = handle;

  const bool isRegistration = (type == rrCommRegister);
  if (isRegistration)
  {
    entry.sendbuff = userBuffer;
    entry.count = size;
  }
  return record(entry);
}
/**
 * Records a memory allocation or free.
 *
 * The pointer is stored in the entry's recvbuff field; the size is stored in count only
 * for rrMemAlloc.
 *
 * @param type recorder call type
 * @param ptr  pointer being allocated or freed
 * @param size allocation size (used for rrMemAlloc)
 * @return ncclSuccess when recording is disabled, otherwise the result of record(rcclApiCall&)
 */
ncclResult_t Recorder::record(rcclCall_t type, void* ptr, size_t size)
{
  if (filename.size() == 0) return ncclSuccess;

  rcclApiCall entry(type);
  entry.recvbuff = ptr;
  if (type == rrMemAlloc) entry.count = size;
  return record(entry);
}
/**
 * Records a simulated group end (rrGroupSimulatedEnd) and, in JSON mode, appends the
 * simulation info fields formatted with siminfo_fmt.
 *
 * @param groupDepth group nesting depth stored in the entry
 * @param siminfo    simulation info to append; may be null, in which case only the call is logged
 */
void Recorder::record(int groupDepth, ncclSimInfo_t *siminfo)
{
  if (!filename.size())
  {
    return;
  }
  rcclApiCall call(rrGroupSimulatedEnd);
  // Keep the depth on the entry, as the record(type, groupDepth) overload does.
  call.groupDepth = groupDepth;
  record(call);
  if (output_json && siminfo)
  {
    int len = snprintf(buffer, 4096, siminfo_fmt.c_str(),
                       siminfo->size, siminfo->magic, siminfo->version, siminfo->estimatedTime, call.timestamp);
    // snprintf reports the untruncated length (or a negative value on error);
    // never hand write() more than what is actually in the buffer.
    if (len > 0)
    {
      outputFile.write(buffer, len < 4096 ? len : 4095);
    }
  } // no tid for groupCall
  outputFile.flush();
}
/**
 * Records a communicator creation that takes an ncclConfig_t and, in JSON mode, appends
 * the configuration fields formatted with config_fmt.
 *
 * rrCommInitRankConfig is logged like a rank initialisation; any other type is treated
 * as rrCommSplit, where `size` carries the color and `rank` carries the key.
 *
 * @param type   rrCommInitRankConfig or rrCommSplit
 * @param size   rank count, or split color
 * @param rank   rank, or split key
 * @param commId unique id pointer forwarded to the communicator-creation overload
 * @param config configuration to append; may be null
 * @param comm   communicator (forwarded for rrCommSplit)
 */
void Recorder::record(rcclCall_t type, int size, int rank, ncclUniqueId* commId, ncclConfig_t* config, ncclComm_t comm)
{
  if (!filename.size())
  {
    return ;
  }
  if (type == rrCommInitRankConfig)
  {
    record(type, size, rank, commId);
  }
  else //rrCommSplit
  {
    record(type, size /*color*/, rank/*key*/, commId, comm);
  }
  if (output_json && config)
  {
    int len = snprintf(buffer, 4096, config_fmt.c_str(), config->size, config->magic, config->version, config->blocking,
                       config->cgaClusterSize, config->minCTAs, config->maxCTAs, config->netName, config->splitShare);
    // snprintf returns the length it would have needed; clamp so an overlong
    // (e.g. long netName) or failed format never makes write() read past the buffer.
    if (len > 0)
    {
      outputFile.write(buffer, len < 4096 ? len : 4095);
    }
    outputFile.flush();
  }
}
/**
 * Records an rrCommInitAll call and, in JSON mode, appends the device list.
 *
 * The device count is stored in the entry's root field.
 *
 * @param comms   communicator array (currently not logged)
 * @param ndev    number of devices
 * @param devlist device indices, `ndev` entries; may be null, in which case no list is written
 */
void Recorder::record(ncclComm_t* comms, int ndev, const int* devlist)
{
  if (!filename.size())
  {
    return ;
  }
  rcclApiCall call(rrCommInitAll);
  call.root = ndev;
  // call.sendbuff = comms; TODO: might log this too
  record(call);
  if (output_json && devlist)
  {
    outputFile << ", devlist : [";
    // Separator-first loop: a non-positive ndev produces "[]" instead of reading devlist[-1].
    for (int i = 0; i < ndev; i++)
    {
      if (i != 0) outputFile << ", ";
      outputFile << devlist[i];
    }
    outputFile << "]";
    outputFile.flush();
  }
}
/**
 * Finishes the log: when the output file is open, writes the closing brace in JSON mode,
 * closes the file and drops the stored graph-captured calls. Does nothing if the file
 * is not open.
 */
Recorder::~Recorder()
{
  if (!outputFile.is_open())
  {
    return;
  }
  if (output_json)
  {
    outputFile << std::endl << "}" << std::endl;
  }
  outputFile.close();
  calls.clear();
}
/**
 * Maps a call name to its rcclCall_t value by looking it up in rcclCallStr.
 *
 * @param func call name as it appears in the log
 * @return the enum value whose index in rcclCallStr matches `func`
 * @note Prints an error and terminates the process with exit(1) when the name is unknown.
 */
static rcclCall_t getFuncType(std::string func)
{
  // Unsigned index: avoids the signed/unsigned comparison against the sizeof-derived bound.
  constexpr size_t numCalls = sizeof(rcclCallStr) / sizeof(rcclCallStr[0]);
  for (size_t i = 0; i < numCalls; i++)
  {
    // std::string compares directly against the C string; no temporary needed.
    if (func == rcclCallStr[i])
    {
      return (rcclCall_t)i;
    }
  }
  printf("[ERROR] Unrecognized func %s\n", func.c_str());
  exit(1);
}
void parseJsonEntry(const char* entry, std::vector<rcclApiCall>& calls)
{
rcclApiCall call;
std::string str(entry);
size_t begin = str.find_first_not_of(' ');
size_t end = str.find(" : ");
rcclCall_t type = getFuncType(str.substr(begin, end-begin));
call.type = type;
switch(type) {
case rrCommRegister:
{
assert(sscanf(str.c_str() + end + 3, (ubr_fmt.substr(5) + ctxt_fmt).c_str(),
&call.comm, &call.sendbuff, &call.recvbuff, &call.count) == 4);
break;
}
case rrCommDeregister:
{
assert(sscanf(str.c_str() + end + 3, (ubDereg_fmt.substr(5) + ctxt_fmt).c_str(),
&call.comm, &call.recvbuff) == 2);
break;
}
case rrGetUniqueId:
{
assert(sscanf(str.c_str() + end + 3, (getId_fmt.substr(5) + ctxt_fmt).c_str(), &call.commId) == 1);
break;
}
case rrCommInitDev:
{
assert(sscanf(str.c_str() + end + 3, (init_fmt.substr(5) + ctxt_fmt).c_str(),
&call.comm, &call.nRanks, &call.commId, &call.globalRank, &call.root) == 5);
break;
}
case rrCommInitRank:
case rrCommInitRankConfig:
{
assert(sscanf(str.c_str() + end + 3, (rank_fmt.substr(5) + ctxt_fmt).c_str(),
&call.nRanks, &call.commId, &call.globalRank) == 3);
break;
}
case rrCommInitAll:
{
assert(sscanf(str.c_str() + end + 3, (all_fmt.substr(5) + ctxt_fmt).c_str(), &call.root) == 1);
break;
}
case rrCommFinalize:
case rrCommDestroy:
case rrCommAbort:
{
assert(sscanf(str.c_str() + end + 3, (destroy_fmt.substr(5) + ctxt_fmt).c_str(), &call.comm) == 1);
break;
}
case rrCommSplit:
{
assert(sscanf(str.c_str() + end + 3, (split_fmt.substr(5) + ctxt_fmt).c_str(),
&call.commId, &call.comm, &call.nRanks, &call.globalRank, &call.comm) == 5);
break;
}
case rrMemAlloc:
{
assert(sscanf(str.c_str() + end + 3, (alloc_fmt.substr(5) + ctxt_fmt).c_str(),
&call.recvbuff, &call.count) == 2);
break;
}
case rrMemFree:
{
assert(sscanf(str.c_str() + end + 3, (free_fmt.substr(5) + ctxt_fmt).c_str(), &call.recvbuff) == 1);
break;
}
case rrRedOpCreatePreMulSum:
{
assert(sscanf(str.c_str() + end + 3, (redop_fmt.substr(5) + ctxt_fmt).c_str(),
&call.sendbuff, &call.datatype, &call.op, &call.root, &call.comm) == 5);
break;
}
case rrRedOpDestroy:
{
assert(sscanf(str.c_str() + end + 3, (redopdestroy_fmt.substr(5) + ctxt_fmt).c_str(),
&call.op, &call.comm) == 2);
break;
}
default:
assert(sscanf(str.c_str() + end + 3, (coll_fmt.substr(5) + ctxt_fmt).c_str(),
&call.opCount, &call.sendbuff, &call.recvbuff, &call.count, &call.datatype, &call.op, &call.root,
&call.comm, &call.nRanks, &call.stream, &call.nTasks, &call.globalRank, &call.timestamp, &call.tid,
&call.hipDev, &call.graphCaptured, &call.graphID) == 17);
}
calls.push_back(call);
}
// Placeholder for parsing a binary-format recorder log. Currently a no-op: it takes no
// input and produces no output.
void parseBinLog()
{
// TODO: need to handle trailing data such as simInfo, devList, a2av data, etc.
}
};
+5
Просмотреть файл
@@ -11,8 +11,11 @@
#include <cstdio>
#include <cstdlib>
using namespace rccl;
NCCL_API(ncclResult_t, mscclLoadAlgo, const char *mscclAlgoFilePath, mscclAlgoHandle_t *mscclAlgoHandle, int rank);
ncclResult_t mscclLoadAlgo_impl(const char *mscclAlgoFilePath, mscclAlgoHandle_t *mscclAlgoHandle, int rank) {
Recorder::instance().record("mscclLoadAlgo");
mscclStatus& status = mscclGetStatus(rank);
if (status.freeAlgoHandles.size() == 0) {
@@ -45,6 +48,7 @@ ncclResult_t mscclRunAlgo_impl(
void* recvBuff, const size_t recvCounts[], const size_t rDisPls[],
size_t count, ncclDataType_t dataType, int root, int peer, ncclRedOp_t op,
mscclAlgoHandle_t mscclAlgoHandle, ncclComm_t comm, hipStream_t stream) {
Recorder::instance().record("mscclRunAlgo");
struct NvtxParamsMsccl {
size_t bytes;
ncclRedOp_t op;
@@ -91,5 +95,6 @@ ncclResult_t mscclRunAlgo_impl(
NCCL_API(ncclResult_t, mscclUnloadAlgo, mscclAlgoHandle_t mscclAlgoHandle);
// Implementation behind the mscclUnloadAlgo API entry point.
// The algorithm handle argument is not used: the function only logs the call name
// through the Recorder and reports success.
ncclResult_t mscclUnloadAlgo_impl(mscclAlgoHandle_t mscclAlgoHandle) {
// deprecated
Recorder::instance().record("mscclUnloadAlgo");
return ncclSuccess;
}
+12 -4
Просмотреть файл
@@ -15,6 +15,8 @@
#include "mscclpp/mscclpp_nccl.h"
#endif
using namespace rccl;
ncclResult_t ncclNetDeregister(struct ncclComm* comm, struct ncclReg* reg) {
struct ncclRegCache* cache = &comm->regCache;
ncclDebugNoWarn = NCCL_NET;
@@ -162,6 +164,8 @@ ncclResult_t ncclRegCleanup(struct ncclComm* comm) {
NCCL_API(ncclResult_t, ncclCommRegister, const ncclComm_t comm, void* buff, size_t size, void** handle);
ncclResult_t ncclCommRegister_impl(const ncclComm_t comm, void* buff, size_t size, void** handle) {
ncclResult_t ret = ncclSuccess;
NCCLCHECK(CommCheck(comm, "ncclCommRegister", "comm"));
if (comm->checkPointers) NCCLCHECK(CudaPtrCheck(buff, comm, "buff", "ncclCommRegister"));
#ifdef ENABLE_MSCCLPP
@@ -171,8 +175,7 @@ ncclResult_t ncclCommRegister_impl(const ncclComm_t comm, void* buff, size_t siz
CUDACHECK(hipPointerGetAttribute(&isManagedBuffer, HIP_POINTER_ATTRIBUTE_IS_MANAGED, const_cast<void*>(buff)));
if(!isManagedBuffer){
INFO(NCCL_INIT, "MSCCL++: ncclCommRegister");
NCCLCHECK(mscclpp_ncclCommRegister(comm->mscclpp_comm, buff, size, handle));
return ncclSuccess;
NCCLCHECKGOTO(mscclpp_ncclCommRegister(comm->mscclpp_comm, buff, size, handle), ret, end);
}
else{
WARN("MSCCL++: Cannot register user-buffers on managed memory. RCCL user-buffer registration will occur.");
@@ -181,12 +184,17 @@ ncclResult_t ncclCommRegister_impl(const ncclComm_t comm, void* buff, size_t siz
}
#endif
INFO(NCCL_INIT, "RCCL: ncclCommRegister");
NCCLCHECK(ncclRegister(comm, buff, size, handle));
return ncclSuccess;
NCCLCHECKGOTO(ncclRegister(comm, buff, size, handle), ret, end);
end:
// !recording at sink
NCCLCHECK(Recorder::instance().record(rrCommRegister, comm, *handle, buff, size));
return ret;
}
NCCL_API(ncclResult_t, ncclCommDeregister, const ncclComm_t comm, void* handle);
ncclResult_t ncclCommDeregister_impl(const ncclComm_t comm, void* handle) {
NCCLCHECK(Recorder::instance().record(rrCommDeregister, comm, handle));
#ifdef ENABLE_MSCCLPP
if (comm->mscclppCompatible) {
+2
Просмотреть файл
@@ -46,6 +46,7 @@ if(BUILD_TESTS)
ScatterTests.cpp
SendRecvTests.cpp
StandaloneTests.cpp
_RecorderTests.cpp
common/main.cpp
common/CallCollectiveForked.cpp
common/CollectiveArgs.cpp
@@ -55,6 +56,7 @@ if(BUILD_TESTS)
common/TestBed.cpp
common/TestBedChild.cpp
common/StandaloneUtils.cpp
../src/misc/recorder.cc
)
add_executable(rccl-UnitTests ${TEST_SOURCE_FILES})
+72
Просмотреть файл
@@ -0,0 +1,72 @@
/*************************************************************************
* Copyright (c) 2025 Advanced Micro Devices, Inc. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#include <gtest/gtest.h>
#include <rccl/rccl.h>
#include "RcclMockFuncs.hpp"
//#include "TestBed.hpp"
namespace RcclUnitTesting
{
  /**
   * \brief Verify correctness of Recorder record() correctness in binary mode
   * ******************************************************************************************/
  TEST(Recorder, ParseBinary)
  {
    // to add after binary export of logging is supported
  }

  /**
   * \brief Verify correctness of Recorder record() correctness in json mode
   *
   * Records one AllToAllv call, reads the third line of the generated log back through
   * parseJsonEntry() and compares the parsed entry with the recorded one (skipping the
   * first 4 bytes of the structure).
   * ******************************************************************************************/
  TEST(Recorder, ParseJson)
  {
    setenv("RCCL_REPLAY_FILE", "test.json", 1);
    int pid = getpid();

    hipStream_t stream;
    hipStreamCreate(&stream);
    int array[] = {2, 3, 5};
    ncclComm comm{.nRanks = 1, .localRank = 1, .localRankToRank = array, .opCount = 8, .planner = {.nTasksColl = 13, .nTasksP2p = 21}};
    rccl::rcclApiCall call(rccl::rrAllToAllv, {.sendbuff = (void*)0x7f22f9600000, .recvbuff = (void*)0x7f22f9601000, .count = 0, .datatype = ncclFloat32, .comm = &comm, .stream = stream});
    rccl::Recorder::instance().record(call);

    std::vector<rccl::rcclApiCall> calls;
    char entry[4096];

    //parse the outfile
    std::string filename = "test" + std::to_string(pid) + ".json";
    std::ifstream fp(filename);
    const bool opened = fp.is_open();

    // The recorded call is expected on the third line of the log.
    bool gotEntry = false;
    if (opened)
    {
      fp.getline(entry, 4096);
      fp.getline(entry, 4096);
      gotEntry = static_cast<bool>(fp.getline(entry, 4096));
    }
    if (gotEntry)
    {
      parseJsonEntry(entry, calls);
    }
    // Only touch calls[0] when parsing actually produced an entry.
    int result = calls.empty() ? -1
                               : memcmp((char*)&calls[0]+4, (char*)&call+4, sizeof(rccl::rcclApiCall)-4);

    // Clean up before asserting so a failure does not leave the log file, env var or stream behind.
    fp.close(); // care that recorder is not designed to anticipate fp closing before destructor
    remove(filename.c_str());
    unsetenv("RCCL_REPLAY_FILE");
    hipStreamDestroy(stream);

    // gtest assertions: reported as test failures and still active when NDEBUG is defined.
    ASSERT_TRUE(opened) << "could not open " << filename;
    ASSERT_TRUE(gotEntry) << "log file has fewer than three lines";
    ASSERT_FALSE(calls.empty());
    EXPECT_EQ(result, 0);
  }

  /**
   * \brief Verify RCCL Recorder's integrity in multithread context by comparing Recorder
   *        instance across different threads.
   * ******************************************************************************************/
  static void recorderCmp(void** recorder)
  {
    *recorder = &(rccl::Recorder::instance());
  }

  TEST(Recorder, VerifyMultithread)
  {
    void* p1 = nullptr;
    void* p2 = nullptr;
    std::thread t1(recorderCmp, &p1);
    std::thread t2(recorderCmp, &p2);
    t1.join();
    t2.join();
    EXPECT_NE(p1, nullptr);
    EXPECT_EQ(p1, p2);
  }
}
+7
Просмотреть файл
@@ -0,0 +1,7 @@
#include "info.h"
#include "comm.h"
// Test stub: a no-op definition of ncclDebugLog that ignores all of its arguments.
void ncclDebugLog(ncclDebugLogLevel, unsigned long, char const*, int, char const*, ...) {};
// Test stub: reports success without writing anything to `hostname`;
// `maxlen` and `delim` are ignored.
ncclResult_t getHostName(char* hostname, int maxlen, const char delim) {
return ncclSuccess;
}
+12
Просмотреть файл
@@ -0,0 +1,12 @@
# Install/build locations; each can be overridden from the environment or the make command line.
ROCM_DIR ?= /opt/rocm
RCCL_DIR ?= ../../build/release
MPI_DIR ?= /opt/ompi
# Header search paths (MPI, RCCL, RCCL hipified sources) and link flags (MPI + RCCL).
INCLUDES = -I$(MPI_DIR)/include -I$(RCCL_DIR)/include -I$(RCCL_DIR)/hipify/src/include
LDFLAGS = -L$(MPI_DIR)/lib -L$(RCCL_DIR) -lmpi -lrccl
# Default target: compiles rcclReplayer.cpp with hipcc into the rcclReplayer executable.
main: rcclReplayer.cpp
$(ROCM_DIR)/bin/hipcc rcclReplayer.cpp -O1 -g -o rcclReplayer $(INCLUDES) $(LDFLAGS)
# Removes the built executable.
clean:
rm -f ./rcclReplayer
Просмотреть файл
+27 -28
Просмотреть файл
@@ -180,31 +180,36 @@ void dataToCsv(GroupCall const& gc, std::ofstream &datafile, double runTime)
void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls& cc)
{
bool verbose = isFirstRank && (getenv("VERBOSE") != NULL);
cc.globalRankComms.clear();
cc.globalRankComms.resize(cc.numGlobalRanks);
cc.groupCalls.clear();
FILE* fp = fopen(logFilename, "r");
std::ifstream fp(logFilename, std::ios::binary);
if (!fp) {
printf("[ERROR] Unable to open file %s\n", logFilename);
exit(-1);
}
char line[2048];
LineItem li;
constexpr size_t size = sizeof(LineItem) + 1;
char line[size]; //size of collectivecall struct
int lineNum = 0;
while (fgets(line, 2048, fp)) {
++lineNum;
while (fp.getline(line, size))
{
LineItem li = *((LineItem*)line);
//Ignore invalid lines and collectives
if (!ParseLineItem(line, li) || li.nRanks != cc.numGlobalRanks) continue;
if (li.coll == 10 || li.coll == 11)
{
continue;
}
// if need hostname will parse together with num of line in output
// Ignore invalid lines and collectives
if (li.nRanks != cc.numGlobalRanks) continue;
// Figure out commIdx for this globalrank
int commIdx = -1;
for (auto i = 0; i < cc.globalRankComms[li.globalRank].size(); i++) {
if (!strcmp(cc.globalRankComms[li.globalRank][i].c_str(), li.comm)) {
if (cc.globalRankComms[li.globalRank][i] != li.comm) {
commIdx = i;
break;
}
@@ -215,8 +220,8 @@ void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls
}
TaskInfo taskInfo;
taskInfo.funcType = GetFuncType(li.opName);
taskInfo.inPlace = !strcmp(li.sendbuff, li.recvbuff);
taskInfo.funcType = (ncclFunc_t) li.coll;
taskInfo.inPlace = li.sendbuff == li.recvbuff;
taskInfo.count = li.count;
taskInfo.datatype = (ncclDataType_t) li.datatype;
taskInfo.op = (ncclRedOp_t) li.op;
@@ -229,7 +234,7 @@ void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls
if (gc.opCount != li.opCount) continue;
if (gc.rankData.count(li.globalRank)) {
RankData& rd = gc.rankData[li.globalRank];
if (rd.commIdx != commIdx || rd.tasks.size() != li.task)
if (rd.commIdx != commIdx || rd.tasks.size() != li.nTasks)
continue;
rd.tasks.push_back(taskInfo);
@@ -237,7 +242,7 @@ void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls
break;
}
// Rank has no tasks - make sure this is task 0
else if (li.task == 0) {
else if (li.nTasks == 0) {
gc.rankData[li.globalRank].lineNum = lineNum;
gc.rankData[li.globalRank].commIdx = commIdx;
gc.rankData[li.globalRank].tasks.push_back(taskInfo);
@@ -248,7 +253,7 @@ void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls
// If no collectives were found, create new one
if (!found) {
if (li.task != 0) {
if (li.nTasks != 0) {
if (isFirstRank) printf("[WARN] Was unable to find corresponding collective for line %d\n", lineNum);
}
@@ -260,7 +265,7 @@ void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls
cc.groupCalls.push_back(gc);
}
}
fclose(fp);
fp.close();
// Validate group calls
// - For non Send/Recv, check that all ranks participate with same parameters count
@@ -375,18 +380,6 @@ void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls
}
}
bool ParseLineItem(char const* line, LineItem& li)
{
return sscanf(line,
"%[^:]:%d:%d [%d] NCCL INFO %[^:]: opCount %x sendbuff %s "
"recvbuff %s count %lu datatype %d op %d root %d comm %s "
"[nranks=%d] stream %p task %d globalrank %d",
li.hostname, &li.pid, &li.tid, &li.cudaDev, li.opName,
&li.opCount, li.sendbuff, li.recvbuff,
&li.count, &li.datatype, &li.op, &li.root, li.comm,
&li.nRanks, &li.stream, &li.task, &li.globalRank) == 17;
}
double ReplayRccl(CollectiveCalls& cc, int groupIdx, int& numInvalid)
{
int numLocalRanks = cc.localRankComms.size();
@@ -566,6 +559,12 @@ void ExecuteCollective(TaskInfo& task, ncclComm_t const& comm, hipStream_t strea
case ncclCollRecv:
NCCL_CALL(ncclRecv(task.outputGpu.ptr, task.count, task.datatype, task.root, comm, stream));
break;
case ncclStartGroup:
NCCL_CALL(ncclGroupStart());
break;
case ncclEndGroup:
NCCL_CALL(ncclGroupEnd());
break;
default:
printf("Error: unsupported collective\n");
exit(1);
@@ -755,4 +754,4 @@ void PrepData_Send(TaskInfo& taskInfo, int globalRank) {
void PrepData_Recv(TaskInfo& taskInfo, int globalRank) {
FillPattern(taskInfo.expected, taskInfo.datatype, taskInfo.count, globalRank);
}
}
+27 -25
Просмотреть файл
@@ -1,5 +1,6 @@
#pragma once
#include <map>
#include <chrono>
#include <cstring>
#include <rccl/rccl.h>
@@ -36,23 +37,25 @@
struct LineItem
{
char hostname[MPI_MAX_PROCESSOR_NAME];
int pid;
int tid;
int cudaDev;
char opName[32];
int opCount;
char sendbuff[32];
char recvbuff[32];
size_t count;
int datatype;
int op;
int root;
char comm[32];
int nRanks;
void* stream;
int task;
int globalRank;
int pid;
int tid;
int cudaDev;
//int graph;
int groupDepth;
int coll;
uint64_t opCount;
void* sendbuff;
void* recvbuff;
size_t count;
int datatype;
int op;
int root;
void* comm;
int nRanks;
void* stream;
int nTasks;
int globalRank;
};
// Enumeration of all collective functions currently supported
@@ -69,7 +72,9 @@ typedef enum
ncclCollAllToAllv,
ncclCollSend,
ncclCollRecv,
ncclNumFuncs
ncclNumFuncs,
ncclStartGroup = 10;
ncclEndGroup = 11;
} ncclFunc_t;
char const ncclFuncNames[ncclNumFuncs][32] =
@@ -144,8 +149,8 @@ struct RankData
struct GroupCall
{
bool isValid;
int opCount;
bool isValid;
uint64_t opCount;
std::map<int, RankData> rankData;
};
@@ -153,7 +158,7 @@ struct CollectiveCalls
{
int numGlobalRanks;
int numGpusPerMpiRank;
std::vector<std::vector<std::string>> globalRankComms; // Set of comms used by each global rank
std::vector<std::vector<void*>> globalRankComms; // Set of comms used by each global rank
std::vector<GroupCall> groupCalls; // List of group calls for each global rank
int localGpuOffset; // First local GPU device idx for this MPI process
@@ -384,9 +389,6 @@ bool IsRootUsed(ncclFunc_t funcType) {
funcType == ncclCollGather || funcType == ncclCollScatter);
}
// parse the logs and assign them into lineItem
bool ParseLineItem(char const* line, LineItem& li);
// this covers grouping the logs based on opCount and task number,
// validatation of the groupCalls for both non-send/recv collectives and send/recv
void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls& collectiveCalls);
@@ -426,4 +428,4 @@ void PrepData_Gather(TaskInfo& taskInfo, int globalRank, int totalRanks, bool is
void PrepData_Scatter(TaskInfo& taskInfo, int globalRank, int totalRanks);
void PrepData_AlltoAll(TaskInfo& taskInfo, int globalRank, int totalRanks);
void PrepData_Send(TaskInfo& taskInfo, int globalRank);
void PrepData_Recv(TaskInfo& taskInfo, int globalRank);
void PrepData_Recv(TaskInfo& taskInfo, int globalRank);
-14
Просмотреть файл
@@ -1,14 +0,0 @@
ROCM_DIR ?= /opt/rocm
RCCL_DIR ?= ../../build/release
MPI_DIR ?= /opt/ompi
MPI_INC_DIR ?= /usr/include/x86_64-linux-gnu/mpi
MPI_LIB_DIR ?= /usr/lib/x86_64-linux-gnu
INCLUDES = -I$(MPI_INC_DIR) -I$(MPI_DIR)/include -I$(RCCL_DIR)/include -I$(RCCL_DIR)/hipify/src/include
LDFLAGS = -L$(MPI_LIB_DIR) -L$(MPI_DIR)/lib -L$(RCCL_DIR) -lmpi -lrccl
main: rcclReplayer.cpp
$(ROCM_DIR)/bin/hipcc rcclReplayer.cpp -O1 -g -o rcclReplayer $(INCLUDES) $(LDFLAGS)
clean:
rm -f ./rcclReplayer