#!/usr/bin/env python3
###############################################################################
# MIT License
#
# Copyright (c) 2025 Advanced Micro Devices, Inc.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
###############################################################################
import argparse
import os
import math
from typing import Any , List , Tuple
from . importer import RocpdImportData , execute_statement
from . query import export_sqlite_query
from . import output_config
def check_function_availability(connection, function_name):
    """
    Report whether the SQLite connection can resolve a SQL function by name.

    Args:
        connection (sqlite3 db connection): The SQLite database connection handler.
        function_name (str): The name of the function to look up.

    Returns:
        bool: True if the function exists, False otherwise.
    """
    probe = connection.cursor()

    # Preferred path: query the function catalogue (SQLite 3.30.0+).
    try:
        probe.execute(
            "SELECT EXISTS(SELECT 1 FROM pragma_function_list WHERE name=?)",
            (function_name,),
        )
        found = probe.fetchone()[0]
    except Exception:
        # Older SQLite versions (workaround for RHEL 8) do not expose the
        # catalogue; fall through to the call-based probe below.
        pass
    else:
        return bool(found)

    # Fallback: the function exists if a trivial one-argument call executes.
    try:
        probe.execute(f"SELECT {function_name}(1)")
    except Exception:
        return False
    return True
def get_temp_view_names(connection: RocpdImportData) -> List[str]:
    """List the name of every temporary view known to the SQLite connection."""
    view_rows = execute_statement(
        connection, "SELECT name FROM sqlite_temp_master WHERE type='view'"
    ).fetchall()
    # Each row holds a single column: the view name.
    return [row[0] for row in view_rows]
def get_temp_view_columns(connection: RocpdImportData, view_name: str) -> List[str]:
    """Look up the column names exposed by the given temporary view."""
    info_cursor = connection.cursor()
    info_cursor.execute(f"PRAGMA table_xinfo('{view_name}')")
    # The column name is the second field of every row reported by the pragma.
    column_names = []
    for info_row in info_cursor.fetchall():
        column_names.append(info_row[1])
    return column_names
def export_query(
    connection: RocpdImportData,
    output_path,
    output_file,
    output_format,
    query_name,
    query,
) -> None:
    """
    Export the rows produced by a SQL query in the requested output format.

    Nothing is exported (and nothing is printed) when the query yields no rows.
    The export target is ``output_path`` joined with ``query_name``, prefixed by
    ``output_file`` and an underscore when ``output_file`` is non-empty.
    """
    has_rows_query = f"""
        SELECT EXISTS (
            {query}
        )
    """

    # Skip empty results entirely.
    has_rows = connection.execute(has_rows_query).fetchone()[0]
    if not has_rows:
        return

    # Base filename; the query module appends the extension.
    output_filename = f"{output_file}_{query_name}" if output_file else query_name

    # Console output gets a heading naming the summary.
    if output_format == "console":
        print(f"\n{query_name.upper()}:")

    export_sqlite_query(
        connection,
        query,
        export_format=output_format,
        export_path=os.path.join(output_path, output_filename),
    )
