diff --git a/projects/rocprofiler-compute/CHANGELOG.md b/projects/rocprofiler-compute/CHANGELOG.md index e2903dc9cf..6cfacf28e3 100644 --- a/projects/rocprofiler-compute/CHANGELOG.md +++ b/projects/rocprofiler-compute/CHANGELOG.md @@ -10,6 +10,11 @@ Full documentation for ROCm Compute Profiler is available at [https://rocm.docs. * Added `config_delta/gfx950_diff.yaml` to analysis config yamls to track the revision between a gfx9 architecture against the latest supported architecture gfx950 +* Analysis db features + * Add support for per kernel metrics analysis. + * Add support for dispatch timeline analysis. + * Show duration as median in addition to mean in kernel view. + ### Changed * `-b/--block` accepts block alias(es) (See block aliases using command-line option `--list-blocks `). diff --git a/projects/rocprofiler-compute/docs/data/analyze/analysis_data_dump_schema.png b/projects/rocprofiler-compute/docs/data/analyze/analysis_data_dump_schema.png index 7eb4813cf4..274b6f0020 100644 Binary files a/projects/rocprofiler-compute/docs/data/analyze/analysis_data_dump_schema.png and b/projects/rocprofiler-compute/docs/data/analyze/analysis_data_dump_schema.png differ diff --git a/projects/rocprofiler-compute/docs/data/analyze/analysis_data_dump_views.png b/projects/rocprofiler-compute/docs/data/analyze/analysis_data_dump_views.png index e7302258f9..954ee506dd 100644 Binary files a/projects/rocprofiler-compute/docs/data/analyze/analysis_data_dump_views.png and b/projects/rocprofiler-compute/docs/data/analyze/analysis_data_dump_views.png differ diff --git a/projects/rocprofiler-compute/src/rocprof_compute_analyze/analysis_db.py b/projects/rocprofiler-compute/src/rocprof_compute_analyze/analysis_db.py index e4b26f7964..46afb28a1d 100644 --- a/projects/rocprofiler-compute/src/rocprof_compute_analyze/analysis_db.py +++ b/projects/rocprofiler-compute/src/rocprof_compute_analyze/analysis_db.py @@ -126,6 +126,7 @@ class db_analysis(OmniAnalyze_Base): profiling_config_extdata=self._profiling_config, ) Database.get_session().add(workload_obj) + for pc_sample in self._pc_sampling_data_per_workload.get( workload_path, pd.DataFrame() ).itertuples(): @@ -142,42 +143,6 @@ class db_analysis(OmniAnalyze_Base): workload=workload_obj, ) ) - for dispatch in self._dispatch_data_per_workload.get( - workload_path, pd.DataFrame() - ).itertuples(): - Database.get_session().add( - orm.Dispatch( - dispatch_id=dispatch.dispatch_id, - kernel_name=dispatch.kernel_name, - gpu_id=dispatch.gpu_id, - duration=dispatch.duration, - workload=workload_obj, - ) - ) - for metric in self._metrics_info_data_per_workload.get( - workload_path, pd.DataFrame() - ).itertuples(): - metric_obj = orm.Metric( - name=metric.name, - metric_id=metric.metric_id, - description=metric.description, - unit=metric.unit, - table_name=metric.table_name, - sub_table_name=metric.sub_table_name, - workload=workload_obj, - ) - Database.get_session().add(metric_obj) - for value in self._values_data_per_workload.get( - workload_path, pd.DataFrame() - ).itertuples(): - if value.metric_id == metric.metric_id: - Database.get_session().add( - orm.Value( - metric=metric_obj, - value_name=value.value_name, - value=value.value, - ) - ) for roofline_data in self._roofline_data_per_workload.get( workload_path, pd.DataFrame() @@ -193,6 +158,63 @@ class db_analysis(OmniAnalyze_Base): ) ) + kernel_objs: dict[str, orm.Kernel] = {} + for dispatch in self._dispatch_data_per_workload.get( + workload_path, pd.DataFrame() + ).itertuples(): + # Add kernel object and map it, if not already added + if dispatch.kernel_name not in kernel_objs: + kernel_objs[dispatch.kernel_name] = orm.Kernel( + kernel_name=dispatch.kernel_name, + workload=workload_obj, + ) + Database.get_session().add(kernel_objs[dispatch.kernel_name]) + + # Add dispatch object and link with kernel object + Database.get_session().add( + orm.Dispatch( + dispatch_id=dispatch.dispatch_id, + gpu_id=dispatch.gpu_id, + start_timestamp=dispatch.start_timestamp, + end_timestamp=dispatch.end_timestamp, + kernel=kernel_objs[dispatch.kernel_name], + ) + ) + + for metric in self._metrics_info_data_per_workload.get( + workload_path, pd.DataFrame() + ).itertuples(): + kernel_names = ( + self._dispatch_data_per_workload[workload_path]["kernel_name"] + .unique() + .tolist() + ) + for kernel_name in kernel_names: + metric_obj = orm.Metric( + name=metric.name, + metric_id=metric.metric_id, + description=metric.description, + unit=metric.unit, + table_name=metric.table_name, + sub_table_name=metric.sub_table_name, + kernel=kernel_objs[kernel_name], + ) + Database.get_session().add(metric_obj) + for value in self._values_data_per_workload.get( + workload_path, pd.DataFrame() + ).itertuples(): + if ( + value.metric_id == metric.metric_id + and value.kernel_name == kernel_name + ): + Database.get_session().add( + orm.Value( + metric=metric_obj, + value_name=value.value_name, + value=value.value, + ) + ) + version = get_version(rocprof_compute_home) Database.get_session().add( orm.Metadata( @@ -406,43 +428,63 @@ class db_analysis(OmniAnalyze_Base): console_warning(f"Failed to evaluate expression for {name}: {value} - {e}") return None + @staticmethod + def per_kernel_calc_expressions( + kernel_name: str, pmc_df: pd.DataFrame, sys_info: dict, value_df: pd.DataFrame + ) -> pd.Series: + console_debug(f"Calculating expressions for kernel: {kernel_name}") + # Calculate PER_XCD variables first + for key, value in BUILD_IN_VARS.items(): + if "PER_XCD" in key: + sys_info[key] = db_analysis.evaluate( + key, value, pmc_df, sys_info, parse=True + ) + # Variable dependent on PER_XCD variables + for key, value in BUILD_IN_VARS.items(): + if "PER_XCD" not in key: + sys_info[key] = db_analysis.evaluate( + key, value, pmc_df, sys_info, parse=True + ) + # Evaluate expressions while printing warnings + return value_df.apply( + lambda row: db_analysis.evaluate( + f"{row['metric_id']} - {row['value_name']}", + row["value"], + pmc_df, + sys_info, + ), + axis=1, + ) + def calc_expressions(self) -> dict[str, pd.DataFrame]: values_data_per_workload = self._values_data_per_workload.copy() for workload_path in self._runs.keys(): - pmc_df = self._pmc_df_per_workload[workload_path].copy() + kernel_names = ( + self._dispatch_data_per_workload[workload_path]["kernel_name"] + .unique() + .tolist() + ) + pmc_df = self._pmc_df_per_workload[workload_path] + value_df = self._values_data_per_workload[workload_path] sys_info = self._runs[workload_path].sys_info.iloc[0].to_dict() for key, value in self._roofline_ceilings_per_workload.get( workload_path, {} ).items(): sys_info[f"{key}_empirical_peak"] = value - # Calculate PER_XCD variables first - for key, value in BUILD_IN_VARS.items(): - if "PER_XCD" in key: - sys_info[key] = db_analysis.evaluate( - key, value, pmc_df, sys_info, parse=True - ) - - # variable dependent on PER_XCD variables - for key, value in BUILD_IN_VARS.items(): - if "PER_XCD" not in key: - sys_info[key] = db_analysis.evaluate( - key, value, pmc_df, sys_info, parse=True - ) - - # Get name and print warning - values_data_per_workload[workload_path]["value"] = values_data_per_workload[ - workload_path - ].apply( - lambda row: db_analysis.evaluate( - f"{row['metric_id']} - {row['value_name']}", - row["value"], - pmc_df, - sys_info, - ), - axis=1, - ) + for kernel_name in kernel_names: + values_data_per_workload[workload_path].loc[ + value_df["kernel_name"] == kernel_name, "value" + ] = db_analysis.per_kernel_calc_expressions( + kernel_name, + # Filter pmc_df for current kernel + pmc_df[pmc_df["Kernel_Name"] == kernel_name], + # Pass a copy to prevent side-effects in multiprocessing + sys_info.copy(), + # Filter value_df for current kernel + value_df.loc[value_df["kernel_name"] == kernel_name], + ) console_debug("Calculated metric values") return values_data_per_workload @@ -493,11 +535,17 @@ class db_analysis(OmniAnalyze_Base): if set(metric_df.columns).intersection({"Metric", "Channel"}) for metric_id, row in metric_df.iterrows() ]) + kernel_names = ( + self._dispatch_data_per_workload[workload_path]["kernel_name"] + .unique() + .tolist() + ) values_df = pd.DataFrame([ { "metric_id": metric_id, "value_name": value_name, "value": row[value_name].strip(), + "kernel_name": kernel_name, } for metric_df_id, metric_df in self._arch_configs[gfx_arch].dfs.items() if metric_df_id @@ -507,6 +555,7 @@ class db_analysis(OmniAnalyze_Base): for value_name in metric_df.drop( columns=non_expression_columns, errors="ignore" ).columns + for kernel_name in kernel_names ]) metrics_info_data_per_workload[workload_path] = metrics_info_df @@ -524,7 +573,8 @@ class db_analysis(OmniAnalyze_Base): "dispatch_id": row.Dispatch_ID, "kernel_name": row.Kernel_Name, "gpu_id": row.GPU_ID, - "duration": row.End_Timestamp - row.Start_Timestamp, + "start_timestamp": row.Start_Timestamp, + "end_timestamp": row.End_Timestamp, } for row in self._pmc_df_per_workload[workload_path].itertuples() ]) diff --git a/projects/rocprofiler-compute/src/utils/analysis_orm.py b/projects/rocprofiler-compute/src/utils/analysis_orm.py index 92e78b6b75..d6d8839db3 100644 --- a/projects/rocprofiler-compute/src/utils/analysis_orm.py +++ b/projects/rocprofiler-compute/src/utils/analysis_orm.py @@ -45,7 +45,7 @@ from sqlalchemy.sql import Select from utils.logger import console_debug, console_error PREFIX = "compute_" -SCHEMA_VERSION = "1.0.0" +SCHEMA_VERSION = "1.1.0" Base = declarative_base() @@ -61,10 +61,8 @@ class Workload(Base): roofline_bench_extdata = Column(JSON) profiling_config_extdata = Column(JSON) - # Workload can have multiple dispatches - dispatches = relationship("Dispatch", back_populates="workload") - # Workload can have multiple metrics - metrics = relationship("Metric", back_populates="workload") + # Workload can have multiple kernels + kernels = relationship("Kernel", back_populates="workload") # Workload can have multiple roofline data points roofline_data_points = relationship("RooflineData", back_populates="workload") # Workload can have multiple pc_sampling values @@ -75,8 +73,8 @@ class Metric(Base): __tablename__ = f"{PREFIX}metric" metric_uuid = Column(Integer, primary_key=True) - workload_id = Column( - Integer, ForeignKey(f"{PREFIX}workload.workload_id"), nullable=False + kernel_uuid = Column( + Integer, ForeignKey(f"{PREFIX}kernel.kernel_uuid"), nullable=False ) name = Column(String) # e.g. Wavefronts Num metric_id = Column(String) # e.g. 4.1.3 @@ -85,8 +83,8 @@ class Metric(Base): sub_table_name = Column(String) # e.g. Wavefront stats unit = Column(String) # e.g. Gbps - # Metric can have one workload - workload = relationship("Workload", back_populates="metrics") + # Metric can have one kernel + kernel = relationship("Kernel", back_populates="metrics") # Metric can have multiple values values = relationship("Value", back_populates="metric") @@ -112,16 +110,33 @@ class Dispatch(Base): __tablename__ = f"{PREFIX}dispatch" dispatch_uuid = Column(Integer, primary_key=True) + kernel_uuid = Column( + Integer, ForeignKey(f"{PREFIX}kernel.kernel_uuid"), nullable=False + ) + dispatch_id = Column(Integer) + gpu_id = Column(Integer) + start_timestamp = Column(Integer) + end_timestamp = Column(Integer) + + # Dispatch can have one kernel + kernel = relationship("Kernel", back_populates="dispatches") + + +class Kernel(Base): + __tablename__ = f"{PREFIX}kernel" + + kernel_uuid = Column(Integer, primary_key=True) workload_id = Column( Integer, ForeignKey(f"{PREFIX}workload.workload_id"), nullable=False ) - dispatch_id = Column(Integer) kernel_name = Column(String) - gpu_id = Column(Integer) - duration = Column(Integer) - # Dispatch can have one workload - workload = relationship("Workload", back_populates="dispatches") + # Kernel can have one workload + workload = relationship("Workload", back_populates="kernels") + # Kernel can have multiple dispatches + dispatches = relationship("Dispatch", back_populates="kernel") + # Kernel can have multiple metrics + metrics = relationship("Metric", back_populates="kernel") class PCsampling(Base): @@ -199,16 +214,59 @@ class Database: def get_views() -> list[TextClause]: + # Calculate median by finding middle value(s) + median_subquery = ( + select( + Kernel.kernel_name, + (Dispatch.end_timestamp - Dispatch.start_timestamp).label("duration"), + func.row_number() + .over( + partition_by=Kernel.kernel_name, + order_by=Dispatch.end_timestamp - Dispatch.start_timestamp, + ) + .label("row_num"), + func.count().over(partition_by=Kernel.kernel_name).label("total_count"), + ) + .select_from(Dispatch) + .join(Kernel, Dispatch.kernel_uuid == Kernel.kernel_uuid) + ) + + median_calc = ( + select( + median_subquery.c.kernel_name, + func.avg(median_subquery.c.duration).label("duration_ns_median"), + ) + .where( + # For odd counts: get the middle row + # For even counts: get the two middle rows and average them + median_subquery.c.row_num.in_([ + func.cast((median_subquery.c.total_count + 1) / 2, Integer), + func.cast((median_subquery.c.total_count + 2) / 2, Integer), + ]) + ) + .group_by(median_subquery.c.kernel_name) + ) + views: dict[str, Select[Any]] = { "kernel_view": select( - Dispatch.kernel_name, + Kernel.kernel_name, func.count(Dispatch.dispatch_id).label("dispatch_count"), - func.sum(Dispatch.duration).label("duration_sum"), - func.avg(Dispatch.duration).label("duration_mean"), - ).group_by(Dispatch.kernel_name), + func.sum(Dispatch.end_timestamp - Dispatch.start_timestamp).label( + "duration_ns_sum" + ), + median_calc.c.duration_ns_median, + func.avg(Dispatch.end_timestamp - Dispatch.start_timestamp).label( + "duration_ns_mean" + ), + ) + .select_from(Dispatch) + .join(Kernel, Dispatch.kernel_uuid == Kernel.kernel_uuid) + .join(median_calc.subquery(), Kernel.kernel_name == median_calc.c.kernel_name) + .group_by(Kernel.kernel_name), "metric_view": select( - Metric.workload_id, - Metric.name, + Workload.name.label("workload_name"), + Kernel.kernel_name, + Metric.name.label("metric_name"), Metric.metric_id, Metric.description, Metric.table_name, @@ -216,7 +274,11 @@ def get_views() -> list[TextClause]: Metric.unit, Value.value_name, Value.value, - ).join(Value, Metric.metric_uuid == Value.metric_uuid), + ) + .select_from(Metric) + .join(Kernel, Metric.kernel_uuid == Kernel.kernel_uuid) + .join(Value, Metric.metric_uuid == Value.metric_uuid) + .join(Workload, Kernel.workload_id == Workload.workload_id), } return [ diff --git a/projects/rocprofiler-compute/src/utils/rocpd_data.py b/projects/rocprofiler-compute/src/utils/rocpd_data.py index eb04921fd4..8eec4a06a3 100644 --- a/projects/rocprofiler-compute/src/utils/rocpd_data.py +++ b/projects/rocprofiler-compute/src/utils/rocpd_data.py @@ -108,15 +108,11 @@ def process_rocpd_csv(df: pd.DataFrame) -> pd.DataFrame: "SGPR": group_df["SGPR"].iloc[0], "Kernel_Name": group_df["Kernel_Name"].iloc[0], "Kernel_ID": group_df["Kernel_ID"].iloc[0], + "Start_Timestamp": group_df["Start_Timestamp"].iloc[0], + "End_Timestamp": group_df["End_Timestamp"].iloc[0], } # Each counter will become its own column row.update(dict(zip(group_df["Counter_Name"], group_df["Counter_Value"]))) - # Replace end timestamp with median of durations of group, - # start timestamp is set to 0 - row["End_Timestamp"] = ( - group_df["End_Timestamp"] - group_df["Start_Timestamp"] - ).median() - row["Start_Timestamp"] = 0.0 data.append(row) df = pd.DataFrame(data) # Rank GPU IDs, map lowest number to 0, next to 1, etc. diff --git a/projects/rocprofiler-compute/tests/test_profile_general.py b/projects/rocprofiler-compute/tests/test_profile_general.py index b5c6bc84c0..1150b16a92 100644 --- a/projects/rocprofiler-compute/tests/test_profile_general.py +++ b/projects/rocprofiler-compute/tests/test_profile_general.py @@ -26,6 +26,7 @@ import inspect import os import re +import sqlite3 import subprocess import sys from pathlib import Path @@ -752,7 +753,42 @@ def test_analyze_rocpd( assert code == 0 assert os.path.isfile(f"{db_name}.db") - # Remove test.db + # Open the sqlite database and assert the schema + # Import Kernel from analysis_orm.py + sys.path.insert(0, str(Path(__file__).parent.parent / "src")) + from utils.analysis_orm import ( + Dispatch, + Kernel, + Metadata, + Metric, + RooflineData, + Value, + Workload, + ) + + table_name_map = { + "compute_workload": Workload, + "compute_metric": Metric, + "compute_roofline_data": RooflineData, + "compute_dispatch": Dispatch, + "compute_kernel": Kernel, + "compute_value": Value, + "compute_metadata": Metadata, + } + + def check_cols(table_name, orm_obj): + conn = sqlite3.connect(f"{db_name}.db") + cursor = conn.cursor() + cursor.execute(f"PRAGMA table_info('{table_name}');") + columns = cursor.fetchall() + column_names = [column[1] for column in columns] + expected_columns = [col.name for col in orm_obj.__table__.columns] + assert column_names == expected_columns + conn.close() + + for table_name, orm_obj in table_name_map.items(): + check_cols(table_name, orm_obj) + os.remove(f"{db_name}.db") test_utils.clean_output_dir(config["cleanup"], workload_dir)