From bc65ca64fc5be1cde0d24713fe707297b90c228d Mon Sep 17 00:00:00 2001 From: German Andryeyev Date: Wed, 29 Apr 2020 02:11:37 -0400 Subject: [PATCH] Clean-up the list of blocking streams - Insert the stream into the list on the host queue creation, instead of stream creation Change-Id: Ib25053019f7df97e5bc786922a6587b9514852d3 --- hipamd/vdi/hip_event.cpp | 6 +-- hipamd/vdi/hip_internal.hpp | 6 +-- hipamd/vdi/hip_stream.cpp | 98 ++++++++++++++++++++++--------------- 3 files changed, 63 insertions(+), 47 deletions(-) diff --git a/hipamd/vdi/hip_event.cpp b/hipamd/vdi/hip_event.cpp index 0cd061c1e8..26191ec380 100644 --- a/hipamd/vdi/hip_event.cpp +++ b/hipamd/vdi/hip_event.cpp @@ -217,18 +217,14 @@ hipError_t hipEventRecord(hipEvent_t event, hipStream_t stream) { HIP_RETURN(hipErrorInvalidHandle); } - hip::Event* e = reinterpret_cast(event); - - hip::Stream* s = reinterpret_cast(stream); amd::HostQueue* queue = hip::getQueue(stream); - amd::Command* command = queue->getLastQueuedCommand(true); - if (command == nullptr) { command = new amd::Marker(*queue, false); command->enqueue(); } + hip::Event* e = reinterpret_cast(event); e->addMarker(queue, command); HIP_RETURN(hipSuccess); diff --git a/hipamd/vdi/hip_internal.hpp b/hipamd/vdi/hip_internal.hpp index c0ced2e8a3..3d7c5249a8 100644 --- a/hipamd/vdi/hip_internal.hpp +++ b/hipamd/vdi/hip_internal.hpp @@ -90,10 +90,10 @@ namespace hip { public: Stream(Device* dev, amd::CommandQueue::Priority p, unsigned int f = 0, bool null_stream = false); - bool create(); + bool Create(); amd::HostQueue* asHostQueue(); - void destroy(); - void finish() const; + void Destroy(); + void Finish() const; /// Get device ID associated with the current stream; int DeviceId() const; /// Returns if stream is null stream diff --git a/hipamd/vdi/hip_stream.cpp b/hipamd/vdi/hip_stream.cpp index b2838c0164..fbcd223ed2 100644 --- a/hipamd/vdi/hip_stream.cpp +++ b/hipamd/vdi/hip_stream.cpp @@ -42,51 +42,71 @@ class StreamCallback { namespace hip { +// ================================================================================================ Stream::Stream(hip::Device* dev, amd::CommandQueue::Priority p, unsigned int f, bool null_stream) : queue_(nullptr), lock_("Stream Callback lock"), device_(dev), priority_(p), flags_(f), null_(null_stream) {} -bool Stream::create() { +// ================================================================================================ +bool Stream::Create() { cl_command_queue_properties properties = CL_QUEUE_PROFILING_ENABLE; queue_ = new amd::HostQueue(*device_->asContext(), *device_->devices()[0], properties, amd::CommandQueue::RealTimeDisabled, priority_); - assert(queue_ != nullptr); - return queue_->create(); -} - -amd::HostQueue* Stream::asHostQueue() { - if (queue_ == nullptr) { - if (!create()) { - return nullptr; - } else if (Null()) { - // Make sure the null stream is inserted into the list of default/blocking streams + // Create a host queue + bool result = (queue_ != nullptr) ? queue_->create() : false; + // Insert just created stream into the list of the blocking queues + if (result) { + if (!(flags_ & hipStreamNonBlocking)) { amd::ScopedLock lock(streamSetLock); streamSet.insert(this); } + } else { + Destroy(); + } + return result; +} + +// ================================================================================================ +amd::HostQueue* Stream::asHostQueue() { + // Access to the stream object is lock protected, because possible allocation + amd::ScopedLock l(Lock()); + if (queue_ == nullptr) { + // Create the host queue for the first time + if (!Create()) { + return nullptr; + } } return queue_; } -void Stream::destroy() { +// ================================================================================================ +void Stream::Destroy() { if (queue_ != nullptr) { queue_->release(); queue_ = nullptr; + + amd::ScopedLock lock(streamSetLock); + streamSet.erase(this); } + delete this; } -void Stream::finish() const { +// ================================================================================================ +void Stream::Finish() const { if (queue_ != nullptr) { queue_->finish(); } } +// ================================================================================================ int Stream::DeviceId() const { return device_->deviceId(); } }; +// ================================================================================================ void iHipWaitActiveStreams(amd::HostQueue* blocking_queue, bool wait_null_stream) { amd::Command::EventWaitList eventWaitList; { @@ -95,12 +115,12 @@ void iHipWaitActiveStreams(amd::HostQueue* blocking_queue, bool wait_null_stream for (const auto& stream : streamSet) { amd::HostQueue* active_queue = stream->asHostQueue(); // If it's the current device - if ((active_queue != nullptr) && (&active_queue->device() == &blocking_queue->device()) && + if ((&active_queue->device() == &blocking_queue->device()) && // and it's not the current stream (active_queue != blocking_queue) && // check for a wait on the null stream (stream->Null() == wait_null_stream)) { - // Get the last valid so command + // Get the last valid command amd::Command* command = active_queue->getLastQueuedCommand(true); if ((command != nullptr) && // Check the current active status @@ -126,6 +146,7 @@ void iHipWaitActiveStreams(amd::HostQueue* blocking_queue, bool wait_null_stream } } +// ================================================================================================ void CL_CALLBACK ihipStreamCallback(cl_event event, cl_int command_exec_status, void* user_data) { hipError_t status = hipSuccess; StreamCallback* cbo = reinterpret_cast(user_data); @@ -137,18 +158,15 @@ void CL_CALLBACK ihipStreamCallback(cl_event event, cl_int command_exec_status, delete cbo; } -static hipError_t ihipStreamCreate(hipStream_t *stream, unsigned int flags, amd::CommandQueue::Priority priority) { +// ================================================================================================ +static hipError_t ihipStreamCreate(hipStream_t* stream, + unsigned int flags, amd::CommandQueue::Priority priority) { hip::Stream* hStream = new hip::Stream(hip::getCurrentDevice(), priority, flags); if (hStream == nullptr) { return hipErrorOutOfMemory; } - if (!(flags & hipStreamNonBlocking)) { - amd::ScopedLock lock(streamSetLock); - streamSet.insert(hStream); - } - *stream = reinterpret_cast(hStream); ClPrint(amd::LOG_INFO, amd::LOG_API, "ihipStreamCreate: %zx", hStream); @@ -156,18 +174,21 @@ static hipError_t ihipStreamCreate(hipStream_t *stream, unsigned int flags, amd: return hipSuccess; } +// ================================================================================================ hipError_t hipStreamCreateWithFlags(hipStream_t *stream, unsigned int flags) { HIP_INIT_API(hipStreamCreateWithFlags, stream, flags); HIP_RETURN(ihipStreamCreate(stream, flags, amd::CommandQueue::Priority::Normal)); } +// ================================================================================================ hipError_t hipStreamCreate(hipStream_t *stream) { HIP_INIT_API(hipStreamCreate, stream); HIP_RETURN(ihipStreamCreate(stream, hipStreamDefault, amd::CommandQueue::Priority::Normal)); } +// ================================================================================================ hipError_t hipStreamCreateWithPriority(hipStream_t* stream, unsigned int flags, int priority) { HIP_INIT_API(hipStreamCreateWithPriority, stream, flags, priority); @@ -180,6 +201,7 @@ hipError_t hipStreamCreateWithPriority(hipStream_t* stream, unsigned int flags, return HIP_RETURN(ihipStreamCreate(stream, flags, static_cast(priority))); } +// ================================================================================================ hipError_t hipDeviceGetStreamPriorityRange(int* leastPriority, int* greatestPriority) { HIP_INIT_API(hipDeviceGetStreamPriorityRange, leastPriority, greatestPriority); @@ -193,13 +215,12 @@ hipError_t hipDeviceGetStreamPriorityRange(int* leastPriority, int* greatestPrio return HIP_RETURN(hipSuccess); } +// ================================================================================================ hipError_t hipStreamGetFlags(hipStream_t stream, unsigned int* flags) { HIP_INIT_API(hipStreamGetFlags, stream, flags); - hip::Stream* hStream = reinterpret_cast(stream); - - if (flags != nullptr && hStream != nullptr) { - *flags = hStream->Flags(); + if ((flags != nullptr) && (stream != nullptr)) { + *flags = reinterpret_cast(stream)->Flags(); } else { HIP_RETURN(hipErrorInvalidValue); } @@ -207,15 +228,17 @@ hipError_t hipStreamGetFlags(hipStream_t stream, unsigned int* flags) { HIP_RETURN(hipSuccess); } +// ================================================================================================ hipError_t hipStreamSynchronize(hipStream_t stream) { HIP_INIT_API(hipStreamSynchronize, stream); - amd::HostQueue* hostQueue = hip::getQueue(stream); - hostQueue->finish(); + // Wait for the current host queue + hip::getQueue(stream)->finish(); HIP_RETURN(hipSuccess); } +// ================================================================================================ hipError_t hipStreamDestroy(hipStream_t stream) { HIP_INIT_API(hipStreamDestroy, stream); @@ -223,32 +246,27 @@ hipError_t hipStreamDestroy(hipStream_t stream) { HIP_RETURN(hipErrorInvalidHandle); } - amd::ScopedLock lock(streamSetLock); - - hip::Stream* hStream = reinterpret_cast(stream); - - hStream->destroy(); - streamSet.erase(hStream); - - delete hStream; + reinterpret_cast(stream)->Destroy(); HIP_RETURN(hipSuccess); } +// ================================================================================================ hipError_t hipStreamWaitEvent(hipStream_t stream, hipEvent_t event, unsigned int flags) { HIP_INIT_API(hipStreamWaitEvent, stream, event, flags); - amd::HostQueue* queue = hip::getQueue(stream); - if (event == nullptr) { HIP_RETURN(hipErrorInvalidHandle); } + amd::HostQueue* queue = hip::getQueue(stream); + hip::Event* e = reinterpret_cast(event); - return HIP_RETURN(e->streamWait(queue, flags)); + HIP_RETURN(e->streamWait(queue, flags)); } +// ================================================================================================ hipError_t hipStreamQuery(hipStream_t stream) { HIP_INIT_API(hipStreamQuery, stream); @@ -256,6 +274,7 @@ hipError_t hipStreamQuery(hipStream_t stream) { amd::Command* command = hostQueue->getLastQueuedCommand(true); if (command == nullptr) { + // Nothing was submitted to the queue HIP_RETURN(hipSuccess); } @@ -268,11 +287,12 @@ hipError_t hipStreamQuery(hipStream_t stream) { HIP_RETURN(status); } +// ================================================================================================ hipError_t hipStreamAddCallback(hipStream_t stream, hipStreamCallback_t callback, void* userData, unsigned int flags) { HIP_INIT_API(hipStreamAddCallback, stream, callback, userData, flags); - amd::HostQueue* hostQueue = reinterpret_cast(stream)->asHostQueue(); + amd::HostQueue* hostQueue = hip::getQueue(stream); amd::Command* command = hostQueue->getLastQueuedCommand(true); if (command == nullptr) { amd::Command::EventWaitList eventWaitList;