def generate_summary_query(
    view_name: str,
    view_query="",
    name_column="name",
    by_rank=False,
) -> Tuple[str, str]:
    """
    Generate the SQL statement for a duration summary of a view or sub-query.

    Args:
        view_name (str): Name of the relation to summarize. It must expose a
            ``duration`` column and the column named by ``name_column``.
        view_query (str): Optional SELECT statement. When non-empty it is
            emitted as a CTE named ``view_name`` so the summary reads from it
            instead of an existing view.
        name_column (str): Column used as the grouping key; reported as ``Name``.
        by_rank (bool): When True, rows are additionally grouped by ``guid`` and
            the result carries ``ProcessID`` and ``Hostname`` columns (the
            latter joined from ``processes``).

    Returns:
        Tuple[str, str]: ``(query_name, sql)`` where ``query_name`` is
        ``view_name`` followed by ``_summary`` or ``_summary_by_rank``.
    """
    if by_rank:
        view_suffix = "_summary_by_rank"
        # The alias is attached to name_column explicitly. Deriving this list
        # by text substitution on the GROUP BY columns would also rewrite
        # "guid" for name columns such as "id".
        avg_select_columns = f"guid, {name_column} AS name"
        group_by_columns = f"guid, {name_column}"
        aggregation_group_by = f"T.guid, T.nid, T.{name_column}"
        additional_select_columns = "AD.pid AS ProcessID, P.hostname AS Hostname,"
        additional_aggregated_columns = """
                T.guid,
                T.nid,
                T.pid,"""
        join_condition = f"T.guid = A.guid AND T.{name_column} = A.name"
        total_duration_select = "guid,"
        total_duration_group_by = "GROUP BY guid"
        total_duration_join = (
            "JOIN total_duration TD ON AD.guid = TD.guid "
            "JOIN processes P ON AD.pid = P.pid"
        )
        order_by_prefix = "AD.pid, "
    else:
        view_suffix = "_summary"
        avg_select_columns = f"{name_column} AS name"
        group_by_columns = name_column
        aggregation_group_by = f"T.{name_column}"
        additional_select_columns = ""
        additional_aggregated_columns = ""
        join_condition = f"T.{name_column} = A.name"
        total_duration_select = ""
        total_duration_group_by = ""
        total_duration_join = "CROSS JOIN total_duration TD"
        order_by_prefix = ""

    full_view_name = f"{view_name}{view_suffix}"

    # Inline the source relation as a CTE only when a query was supplied.
    if view_query:
        view_select = f"""
        {view_name} AS (
            {view_query}
        ),
        """
    else:
        view_select = ""

    # avg_data holds the per-group mean so aggregated_data can compute the
    # sample standard deviation (divisor COUNT(*) - 1) in a single pass.
    summary_query = f"""
        WITH
        {view_select}
        avg_data AS (
            SELECT
                {avg_select_columns},
                AVG(duration) AS avg_duration
            FROM {view_name}
            GROUP BY {group_by_columns}
        ),
        aggregated_data AS (
            SELECT {additional_aggregated_columns}
                T.{name_column} as name,
                COUNT(*) AS calls,
                SUM(T.duration) AS total_duration,
                A.avg_duration AS average_duration,
                MIN(T.duration) AS min_duration,
                MAX(T.duration) AS max_duration,
                SQRT(SUM(CAST((T.duration - A.avg_duration) AS REAL) * CAST((T.duration - A.avg_duration) AS REAL)) / (COUNT(*) - 1)) AS std_dev_duration
            FROM {view_name} T
            JOIN avg_data A ON {join_condition}
            GROUP BY {aggregation_group_by}
        ),
        total_duration AS (
            SELECT
                {total_duration_select}
                SUM(total_duration) AS grand_total_duration
            FROM
                aggregated_data
            {total_duration_group_by}
        )
        SELECT
            {additional_select_columns}
            AD.name AS Name,
            AD.calls AS Calls,
            AD.total_duration AS "DURATION (nsec)",
            AD.average_duration AS "AVERAGE (nsec)",
            (CAST(AD.total_duration AS REAL) / TD.grand_total_duration) * 100 AS "PERCENT (INC)",
            AD.min_duration AS "MIN (nsec)",
            AD.max_duration AS "MAX (nsec)",
            AD.std_dev_duration AS "STD_DEV"
        FROM
            aggregated_data AD
            {total_duration_join}
        ORDER BY
            {order_by_prefix}AD.total_duration DESC
    """
    return (full_view_name, summary_query)
