From d3cc2c7668637cd370339aada02e0f982ca6ab92 Mon Sep 17 00:00:00 2001 From: "Godavarthy Surya, Anusha" Date: Fri, 10 Oct 2025 10:27:27 +0530 Subject: [PATCH] SWDEV-524745 - Part-III Add multi device support for hip graph (#814) - Retrieve the list of devices linked to each branch using stream ID x. - Identify the necessary streams for each device to facilitate graph execution. - Create the necessary streams for each device to ensure successful graph execution. - Implement support for launching a multi-device, single-branch graph. Co-authored-by: Anusha GodavarthySurya --- .../clr/hipamd/src/hip_graph_internal.cpp | 341 +++++++++++++----- .../clr/hipamd/src/hip_graph_internal.hpp | 42 ++- .../clr/rocclr/device/rocm/rocvirtual.cpp | 61 ++-- .../catch/unit/graph/hipGraphMultiDevice.cc | 145 ++++++++ 4 files changed, 455 insertions(+), 134 deletions(-) create mode 100644 projects/hip-tests/catch/unit/graph/hipGraphMultiDevice.cc diff --git a/projects/clr/hipamd/src/hip_graph_internal.cpp b/projects/clr/hipamd/src/hip_graph_internal.cpp index 7363f73f58..19e3a50dd5 100644 --- a/projects/clr/hipamd/src/hip_graph_internal.cpp +++ b/projects/clr/hipamd/src/hip_graph_internal.cpp @@ -189,7 +189,8 @@ void Graph::ScheduleOneNode(Node node, int stream_id) { // Assign active stream to the current node node->stream_id_ = stream_id; max_streams_ = std::max(max_streams_, (stream_id + 1)); - + // Track which devices are used by each stream for multi-device graph execution + streams_dev_ids_[stream_id].insert(node->dev_id_); // Process child graph separately, since, there is no connection if (node->GetType() == hipGraphNodeTypeGraph) { auto child = reinterpret_cast(node)->GetChildGraph(); @@ -317,7 +318,7 @@ void Graph::clone(Graph* newGraph, bool cloneNodes) const { // ================================================================================================ Graph* Graph::clone() const { - Graph* newGraph = new Graph(device_); + Graph* newGraph = new Graph(getCurrentDevice()); clone(newGraph); return newGraph; } @@ -332,39 +333,115 @@ bool GraphExec::isGraphExecValid(GraphExec* pGraphExec) { } // ================================================================================================ -hipError_t GraphExec::CreateStreams(uint32_t num_streams) { +hipError_t GraphExec::CreateStreams(uint32_t num_streams, int devId) { amd::ScopedLock lock(graphExecStreamCreateLock_); - parallel_streams_.reserve(num_streams); + + // Validate input parameters + if (num_streams == 0) { + ClPrint(amd::LOG_WARNING, amd::LOG_CODE, + "[hipGraph] Attempting to create 0 streams for device %d", devId); + return hipSuccess; + } + + if (devId < 0 || devId >= g_devices.size() || g_devices[devId] == nullptr) { + ClPrint(amd::LOG_ERROR, amd::LOG_CODE, "[hipGraph] Invalid device ID %d for stream creation", + devId); + return hipErrorInvalidDevice; + } + + // Check if streams already exist for this device + if (parallel_streams_.find(devId) != parallel_streams_.end() && + !parallel_streams_[devId].empty()) { + ClPrint(amd::LOG_WARNING, amd::LOG_CODE, + "[hipGraph] Streams already exist for device %d, skipping creation", devId); + return hipSuccess; + } + + parallel_streams_[devId].reserve(num_streams); + + ClPrint(amd::LOG_INFO, amd::LOG_CODE, "[hipGraph] Creating %u parallel streams for device %d", + num_streams, devId); + for (uint32_t i = 0; i < num_streams; ++i) { - auto stream = new hip::Stream(hip::getCurrentDevice(), hip::Stream::Priority::Normal, - hipStreamNonBlocking); + auto stream = + new hip::Stream(g_devices[devId], hip::Stream::Priority::Normal, hipStreamNonBlocking); + if (stream == nullptr || !stream->Create()) { + ClPrint(amd::LOG_ERROR, amd::LOG_CODE, "[hipGraph] Failed to %s stream %u for device %d", + stream == nullptr ? "allocate" : "create", i, devId); if (stream != nullptr) { hip::Stream::Destroy(stream); } - ClPrint(amd::LOG_ERROR, amd::LOG_CODE, "[hipGraph] Failed to create parallel stream!"); + // Clean up any previously created streams for this device + for (auto& created_stream : parallel_streams_[devId]) { + hip::Stream::Destroy(created_stream); + } + parallel_streams_[devId].clear(); return hipErrorOutOfMemory; } - parallel_streams_.push_back(stream); + + parallel_streams_[devId].push_back(stream); } return hipSuccess; } +void GraphExec::FindStreamsReqPerDev() { + // Count streams required per device based on stream-to-device mappings + for (auto const& [stream_id, dev_ids] : streams_dev_ids_) { + for (auto dev_id : dev_ids) { + max_streams_dev_[dev_id]++; + } + } + + // Recursively process child graphs to determine their stream requirements + for (auto node : vertices_) { + if (node->GetType() == hipGraphNodeTypeGraph) { + auto childNode = reinterpret_cast(node); + + // Recursively find stream requirements for child graph + childNode->FindStreamsReqPerDev(); + + // Merge child graph's stream requirements with parent graph + // Take the maximum streams needed per device to handle concurrent execution + for (auto const& [dev_id, num_streams] : childNode->max_streams_dev_) { + auto it = max_streams_dev_.find(dev_id); + if (it != max_streams_dev_.end()) { + // Device already has stream requirements - take the maximum + max_streams_dev_[dev_id] = std::max(max_streams_dev_[dev_id], num_streams); + } else { + // New device - initialize with child graph's requirement + max_streams_dev_[dev_id] = num_streams; + } + } + } + } +} + // ================================================================================================ hipError_t GraphExec::Init() { hipError_t status = hipSuccess; // create extra stream to avoid queue collision with the default execution stream - if (max_streams_ > 1) { - status = CreateStreams(max_streams_); - } - if (status != hipSuccess) { - return status; - } - if (DEBUG_CLR_GRAPH_PACKET_CAPTURE) { - if (max_streams_ == 1) { + + if (max_streams_ == 1) { + FindStreamsReqPerDev(); + if (max_streams_dev_.size() > 1) { + // Multi-device graph detected - create parallel streams for each device + for (auto const& [dev_id, num_streams] : max_streams_dev_) { + ClPrint(amd::LOG_INFO, amd::LOG_API, + "[hipGraph] For device id :%d max streams :%d for execution.\n", dev_id, + num_streams); + status = CreateStreams(num_streams, dev_id); + if (status != hipSuccess) { + return status; + } + } + } + if (DEBUG_CLR_GRAPH_PACKET_CAPTURE) { // For graph nodes capture AQL packets to dispatch them directly during graph launch. status = CaptureAQLPackets(); } + } else { + status = CreateStreams(max_streams_, hip::getCurrentDevice()->deviceId()); } instantiateDeviceId_ = hip::getCurrentDevice()->deviceId(); static_cast(hip::getCurrentDevice())->retain(); @@ -438,8 +515,8 @@ hipError_t GraphExec::CaptureAndFormPacketsForGraph() { for (size_t i = 0; i < topoOrder_.size(); ++i) { auto& node = topoOrder_[i]; + // Check if kernel node requires hidden heap and set it for the entire graph if (node->GetType() == hipGraphNodeTypeKernel) { - // Check if graph requires hidden heap and set as part of graphExec param. static bool initialized = false; if (!initialized && reinterpret_cast(node)->HasHiddenHeap()) { SetHiddenHeap(); @@ -447,54 +524,66 @@ hipError_t GraphExec::CaptureAndFormPacketsForGraph() { } } + // Handle nodes that support graph capture if (node->GraphCaptureEnabled()) { - // Start of a new batch - PacketBatch newBatch; - size_t j = i; + // TODO: Add support for batching for multi-device linear graph + if (max_streams_dev_.size() == 1) { + // Single device - use batching optimization + // Start of a new batch + PacketBatch newBatch; + size_t j = i; - // Collect packets from consecutive captured nodes - while (j < topoOrder_.size() && topoOrder_[j]->GraphCaptureEnabled()) { - auto& currentNode = topoOrder_[j]; + // Collect packets from consecutive captured nodes + while (j < topoOrder_.size() && topoOrder_[j]->GraphCaptureEnabled()) { + auto& currentNode = topoOrder_[j]; - // Capture packets for this node - std::vector nodePackets; - std::vector nodeKernelNames; - status = currentNode->CaptureAndFormPacket(GetKernelArgManager(), &nodePackets, - &nodeKernelNames); + // Capture packets for this node + std::vector nodePackets; + std::vector nodeKernelNames; + status = currentNode->CaptureAndFormPacket(GetKernelArgManager(), &nodePackets, + &nodeKernelNames); - if (status != hipSuccess || nodePackets.empty()) { - LogError("Packet capture failed"); - return status; + if (status != hipSuccess || nodePackets.empty()) { + LogError("Packet capture failed"); + return status; + } + + // Create NodeRange for this node + PacketBatch::NodeRange range; + range.startIndex = newBatch.dispatchPackets.size(); + range.packetCount = nodePackets.size(); + range.enabled = true; + + // Add to dispatch lists (initially all enabled) + newBatch.dispatchPackets.insert(newBatch.dispatchPackets.end(), nodePackets.begin(), + nodePackets.end()); + newBatch.dispatchKernelNames.insert(newBatch.dispatchKernelNames.end(), + nodeKernelNames.begin(), nodeKernelNames.end()); + + // Store node mapping + newBatch.nodeRanges.push_back(range); + newBatch.nodeToRangeIndex[currentNode] = newBatch.nodeRanges.size() - 1; + + // Mark this node as successfully captured + nodeCaptureStatus_[j] = true; + ++j; } - // Create NodeRange for this node - PacketBatch::NodeRange range; - range.startIndex = newBatch.dispatchPackets.size(); - range.packetCount = nodePackets.size(); - range.enabled = true; + // Add the batch if it has packets + if (!newBatch.dispatchPackets.empty()) { + packetBatches_.emplace_back(std::move(newBatch)); + } - // Add to dispatch lists (initially all enabled) - newBatch.dispatchPackets.insert(newBatch.dispatchPackets.end(), - nodePackets.begin(), nodePackets.end()); - newBatch.dispatchKernelNames.insert(newBatch.dispatchKernelNames.end(), - nodeKernelNames.begin(), nodeKernelNames.end()); - - // Store node mapping - newBatch.nodeRanges.push_back(range); - newBatch.nodeToRangeIndex[currentNode] = newBatch.nodeRanges.size() - 1; - - // Mark this node as successfully captured - nodeCaptureStatus_[j] = true; - ++j; + // Skip the nodes we just processed, the index will be incremented by the loop + i = j - 1; + } else { + // Multi-device - capture individual packets without batching + status = node->CaptureAndFormPacket(GetKernelArgManager()); + if (status != hipSuccess) { + LogError("Individual packet capture failed for multi-device node"); + return status; + } } - - // Add the batch if it has packets - if (!newBatch.dispatchPackets.empty()) { - packetBatches_.emplace_back(std::move(newBatch)); - } - - // Skip the nodes we just processed, the index will be incremented by the loop - i = j - 1; } else if (node->GetType() == hipGraphNodeTypeGraph) { auto childNode = reinterpret_cast(node); if (childNode->GetChildGraph()->max_streams_ == 1) { @@ -502,7 +591,8 @@ hipError_t GraphExec::CaptureAndFormPacketsForGraph() { status = childNode->CaptureAndFormPacketsForGraph(); nodeCaptureStatus_[i] = (status == hipSuccess); if (status != hipSuccess) { - status = hipSuccess; // Continue with other nodes + LogWarning("Child graph packet capture failed continuing with other nodes"); + status = hipSuccess; // Continue processing other nodes } } } @@ -555,38 +645,43 @@ hipError_t GraphExec::UpdateAQLPacket(hip::GraphNode* node) { if (max_streams_ != 1 || !node->GraphCaptureEnabled()) { return hipSuccess; } + //ToDo: Add batching support for multi-device linear graph + if (max_streams_dev_.size() == 1) { + // Find which batch contains this node and update it + for (auto& batch : packetBatches_) { + auto it = batch.nodeToRangeIndex.find(node); + if (it != batch.nodeToRangeIndex.end()) { + // Found the batch containing this node - update packets + PacketBatch::NodeRange& range = batch.nodeRanges[it->second]; - // Find which batch contains this node and update it - for (auto& batch : packetBatches_) { - auto it = batch.nodeToRangeIndex.find(node); - if (it != batch.nodeToRangeIndex.end()) { - // Found the batch containing this node - update packets - PacketBatch::NodeRange& range = batch.nodeRanges[it->second]; - - // Capture new packets for this node - std::vector newPackets; - std::vector newKernelNames; - hipError_t status = node->CaptureAndFormPacket(kernArgManager_, &newPackets, &newKernelNames); - if (status != hipSuccess) { - return status; + // Capture new packets for this node + std::vector newPackets; + std::vector newKernelNames; + hipError_t status = + node->CaptureAndFormPacket(kernArgManager_, &newPackets, &newKernelNames); + if (status != hipSuccess) { + return status; + } + // Update dispatch packets (always update regardless of enabled state) + // The enabled/disabled check happens during dispatch, not here + for (size_t i = 0; i < range.packetCount && i < newPackets.size(); ++i) { + size_t packetIndex = range.startIndex + i; + batch.dispatchPackets[packetIndex] = newPackets[i]; + batch.dispatchKernelNames[packetIndex] = newKernelNames[i]; + } + return hipSuccess; } - // Update dispatch packets (always update regardless of enabled state) - // The enabled/disabled check happens during dispatch, not here - for (size_t i = 0; i < range.packetCount && i < newPackets.size(); ++i) { - size_t packetIndex = range.startIndex + i; - batch.dispatchPackets[packetIndex] = newPackets[i]; - batch.dispatchKernelNames[packetIndex] = newKernelNames[i]; - } - return hipSuccess; } + } else { + return node->CaptureAndFormPacket(kernArgManager_); } return hipSuccess; // Node not in any batch } // ================================================================================================ hipError_t GraphExec::UpdatePacketBatchesForNodeEnableDisable(hip::GraphNode* node, bool isEnabled) { - if (max_streams_ != 1 || !node->GraphCaptureEnabled()) { - // Only handle single stream case with captured nodes + if (max_streams_ != 1 && max_streams_dev_.size() == 1 && !node->GraphCaptureEnabled()) { + // Only handle single stream and single device case with captured nodes return hipSuccess; } // Find which batch contains this node and update its enabled state @@ -687,10 +782,77 @@ hipError_t GraphExec::EnqueueGraphWithSingleList(hip::Stream* hip_stream) { } return status; } +// ================================================================================================ +hipError_t GraphExec::EnqueueMultiDeviceLinearGraph(hip::Stream* launch_stream) { + // Accumulate command tracks all the AQL packet batch that we submit to the HW. For now we track + // only kernel nodes. + amd::AccumulateCommand* accumulate = nullptr; + hipError_t status = hipSuccess; + if (DEBUG_CLR_GRAPH_PACKET_CAPTURE) { + accumulate = new amd::AccumulateCommand(*launch_stream, {}, nullptr); + } + + auto createMarkerAndWait = [](hip::Stream* fromStream, hip::Stream* toStream) { + amd::Command::EventWaitList wait_list; + auto marker = new amd::Marker(*fromStream, true, wait_list); + marker->enqueue(); + marker->release(); + wait_list.push_back(marker); + auto wait_marker = new amd::Marker(*toStream, true, wait_list); + wait_marker->enqueue(); + wait_marker->release(); + }; + + hip::Stream* prevStream = launch_stream; + size_t batchIndex = 0; + + for (size_t i = 0; i < topoOrder_.size(); ++i) { + auto& node = topoOrder_[i]; + hip::Stream* currStream = parallel_streams_[node->dev_id_][0]; + + // Insert synchronization marker if switching devices + if (prevStream->DeviceId() != currStream->DeviceId()) { + createMarkerAndWait(prevStream, currStream); + } + // ToDo : Add batching for multi device graph launch + if (topoOrder_[i]->GraphCaptureEnabled()) { + if (topoOrder_[i]->GetEnabled()) { + std::vector& gpuPackets = topoOrder_[i]->GetAqlPackets(); + std::vector kernelNames; + for (auto& packet : gpuPackets) { + kernelNames.push_back(topoOrder_[i]->GetKernelName()); + } + currStream->vdev()->dispatchAqlPacketBatch(gpuPackets, kernelNames, accumulate); + } + } else { + topoOrder_[i]->SetStream(currStream); + status = topoOrder_[i]->CreateCommand(topoOrder_[i]->GetQueue()); + topoOrder_[i]->EnqueueCommands(currStream); + } + prevStream = currStream; + } + + // Synchronize back to launch stream if we ended on a different device + if (prevStream != launch_stream) { + createMarkerAndWait(prevStream, launch_stream); + } + + if (DEBUG_CLR_GRAPH_PACKET_CAPTURE) { + accumulate->enqueue(); + accumulate->release(); + } + + return status; +} // ================================================================================================ -hipError_t Graph::UpdateStreams(hip::Stream* launch_stream, - const std::vector& parallel_streams) { +void GraphExec::UpdateStreams(hip::Stream* launch_stream) { + int devId = launch_stream->vdev()->device().index(); + if (parallel_streams_.find(devId) == parallel_streams_.end()) { + LogPrintfError("UpdateStreams failed for device id:%d", devId); + return; + } + auto parallel_streams = parallel_streams_[devId]; // Current stream is the default in the assignment streams_.push_back(launch_stream); std::unordered_map unique_stream_ids; @@ -710,7 +872,6 @@ hipError_t Graph::UpdateStreams(hip::Stream* launch_stream, for (int i = streams_.size(), j = 0; i < max_streams_ && j < collided_streams.size(); i++, j++) { streams_.push_back(collided_streams[j]); } - return hipSuccess; } @@ -886,7 +1047,6 @@ bool Graph::RunNodes(int32_t base_stream, const std::vector* paral // ================================================================================================ hipError_t GraphExec::Run(hip::Stream* launch_stream) { hipError_t status = hipSuccess; - if (flags_ & hipGraphInstantiateFlagAutoFreeOnLaunch) { if (!topoOrder_.empty()) { topoOrder_[0]->GetParentGraph()->FreeAllMemory(launch_stream); @@ -908,13 +1068,13 @@ hipError_t GraphExec::Run(hip::Stream* launch_stream) { } else { repeatLaunch_ = true; } - ClPrint(amd::LOG_DEBUG, amd::LOG_CODE, "GraphExec::Run max_streams: %d, " "on device: %d, total number of nodes: %d", max_streams_, launch_stream->DeviceId(), topoOrder_.size()); - if (max_streams_ == 1 && instantiateDeviceId_ == launch_stream->DeviceId()) { + if (max_streams_ == 1 && max_streams_dev_.size() == 1 && + max_streams_dev_.begin()->first == launch_stream->DeviceId()) { if (DEBUG_CLR_GRAPH_PACKET_CAPTURE) { // If the graph has kernels that does device side allocation, during packet capture, heap is // allocated because heap pointer has to be added to the AQL packet, and initialized during @@ -926,6 +1086,8 @@ hipError_t GraphExec::Run(hip::Stream* launch_stream) { } } status = EnqueueGraphWithSingleList(launch_stream); + } else if (max_streams_ == 1 && max_streams_dev_.size() > 1) { + status = EnqueueMultiDeviceLinearGraph(launch_stream); } else if (max_streams_ == 1 && instantiateDeviceId_ != launch_stream->DeviceId()) { for (int i = 0; i < topoOrder_.size(); i++) { topoOrder_[i]->SetStream(launch_stream); @@ -934,10 +1096,7 @@ hipError_t GraphExec::Run(hip::Stream* launch_stream) { } } else { // Update streams for the graph execution - status = UpdateStreams(launch_stream, parallel_streams_); - if (status != hipSuccess) { - return status; - } + UpdateStreams(launch_stream); // Execute all nodes in the graph if (!RunNodes()) { LogError("Failed to launch nodes!"); diff --git a/projects/clr/hipamd/src/hip_graph_internal.hpp b/projects/clr/hipamd/src/hip_graph_internal.hpp index 309802da3f..2f1c3e8487 100644 --- a/projects/clr/hipamd/src/hip_graph_internal.hpp +++ b/projects/clr/hipamd/src/hip_graph_internal.hpp @@ -556,6 +556,7 @@ class Graph { roots_.resize(DEBUG_HIP_FORCE_GRAPH_QUEUES); leafs_.resize(DEBUG_HIP_FORCE_GRAPH_QUEUES); wait_order_.resize(DEBUG_HIP_FORCE_GRAPH_QUEUES); + streams_dev_.reserve(g_devices.size()); } void RemoveUserObjectFromOwingGraphs(UserObject* uObj) { for (auto& g : uObj->owning_graphs_) { @@ -673,12 +674,6 @@ class Graph { //! Schedules all nodes in the graph into different streams void ScheduleNodes(); - //! Update streams for the graph execution - hipError_t UpdateStreams( - hip::Stream* launch_stream, //!< Launch stream from the application - const std::vector& parallel_stream //!< The list of parallel streams - ); - //! Runs one node on the assigned stream bool RunOneNode(Node node, //!< Node for the execution on GPU bool wait //!< Wait dependencies @@ -780,6 +775,10 @@ class Graph { protected: int max_streams_ = 0; //!< Maximum number of streams used in the graph launch + //!< Maps stream ID to the set of device IDs that use that stream. + //!< Used to track which devices are accessed by each parallel stream + //!< during multi-device graph execution scheduling. + std::unordered_map> streams_dev_ids_; private: friend class GraphExec; @@ -802,6 +801,13 @@ class Graph { std::unordered_set capturedNodes_; bool graphInstantiated_; std::unordered_map clonedNodes_; + //! Map of device ID to vector of streams allocated for that device during graph execution. + //! Each device may require multiple streams to handle parallel execution of graph nodes. + std::unordered_map> streams_dev_; + + //! Map tracking the maximum number of concurrent streams required per device for graph execution. + //! Key: device ID, Value: maximum number of streams needed for that device + std::unordered_map max_streams_dev_; }; class GraphExec : public amd::ReferenceCountedObject, public Graph { @@ -816,13 +822,16 @@ class GraphExec : public amd::ReferenceCountedObject, public Graph { } ~GraphExec() { - for (auto stream : parallel_streams_) { - if (stream != nullptr) { + for (auto streams : parallel_streams_) { + for (auto stream : streams.second) { + if (stream != nullptr) { stream->finish(); - constexpr bool kForceDestroy = true; - hip::Stream::Destroy(stream, kForceDestroy); + constexpr bool kForceDestroy = true; + hip::Stream::Destroy(stream, kForceDestroy); + } } } + parallel_streams_.clear(); if (DEBUG_CLR_GRAPH_PACKET_CAPTURE) { if (kernArgManager_ != nullptr) { kernArgManager_->release(); @@ -856,7 +865,7 @@ class GraphExec : public amd::ReferenceCountedObject, public Graph { std::vector& GetNodes() { return topoOrder_; } uint64_t GetFlags() const { return flags_; } hipError_t Init(); - hipError_t CreateStreams(uint32_t num_streams); + hipError_t CreateStreams(uint32_t num_streams, int devId = 0); hipError_t Run(hip::Stream* stream); // Capture GPU Packets from graph commands hipError_t CaptureAQLPackets(); @@ -874,12 +883,21 @@ class GraphExec : public amd::ReferenceCountedObject, public Graph { hipError_t CaptureAndFormPacketsForGraph(); void GetKernelArgSizeForGraph(std::unordered_map& kernArgSizeForGraph); hipError_t EnqueueGraphWithSingleList(hip::Stream* hip_stream); + //! Enqueue a multi-device linear graph for execution + hipError_t EnqueueMultiDeviceLinearGraph(hip::Stream* hip_stream); bool TopologicalOrder() { return Graph::TopologicalOrder(topoOrder_); } + //! Update streams for the graph execution with launch stream from application + void UpdateStreams(hip::Stream* launch_stream); + //! Find the number of streams required per device for multi-device graph execution + //! This method analyzes the stream-to-device mappings and recursively processes + //! child graphs to determine the maximum concurrent streams needed per device + void FindStreamsReqPerDev(); protected: //! Topological order of the graph doesn't include nodes embedded as part of the child graph std::vector topoOrder_; - std::vector parallel_streams_; + //! parallel streams per device + std::unordered_map> parallel_streams_; uint64_t flags_ = 0; GraphKernelArgManager* kernArgManager_ = nullptr; //!< Kernel Arg manager for graph. int instantiateDeviceId_ = -1; diff --git a/projects/clr/rocclr/device/rocm/rocvirtual.cpp b/projects/clr/rocclr/device/rocm/rocvirtual.cpp index 88a33cbdfe..71e2cf7f9a 100644 --- a/projects/clr/rocclr/device/rocm/rocvirtual.cpp +++ b/projects/clr/rocclr/device/rocm/rocvirtual.cpp @@ -1295,38 +1295,37 @@ bool VirtualGPU::dispatchGenericAqlPacketBatch(const std::vector& pa uint8_t packetType = extractAqlBits(header, HSA_PACKET_HEADER_TYPE, HSA_PACKET_HEADER_WIDTH_TYPE); if (packetType == HSA_PACKET_TYPE_KERNEL_DISPATCH) { - ClPrint(amd::LOG_DETAIL_DEBUG, amd::LOG_AQL, "Graph shader name : %s", - (*kernelNames)[packetIndex].c_str()); + ClPrint(amd::LOG_DETAIL_DEBUG, amd::LOG_AQL, "Graph shader name : %s, device id : %u", + (*kernelNames)[packetIndex].c_str(), dev().index()); - ClPrint(amd::LOG_DETAIL_DEBUG, amd::LOG_AQL, - "SWq=0x%zx, HWq=0x%zx, id=%d, Dispatch Header = " - "0x%x (type=%d, barrier=%d, acquire=%d, release=%d), " - "setup=%d, grid=[%u, %u, %u], workgroup=[%u, %u, %u], " - "private_seg_size=%u, group_seg_size=%u, kernel_obj=0x%zx, " - "kernarg_address=0x%zx, completion_signal=0x%zx, correlation_id=%zu, " - "rptr=%u, wptr=%u", - gpu_queue_, gpu_queue_->base_address, gpu_queue_->id, header, packetType, - extractAqlBits(header, HSA_PACKET_HEADER_BARRIER, - HSA_PACKET_HEADER_WIDTH_BARRIER), - extractAqlBits(header, HSA_PACKET_HEADER_SCACQUIRE_FENCE_SCOPE, - HSA_PACKET_HEADER_WIDTH_SCACQUIRE_FENCE_SCOPE), - extractAqlBits(header, HSA_PACKET_HEADER_SCRELEASE_FENCE_SCOPE, - HSA_PACKET_HEADER_WIDTH_SCRELEASE_FENCE_SCOPE), - packet->setup, - reinterpret_cast(packet)->grid_size_x, - reinterpret_cast(packet)->grid_size_y, - reinterpret_cast(packet)->grid_size_z, - reinterpret_cast(packet)->workgroup_size_x, - reinterpret_cast(packet)->workgroup_size_y, - reinterpret_cast(packet)->workgroup_size_z, - reinterpret_cast(packet)->private_segment_size, - reinterpret_cast(packet)->group_segment_size, - reinterpret_cast(packet)->kernel_object, - reinterpret_cast(packet)->kernarg_address, - reinterpret_cast(packet)->completion_signal, - reinterpret_cast(packet)->reserved2, - Hsa::queue_load_read_index_scacquire(gpu_queue_), index); - } + ClPrint( + amd::LOG_DETAIL_DEBUG, amd::LOG_AQL, + "SWq=0x%zx, HWq=0x%zx, id=%d, Dispatch Header = " + "0x%x (type=%d, barrier=%d, acquire=%d, release=%d), " + "setup=%d, grid=[%u, %u, %u], workgroup=[%u, %u, %u], " + "private_seg_size=%u, group_seg_size=%u, kernel_obj=0x%zx, " + "kernarg_address=0x%zx, completion_signal=0x%zx, correlation_id=%zu, " + "rptr=%u, wptr=%u", + gpu_queue_, gpu_queue_->base_address, gpu_queue_->id, header, packetType, + extractAqlBits(header, HSA_PACKET_HEADER_BARRIER, HSA_PACKET_HEADER_WIDTH_BARRIER), + extractAqlBits(header, HSA_PACKET_HEADER_SCACQUIRE_FENCE_SCOPE, + HSA_PACKET_HEADER_WIDTH_SCACQUIRE_FENCE_SCOPE), + extractAqlBits(header, HSA_PACKET_HEADER_SCRELEASE_FENCE_SCOPE, + HSA_PACKET_HEADER_WIDTH_SCRELEASE_FENCE_SCOPE), + packet->setup, reinterpret_cast(packet)->grid_size_x, + reinterpret_cast(packet)->grid_size_y, + reinterpret_cast(packet)->grid_size_z, + reinterpret_cast(packet)->workgroup_size_x, + reinterpret_cast(packet)->workgroup_size_y, + reinterpret_cast(packet)->workgroup_size_z, + reinterpret_cast(packet)->private_segment_size, + reinterpret_cast(packet)->group_segment_size, + reinterpret_cast(packet)->kernel_object, + reinterpret_cast(packet)->kernarg_address, + reinterpret_cast(packet)->completion_signal, + reinterpret_cast(packet)->reserved2, + Hsa::queue_load_read_index_scacquire(gpu_queue_), index); + } } } diff --git a/projects/hip-tests/catch/unit/graph/hipGraphMultiDevice.cc b/projects/hip-tests/catch/unit/graph/hipGraphMultiDevice.cc new file mode 100644 index 0000000000..2b7ea89fc8 --- /dev/null +++ b/projects/hip-tests/catch/unit/graph/hipGraphMultiDevice.cc @@ -0,0 +1,145 @@ +/* +Copyright (c) 2022-25 Advanced Micro Devices, Inc. All rights reserved. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +#include + +#include + +#include +#include +#include + +#include "graph_memset_node_test_common.hh" +#include "graph_tests_common.hh" + +/** + * @addtogroup hipGraphLaunch + * @{ + * @ingroup GraphTest + * `hipGraphLaunch(hipGraphExec_t graphExec, hipStream_t stream)` - + * Launches an executable graph on the multi device + */ + +/** + * Test Description + * ------------------------ + * - Launches the single branch graph on multi device and verify the result + * ------------------------ + * - catch/unit/graph//hipGraphMultiDevice.cc + * Test requirements + * ------------------------ + * - Multi-device + * - HIP_VERSION >= 7.2 + */ +static void check_output(int* inp, int* out, size_t size) { + for (size_t i = 0; i < size; i++) { + REQUIRE(out[i] == ((inp[i] * inp[i]) * (inp[i] * inp[i]))); + } +} + +static void init_input(int* a, size_t size) { + unsigned int seed = time(nullptr); + for (size_t i = 0; i < size; i++) { + a[i] = (HipTest::RAND_R(&seed) & 0xFF); + } +} + + +TEST_CASE("Unit_hipGraphMultiDevice") { + int nGpus = 0; + HIP_CHECK(hipGetDeviceCount(&nGpus)); + if (nGpus < 2) { + fprintf(stderr, "Need at least 2 GPUs, skipped!\n"); + return; + } + hipStream_t streamdev1, streamdev2; + hipEvent_t eventdev1, eventdev2; + hipGraph_t graph = nullptr; + hipGraphExec_t graph_exec = nullptr; + + constexpr size_t buffer_size = (1024 * 1024); + constexpr auto blocksPerCU = 6; + constexpr int block_size = 512; + + int *ibuf_h, *buf_d1, *buf_d2, *outbuf_h; + ibuf_h = new int[buffer_size]; + outbuf_h = new int[buffer_size]; + REQUIRE(ibuf_h != nullptr); + + HIP_CHECK(hipSetDevice(0)); + HIP_CHECK(hipStreamCreate(&streamdev1)); + HIP_CHECK(hipMalloc(&buf_d1, buffer_size * sizeof(int))); + HIP_CHECK(hipEventCreate(&eventdev1)); + + HIP_CHECK(hipSetDevice(1)); + HIP_CHECK(hipStreamCreate(&streamdev2)); + HIP_CHECK(hipMalloc(&buf_d2, buffer_size * sizeof(int))); + HIP_CHECK(hipEventCreate(&eventdev2)); + + HIP_CHECK(hipSetDevice(0)); + init_input(ibuf_h, buffer_size); + unsigned grid_size = HipTest::setNumBlocks(blocksPerCU, block_size, buffer_size); + + HIP_CHECK(hipStreamBeginCapture(streamdev1, hipStreamCaptureModeGlobal)); + + HIP_CHECK( + hipMemcpyAsync(buf_d1, ibuf_h, sizeof(int) * buffer_size, hipMemcpyHostToDevice, streamdev1)); + HipTest::vector_square + <<>>(buf_d1, buf_d1, buffer_size); + HIP_CHECK(hipEventRecord(eventdev1, streamdev1)); + HIP_CHECK(hipStreamWaitEvent(streamdev2, eventdev1)); + + HIP_CHECK(hipSetDevice(1)); + HIP_CHECK(hipMemcpyDtoDAsync(buf_d2, buf_d1, sizeof(int) * buffer_size, streamdev2)); + HipTest::vector_square + <<>>(buf_d2, buf_d2, buffer_size); + HIP_CHECK(hipEventRecord(eventdev2, streamdev2)); + HIP_CHECK(hipStreamWaitEvent(streamdev1, eventdev2)); + + HIP_CHECK(hipStreamEndCapture(streamdev1, &graph)); + + HIP_CHECK(hipSetDevice(0)); + HIP_CHECK(hipGraphInstantiate(&graph_exec, graph, nullptr, nullptr, 0)); + HIP_CHECK(hipGraphLaunch(graph_exec, streamdev1)); + HIP_CHECK(hipStreamSynchronize(streamdev1)); + + HIP_CHECK(hipSetDevice(1)); + HIP_CHECK(hipMemcpy(outbuf_h, buf_d2, sizeof(int) * buffer_size, hipMemcpyHostToDevice)); + check_output(ibuf_h, outbuf_h, buffer_size); + + HIP_CHECK(hipGraphExecDestroy(graph_exec)); + HIP_CHECK(hipGraphDestroy(graph)); + + delete[] ibuf_h; + delete[] outbuf_h; + HIP_CHECK(hipFree(buf_d1)); + HIP_CHECK(hipFree(buf_d2)); + HIP_CHECK(hipStreamDestroy(streamdev1)); + HIP_CHECK(hipStreamDestroy(streamdev2)); + HIP_CHECK(hipEventDestroy(eventdev1)); + HIP_CHECK(hipEventDestroy(eventdev2)); +} + +/** + * End doxygen group GraphMultiDevice. + * @} + */