From be2bdabb7645cdcac5337bf269cc8375b7935ad6 Mon Sep 17 00:00:00 2001 From: Ioannis Assiouras Date: Tue, 6 Feb 2024 00:34:35 +0000 Subject: [PATCH] SWDEV-430437,SWDEV-434702 - Split the streamset per device Change-Id: If1bcca45825c9899462bb95ed6f637f5af806cc8 --- hipamd/src/hip_context.cpp | 2 +- hipamd/src/hip_device.cpp | 143 +++++++++++++++++++++++++++++- hipamd/src/hip_device_runtime.cpp | 2 +- hipamd/src/hip_internal.hpp | 31 ++++--- hipamd/src/hip_memory.cpp | 8 +- hipamd/src/hip_stream.cpp | 133 ++------------------------- 6 files changed, 175 insertions(+), 144 deletions(-) diff --git a/hipamd/src/hip_context.cpp b/hipamd/src/hip_context.cpp index 4b7f5c8098..06e700ee19 100644 --- a/hipamd/src/hip_context.cpp +++ b/hipamd/src/hip_context.cpp @@ -95,7 +95,7 @@ hip::Stream* getStream(hipStream_t stream, bool wait) { hip::Stream* hip_stream = reinterpret_cast(stream); if (wait && !(hip_stream->Flags() & hipStreamNonBlocking)) { constexpr bool WaitNullStreamOnly = true; - iHipWaitActiveStreams(hip_stream, WaitNullStreamOnly); + hip_stream->GetDevice()->WaitActiveStreams(hip_stream, WaitNullStreamOnly); } return hip_stream; } diff --git a/hipamd/src/hip_device.cpp b/hipamd/src/hip_device.cpp index 019ee223eb..5ea357f2f4 100644 --- a/hipamd/src/hip_device.cpp +++ b/hipamd/src/hip_device.cpp @@ -43,7 +43,7 @@ hip::Stream* Device::NullStream(bool wait) { } if (wait == true) { // Wait for all active streams before executing commands on the default - iHipWaitActiveStreams(null_stream_); + WaitActiveStreams(null_stream_); } return null_stream_; } @@ -149,11 +149,150 @@ void Device::Reset() { mem_pools_.clear(); } flags_ = hipDeviceScheduleSpin; - hip::Stream::destroyAllStreams(deviceId_); + destroyAllStreams(); amd::MemObjMap::Purge(devices()[0]); Create(); } +// ================================================================================================ +void Device::WaitActiveStreams(hip::Stream* blocking_stream, bool wait_null_stream) { + amd::Command::EventWaitList eventWaitList(0); + bool submitMarker = 0; + + auto waitForStream = [&submitMarker, + &eventWaitList](hip::Stream* stream) { + if (amd::Command *command = stream->getLastQueuedCommand(true)) { + amd::Event &event = command->event(); + // Check HW status of the ROCcrl event. + // Note: not all ROCclr modes support HW status + bool ready = stream->device().IsHwEventReady(event); + if (!ready) { + ready = (command->status() == CL_COMPLETE); + } + submitMarker |= stream->vdev()->isFenceDirty(); + // Check the current active status + if (!ready) { + command->notifyCmdQueue(); + eventWaitList.push_back(command); + } else { + command->release(); + } + } + }; + + if (wait_null_stream) { + if (null_stream_) { + waitForStream(null_stream_); + } + } else { + amd::ScopedLock lock(streamSetLock); + + for (const auto& active_stream : streamSet) { + // If it's the current device + if (// Make sure it's a default stream + ((active_stream->Flags() & hipStreamNonBlocking) == 0) && + // and it's not the current stream + (active_stream != blocking_stream)) { + // Get the last valid command + waitForStream(active_stream); + } + } + } + + // Check if we have to wait anything + if (eventWaitList.size() > 0 || submitMarker) { + amd::Command* command = new amd::Marker(*blocking_stream, kMarkerDisableFlush, eventWaitList); + if (command != nullptr) { + command->enqueue(); + command->release(); + } + } + + // Release all active commands. It's safe after the marker was enqueued + for (const auto& it : eventWaitList) { + it->release(); + } +} + +// ================================================================================================ +void Device::AddStream(Stream* stream) { + amd::ScopedLock lock(streamSetLock); + streamSet.insert(stream); +} + +// ================================================================================================ +void Device::RemoveStream(Stream* stream){ + amd::ScopedLock lock(streamSetLock); + streamSet.erase(stream); +} + +// ================================================================================================ +bool Device::StreamExists(Stream* stream){ + amd::ScopedLock lock(streamSetLock); + if (streamSet.find(stream) != streamSet.end()) { + return true; + } + return false; +} + +// ================================================================================================ +void Device::destroyAllStreams() { + std::vector toBeDeleted; + { + amd::ScopedLock lock(streamSetLock); + for (auto& it : streamSet) { + if (it->Null() == false ) { + toBeDeleted.push_back(it); + } + } + } + for (auto& it : toBeDeleted) { + hip::Stream::Destroy(it); + } +} + +// ================================================================================================ +void Device::SyncAllStreams( bool cpu_wait) { + // Make a local copy to avoid stalls for GPU finish with multiple threads + std::vector streams; + streams.reserve(streamSet.size()); + { + amd::ScopedLock lock(streamSetLock); + for (auto it : streamSet) { + streams.push_back(it); + it->retain(); + } + } + for (auto it : streams) { + it->finish(cpu_wait); + it->release(); + } + // Release freed memory for all memory pools on the device + ReleaseFreedMemory(); +} + +// ================================================================================================ +bool Device::StreamCaptureBlocking() { + amd::ScopedLock lock(streamSetLock); + for (auto& it : streamSet) { + if (it->GetCaptureStatus() == hipStreamCaptureStatusActive && it->Flags() != hipStreamNonBlocking) { + return true; + } + } + return false; +} + +// ================================================================================================ +bool Device::existsActiveStreamForDevice() { + amd::ScopedLock lock(streamSetLock); + for (const auto& active_stream : streamSet) { + if (active_stream->GetQueueStatus()) { + return true; + } + } + return false; +} + // ================================================================================================ Device::~Device() { if (default_mem_pool_ != nullptr) { diff --git a/hipamd/src/hip_device_runtime.cpp b/hipamd/src/hip_device_runtime.cpp index 19a045dba4..b199bcdde8 100644 --- a/hipamd/src/hip_device_runtime.cpp +++ b/hipamd/src/hip_device_runtime.cpp @@ -610,7 +610,7 @@ hipError_t hipDeviceSetSharedMemConfig(hipSharedMemConfig config) { hipError_t hipDeviceSynchronize() { HIP_INIT_API(hipDeviceSynchronize); constexpr bool kDoWaitForCpu = true; - hip::Stream::SyncAllStreams(hip::getCurrentDevice()->deviceId(), kDoWaitForCpu); + hip::getCurrentDevice()->SyncAllStreams(kDoWaitForCpu); HIP_RETURN(hipSuccess); } diff --git a/hipamd/src/hip_internal.hpp b/hipamd/src/hip_internal.hpp index 46a03e0b4d..643d2c234c 100644 --- a/hipamd/src/hip_internal.hpp +++ b/hipamd/src/hip_internal.hpp @@ -306,15 +306,9 @@ public: /// Returns the CU mask for the current stream const std::vector GetCUMask() const { return cuMask_; } - /// Sync all streams - static void SyncAllStreams(int deviceId, bool cpu_wait = true); - /// Check whether any blocking stream running static bool StreamCaptureBlocking(); - /// Destroy all streams on a given device - static void destroyAllStreams(int deviceId); - static void Destroy(hip::Stream* stream); /// Check Stream Capture status to make sure it is done @@ -416,7 +410,6 @@ public: parallelCaptureStreams_.erase(it); } } - static bool existsActiveStreamForDevice(hip::Device* device); /// The stream should be destroyed via release() rather than delete private: @@ -426,6 +419,8 @@ public: /// HIP Device class class Device { amd::Monitor lock_{"Device lock", true}; + amd::Monitor streamSetLock{"Guards device stream set"}; + std::unordered_set streamSet; /// ROCclr context amd::Context* context_; /// Device's ID @@ -499,7 +494,7 @@ public: amd::ScopedLock lock(lock_); /// Either stream is active or device is active if (isActive_) return true; - if (Stream::existsActiveStreamForDevice(this)) { + if (existsActiveStreamForDevice()) { isActive_ = true; return true; } @@ -540,6 +535,22 @@ public: /// Returns true if memory pool is valid on this device bool IsMemoryPoolValid(MemoryPool* pool); + void AddStream(Stream* stream); + + void RemoveStream(Stream* stream); + + bool StreamExists(Stream* stream); + + void destroyAllStreams(); + + void SyncAllStreams( bool cpu_wait = true); + + bool StreamCaptureBlocking(); + + bool existsActiveStreamForDevice(); + /// Wait all active streams on the blocking queue. The method enqueues a wait command and + /// doesn't stall the current thread + void WaitActiveStreams(hip::Stream* blocking_stream, bool wait_null_stream = false); }; /// Thread Local Storage Variables Aggregator Class @@ -589,10 +600,6 @@ public: extern void WaitThenDecrementSignal(hipStream_t stream, hipError_t status, void* user_data); - /// Wait all active streams on the blocking queue. The method enqueues a wait command and - /// doesn't stall the current thread - extern void iHipWaitActiveStreams(hip::Stream* blocking_stream, bool wait_null_stream = false); - extern std::vector g_devices; extern hipError_t ihipDeviceGetCount(int* count); extern int ihipGetDevice(); diff --git a/hipamd/src/hip_memory.cpp b/hipamd/src/hip_memory.cpp index 613908501d..c95cdefe93 100644 --- a/hipamd/src/hip_memory.cpp +++ b/hipamd/src/hip_memory.cpp @@ -76,7 +76,7 @@ hipError_t ihipFree(void *ptr) { if (memory_object != nullptr) { // Wait on the device, associated with the current memory object during allocation auto device_id = memory_object->getUserData().deviceId; - hip::Stream::SyncAllStreams(device_id); + g_devices[device_id]->SyncAllStreams(); // Find out if memory belongs to any memory pool if (!g_devices[device_id]->FreeMemory(memory_object, nullptr)) { @@ -743,7 +743,7 @@ hipError_t ihipArrayDestroy(hipArray_t array) { auto image = as_amd(memObj); // Wait on the device, associated with the current memory object during allocation - hip::Stream::SyncAllStreams(image->getUserData().deviceId); + g_devices[image->getUserData().deviceId]->SyncAllStreams(); image->release(); delete array; @@ -1252,7 +1252,7 @@ hipError_t ihipHostUnregister(void* hostPtr) { if (mem != nullptr) { // Wait on the device, associated with the current memory object during allocation - hip::Stream::SyncAllStreams(mem->getUserData().deviceId); + g_devices[mem->getUserData().deviceId]->SyncAllStreams(); amd::MemObjMap::RemoveMemObj(hostPtr); for (const auto& device: g_devices) { @@ -4304,7 +4304,7 @@ hipError_t ihipMipmappedArrayDestroy(hipMipmappedArray_t mipmapped_array_ptr) { auto image = as_amd(mem_obj); // Wait on the device, associated with the current memory object during allocation - hip::Stream::SyncAllStreams(image->getUserData().deviceId); + g_devices[image->getUserData().deviceId]->SyncAllStreams(); image->release(); delete mipmapped_array_ptr; diff --git a/hipamd/src/hip_stream.cpp b/hipamd/src/hip_stream.cpp index 9d0475bc93..a78fc093d6 100644 --- a/hipamd/src/hip_stream.cpp +++ b/hipamd/src/hip_stream.cpp @@ -25,8 +25,6 @@ #include "hip_prof_api.h" namespace hip { -static amd::Monitor streamSetLock{"Guards global stream set"}; -static std::unordered_set streamSet; // ================================================================================================ Stream::Stream(hip::Device* dev, Priority p, unsigned int f, bool null_stream, @@ -43,8 +41,7 @@ Stream::Stream(hip::Device* dev, Priority p, unsigned int f, bool null_stream, originStream_(false), captureID_(0) { - amd::ScopedLock lock(streamSetLock); - streamSet.insert(this); + device_->AddStream(this); } // ================================================================================================ @@ -76,10 +73,7 @@ bool Stream::Create() { // ================================================================================================ void Stream::Destroy(hip::Stream* stream) { - { - amd::ScopedLock lock(streamSetLock); - streamSet.erase(stream); - } + stream->device_->RemoveStream(stream); stream->release(); } @@ -95,11 +89,12 @@ bool isValid(hipStream_t& stream) { } hip::Stream* s = reinterpret_cast(stream); - amd::ScopedLock lock(streamSetLock); - if (streamSet.find(s) == streamSet.end()) { - return false; + for (auto& device : g_devices) { + if (device->StreamExists(s)) { + return true; + } } - return true; + return false; } // ================================================================================================ @@ -122,53 +117,17 @@ int Stream::DeviceId(const hipStream_t hStream) { } // ================================================================================================ -void Stream::SyncAllStreams(int deviceId, bool cpu_wait) { - // Make a local copy to avoid stalls for GPU finish with multiple threads - std::vector streams; - streams.reserve(streamSet.size()); - { - amd::ScopedLock lock(streamSetLock); - for (auto it : streamSet) { - if (it->DeviceId() == deviceId) { - streams.push_back(it); - it->retain(); - } - } - } - for (auto it : streams) { - it->finish(cpu_wait); - it->release(); - } - // Release freed memory for all memory pools on the device - g_devices[deviceId]->ReleaseFreedMemory(); -} // ================================================================================================ bool Stream::StreamCaptureBlocking() { - amd::ScopedLock lock(streamSetLock); - for (auto& it : streamSet) { - if (it->GetCaptureStatus() == hipStreamCaptureStatusActive && it->Flags() != hipStreamNonBlocking) { + for (auto& device : g_devices) { + if (device->StreamCaptureBlocking()) { return true; } } return false; } -void Stream::destroyAllStreams(int deviceId) { - std::vector toBeDeleted; - { - amd::ScopedLock lock(streamSetLock); - for (auto& it : streamSet) { - if (it->Null() == false && it->DeviceId() == deviceId) { - toBeDeleted.push_back(it); - } - } - } - for (auto& it : toBeDeleted) { - hip::Stream::Destroy(it); - } -} - bool Stream::StreamCaptureOngoing(hipStream_t hStream) { hip::Stream* s = reinterpret_cast(hStream); // Allow capture to be less restrictive one one changes the stream capture interaction @@ -188,80 +147,6 @@ bool Stream::StreamCaptureOngoing(hipStream_t hStream) { } } -bool Stream::existsActiveStreamForDevice(hip::Device* device) { - - amd::ScopedLock lock(streamSetLock); - - for (const auto& active_stream : streamSet) { - if ((active_stream->GetDevice() == device) && - active_stream->GetQueueStatus()) { - return true; - } - } - return false; -} - -// ================================================================================================ -void iHipWaitActiveStreams(hip::Stream* blocking_stream, bool wait_null_stream) { - amd::Command::EventWaitList eventWaitList(0); - bool submitMarker = 0; - - auto waitForStream = [&submitMarker, - &eventWaitList](hip::Stream* stream) { - if (amd::Command *command = stream->getLastQueuedCommand(true)) { - amd::Event &event = command->event(); - // Check HW status of the ROCcrl event. - // Note: not all ROCclr modes support HW status - bool ready = stream->device().IsHwEventReady(event); - if (!ready) { - ready = (command->status() == CL_COMPLETE); - } - submitMarker |= stream->vdev()->isFenceDirty(); - // Check the current active status - if (!ready) { - command->notifyCmdQueue(); - eventWaitList.push_back(command); - } else { - command->release(); - } - } - }; - - if (wait_null_stream) { - if (hip::Stream* null_stream = blocking_stream->GetDevice()->GetNullStream()) { - waitForStream(null_stream); - } - } else { - amd::ScopedLock lock(streamSetLock); - - for (const auto& active_stream : streamSet) { - // If it's the current device - if ((&active_stream->device() == &blocking_stream->device()) && - // Make sure it's a default stream - ((active_stream->Flags() & hipStreamNonBlocking) == 0) && - // and it's not the current stream - (active_stream != blocking_stream)) { - // Get the last valid command - waitForStream(active_stream); - } - } - } - - // Check if we have to wait anything - if (eventWaitList.size() > 0 || submitMarker) { - amd::Command* command = new amd::Marker(*blocking_stream, kMarkerDisableFlush, eventWaitList); - if (command != nullptr) { - command->enqueue(); - command->release(); - } - } - - // Release all active commands. It's safe after the marker was enqueued - for (const auto& it : eventWaitList) { - it->release(); - } -} - // ================================================================================================ void CL_CALLBACK ihipStreamCallback(cl_event event, cl_int command_exec_status, void* user_data) { StreamCallback* cbo = reinterpret_cast(user_data);