def generate_domain_query(
    connection: RocpdImportData, summary_queries, by_rank=False
) -> Tuple[str, str]:
    """
    Generate the SQL statement for the domain summary by doing a union over
    all matching summary queries.

    Only entries of ``summary_queries`` whose name ends with the suffix for the
    requested mode (``_summary`` or ``_summary_by_rank``) take part. Each one is
    inlined as a CTE and tagged with its upper-cased name (suffix removed) as
    the domain. ``connection`` is not used while building the statement.

    Returns:
        ``(view_name, sql)``, or an empty tuple when no summary query matches.

    NOTE(review): "AVERAGE (nsec)" and "STD_DEV" are summed over the per-name
    rows of each domain rather than recomputed; confirm this is the intended
    domain-level statistic.
    """
    view_suffix = "_summary_by_rank" if by_rank else "_summary"

    matching = {}
    for candidate_name, candidate_sql in summary_queries.items():
        if candidate_name.endswith(view_suffix):
            matching[candidate_name] = candidate_sql
    if not matching:
        return ()

    if by_rank:
        view_name = "domain_summary_by_rank"
        additional_group_columns = "ProcessID, Hostname,"
        additional_select_columns = "GD.ProcessID, GD.Hostname,"
        group_by_extra = ", ProcessID"
        total_duration_group_by = "GROUP BY ProcessID"
        join_condition = "JOIN total_duration TD ON GD.ProcessID = TD.ProcessID"
        order_by = "ORDER BY GD.ProcessID"
    else:
        view_name = "domain_summary"
        additional_group_columns = ""
        additional_select_columns = ""
        group_by_extra = ""
        total_duration_group_by = ""
        join_condition = "CROSS JOIN total_duration TD"
        order_by = 'ORDER BY GD."DURATION (nsec)" DESC'

    # One CTE per summary query, each terminated by a comma.
    summary_ctes = "".join(
        f"{cte_name} AS ({cte_sql})," for cte_name, cte_sql in matching.items()
    )
    # One tagged SELECT per summary query, glued together with UNION ALL.
    union_body = " UNION ALL ".join(
        f"SELECT '{cte_name.replace(view_suffix, '').upper()}' as domain, * FROM {cte_name}"
        for cte_name in matching
    )

    domain_select = f"""
        WITH
        {summary_ctes}
        all_domains AS (
            {union_body}
        ),
        grouped_domains AS (
            SELECT
                domain,
                {additional_group_columns}
                SUM(calls) AS calls,
                SUM("DURATION (nsec)") AS "DURATION (nsec)",
                SUM("AVERAGE (nsec)") AS "AVERAGE (nsec)",
                MIN("MIN (nsec)") AS "MIN (nsec)",
                MAX("MAX (nsec)") AS "MAX (nsec)",
                SUM("STD_DEV") AS "STD_DEV"
            FROM all_domains
            GROUP BY domain{group_by_extra}
        ),
        total_duration AS (
            SELECT
                {additional_group_columns}
                SUM("DURATION (nsec)") AS grand_total_duration
            FROM grouped_domains
            {total_duration_group_by}
        )
        SELECT
            {additional_select_columns}
            GD.domain AS Name,
            GD.calls AS Calls,
            GD."DURATION (nsec)",
            GD."AVERAGE (nsec)",
            (CAST(GD."DURATION (nsec)" AS REAL) / TD.grand_total_duration) * 100 AS "PERCENT (INC)",
            GD."MIN (nsec)",
            GD."MAX (nsec)",
            GD."STD_DEV"
        FROM
            grouped_domains GD
            {join_condition}
        {order_by}
    """
    return (view_name, domain_select)
def create_summary_queries(connection: RocpdImportData, by_rank=False):
    """
    Build summary queries for every eligible temporary view in the database.

    A view is eligible when its name contains none of the excluded patterns and
    it exposes a ``duration`` column. The result maps query name to SQL text;
    with ``by_rank`` set, each view also contributes a per-rank query.
    """
    # Views whose grouping key is not the default "name" column.
    name_column_overrides = {
        "memory_allocations": "type",
        "scratch_memory": "operation",
    }
    excluded_patterns = ("rocpd", "region", "counter", "pmc")
    mandatory_columns = {"duration"}

    # The regular summary always comes first; the per-rank one is optional.
    rank_modes = (False, True) if by_rank else (False,)

    queries = {}
    for view_name in get_temp_view_names(connection):
        if any(pattern in view_name for pattern in excluded_patterns):
            continue

        view_columns = get_temp_view_columns(connection, view_name)
        if not mandatory_columns.issubset(view_columns):
            continue

        name_column = name_column_overrides.get(view_name, "name")
        for per_rank in rank_modes:
            query_name, query_sql = generate_summary_query(
                view_name, "", name_column=name_column, by_rank=per_rank
            )
            queries[query_name] = query_sql

    return queries
