[rocprofiler-compute][TUI] Add interactive metric description (#718)

Этот коммит содержится в:
xuchen-amd
2025-08-25 15:53:55 -04:00
коммит произвёл GitHub
родитель 9a02dae75f
Коммит 5c8b34ddf5
8 изменённых файлов: 177 добавлений и 242 удалений
+3
Просмотреть файл
@@ -18,6 +18,9 @@ Full documentation for ROCm Compute Profiler is available at [https://rocm.docs.
* Enable SQC_DCACHE_INFLIGHT_LEVEL counter and associated metrics
* Enable TCP_TCP_LATENCY counter and associated counter for all GPUs except MI300
* Added interactive metric descriptions in TUI analyze mode
* Users can now left-click on any metric cell to view its detailed description in the dedicated `METRIC DESCRIPTION` tab
### Changed
* Add notice for change in default output format to `rocpd` in a future release
+1
Просмотреть файл
@@ -42,6 +42,7 @@ workload from ``rocprof-compute profile`` generated output directories.
and graphs visualizing the analysis data.
4. After the analysis results are loaded, you can start interactive analysis with detailed metrics.
You can left-click on any metric cell to view its detailed description in the dedicated ``METRIC DESCRIPTION`` tab.
The TUI supports basic keyboard shortcuts for easy navigation, including commands to quit the application.
TUI analysis structure
+1 -1
Просмотреть файл
@@ -32,7 +32,7 @@ PROJECT_NAME = "rocprofiler-compute"
HIDDEN_COLUMNS = ["coll_level"]
HIDDEN_COLUMNS_CLI = ["Description", "coll_level"]
HIDDEN_COLUMNS_TUI = ["Description", "coll_level"]
HIDDEN_COLUMNS_TUI = ["coll_level"]
HIDDEN_SECTIONS = [1900, 2000]
TIME_UNITS = {"s": 10**9, "ms": 10**6, "us": 10**3, "ns": 1}
+48 -72
Просмотреть файл
@@ -1,5 +1,4 @@
import logging
from collections import defaultdict
from datetime import datetime
from enum import Enum
@@ -35,50 +34,36 @@ class Logger:
def set_output_area(self, output_area):
self.output_area = output_area
def log(self, message, level=LogLevel.INFO, update_ui=True):
def log(self, message, level="INFO", update_ui=True):
level_map = {
LogLevel.INFO: logging.INFO,
LogLevel.SUCCESS: logging.INFO,
LogLevel.WARNING: logging.WARNING,
LogLevel.ERROR: logging.ERROR,
"INFO": logging.INFO,
"SUCCESS": logging.INFO,
"WARNING": logging.WARNING,
"ERROR": logging.ERROR,
}
self.logger.log(level_map[level], message)
timestamp = datetime.now().strftime("%H:%M:%S")
if update_ui and self.output_area:
if level == LogLevel.ERROR:
formatted_msg = f"[{timestamp}] [ERROR] {message}"
elif level == LogLevel.WARNING:
formatted_msg = f"[{timestamp}] [WARNING] {message}"
elif level == LogLevel.SUCCESS:
formatted_msg = f"[{timestamp}] [SUCCESS] {message}"
else:
formatted_msg = f"[{timestamp}] [INFO] {message}"
if hasattr(self.output_area, "text"):
current_text = self.output_area.text
self.output_area.text = (
f"{current_text}\n{formatted_msg}"
if current_text
else formatted_msg
)
# HACK: moving cursor to end of output
# (Is there a better way to achieve this?)
self.output_area.cursor_location = (999999, 0)
if update_ui and self.output_area and hasattr(self.output_area, "text"):
timestamp = datetime.now().strftime("%H:%M:%S")
formatted_msg = f"[{timestamp}] [{level}] {message}"
self.output_area.text = (
f"{self.output_area.text}\n{formatted_msg}"
if self.output_area.text
else formatted_msg
)
self.output_area.cursor_location = (999999, 0)
def info(self, message, update_ui=True):
self.log(message, LogLevel.INFO, update_ui)
self.log(message, "INFO", update_ui)
def success(self, message, update_ui=True):
self.log(message, LogLevel.SUCCESS, update_ui)
self.log(message, "SUCCESS", update_ui)
def warning(self, message, update_ui=True):
self.log(message, LogLevel.WARNING, update_ui)
self.log(message, "WARNING", update_ui)
def error(self, message, update_ui=True):
self.log(message, LogLevel.ERROR, update_ui)
self.log(message, "ERROR", update_ui)
def get_top_kernels_and_dispatch_ids(runs):
@@ -99,7 +84,6 @@ def get_top_kernels_and_dispatch_ids(runs):
top_kernel_df, dispatch_id_df, on="Kernel_Name", how="outer"
).sort_values("Pct", ascending=False)
# Remove unwanted columns
merged_df = merged_df.drop(columns=["Count", "GPU_ID"])
return merged_df.to_dict("records")
@@ -127,8 +111,7 @@ def process_panels_to_dataframes(args, kernel_df, archConfigs, roof_plot=None):
# args.max_stat_num
# args.df_file_dir
result_structure = defaultdict(dict)
result_structure = {}
decimal_precision = getattr(args, "decimal", 2) if args else 2
for panel_id, panel in archConfigs.panel_configs.items():
@@ -136,19 +119,21 @@ def process_panels_to_dataframes(args, kernel_df, archConfigs, roof_plot=None):
continue
section_name = f"{panel_id // 100}. {panel['title']}"
section_data = {}
for data_source in panel["data source"]:
for type, table_config in data_source.items():
table_id = table_config["id"]
if table_id not in kernel_df:
if (
table_id not in kernel_df
or kernel_df[table_id] is None
or kernel_df[table_id].empty
):
continue
base_df = kernel_df[table_id]
if base_df is None or base_df.empty:
continue
df = pd.DataFrame(index=base_df.index)
for header in list(base_df.columns):
@@ -160,49 +145,40 @@ def process_panels_to_dataframes(args, kernel_df, archConfigs, roof_plot=None):
df = apply_rounding_logic(df, decimal_precision)
subsection_name = (
str(table_config["id"] // 100) + "." + str(table_config["id"] % 100)
f"{table_config['id'] // 100}.{table_config['id'] % 100}"
)
if "title" in table_config and table_config["title"]:
if table_config.get("title"):
subsection_name += " " + table_config["title"]
result_structure[section_name][subsection_name] = {
section_data[subsection_name] = {
"df": df,
"tui_style": None,
"tui_style": (
table_config.get("tui_style")
if type == "metric_table"
else None
),
}
if type == "metric_table" and "tui_style" in table_config:
result_structure[section_name][subsection_name]["tui_style"] = (
table_config["tui_style"]
)
if section_data:
result_structure[section_name] = section_data
return dict(result_structure)
return result_structure
def apply_rounding_logic(df, decimal_precision):
df_copy = df.copy()
if df.empty:
return df
for column in df_copy.columns:
if column in ["Metric", "Tips", "coll_level", "Unit", "Kernel_Name", "Info"]:
continue
df_rounded = df.copy()
if df_copy[column].dtype in ["float64", "float32", "int64", "int32"]:
df_copy[column] = df_copy[column].round(decimal_precision)
else:
try:
numeric_series = pd.to_numeric(df_copy[column], errors="coerce")
if not numeric_series.isna().all():
rounded_series = numeric_series.round(decimal_precision)
float_cols = df_rounded.select_dtypes(include=["float"]).columns
if len(float_cols) > 0:
df_rounded[float_cols] = df_rounded[float_cols].round(decimal_precision)
if df_copy[column].dtype == "object":
df_copy[column] = df_copy[column].combine(
rounded_series,
lambda orig, rounded: (
rounded if pd.notna(rounded) else orig
),
)
else:
df_copy[column] = rounded_series
except (ValueError, TypeError):
continue
other_cols = df_rounded.select_dtypes(exclude=["float"]).columns
for col in other_cols:
numeric_series = pd.to_numeric(df_rounded[col], errors="coerce")
if numeric_series.notna().any():
df_rounded[col] = numeric_series.round(decimal_precision)
return df_copy
return df_rounded
+1 -18
Просмотреть файл
@@ -133,26 +133,9 @@ class KernelView(Container):
df_path = self.kernel_to_df_dict[kernel["Kernel_Name"]]["7. Wavefront"][
"7.1 Wavefront Launch Stats"
]["df"]
metric_avg = (
df_path[df_path["Metric"] == new_metric]["Avg"].iloc[0].item()
)
metric_avg = df_path[df_path["Metric"] == new_metric]["Avg"].iloc[0]
self.top_kernel_to_df_list[i][new_metric] = metric_avg
"""
header_order = [
"Dispatch_ID",
"Kernel_Name",
"Mean(ns)",
"Median(ns)",
"Sum(ns)",
"Compute Throughput",
"Memory Throughput",
"VGPRs",
"Grid Size",
"Workgroup Size",
]
"""
@on(RadioSet.Changed)
def on_radio_changed(self, event: RadioSet.Changed) -> None:
if not event.pressed:
+36 -40
Просмотреть файл
@@ -31,7 +31,6 @@ Contains the main view layout and organization for the application.
from pathlib import Path
from textual import on, work
from textual.app import ComposeResult
from textual.containers import Horizontal, Vertical
from textual.reactive import reactive
from textual.widgets import DataTable
@@ -55,10 +54,7 @@ class MainView(Horizontal):
def __init__(self):
super().__init__(id="main-container")
self.start_path = (
# NOTE: is cwd the best choice?
Path.cwd() if DEFAULT_START_PATH is None else Path(DEFAULT_START_PATH)
)
self.start_path = Path(DEFAULT_START_PATH) if DEFAULT_START_PATH else Path.cwd()
self.logger = Logger()
self.logger.info("MainView initialized", update_ui=False)
@@ -66,7 +62,7 @@ class MainView(Horizontal):
"""Required for stdout compatibility."""
pass
def compose(self) -> ComposeResult:
def compose(self):
self.logger.info("Composing main view layout", update_ui=False)
yield MenuBar()
@@ -91,38 +87,40 @@ class MainView(Horizontal):
yield RightPanel()
@on(DataTable.CellSelected)
def on_data_table_cell_selected(self, event: DataTable.CellSelected) -> None:
try:
row_data = event.data_table.get_row_at(event.coordinate.row)
self.metric_description.text = (
f"Selected Metric ID: {row_data[0]}\nSelected Metric: {row_data[1]}\n"
)
self.logger.info(f"Row {event.coordinate.row} data displayed")
except Exception as e:
error_msg = f"Error displaying row {event.coordinate.row}: {str(e)}"
self.metric_description.text = error_msg
self.logger.error(error_msg)
def on_data_table_cell_selected(self, event):
table = event.data_table
row_idx = event.coordinate.row
visible_data = table.get_row_at(row_idx)
description = (
table._df.iloc[row_idx].get("Description", "No description")
if hasattr(table, "_df")
else "N/A"
)
self.metric_description.text = (
f"Selected Metric ID: {visible_data[0]}\n"
f"Selected Metric: {visible_data[1]}\n"
f"Description: {description}"
)
@work(thread=True)
def run_analysis(self) -> None:
def run_analysis(self):
self.kernel_to_df_dict = {}
self.top_kernel_to_df_list = []
if not self.selected_path:
self.app.call_from_thread(
lambda: self.query_one("#kernel-view").update_view(
"No directory selected for analysis", LogLevel.ERROR
)
self._update_kernel_view(
"No directory selected for analysis", LogLevel.ERROR
)
return
try:
self.logger.info(f"Starting analysis on: {self.selected_path}")
self.logger.info("Loading...")
self.app.call_from_thread(
lambda: self.query_one("#kernel-view").update_view(
f"Running analysis on: {self.selected_path}", LogLevel.SUCCESS
)
self._update_kernel_view(
f"Running analysis on: {self.selected_path}", LogLevel.SUCCESS
)
# 1. Create the TUI analyzer
@@ -135,39 +133,37 @@ class MainView(Horizontal):
sysinfo_path = Path(self.selected_path) / "sysinfo.csv"
if not sysinfo_path.exists():
raise FileNotFoundError(f"sysinfo.csv not found at {sysinfo_path}")
sys_info = file_io.load_sys_info(sysinfo_path).iloc[0].to_dict()
self.app.load_soc_specs(sys_info)
analyzer.set_soc(self.app.soc)
# 3. run analysis
analyzer.set_soc(self.app.soc)
analyzer.pre_processing()
self.kernel_to_df_dict = analyzer.run_kernel_analysis()
self.top_kernel_to_df_list = analyzer.run_top_kernel()
if not self.kernel_to_df_dict or not self.top_kernel_to_df_list:
self.app.call_from_thread(
lambda: self.query_one("#kernel-view").update_view(
"Analysis completed but not all data was returned",
LogLevel.WARNING,
)
self._update_kernel_view(
"Analysis completed but not all data was returned", LogLevel.WARNING
)
else:
self.app.call_from_thread(self.refresh_results)
self.logger.info("Kernel Analysis completed successfully")
# self.logger.info(f"{self.kernel_to_df_dict}")
except Exception as e:
import traceback
error_msg = f"Analysis failed: {str(e)}"
self.logger.error(f"{error_msg}\n{traceback.format_exc()}")
self.app.call_from_thread(
lambda: self.query_one("#kernel-view").update_view(
error_msg, LogLevel.ERROR
)
)
self._update_kernel_view(error_msg, LogLevel.ERROR)
def refresh_results(self) -> None:
def _update_kernel_view(self, message, log_level):
self.app.call_from_thread(
lambda: self.query_one("#kernel-view").update_view(message, log_level)
)
def refresh_results(self):
kernel_view = self.query_one("#kernel-view")
if kernel_view:
kernel_view.update_results(
@@ -177,7 +173,7 @@ class MainView(Horizontal):
else:
self.logger.error("Kernel view not found or no data available")
def refresh_view(self) -> None:
def refresh_view(self):
if self.kernel_to_df_dict and self.top_kernel_to_df_list:
self.refresh_results()
else:
+77 -87
Просмотреть файл
@@ -24,9 +24,6 @@
##############################################################################
from typing import Any, Dict, List
import pandas as pd
import yaml
from textual.widgets import Collapsible, DataTable, Label
@@ -38,120 +35,113 @@ from rocprof_compute_tui.widgets.charts import (
)
def create_table(df: pd.DataFrame) -> DataTable:
def create_table(df, hidden_columns=[]):
table = DataTable(zebra_stripes=True)
df = df.reset_index()
df = df[~df.apply(lambda row: row.astype(str).str.strip().eq("").any(), axis=1)]
str_columns = [str(col) for col in df.columns]
table.add_columns(*str_columns)
table.add_rows([tuple(str(x) for x in row) for row in df.itertuples(index=False)])
df = df.reset_index().dropna()
df = df[~df.astype(str).apply(lambda row: row.str.strip().eq("").any(), axis=1)]
if df.empty:
return Label("No table data generated")
table._df = df
table._visible_cols = [col for col in df.columns if col not in hidden_columns]
table.add_columns(*table._visible_cols)
for _, row in df.iterrows():
table.add_row(*[str(row[col]) for col in table._visible_cols])
return table
def create_widget_from_data(df: pd.DataFrame, tui_style: str = None, context: str = ""):
def create_widget_from_data(df, tui_style=None, context=""):
if df is None or df.empty:
return Label(
f"Data not available{f' for {context}' if context else ''}",
classes="warning",
)
match tui_style:
# TODO: implement tui_style == "roofline"
# case "roofline":
# return Roofline(df)
case None:
return create_table(df)
case "mem_chart":
return MemoryChart(df)
case "simple_bar":
return SimpleBar(df)
case "simple_box":
return SimpleBox(df)
case "simple_multiple_bar":
return SimpleMultiBar(df)
case _:
return Label(f"Unknown display type: {tui_style}")
if tui_style is None:
return create_table(df, hidden_columns=["Description"])
elif tui_style == "mem_chart":
return MemoryChart(df)
elif tui_style == "simple_bar":
return SimpleBar(df)
elif tui_style == "simple_box":
return SimpleBox(df)
elif tui_style == "simple_multiple_bar":
return SimpleMultiBar(df)
else:
return Label(f"Unknown display type: {tui_style}")
def load_config(config_path) -> Dict[str, Any]:
try:
with open(config_path, "r") as file:
return yaml.safe_load(file)
except FileNotFoundError:
raise FileNotFoundError(
(
f"Configuration file {config_path} not found, \n"
"please populate the analysis_config.yaml file."
)
)
except yaml.YAMLError as e:
raise ValueError(f"Error parsing YAML configuration: {e}")
def load_config(config_path):
with open(config_path, "r") as file:
return yaml.safe_load(file)
def build_section_from_config(
dfs: Dict[str, Any], section_config: Dict[str, Any]
) -> Collapsible:
def build_section_from_config(dfs, section_config):
title = section_config["title"]
collapsed = section_config.get("collapsed", True)
children = []
for subsection_config in section_config["subsections"]:
# Handle arch_config_data
if subsection_config.get("arch_config_data", False):
if isinstance(dfs, dict):
exclude_keys = subsection_config.get("exclude_keys", [])
for section_name, subsections in dfs.items():
if section_name not in exclude_keys and isinstance(
subsections, dict
):
kernel_children = []
for subsection_name, data in subsections.items():
if isinstance(data, dict) and "df" in data:
widget = create_widget_from_data(
data["df"], data.get("tui_style"), subsection_name
)
kernel_children.append(
Collapsible(
widget, title=subsection_name, collapsed=True
)
)
if kernel_children:
children.append(
for subsection_config in section_config["subsections"]:
subsection_title = subsection_config.get("title", "Untitled")
subsection_collapsed = subsection_config.get("collapsed", True)
# Handle arch_config_data (dynamic sections from dfs)
if subsection_config.get("arch_config_data", False):
exclude_keys = subsection_config.get("exclude_keys", [])
for section_name, subsections in dfs.items():
if section_name not in exclude_keys and isinstance(subsections, dict):
kernel_children = []
for subsection_name, data in subsections.items():
if isinstance(data, dict) and "df" in data:
widget = create_widget_from_data(
data["df"], data.get("tui_style"), subsection_name
)
kernel_children.append(
Collapsible(
*kernel_children, title=section_name, collapsed=True
widget, title=subsection_name, collapsed=True
)
)
else:
# Handle data_path
tui_style = subsection_config.get("tui_style")
if kernel_children:
children.append(
Collapsible(
*kernel_children, title=section_name, collapsed=True
)
)
# Handle data_path (specific data sections)
elif "data_path" in subsection_config:
data_path = subsection_config["data_path"]
tui_style = subsection_config.get("tui_style")
df = dfs.get(data_path[0], {}).get(data_path[1], {})
df = df.get("df") if isinstance(df, dict) else None
if df is not None and isinstance(df, dict) and tui_style is None:
tui_style = df.get("tui_style")
# Navigate data path
current = dfs
for key in data_path:
current = current.get(key, {}) if isinstance(current, dict) else {}
widgets = [
create_widget_from_data(df, tui_style, f"path {' -> '.join(data_path)}")
]
df = current.get("df") if isinstance(current, dict) else None
if df is not None and tui_style is None:
tui_style = current.get("tui_style")
widget = create_widget_from_data(
df, tui_style, f"path {' -> '.join(data_path)}"
)
children.append(
Collapsible(
*widgets,
title=subsection_config.get("title", "Untitled"),
collapsed=subsection_config.get("collapsed", True),
widget, title=subsection_title, collapsed=subsection_collapsed
)
)
return Collapsible(*children, title=title, collapsed=collapsed)
def build_all_sections(dfs: Dict[str, Any], config_path) -> List[Collapsible]:
def build_all_sections(dfs, config_path):
config = load_config(config_path)
sections = []
for section_config in config["sections"]:
section = build_section_from_config(dfs, section_config)
sections.append(section)
return sections
return [
build_section_from_config(dfs, section_config)
for section_config in config["sections"]
]
+10 -24
Просмотреть файл
@@ -1,5 +1,4 @@
from textual import on
from textual.app import ComposeResult
from textual.containers import Container, Horizontal
from textual.reactive import reactive
from textual.widgets import Button
@@ -8,9 +7,7 @@ from rocprof_compute_tui.widgets.recent_directories import RecentDirectoriesScre
class DropdownMenu(Container):
"""A dropdown menu that appears when a menu button is clicked."""
def compose(self) -> ComposeResult:
def compose(self):
"""Compose the dropdown menu with menu items."""
yield Button("Open Workload", id="menu-open-workload", classes="menu-item")
yield Button("Open Recent", id="menu-open-recent", classes="menu-item")
@@ -19,21 +16,17 @@ class DropdownMenu(Container):
yield Button("Exit", id="menu-exit", classes="menu-item")
def on_mount(self) -> None:
"""Hide the dropdown menu when it's first mounted."""
self.add_class("hidden")
class MenuButton(Button):
"""A button that toggles a dropdown menu when clicked."""
is_open = reactive(False)
def __init__(self, label: str, menu_id: str, *args, **kwargs):
def __init__(self, label, menu_id, *args, **kwargs):
super().__init__(label, *args, **kwargs)
self.menu_id = menu_id
def on_click(self) -> None:
"""Toggle the dropdown menu when clicked."""
def on_click(self):
self.is_open = not self.is_open
dropdown = self.app.query_one(f"#{self.menu_id}", DropdownMenu)
@@ -46,36 +39,29 @@ class MenuButton(Button):
class MenuBar(Container):
"""A menu bar that spans the width of the app."""
def compose(self) -> ComposeResult:
"""Compose the menu bar with menu buttons and dropdown menus."""
def compose(self):
yield Horizontal(
MenuButton("File", "file-dropdown", id="menu-file"),
# TODO:
# Button("Help (🚧)", id="menu-placeholder"),
id="menu-buttons",
MenuButton("File", "file-dropdown", id="menu-file"), id="menu-buttons"
)
# Create a container for the dropdown menus
with Container(id="dropdown-container"):
yield DropdownMenu(id="file-dropdown")
yield DropdownMenu(id="placeholder-dropdown")
def on_mount(self) -> None:
def on_mount(self):
self.border_title = "MENU BAR"
self.add_class("section")
self.parent_main_view = self.screen.query_one("#main-container", Horizontal)
@on(Button.Pressed, "#menu-open-recent")
def show_recent(self) -> None:
def show_recent(self):
if not self.app.recent_dirs:
self.notify("No recent directories found", severity="warning")
return
def on_recent_selected(selected_dir: str) -> None:
def on_recent_selected(selected_dir):
if selected_dir:
self.parent_main_view.selected_path = selected_dir
dropdown = self.query_one("#file-dropdown", DropdownMenu)
dropdown.add_class("hidden")
self.query_one("#file-dropdown", DropdownMenu).add_class("hidden")
self.parent_main_view.run_analysis()
self.app.push_screen(
@@ -83,5 +69,5 @@ class MenuBar(Container):
)
@on(Button.Pressed, "#menu-exit")
def exit_app(self) -> None:
def exit_app(self):
self.app.exit()