[rocprofiler-compute] Update analysis db for visualizer integration (#1548)

* Analysis db changes for visualizer

* Add support for per kernel analysis metrics

* Add support for dispatch timeline visualization

* Show median instead of mean of dispatch duration in kernel view

* Add test case to validate analysis db schema

* Analysis db schema update
    * Add Kernel table and make Metric and Dispatch table its children
    * Kernel table is a child of Workload table
    * Update metric_view to show kernel_name column
    * Add dispatch timestamps to Dispatch table for dispatch timeline
      visualization
    * Update kernel_view to show duration_ns_median instead of mean
      duration

* Add mean duration in kernel view

* update changelog

---------

Co-authored-by: Fei Zheng <44449748+feizheng10@users.noreply.github.com>
This commit is contained in:
vedithal-amd
2025-11-03 09:25:12 -05:00
committed by GitHub
parent dbb361c606
commit bb5fd1d4ae
7 changed files with 241 additions and 92 deletions
@@ -10,6 +10,11 @@ Full documentation for ROCm Compute Profiler is available at [https://rocm.docs.
* Added `config_delta/gfx950_diff.yaml` to analysis config yamls to track the revision between a gfx9 architecture against the latest supported architecture gfx950
* Analysis db features
* Add support for per kernel metrics analysis.
* Add support for dispatch timeline analysis.
* Show duration as median in addition to mean in kernel view.
### Changed
* `-b/--block` accepts block alias(es) (See block aliases using command-line option `--list-blocks <arch>`).
Binary file not shown.

Before

Width:  |  Height:  |  Size: 66 KiB

After

Width:  |  Height:  |  Size: 185 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 17 KiB

After

Width:  |  Height:  |  Size: 34 KiB

@@ -126,6 +126,7 @@ class db_analysis(OmniAnalyze_Base):
profiling_config_extdata=self._profiling_config,
)
Database.get_session().add(workload_obj)
for pc_sample in self._pc_sampling_data_per_workload.get(
workload_path, pd.DataFrame()
).itertuples():
@@ -142,42 +143,6 @@ class db_analysis(OmniAnalyze_Base):
workload=workload_obj,
)
)
for dispatch in self._dispatch_data_per_workload.get(
workload_path, pd.DataFrame()
).itertuples():
Database.get_session().add(
orm.Dispatch(
dispatch_id=dispatch.dispatch_id,
kernel_name=dispatch.kernel_name,
gpu_id=dispatch.gpu_id,
duration=dispatch.duration,
workload=workload_obj,
)
)
for metric in self._metrics_info_data_per_workload.get(
workload_path, pd.DataFrame()
).itertuples():
metric_obj = orm.Metric(
name=metric.name,
metric_id=metric.metric_id,
description=metric.description,
unit=metric.unit,
table_name=metric.table_name,
sub_table_name=metric.sub_table_name,
workload=workload_obj,
)
Database.get_session().add(metric_obj)
for value in self._values_data_per_workload.get(
workload_path, pd.DataFrame()
).itertuples():
if value.metric_id == metric.metric_id:
Database.get_session().add(
orm.Value(
metric=metric_obj,
value_name=value.value_name,
value=value.value,
)
)
for roofline_data in self._roofline_data_per_workload.get(
workload_path, pd.DataFrame()
@@ -193,6 +158,63 @@ class db_analysis(OmniAnalyze_Base):
)
)
kernel_objs: dict[str, orm.Kernel] = {}
for dispatch in self._dispatch_data_per_workload.get(
workload_path, pd.DataFrame()
).itertuples():
# Add kernel object and map it, if not already added
if dispatch.kernel_name not in kernel_objs:
kernel_objs[dispatch.kernel_name] = orm.Kernel(
kernel_name=dispatch.kernel_name,
workload=workload_obj,
)
Database.get_session().add(kernel_objs[dispatch.kernel_name])
# Add dispatch object and link with kernel object
Database.get_session().add(
orm.Dispatch(
dispatch_id=dispatch.dispatch_id,
gpu_id=dispatch.gpu_id,
start_timestamp=dispatch.start_timestamp,
end_timestamp=dispatch.end_timestamp,
kernel=kernel_objs[dispatch.kernel_name],
)
)
for metric in self._metrics_info_data_per_workload.get(
workload_path, pd.DataFrame()
).itertuples():
kernel_names = (
self._dispatch_data_per_workload[workload_path]["kernel_name"]
.unique()
.tolist()
)
for kernel_name in kernel_names:
metric_obj = orm.Metric(
name=metric.name,
metric_id=metric.metric_id,
description=metric.description,
unit=metric.unit,
table_name=metric.table_name,
sub_table_name=metric.sub_table_name,
kernel=kernel_objs[kernel_name],
)
Database.get_session().add(metric_obj)
for value in self._values_data_per_workload.get(
workload_path, pd.DataFrame()
).itertuples():
if (
value.metric_id == metric.metric_id
and value.kernel_name == kernel_name
):
Database.get_session().add(
orm.Value(
metric=metric_obj,
value_name=value.value_name,
value=value.value,
)
)
version = get_version(rocprof_compute_home)
Database.get_session().add(
orm.Metadata(
@@ -406,43 +428,63 @@ class db_analysis(OmniAnalyze_Base):
console_warning(f"Failed to evaluate expression for {name}: {value} - {e}")
return None
@staticmethod
def per_kernel_calc_expressions(
    kernel_name: str, pmc_df: pd.DataFrame, sys_info: dict, value_df: pd.DataFrame
) -> pd.Series:
    """Evaluate every metric expression in ``value_df`` for one kernel.

    Args:
        kernel_name: Kernel being processed; only used in the debug message.
        pmc_df: Counter data handed to ``db_analysis.evaluate`` for every
            expression.
        sys_info: System-info mapping. It is updated IN PLACE with one entry
            per ``BUILD_IN_VARS`` key, so callers that want to keep their
            own mapping untouched should pass a copy.
        value_df: Rows to evaluate; the ``metric_id``, ``value_name`` and
            ``value`` columns are read from each row.

    Returns:
        The result of ``value_df.apply(..., axis=1)``: one evaluated value
        per row of ``value_df``.
    """
    console_debug(f"Calculating expressions for kernel: {kernel_name}")

    # Built-in variables are resolved in two passes over BUILD_IN_VARS:
    # first the keys containing "PER_XCD", then all remaining keys, since
    # the latter may depend on the former.
    for want_per_xcd in (True, False):
        for var_name, var_expr in BUILD_IN_VARS.items():
            if ("PER_XCD" in var_name) is want_per_xcd:
                sys_info[var_name] = db_analysis.evaluate(
                    var_name, var_expr, pmc_df, sys_info, parse=True
                )

    def evaluate_row(row):
        # The label identifies the metric/value pair passed to evaluate().
        label = f"{row['metric_id']} - {row['value_name']}"
        return db_analysis.evaluate(label, row["value"], pmc_df, sys_info)

    return value_df.apply(evaluate_row, axis=1)
def calc_expressions(self) -> dict[str, pd.DataFrame]:
values_data_per_workload = self._values_data_per_workload.copy()
for workload_path in self._runs.keys():
pmc_df = self._pmc_df_per_workload[workload_path].copy()
kernel_names = (
self._dispatch_data_per_workload[workload_path]["kernel_name"]
.unique()
.tolist()
)
pmc_df = self._pmc_df_per_workload[workload_path]
value_df = self._values_data_per_workload[workload_path]
sys_info = self._runs[workload_path].sys_info.iloc[0].to_dict()
for key, value in self._roofline_ceilings_per_workload.get(
workload_path, {}
).items():
sys_info[f"{key}_empirical_peak"] = value
# Calculate PER_XCD variables first
for key, value in BUILD_IN_VARS.items():
if "PER_XCD" in key:
sys_info[key] = db_analysis.evaluate(
key, value, pmc_df, sys_info, parse=True
)
# variable dependent on PER_XCD variables
for key, value in BUILD_IN_VARS.items():
if "PER_XCD" not in key:
sys_info[key] = db_analysis.evaluate(
key, value, pmc_df, sys_info, parse=True
)
# Get name and print warning
values_data_per_workload[workload_path]["value"] = values_data_per_workload[
workload_path
].apply(
lambda row: db_analysis.evaluate(
f"{row['metric_id']} - {row['value_name']}",
row["value"],
pmc_df,
sys_info,
),
axis=1,
)
for kernel_name in kernel_names:
values_data_per_workload[workload_path].loc[
value_df["kernel_name"] == kernel_name, "value"
] = db_analysis.per_kernel_calc_expressions(
kernel_name,
# Filter pmc_df for current kernel
pmc_df[pmc_df["Kernel_Name"] == kernel_name],
# Pass a copy to prevent side-effects in multiprocessing
sys_info.copy(),
# Filter value_df for current kernel
value_df.loc[value_df["kernel_name"] == kernel_name],
)
console_debug("Calculated metric values")
return values_data_per_workload
@@ -493,11 +535,17 @@ class db_analysis(OmniAnalyze_Base):
if set(metric_df.columns).intersection({"Metric", "Channel"})
for metric_id, row in metric_df.iterrows()
])
kernel_names = (
self._dispatch_data_per_workload[workload_path]["kernel_name"]
.unique()
.tolist()
)
values_df = pd.DataFrame([
{
"metric_id": metric_id,
"value_name": value_name,
"value": row[value_name].strip(),
"kernel_name": kernel_name,
}
for metric_df_id, metric_df in self._arch_configs[gfx_arch].dfs.items()
if metric_df_id
@@ -507,6 +555,7 @@ class db_analysis(OmniAnalyze_Base):
for value_name in metric_df.drop(
columns=non_expression_columns, errors="ignore"
).columns
for kernel_name in kernel_names
])
metrics_info_data_per_workload[workload_path] = metrics_info_df
@@ -524,7 +573,8 @@ class db_analysis(OmniAnalyze_Base):
"dispatch_id": row.Dispatch_ID,
"kernel_name": row.Kernel_Name,
"gpu_id": row.GPU_ID,
"duration": row.End_Timestamp - row.Start_Timestamp,
"start_timestamp": row.Start_Timestamp,
"end_timestamp": row.End_Timestamp,
}
for row in self._pmc_df_per_workload[workload_path].itertuples()
])
@@ -45,7 +45,7 @@ from sqlalchemy.sql import Select
from utils.logger import console_debug, console_error
PREFIX = "compute_"
SCHEMA_VERSION = "1.0.0"
SCHEMA_VERSION = "1.1.0"
Base = declarative_base()
@@ -61,10 +61,8 @@ class Workload(Base):
roofline_bench_extdata = Column(JSON)
profiling_config_extdata = Column(JSON)
# Workload can have multiple dispatches
dispatches = relationship("Dispatch", back_populates="workload")
# Workload can have multiple metrics
metrics = relationship("Metric", back_populates="workload")
# Workload can have multiple kernels
kernels = relationship("Kernel", back_populates="workload")
# Workload can have multiple roofline data points
roofline_data_points = relationship("RooflineData", back_populates="workload")
# Workload can have multiple pc_sampling values
@@ -75,8 +73,8 @@ class Metric(Base):
__tablename__ = f"{PREFIX}metric"
metric_uuid = Column(Integer, primary_key=True)
workload_id = Column(
Integer, ForeignKey(f"{PREFIX}workload.workload_id"), nullable=False
kernel_uuid = Column(
Integer, ForeignKey(f"{PREFIX}kernel.kernel_uuid"), nullable=False
)
name = Column(String) # e.g. Wavefronts Num
metric_id = Column(String) # e.g. 4.1.3
@@ -85,8 +83,8 @@ class Metric(Base):
sub_table_name = Column(String) # e.g. Wavefront stats
unit = Column(String) # e.g. Gbps
# Metric can have one workload
workload = relationship("Workload", back_populates="metrics")
# Metric can have one kernel
kernel = relationship("Kernel", back_populates="metrics")
# Metric can have multiple values
values = relationship("Value", back_populates="metric")
@@ -112,16 +110,33 @@ class Dispatch(Base):
__tablename__ = f"{PREFIX}dispatch"
dispatch_uuid = Column(Integer, primary_key=True)
kernel_uuid = Column(
Integer, ForeignKey(f"{PREFIX}kernel.kernel_uuid"), nullable=False
)
dispatch_id = Column(Integer)
gpu_id = Column(Integer)
start_timestamp = Column(Integer)
end_timestamp = Column(Integer)
# Dispatch can have one kernel
kernel = relationship("Kernel", back_populates="dispatches")
class Kernel(Base):
__tablename__ = f"{PREFIX}kernel"
kernel_uuid = Column(Integer, primary_key=True)
workload_id = Column(
Integer, ForeignKey(f"{PREFIX}workload.workload_id"), nullable=False
)
dispatch_id = Column(Integer)
kernel_name = Column(String)
gpu_id = Column(Integer)
duration = Column(Integer)
# Dispatch can have one workload
workload = relationship("Workload", back_populates="dispatches")
# Kernel can have one workload
workload = relationship("Workload", back_populates="kernels")
# Kernel can have multiple dispatches
dispatches = relationship("Dispatch", back_populates="kernel")
# Kernel can have multiple metrics
metrics = relationship("Metric", back_populates="kernel")
class PCsampling(Base):
@@ -199,16 +214,59 @@ class Database:
def get_views() -> list[TextClause]:
# Calculate median by finding middle value(s)
median_subquery = (
select(
Kernel.kernel_name,
(Dispatch.end_timestamp - Dispatch.start_timestamp).label("duration"),
func.row_number()
.over(
partition_by=Kernel.kernel_name,
order_by=Dispatch.end_timestamp - Dispatch.start_timestamp,
)
.label("row_num"),
func.count().over(partition_by=Kernel.kernel_name).label("total_count"),
)
.select_from(Dispatch)
.join(Kernel, Dispatch.kernel_uuid == Kernel.kernel_uuid)
)
median_calc = (
select(
median_subquery.c.kernel_name,
func.avg(median_subquery.c.duration).label("duration_ns_median"),
)
.where(
# For odd counts: get the middle row
# For even counts: get the two middle rows and average them
median_subquery.c.row_num.in_([
func.cast((median_subquery.c.total_count + 1) / 2, Integer),
func.cast((median_subquery.c.total_count + 2) / 2, Integer),
])
)
.group_by(median_subquery.c.kernel_name)
)
views: dict[str, Select[Any]] = {
"kernel_view": select(
Dispatch.kernel_name,
Kernel.kernel_name,
func.count(Dispatch.dispatch_id).label("dispatch_count"),
func.sum(Dispatch.duration).label("duration_sum"),
func.avg(Dispatch.duration).label("duration_mean"),
).group_by(Dispatch.kernel_name),
func.sum(Dispatch.end_timestamp - Dispatch.start_timestamp).label(
"duration_ns_sum"
),
median_calc.c.duration_ns_median,
func.avg(Dispatch.end_timestamp - Dispatch.start_timestamp).label(
"duration_ns_mean"
),
)
.select_from(Dispatch)
.join(Kernel, Dispatch.kernel_uuid == Kernel.kernel_uuid)
.join(median_calc.subquery(), Kernel.kernel_name == median_calc.c.kernel_name)
.group_by(Kernel.kernel_name),
"metric_view": select(
Metric.workload_id,
Metric.name,
Workload.name.label("workload_name"),
Kernel.kernel_name,
Metric.name.label("metric_name"),
Metric.metric_id,
Metric.description,
Metric.table_name,
@@ -216,7 +274,11 @@ def get_views() -> list[TextClause]:
Metric.unit,
Value.value_name,
Value.value,
).join(Value, Metric.metric_uuid == Value.metric_uuid),
)
.select_from(Metric)
.join(Kernel, Metric.kernel_uuid == Kernel.kernel_uuid)
.join(Value, Metric.metric_uuid == Value.metric_uuid)
.join(Workload, Kernel.workload_id == Workload.workload_id),
}
return [
@@ -108,15 +108,11 @@ def process_rocpd_csv(df: pd.DataFrame) -> pd.DataFrame:
"SGPR": group_df["SGPR"].iloc[0],
"Kernel_Name": group_df["Kernel_Name"].iloc[0],
"Kernel_ID": group_df["Kernel_ID"].iloc[0],
"Start_Timestamp": group_df["Start_Timestamp"].iloc[0],
"End_Timestamp": group_df["End_Timestamp"].iloc[0],
}
# Each counter will become its own column
row.update(dict(zip(group_df["Counter_Name"], group_df["Counter_Value"])))
# Replace end timestamp with median of durations of group,
# start timestamp is set to 0
row["End_Timestamp"] = (
group_df["End_Timestamp"] - group_df["Start_Timestamp"]
).median()
row["Start_Timestamp"] = 0.0
data.append(row)
df = pd.DataFrame(data)
# Rank GPU IDs, map lowest number to 0, next to 1, etc.
@@ -26,6 +26,7 @@
import inspect
import os
import re
import sqlite3
import subprocess
import sys
from pathlib import Path
@@ -752,7 +753,42 @@ def test_analyze_rocpd(
assert code == 0
assert os.path.isfile(f"{db_name}.db")
# Remove test.db
# Open the sqlite database and assert the schema
# Import Kernel from analysis_orm.py
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from utils.analysis_orm import (
Dispatch,
Kernel,
Metadata,
Metric,
RooflineData,
Value,
Workload,
)
table_name_map = {
"compute_workload": Workload,
"compute_metric": Metric,
"compute_roofline_data": RooflineData,
"compute_dispatch": Dispatch,
"compute_kernel": Kernel,
"compute_value": Value,
"compute_metadata": Metadata,
}
def check_cols(table_name, orm_obj):
    """Assert that a table's columns in the database match its ORM model.

    Reads the column list of ``table_name`` from the SQLite file
    ``{db_name}.db`` and compares the column names, in order, with the
    names declared on ``orm_obj.__table__.columns``.

    Args:
        table_name: Name of the table to inspect in the database file.
        orm_obj: ORM class exposing ``__table__.columns``.

    Raises:
        AssertionError: If the column names or their order differ.
    """
    conn = sqlite3.connect(f"{db_name}.db")
    try:
        # Index 1 of every PRAGMA table_info row is the column name.
        columns = conn.execute(f"PRAGMA table_info('{table_name}');").fetchall()
    finally:
        # Close before asserting so a schema mismatch never leaves the
        # database file open (it is deleted right after these checks).
        conn.close()

    column_names = [column[1] for column in columns]
    expected_columns = [col.name for col in orm_obj.__table__.columns]
    assert column_names == expected_columns, (
        f"{table_name}: database columns {column_names} "
        f"do not match ORM columns {expected_columns}"
    )
for table_name, orm_obj in table_name_map.items():
check_cols(table_name, orm_obj)
os.remove(f"{db_name}.db")
test_utils.clean_output_dir(config["cleanup"], workload_dir)