543 líneas
26 KiB
Python
543 líneas
26 KiB
Python
#!/usr/bin/env python3
|
|
###############################################################################
|
|
# MIT License
|
|
#
|
|
# Copyright (c) 2023 Advanced Micro Devices, Inc.
|
|
#
|
|
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
# of this software and associated documentation files (the "Software"), to deal
|
|
# in the Software without restriction, including without limitation the rights
|
|
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
# copies of the Software, and to permit persons to whom the Software is
|
|
# furnished to do so, subject to the following conditions:
|
|
#
|
|
# The above copyright notice and this permission notice shall be included in
|
|
# all copies or substantial portions of the Software.
|
|
#
|
|
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
|
# THE SOFTWARE.
|
|
###############################################################################
|
|
|
|
import os
|
|
import otf2
|
|
from otf2.enums import LocationType, LocationGroupType, RegionRole, Paradigm
|
|
import shutil
|
|
import time
|
|
from collections import defaultdict
|
|
|
|
from .importer import RocpdImportData
|
|
from . import output_config
|
|
|
|
|
|
# Tracing category name -> perfetto category name. Built once at import time
# because the lookup below runs once per API event while a trace is written.
_PERFETTO_CATEGORY_MAP = {
    "NONE": "none",
    "HSA_CORE_API": "hsa_api",
    "HSA_AMD_EXT_API": "hsa_api",
    "HSA_IMAGE_EXT_API": "hsa_api",
    "HSA_FINALIZE_EXT_API": "hsa_api",
    "HIP_RUNTIME_API": "hip_api",
    "HIP_COMPILER_API": "hip_api",
    "MARKER_CORE_API": "marker_api",
    "MARKER_CORE_RANGE_API": "marker_api",
    "MARKER_CONTROL_API": "marker_api",
    "MARKER_NAME_API": "marker_api",
    "MEMORY_COPY": "memory_copy",
    "MEMORY_ALLOCATION": "memory_allocation",
    "KERNEL_DISPATCH": "kernel_dispatch",
    "SCRATCH_MEMORY": "scratch_memory",
    "CORRELATION_ID_RETIREMENT": "none",
    "RCCL_API": "rccl_api",
    "OMPT": "openmp",
    "RUNTIME_INITIALIZATION": "none",
    "ROCDECODE_API": "rocdecode_api",
    "ROCJPEG_API": "rocjpeg_api",
    "HIP_STREAM": "hip_api",
    "HIP_RUNTIME_API_EXT": "hip_api",
    "HIP_COMPILER_API_EXT": "hip_api",
    "ROCDECODE_API_EXT": "rocdecode_api",
    "KFD_EVENT_PAGE_MIGRATE": "kfd_events",
    "KFD_EVENT_PAGE_FAULT": "kfd_events",
    "KFD_EVENT_QUEUE": "kfd_events",
    "KFD_EVENT_UNMAP_FROM_GPU": "kfd_events",
    "KFD_EVENT_DROPPED_EVENTS": "kfd_events",
    "KFD_PAGE_MIGRATE": "kfd_events",
    "KFD_PAGE_FAULT": "kfd_events",
    "KFD_QUEUE": "kfd_events",
}


def get_perfetto_category_name(category):
    """Map a tracing category name to its perfetto category name.

    Args:
        category: Category name as stored with a region (e.g.
            ``"HIP_RUNTIME_API"``).

    Returns:
        The perfetto category string, or ``"none"`` when ``category`` is not
        in the mapping (this includes ``None``).
    """
    return _PERFETTO_CATEGORY_MAP.get(category, "none")
