From 340f3aa88718396ae34da040962f0d3c7e900355 Mon Sep 17 00:00:00 2001 From: SaleelK Date: Fri, 23 Jan 2026 10:40:54 -0800 Subject: [PATCH] clr: Implement dynamic stream to HWq logic (#1958) * clr: Implement dynamic stream to HW queue assignment This change implements dynamic stream to hardware queue (HWq) mapping with the following features: * Queue depth heuristics with weights for optimal HWq assignment * Make last used queue sticky for better locality * Use pipe HWq to pipe mapping - gfx9 follows a round-robin queue to pipe mapping based on creation order (single process per device only, as pipe ID is statically assigned by runtime) * More aggressive heuristic usage for better queue distribution * Extend dynamic queues support for all stream priorities Environment variables: * DEBUG_HIP_DYNAMIC_QUEUE: 0 - disabled, 1 - Depth heuristics 2 - Depth+Pipe heuristics * DEBUG_HIP_IGNORE_STREAM_PRIORITY=1: ignore priority stream creation * clr: Clean up last_used_queue_ --- projects/clr/hipamd/src/hip_stream.cpp | 8 +- projects/clr/rocclr/device/rocm/rocdevice.cpp | 289 ++++++++++++------ projects/clr/rocclr/device/rocm/rocdevice.hpp | 54 +++- .../clr/rocclr/device/rocm/rocsettings.cpp | 7 +- .../clr/rocclr/device/rocm/rocsettings.hpp | 6 +- .../clr/rocclr/device/rocm/rocvirtual.cpp | 85 ++++-- .../clr/rocclr/device/rocm/rocvirtual.hpp | 26 +- projects/clr/rocclr/platform/commandqueue.cpp | 5 +- projects/clr/rocclr/platform/commandqueue.hpp | 12 +- projects/clr/rocclr/utils/flags.hpp | 7 +- 10 files changed, 341 insertions(+), 158 deletions(-) diff --git a/projects/clr/hipamd/src/hip_stream.cpp b/projects/clr/hipamd/src/hip_stream.cpp index cdd446239d..268b65858f 100644 --- a/projects/clr/hipamd/src/hip_stream.cpp +++ b/projects/clr/hipamd/src/hip_stream.cpp @@ -32,7 +32,7 @@ namespace hip { Stream::Stream(hip::Device* dev, Priority p, unsigned int f, bool null_stream, const std::vector& cuMask, hipStreamCaptureStatus captureStatus) : amd::HostQueue(*dev->asContext(), *dev->devices()[0], 0, amd::CommandQueue::RealTimeDisabled, - convertToQueuePriority(p), cuMask), + convertToQueuePriority(p), cuMask, null_stream), lock_("Stream Callback lock"), device_(dev), priority_(p), @@ -585,6 +585,12 @@ hipError_t hipStreamQuery_common(hipStream_t stream) { } hipError_t status = ready ? hipSuccess : hipErrorNotReady; command->release(); + + // Stream is complete - opportunistically release its HW queue if idle + if (ready) { + hip_stream->vdev()->ReleaseHwQueue(); + } + return status; } diff --git a/projects/clr/rocclr/device/rocm/rocdevice.cpp b/projects/clr/rocclr/device/rocm/rocdevice.cpp index 32ce8c4703..c41de73a14 100644 --- a/projects/clr/rocclr/device/rocm/rocdevice.cpp +++ b/projects/clr/rocclr/device/rocm/rocdevice.cpp @@ -136,18 +136,22 @@ Device::Device(hsa_agent_t bkendDevice) alloc_granularity_(0), xferQueue_(nullptr), freeMem_(0), - vgpusAccess_(true) /* Virtual GPU List Ops Lock */ - , + vgpusAccess_(true), /* Virtual GPU List Ops Lock */ hsa_exclusive_gpu_access_(false), - queuePool_(QueuePriority::Total), coopHostcallBuffer_(nullptr), - queueWithCUMaskPool_(QueuePriority::Total), numOfVgpus_(0), preferred_numa_node_(0), maxSdmaReadMask_(0), maxSdmaWriteMask_(0), sdma_engine_allocator_(*this), - cpu_agent_info_(nullptr) { + cpu_agent_info_(nullptr), + numHwPipes_(4) { + // Initialize queue pools with proper comparators (requires 'this' pointer) + for (uint i = 0; i < QueuePriority::Total; ++i) { + queuePool_.emplace_back(QueueCompare(this)); + queueWithCUMaskPool_.emplace_back(QueueCompare(this)); + } + group_segment_.handle = 0; gpuvm_segment_.handle = 0; gpu_fine_grained_segment_.handle = 0; @@ -225,6 +229,11 @@ Device::~Device() { glb_ctx_ = nullptr; } + // Destroy transfer queue FIRST (before destroying queues in pool) + // because its destructor will call releaseQueue() + delete xferQueue_; + xferQueue_ = nullptr; + for (auto& it : queuePool_) { for (auto qIter = it.begin(); qIter != it.end();) { hsa_queue_t* queue = qIter->first; @@ -244,9 +253,6 @@ Device::~Device() { } queuePool_.clear(); - // Destroy transfer queue - delete xferQueue_; - delete blitProgram_; if (context_ != nullptr) { @@ -1695,6 +1701,7 @@ device::VirtualDevice* Device::createVirtualDevice(amd::CommandQueue* queue) { bool profiling = (queue != nullptr) && queue->properties().test(CL_QUEUE_PROFILING_ENABLE); bool cooperative = false; + bool dedicated_queue = (queue != nullptr) && queue->isDedicatedQueue(); // If amd command queue is null, then it's an internal device queue if (queue == nullptr) { @@ -1708,7 +1715,8 @@ device::VirtualDevice* Device::createVirtualDevice(amd::CommandQueue* queue) { bool q = (queue != nullptr); VirtualGPU* virtualDevice = new VirtualGPU(*this, profiling, cooperative, q ? queue->cuMask() : defaultCuMask, - q ? queue->priority() : amd::CommandQueue::Priority::Normal); + q ? queue->priority() : amd::CommandQueue::Priority::Normal, + dedicated_queue); if (!virtualDevice->create()) { delete virtualDevice; @@ -2811,7 +2819,7 @@ VirtualGPU* Device::xferQueue() const { return nullptr; } if (xferQueue_->gpu_queue() == nullptr) { - xferQueue_->set_gpu_queue(thisDevice->AcquireActiveNormalQueue()); + xferQueue_->set_gpu_queue(thisDevice->AcquireActiveQueue(amd::CommandQueue::Priority::Normal)); } } xferQueue_->enableSyncBlit(); @@ -2863,58 +2871,73 @@ void Device::getHwEventTime(const amd::Event& event, uint64_t* start, uint64_t* } // ================================================================================================ -hsa_queue_t* Device::getQueueFromPool(const uint qIndex) { - // Check if queue with refCount 0 is available to use - if (queuePool_[qIndex].size() < GPU_MAX_HW_QUEUES) { - for (auto& it : queuePool_[qIndex]) { - if (it.second.refCount == 0) { - it.second.refCount++; - ClPrint(amd::LOG_INFO, amd::LOG_QUEUE, "Selected queue refCount: %p (%d)", - it.first->base_address, it.second.refCount); - return it.first; - } - } - } else { - if (qIndex < QueuePriority::Total && queuePool_[qIndex].size() > 0) { - // Search through all available queues for the lowest counter. - // Note: the map is sorted in the allocation order for possible round-robin selection - typedef decltype(queuePool_)::value_type::const_reference PoolRef; - auto lowest = std::min_element( - queuePool_[qIndex].begin(), queuePool_[qIndex].end(), - [](PoolRef A, PoolRef B) { return A.second.refCount < B.second.refCount; }); - lowest->second.refCount++; - ClPrint(amd::LOG_INFO, amd::LOG_QUEUE, "Selected queue refCount: %p (%d)", - lowest->first->base_address, lowest->second.refCount); - return lowest->first; - } +hsa_queue_t* Device::getQueueFromPool(const uint qIndex, bool force_reuse) { + // Only reuse queues when we've reached the maximum limit, unless forced + // Below the limit, return nullptr to allow creating new queues + if (!force_reuse && queuePool_[qIndex].size() < settings().max_hw_queues_) { + return nullptr; + } + + // We've hit the limit, must reuse - find the queue with lowest load metric + if (qIndex < QueuePriority::Total && queuePool_[qIndex].size() > 0) { + typedef decltype(queuePool_)::value_type::const_reference PoolRef; + + // Select queue based on dynamic_queues_ mode + decltype(queuePool_[qIndex].begin()) lowest; + uint32_t mode = settings().dynamic_queues_; + + // gfx9XX pipe distribution: queues map to pipes via queue_id % num_pipes + const bool pipe_dist = settings().queue_pipe_dist_; + const uint32_t num_pipes = numHwPipes_; + + lowest = std::min_element( + queuePool_[qIndex].begin(), queuePool_[qIndex].end(), + [mode, pipe_dist, num_pipes](PoolRef A, PoolRef B) { + if (mode >= 1) { + // Mode 1+: Advanced weighted metric with dedicated queue penalty + // Metric = dedicated_queue_penalty + (depth << 4) + refCount + uint64_t metricA = A.second.GetLoadMetric(A.first, mode); + uint64_t metricB = B.second.GetLoadMetric(B.first, mode); + + if (metricA == metricB && pipe_dist) { + // gfx9XX pipe distribution: prefer lower pipe IDs for consistent distribution + uint64_t pipeA = A.first->id % num_pipes; + uint64_t pipeB = B.first->id % num_pipes; + return pipeA < pipeB; + } + return metricA < metricB; + } else { + // Mode 0: Simple refCount-based selection + return A.second.refCount < B.second.refCount; + } + }); + + lowest->second.refCount++; + ClPrint(amd::LOG_INFO, amd::LOG_QUEUE, + "Selected queue (mode=%u): %p refCount: %d, depth: %lu, metric: %lu, pipe: %d%s", + mode, lowest->first->base_address, lowest->second.refCount, + QueueInfo::GetHwQueueDepth(lowest->first), + lowest->second.GetLoadMetric(lowest->first, mode), + pipe_dist ? (lowest->first->id % num_pipes) : -1, + force_reuse ? " (forced)" : ""); + return lowest->first; } return nullptr; } // ================================================================================================ -hsa_queue_t* Device::AcquireActiveNormalQueue() { +hsa_queue_t* Device::AcquireActiveQueue(amd::CommandQueue::Priority priority) { uint32_t queue_size = ROC_AQL_QUEUE_SIZE; auto queue = acquireQueue(queue_size, false, std::vector{}, - amd::CommandQueue::Priority::Normal, true); + priority, true, false); return queue; } // ================================================================================================ hsa_queue_t* Device::acquireQueue(uint32_t queue_size_hint, bool coop_queue, const std::vector& cuMask, - amd::CommandQueue::Priority priority, bool managed) { - amd::ScopedLock l(active_queue_access_); - - assert(queuePool_[QueuePriority::Low].size() <= GPU_MAX_HW_QUEUES || - queuePool_[QueuePriority::Normal].size() <= GPU_MAX_HW_QUEUES || - queuePool_[QueuePriority::High].size() <= GPU_MAX_HW_QUEUES); - - ClPrint(amd::LOG_INFO, amd::LOG_QUEUE, - "Number of allocated hardware queues with low priority: %d," - " with normal priority: %d, with high priority: %d, maximum per priority is: %d", - queuePool_[QueuePriority::Low].size(), queuePool_[QueuePriority::Normal].size(), - queuePool_[QueuePriority::High].size(), GPU_MAX_HW_QUEUES); - + amd::CommandQueue::Priority priority, bool managed, + bool dedicated_queue) { hsa_amd_queue_priority_t queue_priority; uint qIndex; switch (priority) { @@ -2934,22 +2957,49 @@ hsa_queue_t* Device::acquireQueue(uint32_t queue_size_hint, bool coop_queue, break; } - // If we have reached the max number of queues, reuse an existing queue with the matching queue - // priority, choosing the one with the least number of users. Note: Don't attempt to reuse the - // cooperative queue, since it's single per device - if (!coop_queue && (cuMask.size() == 0) && - ((queuePool_[qIndex].size() == GPU_MAX_HW_QUEUES) || queuePool_[qIndex].size() > 0)) { - hsa_queue_t* queue = getQueueFromPool(qIndex); - if (queue != nullptr) { - if (!managed && (qIndex == QueuePriority::Normal)) { - num_normal_queues_++; - } - return queue; - } + // If flag set, force all streams to normal priority + // This means that GPU_MAX_HW_QUEUES may need to be incremented + // to account for the additional normal priority queues + if (DEBUG_HIP_IGNORE_STREAM_PRIORITY) { + queue_priority = HSA_AMD_QUEUE_PRIORITY_NORMAL; + qIndex = QueuePriority::Normal; } - // Else create a new queue. This also includes the initial state where there - // is no queue. + { // Lock + amd::ScopedLock l(active_queue_access_); + + assert(queuePool_[QueuePriority::Low].size() <= settings().max_hw_queues_ || + queuePool_[QueuePriority::Normal].size() <= settings().max_hw_queues_ || + queuePool_[QueuePriority::High].size() <= settings().max_hw_queues_); + + ClPrint(amd::LOG_INFO, amd::LOG_QUEUE, + "Number of allocated hardware queues with low priority: %d," + " with normal priority: %d, with high priority: %d, maximum per priority is: %d", + queuePool_[QueuePriority::Low].size(), queuePool_[QueuePriority::Normal].size(), + queuePool_[QueuePriority::High].size(), settings().max_hw_queues_); + + // If we have reached the max number of queues, reuse an existing queue with the matching queue + // priority, choosing the one with the least number of users. Note: Don't attempt to reuse the + // cooperative queue, since it's single per device. + + // num_queues_[qIndex] tracks persistent (non-managed) queues per priority. + // When the total queues (managed + non-managed) exceed max_hw_queues_, we must reuse existing + // queues. 'managed' streams do not increment num_queues_, allowing them to use the + // pool without permanently consuming slots. ReleaseActiveQueue() uses this counter to + // decide when to start reclaiming queues. + if (!coop_queue && (cuMask.size() == 0) && + (queuePool_[qIndex].size() >= settings().max_hw_queues_)) { + hsa_queue_t* queue = getQueueFromPool(qIndex, false); + if (queue != nullptr) { + if (!managed) { + num_queues_[qIndex]++; + } + return queue; + } + } + } // Lock release + + // Create a new queue. uint32_t queue_max_packets = 0; if (HSA_STATUS_SUCCESS != Hsa::agent_get_info(bkendDevice_, HSA_AGENT_INFO_QUEUE_MAX_SIZE, &queue_max_packets)) { @@ -2971,9 +3021,14 @@ hsa_queue_t* Device::acquireQueue(uint32_t queue_size_hint, bool coop_queue, &queue) != HSA_STATUS_SUCCESS) { queue_size >>= 1; if (queue_size < 64) { - // if a queue with the same requested priority available from the pool, returns it here - if (!coop_queue && (cuMask.size() == 0) && (queuePool_[qIndex].size() > 0)) { - return getQueueFromPool(qIndex); + LogError("Device::acquireQueue: hsa_queue_create failed!"); + // If we can't create even a small queue, try to reuse any existing queue + if (!coop_queue && (cuMask.size() == 0)) { + amd::ScopedLock l(active_queue_access_); + if (queuePool_[qIndex].size() > 0) { + bool kForceReuse = true; + return getQueueFromPool(qIndex, kForceReuse); + } } ClPrint(amd::LOG_DETAIL_DEBUG, amd::LOG_QUEUE, "Device::acquireQueue: hsa_queue_create failed!"); @@ -3067,12 +3122,14 @@ hsa_queue_t* Device::acquireQueue(uint32_t queue_size_hint, bool coop_queue, return nullptr; } if (cuMask.size() != 0) { + amd::ScopedLock l(active_queue_access_); // add queues with custom CU mask into their special pool to keep track // of mapping of these queues to their associated queueInfo (i.e., hostcall buffers) auto result = queueWithCUMaskPool_[qIndex].emplace(std::make_pair(queue, QueueInfo())); assert(result.second && "QueueInfo already exists"); auto& qInfo = result.first->second; qInfo.refCount = 1; + qInfo.hasDedicatedQueue_ = dedicated_queue; // Track if this is a dedicated queue return queue; } @@ -3083,22 +3140,41 @@ hsa_queue_t* Device::acquireQueue(uint32_t queue_size_hint, bool coop_queue, // per device. return queue; } + + // Add queue to the pool (including dedicated queues) + amd::ScopedLock l(active_queue_access_); auto result = queuePool_[qIndex].emplace(std::make_pair(queue, QueueInfo())); assert(result.second && "QueueInfo already exists"); auto& qInfo = result.first->second; qInfo.refCount = 1; - ClPrint(amd::LOG_INFO, amd::LOG_QUEUE, "acquireQueue refCount: %p (%d)", - result.first->first->base_address, result.first->second.refCount); - if (!managed && (cuMask.size() == 0) && (qIndex = QueuePriority::Normal)) { - num_normal_queues_++; + qInfo.hasDedicatedQueue_ = dedicated_queue; // Track if this is a dedicated queue + ClPrint(amd::LOG_INFO, amd::LOG_QUEUE, "acquireQueue refCount: %p (%d) %s", + result.first->first->base_address, result.first->second.refCount, + dedicated_queue ? "(dedicated)" : ""); + if (!managed && (cuMask.size() == 0)) { + num_queues_[qIndex]++; } return queue; } // ================================================================================================ -bool Device::ReleaseActiveNormalQueue(hsa_queue_t* queue) { +bool Device::ReleaseActiveQueue(hsa_queue_t* queue, amd::CommandQueue::Priority priority) { + uint qIndex; + switch (priority) { + case amd::CommandQueue::Priority::Low: + qIndex = QueuePriority::Low; + break; + case amd::CommandQueue::Priority::High: + qIndex = QueuePriority::High; + break; + case amd::CommandQueue::Priority::Normal: + case amd::CommandQueue::Priority::Medium: + default: + qIndex = QueuePriority::Normal; + break; + } // Release a queue if the total number of allocated queues exceeds the max possible - if (num_normal_queues_.load() > GPU_MAX_HW_QUEUES) { + if (num_queues_[qIndex].load() > settings().max_hw_queues_) { releaseQueue(queue, std::vector{}, false, true); return true; } else { @@ -3109,36 +3185,52 @@ bool Device::ReleaseActiveNormalQueue(hsa_queue_t* queue) { // ================================================================================================ void Device::releaseQueue(hsa_queue_t* queue, const std::vector& cuMask, bool coop_queue, bool managed) { - amd::ScopedLock l(active_queue_access_); - for (auto& it : cuMask.size() == 0 ? queuePool_ : queueWithCUMaskPool_) { - auto qIter = it.find(queue); - if (qIter != it.end()) { - if (!managed && (cuMask.size() == 0) && (&it == &queuePool_[QueuePriority::Normal])) { - num_normal_queues_--; - } - auto& qInfo = qIter->second; - assert(qInfo.refCount > 0); - qInfo.refCount--; - ClPrint(amd::LOG_INFO, amd::LOG_QUEUE, "releaseQueue refCount:%p (%d)", - qIter->first->base_address, qIter->second.refCount); - // hsa queues with cumask set are not being reused. Hence, if the app uses multiple - // such queues it can cause memory leak and those must be destroyed here once the - // refcount reaches 0. - if ((!cuMask.empty()) && (qInfo.refCount == 0)) { - if (qInfo.hostcallBuffer_) { - ClPrint(amd::LOG_INFO, amd::LOG_QUEUE, - "Deleting hostcall buffer %p for hardware queue %p", qInfo.hostcallBuffer_, - qIter->first->base_address); - amd::disableHostcalls(qInfo.hostcallBuffer_); - context().svmFree(qInfo.hostcallBuffer_); + // Defer cleanup operations outside the lock + void* hostcallBufferToFree = nullptr; + bool shouldDestroyQueue = false; + + { // Lock + amd::ScopedLock l(active_queue_access_); + auto& pools = cuMask.size() == 0 ? queuePool_ : queueWithCUMaskPool_; + for (uint qIndex = 0; qIndex < pools.size(); ++qIndex) { + auto& it = pools[qIndex]; + auto qIter = it.find(queue); + if (qIter != it.end()) { + if (!managed && (cuMask.size() == 0)) { + num_queues_[qIndex]--; } - ClPrint(amd::LOG_INFO, amd::LOG_QUEUE, "Deleting hardware queue %p with refCount 0", - queue->base_address); - qIter = it.erase(qIter); - Hsa::queue_destroy(queue); + auto& qInfo = qIter->second; + assert(qInfo.refCount > 0); + qInfo.refCount--; + ClPrint(amd::LOG_INFO, amd::LOG_QUEUE, "releaseQueue refCount:%p (%d)", + qIter->first->base_address, qIter->second.refCount); + // hsa queues with cumask set are not being reused. Hence, if the app uses multiple + // such queues it can cause memory leak and those must be destroyed here once the + // refcount reaches 0. + if ((!cuMask.empty()) && (qInfo.refCount == 0)) { + hostcallBufferToFree = qInfo.hostcallBuffer_; + shouldDestroyQueue = true; + ClPrint(amd::LOG_INFO, amd::LOG_QUEUE, "Deleting hardware queue %p with refCount 0", + queue->base_address); + it.erase(qIter); + } + break; // Found and processed the queue } } + } // Lock release + + // Perform expensive cleanup operations outside the lock + if (shouldDestroyQueue) { + if (hostcallBufferToFree) { + ClPrint(amd::LOG_INFO, amd::LOG_QUEUE, + "Deleting hostcall buffer %p for hardware queue %p", hostcallBufferToFree, + queue->base_address); + amd::disableHostcalls(hostcallBufferToFree); + context().svmFree(hostcallBufferToFree); + } + Hsa::queue_destroy(queue); } + if (coop_queue) { // cooperative queue ClPrint(amd::LOG_INFO, amd::LOG_QUEUE, "Deleting CG enabled hardware queue %p ", queue->base_address); @@ -3150,6 +3242,7 @@ void* Device::getOrCreateHostcallBuffer(hsa_queue_t* queue, bool coop_queue, const std::vector& cuMask) { decltype(queuePool_)::value_type::iterator qIter; bool found = false; + if (!coop_queue) { for (auto& it : cuMask.size() == 0 ? queuePool_ : queueWithCUMaskPool_) { qIter = it.find(queue); diff --git a/projects/clr/rocclr/device/rocm/rocdevice.hpp b/projects/clr/rocclr/device/rocm/rocdevice.hpp index 4b90935533..ede43da95f 100644 --- a/projects/clr/rocclr/device/rocm/rocdevice.hpp +++ b/projects/clr/rocclr/device/rocm/rocdevice.hpp @@ -552,18 +552,17 @@ class Device : public NullDevice { VirtualGPU* xferQueue() const; //! Acquire HSA queue. This method can create a new HSA queue or - //! share previously created hsa_queue_t* acquireQueue( uint32_t queue_size_hint, bool coop_queue = false, const std::vector& cuMask = {}, amd::CommandQueue::Priority priority = amd::CommandQueue::Priority::Normal, - bool managed = false); + bool managed = false, bool dedicated_queue = false); //! Release HSA queue void releaseQueue(hsa_queue_t*, const std::vector& cuMask = {}, bool coop_queue = false, bool managed = false); - hsa_queue_t* AcquireActiveNormalQueue(); - bool ReleaseActiveNormalQueue(hsa_queue_t* queue); + hsa_queue_t* AcquireActiveQueue(amd::CommandQueue::Priority priority); + bool ReleaseActiveQueue(hsa_queue_t* queue, amd::CommandQueue::Priority priority); //! For the given HSA queue, return an existing hostcall buffer or create a //! new one. queuePool_ keeps a mapping from HSA queue to hostcall buffer. @@ -619,8 +618,11 @@ class Device : public NullDevice { //! Removes a kernel from the kernel map void RemoveKernel(Kernel& gpuKernel) const; - // Returns the number of allocated normal queues on this device - uint32_t NumNormalQueues() const { return num_normal_queues_.load(); } + // Returns the number of allocated queues for a given priority on this device + uint32_t NumQueues(uint qIndex) const { return num_queues_[qIndex].load(); } + + //! enum for keeping the total and available queue priorities + enum QueuePriority : uint { Low = 0, Normal = 1, High = 2, Total = 3 }; //! Returns true if PM4 emulation is enabled bool IsPm4Emulation() const { return pm4_emulation_; } @@ -678,12 +680,40 @@ class Device : public NullDevice { struct QueueInfo { int refCount; //! Reference counter. Shows how many time the queue was shared void* hostcallBuffer_; //! Host call buffer for the HSA queue + bool hasDedicatedQueue_; //! True if this queue is a dedicated queue (e.g., null stream) + + // Constructor + QueueInfo() : refCount(0), hostcallBuffer_(nullptr), hasDedicatedQueue_(false) {} + + //! Get the current hardware queue depth (wptr - rptr) + static uint64_t GetHwQueueDepth(hsa_queue_t* queue) { + uint64_t wptr = Hsa::queue_load_write_index_relaxed(queue); + uint64_t rptr = Hsa::queue_load_read_index_relaxed(queue); + return wptr - rptr; + } + + //! Get a combined metric for queue selection (lower is better) + uint64_t GetLoadMetric(hsa_queue_t* queue, uint32_t mode = 1) const { + auto depth = GetHwQueueDepth(queue); + + // Dedicated queue penalty: prefer regular queues, but use dedicated if regular queues + // have depth > ~128 packets. Penalty = 128 << 4 = 2048. + uint64_t dedicated_queue_penalty = hasDedicatedQueue_ ? 2048 : 0; + + // Advanced weighted metric: Give queue depth significantly more weight than refCount + uint64_t metric = dedicated_queue_penalty + (depth << 4) + static_cast(refCount); + return metric; + } }; struct QueueCompare { + const Device* device_; + + QueueCompare(const Device* dev = nullptr) : device_(dev) {} + // Customized queue compare operator to make sure the queues are sorted in the creation order bool operator()(hsa_queue_t* lhs, hsa_queue_t* rhs) const { - if (DEBUG_HIP_DYNAMIC_QUEUES) { + if (device_ != nullptr && device_->settings().dynamic_queues_ > 0) { return (lhs->id < rhs->id) ? true : false; } else { return (lhs < rhs) ? true : false; @@ -693,10 +723,10 @@ class Device : public NullDevice { //! a vector for keeping Pool of HSA queues with low, normal and high priorities for recycling std::vector> queuePool_; amd::Monitor active_queue_access_; //!< Lock to serialise virtual gpu list access - std::atomic num_normal_queues_{0}; //!< The total number of allocated normal queues + std::atomic num_queues_[QueuePriority::Total] = {}; //!< Per-priority queue counters - //! returns a hsa queue from queuePool with least refCount and updates the refCount as well - hsa_queue_t* getQueueFromPool(const uint qIndex); + //! Use dynamic queues mode to get a queue from pool + hsa_queue_t* getQueueFromPool(const uint qIndex, bool force_reuse = false); void* coopHostcallBuffer_; //! returns value for corresponding LinkAttrbutes in a vector given Memory pool. @@ -712,6 +742,7 @@ class Device : public NullDevice { uint32_t maxSdmaWriteMask_; bool isXgmi_; //!< Flag to indicate if there is XGMI between CPU<->GPU bool pm4_emulation_ = false; //!< Flag to indicate if PM4 emulation is enabled + uint32_t numHwPipes_; //!< Number of hardware pipes //! SDMA engine allocator for per-stream affinity struct SdmaEngineAllocator { @@ -743,9 +774,6 @@ class Device : public NullDevice { public: std::atomic numOfVgpus_; //!< Virtual gpu unique index - //! enum for keeping the total and available queue priorities - enum QueuePriority : uint { Low = 0, Normal = 1, High = 2, Total = 3 }; - #if defined(__clang__) #if __has_feature(address_sanitizer) virtual device::UriLocator* createUriLocator() const; diff --git a/projects/clr/rocclr/device/rocm/rocsettings.cpp b/projects/clr/rocclr/device/rocm/rocsettings.cpp index ad1a231dbe..7d87deabfe 100644 --- a/projects/clr/rocclr/device/rocm/rocsettings.cpp +++ b/projects/clr/rocclr/device/rocm/rocsettings.cpp @@ -91,9 +91,13 @@ Settings::Settings() { gwsInitSupported_ = true; limit_blit_wg_ = 16; - dynamic_queues_ = amd::IS_HIP ? DEBUG_HIP_DYNAMIC_QUEUES : false; + dynamic_queues_ = amd::IS_HIP ? DEBUG_HIP_DYNAMIC_QUEUES : 0; // note: OCL user events don't allow CPU blocking calls in DD mode blocking_blit_ = amd::IS_HIP || !AMD_DIRECT_DISPATCH; + + max_hw_queues_ = GPU_MAX_HW_QUEUES; + + queue_pipe_dist_ = false; } // ================================================================================================ @@ -153,6 +157,7 @@ bool Settings::create(bool fullProfile, const amd::Isa& isa, bool enableXNACK, b (gfxStepping == 0 || gfxStepping == 1 || gfxStepping == 2)))) { // Enable Barrier Value packet is only for MI2XX/300 barrier_value_packet_ = true; + queue_pipe_dist_ = DEBUG_HIP_DYNAMIC_QUEUES == 2 ? true : false; } setKernelArgImpl(isa, isXgmi, hasValidHDPFlush); diff --git a/projects/clr/rocclr/device/rocm/rocsettings.hpp b/projects/clr/rocclr/device/rocm/rocsettings.hpp index a5e1494d28..4584554a0c 100644 --- a/projects/clr/rocclr/device/rocm/rocsettings.hpp +++ b/projects/clr/rocclr/device/rocm/rocsettings.hpp @@ -48,9 +48,10 @@ class Settings : public device::Settings { uint system_scope_signal_ : 1; //!< HSA signal is visibile to the entire system uint fgs_kernel_arg_ : 1; //!< Use fine grain kernel arg segment uint barrier_value_packet_ : 1; //!< Barrier value packet functionality - uint dynamic_queues_ : 1; //!< Dynamic queues management + uint dynamic_queues_ : 2; //!< Dynamic queues: 0=off, 1=Depth uint blocking_blit_ : 1; //!< Blit ops can be blocking on CPU - uint reserved_ : 21; + uint queue_pipe_dist_ : 1; //!< MI300 queue pipe distribution (gfx94x) + uint reserved_ : 19; }; uint value_; }; @@ -74,6 +75,7 @@ class Settings : public device::Settings { uint32_t hmmFlags_; //!< HMM functionality control flags uint32_t limit_blit_wg_; //!< The number of workgroups for blit execution + uint32_t max_hw_queues_; //!< Effective maximum HW queues (accounts for null stream reservation) //! Default constructor Settings(); diff --git a/projects/clr/rocclr/device/rocm/rocvirtual.cpp b/projects/clr/rocclr/device/rocm/rocvirtual.cpp index 00c54e15e9..c06ded49d0 100644 --- a/projects/clr/rocclr/device/rocm/rocvirtual.cpp +++ b/projects/clr/rocclr/device/rocm/rocvirtual.cpp @@ -314,6 +314,10 @@ bool HsaAmdSignalHandler(hsa_signal_value_t value, void* arg) { // Update the batch, since signal is complete gpu->updateCommandsState(ts->command().GetBatchHead()); + // Opportunistically try to release the HW queue if it's now idle + // This helps reclaim queues in async workloads without explicit sync + gpu->ReleaseHwQueue(); + // Reset API callback signal. It will release AQL queue and start commands processing if (callback_signal.handle != 0 && isBlocking) { Hsa::signal_subtract_relaxed(callback_signal, 1); @@ -1011,9 +1015,10 @@ bool VirtualGPU::processMemObjects(const amd::Kernel& kernel, const_address para // ================================================================================================ uint64_t VirtualGPU::getQueueID() { - amd::ScopedLock lock(execution()); - if (gpu_queue_ == nullptr) { - gpu_queue_ = roc_device_.AcquireActiveNormalQueue(); + // Dedicated queues keep their HW queue, never acquire from pool + if (!dedicated_queue_ && gpu_queue_ == nullptr) { + amd::ScopedLock lock(execution()); + gpu_queue_ = roc_device_.AcquireActiveQueue(priority_); } return gpu_queue_->id; } @@ -1713,7 +1718,8 @@ bool VirtualGPU::releaseGpuMemoryFence(bool skip_cpu_wait) { // ================================================================================================ VirtualGPU::VirtualGPU(Device& device, bool profiling, bool cooperative, - const std::vector& cuMask, amd::CommandQueue::Priority priority) + const std::vector& cuMask, amd::CommandQueue::Priority priority, + bool dedicated_queue) : device::VirtualDevice(device), state_(0), gpu_queue_(nullptr), @@ -1728,9 +1734,10 @@ VirtualGPU::VirtualGPU(Device& device, bool profiling, bool cooperative, managed_kernarg_buffer_(*this, device.settings().kernargPoolSize_), cuMask_(cuMask), priority_(priority), - copy_command_type_(0), - fence_state_(Device::CacheState::kCacheStateInvalid), - fence_dirty_(false) { + copy_command_type_(0), + fence_state_(Device::CacheState::kCacheStateInvalid), + fence_dirty_(false), + dedicated_queue_(dedicated_queue) { index_ = device.numOfVgpus_++; gpu_device_ = device.getBackendDevice(); printfdbg_ = nullptr; @@ -1791,8 +1798,9 @@ VirtualGPU::~VirtualGPU() { if (tracking_created_) { amd::ScopedLock l(execution()); - if (gpu_queue_ == nullptr) { - gpu_queue_ = roc_device_.AcquireActiveNormalQueue(); + // Dedicated queues keep their HW queue, never acquire from pool + if (!dedicated_queue_ && gpu_queue_ == nullptr) { + gpu_queue_ = roc_device_.AcquireActiveQueue(priority_); } // Windows requires an interrupt in more cases than Linux for OS fence updates force_irq_ = IS_WINDOWS; @@ -1839,7 +1847,8 @@ VirtualGPU::~VirtualGPU() { bool VirtualGPU::create() { // Pick a reasonable queue size uint32_t queue_size = ROC_AQL_QUEUE_SIZE; - gpu_queue_ = roc_device_.acquireQueue(queue_size, cooperative_, cuMask_, priority_); + gpu_queue_ = roc_device_.acquireQueue(queue_size, cooperative_, cuMask_, priority_, false, + dedicated_queue_); if (!gpu_queue_) return false; if (!managed_kernarg_buffer_.Create(Device::MemorySegment::kKernArg)) { @@ -2004,29 +2013,48 @@ void VirtualGPU::ReleaseSdmaEngines() { // ================================================================================================ void VirtualGPU::ReleaseAllHwQueues() { - if (roc_device_.settings().dynamic_queues_ && - (roc_device_.NumNormalQueues() > GPU_MAX_HW_QUEUES)) { - // Lock the device to make the following thread safe - amd::ScopedLock lock(roc_device_.vgpusAccess()); - for (uint idx = 0; idx < roc_device_.vgpus().size(); ++idx) { - roc_device_.vgpus()[idx]->ReleaseHwQueue(); + if (roc_device_.settings().dynamic_queues_) { + // Check if any priority level exceeds max_hw_queues_ + bool should_release = false; + for (uint qIdx = 0; qIdx < Device::QueuePriority::Total; ++qIdx) { + if (roc_device_.NumQueues(qIdx) > roc_device_.settings().max_hw_queues_) { + should_release = true; + break; + } + } + if (should_release) { + // Lock the device to make the following thread safe + amd::ScopedLock lock(roc_device_.vgpusAccess()); + for (uint idx = 0; idx < roc_device_.vgpus().size(); ++idx) { + roc_device_.vgpus()[idx]->ReleaseHwQueue(); + } } } } // ================================================================================================ void VirtualGPU::ReleaseHwQueue() { - // Try to release normal queue to the pool of active queues - if (roc_device_.settings().dynamic_queues_ && - (priority_ == amd::CommandQueue::Priority::Normal) && !cooperative_ && + // Dedicated queues keep their HW queue, never release to pool + if (dedicated_queue_) { + return; + } + + // Try to release queue to the pool of active queues. + // Use tryLock() since this may be called from the HsaAmdSignalHandler + // and blocking here could cause deadlock + if (roc_device_.settings().dynamic_queues_ > 0 && !cooperative_ && (cuMask_.size() == 0)) { - amd::ScopedLock lock(execution()); - if (gpu_queue_ != nullptr) { - if (IsQueueIdle()) { - if (roc_device_.ReleaseActiveNormalQueue(gpu_queue_)) { - gpu_queue_ = nullptr; + // If tryLock fails, skip the release - the queue will be released + // on next opportunity + if (execution().tryLock()) { + if (gpu_queue_ != nullptr) { + if (IsQueueIdle()) { + if (roc_device_.ReleaseActiveQueue(gpu_queue_, priority_)) { + gpu_queue_ = nullptr; + } } } + execution().unlock(); } } } @@ -2037,8 +2065,9 @@ void VirtualGPU::ReleaseHwQueue() { * and then calls start() to get the current host timestamp. */ void VirtualGPU::profilingBegin(amd::Command& command, bool sdmaProfiling) { - if (gpu_queue_ == nullptr) { - gpu_queue_ = roc_device_.AcquireActiveNormalQueue(); + // Dedicated queues keep their HW queue, never acquire from pool + if (!dedicated_queue_ && gpu_queue_ == nullptr) { + gpu_queue_ = roc_device_.AcquireActiveQueue(priority_); } // Track the current command command_ = &command; @@ -4050,8 +4079,8 @@ void VirtualGPU::submitMarker(amd::Marker& vcmd) { force_irq_ = IS_WINDOWS; // It should be safe to call flush directly if there are not pending dispatches without // HSA signal callback - if (gpu_queue_ == nullptr) { - gpu_queue_ = roc_device_.AcquireActiveNormalQueue(); + if (!dedicated_queue_ && gpu_queue_ == nullptr) { + gpu_queue_ = roc_device_.AcquireActiveQueue(priority_); } flush(vcmd.GetBatchHead()); } else { diff --git a/projects/clr/rocclr/device/rocm/rocvirtual.hpp b/projects/clr/rocclr/device/rocm/rocvirtual.hpp index 518adfec63..e9dee419b8 100644 --- a/projects/clr/rocclr/device/rocm/rocvirtual.hpp +++ b/projects/clr/rocclr/device/rocm/rocvirtual.hpp @@ -303,6 +303,9 @@ class VirtualGPU : public device::VirtualDevice { //! Empty check for external signals bool IsExternalSignalListEmpty() const { return external_signals_.empty(); } + //! Adds a raw signal for dependency tracking + void AddDynamicQueueWait(hsa_signal_t signal) { dynamic_queue_waits_.push_back(signal); } + //! Get/Set SDMA profiling bool GetSDMAProfiling() { return sdma_profiling_; } void SetSDMAProfiling(bool profile) { @@ -328,12 +331,14 @@ class VirtualGPU : public device::VirtualDevice { bool sdma_profiling_ = false; //!< If TRUE, then SDMA profiling is enabled const VirtualGPU& gpu_; //!< VirtualGPU, associated with this tracker std::vector external_signals_; //!< External signals for a wait in this queue + std::vector dynamic_queue_waits_; //!< Extra raw signals for a wait in this queue std::vector waiting_signals_; //!< Current waiting signals in this queue }; VirtualGPU(Device& device, bool profiling = false, bool cooperative = false, const std::vector& cuMask = {}, - amd::CommandQueue::Priority priority = amd::CommandQueue::Priority::Normal); + amd::CommandQueue::Priority priority = amd::CommandQueue::Priority::Normal, + bool dedicated_queue = false); ~VirtualGPU(); bool create(); @@ -551,7 +556,7 @@ class VirtualGPU : public device::VirtualDevice { last_write_index_ = index; // Update the last completion signal if the packet has one if (packet.completion_signal.handle != 0) { - last_barrier_index_ = index; + last_packet_with_signal_index_ = index; last_completion_signal_ = packet.completion_signal; } } @@ -559,16 +564,20 @@ class VirtualGPU : public device::VirtualDevice { //! Returns true if the queue is considered as idle. That means all submitted packets are //! complete. Note: it doesn't track the state of caches bool IsQueueIdle() const { - bool result = false; + if (gpu_queue_ == nullptr) { + return true; + } + // Make sure the last packet contained a completion signal - if (last_barrier_index_ == last_write_index_) { + if (last_packet_with_signal_index_ == last_write_index_) { if ((last_write_index_ == 0) && (last_completion_signal_.handle == 0)) { - result = true; + return true; } else { - result = (Hsa::signal_load_relaxed(last_completion_signal_) == 0); + return (Hsa::signal_load_relaxed(last_completion_signal_) == 0); } } - return result; + + return false; } std::vector pinnedMems_; //!< Pinned memory list @@ -627,6 +636,7 @@ class VirtualGPU : public device::VirtualDevice { //!< bit-vector representing the CU mask. Each active bit represents using one CU const std::vector cuMask_; amd::CommandQueue::Priority priority_; //!< The priority for the hsa queue + bool dedicated_queue_; //!< TRUE if this VirtualGPU has a dedicated queue (e.g., null stream) cl_command_type copy_command_type_; //!< Type of the copy command, used for ROC profiler //!< OCL doesn't distinguish different copy types, @@ -636,7 +646,7 @@ class VirtualGPU : public device::VirtualDevice { std::atomic fence_dirty_; //!< Fence modified flag uint64_t last_write_index_ = 0; //!< The last HW queue write index for any packet - uint64_t last_barrier_index_ = 0; //!< The last HW queue write index for a packet + uint64_t last_packet_with_signal_index_ = 0;//!< The last HW queue write index for a packet //!< with a completion signal hsa_signal_t last_completion_signal_{}; //!< The last completion signal diff --git a/projects/clr/rocclr/platform/commandqueue.cpp b/projects/clr/rocclr/platform/commandqueue.cpp index 4fead460b0..8baf4e6bf3 100644 --- a/projects/clr/rocclr/platform/commandqueue.cpp +++ b/projects/clr/rocclr/platform/commandqueue.cpp @@ -35,9 +35,10 @@ namespace amd { HostQueue::HostQueue(Context& context, Device& device, cl_command_queue_properties props, - uint queueRTCUs, Priority priority, const std::vector& cuMask) + uint queueRTCUs, Priority priority, const std::vector& cuMask, + bool dedicated_queue) : CommandQueue(context, device, props, device.info().queueProperties_, queueRTCUs, priority, - cuMask), + cuMask, dedicated_queue), lastEnqueueCommand_(nullptr), head_(nullptr), tail_(nullptr), diff --git a/projects/clr/rocclr/platform/commandqueue.hpp b/projects/clr/rocclr/platform/commandqueue.hpp index ded6c5693b..ca8c619262 100644 --- a/projects/clr/rocclr/platform/commandqueue.hpp +++ b/projects/clr/rocclr/platform/commandqueue.hpp @@ -88,6 +88,9 @@ class CommandQueue : public RuntimeObject { //! Returns the base class object CommandQueue* asCommandQueue() { return this; } + //! Returns TRUE if this queue requires a dedicated HW queue + bool isDedicatedQueue() const { return dedicated_queue_; } + virtual ~CommandQueue() {} //! Returns TRUE if the object was successfully created @@ -123,7 +126,8 @@ class CommandQueue : public RuntimeObject { cl_command_queue_properties propMask, //!< Queue properties mask uint rtCUs = RealTimeDisabled, //!< Avaialble real time compute units Priority priority = Priority::Normal, //!< Queue priority - const std::vector& cuMask = {} //!< CU mask + const std::vector& cuMask = {}, //!< CU mask + bool dedicated_queue = false //!< TRUE if requires dedicated HW queue ) : properties_(propMask, properties), rtCUs_(rtCUs), @@ -132,7 +136,8 @@ class CommandQueue : public RuntimeObject { lastCmdLock_(), device_(device), context_(context), - cuMask_(cuMask) {} + cuMask_(cuMask), + dedicated_queue_(dedicated_queue) {} Properties properties_; //!< Queue properties uint rtCUs_; //!< The number of used RT compute units @@ -142,6 +147,7 @@ class CommandQueue : public RuntimeObject { Device& device_; //!< The device SharedReference context_; //!< The context of this command queue const std::vector cuMask_; //!< The CU mask + bool dedicated_queue_ = false; //!< TRUE if this queue requires a dedicated HW queue private: //! Disable copy constructor @@ -213,7 +219,7 @@ class HostQueue : public CommandQueue { */ HostQueue(Context& context, Device& device, cl_command_queue_properties properties, uint queueRTCUs = 0, Priority priority = Priority::Normal, - const std::vector& cuMask = {}); + const std::vector& cuMask = {}, bool dedicated_queue = false); //! Returns TRUE if this command queue can accept commands. virtual bool create() { return thread_.acceptingCommands_; } diff --git a/projects/clr/rocclr/utils/flags.hpp b/projects/clr/rocclr/utils/flags.hpp index 656c5895da..fba7e2a140 100644 --- a/projects/clr/rocclr/utils/flags.hpp +++ b/projects/clr/rocclr/utils/flags.hpp @@ -265,8 +265,11 @@ release(bool, DEBUG_HIP_KERNARG_COPY_OPT, true, \ "Enable/Disable multiple kern arg copies") \ release(bool, DEBUG_CLR_KERNARG_HDP_FLUSH_WA, false, \ "Toggle kernel arg copy workaround") \ -release(bool, DEBUG_HIP_DYNAMIC_QUEUES, false, \ - "Forces dynamic queue management") \ +release(uint, DEBUG_HIP_DYNAMIC_QUEUES, 2, \ + "Dynamic queue management: 0=off, 1=Queue depth heuristic," \ + "2= Queue Depth + Pipe distribution") \ +release(bool, DEBUG_HIP_IGNORE_STREAM_PRIORITY, false, \ + "Ignore priority streams") \ release(uint, HIP_SKIP_ABORT_ON_GPU_ERROR, true, \ "Set this to true, to avoid host side abort for GPU errors") \ release(bool, HIP_FORCE_SPIRV_CODEOBJECT, false, \