clr: Implement dynamic stream to HWq logic (#1958)

* clr: Implement dynamic stream to HW queue assignment

This change implements dynamic stream to hardware queue (HWq) mapping
with the following features:

* Queue depth heuristics with weights for optimal HWq assignment
* Make last used queue sticky for better locality
* Use HWq-to-pipe mapping - gfx9 follows a round-robin queue-to-pipe
  mapping based on creation order (single process per device only,
  as the pipe ID is statically assigned by the runtime)
* More aggressive heuristic usage for better queue distribution
* Extend dynamic queues support for all stream priorities

Environment variables:
* DEBUG_HIP_DYNAMIC_QUEUES: 0 - disabled, 1 - Depth heuristics,
  2 - Depth+Pipe heuristics
* DEBUG_HIP_IGNORE_STREAM_PRIORITY=1: ignore the requested stream priority
  and force all streams to normal priority

* clr: Clean up last_used_queue_
This commit is contained in:
SaleelK
2026-01-23 10:40:54 -08:00
committed by GitHub
parent 89170521f5
commit 340f3aa887
10 changed files with 341 additions and 158 deletions
+7 -1
View File
@@ -32,7 +32,7 @@ namespace hip {
Stream::Stream(hip::Device* dev, Priority p, unsigned int f, bool null_stream,
const std::vector<uint32_t>& cuMask, hipStreamCaptureStatus captureStatus)
: amd::HostQueue(*dev->asContext(), *dev->devices()[0], 0, amd::CommandQueue::RealTimeDisabled,
convertToQueuePriority(p), cuMask),
convertToQueuePriority(p), cuMask, null_stream),
lock_("Stream Callback lock"),
device_(dev),
priority_(p),
@@ -585,6 +585,12 @@ hipError_t hipStreamQuery_common(hipStream_t stream) {
}
hipError_t status = ready ? hipSuccess : hipErrorNotReady;
command->release();
// Stream is complete - opportunistically release its HW queue if idle
if (ready) {
hip_stream->vdev()->ReleaseHwQueue();
}
return status;
}
+191 -98
View File
@@ -136,18 +136,22 @@ Device::Device(hsa_agent_t bkendDevice)
alloc_granularity_(0),
xferQueue_(nullptr),
freeMem_(0),
vgpusAccess_(true) /* Virtual GPU List Ops Lock */
,
vgpusAccess_(true), /* Virtual GPU List Ops Lock */
hsa_exclusive_gpu_access_(false),
queuePool_(QueuePriority::Total),
coopHostcallBuffer_(nullptr),
queueWithCUMaskPool_(QueuePriority::Total),
numOfVgpus_(0),
preferred_numa_node_(0),
maxSdmaReadMask_(0),
maxSdmaWriteMask_(0),
sdma_engine_allocator_(*this),
cpu_agent_info_(nullptr) {
cpu_agent_info_(nullptr),
numHwPipes_(4) {
// Initialize queue pools with proper comparators (requires 'this' pointer)
for (uint i = 0; i < QueuePriority::Total; ++i) {
queuePool_.emplace_back(QueueCompare(this));
queueWithCUMaskPool_.emplace_back(QueueCompare(this));
}
group_segment_.handle = 0;
gpuvm_segment_.handle = 0;
gpu_fine_grained_segment_.handle = 0;
@@ -225,6 +229,11 @@ Device::~Device() {
glb_ctx_ = nullptr;
}
// Destroy transfer queue FIRST (before destroying queues in pool)
// because its destructor will call releaseQueue()
delete xferQueue_;
xferQueue_ = nullptr;
for (auto& it : queuePool_) {
for (auto qIter = it.begin(); qIter != it.end();) {
hsa_queue_t* queue = qIter->first;
@@ -244,9 +253,6 @@ Device::~Device() {
}
queuePool_.clear();
// Destroy transfer queue
delete xferQueue_;
delete blitProgram_;
if (context_ != nullptr) {
@@ -1695,6 +1701,7 @@ device::VirtualDevice* Device::createVirtualDevice(amd::CommandQueue* queue) {
bool profiling = (queue != nullptr) && queue->properties().test(CL_QUEUE_PROFILING_ENABLE);
bool cooperative = false;
bool dedicated_queue = (queue != nullptr) && queue->isDedicatedQueue();
// If amd command queue is null, then it's an internal device queue
if (queue == nullptr) {
@@ -1708,7 +1715,8 @@ device::VirtualDevice* Device::createVirtualDevice(amd::CommandQueue* queue) {
bool q = (queue != nullptr);
VirtualGPU* virtualDevice =
new VirtualGPU(*this, profiling, cooperative, q ? queue->cuMask() : defaultCuMask,
q ? queue->priority() : amd::CommandQueue::Priority::Normal);
q ? queue->priority() : amd::CommandQueue::Priority::Normal,
dedicated_queue);
if (!virtualDevice->create()) {
delete virtualDevice;
@@ -2811,7 +2819,7 @@ VirtualGPU* Device::xferQueue() const {
return nullptr;
}
if (xferQueue_->gpu_queue() == nullptr) {
xferQueue_->set_gpu_queue(thisDevice->AcquireActiveNormalQueue());
xferQueue_->set_gpu_queue(thisDevice->AcquireActiveQueue(amd::CommandQueue::Priority::Normal));
}
}
xferQueue_->enableSyncBlit();
@@ -2863,58 +2871,73 @@ void Device::getHwEventTime(const amd::Event& event, uint64_t* start, uint64_t*
}
// ================================================================================================
hsa_queue_t* Device::getQueueFromPool(const uint qIndex) {
// Check if queue with refCount 0 is available to use
if (queuePool_[qIndex].size() < GPU_MAX_HW_QUEUES) {
for (auto& it : queuePool_[qIndex]) {
if (it.second.refCount == 0) {
it.second.refCount++;
ClPrint(amd::LOG_INFO, amd::LOG_QUEUE, "Selected queue refCount: %p (%d)",
it.first->base_address, it.second.refCount);
return it.first;
}
}
} else {
if (qIndex < QueuePriority::Total && queuePool_[qIndex].size() > 0) {
// Search through all available queues for the lowest counter.
// Note: the map is sorted in the allocation order for possible round-robin selection
typedef decltype(queuePool_)::value_type::const_reference PoolRef;
auto lowest = std::min_element(
queuePool_[qIndex].begin(), queuePool_[qIndex].end(),
[](PoolRef A, PoolRef B) { return A.second.refCount < B.second.refCount; });
lowest->second.refCount++;
ClPrint(amd::LOG_INFO, amd::LOG_QUEUE, "Selected queue refCount: %p (%d)",
lowest->first->base_address, lowest->second.refCount);
return lowest->first;
}
hsa_queue_t* Device::getQueueFromPool(const uint qIndex, bool force_reuse) {
// Only reuse queues when we've reached the maximum limit, unless forced
// Below the limit, return nullptr to allow creating new queues
if (!force_reuse && queuePool_[qIndex].size() < settings().max_hw_queues_) {
return nullptr;
}
// We've hit the limit, must reuse - find the queue with lowest load metric
if (qIndex < QueuePriority::Total && queuePool_[qIndex].size() > 0) {
typedef decltype(queuePool_)::value_type::const_reference PoolRef;
// Select queue based on dynamic_queues_ mode
decltype(queuePool_[qIndex].begin()) lowest;
uint32_t mode = settings().dynamic_queues_;
// gfx9XX pipe distribution: queues map to pipes via queue_id % num_pipes
const bool pipe_dist = settings().queue_pipe_dist_;
const uint32_t num_pipes = numHwPipes_;
lowest = std::min_element(
queuePool_[qIndex].begin(), queuePool_[qIndex].end(),
[mode, pipe_dist, num_pipes](PoolRef A, PoolRef B) {
if (mode >= 1) {
// Mode 1+: Advanced weighted metric with dedicated queue penalty
// Metric = dedicated_queue_penalty + (depth << 4) + refCount
uint64_t metricA = A.second.GetLoadMetric(A.first, mode);
uint64_t metricB = B.second.GetLoadMetric(B.first, mode);
if (metricA == metricB && pipe_dist) {
// gfx9XX pipe distribution: prefer lower pipe IDs for consistent distribution
uint64_t pipeA = A.first->id % num_pipes;
uint64_t pipeB = B.first->id % num_pipes;
return pipeA < pipeB;
}
return metricA < metricB;
} else {
// Mode 0: Simple refCount-based selection
return A.second.refCount < B.second.refCount;
}
});
lowest->second.refCount++;
ClPrint(amd::LOG_INFO, amd::LOG_QUEUE,
"Selected queue (mode=%u): %p refCount: %d, depth: %lu, metric: %lu, pipe: %d%s",
mode, lowest->first->base_address, lowest->second.refCount,
QueueInfo::GetHwQueueDepth(lowest->first),
lowest->second.GetLoadMetric(lowest->first, mode),
pipe_dist ? (lowest->first->id % num_pipes) : -1,
force_reuse ? " (forced)" : "");
return lowest->first;
}
return nullptr;
}
// ================================================================================================
hsa_queue_t* Device::AcquireActiveNormalQueue() {
hsa_queue_t* Device::AcquireActiveQueue(amd::CommandQueue::Priority priority) {
uint32_t queue_size = ROC_AQL_QUEUE_SIZE;
auto queue = acquireQueue(queue_size, false, std::vector<uint32_t>{},
amd::CommandQueue::Priority::Normal, true);
priority, true, false);
return queue;
}
// ================================================================================================
hsa_queue_t* Device::acquireQueue(uint32_t queue_size_hint, bool coop_queue,
const std::vector<uint32_t>& cuMask,
amd::CommandQueue::Priority priority, bool managed) {
amd::ScopedLock l(active_queue_access_);
assert(queuePool_[QueuePriority::Low].size() <= GPU_MAX_HW_QUEUES ||
queuePool_[QueuePriority::Normal].size() <= GPU_MAX_HW_QUEUES ||
queuePool_[QueuePriority::High].size() <= GPU_MAX_HW_QUEUES);
ClPrint(amd::LOG_INFO, amd::LOG_QUEUE,
"Number of allocated hardware queues with low priority: %d,"
" with normal priority: %d, with high priority: %d, maximum per priority is: %d",
queuePool_[QueuePriority::Low].size(), queuePool_[QueuePriority::Normal].size(),
queuePool_[QueuePriority::High].size(), GPU_MAX_HW_QUEUES);
amd::CommandQueue::Priority priority, bool managed,
bool dedicated_queue) {
hsa_amd_queue_priority_t queue_priority;
uint qIndex;
switch (priority) {
@@ -2934,22 +2957,49 @@ hsa_queue_t* Device::acquireQueue(uint32_t queue_size_hint, bool coop_queue,
break;
}
// If we have reached the max number of queues, reuse an existing queue with the matching queue
// priority, choosing the one with the least number of users. Note: Don't attempt to reuse the
// cooperative queue, since it's single per device
if (!coop_queue && (cuMask.size() == 0) &&
((queuePool_[qIndex].size() == GPU_MAX_HW_QUEUES) || queuePool_[qIndex].size() > 0)) {
hsa_queue_t* queue = getQueueFromPool(qIndex);
if (queue != nullptr) {
if (!managed && (qIndex == QueuePriority::Normal)) {
num_normal_queues_++;
}
return queue;
}
// If flag set, force all streams to normal priority
// This means that GPU_MAX_HW_QUEUES may need to be incremented
// to account for the additional normal priority queues
if (DEBUG_HIP_IGNORE_STREAM_PRIORITY) {
queue_priority = HSA_AMD_QUEUE_PRIORITY_NORMAL;
qIndex = QueuePriority::Normal;
}
// Else create a new queue. This also includes the initial state where there
// is no queue.
{ // Lock
amd::ScopedLock l(active_queue_access_);
assert(queuePool_[QueuePriority::Low].size() <= settings().max_hw_queues_ ||
queuePool_[QueuePriority::Normal].size() <= settings().max_hw_queues_ ||
queuePool_[QueuePriority::High].size() <= settings().max_hw_queues_);
ClPrint(amd::LOG_INFO, amd::LOG_QUEUE,
"Number of allocated hardware queues with low priority: %d,"
" with normal priority: %d, with high priority: %d, maximum per priority is: %d",
queuePool_[QueuePriority::Low].size(), queuePool_[QueuePriority::Normal].size(),
queuePool_[QueuePriority::High].size(), settings().max_hw_queues_);
// If we have reached the max number of queues, reuse an existing queue with the matching queue
// priority, choosing the one with the least number of users. Note: Don't attempt to reuse the
// cooperative queue, since it's single per device.
// num_queues_[qIndex] tracks persistent (non-managed) queues per priority.
// When the total queues (managed + non-managed) exceed max_hw_queues_, we must reuse existing
// queues. 'managed' streams do not increment num_queues_, allowing them to use the
// pool without permanently consuming slots. ReleaseActiveQueue() uses this counter to
// decide when to start reclaiming queues.
if (!coop_queue && (cuMask.size() == 0) &&
(queuePool_[qIndex].size() >= settings().max_hw_queues_)) {
hsa_queue_t* queue = getQueueFromPool(qIndex, false);
if (queue != nullptr) {
if (!managed) {
num_queues_[qIndex]++;
}
return queue;
}
}
} // Lock release
// Create a new queue.
uint32_t queue_max_packets = 0;
if (HSA_STATUS_SUCCESS !=
Hsa::agent_get_info(bkendDevice_, HSA_AGENT_INFO_QUEUE_MAX_SIZE, &queue_max_packets)) {
@@ -2971,9 +3021,14 @@ hsa_queue_t* Device::acquireQueue(uint32_t queue_size_hint, bool coop_queue,
&queue) != HSA_STATUS_SUCCESS) {
queue_size >>= 1;
if (queue_size < 64) {
// if a queue with the same requested priority available from the pool, returns it here
if (!coop_queue && (cuMask.size() == 0) && (queuePool_[qIndex].size() > 0)) {
return getQueueFromPool(qIndex);
LogError("Device::acquireQueue: hsa_queue_create failed!");
// If we can't create even a small queue, try to reuse any existing queue
if (!coop_queue && (cuMask.size() == 0)) {
amd::ScopedLock l(active_queue_access_);
if (queuePool_[qIndex].size() > 0) {
bool kForceReuse = true;
return getQueueFromPool(qIndex, kForceReuse);
}
}
ClPrint(amd::LOG_DETAIL_DEBUG, amd::LOG_QUEUE,
"Device::acquireQueue: hsa_queue_create failed!");
@@ -3067,12 +3122,14 @@ hsa_queue_t* Device::acquireQueue(uint32_t queue_size_hint, bool coop_queue,
return nullptr;
}
if (cuMask.size() != 0) {
amd::ScopedLock l(active_queue_access_);
// add queues with custom CU mask into their special pool to keep track
// of mapping of these queues to their associated queueInfo (i.e., hostcall buffers)
auto result = queueWithCUMaskPool_[qIndex].emplace(std::make_pair(queue, QueueInfo()));
assert(result.second && "QueueInfo already exists");
auto& qInfo = result.first->second;
qInfo.refCount = 1;
qInfo.hasDedicatedQueue_ = dedicated_queue; // Track if this is a dedicated queue
return queue;
}
@@ -3083,22 +3140,41 @@ hsa_queue_t* Device::acquireQueue(uint32_t queue_size_hint, bool coop_queue,
// per device.
return queue;
}
// Add queue to the pool (including dedicated queues)
amd::ScopedLock l(active_queue_access_);
auto result = queuePool_[qIndex].emplace(std::make_pair(queue, QueueInfo()));
assert(result.second && "QueueInfo already exists");
auto& qInfo = result.first->second;
qInfo.refCount = 1;
ClPrint(amd::LOG_INFO, amd::LOG_QUEUE, "acquireQueue refCount: %p (%d)",
result.first->first->base_address, result.first->second.refCount);
if (!managed && (cuMask.size() == 0) && (qIndex = QueuePriority::Normal)) {
num_normal_queues_++;
qInfo.hasDedicatedQueue_ = dedicated_queue; // Track if this is a dedicated queue
ClPrint(amd::LOG_INFO, amd::LOG_QUEUE, "acquireQueue refCount: %p (%d) %s",
result.first->first->base_address, result.first->second.refCount,
dedicated_queue ? "(dedicated)" : "");
if (!managed && (cuMask.size() == 0)) {
num_queues_[qIndex]++;
}
return queue;
}
// ================================================================================================
bool Device::ReleaseActiveNormalQueue(hsa_queue_t* queue) {
bool Device::ReleaseActiveQueue(hsa_queue_t* queue, amd::CommandQueue::Priority priority) {
uint qIndex;
switch (priority) {
case amd::CommandQueue::Priority::Low:
qIndex = QueuePriority::Low;
break;
case amd::CommandQueue::Priority::High:
qIndex = QueuePriority::High;
break;
case amd::CommandQueue::Priority::Normal:
case amd::CommandQueue::Priority::Medium:
default:
qIndex = QueuePriority::Normal;
break;
}
// Release a queue if the total number of allocated queues exceeds the max possible
if (num_normal_queues_.load() > GPU_MAX_HW_QUEUES) {
if (num_queues_[qIndex].load() > settings().max_hw_queues_) {
releaseQueue(queue, std::vector<uint32_t>{}, false, true);
return true;
} else {
@@ -3109,36 +3185,52 @@ bool Device::ReleaseActiveNormalQueue(hsa_queue_t* queue) {
// ================================================================================================
void Device::releaseQueue(hsa_queue_t* queue, const std::vector<uint32_t>& cuMask, bool coop_queue,
bool managed) {
amd::ScopedLock l(active_queue_access_);
for (auto& it : cuMask.size() == 0 ? queuePool_ : queueWithCUMaskPool_) {
auto qIter = it.find(queue);
if (qIter != it.end()) {
if (!managed && (cuMask.size() == 0) && (&it == &queuePool_[QueuePriority::Normal])) {
num_normal_queues_--;
}
auto& qInfo = qIter->second;
assert(qInfo.refCount > 0);
qInfo.refCount--;
ClPrint(amd::LOG_INFO, amd::LOG_QUEUE, "releaseQueue refCount:%p (%d)",
qIter->first->base_address, qIter->second.refCount);
// hsa queues with cumask set are not being reused. Hence, if the app uses multiple
// such queues it can cause memory leak and those must be destroyed here once the
// refcount reaches 0.
if ((!cuMask.empty()) && (qInfo.refCount == 0)) {
if (qInfo.hostcallBuffer_) {
ClPrint(amd::LOG_INFO, amd::LOG_QUEUE,
"Deleting hostcall buffer %p for hardware queue %p", qInfo.hostcallBuffer_,
qIter->first->base_address);
amd::disableHostcalls(qInfo.hostcallBuffer_);
context().svmFree(qInfo.hostcallBuffer_);
// Defer cleanup operations outside the lock
void* hostcallBufferToFree = nullptr;
bool shouldDestroyQueue = false;
{ // Lock
amd::ScopedLock l(active_queue_access_);
auto& pools = cuMask.size() == 0 ? queuePool_ : queueWithCUMaskPool_;
for (uint qIndex = 0; qIndex < pools.size(); ++qIndex) {
auto& it = pools[qIndex];
auto qIter = it.find(queue);
if (qIter != it.end()) {
if (!managed && (cuMask.size() == 0)) {
num_queues_[qIndex]--;
}
ClPrint(amd::LOG_INFO, amd::LOG_QUEUE, "Deleting hardware queue %p with refCount 0",
queue->base_address);
qIter = it.erase(qIter);
Hsa::queue_destroy(queue);
auto& qInfo = qIter->second;
assert(qInfo.refCount > 0);
qInfo.refCount--;
ClPrint(amd::LOG_INFO, amd::LOG_QUEUE, "releaseQueue refCount:%p (%d)",
qIter->first->base_address, qIter->second.refCount);
// hsa queues with cumask set are not being reused. Hence, if the app uses multiple
// such queues it can cause memory leak and those must be destroyed here once the
// refcount reaches 0.
if ((!cuMask.empty()) && (qInfo.refCount == 0)) {
hostcallBufferToFree = qInfo.hostcallBuffer_;
shouldDestroyQueue = true;
ClPrint(amd::LOG_INFO, amd::LOG_QUEUE, "Deleting hardware queue %p with refCount 0",
queue->base_address);
it.erase(qIter);
}
break; // Found and processed the queue
}
}
} // Lock release
// Perform expensive cleanup operations outside the lock
if (shouldDestroyQueue) {
if (hostcallBufferToFree) {
ClPrint(amd::LOG_INFO, amd::LOG_QUEUE,
"Deleting hostcall buffer %p for hardware queue %p", hostcallBufferToFree,
queue->base_address);
amd::disableHostcalls(hostcallBufferToFree);
context().svmFree(hostcallBufferToFree);
}
Hsa::queue_destroy(queue);
}
if (coop_queue) { // cooperative queue
ClPrint(amd::LOG_INFO, amd::LOG_QUEUE, "Deleting CG enabled hardware queue %p ",
queue->base_address);
@@ -3150,6 +3242,7 @@ void* Device::getOrCreateHostcallBuffer(hsa_queue_t* queue, bool coop_queue,
const std::vector<uint32_t>& cuMask) {
decltype(queuePool_)::value_type::iterator qIter;
bool found = false;
if (!coop_queue) {
for (auto& it : cuMask.size() == 0 ? queuePool_ : queueWithCUMaskPool_) {
qIter = it.find(queue);
+41 -13
View File
@@ -552,18 +552,17 @@ class Device : public NullDevice {
VirtualGPU* xferQueue() const;
//! Acquire HSA queue. This method can create a new HSA queue or
//! share previously created
hsa_queue_t* acquireQueue(
uint32_t queue_size_hint, bool coop_queue = false, const std::vector<uint32_t>& cuMask = {},
amd::CommandQueue::Priority priority = amd::CommandQueue::Priority::Normal,
bool managed = false);
bool managed = false, bool dedicated_queue = false);
//! Release HSA queue
void releaseQueue(hsa_queue_t*, const std::vector<uint32_t>& cuMask = {}, bool coop_queue = false,
bool managed = false);
hsa_queue_t* AcquireActiveNormalQueue();
bool ReleaseActiveNormalQueue(hsa_queue_t* queue);
hsa_queue_t* AcquireActiveQueue(amd::CommandQueue::Priority priority);
bool ReleaseActiveQueue(hsa_queue_t* queue, amd::CommandQueue::Priority priority);
//! For the given HSA queue, return an existing hostcall buffer or create a
//! new one. queuePool_ keeps a mapping from HSA queue to hostcall buffer.
@@ -619,8 +618,11 @@ class Device : public NullDevice {
//! Removes a kernel from the kernel map
void RemoveKernel(Kernel& gpuKernel) const;
// Returns the number of allocated normal queues on this device
uint32_t NumNormalQueues() const { return num_normal_queues_.load(); }
// Returns the number of allocated queues for a given priority on this device
uint32_t NumQueues(uint qIndex) const { return num_queues_[qIndex].load(); }
//! enum for keeping the total and available queue priorities
enum QueuePriority : uint { Low = 0, Normal = 1, High = 2, Total = 3 };
//! Returns true if PM4 emulation is enabled
bool IsPm4Emulation() const { return pm4_emulation_; }
@@ -678,12 +680,40 @@ class Device : public NullDevice {
//! Per-HW-queue bookkeeping record. The device keeps one QueueInfo as the mapped value for each
//! hsa_queue_t* stored in its queue pools, and the pool selection logic ranks candidate queues
//! with the load metric computed below (lower metric == preferred queue).
struct QueueInfo {
int refCount; //! Reference counter. Shows how many times the queue was shared
void* hostcallBuffer_; //! Host call buffer for the HSA queue
bool hasDedicatedQueue_; //! True if this queue is a dedicated queue (e.g., null stream)
// Constructor: starts with no users, no hostcall buffer and not dedicated
QueueInfo() : refCount(0), hostcallBuffer_(nullptr), hasDedicatedQueue_(false) {}
//! Get the current hardware queue depth (wptr - rptr)
//! Both indices are read with relaxed loads and subtracted as unsigned 64-bit values.
//! NOTE(review): the write index is sampled before the read index and the two loads are
//! independent, so this is not a single consistent snapshot. If the read index can be observed
//! ahead of the earlier write-index sample, the unsigned subtraction would wrap to a very large
//! value - confirm this is acceptable for a heuristic.
static uint64_t GetHwQueueDepth(hsa_queue_t* queue) {
uint64_t wptr = Hsa::queue_load_write_index_relaxed(queue);
uint64_t rptr = Hsa::queue_load_read_index_relaxed(queue);
return wptr - rptr;
}
//! Get a combined metric for queue selection (lower is better)
//! \param queue HW queue whose current depth is sampled via GetHwQueueDepth()
//! \param mode  Dynamic-queue mode supplied by the caller.
//!              NOTE(review): not read anywhere in this body - confirm whether it is reserved
//!              for future use.
//! \return dedicated_queue_penalty + depth * 16 + refCount
uint64_t GetLoadMetric(hsa_queue_t* queue, uint32_t mode = 1) const {
auto depth = GetHwQueueDepth(queue);
// Dedicated queue penalty: prefer regular queues, but use dedicated if regular queues
// have depth > ~128 packets. Penalty = 128 << 4 = 2048.
uint64_t dedicated_queue_penalty = hasDedicatedQueue_ ? 2048 : 0;
// Advanced weighted metric: Give queue depth significantly more weight than refCount
// (depth is scaled by 16 via the shift, refCount is added unscaled)
uint64_t metric = dedicated_queue_penalty + (depth << 4) + static_cast<uint64_t>(refCount);
return metric;
}
};
struct QueueCompare {
const Device* device_;
QueueCompare(const Device* dev = nullptr) : device_(dev) {}
// Customized queue compare operator to make sure the queues are sorted in the creation order
bool operator()(hsa_queue_t* lhs, hsa_queue_t* rhs) const {
if (DEBUG_HIP_DYNAMIC_QUEUES) {
if (device_ != nullptr && device_->settings().dynamic_queues_ > 0) {
return (lhs->id < rhs->id) ? true : false;
} else {
return (lhs < rhs) ? true : false;
@@ -693,10 +723,10 @@ class Device : public NullDevice {
//! a vector for keeping Pool of HSA queues with low, normal and high priorities for recycling
std::vector<std::map<hsa_queue_t*, QueueInfo, QueueCompare>> queuePool_;
amd::Monitor active_queue_access_; //!< Lock to serialise virtual gpu list access
std::atomic<uint32_t> num_normal_queues_{0}; //!< The total number of allocated normal queues
std::atomic<uint32_t> num_queues_[QueuePriority::Total] = {}; //!< Per-priority queue counters
//! returns a hsa queue from queuePool with least refCount and updates the refCount as well
hsa_queue_t* getQueueFromPool(const uint qIndex);
//! Use dynamic queues mode to get a queue from pool
hsa_queue_t* getQueueFromPool(const uint qIndex, bool force_reuse = false);
void* coopHostcallBuffer_;
//! returns value for corresponding LinkAttrbutes in a vector given Memory pool.
@@ -712,6 +742,7 @@ class Device : public NullDevice {
uint32_t maxSdmaWriteMask_;
bool isXgmi_; //!< Flag to indicate if there is XGMI between CPU<->GPU
bool pm4_emulation_ = false; //!< Flag to indicate if PM4 emulation is enabled
uint32_t numHwPipes_; //!< Number of hardware pipes
//! SDMA engine allocator for per-stream affinity
struct SdmaEngineAllocator {
@@ -743,9 +774,6 @@ class Device : public NullDevice {
public:
std::atomic<uint> numOfVgpus_; //!< Virtual gpu unique index
//! enum for keeping the total and available queue priorities
enum QueuePriority : uint { Low = 0, Normal = 1, High = 2, Total = 3 };
#if defined(__clang__)
#if __has_feature(address_sanitizer)
virtual device::UriLocator* createUriLocator() const;
@@ -91,9 +91,13 @@ Settings::Settings() {
gwsInitSupported_ = true;
limit_blit_wg_ = 16;
dynamic_queues_ = amd::IS_HIP ? DEBUG_HIP_DYNAMIC_QUEUES : false;
dynamic_queues_ = amd::IS_HIP ? DEBUG_HIP_DYNAMIC_QUEUES : 0;
// note: OCL user events don't allow CPU blocking calls in DD mode
blocking_blit_ = amd::IS_HIP || !AMD_DIRECT_DISPATCH;
max_hw_queues_ = GPU_MAX_HW_QUEUES;
queue_pipe_dist_ = false;
}
// ================================================================================================
@@ -153,6 +157,7 @@ bool Settings::create(bool fullProfile, const amd::Isa& isa, bool enableXNACK, b
(gfxStepping == 0 || gfxStepping == 1 || gfxStepping == 2)))) {
// Enable Barrier Value packet is only for MI2XX/300
barrier_value_packet_ = true;
queue_pipe_dist_ = DEBUG_HIP_DYNAMIC_QUEUES == 2 ? true : false;
}
setKernelArgImpl(isa, isXgmi, hasValidHDPFlush);
@@ -48,9 +48,10 @@ class Settings : public device::Settings {
uint system_scope_signal_ : 1; //!< HSA signal is visibile to the entire system
uint fgs_kernel_arg_ : 1; //!< Use fine grain kernel arg segment
uint barrier_value_packet_ : 1; //!< Barrier value packet functionality
uint dynamic_queues_ : 1; //!< Dynamic queues management
uint dynamic_queues_ : 2; //!< Dynamic queues: 0=off, 1=Depth
uint blocking_blit_ : 1; //!< Blit ops can be blocking on CPU
uint reserved_ : 21;
uint queue_pipe_dist_ : 1; //!< MI300 queue pipe distribution (gfx94x)
uint reserved_ : 19;
};
uint value_;
};
@@ -74,6 +75,7 @@ class Settings : public device::Settings {
uint32_t hmmFlags_; //!< HMM functionality control flags
uint32_t limit_blit_wg_; //!< The number of workgroups for blit execution
uint32_t max_hw_queues_; //!< Effective maximum HW queues (accounts for null stream reservation)
//! Default constructor
Settings();
+57 -28
View File
@@ -314,6 +314,10 @@ bool HsaAmdSignalHandler(hsa_signal_value_t value, void* arg) {
// Update the batch, since signal is complete
gpu->updateCommandsState(ts->command().GetBatchHead());
// Opportunistically try to release the HW queue if it's now idle
// This helps reclaim queues in async workloads without explicit sync
gpu->ReleaseHwQueue();
// Reset API callback signal. It will release AQL queue and start commands processing
if (callback_signal.handle != 0 && isBlocking) {
Hsa::signal_subtract_relaxed(callback_signal, 1);
@@ -1011,9 +1015,10 @@ bool VirtualGPU::processMemObjects(const amd::Kernel& kernel, const_address para
// ================================================================================================
uint64_t VirtualGPU::getQueueID() {
amd::ScopedLock lock(execution());
if (gpu_queue_ == nullptr) {
gpu_queue_ = roc_device_.AcquireActiveNormalQueue();
// Dedicated queues keep their HW queue, never acquire from pool
if (!dedicated_queue_ && gpu_queue_ == nullptr) {
amd::ScopedLock lock(execution());
gpu_queue_ = roc_device_.AcquireActiveQueue(priority_);
}
return gpu_queue_->id;
}
@@ -1713,7 +1718,8 @@ bool VirtualGPU::releaseGpuMemoryFence(bool skip_cpu_wait) {
// ================================================================================================
VirtualGPU::VirtualGPU(Device& device, bool profiling, bool cooperative,
const std::vector<uint32_t>& cuMask, amd::CommandQueue::Priority priority)
const std::vector<uint32_t>& cuMask, amd::CommandQueue::Priority priority,
bool dedicated_queue)
: device::VirtualDevice(device),
state_(0),
gpu_queue_(nullptr),
@@ -1728,9 +1734,10 @@ VirtualGPU::VirtualGPU(Device& device, bool profiling, bool cooperative,
managed_kernarg_buffer_(*this, device.settings().kernargPoolSize_),
cuMask_(cuMask),
priority_(priority),
copy_command_type_(0),
fence_state_(Device::CacheState::kCacheStateInvalid),
fence_dirty_(false) {
copy_command_type_(0),
fence_state_(Device::CacheState::kCacheStateInvalid),
fence_dirty_(false),
dedicated_queue_(dedicated_queue) {
index_ = device.numOfVgpus_++;
gpu_device_ = device.getBackendDevice();
printfdbg_ = nullptr;
@@ -1791,8 +1798,9 @@ VirtualGPU::~VirtualGPU() {
if (tracking_created_) {
amd::ScopedLock l(execution());
if (gpu_queue_ == nullptr) {
gpu_queue_ = roc_device_.AcquireActiveNormalQueue();
// Dedicated queues keep their HW queue, never acquire from pool
if (!dedicated_queue_ && gpu_queue_ == nullptr) {
gpu_queue_ = roc_device_.AcquireActiveQueue(priority_);
}
// Windows requires an interrupt in more cases than Linux for OS fence updates
force_irq_ = IS_WINDOWS;
@@ -1839,7 +1847,8 @@ VirtualGPU::~VirtualGPU() {
bool VirtualGPU::create() {
// Pick a reasonable queue size
uint32_t queue_size = ROC_AQL_QUEUE_SIZE;
gpu_queue_ = roc_device_.acquireQueue(queue_size, cooperative_, cuMask_, priority_);
gpu_queue_ = roc_device_.acquireQueue(queue_size, cooperative_, cuMask_, priority_, false,
dedicated_queue_);
if (!gpu_queue_) return false;
if (!managed_kernarg_buffer_.Create(Device::MemorySegment::kKernArg)) {
@@ -2004,29 +2013,48 @@ void VirtualGPU::ReleaseSdmaEngines() {
// ================================================================================================
void VirtualGPU::ReleaseAllHwQueues() {
if (roc_device_.settings().dynamic_queues_ &&
(roc_device_.NumNormalQueues() > GPU_MAX_HW_QUEUES)) {
// Lock the device to make the following thread safe
amd::ScopedLock lock(roc_device_.vgpusAccess());
for (uint idx = 0; idx < roc_device_.vgpus().size(); ++idx) {
roc_device_.vgpus()[idx]->ReleaseHwQueue();
if (roc_device_.settings().dynamic_queues_) {
// Check if any priority level exceeds max_hw_queues_
bool should_release = false;
for (uint qIdx = 0; qIdx < Device::QueuePriority::Total; ++qIdx) {
if (roc_device_.NumQueues(qIdx) > roc_device_.settings().max_hw_queues_) {
should_release = true;
break;
}
}
if (should_release) {
// Lock the device to make the following thread safe
amd::ScopedLock lock(roc_device_.vgpusAccess());
for (uint idx = 0; idx < roc_device_.vgpus().size(); ++idx) {
roc_device_.vgpus()[idx]->ReleaseHwQueue();
}
}
}
}
// ================================================================================================
void VirtualGPU::ReleaseHwQueue() {
// Try to release normal queue to the pool of active queues
if (roc_device_.settings().dynamic_queues_ &&
(priority_ == amd::CommandQueue::Priority::Normal) && !cooperative_ &&
// Dedicated queues keep their HW queue, never release to pool
if (dedicated_queue_) {
return;
}
// Try to release queue to the pool of active queues.
// Use tryLock() since this may be called from the HsaAmdSignalHandler
// and blocking here could cause deadlock
if (roc_device_.settings().dynamic_queues_ > 0 && !cooperative_ &&
(cuMask_.size() == 0)) {
amd::ScopedLock lock(execution());
if (gpu_queue_ != nullptr) {
if (IsQueueIdle()) {
if (roc_device_.ReleaseActiveNormalQueue(gpu_queue_)) {
gpu_queue_ = nullptr;
// If tryLock fails, skip the release - the queue will be released
// on next opportunity
if (execution().tryLock()) {
if (gpu_queue_ != nullptr) {
if (IsQueueIdle()) {
if (roc_device_.ReleaseActiveQueue(gpu_queue_, priority_)) {
gpu_queue_ = nullptr;
}
}
}
execution().unlock();
}
}
}
@@ -2037,8 +2065,9 @@ void VirtualGPU::ReleaseHwQueue() {
* and then calls start() to get the current host timestamp.
*/
void VirtualGPU::profilingBegin(amd::Command& command, bool sdmaProfiling) {
if (gpu_queue_ == nullptr) {
gpu_queue_ = roc_device_.AcquireActiveNormalQueue();
// Dedicated queues keep their HW queue, never acquire from pool
if (!dedicated_queue_ && gpu_queue_ == nullptr) {
gpu_queue_ = roc_device_.AcquireActiveQueue(priority_);
}
// Track the current command
command_ = &command;
@@ -4050,8 +4079,8 @@ void VirtualGPU::submitMarker(amd::Marker& vcmd) {
force_irq_ = IS_WINDOWS;
// It should be safe to call flush directly if there are not pending dispatches without
// HSA signal callback
if (gpu_queue_ == nullptr) {
gpu_queue_ = roc_device_.AcquireActiveNormalQueue();
if (!dedicated_queue_ && gpu_queue_ == nullptr) {
gpu_queue_ = roc_device_.AcquireActiveQueue(priority_);
}
flush(vcmd.GetBatchHead());
} else {
+18 -8
View File
@@ -303,6 +303,9 @@ class VirtualGPU : public device::VirtualDevice {
//! Empty check for external signals
bool IsExternalSignalListEmpty() const { return external_signals_.empty(); }
//! Adds a raw signal for dependency tracking
void AddDynamicQueueWait(hsa_signal_t signal) { dynamic_queue_waits_.push_back(signal); }
//! Get/Set SDMA profiling
bool GetSDMAProfiling() { return sdma_profiling_; }
void SetSDMAProfiling(bool profile) {
@@ -328,12 +331,14 @@ class VirtualGPU : public device::VirtualDevice {
bool sdma_profiling_ = false; //!< If TRUE, then SDMA profiling is enabled
const VirtualGPU& gpu_; //!< VirtualGPU, associated with this tracker
std::vector<ProfilingSignal*> external_signals_; //!< External signals for a wait in this queue
std::vector<hsa_signal_t> dynamic_queue_waits_; //!< Extra raw signals for a wait in this queue
std::vector<hsa_signal_t> waiting_signals_; //!< Current waiting signals in this queue
};
//! Constructs a virtual GPU for the given device.
//! \param cuMask           CU mask bit-vector; empty by default
//! \param priority         priority for the HSA queue; Normal by default
//! \param dedicated_queue  TRUE if this VirtualGPU has a dedicated queue (e.g., null stream);
//!                         false by default
VirtualGPU(Device& device, bool profiling = false, bool cooperative = false,
const std::vector<uint32_t>& cuMask = {},
amd::CommandQueue::Priority priority = amd::CommandQueue::Priority::Normal);
amd::CommandQueue::Priority priority = amd::CommandQueue::Priority::Normal,
bool dedicated_queue = false);
~VirtualGPU();
bool create();
@@ -551,7 +556,7 @@ class VirtualGPU : public device::VirtualDevice {
last_write_index_ = index;
// Update the last completion signal if the packet has one
if (packet.completion_signal.handle != 0) {
last_barrier_index_ = index;
last_packet_with_signal_index_ = index;
last_completion_signal_ = packet.completion_signal;
}
}
@@ -559,16 +564,20 @@ class VirtualGPU : public device::VirtualDevice {
//! Returns true if the queue is considered as idle. That means all submitted packets are
//! complete. Note: it doesn't track the state of caches.
//!
//! A virtual GPU with no HW queue attached (gpu_queue_ == nullptr) is reported as idle.
//! Otherwise the queue is idle only when the most recently written packet carried a
//! completion signal and either nothing was ever written (write index 0 with a null signal
//! handle) or that signal, read with a relaxed load, has the value 0.
bool IsQueueIdle() const {
bool result = false;
// No HW queue is attached, so there is nothing that could still be in flight
if (gpu_queue_ == nullptr) {
return true;
}
// Make sure the last packet contained a completion signal; if a later packet was written
// without one, completion can't be observed here and the queue is reported as busy
// NOTE(review): the two conditions below compare differently named index members against
// the same write index and read like two variants of one check - confirm which is intended
if (last_barrier_index_ == last_write_index_) {
if (last_packet_with_signal_index_ == last_write_index_) {
// Index 0 together with a null signal handle: no packet has been recorded yet
if ((last_write_index_ == 0) && (last_completion_signal_.handle == 0)) {
result = true;
return true;
} else {
// Idle only when the last completion signal reads 0
result = (Hsa::signal_load_relaxed(last_completion_signal_) == 0);
return (Hsa::signal_load_relaxed(last_completion_signal_) == 0);
}
}
return result;
return false;
}
std::vector<amd::Memory*> pinnedMems_; //!< Pinned memory list
@@ -627,6 +636,7 @@ class VirtualGPU : public device::VirtualDevice {
//!< bit-vector representing the CU mask. Each active bit represents using one CU
const std::vector<uint32_t> cuMask_;
amd::CommandQueue::Priority priority_; //!< The priority for the hsa queue
bool dedicated_queue_; //!< TRUE if this VirtualGPU has a dedicated queue (e.g., null stream)
cl_command_type copy_command_type_; //!< Type of the copy command, used for ROC profiler
//!< OCL doesn't distinguish different copy types,
@@ -636,7 +646,7 @@ class VirtualGPU : public device::VirtualDevice {
std::atomic<bool> fence_dirty_; //!< Fence modified flag
uint64_t last_write_index_ = 0; //!< The last HW queue write index for any packet
uint64_t last_barrier_index_ = 0; //!< The last HW queue write index for a packet
uint64_t last_packet_with_signal_index_ = 0;//!< The last HW queue write index for a packet
//!< with a completion signal
hsa_signal_t last_completion_signal_{}; //!< The last completion signal
@@ -35,9 +35,10 @@
namespace amd {
HostQueue::HostQueue(Context& context, Device& device, cl_command_queue_properties props,
uint queueRTCUs, Priority priority, const std::vector<uint32_t>& cuMask)
uint queueRTCUs, Priority priority, const std::vector<uint32_t>& cuMask,
bool dedicated_queue)
: CommandQueue(context, device, props, device.info().queueProperties_, queueRTCUs, priority,
cuMask),
cuMask, dedicated_queue),
lastEnqueueCommand_(nullptr),
head_(nullptr),
tail_(nullptr),
@@ -88,6 +88,9 @@ class CommandQueue : public RuntimeObject {
//! Gives access to this object through a pointer to the CommandQueue base type
CommandQueue* asCommandQueue() {
  CommandQueue* const self = this;
  return self;
}
//! Reports the dedicated_queue_ flag: TRUE when this queue requires a dedicated HW queue
bool isDedicatedQueue() const {
  const bool dedicated = dedicated_queue_;
  return dedicated;
}
//! Virtual destructor with an intentionally empty body
virtual ~CommandQueue() {
}
//! Returns TRUE if the object was successfully created
@@ -123,7 +126,8 @@ class CommandQueue : public RuntimeObject {
cl_command_queue_properties propMask, //!< Queue properties mask
uint rtCUs = RealTimeDisabled, //!< Available real time compute units
Priority priority = Priority::Normal, //!< Queue priority
const std::vector<uint32_t>& cuMask = {} //!< CU mask
const std::vector<uint32_t>& cuMask = {}, //!< CU mask
bool dedicated_queue = false //!< TRUE if requires dedicated HW queue
)
: properties_(propMask, properties),
rtCUs_(rtCUs),
@@ -132,7 +136,8 @@ class CommandQueue : public RuntimeObject {
lastCmdLock_(),
device_(device),
context_(context),
cuMask_(cuMask) {}
cuMask_(cuMask),
dedicated_queue_(dedicated_queue) {}
Properties properties_; //!< Queue properties
uint rtCUs_; //!< The number of used RT compute units
@@ -142,6 +147,7 @@ class CommandQueue : public RuntimeObject {
Device& device_; //!< The device
SharedReference<Context> context_; //!< The context of this command queue
const std::vector<uint32_t> cuMask_; //!< The CU mask
bool dedicated_queue_ = false; //!< TRUE if this queue requires a dedicated HW queue
private:
//! Disable copy constructor
@@ -213,7 +219,7 @@ class HostQueue : public CommandQueue {
*/
HostQueue(Context& context, Device& device, cl_command_queue_properties properties,
uint queueRTCUs = 0, Priority priority = Priority::Normal,
const std::vector<uint32_t>& cuMask = {});
const std::vector<uint32_t>& cuMask = {}, bool dedicated_queue = false);
//! Reports success only when this command queue's thread is accepting commands.
virtual bool create() {
  const bool accepting = thread_.acceptingCommands_;
  return accepting;
}
+5 -2
View File
@@ -265,8 +265,11 @@ release(bool, DEBUG_HIP_KERNARG_COPY_OPT, true, \
"Enable/Disable multiple kern arg copies") \
release(bool, DEBUG_CLR_KERNARG_HDP_FLUSH_WA, false, \
"Toggle kernel arg copy workaround") \
release(bool, DEBUG_HIP_DYNAMIC_QUEUES, false, \
"Forces dynamic queue management") \
release(uint, DEBUG_HIP_DYNAMIC_QUEUES, 2, \
"Dynamic queue management: 0=off, 1=Queue depth heuristic, " \
"2=Queue depth + Pipe distribution") \
release(bool, DEBUG_HIP_IGNORE_STREAM_PRIORITY, false, \
"Ignore priority streams") \
release(uint, HIP_SKIP_ABORT_ON_GPU_ERROR, true, \
"Set this to true, to avoid host side abort for GPU errors") \
release(bool, HIP_FORCE_SPIRV_CODEOBJECT, false, \