Add collective latency profiler (#1785)
* [LatencyProfiler] Initial commit * [LatencyProfiler] Add unit tests * [LatencyProfiler] add more * [LatencyProfiler] Pass unit tests * [LatencyProfiler] Add hooks to integrate with meta internal tools * [LatencyProfiler] Restore install.sh * [LatencyProfiler] Resolved comments 1. add proper license 2. use proper namespace * [LatencyProfiler] Add header
Este cometimento está contido em:
cometido por
GitHub
ascendente
4ce3df8d3a
cometimento
874cd657ef
@@ -652,6 +652,15 @@ set(SRC_FILES
|
||||
src/transport/p2p.cc
|
||||
src/transport/profiler.cc
|
||||
src/transport/shm.cc
|
||||
src/include/latency_profiler/CollTrace.h
|
||||
src/include/latency_profiler/CollTraceEvent.h
|
||||
src/include/latency_profiler/CollTraceFunc.h
|
||||
src/include/latency_profiler/CollTraceUtils.h
|
||||
src/include/latency_profiler/EventQueue.h
|
||||
src/misc/latency_profiler/CollTrace.cc
|
||||
src/misc/latency_profiler/CollTraceEvent.cc
|
||||
src/misc/latency_profiler/CollTraceFunc.cc
|
||||
src/misc/latency_profiler/CollTraceUtils.cc
|
||||
)
|
||||
|
||||
if (ENABLE_MSCCL_KERNEL)
|
||||
|
||||
@@ -25,6 +25,7 @@
|
||||
#include <cstring> // std::memcpy
|
||||
#include <cinttypes> // PRIx64
|
||||
#include <cassert>
|
||||
#include "latency_profiler/CollTraceFunc.h"
|
||||
|
||||
using namespace rccl;
|
||||
|
||||
@@ -1656,9 +1657,12 @@ ncclResult_t ncclLaunchKernel(struct ncclComm* comm, struct ncclKernelPlan* plan
|
||||
cudaStream_t launchStream = planner->streams->stream;
|
||||
void* extra[] = {plan->kernelArgs, &plan->kernelArgsSize};
|
||||
|
||||
auto event = latency_profiler::collTraceAquireEventBaseline(plan, launchStream);
|
||||
if (planner->numStreams == 1 && !plan->persistent) {
|
||||
latency_profiler::collTraceRecordStartEvent(comm, launchStream, event.get());
|
||||
comm->lastStream = planner->streams->stream;
|
||||
CUDACHECKGOTO(hipExtLaunchKernel(plan->kernelFn, grid, block, extra, 0, launchStream, NULL, comm->doneEvent, 0), ret, do_return);
|
||||
latency_profiler::collTraceRecordEndEvent(comm, plan, launchStream, std::move(event));
|
||||
return ncclSuccess;
|
||||
}
|
||||
|
||||
@@ -1727,16 +1731,22 @@ ncclResult_t ncclLaunchKernel(struct ncclComm* comm, struct ncclKernelPlan* plan
|
||||
launchConfig.numAttrs = attrs;
|
||||
launchConfig.hStream = launchStream;
|
||||
|
||||
latency_profiler::collTraceRecordStartEvent(comm, launchStream, event.get());
|
||||
CUCHECKGOTO(cuLaunchKernelEx(&launchConfig, fn, nullptr, extra), ret, do_return);
|
||||
latency_profiler::collTraceRecordEndEvent(comm, plan, launchStream, std::move(event));
|
||||
#endif
|
||||
} else {
|
||||
// Standard kernel launch
|
||||
latency_profiler::collTraceRecordStartEvent(comm, launchStream, event.get());
|
||||
CUCHECKGOTO(cuLaunchKernel(fn, grid.x, grid.y, grid.z, block.x, block.y, block.z, smem, launchStream, nullptr, extra), ret, do_return);
|
||||
latency_profiler::collTraceRecordEndEvent(comm, plan, launchStream, std::move(event));
|
||||
}
|
||||
#endif
|
||||
// Standard kernel launch
|
||||
//cuLaunchKernel(sym, grid.x, grid.y, grid.z, block.x, block.y, block.z, smem, launchStream, nullptr, extra);
|
||||
latency_profiler::collTraceRecordStartEvent(comm, launchStream, event.get());
|
||||
CUDACHECKGOTO(cudaLaunchKernel(sym, grid, block, extra, smem, launchStream), ret, do_return);
|
||||
latency_profiler::collTraceRecordEndEvent(comm, plan, launchStream, std::move(event));
|
||||
|
||||
do_return:
|
||||
return ret;
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
#include "graph.h"
|
||||
#include "nvmlwrap.h"
|
||||
#include "profiler.h"
|
||||
#include "latency_profiler/CollTrace.h"
|
||||
#include "rccl_common.h"
|
||||
#include "recorder.h"
|
||||
|
||||
@@ -634,6 +635,7 @@ struct ncclComm {
|
||||
|
||||
hipEvent_t doneEvent;
|
||||
hipStream_t lastStream;
|
||||
std::unique_ptr<latency_profiler::CollTrace> ctrace;
|
||||
|
||||
#ifdef ENABLE_COLLTRACE
|
||||
struct ncclCollTrace* collTrace;
|
||||
|
||||
@@ -0,0 +1,61 @@
|
||||
/*
|
||||
* Copyright (c) Meta Platforms, Inc. and affiliates.
|
||||
*
|
||||
* This source code is licensed under the MIT license found in the
|
||||
* LICENSE file in the root directory of this source tree.
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
#include <thread>
|
||||
#include <atomic>
|
||||
#include "CollTraceEvent.h"
|
||||
#include "CollTraceUtils.h"
|
||||
#include "EventQueue.h"
|
||||
|
||||
struct ncclComm;
|
||||
|
||||
namespace latency_profiler {
|
||||
|
||||
struct CollStats;
|
||||
|
||||
// CollTrace Workflow
|
||||
// 1) measure collective latency by adding cuda/hip event before and after
|
||||
// a kernel launch in RCCL.
|
||||
|
||||
// 2) Start a separate worker thread to collect latency data into
|
||||
// its local ring buffer.
|
||||
|
||||
// 3) Perform CPU level all gather to exchange latency data
|
||||
// between threads (200 to 300us per 100 latency data exchange)
|
||||
|
||||
// 4) Report to scuba when results buffer is full or every few minutes
|
||||
|
||||
class CollTrace {
|
||||
public:
|
||||
CollTrace(ncclComm* comm);
|
||||
__attribute__((visibility("default"))) ~CollTrace();
|
||||
|
||||
void enqueueEvent(std::unique_ptr<CollTraceEvent> event);
|
||||
|
||||
std::unique_ptr<CollTraceEvent> createEvent(
|
||||
CollTraceEvent::EventType type = CollTraceEvent::EventType::COMM);
|
||||
|
||||
void recordCurCollResult(int rank, float latencyMs);
|
||||
|
||||
void reportIfNeeded(bool checkInterval);
|
||||
|
||||
private:
|
||||
EventQueue<CollTraceEvent> eventQueue_;
|
||||
std::thread profilingWorkerThread_;
|
||||
void* collTraceThreadFn(int cudaDev);
|
||||
std::atomic<uint64_t> curCollId_{0};
|
||||
std::unique_ptr<CollTraceEvent> curEvent_;
|
||||
std::deque<std::unique_ptr<CollTraceInfo>> pastColls_;
|
||||
|
||||
ncclComm* comm_{nullptr};
|
||||
std::string commHash_;
|
||||
int rank_{-1};
|
||||
std::deque<std::vector<CollStats>> stats_;
|
||||
std::chrono::time_point<std::chrono::steady_clock> lastReportTime_;
|
||||
};
|
||||
} // namespace latency_profiler
|
||||
@@ -0,0 +1,68 @@
|
||||
/*
|
||||
* Copyright (c) Meta Platforms, Inc. and affiliates.
|
||||
*
|
||||
* This source code is licensed under the MIT license found in the
|
||||
* LICENSE file in the root directory of this source tree.
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
#include <cuda_runtime.h>
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
#include <queue>
|
||||
#include "checks.h"
|
||||
#include "CollTraceUtils.h"
|
||||
|
||||
namespace latency_profiler {
|
||||
|
||||
// CUDA event pointer w/ deleter
|
||||
struct CudaEventDeleter {
|
||||
void operator()(cudaEvent_t e) {
|
||||
// Ignore error at destroy
|
||||
cudaEventDestroy(e);
|
||||
}
|
||||
};
|
||||
using CudaEventPtr = std::unique_ptr<
|
||||
std::pointer_traits<cudaEvent_t>::element_type,
|
||||
CudaEventDeleter>;
|
||||
|
||||
// a wrapper class for cuda event
|
||||
class CudaWaitEvent {
|
||||
public:
|
||||
CudaWaitEvent(CudaEventPtr e) : event_(std::move(e)) {}
|
||||
~CudaWaitEvent() {}
|
||||
|
||||
cudaEvent_t getCudaEvent() {
|
||||
return event_.get();
|
||||
}
|
||||
|
||||
std::shared_ptr<float> getElapsedTimeSinceEvent(CudaWaitEvent* start);
|
||||
ncclResult_t waitEventFinish();
|
||||
|
||||
private:
|
||||
CudaEventPtr event_;
|
||||
};
|
||||
|
||||
// Event data structure
|
||||
struct CollTraceEvent {
|
||||
enum class EventType { COMM, TERMINATE };
|
||||
|
||||
CollTraceInfo coll;
|
||||
std::unique_ptr<CudaWaitEvent> start{nullptr};
|
||||
std::unique_ptr<CudaWaitEvent> stop{nullptr};
|
||||
EventType eventType = EventType::COMM;
|
||||
|
||||
CollTraceEvent(EventType type) : eventType(type) {}
|
||||
CollTraceEvent() = default;
|
||||
|
||||
~CollTraceEvent() {}
|
||||
|
||||
// CollTraceEvent is not copyable
|
||||
CollTraceEvent(const CollTraceEvent&) = delete;
|
||||
CollTraceEvent& operator=(const CollTraceEvent&) = delete;
|
||||
|
||||
// CollTraceEvent is movable
|
||||
CollTraceEvent(CollTraceEvent&&) = default;
|
||||
CollTraceEvent& operator=(CollTraceEvent&&) = default;
|
||||
};
|
||||
} // namespace latency_profiler
|
||||
@@ -0,0 +1,39 @@
|
||||
/*
|
||||
* Copyright (c) Meta Platforms, Inc. and affiliates.
|
||||
*
|
||||
* This source code is licensed under the MIT license found in the
|
||||
* LICENSE file in the root directory of this source tree.
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
#include "CollTraceEvent.h"
|
||||
#include "comm.h"
|
||||
#include "CollTraceUtils.h"
|
||||
|
||||
namespace latency_profiler {
|
||||
// Exception type raised by the latency profiler helpers.
class CollTraceError : public std::runtime_error {
 public:
  explicit CollTraceError(const std::string& what)
      : std::runtime_error(what) {}
};
|
||||
|
||||
ncclResult_t collTraceInit(ncclComm* comm);
|
||||
|
||||
ncclResult_t collTraceDestroy(ncclComm* comm);
|
||||
|
||||
std::unique_ptr<CollTraceEvent> collTraceAquireEventBaseline(
|
||||
ncclKernelPlan* plan,
|
||||
cudaStream_t stream);
|
||||
|
||||
ncclResult_t collTraceRecordStartEvent(
|
||||
ncclComm* comm,
|
||||
cudaStream_t launchStream,
|
||||
CollTraceEvent* event);
|
||||
|
||||
ncclResult_t collTraceRecordEndEvent(
|
||||
ncclComm* comm,
|
||||
ncclKernelPlan* plan,
|
||||
cudaStream_t launchStream,
|
||||
std::unique_ptr<CollTraceEvent> event);
|
||||
|
||||
CollTraceInfo parseCollInfoFromCollTask(const ncclTaskColl& collTask);
|
||||
} // namespace latency_profiler
|
||||
@@ -0,0 +1,49 @@
|
||||
/*
|
||||
* Copyright (c) Meta Platforms, Inc. and affiliates.
|
||||
*
|
||||
* This source code is licensed under the MIT license found in the
|
||||
* LICENSE file in the root directory of this source tree.
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
#include <deque>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
namespace latency_profiler {
|
||||
|
||||
// Aggregated latency statistics for one collective.
struct CollStats {
  const int collId;
  const int percent;          // (max - min) / min, expressed in percent
  const float minLatencyUs;
  const float maxLatencyUs;
  const std::string opName;
  const std::string dataType;
  int64_t count;

  CollStats(
      const int collId,
      const int percent,
      const float minLatencyUs,
      const float maxLatencyUs,
      const std::string& opName,
      const std::string& dataType,
      const int64_t count)
      : collId(collId),
        percent(percent),
        minLatencyUs(minLatencyUs),
        maxLatencyUs(maxLatencyUs),
        opName(opName),
        dataType(dataType),
        count(count) {}
};
|
||||
|
||||
// Description of one traced collective plus its measured latency.
// Every scalar member has a default so that a default-constructed instance
// (e.g. the `coll` member of a TERMINATE event) never holds indeterminate
// values.
struct CollTraceInfo {
  int64_t collId{0};
  std::string opName;
  std::string dataType;
  int64_t count{0};
  float latencyMs{-1};  // -1 until a latency has been recorded
};
|
||||
|
||||
void reportToFile(
|
||||
const std::deque<std::vector<CollStats>>& stats,
|
||||
const std::string& commHash);
|
||||
|
||||
std::vector<CollStats> aggregateResults(
|
||||
const std::deque<std::unique_ptr<CollTraceInfo>>& info,
|
||||
const std::vector<float>& latencyAllGather,
|
||||
int RANKS_PER_HOST,
|
||||
int NCCL_COLLTRACE_RECORD_MAX);
|
||||
|
||||
float getSizeMb(const std::string& dataType, int count);
|
||||
|
||||
} // namespace latency_profiler
|
||||
@@ -0,0 +1,46 @@
|
||||
/*
|
||||
* Copyright (c) Meta Platforms, Inc. and affiliates.
|
||||
*
|
||||
* This source code is licensed under the MIT license found in the
|
||||
* LICENSE file in the root directory of this source tree.
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
#include <condition_variable>
|
||||
#include <deque>
|
||||
|
||||
namespace latency_profiler {
|
||||
|
||||
// A multi-producer, single consumer queue.
|
||||
// This queue is designed to be used in scenarios where multiple producers
|
||||
// are pushing items into the queue, and a single consumer is waiting for
|
||||
// items to be available.
|
||||
|
||||
// FIFO of uniquely-owned elements guarded by a mutex; waitPop() blocks on a
// condition variable until an element is available.
template <class Element>
class EventQueue {
 private:
  std::deque<std::unique_ptr<Element>> queue_;
  std::condition_variable cv_;
  mutable std::mutex mutex_;

 public:
  // Appends `item` and wakes one waiter. The notification is issued after
  // the lock has been released.
  void push(std::unique_ptr<Element> item) {
    {
      std::lock_guard<std::mutex> guard(mutex_);
      queue_.push_back(std::move(item));
    }
    cv_.notify_one();
  }

  // Blocks until the queue is non-empty, then removes and returns the
  // oldest element.
  std::unique_ptr<Element> waitPop() {
    std::unique_lock<std::mutex> guard(mutex_);
    // The predicate overload checks the condition before sleeping, so no
    // separate emptiness test is needed.
    cv_.wait(guard, [this] { return !queue_.empty(); });
    auto oldest = std::move(queue_.front());
    queue_.pop_front();
    return oldest;
  }
};
|
||||
} // namespace latency_profiler
|
||||
@@ -56,6 +56,8 @@
|
||||
|
||||
#include "msccl/msccl_lifecycle.h"
|
||||
#include "msccl/msccl_status.h"
|
||||
#include "latency_profiler/CollTrace.h"
|
||||
#include "latency_profiler/CollTraceFunc.h"
|
||||
|
||||
#ifndef STR2
|
||||
#define STR2(v) #v
|
||||
@@ -2026,6 +2028,7 @@ static ncclResult_t ncclCommInitRankFunc(struct ncclAsyncJob* job_) {
|
||||
NCCLCHECK(comm->tuner->init(comm->nRanks, comm->nNodes, ncclDebugLog, &comm->tunerContext));
|
||||
}
|
||||
|
||||
NCCLCHECKGOTO(latency_profiler::collTraceInit(comm), res, fail);
|
||||
// update communicator state
|
||||
comm->initState = ncclSuccess;
|
||||
timers[TIMER_INIT_TOTAL] = clockNano() - timers[TIMER_INIT_TOTAL];
|
||||
@@ -2509,6 +2512,7 @@ static ncclResult_t commDestroySync(struct ncclAsyncJob* job_) {
|
||||
|
||||
CUDACHECKGOTO(cudaSetDevice(comm->cudaDev), ret, fail);
|
||||
|
||||
NCCLCHECKGOTO(latency_profiler::collTraceDestroy(comm), ret, fail);
|
||||
TRACE(NCCL_INIT, "Destroying comm %p rank %d abortFlag %d asyncResult %d", comm, comm->rank, *comm->abortFlag, comm->asyncResult);
|
||||
|
||||
if (comm->initState == ncclSuccess) {
|
||||
|
||||
@@ -0,0 +1,228 @@
|
||||
/*
|
||||
* Copyright (c) Meta Platforms, Inc. and affiliates.
|
||||
*
|
||||
* This source code is licensed under the MIT license found in the
|
||||
* LICENSE file in the root directory of this source tree.
|
||||
*/
|
||||
#include "latency_profiler/CollTrace.h"
|
||||
#include "bootstrap.h"
|
||||
#include "checks.h"
|
||||
#include "comm.h"
|
||||
#include "param.h"
|
||||
|
||||
NCCL_PARAM(ColltraceRecordMax, "COLLTRACE_RECORD_MAX", 100);
|
||||
NCCL_PARAM(ColltraceMaxDumpSize, "COLLTRACE_MAX_DUMP_SIZE", 20);
|
||||
NCCL_PARAM(ColltraceDumpIntervalSec, "COLLTRACE_DUMP_INTERVAL_SEC", 300);
|
||||
|
||||
constexpr int RANKS_PER_HOST = 8;
|
||||
|
||||
namespace latency_profiler {
|
||||
|
||||
namespace {
|
||||
// Creates a CUDA event and wraps it in an owning CudaEventPtr. A creation
// failure is only reported through CUDACHECKIGNORE; the handle then stays null.
CudaEventPtr getCudaEventPtr() {
  cudaEvent_t rawEvent = nullptr;
  CUDACHECKIGNORE(cudaEventCreate(&rawEvent));
  return CudaEventPtr(rawEvent);
}
|
||||
} // namespace
|
||||
|
||||
// Captures the communicator identity and launches the profiling worker.
CollTrace::CollTrace(ncclComm* comm)
    : comm_(comm),
      commHash_(std::to_string(comm->commHash)),
      rank_(comm->rank) {
  // The worker is started from the constructor body, i.e. after all members
  // have been initialized.
  std::thread worker([this]() { return collTraceThreadFn(comm_->cudaDev); });
  profilingWorkerThread_ = std::move(worker);
}
|
||||
|
||||
// Stops the worker thread and, on rank 0, flushes the remaining stats.
// Exceptions are logged and swallowed so the destructor never throws.
CollTrace::~CollTrace() {
  try {
    INFO(NCCL_INIT, "COLLTRACE: commHash %s rank %d - Destroy START",
         commHash_.c_str(), rank_);

    // A TERMINATE event makes the worker leave its processing loop.
    auto terminateEvent = std::make_unique<CollTraceEvent>(
        CollTraceEvent::EventType::TERMINATE);
    eventQueue_.push(std::move(terminateEvent));
    if (profilingWorkerThread_.joinable()) {
      profilingWorkerThread_.join();
    }

    if (rank_ == 0) {
      // Unconditional report: the interval check is bypassed.
      reportIfNeeded(false);
    }

    INFO(NCCL_INIT, "COLLTRACE: commHash %s rank %d - Destroy COMPLETE",
         commHash_.c_str(), rank_);
  } catch (const std::exception& e) {
    WARN("COLLTRACE: commHash %s rank %d - Destroy FAILED: %s",
         commHash_.c_str(), rank_, e.what());
  }
}
|
||||
|
||||
void* CollTrace::collTraceThreadFn(int cudaDev) {
|
||||
INFO(NCCL_INIT, "CollTrace thread started for cudaDev %d", cudaDev);
|
||||
auto err = cudaSetDevice(cudaDev);
|
||||
if (err != cudaSuccess) {
|
||||
WARN("Cuda failure '%s'", cudaGetErrorString(err));
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
lastReportTime_ = std::chrono::steady_clock::now();
|
||||
|
||||
INFO(
|
||||
NCCL_INIT,
|
||||
"COLLTRACE: commHash %s rank %d - worker thread STARTED",
|
||||
commHash_.c_str(),
|
||||
rank_);
|
||||
while (true) {
|
||||
curEvent_ = eventQueue_.waitPop();
|
||||
if (curEvent_->eventType == CollTraceEvent::EventType::TERMINATE) {
|
||||
break;
|
||||
}
|
||||
curEvent_->start->waitEventFinish();
|
||||
auto ncclRes = curEvent_->stop->waitEventFinish();
|
||||
float latency = -1;
|
||||
|
||||
if (ncclRes == ncclSuccess) {
|
||||
auto latencyMaybe =
|
||||
curEvent_->stop->getElapsedTimeSinceEvent(curEvent_->start.get());
|
||||
// latencyMaybe could be nullopt when cudaEventElapsedTime failed
|
||||
// this could happen when events are not recorded or stream is not valid
|
||||
if (latencyMaybe == nullptr) {
|
||||
WARN(
|
||||
"CollTrace: getElapsedTimeSinceEvent failed, aborting worker thread");
|
||||
return nullptr;
|
||||
}
|
||||
latency = *latencyMaybe;
|
||||
}
|
||||
|
||||
recordCurCollResult(cudaDev, latency);
|
||||
curEvent_.reset();
|
||||
}
|
||||
|
||||
INFO(
|
||||
NCCL_INIT,
|
||||
"COLLTRACE: commHash %s rank %d - worker thread TERMINATE",
|
||||
commHash_.c_str(),
|
||||
rank_);
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
// Stamps `event` with the next collective id and queues it for the worker.
void CollTrace::enqueueEvent(std::unique_ptr<CollTraceEvent> event) {
  const auto nextId = curCollId_.fetch_add(1);
  event->coll.collId = nextId;
  eventQueue_.push(std::move(event));
}
|
||||
|
||||
// Allocates a trace event of the requested type together with its start and
// stop CUDA event wrappers.
std::unique_ptr<CollTraceEvent> CollTrace::createEvent(
    CollTraceEvent::EventType type) {
  auto traceEvent = std::make_unique<CollTraceEvent>(type);
  traceEvent->start = std::make_unique<CudaWaitEvent>(getCudaEventPtr());
  traceEvent->stop = std::make_unique<CudaWaitEvent>(getCudaEventPtr());

  // NOTE(review): this only tests the wrapper pointers, which make_unique
  // never leaves null; it does not detect a failed cudaEventCreate.
  const bool wrappersReady = traceEvent->start && traceEvent->stop;
  if (!wrappersReady) {
    return std::unique_ptr<CollTraceEvent>(nullptr);
  }
  return traceEvent;
}
|
||||
|
||||
bool shouldAggregateRingBuffer(int collId) {
|
||||
const int NCCL_COLLTRACE_RECORD_MAX = ncclParamColltraceRecordMax();
|
||||
return ((collId + 1) % NCCL_COLLTRACE_RECORD_MAX == 0);
|
||||
}
|
||||
|
||||
void CollTrace::reportIfNeeded(bool checkInterval = true) {
|
||||
auto now = std::chrono::steady_clock::now();
|
||||
auto secs_passed =
|
||||
std::chrono::duration_cast<std::chrono::seconds>(now - lastReportTime_)
|
||||
.count();
|
||||
if (checkInterval) {
|
||||
if (secs_passed < ncclParamColltraceDumpIntervalSec() &&
|
||||
stats_.size() < ncclParamColltraceMaxDumpSize()) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
INFO(
|
||||
NCCL_COLL,
|
||||
"CollTrace: %ld seconds passed since last report, stats size = %zu, checkInterval = %d",
|
||||
secs_passed,
|
||||
stats_.size(),
|
||||
checkInterval);
|
||||
|
||||
// reportToScuba is a placeholder for oss environment.
|
||||
// meta production reports to scuba instead of file, which enables
|
||||
// filering, aggregation and visualization.
|
||||
#ifdef ENABLE_SCUBA_LOGGING
|
||||
reportToScuba(stats_, commHash_);
|
||||
#else
|
||||
reportToFile(stats_, commHash_);
|
||||
#endif
|
||||
lastReportTime_ = std::chrono::steady_clock::now();
|
||||
stats_.clear();
|
||||
}
|
||||
|
||||
void CollTrace::recordCurCollResult(int rank, float latency) {
|
||||
const int NCCL_COLLTRACE_RECORD_MAX = ncclParamColltraceRecordMax();
|
||||
|
||||
auto result = std::make_unique<CollTraceInfo>(curEvent_->coll);
|
||||
auto collId = result->collId;
|
||||
result->latencyMs = latency;
|
||||
|
||||
pastColls_.push_back(std::move(result));
|
||||
if (pastColls_.size() > NCCL_COLLTRACE_RECORD_MAX) {
|
||||
pastColls_.pop_front();
|
||||
}
|
||||
|
||||
if (shouldAggregateRingBuffer(collId) && pastColls_.size() > 0) {
|
||||
std::vector<float> latencyAllGather;
|
||||
latencyAllGather.resize(RANKS_PER_HOST * NCCL_COLLTRACE_RECORD_MAX, 0);
|
||||
int start = (comm_->localRank) * NCCL_COLLTRACE_RECORD_MAX;
|
||||
for (int i = start; i < start + NCCL_COLLTRACE_RECORD_MAX; i++) {
|
||||
latencyAllGather[i] = pastColls_[i - start]->latencyMs;
|
||||
}
|
||||
auto before = std::chrono::high_resolution_clock::now();
|
||||
auto ncclResult = bootstrapIntraNodeAllGather(
|
||||
comm_->bootstrap,
|
||||
comm_->localRankToRank,
|
||||
comm_->localRank,
|
||||
comm_->localRanks,
|
||||
latencyAllGather.data(),
|
||||
NCCL_COLLTRACE_RECORD_MAX * sizeof(float));
|
||||
auto after = std::chrono::high_resolution_clock::now();
|
||||
auto interval_us =
|
||||
std::chrono::duration_cast<std::chrono::microseconds>(after - before)
|
||||
.count();
|
||||
if (ncclResult != ncclSuccess) {
|
||||
WARN("CollTrace: All gather exchange latency data failed");
|
||||
return;
|
||||
}
|
||||
|
||||
if (rank == 0) {
|
||||
INFO(NCCL_COLL, "latency metrics all gather takes %ld us", interval_us);
|
||||
try {
|
||||
auto stats = aggregateResults(
|
||||
pastColls_,
|
||||
latencyAllGather,
|
||||
RANKS_PER_HOST,
|
||||
NCCL_COLLTRACE_RECORD_MAX);
|
||||
stats_.push_back(stats);
|
||||
if (stats_.size() > ncclParamColltraceMaxDumpSize()) {
|
||||
stats_.pop_front();
|
||||
}
|
||||
reportIfNeeded();
|
||||
} catch (const std::exception& e) {
|
||||
WARN("Aggregating error: %s", e.what());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace latency_profiler
|
||||
@@ -0,0 +1,42 @@
|
||||
/*
|
||||
* Copyright (c) Meta Platforms, Inc. and affiliates.
|
||||
*
|
||||
* This source code is licensed under the MIT license found in the
|
||||
* LICENSE file in the root directory of this source tree.
|
||||
*/
|
||||
#include <thread>
|
||||
#include "latency_profiler/CollTraceEvent.h"
|
||||
#include "param.h"
|
||||
|
||||
NCCL_PARAM(ColltraceCheckIntervalMs, "COLLTRACE_CHECK_INTERVAL_MS", 10);
|
||||
|
||||
namespace latency_profiler {
|
||||
|
||||
// Polls the event every NCCL_COLLTRACE_CHECK_INTERVAL_MS milliseconds until
// it has completed. Any status other than "not ready" is passed to CUDACHECK.
ncclResult_t CudaWaitEvent::waitEventFinish() {
  for (auto status = cudaEventQuery(event_.get()); status != cudaSuccess;
       status = cudaEventQuery(event_.get())) {
    if (status != cudaErrorNotReady) {
      CUDACHECK(status);
    }
    const auto pollInterval =
        std::chrono::milliseconds(ncclParamColltraceCheckIntervalMs());
    std::this_thread::sleep_for(pollInterval);
  }
  return ncclSuccess;
}
|
||||
|
||||
// Returns the time reported by cudaEventElapsedTime between `start` and this
// event, or a null pointer (after logging a warning) when the query fails.
std::shared_ptr<float> CudaWaitEvent::getElapsedTimeSinceEvent(
    CudaWaitEvent* start) {
  float elapsedTime;
  const auto status =
      cudaEventElapsedTime(&elapsedTime, start->event_.get(), event_.get());
  if (status == cudaSuccess) {
    return std::make_shared<float>(elapsedTime);
  }
  WARN("get elapsed time failed error: %s", cudaGetErrorString(status));
  return nullptr;
}
|
||||
|
||||
} // namespace latency_profiler
|
||||
@@ -0,0 +1,140 @@
|
||||
/*
|
||||
* Copyright (c) Meta Platforms, Inc. and affiliates.
|
||||
*
|
||||
* This source code is licensed under the MIT license found in the
|
||||
* LICENSE file in the root directory of this source tree.
|
||||
*/
|
||||
#include "latency_profiler/CollTraceFunc.h"
|
||||
|
||||
namespace latency_profiler {
|
||||
|
||||
namespace {
|
||||
bool enableCollTrace() {
|
||||
const char* colltraceEnable = ncclGetEnv("RCCL_LATENCY_PROFILER");
|
||||
if (colltraceEnable != NULL) {
|
||||
INFO(
|
||||
NCCL_INIT,
|
||||
"RCCL_LATENCY_PROFILER set by environment to %s.",
|
||||
colltraceEnable);
|
||||
if (strcmp(colltraceEnable, "1") == 0) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
} // namespace
|
||||
|
||||
// Attaches a CollTrace instance to `comm` when the profiler is enabled.
// Always returns ncclSuccess.
ncclResult_t collTraceInit(ncclComm* comm) {
  if (enableCollTrace()) {
    comm->ctrace = std::make_unique<CollTrace>(comm);
  }
  return ncclSuccess;
}
|
||||
|
||||
// Destroys the CollTrace instance attached to `comm`, if any.
// Always returns ncclSuccess.
ncclResult_t collTraceDestroy(ncclComm* comm) {
  if (comm->ctrace != nullptr) {
    comm->ctrace.reset();
  }
  return ncclSuccess;
}
|
||||
|
||||
// Records the start CUDA event of `event` on `launchStream`. Does nothing
// when tracing is off for `comm` or no event was acquired.
ncclResult_t collTraceRecordStartEvent(
    ncclComm* comm,
    cudaStream_t launchStream,
    CollTraceEvent* event) {
  if (!comm->ctrace || !event) {
    return ncclSuccess;
  }
  CUDACHECK(cudaEventRecord(event->start->getCudaEvent(), launchStream));
  return ncclSuccess;
}
|
||||
|
||||
// Records the stop CUDA event of `event` on `launchStream` and hands the
// event to the tracer's queue. Does nothing when tracing is off for `comm`
// or no event was acquired.
ncclResult_t collTraceRecordEndEvent(
    ncclComm* comm,
    ncclKernelPlan* plan,
    cudaStream_t launchStream,
    std::unique_ptr<CollTraceEvent> event) {
  if (!comm->ctrace || !event) {
    return ncclSuccess;
  }
  CUDACHECK(cudaEventRecord(event->stop->getCudaEvent(), launchStream));
  comm->ctrace->enqueueEvent(std::move(event));
  return ncclSuccess;
}
|
||||
|
||||
// Builds a CollTraceInfo from the function, datatype and element count of a
// collective task. The collective id is left at its value-initialized state.
CollTraceInfo parseCollInfoFromCollTask(const ncclTaskColl& collTask) {
  CollTraceInfo info{};
  info.opName = std::string{ncclFuncToString(collTask.func)};
  info.dataType = std::string{ncclDatatypeToString(collTask.datatype)};
  info.count = static_cast<int64_t>(collTask.count);
  return info;
}
|
||||
|
||||
// Describes the first collective task of `plan`. Returns a null pointer when
// tracing is not active for the plan's communicator or the plan has no
// collective task. `stream` is not used.
std::shared_ptr<CollTraceInfo> parseCollInfoFromNcclKernelPlan(
    ncclKernelPlan& plan,
    cudaStream_t stream) {
  const bool tracingActive =
      plan.comm != nullptr && plan.comm->ctrace != nullptr;
  if (!tracingActive) {
    return nullptr;
  }

  auto firstTask = ncclIntruQueueHead(&plan.collTaskQueue);
  if (firstTask == nullptr) {
    WARN("CollTrace: no coll task in this plan, this plan is empty");
    return nullptr;
  }

  return std::make_shared<CollTraceInfo>(parseCollInfoFromCollTask(*firstTask));
}
|
||||
|
||||
// Creates a trace event for `comm`. Returns a null pointer when tracing is
// off, when the capture state of `stream` cannot be queried, or when the
// stream is being captured into a CUDA graph (unsupported). Throws
// CollTraceError when the tracer fails to create the event.
std::unique_ptr<CollTraceEvent> collTraceAquireEventCommon(
    ncclComm* comm,
    CollTraceEvent::EventType type,
    cudaStream_t stream) {
  if (!comm->ctrace) {
    return nullptr;
  }

  struct ncclCudaGraph graph;
  const auto captureStatus = ncclCudaGetCapturingGraph(&graph, stream);
  if (captureStatus != ncclSuccess) {
    WARN("Internal error: ncclCudaGetCapturingGraph failed by %d", captureStatus);
    return nullptr;
  }

  const bool capturing = graph.graph != nullptr;
  if (capturing) {
    // We are in a cuda graph, this is currently unsupported
    WARN(
        "COLLTRACE: does not support cuda graph. Collectives from comm %lx will be skipped",
        comm->commHash);
    return nullptr;
  }

  auto event = comm->ctrace->createEvent(type);
  if (!event) {
    throw CollTraceError("Event init failed");
  }
  return event;
}
|
||||
|
||||
// Acquires a COMM trace event for `plan` and fills in the description of the
// plan's first collective. Returns a null pointer whenever the collective
// cannot be described or no event can be acquired.
std::unique_ptr<CollTraceEvent> collTraceAquireEventBaseline(
    ncclKernelPlan* plan,
    cudaStream_t stream) {
  const auto collInfo = parseCollInfoFromNcclKernelPlan(*plan, stream);
  if (collInfo == nullptr) {
    return nullptr;
  }

  ncclComm* comm = plan->comm;
  if (!comm->ctrace) {
    return nullptr;
  }

  auto event =
      collTraceAquireEventCommon(comm, CollTraceEvent::EventType::COMM, stream);
  if (event != nullptr) {
    event->coll = *collInfo;
    return event;
  }

  WARN("COLLTRACE: failed to aquire event");
  return nullptr;
}
|
||||
|
||||
} // namespace latency_profiler
|
||||
@@ -0,0 +1,87 @@
|
||||
/*
|
||||
* Copyright (c) Meta Platforms, Inc. and affiliates.
|
||||
*
|
||||
* This source code is licensed under the MIT license found in the
|
||||
* LICENSE file in the root directory of this source tree.
|
||||
*/
|
||||
#include "latency_profiler/CollTraceUtils.h"
|
||||
#include "nccl_common.h"
|
||||
#include "debug.h"
|
||||
|
||||
namespace latency_profiler {
|
||||
|
||||
// Returns the payload size in MiB of `count` elements of NCCL data type
// `dataType` (given by its string name).
//
// The byte count is computed in floating point, so large element counts no
// longer overflow the `int` multiplication by the element width.
//
// Throws std::runtime_error for a data type name that is not recognized.
float getSizeMb(const std::string& dataType, int count) {
  int bytesPerElement = 0;
  if (dataType == "ncclInt8" || dataType == "ncclUint8" ||
      dataType == "ncclFp8E4M3" || dataType == "ncclFp8E5M2") {
    bytesPerElement = 1;
  } else if (dataType == "ncclFloat16" || dataType == "ncclBfloat16") {
    bytesPerElement = 2;
  } else if (
      dataType == "ncclInt32" || dataType == "ncclUint32" ||
      dataType == "ncclFloat32") {
    bytesPerElement = 4;
  } else if (
      dataType == "ncclInt64" || dataType == "ncclUint64" ||
      dataType == "ncclFloat64") {
    bytesPerElement = 8;
  } else {
    throw std::runtime_error("CollTrace: unsupported data type " + dataType);
  }

  const double bytes = static_cast<double>(count) * bytesPerElement;
  return static_cast<float>(bytes / 1024.0 / 1024.0);
}
|
||||
|
||||
void reportToFile(
|
||||
const std::deque<std::vector<CollStats>>& stats,
|
||||
const std::string& commHash) {
|
||||
for (const auto& oneDumpStats : stats) {
|
||||
for (const auto& elem : oneDumpStats) {
|
||||
auto size_mb = getSizeMb(elem.dataType, elem.count);
|
||||
INFO(NCCL_COLL, "coll_id %ld, percent %d, min_latency_us %f, max_latency_us %f, op_name %s, data_type %s, count %ld, message_size_MB %f, comm_hash %s", elem.collId, elem.percent, elem.minLatencyUs, elem.maxLatencyUs, elem.opName.c_str(), elem.dataType.c_str(), elem.count, size_mb, commHash.c_str());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<CollStats> aggregateResults(
|
||||
const std::deque<std::unique_ptr<CollTraceInfo>>& info,
|
||||
const std::vector<float>& latencyAllGather,
|
||||
int RANKS_PER_HOST,
|
||||
int NCCL_COLLTRACE_RECORD_MAX) {
|
||||
std::vector<std::pair<float, float>> latencyMetrics;
|
||||
for (auto rank = 0; rank < RANKS_PER_HOST; rank++) {
|
||||
for (auto i = 0; i < NCCL_COLLTRACE_RECORD_MAX; i++) {
|
||||
auto val = latencyAllGather.at(rank * NCCL_COLLTRACE_RECORD_MAX + i);
|
||||
if (val == 0) {
|
||||
throw std::runtime_error(
|
||||
"CollTrace: latency value cannot be zero, CPU all gather failed");
|
||||
}
|
||||
if (rank == 0) {
|
||||
latencyMetrics.emplace_back(val, val);
|
||||
} else {
|
||||
latencyMetrics.at(i).first =
|
||||
std::min<float>(latencyMetrics.at(i).first, val);
|
||||
latencyMetrics.at(i).second =
|
||||
std::max<float>(latencyMetrics.at(i).second, val);
|
||||
}
|
||||
}
|
||||
}
|
||||
std::vector<CollStats> results;
|
||||
for (int i = 0; i < info.size(); i++) {
|
||||
int percent = 100 *
|
||||
(latencyMetrics.at(i).second - latencyMetrics.at(i).first) /
|
||||
latencyMetrics.at(i).first;
|
||||
results.emplace_back(CollStats(
|
||||
(int)info[i]->collId,
|
||||
percent,
|
||||
latencyMetrics.at(i).first * 1000,
|
||||
latencyMetrics.at(i).second * 1000,
|
||||
info[i]->opName,
|
||||
info[i]->dataType,
|
||||
info[i]->count));
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
} // namespace latency_profiler
|
||||
@@ -110,6 +110,9 @@ if(BUILD_TESTS)
|
||||
common/TestBedChild.cpp
|
||||
common/StandaloneUtils.cpp
|
||||
proxy_trace/ProxyTraceUnitTests.cpp
|
||||
../src/misc/proxy_trace/proxy_trace.cc
|
||||
latency_profiler/LatencyProfilerUnitTest.cpp
|
||||
../src/misc/latency_profiler/CollTraceUtils.cc
|
||||
)
|
||||
|
||||
# Due to default hidden symbol visibility, append source file if build type is not Debug.
|
||||
|
||||
@@ -0,0 +1,103 @@
|
||||
/*
|
||||
* Copyright (c) Meta Platforms, Inc. and affiliates.
|
||||
*
|
||||
* This source code is licensed under the MIT license found in the
|
||||
* LICENSE file in the root directory of this source tree.
|
||||
*/
|
||||
|
||||
#include "latency_profiler/CollTraceUtils.h"
|
||||
#include "latency_profiler/EventQueue.h"
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
#include <thread>
|
||||
#include <unordered_map>
|
||||
|
||||
using latency_profiler::CollStats;
|
||||
using latency_profiler::CollTraceInfo;
|
||||
using latency_profiler::EventQueue;
|
||||
|
||||
namespace RcclUnitTesting {
|
||||
// Checks aggregateResults() on a host with 2 ranks and a ring buffer of 3
// records. The expected entries mirror the inputs exactly, and every field,
// including opName, dataType and count, is verified.
TEST(CollTraceUtilsTest, aggregateResultsTest) {
  std::deque<std::unique_ptr<CollTraceInfo>> results;
  auto info1 = CollTraceInfo{.collId = 123, .opName = "allReduce", .dataType = "float32", .count = 10};
  auto info2 = CollTraceInfo{.collId = 127, .opName = "allReduce", .dataType = "int64", .count = 20};
  auto info3 = CollTraceInfo{.collId = 200, .opName = "allGather", .dataType = "float32", .count = 50};
  results.emplace_back(std::make_unique<CollTraceInfo>(info1));
  results.emplace_back(std::make_unique<CollTraceInfo>(info2));
  results.emplace_back(std::make_unique<CollTraceInfo>(info3));

  // Rank 0 contributes {10, 20, 50}, rank 1 contributes {15, 21, 45}.
  std::vector<float> latencyAllGather = {10, 20, 50, 15, 21, 45};
  auto stats = aggregateResults(
      results,
      latencyAllGather,
      2 /* ranks per host */,
      3 /* element per rank */);
  ASSERT_EQ(3u, stats.size());

  std::vector<CollStats> expected = {
      CollStats(123, 50, 10000, 15000, "allReduce", "float32", 10),
      CollStats(127, 5, 20000, 21000, "allReduce", "int64", 20),
      CollStats(200, 11, 45000, 50000, "allGather", "float32", 50)
  };

  for (size_t i = 0; i < expected.size(); i++) {
    EXPECT_EQ(stats[i].collId, expected[i].collId);
    EXPECT_EQ(stats[i].percent, expected[i].percent);
    EXPECT_EQ(stats[i].minLatencyUs, expected[i].minLatencyUs);
    EXPECT_EQ(stats[i].maxLatencyUs, expected[i].maxLatencyUs);
    EXPECT_EQ(stats[i].opName, expected[i].opName);
    EXPECT_EQ(stats[i].dataType, expected[i].dataType);
    EXPECT_EQ(stats[i].count, expected[i].count);
  }
}
|
||||
|
||||
// Single-threaded check that elements come out in insertion order.
TEST(CollTraceUtilsTest, EventQueueOperationTest) {
  EventQueue<int> queue;
  queue.push(std::make_unique<int>(5));
  queue.push(std::make_unique<int>(100));

  const auto first = queue.waitPop();
  EXPECT_EQ(*first, 5);
  const auto second = queue.waitPop();
  EXPECT_EQ(*second, 100);
}
|
||||
|
||||
// Thread body: pushes a copy of `str` into the queue.
void producer(EventQueue<std::string>& q, const std::string& str) {
  auto payload = std::make_unique<std::string>(str);
  q.push(std::move(payload));
}
|
||||
|
||||
// Thread body: replaces the contents of `results` with the next two strings
// popped from the queue, in the order they are received.
void consumer(EventQueue<std::string>& q, std::vector<std::string>& results) {
  results.clear();
  for (int received = 0; received < 2; ++received) {
    const auto item = q.waitPop();
    results.push_back(*item);
  }
}
|
||||
|
||||
// One consumer and two producers run concurrently; the arrival order is not
// fixed, so each received string only has to be one of the two pushed values.
TEST(CollTraceUtilsTest, EventQueueMultiThreadTest) {
  EventQueue<std::string> queue;
  std::vector<std::string> received;

  std::thread consumerThread(consumer, std::ref(queue), std::ref(received));
  std::thread helloProducer(producer, std::ref(queue), "hello");
  std::thread worldProducer(producer, std::ref(queue), "world");
  consumerThread.join();
  helloProducer.join();
  worldProducer.join();

  EXPECT_TRUE(received[0] == "hello" || received[0] == "world");
  EXPECT_TRUE(received[1] == "hello" || received[1] == "world");
}
|
||||
|
||||
// For every supported data type name, 1024 * 1024 elements must amount to
// as many MiB as the type has bytes.
TEST(CollTraceUtilsTest, getSizeMbTest) {
  std::unordered_map<int, std::vector<std::string>> bytesToTypes = {
      {1, {"ncclInt8", "ncclFp8E4M3", "ncclFp8E5M2"}},
      {2, {"ncclFloat16", "ncclBfloat16"}},
      {4, {"ncclInt32", "ncclUint32", "ncclFloat32"}},
      {8, {"ncclInt64", "ncclUint64", "ncclFloat64"}}};

  for (const auto& entry : bytesToTypes) {
    const int expectedMb = entry.first;
    const auto typeNames = entry.second;
    for (const auto& typeName : typeNames) {
      const auto mb = latency_profiler::getSizeMb(typeName, 1024 * 1024);
      EXPECT_NEAR(mb, expectedMb, 0.01);
    }
  }
}
|
||||
}
|
||||
Criar uma nova questão referindo esta
Bloquear um utilizador