Add proxyTrace (#1732)

This feature tracks the proxy events and status of each send/recv op. ProxyTrace keeps a fixed number of active ops in host mem and dumps the status of each op when the program crashes or hangs.
This commit is contained in:
Dingming Wu
2025-06-25 21:01:34 -07:00
کامیت شده توسط GitHub
والد 568777a9bf
کامیت 020dcf0a7c
11فایلهای تغییر یافته به همراه693 افزوده شده و 2 حذف شده
+4
مشاهده پرونده
@@ -562,6 +562,7 @@ set(SRC_FILES
src/include/nvtx3/nvtxDetail/nvtxInitDefs.h
src/include/nvtx3/nvtxDetail/nvtxLinkOnce.h
src/include/nvtx3/nvtxDetail/nvtxTypes.h
src/include/proxy_trace/proxy_trace.h
src/include/plugin/nccl_net.h
src/include/plugin/nccl_profiler.h
src/include/plugin/nccl_tuner.h
@@ -608,6 +609,7 @@ set(SRC_FILES
src/misc/msccl/msccl_parser.cc
src/misc/msccl/msccl_setup.cc
src/misc/msccl/msccl_status.cc
src/misc/proxy_trace/proxy_trace.cc
src/plugin/net.cc
src/plugin/plugin_open.cc
src/plugin/profiler.cc
@@ -754,6 +756,7 @@ add_custom_target(git_version_check
VERBATIM
)
# Set up RCCL library
#==================================================================================================
## Set RCCL source files
@@ -1078,6 +1081,7 @@ target_link_libraries(rccl INTERFACE hip::host)
target_link_libraries(rccl PRIVATE hip::device)
target_link_libraries(rccl PRIVATE dl)
target_link_libraries(rccl PRIVATE ${ROCM_SMI_LIBRARIES})
target_link_libraries(rccl PRIVATE fmt::fmt)
if(ENABLE_MSCCLPP)
target_link_libraries(rccl PRIVATE mscclpp_nccl)
endif()
+20
مشاهده پرونده
@@ -32,6 +32,8 @@
# For downloading, building, and installing required dependencies
include(cmake/DownloadProject.cmake)
include(FetchContent)
if(NOT INSTALL_DEPENDENCIES)
find_package(GTest 1.11)
endif()
@@ -105,6 +107,24 @@ if(NOT ROCM_FOUND)
find_package( ROCM 0.7.3 REQUIRED CONFIG PATHS ${PROJECT_EXTERN_DIR}/rocm-cmake )
endif()
set(CMAKE_INSTALL_LIBDIR lib CACHE STRING "Define install directory for libraries" FORCE)
# Find or download/install fmt
# Prefer an fmt package that find_package() can locate; QUIET suppresses the
# not-found diagnostics so the fallback below can handle that case itself.
find_package(fmt QUIET)
if(NOT fmt_FOUND)
message(STATUS "fmt not found, fetching from source...")
# Fallback: fetch fmt via FetchContent, pinned to a fixed commit (tagged
# 10.2.1 per the trailing note on GIT_TAG).
FetchContent_Declare(
fmt
GIT_REPOSITORY https://github.com/fmtlib/fmt
GIT_TAG e69e5f977d458f2650bb346dadf2ad30c5320281 # 10.2.1
)
FetchContent_MakeAvailable(fmt)
else()
message(STATUS "Using system fmt")
# Log where the discovered fmt::fmt target takes its headers from.
get_target_property(FMT_INCLUDE_DIRS fmt::fmt INTERFACE_INCLUDE_DIRECTORIES)
message(STATUS "fmt include directories: ${FMT_INCLUDE_DIRS}")
endif()
# Find available local ROCM targets
# NOTE: This will eventually be part of ROCm-CMake and should be removed at that time
function(rocm_local_targets VARIABLE)
+11 -1
مشاهده پرونده
@@ -139,11 +139,16 @@ static inline int ncclFuncTrafficPerByte(ncclFunc_t func, int nRanks) {
}
}
RCCL_PARAM_DECLARE(EnableProxyTrace);
/*****************************************************************************/
/* Launch system : synchronization and CUDA kernel launch */
/*****************************************************************************/
static ncclResult_t addProxyOpIfNeeded(struct ncclComm* comm, struct ncclKernelPlan* plan, struct ncclProxyOp* op) {
bool needed = true;
if (rcclParamEnableProxyTrace()) {
op->traceKey.commHash = comm->commHash;
op->traceKey.opCount = comm->opCount;
}
NCCLCHECK(ncclProxySaveOp(comm, op, &needed));
if (needed) {
struct ncclProxyOp* q = ncclMemoryPoolAlloc<struct ncclProxyOp>(&comm->memPool_ncclProxyOp, &comm->memPermanent);
@@ -1030,6 +1035,9 @@ static ncclResult_t addP2pToPlan(
op->rank = comm->rank;
op->eActivationMask = p2pTasks[dir] ? p2pTasks[dir]->eActivationMask : 0;
op->connIndex = connIndex[dir];
if (rcclParamEnableProxyTrace()) {
op->coll = dir ? ncclFuncSend : ncclFuncRecv;
}
// The following are modified per channel part in addWorkToChannels():
// op->buffer, op->nbytes, op->nsteps = ...;
}
@@ -1052,7 +1060,9 @@ static ncclResult_t addP2pToPlan(
int nParts = dir ? work->nSendChannels : work->nRecvChannels;
void* addr = dir ? work->sendAddr : work->recvAddr;
size_t bytes = dir ? work->sendBytes : work->recvBytes;
if (rcclParamEnableProxyTrace()) {
proxyOps[dir].totalBytes = bytes;
}
proxyOps[dir].recvbuff = nullptr;
if (nParts <= part) {
proxyOps[dir].nsteps = 0;
+14
مشاهده پرونده
@@ -18,6 +18,7 @@
#include "shmutils.h"
#include "p2p.h"
#include "collectives.h"
#include "proxy_trace/proxy_trace.h"
typedef enum : uint8_t {
ncclPatternRing,
@@ -109,6 +110,12 @@ struct ncclProxyOp {
uint64_t workCounter;
struct ncclProxyOp *enqNext;
// Used to track total real bytes of this op
uint32_t totalBytes;
// Used to fetch/update the proxyOp in ProxyTrace map
facebook_rccl::ProxyTraceRecordKey traceKey;
facebook_rccl::ProxyTraceExtraInfo traceInfo;
};
struct ncclProxySubArgs {
@@ -160,6 +167,10 @@ struct ncclProxySubArgs {
int npKitSizesFifo[NCCL_STEPS];
uint64_t timestamp[NCCL_STEPS];
#endif
// Used to fetch/update the proxyOp in ProxyTrace map
facebook_rccl::ProxyTraceRecordKey traceKey;
facebook_rccl::ProxyTraceExtraInfo traceInfo;
};
struct ncclProxyArgs {
@@ -352,6 +363,9 @@ struct ncclProxyState {
// Queue of expected responses from the proxy
struct ncclExpectedProxyResponse* expectedResponses;
// A handle to the proxy traces
std::unique_ptr<facebook_rccl::ProxyTrace> proxyTrace;
};
enum proxyConnectState {
@@ -0,0 +1,181 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#pragma once
#include <chrono>
#include <cstdint>
#include <deque>
#include <fmt/format.h>
#include <memory>
#include <unordered_map>
namespace facebook_rccl {
// Coarse progress state of a traced proxy op. ProxyTraceOp::status holds one
// of these values; ProxyTraceOp::computeStatus() is declared to refresh it.
enum class ProxyOpStepStatus {
INIT,
POSTING,
SENDING,
RECEIVING,
WAITING_GPU,
FLUSHING,
DONE,
// Default value of ProxyTraceOp::status before any status has been assigned.
UNINITIALIZED,
// NOTE(review): looks like a count/sentinel entry rather than a real status
// -- confirm before relying on its numeric value.
NUM_STATUS,
};
// Identifies one progress counter of a traced proxy op; used as the key of
// ProxyTraceOp::counters.
// All counters have default value of 0, except for FIFO_SZ_OR_HEAD_CACHE, which
// has default value of -1
enum class ProxyCounterTypes {
POSTED = 0,
KERNEL_COPY_READY,
// ready-to-receive signal received from remote
RTR_RECV,
// ready-to-send signal sent to remote
RTS_SEND,
RECEIVED,
TRANSMITTED,
FLUSHED,
DONE,
// tail pointer of data ready to be sent, updated by kernel
RECV_TAIL,
// tail for sender, head for receiver, updated by kernel
TAIL_OR_HEAD,
// for sender this is data size of D2D copy; for receiver this is head cache,
// i.e., sub->base + sub->done
FIFO_SZ_OR_HEAD_CACHE,
// Sentinel: default of ProxyTraceOp::lastUpdatingCounter, i.e. no counter has
// been recorded as updated yet. It is not a key of ProxyTraceOp::counters.
UNINITIALIZED = 100
};
// Direction of a traced proxy op: a send or a receive.
enum class ProxyOpType { SEND, RECV };
// ProxyTraceRecordKey and ProxyTraceExtraInfo are used to pass arguments to
// the proxy thread (see ncclProxyOp and ncclProxySubArgs in proxy.h).
struct ProxyTraceRecordKey {
  uint64_t commHash{0};
  // opCount is a unique id for a given collective/p2p
  int64_t opCount{-1};
  // id of a proxyOp in a given comm and grouped collective/p2p (identified as
  // commHash:opCount), assigned when creating the ProxyTraceOp entry
  int64_t proxyOpId{-1};

  // Renders the key as "<commHash:opCount:proxyOpId>".
  inline std::string str() const {
    std::string text{"<"};
    text += std::to_string(commHash);
    text += ":";
    text += std::to_string(opCount);
    text += ":";
    text += std::to_string(proxyOpId);
    text += ">";
    return text;
  }
};
// Descriptive metadata attached to a traced proxy op. Every field defaults to
// a "not set" value (-1 for the indices, 0 for the sizes).
struct ProxyTraceExtraInfo {
  int32_t funcIdx{-1};
  int32_t protocol{-1};
  int32_t pattern{-1};
  uint32_t totalBytes{0};
  uint32_t chunkSize{0};

  // Renders the fields as "[fu,pr,pa,tb,ck]:" followed by the five values in
  // declaration order, comma separated.
  inline std::string str() const {
    std::string rendered =
        fmt::format("[fu,pr,pa,tb,ck]:{},{},{},{},{}", funcIdx, protocol,
                    pattern, totalBytes, chunkSize);
    return rendered;
  }
};
// record progress state per comm per collective per proxyOp
struct ProxyTraceOp {
// Identity of this record: commHash, opCount and proxyOpId.
ProxyTraceRecordKey traceKey;
// Descriptive metadata (function index, protocol, pattern, sizes).
ProxyTraceExtraInfo extraInfo;
int32_t channelId{-1};
// Number of steps of the op; -1 until set.
int32_t nSteps{-1};
uint32_t nbytes{0};
int32_t myRank{-1};
int32_t peerRank{-1};
// Latest recorded value per counter. Every regular counter type is
// pre-populated: 0 for all of them except FIFO_SZ_OR_HEAD_CACHE, which
// starts at -1.
std::unordered_map<ProxyCounterTypes, int64_t> counters{
{ProxyCounterTypes::POSTED, 0},
{ProxyCounterTypes::KERNEL_COPY_READY, 0},
{ProxyCounterTypes::RTR_RECV, 0},
{ProxyCounterTypes::RTS_SEND, 0},
{ProxyCounterTypes::RECEIVED, 0},
{ProxyCounterTypes::TRANSMITTED, 0},
{ProxyCounterTypes::FLUSHED, 0},
{ProxyCounterTypes::DONE, 0},
{ProxyCounterTypes::RECV_TAIL, 0},
{ProxyCounterTypes::TAIL_OR_HEAD, 0},
{ProxyCounterTypes::FIFO_SZ_OR_HEAD_CACHE, -1},
};
// Which counter was written most recently; UNINITIALIZED until a first update.
ProxyCounterTypes lastUpdatingCounter{ProxyCounterTypes::UNINITIALIZED};
ProxyOpType opType{ProxyOpType::SEND};
ProxyOpStepStatus status{ProxyOpStepStatus::UNINITIALIZED};
// Creation time and time of the latest counter update; both default to the
// clock's epoch.
std::chrono::time_point<std::chrono::high_resolution_clock> startTs{};
std::chrono::time_point<std::chrono::high_resolution_clock> lastUpdateTs{};
// Recomputes `status`. Non-const because it writes the member.
void computeStatus();
// Serializes the entry to a string.
std::string str();
};
// Three-level lookup of active ops: commHash -> opCount -> proxyOpId -> op.
using ProxyActiveOpMap = std::unordered_map<
uint64_t /* commHash*/,
std::unordered_map<int64_t /* opCount*/,
/* proxyOpId : op */
std::unordered_map<int64_t, ProxyTraceOp>>>;
// Per commHash:opCount integer used to hand out proxyOpIds (see
// ProxyTrace::getOrCreateProxyOpId).
using ProxyActiveOpIdTracker =
std::unordered_map<uint64_t /* commHash*/,
std::unordered_map<int64_t /* opCount*/, int64_t>>;
// Book-keeping for traced proxy send/recv ops: a map of currently active ops
// plus a queue of serialized, recently finished ones, both of which can be
// dumped as text. Non-copyable.
class ProxyTrace {
public:
  // `rank` is stamped on every op recorded through this tracer.
  // explicit: an integer rank must never convert silently into a tracer.
  explicit ProxyTrace(int32_t rank) : myRank(rank) {}
  ProxyTrace(const ProxyTrace &) = delete;
  ProxyTrace &operator=(const ProxyTrace &) = delete;

  bool initialized{false};

  // Checks whether the op identified by `key` has completed.
  void checkOpCompleted(const ProxyTraceRecordKey &key);

  // Records a new op under `key` with the given metadata.
  void addNewProxyTraceOpImpl(const ProxyTraceRecordKey &key,
                              const ProxyTraceExtraInfo &extraInfo,
                              ProxyOpType opType, int channelId, int nSteps,
                              uint32_t nbytes, int peerRank);

  // Get a unique proxyOpId for a given commHash:opCount
  // If the opCount is not found, create a new entry for it and return 0
  // NOTE(review): opCount is uint64_t here but int64_t in ProxyTraceRecordKey
  // and in the map key types.
  int64_t getOrCreateProxyOpId(uint64_t commHash, uint64_t opCount);

  // Dump all trace for a given communicator
  std::string dump(uint64_t commHash);

  // Dump all active ops
  std::string dump();

  // check if an active send/recv operation exists for a given commHash:opCount
  // NOTE(review): proxyOpId is uint32_t here while
  // ProxyTraceRecordKey::proxyOpId is int64_t, so ids outside the uint32_t
  // range (including the default -1) are converted before the lookup.
  bool checkActiveOpExist(uint64_t commHash, uint64_t opCount,
                          uint32_t proxyOpId) const;

  // Returns a pointer to the active op for `traceKey`.
  // NOTE(review): presumably nullptr when no such op is active -- confirm
  // against the definition.
  ProxyTraceOp *getProxyTraceOpPtr(const ProxyTraceRecordKey &traceKey);

  // Size of the tracked state, in MB.
  float getMapSizeMB() const;

  // Resets the tracer state.
  void resetAll();

private:
  int myRank{-1};

  // Current active send/recv operations.
  // Use map to quickly find the record with commHash:opCount:proxyOpId during
  // active progress. Note that each op may not complete in order, e.g.,
  // proxyOpId 1 may finish before proxyOpId 0 if they are to different peers.
  // Thus, the inner-most layer has to still be a map for searching by
  // proxyOpId, no matter other ops are completed or not.
  ProxyActiveOpMap activeOps;
  ProxyActiveOpIdTracker activeOpIdTracker;

  // keep track of the recent completed ops;
  // A record is a pair of traceKey.str() and ProxyTraceOp.str()
  std::deque<std::pair<std::string, std::string>> finishedOps;
};
// Forward declaration only; the full definition is not part of this header.
struct ncclProxySubArgs;
// Initializes the tracer handle `proxyTrace` for the given rank; `commHash`
// identifies the communicator on whose behalf the tracer is set up.
void proxyTraceInit(std::unique_ptr<ProxyTrace> &proxyTrace,
int32_t rank, uint64_t commHash);
// Records `val` for counter type `counter` of the op identified by `traceKey`.
void updateProxyOpCounter(std::unique_ptr<ProxyTrace> &proxyTraceObj,
const ProxyTraceRecordKey &traceKey,
ProxyCounterTypes counter, int64_t val);
// Registers a new traced op. `key` is taken by non-const reference.
// NOTE(review): presumably so the assigned proxyOpId can be written back into
// it -- confirm against the definition.
void addNewProxyOp(
std::unique_ptr<ProxyTrace> &proxyTraceObj, ProxyTraceRecordKey &key,
const ProxyTraceExtraInfo &extraInfo, ProxyOpType opType, int channelId,
int nSteps, uint32_t nbytes, int peerRank);
} // namespace facebook_rccl
+11
مشاهده پرونده
@@ -243,6 +243,9 @@ void NCCL_NO_OPTIMIZE commPoison(ncclComm_t comm) {
comm->startMagic = comm->endMagic = 0;
}
RCCL_PARAM_DECLARE(EnableProxyTrace);
RCCL_PARAM(EnableProxyTrace, "ENABLE_PROXY_TRACE", 0);
RCCL_PARAM(KernelCollTraceEnable, "KERNEL_COLL_TRACE_ENABLE", 0);
RCCL_PARAM(KernelCollTraceThreadEnable, "KERNEL_COLL_TRACE_THREAD_ENABLE", 0);
@@ -415,6 +418,14 @@ static ncclResult_t commFree(ncclComm_t comm) {
free(comm->connectSend);
free(comm->connectRecv);
if (rcclParamEnableProxyTrace()) {
WARN("ProxyTrace:");
if (comm->proxyState && comm->proxyState->proxyTrace){
WARN("%s", comm->proxyState->proxyTrace->dump().c_str());
}
}
#ifdef ENABLE_PROFILING
struct ncclProf *prof, *prof_seq;
prof = (struct ncclProf*)malloc(sizeof(struct ncclProf)*MAXCHANNELS*PROFILE_NUM_LAUNCHES);
@@ -0,0 +1,264 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#include "proxy_trace/proxy_trace.h"
#include "debug.h"
#include "device.h"
#include "proxy.h"
#include <fmt/format.h>
#include <map>
// Number of items associated with finished proxy ops.
// NOTE(review): presumably an upper bound on how many finished-op records are
// retained -- confirm at the point of use.
constexpr int32_t kFinishedProxyOpItems = 32;
// Printable name for each ProxyOpStepStatus. UNINITIALIZED is rendered as
// "ILLEGAL"; NUM_STATUS has no entry. The map is deliberately non-const.
// NOTE(review): looks like this is because lookups use operator[] -- confirm
// at the point of use.
static std::unordered_map<facebook_rccl::ProxyOpStepStatus, std::string>
proxyStepStatusStrMap = {
{facebook_rccl::ProxyOpStepStatus::INIT, "INIT"},
{facebook_rccl::ProxyOpStepStatus::POSTING, "POSTING"},
{facebook_rccl::ProxyOpStepStatus::SENDING, "SENDING"},
{facebook_rccl::ProxyOpStepStatus::RECEIVING, "RECEIVING"},
{facebook_rccl::ProxyOpStepStatus::WAITING_GPU, "WAITING_GPU"},
{facebook_rccl::ProxyOpStepStatus::FLUSHING, "FLUSHING"},
{facebook_rccl::ProxyOpStepStatus::DONE, "DONE"},
{facebook_rccl::ProxyOpStepStatus::UNINITIALIZED, "ILLEGAL"},
};
void facebook_rccl::ProxyTrace::resetAll() {
activeOps.clear();
activeOpIdTracker.clear();
myRank = -1;
initialized = false;
}
// Returns true iff activeOps holds an entry at commHash -> opCount ->
// proxyOpId. Never inserts anything into the maps.
bool facebook_rccl::ProxyTrace::checkActiveOpExist(uint64_t commHash,
                                                   uint64_t opCount,
                                                   uint32_t proxyOpId) const {
  // Walk the three map levels one at a time, bailing out on the first miss.
  const auto commIt = activeOps.find(commHash);
  if (commIt == activeOps.end()) {
    return false;
  }
  const auto &opsByOpCount = commIt->second;
  const auto collIt = opsByOpCount.find(opCount);
  if (collIt == opsByOpCount.end()) {
    return false;
  }
  const auto &opsById = collIt->second;
  return opsById.find(proxyOpId) != opsById.end();
}
// Returns the id tracked for commHash:opCount. Both map levels are accessed
// with operator[], so a missing commHash or opCount entry is created on the
// fly and the returned id is then 0.
int64_t facebook_rccl::ProxyTrace::getOrCreateProxyOpId(uint64_t commHash,
                                                        uint64_t opCount) {
  auto &idsByOpCount = activeOpIdTracker[commHash];
  return idsByOpCount[opCount];
}
// Looks up the active op stored under `key`. Returns a pointer into activeOps,
// or nullptr when checkActiveOpExist() reports no such op.
facebook_rccl::ProxyTraceOp *
facebook_rccl::ProxyTrace::getProxyTraceOpPtr(const ProxyTraceRecordKey &key) {
  if (!checkActiveOpExist(key.commHash, key.opCount, key.proxyOpId)) {
    return nullptr;
  }
  auto &opsOfComm = activeOps.at(key.commHash);
  auto &opsOfColl = opsOfComm.at(key.opCount);
  return &opsOfColl.at(key.proxyOpId);
}
// Retires the op stored under `key` once its DONE counter equals nSteps: the
// op is marked DONE, serialized into finishedOps (dropping the oldest record
// when kFinishedProxyOpItems are already held) and erased from activeOps.
// Warns when no active op exists for `key`.
void facebook_rccl::ProxyTrace::checkOpCompleted(
    const ProxyTraceRecordKey &key) {
  if (!checkActiveOpExist(key.commHash, key.opCount, key.proxyOpId)) {
    WARN("[proxyTrace] ProxyTraceOp %s not found", key.str().c_str());
    return;
  }

  auto &opsOfColl = activeOps[key.commHash][key.opCount];
  auto &traceOp = opsOfColl[key.proxyOpId];
  const bool allStepsDone =
      traceOp.counters[facebook_rccl::ProxyCounterTypes::DONE] ==
      traceOp.nSteps;
  if (!allStepsDone) {
    return;
  }

  // Remove finished proxyOp or colls to avoid memory leak
  traceOp.status = ProxyOpStepStatus::DONE;
  if (finishedOps.size() >= kFinishedProxyOpItems) {
    finishedOps.pop_front();
  }
  finishedOps.push_back({key.str(), traceOp.str()});

  // traceOp dangles after this erase; it is not used again.
  opsOfColl.erase(key.proxyOpId);
  if (opsOfColl.empty()) {
    // Last op of this collective: drop its bucket and its id counter.
    activeOps[key.commHash].erase(key.opCount);
    activeOpIdTracker[key.commHash].erase(key.opCount);
  }
}
void facebook_rccl::ProxyTrace::addNewProxyTraceOpImpl(
const ProxyTraceRecordKey &key, const ProxyTraceExtraInfo &extraInfo,
ProxyOpType opType, int channelId, int nSteps, uint32_t nbytes,
int peerRank) {
if (nSteps > 0 &&
!checkActiveOpExist(key.commHash, key.opCount, key.proxyOpId)) {
auto traceOp = facebook_rccl::ProxyTraceOp();
traceOp.opType = opType;
traceOp.traceKey = key;
traceOp.extraInfo = extraInfo;
traceOp.channelId = channelId;
traceOp.nSteps = nSteps;
traceOp.nbytes = nbytes;
traceOp.myRank = this->myRank;
traceOp.peerRank = peerRank;
traceOp.startTs = std::chrono::high_resolution_clock::now();
traceOp.status = ProxyOpStepStatus::INIT;
activeOpIdTracker[key.commHash][key.opCount]++;
activeOps[key.commHash][key.opCount].emplace(key.proxyOpId,
std::move(traceOp));
} else if (nSteps == 0) {
INFO(NCCL_PROXY, "nSteps is 0, ignored %s", key.str().c_str());
}
}
void facebook_rccl::ProxyTraceOp::computeStatus() {
ProxyOpStepStatus newStatus;
int posted = counters[facebook_rccl::ProxyCounterTypes::POSTED];
int received = counters[facebook_rccl::ProxyCounterTypes::RECEIVED];
int transmitted = counters[facebook_rccl::ProxyCounterTypes::TRANSMITTED];
int done = counters[facebook_rccl::ProxyCounterTypes::DONE];
if (opType == ProxyOpType::RECV) {
if (posted < nSteps && posted < done + NCCL_STEPS)
newStatus = ProxyOpStepStatus::POSTING; // Init
else if (received < posted)
newStatus = ProxyOpStepStatus::RECEIVING; // Receiving
else if (received < transmitted)
newStatus = ProxyOpStepStatus::RECEIVING; // Receiving
else if (transmitted < received)
newStatus = ProxyOpStepStatus::FLUSHING; // Flushing
else if (done < transmitted)
newStatus = ProxyOpStepStatus::WAITING_GPU; // Waiting on GPU
else
newStatus = ProxyOpStepStatus::DONE; // Done
} else {
if (posted < nSteps && posted < done + NCCL_STEPS)
newStatus = ProxyOpStepStatus::POSTING; // Init
else if (transmitted < posted)
newStatus = ProxyOpStepStatus::WAITING_GPU; // Waiting on GPU
else if (done < transmitted)
newStatus = ProxyOpStepStatus::SENDING; // Sending
else
newStatus = ProxyOpStepStatus::DONE; // Done
}
this->status = newStatus;
}
// Dumps every active op of communicator `commHash`: a header line followed by
// one serialized line per op, ordered by the serialized trace key. When the
// communicator has no entry in activeOps only the header is returned.
std::string facebook_rccl::ProxyTrace::dump(uint64_t commHash) {
  std::string result = fmt::format("commDump for commHash:{}\n", commHash);

  // Look the communicator up instead of using at(): at() throws
  // std::out_of_range for a commHash that has no entry in activeOps.
  const auto commIt = activeOps.find(commHash);
  if (commIt == activeOps.end()) {
    return result;
  }

  // maps serialized key to serialized proxyOp; sorted by key
  std::map<std::string, std::string> sortedDumpStrMap;
  for (auto &[opCount, proxyOpMap] : commIt->second) {
    for (auto &[opId, opEntry] : proxyOpMap) {
      ProxyTraceRecordKey traceKey = {commHash, opCount, opId};
      // str() refreshes the op's status before serializing it.
      sortedDumpStrMap[traceKey.str()] = opEntry.str();
    }
  }

  for (const auto &[keyStr, proxyOpStr] : sortedDumpStrMap) {
    result += proxyOpStr;
  }
  return result;
}
std::string facebook_rccl::ProxyTrace::dump() {
std::string result = "commDump for all active ops ";
result += fmt::format("mapSizeMB:{:.2f}\n", getMapSizeMB());
// maps serialized key to serliazed proxyOp; sorted by key
std::map<std::string, std::string> sortedDumpStrMap;
for (auto &[commHash, opCountMap] : activeOps) {
for (auto &[opCount, proxyOpMap] : opCountMap) {
for (auto &[opId, opEntry] : proxyOpMap) {
ProxyTraceRecordKey traceKey = {commHash, opCount, opId};
opEntry.computeStatus();
sortedDumpStrMap[traceKey.str()] = opEntry.str();
}
}
}
// add the recent finished ops as well
for (const auto &[keyStr, proxyOpStr] : finishedOps) {
sortedDumpStrMap[keyStr] = proxyOpStr;
}
for (const auto &[keyStr, proxyOpStr] : sortedDumpStrMap) {
result += proxyOpStr;
}
return result;
}
std::string facebook_rccl::ProxyTraceOp::str() {
computeStatus();
std::string ret = fmt::format(
"createT:{}, lastT:{}, cntNm:{}, {}, {}, {}->{}({}), "
"chan:{}, status:{}, ns:{}, nb:{}, po:{}, ke:{}, tail/h:{}, recvT:{}, "
"connSz/h:{}, trans:{}, flushed:{}, recvd:{}, done:{}\n",
std::chrono::duration_cast<std::chrono::milliseconds>(
startTs.time_since_epoch())
.count(),
std::chrono::duration_cast<std::chrono::milliseconds>(
lastUpdateTs.time_since_epoch())
.count(),
static_cast<int>(lastUpdatingCounter), traceKey.str(), extraInfo.str(),
myRank, peerRank, opType == ProxyOpType::SEND ? "S" : "R", channelId,
proxyStepStatusStrMap[status], nSteps, nbytes,
counters[ProxyCounterTypes::POSTED],
counters[ProxyCounterTypes::KERNEL_COPY_READY],
counters[ProxyCounterTypes::TAIL_OR_HEAD],
counters[ProxyCounterTypes::RECV_TAIL],
counters[ProxyCounterTypes::FIFO_SZ_OR_HEAD_CACHE],
counters[ProxyCounterTypes::TRANSMITTED],
counters[ProxyCounterTypes::FLUSHED],
counters[ProxyCounterTypes::RECEIVED], counters[ProxyCounterTypes::DONE]);
return ret;
}
float facebook_rccl::ProxyTrace::getMapSizeMB() const {
float size = 0;
for (const auto &[commHash, opCountMap] : activeOps) {
for (const auto &[opCount, proxyOpMap] : opCountMap) {
size += proxyOpMap.size() *
(sizeof(ProxyTraceOp) +
sizeof(std::unique_ptr<facebook_rccl::ProxyTraceOp>));
}
}
for (const auto &[keyStr, proxyOpStr] : finishedOps) {
size += keyStr.size() + proxyOpStr.size();
}
return size / 1024.0 / 1024.0;
}
// Creates the tracer for `rank` in `proxyTrace` and marks it initialized. An
// already populated handle is left untouched and only a warning is emitted.
// `commHash` is used for logging only.
void facebook_rccl::proxyTraceInit(std::unique_ptr<ProxyTrace> &proxyTrace,
                                   int32_t rank, uint64_t commHash) {
  if (!proxyTrace) {
    INFO(NCCL_PROXY, "Initializing ProxyTrace, rank: %d, commHash: %lu", rank,
         commHash);
    proxyTrace = std::make_unique<facebook_rccl::ProxyTrace>(rank);
    proxyTrace->initialized = true;
    return;
  }
  WARN("[proxyTrace] Initializing non-empty proxyTrace! rank: %d, commHash: %lu",
       rank, commHash);
}
void facebook_rccl::updateProxyOpCounter(
std::unique_ptr<ProxyTrace> &proxyTraceObj,
const ProxyTraceRecordKey &traceKey, ProxyCounterTypes counter,
int64_t val) {
if (proxyTraceObj) {
auto traceOpPtr = proxyTraceObj->getProxyTraceOpPtr(traceKey);
if (traceOpPtr) {
traceOpPtr->counters[counter] = val;
traceOpPtr->lastUpdateTs = std::chrono::high_resolution_clock::now();
traceOpPtr->lastUpdatingCounter = counter;
proxyTraceObj->checkOpCompleted(traceKey);
}
}
}
// Registers a new traced op: writes the id tracked for key.commHash:key.opCount
// into key.proxyOpId (hence the non-const `key`) and records the op under the
// completed key. Does nothing when tracing is off (null handle).
void facebook_rccl::addNewProxyOp(std::unique_ptr<ProxyTrace> &proxyTraceObj,
                                  ProxyTraceRecordKey &key,
                                  const ProxyTraceExtraInfo &extraInfo,
                                  ProxyOpType opType, int channelId, int nSteps,
                                  uint32_t nbytes, int peerRank) {
  if (!proxyTraceObj) {
    return;
  }
  key.proxyOpId =
      proxyTraceObj->getOrCreateProxyOpId(key.commHash, key.opCount);
  proxyTraceObj->addNewProxyTraceOpImpl(key, extraInfo, opType, channelId,
                                        nSteps, nbytes, peerRank);
}
+13
مشاهده پرونده
@@ -368,6 +368,8 @@ ncclResult_t dumpProxyState(struct ncclProxyProgressState* state) {
return ncclSuccess;
}
RCCL_PARAM_DECLARE(EnableProxyTrace);
static ncclResult_t ncclProxyOpToArgs(struct ncclProxyOp* op, struct ncclProxyArgs* args, int subIndex) {
struct ncclProxySubArgs* sub = args->subs+subIndex;
if (subIndex >= NCCL_PROXY_MAX_SUBS) {
@@ -398,6 +400,14 @@ static ncclResult_t ncclProxyOpToArgs(struct ncclProxyOp* op, struct ncclProxyAr
sub->ringAlgo = op->ringAlgo;
sub->workCounter = op->workCounter;
args->nsubs = subIndex+1;
if (rcclParamEnableProxyTrace()) {
sub->traceKey = op->traceKey;
sub->traceInfo.funcIdx = op->coll;
sub->traceInfo.protocol = op->protocol;
sub->traceInfo.pattern = op->pattern;
sub->traceInfo.totalBytes = op->totalBytes;
sub->traceInfo.chunkSize = op->chunkSize;
}
if (subIndex) {
if ((args->sliceSteps != op->sliceSteps) ||
(args->chunkSteps != op->chunkSteps) ||
@@ -1836,6 +1846,9 @@ ncclResult_t ncclProxyInit(struct ncclComm* comm, struct ncclSocket* sock, union
comm->proxyState->listenSock = sock;
comm->proxyState->peerAddresses = peerAddresses;
comm->proxyState->peerAddressesUDS = peerAddressesUDS;
if (rcclParamEnableProxyTrace()) {
facebook_rccl::proxyTraceInit(comm->proxyState->proxyTrace, comm->rank, comm->commHash);
}
// UDS support
NCCLCHECK(ncclIpcSocketInit(&comm->proxyState->ipcSock, comm->rank, peerAddressesUDS[comm->rank], comm->abortFlag));
+37 -1
مشاهده پرونده
@@ -1244,6 +1244,9 @@ static ncclResult_t sendProxyProgress(struct ncclProxyState* proxyState, struct
resources->step = sub->base + sub->nsteps;
sub->posted = sub->transmitted = sub->done = 0;
ncclProfilerStartSendProxyOpEvent(s, args);
facebook_rccl::addNewProxyOp(proxyState->proxyTrace, sub->traceKey,
sub->traceInfo, facebook_rccl::ProxyOpType::SEND,
sub->channelId, sub->nsteps, sub->nbytes, sub->peer);
if (!sub->reg)
sub->sendMhandle = resources->mhandles[args->protocol];
}
@@ -1285,6 +1288,7 @@ static ncclResult_t sendProxyProgress(struct ncclProxyState* proxyState, struct
}
ncclProfilerRecordProxyOpEventState(s, args, sub->posted, sub->transSize, ncclProfilerProxyOpSendPosted);
ncclProfilerRecordProxyStepEventState(s, args, postedStepId, ncclProfilerProxyStepSendGPUWait);
facebook_rccl::updateProxyOpCounter(proxyState->proxyTrace, sub->traceKey, facebook_rccl::ProxyCounterTypes::POSTED, sub->posted);
args->idle = 0;
continue;
}
@@ -1293,6 +1297,16 @@ static ncclResult_t sendProxyProgress(struct ncclProxyState* proxyState, struct
int buffSlot = (sub->base+sub->transmitted)%NCCL_STEPS;
volatile uint64_t* recvTail = &resources->recvMem->tail;
uint64_t tail = sub->base + sub->transmitted;
facebook_rccl::updateProxyOpCounter(proxyState->proxyTrace, sub->traceKey,
facebook_rccl::ProxyCounterTypes::RECV_TAIL, *recvTail);
facebook_rccl::updateProxyOpCounter(proxyState->proxyTrace, sub->traceKey,
facebook_rccl::ProxyCounterTypes::TAIL_OR_HEAD, tail);
facebook_rccl::updateProxyOpCounter(
proxyState->proxyTrace, sub->traceKey,
facebook_rccl::ProxyCounterTypes::FIFO_SZ_OR_HEAD_CACHE, connFifo[buffSlot].size);
if (connFifo[buffSlot].size != -1 && (*recvTail > tail || p == NCCL_PROTO_LL)) {
// We have something to receive, let's check if it's completely ready.
int size = connFifo[buffSlot].size;
@@ -1340,6 +1354,8 @@ static ncclResult_t sendProxyProgress(struct ncclProxyState* proxyState, struct
*resources->curr_hdp_reg = 1;
}
ncclProfilerRecordProxyOpEventState(s, args, sub->transmitted+args->sliceSteps, sub->transSize, ncclProfilerProxyOpSendRemFifoWait);
facebook_rccl::updateProxyOpCounter(proxyState->proxyTrace, sub->traceKey,
facebook_rccl::ProxyCounterTypes::KERNEL_COPY_READY, sub->reg ? 1: sub->transmitted + args->sliceSteps);
// Data is ready, try to send.
// Coverity complains about the size here as pointing to an out-of-scope temporary. Which is nonsense,
// since size is a plain integer.
@@ -1369,6 +1385,8 @@ static ncclResult_t sendProxyProgress(struct ncclProxyState* proxyState, struct
sub->profilerSteps++;
ncclProfilerRecordProxyOpEventState(s, args, sub->transmitted, sub->transSize, ncclProfilerProxyOpSendTransmitted);
ncclProfilerRecordProxyStepEventState(s, args, transmittedStepId, ncclProfilerProxyStepSendWait);
facebook_rccl::updateProxyOpCounter(proxyState->proxyTrace, sub->traceKey,
facebook_rccl::ProxyCounterTypes::TRANSMITTED, sub->transmitted);
args->idle = 0;
continue;
}
@@ -1435,7 +1453,8 @@ static ncclResult_t sendProxyProgress(struct ncclProxyState* proxyState, struct
sub->done += args->sliceSteps;
ncclProfilerStopProxyStepEvent(s, args, doneStepId);
ncclProfilerRecordProxyOpEventState(s, args, sub->done, sub->transSize, ncclProfilerProxyOpSendDone);
facebook_rccl::updateProxyOpCounter(proxyState->proxyTrace, sub->traceKey,
facebook_rccl::ProxyCounterTypes::DONE, sub->done);
if (resources->shared == 0) {
volatile uint64_t* sendHead = resources->gdcSync ? resources->gdcSync : &resources->sendMem->head;
*sendHead = sub->base + sub->done;
@@ -1502,6 +1521,8 @@ static ncclResult_t recvProxyProgress(struct ncclProxyState* proxyState, struct
sub->regBufferReady = 0;
for (int i=0; i<groupSize; i++) sub[-i].groupSize = groupSize;
ncclProfilerStartRecvProxyOpEvent(s, args);
facebook_rccl::addNewProxyOp(proxyState->proxyTrace, sub->traceKey, sub->traceInfo,
facebook_rccl::ProxyOpType::RECV, sub->channelId, sub->nsteps, sub->nbytes, sub->peer);
if (!sub->reg)
sub->recvMhandle = resources->mhandles[args->protocol];
}
@@ -1600,6 +1621,8 @@ static ncclResult_t recvProxyProgress(struct ncclProxyState* proxyState, struct
sub->profilerSteps++;
ncclProfilerRecordProxyOpEventState(s+i, args, sub->posted, sub->transSize, ncclProfilerProxyOpRecvPosted);
ncclProfilerRecordProxyStepEventState(s+i, args, postedStepId, ncclProfilerProxyStepRecvWait);
facebook_rccl::updateProxyOpCounter(proxyState->proxyTrace,
sub->traceKey, facebook_rccl::ProxyCounterTypes::POSTED, sub->posted);
}
args->idle = 0;
}
@@ -1648,6 +1671,7 @@ static ncclResult_t recvProxyProgress(struct ncclProxyState* proxyState, struct
sub->received += args->sliceSteps;
ncclProfilerRecordProxyOpEventState(s+i, args, sub->received, sub->transSize, ncclProfilerProxyOpRecvReceived);
ncclProfilerRecordProxyStepEventState(s+i, args, receivedStepId, ncclProfilerProxyStepRecvFlushWait);
facebook_rccl::updateProxyOpCounter(proxyState->proxyTrace, sub->traceKey, facebook_rccl::ProxyCounterTypes::RECEIVED, sub->received);
if (step < sub->nsteps) {
struct recvNetResources* resources = (struct recvNetResources*) (sub->connection->transportResources);
if (resources->useGdr) needFlush |= resources->needFlush;
@@ -1704,6 +1728,13 @@ static ncclResult_t recvProxyProgress(struct ncclProxyState* proxyState, struct
}
struct recvNetResources* resources = (struct recvNetResources*) (subGroup->connection->transportResources);
NCCLCHECK(proxyState->ncclNet->iflush(resources->netRecvComm, subCount, ptrs, sizes, mhandles, subGroup->requests+(step%NCCL_STEPS)));
// [Added-comment] iflush has been posted for this subGroup at `step`
if (subGroup->requests[step%NCCL_STEPS]){
for(int i=0; i<subGroup->groupSize; i++) {
struct ncclProxySubArgs* sub = subGroup + i;
facebook_rccl::updateProxyOpCounter(proxyState->proxyTrace, sub->traceKey, facebook_rccl::ProxyCounterTypes::FLUSHED, sub->received);
}
}
if (once) {
once = false;
INFO(NCCL_INIT, "%s: issued GDR flush", __func__);
@@ -1731,11 +1762,13 @@ static ncclResult_t recvProxyProgress(struct ncclProxyState* proxyState, struct
sub->transmitted += args->sliceSteps;
ncclProfilerRecordProxyOpEventState(s+i, args, sub->transmitted, sub->transSize, ncclProfilerProxyOpRecvTransmitted);
ncclProfilerRecordProxyStepEventState(s+i, args, transmittedStepId, ncclProfilerProxyStepRecvGPUWait);
facebook_rccl::updateProxyOpCounter(proxyState->proxyTrace, sub->traceKey, facebook_rccl::ProxyCounterTypes::TRANSMITTED, sub->transmitted);
if (step < sub->nsteps) {
__sync_synchronize();
struct recvNetResources* resources = (struct recvNetResources*) (sub->connection->transportResources);
volatile uint64_t* recvTail = resources->gdcSync ? resources->gdcSync : &resources->recvMem->tail;
*recvTail = sub->base + sub->transmitted;
facebook_rccl::updateProxyOpCounter(proxyState->proxyTrace, sub->traceKey, facebook_rccl::ProxyCounterTypes::RECV_TAIL, *recvTail);
if (resources->gdcSync) wc_store_fence(); // Flush out WC write
}
}
@@ -1755,6 +1788,8 @@ static ncclResult_t recvProxyProgress(struct ncclProxyState* proxyState, struct
struct recvNetResources* resources = (struct recvNetResources*) (sub->connection->transportResources);
volatile uint64_t* sendHead = &resources->sendMem->head;
uint64_t done = *sendHead;
facebook_rccl::updateProxyOpCounter(proxyState->proxyTrace, sub->traceKey, facebook_rccl::ProxyCounterTypes::TAIL_OR_HEAD, done);
facebook_rccl::updateProxyOpCounter(proxyState->proxyTrace, sub->traceKey, facebook_rccl::ProxyCounterTypes::FIFO_SZ_OR_HEAD_CACHE, sub->base + sub->done);
while (done > sub->base + sub->done &&
// LL and LL128 can acknowledge 0-bytes send before they even happen. Don't go past what we transmitted.
sub->transmitted > sub->done) {
@@ -1767,6 +1802,7 @@ static ncclResult_t recvProxyProgress(struct ncclProxyState* proxyState, struct
sub->done += args->sliceSteps;
ncclProfilerStopProxyStepEvent(s+i, args, doneStepId);
ncclProfilerRecordProxyOpEventState(s+i, args, sub->done, sub->transSize, ncclProfilerProxyOpRecvDone);
facebook_rccl::updateProxyOpCounter(proxyState->proxyTrace, sub->traceKey, facebook_rccl::ProxyCounterTypes::DONE, sub->done);
args->idle = 0;
if (sub->done == sub->nsteps) {
args->done++;
+4
مشاهده پرونده
@@ -56,6 +56,9 @@ if(BUILD_TESTS)
common/TestBed.cpp
common/TestBedChild.cpp
common/StandaloneUtils.cpp
../src/misc/recorder.cc
proxy_trace/ProxyTraceUnitTests.cpp
../src/misc/proxy_trace/proxy_trace.cc
)
# Append a file if BUILD_TESTS is ON and build type is not Debug
@@ -94,6 +97,7 @@ if(BUILD_TESTS)
target_link_libraries(rccl-UnitTests PRIVATE hip::host hip::device hsa-runtime64::hsa-runtime64)
target_link_libraries(rccl-UnitTests PRIVATE Threads::Threads)
target_link_libraries(rccl-UnitTests PRIVATE dl)
target_link_libraries(rccl-UnitTests PRIVATE fmt::fmt)
if(OPENMP_TESTS_ENABLED)
target_link_libraries(rccl-UnitTests PRIVATE "${OpenMP_CXX_FLAGS}")
endif()
@@ -0,0 +1,134 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#include "proxy.h"
#include "proxy_trace/proxy_trace.h"
#include <cstdint>
#include <gtest/gtest.h>
#include <unistd.h>
namespace RcclUnitTesting {
class ProxyTraceTestFixture : public ::testing::Test {
public:
ncclProxyState *proxyState;
ncclProxySubArgs *sub1, *sub2;
uint64_t commHash = 123456789;
int64_t opCount = 31;
int nSteps = 10;
void SetUp() override {
proxyState = new ncclProxyState();
facebook_rccl::proxyTraceInit(proxyState->proxyTrace, 0, commHash);
EXPECT_NE(proxyState->proxyTrace, nullptr);
sub1 = new ncclProxySubArgs();
sub2 = new ncclProxySubArgs();
sub1->traceKey = {commHash, opCount, -1};
sub2->traceKey = {commHash, opCount, -1};
sub1->nsteps = nSteps;
sub2->nsteps = nSteps;
}
void TearDown() override {
delete sub1;
delete sub2;
delete proxyState;
}
void AddTraceOp(ncclProxySubArgs *sub, facebook_rccl::ProxyOpType opType) {
facebook_rccl::addNewProxyOp(proxyState->proxyTrace, sub->traceKey,
sub->traceInfo, opType, sub->channelId,
sub->nsteps, sub->nbytes, sub->peer);
}
};
// The fixture's SetUp must have left a live tracer behind.
TEST_F(ProxyTraceTestFixture, nonEmptySingleton) {
  const std::unique_ptr<facebook_rccl::ProxyTrace> &tracer =
      proxyState->proxyTrace;
  EXPECT_NE(tracer, nullptr);
}
// Ops added under the same commHash:opCount receive consecutive proxyOpIds
// and can be fetched again through their key.
TEST_F(ProxyTraceTestFixture, addTraceOp) {
  auto &tracer = proxyState->proxyTrace;
  // Nothing registered yet, so the next id to hand out is 0.
  EXPECT_EQ(tracer->getOrCreateProxyOpId(sub1->traceKey.commHash,
                                         sub1->traceKey.opCount),
            0);
  AddTraceOp(sub1, facebook_rccl::ProxyOpType::SEND);
  EXPECT_EQ(sub1->traceKey.proxyOpId, 0);
  AddTraceOp(sub2, facebook_rccl::ProxyOpType::RECV);
  EXPECT_EQ(sub2->traceKey.proxyOpId, 1);
  EXPECT_EQ(tracer->getOrCreateProxyOpId(sub1->traceKey.commHash,
                                         sub1->traceKey.opCount),
            2);

  auto traceRecordPtr = tracer->getProxyTraceOpPtr(sub1->traceKey);
  // Fatal check: the pointer is dereferenced right below.
  ASSERT_NE(traceRecordPtr, nullptr);
  EXPECT_EQ(traceRecordPtr->opType, facebook_rccl::ProxyOpType::SEND);
}
// The reported size grows with every active op and stays above the single-op
// size after one of two ops has finished.
TEST_F(ProxyTraceTestFixture, getMapSizeMB) {
  using facebook_rccl::ProxyCounterTypes;
  using facebook_rccl::ProxyOpType;
  auto &tracer = proxyState->proxyTrace;

  AddTraceOp(sub1, ProxyOpType::SEND);
  const auto sizeWithOneOp = tracer->getMapSizeMB();
  EXPECT_GT(sizeWithOneOp, 0);

  AddTraceOp(sub2, ProxyOpType::RECV);
  const auto sizeWithTwoOps = tracer->getMapSizeMB();
  EXPECT_GT(sizeWithTwoOps, sizeWithOneOp);

  // finish sub1
  sub1->done = nSteps;
  facebook_rccl::updateProxyOpCounter(tracer, sub1->traceKey,
                                      ProxyCounterTypes::DONE, sub1->done);

  // sub1 is now serialized and should be moved from activeOps to finishedOps
  const auto sizeAfterFinish = tracer->getMapSizeMB();
  EXPECT_GT(sizeAfterFinish, sizeWithOneOp);
}
// Counter updates land in the op's counter map and move lastUpdateTs to or
// past the creation timestamp.
TEST_F(ProxyTraceTestFixture, updateTraceOp) {
  auto &tracer = proxyState->proxyTrace;
  AddTraceOp(sub1, facebook_rccl::ProxyOpType::SEND);
  facebook_rccl::updateProxyOpCounter(
      tracer, sub1->traceKey,
      facebook_rccl::ProxyCounterTypes::KERNEL_COPY_READY, 1);
  facebook_rccl::updateProxyOpCounter(
      tracer, sub1->traceKey, facebook_rccl::ProxyCounterTypes::POSTED, 3);
  facebook_rccl::updateProxyOpCounter(
      tracer, sub1->traceKey, facebook_rccl::ProxyCounterTypes::TRANSMITTED, 2);

  auto traceRecordPtr = tracer->getProxyTraceOpPtr(sub1->traceKey);
  // Fatal check: a non-fatal EXPECT would let the dereferences below run on
  // a null pointer.
  ASSERT_NE(traceRecordPtr, nullptr);
  EXPECT_EQ(traceRecordPtr->counters[facebook_rccl::ProxyCounterTypes::POSTED],
            3);
  EXPECT_EQ(
      traceRecordPtr->counters[facebook_rccl::ProxyCounterTypes::TRANSMITTED],
      2);
  EXPECT_EQ(traceRecordPtr
                ->counters[facebook_rccl::ProxyCounterTypes::KERNEL_COPY_READY],
            1);
  EXPECT_GE(traceRecordPtr->lastUpdateTs, traceRecordPtr->startTs);
}
// A large counter value is stored verbatim.
TEST_F(ProxyTraceTestFixture, updateTraceOp2) {
  auto &tracer = proxyState->proxyTrace;
  AddTraceOp(sub1, facebook_rccl::ProxyOpType::SEND);
  int64_t rand = 123456789;
  sub1->posted = rand;
  facebook_rccl::updateProxyOpCounter(tracer, sub1->traceKey,
                                      facebook_rccl::ProxyCounterTypes::POSTED,
                                      sub1->posted);

  auto traceRecordPtr = tracer->getProxyTraceOpPtr(sub1->traceKey);
  // Fatal check: the pointer is dereferenced right below.
  ASSERT_NE(traceRecordPtr, nullptr);
  EXPECT_EQ(traceRecordPtr->counters[facebook_rccl::ProxyCounterTypes::POSTED],
            rand);
}
// A finished op is no longer reachable as an active op, while its serialized
// record still counts towards the reported size.
TEST_F(ProxyTraceTestFixture, memoryReclaim) {
  using facebook_rccl::ProxyCounterTypes;
  using facebook_rccl::ProxyOpType;
  auto &tracer = proxyState->proxyTrace;
  tracer->resetAll();

  AddTraceOp(sub1, ProxyOpType::SEND);
  // Report all steps as done so the op gets retired.
  sub1->done = nSteps;
  facebook_rccl::updateProxyOpCounter(tracer, sub1->traceKey,
                                      ProxyCounterTypes::DONE, sub1->done);

  auto *retired = tracer->getProxyTraceOpPtr(sub1->traceKey);
  EXPECT_EQ(retired, nullptr);
  EXPECT_GT(tracer->getMapSizeMB(), 0);
}
} // namespace RcclUnitTesting