diff --git a/projects/rccl/CMakeLists.txt b/projects/rccl/CMakeLists.txt index 1b39e4898d..f5cff2ca01 100644 --- a/projects/rccl/CMakeLists.txt +++ b/projects/rccl/CMakeLists.txt @@ -652,6 +652,15 @@ set(SRC_FILES src/transport/p2p.cc src/transport/profiler.cc src/transport/shm.cc + src/include/latency_profiler/CollTrace.h + src/include/latency_profiler/CollTraceEvent.h + src/include/latency_profiler/CollTraceFunc.h + src/include/latency_profiler/CollTraceUtils.h + src/include/latency_profiler/EventQueue.h + src/misc/latency_profiler/CollTrace.cc + src/misc/latency_profiler/CollTraceEvent.cc + src/misc/latency_profiler/CollTraceFunc.cc + src/misc/latency_profiler/CollTraceUtils.cc ) if (ENABLE_MSCCL_KERNEL) diff --git a/projects/rccl/src/enqueue.cc b/projects/rccl/src/enqueue.cc index e9934e05d1..6e9b47ed87 100644 --- a/projects/rccl/src/enqueue.cc +++ b/projects/rccl/src/enqueue.cc @@ -25,6 +25,7 @@ #include // std::memcpy #include // PRIx64 #include +#include "latency_profiler/CollTraceFunc.h" using namespace rccl; @@ -1656,9 +1657,12 @@ ncclResult_t ncclLaunchKernel(struct ncclComm* comm, struct ncclKernelPlan* plan cudaStream_t launchStream = planner->streams->stream; void* extra[] = {plan->kernelArgs, &plan->kernelArgsSize}; + auto event = latency_profiler::collTraceAquireEventBaseline(plan, launchStream); if (planner->numStreams == 1 && !plan->persistent) { + latency_profiler::collTraceRecordStartEvent(comm, launchStream, event.get()); comm->lastStream = planner->streams->stream; CUDACHECKGOTO(hipExtLaunchKernel(plan->kernelFn, grid, block, extra, 0, launchStream, NULL, comm->doneEvent, 0), ret, do_return); + latency_profiler::collTraceRecordEndEvent(comm, plan, launchStream, std::move(event)); return ncclSuccess; } @@ -1727,16 +1731,22 @@ ncclResult_t ncclLaunchKernel(struct ncclComm* comm, struct ncclKernelPlan* plan launchConfig.numAttrs = attrs; launchConfig.hStream = launchStream; + latency_profiler::collTraceRecordStartEvent(comm, launchStream, event.get()); CUCHECKGOTO(cuLaunchKernelEx(&launchConfig, fn, nullptr, extra), ret, do_return); + latency_profiler::collTraceRecordEndEvent(comm, plan, launchStream, std::move(event)); #endif } else { // Standard kernel launch + latency_profiler::collTraceRecordStartEvent(comm, launchStream, event.get()); CUCHECKGOTO(cuLaunchKernel(fn, grid.x, grid.y, grid.z, block.x, block.y, block.z, smem, launchStream, nullptr, extra), ret, do_return); + latency_profiler::collTraceRecordEndEvent(comm, plan, launchStream, std::move(event)); } #endif // Standard kernel launch //cuLaunchKernel(sym, grid.x, grid.y, grid.z, block.x, block.y, block.z, smem, launchStream, nullptr, extra); + latency_profiler::collTraceRecordStartEvent(comm, launchStream, event.get()); CUDACHECKGOTO(cudaLaunchKernel(sym, grid, block, extra, smem, launchStream), ret, do_return); + latency_profiler::collTraceRecordEndEvent(comm, plan, launchStream, std::move(event)); do_return: return ret; diff --git a/projects/rccl/src/include/comm.h b/projects/rccl/src/include/comm.h index d918f6eee4..d0ead1c8d3 100644 --- a/projects/rccl/src/include/comm.h +++ b/projects/rccl/src/include/comm.h @@ -19,6 +19,7 @@ #include "graph.h" #include "nvmlwrap.h" #include "profiler.h" +#include "latency_profiler/CollTrace.h" #include "rccl_common.h" #include "recorder.h" @@ -634,6 +635,7 @@ struct ncclComm { hipEvent_t doneEvent; hipStream_t lastStream; + std::unique_ptr ctrace; #ifdef ENABLE_COLLTRACE struct ncclCollTrace* collTrace; diff --git a/projects/rccl/src/include/latency_profiler/CollTrace.h b/projects/rccl/src/include/latency_profiler/CollTrace.h new file mode 100644 index 0000000000..cca94d1eae --- /dev/null +++ b/projects/rccl/src/include/latency_profiler/CollTrace.h @@ -0,0 +1,61 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ +#pragma once + +#include +#include +#include "CollTraceEvent.h" +#include "CollTraceUtils.h" +#include "EventQueue.h" + +struct ncclComm; + +namespace latency_profiler { + +struct CollStats; + +// CollTrace Workflow +// 1) measure collective latency by adding cuda/hip event before and after +// a kernel launch in RCCL. + +// 2) Started a separate worker thread to collect latency data into +// its local ring buffer. + +// 3) Perform CPU level all gather to exchange latency data +// between threads (200 to 300us per 100 latency data exchange) + +// 4) Report to scuba when results buffer is full or every few minutes + +class CollTrace { + public: + CollTrace(ncclComm* comm); + __attribute__((visibility("default"))) ~CollTrace(); + + void enqueueEvent(std::unique_ptr event); + + std::unique_ptr createEvent( + CollTraceEvent::EventType type = CollTraceEvent::EventType::COMM); + + void recordCurCollResult(int rank, float latencyMs); + + void reportIfNeeded(bool checkInterval); + + private: + EventQueue eventQueue_; + std::thread profilingWorkerThread_; + void* collTraceThreadFn(int cudaDev); + std::atomic curCollId_{0}; + std::unique_ptr curEvent_; + std::deque> pastColls_; + + ncclComm* comm_{nullptr}; + std::string commHash_; + int rank_{-1}; + std::deque> stats_; + std::chrono::time_point lastReportTime_; +}; +} // namespace latency_profiler diff --git a/projects/rccl/src/include/latency_profiler/CollTraceEvent.h b/projects/rccl/src/include/latency_profiler/CollTraceEvent.h new file mode 100644 index 0000000000..7a80e87e01 --- /dev/null +++ b/projects/rccl/src/include/latency_profiler/CollTraceEvent.h @@ -0,0 +1,68 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ +#pragma once + +#include +#include +#include +#include +#include "checks.h" +#include "CollTraceUtils.h" + +namespace latency_profiler { + +// CUDA event pointer w/ deleter +struct CudaEventDeleter { + void operator()(cudaEvent_t e) { + // Ignore error at destroy + cudaEventDestroy(e); + } +}; +using CudaEventPtr = std::unique_ptr< + std::pointer_traits::element_type, + CudaEventDeleter>; + +// a wrapper class for cuda event +class CudaWaitEvent { + public: + CudaWaitEvent(CudaEventPtr e) : event_(std::move(e)) {} + ~CudaWaitEvent() {} + + cudaEvent_t getCudaEvent() { + return event_.get(); + } + + std::shared_ptr getElapsedTimeSinceEvent(CudaWaitEvent* start); + ncclResult_t waitEventFinish(); + + private: + CudaEventPtr event_; +}; + +// Event data structure +struct CollTraceEvent { + enum class EventType { COMM, TERMINATE }; + + CollTraceInfo coll; + std::unique_ptr start{nullptr}; + std::unique_ptr stop{nullptr}; + EventType eventType = EventType::COMM; + + CollTraceEvent(EventType type) : eventType(type) {} + CollTraceEvent() = default; + + ~CollTraceEvent() {} + + // CollTraceEvent is not copyable + CollTraceEvent(const CollTraceEvent&) = delete; + CollTraceEvent& operator=(const CollTraceEvent&) = delete; + + // CollTraceEvent is movable + CollTraceEvent(CollTraceEvent&&) = default; + CollTraceEvent& operator=(CollTraceEvent&&) = default; +}; +} // namespace latency_profiler diff --git a/projects/rccl/src/include/latency_profiler/CollTraceFunc.h b/projects/rccl/src/include/latency_profiler/CollTraceFunc.h new file mode 100644 index 0000000000..99e4c7c818 --- /dev/null +++ b/projects/rccl/src/include/latency_profiler/CollTraceFunc.h @@ -0,0 +1,39 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ +#pragma once + +#include "CollTraceEvent.h" +#include "comm.h" +#include "CollTraceUtils.h" + +namespace latency_profiler { +class CollTraceError : public std::runtime_error { + public: + explicit CollTraceError(const std::string& what) : std::runtime_error(what) {} +}; + +ncclResult_t collTraceInit(ncclComm* comm); + +ncclResult_t collTraceDestroy(ncclComm* comm); + +std::unique_ptr collTraceAquireEventBaseline( + ncclKernelPlan* plan, + cudaStream_t stream); + +ncclResult_t collTraceRecordStartEvent( + ncclComm* comm, + cudaStream_t launchStream, + CollTraceEvent* event); + +ncclResult_t collTraceRecordEndEvent( + ncclComm* comm, + ncclKernelPlan* plan, + cudaStream_t launchStream, + std::unique_ptr event); + +CollTraceInfo parseCollInfoFromCollTask(const ncclTaskColl& collTask); +} // namespace latency_profiler diff --git a/projects/rccl/src/include/latency_profiler/CollTraceUtils.h b/projects/rccl/src/include/latency_profiler/CollTraceUtils.h new file mode 100644 index 0000000000..48af7b83c2 --- /dev/null +++ b/projects/rccl/src/include/latency_profiler/CollTraceUtils.h @@ -0,0 +1,49 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ +#pragma once + +#include +#include +#include +#include + +namespace latency_profiler { + +struct CollStats { + const int collId; + const int percent; + const float minLatencyUs; + const float maxLatencyUs; + const std::string opName; + const std::string dataType; + int64_t count; + CollStats(const int collId, const int percent, const float minLatencyUs, const float maxLatencyUs, const std::string& opName, const std::string& dataType, const int64_t count) : collId(collId), percent(percent), minLatencyUs(minLatencyUs), maxLatencyUs(maxLatencyUs), opName(opName), dataType(dataType), count(count) { + + } +}; + +struct CollTraceInfo { + int64_t collId; + std::string opName; + std::string dataType; + int64_t count; + float latencyMs{-1}; +}; + +void reportToFile( + const std::deque>& stats, + const std::string& commHash); + +std::vector aggregateResults( + const std::deque>& info, + const std::vector& latencyAllGather, + int RANKS_PER_HOST, + int NCCL_COLLTRACE_RECORD_MAX); + +float getSizeMb(const std::string& dataType, int count); + +} // namespace latency_profiler diff --git a/projects/rccl/src/include/latency_profiler/EventQueue.h b/projects/rccl/src/include/latency_profiler/EventQueue.h new file mode 100644 index 0000000000..0d05a94265 --- /dev/null +++ b/projects/rccl/src/include/latency_profiler/EventQueue.h @@ -0,0 +1,46 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ +#pragma once + +#include +#include + +namespace latency_profiler { + +// A multi-producer, single consumer queue. +// This queue is designed to be used in scenarios where multiple producers +// are pushing items into the queue, and a single consumer is waiting for +// items to be available. + +template +class EventQueue { + private: + std::deque> queue_; + std::condition_variable cv_; + mutable std::mutex mutex_; + + public: + void push(std::unique_ptr item) { + { + std::lock_guard lock(mutex_); + queue_.push_back(std::move(item)); + } + cv_.notify_one(); + } + + std::unique_ptr waitPop() { + std::unique_lock lock(mutex_); + if (queue_.empty()) { + cv_.wait(lock, [this] { return !queue_.empty(); }); + } + std::unique_ptr item = std::move(queue_.front()); + queue_.pop_front(); + + return item; + } +}; +} // namespace latency_profiler diff --git a/projects/rccl/src/init.cc b/projects/rccl/src/init.cc index e8a8ca79e8..392d6b4c7c 100644 --- a/projects/rccl/src/init.cc +++ b/projects/rccl/src/init.cc @@ -56,6 +56,8 @@ #include "msccl/msccl_lifecycle.h" #include "msccl/msccl_status.h" +#include "latency_profiler/CollTrace.h" +#include "latency_profiler/CollTraceFunc.h" #ifndef STR2 #define STR2(v) #v @@ -2026,6 +2028,7 @@ static ncclResult_t ncclCommInitRankFunc(struct ncclAsyncJob* job_) { NCCLCHECK(comm->tuner->init(comm->nRanks, comm->nNodes, ncclDebugLog, &comm->tunerContext)); } + NCCLCHECKGOTO(latency_profiler::collTraceInit(comm), res, fail); // update communicator state comm->initState = ncclSuccess; timers[TIMER_INIT_TOTAL] = clockNano() - timers[TIMER_INIT_TOTAL]; @@ -2509,6 +2512,7 @@ static ncclResult_t commDestroySync(struct ncclAsyncJob* job_) { CUDACHECKGOTO(cudaSetDevice(comm->cudaDev), ret, fail); + NCCLCHECKGOTO(latency_profiler::collTraceDestroy(comm), ret, fail); TRACE(NCCL_INIT, "Destroying comm %p rank %d abortFlag %d asyncResult %d", comm, comm->rank, *comm->abortFlag, comm->asyncResult); if (comm->initState == ncclSuccess) { diff --git a/projects/rccl/src/misc/latency_profiler/CollTrace.cc b/projects/rccl/src/misc/latency_profiler/CollTrace.cc new file mode 100644 index 0000000000..fd862c0608 --- /dev/null +++ b/projects/rccl/src/misc/latency_profiler/CollTrace.cc @@ -0,0 +1,228 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ +#include "latency_profiler/CollTrace.h" +#include "bootstrap.h" +#include "checks.h" +#include "comm.h" +#include "param.h" + +NCCL_PARAM(ColltraceRecordMax, "COLLTRACE_RECORD_MAX", 100); +NCCL_PARAM(ColltraceMaxDumpSize, "COLLTRACE_MAX_DUMP_SIZE", 20); +NCCL_PARAM(ColltraceDumpIntervalSec, "COLLTRACE_DUMP_INTERVAL_SEC", 300); + +constexpr int RANKS_PER_HOST = 8; + +namespace latency_profiler { + +namespace { +CudaEventPtr getCudaEventPtr() { + cudaEvent_t newEvent = nullptr; + CUDACHECKIGNORE(cudaEventCreate(&newEvent)); + CudaEventPtr item(newEvent); + return item; +} +} // namespace + +CollTrace::CollTrace(ncclComm* comm) + : comm_(comm), + commHash_(std::to_string(comm->commHash)), + rank_(comm->rank) { + profilingWorkerThread_ = + std::thread{[this]() { return collTraceThreadFn(comm_->cudaDev); }}; +} + +CollTrace::~CollTrace() { + try { + INFO( + NCCL_INIT, + "COLLTRACE: commHash %s rank %d - Destroy START", + commHash_.c_str(), + rank_); + eventQueue_.push(std::unique_ptr( + new CollTraceEvent(CollTraceEvent::EventType::TERMINATE))); + if (profilingWorkerThread_.joinable()) { + profilingWorkerThread_.join(); + } + + if (rank_ == 0) { + reportIfNeeded(false); + } + + INFO( + NCCL_INIT, + "COLLTRACE: commHash %s rank %d - Destroy COMPLETE", + commHash_.c_str(), + rank_); + } catch (const std::exception& e) { + WARN( + "COLLTRACE: commHash %s rank %d - Destroy FAILED: %s", + commHash_.c_str(), + rank_, + e.what()); + } +} + +void* CollTrace::collTraceThreadFn(int cudaDev) { + INFO(NCCL_INIT, "CollTrace thread started for cudaDev %d", cudaDev); + auto err = cudaSetDevice(cudaDev); + if (err != cudaSuccess) { + WARN("Cuda failure '%s'", cudaGetErrorString(err)); + return nullptr; + } + + lastReportTime_ = std::chrono::steady_clock::now(); + + INFO( + NCCL_INIT, + "COLLTRACE: commHash %s rank %d - worker thread STARTED", + commHash_.c_str(), + rank_); + while (true) { + curEvent_ = eventQueue_.waitPop(); + if (curEvent_->eventType == CollTraceEvent::EventType::TERMINATE) { + break; + } + curEvent_->start->waitEventFinish(); + auto ncclRes = curEvent_->stop->waitEventFinish(); + float latency = -1; + + if (ncclRes == ncclSuccess) { + auto latencyMaybe = + curEvent_->stop->getElapsedTimeSinceEvent(curEvent_->start.get()); + // latencyMaybe could be nullopt when cudaEventElapsedTime failed + // this could happen when events are not recorded or stream is not valid + if (latencyMaybe == nullptr) { + WARN( + "CollTrace: getElapsedTimeSinceEvent failed, aborting worker thread"); + return nullptr; + } + latency = *latencyMaybe; + } + + recordCurCollResult(cudaDev, latency); + curEvent_.reset(); + } + + INFO( + NCCL_INIT, + "COLLTRACE: commHash %s rank %d - worker thread TERMINATE", + commHash_.c_str(), + rank_); + return nullptr; +} + +void CollTrace::enqueueEvent(std::unique_ptr event) { + event->coll.collId = curCollId_.fetch_add(1); + eventQueue_.push(std::move(event)); +} + +std::unique_ptr CollTrace::createEvent( + CollTraceEvent::EventType type) { + auto eventInfo = std::make_unique(type); + eventInfo->start = std::make_unique(getCudaEventPtr()); + eventInfo->stop = std::make_unique(getCudaEventPtr()); + + if (!eventInfo->start || !eventInfo->stop) { + std::unique_ptr nullCollTraceEvent(nullptr); + return nullCollTraceEvent; + } + return eventInfo; +} + +bool shouldAggregateRingBuffer(int collId) { + const int NCCL_COLLTRACE_RECORD_MAX = ncclParamColltraceRecordMax(); + return ((collId + 1) % NCCL_COLLTRACE_RECORD_MAX == 0); +} + +void CollTrace::reportIfNeeded(bool checkInterval = true) { + auto now = std::chrono::steady_clock::now(); + auto secs_passed = + std::chrono::duration_cast(now - lastReportTime_) + .count(); + if (checkInterval) { + if (secs_passed < ncclParamColltraceDumpIntervalSec() && + stats_.size() < ncclParamColltraceMaxDumpSize()) { + return; + } + } + + INFO( + NCCL_COLL, + "CollTrace: %ld seconds passed since last report, stats size = %zu, checkInterval = %d", + secs_passed, + stats_.size(), + checkInterval); + +// reportToScuba is a placeholder for oss environment. +// meta production reports to scuba instead of file, which enables +// filering, aggregation and visualization. +#ifdef ENABLE_SCUBA_LOGGING + reportToScuba(stats_, commHash_); +#else + reportToFile(stats_, commHash_); +#endif + lastReportTime_ = std::chrono::steady_clock::now(); + stats_.clear(); +} + +void CollTrace::recordCurCollResult(int rank, float latency) { + const int NCCL_COLLTRACE_RECORD_MAX = ncclParamColltraceRecordMax(); + + auto result = std::make_unique(curEvent_->coll); + auto collId = result->collId; + result->latencyMs = latency; + + pastColls_.push_back(std::move(result)); + if (pastColls_.size() > NCCL_COLLTRACE_RECORD_MAX) { + pastColls_.pop_front(); + } + + if (shouldAggregateRingBuffer(collId) && pastColls_.size() > 0) { + std::vector latencyAllGather; + latencyAllGather.resize(RANKS_PER_HOST * NCCL_COLLTRACE_RECORD_MAX, 0); + int start = (comm_->localRank) * NCCL_COLLTRACE_RECORD_MAX; + for (int i = start; i < start + NCCL_COLLTRACE_RECORD_MAX; i++) { + latencyAllGather[i] = pastColls_[i - start]->latencyMs; + } + auto before = std::chrono::high_resolution_clock::now(); + auto ncclResult = bootstrapIntraNodeAllGather( + comm_->bootstrap, + comm_->localRankToRank, + comm_->localRank, + comm_->localRanks, + latencyAllGather.data(), + NCCL_COLLTRACE_RECORD_MAX * sizeof(float)); + auto after = std::chrono::high_resolution_clock::now(); + auto interval_us = + std::chrono::duration_cast(after - before) + .count(); + if (ncclResult != ncclSuccess) { + WARN("CollTrace: All gather exchange latency data failed"); + return; + } + + if (rank == 0) { + INFO(NCCL_COLL, "latency metrics all gather takes %ld us", interval_us); + try { + auto stats = aggregateResults( + pastColls_, + latencyAllGather, + RANKS_PER_HOST, + NCCL_COLLTRACE_RECORD_MAX); + stats_.push_back(stats); + if (stats_.size() > ncclParamColltraceMaxDumpSize()) { + stats_.pop_front(); + } + reportIfNeeded(); + } catch (const std::exception& e) { + WARN("Aggregating error: %s", e.what()); + } + } + } +} + +} // namespace latency_profiler diff --git a/projects/rccl/src/misc/latency_profiler/CollTraceEvent.cc b/projects/rccl/src/misc/latency_profiler/CollTraceEvent.cc new file mode 100644 index 0000000000..2203b4c211 --- /dev/null +++ b/projects/rccl/src/misc/latency_profiler/CollTraceEvent.cc @@ -0,0 +1,42 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ +#include +#include "latency_profiler/CollTraceEvent.h" +#include "param.h" + +NCCL_PARAM(ColltraceCheckIntervalMs, "COLLTRACE_CHECK_INTERVAL_MS", 10); + +namespace latency_profiler { + +ncclResult_t CudaWaitEvent::waitEventFinish() { + // async polling case, query cuda whether event is ready every + // NCCL_COLLTRACE_CHECK_INTERVAL_MS ms + auto res = cudaEventQuery(event_.get()); + while (res != cudaSuccess) { + if (res != cudaErrorNotReady) { + CUDACHECK(res); + } + std::this_thread::sleep_for( + std::chrono::milliseconds(ncclParamColltraceCheckIntervalMs())); + res = cudaEventQuery(event_.get()); + } + return ncclSuccess; +} + +std::shared_ptr CudaWaitEvent::getElapsedTimeSinceEvent( + CudaWaitEvent* start) { + float elapsedTime; + auto res = + cudaEventElapsedTime(&elapsedTime, start->event_.get(), event_.get()); + if (res != cudaSuccess) { + WARN("get elapsed time failed error: %s", cudaGetErrorString(res)); + return nullptr; + } + return std::make_shared(elapsedTime); +} + +} // namespace latency_profiler diff --git a/projects/rccl/src/misc/latency_profiler/CollTraceFunc.cc b/projects/rccl/src/misc/latency_profiler/CollTraceFunc.cc new file mode 100644 index 0000000000..4b46172b84 --- /dev/null +++ b/projects/rccl/src/misc/latency_profiler/CollTraceFunc.cc @@ -0,0 +1,140 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ +#include "latency_profiler/CollTraceFunc.h" + +namespace latency_profiler { + +namespace { +bool enableCollTrace() { + const char* colltraceEnable = ncclGetEnv("RCCL_LATENCY_PROFILER"); + if (colltraceEnable != NULL) { + INFO( + NCCL_INIT, + "RCCL_LATENCY_PROFILER set by environment to %s.", + colltraceEnable); + if (strcmp(colltraceEnable, "1") == 0) { + return true; + } + } + return false; +} +} // namespace + +ncclResult_t collTraceInit(ncclComm* comm) { + if (!enableCollTrace()) { + return ncclSuccess; + } + comm->ctrace = std::make_unique(comm); + return ncclSuccess; +} + +ncclResult_t collTraceDestroy(ncclComm* comm) { + if (comm->ctrace == nullptr) { + return ncclSuccess; + } + comm->ctrace.reset(); + return ncclSuccess; +} + +ncclResult_t collTraceRecordStartEvent( + ncclComm* comm, + cudaStream_t launchStream, + CollTraceEvent* event) { + if (comm->ctrace && event) { + CUDACHECK( + cudaEventRecord(event->start.get()->getCudaEvent(), launchStream)); + } + return ncclSuccess; +} + +ncclResult_t collTraceRecordEndEvent( + ncclComm* comm, + ncclKernelPlan* plan, + cudaStream_t launchStream, + std::unique_ptr event) { + if (comm->ctrace && event) { + CUDACHECK(cudaEventRecord(event->stop.get()->getCudaEvent(), launchStream)); + comm->ctrace->enqueueEvent(std::move(event)); + } + return ncclSuccess; +} + +CollTraceInfo parseCollInfoFromCollTask(const ncclTaskColl& collTask) { + return CollTraceInfo{ + .opName = std::string{ncclFuncToString(collTask.func)}, + .dataType = std::string{ncclDatatypeToString(collTask.datatype)}, + .count = (int64_t)collTask.count, + }; +} + +std::shared_ptr parseCollInfoFromNcclKernelPlan( + ncclKernelPlan& plan, + cudaStream_t stream) { + if (plan.comm == nullptr || plan.comm->ctrace == nullptr) { + return nullptr; + } + auto collTaskHead = ncclIntruQueueHead(&plan.collTaskQueue); + if (collTaskHead == nullptr) { + WARN("CollTrace: no coll task in this plan, this plan is empty"); + return nullptr; + } + + CollTraceInfo collInfo = parseCollInfoFromCollTask(*collTaskHead); + return std::make_shared(collInfo); +} + +std::unique_ptr collTraceAquireEventCommon( + ncclComm* comm, + CollTraceEvent::EventType type, + cudaStream_t stream) { + if (!comm->ctrace) { + return nullptr; + } + struct ncclCudaGraph graph; + auto res = ncclCudaGetCapturingGraph(&graph, stream); + if (res != ncclSuccess) { + WARN("Internal error: ncclCudaGetCapturingGraph failed by %d", res); + return nullptr; + } + if (graph.graph != nullptr) { + // We are in a cuda graph, this is currently unsupported + WARN( + "COLLTRACE: does not support cuda graph. Collectives from comm %lx will be skipped", + comm->commHash); + return nullptr; + } + auto event = comm->ctrace->createEvent(type); + if (!event) { + throw CollTraceError("Event init failed"); + return nullptr; /*Event init failed*/ + } + return event; +} + +std::unique_ptr collTraceAquireEventBaseline( + ncclKernelPlan* plan, + cudaStream_t stream) { + auto collPtr = parseCollInfoFromNcclKernelPlan(*plan, stream); + if (collPtr == nullptr) { + return nullptr; + } + auto comm = plan->comm; + if (!comm->ctrace) { + return nullptr; + } + + auto event = + collTraceAquireEventCommon(comm, CollTraceEvent::EventType::COMM, stream); + if (event == nullptr) { + WARN("COLLTRACE: failed to aquire event"); + return nullptr; + } + event->coll = *collPtr; + return event; +} + +} // namespace latency_profiler diff --git a/projects/rccl/src/misc/latency_profiler/CollTraceUtils.cc b/projects/rccl/src/misc/latency_profiler/CollTraceUtils.cc new file mode 100644 index 0000000000..af33f22cd5 --- /dev/null +++ b/projects/rccl/src/misc/latency_profiler/CollTraceUtils.cc @@ -0,0 +1,87 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ +#include "latency_profiler/CollTraceUtils.h" +#include "nccl_common.h" +#include "debug.h" + +namespace latency_profiler { + +float getSizeMb(const std::string& dataType, int count) { + if (dataType == "ncclInt8" || dataType == "ncclFp8E4M3" || + dataType == "ncclFp8E5M2") { + return count / 1024.0 / 1024.0; + } + + if (dataType == "ncclFloat16" || dataType == "ncclBfloat16") { + return 2 * count / 1024.0 / 1024.0; + } + + if (dataType == "ncclInt32" || dataType == "ncclUint32" || + dataType == "ncclFloat32") { + return 4 * count / 1024.0 / 1024.0; + } + + if (dataType == "ncclInt64" || dataType == "ncclUint64" || + dataType == "ncclFloat64") { + return 8 * count / 1024.0 / 1024.0; + } + + throw std::runtime_error("CollTrace: unsupported data type " + dataType); +} + +void reportToFile( + const std::deque>& stats, + const std::string& commHash) { + for (const auto& oneDumpStats : stats) { + for (const auto& elem : oneDumpStats) { + auto size_mb = getSizeMb(elem.dataType, elem.count); + INFO(NCCL_COLL, "coll_id %ld, percent %d, min_latency_us %f, max_latency_us %f, op_name %s, data_type %s, count %ld, message_size_MB %f, comm_hash %s", elem.collId, elem.percent, elem.minLatencyUs, elem.maxLatencyUs, elem.opName.c_str(), elem.dataType.c_str(), elem.count, size_mb, commHash.c_str()); + } + } +} + +std::vector aggregateResults( + const std::deque>& info, + const std::vector& latencyAllGather, + int RANKS_PER_HOST, + int NCCL_COLLTRACE_RECORD_MAX) { + std::vector> latencyMetrics; + for (auto rank = 0; rank < RANKS_PER_HOST; rank++) { + for (auto i = 0; i < NCCL_COLLTRACE_RECORD_MAX; i++) { + auto val = latencyAllGather.at(rank * NCCL_COLLTRACE_RECORD_MAX + i); + if (val == 0) { + throw std::runtime_error( + "CollTrace: latency value cannot be zero, CPU all gather failed"); + } + if (rank == 0) { + latencyMetrics.emplace_back(val, val); + } else { + latencyMetrics.at(i).first = + std::min(latencyMetrics.at(i).first, val); + latencyMetrics.at(i).second = + std::max(latencyMetrics.at(i).second, val); + } + } + } + std::vector results; + for (int i = 0; i < info.size(); i++) { + int percent = 100 * + (latencyMetrics.at(i).second - latencyMetrics.at(i).first) / + latencyMetrics.at(i).first; + results.emplace_back(CollStats( + (int)info[i]->collId, + percent, + latencyMetrics.at(i).first * 1000, + latencyMetrics.at(i).second * 1000, + info[i]->opName, + info[i]->dataType, + info[i]->count)); + } + return results; +} + +} // namespace latency_profiler diff --git a/projects/rccl/test/CMakeLists.txt b/projects/rccl/test/CMakeLists.txt index 24024a9e3f..2d9ee4685e 100644 --- a/projects/rccl/test/CMakeLists.txt +++ b/projects/rccl/test/CMakeLists.txt @@ -110,6 +110,9 @@ if(BUILD_TESTS) common/TestBedChild.cpp common/StandaloneUtils.cpp proxy_trace/ProxyTraceUnitTests.cpp + ../src/misc/proxy_trace/proxy_trace.cc + latency_profiler/LatencyProfilerUnitTest.cpp + ../src/misc/latency_profiler/CollTraceUtils.cc ) # Due to default hidden symbol visibility, append source file if build type is not Debug. diff --git a/projects/rccl/test/latency_profiler/LatencyProfilerUnitTest.cpp b/projects/rccl/test/latency_profiler/LatencyProfilerUnitTest.cpp new file mode 100644 index 0000000000..ae74b6292c --- /dev/null +++ b/projects/rccl/test/latency_profiler/LatencyProfilerUnitTest.cpp @@ -0,0 +1,103 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include "latency_profiler/CollTraceUtils.h" +#include "latency_profiler/EventQueue.h" + +#include +#include +#include + +using latency_profiler::CollStats; +using latency_profiler::CollTraceInfo; +using latency_profiler::EventQueue; + +namespace RcclUnitTesting { + TEST(CollTraceUtilsTest, aggregateResultsTest) { + std::deque> results; + // Host has 2 ranks, Ring buffer has 3 records + auto info1 = CollTraceInfo{.collId = 123, .opName = "allReduce", .dataType = "float32", .count = 10}; + auto info2 = CollTraceInfo{.collId = 127, .opName = "allReduce", .dataType = "int64", .count = 20}; + auto info3 = CollTraceInfo{.collId = 200, .opName = "allGather", .dataType = "float32", .count = 50}; + results.emplace_back( + std::make_unique(info1)); + results.emplace_back( + std::make_unique(info2)); + results.emplace_back( + std::make_unique(info3)); + + std::vector latencyAllGather = {10, 20, 50, 15, 21, 45}; + auto stats = aggregateResults( + results, + latencyAllGather, + 2 /* ranks per host */, + 3 /* element per rank */); + EXPECT_EQ(3, stats.size()); + std::vector expected = { + CollStats(123, 50, 10000, 15000, "allReduce", "int64", 0), + CollStats(127, 5, 20000, 21000, "allReduce", "int64", 0), + CollStats(200, 11, 45000, 50000, "allReduce", "int64", 0) + }; + + for (int i = 0; i < 3; i++) { + EXPECT_EQ(stats[i].collId, expected[i].collId); + EXPECT_EQ(stats[i].percent, expected[i].percent); + EXPECT_EQ(stats[i].minLatencyUs, expected[i].minLatencyUs); + EXPECT_EQ(stats[i].maxLatencyUs, expected[i].maxLatencyUs); + } + } + + TEST(CollTraceUtilsTest, EventQueueOperationTest) { + EventQueue q; + q.push(std::make_unique(5)); + q.push(std::make_unique(100)); + auto res1 = q.waitPop(); + EXPECT_EQ(*res1, 5); + auto res2 = q.waitPop(); + EXPECT_EQ(*res2, 100); + } + + void producer(EventQueue& q, const std::string& str) { + q.push(std::make_unique(str)); + } + + void consumer(EventQueue& q, std::vector& results) { + results.clear(); + auto res1 = q.waitPop(); + results.push_back(*res1); + auto res2 = q.waitPop(); + results.push_back(*res2); + } + + TEST(CollTraceUtilsTest, EventQueueMultiThreadTest) { + EventQueue q; + std::vector results; + std::thread t0(consumer, std::ref(q), std::ref(results)); + std::thread t1(producer, std::ref(q), "hello"); + std::thread t2(producer, std::ref(q), "world"); + t0.join(); + t1.join(); + t2.join(); + EXPECT_TRUE(results[0] == "hello" || results[0] == "world"); + EXPECT_TRUE(results[1] == "hello" || results[1] == "world"); + } + + TEST(CollTraceUtilsTest, getSizeMbTest) { + std::unordered_map> bytesToTypes = { + {1, {"ncclInt8", "ncclFp8E4M3", "ncclFp8E5M2"}}, + {2, {"ncclFloat16", "ncclBfloat16"}}, + {4, {"ncclInt32", "ncclUint32", "ncclFloat32"}}, + {8, {"ncclInt64", "ncclUint64", "ncclFloat64"}}}; + for (const auto& it : bytesToTypes) { + auto types = it.second; + for (const auto& type : types) { + auto mb = latency_profiler::getSizeMb(type, 1024 * 1024); + EXPECT_NEAR(mb, it.first, 0.01); + } + } + } +}