28977999ad
* Switching from RHEL 8 to RHEL 9 * Switching from RHEL 8 to RHEL 9 * Switching from RHEL 8 to RHEL 9 * Fixing ROCPD for older Python Versions * Formatting and checking if SQRT is available or not * Formatting and checking if SQRT is available or not * Formatting and checking if SQRT is available or not * Formatting and checking if SQRT is available or not * Formatting and checking if SQRT is available or not * Update projects/rocprofiler-sdk/source/lib/python/rocpd/summary.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update summary.py * Update summary.py * Update projects/rocprofiler-sdk/source/lib/python/rocpd/summary.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update projects/rocprofiler-sdk/source/lib/python/rocpd/summary.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update summary.py * Update summary.py --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
567 строки
19 KiB
Python
567 строки
19 KiB
Python
#!/usr/bin/env python3
|
|
###############################################################################
|
|
# MIT License
|
|
#
|
|
# Copyright (c) 2025 Advanced Micro Devices, Inc.
|
|
#
|
|
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
# of this software and associated documentation files (the "Software"), to deal
|
|
# in the Software without restriction, including without limitation the rights
|
|
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
# copies of the Software, and to permit persons to whom the Software is
|
|
# furnished to do so, subject to the following conditions:
|
|
#
|
|
# The above copyright notice and this permission notice shall be included in
|
|
# all copies or substantial portions of the Software.
|
|
#
|
|
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
|
# THE SOFTWARE.
|
|
###############################################################################
|
|
|
|
import argparse
|
|
import os
|
|
import math
|
|
|
|
from typing import Any, List, Tuple
|
|
from .importer import RocpdImportData, execute_statement
|
|
from .query import export_sqlite_query
|
|
from . import output_config
|
|
|
|
|
|
def check_function_availability(connection, function_name):
    """
    Checks if a given SQL function is known to the SQLite connection.

    The name is compared case-insensitively because SQLite resolves function
    names without regard to case (``SQRT`` and ``sqrt`` are the same function).

    Args:
        connection (sqlite3 db connection): The SQLite database connection handler.
        function_name (str): The name of the function to check.

    Returns:
        bool: True if the function exists, False otherwise. False is also
            returned when the SQLite build does not expose
            ``pragma_function_list``, i.e. when availability cannot be
            confirmed.
    """
    import sqlite3  # local import: only needed for the exception type below

    cursor = connection.cursor()

    try:
        # Query pragma_function_list to check for the function
        cursor.execute(
            "SELECT EXISTS("
            "SELECT 1 FROM pragma_function_list WHERE name = ? COLLATE NOCASE)",
            (function_name,),
        )
    except sqlite3.OperationalError:
        # Older SQLite builds have no pragma_function_list table-valued
        # function; report "not available" instead of aborting so callers can
        # register their own implementation.
        return False

    result = cursor.fetchone()[0]

    return bool(result)
|
|
|
|
|
|
def get_temp_view_names(connection: RocpdImportData) -> List[str]:
    """List the name of every view recorded in ``sqlite_temp_master``."""
    statement = "SELECT name FROM sqlite_temp_master WHERE type='view';"
    rows = execute_statement(connection, statement).fetchall()

    # Each row is a one-column record; keep only the name.
    names = []
    for row in rows:
        names.append(row[0])
    return names
|
|
|
|
|
|
def get_temp_view_columns(connection: RocpdImportData, view_name: str) -> List[str]:
    """Return the column names of a given temporary view.

    Args:
        connection: Connection object providing ``cursor()``.
        view_name: Name of the view to inspect.

    Returns:
        The column names reported by ``PRAGMA table_xinfo``, in the order the
        pragma returns them. The list is empty when the pragma yields no rows.
    """
    # PRAGMA arguments cannot be bound as parameters, so the name is embedded
    # as a string literal; double any single quote so that a view name
    # containing one cannot terminate the literal early.
    escaped_name = view_name.replace("'", "''")

    cursor = connection.cursor()
    cursor.execute(f"PRAGMA table_xinfo('{escaped_name}')")

    # Column 1 of each table_xinfo row is the column name.
    return [row[1] for row in cursor.fetchall()]