|
|
|
|
|
|
def allocation_level_type_name(level, type):
    """Build the region name for a memory allocation record.

    Args:
        level: Allocation level; ``"REAL"``, ``"VIRTUAL"`` and ``"SCRATCH"``
            are recognised.
        type: Operation type; ``"ALLOC"`` and ``"FREE"`` are recognised.
            (The parameter name shadows the builtin but is kept so existing
            keyword callers keep working.)

    Returns:
        ``"<PREFIX>_ALLOCATE"`` or ``"<PREFIX>_FREE"`` for a recognised level
        and type, where the prefix is ``MEMORY``, ``MEMORY_VMEM`` or
        ``SCRATCH_MEMORY``; ``"UNKNOWN_LEVEL"`` for any other level; and
        ``level + "_MEMORY_NONE"`` for a recognised level with any other type.
    """
    if level == "REAL":
        name = "MEMORY"
    elif level == "VIRTUAL":
        name = "MEMORY_VMEM"
    elif level == "SCRATCH":
        name = "SCRATCH_MEMORY"
    else:
        return "UNKNOWN_LEVEL"

    if type == "ALLOC":
        return name + "_ALLOCATE"
    if type == "FREE":
        return name + "_FREE"

    # Note: the fallback is built from the raw level, not from the prefix
    # chosen above (e.g. "REAL_MEMORY_NONE" rather than "MEMORY_NONE").
    return level + "_MEMORY_NONE"
|
|
|
|
|
|
def _run_query(conn, sql, params=None):
    """Execute ``sql`` on a fresh cursor of ``conn`` and return that cursor."""
    cursor = conn.cursor()
    if params is None:
        cursor.execute(sql)
    else:
        cursor.execute(sql, params)
    return cursor


def _write_interval_events(archive, location, intervals, **region_kwargs):
    """Write one enter/leave event pair per interval on ``location``.

    Args:
        archive: Open OTF2 archive the events are written to.
        location: OTF2 location definition that receives the events.
        intervals: Iterable of ``(start, end, name, attributes)`` tuples;
            ``attributes`` is attached to the enter event only.
        **region_kwargs: Extra keyword arguments forwarded to
            ``archive.definitions.region`` (e.g. ``region_role``, ``paradigm``).
    """
    writer = otf2.writer.EventWriter(archive, location)
    events = []
    for start, end, name, attributes in intervals:
        region = archive.definitions.region(name=name, **region_kwargs)
        events.append((start, True, region, attributes))
        events.append((end, False, region, None))

    # Only the timestamp is used as the key; the sort is stable, so events
    # with equal timestamps keep their insertion order.
    # NOTE(review): intervals that share a timestamp but arrive out of nesting
    # order would be emitted improperly nested - confirm inputs rule this out.
    events.sort(key=lambda event: event[0])

    for timestamp, is_enter, region, attributes in events:
        if is_enter:
            writer.enter(timestamp, region, attributes=attributes)
        else:
            writer.leave(timestamp, region)


