[rocprofiler-sdk] Fix Stream ID Error for Attachment (#1142)

* Changed stream error warning, remove regex search from attach execute test

* Formatting

* Revert accidental change

* Fix stream hang error due to grabbing same lock twice

* Updated add stream code, need to update tests

* Update attachment tests to use streams, threads, and multiple devices

* Update tests and fix stream issues

* Updated error messages to be more explicit, updated json to csv code in conftest to include streams and threads

* Formatting

* Add attachment label to attachment tests and update validation to fix errors

* Fix attach twice conftest

* Disabled thread san tests for attachment since they no longer work with bin file changes

* Updated for comment

* Added null check for getting attach status
Этот коммит содержится в:
itrowbri
2025-10-17 16:34:05 -05:00
коммит произвёл GitHub
родитель 620ccbeb82
Коммит e7a26594b7
10 изменённых файлов: 219 добавлений и 90 удалений
+20 -10
Просмотреть файл
@@ -76,10 +76,10 @@ get_stream_map()
}
auto
add_stream(hipStream_t stream)
add_stream(hipStream_t stream, bool reindex_existing = true)
{
return get_stream_map()->wlock(
[](stream_map_t& _data, hipStream_t _stream) {
[](stream_map_t& _data, hipStream_t _stream, const bool _reindex_existing) {
static uint64_t idx_offset = 0;
auto idx = _data.size() + idx_offset;
@@ -91,6 +91,8 @@ add_stream(hipStream_t stream)
if(!_data.emplace(_stream, rocprofiler_stream_id_t{.handle = idx}).second)
{
// Do not change the index if attachment mode is currently active
if(!_reindex_existing) return _data.at(_stream);
idx_offset += 1;
// Handle special hipStreamPerThread case where each thread has it's own implicit
// stream ID. No need to update map since hipStreamPerThread is defined as 0x02
@@ -108,7 +110,8 @@ add_stream(hipStream_t stream)
}
return _data.at(_stream);
},
stream);
stream,
reindex_existing);
}
auto
@@ -128,18 +131,25 @@ get_stream_id(hipStream_t stream)
if(thr_stream_id.handle == 0) thr_stream_id = add_stream(stream);
return thr_stream_id;
}
return get_stream_map()->rlock(
[](const stream_map_t& _data, hipStream_t _stream) {
ROCP_ERROR_IF(_data.count(_stream) == 0)
auto stream_id = get_stream_map()->rlock(
[](const stream_map_t& _data,
hipStream_t _stream) -> std::optional<rocprofiler_stream_id_t> {
ROCP_INFO_IF(_data.count(_stream) == 0 &&
!rocprofiler::registration::supports_attachment())
<< fmt::format("failed to retrieve stream ID for hipStream_t ({}) in {}",
sdk::utility::as_hex(static_cast<void*>(_stream)),
__FILE__);
// Stream may not be tracked during attachment. You should use queue grouping with
// attachment
if(_data.count(_stream) == 0) return add_stream(_stream);
return _data.at(_stream);
if(_data.count(_stream) != 0)
return std::optional<rocprofiler_stream_id_t>{_data.at(_stream)};
return std::nullopt;
},
stream);
// Stream ID already exists
if(stream_id) return *stream_id;
ROCP_CI_LOG_IF(WARNING, !rocprofiler::registration::supports_attachment()) << fmt::format(
"Stream ID is not present in {} when attach feature is not being used", __FUNCTION__);
return add_stream(stream, false);
}
// Map rocprofiler_hip_stream_operation_t to respective name
+7 -1
Просмотреть файл
@@ -182,7 +182,7 @@ struct attach_status
auto*
get_attach_status()
{
static auto*& _v = common::static_object<attach_status>::construct(false);
static auto*& _v = common::static_object<attach_status>::construct();
return _v;
}
@@ -750,6 +750,12 @@ invoke_client_finalizer(rocprofiler_client_id_t client_id)
}
} // namespace
bool
supports_attachment()
{
return (get_attach_status()) ? get_attach_status()->has_attach_table : false;
}
void
init_logging()
{
+3
Просмотреть файл
@@ -80,6 +80,9 @@ set_init_status(int);
void
set_fini_status(int);
bool
supports_attachment();
// call tool_reattach function for all registered clients
rocprofiler_status_t
attach();
+101 -47
Просмотреть файл
@@ -24,10 +24,25 @@
#include <rocprofiler-sdk-roctx/roctx.h>
#include <unistd.h>
#include <chrono>
#include <cstdlib>
#include <iostream>
#include <string>
#include <thread>
#include <vector>
/* Macro for checking GPU API return values */
#define HIP_ASSERT(call) \
do \
{ \
hipError_t gpuErr = call; \
if(hipSuccess != gpuErr) \
{ \
printf( \
"GPU API Error - %s:%d: '%s'\n", __FILE__, __LINE__, hipGetErrorString(gpuErr)); \
exit(1); \
} \
} while(0)
__global__ void
simple_kernel(float* data, int size)
{
@@ -38,44 +53,23 @@ simple_kernel(float* data, int size)
}
}
int
main(int /*argc*/, char** /*argv*/)
void
execute_kernels(const size_t tid,
const hipStream_t stream,
const size_t stream_id,
const size_t device_id)
{
std::cout << "Attachment test app started with PID: " << getpid() << std::endl;
// Initialize HIP
int device_count = 0;
hipError_t err = hipGetDeviceCount(&device_count);
if(err != hipSuccess || device_count == 0)
{
std::cerr << "No HIP devices found or error getting device count" << std::endl;
return 1;
}
std::cout << "After first call " << getpid() << std::endl;
// Set device
err = hipSetDevice(0);
if(err != hipSuccess)
{
std::cerr << "Failed to set device 0" << std::endl;
return 1;
}
HIP_ASSERT(hipSetDevice(device_id));
// Allocate memory
const int size = 1024 * 1024; // 1M elements
const size_t bytes = size * sizeof(float);
float* h_data = new float[size];
float* d_data;
float* d_data = nullptr;
err = hipMalloc(&d_data, bytes);
if(err != hipSuccess)
{
std::cerr << "Failed to allocate device memory" << std::endl;
delete[] h_data;
return 1;
}
HIP_ASSERT(hipMalloc(&d_data, bytes));
// Initialize data
for(int i = 0; i < size; ++i)
@@ -84,7 +78,8 @@ main(int /*argc*/, char** /*argv*/)
}
// Run kernels in a loop for a while
std::cout << "Starting kernel execution loop..." << std::endl;
std::cout << "Starting kernel execution loop for thread " << tid << " with stream " << stream_id
<< " on device " << device_id << "...\n";
const int num_iterations = 30;
for(int iter = 0; iter < num_iterations; ++iter)
@@ -95,10 +90,11 @@ main(int /*argc*/, char** /*argv*/)
// Copy data to device
roctxMark("Start_H2D_Copy");
err = hipMemcpy(d_data, h_data, bytes, hipMemcpyHostToDevice);
auto err = hipMemcpyAsync(d_data, h_data, bytes, hipMemcpyHostToDevice, stream);
if(err != hipSuccess)
{
std::cerr << "Failed to copy data to device" << std::endl;
std::cerr << "Failed to copy data for thread " << tid << " with stream " << stream_id
<< " on device " << device_id << "...\n";
roctxRangePop(); // Removed - ROCTx not linked
break;
}
@@ -109,46 +105,104 @@ main(int /*argc*/, char** /*argv*/)
int blocks_per_grid = (size + threads_per_block - 1) / threads_per_block;
hipLaunchKernelGGL(
simple_kernel, dim3(blocks_per_grid), dim3(threads_per_block), 0, 0, d_data, size);
simple_kernel, dim3(blocks_per_grid), dim3(threads_per_block), 0, stream, d_data, size);
// Copy data back
roctxMark("Start_D2H_Copy");
err = hipMemcpy(h_data, d_data, bytes, hipMemcpyDeviceToHost);
err = hipMemcpyAsync(h_data, d_data, bytes, hipMemcpyDeviceToHost, stream);
if(err != hipSuccess)
{
std::cerr << "Failed to copy data from device" << std::endl;
std::cerr << "Failed to copy data for thread " << tid << " with stream " << stream_id
<< " on device " << device_id << "...\n";
roctxRangePop(); // Removed - ROCTx not linked
break;
}
// Wait for completion
roctxMark("Device_Synchronize");
err = hipDeviceSynchronize();
roctxMark("Stream_Synchronize");
err = hipStreamSynchronize(stream);
if(err != hipSuccess)
{
std::cerr << "Failed to synchronize device" << std::endl;
std::cerr << "Failed to synchronize stream " << stream_id << " with thread " << tid
<< " on device " << device_id << "...\n";
roctxRangePop(); // Removed - ROCTx not linked
break;
}
roctxRangePop(); // Removed - ROCTx not linked
std::cout << "Iteration " << (iter + 1) << "/" << num_iterations << " completed"
<< std::endl;
// Small delay between iterations
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
std::cout << "Kernel execution loop completed" << std::endl;
std::cout << "Kernel execution loop completed for thread " << tid << " with stream "
<< stream_id << " on device " << device_id << "...\n";
// Cleanup
err = hipFree(d_data);
if(err != hipSuccess)
{
std::cerr << "Warning: Failed to free device memory" << std::endl;
}
HIP_ASSERT(hipFree(d_data));
delete[] h_data;
}
int
main(int argc, char** argv)
{
size_t nthreads{32};
size_t nstreams{8};
int ndevices{0};
for(int i = 1; i < argc; ++i)
{
auto _arg = std::string{argv[i]};
if(_arg == "?" || _arg == "-h" || _arg == "--help")
{
fprintf(stderr,
"usage: attachment-test [NUM_THREADS (%zu)] [NUM_STREAMS (%zu)] "
"[NUM_DEVICES (%d)]\n",
nthreads,
nstreams,
ndevices);
exit(EXIT_SUCCESS);
}
}
if(argc > 1) nthreads = std::atoll(argv[1]);
if(argc > 2) nstreams = std::atoll(argv[2]);
if(argc > 3) ndevices = std::stoi(argv[3]);
std::cout << "Attachment test app started with PID: " << getpid() << std::endl;
// Initialize HIP
int device_count = 0;
HIP_ASSERT(hipGetDeviceCount(&device_count));
if(device_count == 0)
{
std::cerr << "No HIP devices found or error getting device count" << std::endl;
return 1;
}
// Default ndecives to device_count. Ensure that we do not use more devices than are available
ndevices = ndevices == 0 ? device_count : ndevices;
if(ndevices > device_count)
{
std::cout << "Using " << device_count << " HIP devices instead of the requested "
<< ndevices << "\n";
ndevices = device_count;
}
std::cout << "After first call " << getpid() << std::endl;
auto _threads = std::vector<std::thread>{};
auto _streams = std::vector<hipStream_t>(nstreams);
_threads.reserve(nthreads);
for(auto& itr : _streams)
HIP_ASSERT(hipStreamCreate(&itr));
for(size_t i = 0; i < nthreads; ++i)
_threads.emplace_back(
execute_kernels, i, _streams.at(i % nstreams), i % nstreams, i % ndevices);
for(auto& itr : _threads)
itr.join();
// Destroy streams
for(auto itr : _streams)
HIP_ASSERT(hipStreamDestroy(itr));
std::cout << "Attachment test app finished" << std::endl;
+16 -16
Просмотреть файл
@@ -10,7 +10,8 @@ project(
find_package(rocprofiler-sdk REQUIRED)
set(ROCPROFILER_MEMCHECK_TYPES "AddressSanitizer" "UndefinedBehaviorSanitizer")
set(ROCPROFILER_MEMCHECK_TYPES "AddressSanitizer" "UndefinedBehaviorSanitizer"
"ThreadSanitizer")
if(ROCPROFILER_MEMCHECK AND ROCPROFILER_MEMCHECK IN_LIST ROCPROFILER_MEMCHECK_TYPES)
set(IS_DISABLED ON)
@@ -44,19 +45,18 @@ add_test(
set_tests_properties(
rocprofv3-test-attachment-attach-once-execute
PROPERTIES
TIMEOUT
60
LABELS
"integration-tests"
ENVIRONMENT
"${attachment-env}"
FAIL_REGULAR_EXPRESSION
"failed to retrieve stream ID|ERROR|FATAL|${ROCPROFILER_DEFAULT_FAIL_REGEX}"
FIXTURES_SETUP
rocprofv3-test-attachment-attach-once
DISABLED
"${IS_DISABLED}")
PROPERTIES TIMEOUT
60
LABELS
"integration-tests;attachment"
ENVIRONMENT
"${attachment-env}"
FAIL_REGULAR_EXPRESSION
"ERROR|FATAL|${ROCPROFILER_DEFAULT_FAIL_REGEX}"
FIXTURES_SETUP
rocprofv3-test-attachment-attach-once
DISABLED
"${IS_DISABLED}")
# Validate the output from the attached profiling
add_test(
@@ -74,7 +74,7 @@ set_tests_properties(
PROPERTIES TIMEOUT
30
LABELS
"integration-tests"
"integration-tests;attachment"
DEPENDS
rocprofv3-test-attachment-attach-once-execute
FIXTURES_REQUIRED
@@ -97,7 +97,7 @@ set_tests_properties(
PROPERTIES TIMEOUT
30
LABELS
"integration-tests"
"integration-tests;attachment"
DEPENDS
rocprofv3-test-attachment-attach-once-execute
FIXTURES_REQUIRED
+4
Просмотреть файл
@@ -116,6 +116,8 @@ def convert_json_records_to_csv_format(records, section_name, kernel_symbols=Non
if isinstance(queue_info, dict)
else queue_info
)
csv_record["Stream_Id"] = dispatch_info.get("stream_id", {}).get("handle", 0)
csv_record["Thread_Id"] = record.get("thread_id", 0)
csv_record["Kernel_Id"] = str(kernel_id)
# Correlation ID with internal/external handling
@@ -156,6 +158,8 @@ def convert_json_records_to_csv_format(records, section_name, kernel_symbols=Non
csv_record["Direction"] = "H2D" if src_agent < dst_agent else "D2H"
else:
csv_record["Direction"] = "D2D"
# Get stream ID
csv_record["Stream_Id"] = record.get("stream_id", {}).get("handle", 0)
# Correlation ID handling
corr_id = record.get("correlation_id", {})
+24
Просмотреть файл
@@ -41,9 +41,16 @@ def test_attachment_kernel_trace(kernel_input_data):
simple_kernel_found
), f"Expected 'simple_kernel' not found in kernel names: {kernel_names}"
kernel_threads = set()
kernel_streams = set()
NUM_KERNEL_THREADS = 32
expected_stream_ids = set([i for i in range(1, 9)])
# Verify basic kernel properties
for row in kernel_input_data:
if "simple_kernel" in row["Kernel_Name"]:
assert "Stream_Id" in row
assert "Thread_Id" in row
assert row["Kind"] == "KERNEL_DISPATCH"
assert int(row["Queue_Id"]) > 0
assert int(row["Kernel_Id"]) > 0
@@ -59,6 +66,15 @@ def test_attachment_kernel_trace(kernel_input_data):
assert int(row["Grid_Size_Y"]) >= 1
assert int(row["Grid_Size_Z"]) >= 1
thread_id = int(row["Thread_Id"])
stream_id = int(row["Stream_Id"])
kernel_threads.add(thread_id)
kernel_streams.add(stream_id)
# Exactly 8 streams and 32 threads
len(kernel_threads) == NUM_KERNEL_THREADS
kernel_streams == expected_stream_ids
def test_attachment_memory_copy_trace(memory_copy_input_data):
"""Verify that memory copy operations were captured during attachment."""
@@ -70,8 +86,11 @@ def test_attachment_memory_copy_trace(memory_copy_input_data):
host_to_device_count = 0
device_to_host_count = 0
memory_copy_streams = set()
expected_stream_ids = set([i for i in range(1, 9)])
for row in memory_copy_input_data:
assert "Stream_Id" in row
assert row["Kind"] == "MEMORY_COPY"
assert int(row["Correlation_Id"]) > 0
assert int(row["End_Timestamp"]) >= int(row["Start_Timestamp"])
@@ -84,9 +103,14 @@ def test_attachment_memory_copy_trace(memory_copy_input_data):
):
device_to_host_count += 1
stream_id = int(row["Stream_Id"])
memory_copy_streams.add(stream_id)
# We should have both H2D and D2H copies
assert host_to_device_count > 0, "No host-to-device memory copies captured"
assert device_to_host_count > 0, "No device-to-host memory copies captured"
# Exactly 8 streams
memory_copy_streams == expected_stream_ids
def test_attachment_hsa_api_trace(hsa_input_data):
+16 -16
Просмотреть файл
@@ -10,7 +10,8 @@ project(
find_package(rocprofiler-sdk REQUIRED)
set(ROCPROFILER_MEMCHECK_TYPES "AddressSanitizer" "UndefinedBehaviorSanitizer")
set(ROCPROFILER_MEMCHECK_TYPES "AddressSanitizer" "UndefinedBehaviorSanitizer"
"ThreadSanitizer")
if(ROCPROFILER_MEMCHECK AND ROCPROFILER_MEMCHECK IN_LIST ROCPROFILER_MEMCHECK_TYPES)
set(IS_DISABLED ON)
@@ -44,19 +45,18 @@ add_test(
set_tests_properties(
rocprofv3-test-attachment-attach-twice-execute
PROPERTIES
TIMEOUT
120
LABELS
"integration-tests"
ENVIRONMENT
"${attachment-env}"
FAIL_REGULAR_EXPRESSION
"failed to retrieve stream ID|ERROR|FATAL|${ROCPROFILER_DEFAULT_FAIL_REGEX}"
FIXTURES_SETUP
rocprofv3-test-attachment-attach-twice
DISABLED
"${IS_DISABLED}")
PROPERTIES TIMEOUT
120
LABELS
"integration-tests;attachment"
ENVIRONMENT
"${attachment-env}"
FAIL_REGULAR_EXPRESSION
"ERROR|FATAL|${ROCPROFILER_DEFAULT_FAIL_REGEX}"
FIXTURES_SETUP
rocprofv3-test-attachment-attach-twice
DISABLED
"${IS_DISABLED}")
# Validate the output from the reattached profiling (CSV)
add_test(
@@ -74,7 +74,7 @@ set_tests_properties(
PROPERTIES TIMEOUT
30
LABELS
"integration-tests"
"integration-tests;attachment"
DEPENDS
rocprofv3-test-attachment-attach-twice-execute
FIXTURES_REQUIRED
@@ -97,7 +97,7 @@ set_tests_properties(
PROPERTIES TIMEOUT
30
LABELS
"integration-tests"
"integration-tests;attachment"
DEPENDS
rocprofv3-test-attachment-attach-twice-execute
FIXTURES_REQUIRED
+4
Просмотреть файл
@@ -117,6 +117,8 @@ def convert_json_records_to_csv_format(records, section_name, kernel_symbols=Non
else queue_info
)
csv_record["Kernel_Id"] = str(kernel_id)
csv_record["Stream_Id"] = dispatch_info.get("stream_id", {}).get("handle", 0)
csv_record["Thread_Id"] = record.get("thread_id", 0)
# Correlation ID with internal/external handling
corr_id = record.get("correlation_id", {})
@@ -156,6 +158,8 @@ def convert_json_records_to_csv_format(records, section_name, kernel_symbols=Non
csv_record["Direction"] = "H2D" if src_agent < dst_agent else "D2H"
else:
csv_record["Direction"] = "D2D"
# Get stream ID
csv_record["Stream_Id"] = record.get("stream_id", {}).get("handle", 0)
# Correlation ID handling
corr_id = record.get("correlation_id", {})
+24
Просмотреть файл
@@ -41,9 +41,16 @@ def test_attachment_kernel_trace(kernel_input_data):
simple_kernel_found
), f"Expected 'simple_kernel' not found in kernel names: {kernel_names}"
kernel_threads = set()
kernel_streams = set()
NUM_KERNEL_THREADS = 32
expected_stream_ids = set([i for i in range(1, 9)])
# Verify basic kernel properties
for row in kernel_input_data:
if "simple_kernel" in row["Kernel_Name"]:
assert "Stream_Id" in row
assert "Thread_Id" in row
assert row["Kind"] == "KERNEL_DISPATCH"
assert int(row["Queue_Id"]) > 0
assert int(row["Kernel_Id"]) > 0
@@ -59,6 +66,15 @@ def test_attachment_kernel_trace(kernel_input_data):
assert int(row["Grid_Size_Y"]) >= 1
assert int(row["Grid_Size_Z"]) >= 1
thread_id = int(row["Thread_Id"])
stream_id = int(row["Stream_Id"])
kernel_threads.add(thread_id)
kernel_streams.add(stream_id)
# Exactly 8 streams and 32 threads
len(kernel_threads) == NUM_KERNEL_THREADS
kernel_streams == expected_stream_ids
def test_attachment_memory_copy_trace(memory_copy_input_data):
"""Verify that memory copy operations were captured during attachment."""
@@ -70,8 +86,11 @@ def test_attachment_memory_copy_trace(memory_copy_input_data):
host_to_device_count = 0
device_to_host_count = 0
memory_copy_streams = set()
expected_stream_ids = set([i for i in range(1, 9)])
for row in memory_copy_input_data:
assert "Stream_Id" in row
assert row["Kind"] == "MEMORY_COPY"
assert int(row["Correlation_Id"]) > 0
assert int(row["End_Timestamp"]) >= int(row["Start_Timestamp"])
@@ -83,11 +102,16 @@ def test_attachment_memory_copy_trace(memory_copy_input_data):
"MEMORY_COPY_DEVICE_TO_HOST" in row["Direction"] or "D2H" in row["Direction"]
):
device_to_host_count += 1
stream_id = int(row["Stream_Id"])
memory_copy_streams.add(stream_id)
# We should have both H2D and D2H copies
assert host_to_device_count > 0, "No host-to-device memory copies captured"
assert device_to_host_count > 0, "No device-to-host memory copies captured"
# Exactly 8 streams
memory_copy_streams == expected_stream_ids
def test_attachment_hsa_api_trace(hsa_input_data):
"""Verify that HSA API calls were captured during attachment."""