Support host-trap PC Sampling on CLI (beta version)

[ROCm/rocprofiler-compute commit: 9bacad0876]
Этот коммит содержится в:
Fei Zheng
2025-03-28 16:51:49 -06:00
коммит произвёл GitHub
родитель b0844b42bb
Коммит ee5df82698
15 изменённых файлов: 391 добавлений и 19 удалений
+2
Просмотреть файл
@@ -20,6 +20,8 @@ Full documentation for ROCm Compute Profiler is available at [https://rocm.docs.
* Additional datatypes for roofline profiling
* Now supports FP8, FP16, BF16, FP32, FP64, I8, I32, I64 (dependent on gpu architecture)
* Support host-trap PC Sampling on CLI (beta version)
### Changed
* Change normal_unit default to per_kernel
+9
Просмотреть файл
@@ -324,6 +324,15 @@ Examples:
help="\t\t\tSet the format of output file of rocprof.",
)
profile_group.add_argument(
"--pc-sampling-interval",
required=False,
metavar="",
dest="pc_sampling_interval",
default=1,
help="\t\t\tSet the interval of pc sampling in microsecond (DEFAULT: 1).",
)
## Roofline Command Line Options
roofline_group.add_argument(
"--roof-only",
+18
Просмотреть файл
@@ -45,6 +45,7 @@ from utils.logger import (
from utils.utils import (
capture_subprocess_output,
gen_sysinfo,
pc_sampling_prof,
print_status,
run_prof,
run_rocscope,
@@ -63,6 +64,8 @@ class RocProfCompute_Base:
self.__filter_metric_ids = [
name for name, type in args.filter_blocks.items() if type == "metric_id"
]
# Fixme: remove the hack code "21" after we could enable pc sampling as default
self.__pc_sampling = True if "21" in self.__filter_metric_ids else False
def get_args(self):
return self.__args
@@ -420,6 +423,21 @@ class RocProfCompute_Base:
# TODO: Finish logic
console_error("Profiler not supported")
if self.__pc_sampling == True and self.__profiler == "rocprofv3":
start_run_prof = time.time()
pc_sampling_prof(
interval=self.get_args().pc_sampling_interval,
workload_dir=self.get_args().path,
appcmd=self.get_args().remaining,
)
end_run_prof = time.time()
console_debug(
"The time of pc sampling profiling is {} m {} sec".format(
int((end_run_prof - start_run_prof) / 60),
str((end_run_prof - start_run_prof) % 60),
)
)
@abstractmethod
def post_processing(self):
"""Perform any post-processing steps prior to profiling."""
@@ -0,0 +1,9 @@
---
Panel Config:
id: 2100
title: PC Sampling
data source:
- pc_sampling_table:
id: 2101
source: None # Not supported
comparable: false # enable it later
@@ -0,0 +1,9 @@
---
Panel Config:
id: 2100
title: PC Sampling
data source:
- pc_sampling_table:
id: 2101
source: None # not supported
comparable: false # enable it later
@@ -0,0 +1,9 @@
---
Panel Config:
id: 2100
title: PC Sampling
data source:
- pc_sampling_table:
id: 2101
source: ps_file
comparable: false # enable it later
@@ -0,0 +1,9 @@
---
Panel Config:
id: 2100
title: PC Sampling
data source:
- pc_sampling_table:
id: 2101
source: ps_file
comparable: false # enable it later
@@ -0,0 +1,9 @@
---
Panel Config:
id: 2100
title: PC Sampling
data source:
- pc_sampling_table:
id: 2101
source: ps_file
comparable: false # enable it later
@@ -0,0 +1,9 @@
---
Panel Config:
id: 2100
title: PC Sampling
data source:
- pc_sampling_table:
id: 2101
source: ps_file
comparable: false # enable it later
+1 -6
Просмотреть файл
@@ -33,12 +33,7 @@ import pandas as pd
import plotly.graph_objects as go
from dash import dcc, html
from utils.logger import (
console_debug,
console_error,
console_log,
demarcate,
)
from utils.logger import console_debug, console_error, console_log, demarcate
from utils.roofline_calc import (
MFMA_DATATYPES,
PEAK_OPS_DATATYPES,
+2
Просмотреть файл
@@ -116,7 +116,9 @@ def create_df_kernel_top_stats(
Create top stats info by grouping kernels with user's filters.
"""
# NB: think about df = pd.DataFrame(df_in["pmc_perf"].copy())
df = df_in["pmc_perf"]
# Demangle original KernelNames
kernel_name_shortener(df, kernel_verbose)
+264 -3
Просмотреть файл
@@ -23,8 +23,11 @@
##############################################################################el
import ast
import json
import re
import sys
import warnings
from collections import defaultdict
from pathlib import Path
import astunparse
@@ -32,7 +35,7 @@ import numpy as np
import pandas as pd
from utils import schema
from utils.logger import console_error, console_warning, demarcate
from utils.logger import console_debug, console_error, console_warning, demarcate
# ------------------------------------------------------------------------------
# Internal global definitions
@@ -630,6 +633,14 @@ def build_dfs(archConfigs, filter_metrics, sys_info):
metric_list[data_source_idx] = panel["title"]
else:
df = pd.DataFrame()
elif type == "pc_sampling_table":
data_source_idx = str(data_config["id"] // 100)
# NB: enable pc sampling only when users specify, not enable as default
if filter_metrics and (data_source_idx in filter_metrics):
df = pd.DataFrame(
[data_config["source"]], columns=["from_pc_sampling"]
)
metric_list[data_source_idx] = panel["title"]
else:
df = pd.DataFrame()
@@ -930,11 +941,256 @@ def apply_filters(workload, dir, is_gui, debug):
return ret_df
def find_key_recursively(data, search_key):
    """
    Depth-first search of nested dicts/lists for the first occurrence of
    search_key.

    Dict entries are visited in iteration order; a matching key at the current
    level wins over matches nested inside later entries, while containers that
    appear earlier are fully explored first.

    Returns the raw value stored under the key (no conversion is applied), or
    None when the key is absent. A key whose stored value is None is therefore
    indistinguishable from a missing key.
    """
    if isinstance(data, dict):
        for name, child in data.items():
            if name == search_key:
                return child
            if isinstance(child, (dict, list)):
                hit = find_key_recursively(child, search_key)
                if hit is not None:
                    return hit
        return None

    if isinstance(data, list):
        for element in data:
            hit = find_key_recursively(element, search_key)
            if hit is not None:
                return hit

    # Scalars, and containers without the key, end up here.
    return None
def search_key_in_json(file_path, search_key):
    """
    Load a JSON file and return the first value stored under search_key.

    The lookup is delegated to find_key_recursively(). When the key is not
    found, the problem is reported through console_error() and None is
    returned (if console_error returns at all).

    FIXME: the entire JSON document is loaded into memory, so this should not
    be used for large files.
    """
    # JSON text is UTF-8 by specification; do not depend on the locale default.
    with open(file_path, "r", encoding="utf-8") as file:
        data = json.load(file)

    found = find_key_recursively(data, search_key)
    # Identity test: an empty list or dict is a valid hit, only None is a miss.
    if found is None:
        console_error(f"Key '{search_key}' not found in the JSON file.")
    return found
def search_pc_sampling_record(records):
    """
    Aggregate PC sampling records per (code object id, code object offset).

    Each record is expected to carry a "record" dict (with an optional "pc"
    sub-dict holding "code_object_id" and "code_object_offset") and an
    "inst_index" entry. Records missing any of these three values are ignored.

    Returns a list of (code_object_id, inst_index, code_object_offset, count)
    tuples sorted by code_object_id and then code_object_offset, or None
    (after a warning) when no usable record exists.
    """
    # (code_object_id, code_object_offset) -> [sample count, last inst_index]
    tally = {}
    for entry in records:
        pc = entry["record"].get("pc", {})
        object_id = pc.get("code_object_id")
        object_offset = pc.get("code_object_offset")
        index = entry.get("inst_index")
        if object_id is None or object_offset is None or index is None:
            continue
        slot = tally.setdefault((object_id, object_offset), [0, None])
        slot[0] += 1
        slot[1] = index

    if not tally:
        console_warning("PC sampling: no pc sampling record found!")
        return None

    rows = [
        (object_id, slot[1], object_offset, slot[0])
        for (object_id, object_offset), slot in tally.items()
    ]
    # Order by code_object_id first, then by code_object_offset.
    rows.sort(key=lambda row: (row[0], row[2]))
    return rows
@demarcate
def load_pc_sampling_data_per_kernel(file_name, kernel_name):
    """
    Load PC sampling raw data for a single kernel from a json results file.

    Args:
        file_name: path of the json file holding the "kernel_symbols",
            "pc_sample_host_trap", "pc_sample_instructions" and
            "pc_sample_comments" entries.
        kernel_name: name of the kernel; compared against the formatted,
            demangled and truncated kernel names of every kernel symbol.

    Returns:
        A DataFrame with the columns "source_line", "instruction", "offset"
        (hex string) and "count", restricted to samples that fall between the
        kernel's entry offset and the next kernel entry in the same code
        object. An empty DataFrame is returned when the kernel is not found.
    """
    kernel_info_list = search_key_in_json(file_name, "kernel_symbols")

    # Locate the requested kernel among the kernel symbols.
    kernel_info = {}
    for item in kernel_info_list or []:
        if (
            item["formatted_kernel_name"] == kernel_name
            or item["demangled_kernel_name"] == kernel_name
            or item["truncated_kernel_name"] == kernel_name
        ):
            kernel_info["code_object_id"] = item["code_object_id"]
            kernel_info["entry_byte_offset"] = item["kernel_code_entry_byte_offset"]
            break

    if not kernel_info:
        console_warning("PC sampling: can not find the kernel %s " % kernel_name)
        return pd.DataFrame()
    console_debug("PC sampling: kernel %s " % kernel_info)

    # The kernel is assumed to end where the next kernel of the same code
    # object starts: take the smallest entry offset strictly greater than this
    # kernel's entry. Without a following kernel the range is unbounded.
    kernel_info["potential_end_offset"] = min(
        (
            item["kernel_code_entry_byte_offset"]
            for item in kernel_info_list
            if item["code_object_id"] == kernel_info["code_object_id"]
            and item["kernel_code_entry_byte_offset"]
            > kernel_info["entry_byte_offset"]
        ),
        default=sys.maxsize,
    )

    pc_sample_host_trap = search_key_in_json(file_name, "pc_sample_host_trap")
    # A missing key is treated like "no samples" instead of iterating None.
    df = pd.DataFrame(
        search_pc_sampling_record(pc_sample_host_trap or []),
        columns=["code_object_id", "inst_index", "offset", "count"],
    )

    # NOTE(review): the lower bound is exclusive, so a sample located exactly
    # at the entry offset is dropped - confirm this is intended.
    in_kernel = (
        (df["code_object_id"] == kernel_info["code_object_id"])
        & (df["offset"] > kernel_info["entry_byte_offset"])
        & (df["offset"] < kernel_info["potential_end_offset"])
    )
    # Copy so that the column assignments below do not write into a slice.
    df = df.loc[in_kernel, ["inst_index", "offset", "count"]].copy()
    df["offset"] = df["offset"].apply(hex)

    pc_sample_instructions = (
        search_key_in_json(file_name, "pc_sample_instructions") or []
    )
    df["instruction"] = df["inst_index"].apply(
        lambda x: pc_sample_instructions[x] if x < len(pc_sample_instructions) else None
    )

    pc_sample_comments = search_key_in_json(file_name, "pc_sample_comments") or []
    # Bounds are checked against the list that is actually indexed.
    df["source_line"] = df["inst_index"].apply(
        lambda x: (
            ".../" + Path(pc_sample_comments[x]).name
            if x < len(pc_sample_comments)
            else None
        )
    )

    return df[["source_line", "instruction", "offset", "count"]]
@demarcate
def load_pc_sampling_data(workload, dir, file_prefix):
    """
    Load the PC sampling raw data of a workload and return it as a DataFrame.

    - file_prefix "none" (any casing): nothing to load, empty DataFrame.
    - no kernel filter: read "<file_prefix>_pc_sampling_host_trap.csv" from
      dir, count the samples per "Instruction_Comment" and sort by that count
      in descending order.
    - exactly one kernel filter: resolve the kernel name through the kernel
      top table and load the samples of that kernel from
      "<file_prefix>_results.json".
    - more than one kernel filter: not supported, empty DataFrame.
    """
    if file_prefix.lower() == "none":
        return pd.DataFrame()

    # Without a kernel filter the grouped and sorted csv content is returned.
    if not workload.filter_kernel_ids:
        # NB: the default file name is subject to changes from rocprofv3
        csv_file_path = Path(dir).joinpath(file_prefix + "_pc_sampling_host_trap.csv")
        if not csv_file_path.exists():
            console_error("PC sampling: can not read %s " % csv_file_path)
            return pd.DataFrame()

        samples = pd.read_csv(csv_file_path)
        # One row per source comment: number of samples plus first instruction.
        per_line = samples.groupby("Instruction_Comment").agg(
            count=("Instruction_Comment", "count"),
            instruction=("Instruction", "first"),
        )
        per_line = per_line.reset_index()
        per_line = per_line.rename(columns={"Instruction_Comment": "source_line"})
        per_line = per_line[["source_line", "instruction", "count"]]
        # Keep only the final path component of the comment for display.
        per_line["source_line"] = per_line["source_line"].apply(
            lambda x: (".../" + Path(x).name)
        )
        return per_line.sort_values(by="count", ascending=False)

    if len(workload.filter_kernel_ids) > 1:
        console_error(
            "PC sampling supports single kernel only! Please specify -k with single kernel."
        )
        return pd.DataFrame()

    if len(workload.filter_kernel_ids) == 1:
        # NB: the default file name is subject to changes from rocprofv3
        json_file_path = Path(dir).joinpath(file_prefix + "_results.json")
        if not json_file_path.exists():
            console_error("PC sampling: can not read %s " % json_file_path)
            return pd.DataFrame()

        # NB:
        #   We should find better way to remove the dependency on kernel_top_table
        kernel_top_df = workload.dfs[pmc_kernel_top_table_id]
        top_csv_path = Path(dir).joinpath(kernel_top_df.loc[0, "from_csv"])
        top_kernels = pd.read_csv(top_csv_path)
        kernel_name = top_kernels.loc[workload.filter_kernel_ids[0], "Kernel_Name"]
        return load_pc_sampling_data_per_kernel(json_file_path, kernel_name)

    console_warning("PC sampling: No data")
    return pd.DataFrame()
@demarcate
def load_kernel_top(workload, dir):
# NB:
# - Do pmc_kernel_top.csv loading before eval_metric because we need the kernel names.
# - There might be a better way/timing to load raw_csv_table.
# FIXME:
# the func name load_kernel_top needs to be changed to load_non_metrics_table
# NB:
# "from_csv", "from_csv_columnwise", and "from_pc_sampling"
# are 3 internal symbols converted in build_dfs() for non-metrics table.
# There might be a better way to store this info without the original entry.
tmp = {}
for id, df in workload.dfs.items():
if "from_csv" in df.columns:
@@ -965,14 +1221,19 @@ def load_kernel_top(workload, dir):
console_warning(
f"Couldn't load {file.name}. This may result in missing analysis data."
)
elif "from_pc_sampling" in df.columns:
tmp[id] = load_pc_sampling_data(workload, dir, df.loc[0, "from_pc_sampling"])
# print("table id", id, "filter_kernel_ids", workload.filter_kernel_ids)
workload.dfs.update(tmp)
@demarcate
def load_table_data(workload, dir, is_gui, debug, verbose, skipKernelTop=False):
"""
Load data for all "raw_csv_table".
Calculate mertric value for all "metric_table".
- Load data for all "raw_csv_table"
- Load data for "pc_sampling_table"
- Calculate metric value for all "metric_table"
"""
if not skipKernelTop:
load_kernel_top(workload, dir)
+1 -6
Просмотреть файл
@@ -38,12 +38,7 @@ from pathlib import Path as path
import pandas as pd
import config
from utils.logger import (
console_debug,
console_error,
console_log,
console_warning,
)
from utils.logger import console_debug, console_error, console_log, console_warning
from utils.mi_gpu_spec import get_gpu_series_dict, get_mi300_chip_id_dict
from utils.tty import get_table_string
from utils.utils import get_version, total_xcds
+8 -4
Просмотреть файл
@@ -248,10 +248,14 @@ def show_all(args, runs, archConfigs, output, profiling_config):
)
# Do not print the table if any column is empty
if is_empty_columns_exist:
console_log(
f"Not showing table with empty column(s): {table_id_str} {table_config['title']}"
)
if "title" in table_config:
console_log(
f"Not showing table with empty column(s): {table_id_str} {table_config['title']}"
)
else:
console_log(
f"Not showing table with empty column(s): {table_id_str}"
)
if (
"title" in table_config
and table_config["title"]
+32
Просмотреть файл
@@ -699,6 +699,38 @@ def run_prof(
df.to_csv(workload_dir + "/" + fbase + ".csv", index=False)
def pc_sampling_prof(interval, workload_dir, appcmd):
    """
    Run rocprof with host-trap PC sampling. Currently supports v3 only.

    Args:
        interval: sampling interval, forwarded to --pc-sampling-interval
            (the sampling unit is set to "time").
        workload_dir: output directory, forwarded to -d.
        appcmd: the application to profile, either a single command-line
            string or a sequence of already separated argv tokens.

    Failure of the profiler run is reported through console_error().
    """
    # Local import keeps this block self-contained.
    import shlex

    # Todo:
    #   - precheck with rocprofv3 --list-avail

    # Every token of the application command must be its own argv entry;
    # a whole command line passed as one element is not an executable name.
    if isinstance(appcmd, str):
        app_args = shlex.split(appcmd)
    else:
        app_args = list(appcmd)

    options = [
        "--pc-sampling-beta-enable",
        "--pc-sampling-method",
        "host_trap",
        "--pc-sampling-unit",
        "time",
        "--output-format",
        "csv",
        "json",
        "--pc-sampling-interval",
        str(interval),
        "-d",
        workload_dir,
        "-o",
        "ps_file",  # todo: sync up with the name from source in 2100_.yaml
        "--",
    ]
    options.extend(app_args)

    success, output = capture_subprocess_output(
        [rocprof_cmd] + options, new_env=os.environ.copy(), profileMode=True
    )

    if not success:
        console_error("PC sampling failed.")
def process_rocprofv3_output(rocprof_output, workload_dir, is_timestamps):
"""
rocprofv3 specific output processing.