def _write_trace_contents(archive, conn, config, timer_resolution):
    """Write clock properties, definitions and events for every process.

    Reads the ``processes``, ``rocpd_info_node``, ``regions``,
    ``memory_copies``, ``memory_allocations``, ``kernels`` and
    ``rocpd_info_agent`` tables/views through ``conn``. Exceptions are not
    handled here; the caller decides how to report them.
    """
    min_start, max_finish = _run_query(
        conn, "SELECT MIN(start), MAX(fini) FROM processes;"
    ).fetchone()

    with otf2.writer.DefinitionWriter(archive) as global_def_writer:
        global_offset = min_start
        duration = max_finish - min_start
        realtime_timestamp = int(round(time.time() * timer_resolution))
        global_def_writer.write_clock_properties(
            timer_resolution, global_offset, duration, realtime_timestamp
        )

        perfetto_category = archive.definitions.attribute(
            name="category", description="tracing category"
        )
        memory_copy_attributes = {perfetto_category: "memory_copy"}
        memory_allocation_attributes = {perfetto_category: "memory_allocation"}
        kernel_attributes = {perfetto_category: "kernel_dispatch"}

        # Defaults keep a config object without these options from surfacing
        # as a misleading "Could not query sqlite database" message.
        kernel_rename = getattr(config, "kernel_rename", False)
        agent_index_value = getattr(config, "agent_index_value", False)

        node_rows = _run_query(conn, "SELECT DISTINCT guid, id FROM rocpd_info_node")
        for guid, nid in node_rows:
            process_rows = _run_query(
                conn,
                "SELECT pid, hostname, command FROM processes WHERE guid = ? AND nid = ?",
                (guid, nid),
            )
            for pid, hostname, command in process_rows:
                tree_node = archive.definitions.system_tree_node(
                    name=command, class_name=hostname, parent=None
                )
                cpu_location_group = archive.definitions.location_group(
                    name=command,
                    location_group_type=LocationGroupType.PROCESS,
                    system_tree_parent=tree_node,
                )
                process_key = (guid, nid, pid)

                # ---- Collect the intervals of this process, grouped by the
                # ---- location they will be written to.
                api_calls = defaultdict(list)
                region_rows = _run_query(
                    conn,
                    """SELECT tid, start, end, name, category FROM regions
                    WHERE guid = ? AND nid = ? AND pid = ?""",
                    process_key,
                )
                for tid, start, end, name, category in region_rows:
                    attributes = {
                        perfetto_category: get_perfetto_category_name(category)
                    }
                    api_calls[tid].append((start, end, name, attributes))

                memory_copies = defaultdict(list)
                copy_rows = _run_query(
                    conn,
                    """SELECT tid, dst_agent_abs_index, start, end, name
                    FROM memory_copies WHERE guid = ? AND nid = ?
                    AND pid = ? ORDER BY start ASC""",
                    process_key,
                )
                for tid, agent, start, end, name in copy_rows:
                    memory_copies[(tid, agent)].append(
                        (start, end, name, memory_copy_attributes)
                    )

                memory_allocations = defaultdict(list)
                memory_deallocations = defaultdict(list)
                memory_unknown = defaultdict(list)
                allocation_rows = _run_query(
                    conn,
                    """SELECT tid, agent_abs_index, start, end, level, type
                    FROM memory_allocations WHERE guid = ? AND nid = ?
                    AND pid = ? ORDER BY start ASC""",
                    process_key,
                )
                for tid, agent, start, end, level, alloc_type in allocation_rows:
                    interval = (
                        start,
                        end,
                        allocation_level_type_name(level, alloc_type),
                        memory_allocation_attributes,
                    )
                    if alloc_type == "ALLOC":
                        memory_allocations[(tid, agent)].append(interval)
                    elif alloc_type == "FREE":
                        # Frees (and unknown operations) are grouped per
                        # thread only, not per agent.
                        memory_deallocations[tid].append(interval)
                    else:
                        memory_unknown[tid].append(interval)

                kernel_dispatches = defaultdict(list)
                kernel_rows = _run_query(
                    conn,
                    """SELECT tid, agent_abs_index, queue_id,
                    start, end, name, region
                    FROM kernels WHERE guid = ? AND nid = ?
                    AND pid = ? ORDER BY start ASC""",
                    process_key,
                )
                for tid, agent, queue, start, end, name, region in kernel_rows:
                    # With kernel renaming enabled, a non-empty region name
                    # replaces the kernel name.
                    label = region if kernel_rename and region else name
                    kernel_dispatches[(tid, agent, queue)].append(
                        (start, end, label, kernel_attributes)
                    )

                # agent id -> (display name, accelerator location group)
                agents = {}
                agent_rows = _run_query(
                    conn,
                    "SELECT id, name, type FROM rocpd_info_agent WHERE guid = ? AND nid = ?",
                    (guid, nid),
                )
                for agent_id, agent_name, agent_type in agent_rows:
                    display_name = (
                        f"{agent_type} Agent-{agent_id}"
                        if agent_index_value
                        else f"{agent_type} {agent_type}-{agent_id}"
                    )
                    agents[agent_id] = (
                        display_name,
                        archive.definitions.location_group(
                            name=agent_name,
                            location_group_type=LocationGroupType.ACCELERATOR,
                            system_tree_parent=tree_node,
                        ),
                    )

                # ---- Emit the events, one location per group.

                # Write API Call Events
                for tid, intervals in api_calls.items():
                    cpu_location = archive.definitions.location(
                        name=f"Thread {tid}",
                        type=LocationType.CPU_THREAD,
                        group=cpu_location_group,
                    )
                    _write_interval_events(
                        archive,
                        cpu_location,
                        intervals,
                        region_role=RegionRole.FUNCTION,
                        paradigm=Paradigm.HIP,
                    )

                # Write Memory Copy Events
                for (tid, agent_id), intervals in memory_copies.items():
                    agent_name, agent_location_group = agents.get(
                        agent_id, (f"Unknown Agent {agent_id}", None)
                    )
                    memory_copy_location = archive.definitions.location(
                        name=f"Thread {tid}, Copy to {agent_name}",
                        type=LocationType.ACCELERATOR_STREAM,
                        group=agent_location_group,
                    )
                    _write_interval_events(
                        archive,
                        memory_copy_location,
                        intervals,
                        region_role=RegionRole.DATA_TRANSFER,
                        paradigm=Paradigm.HIP,
                    )

                # Write Memory Allocation Events
                for (tid, agent_id), intervals in memory_allocations.items():
                    agent_name, agent_location_group = agents.get(
                        agent_id, (f"Unknown Agent {agent_id}", None)
                    )
                    memory_allocation_location = archive.definitions.location(
                        name=f"Thread {tid}, Memory Allocate at {agent_name}",
                        type=LocationType.ACCELERATOR_STREAM,
                        group=agent_location_group,
                    )
                    _write_interval_events(
                        archive,
                        memory_allocation_location,
                        intervals,
                        region_role=RegionRole.ALLOCATE,
                        paradigm=Paradigm.HIP,
                    )

                # Write Memory Deallocation Events
                for tid, intervals in memory_deallocations.items():
                    memory_free_location = archive.definitions.location(
                        name=f"Thread {tid}, Memory Deallocate (Free)",
                        type=LocationType.ACCELERATOR_STREAM,
                        group=cpu_location_group,
                    )
                    _write_interval_events(
                        archive,
                        memory_free_location,
                        intervals,
                        region_role=RegionRole.DEALLOCATE,
                        paradigm=Paradigm.HIP,
                    )

                # Write Unknown Memory Events (region created with name only)
                for tid, intervals in memory_unknown.items():
                    memory_unknown_location = archive.definitions.location(
                        name=f"Thread {tid}, Memory Operation UNK",
                        type=LocationType.ACCELERATOR_STREAM,
                        group=cpu_location_group,
                    )
                    _write_interval_events(
                        archive, memory_unknown_location, intervals
                    )

                # Write Kernel Dispatch Events
                for (tid, agent_id, queue), intervals in kernel_dispatches.items():
                    agent_name, agent_location_group = agents.get(
                        agent_id, (f"Unknown Agent {agent_id}", None)
                    )
                    kernel_location = archive.definitions.location(
                        name=f"Thread {tid}, Compute on {agent_name}, Queue {queue}",
                        type=LocationType.ACCELERATOR_STREAM,
                        group=agent_location_group,
                    )
                    _write_interval_events(
                        archive,
                        kernel_location,
                        intervals,
                        region_role=RegionRole.FUNCTION,
                        paradigm=Paradigm.HIP,
                    )


