Files
2025-12-05 18:58:43 +01:00

543 líneas
26 KiB
Python

#!/usr/bin/env python3
###############################################################################
# MIT License
#
# Copyright (c) 2023 Advanced Micro Devices, Inc.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
###############################################################################
import os
import otf2
from otf2.enums import LocationType, LocationGroupType, RegionRole, Paradigm
import shutil
import time
from collections import defaultdict
from .importer import RocpdImportData
from . import output_config
# Lookup table from rocpd category names to perfetto category names.
# Built once at import time: get_perfetto_category_name() is called for every
# API region that gets exported, so rebuilding the dict per call is wasted work.
_PERFETTO_CATEGORY_MAP = {
    "NONE": "none",
    "HSA_CORE_API": "hsa_api",
    "HSA_AMD_EXT_API": "hsa_api",
    "HSA_IMAGE_EXT_API": "hsa_api",
    "HSA_FINALIZE_EXT_API": "hsa_api",
    "HIP_RUNTIME_API": "hip_api",
    "HIP_COMPILER_API": "hip_api",
    "MARKER_CORE_API": "marker_api",
    "MARKER_CORE_RANGE_API": "marker_api",
    "MARKER_CONTROL_API": "marker_api",
    "MARKER_NAME_API": "marker_api",
    "MEMORY_COPY": "memory_copy",
    "MEMORY_ALLOCATION": "memory_allocation",
    "KERNEL_DISPATCH": "kernel_dispatch",
    "SCRATCH_MEMORY": "scratch_memory",
    "CORRELATION_ID_RETIREMENT": "none",
    "RCCL_API": "rccl_api",
    "OMPT": "openmp",
    "RUNTIME_INITIALIZATION": "none",
    "ROCDECODE_API": "rocdecode_api",
    "ROCJPEG_API": "rocjpeg_api",
    "HIP_STREAM": "hip_api",
    "HIP_RUNTIME_API_EXT": "hip_api",
    "HIP_COMPILER_API_EXT": "hip_api",
    "ROCDECODE_API_EXT": "rocdecode_api",
    "KFD_EVENT_PAGE_MIGRATE": "kfd_events",
    "KFD_EVENT_PAGE_FAULT": "kfd_events",
    "KFD_EVENT_QUEUE": "kfd_events",
    "KFD_EVENT_UNMAP_FROM_GPU": "kfd_events",
    "KFD_EVENT_DROPPED_EVENTS": "kfd_events",
    "KFD_PAGE_MIGRATE": "kfd_events",
    "KFD_PAGE_FAULT": "kfd_events",
    "KFD_QUEUE": "kfd_events",
}


def get_perfetto_category_name(category):
    """Map category names to perfetto category names.

    Args:
        category: Category name to translate (e.g. ``"HIP_RUNTIME_API"``).

    Returns:
        The perfetto category string for ``category``, or ``"none"`` when the
        category is not present in the lookup table.
    """
    return _PERFETTO_CATEGORY_MAP.get(category, "none")
def allocation_level_type_name(level, type):
    """Build the region name for a memory allocation record.

    Args:
        level: Allocation level; ``"REAL"``, ``"VIRTUAL"`` and ``"SCRATCH"``
            are recognised.
        type: Operation type; ``"ALLOC"`` and ``"FREE"`` are recognised.
            (The parameter name shadows the builtin but is kept so existing
            keyword callers continue to work.)

    Returns:
        * ``"UNKNOWN_LEVEL"`` when ``level`` is not recognised, regardless of
          ``type``.
        * ``"<prefix>_ALLOCATE"`` / ``"<prefix>_FREE"`` for a recognised level,
          where the prefix is ``MEMORY``, ``MEMORY_VMEM`` or ``SCRATCH_MEMORY``.
        * ``"<level>_MEMORY_NONE"`` for a recognised level with any other
          ``type``; note that this fallback uses the raw ``level`` string, not
          the prefix (e.g. ``"REAL_MEMORY_NONE"``).
    """
    level_prefixes = {
        "REAL": "MEMORY",
        "VIRTUAL": "MEMORY_VMEM",
        "SCRATCH": "SCRATCH_MEMORY",
    }
    prefix = level_prefixes.get(level)
    if prefix is None:
        return "UNKNOWN_LEVEL"

    if type == "ALLOC":
        return prefix + "_ALLOCATE"
    if type == "FREE":
        return prefix + "_FREE"
    # Every path above returns, so no trailing fall-through return is needed.
    return level + "_MEMORY_NONE"
