Add collective latency profiler (#1785)

* [LatencyProfiler] Initial commit

* [LatencyProfiler] Add unit tests

* [LatencyProfiler] add more

* [LatencyProfiler] Pass unit tests

* [LatencyProfiler] Add hooks to integrate with meta internal tools

* [LatencyProfiler] Restore install.sh

* [LatencyProfiler] Resolved comments 1. add proper license 2. use proper namespace

* [LatencyProfiler] Add header

[ROCm/rccl commit: 874cd657ef]
Этот коммит содержится в:
ycui1984
2025-07-30 14:59:28 -07:00
коммит произвёл GitHub
родитель cafd7a5126
Коммит 39c508b80d
15 изменённых файлов: 891 добавлений и 0 удалений
+9
Просмотреть файл
@@ -652,6 +652,15 @@ set(SRC_FILES
src/transport/p2p.cc
src/transport/profiler.cc
src/transport/shm.cc
src/include/latency_profiler/CollTrace.h
src/include/latency_profiler/CollTraceEvent.h
src/include/latency_profiler/CollTraceFunc.h
src/include/latency_profiler/CollTraceUtils.h
src/include/latency_profiler/EventQueue.h
src/misc/latency_profiler/CollTrace.cc
src/misc/latency_profiler/CollTraceEvent.cc
src/misc/latency_profiler/CollTraceFunc.cc
src/misc/latency_profiler/CollTraceUtils.cc
)
if (ENABLE_MSCCL_KERNEL)
+10
Просмотреть файл
@@ -25,6 +25,7 @@
#include <cstring> // std::memcpy
#include <cinttypes> // PRIx64
#include <cassert>
#include "latency_profiler/CollTraceFunc.h"
using namespace rccl;
@@ -1656,9 +1657,12 @@ ncclResult_t ncclLaunchKernel(struct ncclComm* comm, struct ncclKernelPlan* plan
cudaStream_t launchStream = planner->streams->stream;
void* extra[] = {plan->kernelArgs, &plan->kernelArgsSize};
auto event = latency_profiler::collTraceAquireEventBaseline(plan, launchStream);
if (planner->numStreams == 1 && !plan->persistent) {
latency_profiler::collTraceRecordStartEvent(comm, launchStream, event.get());
comm->lastStream = planner->streams->stream;
CUDACHECKGOTO(hipExtLaunchKernel(plan->kernelFn, grid, block, extra, 0, launchStream, NULL, comm->doneEvent, 0), ret, do_return);
latency_profiler::collTraceRecordEndEvent(comm, plan, launchStream, std::move(event));
return ncclSuccess;
}
@@ -1727,16 +1731,22 @@ ncclResult_t ncclLaunchKernel(struct ncclComm* comm, struct ncclKernelPlan* plan
launchConfig.numAttrs = attrs;
launchConfig.hStream = launchStream;
latency_profiler::collTraceRecordStartEvent(comm, launchStream, event.get());
CUCHECKGOTO(cuLaunchKernelEx(&launchConfig, fn, nullptr, extra), ret, do_return);
latency_profiler::collTraceRecordEndEvent(comm, plan, launchStream, std::move(event));
#endif
} else {
// Standard kernel launch
latency_profiler::collTraceRecordStartEvent(comm, launchStream, event.get());
CUCHECKGOTO(cuLaunchKernel(fn, grid.x, grid.y, grid.z, block.x, block.y, block.z, smem, launchStream, nullptr, extra), ret, do_return);
latency_profiler::collTraceRecordEndEvent(comm, plan, launchStream, std::move(event));
}
#endif
// Standard kernel launch
//cuLaunchKernel(sym, grid.x, grid.y, grid.z, block.x, block.y, block.z, smem, launchStream, nullptr, extra);
latency_profiler::collTraceRecordStartEvent(comm, launchStream, event.get());
CUDACHECKGOTO(cudaLaunchKernel(sym, grid, block, extra, smem, launchStream), ret, do_return);
latency_profiler::collTraceRecordEndEvent(comm, plan, launchStream, std::move(event));
do_return:
return ret;
+2
Просмотреть файл
@@ -19,6 +19,7 @@
#include "graph.h"
#include "nvmlwrap.h"
#include "profiler.h"
#include "latency_profiler/CollTrace.h"
#include "rccl_common.h"
#include "recorder.h"
@@ -634,6 +635,7 @@ struct ncclComm {
hipEvent_t doneEvent;
hipStream_t lastStream;
std::unique_ptr<latency_profiler::CollTrace> ctrace;
#ifdef ENABLE_COLLTRACE
struct ncclCollTrace* collTrace;
+61
Просмотреть файл
@@ -0,0 +1,61 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#pragma once
#include <thread>
#include <atomic>
#include "CollTraceEvent.h"
#include "CollTraceUtils.h"
#include "EventQueue.h"
struct ncclComm;
namespace latency_profiler {
struct CollStats;
// CollTrace workflow:
// 1) Measure collective latency by recording a cuda/hip event before and
//    after a kernel launch in RCCL.
// 2) Start a separate worker thread that collects the latency data into its
//    local ring buffer.
// 3) Perform a CPU-level all-gather to exchange latency data between the
//    ranks of a node (200 to 300us per 100 latency data exchange).
// 4) Report to scuba (or to the log when scuba logging is not compiled in)
//    when the results buffer is full or every few minutes.
class CollTrace {
public:
// Stores the communicator and starts the profiling worker thread.
CollTrace(ncclComm* comm);
// Pushes a TERMINATE event, joins the worker thread and flushes pending
// statistics on rank 0.
__attribute__((visibility("default"))) ~CollTrace();
// Assigns the next collective id to `event` and hands it to the worker thread.
void enqueueEvent(std::unique_ptr<CollTraceEvent> event);
// Creates an event of the given type with freshly created start/stop
// cuda events attached.
std::unique_ptr<CollTraceEvent> createEvent(
CollTraceEvent::EventType type = CollTraceEvent::EventType::COMM);
// Stores the latency of the event currently being processed and, once the
// ring buffer is full, exchanges and aggregates latencies across local ranks.
// NOTE(review): the worker thread passes its CUDA device index as `rank`.
void recordCurCollResult(int rank, float latencyMs);
// Emits the buffered statistics. When `checkInterval` is true the report is
// skipped unless the dump interval elapsed or the buffer reached its limit.
void reportIfNeeded(bool checkInterval);
private:
// Events waiting to be processed by the worker thread.
EventQueue<CollTraceEvent> eventQueue_;
// Worker thread; started in the constructor body, joined in the destructor.
std::thread profilingWorkerThread_;
// Worker thread main loop; always returns nullptr.
void* collTraceThreadFn(int cudaDev);
// Id handed to the next enqueued collective.
std::atomic<uint64_t> curCollId_{0};
// Event currently being processed by the worker thread.
std::unique_ptr<CollTraceEvent> curEvent_;
// Ring buffer holding the most recent collective records of this rank.
std::deque<std::unique_ptr<CollTraceInfo>> pastColls_;
ncclComm* comm_{nullptr};
// Decimal string form of comm->commHash, used in log output.
std::string commHash_;
int rank_{-1};
// Aggregated statistics batches that have not been reported yet.
std::deque<std::vector<CollStats>> stats_;
// Time of the last report; initialized when the worker thread starts.
std::chrono::time_point<std::chrono::steady_clock> lastReportTime_;
};
} // namespace latency_profiler
+68
Просмотреть файл
@@ -0,0 +1,68 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#pragma once
#include <cuda_runtime.h>
#include <functional>
#include <memory>
#include <queue>
#include "checks.h"
#include "CollTraceUtils.h"
namespace latency_profiler {
// Deleter that lets a std::unique_ptr own a CUDA event handle.
struct CudaEventDeleter {
  void operator()(cudaEvent_t event) {
    // Best effort: the status returned by the destroy call is ignored.
    cudaEventDestroy(event);
  }
};
// Owning smart pointer for a cudaEvent_t handle. The pointee type is derived
// from the handle type through std::pointer_traits, and the handle is released
// with CudaEventDeleter.
using CudaEventPtr = std::unique_ptr<
std::pointer_traits<cudaEvent_t>::element_type,
CudaEventDeleter>;
// a wrapper class for cuda event
class CudaWaitEvent {
public:
CudaWaitEvent(CudaEventPtr e) : event_(std::move(e)) {}
~CudaWaitEvent() {}
cudaEvent_t getCudaEvent() {
return event_.get();
}
std::shared_ptr<float> getElapsedTimeSinceEvent(CudaWaitEvent* start);
ncclResult_t waitEventFinish();
private:
CudaEventPtr event_;
};
// One unit of work for the profiler worker thread: the description of a
// collective together with the CUDA events bracketing its kernel launch.
struct CollTraceEvent {
  // COMM carries a collective to measure; TERMINATE stops the worker thread.
  enum class EventType { COMM, TERMINATE };

  CollTraceInfo coll;
  std::unique_ptr<CudaWaitEvent> start;
  std::unique_ptr<CudaWaitEvent> stop;
  EventType eventType = EventType::COMM;

  CollTraceEvent() = default;
  CollTraceEvent(EventType type) : eventType(type) {}
  ~CollTraceEvent() = default;

  // Move-only: the event owns its CUDA events.
  CollTraceEvent(CollTraceEvent&&) = default;
  CollTraceEvent& operator=(CollTraceEvent&&) = default;
  CollTraceEvent(const CollTraceEvent&) = delete;
  CollTraceEvent& operator=(const CollTraceEvent&) = delete;
};
} // namespace latency_profiler
+39
Просмотреть файл
@@ -0,0 +1,39 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#pragma once
#include "CollTraceEvent.h"
#include "comm.h"
#include "CollTraceUtils.h"
namespace latency_profiler {
// Exception type for errors raised by the collective trace profiler.
class CollTraceError : public std::runtime_error {
 public:
  explicit CollTraceError(const std::string& what)
      : std::runtime_error(what) {
  }
};
// Creates comm->ctrace when the RCCL_LATENCY_PROFILER environment variable is
// set to "1"; otherwise does nothing.
ncclResult_t collTraceInit(ncclComm* comm);
// Destroys comm->ctrace if it exists.
ncclResult_t collTraceDestroy(ncclComm* comm);
// Builds a trace event for the first collective task of `plan`. Returns
// nullptr when tracing is disabled, the plan has no collective task, or the
// stream is being captured into a cuda graph.
std::unique_ptr<CollTraceEvent> collTraceAquireEventBaseline(
ncclKernelPlan* plan,
cudaStream_t stream);
// Records the start event of `event` on `launchStream`; no-op when tracing is
// disabled or `event` is null.
ncclResult_t collTraceRecordStartEvent(
ncclComm* comm,
cudaStream_t launchStream,
CollTraceEvent* event);
// Records the stop event of `event` on `launchStream` and passes ownership of
// the event to the profiler; no-op when tracing is disabled or `event` is null.
ncclResult_t collTraceRecordEndEvent(
ncclComm* comm,
ncclKernelPlan* plan,
cudaStream_t launchStream,
std::unique_ptr<CollTraceEvent> event);
// Extracts operation name, data type name and element count from a
// collective task.
CollTraceInfo parseCollInfoFromCollTask(const ncclTaskColl& collTask);
} // namespace latency_profiler
+49
Просмотреть файл
@@ -0,0 +1,49 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#pragma once
#include <deque>
#include <memory>
#include <string>
#include <vector>
namespace latency_profiler {
// Latency statistics of one collective, aggregated over the ranks of a host.
struct CollStats {
  const int collId;
  // Spread between slowest and fastest rank, in percent of the fastest.
  const int percent;
  const float minLatencyUs;
  const float maxLatencyUs;
  const std::string opName;
  const std::string dataType;
  int64_t count;

  CollStats(
      const int collId,
      const int percent,
      const float minLatencyUs,
      const float maxLatencyUs,
      const std::string& opName,
      const std::string& dataType,
      const int64_t count)
      : collId(collId),
        percent(percent),
        minLatencyUs(minLatencyUs),
        maxLatencyUs(maxLatencyUs),
        opName(opName),
        dataType(dataType),
        count(count) {}
};
// Description of a single traced collective on one rank.
struct CollTraceInfo {
  int64_t collId;        // sequence number assigned when the event is enqueued
  std::string opName;    // collective name
  std::string dataType;  // element data type name
  int64_t count;         // element count
  float latencyMs = -1.0f;  // measured latency; -1 until a measurement exists
};
// Writes one log line per CollStats entry of every batch in `stats`, tagged
// with `commHash`.
void reportToFile(
const std::deque<std::vector<CollStats>>& stats,
const std::string& commHash);
// Combines the per-rank latencies in `latencyAllGather` (laid out as
// RANKS_PER_HOST consecutive groups of NCCL_COLLTRACE_RECORD_MAX values) into
// one CollStats per entry of `info`. Throws std::runtime_error when a latency
// value is zero.
std::vector<CollStats> aggregateResults(
const std::deque<std::unique_ptr<CollTraceInfo>>& info,
const std::vector<float>& latencyAllGather,
int RANKS_PER_HOST,
int NCCL_COLLTRACE_RECORD_MAX);
// Message size in MiB for `count` elements of the named data type. Throws
// std::runtime_error for an unknown data type name.
float getSizeMb(const std::string& dataType, int count);
} // namespace latency_profiler
+46
Просмотреть файл
@@ -0,0 +1,46 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#pragma once
#include <condition_variable>
#include <deque>
namespace latency_profiler {
// Unbounded FIFO queue for hand-off from several producers to one consumer.
// Producers call push(); the consumer blocks in waitPop() until an item is
// available. All access to the underlying deque is guarded by a mutex.
template <class Element>
class EventQueue {
 public:
  // Appends `item` at the tail and wakes up one waiting consumer.
  void push(std::unique_ptr<Element> item) {
    {
      std::lock_guard<std::mutex> guard(mutex_);
      queue_.emplace_back(std::move(item));
    }
    // Notify after releasing the lock so the woken thread can take it at once.
    cv_.notify_one();
  }

  // Blocks until the queue is non-empty, then removes and returns the head.
  std::unique_ptr<Element> waitPop() {
    std::unique_lock<std::mutex> guard(mutex_);
    while (queue_.empty()) {
      cv_.wait(guard);
    }
    auto head = std::move(queue_.front());
    queue_.pop_front();
    return head;
  }

 private:
  std::deque<std::unique_ptr<Element>> queue_;
  std::condition_variable cv_;
  mutable std::mutex mutex_;
};
} // namespace latency_profiler
+4
Просмотреть файл
@@ -56,6 +56,8 @@
#include "msccl/msccl_lifecycle.h"
#include "msccl/msccl_status.h"
#include "latency_profiler/CollTrace.h"
#include "latency_profiler/CollTraceFunc.h"
#ifndef STR2
#define STR2(v) #v
@@ -2026,6 +2028,7 @@ static ncclResult_t ncclCommInitRankFunc(struct ncclAsyncJob* job_) {
NCCLCHECK(comm->tuner->init(comm->nRanks, comm->nNodes, ncclDebugLog, &comm->tunerContext));
}
NCCLCHECKGOTO(latency_profiler::collTraceInit(comm), res, fail);
// update communicator state
comm->initState = ncclSuccess;
timers[TIMER_INIT_TOTAL] = clockNano() - timers[TIMER_INIT_TOTAL];
@@ -2509,6 +2512,7 @@ static ncclResult_t commDestroySync(struct ncclAsyncJob* job_) {
CUDACHECKGOTO(cudaSetDevice(comm->cudaDev), ret, fail);
NCCLCHECKGOTO(latency_profiler::collTraceDestroy(comm), ret, fail);
TRACE(NCCL_INIT, "Destroying comm %p rank %d abortFlag %d asyncResult %d", comm, comm->rank, *comm->abortFlag, comm->asyncResult);
if (comm->initState == ncclSuccess) {
+228
Просмотреть файл
@@ -0,0 +1,228 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#include "latency_profiler/CollTrace.h"
#include "bootstrap.h"
#include "checks.h"
#include "comm.h"
#include "param.h"
// Size of the per-rank latency ring buffer; latencies are exchanged and
// aggregated each time this many collectives have been recorded.
NCCL_PARAM(ColltraceRecordMax, "COLLTRACE_RECORD_MAX", 100);
// Maximum number of aggregated batches buffered; reaching it triggers a report.
NCCL_PARAM(ColltraceMaxDumpSize, "COLLTRACE_MAX_DUMP_SIZE", 20);
// Number of seconds after which buffered batches are reported regardless of
// how many have accumulated.
NCCL_PARAM(ColltraceDumpIntervalSec, "COLLTRACE_DUMP_INTERVAL_SEC", 300);
// Number of ranks assumed per host.
// NOTE(review): hard-coded; confirm it matches comm->localRanks on the
// target systems.
constexpr int RANKS_PER_HOST = 8;
namespace latency_profiler {
namespace {
// Creates a CUDA event and wraps it in an owning pointer. A creation failure
// is only logged by CUDACHECKIGNORE, in which case the returned pointer holds
// a null handle.
CudaEventPtr getCudaEventPtr() {
  cudaEvent_t rawEvent = nullptr;
  CUDACHECKIGNORE(cudaEventCreate(&rawEvent));
  return CudaEventPtr(rawEvent);
}
} // namespace
// Remembers the communicator, caches its hash (as a string) and rank for
// logging, and launches the profiling worker thread.
CollTrace::CollTrace(ncclComm* comm)
    : comm_(comm),
      commHash_(std::to_string(comm->commHash)),
      rank_(comm->rank) {
  // The thread is started in the constructor body, i.e. only after every
  // member has been initialized.
  auto workerMain = [this]() { return collTraceThreadFn(comm_->cudaDev); };
  profilingWorkerThread_ = std::thread{workerMain};
}
// Shuts the profiler down: asks the worker thread to stop, waits for it and,
// on rank 0, flushes whatever statistics are still buffered. Exceptions are
// logged and never leave the destructor.
CollTrace::~CollTrace() {
  try {
    INFO(
        NCCL_INIT,
        "COLLTRACE: commHash %s rank %d - Destroy START",
        commHash_.c_str(),
        rank_);

    // A TERMINATE event makes the worker loop exit once it reaches it.
    auto terminateEvent = std::make_unique<CollTraceEvent>(
        CollTraceEvent::EventType::TERMINATE);
    eventQueue_.push(std::move(terminateEvent));

    if (profilingWorkerThread_.joinable()) {
      profilingWorkerThread_.join();
    }

    // Final unconditional report (interval check disabled).
    if (rank_ == 0) {
      reportIfNeeded(false);
    }

    INFO(
        NCCL_INIT,
        "COLLTRACE: commHash %s rank %d - Destroy COMPLETE",
        commHash_.c_str(),
        rank_);
  } catch (const std::exception& e) {
    WARN(
        "COLLTRACE: commHash %s rank %d - Destroy FAILED: %s",
        commHash_.c_str(),
        rank_,
        e.what());
  }
}
// Main loop of the profiling worker thread. Binds the thread to `cudaDev`,
// then pops events until a TERMINATE event arrives. For each event it waits
// for the start and stop CUDA events, computes the latency between them and
// records it. Always returns nullptr.
void* CollTrace::collTraceThreadFn(int cudaDev) {
  INFO(NCCL_INIT, "CollTrace thread started for cudaDev %d", cudaDev);

  const auto setDeviceStatus = cudaSetDevice(cudaDev);
  if (setDeviceStatus != cudaSuccess) {
    WARN("Cuda failure '%s'", cudaGetErrorString(setDeviceStatus));
    return nullptr;
  }

  lastReportTime_ = std::chrono::steady_clock::now();
  INFO(
      NCCL_INIT,
      "COLLTRACE: commHash %s rank %d - worker thread STARTED",
      commHash_.c_str(),
      rank_);

  for (;;) {
    curEvent_ = eventQueue_.waitPop();
    if (curEvent_->eventType == CollTraceEvent::EventType::TERMINATE) {
      break;
    }

    curEvent_->start->waitEventFinish();
    const auto stopStatus = curEvent_->stop->waitEventFinish();

    // -1 marks a collective whose latency could not be measured.
    float latency = -1;
    if (stopStatus == ncclSuccess) {
      const auto elapsed =
          curEvent_->stop->getElapsedTimeSinceEvent(curEvent_->start.get());
      // A null result means cudaEventElapsedTime failed, e.g. because the
      // events were not recorded or the stream is not valid.
      if (elapsed == nullptr) {
        WARN(
            "CollTrace: getElapsedTimeSinceEvent failed, aborting worker thread");
        return nullptr;
      }
      latency = *elapsed;
    }

    recordCurCollResult(cudaDev, latency);
    curEvent_.reset();
  }

  INFO(
      NCCL_INIT,
      "COLLTRACE: commHash %s rank %d - worker thread TERMINATE",
      commHash_.c_str(),
      rank_);
  return nullptr;
}
// Stamps `event` with the next collective id (atomic post-increment) and
// queues it for the worker thread.
void CollTrace::enqueueEvent(std::unique_ptr<CollTraceEvent> event) {
  const auto nextId = curCollId_++;
  event->coll.collId = nextId;
  eventQueue_.push(std::move(event));
}
// Allocates a trace event of the requested type and attaches newly created
// start and stop CUDA events to it.
std::unique_ptr<CollTraceEvent> CollTrace::createEvent(
    CollTraceEvent::EventType type) {
  auto traceEvent = std::make_unique<CollTraceEvent>(type);
  traceEvent->start = std::make_unique<CudaWaitEvent>(getCudaEventPtr());
  traceEvent->stop = std::make_unique<CudaWaitEvent>(getCudaEventPtr());

  // NOTE(review): this only checks the wrapper objects, which
  // std::make_unique never returns as null; a failed cudaEventCreate is not
  // detected here.
  const bool haveBothWrappers =
      traceEvent->start != nullptr && traceEvent->stop != nullptr;
  if (!haveBothWrappers) {
    return std::unique_ptr<CollTraceEvent>(nullptr);
  }
  return traceEvent;
}
// True when `collId` is the last id of a window of COLLTRACE_RECORD_MAX
// consecutive collectives, i.e. when the ring buffer has just been filled.
bool shouldAggregateRingBuffer(int collId) {
  const int recordMax = ncclParamColltraceRecordMax();
  const int positionAfterInsert = (collId + 1) % recordMax;
  return positionAfterInsert == 0;
}
// Emits all buffered statistics batches and clears the buffer.
// With `checkInterval` set, nothing is reported while the dump interval has
// not elapsed and the buffer is still below COLLTRACE_MAX_DUMP_SIZE.
void CollTrace::reportIfNeeded(bool checkInterval = true) {
  const auto now = std::chrono::steady_clock::now();
  auto elapsedSecs =
      std::chrono::duration_cast<std::chrono::seconds>(now - lastReportTime_)
          .count();

  if (checkInterval && elapsedSecs < ncclParamColltraceDumpIntervalSec() &&
      stats_.size() < ncclParamColltraceMaxDumpSize()) {
    return;
  }

  INFO(
      NCCL_COLL,
      "CollTrace: %ld seconds passed since last report, stats size = %zu, checkInterval = %d",
      elapsedSecs,
      stats_.size(),
      checkInterval);

  // reportToScuba is a placeholder for the OSS environment. Meta production
  // reports to scuba instead of a file, which enables filtering, aggregation
  // and visualization.
#ifdef ENABLE_SCUBA_LOGGING
  reportToScuba(stats_, commHash_);
#else
  reportToFile(stats_, commHash_);
#endif

  lastReportTime_ = std::chrono::steady_clock::now();
  stats_.clear();
}
// Records the latency of the event currently held in curEvent_ and, whenever
// the ring buffer has just been filled, exchanges the buffered latencies with
// the other ranks of the node and aggregates them.
//
// rank:    value compared against 0 to decide whether this caller aggregates
//          and reports (the worker thread passes its CUDA device index).
// latency: measured latency in milliseconds, or -1 when measurement failed.
//
// The all-gather buffer is sized by comm_->localRanks, the same rank count
// that is handed to bootstrapIntraNodeAllGather. Sizing it by a fixed
// ranks-per-host constant overflows the buffer on nodes with more local ranks
// and leaves zero-filled slots (rejected by aggregateResults) on nodes with
// fewer.
void CollTrace::recordCurCollResult(int rank, float latency) {
  const int recordMax = ncclParamColltraceRecordMax();

  auto result = std::make_unique<CollTraceInfo>(curEvent_->coll);
  const auto collId = result->collId;
  result->latencyMs = latency;
  pastColls_.push_back(std::move(result));
  // Keep at most recordMax entries: drop the oldest one.
  if (pastColls_.size() > static_cast<size_t>(recordMax)) {
    pastColls_.pop_front();
  }

  if (!shouldAggregateRingBuffer(collId) || pastColls_.empty()) {
    return;
  }

  // One group of recordMax slots per local rank; this rank fills its own
  // group, the all-gather fills the others.
  const int localRanks = comm_->localRanks;
  std::vector<float> latencyAllGather(
      static_cast<size_t>(localRanks) * recordMax, 0);
  const int start = comm_->localRank * recordMax;
  for (int i = 0; i < recordMax; i++) {
    latencyAllGather[start + i] = pastColls_[i]->latencyMs;
  }

  const auto before = std::chrono::high_resolution_clock::now();
  const auto ncclResult = bootstrapIntraNodeAllGather(
      comm_->bootstrap,
      comm_->localRankToRank,
      comm_->localRank,
      comm_->localRanks,
      latencyAllGather.data(),
      recordMax * sizeof(float));
  const auto after = std::chrono::high_resolution_clock::now();
  auto interval_us =
      std::chrono::duration_cast<std::chrono::microseconds>(after - before)
          .count();
  if (ncclResult != ncclSuccess) {
    WARN("CollTrace: All gather exchange latency data failed");
    return;
  }

  if (rank == 0) {
    INFO(NCCL_COLL, "latency metrics all gather takes %ld us", interval_us);
    try {
      auto stats = aggregateResults(
          pastColls_, latencyAllGather, localRanks, recordMax);
      stats_.push_back(stats);
      // Bound the number of buffered batches: drop the oldest one.
      if (stats_.size() > ncclParamColltraceMaxDumpSize()) {
        stats_.pop_front();
      }
      reportIfNeeded();
    } catch (const std::exception& e) {
      WARN("Aggregating error: %s", e.what());
    }
  }
}
} // namespace latency_profiler
+42
Просмотреть файл
@@ -0,0 +1,42 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#include <thread>
#include "latency_profiler/CollTraceEvent.h"
#include "param.h"
// Sleep time, in milliseconds, between two completion queries of a CUDA event
// in CudaWaitEvent::waitEventFinish.
NCCL_PARAM(ColltraceCheckIntervalMs, "COLLTRACE_CHECK_INTERVAL_MS", 10);
namespace latency_profiler {
// Polls the event until it has completed, sleeping
// COLLTRACE_CHECK_INTERVAL_MS milliseconds between queries. Returns
// ncclSuccess once the event is done; any query status other than
// "not ready" is turned into an error return by CUDACHECK.
ncclResult_t CudaWaitEvent::waitEventFinish() {
  for (auto status = cudaEventQuery(event_.get()); status != cudaSuccess;
       status = cudaEventQuery(event_.get())) {
    if (status != cudaErrorNotReady) {
      // Real failure: CUDACHECK logs it and returns from this function.
      CUDACHECK(status);
    }
    std::this_thread::sleep_for(
        std::chrono::milliseconds(ncclParamColltraceCheckIntervalMs()));
  }
  return ncclSuccess;
}
// Returns the time elapsed between `start` and this event as reported by
// cudaEventElapsedTime, or nullptr (after logging a warning) when that call
// fails.
std::shared_ptr<float> CudaWaitEvent::getElapsedTimeSinceEvent(
    CudaWaitEvent* start) {
  float elapsed;
  const auto status =
      cudaEventElapsedTime(&elapsed, start->event_.get(), event_.get());
  if (status == cudaSuccess) {
    return std::make_shared<float>(elapsed);
  }
  WARN("get elapsed time failed error: %s", cudaGetErrorString(status));
  return nullptr;
}
} // namespace latency_profiler
+140
Просмотреть файл
@@ -0,0 +1,140 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#include "latency_profiler/CollTraceFunc.h"
namespace latency_profiler {
namespace {
// Returns true only when the RCCL_LATENCY_PROFILER environment variable is
// set to exactly "1". Any set value is logged.
bool enableCollTrace() {
  const char* envValue = ncclGetEnv("RCCL_LATENCY_PROFILER");
  if (envValue == NULL) {
    return false;
  }
  INFO(
      NCCL_INIT,
      "RCCL_LATENCY_PROFILER set by environment to %s.",
      envValue);
  return strcmp(envValue, "1") == 0;
}
} // namespace
// Creates the latency profiler for `comm` when profiling is enabled through
// the environment.
//
// Returns ncclSuccess when profiling is disabled or the profiler was created,
// and ncclSystemError when creating it failed. Constructing CollTrace starts
// a thread and allocates memory, both of which can throw; the exception is
// caught here so it cannot escape through callers that only check
// ncclResult_t.
ncclResult_t collTraceInit(ncclComm* comm) {
  if (!enableCollTrace()) {
    return ncclSuccess;
  }
  try {
    comm->ctrace = std::make_unique<CollTrace>(comm);
  } catch (const std::exception& e) {
    WARN("COLLTRACE: failed to start latency profiler: %s", e.what());
    return ncclSystemError;
  }
  return ncclSuccess;
}
// Tears down the latency profiler of `comm`, if one was created. Always
// returns ncclSuccess.
ncclResult_t collTraceDestroy(ncclComm* comm) {
  if (comm->ctrace != nullptr) {
    comm->ctrace.reset();
  }
  return ncclSuccess;
}
// Records the start CUDA event of `event` on `launchStream`. Does nothing
// when profiling is disabled for `comm` or `event` is null. A failing
// cudaEventRecord is turned into an error return by CUDACHECK.
ncclResult_t collTraceRecordStartEvent(
    ncclComm* comm,
    cudaStream_t launchStream,
    CollTraceEvent* event) {
  if (!comm->ctrace || !event) {
    return ncclSuccess;
  }
  CUDACHECK(cudaEventRecord(event->start->getCudaEvent(), launchStream));
  return ncclSuccess;
}
// Records the stop CUDA event of `event` on `launchStream` and hands the
// event over to the profiler. Does nothing when profiling is disabled for
// `comm` or `event` is null. When cudaEventRecord fails, CUDACHECK returns
// an error and the event is not enqueued.
ncclResult_t collTraceRecordEndEvent(
    ncclComm* comm,
    ncclKernelPlan* plan,
    cudaStream_t launchStream,
    std::unique_ptr<CollTraceEvent> event) {
  if (!comm->ctrace || !event) {
    return ncclSuccess;
  }
  CUDACHECK(cudaEventRecord(event->stop->getCudaEvent(), launchStream));
  comm->ctrace->enqueueEvent(std::move(event));
  return ncclSuccess;
}
// Builds a CollTraceInfo from a collective task: operation name, data type
// name and element count. collId stays zero and latencyMs keeps its default.
CollTraceInfo parseCollInfoFromCollTask(const ncclTaskColl& collTask) {
  CollTraceInfo info{};
  info.opName = std::string{ncclFuncToString(collTask.func)};
  info.dataType = std::string{ncclDatatypeToString(collTask.datatype)};
  info.count = (int64_t)collTask.count;
  return info;
}
// Describes the first collective task of `plan`. Returns nullptr when the
// plan has no communicator, profiling is disabled for it, or the plan holds
// no collective task. `stream` is not used.
std::shared_ptr<CollTraceInfo> parseCollInfoFromNcclKernelPlan(
    ncclKernelPlan& plan,
    cudaStream_t stream) {
  const bool tracingEnabled =
      plan.comm != nullptr && plan.comm->ctrace != nullptr;
  if (!tracingEnabled) {
    return nullptr;
  }

  auto firstTask = ncclIntruQueueHead(&plan.collTaskQueue);
  if (firstTask == nullptr) {
    WARN("CollTrace: no coll task in this plan, this plan is empty");
    return nullptr;
  }

  return std::make_shared<CollTraceInfo>(
      parseCollInfoFromCollTask(*firstTask));
}
// Creates a trace event of the given type for a launch on `stream`.
//
// Returns nullptr when
//  - profiling is disabled for `comm`,
//  - the capture state of `stream` cannot be queried,
//  - `stream` is being captured into a cuda graph (unsupported), or
//  - the event could not be created.
//
// Event creation failure is reported with a warning and a null result rather
// than an exception: this function runs on the kernel launch path, whose
// callers check for nullptr and do not catch exceptions.
std::unique_ptr<CollTraceEvent> collTraceAquireEventCommon(
    ncclComm* comm,
    CollTraceEvent::EventType type,
    cudaStream_t stream) {
  if (!comm->ctrace) {
    return nullptr;
  }

  struct ncclCudaGraph graph;
  auto res = ncclCudaGetCapturingGraph(&graph, stream);
  if (res != ncclSuccess) {
    WARN("Internal error: ncclCudaGetCapturingGraph failed by %d", res);
    return nullptr;
  }
  if (graph.graph != nullptr) {
    // We are in a cuda graph, this is currently unsupported
    WARN(
        "COLLTRACE: does not support cuda graph. Collectives from comm %lx will be skipped",
        comm->commHash);
    return nullptr;
  }

  auto event = comm->ctrace->createEvent(type);
  if (!event) {
    WARN("COLLTRACE: Event init failed");
    return nullptr;
  }
  return event;
}
// Creates a COMM trace event describing the first collective task of `plan`.
// Returns nullptr when the plan cannot be described, profiling is disabled,
// or no event could be acquired.
std::unique_ptr<CollTraceEvent> collTraceAquireEventBaseline(
    ncclKernelPlan* plan,
    cudaStream_t stream) {
  const auto collInfo = parseCollInfoFromNcclKernelPlan(*plan, stream);
  if (!collInfo) {
    return nullptr;
  }

  auto comm = plan->comm;
  if (!comm->ctrace) {
    return nullptr;
  }

  auto traceEvent =
      collTraceAquireEventCommon(comm, CollTraceEvent::EventType::COMM, stream);
  if (!traceEvent) {
    WARN("COLLTRACE: failed to aquire event");
    return nullptr;
  }

  traceEvent->coll = *collInfo;
  return traceEvent;
}
} // namespace latency_profiler
+87
Просмотреть файл
@@ -0,0 +1,87 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#include "latency_profiler/CollTraceUtils.h"
#include "nccl_common.h"
#include "debug.h"
namespace latency_profiler {
// Returns the message size in MiB of `count` elements of the data type named
// `dataType`.
//
// dataType: data type name, e.g. "ncclFloat32".
// count:    number of elements.
// Throws std::runtime_error when `dataType` is not one of the known names.
//
// The byte count is computed in double precision: multiplying `count` by the
// element size in `int` overflows for large element counts.
float getSizeMb(const std::string& dataType, int count) {
  int bytesPerElement = 0;
  if (dataType == "ncclInt8" || dataType == "ncclFp8E4M3" ||
      dataType == "ncclFp8E5M2") {
    bytesPerElement = 1;
  } else if (dataType == "ncclFloat16" || dataType == "ncclBfloat16") {
    bytesPerElement = 2;
  } else if (
      dataType == "ncclInt32" || dataType == "ncclUint32" ||
      dataType == "ncclFloat32") {
    bytesPerElement = 4;
  } else if (
      dataType == "ncclInt64" || dataType == "ncclUint64" ||
      dataType == "ncclFloat64") {
    bytesPerElement = 8;
  } else {
    throw std::runtime_error("CollTrace: unsupported data type " + dataType);
  }

  const double totalBytes = static_cast<double>(count) * bytesPerElement;
  return static_cast<float>(totalBytes / 1024.0 / 1024.0);
}
// Logs one line per CollStats entry of every batch in `stats`, tagged with
// `commHash`. getSizeMb may throw std::runtime_error for an unknown data type
// name; that exception is passed on to the caller.
//
// The integer arguments are cast to `long` so that they match the %ld
// conversions in the format string (collId is an int, count an int64_t).
void reportToFile(
    const std::deque<std::vector<CollStats>>& stats,
    const std::string& commHash) {
  for (const auto& oneDumpStats : stats) {
    for (const auto& elem : oneDumpStats) {
      // getSizeMb takes an int element count.
      const float size_mb =
          getSizeMb(elem.dataType, static_cast<int>(elem.count));
      INFO(NCCL_COLL, "coll_id %ld, percent %d, min_latency_us %f, max_latency_us %f, op_name %s, data_type %s, count %ld, message_size_MB %f, comm_hash %s", static_cast<long>(elem.collId), elem.percent, elem.minLatencyUs, elem.maxLatencyUs, elem.opName.c_str(), elem.dataType.c_str(), static_cast<long>(elem.count), size_mb, commHash.c_str());
    }
  }
}
std::vector<CollStats> aggregateResults(
const std::deque<std::unique_ptr<CollTraceInfo>>& info,
const std::vector<float>& latencyAllGather,
int RANKS_PER_HOST,
int NCCL_COLLTRACE_RECORD_MAX) {
std::vector<std::pair<float, float>> latencyMetrics;
for (auto rank = 0; rank < RANKS_PER_HOST; rank++) {
for (auto i = 0; i < NCCL_COLLTRACE_RECORD_MAX; i++) {
auto val = latencyAllGather.at(rank * NCCL_COLLTRACE_RECORD_MAX + i);
if (val == 0) {
throw std::runtime_error(
"CollTrace: latency value cannot be zero, CPU all gather failed");
}
if (rank == 0) {
latencyMetrics.emplace_back(val, val);
} else {
latencyMetrics.at(i).first =
std::min<float>(latencyMetrics.at(i).first, val);
latencyMetrics.at(i).second =
std::max<float>(latencyMetrics.at(i).second, val);
}
}
}
std::vector<CollStats> results;
for (int i = 0; i < info.size(); i++) {
int percent = 100 *
(latencyMetrics.at(i).second - latencyMetrics.at(i).first) /
latencyMetrics.at(i).first;
results.emplace_back(CollStats(
(int)info[i]->collId,
percent,
latencyMetrics.at(i).first * 1000,
latencyMetrics.at(i).second * 1000,
info[i]->opName,
info[i]->dataType,
info[i]->count));
}
return results;
}
} // namespace latency_profiler
+3
Просмотреть файл
@@ -110,6 +110,9 @@ if(BUILD_TESTS)
common/TestBedChild.cpp
common/StandaloneUtils.cpp
proxy_trace/ProxyTraceUnitTests.cpp
../src/misc/proxy_trace/proxy_trace.cc
latency_profiler/LatencyProfilerUnitTest.cpp
../src/misc/latency_profiler/CollTraceUtils.cc
)
# Due to default hidden symbol visibility, append source file if build type is not Debug.
+103
Просмотреть файл
@@ -0,0 +1,103 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#include "latency_profiler/CollTraceUtils.h"
#include "latency_profiler/EventQueue.h"
#include <gtest/gtest.h>
#include <thread>
#include <unordered_map>
using latency_profiler::CollStats;
using latency_profiler::CollTraceInfo;
using latency_profiler::EventQueue;
namespace RcclUnitTesting {
// Checks aggregateResults for a host with 2 ranks and a ring buffer of 3
// records: per record, the min/max latency across ranks (in microseconds),
// the spread in percent, and the collective description copied from the
// input records.
TEST(CollTraceUtilsTest, aggregateResultsTest) {
  std::deque<std::unique_ptr<CollTraceInfo>> results;
  // Host has 2 ranks, Ring buffer has 3 records
  auto info1 = CollTraceInfo{.collId = 123, .opName = "allReduce", .dataType = "float32", .count = 10};
  auto info2 = CollTraceInfo{.collId = 127, .opName = "allReduce", .dataType = "int64", .count = 20};
  auto info3 = CollTraceInfo{.collId = 200, .opName = "allGather", .dataType = "float32", .count = 50};
  results.emplace_back(std::make_unique<CollTraceInfo>(info1));
  results.emplace_back(std::make_unique<CollTraceInfo>(info2));
  results.emplace_back(std::make_unique<CollTraceInfo>(info3));

  // Rank 0 latencies (ms) followed by rank 1 latencies (ms).
  std::vector<float> latencyAllGather = {10, 20, 50, 15, 21, 45};
  auto stats = latency_profiler::aggregateResults(
      results,
      latencyAllGather,
      2 /* ranks per host */,
      3 /* element per rank */);

  // Fatal on mismatch: the loop below indexes stats[0..2].
  ASSERT_EQ(3u, stats.size());

  std::vector<CollStats> expected = {
      CollStats(123, 50, 10000, 15000, "allReduce", "float32", 10),
      CollStats(127, 5, 20000, 21000, "allReduce", "int64", 20),
      CollStats(200, 11, 45000, 50000, "allGather", "float32", 50)};
  for (size_t i = 0; i < expected.size(); i++) {
    EXPECT_EQ(stats[i].collId, expected[i].collId);
    EXPECT_EQ(stats[i].percent, expected[i].percent);
    EXPECT_EQ(stats[i].minLatencyUs, expected[i].minLatencyUs);
    EXPECT_EQ(stats[i].maxLatencyUs, expected[i].maxLatencyUs);
    EXPECT_EQ(stats[i].opName, expected[i].opName);
    EXPECT_EQ(stats[i].dataType, expected[i].dataType);
    EXPECT_EQ(stats[i].count, expected[i].count);
  }
}
// Single-threaded check that the queue returns items in insertion order.
TEST(CollTraceUtilsTest, EventQueueOperationTest) {
  EventQueue<int> queue;
  for (int value : {5, 100}) {
    queue.push(std::make_unique<int>(value));
  }

  const auto first = queue.waitPop();
  EXPECT_EQ(*first, 5);

  const auto second = queue.waitPop();
  EXPECT_EQ(*second, 100);
}
// Test helper: pushes a copy of `str` onto the queue.
void producer(EventQueue<std::string>& q, const std::string& str) {
  auto payload = std::make_unique<std::string>(str);
  q.push(std::move(payload));
}
// Test helper: empties `results`, then blocks until two items have been
// popped from the queue and stores them in arrival order.
void consumer(EventQueue<std::string>& q, std::vector<std::string>& results) {
  results.clear();
  for (int received = 0; received < 2; ++received) {
    const auto item = q.waitPop();
    results.push_back(*item);
  }
}
// One consumer thread and two producer threads share a queue; both produced
// strings must arrive, in either order.
TEST(CollTraceUtilsTest, EventQueueMultiThreadTest) {
  EventQueue<std::string> queue;
  std::vector<std::string> received;

  std::thread consumerThread(consumer, std::ref(queue), std::ref(received));
  std::thread helloThread(producer, std::ref(queue), "hello");
  std::thread worldThread(producer, std::ref(queue), "world");
  consumerThread.join();
  helloThread.join();
  worldThread.join();

  const auto isExpectedWord = [](const std::string& word) {
    return word == "hello" || word == "world";
  };
  EXPECT_TRUE(isExpectedWord(received[0]));
  EXPECT_TRUE(isExpectedWord(received[1]));
}
// For every supported data type name, 1Mi elements must be reported as
// "bytes per element" MiB.
TEST(CollTraceUtilsTest, getSizeMbTest) {
  const std::unordered_map<int, std::vector<std::string>> bytesToTypes = {
      {1, {"ncclInt8", "ncclFp8E4M3", "ncclFp8E5M2"}},
      {2, {"ncclFloat16", "ncclBfloat16"}},
      {4, {"ncclInt32", "ncclUint32", "ncclFloat32"}},
      {8, {"ncclInt64", "ncclUint64", "ncclFloat64"}}};

  for (const auto& [bytesPerElement, typeNames] : bytesToTypes) {
    for (const auto& typeName : typeNames) {
      const auto mb = latency_profiler::getSizeMb(typeName, 1024 * 1024);
      EXPECT_NEAR(mb, bytesPerElement, 0.01);
    }
  }
}
}