def write_otf2(importData, config):
    """Convert the imported rocpd data into an OTF2 trace archive.

    The archive is written to ``<output_path>/<output_file>_results`` (plus the
    sibling ``.otf2`` and ``.def`` files); any previous output with the same
    name is removed first.

    Args:
        importData: Object whose ``connection`` attribute provides ``cursor()``
            for querying the rocpd tables. When the attribute is missing or
            ``None`` a message is printed and no events are written.
        config: Output configuration. ``output_path`` (default
            ``"./otf_traces"``), ``output_file`` (default ``"traces"``),
            ``kernel_rename`` and ``agent_index_value`` are read from it.

    Any exception raised while querying or writing is reported on stdout and
    swallowed (best effort); the archive opened by ``otf2.writer.open`` is
    still closed by its context manager.
    """
    timer_resolution = 1_000_000_000
    trace_dir = getattr(config, "output_path", "./otf_traces")
    trace_file = f"{getattr(config, 'output_file', 'traces')}_results"

    os.makedirs(trace_dir, exist_ok=True)

    # Remove the output of a previous run with the same name.
    trace_subdir = os.path.join(trace_dir, trace_file)
    if os.path.exists(trace_subdir):
        shutil.rmtree(trace_subdir)

    otf2_file = os.path.join(trace_dir, f"{trace_file}.otf2")
    def_file = os.path.join(trace_dir, f"{trace_file}.def")

    print(f"Writing: {otf2_file}")
    print(f"Writing: {def_file}")
    print(f"Writing: {trace_subdir} directory")

    for stale_file in (otf2_file, def_file):
        if os.path.exists(stale_file):
            os.remove(stale_file)

    with otf2.writer.open(
        trace_dir, trace_file, timer_resolution=timer_resolution
    ) as archive:
        conn = getattr(importData, "connection", None)
        if conn is not None:
            try:
                _write_trace_contents(archive, conn, config, timer_resolution)
            except Exception as e:
                # Deliberate best-effort boundary: report and keep going so
                # the archive is still finalised.
                print("Could not query sqlite database:", e)
        else:
            print("No sqlite connection found in importData.")