|
|
|
|
|
|
def make_temp_view_query(view_name, query) -> str:
    """Wrap ``query`` in a ``CREATE TEMPORARY VIEW IF NOT EXISTS`` statement.

    The view name is placed between backticks; ``query`` is appended verbatim
    after ``AS``.
    """
    return f"CREATE TEMPORARY VIEW IF NOT EXISTS `{view_name}` AS {query}"
|
|
|
|
|
|
def export_view(
    connection: RocpdImportData, view_name, output_format, output_path, filename=""
) -> None:
    """Export all rows of a SQL view through the query module.

    Nothing is exported when the view has no rows. The output base name is
    ``view_name``, prefixed with ``filename`` and an underscore when
    ``filename`` is non-empty. For the ``console`` format an upper-cased
    header line is printed before the export.
    """
    # Probe a single row first so that empty views are skipped.
    first_row = connection.execute(f"SELECT * FROM `{view_name}` LIMIT 1").fetchone()
    if not first_row:
        return

    output_filename = f"{filename}_{view_name}" if filename else view_name

    if output_format == "console":
        print(f"\n{view_name.upper()}:")

    # The query module appends the extension to the export path itself.
    export_sqlite_query(
        connection,
        f"SELECT * FROM `{view_name}`",
        export_format=output_format,
        export_path=os.path.join(output_path, output_filename),
    )
|
|
|
|
|
|
def generate_summary_query(
    view_name: str,
    name_column="name",
    by_rank=False,
) -> Tuple[str, str]:
    """Generate the SQL statement to create a summary view.

    The query aggregates the ``duration`` column of ``view_name`` per value of
    ``name_column`` (and additionally per ``guid`` when ``by_rank`` is set),
    producing call count, total, average, percentage of the grand total,
    minimum, maximum and sample standard deviation.

    Args:
        view_name: Name of the view to summarize; inserted verbatim in the SQL.
        name_column: Column of ``view_name`` used as the grouping name.
        by_rank: When True, group per ``guid`` as well and add the
            ``ProcessID`` and ``Hostname`` output columns (joins ``processes``).

    Returns:
        A ``(summary_view_name, select_statement)`` tuple. The view name is
        ``view_name`` plus ``_summary`` or ``_summary_by_rank``.
    """
    # The avg_data select list is built explicitly. Deriving it from the
    # GROUP BY list with str.replace() would also rewrite "guid" whenever the
    # name column happens to be a substring of it (e.g. "id").
    if by_rank:
        view_suffix = "_summary_by_rank"
        avg_select_columns = f"guid, {name_column} AS name"
        group_by_columns = f"guid, {name_column}"
        aggregation_group_by = f"T.guid, T.nid, T.{name_column}"
        total_duration_select = "guid,"
        total_duration_group_clause = "GROUP BY guid"
        additional_select_columns = "AD.pid AS ProcessID, P.hostname AS Hostname,"
        additional_aggregated_columns = """
                    T.guid,
                    T.nid,
                    T.pid,"""
        join_condition = f"T.guid = A.guid AND T.{name_column} = A.name"
        total_duration_join = (
            "JOIN total_duration TD ON AD.guid = TD.guid "
            "JOIN processes P ON AD.pid = P.pid"
        )
        order_by_prefix = "AD.pid,"
    else:
        view_suffix = "_summary"
        avg_select_columns = f"{name_column} AS name"
        group_by_columns = name_column
        aggregation_group_by = f"T.{name_column}"
        total_duration_select = ""
        total_duration_group_clause = ""
        additional_select_columns = ""
        additional_aggregated_columns = ""
        join_condition = f"T.{name_column} = A.name"
        total_duration_join = "CROSS JOIN total_duration TD"
        order_by_prefix = ""

    full_view_name = f"{view_name}{view_suffix}"

    # avg_data is computed first so the per-row deviation from the mean can be
    # accumulated in aggregated_data; dividing by (COUNT(*) - 1) gives the
    # sample variance before SQRT is applied.
    summary_query = f"""
        WITH
            avg_data AS (
                SELECT
                    {avg_select_columns},
                    AVG(duration) AS avg_duration
                FROM {view_name}
                GROUP BY {group_by_columns}
            ),
            aggregated_data AS (
                SELECT{additional_aggregated_columns}
                    T.{name_column} as name,
                    COUNT(*) AS calls,
                    SUM(T.duration) AS total_duration,
                    A.avg_duration AS average_duration,
                    MIN(T.duration) AS min_duration,
                    MAX(T.duration) AS max_duration,
                    SQRT(SUM(CAST((T.duration - A.avg_duration) AS REAL) * CAST((T.duration - A.avg_duration) AS REAL)) / (COUNT(*) - 1)) AS std_dev_duration
                FROM {view_name} T
                JOIN avg_data A ON {join_condition}
                GROUP BY {aggregation_group_by}
            ),
            total_duration AS (
                SELECT
                    {total_duration_select}
                    SUM(total_duration) AS grand_total_duration
                FROM
                    aggregated_data
                {total_duration_group_clause}
            )
        SELECT
            {additional_select_columns}
            AD.name AS Name,
            AD.calls AS Calls,
            AD.total_duration AS "DURATION (nsec)",
            AD.average_duration AS "AVERAGE (nsec)",
            (CAST(AD.total_duration AS REAL) / TD.grand_total_duration) * 100 AS "PERCENT (INC)",
            AD.min_duration AS "MIN (nsec)",
            AD.max_duration AS "MAX (nsec)",
            AD.std_dev_duration AS "STD_DEV"
        FROM
            aggregated_data AD
            {total_duration_join}
        ORDER BY
            {order_by_prefix} AD.total_duration DESC;
    """

    return (full_view_name, summary_query)