def create_summary_region_queries(
    connection: RocpdImportData,
    by_rank=False,
    region_categories=None,
):
    """
    Create summary queries for region categories and markers.

    Args:
        connection: Database connection exposing ``regions_and_samples``.
        by_rank (bool): Also create a per-rank query for every summary.
        region_categories: Category prefixes to include (e.g. ``HIP``,
            ``MARKER``). When None, the prefixes are derived from the distinct
            categories stored in the database (text before the first ``_``).

    Returns:
        dict: Query name mapped to SQL text. Each non-marker prefix yields a
        query keyed by its lower-cased name; markers are summarized by their
        ``$.message`` payload under the ``markers`` name.
    """
    category_rows = execute_statement(
        connection, "SELECT DISTINCT(category) FROM regions_and_samples"
    ).fetchall()
    # NULL categories cannot be matched by prefix, so they are dropped before
    # the string operations below are applied.
    known_categories = [row[0] for row in category_rows if row[0] is not None]

    if region_categories is None:
        # Automatically retrieve region categories from the database
        region_categories = {name.split("_")[0] for name in known_categories}

    # Markers are summarized separately further down, so exclude them here.
    category_map = {
        cat.lower(): [name for name in known_categories if name.startswith(cat + "_")]
        for cat in region_categories
        if "MARKER" not in cat.upper()
    }

    # The regular summary always comes first; the per-rank one is optional.
    rank_modes = (False, True) if by_rank else (False,)

    queries = {}
    for region_name, members in category_map.items():
        if not members:
            continue
        # NOTE: LIKE treats "_" as a single-character wildcard, so this match is
        # slightly looser than strict equality on the category name.
        where_clause = " OR ".join(f"category LIKE '{name}'" for name in members)
        region_query = f"""
            SELECT *
            FROM regions_and_samples
            WHERE {where_clause}
        """
        for per_rank in rank_modes:
            query_name, query_sql = generate_summary_query(
                region_name, region_query, by_rank=per_rank
            )
            queries[query_name] = query_sql

    # Markers: matched case-insensitively, in line with the exclusion filter
    # above, so "marker" and "MARKER" both request the marker summary.
    if not any(cat.upper() == "MARKER" for cat in region_categories):
        return queries

    markers_query_name = "markers"
    markers_query = """
        SELECT JSON_EXTRACT(extdata, '$.message') AS marker_name, *
        FROM regions_and_samples
        WHERE category LIKE 'MARKER_%'
    """
    for per_rank in rank_modes:
        query_name, query_sql = generate_summary_query(
            markers_query_name,
            markers_query,
            name_column="marker_name",
            by_rank=per_rank,
        )
        queries[query_name] = query_sql

    return queries
def create_domain_query(connection: RocpdImportData, summary_queries, by_rank=False):
    """
    Wrap the domain summary query in a single-entry dictionary.

    Returns ``{query_name: query}`` for the domain summary built from
    ``summary_queries``, or an empty dictionary when no domain query could be
    generated.
    """
    domain = generate_domain_query(connection, summary_queries, by_rank=by_rank)
    if domain:
        domain_name, domain_sql = domain
        return {domain_name: domain_sql}
    return {}
def generate_all_summaries(connection: RocpdImportData, **kwargs: Any) -> None:
    """
    Build every requested summary query and export each one.

    Recognized keyword arguments (with defaults): ``domain_summary`` (False),
    ``summary_by_rank`` (False), ``output_file`` (""), ``output_path``
    ("./rocpd-output-data"), ``region_categories`` (None) and ``format``
    ("console").
    """
    want_domain_summary = kwargs.get("domain_summary", False)
    per_rank = kwargs.get("summary_by_rank", False)
    base_name = kwargs.get("output_file", "")
    target_dir = kwargs.get("output_path", "./rocpd-output-data")
    categories = kwargs.get("region_categories", None)
    export_format = kwargs.get("format", "console")

    # The summary SQL relies on SQRT; register a Python implementation when the
    # SQLite build does not provide one.
    if not check_function_availability(connection, "sqrt"):

        def _sqrt_or_null(value):
            # Anything that is not a non-negative number maps to NULL.
            if isinstance(value, (int, float)) and value >= 0:
                return math.sqrt(value)
            return None

        connection.create_function("sqrt", 1, _sqrt_or_null)

    # Create the summary queries
    summary_queries = {}
    summary_queries.update(create_summary_queries(connection, per_rank))
    summary_queries.update(
        create_summary_region_queries(
            connection, per_rank, region_categories=categories
        )
    )

    if want_domain_summary:
        summary_queries.update(create_domain_query(connection, summary_queries))
        # Create domain summary per rank only if both domain_summary and
        # summary_by_rank are enabled
        if per_rank:
            summary_queries.update(
                create_domain_query(connection, summary_queries, by_rank=True)
            )

    # Export all summary queries
    for query_name, query_sql in summary_queries.items():
        export_query(
            connection, target_dir, base_name, export_format, query_name, query_sql
        )