def _run_query(conn, sql, params=None):
    """Execute ``sql`` on a fresh cursor of ``conn`` and return that cursor."""
    cursor = conn.cursor()
    if params is None:
        cursor.execute(sql)
    else:
        cursor.execute(sql, params)
    return cursor


def _write_region_events(archive, location, spans, **region_kwargs):
    """Write one enter/leave event pair per span onto ``location``.

    Args:
        archive: Open OTF2 archive the events are written to.
        location: Location definition that receives the events.
        spans: Iterable of ``(start, end, name, attributes)`` tuples. The
            ``attributes`` mapping is attached to the enter event only.
        **region_kwargs: Extra keyword arguments forwarded to
            ``archive.definitions.region`` (e.g. ``region_role``, ``paradigm``).
    """
    writer = otf2.writer.EventWriter(archive, location)

    events = []
    for start, end, name, attributes in spans:
        region = archive.definitions.region(name=name, **region_kwargs)
        events.append((start, True, region, attributes))
        events.append((end, False, region, None))

    # Sorted on the timestamp alone; the sort is stable, so events that share
    # a timestamp keep the order in which they were appended above.
    events.sort(key=lambda event: event[0])

    for timestamp, is_enter, region, attributes in events:
        if is_enter:
            writer.enter(timestamp, region, attributes=attributes)
        else:
            writer.leave(timestamp, region)