|
|
|
|
|
|
def generate_domain_query(connection: RocpdImportData, by_rank=False) -> Tuple[str, str]:
    """Generate the SQL statement for domain summary by doing union over all summary views.

    Every temporary view whose name ends with the summary suffix
    (``_summary``, or ``_summary_by_rank`` when ``by_rank`` is set) contributes
    one domain, labelled with the upper-cased view name minus that suffix.

    Args:
        connection: Connection used to list the existing temporary views.
        by_rank: When True, aggregate per ``ProcessID`` as well.

    Returns:
        A ``(view_name, select_statement)`` tuple. When no summary view exists
        the statement selects zero rows with the same column names, so the
        result can always be unpacked and used to create a (then empty) view.
    """
    if by_rank:
        view_suffix = "_summary_by_rank"
        view_name = "domain_summary_by_rank"
        rank_columns = ["ProcessID", "Hostname"]
        additional_group_columns = "ProcessID, Hostname,"
        additional_select_columns = "GD.ProcessID, GD.Hostname,"
        grouped_domains_group_by = "GROUP BY domain, ProcessID"
        total_duration_group_by = "GROUP BY ProcessID"
        join_condition = "JOIN total_duration TD ON GD.ProcessID = TD.ProcessID"
        order_by = "ORDER BY GD.ProcessID"
    else:
        view_suffix = "_summary"
        view_name = "domain_summary"
        rank_columns = []
        additional_group_columns = ""
        additional_select_columns = ""
        grouped_domains_group_by = "GROUP BY domain"
        total_duration_group_by = ""
        join_condition = "CROSS JOIN total_duration TD"
        order_by = 'ORDER BY GD."DURATION (nsec)" DESC'

    summary_views = [
        itr for itr in get_temp_view_names(connection) if itr.endswith(view_suffix)
    ]

    if not summary_views:
        # Nothing to aggregate: keep the (name, query) contract by returning a
        # statement that yields no rows instead of a bare string, which
        # callers cannot unpack into two values.
        result_columns = rank_columns + [
            "Name",
            "Calls",
            '"DURATION (nsec)"',
            '"AVERAGE (nsec)"',
            '"PERCENT (INC)"',
            '"MIN (nsec)"',
            '"MAX (nsec)"',
            '"STD_DEV"',
        ]
        empty_select = ", ".join(f"NULL AS {column}" for column in result_columns)
        return (view_name, f"SELECT {empty_select} WHERE 0;")

    # Strip only the trailing suffix; every name in summary_views is known to
    # end with it, whereas str.replace() would also remove inner occurrences.
    union_selects = [
        f" SELECT '{s[: -len(view_suffix)].upper()}' as domain, * FROM {s} "
        for s in summary_views
    ]
    all_domains_select = " UNION ALL ".join(union_selects)

    # NOTE(review): AVERAGE and STD_DEV are summed across the names of a
    # domain, exactly as before; confirm that this is the intended statistic.
    domain_select = f"""
        WITH
            all_domains AS (
                {all_domains_select}
            ),
            grouped_domains AS (
                SELECT
                    domain,
                    {additional_group_columns}
                    SUM(calls) AS calls,
                    SUM("DURATION (nsec)") AS "DURATION (nsec)",
                    SUM("AVERAGE (nsec)") AS "AVERAGE (nsec)",
                    MIN("MIN (nsec)") AS "MIN (nsec)",
                    MAX("MAX (nsec)") AS "MAX (nsec)",
                    SUM("STD_DEV") AS "STD_DEV"
                FROM all_domains
                {grouped_domains_group_by}
            ),
            total_duration AS (
                SELECT
                    {additional_group_columns}
                    SUM("DURATION (nsec)") AS grand_total_duration
                FROM grouped_domains
                {total_duration_group_by}
            )
        SELECT
            {additional_select_columns}
            GD.domain AS Name,
            GD.calls AS Calls,
            GD."DURATION (nsec)",
            GD."AVERAGE (nsec)",
            (CAST(GD."DURATION (nsec)" AS REAL) / TD.grand_total_duration) * 100 AS "PERCENT (INC)",
            GD."MIN (nsec)",
            GD."MAX (nsec)",
            GD."STD_DEV"
        FROM
            grouped_domains GD
            {join_condition}
        {order_by};
    """

    return (view_name, domain_select)
