diff --git a/CMakeLists.txt b/CMakeLists.txt index cff5c5b14b..2d216189bd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -506,6 +506,7 @@ set(SRC_FILES src/include/rocm_smi_wrap.h src/include/rocmwrap.h src/include/roctx.h + src/include/recorder.h src/include/shm.h src/include/shmutils.h src/include/signals.h @@ -577,6 +578,7 @@ set(SRC_FILES src/misc/rocm_smi_wrap.cc src/misc/rocmwrap.cc src/misc/roctx.cc + src/misc/recorder.cc src/misc/shmutils.cc src/misc/signals.cc src/misc/socket.cc diff --git a/src/collectives.cc b/src/collectives.cc index 09fe07ed54..5207589476 100644 --- a/src/collectives.cc +++ b/src/collectives.cc @@ -13,6 +13,8 @@ #include "msccl/msccl_lifecycle.h" +using namespace rccl; + const char* ncclFuncToString(ncclFunc_t fn) { switch (fn) { case ncclFuncAllGather: return "AllGather"; @@ -99,15 +101,21 @@ ncclResult_t ncclAllGather_impl(const void* sendbuff, void* recvbuff, size_t sen NvtxParamsAllGather payload{sendcount * ncclTypeSize(datatype), datatype}; NVTX3_FUNC_WITH_PARAMS(AllGather, AllGatherSchema, payload) + struct ncclInfo info = { ncclFuncAllGather, "AllGather", + sendbuff, recvbuff, sendcount, datatype, ncclSum, 0, comm, stream, /* Args */ + ALLGATHER_CHUNKSTEPS, ALLGATHER_SLICESTEPS }; + + if (!mscclIsCaller()) // when msccl falls back to + { + NCCLCHECK(Recorder::instance().record(rrAllGather, info)); + } + if (mscclAvailable(comm->rank) && !mscclIsCaller()) { return mscclEnqueueCheck( sendbuff, nullptr, nullptr, recvbuff, nullptr, nullptr, sendcount, datatype, 0, 0, ncclSum, mscclFuncAllGather, comm, stream); } - struct ncclInfo info = { ncclFuncAllGather, "AllGather", - sendbuff, recvbuff, sendcount, datatype, ncclSum, 0, comm, stream, /* Args */ - ALLGATHER_CHUNKSTEPS, ALLGATHER_SLICESTEPS }; NCCLCHECK(ncclEnqueueCheck(&info)); return ncclSuccess; } @@ -133,15 +141,21 @@ ncclResult_t ncclAllReduce_impl(const void* sendbuff, void* recvbuff, size_t cou NvtxParamsAllReduce payload{count * ncclTypeSize(datatype), op, datatype}; 
NVTX3_FUNC_WITH_PARAMS(AllReduce, AllReduceSchema, payload) + struct ncclInfo info = { ncclFuncAllReduce, "AllReduce", + sendbuff, recvbuff, count, datatype, op, 0, comm, stream, /* Args */ + ALLREDUCE_CHUNKSTEPS, ALLREDUCE_SLICESTEPS }; + + if (!mscclIsCaller()) // when msccl falls back to + { + NCCLCHECK(Recorder::instance().record(rrAllReduce, info)); + } + if (mscclAvailable(comm->rank) && !mscclIsCaller()) { return mscclEnqueueCheck( sendbuff, nullptr, nullptr, recvbuff, nullptr, nullptr, count, datatype, 0, 0, op, mscclFuncAllReduce, comm, stream); } - struct ncclInfo info = { ncclFuncAllReduce, "AllReduce", - sendbuff, recvbuff, count, datatype, op, 0, comm, stream, /* Args */ - ALLREDUCE_CHUNKSTEPS, ALLREDUCE_SLICESTEPS }; NCCLCHECK(ncclEnqueueCheck(&info)); return ncclSuccess; } @@ -154,6 +168,12 @@ NCCL_API(ncclResult_t, ncclAllToAll, const void* sendbuff, void* recvbuff, size_ ncclResult_t ncclAllToAll_impl(const void* sendbuff, void* recvbuff, size_t count, ncclDataType_t datatype, ncclComm_t comm, hipStream_t stream) { + + if (!mscclIsCaller()) // when msccl falls back to + { + NCCLCHECK(Recorder::instance().record(rrAllToAll, sendbuff, recvbuff, count, datatype, comm, stream)); + } + struct NvtxParamsAllToAll { size_t bytes; ncclDataType_t datatype; @@ -204,6 +224,12 @@ NCCL_API(ncclResult_t, ncclAllToAllv, const void *sendbuff, const size_t sendcou ncclResult_t ncclAllToAllv_impl(const void *sendbuff, const size_t sendcounts[], const size_t sdispls[], void *recvbuff, const size_t recvcounts[], const size_t rdispls[], ncclDataType_t datatype, ncclComm_t comm, hipStream_t stream) { + + if (!mscclIsCaller()) // when msccl falls back to + { + NCCLCHECK(Recorder::instance().record(rrAllToAllv, sendbuff, recvbuff, 0, datatype, comm, stream, -1, sendcounts, sdispls, recvcounts, rdispls)); + } + struct NvtxParamsAllToAllv { size_t sendbytes; size_t recvbytes; @@ -268,15 +294,21 @@ ncclResult_t ncclBroadcast_impl(const void* sendbuff, void* recvbuff, size_t 
cou NvtxParamsBroadcast payload{count * ncclTypeSize(datatype), root, datatype}; NVTX3_FUNC_WITH_PARAMS(Broadcast, BroadcastSchema, payload) + struct ncclInfo info = { ncclFuncBroadcast, "Broadcast", + sendbuff, recvbuff, count, datatype, ncclSum, root, comm, stream, /* Args */ + BROADCAST_CHUNKSTEPS, BROADCAST_SLICESTEPS }; + + if (!mscclIsCaller()) // when msccl falls back to + { + NCCLCHECK(Recorder::instance().record(rrBroadcast, info)); + } + if (mscclAvailable(comm->rank) && !mscclIsCaller()) { return mscclEnqueueCheck( sendbuff, nullptr, nullptr, recvbuff, nullptr, nullptr, count, datatype, root, 0, ncclSum, mscclFuncBroadcast, comm, stream); } - struct ncclInfo info = { ncclFuncBroadcast, "Broadcast", - sendbuff, recvbuff, count, datatype, ncclSum, root, comm, stream, /* Args */ - BROADCAST_CHUNKSTEPS, BROADCAST_SLICESTEPS }; NCCLCHECK(ncclEnqueueCheck(&info)); return ncclSuccess; } @@ -285,6 +317,7 @@ NCCL_API(ncclResult_t, ncclBcast, void* buff, size_t count, ncclDataType_t datat ncclComm_t comm, cudaStream_t stream); ncclResult_t ncclBcast(void* buff, size_t count, ncclDataType_t datatype, int root, ncclComm_t comm, cudaStream_t stream) { + NCCLCHECK(Recorder::instance().record(rrBcast, buff, buff, count, datatype, comm, stream, root)); NCCLCHECK(ncclBroadcast(buff, buff, count, datatype, root, comm, stream)); return ncclSuccess; } @@ -308,6 +341,11 @@ ncclResult_t ncclGather_impl(const void* sendbuff, void* recvbuff, size_t sendco NvtxParamsGather payload{sendcount * ncclTypeSize(datatype), root, datatype}; NVTX3_FUNC_WITH_PARAMS(Gather, GatherSchema, payload) + if (!mscclIsCaller()) // when msccl falls back to + { + NCCLCHECK(Recorder::instance().record(rrGather, sendbuff, recvbuff, sendcount, datatype, comm, stream, root)); + } + if (mscclAvailable(comm->rank) && !mscclIsCaller()) { return mscclEnqueueCheck( sendbuff, nullptr, nullptr, recvbuff, nullptr, nullptr, @@ -352,15 +390,21 @@ ncclResult_t ncclReduce_impl(const void* sendbuff, void* recvbuff, 
size_t count, NvtxParamsReduce payload{count * ncclTypeSize(datatype), root, op, datatype}; NVTX3_FUNC_WITH_PARAMS(Reduce, ReduceSchema, payload) + struct ncclInfo info = { ncclFuncReduce, "Reduce", + sendbuff, recvbuff, count, datatype, op, root, comm, stream, /* Args */ + REDUCE_CHUNKSTEPS, REDUCE_SLICESTEPS }; + + if (!mscclIsCaller()) // when msccl falls back to + { + NCCLCHECK(Recorder::instance().record(rrReduce, info)); + } + if (mscclAvailable(comm->rank) && !mscclIsCaller()) { return mscclEnqueueCheck( sendbuff, nullptr, nullptr, recvbuff, nullptr, nullptr, count, datatype, root, 0, op, mscclFuncReduce, comm, stream); } - struct ncclInfo info = { ncclFuncReduce, "Reduce", - sendbuff, recvbuff, count, datatype, op, root, comm, stream, /* Args */ - REDUCE_CHUNKSTEPS, REDUCE_SLICESTEPS }; NCCLCHECK(ncclEnqueueCheck(&info)); return ncclSuccess; } @@ -386,15 +430,21 @@ ncclResult_t ncclReduceScatter_impl(const void* sendbuff, void* recvbuff, size_t NvtxParamsReduceScatter payload{recvcount * ncclTypeSize(datatype), op, datatype}; NVTX3_FUNC_WITH_PARAMS(ReduceScatter, ReduceScatterSchema, payload) + struct ncclInfo info = { ncclFuncReduceScatter, "ReduceScatter", + sendbuff, recvbuff, recvcount, datatype, op, 0, comm, stream, /* Args */ + REDUCESCATTER_CHUNKSTEPS, REDUCESCATTER_SLICESTEPS }; + + if (!mscclIsCaller()) // when msccl falls back to + { + NCCLCHECK(Recorder::instance().record(rrReduceScatter, info)); + } + if (mscclAvailable(comm->rank) && !mscclIsCaller()) { return mscclEnqueueCheck( sendbuff, nullptr, nullptr, recvbuff, nullptr, nullptr, recvcount, datatype, 0, 0, op, mscclFuncReduceScatter, comm, stream); } - struct ncclInfo info = { ncclFuncReduceScatter, "ReduceScatter", - sendbuff, recvbuff, recvcount, datatype, op, 0, comm, stream, /* Args */ - REDUCESCATTER_CHUNKSTEPS, REDUCESCATTER_SLICESTEPS }; NCCLCHECK(ncclEnqueueCheck(&info)); return ncclSuccess; } @@ -418,7 +468,12 @@ ncclResult_t ncclScatter_impl(const void* sendbuff, void* recvbuff, 
size_t recvc }; NvtxParamsScatter payload{recvcount * ncclTypeSize(datatype), root, datatype}; NVTX3_FUNC_WITH_PARAMS(Scatter, ScatterSchema, payload) - + + if (!mscclIsCaller()) // when msccl falls back to + { + NCCLCHECK(Recorder::instance().record(rrScatter, sendbuff, recvbuff, recvcount, datatype, comm, stream, root)); + } + if (mscclAvailable(comm->rank) && !mscclIsCaller()) { return mscclEnqueueCheck( sendbuff, nullptr, nullptr, recvbuff, nullptr, nullptr, @@ -462,15 +517,21 @@ ncclResult_t ncclSend_impl(const void* sendbuff, size_t count, ncclDataType_t da NvtxParamsSendRecv payload{count * ncclTypeSize(datatype), peer, datatype}; NVTX3_FUNC_WITH_PARAMS(Send, SendRecvSchema, payload) + struct ncclInfo info = { ncclFuncSend, "Send", + NULL, (void*)sendbuff, count, datatype, ncclSum, peer, comm, stream, /* Args */ + 1, 1 }; + + if (!mscclIsCaller()) // when msccl falls back to + { + NCCLCHECK(Recorder::instance().record(rrSend, info)); + } + if (mscclAvailable(comm->rank) && !mscclIsCaller()) { return mscclEnqueueCheck( sendbuff, nullptr, nullptr, nullptr, nullptr, nullptr, count, datatype, 0, peer, ncclSum, mscclFuncSend, comm, stream); } - struct ncclInfo info = { ncclFuncSend, "Send", - NULL, (void*)sendbuff, count, datatype, ncclSum, peer, comm, stream, /* Args */ - 1, 1 }; ncclResult_t ret; NCCLCHECK(ncclGroupStart()); NCCLCHECKGOTO(ncclEnqueueCheck(&info), ret, exit); @@ -487,15 +548,21 @@ ncclResult_t ncclRecv_impl(void* recvbuff, size_t count, ncclDataType_t datatype NvtxParamsSendRecv payload{count * ncclTypeSize(datatype), peer, datatype}; NVTX3_FUNC_WITH_PARAMS(Recv, SendRecvSchema, payload) + struct ncclInfo info = { ncclFuncRecv, "Recv", + NULL, recvbuff, count, datatype, ncclSum, peer, comm, stream, /* Args */ + 1, 1 }; + + if (!mscclIsCaller()) // when msccl falls back to + { + NCCLCHECK(Recorder::instance().record(rrRecv, info)); + } + if (mscclAvailable(comm->rank) && !mscclIsCaller()) { return mscclEnqueueCheck( nullptr, nullptr, nullptr, 
recvbuff, nullptr, nullptr, count, datatype, 0, peer, ncclSum, mscclFuncRecv, comm, stream); } - struct ncclInfo info = { ncclFuncRecv, "Recv", - NULL, recvbuff, count, datatype, ncclSum, peer, comm, stream, /* Args */ - 1, 1 }; ncclResult_t ret; NCCLCHECK(ncclGroupStart()); NCCLCHECKGOTO(ncclEnqueueCheck(&info), ret, exit); diff --git a/src/enqueue.cc b/src/enqueue.cc index 40bf736e8a..42af17bc26 100644 --- a/src/enqueue.cc +++ b/src/enqueue.cc @@ -25,6 +25,8 @@ #include // std::memcpy #include // PRIx64 +using namespace rccl; + struct ncclKernelMatch { void* kernelFn; bool specialized; @@ -2560,12 +2562,16 @@ ncclResult_t ncclRedOpCreatePreMulSum_impl(ncclRedOp_t *op, void *scalar, ncclDa } *op = ncclRedOp_t(int(ncclNumOps) + ix); *op = ncclUserRedOpMangle(comm, *op); + + // ! recording at sink + NCCLCHECK(Recorder::instance().record(rrRedOpCreatePreMulSum, *op, comm, datatype, residence, scalar)); TRACE_CALL("ncclRedOpCreatePreMulSum(%d,%p,%d,%d,%p)", *op, scalar, datatype, residence, comm); return ncclSuccess; } NCCL_API(ncclResult_t, ncclRedOpDestroy, ncclRedOp_t op, ncclComm_t comm); ncclResult_t ncclRedOpDestroy_impl(ncclRedOp_t op, ncclComm_t comm) { + NCCLCHECK(Recorder::instance().record(rrRedOpDestroy, op, comm)); if (0 <= int(op) && int(op) < int(ncclNumOps)) { WARN("ncclRedOpDestroy : operator is a NCCL builtin."); return ncclInvalidArgument; diff --git a/src/group.cc b/src/group.cc index 0e66044938..f5d2d12e7e 100644 --- a/src/group.cc +++ b/src/group.cc @@ -17,6 +17,8 @@ #include "msccl/msccl_lifecycle.h" +using namespace rccl; + __thread int ncclGroupDepth = 0; // depth of ncclGroupStart nesting __thread ncclResult_t ncclGroupError = ncclSuccess; __thread struct ncclComm* ncclGroupCommHead = nullptr; @@ -96,6 +98,7 @@ ncclResult_t ncclAsyncJobComplete(struct ncclAsyncJob* job) { NCCL_API(ncclResult_t, ncclGroupStart); ncclResult_t ncclGroupStart_impl() { + NCCLCHECK(Recorder::instance().record(rrGroupStart, ncclGroupDepth)); ncclResult_t ret = 
ncclSuccess; NVTX3_FUNC_RANGE_IN(nccl_domain); @@ -114,6 +117,7 @@ ncclResult_t ncclGroupStartInternal() { NCCL_API(ncclResult_t, ncclGroupEnd); ncclResult_t ncclGroupEnd_impl() { + NCCLCHECK(Recorder::instance().record(rrGroupEnd, ncclGroupDepth)); ncclResult_t ret = ncclSuccess; NVTX3_FUNC_RANGE_IN(nccl_domain); NCCLCHECKGOTO(ncclGroupEndInternal(), ret, exit); @@ -124,6 +128,7 @@ exit: NCCL_API(ncclResult_t, ncclGroupSimulateEnd, ncclSimInfo_t* simInfo); ncclResult_t ncclGroupSimulateEnd(ncclSimInfo_t* simInfo) { + Recorder::instance().record(ncclGroupDepth, simInfo); ncclResult_t ret = ncclSuccess; NVTX3_FUNC_RANGE_IN(nccl_domain); NCCLCHECKGOTO(ncclGroupEndInternal(simInfo), ret, exit); diff --git a/src/include/comm.h b/src/include/comm.h index 115e3da607..d22ca71e69 100644 --- a/src/include/comm.h +++ b/src/include/comm.h @@ -20,6 +20,7 @@ #include "nvmlwrap.h" #include "profiler.h" #include "rccl_common.h" +#include "recorder.h" #if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__) #define HIPRT_CB diff --git a/src/include/recorder.h b/src/include/recorder.h new file mode 100644 index 0000000000..4d2a5afa8f --- /dev/null +++ b/src/include/recorder.h @@ -0,0 +1,162 @@ +#include +#include +#include +#include +#include +#include "debug.h" + +namespace rccl +{ +// API opcode covered by rccl replayer +typedef enum { + rrBroadcast, + rrReduce, + rrAllGather, + rrReduceScatter, + rrAllReduce, + rrSend, + rrRecv, + rrAllToAll, + rrAllToAllv, + rrGather, + rrScatter, + rrBcast, + rrGroupStart, + rrGroupEnd, + rrGroupSimulatedEnd, + rrGetUniqueId, + rrCommInitDev, + rrCommInitRank, + rrCommInitAll, + rrCommInitRankConfig, + rrCommSplit, + rrCommFinalize, + rrCommDestroy, + rrCommAbort, + rrCommRegister, + rrCommDeregister, + rrMemAlloc, + rrMemFree, + rrRedOpCreatePreMulSum, + rrRedOpDestroy, + rrOtherCall +} rcclCall_t; + +constexpr const char* rcclCallStr[] +{ + "Broadcast", + "Reduce", + "AllGather", + "ReduceScatter", + "AllReduce", + "Send", + "Recv", + 
"AllToAll", + "AllToAllv", + "Gather", + "Scatter", + "Bcast", + "GroupStart", + "GroupEnd", + "GroupSimulatedEnd", + "GetUniqueId", + "CommInitDev", + "CommInitRank", + "CommInitAll", + "CommInitRankConfig", + "CommSplit", + "CommFinalize", + "CommDestroy", + "CommAbort", + "CommRegister", + "CommDeregister", + "MemAlloc", + "MemFree", + "RedOpCreatePreMulSum", + "RedOpDestroy", + "OtherCall" +}; + +struct rcclApiCall { +// implicit data + int pid = -1; + int tid = -1; + int hipDev = -1; + int groupDepth = -1; + double timestamp = -1; + unsigned long long graphID = 0; + int graphCaptured = -1; + +// explicit data from header + rcclCall_t type; // in adjacent to op Name ^ + uint64_t opCount = 0; + const void* sendbuff = NULL; + void* recvbuff = NULL; + size_t count = 0; + ncclDataType_t datatype; + ncclRedOp_t op; + int root = -1; + int nRanks = -1; + ncclComm_t comm; + hipStream_t stream; + int nTasks = -1; + int globalRank = -1; + uint64_t commId = 0; + + rcclApiCall(){} + rcclApiCall(rcclCall_t type, const ncclInfo& info); + rcclApiCall(rcclCall_t type); +}; + +class Recorder { + private: + std::ofstream outputFile; //1 per process + int output_json = 0; // 0 is to binary, 1 to json + std::string filename; + int logLevel = -1; + + //std::string hostname; + int pid = -1; + int numCall = 0; //for debugging only + static __thread int rcclReplayThreadIdx; + static int depth; // for indentation purpose, will need thread safty later + + std::mutex writemtx; + std::vector calls; + + void captureGpuContext(rcclApiCall& call) const; + void write(const rcclApiCall &call); + static void recordLater(void* idx); + Recorder(); + Recorder(const Recorder&) = delete; + Recorder& operator=(const Recorder&) = delete; + ~Recorder(); + + public: + static Recorder& instance(); + void record(const char* name); // non-replayable calls + ncclResult_t record(rcclApiCall& call); + ncclResult_t record(rcclCall_t type, const ncclInfo& info); // collective + ncclResult_t record(rcclCall_t 
type, const void* sendbuff, void* recvbuff, size_t count, // sendrecv based + ncclDataType_t datatype, ncclComm_t comm, hipStream_t stream, int root = -1, + const size_t sendcounts[] = NULL, const size_t sdispls[] = NULL, const size_t recvcounts[] = NULL, + const size_t rdispls[] = NULL); // for alltoallv + ncclResult_t record(rcclCall_t type, ncclRedOp_t op, ncclComm_t comm, + ncclDataType_t datatype = ncclInt8, ncclScalarResidence_t residence = ncclScalarDevice, + void* scalar = NULL); // redop + ncclResult_t record(rcclCall_t type, int groupDepth); // group op + ncclResult_t record(rcclCall_t type, int size, int rank, ncclUniqueId* commId, + ncclComm_t comm = NULL, int device = 0); // init + void record(ncclComm_t* comms, int ndev, const int* devlist); // CommInitAll + ncclResult_t record(rcclCall_t type, ncclComm_t comm); // comm destroy + void record(rcclCall_t type, int size, int rank, ncclUniqueId* commId, ncclConfig_t* config, + ncclComm_t comm = NULL); // CommInitConfig OR split + ncclResult_t record(rcclCall_t type, void* ptr, size_t size = 0); // mem alloc + ncclResult_t record(rcclCall_t type, ncclComm_t comm, void* handle, + void* userBuffer = NULL, size_t size = 0); // UBR + void record(int groupDepth, ncclSimInfo_t* siminfo); // SimulatedGroupEnd +}; + +void parseJsonEntry(const char* entry, std::vector& calls); +void parseBinLog(); +} // namespace rccl diff --git a/src/init.cc b/src/init.cc index 839fbb517d..7a1aa312b6 100644 --- a/src/init.cc +++ b/src/init.cc @@ -67,6 +67,8 @@ #define NCCL_GROUP_CUDA_STREAM 1 // CGMD: CUDA 9.0,9.1 Need to use an internal CUDA stream #endif +using namespace rccl; + const char* ncclFuncStr[NCCL_NUM_FUNCTIONS+2] = { "AllGather", "AllReduce", "AllToAllPivot", "Broadcast", "Reduce", "ReduceScatter", "SendRecv"}; const char* ncclAlgoStr[NCCL_NUM_ALGORITHMS] = { "Tree", "Ring", "CollNetDirect", "CollNetChain", "NVLS", "NVLSTree", "PAT" }; const char* ncclProtoStr[NCCL_NUM_PROTOCOLS] = { "LL", "LL128", "Simple" }; @@ 
-188,6 +190,7 @@ static ncclResult_t ncclInit() { NCCL_API(ncclResult_t, ncclGetVersion, int* version); ncclResult_t ncclGetVersion_impl(int* version) { + Recorder::instance().record("GetVersion"); if (version == NULL) return ncclInvalidArgument; *version = NCCL_VERSION_CODE; return ncclSuccess; @@ -204,6 +207,7 @@ ncclResult_t ncclGetUniqueId_impl(ncclUniqueId* out) { memset(out, 0, sizeof(*out)); // copy to avoid alignment mismatch memcpy(out, &handle, sizeof(handle)); + Recorder::instance().record(rrGetUniqueId, -1, -1, out); TRACE_CALL("ncclGetUniqueId(0x%llx)", (unsigned long long)hashUniqueId(*out)); return ncclSuccess; } @@ -2328,6 +2332,9 @@ static ncclResult_t ncclCommInitRankDev(ncclComm_t* newcomm, int nranks, int nId NCCLCHECKGOTO(ncclAsyncLaunch((struct ncclAsyncJob*)job, ncclCommInitRankFunc, NULL, ncclCommInitJobFree, comm), res, fail); exit: + // for loggin only, not ready for replaying + // !recording at sink + NCCLCHECK(Recorder::instance().record(rrCommInitDev, nranks, myrank, commId, comm, cudaDev)); return ncclGroupErrCheck(res); fail: if (comm) { @@ -2354,6 +2361,7 @@ constexpr nvtxPayloadSchemaEntry_t CommInitRankSchema[] = { NCCL_API(ncclResult_t, ncclCommInitRank, ncclComm_t* newcomm, int nranks, ncclUniqueId commId, int myrank); ncclResult_t ncclCommInitRank_impl(ncclComm_t* newcomm, int nranks, ncclUniqueId commId, int myrank) { + NCCLCHECK(Recorder::instance().record(rrCommInitRank, nranks, myrank, &commId)); // Load the CUDA driver and dlsym hooks (can fail on old drivers) rocmLibraryInit(); @@ -2370,6 +2378,7 @@ ncclResult_t ncclCommInitRank_impl(ncclComm_t* newcomm, int nranks, ncclUniqueId NCCL_API(ncclResult_t, ncclCommInitAll, ncclComm_t* comms, int ndev, const int* devlist); ncclResult_t ncclCommInitAll_impl(ncclComm_t* comms, int ndev, const int* devlist) { + Recorder::instance().record(comms, ndev, devlist); ncclResult_t ret = ncclSuccess; int totalnDev; int *gpuFlags = NULL; @@ -2446,6 +2455,7 @@ ncclResult_t 
ncclCommSetAsyncError(ncclComm_t comm, ncclResult_t nextState) { NCCL_API(ncclResult_t, ncclCommInitRankConfig, ncclComm_t* comm, int nranks, ncclUniqueId commId, int myrank, ncclConfig_t *config); ncclResult_t ncclCommInitRankConfig_impl(ncclComm_t *newcomm, int nranks, ncclUniqueId commId, int myrank, ncclConfig_t *config) { + Recorder::instance().record(rrCommInitRankConfig, nranks, myrank, &commId, config); int cudaDev; ncclResult_t ret = ncclSuccess; ncclConfig_t internalConfig = NCCL_CONFIG_INITIALIZER; @@ -2595,6 +2605,7 @@ static ncclResult_t commCleanup(ncclComm_t comm) { NCCL_API(ncclResult_t, ncclCommFinalize, ncclComm_t comm); ncclResult_t ncclCommFinalize_impl(ncclComm_t comm) { + NCCLCHECK(Recorder::instance().record(rrCommFinalize, comm)); NVTX3_FUNC_RANGE_IN(nccl_domain); ncclResult_t ret = ncclSuccess; struct ncclCommFinalizeAsyncJob *job = NULL; @@ -2683,6 +2694,7 @@ static ncclResult_t commReclaim(struct ncclAsyncJob* job_) { NCCL_API(ncclResult_t, ncclCommDestroy, ncclComm_t comm); ncclResult_t ncclCommDestroy_impl(ncclComm_t comm) { + NCCLCHECK(Recorder::instance().record(rrCommDestroy, comm)); if (comm == NULL) { NVTX3_FUNC_RANGE_IN(nccl_domain); return ncclSuccess; @@ -2740,6 +2752,7 @@ fail: NCCL_API(ncclResult_t, ncclCommAbort, ncclComm_t comm); ncclResult_t ncclCommAbort_impl(ncclComm_t comm) { + NCCLCHECK(Recorder::instance().record(rrCommAbort, comm)); if (comm == NULL) { NVTX3_FUNC_RANGE_IN(nccl_domain); return ncclSuccess; @@ -2853,6 +2866,10 @@ ncclResult_t ncclCommSplit_impl(ncclComm_t comm, int color, int key, ncclComm_t NCCLCHECKGOTO(ncclAsyncLaunch((struct ncclAsyncJob*)job, ncclCommInitRankFunc, NULL, free, comm), res, fail); exit: + // for loggin only, not ready for replaying + // TODO: further integrate overloaded record header + // !recording at sink + Recorder::instance().record(rrCommSplit, color, key, (ncclUniqueId*)comm, config, *newcomm); cudaSetDevice(oldDev); (void)ncclGroupErrCheck(res); 
NCCLCHECK(ncclGroupEndInternal()); @@ -2872,6 +2889,7 @@ fail: NCCL_API(const char*, ncclGetErrorString, ncclResult_t code); const char* ncclGetErrorString_impl(ncclResult_t code) { + Recorder::instance().record("GetErrorString"); switch (code) { case ncclSuccess : return "no error"; case ncclUnhandledCudaError : return "unhandled cuda error (run with NCCL_DEBUG=INFO for details)"; @@ -2890,11 +2908,13 @@ const char* ncclGetErrorString_impl(ncclResult_t code) { */ NCCL_API(const char*, ncclGetLastError, const ncclComm_t comm); const char* ncclGetLastError_impl(ncclComm_t comm) { + Recorder::instance().record("GetLastEror"); return ncclLastError; } NCCL_API(ncclResult_t, ncclCommGetAsyncError, ncclComm_t comm, ncclResult_t *asyncError); ncclResult_t ncclCommGetAsyncError_impl(ncclComm_t comm, ncclResult_t *asyncError) { + Recorder::instance().record("GetAsyncError"); NCCLCHECK(CommCheck(comm, "ncclGetAsyncError", "comm")); NCCLCHECK(PtrCheck(asyncError, "ncclGetAsyncError", "asyncError")); @@ -2905,6 +2925,7 @@ ncclResult_t ncclCommGetAsyncError_impl(ncclComm_t comm, ncclResult_t *asyncErro NCCL_API(ncclResult_t, ncclCommCount, const ncclComm_t comm, int* count); ncclResult_t ncclCommCount_impl(const ncclComm_t comm, int* count) { + Recorder::instance().record("CommCount"); NVTX3_FUNC_RANGE_IN(nccl_domain); NCCLCHECK(CommCheck(comm, "CommCount", "comm")); @@ -2919,6 +2940,7 @@ ncclResult_t ncclCommCount_impl(const ncclComm_t comm, int* count) { NCCL_API(ncclResult_t, ncclCommCuDevice, const ncclComm_t comm, int* devid); ncclResult_t ncclCommCuDevice_impl(const ncclComm_t comm, int* devid) { + Recorder::instance().record("CuDevice"); NVTX3_FUNC_RANGE_IN(nccl_domain); NCCLCHECK(CommCheck(comm, "CommCuDevice", "comm")); @@ -2932,6 +2954,7 @@ ncclResult_t ncclCommCuDevice_impl(const ncclComm_t comm, int* devid) { NCCL_API(ncclResult_t, ncclCommUserRank, const ncclComm_t comm, int* rank); ncclResult_t ncclCommUserRank_impl(const ncclComm_t comm, int* rank) { + 
Recorder::instance().record("CommUserRank"); NVTX3_FUNC_RANGE_IN(nccl_domain); NCCLCHECK(CommCheck(comm, "CommUserRank", "comm")); @@ -3035,6 +3058,7 @@ fallback: CUDACHECKGOTO(cudaMalloc(ptr, size), ret, fail); exit: + NCCLCHECK(Recorder::instance().record(rrMemAlloc, *ptr, size)); return ret; fail: goto exit; @@ -3042,6 +3066,7 @@ fail: NCCL_API(ncclResult_t, ncclMemFree, void *ptr); ncclResult_t ncclMemFree_impl(void *ptr) { + NCCLCHECK(Recorder::instance().record(rrMemFree, ptr)); NVTX3_FUNC_RANGE_IN(nccl_domain); ncclResult_t ret = ncclSuccess; int saveDevice; diff --git a/src/misc/recorder.cc b/src/misc/recorder.cc new file mode 100644 index 0000000000..9cb457c3af --- /dev/null +++ b/src/misc/recorder.cc @@ -0,0 +1,669 @@ +#include "bootstrap.h" +#include "group.h" +#include "utils.h" +#include +#include +#include +#include + +using namespace std::chrono; + +namespace rccl +{ +__thread int Recorder::rcclReplayThreadIdx = -1; +int Recorder::depth = 0; + +static char buffer[4096]; +static rcclCall_t lastcall = rrGroupStart; // for trailing comma, need modify for multithread case for Bcast +void indent(int depth, std::ofstream& o) {for (int i = 0; i < depth; i++) o << " ";} +void newLine(std::ofstream& o) {if (lastcall != rrGroupStart) o << ","; o << std::endl;} +static uint64_t hashUniqueId(ncclUniqueId const &id) { + char const *bytes = (char const*)&id; + uint64_t h = 0xdeadbeef; + for(int i=0; i < (int)sizeof(ncclUniqueId); i++) { + h ^= h >> 32; + h *= 0x8db3db47fa2994ad; + h += bytes[i]; + } + return h; +} + +rcclApiCall::rcclApiCall(rcclCall_t type, const ncclInfo& info)://name(rcclCallStr[call.type]), // opName + type(type), + opCount(info.comm->opCount), + sendbuff(info.sendbuff), + recvbuff(info.recvbuff), + count(info.count), + datatype(info.datatype), + op(info.op), + root(info.root), + comm(info.comm), + nRanks(info.comm->nRanks), + stream(info.stream), + nTasks(info.comm->planner.nTasksP2p + info.comm->planner.nTasksColl), + 
globalRank(info.comm->localRankToRank[info.comm->localRank]){} + +rcclApiCall::rcclApiCall(rcclCall_t type) : type(type){} + +std::string siminfo_fmt = "[size : %zu, magic : %u, version : %u, estimated time : %f, timestamp : %f]"; +std::string config_fmt = ", ncclConfig : [size : %zu, magic : %u, version : %u, blocking : %d, cgaClusterSize : %d, minCTA : %d, maxCTA : %d, netname : %s, splitshare : %d]"; +std::string ctxt_fmt = "time : %lf, thread : %d, device : %d, captured : %d, graphID : %llu ]]"; // implicit context info +std::string ubr_fmt = "%s : [comm : %p, buff : %p, returned handle : %p, size : %zu, context : ["; +std::string getId_fmt = "%s : [uniqueID : %llu, context : ["; +std::string ubDereg_fmt = "%s : [comm : %p, handle : %p, context : ["; +std::string rank_fmt = "%s : [size : %d, uniqueID : %llu, rank : %d, context : ["; +std::string init_fmt = "%s : [comm : %p, size : %d, uniqueID : %llu, rank : %d, dev : %d, context : ["; +std::string all_fmt = "%s : [# of device : %d, context : ["; +std::string destroy_fmt = "%s : [comm : %p, context : ["; +std::string split_fmt = "%s : [comm : %p, color : %d, key : %d, newcomm : %p, context : ["; +std::string alloc_fmt = "%s : [returned ptr : %p, size : %zu, context : ["; +std::string free_fmt = "%s : [ptr : %p, context : ["; +std::string redop_fmt = "%s : [scalar : %p, datatype : %d, op : %d, residence : %d, comm : %p, context : ["; +std::string redopdestroy_fmt = "%s : [op : %d, comm : %p, context : ["; +std::string coll_fmt = "%s : [opCount : %lx, sendbuff : %p, recvbuff : %p, count : %zu, datatype : %d, op : %d, root : %d, comm : %p, nranks : %d, stream : %p, task : %d, globalrank : %d, context : ["; + +Recorder::Recorder() +{ + filename = getenv("RCCL_REPLAY_FILE") ? getenv("RCCL_REPLAY_FILE") : ""; + + if (!filename.size()) + { + return; + } + + logLevel = getenv("RCCL_LOG_LEVEL") ? 
std::stoi(getenv("RCCL_LOG_LEVEL")) : 1; + char hostname[1024]; + getHostName(hostname, 1024, '.'); + pid = getpid(); + output_json = 0; + + size_t dot; + std::string output_name, output_extension; + + if ((dot = filename.rfind('.')) != std::string::npos && filename.find('/', dot) == std::string::npos) // extension = last '.' of the final path component, so "./dir.d/log.json" splits correctly + { + output_name = std::string(filename).substr(0, dot); + output_extension = std::string(filename).substr(dot); + if (output_extension.compare(".json") == 0) + { + output_json = 1; + } + } else { + output_name = std::string(filename); + } + + outputFile.open(output_name + std::to_string(pid) + output_extension, + output_json ? std::ofstream::out : std::ofstream::binary); + if (output_json) + { + outputFile << "{" << std::endl; + indent(2, outputFile); + outputFile << "hostname : " << hostname << ", version : 0,"; + } +} + +Recorder& Recorder::instance() +{ + static Recorder _instance; + return _instance; +} + +void Recorder::captureGpuContext(rcclApiCall& call) const +{ + call.timestamp = duration_cast>(high_resolution_clock::now().time_since_epoch()).count() * 1000; + + if (rcclReplayThreadIdx == -1) + { + rcclReplayThreadIdx = syscall(SYS_gettid); + } + + int hipDev; + hipGetDevice(&hipDev); // need later change to copy from comm + + call.pid = pid; + call.tid = rcclReplayThreadIdx; + call.hipDev = hipDev; + return; +} + +// for single process use only, for now +// TODO: potentially need async logging for performance +void Recorder::write(const rcclApiCall &call) +{ + if (!filename.size()) + { + return ; + } + + std::unique_lock lock(writemtx); + + if (lastcall == rrBcast && call.type == rrBroadcast) + { + return; + } + + int len = -1; + + if (output_json) + { + if (call.type == rrGroupEnd || call.type == rrGroupSimulatedEnd) + { + depth--; + outputFile << std::endl; + indent(2 + 2 * depth, outputFile); + outputFile << "}"; + return ; + } + + newLine(outputFile); + indent(2 + 2 * depth, outputFile); + lastcall = call.type; + switch (call.type) { + case rrGroupStart: + { + outputFile << "{"; +
depth++; + return ; + } + case rrCommRegister: + { + len = snprintf(buffer, 4096, ubr_fmt.c_str(), + rcclCallStr[call.type], call.comm, call.sendbuff, call.recvbuff, call.count); + break; + } + case rrCommDeregister: + { + len = snprintf(buffer, 4096, ubDereg_fmt.c_str(), + rcclCallStr[call.type], call.comm, call.recvbuff); + break; + } + case rrGetUniqueId: + { + len = snprintf(buffer, 4096, getId_fmt.c_str(), rcclCallStr[call.type], (unsigned long long)call.commId); + break; + } + case rrCommInitDev: + { + len = snprintf(buffer, 4096, init_fmt.c_str(), rcclCallStr[call.type], call.comm, call.nRanks, (unsigned long long)call.commId, call.globalRank, call.root); + break; + } + case rrCommInitRank: + case rrCommInitRankConfig: + { + len = snprintf(buffer, 4096, rank_fmt.c_str(), rcclCallStr[call.type], call.nRanks, (unsigned long long)call.commId, call.globalRank); + break; // detail to be provided by init dev or config info + } + case rrCommInitAll: + { + len = snprintf(buffer, 4096, all_fmt.c_str(), rcclCallStr[call.type], call.root); + break; + } + case rrCommFinalize: + case rrCommDestroy: + case rrCommAbort: + { + len = snprintf(buffer, 4096, destroy_fmt.c_str(), rcclCallStr[call.type], call.comm); + break; + } + case rrCommSplit: + { + len = snprintf(buffer, 4096, split_fmt.c_str(), + rcclCallStr[call.type], (void*)(call.commId), call.nRanks, call.globalRank, call.comm); // argument order must mirror split_fmt: parent comm, color, key, newcomm + break; + } + case rrMemAlloc: + { + len = snprintf(buffer, 4096, alloc_fmt.c_str(), + rcclCallStr[call.type], call.recvbuff, call.count); + break; + } + case rrMemFree: + { + len = snprintf(buffer, 4096, free_fmt.c_str(), + rcclCallStr[call.type], call.recvbuff); + break; + } + case rrRedOpCreatePreMulSum: + { + len = snprintf(buffer, 4096, redop_fmt.c_str(), + rcclCallStr[call.type], call.sendbuff, call.datatype, call.op, call.root, call.comm); + break; + } + case rrRedOpDestroy: + { + len = snprintf(buffer, 4096, redopdestroy_fmt.c_str(), + rcclCallStr[call.type], call.op, call.comm); + break; + } + default: // collectives + len =
snprintf(buffer, 4096, coll_fmt.c_str(), + rcclCallStr[call.type], call.opCount, call.sendbuff, call.recvbuff, call.count, call.datatype, + call.op, call.root, call.comm, call.nRanks, call.stream, call.nTasks, call.globalRank); + + } + outputFile.write(buffer, len); + len = snprintf(buffer, 4096, ctxt_fmt.c_str(), call.timestamp, call.tid, call.hipDev, call.graphCaptured, call.graphID); + outputFile.write(buffer, len); + } else { + outputFile.write((char*)&call, sizeof(rcclApiCall)); + outputFile << std::endl; + } + outputFile.flush(); + return ; +} + +// wrapper function for graph launch callback +void Recorder::recordLater(void* idx) +{ + Recorder& recorder = Recorder::instance(); + size_t callidx = (size_t) idx; + rcclApiCall call = recorder.calls[callidx]; + recorder.record(call); +} + +void Recorder::record(const char* name) +{ + if (!filename.size() || logLevel <= 1) + { + return; + } + double ts = duration_cast>(high_resolution_clock::now().time_since_epoch()).count() * 1000; + if (output_json) // will not record for binary replay export + { + std::unique_lock lock(writemtx); + newLine(outputFile); lastcall = rrOtherCall; + indent(2 + 2 * depth, outputFile); + outputFile << name << " : [time : " << std::fixed << std::setprecision(6) << ts << "]"; + } + //numCall++; + outputFile.flush(); +} + +ncclResult_t Recorder::record(rcclApiCall& call) +{ + ncclResult_t ret = ncclSuccess; + captureGpuContext(call); + + switch (call.type) { + case rrGroupStart: + case rrGroupEnd: + case rrGroupSimulatedEnd: + case rrGetUniqueId: + case rrCommInitDev: + case rrCommInitRank: + case rrCommInitAll: + case rrCommInitRankConfig: + case rrCommSplit: + case rrCommFinalize: + case rrCommDestroy: + case rrCommAbort: // communicator ops just exit and write + case rrCommRegister: // same with UBR + case rrCommDeregister: + case rrMemAlloc: + case rrMemFree: + case rrRedOpCreatePreMulSum: + case rrRedOpDestroy: + break; + + // collectives, which may be registered with graph + case 
rrAllToAll: // should work with rest of the coll with nested sendrecv + default: // collective other than a2a/a2av + #if ROCM_VERSION >= 60100 + hipStreamCaptureStatus status; + hipGraph_t graphCaptured; + unsigned long long graphID = 0; + CUDACHECK(hipStreamGetCaptureInfo_v2(call.stream, &status, &(call.graphID), &graphCaptured)); // shouldnt we need dependency? + + if (status == hipStreamCaptureStatusActive) // when graph launched this should be disabled + { + call.graphCaptured = 1; + calls.push_back(call); + hipGraphNode_t logNode; + hipHostNodeParams p; + p.fn = &(Recorder::recordLater); + p.userData = (void*) (calls.size() - 1); + CUDACHECK(hipGraphAddHostNode(&logNode, graphCaptured, nullptr, 0, &p)); + } else { + call.graphCaptured = 0; + } + #endif + } + + write(call); // write immediatelly + return ret; +} + +ncclResult_t Recorder::record(rcclCall_t type, const ncclInfo& info) +{ + if (!filename.size()) + { + return ncclSuccess; + } + + rcclApiCall call(type, info); + return record(call); +} + +ncclResult_t Recorder::record(rcclCall_t type, const void* sendbuff, void* recvbuff, + size_t count, ncclDataType_t datatype, ncclComm_t comm, hipStream_t stream, int root, + const size_t sendcounts[], const size_t sdispls[], const size_t recvcounts[], const size_t rdispls[]) +{ + if (!filename.size()) + { + return ncclSuccess; + } + + rcclApiCall call(type, {.sendbuff = sendbuff, .recvbuff = recvbuff, .count = count, + .datatype = datatype, .comm = comm, .stream = stream}); + if (root != -1) + { + call.root = root; + } + + ncclResult_t ret = record(call); + if (type == rrAllToAllv) + { + if (output_json) + { + int size = call.nRanks - 1; + outputFile << ", sendcounts : ["; + for (int i = 0; i < size; i++) outputFile << sendcounts[i] << ", "; + outputFile << sendcounts[size] << "], sdispls : ["; + for (int i = 0; i < size; i++) outputFile << sdispls[i] << ", "; + outputFile << sdispls[size] << "], recvcounts : ["; + for (int i = 0; i < size; i++) outputFile << 
recvcounts[i] << ", "; + outputFile << recvcounts[size] << "], rdispls : ["; + for (int i = 0; i < size; i++) outputFile << rdispls[i] << ", "; + outputFile << rdispls[size] << "]"; + outputFile.flush(); + } + // else export to binary + } + return ret; +} + +ncclResult_t Recorder::record(rcclCall_t type, ncclRedOp_t op, ncclComm_t comm, ncclDataType_t datatype, ncclScalarResidence_t residence, void* scalar) +{ + if (!filename.size()) + { + return ncclSuccess; + } + + rcclApiCall call(type, {.op = op, .comm = comm}); + if (type == rrRedOpCreatePreMulSum) + { + call.sendbuff = scalar; + call.datatype = datatype; + call.root = residence; + call.sendbuff = scalar; + } + return record(call); + //TODO: printout scalar +} + +ncclResult_t Recorder::record(rcclCall_t type, int groupDepth) +{ + if (!filename.size()) + { + return ncclSuccess; + } + rcclApiCall gc(type); + gc.groupDepth = groupDepth; + return record(gc); +} + +ncclResult_t Recorder::record(rcclCall_t type, int size, int rank, ncclUniqueId* commId, ncclComm_t comm, int device) +{ + if (!filename.size()) + { + return ncclSuccess; + } + + rcclApiCall initCall(type); + + if (type == rrCommSplit) + { + initCall.comm = comm; + initCall.commId = (uint64_t)commId; + } else { + initCall.commId = hashUniqueId(*commId); + } + + initCall.nRanks = size; + initCall.globalRank = rank; + if (type == rrCommInitDev) + { + initCall.root = device; + initCall.comm = comm; + } + + return record(initCall); +} + +// comm destroy +ncclResult_t Recorder::record(rcclCall_t type, ncclComm_t comm) +{ + if (!filename.size()) + { + return ncclSuccess; + } + + rcclApiCall call(type); + call.comm = comm; + return record(call); +} + +ncclResult_t Recorder::record(rcclCall_t type, ncclComm_t comm, void* handle, void* userBuffer, size_t size) +{ + if (!filename.size()) + { + return ncclSuccess; + } + rcclApiCall call(type); + call.comm = comm; + call.recvbuff = handle; + if (type == rrCommRegister) + { + call.sendbuff = userBuffer; + call.count 
= size; + } + return record(call); +} + +ncclResult_t Recorder::record(rcclCall_t type, void* ptr, size_t size) +{ + if (!filename.size()) + { + return ncclSuccess; + } + rcclApiCall call(type); + call.recvbuff = ptr; + if (type == rrMemAlloc) + { + call.count = size; + } + return record(call); +} + +void Recorder::record(int groupDepth, ncclSimInfo_t *siminfo) +{ + if (!filename.size()) + { + return; + } + rcclApiCall call(rrGroupSimulatedEnd); + record(call); + + if (output_json && siminfo) + { + int len = snprintf(buffer, 4096, siminfo_fmt.c_str(), + siminfo->size, siminfo->magic, siminfo->version, siminfo->estimatedTime, call.timestamp); + outputFile.write(buffer, len); + } // no tid for groupCall + outputFile.flush(); +} + +void Recorder::record(rcclCall_t type, int size, int rank, ncclUniqueId* commId, ncclConfig_t* config, ncclComm_t comm) +{ + if (!filename.size()) + { + return ; + } + + if (type == rrCommInitRankConfig) + { + record(type, size, rank, commId); + } + else //rrCommSplit + { + record(type, size /*color*/, rank/*key*/, commId, comm); + } + + if (output_json && config) + { + int len = snprintf(buffer, 4096, config_fmt.c_str(), config->size, config->magic, config->version, config->blocking, + config->cgaClusterSize, config->minCTAs, config->maxCTAs, config->netName, config->splitShare); + outputFile.write(buffer, len); + outputFile.flush(); + } +} + +void Recorder::record(ncclComm_t* comms, int ndev, const int* devlist) +{ + if (!filename.size()) + { + return ; + } + + rcclApiCall call(rrCommInitAll); + call.root = ndev; + // call.sendbuff = comms; TODO: might log this too + record(call); + + if (output_json && devlist) + { + outputFile << ", devlist : ["; + for (int i = 0; i < call.root - 1; i++) + outputFile << devlist[i] << ", "; + outputFile << devlist[call.root - 1] << "]"; + outputFile.flush(); + } +} + +Recorder::~Recorder() +{ + if (outputFile.is_open()) + { + if (output_json) outputFile << std::endl << "}" << std::endl; + 
outputFile.close(); + calls.clear(); + } +} + +static rcclCall_t getFuncType(std::string func) +{ + for (int i = 0; i < sizeof(rcclCallStr) / sizeof(char*); i++) + { + if (func == std::string(rcclCallStr[i])) + { + return (rcclCall_t)i; + } + } + printf("[ERROR] Unrecognized func %s\n", func.c_str()); + exit(1); +} + +void parseJsonEntry(const char* entry, std::vector& calls) +{ + rcclApiCall call; + std::string str(entry); + size_t begin = str.find_first_not_of(' '); + size_t end = str.find(" : "); + rcclCall_t type = getFuncType(str.substr(begin, end-begin)); + call.type = type; + switch(type) { + case rrCommRegister: + { + assert(sscanf(str.c_str() + end + 3, (ubr_fmt.substr(5) + ctxt_fmt).c_str(), + &call.comm, &call.sendbuff, &call.recvbuff, &call.count) == 4); + break; + } + case rrCommDeregister: + { + assert(sscanf(str.c_str() + end + 3, (ubDereg_fmt.substr(5) + ctxt_fmt).c_str(), + &call.comm, &call.recvbuff) == 2); + break; + } + case rrGetUniqueId: + { + assert(sscanf(str.c_str() + end + 3, (getId_fmt.substr(5) + ctxt_fmt).c_str(), &call.commId) == 1); + break; + } + case rrCommInitDev: + { + assert(sscanf(str.c_str() + end + 3, (init_fmt.substr(5) + ctxt_fmt).c_str(), + &call.comm, &call.nRanks, &call.commId, &call.globalRank, &call.root) == 5); + break; + } + case rrCommInitRank: + case rrCommInitRankConfig: + { + assert(sscanf(str.c_str() + end + 3, (rank_fmt.substr(5) + ctxt_fmt).c_str(), + &call.nRanks, &call.commId, &call.globalRank) == 3); + break; + } + case rrCommInitAll: + { + assert(sscanf(str.c_str() + end + 3, (all_fmt.substr(5) + ctxt_fmt).c_str(), &call.root) == 1); + break; + } + case rrCommFinalize: + case rrCommDestroy: + case rrCommAbort: + { + assert(sscanf(str.c_str() + end + 3, (destroy_fmt.substr(5) + ctxt_fmt).c_str(), &call.comm) == 1); + break; + } + case rrCommSplit: + { + assert(sscanf(str.c_str() + end + 3, (split_fmt.substr(5) + ctxt_fmt).c_str(), + &call.commId, &call.comm, &call.nRanks, &call.globalRank, &call.comm) == 5); 
+ break; + } + case rrMemAlloc: + { + assert(sscanf(str.c_str() + end + 3, (alloc_fmt.substr(5) + ctxt_fmt).c_str(), + &call.recvbuff, &call.count) == 2); + break; + } + case rrMemFree: + { + assert(sscanf(str.c_str() + end + 3, (free_fmt.substr(5) + ctxt_fmt).c_str(), &call.recvbuff) == 1); + break; + } + case rrRedOpCreatePreMulSum: + { + assert(sscanf(str.c_str() + end + 3, (redop_fmt.substr(5) + ctxt_fmt).c_str(), + &call.sendbuff, &call.datatype, &call.op, &call.root, &call.comm) == 5); + break; + } + case rrRedOpDestroy: + { + assert(sscanf(str.c_str() + end + 3, (redopdestroy_fmt.substr(5) + ctxt_fmt).c_str(), + &call.op, &call.comm) == 2); + break; + } + default: + assert(sscanf(str.c_str() + end + 3, (coll_fmt.substr(5) + ctxt_fmt).c_str(), + &call.opCount, &call.sendbuff, &call.recvbuff, &call.count, &call.datatype, &call.op, &call.root, + &call.comm, &call.nRanks, &call.stream, &call.nTasks, &call.globalRank, &call.timestamp, &call.tid, + &call.hipDev, &call.graphCaptured, &call.graphID) == 17); + } + calls.push_back(call); +} + +void parseBinLog() +{ + // TODO: need to handle trailing data such as simInfo, devList, a2av data, etc. 
+} +}; diff --git a/src/msccl.cc b/src/msccl.cc index 2fca78f23c..312b8b2ce4 100644 --- a/src/msccl.cc +++ b/src/msccl.cc @@ -11,8 +11,11 @@ #include #include +using namespace rccl; + NCCL_API(ncclResult_t, mscclLoadAlgo, const char *mscclAlgoFilePath, mscclAlgoHandle_t *mscclAlgoHandle, int rank); ncclResult_t mscclLoadAlgo_impl(const char *mscclAlgoFilePath, mscclAlgoHandle_t *mscclAlgoHandle, int rank) { + Recorder::instance().record("mscclLoadAlgo"); mscclStatus& status = mscclGetStatus(rank); if (status.freeAlgoHandles.size() == 0) { @@ -45,6 +48,7 @@ ncclResult_t mscclRunAlgo_impl( void* recvBuff, const size_t recvCounts[], const size_t rDisPls[], size_t count, ncclDataType_t dataType, int root, int peer, ncclRedOp_t op, mscclAlgoHandle_t mscclAlgoHandle, ncclComm_t comm, hipStream_t stream) { + Recorder::instance().record("mscclRunAlgo"); struct NvtxParamsMsccl { size_t bytes; ncclRedOp_t op; @@ -91,5 +95,6 @@ ncclResult_t mscclRunAlgo_impl( NCCL_API(ncclResult_t, mscclUnloadAlgo, mscclAlgoHandle_t mscclAlgoHandle); ncclResult_t mscclUnloadAlgo_impl(mscclAlgoHandle_t mscclAlgoHandle) { // deprecated + Recorder::instance().record("mscclUnloadAlgo"); return ncclSuccess; } diff --git a/src/register.cc b/src/register.cc index 348150c786..5661c47511 100644 --- a/src/register.cc +++ b/src/register.cc @@ -15,6 +15,8 @@ #include "mscclpp/mscclpp_nccl.h" #endif +using namespace rccl; + ncclResult_t ncclNetDeregister(struct ncclComm* comm, struct ncclReg* reg) { struct ncclRegCache* cache = &comm->regCache; ncclDebugNoWarn = NCCL_NET; @@ -162,6 +164,8 @@ ncclResult_t ncclRegCleanup(struct ncclComm* comm) { NCCL_API(ncclResult_t, ncclCommRegister, const ncclComm_t comm, void* buff, size_t size, void** handle); ncclResult_t ncclCommRegister_impl(const ncclComm_t comm, void* buff, size_t size, void** handle) { + ncclResult_t ret = ncclSuccess; + NCCLCHECK(CommCheck(comm, "ncclCommRegister", "comm")); if (comm->checkPointers) NCCLCHECK(CudaPtrCheck(buff, comm, "buff", 
"ncclCommRegister")); #ifdef ENABLE_MSCCLPP @@ -171,8 +175,7 @@ ncclResult_t ncclCommRegister_impl(const ncclComm_t comm, void* buff, size_t siz CUDACHECK(hipPointerGetAttribute(&isManagedBuffer, HIP_POINTER_ATTRIBUTE_IS_MANAGED, const_cast(buff))); if(!isManagedBuffer){ INFO(NCCL_INIT, "MSCCL++: ncclCommRegister"); - NCCLCHECK(mscclpp_ncclCommRegister(comm->mscclpp_comm, buff, size, handle)); - return ncclSuccess; + NCCLCHECKGOTO(mscclpp_ncclCommRegister(comm->mscclpp_comm, buff, size, handle), ret, end); } else{ WARN("MSCCL++: Cannot register user-buffers on managed memory. RCCL user-buffer registration will occur."); @@ -181,12 +184,17 @@ ncclResult_t ncclCommRegister_impl(const ncclComm_t comm, void* buff, size_t siz } #endif INFO(NCCL_INIT, "RCCL: ncclCommRegister"); - NCCLCHECK(ncclRegister(comm, buff, size, handle)); - return ncclSuccess; + NCCLCHECKGOTO(ncclRegister(comm, buff, size, handle), ret, end); + +end: + // !recording at sink + NCCLCHECK(Recorder::instance().record(rrCommRegister, comm, *handle, buff, size)); + return ret; } NCCL_API(ncclResult_t, ncclCommDeregister, const ncclComm_t comm, void* handle); ncclResult_t ncclCommDeregister_impl(const ncclComm_t comm, void* handle) { + NCCLCHECK(Recorder::instance().record(rrCommDeregister, comm, handle)); #ifdef ENABLE_MSCCLPP if (comm->mscclppCompatible) { diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 7ffa52a852..0d3bdaffb4 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -46,6 +46,7 @@ if(BUILD_TESTS) ScatterTests.cpp SendRecvTests.cpp StandaloneTests.cpp + _RecorderTests.cpp common/main.cpp common/CallCollectiveForked.cpp common/CollectiveArgs.cpp @@ -55,6 +56,7 @@ if(BUILD_TESTS) common/TestBed.cpp common/TestBedChild.cpp common/StandaloneUtils.cpp + ../src/misc/recorder.cc ) add_executable(rccl-UnitTests ${TEST_SOURCE_FILES}) diff --git a/test/_RecorderTests.cpp b/test/_RecorderTests.cpp new file mode 100644 index 0000000000..ff354870d5 --- /dev/null +++ 
b/test/_RecorderTests.cpp @@ -0,0 +1,72 @@ +/************************************************************************* + * Copyright (c) 2025 Advanced Micro Devices, Inc. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ + +#include +#include + +#include "RcclMockFuncs.hpp" +//#include "TestBed.hpp" + +namespace RcclUnitTesting +{ + /** + * \brief Verify correctness of Recorder record() correctness in binary mode + * ******************************************************************************************/ + TEST(Recorder, ParseBinary) + { + // to add after binary export of logging is supported + } + + /** + * \brief Verify correctness of Recorder record() correctness in json mode + * ******************************************************************************************/ + TEST(Recorder, ParseJson) + { + setenv("RCCL_REPLAY_FILE", "test.json", 1); + + int pid = getpid(); + hipStream_t stream; + hipStreamCreate(&stream); + + int array[] = {2, 3, 5}; + ncclComm comm{.nRanks = 1, .localRank = 1, .localRankToRank = array, .opCount = 8, .planner = {.nTasksColl = 13, .nTasksP2p = 21}}; + rccl::rcclApiCall call(rccl::rrAllToAllv, {.sendbuff = (void*)0x7f22f9600000, .recvbuff = (void*)0x7f22f9601000, .count = 0, .datatype = ncclFloat32, .comm = &comm, .stream = stream}); + rccl::Recorder::instance().record(call); + + std::vector calls; + char entry[4096]; + //parse the outfile + std::string filename = "test" + std::to_string(pid) + ".json"; + std::ifstream fp("test" + std::to_string(pid) + ".json"); + fp.getline(entry, 4096); + fp.getline(entry, 4096); + fp.getline(entry, 4096); + parseJsonEntry(entry, calls); + int result = memcmp((char*)&calls[0]+4, (char*)&call+4, sizeof(rccl::rcclApiCall)-4); + fp.close(); // care that recorder is not designed to anticipate fp closing before destructor + remove(filename.c_str()); + unsetenv("RCCL_REPLAY_FILE"); + assert(!result); + } + + 
/** + * \brief Verify RCCL Recorder's integrity in multithread context by comparing Recorder + * instance across different threads. + * ******************************************************************************************/ + static void recorderCmp(void** recorder) + { + *recorder = &(rccl::Recorder::instance()); + } + TEST(Recorder, VerifyMultithread) + { + void *p1, *p2; + std::thread t1(recorderCmp, &p1); + std::thread t2(recorderCmp, &p2); + t1.join(); + t2.join(); + assert(p1 == p2); + } +} diff --git a/test/common/RcclMockFuncs.hpp b/test/common/RcclMockFuncs.hpp new file mode 100644 index 0000000000..302f5a44ab --- /dev/null +++ b/test/common/RcclMockFuncs.hpp @@ -0,0 +1,7 @@ +#include "info.h" +#include "comm.h" + +void ncclDebugLog(ncclDebugLogLevel, unsigned long, char const*, int, char const*, ...) {}; +ncclResult_t getHostName(char* hostname, int maxlen, const char delim) { + return ncclSuccess; +} diff --git a/tools/RcclReplayer/Makefile b/tools/RcclReplayer/Makefile new file mode 100644 index 0000000000..d2f12d3783 --- /dev/null +++ b/tools/RcclReplayer/Makefile @@ -0,0 +1,12 @@ +ROCM_DIR ?= /opt/rocm +RCCL_DIR ?= ../../build/release +MPI_DIR ?= /opt/ompi + +INCLUDES = -I$(MPI_DIR)/include -I$(RCCL_DIR)/include -I$(RCCL_DIR)/hipify/src/include +LDFLAGS = -L$(MPI_DIR)/lib -L$(RCCL_DIR) -lmpi -lrccl + +main: rcclReplayer.cpp + $(ROCM_DIR)/bin/hipcc rcclReplayer.cpp -O1 -g -o rcclReplayer $(INCLUDES) $(LDFLAGS) + +clean: + rm -f ./rcclReplayer diff --git a/tools/rccl_replayer/README.md b/tools/RcclReplayer/README.md similarity index 100% rename from tools/rccl_replayer/README.md rename to tools/RcclReplayer/README.md diff --git a/tools/rccl_replayer/rcclReplayer.cpp b/tools/RcclReplayer/rcclReplayer.cpp similarity index 96% rename from tools/rccl_replayer/rcclReplayer.cpp rename to tools/RcclReplayer/rcclReplayer.cpp index 37d250a997..140f733780 100644 --- a/tools/rccl_replayer/rcclReplayer.cpp +++ b/tools/RcclReplayer/rcclReplayer.cpp @@ -180,31 
+180,36 @@ void dataToCsv(GroupCall const& gc, std::ofstream &datafile, double runTime) void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls& cc) { - bool verbose = isFirstRank && (getenv("VERBOSE") != NULL); cc.globalRankComms.clear(); cc.globalRankComms.resize(cc.numGlobalRanks); cc.groupCalls.clear(); - FILE* fp = fopen(logFilename, "r"); + std::ifstream fp(logFilename, std::ios::binary); if (!fp) { printf("[ERROR] Unable to open file %s\n", logFilename); exit(-1); } - char line[2048]; - LineItem li; + constexpr size_t size = sizeof(LineItem) + 1; + char line[size]; //size of collectivecall struct int lineNum = 0; - while (fgets(line, 2048, fp)) { - ++lineNum; + while (fp.getline(line, size)) + { + LineItem li = *((LineItem*)line); - //Ignore invalid lines and collectives - if (!ParseLineItem(line, li) || li.nRanks != cc.numGlobalRanks) continue; + if (li.coll == 10 || li.coll == 11) + { + continue; + } + // if need hostname will parse together with num of line in output + // Ignore invalid lines and collectives + if (li.nRanks != cc.numGlobalRanks) continue; // Figure out commIdx for this globalrank int commIdx = -1; for (auto i = 0; i < cc.globalRankComms[li.globalRank].size(); i++) { - if (!strcmp(cc.globalRankComms[li.globalRank][i].c_str(), li.comm)) { + if (cc.globalRankComms[li.globalRank][i] != li.comm) { commIdx = i; break; } @@ -215,8 +220,8 @@ void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls } TaskInfo taskInfo; - taskInfo.funcType = GetFuncType(li.opName); - taskInfo.inPlace = !strcmp(li.sendbuff, li.recvbuff); + taskInfo.funcType = (ncclFunc_t) li.coll; + taskInfo.inPlace = li.sendbuff == li.recvbuff; taskInfo.count = li.count; taskInfo.datatype = (ncclDataType_t) li.datatype; taskInfo.op = (ncclRedOp_t) li.op; @@ -229,7 +234,7 @@ void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls if (gc.opCount != li.opCount) continue; if (gc.rankData.count(li.globalRank)) { 
RankData& rd = gc.rankData[li.globalRank]; - if (rd.commIdx != commIdx || rd.tasks.size() != li.task) + if (rd.commIdx != commIdx || rd.tasks.size() != li.nTasks) continue; rd.tasks.push_back(taskInfo); @@ -237,7 +242,7 @@ void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls break; } // Rank has no tasks - make sure this is task 0 - else if (li.task == 0) { + else if (li.nTasks == 0) { gc.rankData[li.globalRank].lineNum = lineNum; gc.rankData[li.globalRank].commIdx = commIdx; gc.rankData[li.globalRank].tasks.push_back(taskInfo); @@ -248,7 +253,7 @@ void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls // If no collectives were found, create new one if (!found) { - if (li.task != 0) { + if (li.nTasks != 0) { if (isFirstRank) printf("[WARN] Was unable to find corresponding collective for line %d\n", lineNum); } @@ -260,7 +265,7 @@ void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls cc.groupCalls.push_back(gc); } } - fclose(fp); + fp.close(); // Validate group calls // - For non Send/Recv, check that all ranks participate with same parameters count @@ -375,18 +380,6 @@ void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls } } -bool ParseLineItem(char const* line, LineItem& li) -{ - return sscanf(line, - "%[^:]:%d:%d [%d] NCCL INFO %[^:]: opCount %x sendbuff %s " - "recvbuff %s count %lu datatype %d op %d root %d comm %s " - "[nranks=%d] stream %p task %d globalrank %d", - li.hostname, &li.pid, &li.tid, &li.cudaDev, li.opName, - &li.opCount, li.sendbuff, li.recvbuff, - &li.count, &li.datatype, &li.op, &li.root, li.comm, - &li.nRanks, &li.stream, &li.task, &li.globalRank) == 17; -} - double ReplayRccl(CollectiveCalls& cc, int groupIdx, int& numInvalid) { int numLocalRanks = cc.localRankComms.size(); @@ -566,6 +559,12 @@ void ExecuteCollective(TaskInfo& task, ncclComm_t const& comm, hipStream_t strea case ncclCollRecv: NCCL_CALL(ncclRecv(task.outputGpu.ptr, 
task.count, task.datatype, task.root, comm, stream)); break; + case ncclStartGroup: + NCCL_CALL(ncclGroupStart()); + break; + case ncclEndGroup: + NCCL_CALL(ncclGroupEnd()); + break; default: printf("Error: unsupported collective\n"); exit(1); @@ -755,4 +754,4 @@ void PrepData_Send(TaskInfo& taskInfo, int globalRank) { void PrepData_Recv(TaskInfo& taskInfo, int globalRank) { FillPattern(taskInfo.expected, taskInfo.datatype, taskInfo.count, globalRank); -} \ No newline at end of file +} diff --git a/tools/rccl_replayer/rcclReplayer.hpp b/tools/RcclReplayer/rcclReplayer.hpp similarity index 96% rename from tools/rccl_replayer/rcclReplayer.hpp rename to tools/RcclReplayer/rcclReplayer.hpp index 486826d94e..8979389793 100644 --- a/tools/rccl_replayer/rcclReplayer.hpp +++ b/tools/RcclReplayer/rcclReplayer.hpp @@ -1,5 +1,6 @@ #pragma once #include +#include #include #include @@ -36,23 +37,25 @@ struct LineItem { - char hostname[MPI_MAX_PROCESSOR_NAME]; - int pid; - int tid; - int cudaDev; - char opName[32]; - int opCount; - char sendbuff[32]; - char recvbuff[32]; - size_t count; - int datatype; - int op; - int root; - char comm[32]; - int nRanks; - void* stream; - int task; - int globalRank; + int pid; + int tid; + int cudaDev; + //int graph; + int groupDepth; + + int coll; + uint64_t opCount; + void* sendbuff; + void* recvbuff; + size_t count; + int datatype; + int op; + int root; + void* comm; + int nRanks; + void* stream; + int nTasks; + int globalRank; }; // Enumeration of all collective functions currently supported @@ -69,7 +72,9 @@ typedef enum ncclCollAllToAllv, ncclCollSend, ncclCollRecv, - ncclNumFuncs + ncclNumFuncs, + ncclStartGroup = 10; + ncclEndGroup = 11; } ncclFunc_t; char const ncclFuncNames[ncclNumFuncs][32] = @@ -144,8 +149,8 @@ struct RankData struct GroupCall { - bool isValid; - int opCount; + bool isValid; + uint64_t opCount; std::map rankData; }; @@ -153,7 +158,7 @@ struct CollectiveCalls { int numGlobalRanks; int numGpusPerMpiRank; - std::vector> 
globalRankComms; // Set of comms used by each global rank + std::vector> globalRankComms; // Set of comms used by each global rank std::vector groupCalls; // List of group calls for each global rank int localGpuOffset; // First local GPU device idx for this MPI process @@ -384,9 +389,6 @@ bool IsRootUsed(ncclFunc_t funcType) { funcType == ncclCollGather || funcType == ncclCollScatter); } -// parse the logs and assign them into lineItem -bool ParseLineItem(char const* line, LineItem& li); - // this covers grouping the logs based on opCount and task number, // validatation of the groupCalls for both non-send/recv collectives and send/recv void ParseCollectives(char const* logFilename, bool isFirstRank, CollectiveCalls& collectiveCalls); @@ -426,4 +428,4 @@ void PrepData_Gather(TaskInfo& taskInfo, int globalRank, int totalRanks, bool is void PrepData_Scatter(TaskInfo& taskInfo, int globalRank, int totalRanks); void PrepData_AlltoAll(TaskInfo& taskInfo, int globalRank, int totalRanks); void PrepData_Send(TaskInfo& taskInfo, int globalRank); -void PrepData_Recv(TaskInfo& taskInfo, int globalRank); \ No newline at end of file +void PrepData_Recv(TaskInfo& taskInfo, int globalRank); diff --git a/tools/rccl_replayer/Makefile b/tools/rccl_replayer/Makefile deleted file mode 100644 index 5dac92b692..0000000000 --- a/tools/rccl_replayer/Makefile +++ /dev/null @@ -1,14 +0,0 @@ -ROCM_DIR ?= /opt/rocm -RCCL_DIR ?= ../../build/release -MPI_DIR ?= /opt/ompi -MPI_INC_DIR ?= /usr/include/x86_64-linux-gnu/mpi -MPI_LIB_DIR ?= /usr/lib/x86_64-linux-gnu - -INCLUDES = -I$(MPI_INC_DIR) -I$(MPI_DIR)/include -I$(RCCL_DIR)/include -I$(RCCL_DIR)/hipify/src/include -LDFLAGS = -L$(MPI_LIB_DIR) -L$(MPI_DIR)/lib -L$(RCCL_DIR) -lmpi -lrccl - -main: rcclReplayer.cpp - $(ROCM_DIR)/bin/hipcc rcclReplayer.cpp -O1 -g -o rcclReplayer $(INCLUDES) $(LDFLAGS) - -clean: - rm -f ./rcclReplayer