[rocpd] Adding summary module to generate summaries from rocpd database + query submodule + rocpd command-line tools (#488)

* adding summary.py to generate tmp <category_region>_summary views

* migrating CSV summary to SDK method of writing CSVs

  - Add domain_view to summary.py
  - omit the C++ code of writing CSV because it gets revered later anyway

* Add summary subparser and write_sql_view_to_csv function

* adding all <>_summary views generation to summary.py

* add summary_per_rank feature

* add --summary-per-rank

* reconstruct generate_summary_view and create_domain_view

-introduce by_rank

* remove sqr and variance in summary views

* use RocpdImportData instead of connection

* two fixes on summary.py

--modify the generate_summary_view function to return a tuple with view name and sql code

add if_not_exits parameter to generete_summary_view

* Refactor summary.py to allow output path and filename args, and apply time_window
- clean up summary table column headers
- only generate by-rank views if that param is specified

* Add ProcessID to Hostname output and csv, so users can identify the system in the by-rank summaries

* Summary.py, just add hostname to by-rank summaries, instead of creating mapping table

* Summary - migrate csv writer to pandas, for more future flexibility

* Adding a few simple tests for summary.py

* Linting fixes

* add region_categories to summary options

  -  Automatically retrieve region categories from the database if argument is None

* add backticks for view_names

* fix tests after rebase

* Made code review changes
- fixed whitespace in CMakelists.txt
- adding query.py module & subparser in __main__.py
- refactor summary function to return query
- used query.py to output csv
- used query.py to also output summary to console
- provided new command line options to select summary output to csv or console

* Made fix to jinja template in query.py, as suggested by copilot

* Consolidated output calls to query in export_view function based on feedback
- refactored: helpers, query functions, create view functions
- extended formats to include what query supports (md, html, pdf, json)
- added json format to query, and changed orient=records
- adding jinja2 and reportlab to requirements.txt

* Add version_info for rocpd and roctx

* Add rocpd commandline tool

* Add executable permissions to source/bin/rocpd.py

* Removed rocpd2query, and cleaned up --help examples

---------

Co-authored-by: acanadas <acanadas@amd.com>
Co-authored-by: Jin Tao <jintao12@amd.com>
Co-authored-by: a-canadasruiz <Araceli.CanadasRuiz@amd.com>
Co-authored-by: Jonathan R. Madsen <Jonathan.Madsen@amd.com>

[ROCm/rocprofiler-sdk commit: 3954cedd25]
This commit is contained in:
Hui, Young
2025-07-24 17:12:06 -04:00
committed by GitHub
parent 35d491159f
commit 68ae6cf65f
13 changed files with 1480 additions and 277 deletions
@@ -5,6 +5,7 @@ cmake>=3.21.0
cmake-format
dataclasses
flake8
jinja2
numpy
otf2
pandas
@@ -12,3 +13,4 @@ perfetto
pycobertura
pytest
pyyaml
reportlab
@@ -25,3 +25,39 @@ install(
PERMISSIONS OWNER_READ OWNER_WRITE OWNER_EXECUTE GROUP_READ GROUP_EXECUTE WORLD_READ
WORLD_EXECUTE
COMPONENT tools)
# for each entry here there must be a ROCPD_BIN_<entry> list
set(ROCPD_EXECUTABLES "all" "csv" "otf2" "pftrace" "summary")
# format is list: [<exe-name>, <python-module>, <extra-args-to-python-module>]
set(ROCPD_BIN_all "rocpd" "rocpd" "")
set(ROCPD_BIN_csv "rocpd2csv" "rocpd.csv" "")
set(ROCPD_BIN_otf2 "rocpd2otf2" "rocpd.otf2" "")
set(ROCPD_BIN_pftrace "rocpd2pftrace" "rocpd.pftrace" "")
set(ROCPD_BIN_summary "rocpd2summary" "rocpd.summary" "")
foreach(_EXE IN LISTS ROCPD_EXECUTABLES)
list(GET ROCPD_BIN_${_EXE} 0 ROCPD_EXE_NAME)
list(GET ROCPD_BIN_${_EXE} 1 ROCPD_EXE_MODULE)
list(GET ROCPD_BIN_${_EXE} 2 ROCPD_EXE_MODULE_ARGS)
if(NOT ROCPD_EXE_NAME
OR NOT ROCPD_EXE_MODULE
OR (NOT ROCPD_EXE_MODULE_ARGS AND NOT ROCPD_EXE_MODULE_ARGS STREQUAL ""))
message(
FATAL_ERROR "ROCPD_BIN_${_EXE} not properly defined: ${ROCPD_BIN_${_EXE}}")
endif()
set(ROCPD_EXE_OUTPUT_FILE
${PROJECT_BINARY_DIR}/${CMAKE_INSTALL_BINDIR}/${ROCPD_EXE_NAME})
# Adding main rocpd
configure_file(rocpd.py ${ROCPD_EXE_OUTPUT_FILE} @ONLY)
install(
FILES ${ROCPD_EXE_OUTPUT_FILE}
DESTINATION ${CMAKE_INSTALL_BINDIR}
PERMISSIONS OWNER_READ OWNER_WRITE OWNER_EXECUTE GROUP_READ GROUP_EXECUTE
WORLD_READ WORLD_EXECUTE
COMPONENT rocpd)
endforeach()
+74
View File
@@ -0,0 +1,74 @@
#!/usr/bin/env python3
# MIT License
#
# Copyright (c) 2024-2025 Advanced Micro Devices, Inc. All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import os
import sys
"""
Simple Python executable script for invoking `python3 -m @ROCPD_EXE_MODULE@`
"""
def main(argv=sys.argv[1:], environ=dict(os.environ)):
"""
Executes {sys.executable} -m @ROCPD_EXE_MODULE@ @ROCPD_EXE_MODULE_ARGS@
"""
ROCPD_SUPPORTED_PYTHON_VERSIONS = [
".".join(itr.split(".")[:2]) for itr in "@ROCPROFILER_PYTHON_VERSIONS@".split(";")
]
ROCPD_MODULE_ARGS = [f"{itr}" for itr in "@ROCPD_EXE_MODULE_ARGS@".split(" ") if itr]
this_dir = os.path.dirname(os.path.realpath(__file__))
this_python_ver = f"{sys.version_info.major}.{sys.version_info.minor}"
if this_python_ver not in ROCPD_SUPPORTED_PYTHON_VERSIONS:
raise ImportError(
"@ROCPD_EXE_NAME@ not supported for Python version {} (sys.executable='{}').\n@ROCPD_EXE_NAME@ supported python versions: {}".format(
this_python_ver,
sys.executable,
", ".join(ROCPD_SUPPORTED_PYTHON_VERSIONS),
)
)
module_path = os.path.join(
this_dir,
"..",
"@CMAKE_INSTALL_LIBDIR@",
f"python{this_python_ver}",
"site-packages",
)
python_path = [module_path] + os.environ.get("PYTHONPATH", "").split(":")
# update PYTHONPATH environment variable
environ["PYTHONPATH"] = ":".join(python_path)
args = [f"{sys.executable}", "-m", "@ROCPD_EXE_MODULE@"] + ROCPD_MODULE_ARGS + argv
# does not return
os.execvpe(args[0], args, env=environ)
if __name__ == "__main__":
main()
@@ -46,8 +46,24 @@ __all__ = [
"write_csv",
"write_otf2",
"RocpdImportData",
"version_info",
]
version_info = {
"version": "@PROJECT_VERSION@",
"major": int("@PROJECT_VERSION_MAJOR@"),
"minor": int("@PROJECT_VERSION_MINOR@"),
"patch": int("@PROJECT_VERSION_PATCH@"),
"git_revision": "@ROCPROFILER_SDK_GIT_REVISION@",
"library_arch": "@CMAKE_LIBRARY_ARCHITECTURE@",
"system_name": "@CMAKE_SYSTEM_NAME@",
"system_processor": "@CMAKE_SYSTEM_PROCESSOR@",
"system_version": "@CMAKE_SYSTEM_VERSION@",
"compiler_id": "@CMAKE_CXX_COMPILER_ID@",
"compiler_version": "@CMAKE_CXX_COMPILER_VERSION@",
"rocm_version": "@rocm_version_FULL_VERSION@",
}
def format_path(path, tag=os.path.basename(sys.executable)):
return libpyrocpd.format_path(path, tag)
@@ -38,11 +38,14 @@ def main(argv=None, config=None):
"""
import argparse
from . import time_window
from . import output_config
from . import pftrace
from . import csv
from . import otf2
from . import output_config
from . import pftrace
from . import query
from . import summary
from . import time_window
from . import version_info
from .importer import RocpdImportData
convert_examples = """
@@ -50,25 +53,62 @@ def main(argv=None, config=None):
Example usage:
Convert 1 database, output perfetto trace
$ python3 -m rocpd convert -i db1.db --output-format pftrace
$ rocpd convert -i db1.db --output-format pftrace
Convert 2 databases, output perfetto trace to path and filename, reduce time window to omit the first 30%
$ python3 -m rocpd convert -i db1.db db2.db --output-format pftrace -d "./output/" -o "twoFileTraces" --start 30% --end 100%
$ rocpd convert -i db1.db db2.db --output-format pftrace -d "./output/" -o "twoFileTraces" --start 30% --end 100%
Convert 6 databases, output CSV and perfetto trace formats
$ python3 -m rocpd convert -i db{0..5}.db --output-format csv pftrace -d "~/output_folder/" -o "sixFileTraces"
$ rocpd convert -i db{0..5}.db --output-format csv pftrace -d "~/output_folder/" -o "sixFileTraces"
Convert 2 databases, output CSV, OTF2, and perfetto trace formats
$ python3 -m rocpd convert -i db{3,4}.db --output-format csv otf2 pftrace
$ rocpd convert -i db{3,4}.db --output-format csv otf2 pftrace
"""
query_examples = """
Example usage:
Query the first 5 rows of the 'rocpd_info_agents' view and output to console
$ rocpd query -i db0.db --query "SELECT * FROM rocpd_info_agents LIMIT 5"
Combine 4 databases and query the first 10 rows of the 'top_kernels' view and output to CSV file
$ rocpd query -i db{0..3}.db --query "SELECT * FROM top_kernels LIMIT 10" --format csv
"""
summary_examples = """
Example usage:
Output all summaries to console and include domain summary for 1 database
$ rocpd summary -i db1.db --domain-summary
Aggregate 3 databases and output all summary files and include summary by rank/process ID, to csv file output
$ rocpd summary -i db{1..3}.db --summary-by-rank --format csv
Output all summaries to console and exlude all regions to save processing time
$ rocpd summary -i db0.db --region-categories NONE
Aggregate 2 databases and output all summary files to HTML, only include HIP and MARKER regions, include domain summary
$ rocpd summary -i db{0,1}.db --region-categories HIP MARKERS --domain-summary --format html
"""
# Add the subparsers
parser = argparse.ArgumentParser(
prog="rocpd",
description="Aggregate and/or analyze ROCm Profiling Data (rocpd)",
allow_abbrev=False,
)
parser.add_argument(
"-v",
"--version",
action="store_true",
help="Print the version information and exit",
)
subparsers = parser.add_subparsers(dest="command")
converter = subparsers.add_parser(
"convert",
@@ -78,20 +118,36 @@ Example usage:
epilog=convert_examples,
)
query_reporter = subparsers.add_parser(
"query",
description="Generate output on a query",
allow_abbrev=False,
formatter_class=argparse.RawTextHelpFormatter,
epilog=query_examples,
)
generate_summary = subparsers.add_parser(
"summary",
description="Generate summary views from rocPD data",
allow_abbrev=False,
formatter_class=argparse.RawTextHelpFormatter,
epilog=summary_examples,
)
def get_output_type(val):
return val.lower().replace("perfetto", "pftrace")
required_params = converter.add_argument_group("Required arguments")
required_params.add_argument(
# add required options for each subparser
converter_required_params = converter.add_argument_group("Required options")
converter_required_params.add_argument(
"-i",
"--input",
required=True,
type=output_config.check_file_exists,
nargs="+",
help="Input path and filename to one or more database(s), separated by spaces",
help="Input path and filename to one or more database(s)",
)
required_params.add_argument(
converter_required_params.add_argument(
"-f",
"--output-format",
help="For adding output format (supported formats: csv, pftrace, otf2)",
@@ -102,7 +158,27 @@ Example usage:
required=True,
)
# add args from any sub-modules
query_required_params = query_reporter.add_argument_group("Required options")
query_required_params.add_argument(
"-i",
"--input",
required=True,
type=output_config.check_file_exists,
nargs="+",
help="Input path and filename to one or more database(s)",
)
summary_required_params = generate_summary.add_argument_group("Required options")
summary_required_params.add_argument(
"-i",
"--input",
required=True,
type=output_config.check_file_exists,
nargs="+",
help="Input path and filename to one or more database(s)",
)
# converter: add args from any sub-modules
valid_out_config_args = output_config.add_args(converter)
valid_generic_args = output_config.add_generic_args(converter)
valid_pftrace_args = pftrace.add_args(converter)
@@ -110,51 +186,110 @@ Example usage:
valid_otf2_args = otf2.add_args(converter)
valid_time_window_args = time_window.add_args(converter)
# query: subparser args
valid_out_config_args = output_config.add_args(query_reporter)
valid_query_args = query.add_args(query_reporter)
valid_time_window_args = time_window.add_args(query_reporter)
# summary: subparser args
valid_io_args = summary.add_io_args(generate_summary)
valid_summary_args = summary.add_args(generate_summary)
valid_time_window_args = time_window.add_args(generate_summary)
# parse the command line arguments
args = parser.parse_args(argv)
# process the args
out_cfg_args = output_config.process_args(args, valid_out_config_args)
generic_out_cfg_args = output_config.process_generic_args(args, valid_generic_args)
pftrace_args = pftrace.process_args(args, valid_pftrace_args)
csv_args = csv.process_args(args, valid_csv_args)
otf2_args = otf2.process_args(args, valid_otf2_args)
window_args = time_window.process_args(args, valid_time_window_args)
if args.version:
for key, itr in version_info.items():
if key in ["major", "minor", "patch"]:
continue
print(f" {key:>16}: {itr}")
return 0
# now start processing the data. Import the data and merge the views
importData = RocpdImportData(args.input)
# error check the command line arguments, if no subparser command is given, print the help message
if args.command is None:
parser.print_help()
return
# adjust the time window view of the data
if window_args is not None:
time_window.apply_time_window(importData, **window_args)
# if the user requested converter, process the conversion
if args.command == "convert":
# process the args
out_cfg_args = output_config.process_args(args, valid_out_config_args)
generic_out_cfg_args = output_config.process_generic_args(
args, valid_generic_args
)
pftrace_args = pftrace.process_args(args, valid_pftrace_args)
csv_args = csv.process_args(args, valid_csv_args)
otf2_args = otf2.process_args(args, valid_otf2_args)
window_args = time_window.process_args(args, valid_time_window_args)
all_args = {
**out_cfg_args,
**generic_out_cfg_args,
**pftrace_args,
**csv_args,
**otf2_args,
}
# setup the config args
config = (
output_config.output_config(**all_args)
if config is None
else config.update(**all_args)
)
# now start processing the data. Import the data and merge the views
importData = RocpdImportData(args.input)
# process each requested output format
format_handlers = {
"pftrace": pftrace.write_pftrace,
"csv": csv.write_csv,
"otf2": otf2.write_otf2,
}
# adjust the time window view of the data
if window_args is not None:
time_window.apply_time_window(importData, **window_args)
for out_format in args.output_format:
if out_format in format_handlers:
print(f"Converting database(s) to {out_format} format:")
format_handlers[out_format](importData, config)
else:
print(f"Warning: Unsupported output format '{out_format}'")
all_args = {
**out_cfg_args,
**generic_out_cfg_args,
**pftrace_args,
**csv_args,
**otf2_args,
}
# setup the config args
config = (
output_config.output_config(**all_args)
if config is None
else config.update(**all_args)
)
# process each requested output format
format_handlers = {
"pftrace": pftrace.write_pftrace,
"csv": csv.write_csv,
"otf2": otf2.write_otf2,
}
for out_format in args.output_format:
if out_format in format_handlers:
print(f"Converting database(s) to {out_format} format:")
format_handlers[out_format](importData, config)
else:
print(f"Warning: Unsupported output format '{out_format}'")
# if the user requested query module, execute the query
elif args.command == "query":
# query subparser args
query_args = query.process_args(args, valid_query_args)
out_cfg_args = output_config.process_args(args, valid_out_config_args)
window_args = time_window.process_args(args, valid_time_window_args)
all_args = {**query_args, **out_cfg_args}
query.execute(
args.input,
args,
window_args=window_args,
**all_args,
)
# if the user requested a summary, generate the views
elif args.command == "summary":
# summary subparser args
summary_args = summary.process_args(args, valid_summary_args)
io_args = output_config.process_args(args, valid_io_args)
window_args = time_window.process_args(args, valid_time_window_args)
# now start processing the data. Import the data and merge the views
importData = RocpdImportData(args.input)
# adjust the time window view of the data
if window_args is not None:
time_window.apply_time_window(importData, **window_args)
all_args = {**summary_args, **io_args}
summary.generate_all_summaries(importData, **all_args)
print("Done. Exiting...")
@@ -0,0 +1,560 @@
#!/usr/bin/env python3
###############################################################################
# MIT License
#
# Copyright (c) 2025 Advanced Micro Devices, Inc.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
###############################################################################
import os
import sys
from typing import Union, Tuple, List, Optional
from datetime import datetime
from . import output_config
from . import libpyrocpd
from .importer import RocpdImportData
from .time_window import apply_time_window
def export_sqlite_query(
conn: RocpdImportData,
query: str,
params: Union[Tuple, List] = (),
export_format: Optional[str] = None,
export_path: Optional[str] = None,
dashboard_template_path: Optional[str] = None,
) -> Optional[str]:
"""
Execute a SQLite query and print it to console.
Then, if export_format is specified, write the results to a file.
Returns the path to the exported file (or None if nothing was exported).
Supported export_format values (case-insensitive):
- "csv"
- "html"
- "md" (markdown)
- "pdf"
- "dashboard" (templated HTML dashboard)
- "clipboard"
If export_format == "dashboard", you may optionally pass a
dashboard_template_path (a Jinja2 template file). If omitted,
a built-in default template is used.
"""
try:
import pandas as pd
conn = conn.connection if isinstance(conn, RocpdImportData) else conn
# 1) Run the query via pandas
df = pd.read_sql_query(query, conn, params=params)
if df.empty:
sys.stderr.write(f"No results found for query: {query}\n")
sys.stderr.flush()
return None
if export_format == "console" or export_format is None:
# 2) Print to console
print(df.to_string(index=False))
return None
elif export_format == "clipboard":
df.to_clipboard(excel=False)
return None
export_format = export_format.lower()
ext = export_format
export_path = export_path or f"query_output.{ext}"
if not export_path.endswith(f".{ext}"):
export_path = f"{export_path}.{ext}"
export_path = os.path.abspath(libpyrocpd.format_path(export_path, "rocpd"))
os.makedirs(os.path.dirname(export_path), exist_ok=True)
def write_export(content):
with open(export_path, "w") as ofs:
ofs.write(f"{content}\n")
ofs.flush()
# 3) Export based on format
if export_format == "csv":
df.to_csv(export_path, index=False)
elif export_format == "html":
write_export(df.to_html(index=False))
elif export_format == "md":
# pandas 1.0+ has to_markdown
try:
write_export(df.to_markdown(index=False))
except AttributeError:
# fallback: manually write markdown table
_df_to_markdown_fallback(df, export_path)
elif export_format == "pdf":
_export_df_to_pdf(df, export_path)
elif export_format == "dashboard":
_export_dashboard(
df, export_path=export_path, template_path=dashboard_template_path
)
elif export_format == "json":
df.to_json(export_path, index=False, indent=2, orient="records")
else:
print(f"Unsupported export format: {export_format}")
return None
print(f"Exported to: {export_path}\n")
return export_path
except Exception as e:
print(f"Error: {e}")
return None
def _df_to_markdown_fallback(df, path: str):
"""
Simple fallback if pandas.DataFrame.to_markdown(...) is unavailable.
"""
headers = list(df.columns)
with open(path, "w", encoding="utf-8") as f:
# Header row
f.write("| " + " | ".join(headers) + " |\n")
# Separator
f.write("|" + "|".join("---" for _ in headers) + "|\n")
# Data rows
for row in df.itertuples(index=False):
line = "| " + " | ".join(str(v) for v in row) + " |\n"
f.write(line)
def _export_df_to_pdf(df, path: str):
"""
Render a DataFrame into a monospaced text table inside a PDF.
"""
from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas
from reportlab.lib.units import inch
c = canvas.Canvas(path, pagesize=letter)
width, height = letter
x = 0.5 * inch
y = height - 1 * inch
row_height = 14
c.setFont("Courier", 9)
headers = list(df.columns)
header_line = " | ".join(headers)
c.drawString(x, y, header_line)
y -= row_height
c.drawString(x, y, "-" * len(header_line))
y -= row_height
for _, row in df.iterrows():
row_line = " | ".join(str(v) for v in row)
# Clip at ~160 characters so it doesnt overflow the page width
c.drawString(x, y, row_line[:160])
y -= row_height
if y < 1 * inch:
c.showPage()
c.setFont("Courier", 9)
y = height - 1 * inch
c.save()
def _export_dashboard(df, export_path: str, template_path: Optional[str] = None):
"""
Generate a templated HTML “dashboard” from df. If template_path is None,
use a built-in template. Otherwise, load the Jinja2 template from that path.
"""
from jinja2 import Environment, FileSystemLoader, select_autoescape
# 1) Prepare Jinja2 environment
if template_path:
# User provided a .html (Jinja2) file
env = Environment(
loader=FileSystemLoader(os.path.dirname(template_path)),
autoescape=select_autoescape(["html", "xml"]),
)
template = env.get_template(os.path.basename(template_path))
else:
# Built-in default template
builtin_html = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<title>Dashboard Report</title>
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
h1 { color: #333; }
table { border-collapse: collapse; width: 100%; }
th, td { border: 1px solid #aaa; padding: 8px; text-align: left; }
th { background-color: #f0f0f0; }
tr:nth-child(even) { background-color: #fafafa; }
</style>
</head>
<body>
<h1>{{ title }}</h1>
<p><em>Generated on {{ timestamp }}</em></p>
<div>
{{ table_html | safe }}
</div>
</body>
</html>
"""
env = Environment(autoescape=select_autoescape(["html", "xml"]))
template = env.from_string(builtin_html)
# 2) Render template with context
context = {
"title": "SQLite Query Dashboard",
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"table_html": df.to_html(index=False, classes="dashboard-table"),
}
rendered = template.render(**context)
# 3) Write to export_path
with open(export_path, "w", encoding="utf-8") as f:
f.write(rendered)
def zip_files(file_paths: List[str], zip_path: str) -> str:
"""
Zip up one or more files into zip_path. Overwrites existing zip if present.
Returns the path to the created zip.
"""
import zipfile
with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
for fp in file_paths:
if os.path.isfile(fp):
zf.write(fp, arcname=os.path.basename(fp))
else:
raise FileNotFoundError(f"Cannot find file to zip: {fp}")
print(f"Created ZIP archive: {zip_path}")
return zip_path
def send_report_email(
file_paths: List[str],
to: Union[str, List[str]],
sender: str,
subject: str = "rocpd query Report",
inline_preview: bool = False,
smtp_server: str = "localhost",
smtp_port: int = 25,
smtp_user: Optional[str] = None,
smtp_password: Optional[str] = None,
zip_attachments: bool = False,
) -> None:
"""
Send an email with one or more attachments, optionally zipped,
and optionally with an inline preview (if the primary attachment is HTML).
Args:
file_paths: List of file paths to attach (each must exist).
to: Recipient email address, or list of addresses.
sender: Sender email address.
subject: Subject line.
inline_preview: If True, and one of the attachments is HTML, use that
HTML as the email body (and still attach files).
smtp_server / smtp_port / smtp_user / smtp_password: SMTP credentials.
zip_attachments: If True, bundle all file_paths into a single ZIP named
"<timestamp>_attachments.zip" and attach that ZIP only.
"""
import smtplib
from email.message import EmailMessage
# 1) Validate that files exist
for fp in file_paths:
if not os.path.isfile(fp):
raise FileNotFoundError(f"Attachment not found: {fp}")
# 2) If zip_attachments is True, zip everything into one archive
actual_attachments: List[str]
if zip_attachments:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
zip_path = f"attachments_{timestamp}.zip"
zip_files(file_paths, zip_path)
actual_attachments = [zip_path]
else:
actual_attachments = file_paths.copy()
# 3) Build the EmailMessage
msg = EmailMessage()
msg["Subject"] = subject
msg["From"] = sender
msg["To"] = ", ".join(to) if isinstance(to, list) else to
# 4) If inline_preview is True, look for the first HTML attachment,
# read its content, and set as an HTML alternative in the email body.
if inline_preview:
html_body_found = False
for fp in actual_attachments:
if fp.lower().endswith(".html"):
with open(fp, "r", encoding="utf-8") as f:
html_content = f.read()
msg.set_content(
"This email contains an inline HTML preview. If your mail client "
"doesnt display HTML, see the attachment."
)
msg.add_alternative(html_content, subtype="html")
html_body_found = True
break
if not html_body_found:
# No HTML attachment found; create a simple text body
msg.set_content("Please see attached report file(s).")
else:
# No inline preview desired; use a simple text body
msg.set_content("Please see attached report file(s).")
# 5) Attach each file (or the single ZIP)
for fp in actual_attachments:
with open(fp, "rb") as f:
data = f.read()
ctype = "application"
subtype = "octet-stream"
filename = os.path.basename(fp)
msg.add_attachment(data, maintype=ctype, subtype=subtype, filename=filename)
# 6) Connect to SMTP and send
with smtplib.SMTP(smtp_server, smtp_port) as server:
server.ehlo()
if smtp_user and smtp_password:
server.starttls()
server.login(smtp_user, smtp_password)
server.send_message(msg)
print(f"Email sent to {msg['To']} with subject '{subject}'")
def add_args(parser):
"""Add query arguments"""
query_options = parser.add_argument_group("Query Options")
# Common arguments
query_options.add_argument(
"--query", required=True, help="SQL SELECT query to execute (enclose in quotes)."
)
query_options.add_argument(
"--script",
required=False,
type=str,
help="Input SQL script which should be read before query (e.g. defines views)",
)
query_options.add_argument(
"--format",
help="Export format",
choices=("console", "csv", "html", "json", "md", "pdf", "dashboard", "clipboard"),
type=str.lower,
)
email_options = parser.add_argument_group("Query Email Options")
# Email options (optional)
email_options.add_argument(
"--email-to", help="Recipient email address (or comma-separated list)."
)
email_options.add_argument(
"--email-from", help="Sender email address (required if --email-to is used)."
)
email_options.add_argument(
"--email-subject",
default="SQLite Query Report",
help="Subject line for the email (default: %(default)s).",
)
email_options.add_argument(
"--smtp-server",
default="localhost",
help="SMTP server hostname (default: %(default)s).",
)
email_options.add_argument(
"--smtp-port",
type=int,
default=25,
help="SMTP server port (default: %(default)d).",
)
email_options.add_argument("--smtp-user", help="SMTP login username (if required).")
email_options.add_argument(
"--smtp-password", help="SMTP login password (if required)."
)
email_options.add_argument(
"--zip-attachments",
action="store_true",
help="Zip all attachments into a single .zip file before sending.",
)
email_options.add_argument(
"--inline-preview",
action="store_true",
help="Embed HTML report as inline body if an HTML attachment is present.",
)
dashboard_options = parser.add_argument_group("Query Dashboard Options")
dashboard_options.add_argument(
"--template-path", help="Path to a Jinja2 HTML template for the dashboard"
)
return [
"query",
"script",
"email_to",
"email_from",
"email_subject",
"smtp_server",
"smtp_port",
"smtp_user",
"smtp_password",
"inline_preview",
"zip_attachments",
"format",
"template_path",
]
def process_args(args, valid_args):
# do not add any of the arguments to the output config dict
ret = {}
return ret
def execute(input, args, config=None, window_args=None, **kwargs):
importData = RocpdImportData(input)
apply_time_window(importData, **window_args)
config = (
output_config.output_config(**kwargs)
if config is None
else config.update(**kwargs)
)
if args.script:
# read script and execute statements
with open(args.script, "r") as ifs:
for itr in ifs.read().split(";"):
importData.execute(f"{itr}")
# Prepare parameters for export
query = args.query
db = importData
export_format = args.format
export_path = os.path.join(config.output_path, config.output_file)
# Dashboard-only extra
dashboard_template = kwargs.get("template_path", None)
# 1) Run and export
exported_file = export_sqlite_query(
db,
query=query,
params=(),
export_format=export_format,
export_path=export_path,
dashboard_template_path=dashboard_template,
)
# 2) If --email-to was provided and we have a file, send it
if args.email_to:
if not args.email_from:
raise ValueError("--email-from is required when --email-to is used.")
if not exported_file:
print("No file was exported; skipping email.")
return
recipients = [addr.strip() for addr in args.email_to.split(",")]
send_report_email(
file_paths=[exported_file],
to=recipients,
sender=args.email_from,
subject=args.email_subject,
inline_preview=args.inline_preview,
smtp_server=args.smtp_server,
smtp_port=args.smtp_port,
smtp_user=args.smtp_user,
smtp_password=args.smtp_password,
zip_attachments=args.zip_attachments,
)
def main(argv=None):
import argparse
from .time_window import add_args as add_args_time_window
from .time_window import process_args as process_args_time_window
from .output_config import add_args as add_args_output_config
from .output_config import process_args as process_args_output_config
from .output_config import add_generic_args, process_generic_args
parser = argparse.ArgumentParser(
description="Generate report for rocpd query", allow_abbrev=False
)
required_params = parser.add_argument_group("Required options")
required_params.add_argument(
"-i",
"--input",
required=True,
type=output_config.check_file_exists,
nargs="+",
help="Input path and filename to one or more database(s), separated by spaces",
)
valid_out_config_args = add_args_output_config(parser)
valid_generic_args = add_generic_args(parser)
valid_time_window_args = add_args_time_window(parser)
valid_query_args = add_args(parser)
args = parser.parse_args(argv)
out_cfg_args = process_args_output_config(args, valid_out_config_args)
generic_out_cfg_args = process_generic_args(args, valid_generic_args)
window_args = process_args_time_window(args, valid_time_window_args)
query_args = process_args(args, valid_query_args)
all_args = {
**query_args,
**out_cfg_args,
**generic_out_cfg_args,
}
execute(
args.input,
args,
window_args=window_args,
**all_args,
)
if __name__ == "__main__":
main()
@@ -0,0 +1,532 @@
#!/usr/bin/env python3
###############################################################################
# MIT License
#
# Copyright (c) 2025 Advanced Micro Devices, Inc.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
###############################################################################
import argparse
import os
from typing import Any, List, Tuple
from .importer import RocpdImportData, execute_statement
from .query import export_sqlite_query
from . import output_config
def get_temp_view_names(connection: RocpdImportData) -> List[str]:
"""Return the names of all temporary views in the SQLite connection."""
return [
v[0]
for v in execute_statement(
connection, "SELECT name FROM sqlite_temp_master WHERE type='view';"
).fetchall()
]
def get_temp_view_columns(connection: RocpdImportData, view_name: str) -> List[str]:
"""Return the column names of a given temporary view."""
cursor = connection.cursor()
cursor.execute(f"PRAGMA table_xinfo('{view_name}')")
return [row[1] for row in cursor.fetchall()]
def make_temp_view_query(view_name, query) -> str:
return "CREATE TEMPORARY VIEW IF NOT EXISTS `{}` AS {}".format(view_name, query)
def export_view(
connection: RocpdImportData, view_name, output_format, output_path, filename=""
) -> None:
"""Write the contents of a SQL view to an output format."""
query = "SELECT * FROM `{}`".format(view_name)
query_one = "SELECT * FROM `{}` LIMIT 1".format(view_name)
# just return if view is empty
if not connection.execute(query_one).fetchone():
return
# prepare the output filename
if not filename:
output_filename = view_name
else:
output_filename = f"{filename}_{view_name}"
if output_format == "console":
print(f"\n{view_name.upper()}:")
# call query module to export. query will append the extension
export_path = os.path.join(output_path, output_filename)
export_sqlite_query(
connection, query, export_format=output_format, export_path=export_path
)
def generate_summary_query(
view_name: str,
name_column="name",
by_rank=False,
) -> Tuple[str, str]:
"""Generate the SQL statement to create a summary view."""
if by_rank:
view_suffix = "_summary_by_rank"
group_by_columns = "guid, {name_column}".format(name_column=name_column)
aggregation_group_by = "T.guid, T.nid, T.{name_column}".format(
name_column=name_column
)
total_duration_group_by = "guid"
additional_select_columns = "AD.pid AS ProcessID, P.hostname AS Hostname,"
additional_aggregated_columns = """
T.guid,
T.nid,
T.pid,"""
join_condition = "T.guid = A.guid AND T.{name_column} = A.name".format(
name_column=name_column
)
total_duration_join = "JOIN total_duration TD ON AD.guid = TD.guid JOIN processes P ON AD.pid = P.pid"
else:
view_suffix = "_summary"
group_by_columns = name_column
aggregation_group_by = "T.{name_column}".format(name_column=name_column)
total_duration_group_by = ""
additional_select_columns = ""
additional_aggregated_columns = ""
join_condition = "T.{name_column} = A.name".format(name_column=name_column)
total_duration_join = "CROSS JOIN total_duration TD"
full_view_name = f"{view_name}{view_suffix}"
summary_query = f"""
WITH
avg_data AS (
SELECT
{group_by_columns.replace(name_column, f"{name_column} AS name")},
AVG(duration) AS avg_duration
FROM {view_name}
GROUP BY {group_by_columns}
),
aggregated_data AS (
SELECT{additional_aggregated_columns}
T.{name_column} as name,
COUNT(*) AS calls,
SUM(T.duration) AS total_duration,
A.avg_duration AS average_duration,
MIN(T.duration) AS min_duration,
MAX(T.duration) AS max_duration,
SQRT(SUM(CAST((T.duration - A.avg_duration) AS REAL) * CAST((T.duration - A.avg_duration) AS REAL)) / (COUNT(*) - 1)) AS std_dev_duration
FROM {view_name} T
JOIN avg_data A ON {join_condition}
GROUP BY {aggregation_group_by}
),
total_duration AS (
SELECT
{f"{total_duration_group_by}," if total_duration_group_by else ""}
SUM(total_duration) AS grand_total_duration
FROM
aggregated_data
{f"GROUP BY {total_duration_group_by}" if total_duration_group_by else ""}
)
SELECT
{additional_select_columns}
AD.name AS Name,
AD.calls AS Calls,
AD.total_duration AS "DURATION (nsec)",
AD.average_duration AS "AVERAGE (nsec)",
(CAST(AD.total_duration AS REAL) / TD.grand_total_duration) * 100 AS "PERCENT (INC)",
AD.min_duration AS "MIN (nsec)",
AD.max_duration AS "MAX (nsec)",
AD.std_dev_duration AS "STD_DEV"
FROM
aggregated_data AD
{total_duration_join}
ORDER BY
{"AD.pid," if by_rank else ""} AD.total_duration DESC;
"""
return (full_view_name, summary_query)
def generate_domain_query(connection: RocpdImportData, by_rank=False) -> Tuple[str, str]:
"""Generate the SQL statement for domain summary by doing union over all summary views."""
if by_rank:
view_suffix = "_summary_by_rank"
view_name = "domain_summary_by_rank"
additional_group_columns = "ProcessID, Hostname,"
additional_select_columns = "GD.ProcessID, GD.Hostname,"
total_duration_group_by = "GROUP BY ProcessID"
join_condition = "JOIN total_duration TD ON GD.ProcessID = TD.ProcessID"
order_by = "ORDER BY GD.ProcessID"
else:
view_suffix = "_summary"
view_name = "domain_summary"
additional_group_columns = ""
additional_select_columns = ""
total_duration_group_by = ""
join_condition = "CROSS JOIN total_duration TD"
order_by = 'ORDER BY GD."DURATION (nsec)" DESC'
summary_views = [
itr for itr in get_temp_view_names(connection) if itr.endswith(view_suffix)
]
if len(summary_views) < 1:
return view_name
union_selects = [
f" SELECT '{s.replace(view_suffix, '').upper()}' as domain, * FROM {s} "
for s in summary_views
]
domain_select = f"""
WITH
all_domains AS (
{f" UNION ALL ".join(union_selects)}
),
grouped_domains AS (
SELECT
domain,
{additional_group_columns}
SUM(calls) AS calls,
SUM("DURATION (nsec)") AS "DURATION (nsec)",
SUM("AVERAGE (nsec)") AS "AVERAGE (nsec)",
MIN("MIN (nsec)") AS "MIN (nsec)",
MAX("MAX (nsec)") AS "MAX (nsec)",
SUM("STD_DEV") AS "STD_DEV"
FROM all_domains
GROUP BY domain{", ProcessID" if by_rank else ""}
),
total_duration AS (
SELECT
{additional_group_columns}
SUM("DURATION (nsec)") AS grand_total_duration
FROM grouped_domains
{total_duration_group_by}
)
SELECT
{additional_select_columns}
GD.domain AS Name,
GD.calls AS Calls,
GD."DURATION (nsec)",
GD."AVERAGE (nsec)",
(CAST(GD."DURATION (nsec)" AS REAL) / TD.grand_total_duration) * 100 AS "PERCENT (INC)",
GD."MIN (nsec)",
GD."MAX (nsec)",
GD."STD_DEV"
FROM
grouped_domains GD
{join_condition}
{order_by};
"""
return (view_name, domain_select)
def create_summary_views(connection: RocpdImportData, by_rank=False) -> None:
"""Create summary views for eligible temporary views in the database."""
NAME_COLUMN_MAP = {
"memory_allocations": "type",
"scratch_memory": "operation",
}
avoid_view_pattern = ("rocpd", "region", "counter", "pmc")
required_columns = {"duration"}
views = get_temp_view_names(connection)
for view_name in views:
if any(pattern in view_name for pattern in avoid_view_pattern):
continue
columns = get_temp_view_columns(connection, view_name)
if not required_columns.issubset(columns):
continue
# Create regular summary view
summary_view_name, summary_query = generate_summary_query(
view_name, name_column=NAME_COLUMN_MAP.get(view_name, "name")
)
connection.execute(make_temp_view_query(summary_view_name, summary_query))
# Create per-rank summary
if by_rank:
per_rank_view_name, summary_by_rank_query = generate_summary_query(
view_name,
name_column=NAME_COLUMN_MAP.get(view_name, "name"),
by_rank=True,
)
connection.execute(
make_temp_view_query(per_rank_view_name, summary_by_rank_query)
)
def create_summary_region_views(
connection: RocpdImportData, by_rank=False, region_categories=None
) -> None:
"""Create summary and region views"""
query = "SELECT DISTINCT(category) FROM regions_and_samples;"
categories = execute_statement(connection, query).fetchall()
if region_categories is None:
# Automatically retrieve region categories from the database
region_categories = set([cat[0].split("_")[0] for cat in categories])
category_map = {
cat.lower(): [c[0] for c in categories if c[0].startswith(cat + "_")]
for cat in region_categories
if "MARKER" not in cat.upper()
}
for k, v in category_map.items():
if len(v) > 0:
conditions = [f"category LIKE '{c}'" for c in v]
temp_region_view = f"""
CREATE TEMPORARY VIEW IF NOT EXISTS `{k}` AS
SELECT *
FROM regions_and_samples
WHERE {" OR ".join(conditions)};
"""
connection.execute(temp_region_view)
# Create regular summary view
summary_view_name, summary_query = generate_summary_query(k)
connection.execute(make_temp_view_query(summary_view_name, summary_query))
# Create per-rank summary view
if by_rank:
per_rank_view_name, summary_by_rank_query = generate_summary_query(
k, by_rank=True
)
connection.execute(
make_temp_view_query(per_rank_view_name, summary_by_rank_query)
)
# Markers
if "MARKER" not in region_categories:
return
view_name = "markers"
markers_create = f"""
CREATE TEMPORARY VIEW IF NOT EXISTS `{view_name}` AS
SELECT JSON_EXTRACT(extdata, '$.message') AS marker_name, *
FROM regions_and_samples
WHERE category LIKE 'MARKER_%'
"""
connection.execute(markers_create)
# Create regular summary view
summary_view_name, summary_query = generate_summary_query(
view_name, name_column="marker_name"
)
connection.execute(make_temp_view_query(summary_view_name, summary_query))
# Create per-rank summary view
if by_rank:
per_rank_view_name, summary_by_rank_query = generate_summary_query(
view_name, name_column="marker_name", by_rank=True
)
connection.execute(
make_temp_view_query(per_rank_view_name, summary_by_rank_query)
)
def create_domain_view(connection: RocpdImportData, by_rank=False) -> str:
"""Create a domain summary view by aggregating all summary views."""
view_name, domain_query = generate_domain_query(connection, by_rank=by_rank)
# Create the domain summary view
connection.execute(make_temp_view_query(view_name, domain_query))
return view_name
def generate_all_summaries(connection: RocpdImportData, **kwargs: Any) -> None:
"""Generate all summary views and write them to CSV files."""
domain_summary = kwargs.get("domain_summary", False)
by_rank = kwargs.get("summary_by_rank", False)
filename = kwargs.get("output_file", "")
output_path = kwargs.get("output_path", "./rocpd-output-data")
region_categories = kwargs.get("region_categories", None)
output_format = kwargs.get("format", "console")
# create the temporary summary views
create_summary_views(connection, by_rank)
create_summary_region_views(connection, by_rank, region_categories=region_categories)
if domain_summary:
create_domain_view(connection)
# Create domain summary per rank only if both domain_summary and summary_by_rank are enabled
if by_rank:
create_domain_view(connection, by_rank=True)
# Write regular summary views
print("\nSummary files:")
summary_views = [
itr for itr in get_temp_view_names(connection) if itr.endswith("_summary")
]
for v in summary_views:
export_view(connection, v, output_format, output_path, filename)
# Write per-rank summary views if flag is set
if by_rank:
print("\nSummary files by rank:")
summary_by_rank_views = [
itr
for itr in get_temp_view_names(connection)
if itr.endswith("_summary_by_rank")
]
for v in summary_by_rank_views:
export_view(connection, v, output_format, output_path, filename)
#
# Command-line interface functions
#
def add_io_args(parser):
"""Add input/output arguments for summary."""
io_options = parser.add_argument_group("I/O options")
io_options.add_argument(
"-f",
"--format",
help="Sets the format the summaries are output to (default: console)",
choices=("console", "csv", "html", "json", "md", "pdf"),
default="console",
type=str,
required=False,
)
io_options.add_argument(
"-o",
"--output-file",
help="Sets the base output file name",
default=os.environ.get("ROCPD_OUTPUT_NAME", ""),
type=str,
required=False,
)
io_options.add_argument(
"-d",
"--output-path",
help="Sets the output path where the output files will be saved (default path: `./rocpd-output-data`)",
default=os.environ.get("ROCPD_OUTPUT_PATH", "./rocpd-output-data"),
type=str,
required=False,
)
return ["format", "output_file", "output_path"]
def add_args(parser):
"""Add arguments for summary."""
summary_options = parser.add_argument_group("Summary options")
summary_options.add_argument(
"--domain-summary",
action="store_true",
default=False,
help="Generate domain summary view",
)
summary_options.add_argument(
"--summary-by-rank",
action="store_true",
default=False,
help="Generate summary views by-rank (or Process ID)",
)
summary_options.add_argument(
"--region-categories",
nargs="+",
default=None,
help="Specify region categories to include in the summary (example: HIP, HSA, RCCL, ROCDECODE, ROCJPEG, MARKER). If not specified, categories will be automatically retrieved from the database.",
)
return ["domain_summary", "summary_by_rank", "region_categories"]
def process_args(args, valid_args):
ret = {}
for itr in valid_args:
if hasattr(args, itr):
val = getattr(args, itr)
if val is not None:
ret[itr] = val
return ret
def execute(input, window_args=None, **kwargs: Any) -> RocpdImportData:
from .time_window import apply_time_window
importData = RocpdImportData(input)
apply_time_window(importData, **window_args)
generate_all_summaries(importData, **kwargs)
return importData
def main(argv=None) -> int:
"""Main entry point for command line execution."""
from .time_window import add_args as add_args_time_window
from .time_window import process_args as process_args_time_window
parser = argparse.ArgumentParser(
description="Create ROCpd database summary region views"
)
required_params = parser.add_argument_group("Required options")
required_params.add_argument(
"-i",
"--input",
required=True,
type=output_config.check_file_exists,
nargs="+",
help="Input path and filename to one or more database(s), separated by spaces",
)
valid_io_args = add_io_args(parser)
valid_summary_args = add_args(parser)
valid_time_window_args = add_args_time_window(parser)
args = parser.parse_args(argv)
summary_args = process_args(args, valid_summary_args)
io_args = output_config.process_args(args, valid_io_args)
window_args = process_args_time_window(args, valid_time_window_args)
all_args = {**summary_args, **io_args}
execute(
args.input,
window_args=window_args,
**all_args,
)
if __name__ == "__main__":
main()
@@ -38,8 +38,24 @@ __all__ = [
"nameOsThread",
"nameHipDevice",
"context_decorators",
"version_info",
]
version_info = {
"version": "@PROJECT_VERSION@",
"major": int("@PROJECT_VERSION_MAJOR@"),
"minor": int("@PROJECT_VERSION_MINOR@"),
"patch": int("@PROJECT_VERSION_PATCH@"),
"git_revision": "@ROCPROFILER_SDK_GIT_REVISION@",
"library_arch": "@CMAKE_LIBRARY_ARCHITECTURE@",
"system_name": "@CMAKE_SYSTEM_NAME@",
"system_processor": "@CMAKE_SYSTEM_PROCESSOR@",
"system_version": "@CMAKE_SYSTEM_VERSION@",
"compiler_id": "@CMAKE_CXX_COMPILER_ID@",
"compiler_version": "@CMAKE_CXX_COMPILER_VERSION@",
"rocm_version": "@rocm_version_FULL_VERSION@",
}
def mark(msg):
return libpyroctx.roctxMark(msg) if msg is not None else None
@@ -97,7 +97,7 @@ function(rocprofiler_roctx_python_bindings _VERSION)
foreach(_SOURCE ${roctx_PYTHON_SOURCES})
configure_file(${CMAKE_CURRENT_LIST_DIR}/${_SOURCE}
${roctx_PYTHON_OUTPUT_DIRECTORY}/${_SOURCE} COPYONLY)
${roctx_PYTHON_OUTPUT_DIRECTORY}/${_SOURCE} @ONLY)
install(
FILES ${roctx_PYTHON_OUTPUT_DIRECTORY}/${_SOURCE}
DESTINATION ${roctx_PYTHON_INSTALL_DIRECTORY}
@@ -154,12 +154,14 @@ function(rocprofiler_rocpd_python_bindings _VERSION)
output_config.py
otf2.py
pftrace.py
query.py
schema.py
summary.py
time_window.py)
foreach(_SOURCE ${rocpd_PYTHON_SOURCES})
configure_file(${CMAKE_CURRENT_LIST_DIR}/${_SOURCE}
${rocpd_PYTHON_OUTPUT_DIRECTORY}/${_SOURCE} COPYONLY)
${rocpd_PYTHON_OUTPUT_DIRECTORY}/${_SOURCE} @ONLY)
install(
FILES ${rocpd_PYTHON_OUTPUT_DIRECTORY}/${_SOURCE}
DESTINATION ${rocpd_PYTHON_INSTALL_DIRECTORY}
@@ -596,6 +596,7 @@ SELECT
JSON_EXTRACT(M.extdata, '$.flags') AS alloc_flags,
M.start,
M.end,
(M.end - M.start) AS duration,
M.size,
M.address,
E.correlation_id,
@@ -151,226 +151,3 @@ GROUP BY
name
ORDER BY
total_duration DESC;
-- Kernel summary by name
CREATE VIEW
`kernel_summary` AS
WITH
avg_data AS (
SELECT
name,
AVG(duration) AS avg_duration
FROM
`kernels`
GROUP BY
name
),
aggregated_data AS (
SELECT
K.name,
COUNT(*) AS calls,
SUM(K.duration) AS total_duration,
SUM(CAST(K.duration AS REAL) * CAST(K.duration AS REAL)) AS sqr_duration,
A.avg_duration AS average_duration,
MIN(K.duration) AS min_duration,
MAX(K.duration) AS max_duration,
SUM(CAST((K.duration - A.avg_duration) AS REAL) * CAST((K.duration - A.avg_duration) AS REAL)) / (COUNT(*) - 1) AS variance_duration,
SQRT(
SUM(CAST((K.duration - A.avg_duration) AS REAL) * CAST((K.duration - A.avg_duration) AS REAL)) / (COUNT(*) - 1)
) AS std_dev_duration
FROM
`kernels` K
JOIN avg_data A ON K.name = A.name
GROUP BY
K.name
),
total_duration AS (
SELECT
SUM(total_duration) AS grand_total_duration
FROM
aggregated_data
)
SELECT
AD.name AS name,
AD.calls,
AD.total_duration AS "DURATION (nsec)",
AD.sqr_duration AS "SQR (nsec)",
AD.average_duration AS "AVERAGE (nsec)",
(CAST(AD.total_duration AS REAL) / TD.grand_total_duration) * 100 AS "PERCENT (INC)",
AD.min_duration AS "MIN (nsec)",
AD.max_duration AS "MAX (nsec)",
AD.variance_duration AS "VARIANCE",
AD.std_dev_duration AS "STD_DEV"
FROM
aggregated_data AD
CROSS JOIN total_duration TD;
--
-- Kernel summary by region name
CREATE VIEW
`kernel_summary_region` AS
WITH
avg_data AS (
SELECT
region,
AVG(duration) AS avg_duration
FROM
`kernels`
GROUP BY
region
),
aggregated_data AS (
SELECT
K.region AS name,
COUNT(*) AS calls,
SUM(K.duration) AS total_duration,
SUM(CAST(K.duration AS REAL) * CAST(K.duration AS REAL)) AS sqr_duration,
A.avg_duration AS average_duration,
MIN(K.duration) AS min_duration,
MAX(K.duration) AS max_duration,
SUM(CAST((K.duration - A.avg_duration) AS REAL) * CAST((K.duration - A.avg_duration) AS REAL)) / (COUNT(*) - 1) AS variance_duration,
SQRT(
SUM(CAST((K.duration - A.avg_duration) AS REAL) * CAST((K.duration - A.avg_duration) AS REAL)) / (COUNT(*) - 1)
) AS std_dev_duration
FROM
`kernels` K
JOIN avg_data A ON K.region = A.region
GROUP BY
K.region
),
total_duration AS (
SELECT
SUM(total_duration) AS grand_total_duration
FROM
aggregated_data
)
SELECT
AD.name AS name,
AD.calls,
AD.total_duration AS "DURATION (nsec)",
AD.sqr_duration AS "SQR (nsec)",
AD.average_duration AS "AVERAGE (nsec)",
(CAST(AD.total_duration AS REAL) / TD.grand_total_duration) * 100 AS "PERCENT (INC)",
AD.min_duration AS "MIN (nsec)",
AD.max_duration AS "MAX (nsec)",
AD.variance_duration AS "VARIANCE",
AD.std_dev_duration AS "STD_DEV"
FROM
aggregated_data AD
CROSS JOIN total_duration TD;
--
-- Memory copy summary
CREATE VIEW
`memory_copy_summary` AS
WITH
avg_data AS (
SELECT
name,
AVG(duration) AS avg_duration
FROM
`memory_copies`
GROUP BY
name
),
aggregated_data AS (
SELECT
MC.name,
COUNT(*) AS calls,
SUM(MC.duration) AS total_duration,
SUM(CAST(MC.duration AS REAL) * CAST(MC.duration AS REAL)) AS sqr_duration,
A.avg_duration AS average_duration,
MIN(MC.duration) AS min_duration,
MAX(MC.duration) AS max_duration,
SUM(
CAST((MC.duration - A.avg_duration) AS REAL) * CAST((MC.duration - A.avg_duration) AS REAL)
) / (COUNT(*) - 1) AS variance_duration,
SQRT(
SUM(
CAST((MC.duration - A.avg_duration) AS REAL) * CAST((MC.duration - A.avg_duration) AS REAL)
) / (COUNT(*) - 1)
) AS std_dev_duration
FROM
`memory_copies` MC
JOIN avg_data A ON MC.name = A.name
GROUP BY
MC.name
),
total_duration AS (
SELECT
SUM(total_duration) AS grand_total_duration
FROM
aggregated_data
)
SELECT
AD.name AS name,
AD.calls,
AD.total_duration AS "DURATION (nsec)",
AD.sqr_duration AS "SQR (nsec)",
AD.average_duration AS "AVERAGE (nsec)",
(CAST(AD.total_duration AS REAL) / TD.grand_total_duration) * 100 AS "PERCENT (INC)",
AD.min_duration AS "MIN (nsec)",
AD.max_duration AS "MAX (nsec)",
AD.variance_duration AS "VARIANCE",
AD.std_dev_duration AS "STD_DEV"
FROM
aggregated_data AD
CROSS JOIN total_duration TD;
--
-- Memory allocation summary
CREATE VIEW
`memory_allocation_summary` AS
WITH
avg_data AS (
SELECT
type AS name,
AVG(duration) AS avg_duration
FROM
`memory_allocations`
GROUP BY
type
),
aggregated_data AS (
SELECT
MA.type AS name,
COUNT(*) AS calls,
SUM(MA.duration) AS total_duration,
SUM(CAST(MA.duration AS REAL) * CAST(MA.duration AS REAL)) AS sqr_duration,
A.avg_duration AS average_duration,
MIN(MA.duration) AS min_duration,
MAX(MA.duration) AS max_duration,
SUM(
CAST((MA.duration - A.avg_duration) AS REAL) * CAST((MA.duration - A.avg_duration) AS REAL)
) / (COUNT(*) - 1) AS variance_duration,
SQRT(
SUM(
CAST((MA.duration - A.avg_duration) AS REAL) * CAST((MA.duration - A.avg_duration) AS REAL)
) / (COUNT(*) - 1)
) AS std_dev_duration
FROM
`memory_allocations` MA
JOIN avg_data A ON MA.type = A.name
GROUP BY
MA.type
),
total_duration AS (
SELECT
SUM(total_duration) AS grand_total_duration
FROM
aggregated_data
)
SELECT
'MEMORY_ALLOCATION_' || AD.name AS name,
AD.calls,
AD.total_duration AS "DURATION (nsec)",
AD.sqr_duration AS "SQR (nsec)",
AD.average_duration AS "AVERAGE (nsec)",
(CAST(AD.total_duration AS REAL) / TD.grand_total_duration) * 100 AS "PERCENT (INC)",
AD.min_duration AS "MIN (nsec)",
AD.max_duration AS "MAX (nsec)",
AD.variance_duration AS "VARIANCE",
AD.std_dev_duration AS "STD_DEV"
FROM
aggregated_data AD
CROSS JOIN total_duration TD;
@@ -22,7 +22,7 @@ set(rocpd-env
#
#########################################################################################
foreach(_SUBPARSER "" "-convert")
foreach(_SUBPARSER "" "-convert" "-query" "-summary")
string(REPLACE "-" "" _CMD "${_SUBPARSER}")
add_test(NAME rocpd${_SUBPARSER}-help COMMAND ${Python3_EXECUTABLE} -m rocpd ${_CMD}
--help)
@@ -34,7 +34,7 @@ foreach(_SUBPARSER "" "-convert")
"${ROCPROFILER_DEFAULT_FAIL_REGEX}")
endforeach()
foreach(_MODULE "csv" "pftrace" "otf2")
foreach(_MODULE "csv" "pftrace" "otf2" "query" "summary")
add_test(NAME rocpd-module-${_MODULE}-help COMMAND ${Python3_EXECUTABLE} -m
rocpd.${_MODULE} --help)
@@ -222,6 +222,58 @@ set_tests_properties(
FIXTURES_REQUIRED
rocprofv3-test-rocpd)
#########################################################################################
#
# Summary generate
#
#########################################################################################
add_test(
NAME rocprofv3-test-rocpd-summary-generation
COMMAND
${Python3_EXECUTABLE} -m rocpd summary --domain-summary --summary-by-rank -f csv
-d ${CMAKE_CURRENT_BINARY_DIR}/rocpd-output-data/summary -i
${CMAKE_CURRENT_BINARY_DIR}/rocpd-input-data/out_results.db)
set_tests_properties(
rocprofv3-test-rocpd-summary-generation
PROPERTIES TIMEOUT
45
LABELS
"integration-tests;rocpd"
ENVIRONMENT
"${rocprofv3-rocpd-env}"
DEPENDS
"rocprofv3-test-rocpd-execute"
FAIL_REGULAR_EXPRESSION
"${ROCPROFILER_DEFAULT_FAIL_REGEX}"
FIXTURES_SETUP
rocprofv3-test-rocpd-generation
FIXTURES_REQUIRED
rocprofv3-test-rocpd)
add_test(
NAME rocprofv3-test-rocpd-summary-generation-multiproc
COMMAND
${Python3_EXECUTABLE} -m rocpd summary --domain-summary --summary-by-rank -f csv
-d ${CMAKE_CURRENT_BINARY_DIR}/rocpd-output-test/summary -o out_mp -i
${CMAKE_CURRENT_BINARY_DIR}/rocpd-input-data-multiproc/out_mp_0_results.db
${CMAKE_CURRENT_BINARY_DIR}/rocpd-input-data-multiproc/out_mp_1_results.db)
set_tests_properties(
rocprofv3-test-rocpd-summary-generation-multiproc
PROPERTIES TIMEOUT
120
LABELS
"integration-tests;rocpd"
ENVIRONMENT
"${rocprofv3-rocpd-env}"
FAIL_REGULAR_EXPRESSION
"${ROCPROFILER_DEFAULT_FAIL_REGEX}"
DISABLED
"${MULTIPROC_IS_DISABLED}"
FIXTURES_REQUIRED
rocprofv3-test-rocpd-multiproc)
#########################################################################################
#
# Validation