pc sampling multi kernel (#1382)

* initial commit

* add csv support extraction for non kernel selection mode

* add --kernel-trace for rocprofiler-sdk mode

* make non kernel selective mode runnable

* make kernel selection work with -k

* remove upper case of arg hint

* update documentation

* display same kernel name at only one place and merge instruction id with same obj id as well as offset

* remove kernel name's display for single kernel selection

* change log added

---------

Co-authored-by: Fei Zheng <44449748+feizheng10@users.noreply.github.com>
このコミットが含まれているのは:
ywang103-amd
2025-10-23 01:26:08 -04:00
committed by GitHub
コミット 9b562c0e58
4個のファイルの変更263行の追加178行の削除
+4
ファイルの表示
@@ -5,6 +5,10 @@ Full documentation for ROCm Compute Profiler is available at [https://rocm.docs.
## Unreleased
### Added
* Add support for multi-kernel applications' pc sampling.
* PC Sampling's outputs' instructions are displayed with the name of the kernel that individual instruction belongs to.
* Single kernel selection is supported so that the pc samples of selected kernel can be displayed.
* Add `--list-blocks <arch>` option to general options to list available IP blocks on specified arch (similar to `--list-metrics`), cannot be used with `--block`.
* Added `config_delta/gfx950_diff.yaml` to analysis config yamls to track the revision between a gfx9 architecture against the latest supported architecture gfx950
+1
ファイルの表示
@@ -71,4 +71,5 @@ Selecting single kernel sorting by PC count:
.. note::
* PC sampling feature is currently in BETA version. To enable PC sampling, you have to explicitly enable it with block index 21.
* PC sampling now only shows assembly instructions collected in our record of pc samples and not all instructions of compiled code are represented.
* To associate PC sampling info back to HIP source code, you need to build the profiling target app with ``-g`` to keep the symbols. Otherwise, PC sampling info will be only associated with assembly lines.
+256 -178
ファイルの表示
@@ -27,7 +27,6 @@ import argparse
import ast
import json
import re
import sys
import warnings
from pathlib import Path
from typing import Any, Optional, Union
@@ -1233,20 +1232,35 @@ def search_pc_sampling_record(
records: Union[list[dict], dict],
) -> Optional[list[tuple]]:
"""
Search PC sampling records, and group and sort them
"""
Search PC sampling records.
# NB:
# The field stall_reason is vailid only for HW stochastic pc sampling.
# TODO: might save wavefront count for HW stochastic pc sampling?
Group by (code_object_id, code_object_offset, inst_index), and aggregate
counts, stall reasons, and dispatch IDs.
Returns:
A sorted list of tuples:
(
code_object_id,
code_object_offset,
inst_index,
total_count,
count_issued,
count_stalled,
sorted_stall_reasons,
sorted_dispatch_ids,
)
"""
if not records:
console_warning("PC sampling: no pc sampling record found!")
return None
# records should always be a list of dict
if isinstance(records, dict):
records = [records]
rocp_inst_not_issued_prefix_len = len(PC_SAMPLING_NOT_ISSUE_PREFIX)
grouped_data = {}
stall_reason_keys = {
"NONE": 0,
# No instruction available in the instruction cache.
@@ -1265,82 +1279,74 @@ def search_pc_sampling_record(
"LAST": 0,
}
# Populate grouped_data
grouped_data: dict[tuple, list] = {}
for item in records:
record = item["record"]
record = item.get("record", {})
pc_info = record.get("pc", {})
code_object_id = pc_info.get("code_object_id")
code_object_offset = pc_info.get("code_object_offset")
inst_index = item.get("inst_index")
dispatch_id = record.get("dispatch_id")
if None in (code_object_id, code_object_offset, inst_index):
continue
# Create composite key
key = (code_object_id, code_object_offset)
key = (code_object_id, code_object_offset, inst_index)
snapshot = record.get("snapshot", {})
issued = record.get("wave_issued")
issued = record.get("wave_issued", False)
if key not in grouped_data:
grouped_data[key] = [0, 0, 0, inst_index, {}]
grouped_data[key] = [0, 0, 0, {}, set()]
entry = grouped_data[key]
# Update counts
entry = grouped_data[key]
entry[0] += 1 # count
entry[3] = inst_index # inst_index
entry[0] += 1 # total_count
if issued:
entry[1] += 1 # count_issued
else:
entry[2] += 1 # count_stalled
stall_reason = snapshot.get("stall_reason")
if stall_reason and len(stall_reason) > rocp_inst_not_issued_prefix_len:
reason_key = stall_reason[rocp_inst_not_issued_prefix_len:]
if reason_key in stall_reason_keys:
entry[3][reason_key] = entry[3].get(reason_key, 0) + 1
# Process snapshot data
if snapshot:
if issued:
entry[1] += 1 # count_issued
else:
entry[2] += 1 # count_stalled
# Process stall reason only when stalled
stall_reason = snapshot.get("stall_reason")
if stall_reason:
# Extract reason key with bounds checking
if len(stall_reason) > rocp_inst_not_issued_prefix_len:
reason_key = stall_reason[rocp_inst_not_issued_prefix_len:]
# Only track known stall reasons
if reason_key in stall_reason_keys:
stall_reasons = entry[4]
stall_reasons[reason_key] = (
stall_reasons.get(reason_key, 0) + 1
)
# Add dispatch_id if valid
if dispatch_id is not None:
entry[4].add(dispatch_id)
if not grouped_data:
console_warning("PC sampling: no pc sampling record found!")
return None
# Convert to sorted list of tuples:
# (code_object_id, inst_index, code_object_offset, count)
# Prepare sorted output list
sorted_counts = sorted(
[
(
code_object_id,
info["inst_index"],
offset,
info["count"],
info["count_issued"],
info["count_stalled"],
# For info["stall_reason"], remove the zero entries,
# sorting the remaining items by their values in descending order
code_object_offset,
inst_index,
info[0], # total_count
info[1], # count_issued
info[2], # count_stalled
sorted(
((k, v) for k, v in info["stall_reason"].items() if v > 0),
((k, v) for k, v in info[3].items() if v > 0),
key=lambda item: item[1],
reverse=True,
),
), # sorted stall reasons
sorted(info[4]), # sorted dispatch_ids list
)
for code_object_id, offsets in grouped_data.items()
for offset, info in offsets.items()
for (
code_object_id,
code_object_offset,
inst_index,
), info in grouped_data.items()
],
key=lambda x: (
x[0],
x[2],
), # Sort by code_object_id, then by code_object_offset
key=lambda x: (x[0], x[1], x[2]),
)
return sorted_counts
@@ -1348,7 +1354,11 @@ def search_pc_sampling_record(
@demarcate
def load_pc_sampling_data_per_kernel(
method: str, file_name: Path, kernel_name: str, sorting_type: str
method: str,
file_name: Path,
csv_file_name: Path,
kernel_name: str,
sorting_type: str,
) -> pd.DataFrame:
"""
Load PC sampling raw data from json file with given method and kernel name,
@@ -1367,48 +1377,21 @@ def load_pc_sampling_data_per_kernel(
:return: The counted and reordering pc sampling info.
:rtype: pd.DataFrame:
"""
kernel_info_list = search_key_in_json(file_name, "kernel_symbols")
kernel_info = {}
if kernel_info_list:
for item in kernel_info_list:
if (
item["formatted_kernel_name"] == kernel_name
or item["demangled_kernel_name"] == kernel_name
or item["truncated_kernel_name"] == kernel_name
):
# kernel_info["kernel_id"] = item["kernel_id"]
kernel_info["code_object_id"] = item["code_object_id"]
kernel_info["entry_byte_offset"] = item["kernel_code_entry_byte_offset"]
break
if not kernel_info:
console_warning(f"PC sampling: can not find the kernel {kernel_name}")
return pd.DataFrame()
else:
console_debug(f"PC sampling: kernel {kernel_info}")
filtered_sorted_list = sorted(
[
item
for item in kernel_info_list
if item["code_object_id"] == kernel_info["code_object_id"]
],
key=lambda x: x["kernel_code_entry_byte_offset"],
# Load kernel trace CSV with kernel info
kernel_trace_df = pd.read_csv(
csv_file_name, usecols=["Dispatch_Id", "Kernel_Id", "Kernel_Name"]
)
console_debug(
f"PC sampling: loaded kernel trace with {len(kernel_trace_df)} entries"
)
for i, item in enumerate(filtered_sorted_list):
if item["kernel_code_entry_byte_offset"] == kernel_info["entry_byte_offset"]:
next_index = i + 1
if next_index < len(filtered_sorted_list): # Ensure the next item exists
next_item = filtered_sorted_list[next_index]
kernel_info["potential_end_offset"] = next_item[
"kernel_code_entry_byte_offset"
]
else:
kernel_info["potential_end_offset"] = sys.maxsize
break
# Filter kernels matching requested kernel_name
matching_kernels = kernel_trace_df[kernel_trace_df["Kernel_Name"] == kernel_name]
if matching_kernels.empty:
console_warning(f"PC sampling: cannot find kernel '{kernel_name}' in CSV")
return pd.DataFrame()
# Extract raw PC sampling records from JSON
pc_sample_key_loc = (
search_key_in_json(file_name, "pc_sample_host_trap")
if method == "host_trap"
@@ -1419,90 +1402,145 @@ def load_pc_sampling_data_per_kernel(
console_warning("PC sampling: can not find pc sample.")
return pd.DataFrame()
df = pd.DataFrame(
search_pc_sampling_record(pc_sample_key_loc),
columns=[
"code_object_id",
"inst_index",
"offset",
"count",
"count_issued",
"count_stalled",
"stall_reason",
],
# Get processed sampling data grouped by (code_object_id, offset, inst_index)
records = search_pc_sampling_record(pc_sample_key_loc)
if not records:
console_warning("PC sampling: no records found in PC sampling data.")
return pd.DataFrame()
# Flatten records by dispatch_id to create one row per dispatch ID
rows = []
for (
code_object_id,
offset,
inst_index,
count,
count_issued,
count_stalled,
stall_reasons,
dispatch_ids,
) in records:
for dispatch_id in dispatch_ids:
rows.append({
"dispatch_id": dispatch_id,
"code_object_id": code_object_id,
"offset": offset,
"inst_index": inst_index,
"count": count,
"count_issued": count_issued,
"count_stalled": count_stalled,
"stall_reason": stall_reasons,
})
df = pd.DataFrame(rows)
if df.empty:
console_warning("PC sampling: no records found after flattening dispatch IDs.")
return df
# Map dispatch_id to kernel info (Kernel_Id and Kernel_Name)
dispatch_to_kernel = kernel_trace_df.set_index("Dispatch_Id")[
["Kernel_Id", "Kernel_Name"]
]
# Map dispatch_id to kernel info (Kernel_Id and Kernel_Name)
df["kernel_id"] = df["dispatch_id"].map(dispatch_to_kernel["Kernel_Id"])
df["kernel_name"] = df["dispatch_id"].map(dispatch_to_kernel["Kernel_Name"])
# Drop dispatch_id
df.drop(columns=["dispatch_id"], inplace=True)
def merge_stall_reasons(
stall_reason_series: list[Optional[list[tuple[str, int]]]],
) -> list[tuple[str, int]]:
"""
Function to merge stall_reason lists (list of dicts -> merged & sorted dict)
"""
merged_counts = {}
for entry in stall_reason_series:
if not entry:
continue
# Each entry is a list of (key, count) tuples
for k, v in entry:
if v > 0:
merged_counts[k] = merged_counts.get(k, 0) + v
# Return sorted list of tuples by descending count
return sorted(merged_counts.items(), key=lambda item: item[1], reverse=True)
# Group and aggregate
df = df.groupby(["code_object_id", "offset", "kernel_id"], as_index=False).agg({
"inst_index": "first",
"count": "sum",
"count_issued": "sum",
"count_stalled": "sum",
"stall_reason": merge_stall_reasons,
"kernel_name": "first",
})
# Filter DataFrame to only include rows matching the requested kernel_name
df = df[df["kernel_name"] == kernel_name]
# Convert offset column to hex string for display, keep original numeric for sorting
df["offset"] = df["offset"].apply(lambda x: hex(x))
# Load PC sampling instructions from JSON (if available)
pc_sample_instructions = search_key_in_json(file_name, "pc_sample_instructions")
df["instruction"] = (
df["inst_index"].apply(
lambda x: pc_sample_instructions[x]
if x < len(pc_sample_instructions)
else None
)
if pc_sample_instructions
else None
)
df = df[
(df["code_object_id"] == kernel_info["code_object_id"])
& (df["offset"] > kernel_info["entry_byte_offset"])
& (df["offset"] < kernel_info["potential_end_offset"])
][
# Load source code comments (if available)
pc_sample_comments = search_key_in_json(file_name, "pc_sample_comments")
df["source_line"] = (
df["inst_index"].apply(
lambda x: f".../{Path(pc_sample_comments[x]).name}"
if x < len(pc_sample_comments)
else None
)
if pc_sample_comments
else None
)
# Sorting and returning relevant columns depending on method and sorting_type
if sorting_type == "offset":
df_sorted = df.sort_values(by=["code_object_id", "offset"])
elif sorting_type == "count":
df_sorted = df.sort_values(by=["count"], ascending=False)
else:
console_error(
'Error: pc sampling sorting_type must be either "offset" or "count".'
)
return pd.DataFrame()
columns_to_return = (
[
"inst_index",
"source_line",
"instruction",
"code_object_id",
"offset",
"count",
]
if method == "host_trap"
else [
"source_line",
"instruction",
"code_object_id",
"offset",
"count",
"count_issued",
"count_stalled",
"stall_reason",
]
]
)
df["offset"] = df["offset"].apply(lambda x: hex(x))
# Add instruction and source line information
pc_sample_instructions = search_key_in_json(file_name, "pc_sample_instructions")
if pc_sample_instructions:
df["instruction"] = df["inst_index"].apply(
lambda x: (
pc_sample_instructions[x] if x < len(pc_sample_instructions) else None
)
)
pc_sample_comments = search_key_in_json(file_name, "pc_sample_comments")
if pc_sample_comments:
df["source_line"] = df["inst_index"].apply(
lambda x: (
f".../{Path(pc_sample_comments[x]).name}"
if x < len(pc_sample_comments)
else None
)
)
# Return sorted data based on sorting type
if sorting_type == "offset":
return (
df[["source_line", "instruction", "offset", "count"]]
if method == "host_trap"
else df[
[
"source_line",
"instruction",
"offset",
"count",
"count_issued",
"count_stalled",
"stall_reason",
]
]
)
else: # sort by "count"
return (
df[["source_line", "instruction", "offset", "count"]].sort_values(
by="count", ascending=False
)
if method == "host_trap"
else df[
[
"source_line",
"instruction",
"offset",
"count",
"count_issued",
"count_stalled",
"stall_reason",
]
].sort_values(by="count", ascending=False)
)
return df_sorted[columns_to_return]
# might support sort by stall reason in the future
@@ -1526,6 +1564,14 @@ def load_pc_sampling_data(
# - Alternatively, we could check pc_sampling_method in json
stochastic_path = Path(dir_path) / f"{file_prefix}_pc_sampling_stochastic.csv"
host_trap_path = Path(dir_path) / f"{file_prefix}_pc_sampling_host_trap.csv"
json_file_path = Path(dir_path) / f"{file_prefix}_results.json"
csv_kernel_trace_file_path = Path(dir_path) / f"{file_prefix}_kernel_trace.csv"
if not csv_kernel_trace_file_path.exists():
console_error(
f"PC sampling: can not read {csv_kernel_trace_file_path}", exit=False
)
return pd.DataFrame()
if stochastic_path.exists():
pc_sampling_method = "stochastic"
@@ -1541,24 +1587,44 @@ def load_pc_sampling_data(
# No kernel filter, return grouped and sorted csv dir_pathectly
if not workload.filter_kernel_ids:
# Load instruction CSV
df = pd.read_csv(csv_file_path)
# Group by 'Instruction_Comment' and count occurrences
# Load kernel trace CSV
kernel_trace_df = pd.read_csv(csv_kernel_trace_file_path)
# Merge on Correlation_Id (instruction CSV) and Dispatch_Id (kernel trace CSV)
merged_df = df.merge(
kernel_trace_df[["Dispatch_Id", "Kernel_Name", "Kernel_Id"]],
how="left",
left_on="Correlation_Id",
right_on="Dispatch_Id",
)
# Group by Instruction_Comment and aggregate
grouped_counts = (
df.groupby("Instruction_Comment")
merged_df.groupby("Instruction_Comment")
.agg(
count=("Instruction_Comment", "count"),
instruction=("Instruction", "first"),
Kernel_Id=("Kernel_Id", "first"),
Kernel_Name=("Kernel_Name", "first"),
)
.reset_index()
.rename(columns={"Instruction_Comment": "source_line"})
)
grouped_counts = grouped_counts[["source_line", "instruction", "count"]]
grouped_counts = grouped_counts[
[
"source_line",
"Kernel_Name",
"instruction",
"count",
]
]
grouped_counts["source_line"] = grouped_counts["source_line"].apply(
lambda x: f".../{Path(x).name}"
lambda x: f".../{Path(x).name}" if isinstance(x, str) and x else x
)
# Sort by the count of occurrences
return grouped_counts.sort_values(by="count", ascending=False)
elif len(workload.filter_kernel_ids) > 1:
@@ -1570,8 +1636,6 @@ def load_pc_sampling_data(
return pd.DataFrame()
elif len(workload.filter_kernel_ids) == 1:
# NB: the default file name is subject to changes from rocprofv3/rocprofiler_sdk
json_file_path = Path(dir_path) / f"{file_prefix}_results.json"
if not json_file_path.exists():
console_error(f"PC sampling: can not read {json_file_path}", exit=False)
return pd.DataFrame()
@@ -1580,11 +1644,25 @@ def load_pc_sampling_data(
# We should find better way to remove the dependency on kernel_top_table
kernel_top_df = workload.dfs[PMC_KERNEL_TOP_TABLE_ID]
file = Path(dir_path) / str(kernel_top_df.loc[0, "from_csv"])
kernel_name = pd.read_csv(file).loc[
workload.filter_kernel_ids[0], "Kernel_Name"
]
kernel_index = workload.filter_kernel_ids[0]
kernel_df = pd.read_csv(file)
if kernel_index >= len(kernel_df):
console_warning(
f"Kernel index {kernel_index} is out of bounds. "
f"kernel_top CSV has only {len(kernel_df)} rows."
)
return pd.DataFrame()
kernel_name = kernel_df.iloc[kernel_index]["Kernel_Name"]
return load_pc_sampling_data_per_kernel(
pc_sampling_method, json_file_path, kernel_name, sorting_type
pc_sampling_method,
json_file_path,
csv_kernel_trace_file_path,
kernel_name,
sorting_type,
)
else:
console_warning("PC sampling: No data")
+2
ファイルの表示
@@ -977,6 +977,7 @@ def pc_sampling_prof(
"ROCPROF_PC_SAMPLING_UNIT": unit,
"ROCPROF_PC_SAMPLING_INTERVAL": str(interval),
"ROCPROF_PC_SAMPLING_METHOD": method,
"ROCPROF_KERNEL_TRACE": "1",
}
new_env = os.environ.copy()
for key, value in options.items():
@@ -988,6 +989,7 @@ def pc_sampling_prof(
)
else:
options = [
"--kernel-trace",
"--pc-sampling-beta-enabled",
"--pc-sampling-method",
method,