|
|
|
|
|
|
def create_summary_views(connection: RocpdImportData, by_rank=False) -> None:
    """Create summary views for eligible temporary views in the database.

    A temporary view is eligible when its name contains none of the excluded
    fragments and it exposes a ``duration`` column. For each eligible view the
    regular summary view is created and, when ``by_rank`` is set, the per-rank
    summary view as well.
    """
    # Views whose grouping column is not called "name".
    name_column_overrides = {
        "memory_allocations": "type",
        "scratch_memory": "operation",
    }
    excluded_fragments = ("rocpd", "region", "counter", "pmc")
    mandatory_columns = {"duration"}

    # The regular summary is always generated; the per-rank one only on request.
    rank_modes = (False, True) if by_rank else (False,)

    for view_name in get_temp_view_names(connection):
        if any(fragment in view_name for fragment in excluded_fragments):
            continue

        available_columns = set(get_temp_view_columns(connection, view_name))
        if not mandatory_columns <= available_columns:
            continue

        name_column = name_column_overrides.get(view_name, "name")
        for per_rank in rank_modes:
            target_view, select_statement = generate_summary_query(
                view_name, name_column=name_column, by_rank=per_rank
            )
            connection.execute(make_temp_view_query(target_view, select_statement))
|
|
|
|
|
|
def _create_summary_view_pair(connection, view_name, by_rank, name_column="name"):
    """Create the summary view of ``view_name`` and, if requested, its per-rank variant."""
    summary_view_name, summary_query = generate_summary_query(
        view_name, name_column=name_column
    )
    connection.execute(make_temp_view_query(summary_view_name, summary_query))

    if by_rank:
        per_rank_view_name, summary_by_rank_query = generate_summary_query(
            view_name, name_column=name_column, by_rank=True
        )
        connection.execute(
            make_temp_view_query(per_rank_view_name, summary_by_rank_query)
        )


def create_summary_region_views(
    connection: RocpdImportData, by_rank=False, region_categories=None
) -> None:
    """Create per-region views plus their summary views.

    The distinct ``category`` values of ``regions_and_samples`` are grouped by
    region (the text before the first underscore). For every non-marker region
    with at least one matching category, a temporary view named after the
    lower-cased region is created together with its summary view(s). When a
    ``MARKER`` region is requested, a ``markers`` view is created and
    summarized by ``marker_name``.

    Args:
        connection: Database connection the views are created on.
        by_rank: Also create the per-rank summary views.
        region_categories: Region prefixes to include. When None they are
            derived from the categories found in the database.
    """
    query = "SELECT DISTINCT(category) FROM regions_and_samples;"
    rows = execute_statement(connection, query).fetchall()

    # A NULL category cannot be split or prefix-matched; leave it out.
    categories = [row[0] for row in rows if row[0] is not None]

    if region_categories is None:
        # Automatically retrieve region categories from the database
        region_categories = {cat.split("_")[0] for cat in categories}

    category_map = {
        cat.lower(): [c for c in categories if c.startswith(cat + "_")]
        for cat in region_categories
        if "MARKER" not in cat.upper()
    }

    for region_view, members in category_map.items():
        if not members:
            continue

        # Category values come from the database; double any single quote so
        # the value stays inside its SQL string literal.
        conditions = " OR ".join(
            "category LIKE '{}'".format(member.replace("'", "''"))
            for member in members
        )
        temp_region_view = f"""
            CREATE TEMPORARY VIEW IF NOT EXISTS `{region_view}` AS
            SELECT *
            FROM regions_and_samples
            WHERE {conditions};
        """
        connection.execute(temp_region_view)

        _create_summary_view_pair(connection, region_view, by_rank)

    # Markers: compare case-insensitively, like the filter above, so that
    # e.g. "marker" is not silently dropped from both code paths.
    if not any(cat.upper() == "MARKER" for cat in region_categories):
        return

    view_name = "markers"
    markers_create = f"""
        CREATE TEMPORARY VIEW IF NOT EXISTS `{view_name}` AS
        SELECT JSON_EXTRACT(extdata, '$.message') AS marker_name, *
        FROM regions_and_samples
        WHERE category LIKE 'MARKER_%'
    """
    connection.execute(markers_create)

    _create_summary_view_pair(
        connection, view_name, by_rank, name_column="marker_name"
    )