def write_otf2(importData, config):
    """Export the contents of a rocpd database as an OTF2 trace.

    The trace is written to ``<output_path>/<output_file>_results.otf2`` with
    its ``.def`` file and ``<output_file>_results`` directory alongside; any
    previous outputs with those names are removed first.

    Args:
        importData: Object exposing the database connection as its
            ``connection`` attribute. When that attribute is missing or
            ``None`` a message is printed and no events are written.
        config: Object whose attributes configure the export:
            ``output_path`` (default ``"./otf_traces"``), ``output_file``
            (default ``"traces"``), ``kernel_rename`` (default ``False``; use
            the kernel's ``region`` column as its name when it is non-empty)
            and ``agent_index_value`` (default ``False``; selects the agent
            label format).

    Any exception raised while querying the database or writing events is
    caught and reported on stdout rather than propagated (best-effort export).
    """
    timer_resolution = 1_000_000_000  # ticks per second handed to OTF2

    trace_dir = getattr(config, "output_path", "./otf_traces")
    trace_file = f"{getattr(config, 'output_file', 'traces')}_results"

    os.makedirs(trace_dir, exist_ok=True)

    # Clear the outputs of a previous run with the same name.
    trace_subdir = os.path.join(trace_dir, trace_file)
    if os.path.exists(trace_subdir):
        shutil.rmtree(trace_subdir)

    otf2_file = os.path.join(trace_dir, f"{trace_file}.otf2")
    def_file = os.path.join(trace_dir, f"{trace_file}.def")
    print(f"Writing: {otf2_file}")
    print(f"Writing: {def_file}")
    print(f"Writing: {trace_subdir} directory")
    for stale_path in (otf2_file, def_file):
        if os.path.exists(stale_path):
            os.remove(stale_path)

    with otf2.writer.open(
        trace_dir, trace_file, timer_resolution=timer_resolution
    ) as archive:
        conn = getattr(importData, "connection", None)
        if conn is None:
            print("No sqlite connection found in importData.")
            return

        try:
            min_start, max_finish = _run_query(
                conn, "SELECT MIN(start), MAX(fini) FROM processes;"
            ).fetchone()
            if min_start is None or max_finish is None:
                # MIN()/MAX() yield NULL when there is nothing to aggregate;
                # report that explicitly instead of failing on None - None.
                raise ValueError("processes table has no start/fini timestamps")

            with otf2.writer.DefinitionWriter(archive) as global_def_writer:
                global_offset = min_start
                duration = max_finish - min_start
                realtime_timestamp = int(round(time.time() * timer_resolution))
                global_def_writer.write_clock_properties(
                    timer_resolution, global_offset, duration, realtime_timestamp
                )

            perfetto_category = archive.definitions.attribute(
                name="category", description="tracing category"
            )
            memory_copy_attributes = {perfetto_category: "memory_copy"}
            memory_allocation_attributes = {perfetto_category: "memory_allocation"}
            kernel_attributes = {perfetto_category: "kernel_dispatch"}

            kernel_rename = getattr(config, "kernel_rename", False)
            agent_index_value = getattr(config, "agent_index_value", False)

            nodes = _run_query(conn, "SELECT DISTINCT guid, id FROM rocpd_info_node")
            for guid, nid in nodes:
                processes = _run_query(
                    conn,
                    "SELECT pid, hostname, command FROM processes WHERE guid = ? AND nid = ?",
                    (guid, nid),
                )
                for pid, hostname, command in processes:
                    tree_node = archive.definitions.system_tree_node(
                        name=command, class_name=hostname, parent=None
                    )
                    cpu_location_group = archive.definitions.location_group(
                        name=command,
                        location_group_type=LocationGroupType.PROCESS,
                        system_tree_parent=tree_node,
                    )
                    process_key = (guid, nid, pid)

                    # ---- Collect the spans of this process, grouped per lane ----
                    # Ordered by start like the other event queries so that
                    # equal-timestamp ties do not depend on arbitrary row order.
                    api_calls = defaultdict(list)
                    api_rows = _run_query(
                        conn,
                        """SELECT tid, start, end, name, category FROM regions
                        WHERE guid = ? AND nid = ? AND pid = ?
                        ORDER BY start ASC""",
                        process_key,
                    )
                    for tid, start, end, name, category in api_rows:
                        attributes = {
                            perfetto_category: get_perfetto_category_name(category)
                        }
                        api_calls[tid].append((start, end, name, attributes))

                    memory_copies = defaultdict(list)
                    copy_rows = _run_query(
                        conn,
                        """SELECT tid, dst_agent_abs_index, start, end, name
                        FROM memory_copies WHERE guid = ? AND nid = ?
                        AND pid = ? ORDER BY start ASC""",
                        process_key,
                    )
                    for tid, agent, start, end, name in copy_rows:
                        memory_copies[(tid, agent)].append(
                            (start, end, name, memory_copy_attributes)
                        )

                    memory_allocations = defaultdict(list)
                    memory_deallocations = defaultdict(list)
                    memory_unknown = defaultdict(list)
                    allocation_rows = _run_query(
                        conn,
                        """SELECT tid, agent_abs_index, start, end, level, type
                        FROM memory_allocations WHERE guid = ? AND nid = ?
                        AND pid = ? ORDER BY start ASC""",
                        process_key,
                    )
                    for tid, agent, start, end, level, op_type in allocation_rows:
                        span = (
                            start,
                            end,
                            allocation_level_type_name(level, op_type),
                            memory_allocation_attributes,
                        )
                        if op_type == "ALLOC":
                            memory_allocations[(tid, agent)].append(span)
                        elif op_type == "FREE":
                            # Frees (and unknown operations) are grouped per
                            # thread only, not per agent.
                            memory_deallocations[tid].append(span)
                        else:
                            memory_unknown[tid].append(span)

                    kernel_dispatches = defaultdict(list)
                    kernel_rows = _run_query(
                        conn,
                        """SELECT tid, agent_abs_index, queue_id,
                        start, end, name, region
                        FROM kernels WHERE guid = ? AND nid = ?
                        AND pid = ? ORDER BY start ASC""",
                        process_key,
                    )
                    for tid, agent, queue, start, end, name, region in kernel_rows:
                        kernel_name = region if kernel_rename and region else name
                        kernel_dispatches[(tid, agent, queue)].append(
                            (start, end, kernel_name, kernel_attributes)
                        )

                    # NOTE(review): this map is keyed by rocpd_info_agent.id but
                    # looked up below with agent_abs_index / dst_agent_abs_index
                    # values -- confirm both use the same numbering.
                    agents = {}
                    agent_rows = _run_query(
                        conn,
                        "SELECT id, name, type FROM rocpd_info_agent WHERE guid = ? AND nid = ?",
                        (guid, nid),
                    )
                    for info_id, info_name, info_type in agent_rows:
                        agent_label = (
                            f"{info_type} Agent-{info_id}"
                            if agent_index_value
                            else f"{info_type} {info_type}-{info_id}"
                        )
                        agents[info_id] = (
                            agent_label,
                            archive.definitions.location_group(
                                name=info_name,
                                location_group_type=LocationGroupType.ACCELERATOR,
                                system_tree_parent=tree_node,
                            ),
                        )

                    # ---- Write the collected spans, one location per lane ----
                    # Write API Call Events
                    for tid, spans in api_calls.items():
                        cpu_location = archive.definitions.location(
                            name=f"Thread {tid}",
                            type=LocationType.CPU_THREAD,
                            group=cpu_location_group,
                        )
                        _write_region_events(
                            archive,
                            cpu_location,
                            spans,
                            region_role=RegionRole.FUNCTION,
                            paradigm=Paradigm.HIP,
                        )

                    # Write Memory Copy Events
                    for (tid, agent_id), spans in memory_copies.items():
                        agent_name, agent_location_group = agents.get(
                            agent_id, (f"Unknown Agent {agent_id}", None)
                        )
                        memory_copy_location = archive.definitions.location(
                            name=f"Thread {tid}, Copy to {agent_name}",
                            type=LocationType.ACCELERATOR_STREAM,
                            group=agent_location_group,
                        )
                        _write_region_events(
                            archive,
                            memory_copy_location,
                            spans,
                            region_role=RegionRole.DATA_TRANSFER,
                            paradigm=Paradigm.HIP,
                        )

                    # Write Memory Allocation Events
                    for (tid, agent_id), spans in memory_allocations.items():
                        agent_name, agent_location_group = agents.get(
                            agent_id, (f"Unknown Agent {agent_id}", None)
                        )
                        memory_allocation_location = archive.definitions.location(
                            name=f"Thread {tid}, Memory Allocate at {agent_name}",
                            type=LocationType.ACCELERATOR_STREAM,
                            group=agent_location_group,
                        )
                        _write_region_events(
                            archive,
                            memory_allocation_location,
                            spans,
                            region_role=RegionRole.ALLOCATE,
                            paradigm=Paradigm.HIP,
                        )

                    # Write Memory Deallocation Events
                    for tid, spans in memory_deallocations.items():
                        memory_free_location = archive.definitions.location(
                            name=f"Thread {tid}, Memory Deallocate (Free)",
                            type=LocationType.ACCELERATOR_STREAM,
                            group=cpu_location_group,
                        )
                        _write_region_events(
                            archive,
                            memory_free_location,
                            spans,
                            region_role=RegionRole.DEALLOCATE,
                            paradigm=Paradigm.HIP,
                        )

                    # Write Unknown Memory Events (region defined by name only)
                    for tid, spans in memory_unknown.items():
                        memory_unknown_location = archive.definitions.location(
                            name=f"Thread {tid}, Memory Operation UNK",
                            type=LocationType.ACCELERATOR_STREAM,
                            group=cpu_location_group,
                        )
                        _write_region_events(archive, memory_unknown_location, spans)

                    # Write Kernel Dispatch Events
                    for (tid, agent_id, queue), spans in kernel_dispatches.items():
                        agent_name, agent_location_group = agents.get(
                            agent_id, (f"Unknown Agent {agent_id}", None)
                        )
                        kernel_location = archive.definitions.location(
                            name=f"Thread {tid}, Compute on {agent_name}, Queue {queue}",
                            type=LocationType.ACCELERATOR_STREAM,
                            group=agent_location_group,
                        )
                        _write_region_events(
                            archive,
                            kernel_location,
                            spans,
                            region_role=RegionRole.FUNCTION,
                            paradigm=Paradigm.HIP,
                        )
        except Exception as exc:
            # Deliberately broad: the export is best-effort. Note that this
            # also reports failures raised while writing OTF2 data, not only
            # database errors.
            print("Could not query sqlite database:", exc)
