SWDEV-524745 - Part-III Add multi device support for hip graph (#814)
- Retrieve the list of devices linked to each branch using stream ID x. - Identify the necessary streams for each device to facilitate graph execution. - Create the necessary streams for each device to ensure successful graph execution. - Implement support for launching a multi-device, single-branch graph. Co-authored-by: Anusha GodavarthySurya <Anusha.GodavarthySurya@amd.com>
このコミットが含まれているのは:
@@ -189,7 +189,8 @@ void Graph::ScheduleOneNode(Node node, int stream_id) {
|
||||
// Assign active stream to the current node
|
||||
node->stream_id_ = stream_id;
|
||||
max_streams_ = std::max(max_streams_, (stream_id + 1));
|
||||
|
||||
// Track which devices are used by each stream for multi-device graph execution
|
||||
streams_dev_ids_[stream_id].insert(node->dev_id_);
|
||||
// Process child graph separately, since, there is no connection
|
||||
if (node->GetType() == hipGraphNodeTypeGraph) {
|
||||
auto child = reinterpret_cast<hip::ChildGraphNode*>(node)->GetChildGraph();
|
||||
@@ -317,7 +318,7 @@ void Graph::clone(Graph* newGraph, bool cloneNodes) const {
|
||||
|
||||
// ================================================================================================
|
||||
Graph* Graph::clone() const {
|
||||
Graph* newGraph = new Graph(device_);
|
||||
Graph* newGraph = new Graph(getCurrentDevice());
|
||||
clone(newGraph);
|
||||
return newGraph;
|
||||
}
|
||||
@@ -332,39 +333,115 @@ bool GraphExec::isGraphExecValid(GraphExec* pGraphExec) {
|
||||
}
|
||||
|
||||
// ================================================================================================
|
||||
hipError_t GraphExec::CreateStreams(uint32_t num_streams) {
|
||||
hipError_t GraphExec::CreateStreams(uint32_t num_streams, int devId) {
|
||||
amd::ScopedLock lock(graphExecStreamCreateLock_);
|
||||
parallel_streams_.reserve(num_streams);
|
||||
|
||||
// Validate input parameters
|
||||
if (num_streams == 0) {
|
||||
ClPrint(amd::LOG_WARNING, amd::LOG_CODE,
|
||||
"[hipGraph] Attempting to create 0 streams for device %d", devId);
|
||||
return hipSuccess;
|
||||
}
|
||||
|
||||
if (devId < 0 || devId >= g_devices.size() || g_devices[devId] == nullptr) {
|
||||
ClPrint(amd::LOG_ERROR, amd::LOG_CODE, "[hipGraph] Invalid device ID %d for stream creation",
|
||||
devId);
|
||||
return hipErrorInvalidDevice;
|
||||
}
|
||||
|
||||
// Check if streams already exist for this device
|
||||
if (parallel_streams_.find(devId) != parallel_streams_.end() &&
|
||||
!parallel_streams_[devId].empty()) {
|
||||
ClPrint(amd::LOG_WARNING, amd::LOG_CODE,
|
||||
"[hipGraph] Streams already exist for device %d, skipping creation", devId);
|
||||
return hipSuccess;
|
||||
}
|
||||
|
||||
parallel_streams_[devId].reserve(num_streams);
|
||||
|
||||
ClPrint(amd::LOG_INFO, amd::LOG_CODE, "[hipGraph] Creating %u parallel streams for device %d",
|
||||
num_streams, devId);
|
||||
|
||||
for (uint32_t i = 0; i < num_streams; ++i) {
|
||||
auto stream = new hip::Stream(hip::getCurrentDevice(), hip::Stream::Priority::Normal,
|
||||
hipStreamNonBlocking);
|
||||
auto stream =
|
||||
new hip::Stream(g_devices[devId], hip::Stream::Priority::Normal, hipStreamNonBlocking);
|
||||
|
||||
if (stream == nullptr || !stream->Create()) {
|
||||
ClPrint(amd::LOG_ERROR, amd::LOG_CODE, "[hipGraph] Failed to %s stream %u for device %d",
|
||||
stream == nullptr ? "allocate" : "create", i, devId);
|
||||
if (stream != nullptr) {
|
||||
hip::Stream::Destroy(stream);
|
||||
}
|
||||
ClPrint(amd::LOG_ERROR, amd::LOG_CODE, "[hipGraph] Failed to create parallel stream!");
|
||||
// Clean up any previously created streams for this device
|
||||
for (auto& created_stream : parallel_streams_[devId]) {
|
||||
hip::Stream::Destroy(created_stream);
|
||||
}
|
||||
parallel_streams_[devId].clear();
|
||||
return hipErrorOutOfMemory;
|
||||
}
|
||||
parallel_streams_.push_back(stream);
|
||||
|
||||
parallel_streams_[devId].push_back(stream);
|
||||
}
|
||||
return hipSuccess;
|
||||
}
|
||||
|
||||
void GraphExec::FindStreamsReqPerDev() {
|
||||
// Count streams required per device based on stream-to-device mappings
|
||||
for (auto const& [stream_id, dev_ids] : streams_dev_ids_) {
|
||||
for (auto dev_id : dev_ids) {
|
||||
max_streams_dev_[dev_id]++;
|
||||
}
|
||||
}
|
||||
|
||||
// Recursively process child graphs to determine their stream requirements
|
||||
for (auto node : vertices_) {
|
||||
if (node->GetType() == hipGraphNodeTypeGraph) {
|
||||
auto childNode = reinterpret_cast<ChildGraphNode*>(node);
|
||||
|
||||
// Recursively find stream requirements for child graph
|
||||
childNode->FindStreamsReqPerDev();
|
||||
|
||||
// Merge child graph's stream requirements with parent graph
|
||||
// Take the maximum streams needed per device to handle concurrent execution
|
||||
for (auto const& [dev_id, num_streams] : childNode->max_streams_dev_) {
|
||||
auto it = max_streams_dev_.find(dev_id);
|
||||
if (it != max_streams_dev_.end()) {
|
||||
// Device already has stream requirements - take the maximum
|
||||
max_streams_dev_[dev_id] = std::max(max_streams_dev_[dev_id], num_streams);
|
||||
} else {
|
||||
// New device - initialize with child graph's requirement
|
||||
max_streams_dev_[dev_id] = num_streams;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ================================================================================================
|
||||
hipError_t GraphExec::Init() {
|
||||
hipError_t status = hipSuccess;
|
||||
// create extra stream to avoid queue collision with the default execution stream
|
||||
if (max_streams_ > 1) {
|
||||
status = CreateStreams(max_streams_);
|
||||
}
|
||||
if (status != hipSuccess) {
|
||||
return status;
|
||||
}
|
||||
if (DEBUG_CLR_GRAPH_PACKET_CAPTURE) {
|
||||
if (max_streams_ == 1) {
|
||||
|
||||
if (max_streams_ == 1) {
|
||||
FindStreamsReqPerDev();
|
||||
if (max_streams_dev_.size() > 1) {
|
||||
// Multi-device graph detected - create parallel streams for each device
|
||||
for (auto const& [dev_id, num_streams] : max_streams_dev_) {
|
||||
ClPrint(amd::LOG_INFO, amd::LOG_API,
|
||||
"[hipGraph] For device id :%d max streams :%d for execution.\n", dev_id,
|
||||
num_streams);
|
||||
status = CreateStreams(num_streams, dev_id);
|
||||
if (status != hipSuccess) {
|
||||
return status;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (DEBUG_CLR_GRAPH_PACKET_CAPTURE) {
|
||||
// For graph nodes capture AQL packets to dispatch them directly during graph launch.
|
||||
status = CaptureAQLPackets();
|
||||
}
|
||||
} else {
|
||||
status = CreateStreams(max_streams_, hip::getCurrentDevice()->deviceId());
|
||||
}
|
||||
instantiateDeviceId_ = hip::getCurrentDevice()->deviceId();
|
||||
static_cast<ReferenceCountedObject*>(hip::getCurrentDevice())->retain();
|
||||
@@ -438,8 +515,8 @@ hipError_t GraphExec::CaptureAndFormPacketsForGraph() {
|
||||
for (size_t i = 0; i < topoOrder_.size(); ++i) {
|
||||
auto& node = topoOrder_[i];
|
||||
|
||||
// Check if kernel node requires hidden heap and set it for the entire graph
|
||||
if (node->GetType() == hipGraphNodeTypeKernel) {
|
||||
// Check if graph requires hidden heap and set as part of graphExec param.
|
||||
static bool initialized = false;
|
||||
if (!initialized && reinterpret_cast<hip::GraphKernelNode*>(node)->HasHiddenHeap()) {
|
||||
SetHiddenHeap();
|
||||
@@ -447,54 +524,66 @@ hipError_t GraphExec::CaptureAndFormPacketsForGraph() {
|
||||
}
|
||||
}
|
||||
|
||||
// Handle nodes that support graph capture
|
||||
if (node->GraphCaptureEnabled()) {
|
||||
// Start of a new batch
|
||||
PacketBatch newBatch;
|
||||
size_t j = i;
|
||||
// TODO: Add support for batching for multi-device linear graph
|
||||
if (max_streams_dev_.size() == 1) {
|
||||
// Single device - use batching optimization
|
||||
// Start of a new batch
|
||||
PacketBatch newBatch;
|
||||
size_t j = i;
|
||||
|
||||
// Collect packets from consecutive captured nodes
|
||||
while (j < topoOrder_.size() && topoOrder_[j]->GraphCaptureEnabled()) {
|
||||
auto& currentNode = topoOrder_[j];
|
||||
// Collect packets from consecutive captured nodes
|
||||
while (j < topoOrder_.size() && topoOrder_[j]->GraphCaptureEnabled()) {
|
||||
auto& currentNode = topoOrder_[j];
|
||||
|
||||
// Capture packets for this node
|
||||
std::vector<uint8_t*> nodePackets;
|
||||
std::vector<std::string> nodeKernelNames;
|
||||
status = currentNode->CaptureAndFormPacket(GetKernelArgManager(), &nodePackets,
|
||||
&nodeKernelNames);
|
||||
// Capture packets for this node
|
||||
std::vector<uint8_t*> nodePackets;
|
||||
std::vector<std::string> nodeKernelNames;
|
||||
status = currentNode->CaptureAndFormPacket(GetKernelArgManager(), &nodePackets,
|
||||
&nodeKernelNames);
|
||||
|
||||
if (status != hipSuccess || nodePackets.empty()) {
|
||||
LogError("Packet capture failed");
|
||||
return status;
|
||||
if (status != hipSuccess || nodePackets.empty()) {
|
||||
LogError("Packet capture failed");
|
||||
return status;
|
||||
}
|
||||
|
||||
// Create NodeRange for this node
|
||||
PacketBatch::NodeRange range;
|
||||
range.startIndex = newBatch.dispatchPackets.size();
|
||||
range.packetCount = nodePackets.size();
|
||||
range.enabled = true;
|
||||
|
||||
// Add to dispatch lists (initially all enabled)
|
||||
newBatch.dispatchPackets.insert(newBatch.dispatchPackets.end(), nodePackets.begin(),
|
||||
nodePackets.end());
|
||||
newBatch.dispatchKernelNames.insert(newBatch.dispatchKernelNames.end(),
|
||||
nodeKernelNames.begin(), nodeKernelNames.end());
|
||||
|
||||
// Store node mapping
|
||||
newBatch.nodeRanges.push_back(range);
|
||||
newBatch.nodeToRangeIndex[currentNode] = newBatch.nodeRanges.size() - 1;
|
||||
|
||||
// Mark this node as successfully captured
|
||||
nodeCaptureStatus_[j] = true;
|
||||
++j;
|
||||
}
|
||||
|
||||
// Create NodeRange for this node
|
||||
PacketBatch::NodeRange range;
|
||||
range.startIndex = newBatch.dispatchPackets.size();
|
||||
range.packetCount = nodePackets.size();
|
||||
range.enabled = true;
|
||||
// Add the batch if it has packets
|
||||
if (!newBatch.dispatchPackets.empty()) {
|
||||
packetBatches_.emplace_back(std::move(newBatch));
|
||||
}
|
||||
|
||||
// Add to dispatch lists (initially all enabled)
|
||||
newBatch.dispatchPackets.insert(newBatch.dispatchPackets.end(),
|
||||
nodePackets.begin(), nodePackets.end());
|
||||
newBatch.dispatchKernelNames.insert(newBatch.dispatchKernelNames.end(),
|
||||
nodeKernelNames.begin(), nodeKernelNames.end());
|
||||
|
||||
// Store node mapping
|
||||
newBatch.nodeRanges.push_back(range);
|
||||
newBatch.nodeToRangeIndex[currentNode] = newBatch.nodeRanges.size() - 1;
|
||||
|
||||
// Mark this node as successfully captured
|
||||
nodeCaptureStatus_[j] = true;
|
||||
++j;
|
||||
// Skip the nodes we just processed, the index will be incremented by the loop
|
||||
i = j - 1;
|
||||
} else {
|
||||
// Multi-device - capture individual packets without batching
|
||||
status = node->CaptureAndFormPacket(GetKernelArgManager());
|
||||
if (status != hipSuccess) {
|
||||
LogError("Individual packet capture failed for multi-device node");
|
||||
return status;
|
||||
}
|
||||
}
|
||||
|
||||
// Add the batch if it has packets
|
||||
if (!newBatch.dispatchPackets.empty()) {
|
||||
packetBatches_.emplace_back(std::move(newBatch));
|
||||
}
|
||||
|
||||
// Skip the nodes we just processed, the index will be incremented by the loop
|
||||
i = j - 1;
|
||||
} else if (node->GetType() == hipGraphNodeTypeGraph) {
|
||||
auto childNode = reinterpret_cast<hip::ChildGraphNode*>(node);
|
||||
if (childNode->GetChildGraph()->max_streams_ == 1) {
|
||||
@@ -502,7 +591,8 @@ hipError_t GraphExec::CaptureAndFormPacketsForGraph() {
|
||||
status = childNode->CaptureAndFormPacketsForGraph();
|
||||
nodeCaptureStatus_[i] = (status == hipSuccess);
|
||||
if (status != hipSuccess) {
|
||||
status = hipSuccess; // Continue with other nodes
|
||||
LogWarning("Child graph packet capture failed continuing with other nodes");
|
||||
status = hipSuccess; // Continue processing other nodes
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -555,38 +645,43 @@ hipError_t GraphExec::UpdateAQLPacket(hip::GraphNode* node) {
|
||||
if (max_streams_ != 1 || !node->GraphCaptureEnabled()) {
|
||||
return hipSuccess;
|
||||
}
|
||||
//ToDo: Add batching support for multi-device linear graph
|
||||
if (max_streams_dev_.size() == 1) {
|
||||
// Find which batch contains this node and update it
|
||||
for (auto& batch : packetBatches_) {
|
||||
auto it = batch.nodeToRangeIndex.find(node);
|
||||
if (it != batch.nodeToRangeIndex.end()) {
|
||||
// Found the batch containing this node - update packets
|
||||
PacketBatch::NodeRange& range = batch.nodeRanges[it->second];
|
||||
|
||||
// Find which batch contains this node and update it
|
||||
for (auto& batch : packetBatches_) {
|
||||
auto it = batch.nodeToRangeIndex.find(node);
|
||||
if (it != batch.nodeToRangeIndex.end()) {
|
||||
// Found the batch containing this node - update packets
|
||||
PacketBatch::NodeRange& range = batch.nodeRanges[it->second];
|
||||
|
||||
// Capture new packets for this node
|
||||
std::vector<uint8_t*> newPackets;
|
||||
std::vector<std::string> newKernelNames;
|
||||
hipError_t status = node->CaptureAndFormPacket(kernArgManager_, &newPackets, &newKernelNames);
|
||||
if (status != hipSuccess) {
|
||||
return status;
|
||||
// Capture new packets for this node
|
||||
std::vector<uint8_t*> newPackets;
|
||||
std::vector<std::string> newKernelNames;
|
||||
hipError_t status =
|
||||
node->CaptureAndFormPacket(kernArgManager_, &newPackets, &newKernelNames);
|
||||
if (status != hipSuccess) {
|
||||
return status;
|
||||
}
|
||||
// Update dispatch packets (always update regardless of enabled state)
|
||||
// The enabled/disabled check happens during dispatch, not here
|
||||
for (size_t i = 0; i < range.packetCount && i < newPackets.size(); ++i) {
|
||||
size_t packetIndex = range.startIndex + i;
|
||||
batch.dispatchPackets[packetIndex] = newPackets[i];
|
||||
batch.dispatchKernelNames[packetIndex] = newKernelNames[i];
|
||||
}
|
||||
return hipSuccess;
|
||||
}
|
||||
// Update dispatch packets (always update regardless of enabled state)
|
||||
// The enabled/disabled check happens during dispatch, not here
|
||||
for (size_t i = 0; i < range.packetCount && i < newPackets.size(); ++i) {
|
||||
size_t packetIndex = range.startIndex + i;
|
||||
batch.dispatchPackets[packetIndex] = newPackets[i];
|
||||
batch.dispatchKernelNames[packetIndex] = newKernelNames[i];
|
||||
}
|
||||
return hipSuccess;
|
||||
}
|
||||
} else {
|
||||
return node->CaptureAndFormPacket(kernArgManager_);
|
||||
}
|
||||
return hipSuccess; // Node not in any batch
|
||||
}
|
||||
|
||||
// ================================================================================================
|
||||
hipError_t GraphExec::UpdatePacketBatchesForNodeEnableDisable(hip::GraphNode* node, bool isEnabled) {
|
||||
if (max_streams_ != 1 || !node->GraphCaptureEnabled()) {
|
||||
// Only handle single stream case with captured nodes
|
||||
if (max_streams_ != 1 && max_streams_dev_.size() == 1 && !node->GraphCaptureEnabled()) {
|
||||
// Only handle single stream and single device case with captured nodes
|
||||
return hipSuccess;
|
||||
}
|
||||
// Find which batch contains this node and update its enabled state
|
||||
@@ -687,10 +782,77 @@ hipError_t GraphExec::EnqueueGraphWithSingleList(hip::Stream* hip_stream) {
|
||||
}
|
||||
return status;
|
||||
}
|
||||
// ================================================================================================
|
||||
hipError_t GraphExec::EnqueueMultiDeviceLinearGraph(hip::Stream* launch_stream) {
|
||||
// Accumulate command tracks all the AQL packet batch that we submit to the HW. For now we track
|
||||
// only kernel nodes.
|
||||
amd::AccumulateCommand* accumulate = nullptr;
|
||||
hipError_t status = hipSuccess;
|
||||
if (DEBUG_CLR_GRAPH_PACKET_CAPTURE) {
|
||||
accumulate = new amd::AccumulateCommand(*launch_stream, {}, nullptr);
|
||||
}
|
||||
|
||||
auto createMarkerAndWait = [](hip::Stream* fromStream, hip::Stream* toStream) {
|
||||
amd::Command::EventWaitList wait_list;
|
||||
auto marker = new amd::Marker(*fromStream, true, wait_list);
|
||||
marker->enqueue();
|
||||
marker->release();
|
||||
wait_list.push_back(marker);
|
||||
auto wait_marker = new amd::Marker(*toStream, true, wait_list);
|
||||
wait_marker->enqueue();
|
||||
wait_marker->release();
|
||||
};
|
||||
|
||||
hip::Stream* prevStream = launch_stream;
|
||||
size_t batchIndex = 0;
|
||||
|
||||
for (size_t i = 0; i < topoOrder_.size(); ++i) {
|
||||
auto& node = topoOrder_[i];
|
||||
hip::Stream* currStream = parallel_streams_[node->dev_id_][0];
|
||||
|
||||
// Insert synchronization marker if switching devices
|
||||
if (prevStream->DeviceId() != currStream->DeviceId()) {
|
||||
createMarkerAndWait(prevStream, currStream);
|
||||
}
|
||||
// ToDo : Add batching for multi device graph launch
|
||||
if (topoOrder_[i]->GraphCaptureEnabled()) {
|
||||
if (topoOrder_[i]->GetEnabled()) {
|
||||
std::vector<uint8_t*>& gpuPackets = topoOrder_[i]->GetAqlPackets();
|
||||
std::vector<std::string> kernelNames;
|
||||
for (auto& packet : gpuPackets) {
|
||||
kernelNames.push_back(topoOrder_[i]->GetKernelName());
|
||||
}
|
||||
currStream->vdev()->dispatchAqlPacketBatch(gpuPackets, kernelNames, accumulate);
|
||||
}
|
||||
} else {
|
||||
topoOrder_[i]->SetStream(currStream);
|
||||
status = topoOrder_[i]->CreateCommand(topoOrder_[i]->GetQueue());
|
||||
topoOrder_[i]->EnqueueCommands(currStream);
|
||||
}
|
||||
prevStream = currStream;
|
||||
}
|
||||
|
||||
// Synchronize back to launch stream if we ended on a different device
|
||||
if (prevStream != launch_stream) {
|
||||
createMarkerAndWait(prevStream, launch_stream);
|
||||
}
|
||||
|
||||
if (DEBUG_CLR_GRAPH_PACKET_CAPTURE) {
|
||||
accumulate->enqueue();
|
||||
accumulate->release();
|
||||
}
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
// ================================================================================================
|
||||
hipError_t Graph::UpdateStreams(hip::Stream* launch_stream,
|
||||
const std::vector<hip::Stream*>& parallel_streams) {
|
||||
void GraphExec::UpdateStreams(hip::Stream* launch_stream) {
|
||||
int devId = launch_stream->vdev()->device().index();
|
||||
if (parallel_streams_.find(devId) == parallel_streams_.end()) {
|
||||
LogPrintfError("UpdateStreams failed for device id:%d", devId);
|
||||
return;
|
||||
}
|
||||
auto parallel_streams = parallel_streams_[devId];
|
||||
// Current stream is the default in the assignment
|
||||
streams_.push_back(launch_stream);
|
||||
std::unordered_map<int, int> unique_stream_ids;
|
||||
@@ -710,7 +872,6 @@ hipError_t Graph::UpdateStreams(hip::Stream* launch_stream,
|
||||
for (int i = streams_.size(), j = 0; i < max_streams_ && j < collided_streams.size(); i++, j++) {
|
||||
streams_.push_back(collided_streams[j]);
|
||||
}
|
||||
return hipSuccess;
|
||||
}
|
||||
|
||||
|
||||
@@ -886,7 +1047,6 @@ bool Graph::RunNodes(int32_t base_stream, const std::vector<hip::Stream*>* paral
|
||||
// ================================================================================================
|
||||
hipError_t GraphExec::Run(hip::Stream* launch_stream) {
|
||||
hipError_t status = hipSuccess;
|
||||
|
||||
if (flags_ & hipGraphInstantiateFlagAutoFreeOnLaunch) {
|
||||
if (!topoOrder_.empty()) {
|
||||
topoOrder_[0]->GetParentGraph()->FreeAllMemory(launch_stream);
|
||||
@@ -908,13 +1068,13 @@ hipError_t GraphExec::Run(hip::Stream* launch_stream) {
|
||||
} else {
|
||||
repeatLaunch_ = true;
|
||||
}
|
||||
|
||||
ClPrint(amd::LOG_DEBUG, amd::LOG_CODE,
|
||||
"GraphExec::Run max_streams: %d, "
|
||||
"on device: %d, total number of nodes: %d",
|
||||
max_streams_, launch_stream->DeviceId(), topoOrder_.size());
|
||||
|
||||
if (max_streams_ == 1 && instantiateDeviceId_ == launch_stream->DeviceId()) {
|
||||
if (max_streams_ == 1 && max_streams_dev_.size() == 1 &&
|
||||
max_streams_dev_.begin()->first == launch_stream->DeviceId()) {
|
||||
if (DEBUG_CLR_GRAPH_PACKET_CAPTURE) {
|
||||
// If the graph has kernels that does device side allocation, during packet capture, heap is
|
||||
// allocated because heap pointer has to be added to the AQL packet, and initialized during
|
||||
@@ -926,6 +1086,8 @@ hipError_t GraphExec::Run(hip::Stream* launch_stream) {
|
||||
}
|
||||
}
|
||||
status = EnqueueGraphWithSingleList(launch_stream);
|
||||
} else if (max_streams_ == 1 && max_streams_dev_.size() > 1) {
|
||||
status = EnqueueMultiDeviceLinearGraph(launch_stream);
|
||||
} else if (max_streams_ == 1 && instantiateDeviceId_ != launch_stream->DeviceId()) {
|
||||
for (int i = 0; i < topoOrder_.size(); i++) {
|
||||
topoOrder_[i]->SetStream(launch_stream);
|
||||
@@ -934,10 +1096,7 @@ hipError_t GraphExec::Run(hip::Stream* launch_stream) {
|
||||
}
|
||||
} else {
|
||||
// Update streams for the graph execution
|
||||
status = UpdateStreams(launch_stream, parallel_streams_);
|
||||
if (status != hipSuccess) {
|
||||
return status;
|
||||
}
|
||||
UpdateStreams(launch_stream);
|
||||
// Execute all nodes in the graph
|
||||
if (!RunNodes()) {
|
||||
LogError("Failed to launch nodes!");
|
||||
|
||||
@@ -556,6 +556,7 @@ class Graph {
|
||||
roots_.resize(DEBUG_HIP_FORCE_GRAPH_QUEUES);
|
||||
leafs_.resize(DEBUG_HIP_FORCE_GRAPH_QUEUES);
|
||||
wait_order_.resize(DEBUG_HIP_FORCE_GRAPH_QUEUES);
|
||||
streams_dev_.reserve(g_devices.size());
|
||||
}
|
||||
void RemoveUserObjectFromOwingGraphs(UserObject* uObj) {
|
||||
for (auto& g : uObj->owning_graphs_) {
|
||||
@@ -673,12 +674,6 @@ class Graph {
|
||||
//! Schedules all nodes in the graph into different streams
|
||||
void ScheduleNodes();
|
||||
|
||||
//! Update streams for the graph execution
|
||||
hipError_t UpdateStreams(
|
||||
hip::Stream* launch_stream, //!< Launch stream from the application
|
||||
const std::vector<hip::Stream*>& parallel_stream //!< The list of parallel streams
|
||||
);
|
||||
|
||||
//! Runs one node on the assigned stream
|
||||
bool RunOneNode(Node node, //!< Node for the execution on GPU
|
||||
bool wait //!< Wait dependencies
|
||||
@@ -780,6 +775,10 @@ class Graph {
|
||||
|
||||
protected:
|
||||
int max_streams_ = 0; //!< Maximum number of streams used in the graph launch
|
||||
//!< Maps stream ID to the set of device IDs that use that stream.
|
||||
//!< Used to track which devices are accessed by each parallel stream
|
||||
//!< during multi-device graph execution scheduling.
|
||||
std::unordered_map<int, std::set<int>> streams_dev_ids_;
|
||||
|
||||
private:
|
||||
friend class GraphExec;
|
||||
@@ -802,6 +801,13 @@ class Graph {
|
||||
std::unordered_set<GraphNode*> capturedNodes_;
|
||||
bool graphInstantiated_;
|
||||
std::unordered_map<Node, Node> clonedNodes_;
|
||||
//! Map of device ID to vector of streams allocated for that device during graph execution.
|
||||
//! Each device may require multiple streams to handle parallel execution of graph nodes.
|
||||
std::unordered_map<int, std::vector<hip::Stream*>> streams_dev_;
|
||||
|
||||
//! Map tracking the maximum number of concurrent streams required per device for graph execution.
|
||||
//! Key: device ID, Value: maximum number of streams needed for that device
|
||||
std::unordered_map<int, int> max_streams_dev_;
|
||||
};
|
||||
|
||||
class GraphExec : public amd::ReferenceCountedObject, public Graph {
|
||||
@@ -816,13 +822,16 @@ class GraphExec : public amd::ReferenceCountedObject, public Graph {
|
||||
}
|
||||
|
||||
~GraphExec() {
|
||||
for (auto stream : parallel_streams_) {
|
||||
if (stream != nullptr) {
|
||||
for (auto streams : parallel_streams_) {
|
||||
for (auto stream : streams.second) {
|
||||
if (stream != nullptr) {
|
||||
stream->finish();
|
||||
constexpr bool kForceDestroy = true;
|
||||
hip::Stream::Destroy(stream, kForceDestroy);
|
||||
constexpr bool kForceDestroy = true;
|
||||
hip::Stream::Destroy(stream, kForceDestroy);
|
||||
}
|
||||
}
|
||||
}
|
||||
parallel_streams_.clear();
|
||||
if (DEBUG_CLR_GRAPH_PACKET_CAPTURE) {
|
||||
if (kernArgManager_ != nullptr) {
|
||||
kernArgManager_->release();
|
||||
@@ -856,7 +865,7 @@ class GraphExec : public amd::ReferenceCountedObject, public Graph {
|
||||
std::vector<Node>& GetNodes() { return topoOrder_; }
|
||||
uint64_t GetFlags() const { return flags_; }
|
||||
hipError_t Init();
|
||||
hipError_t CreateStreams(uint32_t num_streams);
|
||||
hipError_t CreateStreams(uint32_t num_streams, int devId = 0);
|
||||
hipError_t Run(hip::Stream* stream);
|
||||
// Capture GPU Packets from graph commands
|
||||
hipError_t CaptureAQLPackets();
|
||||
@@ -874,12 +883,21 @@ class GraphExec : public amd::ReferenceCountedObject, public Graph {
|
||||
hipError_t CaptureAndFormPacketsForGraph();
|
||||
void GetKernelArgSizeForGraph(std::unordered_map<int, size_t>& kernArgSizeForGraph);
|
||||
hipError_t EnqueueGraphWithSingleList(hip::Stream* hip_stream);
|
||||
//! Enqueue a multi-device linear graph for execution
|
||||
hipError_t EnqueueMultiDeviceLinearGraph(hip::Stream* hip_stream);
|
||||
bool TopologicalOrder() { return Graph::TopologicalOrder(topoOrder_); }
|
||||
//! Update streams for the graph execution with launch stream from application
|
||||
void UpdateStreams(hip::Stream* launch_stream);
|
||||
//! Find the number of streams required per device for multi-device graph execution
|
||||
//! This method analyzes the stream-to-device mappings and recursively processes
|
||||
//! child graphs to determine the maximum concurrent streams needed per device
|
||||
void FindStreamsReqPerDev();
|
||||
|
||||
protected:
|
||||
//! Topological order of the graph doesn't include nodes embedded as part of the child graph
|
||||
std::vector<Node> topoOrder_;
|
||||
std::vector<hip::Stream*> parallel_streams_;
|
||||
//! parallel streams per device
|
||||
std::unordered_map<int, std::vector<hip::Stream*>> parallel_streams_;
|
||||
uint64_t flags_ = 0;
|
||||
GraphKernelArgManager* kernArgManager_ = nullptr; //!< Kernel Arg manager for graph.
|
||||
int instantiateDeviceId_ = -1;
|
||||
|
||||
@@ -1295,38 +1295,37 @@ bool VirtualGPU::dispatchGenericAqlPacketBatch(const std::vector<AqlPacket*>& pa
|
||||
uint8_t packetType =
|
||||
extractAqlBits(header, HSA_PACKET_HEADER_TYPE, HSA_PACKET_HEADER_WIDTH_TYPE);
|
||||
if (packetType == HSA_PACKET_TYPE_KERNEL_DISPATCH) {
|
||||
ClPrint(amd::LOG_DETAIL_DEBUG, amd::LOG_AQL, "Graph shader name : %s",
|
||||
(*kernelNames)[packetIndex].c_str());
|
||||
ClPrint(amd::LOG_DETAIL_DEBUG, amd::LOG_AQL, "Graph shader name : %s, device id : %u",
|
||||
(*kernelNames)[packetIndex].c_str(), dev().index());
|
||||
|
||||
ClPrint(amd::LOG_DETAIL_DEBUG, amd::LOG_AQL,
|
||||
"SWq=0x%zx, HWq=0x%zx, id=%d, Dispatch Header = "
|
||||
"0x%x (type=%d, barrier=%d, acquire=%d, release=%d), "
|
||||
"setup=%d, grid=[%u, %u, %u], workgroup=[%u, %u, %u], "
|
||||
"private_seg_size=%u, group_seg_size=%u, kernel_obj=0x%zx, "
|
||||
"kernarg_address=0x%zx, completion_signal=0x%zx, correlation_id=%zu, "
|
||||
"rptr=%u, wptr=%u",
|
||||
gpu_queue_, gpu_queue_->base_address, gpu_queue_->id, header, packetType,
|
||||
extractAqlBits(header, HSA_PACKET_HEADER_BARRIER,
|
||||
HSA_PACKET_HEADER_WIDTH_BARRIER),
|
||||
extractAqlBits(header, HSA_PACKET_HEADER_SCACQUIRE_FENCE_SCOPE,
|
||||
HSA_PACKET_HEADER_WIDTH_SCACQUIRE_FENCE_SCOPE),
|
||||
extractAqlBits(header, HSA_PACKET_HEADER_SCRELEASE_FENCE_SCOPE,
|
||||
HSA_PACKET_HEADER_WIDTH_SCRELEASE_FENCE_SCOPE),
|
||||
packet->setup,
|
||||
reinterpret_cast<hsa_kernel_dispatch_packet_t*>(packet)->grid_size_x,
|
||||
reinterpret_cast<hsa_kernel_dispatch_packet_t*>(packet)->grid_size_y,
|
||||
reinterpret_cast<hsa_kernel_dispatch_packet_t*>(packet)->grid_size_z,
|
||||
reinterpret_cast<hsa_kernel_dispatch_packet_t*>(packet)->workgroup_size_x,
|
||||
reinterpret_cast<hsa_kernel_dispatch_packet_t*>(packet)->workgroup_size_y,
|
||||
reinterpret_cast<hsa_kernel_dispatch_packet_t*>(packet)->workgroup_size_z,
|
||||
reinterpret_cast<hsa_kernel_dispatch_packet_t*>(packet)->private_segment_size,
|
||||
reinterpret_cast<hsa_kernel_dispatch_packet_t*>(packet)->group_segment_size,
|
||||
reinterpret_cast<hsa_kernel_dispatch_packet_t*>(packet)->kernel_object,
|
||||
reinterpret_cast<hsa_kernel_dispatch_packet_t*>(packet)->kernarg_address,
|
||||
reinterpret_cast<hsa_kernel_dispatch_packet_t*>(packet)->completion_signal,
|
||||
reinterpret_cast<hsa_kernel_dispatch_packet_t*>(packet)->reserved2,
|
||||
Hsa::queue_load_read_index_scacquire(gpu_queue_), index);
|
||||
}
|
||||
ClPrint(
|
||||
amd::LOG_DETAIL_DEBUG, amd::LOG_AQL,
|
||||
"SWq=0x%zx, HWq=0x%zx, id=%d, Dispatch Header = "
|
||||
"0x%x (type=%d, barrier=%d, acquire=%d, release=%d), "
|
||||
"setup=%d, grid=[%u, %u, %u], workgroup=[%u, %u, %u], "
|
||||
"private_seg_size=%u, group_seg_size=%u, kernel_obj=0x%zx, "
|
||||
"kernarg_address=0x%zx, completion_signal=0x%zx, correlation_id=%zu, "
|
||||
"rptr=%u, wptr=%u",
|
||||
gpu_queue_, gpu_queue_->base_address, gpu_queue_->id, header, packetType,
|
||||
extractAqlBits(header, HSA_PACKET_HEADER_BARRIER, HSA_PACKET_HEADER_WIDTH_BARRIER),
|
||||
extractAqlBits(header, HSA_PACKET_HEADER_SCACQUIRE_FENCE_SCOPE,
|
||||
HSA_PACKET_HEADER_WIDTH_SCACQUIRE_FENCE_SCOPE),
|
||||
extractAqlBits(header, HSA_PACKET_HEADER_SCRELEASE_FENCE_SCOPE,
|
||||
HSA_PACKET_HEADER_WIDTH_SCRELEASE_FENCE_SCOPE),
|
||||
packet->setup, reinterpret_cast<hsa_kernel_dispatch_packet_t*>(packet)->grid_size_x,
|
||||
reinterpret_cast<hsa_kernel_dispatch_packet_t*>(packet)->grid_size_y,
|
||||
reinterpret_cast<hsa_kernel_dispatch_packet_t*>(packet)->grid_size_z,
|
||||
reinterpret_cast<hsa_kernel_dispatch_packet_t*>(packet)->workgroup_size_x,
|
||||
reinterpret_cast<hsa_kernel_dispatch_packet_t*>(packet)->workgroup_size_y,
|
||||
reinterpret_cast<hsa_kernel_dispatch_packet_t*>(packet)->workgroup_size_z,
|
||||
reinterpret_cast<hsa_kernel_dispatch_packet_t*>(packet)->private_segment_size,
|
||||
reinterpret_cast<hsa_kernel_dispatch_packet_t*>(packet)->group_segment_size,
|
||||
reinterpret_cast<hsa_kernel_dispatch_packet_t*>(packet)->kernel_object,
|
||||
reinterpret_cast<hsa_kernel_dispatch_packet_t*>(packet)->kernarg_address,
|
||||
reinterpret_cast<hsa_kernel_dispatch_packet_t*>(packet)->completion_signal,
|
||||
reinterpret_cast<hsa_kernel_dispatch_packet_t*>(packet)->reserved2,
|
||||
Hsa::queue_load_read_index_scacquire(gpu_queue_), index);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,145 @@
|
||||
/*
|
||||
Copyright (c) 2022-25 Advanced Micro Devices, Inc. All rights reserved.
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
||||
*/
|
||||
|
||||
#include <functional>
|
||||
|
||||
#include <hip_test_helper.hh>
|
||||
|
||||
#include <hip_test_common.hh>
|
||||
#include <hip_test_checkers.hh>
|
||||
#include <hip_test_kernels.hh>
|
||||
|
||||
#include "graph_memset_node_test_common.hh"
|
||||
#include "graph_tests_common.hh"
|
||||
|
||||
/**
|
||||
* @addtogroup hipGraphLaunch
|
||||
* @{
|
||||
* @ingroup GraphTest
|
||||
* `hipGraphLaunch(hipGraphExec_t graphExec, hipStream_t stream)` -
|
||||
* Launches an executable graph on the multi device
|
||||
*/
|
||||
|
||||
/**
|
||||
* Test Description
|
||||
* ------------------------
|
||||
* - Launches the single branch graph on multi device and verify the result
|
||||
* ------------------------
|
||||
* - catch/unit/graph//hipGraphMultiDevice.cc
|
||||
* Test requirements
|
||||
* ------------------------
|
||||
* - Multi-device
|
||||
* - HIP_VERSION >= 7.2
|
||||
*/
|
||||
static void check_output(int* inp, int* out, size_t size) {
|
||||
for (size_t i = 0; i < size; i++) {
|
||||
REQUIRE(out[i] == ((inp[i] * inp[i]) * (inp[i] * inp[i])));
|
||||
}
|
||||
}
|
||||
|
||||
static void init_input(int* a, size_t size) {
|
||||
unsigned int seed = time(nullptr);
|
||||
for (size_t i = 0; i < size; i++) {
|
||||
a[i] = (HipTest::RAND_R(&seed) & 0xFF);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
TEST_CASE("Unit_hipGraphMultiDevice") {
|
||||
int nGpus = 0;
|
||||
HIP_CHECK(hipGetDeviceCount(&nGpus));
|
||||
if (nGpus < 2) {
|
||||
fprintf(stderr, "Need at least 2 GPUs, skipped!\n");
|
||||
return;
|
||||
}
|
||||
hipStream_t streamdev1, streamdev2;
|
||||
hipEvent_t eventdev1, eventdev2;
|
||||
hipGraph_t graph = nullptr;
|
||||
hipGraphExec_t graph_exec = nullptr;
|
||||
|
||||
constexpr size_t buffer_size = (1024 * 1024);
|
||||
constexpr auto blocksPerCU = 6;
|
||||
constexpr int block_size = 512;
|
||||
|
||||
int *ibuf_h, *buf_d1, *buf_d2, *outbuf_h;
|
||||
ibuf_h = new int[buffer_size];
|
||||
outbuf_h = new int[buffer_size];
|
||||
REQUIRE(ibuf_h != nullptr);
|
||||
|
||||
HIP_CHECK(hipSetDevice(0));
|
||||
HIP_CHECK(hipStreamCreate(&streamdev1));
|
||||
HIP_CHECK(hipMalloc(&buf_d1, buffer_size * sizeof(int)));
|
||||
HIP_CHECK(hipEventCreate(&eventdev1));
|
||||
|
||||
HIP_CHECK(hipSetDevice(1));
|
||||
HIP_CHECK(hipStreamCreate(&streamdev2));
|
||||
HIP_CHECK(hipMalloc(&buf_d2, buffer_size * sizeof(int)));
|
||||
HIP_CHECK(hipEventCreate(&eventdev2));
|
||||
|
||||
HIP_CHECK(hipSetDevice(0));
|
||||
init_input(ibuf_h, buffer_size);
|
||||
unsigned grid_size = HipTest::setNumBlocks(blocksPerCU, block_size, buffer_size);
|
||||
|
||||
HIP_CHECK(hipStreamBeginCapture(streamdev1, hipStreamCaptureModeGlobal));
|
||||
|
||||
HIP_CHECK(
|
||||
hipMemcpyAsync(buf_d1, ibuf_h, sizeof(int) * buffer_size, hipMemcpyHostToDevice, streamdev1));
|
||||
HipTest::vector_square<int>
|
||||
<<<grid_size, block_size, 0, streamdev1>>>(buf_d1, buf_d1, buffer_size);
|
||||
HIP_CHECK(hipEventRecord(eventdev1, streamdev1));
|
||||
HIP_CHECK(hipStreamWaitEvent(streamdev2, eventdev1));
|
||||
|
||||
HIP_CHECK(hipSetDevice(1));
|
||||
HIP_CHECK(hipMemcpyDtoDAsync(buf_d2, buf_d1, sizeof(int) * buffer_size, streamdev2));
|
||||
HipTest::vector_square<int>
|
||||
<<<grid_size, block_size, 0, streamdev2>>>(buf_d2, buf_d2, buffer_size);
|
||||
HIP_CHECK(hipEventRecord(eventdev2, streamdev2));
|
||||
HIP_CHECK(hipStreamWaitEvent(streamdev1, eventdev2));
|
||||
|
||||
HIP_CHECK(hipStreamEndCapture(streamdev1, &graph));
|
||||
|
||||
HIP_CHECK(hipSetDevice(0));
|
||||
HIP_CHECK(hipGraphInstantiate(&graph_exec, graph, nullptr, nullptr, 0));
|
||||
HIP_CHECK(hipGraphLaunch(graph_exec, streamdev1));
|
||||
HIP_CHECK(hipStreamSynchronize(streamdev1));
|
||||
|
||||
HIP_CHECK(hipSetDevice(1));
|
||||
HIP_CHECK(hipMemcpy(outbuf_h, buf_d2, sizeof(int) * buffer_size, hipMemcpyHostToDevice));
|
||||
check_output(ibuf_h, outbuf_h, buffer_size);
|
||||
|
||||
HIP_CHECK(hipGraphExecDestroy(graph_exec));
|
||||
HIP_CHECK(hipGraphDestroy(graph));
|
||||
|
||||
delete[] ibuf_h;
|
||||
delete[] outbuf_h;
|
||||
HIP_CHECK(hipFree(buf_d1));
|
||||
HIP_CHECK(hipFree(buf_d2));
|
||||
HIP_CHECK(hipStreamDestroy(streamdev1));
|
||||
HIP_CHECK(hipStreamDestroy(streamdev2));
|
||||
HIP_CHECK(hipEventDestroy(eventdev1));
|
||||
HIP_CHECK(hipEventDestroy(eventdev2));
|
||||
}
|
||||
|
||||
/**
|
||||
* End doxygen group GraphMultiDevice.
|
||||
* @}
|
||||
*/
|
||||
新しいイシューから参照
ユーザーをブロックする