|
|
|
|
|
|
def create_domain_view(connection: RocpdImportData, by_rank=False) -> str:
    """Build the domain summary query and register it as a temporary view.

    Returns the name of the domain summary view.
    """
    generated = generate_domain_query(connection, by_rank=by_rank)
    view_name, domain_query = generated

    create_statement = make_temp_view_query(view_name, domain_query)
    connection.execute(create_statement)

    return view_name
|
|
|
|
|
|
def generate_all_summaries(connection: RocpdImportData, **kwargs: Any) -> None:
    """Create every summary view and export it in the requested format.

    Recognized keyword arguments (with defaults): ``domain_summary`` (False),
    ``summary_by_rank`` (False), ``output_file`` (""), ``output_path``
    ("./rocpd-output-data"), ``region_categories`` (None) and ``format``
    ("console").
    """
    domain_summary = kwargs.get("domain_summary", False)
    by_rank = kwargs.get("summary_by_rank", False)
    filename = kwargs.get("output_file", "")
    output_path = kwargs.get("output_path", "./rocpd-output-data")
    region_categories = kwargs.get("region_categories", None)
    output_format = kwargs.get("format", "console")

    def _sqrt_or_null(value):
        # Only non-negative numbers have a result; anything else maps to NULL.
        if isinstance(value, (int, float)) and value >= 0:
            return math.sqrt(value)
        return None

    # The summary queries call SQRT; register a Python implementation when the
    # connection does not report one.
    if not check_function_availability(connection, "sqrt"):
        connection.create_function("sqrt", 1, _sqrt_or_null)

    # Temporary summary views come first; the domain views aggregate them.
    create_summary_views(connection, by_rank)
    create_summary_region_views(connection, by_rank, region_categories=region_categories)

    if domain_summary:
        create_domain_view(connection)
        # The per-rank domain summary needs both flags to be enabled.
        if by_rank:
            create_domain_view(connection, by_rank=True)

    # Regular summary views
    print("\nSummary files:")
    regular_views = [
        name for name in get_temp_view_names(connection) if name.endswith("_summary")
    ]
    for summary_view in regular_views:
        export_view(connection, summary_view, output_format, output_path, filename)

    if not by_rank:
        return

    # Per-rank summary views
    print("\nSummary files by rank:")
    per_rank_views = [
        name
        for name in get_temp_view_names(connection)
        if name.endswith("_summary_by_rank")
    ]
    for summary_view in per_rank_views:
        export_view(connection, summary_view, output_format, output_path, filename)
