From c105dcd05b09685bd6cb0d0a7b3bdbb0f30c510f Mon Sep 17 00:00:00 2001 From: SaleelK Date: Mon, 1 Dec 2025 12:49:26 -0800 Subject: [PATCH] clr: Use graph segment scheduling to process HIP Graphs (#1372) * clr: Use graph segment scheduling to process HIP Graphs * Add a broader path to use capture packet capture for all topologies * Refactor code * Use DEBUG_HIP_GRAPH_SEGMENT_SCHEDULING to toggle new vs classic path, Enabled by default * clr: Few fixes and improvements * clr: Detect complex graphs to take classic path * Use DEBUG_HIP_GRAPH_SEGMENT_SCHEDULING=2 to force segment scheduling path * clr: Fix a cornercase stack corruption * clr: Track commands of segments instead of snapshots * clr: Fix Batch dispatch logic * Track fence_dirty_ flag for command of other streams * Dependency resolution markers can now accomodate dirty fence on cross streams --------- Co-authored-by: Ioannis Assiouras Co-authored-by: Godavarthy Surya, Anusha --- projects/clr/hipamd/src/hip_graph.cpp | 103 +- .../clr/hipamd/src/hip_graph_internal.cpp | 1495 +++++++++++++---- .../clr/hipamd/src/hip_graph_internal.hpp | 258 ++- projects/clr/hipamd/src/hip_internal.hpp | 2 + projects/clr/hipamd/src/hip_memory.cpp | 2 - .../clr/rocclr/device/pal/palsettings.cpp | 2 +- projects/clr/rocclr/device/rocm/rocblit.cpp | 4 +- .../clr/rocclr/device/rocm/rocvirtual.cpp | 186 +- .../clr/rocclr/device/rocm/rocvirtual.hpp | 4 +- projects/clr/rocclr/platform/command.hpp | 10 +- projects/clr/rocclr/utils/flags.hpp | 4 +- 11 files changed, 1563 insertions(+), 507 deletions(-) diff --git a/projects/clr/hipamd/src/hip_graph.cpp b/projects/clr/hipamd/src/hip_graph.cpp index 96ff61dd48..7af3b01f61 100644 --- a/projects/clr/hipamd/src/hip_graph.cpp +++ b/projects/clr/hipamd/src/hip_graph.cpp @@ -1425,9 +1425,9 @@ hipError_t hipGraphExecMemcpyNodeSetParams1D(hipGraphExec_t hGraphExec, hipGraph if (status != hipSuccess) { HIP_RETURN(status); } - if (DEBUG_CLR_GRAPH_PACKET_CAPTURE) { - status = reinterpret_cast(hGraphExec) - ->UpdateAQLPacket(reinterpret_cast(clonedNode)); + auto graphExec = reinterpret_cast(hGraphExec); + if (graphExec->IsSegmentSchedulingEnabled()) { + status = graphExec->UpdateAQLPacket(reinterpret_cast(clonedNode)); } HIP_RETURN(status); } @@ -1523,12 +1523,14 @@ hipError_t ihipGraphInstantiate(hip::GraphExec** pGraphExec, hip::Graph* graph, return hipErrorOutOfMemory; } graph->clone(*pGraphExec, true); - (*pGraphExec)->ScheduleNodes(); - if (false == (*pGraphExec)->TopologicalOrder()) { + + hipError_t scheduleStatus = (*pGraphExec)->ScheduleNodes(); + if (scheduleStatus != hipSuccess) { delete *pGraphExec; - return hipErrorInvalidValue; + *pGraphExec = nullptr; + return scheduleStatus; } - graph->SetGraphInstantiated(true); + if (DEBUG_HIP_GRAPH_DOT_PRINT) { static int i = 1; std::string filename = @@ -1538,7 +1540,10 @@ hipError_t ihipGraphInstantiate(hip::GraphExec** pGraphExec, hip::Graph* graph, LogPrintfInfo("[hipGraph] graph dump:%s", filename.c_str()); } } - if (DEBUG_CLR_GRAPH_PACKET_CAPTURE) { + + graph->SetGraphInstantiated(true); + + if ((*pGraphExec)->IsSegmentSchedulingEnabled()) { (*pGraphExec)->SetKernelArgManager(new hip::GraphKernelArgManager()); } return (*pGraphExec)->Init(); @@ -1555,7 +1560,7 @@ hipError_t hipGraphInstantiate(hipGraphExec_t* pGraphExec, hipGraph_t graph, if (status == hipSuccess) { *pGraphExec = reinterpret_cast(ge); } - HIP_RETURN(status); + HIP_RETURN(status, ReturnPtrValue(pGraphExec)); } hipError_t hipGraphInstantiateWithFlags(hipGraphExec_t* pGraphExec, hipGraph_t graph, @@ -1574,7 +1579,7 @@ hipError_t hipGraphInstantiateWithFlags(hipGraphExec_t* pGraphExec, hipGraph_t g hip::GraphExec* ge; hipError_t status = ihipGraphInstantiate(&ge, reinterpret_cast(graph), flags); *pGraphExec = reinterpret_cast(ge); - HIP_RETURN(status); + HIP_RETURN(status, ReturnPtrValue(pGraphExec)); } hipError_t hipGraphInstantiateWithParams(hipGraphExec_t* pGraphExec, hipGraph_t graph, @@ -1609,7 +1614,7 @@ hipError_t hipGraphInstantiateWithParams(hipGraphExec_t* pGraphExec, hipGraph_t HIP_RETURN(status); } - HIP_RETURN(hipSuccess); + HIP_RETURN(hipSuccess, ReturnPtrValue(pGraphExec)); } hipError_t hipGraphExecDestroy(hipGraphExec_t pGraphExec) { @@ -1820,9 +1825,9 @@ hipError_t hipGraphExecMemcpyNodeSetParams(hipGraphExec_t hGraphExec, hipGraphNo if (status != hipSuccess) { HIP_RETURN(status); } - if (DEBUG_CLR_GRAPH_PACKET_CAPTURE) { - status = reinterpret_cast(hGraphExec) - ->UpdateAQLPacket(reinterpret_cast(clonedNode)); + auto graphExec = reinterpret_cast(hGraphExec); + if (graphExec->IsSegmentSchedulingEnabled()) { + status = graphExec->UpdateAQLPacket(reinterpret_cast(clonedNode)); } HIP_RETURN(status); } @@ -1871,9 +1876,9 @@ hipError_t hipGraphExecMemsetNodeSetParams(hipGraphExec_t hGraphExec, hipGraphNo if (status != hipSuccess) { HIP_RETURN(status); } - if (DEBUG_CLR_GRAPH_PACKET_CAPTURE) { - status = reinterpret_cast(hGraphExec) - ->UpdateAQLPacket(reinterpret_cast(clonedNode)); + auto graphExec = reinterpret_cast(hGraphExec); + if (graphExec->IsSegmentSchedulingEnabled()) { + status = graphExec->UpdateAQLPacket(reinterpret_cast(clonedNode)); } HIP_RETURN(status); } @@ -1931,9 +1936,9 @@ hipError_t hipGraphExecKernelNodeSetParams(hipGraphExec_t hGraphExec, hipGraphNo if (status != hipSuccess) { HIP_RETURN(status); } - if (DEBUG_CLR_GRAPH_PACKET_CAPTURE) { - status = reinterpret_cast(hGraphExec) - ->UpdateAQLPacket(reinterpret_cast(clonedNode)); + auto graphExec = reinterpret_cast(hGraphExec); + if (graphExec->IsSegmentSchedulingEnabled()) { + status = graphExec->UpdateAQLPacket(reinterpret_cast(clonedNode)); } HIP_RETURN(status); } @@ -2008,13 +2013,18 @@ hipError_t hipGraphExecChildGraphNodeSetParams(hipGraphExec_t hGraphExec, hipGra if (status != hipSuccess) { return status; } - if (reinterpret_cast(clonedNode)->GetGraphCaptureStatus()) { + + hip::ChildGraphNode* childNode = reinterpret_cast(clonedNode); + + // After SetParams updates node parameters in-place, we need to update the cached AQL packets + auto graphExec = reinterpret_cast(hGraphExec); + if (graphExec->IsSegmentSchedulingEnabled() || childNode->GetGraphCaptureStatus()) { std::vector childGraphNodes; - reinterpret_cast(clonedNode)->TopologicalOrder(childGraphNodes); + childNode->TopologicalOrder(childGraphNodes); for (std::vector::size_type i = 0; i != childGraphNodes.size(); i++) { if (childGraphNodes[i]->GraphCaptureEnabled()) { - status = reinterpret_cast(clonedNode) - ->UpdateAQLPacket(reinterpret_cast(childGraphNodes[i])); + status = + childNode->UpdateAQLPacket(reinterpret_cast(childGraphNodes[i])); if (status != hipSuccess) { return status; } @@ -2414,9 +2424,9 @@ hipError_t hipGraphExecMemcpyNodeSetParamsFromSymbol(hipGraphExec_t hGraphExec, if (status != hipSuccess) { HIP_RETURN(status); } - if (DEBUG_CLR_GRAPH_PACKET_CAPTURE) { - status = reinterpret_cast(hGraphExec) - ->UpdateAQLPacket(reinterpret_cast(clonedNode)); + auto graphExec = reinterpret_cast(hGraphExec); + if (graphExec->IsSegmentSchedulingEnabled()) { + status = graphExec->UpdateAQLPacket(reinterpret_cast(clonedNode)); } HIP_RETURN(status); } @@ -2497,9 +2507,9 @@ hipError_t hipGraphExecMemcpyNodeSetParamsToSymbol(hipGraphExec_t hGraphExec, hi if (status != hipSuccess) { HIP_RETURN(status); } - if (DEBUG_CLR_GRAPH_PACKET_CAPTURE) { - status = reinterpret_cast(hGraphExec) - ->UpdateAQLPacket(reinterpret_cast(clonedNode)); + auto graphExec = reinterpret_cast(hGraphExec); + if (graphExec->IsSegmentSchedulingEnabled()) { + status = graphExec->UpdateAQLPacket(reinterpret_cast(clonedNode)); } HIP_RETURN(status); } @@ -2734,10 +2744,11 @@ hipError_t hipGraphExecUpdate(hipGraphExec_t hGraphExec, hipGraph_t hGraph, *updateResult_out = hipGraphExecUpdateErrorNotSupported; } HIP_RETURN(hipErrorGraphExecUpdateFailure); - } else if (DEBUG_CLR_GRAPH_PACKET_CAPTURE && newGraphNodes[i]->GraphCaptureEnabled()) { - status = - reinterpret_cast(hGraphExec) - ->UpdateAQLPacket(reinterpret_cast(oldGraphExecNodes[i])); + } else { + auto graphExec = reinterpret_cast(hGraphExec); + if (graphExec->IsSegmentSchedulingEnabled() && newGraphNodes[i]->GraphCaptureEnabled()) { + status = graphExec->UpdateAQLPacket(reinterpret_cast(oldGraphExecNodes[i])); + } } } else { *hErrorNode_out = reinterpret_cast(newGraphNodes[i]); @@ -3091,12 +3102,16 @@ hipError_t hipGraphNodeSetEnabled(hipGraphExec_t hGraphExec, hipGraphNode_t hNod HIP_RETURN(hipErrorInvalidValue); } clonedNode->SetEnabled(isEnabled); - // Update packet batches when node is enabled/disabled - hipError_t status = graphExec->UpdatePacketBatchesForNodeEnableDisable(clonedNode, isEnabled != 0); - if (status != hipSuccess) { - HIP_RETURN(status); + + hipError_t status = hipSuccess; + if (graphExec->IsSegmentSchedulingEnabled()) { + // Update packet batches when node is enabled/disabled + status = graphExec->UpdatePacketBatchesForNodeEnableDisable(clonedNode, isEnabled != 0); + if (status != hipSuccess) { + HIP_RETURN(status); + } } - HIP_RETURN(hipSuccess); + HIP_RETURN(status); } hipError_t hipGraphNodeGetEnabled(hipGraphExec_t hGraphExec, hipGraphNode_t hNode, @@ -3449,8 +3464,9 @@ hipError_t hipDrvGraphExecMemsetNodeSetParams(hipGraphExec_t hGraphExec, hipGrap if (status != hipSuccess) { HIP_RETURN(status); } - if (DEBUG_CLR_GRAPH_PACKET_CAPTURE) { - status = reinterpret_cast(hGraphExec)->UpdateAQLPacket(clonedNode); + auto graphExec = reinterpret_cast(hGraphExec); + if (graphExec->IsSegmentSchedulingEnabled()) { + status = graphExec->UpdateAQLPacket(clonedNode); } HIP_RETURN(status); } @@ -3572,8 +3588,9 @@ hipError_t hipGraphExecNodeSetParams(hipGraphExec_t graphExec, hipGraphNode_t no return status; } - if (DEBUG_CLR_GRAPH_PACKET_CAPTURE) { - status = reinterpret_cast(graphExec)->UpdateAQLPacket(clonedNode); + auto graphExecPtr = reinterpret_cast(graphExec); + if (graphExecPtr->IsSegmentSchedulingEnabled()) { + status = graphExecPtr->UpdateAQLPacket(clonedNode); } return status; } diff --git a/projects/clr/hipamd/src/hip_graph_internal.cpp b/projects/clr/hipamd/src/hip_graph_internal.cpp index 8a8f93c8f1..ab8d069988 100644 --- a/projects/clr/hipamd/src/hip_graph_internal.cpp +++ b/projects/clr/hipamd/src/hip_graph_internal.cpp @@ -19,7 +19,6 @@ THE SOFTWARE. */ #include "hip_graph_internal.hpp" -#include #define CASE_STRING(X, C) \ case X: \ @@ -137,10 +136,11 @@ void Graph::RemoveNode(const Node& node) { } // ================================================================================================ -// root nodes are all vertices with 0 in-degrees std::vector Graph::GetRootNodes() const { + // root nodes are all vertices with 0 in-degrees std::vector roots; - for (auto entry : vertices_) { + + for (const auto& entry : vertices_) { if (entry->GetInDegree() == 0) { roots.push_back(entry); ClPrint(amd::LOG_DETAIL_DEBUG, amd::LOG_CODE, "[hipGraph] Root node: %s(%p)", @@ -194,7 +194,7 @@ void Graph::ScheduleOneNode(Node node, int stream_id) { // Process child graph separately, since, there is no connection if (node->GetType() == hipGraphNodeTypeGraph) { auto child = reinterpret_cast(node)->GetChildGraph(); - child->ScheduleNodes(); + hipError_t status = child->ScheduleNodes(); max_streams_ = std::max(max_streams_, child->max_streams_); reinterpret_cast(node)->GraphExec::TopologicalOrder(); } @@ -208,14 +208,35 @@ void Graph::ScheduleOneNode(Node node, int stream_id) { } // ================================================================================================ -void Graph::ScheduleNodes() { - for (auto node : vertices_) { - node->stream_id_ = -1; - node->signal_is_required_ = false; +hipError_t Graph::ScheduleNodes() { + if (use_segment_scheduling_) { + // Segment packet scheduling logic + hipError_t result = ScheduleNodesIntoBatches(); + + // If ScheduleNodesIntoBatches returns hipErrorNotReady, it indicates + // a complex graph that would benefit from classic path, so fall back + if (result == hipErrorNotReady) { + ClPrint(amd::LOG_DETAIL_DEBUG, amd::LOG_CODE, + "[hipGraph] Falling back to classic scheduling for complex graph"); + // Clear any partial segment data that might have been created + segments_.clear(); + node_to_segment_id_.clear(); + segments_per_level_.clear(); + max_dependency_level_ = -1; + // Disable segment scheduling for this graph permanently + use_segment_scheduling_ = false; + + // Continue to classic scheduling logic below + } else { + // Return success or actual error (not the special fallback indicator) + return result; + } } + + // Classic scheduling logic memset(&roots_[0], 0, sizeof(Node) * roots_.size()); max_streams_ = 0; - // Start processing all nodes in the graph to find async executions. + int stream_id = 0; for (auto node : vertices_) { if (node->stream_id_ == -1) { @@ -232,6 +253,414 @@ void Graph::ScheduleNodes() { stream_id = (stream_id + 1) % DEBUG_HIP_FORCE_GRAPH_QUEUES; } } + + // Topological order is only needed for original scheduling + GraphExec* graphExec = dynamic_cast(this); + if (graphExec && !graphExec->TopologicalOrder()) { + ClPrint(amd::LOG_ERROR, amd::LOG_CODE, "[hipGraph] TopologicalOrder failed - invalid graph"); + return hipErrorInvalidValue; + } + + return hipSuccess; +} + +// ================================================================================================ +hipError_t Graph::ScheduleNodesIntoBatches() { + // Handle empty graph case - valid, nothing to schedule + if (GetNodeCount() == 0) { + return hipSuccess; + } + + // Find execution paths hierarchically (new approach) + auto hierarchical_paths = FindExecutionPathsHierarchical(); + if (hierarchical_paths.paths.empty()) { + // If we have nodes but no paths, this indicates an invalid graph (likely a cycle) + ClPrint(amd::LOG_ERROR, amd::LOG_CODE, + "[hipGraph] No execution paths found - graph may contain cycles"); + return hipErrorInvalidValue; + } + + // Create segments from hierarchical paths (new approach) + CreateSegmentsFromPaths(hierarchical_paths); + // Verify we created at least one valid segment + if (segments_.empty()) { + ClPrint(amd::LOG_ERROR, amd::LOG_CODE, + "[hipGraph] No valid segments created from execution paths"); + return hipErrorInvalidValue; + } + + // Check if this is a complex graph that would benefit from classic path + // Complex graphs: 16+ segments with average segment length < 8 + const size_t kSegmentSizeThreshold = 16; + const double kAvgSegmentLengthThreshold = 8.0; + if (segments_.size() >= kSegmentSizeThreshold && DEBUG_HIP_GRAPH_SEGMENT_SCHEDULING != 2) { + size_t total_nodes = 0; + for (const auto& segment : segments_) { + total_nodes += segment.nodes.size(); + } + double avg_segment_length = static_cast(total_nodes) / segments_.size(); + + if (avg_segment_length < kAvgSegmentLengthThreshold) { + ClPrint(amd::LOG_INFO, amd::LOG_CODE, + "[hipGraph] Complex graph detected: %zu segments, avg length %.2f - " + "falling back to classic path for better performance", + segments_.size(), avg_segment_length); + // Return special status to indicate fallback to classic path + return hipErrorNotReady; + } + } + + // Resolve segment dependencies and calculate dependency levels + ResolveSegmentDependencies(); + + // Calculate topological order for fallback paths and compatibility + // (e.g., child graphs, legacy execution, GetNodes() API) + GraphExec* graphExec = dynamic_cast(this); + if (graphExec && !graphExec->TopologicalOrder()) { + ClPrint(amd::LOG_ERROR, amd::LOG_CODE, + "[hipGraph] TopologicalOrder failed - graph may contain cycles"); + return hipErrorInvalidValue; + } + + ClPrint(amd::LOG_DETAIL_DEBUG, amd::LOG_CODE, + "[hipGraph] ScheduleNodesIntoBatches: Total nodes = %zu, total segments = %zu max " + "dependency level = %d, max streams = %d", + GetNodeCount(), segments_.size(), max_dependency_level_, max_streams_); + + return hipSuccess; +} + +// ================================================================================================ +void Graph::ResolveSegmentDependencies() { + // Resolve dependencies within this graph + for (size_t i = 0; i < segments_.size(); ++i) { + auto& segment = segments_[i]; + + // Only check first node for incoming dependencies + if (segment.first_node != nullptr) { + const auto& dependencies = segment.first_node->GetDependencies(); + + for (const auto& dep_node : dependencies) { + // Find which segment this dependency belongs to (within this graph) + auto dep_it = node_to_segment_id_.find(dep_node); + if (dep_it != node_to_segment_id_.end()) { + int dep_segment_id = dep_it->second; + + // Validate segment ID is within bounds + if (dep_segment_id < 0 || dep_segment_id >= static_cast(segments_.size())) { + ClPrint(amd::LOG_ERROR, amd::LOG_CODE, + "[hipGraph] Invalid segment ID %d (segments size: %zu)", + dep_segment_id, segments_.size()); + continue; // Skip invalid segment ID + } + + // Add dependency if not already present + if (std::find(segment.segment_ids_dependencies.begin(), + segment.segment_ids_dependencies.end(), + dep_segment_id) == segment.segment_ids_dependencies.end()) { + segment.segment_ids_dependencies.push_back(dep_segment_id); + + // Also add this segment as an edge of the dependency segment + segments_[dep_segment_id].segment_ids_edges.push_back(i); + } + } + } + } + } + + // Recursively resolve dependencies in child graphs + // When a parent segment depends on a segment containing a child graph node, + // it implicitly depends on ALL segments in that child graph completing. + for (auto& segment : segments_) { + if (segment.child_graph_ptr != nullptr) { + ClPrint(amd::LOG_DETAIL_DEBUG, amd::LOG_CODE, + "[hipGraph] Recursively resolving dependencies" + "for child graph %p in segment [id=%d]", + segment.child_graph_ptr, segment.id); + + // Child graph resolves its own internal segment dependencies + segment.child_graph_ptr->ResolveSegmentDependencies(); + } + } + + // Calculate dependency levels and max_streams_ using topological sort + CalculateSegmentTopoDependencyLevels(); +} + +// ================================================================================================ +void Graph::CalculateSegmentTopoDependencyLevels() { + // Topological sort of segments to calculate dependency levels + // Assume each segment is a node and the dependencies are segments edges + // Segments with same dependency level can be processed in parallel + std::queue queue; + std::unordered_map in_degree; + + // Reset max dependency level, max streams, and segments per level + max_dependency_level_ = -1; + max_streams_ = 1; + segments_per_level_.clear(); + + // Initialize in-degree for each segment and enqueue root segments + for (size_t i = 0; i < segments_.size(); ++i) { + segments_[i].dependency_level = -1; + in_degree[i] = segments_[i].segment_ids_dependencies.size(); + + if (in_degree[i] == 0) { + // Root segments have level 0 + segments_[i].dependency_level = 0; + queue.push(i); + max_dependency_level_ = 0; + segments_per_level_[0].push_back(i); + } + } + + // Process segments in topological order + while (!queue.empty()) { + int current_id = queue.front(); + queue.pop(); + + auto& current_segment = segments_[current_id]; + int current_level = current_segment.dependency_level; + + // Process all segments that depend on current segment + for (int edge_id : current_segment.segment_ids_edges) { + auto& edge_segment = segments_[edge_id]; + + // Calculate the dependency level for this segment + // It's one level higher than the maximum of its dependencies + int new_level = current_level + 1; + if (edge_segment.dependency_level < new_level) { + edge_segment.dependency_level = new_level; + // Track the maximum dependency level + max_dependency_level_ = std::max(max_dependency_level_, new_level); + } + + // Decrease in-degree and enqueue if all dependencies processed + in_degree[edge_id]--; + if (in_degree[edge_id] == 0) { + queue.push(edge_id); + // Add segment to its dependency level + segments_per_level_[edge_segment.dependency_level].push_back(edge_id); + } + } + } + + // Calculate max_streams_ based on maximum parallelism at any dependency level + for (const auto& level_segments : segments_per_level_) { + max_streams_ = std::max(max_streams_, static_cast(level_segments.second.size())); + } +} + +// ================================================================================================ +hip::Graph::GraphExecutionPaths Graph::FindExecutionPathsHierarchical() { + hip::Graph::GraphExecutionPaths graph_paths; + graph_paths.graph_ptr = this; + + // Find all root nodes (nodes with no dependencies) + const auto& root_nodes = GetRootNodes(); + + std::unordered_set visited; + for (const auto& root : root_nodes) { + // For each root, find all possible paths starting from it + std::vector current_path; + FindPathsRecursiveHierarchical(root, current_path, visited, graph_paths); + } + return graph_paths; +} + +// ================================================================================================ +void Graph::FindPathsRecursiveHierarchical(Node node, + std::vector& current_path, + std::unordered_set& visited, + hip::Graph::GraphExecutionPaths& graph_paths) { + // Lambda to save current path as a HierarchicalPath + auto savePath = [&graph_paths](const std::vector& path, int device_id, + Node child_node = nullptr, int child_index = -1) { + hip::Graph::HierarchicalPath h_path; + h_path.nodes = path; + h_path.device_id = device_id; + h_path.child_graph_node = child_node; + h_path.child_graph_paths_index = child_index; + graph_paths.paths.push_back(std::move(h_path)); + }; + + // Check if already visited + if (visited.find(node->GetID()) != visited.end()) { + return; + } + + // Mark regular nodes as visited + visited.insert(node->GetID()); + + // Check if device ID changed from previous node in path + bool device_changed = false; + int current_device_id = node->GetDeviceId(); + if (!current_path.empty()) { + int prev_device_id = current_path.back()->GetDeviceId(); + if (prev_device_id != current_device_id) { + device_changed = true; + // Save current path before device change + savePath(current_path, prev_device_id); + current_path.clear(); + } + } + + // Handle child graph nodes specially + if (node->GetType() == hipGraphNodeTypeGraph) { + // Save path before child graph node (if any) + if (!current_path.empty()) { + savePath(current_path, current_path.back()->GetDeviceId()); + current_path.clear(); + } + + // Get the child graph and recursively process it + auto childGraphNode = reinterpret_cast(node); + auto childGraph = childGraphNode->GetChildGraph(); + + if (childGraph != nullptr) { + // Create a new GraphExecutionPaths for this child graph + hip::Graph::GraphExecutionPaths child_graph_exec_paths; + child_graph_exec_paths.graph_ptr = childGraph; + + // Find all root nodes in the child graph + const auto& child_root_nodes = childGraph->GetRootNodes(); + std::unordered_set child_visited; + + for (const auto& child_root : child_root_nodes) { + std::vector child_current_path; + childGraph->FindPathsRecursiveHierarchical(child_root, child_current_path, + child_visited, child_graph_exec_paths); + } + + // Store the child graph paths + int child_graph_index = graph_paths.child_graph_paths.size(); + graph_paths.child_graph_paths.push_back(std::move(child_graph_exec_paths)); + + // Create a path containing just the child graph node + std::vector child_node_path = {childGraphNode}; + savePath(child_node_path, current_device_id, childGraphNode, child_graph_index); + } + + // Clear current path and continue with edges from the child graph node + current_path.clear(); + const auto& edges = node->GetEdges(); + for (const auto& edge : edges) { + FindPathsRecursiveHierarchical(edge, current_path, visited, graph_paths); + } + + return; + } + + // Regular node - add to current path + current_path.push_back(node); + + // Edges are out degrees, Dependencies are in degrees + const auto& edges = node->GetEdges(); + const auto& dependencies = node->GetDependencies(); + + // Check if this is a fork node (multiple outgoing edges) + bool is_fork = edges.size() > 1; + // Check if this is a join node (multiple incoming dependencies) + bool is_join = dependencies.size() > 1; + + if (is_fork || is_join) { + // Save current path as a separate segment + if (!current_path.empty()) { + std::vector path_to_save = current_path; + Node saved_join_node = nullptr; + + // For join nodes, save path without the join node itself + // For fork nodes, save the complete path + if (is_join) { + saved_join_node = path_to_save.back(); + path_to_save.pop_back(); + } + + if (!path_to_save.empty()) { + savePath(path_to_save, path_to_save.back()->GetDeviceId()); + } + current_path.clear(); + + // For nodes that are both fork and join, save them as their own segment + if (saved_join_node != nullptr && is_fork) { + std::vector fork_join_segment = {saved_join_node}; + savePath(fork_join_segment, saved_join_node->GetDeviceId()); + } + + // Put the join node back in current_path for further traversal + // But not if it's also a fork node, because we'll traverse branches separately + if (saved_join_node != nullptr && !is_fork) { + current_path.push_back(saved_join_node); + } + } + + // Traverse each branch until it hits a join + for (const auto& edge : edges) { + FindPathsRecursiveHierarchical(edge, current_path, visited, graph_paths); + + // Save the path if it's not empty and this was a fork/join boundary + if (!current_path.empty() && (is_fork || is_join)) { + savePath(current_path, current_path.back()->GetDeviceId()); + current_path.clear(); + } + } + } else if (edges.size() == 1) { + // Single edge - continue on same path + FindPathsRecursiveHierarchical(edges[0], current_path, visited, graph_paths); + } + + // Save any remaining path (handles leaf nodes and leaf join nodes) + if (!current_path.empty()) { + savePath(current_path, current_path.back()->GetDeviceId()); + current_path.clear(); + } +} + +// ================================================================================================ +void Graph::CreateSegmentsFromPaths(const hip::Graph::GraphExecutionPaths& exec_paths) { + // Clear previous segments + segments_.clear(); + node_to_segment_id_.clear(); + + // Create a segment for each execution path at this level + int segment_id = 0; + for (size_t i = 0; i < exec_paths.paths.size(); ++i) { + const auto& h_path = exec_paths.paths[i]; + if (h_path.nodes.empty()) continue; + + Segment segment; + segment.id = segment_id; + segment.nodes = h_path.nodes; + segment.first_node = h_path.nodes.front(); + segment.last_node = h_path.nodes.back(); + + // Preserve child graph information from hierarchical path + if (h_path.child_graph_node != nullptr && h_path.child_graph_paths_index >= 0) { + // Get direct pointer to child graph from the node + auto childGraphNode = reinterpret_cast(h_path.child_graph_node); + segment.child_graph_ptr = childGraphNode->GetChildGraph(); + } + + segments_.push_back(segment); + + // Map each node in this segment to the segment ID (local to this graph) + for (const auto& node : segment.nodes) { + node_to_segment_id_[node] = segment_id; + node->segment_id_ = segment_id; + } + + segment_id++; + } + + // Recursively process child graphs + for (size_t i = 0; i < exec_paths.child_graph_paths.size(); ++i) { + const auto& child_paths = exec_paths.child_graph_paths[i]; + + if (child_paths.graph_ptr != nullptr) { + // Let the child graph create its own segments + child_paths.graph_ptr->CreateSegmentsFromPaths(child_paths); + } + } } // ================================================================================================ @@ -305,10 +734,23 @@ void Graph::clone(Graph* newGraph, bool cloneNodes) const { userObj.first->owning_graphs_.insert(newGraph); } // Clone the root nodes to the new graph + // Map original root node pointers to their cloned counterparts if (roots_.size() > 0) { - memcpy(&newGraph->roots_[0], &roots_[0], sizeof(Node) * roots_.size()); + for (size_t i = 0; i < roots_.size(); ++i) { + if (roots_[i] != nullptr) { + auto it = newGraph->clonedNodes_.find(roots_[i]); + if (it != newGraph->clonedNodes_.end()) { + newGraph->roots_[i] = it->second; + } else { + newGraph->roots_[i] = nullptr; + } + } else { + newGraph->roots_[i] = nullptr; + } + } } newGraph->memAllocNodePtrs_ = memAllocNodePtrs_; + if (!cloneNodes) { newGraph->clonedNodes_.clear(); } @@ -334,7 +776,6 @@ bool GraphExec::isGraphExecValid(GraphExec* pGraphExec) { hipError_t GraphExec::CreateStreams(uint32_t num_streams, int devId) { amd::ScopedLock lock(graphExecStreamCreateLock_); - // Validate input parameters if (num_streams == 0) { ClPrint(amd::LOG_WARNING, amd::LOG_CODE, "[hipGraph] Attempting to create 0 streams for device %d", devId); @@ -355,14 +796,14 @@ hipError_t GraphExec::CreateStreams(uint32_t num_streams, int devId) { return hipSuccess; } - parallel_streams_[devId].reserve(num_streams); - + // Cap the number of streams to DEBUG_HIP_FORCE_GRAPH_QUEUES + uint32_t max_streams = std::min(num_streams, DEBUG_HIP_FORCE_GRAPH_QUEUES); ClPrint(amd::LOG_INFO, amd::LOG_CODE, "[hipGraph] Creating %u parallel streams for device %d", - num_streams, devId); - - for (uint32_t i = 0; i < num_streams; ++i) { - auto stream = - new hip::Stream(g_devices[devId], hip::Stream::Priority::Normal, hipStreamNonBlocking); + max_streams, devId); + parallel_streams_[devId].reserve(max_streams); + for (uint32_t i = 0; i < max_streams; ++i) { + auto stream = new hip::Stream(g_devices[devId], hip::Stream::Priority::Normal, + hipStreamNonBlocking); if (stream == nullptr || !stream->Create()) { ClPrint(amd::LOG_ERROR, amd::LOG_CODE, "[hipGraph] Failed to %s stream %u for device %d", @@ -383,6 +824,7 @@ hipError_t GraphExec::CreateStreams(uint32_t num_streams, int devId) { return hipSuccess; } +// ================================================================================================ void GraphExec::FindStreamsReqPerDev() { // Count streams required per device based on stream-to-device mappings for (auto const& [stream_id, dev_ids] : streams_dev_ids_) { @@ -413,35 +855,92 @@ void GraphExec::FindStreamsReqPerDev() { } } } + + // Account for the launch stream that's available only on the instantiation device + // We only need to create (count - 1) extra streams for the instantiation device + for (auto& [dev_id, count] : max_streams_dev_) { + if (dev_id == instantiateDeviceId_ && count > 0) { + count = count - 1; + } + } +} + +// ================================================================================================ +void GraphExec::FindStreamsReqPerDevForSegments() { + // For packet engine mode: analyze segments to determine stream requirements per device + // We need to track the maximum number of concurrent segments per device at any level + + std::unordered_map streams_per_dev_at_level; + + for (const auto& [level, segment_ids] : segments_per_level_) { + streams_per_dev_at_level.clear(); + + // Count segments per device at this level + for (int segment_id : segment_ids) { + if (segment_id >= 0 && segment_id < static_cast(segments_.size())) { + const auto& segment = segments_[segment_id]; + + // Determine device ID from segment's first node + int dev_id = hip::getCurrentDevice()->deviceId(); + if (!segment.nodes.empty() && segment.first_node != nullptr) { + dev_id = segment.first_node->GetDeviceId(); + } + + streams_per_dev_at_level[dev_id]++; + } + } + + // Update max streams per device based on this level's requirements + for (const auto& [dev_id, count] : streams_per_dev_at_level) { + max_streams_dev_[dev_id] = std::max(max_streams_dev_[dev_id], count); + } + } + + // Account for the launch stream that's available only on the instantiation device + // We only need to create (count - 1) extra streams for the instantiation device + for (auto& [dev_id, count] : max_streams_dev_) { + if (dev_id == instantiateDeviceId_ && count > 0) { + count = count - 1; + } + } } // ================================================================================================ hipError_t GraphExec::Init() { hipError_t status = hipSuccess; - // create extra stream to avoid queue collision with the default execution stream + // Set instantiation device ID early so Find functions can use it + instantiateDeviceId_ = hip::getCurrentDevice()->deviceId(); - if (max_streams_ == 1) { - FindStreamsReqPerDev(); - if (max_streams_dev_.size() > 1) { - // Multi-device graph detected - create parallel streams for each device - for (auto const& [dev_id, num_streams] : max_streams_dev_) { - ClPrint(amd::LOG_INFO, amd::LOG_API, - "[hipGraph] For device id :%d max streams :%d for execution.\n", dev_id, - num_streams); + // create extra stream to avoid queue collision with the default execution stream + if (max_streams_ >= 1) { + if (use_segment_scheduling_) { + // For packet engine: analyze segments to determine per-device stream requirements + FindStreamsReqPerDevForSegments(); + } else { + // For classic scheduling: use stream-to-device mappings + FindStreamsReqPerDev(); + } + + // Create parallel streams for each device based on computed requirements + // Note: max_streams_dev_ already accounts for the launch stream, so it contains + // the number of extra streams to create + for (auto const& [dev_id, num_streams] : max_streams_dev_) { + if (num_streams > 0) { status = CreateStreams(num_streams, dev_id); if (status != hipSuccess) { return status; } + } else { + // No extra streams needed } } - if (DEBUG_CLR_GRAPH_PACKET_CAPTURE) { - // For graph nodes capture AQL packets to dispatch them directly during graph launch. - status = CaptureAQLPackets(); - } - } else { - status = CreateStreams(max_streams_, hip::getCurrentDevice()->deviceId()); } - instantiateDeviceId_ = hip::getCurrentDevice()->deviceId(); + + if (use_segment_scheduling_) { + // For graph nodes capture AQL packets to dispatch them directly during graph launch. + status = CaptureAQLPackets(); + } + static_cast(hip::getCurrentDevice())->retain(); return status; } @@ -452,22 +951,31 @@ constexpr uint32_t kKernArgChunkSize = 128 * Ki; void GraphExec::GetKernelArgSizeForGraph(std::unordered_map& kernArgSizeForGraph) { // Calculate the kernel argument size required for all graph kernel nodes // when GPU packet capture is enabled - for (hip::GraphNode* node : topoOrder_) { - if (node->GraphCaptureEnabled()) { - // Accumulate the kernel argument size for each device - kernArgSizeForGraph[node->dev_id_] += node->GetKerArgSize(); - } else if (node->GetType() == hipGraphNodeTypeGraph) { - // Handle child graph nodes - auto childNode = reinterpret_cast(node); - // Child graph shares same kernel arg manager - GraphKernelArgManager* KernelArgManager = GetKernelArgManager(); - if (KernelArgManager != nullptr) { - KernelArgManager->retain(); - childNode->SetKernelArgManager(KernelArgManager); - // Recursively process child graph if it uses single stream - if (childNode->GetChildGraph()->max_streams_ == 1) { - childNode->GetKernelArgSizeForGraph(kernArgSizeForGraph); + if (use_segment_scheduling_ && !segments_.empty()) { + for (const auto& segment : segments_) { + // Handle child graph segments - skip node iteration, process recursively + if (segment.child_graph_ptr != nullptr) { + auto childGraphExec = dynamic_cast(segment.child_graph_ptr); + if (childGraphExec != nullptr) { + // Child graphs share the same kernel arg manager as parent + if (childGraphExec->GetKernelArgManager() == nullptr) { + auto kernArgMgr = GetKernelArgManager(); + if (kernArgMgr != nullptr) { + kernArgMgr->retain(); // Increment ref count for child's reference + childGraphExec->SetKernelArgManager(kernArgMgr); + } + } + childGraphExec->GetKernelArgSizeForGraph(kernArgSizeForGraph); + } + continue; // Skip processing nodes in this segment + } + + // Process regular nodes in this segment + for (hip::GraphNode* node : segment.nodes) { + if (node->GraphCaptureEnabled()) { + // Accumulate the kernel argument size for each device + kernArgSizeForGraph[node->dev_id_] += node->GetKerArgSize(); } } } @@ -476,7 +984,6 @@ void GraphExec::GetKernelArgSizeForGraph(std::unordered_map& kernAr // ================================================================================================ // Enable or disable a graph node's packets in the batch // Simply updates the enabled state and count of disabled nodes -// ================================================================================================ void GraphExec::PacketBatch::setEnabled(GraphNode* node, bool enabled) { auto it = nodeToRangeIndex.find(node); if (it == nodeToRangeIndex.end()) { @@ -490,7 +997,10 @@ void GraphExec::PacketBatch::setEnabled(GraphNode* node, bool enabled) { // Update counter based on state change if (enabled) { // Node being enabled: decrement counter - disabledNodeCount--; + // Defensive check to prevent underflow + if (disabledNodeCount > 0) { + disabledNodeCount--; + } } else { // Node being disabled: increment counter disabledNodeCount++; @@ -499,42 +1009,84 @@ void GraphExec::PacketBatch::setEnabled(GraphNode* node, bool enabled) { } // ================================================================================================ -hipError_t GraphExec::CaptureAndFormPacketsForGraph() { - hipError_t status = hipSuccess; +// Rebuild cached filtered lists of enabled packets +// Only rebuilds if cache is stale (size doesn't match expected enabled count) +// ================================================================================================ +void GraphExec::PacketBatch::rebuildFilteredLists() { + // Calculate expected size based on currently enabled nodes + size_t expectedCount = 0; + for (const auto& range : nodeRanges) { + if (range.enabled) { + expectedCount += range.packetCount; + } + } - // Clear previous capture status and batches - nodeCaptureStatus_.clear(); - nodeCaptureStatus_.resize(topoOrder_.size(), false); + // Cache is valid if size matches - no rebuild needed + if (enabledPackets.size() == expectedCount) { + return; + } - // Clear previous batches - packetBatches_.clear(); + // Cache is stale - rebuild it + enabledPackets.clear(); + enabledKernelNames.clear(); - // Process nodes and create batches of consecutive captured nodes - for (size_t i = 0; i < topoOrder_.size(); ++i) { - auto& node = topoOrder_[i]; + enabledPackets.reserve(expectedCount); + enabledKernelNames.reserve(expectedCount); - // Check if kernel node requires hidden heap and set it for the entire graph - if (node->GetType() == hipGraphNodeTypeKernel) { - static bool initialized = false; - if (!initialized && reinterpret_cast(node)->HasHiddenHeap()) { - SetHiddenHeap(); - initialized = true; + // Build filtered lists from enabled node ranges + for (const auto& range : nodeRanges) { + if (range.enabled) { + for (size_t j = 0; j < range.packetCount; ++j) { + size_t packetIndex = range.startIndex + j; + enabledPackets.push_back(dispatchPackets[packetIndex]); + enabledKernelNames.push_back(dispatchKernelNames[packetIndex]); } } + } +} - // Handle nodes that support graph capture - if (node->GraphCaptureEnabled()) { - // TODO: Add support for batching for multi-device linear graph - if (max_streams_dev_.size() == 1) { - // Single device - use batching optimization +// ================================================================================================ +hipError_t GraphExec::CaptureAndFormPacketsForGraph() { + // Fixme: Only single stream child graph nodes are supported. + hipError_t status = hipSuccess; + + // Clear previous batches + segmentBatches_.clear(); + + // Process nodes from segments + for (const auto& segment : segments_) { + // Skip segments that only contain a child graph metadata node + // Child graphs are processed recursively later + if (segment.child_graph_ptr != nullptr) { + continue; + } + + // Create a SegmentBatch for this segment + auto [it, inserted] = segmentBatches_.emplace(segment.id, segment.id); + // Initialize node_capture_status for this segment + auto& currentSegBatch = it->second; + currentSegBatch.node_capture_status.resize(segment.nodes.size(), false); + for (size_t i = 0; i < segment.nodes.size(); ++i) { + auto& node = segment.nodes[i]; + + // Check if kernel node requires hidden heap and set it for the entire graph + if (node->GetType() == hipGraphNodeTypeKernel) { + static bool initialized = false; + if (!initialized && reinterpret_cast(node)->HasHiddenHeap()) { + SetHiddenHeap(); + initialized = true; + } + } + + // Handle nodes that support graph capture + if (node->GraphCaptureEnabled()) { // Start of a new batch PacketBatch newBatch; - size_t j = i; // Collect packets from consecutive captured nodes - while (j < topoOrder_.size() && topoOrder_[j]->GraphCaptureEnabled()) { - auto& currentNode = topoOrder_[j]; - + size_t j = i; + while (j < segment.nodes.size() && segment.nodes[j]->GraphCaptureEnabled()) { + auto& currentNode = segment.nodes[j]; // Capture packets for this node std::vector nodePackets; std::vector nodeKernelNames; @@ -547,10 +1099,14 @@ hipError_t GraphExec::CaptureAndFormPacketsForGraph() { } // Create NodeRange for this node - PacketBatch::NodeRange range; - range.startIndex = newBatch.dispatchPackets.size(); - range.packetCount = nodePackets.size(); - range.enabled = true; + // RangeIndex is 0 at the start + const size_t rangeIndex = newBatch.nodeRanges.size(); + const size_t startIndex = newBatch.dispatchPackets.size(); + const size_t packetCount = nodePackets.size(); + + // Reserve space to avoid reallocations during insertion + newBatch.dispatchPackets.reserve(startIndex + packetCount); + newBatch.dispatchKernelNames.reserve(startIndex + packetCount); // Add to dispatch lists (initially all enabled) newBatch.dispatchPackets.insert(newBatch.dispatchPackets.end(), nodePackets.begin(), @@ -558,43 +1114,51 @@ hipError_t GraphExec::CaptureAndFormPacketsForGraph() { newBatch.dispatchKernelNames.insert(newBatch.dispatchKernelNames.end(), nodeKernelNames.begin(), nodeKernelNames.end()); - // Store node mapping - newBatch.nodeRanges.push_back(range); - newBatch.nodeToRangeIndex[currentNode] = newBatch.nodeRanges.size() - 1; + // Store node mapping with range info + newBatch.nodeRanges.push_back({startIndex, packetCount, true}); + newBatch.nodeToRangeIndex[currentNode] = rangeIndex; // Mark this node as successfully captured - nodeCaptureStatus_[j] = true; + currentSegBatch.node_capture_status[j] = true; ++j; } // Add the batch if it has packets if (!newBatch.dispatchPackets.empty()) { - packetBatches_.emplace_back(std::move(newBatch)); + currentSegBatch.packet_batches.emplace_back(std::move(newBatch)); } // Skip the nodes we just processed, the index will be incremented by the loop i = j - 1; - } else { - // Multi-device - capture individual packets without batching - status = node->CaptureAndFormPacket(GetKernelArgManager()); - if (status != hipSuccess) { - LogError("Individual packet capture failed for multi-device node"); - return status; - } } - } else if (node->GetType() == hipGraphNodeTypeGraph) { - auto childNode = reinterpret_cast(node); - if (childNode->GetChildGraph()->max_streams_ == 1) { - childNode->SetGraphCaptureStatus(true); - status = childNode->CaptureAndFormPacketsForGraph(); - nodeCaptureStatus_[i] = (status == hipSuccess); + } + } + + // Recursively process child graphs to capture their packets + for (const auto& segment : segments_) { + if (segment.child_graph_ptr != nullptr) { + auto childGraphExec = dynamic_cast(segment.child_graph_ptr); + if (childGraphExec != nullptr) { + // Child graphs share the same kernel arg manager as parent + // This is critical for packet capture to work correctly + if (childGraphExec->GetKernelArgManager() == nullptr) { + auto kernArgMgr = GetKernelArgManager(); + if (kernArgMgr != nullptr) { + kernArgMgr->retain(); // Increment ref count for child's reference + childGraphExec->SetKernelArgManager(kernArgMgr); + } + } + + status = childGraphExec->CaptureAndFormPacketsForGraph(); if (status != hipSuccess) { - LogWarning("Child graph packet capture failed continuing with other nodes"); - status = hipSuccess; // Continue processing other nodes + LogWarning("Child graph packet capture failed for child graph in segment"); + // Continue processing other child graphs + status = hipSuccess; } } } } + return status; } @@ -610,22 +1174,22 @@ hipError_t GraphExec::CaptureAQLPackets() { kernArgSizeForGraph[devId] = 0; } GetKernelArgSizeForGraph(kernArgSizeForGraph); - + // Allocate kernel argument pools on respective devices with extra space for updates for (const auto& deviceKernArgPair : kernArgSizeForGraph) { const int deviceId = deviceKernArgPair.first; const size_t kernArgSize = deviceKernArgPair.second; - + if (kernArgSize == 0) { continue; } const size_t totalPoolSize = kernArgSize + kKernArgChunkSize; if (!kernArgManager_->AllocGraphKernargPool(totalPoolSize, g_devices[deviceId]->devices()[0])) { - ClPrint(amd::LOG_ERROR, amd::LOG_CODE, - "[hipGraph] Failed to allocate kernel argument pool of size %zu for device %d", + ClPrint(amd::LOG_ERROR, amd::LOG_CODE, + "[hipGraph] Failed to allocate kernel argument pool of size %zu for device %d", totalPoolSize, deviceId); - return hipErrorMemoryAllocation; + return hipErrorMemoryAllocation; } } @@ -640,92 +1204,139 @@ hipError_t GraphExec::CaptureAQLPackets() { // ================================================================================================ hipError_t GraphExec::UpdateAQLPacket(hip::GraphNode* node) { - if (max_streams_ != 1 || !node->GraphCaptureEnabled()) { + if (!node->GraphCaptureEnabled()) { return hipSuccess; } - //ToDo: Add batching support for multi-device linear graph - if (max_streams_dev_.size() == 1) { - // Find which batch contains this node and update it - for (auto& batch : packetBatches_) { - auto it = batch.nodeToRangeIndex.find(node); - if (it != batch.nodeToRangeIndex.end()) { - // Found the batch containing this node - update packets - PacketBatch::NodeRange& range = batch.nodeRanges[it->second]; - // Capture new packets for this node - std::vector newPackets; - std::vector newKernelNames; - hipError_t status = node->CaptureAndFormPacket(kernArgManager_, &newPackets, - &newKernelNames); - if (status != hipSuccess) { - return status; - } - // Number of packets per node can change - if (newPackets.size() != range.packetCount) { - size_t rangeIdx = it->second; - size_t oldPacketCount = range.packetCount; - size_t newPacketCount = newPackets.size(); - int64_t packetDelta = static_cast(newPacketCount) - - static_cast(oldPacketCount); - ClPrint(amd::LOG_INFO, amd::LOG_CODE, - "[hipGraph] Handling packet count change for node (type=%d): %zu -> %zu packets", - node->GetType(), oldPacketCount, newPacketCount); - if (packetDelta > 0) { - // Need to insert additional packets - // Insert new slots at the end of this node's range - size_t insertPos = range.startIndex + oldPacketCount; - batch.dispatchPackets.insert(batch.dispatchPackets.begin() + insertPos, - packetDelta, nullptr); - batch.dispatchKernelNames.insert(batch.dispatchKernelNames.begin() + insertPos, - packetDelta, std::string()); - } else if (packetDelta < 0) { - // Need to remove packets - size_t removePos = range.startIndex + newPacketCount; - size_t removeCount = std::abs(packetDelta); - batch.dispatchPackets.erase(batch.dispatchPackets.begin() + removePos, - batch.dispatchPackets.begin() + removePos + removeCount); - batch.dispatchKernelNames.erase(batch.dispatchKernelNames.begin() + removePos, - batch.dispatchKernelNames.begin() + removePos + removeCount); - } - // Update this node's packet count - range.packetCount = newPacketCount; - // Adjust startIndex for all subsequent nodes in this batch - for (size_t i = rangeIdx + 1; i < batch.nodeRanges.size(); ++i) { - batch.nodeRanges[i].startIndex += packetDelta; - } - } - // Update dispatch packets (always update regardless of enabled state) - // The enabled/disabled check happens during dispatch, not here - for (size_t i = 0; i < range.packetCount && i < newPackets.size(); ++i) { - size_t packetIndex = range.startIndex + i; - batch.dispatchPackets[packetIndex] = newPackets[i]; - batch.dispatchKernelNames[packetIndex] = newKernelNames[i]; - } - return hipSuccess; - } - } - } else { - return node->CaptureAndFormPacket(kernArgManager_); + // Todo: Add batching support for multi-device linear graph + // Use node_to_segment_id_ for O(1) segment lookup + auto segIdIt = node_to_segment_id_.find(node); + if (segIdIt == node_to_segment_id_.end()) { + return hipSuccess; // Node not in any segment } - return hipSuccess; // Node not in any batch + + int segmentId = segIdIt->second; + + // Find the segment batch for this segment ID using O(1) map lookup + auto segBatchIt = segmentBatches_.find(segmentId); + if (segBatchIt == segmentBatches_.end()) { + return hipSuccess; // Segment not found + } + + auto& segBatch = segBatchIt->second; + + // Search only within this segment's packet batches + for (auto& packetBatch : segBatch.packet_batches) { + auto it = packetBatch.nodeToRangeIndex.find(node); + if (it != packetBatch.nodeToRangeIndex.end()) { + // Found the batch containing this node - update packets + PacketBatch::NodeRange& range = packetBatch.nodeRanges[it->second]; + + // Capture new packets for this node + std::vector newPackets; + std::vector newKernelNames; + hipError_t status = node->CaptureAndFormPacket(kernArgManager_, &newPackets, + &newKernelNames); + if (status != hipSuccess) { + return status; + } + // Number of packets per node can change + const size_t oldPacketCount = range.packetCount; + const size_t newPacketCount = newPackets.size(); + + if (newPacketCount != oldPacketCount) { + const size_t rangeIdx = it->second; + const int64_t packetDelta = + static_cast(newPacketCount) - static_cast(oldPacketCount); + + ClPrint( + amd::LOG_DETAIL_DEBUG, amd::LOG_CODE, + "[hipGraph] Packet count change for node (type=%d): %zu -> %zu packets (delta=%ld)", + node->GetType(), oldPacketCount, newPacketCount, packetDelta); + + if (packetDelta > 0) { + // Insert additional packet slots at the end of this node's range + const size_t insertPos = range.startIndex + oldPacketCount; + packetBatch.dispatchPackets.insert(packetBatch.dispatchPackets.begin() + insertPos, + static_cast(packetDelta), nullptr); + packetBatch.dispatchKernelNames.insert( + packetBatch.dispatchKernelNames.begin() + insertPos, + static_cast(packetDelta), std::string()); + } else { + // Negative packetDelta, remove excess packet slots from the end of this node's range + const size_t removePos = range.startIndex + newPacketCount; + const size_t removeCount = oldPacketCount - newPacketCount; + + // Validate bounds before erasing + if (removePos + removeCount > packetBatch.dispatchPackets.size()) { + ClPrint(amd::LOG_ERROR, amd::LOG_CODE, + "[hipGraph] Invalid packet removal bounds: pos=%zu, count=%zu, size=%zu", + removePos, removeCount, packetBatch.dispatchPackets.size()); + return hipErrorInvalidValue; + } + + packetBatch.dispatchPackets.erase( + packetBatch.dispatchPackets.begin() + removePos, + packetBatch.dispatchPackets.begin() + removePos + removeCount); + packetBatch.dispatchKernelNames.erase( + packetBatch.dispatchKernelNames.begin() + removePos, + packetBatch.dispatchKernelNames.begin() + removePos + removeCount); + } + + // Update this node's packet count and adjust startIndex for all subsequent nodes + range.packetCount = newPacketCount; + for (size_t i = rangeIdx + 1; i < packetBatch.nodeRanges.size(); ++i) { + packetBatch.nodeRanges[i].startIndex = static_cast( + static_cast(packetBatch.nodeRanges[i].startIndex) + packetDelta); + } + } + + // Update dispatch packets (always update regardless of enabled state) + // The enabled/disabled check happens during dispatch, not here + for (size_t i = 0; i < range.packetCount && i < newPackets.size(); ++i) { + size_t packetIndex = range.startIndex + i; + packetBatch.dispatchPackets[packetIndex] = newPackets[i]; + packetBatch.dispatchKernelNames[packetIndex] = newKernelNames[i]; + } + return hipSuccess; + } + } + return hipSuccess; // Node not in any batch } // ================================================================================================ hipError_t GraphExec::UpdatePacketBatchesForNodeEnableDisable(hip::GraphNode* node, bool isEnabled) { - if (max_streams_ != 1 && max_streams_dev_.size() == 1 && !node->GraphCaptureEnabled()) { - // Only handle single stream and single device case with captured nodes + if (!node->GraphCaptureEnabled()) { + // Only handle single stream case with captured nodes return hipSuccess; } - // Find which batch contains this node and update its enabled state - for (auto& batch : packetBatches_) { - auto it = batch.nodeToRangeIndex.find(node); - if (it != batch.nodeToRangeIndex.end()) { + + // Use node_to_segment_id_ for O(1) segment lookup + auto segIdIt = node_to_segment_id_.find(node); + if (segIdIt == node_to_segment_id_.end()) { + return hipSuccess; // Node not in any segment + } + + int segmentId = segIdIt->second; + + // Find the segment batch for this segment ID using O(1) map lookup + auto segBatchIt = segmentBatches_.find(segmentId); + if (segBatchIt == segmentBatches_.end()) { + return hipSuccess; // Segment not found + } + + auto& segBatch = segBatchIt->second; + + // Search only within this segment's packet batches + for (auto& packetBatch : segBatch.packet_batches) { + auto it = packetBatch.nodeToRangeIndex.find(node); + if (it != packetBatch.nodeToRangeIndex.end()) { // Found the batch containing this node - update enabled state - batch.setEnabled(node, isEnabled); + packetBatch.setEnabled(node, isEnabled); return hipSuccess; } } - return hipSuccess; // Node not in any batch + return hipSuccess; } // ================================================================================================ @@ -736,157 +1347,352 @@ void GraphExec::DecrementRefCount(cl_event event, cl_int command_exec_status, vo } // ================================================================================================ +void GraphExec::AssignStreamsToSegments( + const std::vector& segments_at_level, + hip::Stream* launch_stream, + const std::vector& streams, + std::unordered_map& segment_to_stream) { -hipError_t GraphExec::EnqueueGraphWithSingleList(hip::Stream* hip_stream) { - // Accumulate command tracks all the AQL packet batch that we submit to the HW. For now - // we track only kernel nodes. - amd::AccumulateCommand* accumulate = nullptr; + // Assign streams to segments at this level using round-robin + for (size_t idx = 0; idx < segments_at_level.size(); ++idx) { + int segment_id = segments_at_level[idx]; + const auto& segment = segments_[segment_id]; + + // Determine device ID for this segment from its first node + int segment_device_id = launch_stream->DeviceId(); + if (!segment.nodes.empty() && segment.first_node != nullptr) { + segment_device_id = segment.first_node->GetDeviceId(); + } + + hip::Stream* assigned_stream = nullptr; + + // Use collision-handled streams if provided (single-device case) + if (!streams.empty()) { + // Round-robin across the collision-handled streams + size_t stream_idx = idx % streams.size(); + assigned_stream = streams[stream_idx]; + } else if (parallel_streams_.find(segment_device_id) != parallel_streams_.end() && + !parallel_streams_[segment_device_id].empty()) { + // Multi-device case: Use device-aware stream selection from parallel_streams_ + const auto& device_streams = parallel_streams_[segment_device_id]; + size_t stream_idx = idx % (device_streams.size() + 1); + assigned_stream = (stream_idx == 0) ? launch_stream : device_streams[stream_idx - 1]; + } else { + // Fallback to launch stream if no parallel streams available + assigned_stream = launch_stream; + } + + segment_to_stream[segment_id] = assigned_stream; + } +} + +// ================================================================================================ +amd::Command* GraphExec::EnqueueSegmentedGraph(hip::Stream* launch_stream, + const std::vector& streams, + hipError_t* out_status) { hipError_t status = hipSuccess; - if (DEBUG_CLR_GRAPH_PACKET_CAPTURE) { - accumulate = new amd::AccumulateCommand(*hip_stream, {}, nullptr); + if (out_status != nullptr) { + *out_status = hipSuccess; + } + + // Lambda to create and enqueue a marker with wait list + auto enqueueMarker = [](hip::Stream* stream, const amd::Command::EventWaitList& wait_list) { + auto marker = new amd::Marker(*stream, true, wait_list); + // Marker is only for dependency, no need to flush caches. + marker->setCommandEntryScope(amd::Device::kCacheStateIgnore); + if (marker != nullptr) { + marker->enqueue(); + marker->release(); + } + }; + + // Map to track which stream each segment uses - MUST persist across all levels + // so we can look up streams for dependencies from previous levels + std::unordered_map segment_to_stream; + // Map to track the last enqueued command for each segment for dependency tracking + // This is critical for handling cross-level dependencies with stream reuse + std::unordered_map segment_last_command; + + // Process segments level by level using the pre-calculated max_dependency_level_ + for (int level = 0; level <= max_dependency_level_; ++level) { + auto level_it = segments_per_level_.find(level); + if (level_it == segments_per_level_.end()) { + continue; + } + + const auto& segments_at_level = level_it->second; + + // Assign streams to segments at this level + AssignStreamsToSegments(segments_at_level, launch_stream, streams, segment_to_stream); + + // Process each segment at this level + for (int segment_id : segments_at_level) { + const auto& segment = segments_[segment_id]; + hip::Stream* current_stream = segment_to_stream[segment_id]; + + // Handle dependencies: add wait markers if dependent segments are on different streams + // Look up the specific command for each dependency segment + amd::Command::EventWaitList wait_list; + for (int dep_segment_id : segment.segment_ids_dependencies) { + // Dependencies are present in the segment_to_stream and segment_last_command map + auto stream_it = segment_to_stream.find(dep_segment_id); + if (stream_it == segment_to_stream.end()) { + continue; + } + hip::Stream* dep_stream = stream_it->second; + + // Need to wait if dependency is on a different stream + if (dep_stream != current_stream) { + auto cmd_it = segment_last_command.find(dep_segment_id); + if (cmd_it != segment_last_command.end() && cmd_it->second != nullptr) { + // Retain command before adding to wait list for proper lifetime management + cmd_it->second->retain(); + wait_list.push_back(cmd_it->second); + } + } + } + + // If there are cross-stream dependencies, insert a marker to wait + if (!wait_list.empty()) { + enqueueMarker(current_stream, wait_list); + // Release our retains - marker has its own retain on wait list events + for (auto* cmd : wait_list) { + cmd->release(); + } + } + + // Create accumulate command for this segment + amd::AccumulateCommand* accumulate = new amd::AccumulateCommand(*current_stream, {}, nullptr); + + // Enqueue this segment using the helper function + status = EnqueueSegment(segment, current_stream, accumulate); + + if (status != hipSuccess) { + accumulate->release(); + // Clean up any previously enqueued commands + for (auto& pair : segment_last_command) { + if (pair.second != nullptr) { + pair.second->release(); + } + } + if (out_status != nullptr) { + *out_status = status; + } + return nullptr; + } + + // Do not release as this is released at the end + accumulate->enqueue(); + + segment_last_command[segment_id] = accumulate; + } + } + + // Synchronize all streams with work back to launch_stream + // Build a map of stream to last command by collecting from the highest-level segment on each + // stream This is critical because unordered_map iteration order is undefined, so we must + // explicitly track dependency levels to ensure we wait on the last command (highest level) on + // each stream + std::unordered_map stream_last_command_map; + std::unordered_map stream_max_level; // Track max dependency level per stream + + for (const auto& pair : segment_last_command) { + int seg_id = pair.first; + amd::Command* cmd = pair.second; + auto stream_it = segment_to_stream.find(seg_id); + if (stream_it != segment_to_stream.end()) { + hip::Stream* stream = stream_it->second; + int seg_dependency_level = segments_[seg_id].dependency_level; + + // Only update if this segment is at a strictly higher level + // Using strict > ensures deterministic behavior when multiple segments + // are at the same level on the same stream + auto level_it = stream_max_level.find(stream); + if (level_it == stream_max_level.end() || + seg_dependency_level > level_it->second) { + stream_max_level[stream] = seg_dependency_level; + stream_last_command_map[stream] = cmd; + } + } + } + + amd::Command::EventWaitList final_wait_list; + for (const auto& pair : stream_last_command_map) { + hip::Stream* stream = pair.first; + amd::Command* last_cmd = pair.second; + + // Sync all streams except the launch_stream itself + if (stream != launch_stream && last_cmd != nullptr) { + // Retain commands before adding to wait list since marker will retain them + // and we'll release them later in cleanup + last_cmd->retain(); + final_wait_list.push_back(last_cmd); + } + } + + // If there are other streams with work, sync them back to launch_stream + if (!final_wait_list.empty()) { + enqueueMarker(launch_stream, final_wait_list); + } + + // Release the extra retains for commands in final_wait_list + // (marker has its own retain, we release ours) + for (auto* cmd : final_wait_list) { + if (cmd != nullptr) { + cmd->release(); + } + } + + // Get the last command enqueued on the launch_stream for parent dependency tracking + // This is to prevent release in cleanup loop, this determines graph execution completion + amd::Command* last_command = nullptr; + auto launch_stream_it = stream_last_command_map.find(launch_stream); + if (launch_stream_it != stream_last_command_map.end()) { + last_command = launch_stream_it->second; + // Find the segment that produced this command and remove it from cleanup + for (auto it = segment_last_command.begin(); it != segment_last_command.end(); ) { + if (it->second == last_command) { + it = segment_last_command.erase(it); + break; + } else { + ++it; + } + } + } + + // Release all other enqueued accumulate commands + for (auto& pair : segment_last_command) { + if (pair.second != nullptr) { + pair.second->release(); + } + } + + if (out_status != nullptr) { + *out_status = status; + } + return last_command; +} + +// ================================================================================================ +// Graph segment to queue dispatch matching +hipError_t GraphExec::EnqueueSegment(const Segment& segment, hip::Stream* stream, + amd::AccumulateCommand* accumulate) { + hipError_t status = hipSuccess; + + // Find the SegmentBatch for this segment using O(1) map lookup + SegmentBatch* segBatch = nullptr; + auto segBatchIt = segmentBatches_.find(segment.id); + if (segBatchIt != segmentBatches_.end()) { + segBatch = &segBatchIt->second; } size_t batchIndex = 0; - // Process nodes in topological order with mixed execution strategy - for (size_t i = 0; i < topoOrder_.size(); ++i) { - auto& node = topoOrder_[i]; - - if (!node->GraphCaptureEnabled()) { - // Node doesn't support capture - execute individually if enabled - if (node->GetEnabled() != 0) { - node->SetStream(hip_stream); - status = node->CreateCommand(node->GetQueue()); - node->EnqueueCommands(hip_stream); + // Handle child graph segments - recursively enqueue the entire child graph + if (segment.child_graph_ptr != nullptr) { + auto childGraphExec = dynamic_cast(segment.child_graph_ptr); + if (childGraphExec != nullptr) { + // Child graphs share the same kernel arg manager as parent (for packet capture) + if (childGraphExec->GetKernelArgManager() == nullptr) { + auto kernArgMgr = GetKernelArgManager(); + if (kernArgMgr != nullptr) { + kernArgMgr->retain(); // Increment ref count for child's reference + childGraphExec->SetKernelArgManager(kernArgMgr); + } } - } else if (i < nodeCaptureStatus_.size() && nodeCaptureStatus_[i]) { - // Node was successfully captured - dispatch the batch with enabled nodes only - if (batchIndex < packetBatches_.size()) { - const auto& batch = packetBatches_[batchIndex]; - // O(1) check: if no disabled nodes, dispatch entire batch directly - // This avoids creating new vectors when all nodes are enabled (common case) - if (batch.disabledNodeCount == 0) { - // Fast path: all nodes enabled, dispatch entire batch - bool batchStatus = hip_stream->vdev()->dispatchAqlPacketBatch( - batch.dispatchPackets, batch.dispatchKernelNames, accumulate); - if (!batchStatus) { - status = hipErrorUnknown; - accumulate->release(); - return status; - } + + // Recursively enqueue the child graph with its own dependency tracking + // Child graphs use their own parallel_streams_, so pass empty vector + hipError_t child_status = hipSuccess; + amd::Command* child_last_cmd = childGraphExec->EnqueueSegmentedGraph( + stream, {}, &child_status); + + if (child_status != hipSuccess) { + ClPrint(amd::LOG_ERROR, amd::LOG_CODE, + "[hipGraph] EnqueueSegment: Failed to enqueue child graph, status=%d", + child_status); + return child_status; + } + + // Child graph's work is already enqueued to the stream + // The returned last command tracks completion - release our reference + if (child_last_cmd != nullptr) { + child_last_cmd->release(); + } + } + + // Child graph segment has no regular nodes to process + return hipSuccess; + } + + // Process all nodes in this segment + for (size_t i = 0; i < segment.nodes.size(); ++i) { + auto& node = segment.nodes[i]; + if (DEBUG_HIP_GRAPH_DOT_PRINT) { + node->stream_id_ = stream->GetStreamId(); + } + if (!node->GraphCaptureEnabled()) { + // Node doesn't support capture - execute individually + node->SetStream(stream); + status = node->CreateCommand(node->GetQueue()); + node->EnqueueCommands(stream); + } else if (segBatch && i < segBatch->node_capture_status.size() && + segBatch->node_capture_status[i]) { + // Node was successfully captured - dispatch its batch + if (segBatch && batchIndex < segBatch->packet_batches.size()) { + auto& packetBatch = segBatch->packet_batches[batchIndex]; + + // Select which vectors to dispatch based on whether nodes are disabled + const std::vector* packetsToDispatch; + const std::vector* kernelNamesToDispatch; + + if (packetBatch.disabledNodeCount == 0) { + // No disabled nodes - use full batch + packetsToDispatch = &packetBatch.dispatchPackets; + kernelNamesToDispatch = &packetBatch.dispatchKernelNames; } else { - // Slow path: some nodes disabled, create filtered vectors - std::vector enabledPackets; - std::vector enabledKernelNames; - for (const auto& range : batch.nodeRanges) { - if (range.enabled) { - // Add packets for this enabled node - for (size_t j = 0; j < range.packetCount; ++j) { - size_t packetIndex = range.startIndex + j; - enabledPackets.push_back(batch.dispatchPackets[packetIndex]); - enabledKernelNames.push_back(batch.dispatchKernelNames[packetIndex]); - } - } - } - // Only dispatch if there are enabled packets - if (!enabledPackets.empty()) { - bool batchStatus = hip_stream->vdev()->dispatchAqlPacketBatch( - enabledPackets, enabledKernelNames, accumulate); - if (!batchStatus) { - status = hipErrorUnknown; - accumulate->release(); - return status; - } - } + // Some nodes disabled - rebuild and use filtered lists + packetBatch.rebuildFilteredLists(); + packetsToDispatch = &packetBatch.enabledPackets; + kernelNamesToDispatch = &packetBatch.enabledKernelNames; } + // Dispatch the selected batch + if (!packetsToDispatch->empty()) { + bool batchStatus = stream->vdev()->dispatchAqlPacketBatch( + *packetsToDispatch, *kernelNamesToDispatch, accumulate); + if (!batchStatus) { + status = hipErrorUnknown; + return status; + } + } + if (DEBUG_HIP_GRAPH_DOT_PRINT) { + for(int j = i; j < i + packetBatch.nodeRanges.size(); j++) { + segment.nodes[j]->stream_id_ = stream->GetStreamId(); + } + } // Skip all consecutive captured nodes that belong to this batch - i += packetBatches_[batchIndex].nodeRanges.size() - 1; // -1 because loop will increment - + i += packetBatch.nodeRanges.size() - 1; // -1 because loop will increment ++batchIndex; } } } - if (DEBUG_CLR_GRAPH_PACKET_CAPTURE) { - accumulate->enqueue(); - accumulate->release(); - } - return status; -} -// ================================================================================================ -hipError_t GraphExec::EnqueueMultiDeviceLinearGraph(hip::Stream* launch_stream) { - // Accumulate command tracks all the AQL packet batch that we submit to the HW. For now we track - // only kernel nodes. - amd::AccumulateCommand* accumulate = nullptr; - hipError_t status = hipSuccess; - if (DEBUG_CLR_GRAPH_PACKET_CAPTURE) { - accumulate = new amd::AccumulateCommand(*launch_stream, {}, nullptr); - } - - auto createMarkerAndWait = [](hip::Stream* fromStream, hip::Stream* toStream) { - amd::Command::EventWaitList wait_list; - auto marker = new amd::Marker(*fromStream, true, wait_list); - marker->enqueue(); - marker->release(); - wait_list.push_back(marker); - auto wait_marker = new amd::Marker(*toStream, true, wait_list); - wait_marker->enqueue(); - wait_marker->release(); - }; - - hip::Stream* prevStream = launch_stream; - size_t batchIndex = 0; - - for (size_t i = 0; i < topoOrder_.size(); ++i) { - auto& node = topoOrder_[i]; - hip::Stream* currStream = parallel_streams_[node->dev_id_][0]; - - // Insert synchronization marker if switching devices - if (prevStream->DeviceId() != currStream->DeviceId()) { - createMarkerAndWait(prevStream, currStream); - } - // ToDo : Add batching for multi device graph launch - if (topoOrder_[i]->GraphCaptureEnabled()) { - if (topoOrder_[i]->GetEnabled()) { - std::vector& gpuPackets = topoOrder_[i]->GetAqlPackets(); - std::vector kernelNames; - for (auto& packet : gpuPackets) { - kernelNames.push_back(topoOrder_[i]->GetKernelName()); - } - currStream->vdev()->dispatchAqlPacketBatch(gpuPackets, kernelNames, accumulate); - } - } else { - topoOrder_[i]->SetStream(currStream); - status = topoOrder_[i]->CreateCommand(topoOrder_[i]->GetQueue()); - topoOrder_[i]->EnqueueCommands(currStream); - } - prevStream = currStream; - } - - // Synchronize back to launch stream if we ended on a different device - if (prevStream != launch_stream) { - createMarkerAndWait(prevStream, launch_stream); - } - - if (DEBUG_CLR_GRAPH_PACKET_CAPTURE) { - accumulate->enqueue(); - accumulate->release(); - } - return status; } // ================================================================================================ void GraphExec::UpdateStreams(hip::Stream* launch_stream) { int devId = launch_stream->vdev()->device().index(); + // Clear any previous stream assignments + streams_.clear(); + // Current stream is the default in the assignment + streams_.push_back(launch_stream); if (parallel_streams_.find(devId) == parallel_streams_.end()) { LogPrintfError("UpdateStreams failed for device id:%d", devId); return; } auto parallel_streams = parallel_streams_[devId]; - // Current stream is the default in the assignment - streams_.push_back(launch_stream); std::unordered_map unique_stream_ids; unique_stream_ids[launch_stream->getQueueID()] = 1; std::vector collided_streams; @@ -1094,13 +1900,24 @@ bool Graph::RunNodes(int32_t base_stream, const std::vector* paral return true; } +hipError_t ihipGraphDebugDotPrint(hip::Graph* graph, const char* path, unsigned int flags); + // ================================================================================================ hipError_t GraphExec::Run(hip::Stream* launch_stream) { hipError_t status = hipSuccess; + + // Get the first node based on scheduling mode + Node firstNode = nullptr; + if (use_segment_scheduling_ && !segments_.empty() && !segments_[0].nodes.empty()) { + firstNode = segments_[0].nodes[0]; + } else if (!topoOrder_.empty()) { + firstNode = topoOrder_[0]; + } + if (flags_ & hipGraphInstantiateFlagAutoFreeOnLaunch) { - if (!topoOrder_.empty()) { - topoOrder_[0]->GetParentGraph()->FreeAllMemory(launch_stream); - topoOrder_[0]->GetParentGraph()->memalloc_nodes_ = 0; + if (firstNode != nullptr) { + firstNode->GetParentGraph()->FreeAllMemory(launch_stream); + firstNode->GetParentGraph()->memalloc_nodes_ = 0; if (!AMD_DIRECT_DISPATCH) { // The MemoryPool::FreeAllMemory queues a memory unmap command that for !AMD_DIRECT_DISPATCH // runs asynchonously. Make sure that freeAllMemory is complete before creating new commands @@ -1112,32 +1929,54 @@ hipError_t GraphExec::Run(hip::Stream* launch_stream) { // If this is a repeat launch, make sure corresponding MemFreeNode exists for a MemAlloc node if (repeatLaunch_ == true) { - if (!topoOrder_.empty() && topoOrder_[0]->GetParentGraph()->GetMemAllocNodeCount() > 0) { + if (firstNode != nullptr && firstNode->GetParentGraph()->GetMemAllocNodeCount() > 0) { return hipErrorInvalidValue; } } else { repeatLaunch_ = true; } - ClPrint(amd::LOG_DEBUG, amd::LOG_CODE, - "GraphExec::Run max_streams: %d, " - "on device: %d, total number of nodes: %d", - max_streams_, launch_stream->DeviceId(), topoOrder_.size()); - if (max_streams_ == 1 && max_streams_dev_.size() == 1 && - max_streams_dev_.begin()->first == launch_stream->DeviceId()) { - if (DEBUG_CLR_GRAPH_PACKET_CAPTURE) { - // If the graph has kernels that does device side allocation, during packet capture, heap is - // allocated because heap pointer has to be added to the AQL packet, and initialized during - // graph launch. - static bool initialized = false; - if (!initialized && HasHiddenHeap()) { - launch_stream->vdev()->HiddenHeapInit(); - initialized = true; + ClPrint(amd::LOG_DEBUG, amd::LOG_CODE, "GraphExec::Run max_streams: %d, on device: %d", + max_streams_, launch_stream->DeviceId()); + + if (use_segment_scheduling_ && instantiateDeviceId_ == launch_stream->DeviceId()) { + // If the graph has kernels that does device side allocation, during packet capture, heap is + // allocated because heap pointer has to be added to the AQL packet, and initialized during + // graph launch. + static bool initialized = false; + // Todo: Hidden heap initialization is done only for single device graph + if (!initialized && HasHiddenHeap()) { + launch_stream->vdev()->HiddenHeapInit(); + initialized = true; + } + // Update streams for the graph execution only if launch stream changed + if (lastLaunchStream_ != launch_stream) { + UpdateStreams(launch_stream); + lastLaunchStream_ = launch_stream; + } + amd::Command* last_cmd = nullptr; + if (max_streams_dev_.size() == 1) { + // Single-device: pass collision-handled streams_ to EnqueueSegmentedGraph + last_cmd = EnqueueSegmentedGraph(launch_stream, streams_, &status); + } else { + // Multi-device: pass empty vector, will use parallel_streams_ internally + last_cmd = EnqueueSegmentedGraph(launch_stream, {}, &status); + } + + // Release the last command as we don't need to track it for top-level graph execution + if (last_cmd != nullptr) { + last_cmd->release(); + } + if (DEBUG_HIP_GRAPH_DOT_PRINT && !graph_dumped_) { + graph_dumped_ = true; + std::string filename = + "graph_" + std::to_string(amd::Os::getProcessId()) + "_dot_print_launch_1"; + hipError_t status = ihipGraphDebugDotPrint(this, filename.c_str(), 0); + if (status == hipSuccess) { + ClPrint(amd::LOG_DETAIL_DEBUG, amd::LOG_CODE, "[hipGraph] graph dump:%s", + filename.c_str()); } } - status = EnqueueGraphWithSingleList(launch_stream); - } else if (max_streams_ == 1 && max_streams_dev_.size() > 1) { - status = EnqueueMultiDeviceLinearGraph(launch_stream); } else if (max_streams_ == 1 && instantiateDeviceId_ != launch_stream->DeviceId()) { for (int i = 0; i < topoOrder_.size(); i++) { topoOrder_[i]->SetStream(launch_stream); @@ -1145,8 +1984,11 @@ hipError_t GraphExec::Run(hip::Stream* launch_stream) { topoOrder_[i]->EnqueueCommands(launch_stream); } } else { - // Update streams for the graph execution - UpdateStreams(launch_stream); + // Update streams for the graph execution only if launch stream changed + if (lastLaunchStream_ != launch_stream) { + UpdateStreams(launch_stream); + lastLaunchStream_ = launch_stream; + } // Execute all nodes in the graph if (!RunNodes()) { LogError("Failed to launch nodes!"); @@ -1160,6 +2002,8 @@ hipError_t GraphExec::Run(hip::Stream* launch_stream) { amd::Event& event = CallbackCommand->event(); constexpr bool kBlocking = false; if (!event.setCallback(CL_COMPLETE, GraphExec::DecrementRefCount, this, kBlocking)) { + this->release(); + CallbackCommand->release(); return hipErrorInvalidHandle; } CallbackCommand->enqueue(); @@ -1204,7 +2048,6 @@ address GraphKernelArgManager::AllocKernArg(size_t size, size_t alignment, int d } auto& current_pool = device_pools.back(); - // Calculate aligned address for the allocation address aligned_addr = amd::alignUp(current_pool.kernarg_pool_addr_ + current_pool.kernarg_pool_offset_, alignment); const size_t new_pool_usage = (aligned_addr + size) - current_pool.kernarg_pool_addr_; @@ -1247,7 +2090,7 @@ void GraphKernelArgManager::ReadBackOrFlush() { // Perform readback operation on the last byte of the pool address dev_ptr = pool.kernarg_pool_addr_ + pool.kernarg_pool_size_; volatile unsigned char* sentinel_ptr = reinterpret_cast(dev_ptr - 1); - + // Read-modify-write sequence with memory barriers volatile unsigned char kSentinel = *sentinel_ptr; _mm_sfence(); diff --git a/projects/clr/hipamd/src/hip_graph_internal.hpp b/projects/clr/hipamd/src/hip_graph_internal.hpp index b19d2390ff..ca3966ba66 100644 --- a/projects/clr/hipamd/src/hip_graph_internal.hpp +++ b/projects/clr/hipamd/src/hip_graph_internal.hpp @@ -320,6 +320,7 @@ class GraphNode : public hipGraphNodeDOTAttribute { const std::vector& GetDependencies() const { return dependencies_; } /// Update graph node dependecies void SetDependencies(std::vector& dependencies) { + dependencies_.clear(); for (auto entry : dependencies) { dependencies_.push_back(entry); } @@ -366,6 +367,7 @@ class GraphNode : public hipGraphNodeDOTAttribute { const std::vector& GetEdges() const { return edges_; } /// Updates graph node children void SetEdges(std::vector& edges) { + edges_.clear(); for (auto entry : edges) { edges_.push_back(entry); } @@ -425,19 +427,10 @@ class GraphNode : public hipGraphNodeDOTAttribute { } unsigned int GetEnabled() const { return isEnabled_; } void SetEnabled(unsigned int isEnabled) { isEnabled_ = isEnabled; } - // Returns true if capture is enabled for the current node. + + // Base implementation returns false; specific node types should override. virtual bool GraphCaptureEnabled() { - bool isGraphCapture = false; - if (DEBUG_CLR_GRAPH_PACKET_CAPTURE) { - switch (GetType()) { - case hipGraphNodeTypeMemset: - isGraphCapture = true; - break; - default: - break; - } - } - return isGraphCapture; + return false; } virtual void PrintAttributes(std::ostream& out, hipGraphDebugDotFlags flag) override { out << "["; @@ -454,6 +447,7 @@ class GraphNode : public hipGraphNodeDOTAttribute { out << GetLabel(flag); if (DEBUG_HIP_GRAPH_DOT_PRINT) { out << "\nStreamId:" << stream_id_; + out << "\nSegmentId:" << segment_id_; out << "\nSignalIsRequired: " << ((signal_is_required_) ? "true" : "false"); out << "\nDeviceId:" << dev_id_; } @@ -479,6 +473,7 @@ class GraphNode : public hipGraphNodeDOTAttribute { size_t inDegree_; //!< count of in coming edges (@todo: remove, it's dependencies_.size()) size_t outDegree_; //!< count of outgoing edges (@todo: remove, it's edges_.size()) int32_t stream_id_ = -1; //! Stream ID on which this node will be executed + int32_t segment_id_ = -1; //! Segment ID on which this node will be executed int32_t launch_id_ = -1; //! Launch ID of this node in the entire graph execution sequence static int nextID; Graph* parentGraph_; @@ -556,6 +551,8 @@ class Graph { graphSet_.insert(this); mem_pool_ = device->GetGraphMemoryPool(); graphInstantiated_ = false; + // Initialize per-graph segment scheduling flag from global env var + use_segment_scheduling_ = DEBUG_HIP_GRAPH_SEGMENT_SCHEDULING; roots_.resize(DEBUG_HIP_FORCE_GRAPH_QUEUES); leafs_.resize(DEBUG_HIP_FORCE_GRAPH_QUEUES); wait_order_.resize(DEBUG_HIP_FORCE_GRAPH_QUEUES); @@ -568,7 +565,7 @@ class Graph { } } } - ~Graph() { + virtual ~Graph() { for (auto node : vertices_) { delete node; } @@ -639,6 +636,8 @@ class Graph { const std::vector& GetTopoOrder() const { return topoOrder_; } /// returns all the edges in the graph std::vector> GetEdges() const; + /// Returns whether segment scheduling is enabled for this graph + bool IsSegmentSchedulingEnabled() const { return use_segment_scheduling_; } // returns the original graph ptr if cloned const Graph* getOriginalGraph() const { return pOriginalGraph_; } // Add user obj resource to graph @@ -679,7 +678,43 @@ class Graph { ); //! Schedules all nodes in the graph into different streams - void ScheduleNodes(); + hipError_t ScheduleNodes(); + + // Hierarchical path structure for child graph support + struct HierarchicalPath { + std::vector nodes; //!< Nodes in this path (at this level only) + Node child_graph_node = nullptr; //!< Reference to child graph node if present in path + int child_graph_paths_index = -1; //!< Index into child_graph_paths (-1 if no child) + int device_id = -1; //!< Device ID for this path + }; + + // Structure to store execution paths for a graph and its children hierarchically + struct GraphExecutionPaths { + Graph* graph_ptr = nullptr; //!< Pointer to the graph this belongs to + std::vector paths; //!< All execution paths at this level only + std::vector child_graph_paths; //!< Child graph execution paths + }; + + //! Schedules nodes into batches for optimized execution + hipError_t ScheduleNodesIntoBatches(); + + //! Find execution paths hierarchically, keeping child graphs separate + GraphExecutionPaths FindExecutionPathsHierarchical(); + + //! Recursively find all paths from a node with hierarchical child graph handling + void FindPathsRecursiveHierarchical(Node node, + std::vector& current_path, + std::unordered_set& visited, + GraphExecutionPaths& graph_paths); + + //! Create segments from hierarchical execution paths + void CreateSegmentsFromPaths(const GraphExecutionPaths& exec_paths); + + //! Resolve dependencies between segments + void ResolveSegmentDependencies(); + + //! Calculate dependency levels for segments using topological sort + void CalculateSegmentTopoDependencyLevels(); //! Runs one node on the assigned stream bool RunOneNode(Node node); //!< Node for the execution on GPU @@ -785,8 +820,35 @@ class Graph { //!< during multi-device graph execution scheduling. std::unordered_map> streams_dev_ids_; int instantiateDeviceId_ = -1; - //! Topological order of the graph doesn't include nodes embedded as part of the child graph + //! Topological order of the graph doesn't include nodes embedded as part of the child graph std::vector topoOrder_; + + // Segment dependency structures + struct Segment { + int id = -1; + int stream_id = -1; // Assigned stream for this segment + int dependency_level = -1; // Topological level (0 = root, 1 = depends on root, etc.) + std::vector nodes; + std::vector segment_ids_dependencies; // Segments this segment depends on (within same graph) + std::vector segment_ids_edges; // Segments that depend on this segment (within same graph) + Node first_node = nullptr; + Node last_node = nullptr; + + // Hierarchical child graph information + Graph* child_graph_ptr = nullptr; // Direct pointer to child graph for quick access + }; + + //! Segment information for batch scheduling + std::vector segments_; + //! Map of node to segment ID + std::unordered_map node_to_segment_id_; + //! Maximum dependency level in the segment graph + int max_dependency_level_ = -1; + //!< Map of dependency level to list of segment IDs at that level + std::unordered_map> segments_per_level_; + + std::unordered_map clonedNodes_; + private: friend class GraphExec; std::vector vertices_; @@ -807,7 +869,10 @@ class Graph { hip::MemoryPool* mem_pool_; //!< Memory pool, associated with this graph std::unordered_set capturedNodes_; bool graphInstantiated_; - std::unordered_map clonedNodes_; + //!< Per-graph flag to control segment scheduling + //!< Can be disabled per-graph for complex graphs that benefit from classic path + bool use_segment_scheduling_; + //! Map of device ID to vector of streams allocated for that device during graph execution. //! Each device may require multiple streams to handle parallel execution of graph nodes. std::unordered_map> streams_dev_; @@ -815,6 +880,17 @@ class Graph { //! Map tracking the maximum number of concurrent streams required per device for graph execution. //! Key: device ID, Value: maximum number of streams needed for that device std::unordered_map max_streams_dev_; + + // Batch-based scheduling structures + struct Batch { + int id = -1; + int stream_id = 0; + std::vector nodes; + std::vector incoming_stream_ids; + Node last_node = nullptr; + }; + + std::vector batches_; }; class GraphExec : public amd::ReferenceCountedObject, public Graph { @@ -822,6 +898,7 @@ class GraphExec : public amd::ReferenceCountedObject, public Graph { static std::unordered_set graphExecSet_; static amd::Monitor graphExecSetLock_; static amd::Monitor graphExecStreamCreateLock_; + bool graph_dumped_ = false; GraphExec(uint64_t flags = 0) : ReferenceCountedObject(), Graph(hip::getCurrentDevice()), flags_(flags) { amd::ScopedLock lock(graphExecSetLock_); @@ -832,20 +909,20 @@ class GraphExec : public amd::ReferenceCountedObject, public Graph { for (auto streams : parallel_streams_) { for (auto stream : streams.second) { if (stream != nullptr) { + stream->finish(); constexpr bool kForceDestroy = true; hip::Stream::Destroy(stream, kForceDestroy); } } } parallel_streams_.clear(); - if (DEBUG_CLR_GRAPH_PACKET_CAPTURE) { + if (IsSegmentSchedulingEnabled()) { if (kernArgManager_ != nullptr) { kernArgManager_->release(); } } - packetBatches_.clear(); - nodeCaptureStatus_.clear(); + segmentBatches_.clear(); } Node GetClonedNode(Node node) { @@ -885,9 +962,13 @@ class GraphExec : public amd::ReferenceCountedObject, public Graph { static void DecrementRefCount(cl_event event, cl_int command_exec_status, void* user_data); hipError_t CaptureAndFormPacketsForGraph(); void GetKernelArgSizeForGraph(std::unordered_map& kernArgSizeForGraph); - hipError_t EnqueueGraphWithSingleList(hip::Stream* hip_stream); - //! Enqueue a multi-device linear graph for execution - hipError_t EnqueueMultiDeviceLinearGraph(hip::Stream* hip_stream); + + amd::Command* EnqueueSegmentedGraph(hip::Stream* launch_stream, + const std::vector& streams, + hipError_t* out_status = nullptr); + hipError_t EnqueueSegment(const Segment& segment, hip::Stream* stream, + amd::AccumulateCommand* accumulate); + bool TopologicalOrder() { return Graph::TopologicalOrder(topoOrder_); } //! Update streams for the graph execution with launch stream from application void UpdateStreams(hip::Stream* launch_stream); @@ -895,20 +976,41 @@ class GraphExec : public amd::ReferenceCountedObject, public Graph { //! This method analyzes the stream-to-device mappings and recursively processes //! child graphs to determine the maximum concurrent streams needed per device void FindStreamsReqPerDev(); + //! Find the number of streams required per device for packet engine mode + //! This method analyzes segments to determine per-device stream requirements + void FindStreamsReqPerDevForSegments(); + //! Get the parallel streams map for synchronization before destruction + const std::unordered_map>& GetParallelStreams() const { + return parallel_streams_; + } protected: + //! Assign streams to segments at a given dependency level + void AssignStreamsToSegments( + const std::vector& segments_at_level, + hip::Stream* launch_stream, + const std::vector& streams, + std::unordered_map& segment_to_stream); + //! parallel streams per device std::unordered_map> parallel_streams_; uint64_t flags_ = 0; GraphKernelArgManager* kernArgManager_ = nullptr; //!< Kernel Arg manager for graph. bool hasHiddenHeap_ = false; //!< Hidden heap indicator for Kernel node bool repeatLaunch_ = false; + //!< Track last launch stream to avoid redundant UpdateStreams + hip::Stream* lastLaunchStream_ = nullptr; // PacketBatch structure struct PacketBatch { // Main dispatch vectors - always ready for batch dispatch std::vector dispatchPackets; std::vector dispatchKernelNames; + + // Cached filtered lists - built on-demand when nodes are disabled + std::vector enabledPackets; + std::vector enabledKernelNames; + // Node tracking struct NodeRange { size_t startIndex; // Start index in dispatchPackets @@ -921,13 +1023,22 @@ class GraphExec : public amd::ReferenceCountedObject, public Graph { PacketBatch() {} // O(1) enable/disable operations - just update state void setEnabled(GraphNode* node, bool enabled); + // Rebuild cached filtered lists if cache is stale + void rebuildFilteredLists(); + }; + + //! Structure linking packet batches to segments + struct SegmentBatch { + int segment_id; // Segment this batch belongs to + std::vector node_capture_status; // Capture status for each node in this segment + std::vector packet_batches; // All packet batches for this segment + + SegmentBatch(int seg_id) : segment_id(seg_id) {} }; //! Batches of accumulated packets and kernel names for batch dispatch optimization - //! Each batch contains packets from consecutive captured nodes - std::vector packetBatches_; - //! Track which nodes were successfully captured (true) vs need individual execution (false) - std::vector nodeCaptureStatus_; + //! Map from segment ID to SegmentBatch for O(1) lookup + std::unordered_map segmentBatches_; }; class ChildGraphNode : public GraphNode, public GraphExec { @@ -950,6 +1061,13 @@ class ChildGraphNode : public GraphNode, public GraphExec { bool GetGraphCaptureStatus() { return graphCaptureStatus_; } + bool GraphCaptureEnabled() override { + if (IsSegmentSchedulingEnabled()) { + return graphCaptureStatus_; + } + return false; + } + std::vector& GetChildGraphNodeOrder() { return topoOrder_; } void SetStream(hip::Stream* stream) override { stream_ = stream; } @@ -959,9 +1077,26 @@ class ChildGraphNode : public GraphNode, public GraphExec { } void EnqueueCommands(hip::Stream* stream) override { - if (graphCaptureStatus_) { - hipError_t status = EnqueueGraphWithSingleList(stream); + // Note: For segmented graphs, EnqueueSegment now calls EnqueueSegmentedGraph recursively + // This method is kept as a fallback for non-segmented execution or legacy paths + + if (graphCaptureStatus_ || !segments_.empty()) { + // Use hierarchical segment-based enqueue via EnqueueSegmentedGraph + // Use this child graph's own parallel_streams_, so pass empty vector + hipError_t status = hipSuccess; + amd::Command* last_cmd = EnqueueSegmentedGraph(stream, {}, &status); + + if (last_cmd != nullptr) { + // This is a fallback path - we don't need to track the command + last_cmd->release(); + } + + if (status != hipSuccess) { + ClPrint(amd::LOG_ERROR, amd::LOG_CODE, + "[hipGraph] ChildGraphNode::EnqueueCommands failed with status=%d", status); + } } else if (max_streams_ == 1) { + // Legacy topological order execution for non-segmented graphs for (int i = 0; i < topoOrder_.size(); i++) { topoOrder_[i]->SetStream(stream_); hipError_t status = topoOrder_[i]->CreateCommand(topoOrder_[i]->GetQueue()); @@ -1054,6 +1189,7 @@ class GraphKernelNode : public GraphNode { out << GetLabel(flag); if (DEBUG_HIP_GRAPH_DOT_PRINT) { out << "StreamId:" << stream_id_; + out << "\nSegmentId:" << segment_id_; out << "\nSignalIsRequired: " << ((signal_is_required_) ? "true" : "false"); out << "\nDeviceId:" << dev_id_; } @@ -1137,7 +1273,7 @@ class GraphKernelNode : public GraphNode { } hip::DeviceFunc* function = hip::DeviceFunc::asFunction(func); amd::Kernel* kernel = function->kernel(); - if (DEBUG_CLR_GRAPH_PACKET_CAPTURE) { + if (parentGraph_ != nullptr && parentGraph_->IsSegmentSchedulingEnabled()) { auto device = g_devices[dev_id_]->devices()[0]; device::Kernel* devKernel = const_cast(kernel->getDeviceKernel(*device)); kernargSegmentByteSize_ = devKernel->KernargSegmentByteSize(); @@ -1270,6 +1406,11 @@ class GraphKernelNode : public GraphNode { GraphNode* clone() const override { return new GraphKernelNode(*this); } hipError_t CreateCommand(hip::Stream* stream) override { + // Clear commands_ first, even if node is disabled + hipError_t status = GraphNode::CreateCommand(stream); + if (status != hipSuccess) { + return status; + } if (!isEnabled_) { return hipSuccess; } @@ -1280,14 +1421,10 @@ class GraphKernelNode : public GraphNode { hip::DeviceFunc* function = hip::DeviceFunc::asFunction(func); amd::Kernel* kernel = function->kernel(); amd::ScopedLock lock(function->dflock_); - hipError_t status = validateKernelParams(&kernelParams_, func, dev_id_); + status = validateKernelParams(&kernelParams_, func, dev_id_); if (hipSuccess != status) { return status; } - status = GraphNode::CreateCommand(stream); - if (status != hipSuccess) { - return status; - } commands_.reserve(1); amd::Command* command; uint32_t flags = 0; @@ -1471,14 +1608,13 @@ class GraphKernelNode : public GraphNode { } virtual bool GraphCaptureEnabled() override { - bool isGraphCapture = false; - if (DEBUG_CLR_GRAPH_PACKET_CAPTURE) { + if (parentGraph_ != nullptr && parentGraph_->IsSegmentSchedulingEnabled()) { // Disable capture for cooperative kernels if (!coopKernel_) { - isGraphCapture = true; + return true; } } - return isGraphCapture; + return false; } }; @@ -1500,15 +1636,16 @@ class GraphMemcpyNode : public GraphNode { GraphNode* clone() const override { return new GraphMemcpyNode(*this); } virtual hipError_t CreateCommand(hip::Stream* stream) override { + // Clear commands_ first, even if node is disabled + hipError_t status = GraphNode::CreateCommand(stream); + if (status != hipSuccess) { + return status; + } if (!isEnabled_ || ((copyParams_.kind == hipMemcpyHostToHost || copyParams_.kind == hipMemcpyDefault) && IsHtoHMemcpy(copyParams_.dstPtr.ptr, copyParams_.srcPtr.ptr))) { return hipSuccess; } - hipError_t status = GraphNode::CreateCommand(stream); - if (status != hipSuccess) { - return status; - } commands_.reserve(1); amd::Command* command; status = ihipMemcpy3DCommand(command, ©Params_, stream); @@ -1632,17 +1769,16 @@ class GraphMemcpyNode : public GraphNode { } } virtual bool GraphCaptureEnabled() override { - bool isGraphCapture = false; - if (DEBUG_CLR_GRAPH_PACKET_CAPTURE) { + if (parentGraph_ != nullptr && parentGraph_->IsSegmentSchedulingEnabled()) { switch (copyParams_.kind) { case hipMemcpyDeviceToDevice: - isGraphCapture = true; + return true; break; default: break; } } - return isGraphCapture; + return false; } }; @@ -1705,14 +1841,15 @@ class GraphMemcpyNode1D : public GraphMemcpyNode { GraphNode* clone() const override { return new GraphMemcpyNode1D(*this); } virtual hipError_t CreateCommand(hip::Stream* stream) override { - if (!isEnabled_ || - ((kind_ == hipMemcpyHostToHost || kind_ == hipMemcpyDefault) && IsHtoHMemcpy(dst_, src_))) { - return hipSuccess; - } + // Clear commands_ first, even if node is disabled hipError_t status = GraphNode::CreateCommand(stream); if (status != hipSuccess) { return status; } + if (!isEnabled_ || + ((kind_ == hipMemcpyHostToHost || kind_ == hipMemcpyDefault) && IsHtoHMemcpy(dst_, src_))) { + return hipSuccess; + } commands_.reserve(1); amd::Command* command = nullptr; if (!AMD_DIRECT_DISPATCH) { @@ -1867,18 +2004,17 @@ class GraphMemcpyNode1D : public GraphMemcpyNode { } } virtual bool GraphCaptureEnabled() override { - bool isGraphCapture = false; - if (DEBUG_CLR_GRAPH_PACKET_CAPTURE) { + if (parentGraph_ != nullptr && parentGraph_->IsSegmentSchedulingEnabled()) { hip::MemcpyType type = ihipGetMemcpyType(src_, dst_, kind_); switch (type) { case hipCopyBuffer: - isGraphCapture = true; + return true; break; default: break; } } - return isGraphCapture; + return false; } }; @@ -2139,6 +2275,13 @@ class GraphMemsetNode : public GraphNode { } } + virtual bool GraphCaptureEnabled() override { + if (parentGraph_ != nullptr && parentGraph_->IsSegmentSchedulingEnabled()) { + return true; + } + return false; + } + hipError_t CreateCommand(hip::Stream* stream) override { hipError_t status = GraphNode::CreateCommand(stream); if (status != hipSuccess) { @@ -2319,6 +2462,8 @@ class GraphHostNode : public GraphNode { amd::Command::EventWaitList waitList; commands_.reserve(1); amd::Command* command = new amd::Marker(*stream, !kMarkerDisableFlush, waitList); + // This is just to invoke a callback, so no need to flush caches. + command->setCommandEntryScope(amd::Device::kCacheStateIgnore); commands_.emplace_back(command); return hipSuccess; } @@ -2333,6 +2478,9 @@ class GraphHostNode : public GraphNode { if (!commands_[0]->setCallback(CL_COMPLETE, GraphHostNode::Callback, &NodeParams_)) { ClPrint(amd::LOG_ERROR, amd::LOG_CODE, "[hipGraph] Failed during setCallback"); } + ClPrint(amd::LOG_DETAIL_DEBUG, amd::LOG_CODE, + "EnqueueCommands: NodeParams_.fn=%p, NodeParams_.userData=%p", NodeParams_.fn, + NodeParams_.userData); commands_[0]->enqueue(); // Add the new barrier to stall the stream, until the callback is done amd::Command::EventWaitList eventWaitList; @@ -2342,6 +2490,8 @@ class GraphHostNode : public GraphNode { if (block_command == nullptr) { ClPrint(amd::LOG_ERROR, amd::LOG_CODE, "[hipGraph] Failed during block command creation"); } + // This is just to invoke a callback, so no need to flush caches. + block_command->setCommandEntryScope(amd::Device::kCacheStateIgnore); block_command->enqueue(); block_command->notifyCmdQueue(); block_command->release(); diff --git a/projects/clr/hipamd/src/hip_internal.hpp b/projects/clr/hipamd/src/hip_internal.hpp index e533c7d4e3..7b93b2a166 100644 --- a/projects/clr/hipamd/src/hip_internal.hpp +++ b/projects/clr/hipamd/src/hip_internal.hpp @@ -47,6 +47,8 @@ #define KCYN "\x1B[36m" #define KWHT "\x1B[37m" +template T ReturnPtrValue(T* ptr) { return (ptr != nullptr) ? *ptr : nullptr; } + namespace hip{ extern std::once_flag g_ihipInitialized; } diff --git a/projects/clr/hipamd/src/hip_memory.cpp b/projects/clr/hipamd/src/hip_memory.cpp index 35d9c7517a..f802c4b68c 100644 --- a/projects/clr/hipamd/src/hip_memory.cpp +++ b/projects/clr/hipamd/src/hip_memory.cpp @@ -34,8 +34,6 @@ namespace hip { amd::Monitor hipArraySetLock{}; std::unordered_set hipArraySet; -template T ReturnPtrValue(T* ptr) { return (ptr != nullptr) ? *ptr : nullptr; } - // ================================================================================================ amd::Memory* getMemoryObject(const void* ptr, size_t& offset, size_t size) { auto memObj = amd::MemObjMap::FindMemObj(ptr, &offset); diff --git a/projects/clr/rocclr/device/pal/palsettings.cpp b/projects/clr/rocclr/device/pal/palsettings.cpp index 1f242f46d1..70ba6d6f81 100644 --- a/projects/clr/rocclr/device/pal/palsettings.cpp +++ b/projects/clr/rocclr/device/pal/palsettings.cpp @@ -131,7 +131,7 @@ Settings::Settings() { : HIP_FORCE_DEV_KERNARG; limit_blit_wg_ = 16; - DEBUG_CLR_GRAPH_PACKET_CAPTURE = false; // disable graph performance optimizations for PAL + DEBUG_HIP_GRAPH_SEGMENT_SCHEDULING = 0; // disable graph performance optimizations for PAL } bool Settings::create(const Pal::DeviceProperties& palProp, diff --git a/projects/clr/rocclr/device/rocm/rocblit.cpp b/projects/clr/rocclr/device/rocm/rocblit.cpp index b4c0db7c16..09c3c03b04 100644 --- a/projects/clr/rocclr/device/rocm/rocblit.cpp +++ b/projects/clr/rocclr/device/rocm/rocblit.cpp @@ -352,7 +352,7 @@ bool DmaBlitManager::copyBufferRect(device::Memory& srcMemory, device::Memory& d } } - // The hsa copy api would result in a dirty cache state + // The ROCR copy api guarantees coherency after the copy gpu().setFenceDirty(false); return true; } @@ -590,7 +590,7 @@ inline bool DmaBlitManager::rocrCopyBuffer(address dst, hsa_agent_t& dstAgent, c if (status == HSA_STATUS_SUCCESS) { gpu().addSystemScope(); - // The hsa copy api would result in a dirty cache state + // The ROCR copy api guarantees coherency after the copy gpu().setFenceDirty(false); } else { gpu().Barriers().ResetCurrentSignal(); diff --git a/projects/clr/rocclr/device/rocm/rocvirtual.cpp b/projects/clr/rocclr/device/rocm/rocvirtual.cpp index 76a40c7667..c26a5a2e9b 100644 --- a/projects/clr/rocclr/device/rocm/rocvirtual.cpp +++ b/projects/clr/rocclr/device/rocm/rocvirtual.cpp @@ -553,8 +553,10 @@ hsa_signal_t VirtualGPU::HwQueueTracker::ActiveSignal(hsa_signal_value_t init_va if (HSA_STATUS_SUCCESS != result) { LogError("hsa_amd_signal_async_handler() failed to set the handler!"); } else { - ClPrint(amd::LOG_INFO, amd::LOG_SIG, "Set Handler: handle(0x%lx), timestamp(%p)", - prof_signal->signal_.handle, prof_signal); + ClPrint(amd::LOG_INFO, amd::LOG_SIG, + "Set Handler: handle(0x%lx), timestamp(%p), blocking CB=%d", + prof_signal->signal_.handle, prof_signal, + ts->command().Callback() != nullptr && ts->GetBlocking()); } } } @@ -1009,7 +1011,7 @@ bool VirtualGPU::dispatchGenericAqlPacket(AqlPacket* packet, uint16_t header, ui // Check for queue full and wait if needed. uint64_t index = Hsa::queue_add_write_index_screlease(gpu_queue_, 1); - fence_dirty_ = true; + setFenceDirty(true); if (addSystemScope_) { header &= ~(HSA_FENCE_SCOPE_AGENT << HSA_PACKET_HEADER_SCACQUIRE_FENCE_SCOPE | @@ -1024,14 +1026,14 @@ bool VirtualGPU::dispatchGenericAqlPacket(AqlPacket* packet, uint16_t header, ui // Reset fence_dirty_ flag if we submit a packet with system scopes if (expected_fence_state == amd::Device::kCacheStateSystem) { - fence_dirty_ = false; + setFenceDirty(false); } // Dirty optimization to save on consequent dispatch packets which have requested flushes if (fence_state_ == amd::Device::kCacheStateSystem && expected_fence_state == amd::Device::kCacheStateSystem) { header = dispatchPacketHeader_; - fence_dirty_ = true; + setFenceDirty(true); } fence_state_ = static_cast(expected_fence_state); @@ -1076,7 +1078,7 @@ bool VirtualGPU::dispatchGenericAqlPacket(AqlPacket* packet, uint16_t header, ui if (header != 0) { packet_store_release(reinterpret_cast(aql_loc), header, rest); } - ClPrint(amd::LOG_DEBUG, amd::LOG_AQL, + ClPrint(amd::LOG_DETAIL_DEBUG, amd::LOG_AQL, "SWq=0x%zx, HWq=0x%zx, id=%d, Dispatch Header = " "0x%x (type=%d, barrier=%d, acquire=%d, release=%d), " "setup=%d, grid=[%u, %u, %u], workgroup=[%u, %u, %u], private_seg_size=%u, " @@ -1204,12 +1206,18 @@ bool VirtualGPU::dispatchGenericAqlPacketBatch(const std::vector& pa amd::Os::yield(); } - fence_dirty_ = true; + setFenceDirty(true); // Save header of first packet in this batch AqlPacket* firstPacket = packets[processedPackets]; uint16_t firstPacketHeader = firstPacket->header; uint16_t firstPacketRest = firstPacket->setup; + // Separate header for doorbell ring that can be modified + uint16_t doorbellHeader = firstPacketHeader; + + // Save header of last packet in this batch (if different from first) + AqlPacket* lastPacket = packets[processedPackets + batchSize - 1]; + uint16_t lastPacketHeader = lastPacket->header; // Process batchSize packets for (size_t i = 0; i < batchSize; ++i) { @@ -1217,8 +1225,6 @@ bool VirtualGPU::dispatchGenericAqlPacketBatch(const std::vector& pa uint64_t index = startIndex + i; AqlPacket* packet = packets[packetIndex]; - uint16_t header = packet->header; - bool attachSignal = timestamp_ != nullptr || attach_signal; @@ -1247,84 +1253,105 @@ bool VirtualGPU::dispatchGenericAqlPacketBatch(const std::vector& pa AqlPacket* aql_loc = &((AqlPacket*)(gpu_queue_->base_address))[index & queueMask]; // For first packet in batch, invalidate header before writing - if (i == 0) { + bool isFirstPacket = (i == 0); + bool isLastPacket = (i == batchSize - 1); + + if (isFirstPacket) { if (addSystemScope_) { - // Add system scope on the acq on first packet - firstPacketHeader &= ~(HSA_FENCE_SCOPE_AGENT << HSA_PACKET_HEADER_SCACQUIRE_FENCE_SCOPE); - firstPacketHeader |= (HSA_FENCE_SCOPE_SYSTEM << HSA_PACKET_HEADER_SCACQUIRE_FENCE_SCOPE); + // Add system scope on the acq on first packet (modify doorbell header) + doorbellHeader &= ~(HSA_FENCE_SCOPE_AGENT << HSA_PACKET_HEADER_SCACQUIRE_FENCE_SCOPE); + doorbellHeader |= (HSA_FENCE_SCOPE_SYSTEM << HSA_PACKET_HEADER_SCACQUIRE_FENCE_SCOPE); } + // Invalidate the header of the first packet in the batch packet->header = (HSA_PACKET_TYPE_INVALID << HSA_PACKET_HEADER_TYPE); + } - // Copy the packet and then write the valid of the first packet - *aql_loc = *packet; - - // Restore the header of the first packet - packet->header = firstPacketHeader; - } else { - // For the end packet in batch set flags - if (i == batchSize - 1) { - if (addSystemScope_) { - // Add system scope on the release on last packet + // For the end packet in batch set flags + if (isLastPacket) { + if (addSystemScope_) { + // If batch has only 1 packet, update doorbell header for release scope + // (packet->header is already invalid, so don't modify it) + if (batchSize == 1) { + doorbellHeader &= ~(HSA_FENCE_SCOPE_AGENT << HSA_PACKET_HEADER_SCRELEASE_FENCE_SCOPE); + doorbellHeader |= (HSA_FENCE_SCOPE_SYSTEM << HSA_PACKET_HEADER_SCRELEASE_FENCE_SCOPE); + } else { + // Add system scope on the release on last packet (different from first) packet->header &= ~(HSA_FENCE_SCOPE_AGENT << HSA_PACKET_HEADER_SCRELEASE_FENCE_SCOPE); packet->header |= (HSA_FENCE_SCOPE_SYSTEM << HSA_PACKET_HEADER_SCRELEASE_FENCE_SCOPE); - addSystemScope_ = false; } - auto expected_fence_state = - extractAqlBits(packet->header, HSA_PACKET_HEADER_SCRELEASE_FENCE_SCOPE, - HSA_PACKET_HEADER_WIDTH_SCRELEASE_FENCE_SCOPE); - // Reset fence_dirty_ flag if we submit a packet with system scopes - if (expected_fence_state == amd::Device::kCacheStateSystem) { - fence_dirty_ = false; - } - fence_state_ = static_cast(expected_fence_state); + addSystemScope_ = false; } - - // Copy the packet to the queue - *aql_loc = *packet; + // Use doorbellHeader for single packet batch (packet->header is invalid), + // else use packet->header + uint16_t headerForFenceState = (batchSize == 1) ? doorbellHeader : packet->header; + auto expected_fence_state = + extractAqlBits(headerForFenceState, HSA_PACKET_HEADER_SCRELEASE_FENCE_SCOPE, + HSA_PACKET_HEADER_WIDTH_SCRELEASE_FENCE_SCOPE); + // Reset fence_dirty_ flag if we submit a packet with system scopes + if (expected_fence_state == amd::Device::kCacheStateSystem) { + setFenceDirty(false); + } + fence_state_ = static_cast(expected_fence_state); } + // Copy the packet to the queue + *aql_loc = *packet; + // Print kernel name for kernel dispatch packets if (kernelNames && packetIndex < kernelNames->size()) { + // Use doorbellHeader for first packet (packet->header is invalid), else use packet->header + uint16_t headerForPrint = isFirstPacket ? doorbellHeader : packet->header; uint8_t packetType = - extractAqlBits(header, HSA_PACKET_HEADER_TYPE, HSA_PACKET_HEADER_WIDTH_TYPE); + extractAqlBits(headerForPrint, HSA_PACKET_HEADER_TYPE, HSA_PACKET_HEADER_WIDTH_TYPE); if (packetType == HSA_PACKET_TYPE_KERNEL_DISPATCH) { ClPrint(amd::LOG_DETAIL_DEBUG, amd::LOG_KERN2, "Graph ShaderName : %s, device id : %u", (*kernelNames)[packetIndex].c_str(), dev().index()); - ClPrint( - amd::LOG_DETAIL_DEBUG, amd::LOG_AQL, - "SWq=0x%zx, HWq=0x%zx, id=%d, Dispatch Header = " - "0x%x (type=%d, barrier=%d, acquire=%d, release=%d), " - "setup=%d, grid=[%u, %u, %u], workgroup=[%u, %u, %u], " - "private_seg_size=%u, group_seg_size=%u, kernel_obj=0x%zx, " - "kernarg_address=0x%zx, completion_signal=0x%zx, correlation_id=%zu, " - "rptr=%u, wptr=%u", - gpu_queue_, gpu_queue_->base_address, gpu_queue_->id, header, packetType, - extractAqlBits(header, HSA_PACKET_HEADER_BARRIER, HSA_PACKET_HEADER_WIDTH_BARRIER), - extractAqlBits(header, HSA_PACKET_HEADER_SCACQUIRE_FENCE_SCOPE, - HSA_PACKET_HEADER_WIDTH_SCACQUIRE_FENCE_SCOPE), - extractAqlBits(header, HSA_PACKET_HEADER_SCRELEASE_FENCE_SCOPE, - HSA_PACKET_HEADER_WIDTH_SCRELEASE_FENCE_SCOPE), - packet->setup, reinterpret_cast(packet)->grid_size_x, - reinterpret_cast(packet)->grid_size_y, - reinterpret_cast(packet)->grid_size_z, - reinterpret_cast(packet)->workgroup_size_x, - reinterpret_cast(packet)->workgroup_size_y, - reinterpret_cast(packet)->workgroup_size_z, - reinterpret_cast(packet)->private_segment_size, - reinterpret_cast(packet)->group_segment_size, - reinterpret_cast(packet)->kernel_object, - reinterpret_cast(packet)->kernarg_address, - reinterpret_cast(packet)->completion_signal, - reinterpret_cast(packet)->reserved2, - Hsa::queue_load_read_index_scacquire(gpu_queue_), index); + ClPrint(amd::LOG_DETAIL_DEBUG, amd::LOG_AQL, + "SWq=0x%zx, HWq=0x%zx, id=%d, Dispatch Header = " + "0x%x (type=%d, barrier=%d, acquire=%d, release=%d), " + "setup=%d, grid=[%u, %u, %u], workgroup=[%u, %u, %u], " + "private_seg_size=%u, group_seg_size=%u, kernel_obj=0x%zx, " + "kernarg_address=0x%zx, completion_signal=0x%zx, correlation_id=%zu, " + "rptr=%u, wptr=%u", + gpu_queue_, gpu_queue_->base_address, gpu_queue_->id, headerForPrint, packetType, + extractAqlBits(headerForPrint, HSA_PACKET_HEADER_BARRIER, + HSA_PACKET_HEADER_WIDTH_BARRIER), + extractAqlBits(headerForPrint, HSA_PACKET_HEADER_SCACQUIRE_FENCE_SCOPE, + HSA_PACKET_HEADER_WIDTH_SCACQUIRE_FENCE_SCOPE), + extractAqlBits(headerForPrint, HSA_PACKET_HEADER_SCRELEASE_FENCE_SCOPE, + HSA_PACKET_HEADER_WIDTH_SCRELEASE_FENCE_SCOPE), + packet->setup, + reinterpret_cast(packet)->grid_size_x, + reinterpret_cast(packet)->grid_size_y, + reinterpret_cast(packet)->grid_size_z, + reinterpret_cast(packet)->workgroup_size_x, + reinterpret_cast(packet)->workgroup_size_y, + reinterpret_cast(packet)->workgroup_size_z, + reinterpret_cast(packet)->private_segment_size, + reinterpret_cast(packet)->group_segment_size, + reinterpret_cast(packet)->kernel_object, + reinterpret_cast(packet)->kernarg_address, + reinterpret_cast(packet)->completion_signal, + reinterpret_cast(packet)->reserved2, + Hsa::queue_load_read_index_scacquire(gpu_queue_), index); } } + + // Restore the header of the first packet + if (isFirstPacket) { + packet->header = firstPacketHeader; + } + + // Restore the header of the last packet (if different from first) + if (isLastPacket && batchSize > 1) { + packet->header = lastPacketHeader; + } } // Write valid header for the first packet in the batch AqlPacket* aql_loc = &((AqlPacket*)(gpu_queue_->base_address))[startIndex & queueMask]; - packet_store_release(reinterpret_cast(aql_loc), firstPacketHeader, firstPacketRest); + packet_store_release(reinterpret_cast(aql_loc), doorbellHeader, firstPacketRest); // Ring doorbell for this batch Hsa::signal_store_screlease(gpu_queue_->doorbell_signal, startIndex); @@ -1367,8 +1394,7 @@ bool VirtualGPU::dispatchAqlPacketBatch(const std::vector& packets, dispatchBlockingWait(); - // Add all kernel names in bulk - vcmd->addKernelNames(kernelNames); + vcmd->setKernelNamesRef(&kernelNames); // Dispatch all packets with a single doorbell ring // Cast packets vector to AQL packets vector on the fly @@ -1428,7 +1454,7 @@ void VirtualGPU::dispatchBarrierPacket(uint16_t packetHeader, bool skipSignal, uint64_t index = Hsa::queue_add_write_index_screlease(gpu_queue_, 1); uint64_t read = Hsa::queue_load_read_index_relaxed(gpu_queue_); - fence_dirty_ = true; + setFenceDirty(true); auto cache_state = extractAqlBits(packetHeader, HSA_PACKET_HEADER_SCRELEASE_FENCE_SCOPE, HSA_PACKET_HEADER_WIDTH_SCRELEASE_FENCE_SCOPE); if (!skipSignal && (signal.handle == 0)) { @@ -1443,7 +1469,7 @@ void VirtualGPU::dispatchBarrierPacket(uint16_t packetHeader, bool skipSignal, // Reset fence_dirty_ flag if we submit a barrier with system scopes if (cache_state == amd::Device::kCacheStateSystem) { - fence_dirty_ = false; + setFenceDirty(false); } while ((index - Hsa::queue_load_read_index_scacquire(gpu_queue_)) >= queueMask); @@ -1453,7 +1479,7 @@ void VirtualGPU::dispatchBarrierPacket(uint16_t packetHeader, bool skipSignal, packet_store_release(reinterpret_cast(aql_loc), packetHeader, 0); Hsa::signal_store_screlease(gpu_queue_->doorbell_signal, index); - ClPrint(amd::LOG_DEBUG, amd::LOG_AQL, + ClPrint(amd::LOG_DETAIL_DEBUG, amd::LOG_AQL, "SWq=0x%zx, HWq=0x%zx, id=%d, BarrierAND Header = 0x%x (type=%d, barrier=%d, acquire=%d," " release=%d), " "dep_signal=[0x%zx, 0x%zx, 0x%zx, 0x%zx, 0x%zx], completion_signal=0x%zx, " @@ -1512,7 +1538,7 @@ void VirtualGPU::dispatchBarrierValuePacket(uint16_t packetHeader, bool resolveD } } - fence_dirty_ = true; + setFenceDirty(true); auto cache_state = extractAqlBits(packetHeader, HSA_PACKET_HEADER_SCRELEASE_FENCE_SCOPE, HSA_PACKET_HEADER_WIDTH_SCRELEASE_FENCE_SCOPE); @@ -1527,7 +1553,7 @@ void VirtualGPU::dispatchBarrierValuePacket(uint16_t packetHeader, bool resolveD // Reset fence_dirty_ flag if we submit a barrier if (cache_state == amd::Device::kCacheStateSystem) { - fence_dirty_ = false; + setFenceDirty(false); } uint64_t index = Hsa::queue_add_write_index_screlease(gpu_queue_, 1); @@ -1543,7 +1569,7 @@ void VirtualGPU::dispatchBarrierValuePacket(uint16_t packetHeader, bool resolveD Hsa::signal_store_screlease(gpu_queue_->doorbell_signal, index); - ClPrint(amd::LOG_DEBUG, amd::LOG_AQL, + ClPrint(amd::LOG_DETAIL_DEBUG, amd::LOG_AQL, "SWq=0x%zx, HWq=0x%zx, id=%d, BarrierValue Header = 0x%x AmdFormat = 0x%x " "(type=%d, barrier=%d, acquire=%d, release=%d), " "signal=0x%zx, value = 0x%llx mask = 0x%llx cond: %s, completion_signal=0x%zx, " @@ -1576,7 +1602,7 @@ void VirtualGPU::ResetQueueStates() { // ================================================================================================ bool VirtualGPU::releaseGpuMemoryFence(bool skip_cpu_wait) { - if (hasPendingDispatch_ || !Barriers().IsExternalSignalListEmpty()) { + if (hasPendingDispatch_ || isFenceDirty() || !Barriers().IsExternalSignalListEmpty()) { // Dispatch barrier packet into the queue dispatchBarrierPacket(kBarrierPacketHeader); hasPendingDispatch_ = false; @@ -1944,6 +1970,17 @@ void VirtualGPU::profilingBegin(amd::Command& command, bool sdmaProfiling) { } else { // Assume serialization on the same queue... } + + // Check if the waiting event's queue has a dirty fence and propagate it + if (!isFenceDirty()) { + amd::Command* wait_cmd = static_cast(*it); + if (wait_cmd->queue() != nullptr && wait_cmd->queue() != command.queue()) { + device::VirtualDevice* wait_vdev = wait_cmd->queue()->vdev(); + if (wait_vdev != nullptr && wait_vdev->isFenceDirty()) { + setFenceDirty(true); + } + } + } } } } @@ -3688,7 +3725,7 @@ bool VirtualGPU::submitKernelInternal(const amd::NDRangeContainer& sizes, const if (isGraphCapture) { argBuffer = command_->getGraphKernArg(gpuKernel.KernargSegmentByteSize(), gpuKernel.KernargSegmentAlignment(), dev().index()); - command_->SetKernelName(gpuKernel.getDemangledName().c_str()); + command_->SetKernelName(gpuKernel.getDemangledName()); } else { ClPrint(amd::LOG_DETAIL_DEBUG, amd::LOG_KERN, "KernargSegmentByteSize = %lu " @@ -3916,6 +3953,7 @@ void VirtualGPU::submitMarker(amd::Marker& vcmd) { if (timestamp_ != nullptr) { const Settings& settings = dev().settings(); int32_t releaseFlags = vcmd.getCommandEntryScope(); + if (releaseFlags == Device::CacheState::kCacheStateIgnore) { if (settings.barrier_value_packet_ && vcmd.profilingInfo().marker_ts_) { dispatchBarrierValuePacket(kBarrierVendorPacketNopScopeHeader, true); diff --git a/projects/clr/rocclr/device/rocm/rocvirtual.hpp b/projects/clr/rocclr/device/rocm/rocvirtual.hpp index 3ebe66aaa5..c7d71a8882 100644 --- a/projects/clr/rocclr/device/rocm/rocvirtual.hpp +++ b/projects/clr/rocclr/device/rocm/rocvirtual.hpp @@ -448,8 +448,8 @@ class VirtualGPU : public device::VirtualDevice { amd::Command* command() const { return command_; } void* allocKernArg(size_t size, size_t alignment); - bool isFenceDirty() const { return fence_dirty_; } - void setFenceDirty(bool state) { fence_dirty_ = state; } + bool isFenceDirty() const { return fence_dirty_.load(std::memory_order_acquire); } + void setFenceDirty(bool state) { fence_dirty_.store(state, std::memory_order_release); } void WaitCompleteSignal(hsa_signal_t signal); void HiddenHeapInit(); diff --git a/projects/clr/rocclr/platform/command.hpp b/projects/clr/rocclr/platform/command.hpp index 5a8fd92853..7b14880031 100644 --- a/projects/clr/rocclr/platform/command.hpp +++ b/projects/clr/rocclr/platform/command.hpp @@ -1383,6 +1383,7 @@ class AccumulateCommand : public Command { private: //! Kernel names and timestamps list for activity profiling std::vector kernelNames_; + const std::vector* kernelNamesRef_ = nullptr; std::vector> tsList_; public: @@ -1399,13 +1400,20 @@ class AccumulateCommand : public Command { kernelNames_.insert(kernelNames_.end(), kernelNames.begin(), kernelNames.end()); } + //! Set kernel names by reference + void setKernelNamesRef(const std::vector* kernelNames) { + kernelNamesRef_ = kernelNames; + } + //! Add kernel timestamp to the list if available void addTimestamps(uint64_t startTs, uint64_t endTs) { tsList_.push_back(std::make_pair(startTs, endTs)); } //! Return the kernel names - const std::vector& getKernelNames() const { return kernelNames_; } + const std::vector& getKernelNames() const { + return kernelNamesRef_ != nullptr ? *kernelNamesRef_ : kernelNames_; + } //! Return the kernel timestamps const std::vector>& getTimestamps() const { return tsList_; } diff --git a/projects/clr/rocclr/utils/flags.hpp b/projects/clr/rocclr/utils/flags.hpp index ddacf2182a..b641ef7d79 100644 --- a/projects/clr/rocclr/utils/flags.hpp +++ b/projects/clr/rocclr/utils/flags.hpp @@ -237,8 +237,6 @@ release(size_t, HIP_INITIAL_DM_SIZE, 8 * Mi, \ "Set initial heap size for device malloc.") \ release(bool, HIP_FORCE_DEV_KERNARG, true, \ "Force device mem for kernel args.") \ -release(bool, DEBUG_CLR_GRAPH_PACKET_CAPTURE, true, \ - "Enable/Disable graph packet capturing") \ release(bool, GPU_DEBUG_ENABLE, false, \ "Enables collection of extra info for debugger at some perf cost") \ release(cstring, HIPRTC_COMPILE_OPTIONS_APPEND, "", \ @@ -255,6 +253,8 @@ release(uint, DEBUG_HIP_FORCE_GRAPH_QUEUES, 4, \ "Forces the number of streams for the graph parallel execution") \ release(uint, DEBUG_HIP_GRAPH_BATCH_SIZE, 256, \ "Number of graph nodes to batch at a time") \ +release(uint, DEBUG_HIP_GRAPH_SEGMENT_SCHEDULING, 1, \ + "0 = Disable, 1 = Enable, 2 = Force") \ release(uint, DEBUG_HIP_BLOCK_SYNC, 50, \ "Blocks synchronization on CPU until the callback processing is done")\ release(uint, DEBUG_CLR_MAX_BATCH_SIZE, 1000, \