Introduce rocprof-compute TUI (Text User Interface) (#682)

* rocprof-compute TUI (Text User Interface) - providing users interactive analyze experience with visuals.

* Analyze results with tables, charts, plots.

* Add menu bar, terminal, directory dialog. Improve logging and ui.

* Add display config file to manipulate result categorization.

* Add support for recently opened dirs.

* Update licensing and version.

[ROCm/rocprofiler-compute commit: ca0cdaf948]
Этот коммит содержится в:
xuchen-amd
2025-06-04 17:06:08 -04:00
коммит произвёл GitHub
родитель dd2d9cddf0
Коммит 0db02c5fd6
31 изменённых файлов: 2949 добавлений и 23 удалений
+4
Просмотреть файл
@@ -6,6 +6,10 @@ Full documentation for ROCm Compute Profiler is available at [https://rocm.docs.
### Added
* Add rocprof-compute Text User Interface (TUI) support for analyze mode
* A command line based user interface to support interactive single-run analysis
* launch with `--tui` option in analyze mode. i.e., `rocprof-compute analyze --tui`
* Add support to be able to acquire from rocprofv3 every single channle on each XCD of TCC counters
* Add Docker files to package the application and dependencies into a single portable and executable standalone binary file
+11 -7
Просмотреть файл
@@ -22,19 +22,23 @@ SOFTWARE.
This application uses the following dependencies and their usage is governed by their respective licenses
Python 3 standard library: PSFL
Nuitka specific runtime code: Apache 2.0 license
astunparse python library: PSFL
colorlover python library: MIT
dash python library: MIT
dash-bootstrap-components python library: MIT
dash-svg python library: MIT
kaleido python library: MIT
matplotlib python library: PSFL
Nuitka specific runtime code: Apache 2.0 license
numpy python library: BSD
pandas python library: BSD
plotext python library: MIT
plotille python library: MIT
pymongo python library: Apache 2.0 license
pyyaml python library: MIT
tabulate python library: MIT
tqdm python library: MIT
dash-svg python library: MIT
dash-bootstrap-components python library: MIT
kaleido python library: MIT
setuptools python library: MIT
plotille python library: MIT
tabulate python library: MIT
textual python library: MIT
textual_plotext python library: MIT
textual-fspicker python library: MIT
tqdm python library: MIT
+10 -6
Просмотреть файл
@@ -1,15 +1,19 @@
astunparse==1.6.2
colorlover
dash-bootstrap-components
dash-svg
dash>=3.0.0
kaleido==0.2.1
matplotlib
numpy>=1.17.5
pandas>=1.4.3
plotext
plotille
pymongo
pyyaml
tabulate
tqdm
dash-svg
dash-bootstrap-components
kaleido==0.2.1
setuptools
plotille
tabulate
textual
textual_plotext
textual-fspicker
tqdm
+5 -1
Просмотреть файл
@@ -600,7 +600,11 @@ Examples:
const=8050,
help="\t\tActivate a GUI to interate with rocprofiler-compute metrics.\n\t\tOptionally, specify port to launch application (DEFAULT: 8050)",
)
analyze_group.add_argument(
"--tui",
action="store_true",
help="\t\tActivate a Textual User Interface (TUI) to interact with rocprofiler-compute metrics.",
)
analyze_group.add_argument(
"-R",
"--roofline-data-type",
+2
Просмотреть файл
@@ -185,6 +185,8 @@ class OmniAnalyze_Base:
@demarcate
def sanitize(self):
"""Perform sanitization of inputs"""
if self.__args.tui:
return
if not self.__args.path:
console_error("The following arguments are required: -p/--path")
# verify not accessing parent directories
+7 -1
Просмотреть файл
@@ -31,7 +31,6 @@ import sys
import time
from pathlib import Path
import pandas as pd
import yaml
import config
@@ -145,6 +144,8 @@ class RocProfCompute:
def detect_analyze(self):
if self.__args.gui:
self.__analyze_mode = "web_ui"
elif self.__args.tui:
self.__analyze_mode = "tui"
else:
self.__analyze_mode = "cli"
return
@@ -392,6 +393,11 @@ class RocProfCompute:
from rocprof_compute_analyze.analysis_webui import webui_analysis
analyzer = webui_analysis(self.__args, self.__supported_archs)
elif self.__analyze_mode == "tui":
from rocprof_compute_tui.tui_app import run_tui
run_tui(self.__args, self.__supported_archs)
return
else:
console_error("Unsupported analysis mode -> %s" % self.__analyze_mode)
@@ -313,3 +313,4 @@ Panel Config:
comparable: false # for now
cli_style: mem_chart
tui_style: mem_chart
+9 -8
Просмотреть файл
@@ -37,6 +37,7 @@ Panel Config:
tips:
comparable: false # for now
cli_style: simple_bar
tui_style: simple_bar
- metric_table:
id: 1202
@@ -55,42 +56,42 @@ Panel Config:
max: MAX((SQ_INSTS_LDS / $denom))
unit: (Instr + $normUnit)
tips:
# TODO: Fix baseline comparision logic to handle non existent metrics, then
# TODO: Fix baseline comparision logic to handle non existent metrics, then
LDS LOAD:
avg: None
min: None
max: None
unit: (instr + $normUnit)
tips:
# TODO: Fix baseline comparision logic to handle non existent metrics, then
# TODO: Fix baseline comparision logic to handle non existent metrics, then
LDS STORE:
avg: None
min: None
max: None
unit: (instr + $normUnit)
tips:
# TODO: Fix baseline comparision logic to handle non existent metrics, then
# TODO: Fix baseline comparision logic to handle non existent metrics, then
LDS ATOMIC:
avg: None
min: None
max: None
unit: (instr + $normUnit)
tips:
# TODO: Fix baseline comparision logic to handle non existent metrics, then
# TODO: Fix baseline comparision logic to handle non existent metrics, then
LDS LOAD Bandwidth:
avg: None
min: None
max: None
units: Gbps
tips:
# TODO: Fix baseline comparision logic to handle non existent metrics, then
# TODO: Fix baseline comparision logic to handle non existent metrics, then
LDS STORE Bandwidth:
avg: None
min: None
max: None
units: Gbps
tips:
# TODO: Fix baseline comparision logic to handle non existent metrics, then
# TODO: Fix baseline comparision logic to handle non existent metrics, then
LDS ATOMIC Bandwidth:
avg: None
min: None
@@ -158,14 +159,14 @@ Panel Config:
max: MAX((SQ_LDS_MEM_VIOLATIONS / $denom))
unit: (Accesses + $normUnit)
tips:
# TODO: Fix baseline comparision logic to handle non existent metrics, then
# TODO: Fix baseline comparision logic to handle non existent metrics, then
LDS Command FIFO Full Rate:
avg: None
min: None
max: None
unit: (Cycles + $normUnit)
tips:
# TODO: Fix baseline comparision logic to handle non existent metrics, then
# TODO: Fix baseline comparision logic to handle non existent metrics, then
LDS Data FIFO Full Rate:
avg: None
min: None
@@ -34,6 +34,7 @@ Panel Config:
tips:
comparable: false # for now
cli_style: simple_bar
tui_style: simple_bar
- metric_table:
id: 1302
@@ -34,6 +34,7 @@ Panel Config:
tips:
comparable: false # for now
cli_style: simple_bar
tui_style: simple_bar
- metric_table:
id: 1402
@@ -41,6 +41,7 @@ Panel Config:
tips:
comparable: false # for now
cli_style: simple_bar
tui_style: simple_bar
- metric_table:
id: 1602
@@ -96,6 +97,7 @@ Panel Config:
if (TCP_GATE_EN1_sum != 0) else None)
tips:
cli_style: simple_box
tui_style: simple_box
- metric_table:
id: 1603
@@ -119,6 +119,7 @@ Panel Config:
placeholder_range:
"::_1": $total_l2_chan
cli_style: simple_box
tui_style: simple_box
- metric_table:
id: 1803
@@ -132,6 +133,7 @@ Panel Config:
placeholder_range:
"::_1": $total_l2_chan
cli_style: simple_box
tui_style: simple_box
- metric_table:
id: 1804
@@ -149,6 +151,7 @@ Panel Config:
placeholder_range:
"::_1": $total_l2_chan
cli_style: simple_multiple_bar
tui_style: simple_multiple_bar
- metric_table:
id: 1805
@@ -166,6 +169,7 @@ Panel Config:
placeholder_range:
"::_1": $total_l2_chan
cli_style: simple_multiple_bar
tui_style: simple_multiple_bar
# - metric_table:
# id: 1806
@@ -204,6 +208,7 @@ Panel Config:
placeholder_range:
"::_1": $total_l2_chan
cli_style: simple_box
tui_style: simple_box
- metric_table:
id: 1807
@@ -219,6 +224,7 @@ Panel Config:
placeholder_range:
"::_1": $total_l2_chan
cli_style: simple_box
tui_style: simple_box
- metric_table:
id: 1808
@@ -233,6 +239,7 @@ Panel Config:
placeholder_range:
"::_1": $total_l2_chan
cli_style: simple_box
tui_style: simple_box
- metric_table:
id: 1809
@@ -250,6 +257,7 @@ Panel Config:
placeholder_range:
"::_1": $total_l2_chan
cli_style: simple_multiple_bar
tui_style: simple_multiple_bar
- metric_table:
id: 1810
@@ -269,6 +277,7 @@ Panel Config:
placeholder_range:
"::_1": $total_l2_chan
cli_style: simple_multiple_bar
tui_style: simple_multiple_bar
# - metric_table:
# id: 1811
@@ -296,3 +305,4 @@ Panel Config:
"::_1": $total_l2_chan
# tips: Number of 128-byte read requests sent to EA
cli_style: simple_box
tui_style: simple_box
+140
Просмотреть файл
@@ -0,0 +1,140 @@
##############################################################################bl
# MIT License
#
# Copyright (c) 2025 Advanced Micro Devices, Inc. All Rights Reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
##############################################################################el
import copy
import sys
from pathlib import Path
from rocprof_compute_analyze.analysis_base import OmniAnalyze_Base
from rocprof_compute_tui.utils.tui_utils import process_panels_to_dataframes
from utils import file_io, parser, schema
from utils.kernel_name_shortener import kernel_name_shortener
from utils.logger import console_error, demarcate
class tui_analysis(OmniAnalyze_Base):
def __init__(self, args, supported_archs, path):
super().__init__(args, supported_archs)
self.path = str(path)
# -----------------------
# Required child methods
# -----------------------
@demarcate
def pre_processing(self):
"""Perform any pre-processing steps prior to analysis."""
# Read profiling config
self._profiling_config = file_io.load_profiling_config(self.path)
# initalize runs
self._runs = self.initalize_runs()
if self.get_args().random_port:
console_error("--gui flag is required to enable --random-port")
# create 'mega dataframe'
self._runs[self.path].raw_pmc = file_io.create_df_pmc(
self.path,
self.get_args().nodes,
self.get_args().spatial_multiplexing,
self.get_args().kernel_verbose,
self.get_args().verbose,
)
if self.get_args().spatial_multiplexing:
self._runs[self.path].raw_pmc = self.spatial_multiplex_merge_counters(
self._runs[self.path].raw_pmc
)
file_io.create_df_kernel_top_stats(
df_in=self._runs[self.path].raw_pmc,
raw_data_dir=self.path,
filter_gpu_ids=self._runs[self.path].filter_gpu_ids,
filter_dispatch_ids=self._runs[self.path].filter_dispatch_ids,
filter_nodes=self._runs[self.path].filter_nodes,
time_unit=self.get_args().time_unit,
max_stat_num=self.get_args().max_stat_num,
kernel_verbose=self.get_args().kernel_verbose,
)
# demangle and overwrite original 'Kernel_Name'
kernel_name_shortener(
self._runs[self.path].raw_pmc, self.get_args().kernel_verbose
)
# create the loaded table
parser.load_table_data(
workload=self._runs[self.path],
dir=self.path,
is_gui=False,
debug=self.get_args().debug,
verbose=self.get_args().verbose,
)
def initalize_runs(self, normalization_filter=None):
# load required configs
sysinfo_path = Path(self.path)
sys_info = file_io.load_sys_info(sysinfo_path.joinpath("sysinfo.csv"))
arch = sys_info.iloc[0]["gpu_arch"]
args = self.get_args()
self.generate_configs(
arch,
args.config_dir,
args.list_stats,
args.filter_metrics,
sys_info.iloc[0],
)
self.load_options(normalization_filter)
w = schema.Workload()
# FIXME:
# For regular single node case, load sysinfo.csv directly
# For multi-node, either the default "all", or specified some,
# pick up the one in the 1st sub_dir. We could fix it properly later.
sysinfo_path = Path(self.path)
w.sys_info = file_io.load_sys_info(sysinfo_path.joinpath("sysinfo.csv"))
arch = w.sys_info.iloc[0]["gpu_arch"]
mspec = self.get_socs()[arch]._mspec
if args.specs_correction:
w.sys_info = parser.correct_sys_info(mspec, args.specs_correction)
w.avail_ips = w.sys_info["ip_blocks"].item().split("|")
w.dfs = copy.deepcopy(self._arch_configs[arch].dfs)
w.dfs_type = self._arch_configs[arch].dfs_type
self._runs[self.path] = w
return self._runs
@demarcate
def run_analysis(self):
"""Run TUI analysis."""
super().run_analysis()
results = process_panels_to_dataframes(
self.get_args(),
self._runs,
self._arch_configs[self._runs[self.path].sys_info.iloc[0]["gpu_arch"]],
self._profiling_config,
)
return results
+315
Просмотреть файл
@@ -0,0 +1,315 @@
/*
* Performance Analysis TUI Stylesheet
* ----------------------------------
*/
/* Base app styling */
Screen {
background: $surface;
color: $text;
}
/* Main layout containers */
#main-container {
layout: grid;
grid-size: 1 2;
/* 1 column, 2 rows */
grid-columns: 1fr;
grid-rows: auto 1fr;
}
#center-container {
layout: grid;
grid-size: 2 1;
/* 2 column, 1 rows */
grid-columns: 8fr 1fr;
}
#activity-container {
layout: grid;
grid-size: 1 2;
/* 1 column, 2 rows */
grid-rows: 4fr 1fr;
}
/* Panel styling */
#left-panel,
#right-panel,
#center-panel,
#bottom-panel {
border: solid $primary;
background: $surface-darken-1;
}
/* Collapsible sections */
Collapsible {
width: 100%;
}
Collapsible>.collapsible--title {
background: $surface-darken-2;
text-style: bold;
}
Collapsible>.collapsible--content {
background: $surface;
}
/* Summary section */
.summary-section {
height: auto;
margin: 0;
}
.header-cell {
text-style: bold;
color: $accent;
padding: 0 1;
height: 1;
margin: 0;
}
.data-cell {
color: $text-muted;
padding: 0 1;
height: 1;
margin: 0;
}
.row {
width: 1fr;
height: 1;
margin: 0;
}
/* DataTables */
DataTable {
height: auto;
}
DataTable>.datatable--header {
background: $surface-darken-2;
text-style: bold;
}
DataTable>.datatable--row {
background: $surface;
}
DataTable>.datatable--row-odd {
background: $surface-darken-1;
}
/* TabbedContent styling */
#bottom-panel {
min-height: 8;
}
/* Visualizations */
.roofline-plot {
padding: 1;
width: auto;
height: auto;
background: $surface;
color: $text;
}
.mem-chart {
border: solid $accent;
padding: 0;
width: auto;
height: auto;
overflow: auto auto;
background: $surface;
color: $text;
}
/* Debug view styling */
.debug-view {
border: solid $error;
padding: 1;
width: 100%;
}
/* Status classes */
.error {
color: $error;
text-style: bold;
}
.warning {
color: $warning;
}
.success {
color: $success;
}
/* Splitter */
#splitter {
height: 1;
background: $primary-background-lighten-1;
}
/* Placeholder text */
.placeholder {
color: $text-muted;
text-align: center;
padding: 2;
}
/* Section headers */
.section-header {
background: $primary-background;
color: $text;
text-style: bold;
border-bottom: solid $primary;
}
.section {
border: round $accent 40%;
border-title-color: $text-accent 50%;
border-title-align: right;
&:focus-within {
border: round $accent 100%;
border-title-color: $foreground;
border-title-style: b;
}
}
RocprofTUIApp {
& TabbedContent:focus-within Tabs {
&:focus {
& .-active {
text-style: $block-cursor-text-style;
color: $block-cursor-foreground;
background: $block-cursor-background;
}
}
&:blur Tab:enabled {
&.-active {
background: $panel;
}
}
}
}
Button {
background: $surface;
color: $text;
min-width: 10;
height: 1;
margin: 0;
padding: 0 1;
border: none;
width: auto;
}
Button:hover {
background: $primary-background-lighten-1;
color: $text-accent;
}
Button.selected {
background: $primary-background;
color: $text-accent;
text-style: bold;
}
MenuBar {
width: 100%;
height: auto;
background: $surface;
grid-rows: 1;
}
#menu-buttons {
width: 100%;
height: auto;
background: $surface;
}
MenuButton {
background: $surface;
color: $text;
min-width: 10;
height: 1;
border: none;
width: auto;
}
MenuButton:hover {
background: $primary-background-lighten-1;
color: $text-accent;
}
MenuButton.selected {
background: $primary-background;
color: $text-accent;
}
#dropdown-container {
width: 100%;
height: auto;
}
DropdownMenu {
background: $accent;
width: auto;
min-width: 20;
padding-top: 1;
height: auto;
}
#file-dropdown {
margin-left: 0;
}
.menu-item {
background: $accent;
}
.menu-item:hover {
background: $primary-background-lighten-1;
color: $text-accent;
}
.hidden {
display: none;
}
Terimnal {
layout: vertical;
width: 100%;
height: 100%;
min-height: 10;
background: $surface;
}
#term-output-scroll {
width: 100%;
height: 2fr;
min-height: 10;
max-height: 75%;
background: $surface;
overflow-y: scroll;
}
#term-output {
width: 100%;
background: $surface;
color: $text;
}
#terminal-input {
padding: 0;
&:focus-within {
border: round $accent 100%;
border-title-color: $foreground;
border-title-style: b;
}
}
+15
Просмотреть файл
@@ -0,0 +1,15 @@
"""
Configuration Module
-------------------
Central configuration for the application.
"""
# Application settings
APP_TITLE = "ROCm Compute Profiler TUI"
VERSION = "3.1.0"
# Widget configurations
DEFAULT_COLLAPSIBLE_STATE = True # True = collapsed by default
# File paths
DEFAULT_START_PATH = None # None uses cwd
+156
Просмотреть файл
@@ -0,0 +1,156 @@
"""
ROCm Compute Profiler TUI - Main Application with Analysis Methods
----------------------------------------------------------------
"""
import importlib
import json
import os
from pathlib import Path
from typing import Any, Dict, List, Optional
from textual import on, work
from textual.app import App, ComposeResult
from textual.binding import Binding
from textual.widgets import Button, Footer, Header
from textual_fspicker import SelectDirectory
from rocprof_compute_tui.config import APP_TITLE, VERSION
from rocprof_compute_tui.views.main_view import MainView
from rocprof_compute_tui.widgets.menu_bar.menu_bar import DropdownMenu
from utils.specs import MachineSpecs, generate_machine_specs
class RocprofTUIApp(App):
"""Main application for the performance analysis tool."""
TITLE = f"{APP_TITLE} v{VERSION}"
SUB_TITLE = "Workload Analysis Tool"
CSS_PATH = "assets/style.css"
BINDINGS = [
Binding(key="q", action="quit", description="Quit"),
Binding(key="r", action="refresh", description="Refresh"),
Binding(key="a", action="analyze", description="Analyze"),
]
def __init__(
self, args: Optional[Any] = None, supported_archs: Optional[Dict] = None
) -> None:
"""
Initialize the application.
"""
super().__init__()
self.main_view = MainView()
self.recent_file = Path.home() / ".textual_browser_recent.json"
self.recent_dirs: List[str] = []
self.current_path = ""
self.load_recent_directories()
# Initialize analysis-related attributes
self.args = args
self.supported_archs = supported_archs or {}
self.soc: Dict = {}
self.mspec: Optional[MachineSpecs] = None
def compose(self) -> ComposeResult:
"""Compose the application layout."""
yield Header()
yield self.main_view
yield Footer()
def action_refresh(self) -> None:
"""Refresh the view."""
try:
self.main_view.refresh_view()
except Exception as e:
self.notify(f"Refresh failed: {str(e)}", severity="error")
def load_soc_specs(self, sysinfo: dict = None) -> None:
"""
Load OmniSoC instance for analysis.
"""
self.mspec = generate_machine_specs(self.args, sysinfo)
if self.args and self.args.specs:
print(self.mspec)
return
arch = self.mspec.gpu_arch
# Dynamically import and instantiate the SoC class
soc_module = importlib.import_module("rocprof_compute_soc.soc_" + arch)
soc_class = getattr(soc_module, arch + "_soc")
self.soc[arch] = soc_class(self.args, self.mspec)
def get_soc(self) -> Dict:
"""Get the SoC dictionary."""
return self.soc
def get_mspec(self) -> Optional[MachineSpecs]:
"""Get the machine specifications."""
return self.mspec
def load_recent_directories(self) -> None:
"""Load recent directories from file."""
try:
if self.recent_file.exists():
with open(self.recent_file, "r") as f:
self.recent_dirs = json.load(f)
except (json.JSONDecodeError, FileNotFoundError):
self.recent_dirs = []
def save_recent_directories(self) -> None:
"""Save recent directories to file."""
try:
with open(self.recent_file, "w") as f:
json.dump(self.recent_dirs, f, indent=2)
except Exception as e:
self.notify(f"Failed to save recent directories: {e}", severity="error")
def add_to_recent(self, directory: str) -> None:
"""Add directory to recent list (FIFO, max 5 items)."""
directory = os.path.abspath(directory)
# Remove if already exists
if directory in self.recent_dirs:
self.recent_dirs.remove(directory)
# Add to front
# TODO: should we check to if workload dir can be successfully loaded?
self.recent_dirs.insert(0, directory)
# Keep only last 5
self.recent_dirs = self.recent_dirs[:5]
# Save to file
self.save_recent_directories()
def on_recent_selected(self, selected_dir: str) -> None:
if selected_dir:
self.main_view.selected_path = selected_dir
self.main_view.run_analysis()
@on(Button.Pressed, "#menu-open-workload")
@work
async def pick_a_directory(self) -> None:
if opened := await self.push_screen_wait(SelectDirectory()):
self.add_to_recent(str(opened))
self.main_view.selected_path = opened
dropdown = self.query_one(f"#file-dropdown", DropdownMenu)
dropdown.add_class("hidden")
self.main_view.run_analysis()
def run_tui(args: Optional[Any] = None, supported_archs: Optional[list] = None) -> None:
"""
Run the TUI application.
"""
try:
app = RocprofTUIApp(args, supported_archs)
app.run()
except KeyboardInterrupt:
pass
except Exception as e:
raise RuntimeError(f"Failed to run TUI application: {str(e)}") from e
+49
Просмотреть файл
@@ -0,0 +1,49 @@
sections:
- title: "📊 Summaries"
collapsed: true
class: "summary-section"
subsections:
- title: "Top Kernels"
data_path: ["0. Top Stats", "0.1 Top Kernels"]
collapsed: true
header_label: "Top Kernels by Duration (ns):"
header_class: "section-header"
- title: "Dispatch List"
data_path: ["0. Top Stats", "0.2 Dispatch List"]
collapsed: true
- title: "System Info"
data_path: ["1. System Info", "1.1"]
collapsed: true
- title: "⚡ High Level Analysis"
collapsed: true
class: "sysinfo-section"
subsections:
- title: "System Speed-of-Light"
data_path: ["2. System Speed-of-Light", "2.1 Speed-of-Light"]
collapsed: true
- title: "Roofline"
collapsed: true
tui_style: "roofline"
widget_id: "roofline-plot"
- title: "Memory Chart"
data_path: ["3. Memory Chart", "3.1 Memory Chart"]
collapsed: true
tui_style: "mem_chart"
- title: "🔍 Detailed Block Analysis"
collapsed: true
class: "kernels-section"
dynamic_sections: true
skip_sections:
- "0. Top Stats"
- "1. System Info"
- "2. System Speed-of-Light"
- "3. Memory Chart"
- title: "🚧 Source Level Analysis/PC Sampling"
collapsed: true
class: "source-section"
under_construction: true
construction_label: "🚧 Under Construction"
construction_class: "section-header"
+587
Просмотреть файл
@@ -0,0 +1,587 @@
import copy
import logging
import os
import re
import sys
from collections import defaultdict
from datetime import datetime
from enum import Enum
from pathlib import Path
import pandas as pd
HIDDEN_SECTIONS = [1900, 2000]
HIDDEN_COLUMNS = ["Tips", "coll_level"]
supported_field = [
"Value",
"Minimum",
"Maximum",
"Average",
"Median",
"Min",
"Max",
"Avg",
"Pct of Peak",
"Peak",
"Count",
"Mean",
"Pct",
"Std Dev",
"Q1",
"Q3",
"Expression",
# Special keywords for L2 channel
"Channel",
"L2 Cache Hit Rate",
"Requests",
"L2 Read",
"L2 Write",
"L2 Atomic",
"L2-Fabric Requests",
"L2-Fabric Read",
"L2-Fabric Write and Atomic",
"L2-Fabric Atomic",
"L2 Read Req",
"L2 Write Req",
"L2 Atomic Req",
"L2-Fabric Read Req",
"L2-Fabric Write and Atomic Req",
"L2-Fabric Atomic Req",
"L2-Fabric Read Latency",
"L2-Fabric Write Latency",
"L2-Fabric Atomic Latency",
"L2-Fabric Read Stall (PCIe)",
"L2-Fabric Read Stall (Infinity Fabric™)",
"L2-Fabric Read Stall (HBM)",
"L2-Fabric Write Stall (PCIe)",
"L2-Fabric Write Stall (Infinity Fabric™)",
"L2-Fabric Write Stall (HBM)",
"L2-Fabric Write Starve",
]
class LogLevel(str, Enum):
"""Log levels for consistent logging."""
INFO = "info"
WARNING = "warning"
ERROR = "error"
SUCCESS = "success" # Maintained for UI compatibility
class Logger:
"""Centralized logging handler for the application."""
def __init__(self, output_area=None):
"""
Initialize the logger.
"""
self.output_area = output_area
self._setup_logger()
def _setup_logger(self):
"""
Setup the Python logger with proper formatting.
"""
self.logger = logging.getLogger("app")
self.logger.setLevel(logging.INFO)
if not self.logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter(
"%(asctime)s [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
def set_output_area(self, output_area):
"""
Set or update the output area for displaying logs.
"""
self.output_area = output_area
def log(self, message, level=LogLevel.INFO, update_ui=True):
"""
Log a message with the specified level.
"""
level_map = {
LogLevel.INFO: logging.INFO,
LogLevel.SUCCESS: logging.INFO,
LogLevel.WARNING: logging.WARNING,
LogLevel.ERROR: logging.ERROR,
}
self.logger.log(level_map[level], message)
timestamp = datetime.now().strftime("%H:%M:%S")
if update_ui and self.output_area:
if level == LogLevel.ERROR:
formatted_msg = f"[{timestamp}] [ERROR] {message}"
elif level == LogLevel.WARNING:
formatted_msg = f"[{timestamp}] [WARNING] {message}"
elif level == LogLevel.SUCCESS:
formatted_msg = f"[{timestamp}] [SUCCESS] {message}"
else:
formatted_msg = f"[{timestamp}] [INFO] {message}"
if hasattr(self.output_area, "text"):
current_text = self.output_area.text
self.output_area.text = (
f"{current_text}\n{formatted_msg}" if current_text else formatted_msg
)
# HACK: moving curson to end of outpu (Is there a better way to achieve this?)
self.output_area.cursor_location = (999999, 0)
def info(self, message, update_ui=True):
self.log(message, LogLevel.INFO, update_ui)
def success(self, message, update_ui=True):
self.log(message, LogLevel.SUCCESS, update_ui)
def warning(self, message, update_ui=True):
self.log(message, LogLevel.WARNING, update_ui)
def error(self, message, update_ui=True):
self.log(message, LogLevel.ERROR, update_ui)
def split_table_line(line):
"""
Splits a table row line into a list of cell strings (trimmed). For example:
│ │ Kernel_Name │ Count │ ...
"""
cells = line.split("")
if cells and cells[0] == "":
cells = cells[1:]
if cells and cells[-1] == "":
cells = cells[:-1]
return [cell.strip() for cell in cells]
def parse_ascii_table(table_lines):
"""
Given a list of lines belonging to one ASCII table (including border rows),
return a tuple (header, data_rows) where header is a list of column names and
data_rows is a list of rows (each a list of cell strings).
Skips border/separator lines and also checks for continuation
rows (which have an empty first cell). Continuation rows get merged into the previous row.
"""
header = None
data_rows = []
for line in table_lines:
if re.match(r"^[╒╞╘├└─]+", line):
continue
if "" not in line:
continue
cells = split_table_line(line)
if header is None:
header = cells
continue
if cells and cells[0] == "":
if data_rows: # There should be at least one row already.
for i, cell in enumerate(cells):
if cell:
data_rows[-1][i] += " " + cell
else:
continue
else:
data_rows.append(cells)
return header, data_rows
def parse_file(filename):
"""
Returns nested structure:
{
"0. Top Stats": {
"0.1 Top Kernels": {header: [...], data: [...]},
"0.2 Dispatch List": {header: [...], data: [...]}
},
"1. System Info": {
"1.1 System Information": {header: [...], data: [...]}
},
...
}
"""
with open(filename, "r", encoding="utf-8") as f:
lines = f.readlines()
sections = {}
current_section = None
current_subsection = None
table_lines = []
in_table = False
for line in lines:
line = line.rstrip("\n")
# Skip separator lines
if line.startswith(
"--------------------------------------------------------------------------------"
):
continue
# Check for section header (e.g., "0. Top Stats")
section_match = re.match(r"^\s*(\d+\. .+)$", line)
if section_match:
current_section = section_match.group(1).strip()
sections[current_section] = {}
continue
# Check for subsection header (e.g., "0.1 Top Kernels")
# FIXME: 1. System Info is an exception, no subsection
subsection_match = re.match(r"^\s*(\d+\.\d+ .+)$", line)
if subsection_match:
current_subsection = subsection_match.group(1).strip()
if current_section is None:
current_section = "Uncategorized"
sections[current_section] = {}
continue
# Table parsing logic
if line.startswith(""):
in_table = True
table_lines = [line]
continue
if in_table:
table_lines.append(line)
if line.startswith(""):
if current_section and current_subsection:
header, data = parse_ascii_table(table_lines)
sections[current_section][current_subsection] = {
"header": header,
"data": data,
}
in_table = False
table_lines = []
return sections
def get_table_dfs():
filename = str(Path(os.getcwd()).joinpath("analyze_output.csv"))
sections_info = parse_file(filename)
# Convert to DataFrames while maintaining nested structure
section_dfs = {}
for section_name, subsections in sections_info.items():
section_dfs[section_name] = {}
for subsection_name, table_data in subsections.items():
if table_data and table_data["data"]:
try:
df = pd.DataFrame(table_data["data"], columns=table_data["header"])
section_dfs[section_name][subsection_name] = df
except Exception as e:
print(f"Error creating DataFrame for {subsection_name}: {e}")
continue
return section_dfs
def process_panels_to_dataframes(args, runs, archConfigs, profiling_config):
"""
Process panel data into pandas DataFrames.
Returns a nested dictionary structure with DataFrames and tui_style information.
Returns:
Dict[str, Dict[str, Dict[str, Any]]]: Nested structure {
"section_name": {
"subsection_name": {
"df": DataFrame,
"tui_style": dict or None
}
}
}
"""
comparable_columns = build_comparable_columns(args.time_unit)
filter_panel_ids = [
convert_metric_id_to_panel_idx(section)
for section in [
name
for name, type in profiling_config.get("filter_blocks", {}).items()
if type == "metric_id"
]
]
# Initialize the result structure
result_structure = defaultdict(dict)
for panel_id, panel in archConfigs.panel_configs.items():
# Skip panels that don't support baseline comparison
if panel_id in HIDDEN_SECTIONS:
continue
# Get section name (e.g., "0. Top Stats")
section_name = f"{panel_id // 100}. {panel['title']}"
for data_source in panel["data source"]:
for type, table_config in data_source.items():
# Check for filtering conditions
if (
not args.filter_metrics
and filter_panel_ids
and table_config["id"] not in filter_panel_ids
and panel_id not in filter_panel_ids
and panel_id > 100
):
table_id_str = (
str(table_config["id"] // 100)
+ "."
+ str(table_config["id"] % 100)
)
continue
# Process the data
base_run, base_data = next(iter(runs.items()))
base_df = base_data.dfs[table_config["id"]]
df = pd.DataFrame(index=base_df.index)
# Process columns
for header in list(base_df.keys()):
if should_process_column(header, args, type):
if header in HIDDEN_COLUMNS:
pass
elif header not in comparable_columns:
df = process_non_comparable_column(
df, header, base_df, type, table_config, runs
)
else:
df = process_comparable_column(
df,
header,
base_df,
table_config,
runs,
base_run,
type,
args,
HIDDEN_COLUMNS,
)
if not df.empty:
# Check for empty columns
is_empty_columns_exist = check_empty_columns(df)
if not is_empty_columns_exist:
# Get subsection name
table_id_str = (
str(table_config["id"] // 100)
+ "."
+ str(table_config["id"] % 100)
)
subsection_name = table_id_str
if "title" in table_config and table_config["title"]:
subsection_name += " " + table_config["title"]
# Handle special cases for top stats
if type == "raw_csv_table" and (
table_config["source"] == "pmc_kernel_top.csv"
or table_config["source"] == "pmc_dispatch_info.csv"
):
df = df.head(args.max_stat_num)
# Check for transpose requirement
transpose = (
type != "raw_csv_table"
and "columnwise" in table_config
and table_config.get("columnwise") == True
)
if transpose:
df = df.T
# Store the DataFrame with tui_style as separate keys
result_structure[section_name][subsection_name] = {
"df": df,
"tui_style": None,
}
# Set tui_style if available
if type == "metric_table" and "tui_style" in table_config:
result_structure[section_name][subsection_name][
"tui_style"
] = table_config["tui_style"]
# Save to CSV if requested
if args.df_file_dir:
save_dataframe_to_csv(df, table_id_str, table_config, args)
return dict(result_structure)
def should_process_column(header, args, type):
"""Check if a column should be processed based on arguments."""
return (
(not args.cols)
or (
args.cols and header in args.cols
) # Assuming args.cols is now a list of column names
or (type == "raw_csv_table")
)
def process_non_comparable_column(df, header, base_df, type, table_config, runs):
"""Process columns that are not comparable across runs."""
if (
type == "raw_csv_table"
and (
table_config["source"] == "pmc_kernel_top.csv"
or table_config["source"] == "pmc_dispatch_info.csv"
)
and header == "Kernel_Name"
):
# Adjust kernel name width based on source
if table_config["source"] == "pmc_kernel_top.csv":
adjusted_name = base_df["Kernel_Name"].apply(
lambda x: string_multiple_lines(x, 40, 3)
)
else:
adjusted_name = base_df["Kernel_Name"].apply(
lambda x: string_multiple_lines(x, 80, 4)
)
df = pd.concat([df, adjusted_name], axis=1)
elif type == "raw_csv_table" and header == "Info":
for run, data in runs.items():
cur_df = data.dfs[table_config["id"]]
df = pd.concat([df, cur_df[header]], axis=1)
else:
df = pd.concat([df, base_df[header]], axis=1)
return df
def process_comparable_column(
df, header, base_df, table_config, runs, base_run, type, args, hidden_columns
):
"""Process columns that can be compared across runs."""
for run, data in runs.items():
cur_df = data.dfs[table_config["id"]]
if (type == "raw_csv_table") or (
type == "metric_table" and (header not in hidden_columns)
):
if run != base_run:
# Calculate percentage over the baseline
base_values = [float(x) if x != "" else float(0) for x in base_df[header]]
cur_values = [float(x) if x != "" else float(0) for x in cur_df[header]]
base_df[header] = base_values
cur_df[header] = cur_values
t_df = pd.concat(
[base_df[header], cur_df[header]],
axis=1,
)
absolute_diff = (t_df.iloc[:, 1] - t_df.iloc[:, 0]).round(args.decimal)
t_df = absolute_diff / t_df.iloc[:, 0].replace(0, 1)
t_df_pretty = t_df.astype(float).mul(100).round(args.decimal)
# Show value + percentage
t_df = (
cur_df[header].astype(float).round(args.decimal).map(str).astype(str)
+ " ("
+ t_df_pretty.map(str)
+ "%)"
)
df = pd.concat([df, t_df], axis=1)
# Check for threshold violations
if (
header in ["Value", "Count", "Avg"]
and t_df_pretty.abs().gt(args.report_diff).any()
):
df["Abs Diff"] = absolute_diff
if args.report_diff:
violation_idx = t_df_pretty.index[
t_df_pretty.abs() > args.report_diff
]
else:
cur_df_copy = copy.deepcopy(cur_df)
cur_df_copy[header] = [
(round(float(x), args.decimal) if x != "" else x)
for x in base_df[header]
]
df = pd.concat([df, cur_df_copy[header]], axis=1)
return df
def check_empty_columns(df):
"""Check if any column in the DataFrame is empty."""
return any(
[
df.columns[col_idx]
for col_idx in range(len(df.columns))
if df.replace("", None).iloc[:, col_idx].isnull().all()
]
)
def save_dataframe_to_csv(df, table_id_str, table_config, args):
"""Save DataFrame to CSV file if directory is specified."""
p = Path(args.df_file_dir)
if not p.exists():
p.mkdir()
if p.is_dir():
filename = table_id_str
if "title" in table_config and table_config["title"]:
filename += "_" + table_config["title"]
df.to_csv(
p.joinpath(filename.replace(" ", "_") + ".csv"),
index=False,
)
def string_multiple_lines(source, width, max_rows):
"""
Adjust string with multiple lines by inserting '\n'
"""
idx = 0
lines = []
while idx < len(source) and len(lines) < max_rows:
lines.append(source[idx : idx + width])
idx += width
if idx < len(source):
last = lines[-1]
lines[-1] = last[0:-3] + "..."
return "\n".join(lines)
def convert_metric_id_to_panel_idx(metric_id):
# "4.02" -> 402
# "4.23" -> 423
# "4" -> 400
tokens = metric_id.split(".")
if len(tokens) == 1:
return int(tokens[0]) * 100
elif len(tokens) == 2:
return int(tokens[0]) * 100 + int(tokens[1])
else:
raise Exception(f"Invalid metric id: {metric_id}")
def build_comparable_columns(time_unit):
"""
Build comparable columns/headers for display
"""
comparable_columns = supported_field
top_stat_base = ["Count", "Sum", "Mean", "Median", "Standard Deviation"]
for h in top_stat_base:
comparable_columns.append(h + "(" + time_unit + ")")
return comparable_columns
+268
Просмотреть файл
@@ -0,0 +1,268 @@
"""
Main View Module
---------------
Contains the main view layout and organization for the application.
"""
from pathlib import Path
from textual import on, work
from textual.app import ComposeResult
from textual.containers import Horizontal, Vertical
from textual.reactive import reactive
from textual.widgets import DataTable
from rocprof_compute_tui.analysis_tui import tui_analysis
from rocprof_compute_tui.config import DEFAULT_START_PATH
from rocprof_compute_tui.utils.tui_utils import Logger, LogLevel
from rocprof_compute_tui.widgets.center_panel.center_area import CenterPanel
from rocprof_compute_tui.widgets.menu_bar.menu_bar import MenuBar
from rocprof_compute_tui.widgets.right_panel.right import RightPanel
from rocprof_compute_tui.widgets.tabs.tabs_area import TabsArea
from utils import file_io
class MainView(Horizontal):
"""Main view layout for the application."""
selected_path = reactive(None)
dfs = reactive({})
def __init__(self):
"""Initialize the main view."""
super().__init__(id="main-container")
self.start_path = (
# NOTE: is cwd the best choice?
Path.cwd()
if DEFAULT_START_PATH is None
else Path(DEFAULT_START_PATH)
)
self.logger = Logger()
self.logger.info("MainView initialized", update_ui=False)
def flush(self):
"""Required for stdout compatibility."""
pass
def compose(self) -> ComposeResult:
"""Compose the main view layout."""
self.logger.info("Composing main view layout", update_ui=False)
yield MenuBar()
# Center Container - Holds both analysis results and output tabs
with Horizontal(id="center-container"):
with Vertical(id="activity-container"):
# Center Panel - Analysis results display
center_panel = CenterPanel()
yield center_panel
self.center = center_panel
# Bottom Panel - Output, terminal, and metric description
tabs = TabsArea()
yield tabs
# Store references to text areas
self.metric_description = tabs.description_area
self.output = tabs.output_area
# Now set the output area for the logger
self.logger.set_output_area(self.output)
self.logger.info("Main view layout composed")
# Right Panel - Additional tools/features
yield RightPanel()
@on(DataTable.CellSelected)
def on_data_table_cell_selected(self, event: DataTable.CellSelected) -> None:
table = event.data_table
row_idx = event.coordinate.row
self.logger.info(f"Cell selected at row {row_idx}")
try:
row_data = table.get_row_at(row_idx)
content = f"Selected Row {row_idx}:\n"
content += "\n".join(f"{val}" for val in row_data)
self.metric_description.text = content
self.logger.info(f"Row {row_idx} data displayed in metric_description")
except Exception as e:
error_msg = f"Error displaying row {row_idx}: {str(e)}"
table.add_column("Error")
table.add_row(str(e))
self.metric_description.text = error_msg
self.logger.error(error_msg)
@work(thread=True)
def run_analysis(self) -> None:
if not self.selected_path:
error_msg = "No directory selected for analysis"
self._update_view(error_msg, LogLevel.ERROR)
self.logger.error(error_msg)
return
try:
self.logger.info(f"Starting analysis on: {self.selected_path}")
self._update_view(
f"Running analysis on: {self.selected_path}", LogLevel.SUCCESS
)
# Step 1: Create analyzer
try:
self.logger.info("Step 1: Creating analyzer")
self.logger.info(f"Step 1: args {self.app.args}")
self.logger.info(f"Step 1: arch {self.app.supported_archs}")
self.logger.info("Step 1: Creating analyzer")
analyzer = tui_analysis(
self.app.args, self.app.supported_archs, self.selected_path
)
self.logger.info("Step 1: Analyzer created successfully")
except Exception as e:
self.logger.error(f"Step 1 failed - Error creating analyzer: {str(e)}")
raise
# Step 2: Sanitize analyzer
try:
self.logger.info("Step 2: Sanitizing analyzer")
analyzer.sanitize()
self.logger.info("Step 2: Analyzer sanitized successfully")
except Exception as e:
self.logger.error(f"Step 2 failed - Error sanitizing analyzer: {str(e)}")
raise
# Step 3: Load sys_info
try:
self.logger.info("Step 3: Loading sys_info")
sysinfo_path = Path(self.selected_path).joinpath("sysinfo.csv")
self.logger.info(f"Step 3: sysinfo_path = {sysinfo_path}")
if not sysinfo_path.exists():
raise FileNotFoundError(f"sysinfo.csv not found at {sysinfo_path}")
sys_info_df = file_io.load_sys_info(sysinfo_path)
self.logger.info(f"Step 3: sys_info_df type = {type(sys_info_df)}")
self.logger.info(
f"Step 3: sys_info_df shape = {sys_info_df.shape if hasattr(sys_info_df, 'shape') else 'No shape attribute'}"
)
self.logger.info(f"Step 3: sys_info_df = {sys_info_df}")
except Exception as e:
self.logger.error(f"Step 3 failed - Error loading sys_info: {str(e)}")
raise
# Step 4: Convert sys_info to dict
try:
self.logger.info("Step 4: Converting sys_info to dict")
# Check if it's actually a DataFrame
if hasattr(sys_info_df, "iloc"):
sys_info = sys_info_df.iloc[0].to_dict()
elif hasattr(sys_info_df, "to_dict"):
# If it's already a Series
sys_info = sys_info_df.to_dict()
elif isinstance(sys_info_df, dict):
# If it's already a dict
sys_info = sys_info_df
else:
raise TypeError(f"Unexpected type for sys_info: {type(sys_info_df)}")
self.logger.info(f"Step 4: sys_info converted = {sys_info}")
self.logger.info(f"Step 4: sys_info type = {type(sys_info)}")
except Exception as e:
self.logger.error(f"Step 4 failed - Error converting sys_info: {str(e)}")
raise
# Step 5: Load SoC specs
try:
self.logger.info("Step 5: Loading SoC specs")
self.app.load_soc_specs(sys_info)
self.logger.info(f"Step 5: SoC loaded = {self.app.soc}")
except Exception as e:
self.logger.error(f"Step 5 failed - Error loading SoC specs: {str(e)}")
raise
# Step 6: Set SoC in analyzer
try:
self.logger.info("Step 6: Setting SoC in analyzer")
analyzer.set_soc(self.app.soc)
self.logger.info("Step 6: SoC set successfully")
except Exception as e:
self.logger.error(f"Step 6 failed - Error setting SoC: {str(e)}")
raise
# Step 7: Pre-processing
try:
self.logger.info("Step 7: Running pre-processing")
analyzer.pre_processing()
self.logger.info("Step 7: Pre-processing completed")
except Exception as e:
self.logger.error(f"Step 7 failed - Error in pre-processing: {str(e)}")
raise
# Step 8: Run analysis
try:
self.logger.info("Step 8: Running analysis")
self.dfs = analyzer.run_analysis()
if not self.dfs:
warning_msg = "Step 8: Analysis completed but no data was returned"
self._update_view(warning_msg, LogLevel.WARNING)
self.logger.warning(warning_msg)
else:
self.app.call_from_thread(self.refresh_results)
self.logger.info("Step 8: Analysis completed successfully")
except Exception as e:
self.logger.error(f"Step 8 failed - Error running analysis: {str(e)}")
raise
except Exception as e:
import traceback
error_msg = f"Unexpected error during analysis: {str(e)}"
self.logger.error(error_msg)
self.logger.error(f"Full traceback:\n{traceback.format_exc()}")
self._update_view(error_msg, LogLevel.ERROR)
def _update_view(self, message: str, log_level: LogLevel) -> None:
try:
# Use call_from_thread to safely update UI from background thread
self.app.call_from_thread(self._safe_update_view, message, log_level)
except Exception as e:
# Capture errors that might occur when scheduling the UI update
self.logger.error(f"View update scheduling error: {str(e)}")
def _safe_update_view(self, message: str, log_level: LogLevel) -> None:
try:
analyze_view = self.query_one("#analyze-view")
if analyze_view:
analyze_view.update_view(message, log_level)
else:
self.logger.warning("Analysis view not found when updating log")
except Exception as e:
self.logger.error(f"Log update error: {str(e)}")
def refresh_results(self) -> None:
try:
self.logger.info("Refreshing analysis results")
analyze_view = self.query_one("#analyze-view")
if not analyze_view:
self.logger.error("Analysis view not found")
return
if not hasattr(self, "dfs") or self.dfs is None:
self.logger.error("No analysis data available to display")
return
analyze_view.update_results(self.dfs)
self.logger.success(f"Results displayed successfully.")
except Exception as e:
self.logger.error(f"Error refreshing results: {str(e)}")
def refresh_view(self) -> None:
self.logger.info("Refreshing view...")
if self.dfs:
self.refresh_results()
else:
self.logger.warning("No data available for refresh")
+69
Просмотреть файл
@@ -0,0 +1,69 @@
"""
Panel Widget Modules
-------------------
Contains the panel widgets used in the main layout.
"""
from typing import Any, Dict
from textual.containers import ScrollableContainer
from textual.widgets import Label
from rocprof_compute_tui.widgets.collapsibles import build_all_sections
class AnalyzeView(ScrollableContainer):
"""Center panel with analysis results."""
def __init__(
self, config_path: str = "src/rocprof_compute_tui/utils/analyze_config.yaml"
):
super().__init__(id="analyze-view")
self.dfs = {}
self.config_path = config_path
def compose(self):
"""
Compose the initial center panel state.
"""
yield Label(
"Open a workload directory to run analysis and view results",
classes="placeholder",
)
def update_results(self, dfs: Dict[str, Any]) -> None:
"""
Update the center panel with analysis results.
"""
self.dfs = dfs
self.remove_children()
try:
sections = build_all_sections(self.dfs, self.config_path)
# Mount all sections
for section in sections:
self.mount(section)
except Exception as e:
self.mount(Label(f"Error displaying results: {str(e)}", classes="error"))
def update_view(self, message: str, log_level: str) -> None:
"""
Update the view with a status message.
"""
self.remove_children()
try:
self.mount(Label(f"{message}", classes=log_level))
except Exception as e:
self.mount(Label(f"Error displaying results: {str(e)}", classes="error"))
def reload_config(self, config_path: str = None) -> None:
"""
Reload the configuration and update the view.
"""
if config_path:
self.config_path = config_path
if self.dfs:
self.update_results(self.dfs)
+39
Просмотреть файл
@@ -0,0 +1,39 @@
"""
Panel Widget Modules
-------------------
Contains the panel widgets used in the main layout.
"""
from textual.containers import Vertical
from textual.widgets import Label, TabPane
from rocprof_compute_tui.widgets.center_panel.analyze_view import AnalyzeView
from rocprof_compute_tui.widgets.tabbed_content import TabsTabbedContent
class CenterPanel(Vertical):
"""
The response area.
"""
COMPONENT_CLASSES = {
"border-title-status",
}
def __init__(self):
super().__init__()
self.default_tab = "center-analyze"
self.analyze_view = AnalyzeView()
def compose(self):
with TabsTabbedContent(initial="tab-analyze"):
with TabPane("Analyze Results", id="tab-analyze"):
yield self.analyze_view
# TODO:
# with TabPane("placeholder (🚧)", id="tab-1"):
# yield Label("🚧 Under Construction")
def on_mount(self) -> None:
self.border_title = "CENTER TABS"
self.add_class("section")
+523
Просмотреть файл
@@ -0,0 +1,523 @@
##############################################################################bl
# MIT License
#
# Copyright (c) 2025 Advanced Micro Devices, Inc. All Rights Reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
##############################################################################el
from __future__ import annotations
import math
import sys
import traceback
from io import StringIO
import pandas as pd
import plotext as plt
import plotly.express as px
from textual.widgets import Static
from textual_plotext import PlotextPlot
from utils.mem_chart import plot_mem_chart
def simple_bar(df, title=None):
"""
Plot data with simple bar chart
"""
# TODO: handle None properly
if "Metric" in df.columns and "Avg" in df.columns:
metric_dict = (
pd.DataFrame([df["Metric"], df["Avg"]])
.replace("", 0)
.replace(float("inf"), -1) # It should not happen
.replace(float("-inf"), -1)
.transpose()
.set_index("Metric")
.to_dict()["Avg"]
)
else:
raise NameError(f"simple_bar: No Metric or Avg in df columns: {str(df.columns)}")
plt.clear_figure()
# adjust plot size along x axis based on the max value
w = max(list(metric_dict.values())) - 40
if w < 20 and w > 1:
w *= 3
elif w < 1:
w *= 100
plt.simple_bar(list(metric_dict.keys()), list(metric_dict.values()), width=w)
# plt.show()
return "\n" + plt.build() + "\n"
def simple_multiple_bar(df, title=None):
"""
Plot data with simple multiple bar chart
"""
# TODO: handle Nan and None properly
plt.clear_figure()
t_df = (
df.fillna(0).replace("", 0).replace(float("inf"), -1).replace(float("-inf"), -1)
)
sub_labels = t_df.transpose().to_dict("split")["index"]
sub_labels.pop(0)
data = t_df.transpose().to_dict("split")["data"]
labels = data.pop(0)
# plt.simple_multiple_bar(labels, data, labels = sub_labels) #, width=w)
# print(data)
plt.theme("pro")
# adjust plot size along y axis based on the max value
h = max(max(y) for y in data)
# print(h)
if h < 20 and h > 0.5:
h *= 10
elif h < 0.5 or math.isclose(h, 0.5):
h *= 300
plt.plot_size(height=h)
plt.multiple_bar(labels, data, color=["blue", "blue+", 68, 63])
# plt.show()
return "\n" + plt.build() + "\n"
def simple_box(df, orientation="v", title=None):
"""
Plot data with simple box/whisker chart.
Accept pre-calculated data only for now.
"""
# Example:
# labels = ["apple", "bee", "cat", "dog"]
# datas = [
# # max, q3, q2, q1, min
# [10, 7, 5, 3, 1.5],
# [19, 12.3, 9, 7, 4],
# [15, 14, 11, 9, 8],
# [13, 12, 11, 10, 6]]
# plt.box(labels, datas, width=0.1, hint='hint')
# plt.theme("pro")
# plt.title("Most Favored Pizzas in the World")
# plt.show()
plt.clear_figure()
labels = []
data = []
# TODO:
# handle Nan and None properly
# error checking for labels
# show unit if provided
labels_length = 0
t_df = (
df.fillna(0).replace("", 0).replace(float("inf"), -1).replace(float("-inf"), -1)
)
for index, row in t_df.iterrows():
column_name = row.get("Metric") or row.get("Channel")
if column_name is None:
raise KeyError("Neither 'Metric' nor 'Channel' column found")
labels.append(column_name)
# TODO: need better fix for horizontal overflow
labels_length += len(str(column_name)) + 8
data.append([row["Max"], row["Q3"], row["Median"], row["Q1"], row["Min"]])
# TODO: need better fix for horizontal overflow
# labels_length *= 0.80
# print("~~~~~~~~~~~~~~~~~~~~")
# print(labels)
# print(labels_length)
# print(data)
# print("~~~~~~~~~~~~~~~~~~~~")
# print(plt.bar.__doc__)
if orientation == "v":
# adjust plot size along x axis based on total labels length
plt.plot_size(labels_length, 30)
plt.box(
labels,
data,
width=0.1,
colors=["blue+", "orange+"],
orientation=orientation,
)
plt.theme("pro")
# plt.show()
return "\n" + plt.build() + "\n"
def px_simple_bar(df, title: str = None, id=None, style: dict = None, orientation="h"):
"""
Plot data with simple bar chart
"""
# TODO: handle None properly
if "Metric" in df.columns and ("Count" in df.columns or "Value" in df.columns):
detected_label = "Count" if "Count" in df.columns else "Value"
df[detected_label] = [
x.astype(int) if x != "" else int(0) for x in df[detected_label]
]
else:
raise NameError("simple_bar: No Metric or Count in df columns!")
# Assign figure characteristics
range_color = style.get("range_color", None)
label_txt = style.get("label_txt", None)
xrange = style.get("xrange", None)
if label_txt is not None:
label_txt = label_txt.strip("()")
try:
label_txt = label_txt.replace("+ $normUnit", df["Unit"][0])
except KeyError:
print("No units found in df. Auto labeling.")
# Overrides for figure chatacteristics
if id == 1701.1:
label_txt = "%"
range_color = [0, 100]
xrange = [0, 110]
if id == 1701.2:
label_txt = "Gb/s"
range_color = [0, 1638]
xrange = [0, 1638]
fig = px.bar(
df,
title=title,
x=detected_label,
y="Metric",
color=detected_label,
range_color=range_color,
labels={detected_label: label_txt},
orientation=orientation,
).update_xaxes(range=xrange)
return fig
def px_simple_multi_bar(df, title=None, id=None):
"""
Plot data with simple multiple bar chart
"""
# TODO: handle Nan and None properly
if "Metric" in df.columns and "Avg" in df.columns:
df["Avg"] = [x.astype(int) if x != "" else int(0) for x in df["Avg"]]
else:
raise NameError("simple_multi_bar: No Metric or Count in df columns!")
dfigs = []
nested_bar = {}
df_unit = df["Unit"][0]
if id == 1604:
nested_bar = {"NC": {}, "UC": {}, "RW": {}, "CC": {}}
for index, row in df.iterrows():
nested_bar[row["Coherency"]][row["Xfer"]] = row["Avg"]
if id == 1704:
nested_bar = {"Read": {}, "Write": {}}
for index, row in df.iterrows():
nested_bar[row["Transaction"]][row["Type"]] = row["Avg"]
for group, metric in nested_bar.items():
dfigs.append(
px.bar(
title=group,
x=metric.values(),
y=metric.keys(),
labels={"x": df_unit, "y": ""},
text=metric.values(),
)
.update_xaxes(showgrid=False, rangemode="nonnegative")
.update_yaxes(showgrid=False)
)
return dfigs
class MemoryChart(Static):
"""Memory chart visualization widget."""
DEFAULT_CSS = """
MemoryChart {
border: solid $accent;
padding: 0;
width: auto;
height: auto;
overflow-y: auto;
overflow-x: auto;
background: $surface;
color: $text;
}
"""
def __init__(self, df: pd.DataFrame, **kwargs):
"""Initialize the memory chart."""
super().__init__("", classes="mem-chart", **kwargs)
self.df = df
# Generate the chart content on initialization
try:
# Prepare data
metric_dict = (
self.df[["Metric", "Value"]].set_index("Metric").to_dict()["Value"]
)
# Capture stdout
original_stdout = sys.stdout
string_buffer = StringIO()
sys.stdout = string_buffer
try:
# Generate the chart
result = plot_mem_chart("", "per_kernel", metric_dict)
stdout_output = string_buffer.getvalue()
if stdout_output:
plot_str = stdout_output
elif result:
plot_str = str(result)
else:
plot_str = "No chart data generated"
finally:
sys.stdout = original_stdout
self.update(plot_str)
except Exception as e:
error_message = f"Memory chart error: {str(e)}\n{traceback.format_exc()}"
self.update(f"Error: {str(error_message)}")
class RooflinePlot(PlotextPlot):
"""
HACK: will be replaced with real roof line plot
Roofline plot visualization widget.
"""
DEFAULT_CSS = """
RooflinePlot {
padding: 1;
width: auto;
height: auto;
background: $surface;
color: $text;
border: solid $accent;
}
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.styles.height = "75%"
self.styles.width = "75%"
self.plot_initialized = False
self.plt.theme("pro")
def on_mount(self):
self.refresh_plot()
def refresh_plot(self):
# Get the current size of this widget
plot_width, plot_height = self.size
# Configure the plot size
self.plt.plot_size(plot_width, plot_height)
self.create_roofline()
self.plot_initialized = True
def on_resize(self):
if self.plot_initialized:
self.refresh_plot()
def create_roofline(self):
"""Generate the roofline plot data and visualization."""
# Roofline model parameters
peak_performance = 1000.0 # GFLOPS
memory_bandwidth = 100.0 # GB/s
# For memory-bound region (diagonal line)
x_mem = [0.1, 0.5, 1.0, 5.0, 10.0]
y_mem = [x * memory_bandwidth for x in x_mem]
# For compute-bound region (horizontal line)
x_comp = [10.0, 20.0, 50.0, 100.0]
y_comp = [peak_performance] * len(x_comp)
# Example workloads with safe values
workloads = [
(0.5, 45.0, "Workload A"), # Memory bound
(2.0, 180.0, "Workload B"), # Memory bound
(15.0, 950.0, "Workload C"), # Compute bound
(30.0, 980.0, "Workload D"), # Compute bound
]
# Clear the plot and set properties
self.plt.clear_figure()
self.plt.title("Roofline Model (🚧 Under Construction)")
self.plt.xlabel("Arithmetic Intensity (FLOPs/Byte)")
self.plt.ylabel("Performance (GFLOP/sec)")
# Plot memory-bound and compute-bound lines
self.plt.plot(x_mem, y_mem, label="Memory Bound")
self.plt.plot(x_comp, y_comp, label="Compute Bound")
# Add workload points
workload_x = [w[0] for w in workloads]
workload_y = [w[1] for w in workloads]
workload_names = [w[2] for w in workloads]
# Plot workload points one by one to avoid errors
for i in range(len(workload_x)):
self.plt.scatter([workload_x[i]], [workload_y[i]], label=workload_names[i])
# Set a reasonable view range
self.plt.xlim(0, 100)
self.plt.ylim(0, 1100)
# Draw the plot
self.refresh()
class SimpleBar(Static):
"""Simple Bar visualization widget."""
DEFAULT_CSS = """
SimpleBar {
padding: 0;
width: auto;
height: auto;
overflow-y: auto;
overflow-x: auto;
background: $surface;
color: $text;
}
"""
def __init__(self, df: pd.DataFrame, **kwargs):
"""Initialize the simple bar."""
super().__init__("", classes="simple-bar", **kwargs)
self.df = df
try:
result = simple_bar(self.df)
if result:
plot_str = str(result)
# Escape markup characters
escaped_content = plot_str.replace("[", r"\[").replace("]", r"\]")
self.update(escaped_content)
# Alternative - wrap in [pre] tags for preformatted text
# self.update(f"[pre]{plot_str}[/pre]")
else:
self.update("No simple bar data generated")
except Exception as e:
error_message = f"Simple Bar error: {str(e)}\n{traceback.format_exc()}"
escaped_error = error_message.replace("[", r"\[").replace("]", r"\]")
self.update(f"Error: {escaped_error}")
class SimpleBox(Static):
"""Simple Box visualization widget."""
DEFAULT_CSS = """
SimpleBox {
padding: 0;
width: auto;
height: auto;
overflow-y: auto;
overflow-x: auto;
background: $surface;
color: $text;
}
"""
def __init__(self, df: pd.DataFrame, **kwargs):
"""Initialize the simple box."""
super().__init__("", classes="simple-box", **kwargs)
self.df = df
try:
result = simple_box(self.df)
if result:
plot_str = str(result)
# Escape markup characters
escaped_content = plot_str.replace("[", r"\[").replace("]", r"\]")
self.update(escaped_content)
else:
self.update("No simple box data generated")
except Exception as e:
error_message = f"Simple Box error: {str(e)}\n{traceback.format_exc()}"
escaped_error = error_message.replace("[", r"\[").replace("]", r"\]")
self.update(f"Error: {escaped_error}")
class SimpleMultiBar(Static):
"""Simple Multiple Bar visualization widget."""
DEFAULT_CSS = """
SimpleMultiBar {
padding: 0;
width: auto;
height: auto;
overflow-y: auto;
overflow-x: auto;
background: $surface;
color: $text;
}
"""
def __init__(self, df: pd.DataFrame, **kwargs):
"""Initialize the simple multiple bar."""
super().__init__("", classes="simple-multi-bar", **kwargs)
self.df = df
try:
result = simple_multiple_bar(self.df)
if result:
plot_str = str(result)
# Escape markup characters
escaped_content = plot_str.replace("[", r"\[").replace("]", r"\]")
self.update(escaped_content)
else:
self.update("No simple multi bar data generated")
except Exception as e:
error_message = (
f"Simple Multiple Box error: {str(e)}\n{traceback.format_exc()}"
)
escaped_error = error_message.replace("[", r"\[").replace("]", r"\]")
self.update(f"Error: {escaped_error}")
+243
Просмотреть файл
@@ -0,0 +1,243 @@
from typing import Any, Dict, List, Optional
import pandas as pd
import yaml
from textual.containers import VerticalScroll
from textual.widgets import Collapsible, DataTable, Label
from rocprof_compute_tui.widgets.charts import (
MemoryChart,
RooflinePlot,
SimpleBar,
SimpleBox,
SimpleMultiBar,
)
def create_table(df: pd.DataFrame) -> DataTable:
table = DataTable(zebra_stripes=True)
# Clean the DataFrame - remove NaN and empty cells
df = df.reset_index()
df = df.dropna(how="any")
df = df[~df.apply(lambda row: row.astype(str).str.strip().eq("").any(), axis=1)]
# Add columns and rows
str_columns = [str(col) for col in df.columns]
table.add_columns(*str_columns)
table.add_rows([tuple(str(x) for x in row) for row in df.itertuples(index=False)])
return table
def load_config(config_path) -> Dict[str, Any]:
try:
with open(config_path, "r") as file:
return yaml.safe_load(file)
except FileNotFoundError:
raise FileNotFoundError(f"Configuration file {config_path} not found")
except yaml.YAMLError as e:
raise ValueError(f"Error parsing YAML configuration: {e}")
def get_data_from_path(dfs: Dict[str, Any], path: List[str]) -> Optional[pd.DataFrame]:
try:
current = dfs
for key in path:
current = current[key]
return current["df"]
except (KeyError, TypeError):
return None
def get_tui_style_from_path(dfs: Dict[str, Any], path: List[str]) -> Optional[str]:
try:
current = dfs
for key in path:
current = current[key]
return current.get("tui_style")
except (KeyError, TypeError):
return None
def create_widget_from_data(df: pd.DataFrame, tui_style: Optional[str] = None) -> Any:
if df is not None and not df.empty:
match tui_style:
case None:
return create_table(df)
case "mem_chart":
return MemoryChart(df)
case "roofline":
# TODO: implement real roofline plot
pass
case "simple_bar":
return SimpleBar(df)
case "simple_box":
return SimpleBox(df)
case "simple_multiple_bar":
return SimpleMultiBar(df)
case _:
return Label(f"Unknown display type: {tui_style}")
else:
return Label(f"Data not available for display in {tui_style}.")
def build_subsection(
subsection_config: Dict[str, Any], dfs: Dict[str, Any]
) -> Collapsible:
title = subsection_config["title"]
collapsed = subsection_config.get("collapsed", True)
tui_style = subsection_config.get("tui_style")
# Handle data-driven widgets
if "data_path" in subsection_config:
data_path = subsection_config["data_path"]
if tui_style is None:
tui_style = (
get_tui_style_from_path(dfs, data_path) if dfs is not None else None
)
df = get_data_from_path(dfs, data_path)
if df is None and tui_style is None:
error_msg = (
f"{title} data not available: Path {' -> '.join(data_path)} not found"
)
return Collapsible(
Label(error_msg, classes="warning"), title=title, collapsed=collapsed
)
# Create main widget
widget = create_widget_from_data(df, tui_style)
# Add header label if specified
widgets = []
if "header_label" in subsection_config:
header_class = subsection_config.get("header_class", "")
widgets.append(Label(subsection_config["header_label"], classes=header_class))
widgets.append(widget)
collapsible = Collapsible(*widgets, title=title, collapsed=collapsed)
# HACK: only because no real roofline data right now
elif tui_style == "roofline":
widget = VerticalScroll(RooflinePlot())
collapsible = Collapsible(widget, title=title, collapsed=collapsed)
# Fallback for subsections without data or style
else:
collapsible = Collapsible(
Label(f"No data or style configuration for {title}"),
title=title,
collapsed=collapsed,
)
# Add ID if specified
if "widget_id" in subsection_config:
collapsible.id = subsection_config["widget_id"]
return collapsible
def build_dynamic_kernel_sections(
dfs: Dict[str, Any], skip_sections: List[str]
) -> List[Collapsible]:
children = []
try:
for section_name, subsections in dfs.items():
if section_name in skip_sections:
continue
kernel_children = []
for subsection_name, data in subsections.items():
if isinstance(data, dict) and "df" in data:
df = data["df"]
tui_style = data.get("tui_style")
widget = create_widget_from_data(df, tui_style)
kernel_children.append(
Collapsible(widget, title=subsection_name, collapsed=True)
)
if kernel_children:
children.append(
Collapsible(*kernel_children, title=section_name, collapsed=True)
)
except Exception as e:
children.append(Label(f"Error in Kernel Section: {str(e)}", classes="error"))
return children
def build_section_from_config(
section_config: Dict[str, Any], dfs: Dict[str, Any]
) -> Collapsible:
title = section_config["title"]
collapsed = section_config.get("collapsed", True)
css_class = section_config.get("class")
# Handle under construction sections
if section_config.get("under_construction", False):
construction_label = section_config.get(
"construction_label", "Under Construction"
)
construction_class = section_config.get("construction_class", "")
children = [Label(construction_label, classes=construction_class)]
# Handle dynamic sections (like kernel sections)
elif section_config.get("dynamic_sections", False):
skip_sections = section_config.get("skip_sections", [])
children = build_dynamic_kernel_sections(dfs, skip_sections)
# Handle regular sections with subsections
elif "subsections" in section_config:
children = []
for subsection_config in section_config["subsections"]:
try:
subsection = build_subsection(subsection_config, dfs)
children.append(subsection)
except Exception as e:
error_msg = f"{subsection_config.get('title', 'Unknown')} error: {str(e)}"
children.append(Label(error_msg, classes="warning"))
else:
children = [Label("No configuration provided for this section")]
# Create the main collapsible
collapsible = Collapsible(*children, title=title, collapsed=collapsed)
# Add CSS class if specified
if css_class:
collapsible.add_class(css_class)
return collapsible
def build_all_sections(dfs: Dict[str, Any], config_path) -> List[Collapsible]:
config = load_config(config_path)
sections = []
for section_config in config["sections"]:
try:
section = build_section_from_config(section_config, dfs)
sections.append(section)
except Exception as e:
# Create error section if something goes wrong
error_title = section_config.get("title", "Unknown Section")
error_section = Collapsible(
Label(f"Error building section: {str(e)}", classes="error"),
title=f"{error_title}",
collapsed=True,
)
sections.append(error_section)
return sections
+15
Просмотреть файл
@@ -0,0 +1,15 @@
"""
Specialized Widget Modules
-------------------------
Contains custom widget implementations for the application.
"""
from textual.widgets import DirectoryTree
class FolderOnlyDirectory(DirectoryTree):
"""Directory tree that only shows folders."""
def filter_paths(self, paths):
"""Filter to only show directories."""
return [path for path in paths if path.is_dir()]
+87
Просмотреть файл
@@ -0,0 +1,87 @@
from textual import on, work
from textual.app import ComposeResult
from textual.containers import Container, Horizontal
from textual.reactive import reactive
from textual.widgets import Button
from rocprof_compute_tui.widgets.recent_directories import RecentDirectoriesScreen
class DropdownMenu(Container):
"""A dropdown menu that appears when a menu button is clicked."""
def compose(self) -> ComposeResult:
"""Compose the dropdown menu with menu items."""
yield Button("Open Workload", id="menu-open-workload", classes="menu-item")
yield Button("Open Recent", id="menu-open-recent", classes="menu-item")
# TODO:
# yield Button("Attach", id="menu-attach", classes="menu-item")
yield Button("Exit", id="menu-exit", classes="menu-item")
def on_mount(self) -> None:
"""Hide the dropdown menu when it's first mounted."""
self.add_class("hidden")
class MenuButton(Button):
"""A button that toggles a dropdown menu when clicked."""
is_open = reactive(False)
def __init__(self, label: str, menu_id: str, *args, **kwargs):
super().__init__(label, *args, **kwargs)
self.menu_id = menu_id
def on_click(self) -> None:
"""Toggle the dropdown menu when clicked."""
self.is_open = not self.is_open
dropdown = self.app.query_one(f"#{self.menu_id}", DropdownMenu)
if self.is_open:
dropdown.remove_class("hidden")
else:
dropdown.add_class("hidden")
class MenuBar(Container):
"""A menu bar that spans the width of the app."""
def compose(self) -> ComposeResult:
"""Compose the menu bar with menu buttons and dropdown menus."""
yield Horizontal(
MenuButton("File", "file-dropdown", id="menu-file"),
# TODO:
# Button("Help (🚧)", id="menu-placeholder"),
id="menu-buttons",
)
# Create a container for the dropdown menus
with Container(id="dropdown-container"):
yield DropdownMenu(id="file-dropdown")
yield DropdownMenu(id="placeholder-dropdown")
def on_mount(self) -> None:
self.border_title = "MENU BAR"
self.add_class("section")
self.parent_main_view = self.screen.query_one("#main-container", Horizontal)
@on(Button.Pressed, "#menu-open-recent")
def show_recent(self) -> None:
if not self.app.recent_dirs:
self.notify("No recent directories found", severity="warning")
return
def on_recent_selected(selected_dir: str) -> None:
if selected_dir:
self.parent_main_view.selected_path = selected_dir
dropdown = self.query_one(f"#file-dropdown", DropdownMenu)
dropdown.add_class("hidden")
self.parent_main_view.run_analysis()
self.app.push_screen(
RecentDirectoriesScreen(self.app.recent_dirs), on_recent_selected
)
@on(Button.Pressed, "#menu-exit")
def exit_app(self) -> None:
self.app.exit()
+40
Просмотреть файл
@@ -0,0 +1,40 @@
from typing import List
from textual.app import ComposeResult
from textual.containers import Container, Horizontal
from textual.screen import ModalScreen
from textual.widgets import Button, Label, ListItem, ListView
class RecentDirectoriesScreen(ModalScreen):
"""Modal screen to display recent directories."""
def __init__(self, recent_dirs: List[str]) -> None:
super().__init__()
self.recent_dirs = recent_dirs
def compose(self) -> ComposeResult:
with Container(id="recent-modal"):
yield Label("Recent Directories", id="recent-title")
if self.recent_dirs:
with ListView(id="recent-list"):
for directory in self.recent_dirs:
yield ListItem(Label(directory))
else:
yield Label("No recent directories found", id="no-recent")
with Horizontal(id="recent-buttons"):
yield Button("Select", variant="primary", id="select-recent")
yield Button("Close", variant="default", id="close-recent")
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "close-recent":
self.dismiss()
elif event.button.id == "select-recent":
list_view = self.query_one("#recent-list", ListView)
if list_view.highlighted_child:
selected_dir = self.recent_dirs[list_view.index or 0]
self.dismiss(selected_dir)
def on_list_view_selected(self, event: ListView.Selected) -> None:
selected_dir = self.recent_dirs[event.list_view.index or 0]
self.dismiss(selected_dir)
+24
Просмотреть файл
@@ -0,0 +1,24 @@
"""
Panel Widget Modules
-------------------
Contains the panel widgets used in the main layout.
"""
from textual.containers import Vertical
from textual.widgets import Label
class RightPanel(Vertical):
"""Right panel for additional tools."""
def __init__(self):
"""Initialize the right panel."""
super().__init__()
def compose(self):
"""Compose the right panel."""
yield Label("🚧 Under Construction")
def _on_mount(self):
self.border_title = "🚧 UNDER CONSTRUCTION"
self.add_class("section")
+48
Просмотреть файл
@@ -0,0 +1,48 @@
"""
Specialized Widget Modules
-------------------------
Contains custom widget implementations for the application.
"""
import pandas as pd
from textual.events import MouseDown, MouseMove, MouseUp
from textual.widgets import Static
class HorizontalSplitter(Static):
"""A draggable horizontal splitter between panels."""
def __init__(self, **kwargs):
"""Initialize the horizontal splitter."""
super().__init__("", id="splitter", **kwargs)
self.dragging = False
def on_mouse_down(self, event: MouseDown) -> None:
"""Handle mouse down events."""
self.dragging = True
self.capture_mouse()
def on_mouse_up(self, event: MouseUp) -> None:
"""Handle mouse up events."""
self.dragging = False
self.release_mouse()
def on_mouse_move(self, event: MouseMove) -> None:
"""Handle mouse move events for dragging."""
if not self.dragging:
return
app = self.app
top = app.query_one("#center-panel")
bottom = app.query_one("#bottom-panel")
new_top_height = event.screen_y
total_height = app.size.height
new_bottom_height = total_height - new_top_height - 1
# Ensure minimum height
if new_top_height < 3 or new_bottom_height < 3:
return
top.styles.height = new_top_height
bottom.styles.height = new_bottom_height
+21
Просмотреть файл
@@ -0,0 +1,21 @@
from textual.binding import Binding
from textual.widgets import TabbedContent, Tabs
class TabsTabbedContent(TabbedContent):
BINDINGS = [
Binding("l", "next_tab", "Next tab", show=False),
Binding("h", "previous_tab", "Previous tab", show=False),
Binding("down,j", "app.focus_next", "Focus next", show=False),
Binding("up,k", "app.focus_previous", "Focus previous", show=False),
]
def action_next_tab(self) -> None:
tabs = self.query_one(Tabs)
if tabs.has_focus:
tabs.action_next_tab()
def action_previous_tab(self) -> None:
tabs = self.query_one(Tabs)
if tabs.has_focus:
tabs.action_previous_tab()
+48
Просмотреть файл
@@ -0,0 +1,48 @@
"""
Panel Widget Modules
-------------------
Contains the panel widgets used in the main layout.
"""
from textual.containers import Vertical
from textual.widgets import TabPane, TextArea
from rocprof_compute_tui.widgets.tabbed_content import TabsTabbedContent
from rocprof_compute_tui.widgets.tabs.tabs_terminal import Terimnal
class TabsArea(Vertical):
"""
The response area.
"""
COMPONENT_CLASSES = {
"border-title-status",
}
def __init__(self):
"""Initialize the bottom panel."""
super().__init__()
# Create text areas as instance attributes
self.description_area = TextArea(id="description-text", read_only=True)
self.output_area = TextArea(id="output-text", read_only=True)
# Set initial tab
self.default_tab = "tab-output"
def compose(self):
with TabsTabbedContent(initial="tab-output"):
with TabPane("METRIC DESCRIPTION", id="tab-description"):
yield (self.description_area)
with TabPane("OUTPUT", id="tab-output"):
yield (self.output_area)
with TabPane("TERMINAL", id="tab-terminal"):
yield Terimnal()
def on_mount(self) -> None:
self.border_title = "TABS"
self.add_class("section")
+199
Просмотреть файл
@@ -0,0 +1,199 @@
import os
import platform
import subprocess
from typing import Optional
from rich.text import Text
from textual import events
from textual.app import ComposeResult
from textual.containers import Container, Vertical, VerticalScroll
from textual.widgets import Input, Static
class Terimnal(Container):
def __init__(
self,
name: Optional[str] = None,
id: Optional[str] = None,
classes: Optional[str] = None,
):
super().__init__(name=name, id=id, classes=classes)
self.current_directory = os.getcwd()
self.output_text = ""
self.input_text = ""
self.input_prompt = ""
self.has_focus = True
# Command history
self.command_history = []
self.history_index = -1
self.current_command = ""
# Tab completion
self.tab_completions = []
self.tab_index = -1
self.tab_prefix = ""
def compose(self) -> ComposeResult:
# Output area with scroll wrapper
with Vertical():
with VerticalScroll(id="term-output-scroll"):
yield Static("", id="terminal-output")
yield Input(id="terminal-input", placeholder="")
def on_mount(self) -> None:
"""Initialize the terminal."""
# Update status
self.add_output(
f"Support quick/simple terminal commands.\ncwd: {self.current_directory}\n"
)
# Update the prompt
self.update_prompt()
# self.query_one("#terminal-input").focus()
def update_prompt(self) -> None:
"""Update the command prompt in the input field."""
input_widget = self.query_one("#terminal-input")
current_path = os.path.basename(self.current_directory) or self.current_directory
if platform.system() != "Windows":
prompt = f"{current_path} $ "
else:
prompt = f"{current_path}> "
input_widget.placeholder = prompt
def add_output(self, text: str) -> None:
"""Add text to the terminal output."""
self.output_text += text
output = self.query_one("#terminal-output")
output.update(Text.from_ansi(self.output_text))
# Ensure scroll to bottom
scroll = self.query_one("#term-output-scroll")
scroll.scroll_end(animate=False)
def action_clear(self) -> None:
"""Clear the terminal output."""
self.output_text = ""
output = self.query_one("#terminal-output")
output.update(Text.from_ansi(""))
def action_interrupt(self) -> None:
"""Interrupt the current process if any."""
if self.current_process is not None:
try:
if platform.system() == "Windows":
import signal
self.current_process.send_signal(signal.CTRL_C_EVENT)
else:
self.current_process.terminate()
self.add_output("\n^C\n")
except Exception as e:
self.add_output(f"\nFailed to interrupt process: {str(e)}\n")
finally:
self.current_process = None
else:
# If no process is running, just show ^C and clear the input
self.add_output("\n^C\n")
self.query_one("#terminal-input").value = ""
def run_command(self, command: str) -> None:
"""Run a system command and display its output."""
# Add command to history
if command.strip() and (
not self.command_history or command != self.command_history[-1]
):
self.command_history.append(command)
self.history_index = len(self.command_history)
# Show the command in the output
prompt = self.query_one("#terminal-input").placeholder
self.add_output(f"{prompt}{command}\n")
if not command.strip():
return
# Handle built-in commands
if command == "clear" or command == "cls":
self.action_clear()
return
elif command == "exit" or command == "quit":
if hasattr(self.app, "exit"):
self.app.exit()
return
elif command.startswith("cd "):
try:
path = command[3:].strip()
if not path:
# Just "cd" usually goes to home directory
path = os.path.expanduser("~")
# Handle relative paths
if not os.path.isabs(path):
path = os.path.join(self.current_directory, path)
# Change to the new directory
os.chdir(path)
self.current_directory = os.getcwd()
self.add_output(f"Changed directory to {self.current_directory}\n")
self.update_prompt()
except Exception as e:
self.add_output(f"Error: {str(e)}\n")
return
# Execute the command with shell=True to support pipes and redirections
try:
self.current_process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=True,
cwd=self.current_directory,
text=True,
)
stdout, stderr = self.current_process.communicate()
if stdout:
self.add_output(stdout)
if stderr:
self.add_output(f"{stderr}")
self.current_process = None
except Exception as e:
self.add_output(f"Error executing command: {str(e)}\n")
def on_input_submitted(self, event: Input.Submitted) -> None:
"""Handle input submission."""
if event.input.id == "terminal-input":
command = event.input.value
event.input.value = ""
self.run_command(command)
event.input.focus()
def on_key(self, event: events.Key) -> None:
"""Handle key events for history navigation."""
# Handle arrow keys for command history
if event.key == "up" and self.command_history:
input_widget = self.query_one("#terminal-input")
if self.history_index > 0:
self.history_index -= 1
input_widget.value = self.command_history[self.history_index]
input_widget.cursor_position = len(input_widget.value)
event.prevent_default()
elif event.key == "down" and self.command_history:
input_widget = self.query_one("#terminal-input")
if self.history_index < len(self.command_history) - 1:
self.history_index += 1
input_widget.value = self.command_history[self.history_index]
else:
self.history_index = len(self.command_history)
input_widget.value = ""
input_widget.cursor_position = len(input_widget.value)
event.prevent_default()