|
|
|
|
|
|
def execute(input, config=None, **kwargs):
    """Resolve the output configuration and write the OTF2 trace.

    Args:
        input: Imported rocpd data, forwarded to ``write_otf2``.
        config: Existing configuration object, or ``None`` to build one from
            ``kwargs`` via ``output_config.output_config``.
        **kwargs: Configuration options, used either to build a new
            configuration or passed to ``config.update``.
    """
    if config is None:
        resolved_config = output_config.output_config(**kwargs)
    else:
        # NOTE(review): the return value of update() is what gets used -
        # confirm it returns the configuration object rather than None.
        resolved_config = config.update(**kwargs)

    write_otf2(input, resolved_config)
|
|
|
|
|
|
def add_args(parser):
    """Register OTF2-specific arguments and return their post-processor.

    No OTF2-specific arguments exist at the moment, so ``parser`` is left
    untouched. An option group would be added along these lines:

        otf2_options = parser.add_argument_group("OTF2 options")
        otf2_options.add_argument(
            "--kernel-rename",
            help="Use kernel names from debugging symbols if available",
            action="store_true",
            default=False,
        )

    Returns:
        A ``process_args(input, args)`` callable that collects the recognised
        option names which are present on ``args`` with a non-``None`` value
        into a dict.
    """

    def process_args(input, args):
        # Names of the attributes to copy from ``args``; empty until
        # OTF2-specific options are introduced, so the result is always {}.
        valid_args = []
        return {
            key: getattr(args, key)
            for key in valid_args
            if hasattr(args, key) and getattr(args, key) is not None
        }

    return process_args
|
|
|
|
|
|
def main(argv=None):
    """Command-line entry point: convert rocPD database(s) to OTF2.

    Args:
        argv: Argument list to parse; ``None`` lets ``argparse`` fall back to
            ``sys.argv[1:]``.
    """
    import argparse
    from . import time_window
    from . import output_config

    parser = argparse.ArgumentParser(
        description="Convert rocPD to OTF2 format", allow_abbrev=False
    )

    required_group = parser.add_argument_group("Required arguments")
    required_group.add_argument(
        "-i",
        "--input",
        required=True,
        type=output_config.check_file_exists,
        nargs="+",
        help="Input path and filename to one or more database(s), separated by spaces",
    )

    # Each add_*args call registers its options on the parser and hands back
    # the callable that later post-processes the parsed namespace.
    handle_out_config = output_config.add_args(parser)
    handle_otf2 = add_args(parser)
    handle_generic = output_config.add_generic_args(parser)
    handle_time_window = time_window.add_args(parser)

    args = parser.parse_args(argv)

    import_data = RocpdImportData(
        args.input, automerge_limit=getattr(args, "automerge_limit", None)
    )

    out_config_options = handle_out_config(import_data, args)
    generic_options = handle_generic(import_data, args)
    otf2_options = handle_otf2(import_data, args)
    # The time-window handler is called for its effect only; its return value
    # is not used.
    handle_time_window(import_data, args)

    # On duplicate keys, later groups override earlier ones.
    merged_options = {}
    merged_options.update(out_config_options)
    merged_options.update(otf2_options)
    merged_options.update(generic_options)

    execute(import_data, **merged_options)
|
|
|
|
|
|
# Run the converter only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()
|