diff --git a/projects/clr/hipamd/src/hip_graph_internal.cpp b/projects/clr/hipamd/src/hip_graph_internal.cpp index e55f41c645..c64523b3dc 100644 --- a/projects/clr/hipamd/src/hip_graph_internal.cpp +++ b/projects/clr/hipamd/src/hip_graph_internal.cpp @@ -64,6 +64,9 @@ std::unordered_set GraphExec::graphExecSet_; // Guards global exec graph set // we have graphExec object as part of child graph and we need recursive lock amd::Monitor GraphExec::graphExecSetLock_(true); +// Serialize the creation of internal streams from multiple threads, ensuring that each stream is +// mapped to different HSA queues. +amd::Monitor GraphExec::graphExecStreamCreateLock_(true); std::unordered_set UserObject::ObjectSet_; // Guards global user object amd::Monitor UserObject::UserObjectLock_{}; @@ -330,6 +333,7 @@ bool GraphExec::isGraphExecValid(GraphExec* pGraphExec) { // ================================================================================================ hipError_t GraphExec::CreateStreams(uint32_t num_streams) { + amd::ScopedLock lock(graphExecStreamCreateLock_); parallel_streams_.reserve(num_streams); for (uint32_t i = 0; i < num_streams; ++i) { auto stream = new hip::Stream(hip::getCurrentDevice(), hip::Stream::Priority::Normal, @@ -490,21 +494,32 @@ hipError_t GraphExec::EnqueueGraphWithSingleList(hip::Stream* hip_stream) { } // ================================================================================================ -void Graph::UpdateStreams(hip::Stream* launch_stream, - const std::vector& parallel_streams) { - // Allocate array for parallel streams, based on the graph scheduling + current stream - // We create extra stream to avoid collision - streams_.resize(max_streams_); +hipError_t Graph::UpdateStreams(hip::Stream* launch_stream, + const std::vector& parallel_streams) { // Current stream is the default in the assignment - streams_[0] = launch_stream; - // Assign the streams in the array of all streams - // Avoid stream that has collision with launch stream - for (uint32_t i = 1, j = 0; i < streams_.size(); j++) { - assert(j != parallel_streams.size()); - if (launch_stream->getQueueID() != parallel_streams[j]->getQueueID()) { - streams_[i++] = parallel_streams[j]; - } + streams_.push_back(launch_stream); + int* unique_stream_ids = new int[GPU_MAX_HW_QUEUES](); + if (unique_stream_ids == nullptr) { + LogError("Stream id array allocation is nullptr!"); + return hipErrorOutOfMemory; } + unique_stream_ids[launch_stream->getQueueID()] = 1; + std::vector collided_streams; + // Assign streams that are unique in parallel_streams and doesnt collide with launch stream + for (uint32_t i = 0; i < parallel_streams.size(); i++) { + if (unique_stream_ids[parallel_streams[i]->getQueueID()] == 0) { + streams_.push_back(parallel_streams[i]); + } else { + collided_streams.push_back(parallel_streams[i]); + } + unique_stream_ids[parallel_streams[i]->getQueueID()]++; + } + // Assign the remaining streams for execution. + for (int i = streams_.size(), j = 0; i < max_streams_ && j < collided_streams.size(); i++, j++) { + streams_.push_back(collided_streams[j]); + } + delete[] unique_stream_ids; + return hipSuccess; } @@ -728,7 +743,10 @@ hipError_t GraphExec::Run(hip::Stream* launch_stream) { } } else { // Update streams for the graph execution - UpdateStreams(launch_stream, parallel_streams_); + status = UpdateStreams(launch_stream, parallel_streams_); + if (status != hipSuccess) { + return status; + } // Execute all nodes in the graph if (!RunNodes()) { LogError("Failed to launch nodes!"); diff --git a/projects/clr/hipamd/src/hip_graph_internal.hpp b/projects/clr/hipamd/src/hip_graph_internal.hpp index 2b6fafe699..2b5d2fee33 100644 --- a/projects/clr/hipamd/src/hip_graph_internal.hpp +++ b/projects/clr/hipamd/src/hip_graph_internal.hpp @@ -601,7 +601,7 @@ class Graph { void ScheduleNodes(); //! Update streams for the graph execution - void UpdateStreams( + hipError_t UpdateStreams( hip::Stream* launch_stream, //!< Launch stream from the application const std::vector& parallel_stream //!< The list of parallel streams ); @@ -735,6 +735,7 @@ class GraphExec : public amd::ReferenceCountedObject, public Graph { public: static std::unordered_set graphExecSet_; static amd::Monitor graphExecSetLock_; + static amd::Monitor graphExecStreamCreateLock_; GraphExec(uint64_t flags = 0) : ReferenceCountedObject(), Graph(hip::getCurrentDevice()), flags_(flags) { amd::ScopedLock lock(graphExecSetLock_);