def execute(input, config=None, **kwargs):
    """Write an OTF2 trace for ``input`` using the given or a fresh config.

    Args:
        input: Imported database object, passed through to ``write_otf2``.
        config: Existing configuration object, or ``None`` to build a new
            ``output_config.output_config`` from ``kwargs``.
        **kwargs: Configuration values. They are used to construct the new
            config, or passed to ``config.update`` when a config is supplied.
    """
    if config is None:
        effective_config = output_config.output_config(**kwargs)
    else:
        # The value returned by update() is what gets used from here on
        # (presumably the updated config itself -- confirm in output_config).
        effective_config = config.update(**kwargs)

    write_otf2(input, effective_config)
def add_args(parser):
    """Add otf2 arguments.

    No OTF2-specific options are registered at present, so ``parser`` is left
    untouched.

    Returns:
        A ``process_args(input, args)`` callable that extracts the
        OTF2-specific options from parsed arguments as a dict.
    """

    # Currently, no otf2 specific args
    # otf2_options = parser.add_argument_group("OTF2 options")
    # otf2_options.add_argument(
    #     "--kernel-rename",
    #     help="Use kernel names from debugging symbols if available",
    #     action="store_true",
    #     default=False,
    # )

    def process_args(input, args):
        # Names of the options this module owns; empty until options are added,
        # which makes the result an empty dict for now.
        valid_args = []
        candidates = (
            (key, getattr(args, key)) for key in valid_args if hasattr(args, key)
        )
        return {key: value for key, value in candidates if value is not None}

    return process_args
