From 020dcf0a7ca34f25364dfbbf21a9342b223e7464 Mon Sep 17 00:00:00 2001 From: Dingming Wu Date: Wed, 25 Jun 2025 21:01:34 -0700 Subject: [PATCH] Add proxyTrace (#1732) This feature tracks the proxy events and status of each send/recv op. ProxyTrace keeps a fixed number of active ops in host mem and dumps the status of each op when the program crashes or hangs. --- CMakeLists.txt | 4 + cmake/Dependencies.cmake | 20 ++ src/enqueue.cc | 12 +- src/include/proxy.h | 14 ++ src/include/proxy_trace/proxy_trace.h | 181 ++++++++++++++++ src/init.cc | 11 + src/misc/proxy_trace/proxy_trace.cc | 264 +++++++++++++++++++++++ src/proxy.cc | 13 ++ src/transport/net.cc | 38 +++- test/CMakeLists.txt | 4 + test/proxy_trace/ProxyTraceUnitTests.cpp | 134 ++++++++++++ 11 files changed, 693 insertions(+), 2 deletions(-) create mode 100644 src/include/proxy_trace/proxy_trace.h create mode 100644 src/misc/proxy_trace/proxy_trace.cc create mode 100644 test/proxy_trace/ProxyTraceUnitTests.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 6ccaf57f8a..4b8aea2245 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -562,6 +562,7 @@ set(SRC_FILES src/include/nvtx3/nvtxDetail/nvtxInitDefs.h src/include/nvtx3/nvtxDetail/nvtxLinkOnce.h src/include/nvtx3/nvtxDetail/nvtxTypes.h + src/include/proxy_trace/proxy_trace.h src/include/plugin/nccl_net.h src/include/plugin/nccl_profiler.h src/include/plugin/nccl_tuner.h @@ -608,6 +609,7 @@ set(SRC_FILES src/misc/msccl/msccl_parser.cc src/misc/msccl/msccl_setup.cc src/misc/msccl/msccl_status.cc + src/misc/proxy_trace/proxy_trace.cc src/plugin/net.cc src/plugin/plugin_open.cc src/plugin/profiler.cc @@ -754,6 +756,7 @@ add_custom_target(git_version_check VERBATIM ) + # Set up RCCL library #================================================================================================== ## Set RCCL source files @@ -1078,6 +1081,7 @@ target_link_libraries(rccl INTERFACE hip::host) target_link_libraries(rccl PRIVATE hip::device) target_link_libraries(rccl 
PRIVATE dl) target_link_libraries(rccl PRIVATE ${ROCM_SMI_LIBRARIES}) +target_link_libraries(rccl PRIVATE fmt::fmt) if(ENABLE_MSCCLPP) target_link_libraries(rccl PRIVATE mscclpp_nccl) endif() diff --git a/cmake/Dependencies.cmake b/cmake/Dependencies.cmake index d7095b38c0..725c0c2e37 100644 --- a/cmake/Dependencies.cmake +++ b/cmake/Dependencies.cmake @@ -32,6 +32,8 @@ # For downloading, building, and installing required dependencies include(cmake/DownloadProject.cmake) +include(FetchContent) + if(NOT INSTALL_DEPENDENCIES) find_package(GTest 1.11) endif() @@ -105,6 +107,24 @@ if(NOT ROCM_FOUND) find_package( ROCM 0.7.3 REQUIRED CONFIG PATHS ${PROJECT_EXTERN_DIR}/rocm-cmake ) endif() +set(CMAKE_INSTALL_LIBDIR lib CACHE STRING "Define install directory for libraries" FORCE) + +# Find or download/install fmt +find_package(fmt QUIET) +if(NOT fmt_FOUND) + message(STATUS "fmt not found, fetching from source...") + FetchContent_Declare( + fmt + GIT_REPOSITORY https://github.com/fmtlib/fmt + GIT_TAG e69e5f977d458f2650bb346dadf2ad30c5320281 # 10.2.1 + ) + FetchContent_MakeAvailable(fmt) +else() + message(STATUS "Using system fmt") + get_target_property(FMT_INCLUDE_DIRS fmt::fmt INTERFACE_INCLUDE_DIRECTORIES) + message(STATUS "fmt include directories: ${FMT_INCLUDE_DIRS}") +endif() + # Find available local ROCM targets # NOTE: This will eventually be part of ROCm-CMake and should be removed at that time function(rocm_local_targets VARIABLE) diff --git a/src/enqueue.cc b/src/enqueue.cc index f680128615..22ec0abd6c 100644 --- a/src/enqueue.cc +++ b/src/enqueue.cc @@ -139,11 +139,16 @@ static inline int ncclFuncTrafficPerByte(ncclFunc_t func, int nRanks) { } } +RCCL_PARAM_DECLARE(EnableProxyTrace); /*****************************************************************************/ /* Launch system : synchronization and CUDA kernel launch */ /*****************************************************************************/ static ncclResult_t addProxyOpIfNeeded(struct ncclComm* comm, 
struct ncclKernelPlan* plan, struct ncclProxyOp* op) { bool needed = true; + if (rcclParamEnableProxyTrace()) { + op->traceKey.commHash = comm->commHash; + op->traceKey.opCount = comm->opCount; + } NCCLCHECK(ncclProxySaveOp(comm, op, &needed)); if (needed) { struct ncclProxyOp* q = ncclMemoryPoolAlloc(&comm->memPool_ncclProxyOp, &comm->memPermanent); @@ -1030,6 +1035,9 @@ static ncclResult_t addP2pToPlan( op->rank = comm->rank; op->eActivationMask = p2pTasks[dir] ? p2pTasks[dir]->eActivationMask : 0; op->connIndex = connIndex[dir]; + if (rcclParamEnableProxyTrace()) { + op->coll = dir ? ncclFuncSend : ncclFuncRecv; + } // The following are modified per channel part in addWorkToChannels(): // op->buffer, op->nbytes, op->nsteps = ...; } @@ -1052,7 +1060,9 @@ static ncclResult_t addP2pToPlan( int nParts = dir ? work->nSendChannels : work->nRecvChannels; void* addr = dir ? work->sendAddr : work->recvAddr; size_t bytes = dir ? work->sendBytes : work->recvBytes; - + if (rcclParamEnableProxyTrace()) { + proxyOps[dir].totalBytes = bytes; + } proxyOps[dir].recvbuff = nullptr; if (nParts <= part) { proxyOps[dir].nsteps = 0; diff --git a/src/include/proxy.h b/src/include/proxy.h index e62e01f2cc..3cf2999808 100644 --- a/src/include/proxy.h +++ b/src/include/proxy.h @@ -18,6 +18,7 @@ #include "shmutils.h" #include "p2p.h" #include "collectives.h" +#include "proxy_trace/proxy_trace.h" typedef enum : uint8_t { ncclPatternRing, @@ -109,6 +110,12 @@ struct ncclProxyOp { uint64_t workCounter; struct ncclProxyOp *enqNext; + + // Used to track total real bytes of this op + uint32_t totalBytes; + // Used to fetch/update the proxyOp in ProxyTrace map + facebook_rccl::ProxyTraceRecordKey traceKey; + facebook_rccl::ProxyTraceExtraInfo traceInfo; }; struct ncclProxySubArgs { @@ -160,6 +167,10 @@ struct ncclProxySubArgs { int npKitSizesFifo[NCCL_STEPS]; uint64_t timestamp[NCCL_STEPS]; #endif + + // Used to fetch/update the proxyOp in ProxyTrace map + facebook_rccl::ProxyTraceRecordKey 
traceKey; + facebook_rccl::ProxyTraceExtraInfo traceInfo; }; struct ncclProxyArgs { @@ -352,6 +363,9 @@ struct ncclProxyState { // Queue of expected responses from the proxy struct ncclExpectedProxyResponse* expectedResponses; + + // A handle to the proxy traces + std::unique_ptr proxyTrace; }; enum proxyConnectState { diff --git a/src/include/proxy_trace/proxy_trace.h b/src/include/proxy_trace/proxy_trace.h new file mode 100644 index 0000000000..bb8920eb60 --- /dev/null +++ b/src/include/proxy_trace/proxy_trace.h @@ -0,0 +1,181 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +namespace facebook_rccl { + +enum class ProxyOpStepStatus { + INIT, + POSTING, + SENDING, + RECEIVING, + WAITING_GPU, + FLUSHING, + DONE, + UNINITIALIZED, + NUM_STATUS, +}; + +// All counters have default value of 0, except for FIFO_SZ_OR_HEAD_CACHE, which +// has default value of -1 +enum class ProxyCounterTypes { + POSTED = 0, + KERNEL_COPY_READY, + // ready-to-receive signal received from remote + RTR_RECV, + // ready-to-send signal sent to remote + RTS_SEND, + RECEIVED, + TRANSMITTED, + FLUSHED, + DONE, + // tail pointer of data ready to be sent, updated by kernel + RECV_TAIL, + // tail for sender, head for receiver, updated by kernel + TAIL_OR_HEAD, + // for sender this is data size of D2D copy; for receiver this is head cache, + // i.e., sub->base + sub->done + FIFO_SZ_OR_HEAD_CACHE, + UNINITIALIZED = 100 +}; + +enum class ProxyOpType { SEND, RECV }; +// ProxyTraceRecordKey and ProxyTraceExtraInfo are used to pass arguments to +// proxy thread (see ncclProxyOp and ncclProxySubArgs in proxy.h) +struct ProxyTraceRecordKey { + uint64_t commHash{0}; + int64_t opCount{-1}; // opCount is a unique id for a given collective/p2p + int64_t proxyOpId{-1}; // id of a proxyOp 
in a given comm and grouped + // collective/p2p (identified as commHash:opCount), + // assigned when creating ProxyTraceOp entry + inline std::string str() const { + return "<" + std::to_string(commHash) + ":" + std::to_string(opCount) + + ":" + std::to_string(proxyOpId) + ">"; + } +}; + +struct ProxyTraceExtraInfo { + int32_t funcIdx{-1}; + int32_t protocol{-1}; + int32_t pattern{-1}; + uint32_t totalBytes{0}; + uint32_t chunkSize{0}; + inline std::string str() const { + return fmt::format("[fu,pr,pa,tb,ck]:{},{},{},{},{}", funcIdx, protocol, + pattern, totalBytes, chunkSize); + } +}; + +// record progress state per comm per collective per proxyOp +struct ProxyTraceOp { + ProxyTraceRecordKey traceKey; + ProxyTraceExtraInfo extraInfo; + int32_t channelId{-1}; + int32_t nSteps{-1}; + uint32_t nbytes{0}; + int32_t myRank{-1}; + int32_t peerRank{-1}; + std::unordered_map counters{ + {ProxyCounterTypes::POSTED, 0}, + {ProxyCounterTypes::KERNEL_COPY_READY, 0}, + {ProxyCounterTypes::RTR_RECV, 0}, + {ProxyCounterTypes::RTS_SEND, 0}, + {ProxyCounterTypes::RECEIVED, 0}, + {ProxyCounterTypes::TRANSMITTED, 0}, + {ProxyCounterTypes::FLUSHED, 0}, + {ProxyCounterTypes::DONE, 0}, + {ProxyCounterTypes::RECV_TAIL, 0}, + {ProxyCounterTypes::TAIL_OR_HEAD, 0}, + {ProxyCounterTypes::FIFO_SZ_OR_HEAD_CACHE, -1}, + }; + ProxyCounterTypes lastUpdatingCounter{ProxyCounterTypes::UNINITIALIZED}; + ProxyOpType opType{ProxyOpType::SEND}; + ProxyOpStepStatus status{ProxyOpStepStatus::UNINITIALIZED}; + std::chrono::time_point startTs{}; + std::chrono::time_point lastUpdateTs{}; + void computeStatus(); + // serialize the entry to a string + std::string str(); +}; + +using ProxyActiveOpMap = std::unordered_map< + uint64_t /* commHash*/, + std::unordered_map>>; + +using ProxyActiveOpIdTracker = + std::unordered_map>; + +class ProxyTrace { +public: + ProxyTrace(int32_t rank) : myRank(rank) {} + ProxyTrace(const ProxyTrace &) = delete; + ProxyTrace &operator=(const ProxyTrace &) = delete; + bool 
initialized{false}; + void checkOpCompleted(const ProxyTraceRecordKey &key); + + void addNewProxyTraceOpImpl(const ProxyTraceRecordKey &key, + const ProxyTraceExtraInfo &extraInfo, + ProxyOpType opType, int channelId, int nSteps, + uint32_t nbytes, int peerRank); + + // Get a unique proxyOpId for a given commHash:opCount + // If the opCount is not found, create a new entry for it and return 0 + int64_t getOrCreateProxyOpId(uint64_t commHash, uint64_t opCount); + + // Dump all trace for a given communicator + std::string dump(uint64_t commHash); + + // Dump all active ops + std::string dump(); + + // check if an active send/recv operation exists for a given commHash:opCount + bool checkActiveOpExist(uint64_t commHash, uint64_t opCount, + uint32_t proxyOpId) const; + + ProxyTraceOp *getProxyTraceOpPtr(const ProxyTraceRecordKey &traceKey); + float getMapSizeMB() const; + void resetAll(); + +private: + int myRank{-1}; + + // Current active send/recv operations. + // Use map to quickly find the record with commHash:opCount:proxyOpId during + // active progress. Note that each op may not complete in order, e.g., + // proxyOpId 1 may finish before proxyOpId 0 if they are to different peers. + // Thus, the inner-most layer has to still be a map for searching by + // proxyOpId, no matter other ops are completed or not. 
+ ProxyActiveOpMap activeOps; + ProxyActiveOpIdTracker activeOpIdTracker; + + // keep track of the recent completed ops; + // A record is a pair of traceKey.str() and ProxyTraceOp.str() + std::deque> finishedOps; +}; +struct ncclProxySubArgs; +void proxyTraceInit(std::unique_ptr &proxyTrace, + int32_t rank, uint64_t commHash); + +void updateProxyOpCounter(std::unique_ptr &proxyTraceObj, + const ProxyTraceRecordKey &traceKey, + ProxyCounterTypes counter, int64_t val); + +void addNewProxyOp( + std::unique_ptr &proxyTraceObj, ProxyTraceRecordKey &key, + const ProxyTraceExtraInfo &extraInfo, ProxyOpType opType, int channelId, + int nSteps, uint32_t nbytes, int peerRank); +} // namespace facebook_rccl diff --git a/src/init.cc b/src/init.cc index 23169987f7..bce482c16a 100644 --- a/src/init.cc +++ b/src/init.cc @@ -243,6 +243,9 @@ void NCCL_NO_OPTIMIZE commPoison(ncclComm_t comm) { comm->startMagic = comm->endMagic = 0; } +RCCL_PARAM_DECLARE(EnableProxyTrace); +RCCL_PARAM(EnableProxyTrace, "ENABLE_PROXY_TRACE", 0); + RCCL_PARAM(KernelCollTraceEnable, "KERNEL_COLL_TRACE_ENABLE", 0); RCCL_PARAM(KernelCollTraceThreadEnable, "KERNEL_COLL_TRACE_THREAD_ENABLE", 0); @@ -415,6 +418,14 @@ static ncclResult_t commFree(ncclComm_t comm) { free(comm->connectSend); free(comm->connectRecv); + if (rcclParamEnableProxyTrace()) { + WARN("ProxyTrace:"); + if (comm->proxyState && comm->proxyState->proxyTrace){ + WARN("%s", comm->proxyState->proxyTrace->dump().c_str()); + } + } + + #ifdef ENABLE_PROFILING struct ncclProf *prof, *prof_seq; prof = (struct ncclProf*)malloc(sizeof(struct ncclProf)*MAXCHANNELS*PROFILE_NUM_LAUNCHES); diff --git a/src/misc/proxy_trace/proxy_trace.cc b/src/misc/proxy_trace/proxy_trace.cc new file mode 100644 index 0000000000..b007d20ce3 --- /dev/null +++ b/src/misc/proxy_trace/proxy_trace.cc @@ -0,0 +1,264 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. 
+ * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include "proxy_trace/proxy_trace.h" +#include "debug.h" +#include "device.h" +#include "proxy.h" +#include +#include + +constexpr int32_t kFinishedProxyOpItems = 32; +static std::unordered_map + proxyStepStatusStrMap = { + {facebook_rccl::ProxyOpStepStatus::INIT, "INIT"}, + {facebook_rccl::ProxyOpStepStatus::POSTING, "POSTING"}, + {facebook_rccl::ProxyOpStepStatus::SENDING, "SENDING"}, + {facebook_rccl::ProxyOpStepStatus::RECEIVING, "RECEIVING"}, + {facebook_rccl::ProxyOpStepStatus::WAITING_GPU, "WAITING_GPU"}, + {facebook_rccl::ProxyOpStepStatus::FLUSHING, "FLUSHING"}, + {facebook_rccl::ProxyOpStepStatus::DONE, "DONE"}, + {facebook_rccl::ProxyOpStepStatus::UNINITIALIZED, "ILLEGAL"}, +}; + +void facebook_rccl::ProxyTrace::resetAll() { + activeOps.clear(); + activeOpIdTracker.clear(); + myRank = -1; + initialized = false; +} + +bool facebook_rccl::ProxyTrace::checkActiveOpExist(uint64_t commHash, + uint64_t opCount, + uint32_t proxyOpId) const { + return (activeOps.find(commHash) != activeOps.end() && + activeOps.at(commHash).find(opCount) != + activeOps.at(commHash).end() && + activeOps.at(commHash).at(opCount).find(proxyOpId) != + activeOps.at(commHash).at(opCount).end()); +} + +// Get a unique proxyOpId for a given commHash:opCount +// If the opCount is not found, create a new entry for it and return 0 +int64_t facebook_rccl::ProxyTrace::getOrCreateProxyOpId(uint64_t commHash, + uint64_t opCount) { + return activeOpIdTracker[commHash][opCount]; +} + +facebook_rccl::ProxyTraceOp * +facebook_rccl::ProxyTrace::getProxyTraceOpPtr(const ProxyTraceRecordKey &key) { + if (checkActiveOpExist(key.commHash, key.opCount, key.proxyOpId)) { + return &(activeOps.at(key.commHash).at(key.opCount).at(key.proxyOpId)); + } + return nullptr; +} + +void facebook_rccl::ProxyTrace::checkOpCompleted( + const ProxyTraceRecordKey &key) { + if 
(checkActiveOpExist(key.commHash, key.opCount, key.proxyOpId)) { + auto &traceOp = activeOps[key.commHash][key.opCount][key.proxyOpId]; + // Remove finished proxyOp or colls to avoid memory leak + if (traceOp.counters[facebook_rccl::ProxyCounterTypes::DONE] == + traceOp.nSteps) { + traceOp.status = ProxyOpStepStatus::DONE; + if (finishedOps.size() >= kFinishedProxyOpItems) { + finishedOps.pop_front(); + } + finishedOps.push_back({key.str(), traceOp.str()}); + activeOps[key.commHash][key.opCount].erase(key.proxyOpId); + if (activeOps[key.commHash][key.opCount].empty()) { + activeOps[key.commHash].erase(key.opCount); + activeOpIdTracker[key.commHash].erase(key.opCount); + } + } + } else { + WARN("[proxyTrace] ProxyTraceOp %s not found", key.str().c_str()); + } +} + +void facebook_rccl::ProxyTrace::addNewProxyTraceOpImpl( + const ProxyTraceRecordKey &key, const ProxyTraceExtraInfo &extraInfo, + ProxyOpType opType, int channelId, int nSteps, uint32_t nbytes, + int peerRank) { + if (nSteps > 0 && + !checkActiveOpExist(key.commHash, key.opCount, key.proxyOpId)) { + auto traceOp = facebook_rccl::ProxyTraceOp(); + traceOp.opType = opType; + traceOp.traceKey = key; + traceOp.extraInfo = extraInfo; + traceOp.channelId = channelId; + traceOp.nSteps = nSteps; + traceOp.nbytes = nbytes; + traceOp.myRank = this->myRank; + traceOp.peerRank = peerRank; + traceOp.startTs = std::chrono::high_resolution_clock::now(); + traceOp.status = ProxyOpStepStatus::INIT; + activeOpIdTracker[key.commHash][key.opCount]++; + activeOps[key.commHash][key.opCount].emplace(key.proxyOpId, + std::move(traceOp)); + } else if (nSteps == 0) { + INFO(NCCL_PROXY, "nSteps is 0, ignored %s", key.str().c_str()); + } +} + +void facebook_rccl::ProxyTraceOp::computeStatus() { + ProxyOpStepStatus newStatus; + int posted = counters[facebook_rccl::ProxyCounterTypes::POSTED]; + int received = counters[facebook_rccl::ProxyCounterTypes::RECEIVED]; + int transmitted = 
counters[facebook_rccl::ProxyCounterTypes::TRANSMITTED]; + int done = counters[facebook_rccl::ProxyCounterTypes::DONE]; + if (opType == ProxyOpType::RECV) { + if (posted < nSteps && posted < done + NCCL_STEPS) + newStatus = ProxyOpStepStatus::POSTING; // Init + else if (received < posted) + newStatus = ProxyOpStepStatus::RECEIVING; // Receiving + else if (received < transmitted) + newStatus = ProxyOpStepStatus::RECEIVING; // Receiving + else if (transmitted < received) + newStatus = ProxyOpStepStatus::FLUSHING; // Flushing + else if (done < transmitted) + newStatus = ProxyOpStepStatus::WAITING_GPU; // Waiting on GPU + else + newStatus = ProxyOpStepStatus::DONE; // Done + } else { + if (posted < nSteps && posted < done + NCCL_STEPS) + newStatus = ProxyOpStepStatus::POSTING; // Init + else if (transmitted < posted) + newStatus = ProxyOpStepStatus::WAITING_GPU; // Waiting on GPU + else if (done < transmitted) + newStatus = ProxyOpStepStatus::SENDING; // Sending + else + newStatus = ProxyOpStepStatus::DONE; // Done + } + this->status = newStatus; +} + +std::string facebook_rccl::ProxyTrace::dump(uint64_t commHash) { + std::string result = fmt::format("commDump for commHash:{}\n", commHash); + std::map sortedDumpStrMap; + for (auto &opCountMap : activeOps.at(commHash)) { + for (auto &proxyOpMap : opCountMap.second) { + ProxyTraceRecordKey traceKey = {commHash, opCountMap.first, + proxyOpMap.first}; + proxyOpMap.second.computeStatus(); + sortedDumpStrMap[traceKey.str()] = proxyOpMap.second.str(); + } + } + for (const auto &[keyStr, proxyOpStr] : sortedDumpStrMap) { + result += proxyOpStr; + } + return result; +} + +std::string facebook_rccl::ProxyTrace::dump() { + std::string result = "commDump for all active ops "; + result += fmt::format("mapSizeMB:{:.2f}\n", getMapSizeMB()); + + // maps serialized key to serialized proxyOp; sorted by key + std::map sortedDumpStrMap; + for (auto &[commHash, opCountMap] : activeOps) { + for (auto &[opCount, proxyOpMap] : opCountMap) { + 
for (auto &[opId, opEntry] : proxyOpMap) { + ProxyTraceRecordKey traceKey = {commHash, opCount, opId}; + opEntry.computeStatus(); + sortedDumpStrMap[traceKey.str()] = opEntry.str(); + } + } + } + + // add the recent finished ops as well + for (const auto &[keyStr, proxyOpStr] : finishedOps) { + sortedDumpStrMap[keyStr] = proxyOpStr; + } + for (const auto &[keyStr, proxyOpStr] : sortedDumpStrMap) { + result += proxyOpStr; + } + return result; +} + +std::string facebook_rccl::ProxyTraceOp::str() { + computeStatus(); + std::string ret = fmt::format( + "createT:{}, lastT:{}, cntNm:{}, {}, {}, {}->{}({}), " + "chan:{}, status:{}, ns:{}, nb:{}, po:{}, ke:{}, tail/h:{}, recvT:{}, " + "connSz/h:{}, trans:{}, flushed:{}, recvd:{}, done:{}\n", + std::chrono::duration_cast( + startTs.time_since_epoch()) + .count(), + std::chrono::duration_cast( + lastUpdateTs.time_since_epoch()) + .count(), + static_cast(lastUpdatingCounter), traceKey.str(), extraInfo.str(), + myRank, peerRank, opType == ProxyOpType::SEND ? 
"S" : "R", channelId, + proxyStepStatusStrMap[status], nSteps, nbytes, + counters[ProxyCounterTypes::POSTED], + counters[ProxyCounterTypes::KERNEL_COPY_READY], + counters[ProxyCounterTypes::TAIL_OR_HEAD], + counters[ProxyCounterTypes::RECV_TAIL], + counters[ProxyCounterTypes::FIFO_SZ_OR_HEAD_CACHE], + counters[ProxyCounterTypes::TRANSMITTED], + counters[ProxyCounterTypes::FLUSHED], + counters[ProxyCounterTypes::RECEIVED], counters[ProxyCounterTypes::DONE]); + return ret; +} + +float facebook_rccl::ProxyTrace::getMapSizeMB() const { + float size = 0; + for (const auto &[commHash, opCountMap] : activeOps) { + for (const auto &[opCount, proxyOpMap] : opCountMap) { + size += proxyOpMap.size() * + (sizeof(ProxyTraceOp) + + sizeof(std::unique_ptr)); + } + } + for (const auto &[keyStr, proxyOpStr] : finishedOps) { + size += keyStr.size() + proxyOpStr.size(); + } + return size / 1024.0 / 1024.0; +} + +void facebook_rccl::proxyTraceInit(std::unique_ptr &proxyTrace, + int32_t rank, uint64_t commHash) { + if (proxyTrace) { + WARN("[proxyTrace] Initializing non-empty proxyTrace! 
rank: %d, commHash: " + "%lu", + rank, commHash); + return; + } + INFO(NCCL_PROXY, "Initializing ProxyTrace, rank: %d, commHash: %lu", rank, + commHash); + proxyTrace = std::make_unique(rank); + proxyTrace->initialized = true; +} + +void facebook_rccl::updateProxyOpCounter( + std::unique_ptr &proxyTraceObj, + const ProxyTraceRecordKey &traceKey, ProxyCounterTypes counter, + int64_t val) { + if (proxyTraceObj) { + auto traceOpPtr = proxyTraceObj->getProxyTraceOpPtr(traceKey); + if (traceOpPtr) { + traceOpPtr->counters[counter] = val; + traceOpPtr->lastUpdateTs = std::chrono::high_resolution_clock::now(); + traceOpPtr->lastUpdatingCounter = counter; + proxyTraceObj->checkOpCompleted(traceKey); + } + } +} + +void facebook_rccl::addNewProxyOp(std::unique_ptr &proxyTraceObj, + ProxyTraceRecordKey &key, + const ProxyTraceExtraInfo &extraInfo, + ProxyOpType opType, int channelId, int nSteps, + uint32_t nbytes, int peerRank) { + if (proxyTraceObj) { + auto opId = proxyTraceObj->getOrCreateProxyOpId(key.commHash, key.opCount); + key.proxyOpId = opId; + proxyTraceObj->addNewProxyTraceOpImpl(key, extraInfo, opType, channelId, + nSteps, nbytes, peerRank); + } +} \ No newline at end of file diff --git a/src/proxy.cc b/src/proxy.cc index b392da416d..dbd4a80ace 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -368,6 +368,8 @@ ncclResult_t dumpProxyState(struct ncclProxyProgressState* state) { return ncclSuccess; } +RCCL_PARAM_DECLARE(EnableProxyTrace); + static ncclResult_t ncclProxyOpToArgs(struct ncclProxyOp* op, struct ncclProxyArgs* args, int subIndex) { struct ncclProxySubArgs* sub = args->subs+subIndex; if (subIndex >= NCCL_PROXY_MAX_SUBS) { @@ -398,6 +400,14 @@ static ncclResult_t ncclProxyOpToArgs(struct ncclProxyOp* op, struct ncclProxyAr sub->ringAlgo = op->ringAlgo; sub->workCounter = op->workCounter; args->nsubs = subIndex+1; + if (rcclParamEnableProxyTrace()) { + sub->traceKey = op->traceKey; + sub->traceInfo.funcIdx = op->coll; + sub->traceInfo.protocol = 
op->protocol; + sub->traceInfo.pattern = op->pattern; + sub->traceInfo.totalBytes = op->totalBytes; + sub->traceInfo.chunkSize = op->chunkSize; + } if (subIndex) { if ((args->sliceSteps != op->sliceSteps) || (args->chunkSteps != op->chunkSteps) || @@ -1836,6 +1846,9 @@ ncclResult_t ncclProxyInit(struct ncclComm* comm, struct ncclSocket* sock, union comm->proxyState->listenSock = sock; comm->proxyState->peerAddresses = peerAddresses; comm->proxyState->peerAddressesUDS = peerAddressesUDS; + if (rcclParamEnableProxyTrace()) { + facebook_rccl::proxyTraceInit(comm->proxyState->proxyTrace, comm->rank, comm->commHash); + } // UDS support NCCLCHECK(ncclIpcSocketInit(&comm->proxyState->ipcSock, comm->rank, peerAddressesUDS[comm->rank], comm->abortFlag)); diff --git a/src/transport/net.cc b/src/transport/net.cc index 956187730c..4c858b299b 100644 --- a/src/transport/net.cc +++ b/src/transport/net.cc @@ -1244,6 +1244,9 @@ static ncclResult_t sendProxyProgress(struct ncclProxyState* proxyState, struct resources->step = sub->base + sub->nsteps; sub->posted = sub->transmitted = sub->done = 0; ncclProfilerStartSendProxyOpEvent(s, args); + facebook_rccl::addNewProxyOp(proxyState->proxyTrace, sub->traceKey, + sub->traceInfo, facebook_rccl::ProxyOpType::SEND, + sub->channelId, sub->nsteps, sub->nbytes, sub->peer); if (!sub->reg) sub->sendMhandle = resources->mhandles[args->protocol]; } @@ -1285,6 +1288,7 @@ static ncclResult_t sendProxyProgress(struct ncclProxyState* proxyState, struct } ncclProfilerRecordProxyOpEventState(s, args, sub->posted, sub->transSize, ncclProfilerProxyOpSendPosted); ncclProfilerRecordProxyStepEventState(s, args, postedStepId, ncclProfilerProxyStepSendGPUWait); + facebook_rccl::updateProxyOpCounter(proxyState->proxyTrace, sub->traceKey, facebook_rccl::ProxyCounterTypes::POSTED, sub->posted); args->idle = 0; continue; } @@ -1293,6 +1297,16 @@ static ncclResult_t sendProxyProgress(struct ncclProxyState* proxyState, struct int buffSlot = 
(sub->base+sub->transmitted)%NCCL_STEPS; volatile uint64_t* recvTail = &resources->recvMem->tail; uint64_t tail = sub->base + sub->transmitted; + facebook_rccl::updateProxyOpCounter(proxyState->proxyTrace, sub->traceKey, + facebook_rccl::ProxyCounterTypes::RECV_TAIL, *recvTail); + + facebook_rccl::updateProxyOpCounter(proxyState->proxyTrace, sub->traceKey, + facebook_rccl::ProxyCounterTypes::TAIL_OR_HEAD, tail); + + facebook_rccl::updateProxyOpCounter( + proxyState->proxyTrace, sub->traceKey, + facebook_rccl::ProxyCounterTypes::FIFO_SZ_OR_HEAD_CACHE, connFifo[buffSlot].size); + if (connFifo[buffSlot].size != -1 && (*recvTail > tail || p == NCCL_PROTO_LL)) { // We have something to receive, let's check if it's completely ready. int size = connFifo[buffSlot].size; @@ -1340,6 +1354,8 @@ static ncclResult_t sendProxyProgress(struct ncclProxyState* proxyState, struct *resources->curr_hdp_reg = 1; } ncclProfilerRecordProxyOpEventState(s, args, sub->transmitted+args->sliceSteps, sub->transSize, ncclProfilerProxyOpSendRemFifoWait); + facebook_rccl::updateProxyOpCounter(proxyState->proxyTrace, sub->traceKey, + facebook_rccl::ProxyCounterTypes::KERNEL_COPY_READY, sub->reg ? 1: sub->transmitted + args->sliceSteps); // Data is ready, try to send. // Coverity complains about the size here as pointing to an out-of-scope temporary. Which is nonsense, // since size is a plain integer. 
@@ -1369,6 +1385,8 @@ static ncclResult_t sendProxyProgress(struct ncclProxyState* proxyState, struct sub->profilerSteps++; ncclProfilerRecordProxyOpEventState(s, args, sub->transmitted, sub->transSize, ncclProfilerProxyOpSendTransmitted); ncclProfilerRecordProxyStepEventState(s, args, transmittedStepId, ncclProfilerProxyStepSendWait); + facebook_rccl::updateProxyOpCounter(proxyState->proxyTrace, sub->traceKey, + facebook_rccl::ProxyCounterTypes::TRANSMITTED, sub->transmitted); args->idle = 0; continue; } @@ -1435,7 +1453,8 @@ static ncclResult_t sendProxyProgress(struct ncclProxyState* proxyState, struct sub->done += args->sliceSteps; ncclProfilerStopProxyStepEvent(s, args, doneStepId); ncclProfilerRecordProxyOpEventState(s, args, sub->done, sub->transSize, ncclProfilerProxyOpSendDone); - + facebook_rccl::updateProxyOpCounter(proxyState->proxyTrace, sub->traceKey, + facebook_rccl::ProxyCounterTypes::DONE, sub->done); if (resources->shared == 0) { volatile uint64_t* sendHead = resources->gdcSync ? 
resources->gdcSync : &resources->sendMem->head; *sendHead = sub->base + sub->done; @@ -1502,6 +1521,8 @@ static ncclResult_t recvProxyProgress(struct ncclProxyState* proxyState, struct sub->regBufferReady = 0; for (int i=0; iproxyTrace, sub->traceKey, sub->traceInfo, + facebook_rccl::ProxyOpType::RECV, sub->channelId, sub->nsteps, sub->nbytes, sub->peer); if (!sub->reg) sub->recvMhandle = resources->mhandles[args->protocol]; } @@ -1600,6 +1621,8 @@ static ncclResult_t recvProxyProgress(struct ncclProxyState* proxyState, struct sub->profilerSteps++; ncclProfilerRecordProxyOpEventState(s+i, args, sub->posted, sub->transSize, ncclProfilerProxyOpRecvPosted); ncclProfilerRecordProxyStepEventState(s+i, args, postedStepId, ncclProfilerProxyStepRecvWait); + facebook_rccl::updateProxyOpCounter(proxyState->proxyTrace, + sub->traceKey, facebook_rccl::ProxyCounterTypes::POSTED, sub->posted); } args->idle = 0; } @@ -1648,6 +1671,7 @@ static ncclResult_t recvProxyProgress(struct ncclProxyState* proxyState, struct sub->received += args->sliceSteps; ncclProfilerRecordProxyOpEventState(s+i, args, sub->received, sub->transSize, ncclProfilerProxyOpRecvReceived); ncclProfilerRecordProxyStepEventState(s+i, args, receivedStepId, ncclProfilerProxyStepRecvFlushWait); + facebook_rccl::updateProxyOpCounter(proxyState->proxyTrace, sub->traceKey, facebook_rccl::ProxyCounterTypes::RECEIVED, sub->received); if (step < sub->nsteps) { struct recvNetResources* resources = (struct recvNetResources*) (sub->connection->transportResources); if (resources->useGdr) needFlush |= resources->needFlush; @@ -1704,6 +1728,13 @@ static ncclResult_t recvProxyProgress(struct ncclProxyState* proxyState, struct } struct recvNetResources* resources = (struct recvNetResources*) (subGroup->connection->transportResources); NCCLCHECK(proxyState->ncclNet->iflush(resources->netRecvComm, subCount, ptrs, sizes, mhandles, subGroup->requests+(step%NCCL_STEPS))); + // [Added-comment] iflush has been posted for this subGroup 
at `step` + if (subGroup->requests[step%NCCL_STEPS]){ + for(int i=0; igroupSize; i++) { + struct ncclProxySubArgs* sub = subGroup + i; + facebook_rccl::updateProxyOpCounter(proxyState->proxyTrace, sub->traceKey, facebook_rccl::ProxyCounterTypes::FLUSHED, sub->received); + } + } if (once) { once = false; INFO(NCCL_INIT, "%s: issued GDR flush", __func__); @@ -1731,11 +1762,13 @@ static ncclResult_t recvProxyProgress(struct ncclProxyState* proxyState, struct sub->transmitted += args->sliceSteps; ncclProfilerRecordProxyOpEventState(s+i, args, sub->transmitted, sub->transSize, ncclProfilerProxyOpRecvTransmitted); ncclProfilerRecordProxyStepEventState(s+i, args, transmittedStepId, ncclProfilerProxyStepRecvGPUWait); + facebook_rccl::updateProxyOpCounter(proxyState->proxyTrace, sub->traceKey, facebook_rccl::ProxyCounterTypes::TRANSMITTED, sub->transmitted); if (step < sub->nsteps) { __sync_synchronize(); struct recvNetResources* resources = (struct recvNetResources*) (sub->connection->transportResources); volatile uint64_t* recvTail = resources->gdcSync ? 
resources->gdcSync : &resources->recvMem->tail; *recvTail = sub->base + sub->transmitted; + facebook_rccl::updateProxyOpCounter(proxyState->proxyTrace, sub->traceKey, facebook_rccl::ProxyCounterTypes::RECV_TAIL, *recvTail); if (resources->gdcSync) wc_store_fence(); // Flush out WC write } } @@ -1755,6 +1788,8 @@ static ncclResult_t recvProxyProgress(struct ncclProxyState* proxyState, struct struct recvNetResources* resources = (struct recvNetResources*) (sub->connection->transportResources); volatile uint64_t* sendHead = &resources->sendMem->head; uint64_t done = *sendHead; + facebook_rccl::updateProxyOpCounter(proxyState->proxyTrace, sub->traceKey, facebook_rccl::ProxyCounterTypes::TAIL_OR_HEAD, done); + facebook_rccl::updateProxyOpCounter(proxyState->proxyTrace, sub->traceKey, facebook_rccl::ProxyCounterTypes::FIFO_SZ_OR_HEAD_CACHE, sub->base + sub->done); while (done > sub->base + sub->done && // LL and LL128 can acknowledge 0-bytes send before they even happen. Don't go past what we transmitted. 
sub->transmitted > sub->done) { @@ -1767,6 +1802,7 @@ static ncclResult_t recvProxyProgress(struct ncclProxyState* proxyState, struct sub->done += args->sliceSteps; ncclProfilerStopProxyStepEvent(s+i, args, doneStepId); ncclProfilerRecordProxyOpEventState(s+i, args, sub->done, sub->transSize, ncclProfilerProxyOpRecvDone); + facebook_rccl::updateProxyOpCounter(proxyState->proxyTrace, sub->traceKey, facebook_rccl::ProxyCounterTypes::DONE, sub->done); args->idle = 0; if (sub->done == sub->nsteps) { args->done++; diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 2fae078a66..277d07d4fa 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -56,6 +56,9 @@ if(BUILD_TESTS) common/TestBed.cpp common/TestBedChild.cpp common/StandaloneUtils.cpp + ../src/misc/recorder.cc + proxy_trace/ProxyTraceUnitTests.cpp + ../src/misc/proxy_trace/proxy_trace.cc ) # Append a file if BUILD_TESTS is ON and build type is not Debug @@ -94,6 +97,7 @@ if(BUILD_TESTS) target_link_libraries(rccl-UnitTests PRIVATE hip::host hip::device hsa-runtime64::hsa-runtime64) target_link_libraries(rccl-UnitTests PRIVATE Threads::Threads) target_link_libraries(rccl-UnitTests PRIVATE dl) + target_link_libraries(rccl-UnitTests PRIVATE fmt::fmt) if(OPENMP_TESTS_ENABLED) target_link_libraries(rccl-UnitTests PRIVATE "${OpenMP_CXX_FLAGS}") endif() diff --git a/test/proxy_trace/ProxyTraceUnitTests.cpp b/test/proxy_trace/ProxyTraceUnitTests.cpp new file mode 100644 index 0000000000..a62d394442 --- /dev/null +++ b/test/proxy_trace/ProxyTraceUnitTests.cpp @@ -0,0 +1,134 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. 
+ */ + +#include "proxy.h" +#include "proxy_trace/proxy_trace.h" +#include +#include +#include +namespace RcclUnitTesting { + +class ProxyTraceTestFixture : public ::testing::Test { +public: + ncclProxyState *proxyState; + ncclProxySubArgs *sub1, *sub2; + uint64_t commHash = 123456789; + int64_t opCount = 31; + int nSteps = 10; + void SetUp() override { + proxyState = new ncclProxyState(); + facebook_rccl::proxyTraceInit(proxyState->proxyTrace, 0, commHash); + EXPECT_NE(proxyState->proxyTrace, nullptr); + sub1 = new ncclProxySubArgs(); + sub2 = new ncclProxySubArgs(); + sub1->traceKey = {commHash, opCount, -1}; + sub2->traceKey = {commHash, opCount, -1}; + sub1->nsteps = nSteps; + sub2->nsteps = nSteps; + } + void TearDown() override { + delete sub1; + delete sub2; + delete proxyState; + } + void AddTraceOp(ncclProxySubArgs *sub, facebook_rccl::ProxyOpType opType) { + facebook_rccl::addNewProxyOp(proxyState->proxyTrace, sub->traceKey, + sub->traceInfo, opType, sub->channelId, + sub->nsteps, sub->nbytes, sub->peer); + } +}; + +TEST_F(ProxyTraceTestFixture, nonEmptySingleton) { + const auto &tracer = proxyState->proxyTrace; + EXPECT_NE(tracer, nullptr); +} + +TEST_F(ProxyTraceTestFixture, addTraceOp) { + auto &tracer = proxyState->proxyTrace; + EXPECT_EQ(tracer->getOrCreateProxyOpId(sub1->traceKey.commHash, + sub1->traceKey.opCount), + 0); + AddTraceOp(sub1, facebook_rccl::ProxyOpType::SEND); + EXPECT_EQ(sub1->traceKey.proxyOpId, 0); + AddTraceOp(sub2, facebook_rccl::ProxyOpType::RECV); + EXPECT_EQ(sub2->traceKey.proxyOpId, 1); + EXPECT_EQ(tracer->getOrCreateProxyOpId(sub1->traceKey.commHash, + sub1->traceKey.opCount), + 2); + auto traceRecordPtr = tracer->getProxyTraceOpPtr(sub1->traceKey); + EXPECT_EQ(traceRecordPtr->opType, facebook_rccl::ProxyOpType::SEND); +} + +TEST_F(ProxyTraceTestFixture, getMapSizeMB) { + auto &tracer = proxyState->proxyTrace; + AddTraceOp(sub1, facebook_rccl::ProxyOpType::SEND); + auto size1 = tracer->getMapSizeMB(); + EXPECT_GT(size1, 0); 
+ AddTraceOp(sub2, facebook_rccl::ProxyOpType::RECV); + auto size2 = tracer->getMapSizeMB(); + EXPECT_GT(size2, size1); + // finish sub1 + sub1->done = nSteps; + facebook_rccl::updateProxyOpCounter(tracer, sub1->traceKey, + facebook_rccl::ProxyCounterTypes::DONE, + sub1->done); + // sub1 is now serialized and should be moved from activeOps to finishedOps + auto size3 = tracer->getMapSizeMB(); + EXPECT_GT(size3, size1); +} + +TEST_F(ProxyTraceTestFixture, updateTraceOp) { + auto &tracer = proxyState->proxyTrace; + AddTraceOp(sub1, facebook_rccl::ProxyOpType::SEND); + facebook_rccl::updateProxyOpCounter( + tracer, sub1->traceKey, + facebook_rccl::ProxyCounterTypes::KERNEL_COPY_READY, 1); + facebook_rccl::updateProxyOpCounter( + tracer, sub1->traceKey, facebook_rccl::ProxyCounterTypes::POSTED, 3); + facebook_rccl::updateProxyOpCounter( + tracer, sub1->traceKey, facebook_rccl::ProxyCounterTypes::TRANSMITTED, 2); + + auto traceRecordPtr = tracer->getProxyTraceOpPtr(sub1->traceKey); + EXPECT_NE(traceRecordPtr, nullptr); + EXPECT_EQ(traceRecordPtr->counters[facebook_rccl::ProxyCounterTypes::POSTED], + 3); + EXPECT_EQ( + traceRecordPtr->counters[facebook_rccl::ProxyCounterTypes::TRANSMITTED], + 2); + EXPECT_EQ(traceRecordPtr + ->counters[facebook_rccl::ProxyCounterTypes::KERNEL_COPY_READY], + 1); + EXPECT_GE(traceRecordPtr->lastUpdateTs, traceRecordPtr->startTs); +} + +TEST_F(ProxyTraceTestFixture, updateTraceOp2) { + auto &tracer = proxyState->proxyTrace; + AddTraceOp(sub1, facebook_rccl::ProxyOpType::SEND); + int64_t rand = 123456789; + sub1->posted = rand; + facebook_rccl::updateProxyOpCounter(tracer, sub1->traceKey, + facebook_rccl::ProxyCounterTypes::POSTED, + sub1->posted); + auto traceRecordPtr = tracer->getProxyTraceOpPtr(sub1->traceKey); + EXPECT_EQ(traceRecordPtr->counters[facebook_rccl::ProxyCounterTypes::POSTED], + rand); +} + +TEST_F(ProxyTraceTestFixture, memoryReclaim) { + auto &tracer = proxyState->proxyTrace; + tracer->resetAll(); + AddTraceOp(sub1, 
facebook_rccl::ProxyOpType::SEND); + sub1->done = nSteps; + facebook_rccl::updateProxyOpCounter(tracer, sub1->traceKey, + facebook_rccl::ProxyCounterTypes::DONE, + sub1->done); + auto traceRecordPtr = tracer->getProxyTraceOpPtr(sub1->traceKey); + EXPECT_EQ(traceRecordPtr, nullptr); + EXPECT_GT(tracer->getMapSizeMB(), 0); +} + +} // namespace RcclUnitTesting \ No newline at end of file