[CI]: test-async-copy-tracing-validate : test_ancestor_ids (#364)

* Add ancestor ID to pftrace

* Simplify hipMemcpyAsync ancestor ID checks

* Review comments

[ROCm/rocprofiler-sdk commit: cac3ac1b98]
This commit is contained in:
Kuricheti, Mythreya
2025-04-25 09:19:17 -07:00
committed by GitHub
parent 1264ffeb45
commit cd106cda3c
2 changed files with 69 additions and 102 deletions
@@ -55,6 +55,34 @@ def get_operation_name(record, kind_idx, op_idx):
return None
def groupby_corr_id(trace_item, op_id=None):
    """
    Index trace records by their internal correlation ID.

    If op_id is not None, only records whose ``operation`` equals op_id
    are included. The result has the form:
    {
        corr_id-1: record with internal corr_id = corr_id-1
        corr_id-2: record with internal corr_id = corr_id-2
        ...
    }

    Args:
        trace_item: iterable of records exposing ``operation`` and a
            ``correlation_id`` that can be indexed with "internal".
        op_id: optional operation ID to filter on; None keeps every record.

    Returns:
        A dotdict mapping each internal correlation ID to its single record.

    Raises:
        AssertionError: if two selected records share an internal
            correlation ID.
    """
    from rocprofiler_sdk.pytest_utils.dotdict import dotdict

    records_by_id = {}
    for record in trace_item:
        if op_id is not None and record.operation != op_id:
            continue
        corr_id = record.correlation_id["internal"]
        if corr_id in records_by_id:
            # Raised explicitly rather than via `assert False` so that the
            # duplicate check still fires when Python runs with -O.
            raise AssertionError(f"Duplicate internal corr_id {corr_id}")
        records_by_id[corr_id] = record
    return dotdict(records_by_id)
def test_data_structure(input_data):
"""verify minimum amount of expected data is present"""
data = input_data
@@ -356,116 +384,43 @@ def test_async_copy_direction(input_data):
def test_ancestor_ids(input_data):
"""
This test ensures that each memcpy can be traced back to
a hipMemcpyAsync through ancestor IDs
"""
# NOTE(review): this span appears to interleave the removed and the added
# lines of a diff hunk (no +/- markers, indentation stripped): it contains
# both a by_thread_id()-based check and a groupby_corr_id()-based check.
# The notes below mark where each variant seems to start -- confirm against
# the repository before relying on them.
from rocprofiler_sdk.pytest_utils.dotdict import dotdict
data = input_data
sdk_data = data["rocprofiler-sdk-json-tool"]
buffer_records = sdk_data.buffer_records
# Operation IDs used to filter the HIP and HSA API traces in the
# by_thread_id() variant.
hip_memcopy_id = get_operation(buffer_records, "HIP_RUNTIME_API", "hipMemcpyAsync")
has_async_memcopy_id = get_operation(
buffer_records, "HSA_AMD_EXT_API", "hsa_amd_memory_async_copy_on_engine"
)
def by_thread_id(trace_item, op_id, corr_id_name):
"""
Returns a dict like so, where corr_id_name == "ancestor" or "internal"
{
tid: {
corr_id-1: [ list of objects with value of corr_id_name == corr_id-1 ]
corr_id-2: [ list of objects with value of corr_id_name == corr_id-2 ]
...
}
}
"""
ret = {}
for x in trace_item:
if x.operation == op_id:
corr_id = x.correlation_id[corr_id_name]
if x.thread_id not in ret.keys():
ret[x.thread_id] = {}
if corr_id in ret[x.thread_id].keys():
ret[x.thread_id][corr_id].append(x)
else:
ret[x.thread_id][corr_id] = [x]
return dotdict(ret)
# dict with tid -> internal id -> record
hip_memcopies = by_thread_id(
buffer_records.hip_api_traces, hip_memcopy_id, "internal"
)
# dict with tid -> ancestor id -> record
hsa_memcopies = by_thread_id(
buffer_records.hsa_api_traces, has_async_memcopy_id, "ancestor"
)
# We expect the same keys in hip.thread_id and hsa.thread_id, because they are parent <-> child corr ids
# both must have same thread ids
# For each tid, hip.ancestor and hsa.internal keys must be same
assert (
hip_memcopies.keys() == hsa_memcopies.keys()
), "TIDs are not same for hsa and hip calls"
for tid in hip_memcopies.keys():
assert (
hip_memcopies[tid].keys() == hsa_memcopies[tid].keys()
), "Parent-child keys are not equal"
memcopies = buffer_records.memory_copies
# Total number of HSA async-copy records across all threads and ancestor ids.
num_hsa_memcopies = sum(
[
len(val)
for tid in hsa_memcopies.keys()
for _, val in hsa_memcopies[tid].items()
]
)
# NOTE(review): the groupby_corr_id()-based variant presumably starts here;
# hip_memcopy_id and hip_memcopies are re-assigned by the lines below.
_, hip_op_ids = get_operation(buffer_records, "HIP_RUNTIME_API")
hip_memcopy_id = get_operation(buffer_records, "HIP_RUNTIME_API", "hipMemcpyAsync")
# dict with { internal id : record }
hip_memcopies = groupby_corr_id(buffer_records.hip_api_traces, hip_memcopy_id)
hsa_records = groupby_corr_id(buffer_records.hsa_api_traces)
hip_records = groupby_corr_id(buffer_records.hip_api_traces)
accounted_for_hip_ids = []
# For every memory-copy record: look up the HSA API record with the same
# internal correlation id, then the HIP API record whose internal id equals
# that HSA record's ancestor id.
for memcpy in memcopies:
parent_hsa_call = hsa_records[memcpy.correlation_id.internal]
parent_hip_call = hip_records[parent_hsa_call.correlation_id.ancestor]
assert (
parent_hip_call.thread_id == parent_hsa_call.thread_id
), "Expected hsa and hip calls to be on the same thread"
assert hip_op_ids[parent_hip_call.operation] == "hipMemcpyAsync"
accounted_for_hip_ids.append(parent_hip_call.correlation_id.internal)
# Ensure we looked through all HIP entries
assert (
len(memcopies) == num_hsa_memcopies
), "Expected number of memcopies to be same as number of async HSA (hsa_amd_memory_async_copy_on_engine) calls"
# NOTE(review): the loops below index hip_memcopies/hsa_memcopies by thread
# id, i.e. they match the by_thread_id() structures built earlier, not the
# flat groupby_corr_id() result.
for tid in hip_memcopies:
# We expect only 1 record with this internal id, per thread
for corr_id, records in hip_memcopies[tid].items():
assert (
len(records) == 1
), "Expected only one record with this (thread_id, corr_id)"
for _, tid_records in hsa_memcopies.items():
for _, records in tid_records.items():
for record in records:
# Linear scan of all memory-copy records for the one sharing this HSA
# record's internal correlation id.
child_memcpy_record = [
x
for x in buffer_records.memory_copies
if x.correlation_id.internal == record.correlation_id.internal
]
assert (
len(child_memcpy_record) == 1
), "Expected only one memcpy record with this internal ID"
child_memcpy_record = child_memcpy_record[0]
assert record.thread_id in hsa_memcopies.keys()
assert record.thread_id in hip_memcopies.keys()
assert (
record.correlation_id.ancestor
in hip_memcopies[record.thread_id].keys()
)
# This is basically the same as the Expected only one record with this (thread_id, corr_id) test
assert (
len(hip_memcopies[record.thread_id][record.correlation_id.ancestor])
== 1
), "Expected only one record with this (thread_id, corr_id)"
ancestor_hip_record = hip_memcopies[record.thread_id][
record.correlation_id.ancestor
][0]
assert (
ancestor_hip_record.correlation_id.internal
== record.correlation_id.ancestor
)
assert ancestor_hip_record.thread_id == record.thread_id
# NOTE(review): the next three lines have no `assert (` opener of their own.
# If they pair with the `assert (` that follows the "Ensure we looked through
# all HIP entries" comment above, the asserted expression is a parenthesized
# 2-tuple (comparison, message), which is always truthy, so the check can
# never fail. Confirm against the repository; if so, the message belongs
# outside the parentheses.
set(accounted_for_hip_ids) == set(hip_memcopies.keys()),
"Expected to account for all HIP memcpy calls through ancestor ID lookup",
)
def test_retired_correlation_ids(input_data):
@@ -2253,6 +2253,8 @@ write_perfetto()
itr.operation,
"corr_id",
itr.correlation_id.internal,
"ancestor_id",
itr.correlation_id.ancestor,
[&](::perfetto::EventContext ctx) {
for(const auto& aitr : _args)
sdk::add_perfetto_annotation(ctx, aitr.first, aitr.second);
@@ -2296,6 +2298,8 @@ write_perfetto()
itr.operation,
"corr_id",
itr.correlation_id.internal,
"ancestor_id",
itr.correlation_id.ancestor,
[&](::perfetto::EventContext ctx) {
for(const auto& aitr : _args)
sdk::add_perfetto_annotation(ctx, aitr.first, aitr.second);
@@ -2340,6 +2344,8 @@ write_perfetto()
itr.operation,
"corr_id",
itr.correlation_id.internal,
"ancestor_id",
itr.correlation_id.ancestor,
[&](::perfetto::EventContext ctx) {
for(const auto& aitr : _args)
sdk::add_perfetto_annotation(ctx, aitr.first, aitr.second);
@@ -2384,6 +2390,8 @@ write_perfetto()
itr.operation,
"corr_id",
itr.correlation_id.internal,
"ancestor_id",
itr.correlation_id.ancestor,
[&](::perfetto::EventContext ctx) {
for(const auto& aitr : _args)
sdk::add_perfetto_annotation(ctx, aitr.first, aitr.second);
@@ -2428,6 +2436,8 @@ write_perfetto()
itr.operation,
"corr_id",
itr.correlation_id.internal,
"ancestor_id",
itr.correlation_id.ancestor,
[&](::perfetto::EventContext ctx) {
for(const auto& aitr : _args)
sdk::add_perfetto_annotation(ctx, aitr.first, aitr.second);
@@ -2471,6 +2481,8 @@ write_perfetto()
itr.operation,
"corr_id",
itr.correlation_id.internal,
"ancestor_id",
itr.correlation_id.ancestor,
[&](::perfetto::EventContext ctx) {
for(const auto& aitr : _args)
sdk::add_perfetto_annotation(ctx, aitr.first, aitr.second);