SWDEV-538606 - Handle updateStreams from multiple threads (#505)

このコミットが含まれているのは:
systems-assistant[bot]
2025-09-10 11:24:52 +05:30
committed by GitHub
コミット 75602772aa
2個のファイルの変更34行の追加15行の削除
+32 -14
ファイルの表示
@@ -64,6 +64,9 @@ std::unordered_set<GraphExec*> GraphExec::graphExecSet_;
// Guards global exec graph set
// A GraphExec object can be part of a child graph, so this lock must be recursive
amd::Monitor GraphExec::graphExecSetLock_(true);
// Serialize the creation of internal streams from multiple threads, so that each stream is
// mapped to a different HSA queue.
amd::Monitor GraphExec::graphExecStreamCreateLock_(true);
std::unordered_set<UserObject*> UserObject::ObjectSet_;
// Guards the global user object set
amd::Monitor UserObject::UserObjectLock_{};
@@ -330,6 +333,7 @@ bool GraphExec::isGraphExecValid(GraphExec* pGraphExec) {
// ================================================================================================
hipError_t GraphExec::CreateStreams(uint32_t num_streams) {
amd::ScopedLock lock(graphExecStreamCreateLock_);
parallel_streams_.reserve(num_streams);
for (uint32_t i = 0; i < num_streams; ++i) {
auto stream = new hip::Stream(hip::getCurrentDevice(), hip::Stream::Priority::Normal,
@@ -490,21 +494,32 @@ hipError_t GraphExec::EnqueueGraphWithSingleList(hip::Stream* hip_stream) {
}
// ================================================================================================
void Graph::UpdateStreams(hip::Stream* launch_stream,
const std::vector<hip::Stream*>& parallel_streams) {
// Allocate array for parallel streams, based on the graph scheduling + current stream
// We create extra stream to avoid collision
streams_.resize(max_streams_);
hipError_t Graph::UpdateStreams(hip::Stream* launch_stream,
const std::vector<hip::Stream*>& parallel_streams) {
// Current stream is the default in the assignment
streams_[0] = launch_stream;
// Assign the streams in the array of all streams
// Avoid stream that has collision with launch stream
for (uint32_t i = 1, j = 0; i < streams_.size(); j++) {
assert(j != parallel_streams.size());
if (launch_stream->getQueueID() != parallel_streams[j]->getQueueID()) {
streams_[i++] = parallel_streams[j];
}
streams_.push_back(launch_stream);
int* unique_stream_ids = new int[GPU_MAX_HW_QUEUES]();
if (unique_stream_ids == nullptr) {
LogError("Stream id array allocation is nullptr!");
return hipErrorOutOfMemory;
}
unique_stream_ids[launch_stream->getQueueID()] = 1;
std::vector<hip::Stream*> collided_streams;
// Assign streams that are unique in parallel_streams and doesnt collide with launch stream
for (uint32_t i = 0; i < parallel_streams.size(); i++) {
if (unique_stream_ids[parallel_streams[i]->getQueueID()] == 0) {
streams_.push_back(parallel_streams[i]);
} else {
collided_streams.push_back(parallel_streams[i]);
}
unique_stream_ids[parallel_streams[i]->getQueueID()]++;
}
// Assign the remaining streams for execution.
for (int i = streams_.size(), j = 0; i < max_streams_ && j < collided_streams.size(); i++, j++) {
streams_.push_back(collided_streams[j]);
}
delete[] unique_stream_ids;
return hipSuccess;
}
@@ -728,7 +743,10 @@ hipError_t GraphExec::Run(hip::Stream* launch_stream) {
}
} else {
// Update streams for the graph execution
UpdateStreams(launch_stream, parallel_streams_);
status = UpdateStreams(launch_stream, parallel_streams_);
if (status != hipSuccess) {
return status;
}
// Execute all nodes in the graph
if (!RunNodes()) {
LogError("Failed to launch nodes!");
+2 -1
ファイルの表示
@@ -601,7 +601,7 @@ class Graph {
void ScheduleNodes();
//! Update streams for the graph execution
void UpdateStreams(
hipError_t UpdateStreams(
hip::Stream* launch_stream, //!< Launch stream from the application
const std::vector<hip::Stream*>& parallel_stream //!< The list of parallel streams
);
@@ -735,6 +735,7 @@ class GraphExec : public amd::ReferenceCountedObject, public Graph {
public:
static std::unordered_set<GraphExec*> graphExecSet_;
static amd::Monitor graphExecSetLock_;
static amd::Monitor graphExecStreamCreateLock_;
GraphExec(uint64_t flags = 0)
: ReferenceCountedObject(), Graph(hip::getCurrentDevice()), flags_(flags) {
amd::ScopedLock lock(graphExecSetLock_);