#
# Command-line interface functions
#
def add_io_args(parser):
    """
    Register the input/output options for summary on ``parser``.

    Returns the list of argument names that were added.
    """
    # Environment variables supply the defaults for the output name and path.
    default_output_name = os.environ.get("ROCPD_OUTPUT_NAME", "")
    default_output_path = os.environ.get("ROCPD_OUTPUT_PATH", "./rocpd-output-data")

    group = parser.add_argument_group("I/O options")
    group.add_argument(
        "-f",
        "--format",
        type=str,
        required=False,
        default="console",
        choices=("console", "csv", "html", "json", "md", "pdf"),
        help="Sets the format the summaries are output to (default: console)",
    )
    group.add_argument(
        "-o",
        "--output-file",
        type=str,
        required=False,
        default=default_output_name,
        help="Sets the base output file name",
    )
    group.add_argument(
        "-d",
        "--output-path",
        type=str,
        required=False,
        default=default_output_path,
        help="Sets the output path where the output files will be saved (default path: `./rocpd-output-data`)",
    )

    return ["format", "output_file", "output_path"]
def add_args(parser):
    """
    Register the summary options on ``parser``.

    Returns the list of argument names that were added.
    """
    group = parser.add_argument_group("Summary options")

    # Both switches are plain opt-in flags that default to off.
    group.add_argument(
        "--domain-summary",
        default=False,
        action="store_true",
        help="Generate domain summary view",
    )
    group.add_argument(
        "--summary-by-rank",
        default=False,
        action="store_true",
        help="Generate summary views by-rank (or Process ID)",
    )
    group.add_argument(
        "--region-categories",
        default=None,
        nargs="+",
        help="Specify region categories to include in the summary (example: HIP, HSA, RCCL, ROCDECODE, ROCJPEG, MARKER). If not specified, categories will be automatically retrieved from the database.",
    )

    return ["domain_summary", "summary_by_rank", "region_categories"]
def process_args(args, valid_args):
    """
    Collect the named attributes of ``args`` into a dictionary.

    Names that are missing from ``args`` or whose value is None are left out.
    """
    return {
        key: getattr(args, key)
        for key in valid_args
        if getattr(args, key, None) is not None
    }
def execute(input, window_args=None, **kwargs: Any) -> RocpdImportData:
    """
    Load the input database(s), apply the time window and generate all summaries.

    Args:
        input: Passed unchanged to ``RocpdImportData``.
        window_args: Optional keyword arguments for ``apply_time_window``;
            None is treated as "no arguments".
        **kwargs: Forwarded to ``generate_all_summaries``.

    Returns:
        RocpdImportData: The import data object the summaries were built from.
    """
    from .time_window import apply_time_window

    importData = RocpdImportData(input)

    # window_args defaults to None, which cannot be expanded with ** directly.
    apply_time_window(importData, **(window_args or {}))
    generate_all_summaries(importData, **kwargs)

    return importData
def main(argv=None) -> int:
    """
    Main entry point for command line execution.

    Args:
        argv: Argument list handed to ``ArgumentParser.parse_args``; None makes
            argparse read the process command line.

    Returns:
        int: 0 once the summaries have been generated.
    """
    from .time_window import add_args as add_args_time_window
    from .time_window import process_args as process_args_time_window

    parser = argparse.ArgumentParser(
        description="Create ROCpd database summary region views"
    )

    required_params = parser.add_argument_group("Required options")
    required_params.add_argument(
        "-i",
        "--input",
        required=True,
        type=output_config.check_file_exists,
        nargs="+",
        help="Input path and filename to one or more database(s), separated by spaces",
    )

    # Each helper registers its options and reports the names it added.
    valid_io_args = add_io_args(parser)
    valid_summary_args = add_args(parser)
    valid_time_window_args = add_args_time_window(parser)

    args = parser.parse_args(argv)

    summary_args = process_args(args, valid_summary_args)
    io_args = output_config.process_args(args, valid_io_args)
    window_args = process_args_time_window(args, valid_time_window_args)

    all_args = {**summary_args, **io_args}

    execute(
        args.input,
        window_args=window_args,
        **all_args,
    )

    # Honor the declared "-> int" return type.
    return 0


if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    raise SystemExit(main())