|
|
|
|
|
|
#
|
|
# Command-line interface functions
|
|
#
|
|
|
|
|
|
def add_io_args(parser):
    """Register the input/output options of the summary command on ``parser``.

    Returns the list of attribute names the registered options populate.
    """
    group = parser.add_argument_group("I/O options")

    # (flags, keyword arguments) for each option, in registration order.
    # Environment-based defaults are read here, i.e. when this function runs.
    option_specs = (
        (
            ("-f", "--format"),
            {
                "help": "Sets the format the summaries are output to (default: console)",
                "choices": ("console", "csv", "html", "json", "md", "pdf"),
                "default": "console",
                "type": str,
                "required": False,
            },
        ),
        (
            ("-o", "--output-file"),
            {
                "help": "Sets the base output file name",
                "default": os.environ.get("ROCPD_OUTPUT_NAME", ""),
                "type": str,
                "required": False,
            },
        ),
        (
            ("-d", "--output-path"),
            {
                "help": "Sets the output path where the output files will be saved (default path: `./rocpd-output-data`)",
                "default": os.environ.get("ROCPD_OUTPUT_PATH", "./rocpd-output-data"),
                "type": str,
                "required": False,
            },
        ),
    )
    for flags, settings in option_specs:
        group.add_argument(*flags, **settings)

    return ["format", "output_file", "output_path"]
|
|
|
|
|
|
def add_args(parser):
    """Register the summary-specific options on ``parser``.

    Returns the list of attribute names the registered options populate.
    """
    group = parser.add_argument_group("Summary options")

    # Both boolean switches share the same shape; only flag and help differ.
    switches = (
        ("--domain-summary", "Generate domain summary view"),
        ("--summary-by-rank", "Generate summary views by-rank (or Process ID)"),
    )
    for flag, description in switches:
        group.add_argument(
            flag,
            action="store_true",
            default=False,
            help=description,
        )

    group.add_argument(
        "--region-categories",
        nargs="+",
        default=None,
        help="Specify region categories to include in the summary (example: HIP, HSA, RCCL, ROCDECODE, ROCJPEG, MARKER). If not specified, categories will be automatically retrieved from the database.",
    )

    return ["domain_summary", "summary_by_rank", "region_categories"]
|
|
|
|
|
|
def process_args(args, valid_args):
    """Collect the attributes of ``args`` named in ``valid_args`` into a dict.

    Names that are missing on ``args`` or whose value is None are left out.
    """
    selected = {}
    for key in valid_args:
        # A missing attribute and an explicit None are treated the same way.
        value = getattr(args, key, None)
        if value is None:
            continue
        selected[key] = value
    return selected
|
|
|
|
|
|
def execute(input, window_args=None, **kwargs: Any) -> RocpdImportData:
    """Open the database(s), apply the time window and generate all summaries.

    Args:
        input: Passed to ``RocpdImportData`` to open the database(s).
        window_args: Keyword arguments forwarded to ``apply_time_window``;
            None (the default) forwards no keyword arguments.
        **kwargs: Forwarded to ``generate_all_summaries``.

    Returns:
        The ``RocpdImportData`` object the summaries were generated on.
    """
    from .time_window import apply_time_window

    importData = RocpdImportData(input)

    # window_args defaults to None, and unpacking None with ** raises
    # TypeError, so substitute an empty mapping.
    apply_time_window(importData, **(window_args or {}))

    generate_all_summaries(importData, **kwargs)

    return importData
|
|
|
|
|
|
def main(argv=None) -> int:
    """Main entry point for command line execution.

    Args:
        argv: Argument list handed to ``ArgumentParser.parse_args``; None lets
            argparse read the process arguments.

    Returns:
        0 once the summaries have been generated. Failures propagate as
        exceptions (argparse exits on invalid arguments).
    """
    from .time_window import add_args as add_args_time_window
    from .time_window import process_args as process_args_time_window

    parser = argparse.ArgumentParser(
        description="Create ROCpd database summary region views"
    )
    required_params = parser.add_argument_group("Required options")

    required_params.add_argument(
        "-i",
        "--input",
        required=True,
        type=output_config.check_file_exists,
        nargs="+",
        help="Input path and filename to one or more database(s), separated by spaces",
    )

    # Each helper registers its options and returns the attribute names to
    # read back from the parsed namespace.
    valid_io_args = add_io_args(parser)
    valid_summary_args = add_args(parser)
    valid_time_window_args = add_args_time_window(parser)

    args = parser.parse_args(argv)

    summary_args = process_args(args, valid_summary_args)
    io_args = output_config.process_args(args, valid_io_args)
    window_args = process_args_time_window(args, valid_time_window_args)

    all_args = {**summary_args, **io_args}

    execute(
        args.input,
        window_args=window_args,
        **all_args,
    )

    # The signature promises an int status; report success explicitly.
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Hand main()'s return value to the interpreter as the process exit
    # status instead of discarding it (None and 0 both mean success).
    raise SystemExit(main())
|