def main(argv=None):
    """Command-line entry point: convert rocPD database(s) to OTF2.

    Args:
        argv: Argument list to parse; ``None`` lets argparse fall back to the
            process command line.
    """
    import argparse

    from . import output_config
    from . import time_window

    parser = argparse.ArgumentParser(
        description="Convert rocPD to OTF2 format", allow_abbrev=False
    )

    required_group = parser.add_argument_group("Required arguments")
    required_group.add_argument(
        "-i",
        "--input",
        required=True,
        type=output_config.check_file_exists,
        nargs="+",
        help="Input path and filename to one or more database(s), separated by spaces",
    )

    # Each add_args() call registers its options and hands back the function
    # that later extracts those options from the parsed namespace.
    extract_output_args = output_config.add_args(parser)
    extract_otf2_args = add_args(parser)
    extract_generic_args = output_config.add_generic_args(parser)
    apply_time_window = time_window.add_args(parser)

    parsed = parser.parse_args(argv)

    import_data = RocpdImportData(
        parsed.input, automerge_limit=getattr(parsed, "automerge_limit", None)
    )

    output_kwargs = extract_output_args(import_data, parsed)
    generic_kwargs = extract_generic_args(import_data, parsed)
    otf2_kwargs = extract_otf2_args(import_data, parsed)
    apply_time_window(import_data, parsed)

    # Later updates win on duplicate keys: output < otf2 < generic.
    merged_kwargs = {}
    merged_kwargs.update(output_kwargs)
    merged_kwargs.update(otf2_kwargs)
    merged_kwargs.update(generic_kwargs)

    execute(import_data, **merged_kwargs)
# Script entry point. This module uses package-relative imports
# (``from .importer ...``), so it presumably has to be launched as a module of
# its package (``python -m <package>.<module>``) rather than as a bare file.
if __name__ == "__main__":
    main()