diff --git a/projects/rocprofiler-sdk/source/lib/python/rocpd/__main__.py b/projects/rocprofiler-sdk/source/lib/python/rocpd/__main__.py index 92f5eaf4e9..7a932507e8 100644 --- a/projects/rocprofiler-sdk/source/lib/python/rocpd/__main__.py +++ b/projects/rocprofiler-sdk/source/lib/python/rocpd/__main__.py @@ -39,8 +39,10 @@ def main(argv=None, config=None): """ import argparse from . import csv + from . import merge from . import otf2 from . import output_config + from . import package from . import pftrace from . import query from . import summary @@ -64,6 +66,34 @@ Example usage: Convert 2 databases, output CSV, OTF2, and perfetto trace formats $ rocpd convert -i db{3,4}.db --output-format csv otf2 pftrace +""" + + merge_examples = """ + +Example usage: + + Merge the three databases and output to a folder called merged3DBs + $ rocpd merge -i db0.db db1.db db2.db -d merged3DBs + + Merge all the databases from the node0 folder and output to the node0_output folder, with filename called largeMerged.db + $ rocpd merge -i node0/*.db -d node0_output -o largeMerged +""" + + package_examples = """ + +Example usage: + + Index the three databases into a metadata file (index.yaml) in the current folder, just reference the databases where they are on the filesystem + $ rocpd package -i node0/db0.db node1/db1.db node2/db2.db + + Package and copy/consolidate all the databases into a my_MPI_run_1.rpdb folder so it can be managed easier + $ rocpd package -i node0/db0.db node1/db1.db node2/db2.db -d my_MPI_run_1 --consolidate --copy + + Package and copy/consolidate all the databases from my_MPI_run_1.rpdb folder append node5/db5.db and make new folder + $ rocpd package -i my_MPI_run_1.rpdb node5/db5.db -d my_MPI_run_1_append_5 --consolidate --copy + + Use my_MPI_run_1.rpdb folder and move/consolidate node7/db7.db and re-use same .rpdb folder + $ rocpd package -i my_MPI_run_1.rpdb node7/db7.db -d my_MPI_run_1 --consolidate """ query_examples = """ @@ -94,6 +124,7 @@ Example usage: $ rocpd summary -i db{0,1}.db --region-categories HIP MARKERS --domain-summary --format html """ + input_help_string = "Input path and filename to one or more database(s). Wildcards accepted, as well as .rpdb folders" # Add the subparsers parser = argparse.ArgumentParser( @@ -109,6 +140,18 @@ Example usage: help="Print the version information and exit", ) + def add_required_args(_parser): + _required_params = _parser.add_argument_group("Required options") + _required_params.add_argument( + "-i", + "--input", + required=True, + type=output_config.check_file_exists, + nargs="+", + help=input_help_string, + ) + return _required_params + subparsers = parser.add_subparsers(dest="command") converter = subparsers.add_parser( "convert", @@ -118,6 +161,22 @@ Example usage: epilog=convert_examples, ) + merger = subparsers.add_parser( + "merge", + description="Generate merged database from rocPD databases", + allow_abbrev=False, + formatter_class=argparse.RawTextHelpFormatter, + epilog=merge_examples, + ) + + packager = subparsers.add_parser( + "package", + description="Package database files into .rpdb output", + allow_abbrev=False, + formatter_class=argparse.RawTextHelpFormatter, + epilog=package_examples, + ) + query_reporter = subparsers.add_parser( "query", description="Generate output on a query", @@ -138,15 +197,7 @@ Example usage: return val.lower().replace("perfetto", "pftrace") # add required options for each subparser - converter_required_params = converter.add_argument_group("Required options") - converter_required_params.add_argument( - "-i", - "--input", - required=True, - type=output_config.check_file_exists, - nargs="+", - help="Input path and filename to one or more database(s)", - ) + converter_required_params = add_required_args(converter) converter_required_params.add_argument( "-f", "--output-format", @@ -158,43 +209,39 @@ Example usage: required=True, ) - query_required_params = query_reporter.add_argument_group("Required options") - query_required_params.add_argument( - "-i", - "--input", - required=True, - type=output_config.check_file_exists, - nargs="+", - help="Input path and filename to one or more database(s)", - ) - - summary_required_params = generate_summary.add_argument_group("Required options") - summary_required_params.add_argument( - "-i", - "--input", - required=True, - type=output_config.check_file_exists, - nargs="+", - help="Input path and filename to one or more database(s)", - ) + add_required_args(merger) + add_required_args(packager) + add_required_args(query_reporter) + add_required_args(generate_summary) # converter: add args from any sub-modules - valid_out_config_args = output_config.add_args(converter) - valid_generic_args = output_config.add_generic_args(converter) - valid_pftrace_args = pftrace.add_args(converter) - valid_csv_args = csv.add_args(converter) - valid_otf2_args = otf2.add_args(converter) - valid_time_window_args = time_window.add_args(converter) + process_converter_args = [] + process_converter_args.append(output_config.add_args(converter)) + process_converter_args.append(output_config.add_generic_args(converter)) + process_converter_args.append(pftrace.add_args(converter)) + process_converter_args.append(csv.add_args(converter)) + process_converter_args.append(otf2.add_args(converter)) + process_converter_args.append(time_window.add_args(converter)) + + # merge: subparser args + process_merger_args = [] + process_merger_args.append(merge.add_args(merger)) + + # package: subparser args + process_packager_args = [] + process_packager_args.append(package.add_args(packager)) # query: subparser args - valid_out_config_args = output_config.add_args(query_reporter) - valid_query_args = query.add_args(query_reporter) - valid_time_window_args = time_window.add_args(query_reporter) + process_query_reporter_args = [] + process_query_reporter_args.append(output_config.add_args(query_reporter)) + process_query_reporter_args.append(query.add_args(query_reporter)) + process_query_reporter_args.append(time_window.add_args(query_reporter)) # summary: subparser args - valid_io_args = summary.add_io_args(generate_summary) - valid_summary_args = summary.add_args(generate_summary) - valid_time_window_args = time_window.add_args(generate_summary) + process_generate_summary_args = [] + process_generate_summary_args.append(output_config.add_args(generate_summary)) + process_generate_summary_args.append(summary.add_args(generate_summary)) + process_generate_summary_args.append(time_window.add_args(generate_summary)) # parse the command line arguments args = parser.parse_args(argv) @@ -213,30 +260,18 @@ Example usage: # if the user requested converter, process the conversion if args.command == "convert": - # process the args - out_cfg_args = output_config.process_args(args, valid_out_config_args) - generic_out_cfg_args = output_config.process_generic_args( - args, valid_generic_args + # construct the rocpd import data object + input = RocpdImportData( + args.input, + automerge_limit=getattr( + args, "automerge_limit", package.IDEAL_NUMBER_OF_DATABASE_FILES + ), ) - pftrace_args = pftrace.process_args(args, valid_pftrace_args) - csv_args = csv.process_args(args, valid_csv_args) - otf2_args = otf2.process_args(args, valid_otf2_args) - window_args = time_window.process_args(args, valid_time_window_args) - # now start processing the data. Import the data and merge the views - importData = RocpdImportData(args.input) + all_args = {} + for pitr in process_converter_args: + all_args.update(pitr(input, args)) - # adjust the time window view of the data - if window_args is not None: - time_window.apply_time_window(importData, **window_args) - - all_args = { - **out_cfg_args, - **generic_out_cfg_args, - **pftrace_args, - **csv_args, - **otf2_args, - } # setup the config args config = ( output_config.output_config(**all_args) @@ -254,42 +289,71 @@ Example usage: for out_format in args.output_format: if out_format in format_handlers: print(f"Converting database(s) to {out_format} format:") - format_handlers[out_format](importData, config) + format_handlers[out_format](input, config) else: print(f"Warning: Unsupported output format '{out_format}'") + # if the user requested merge module, execute the merge + elif args.command == "merge": + # no construction of the import data object + input = None + + # merge subparser args + merge_args = {} + for pitr in process_merger_args: + merge_args.update(pitr(input, args)) + + merge.execute(args.input, **merge_args) + + # if the user requested package module, package up the database + elif args.command == "package": + # construct the rocpd import data object + input = None + + # package subparser args + packager_args = {} + for pitr in process_packager_args: + packager_args.update(pitr(input, args)) + + package.execute(args.input, **packager_args) + # if the user requested query module, execute the query elif args.command == "query": - # query subparser args - query_args = query.process_args(args, valid_query_args) - out_cfg_args = output_config.process_args(args, valid_out_config_args) - window_args = time_window.process_args(args, valid_time_window_args) + # construct the rocpd import data object + input = RocpdImportData( + args.input, + automerge_limit=getattr( + args, "automerge_limit", package.IDEAL_NUMBER_OF_DATABASE_FILES + ), + ) - all_args = {**query_args, **out_cfg_args} + # query subparser args + query_args = {} + for pitr in process_query_reporter_args: + query_args.update(pitr(input, args)) query.execute( - args.input, + input, args, - window_args=window_args, - **all_args, + **query_args, ) # if the user requested a summary, generate the views elif args.command == "summary": + # construct the rocpd import data object + input = RocpdImportData( + args.input, + automerge_limit=getattr( + args, "automerge_limit", package.IDEAL_NUMBER_OF_DATABASE_FILES + ), + ) + # summary subparser args - summary_args = summary.process_args(args, valid_summary_args) - io_args = output_config.process_args(args, valid_io_args) - window_args = time_window.process_args(args, valid_time_window_args) + summary_args = {} + for pitr in process_generate_summary_args: + summary_args.update(pitr(input, args)) - # now start processing the data. Import the data and merge the views - importData = RocpdImportData(args.input) - - # adjust the time window view of the data - if window_args is not None: - time_window.apply_time_window(importData, **window_args) - - all_args = {**summary_args, **io_args} - summary.generate_all_summaries(importData, **all_args) + summary.generate_all_summaries(input, **summary_args) print("Done. Exiting...") diff --git a/projects/rocprofiler-sdk/source/lib/python/rocpd/csv.py b/projects/rocprofiler-sdk/source/lib/python/rocpd/csv.py index 44a76a5ae4..7979ea04be 100644 --- a/projects/rocprofiler-sdk/source/lib/python/rocpd/csv.py +++ b/projects/rocprofiler-sdk/source/lib/python/rocpd/csv.py @@ -28,7 +28,6 @@ import re from .importer import RocpdImportData from .query import export_sqlite_query -from .time_window import apply_time_window from . import output_config from . import libpyrocpd @@ -413,12 +412,10 @@ def write_csv(importData, config): write_scratch_memory_csv(importData, config) -def execute(input, config=None, window_args=None, **kwargs): +def execute(input, config=None, **kwargs): importData = RocpdImportData(input) - apply_time_window(importData, **window_args) - config = ( output_config.output_config(**kwargs) if config is None @@ -431,21 +428,17 @@ def execute(input, config=None, window_args=None, **kwargs): def add_args(parser): """Add csv arguments.""" - return [] + def process_args(input, args): + ret = {} + return ret - -def process_args(args, valid_args): - ret = {} - return ret + return process_args def main(argv=None): import argparse - from .time_window import add_args as add_args_time_window - from .time_window import process_args as process_args_time_window - from .output_config import add_args as add_args_output_config - from .output_config import process_args as process_args_output_config - from .output_config import add_generic_args, process_generic_args + from . import time_window + from . import output_config parser = argparse.ArgumentParser( description="Convert rocPD to CSV files", @@ -464,17 +457,19 @@ def main(argv=None): help="Input path and filename to one or more database(s), separated by spaces", ) - valid_out_config_args = add_args_output_config(parser) - valid_generic_args = add_generic_args(parser) - valid_time_window_args = add_args_time_window(parser) - valid_csv_args = add_args(parser) + process_out_config_args = output_config.add_args(parser) + process_generic_args = output_config.add_generic_args(parser) + process_time_window_args = time_window.add_args(parser) + process_csv_args = add_args(parser) args = parser.parse_args(argv) - out_cfg_args = process_args_output_config(args, valid_out_config_args) - generic_out_cfg_args = process_generic_args(args, valid_generic_args) - window_args = process_args_time_window(args, valid_time_window_args) - csv_args = process_args(args, valid_csv_args) + input = RocpdImportData(args.input) + + out_cfg_args = process_out_config_args(input, args) + generic_out_cfg_args = process_generic_args(input, args) + csv_args = process_csv_args(input, args) + process_time_window_args(input, args) all_args = { **out_cfg_args, @@ -482,7 +477,7 @@ def main(argv=None): **csv_args, } - execute(args.input, window_args=window_args, **all_args) + execute(input, **all_args) if __name__ == "__main__": diff --git a/projects/rocprofiler-sdk/source/lib/python/rocpd/importer.py b/projects/rocprofiler-sdk/source/lib/python/rocpd/importer.py index 5ad4aef1a6..6e0bf8156f 100644 --- a/projects/rocprofiler-sdk/source/lib/python/rocpd/importer.py +++ b/projects/rocprofiler-sdk/source/lib/python/rocpd/importer.py @@ -28,6 +28,7 @@ # import sys +import os import sqlite3 from .schema import RocpdSchema @@ -36,9 +37,33 @@ from . import libpyrocpd __all__ = ["RocpdImportData", "execute_statement"] +def internal_init(_input, _output, skip_auto_merge, automerge_limit): + from . import package + + _input = package.flatten_rocpd_yaml_input_file( + _input, skip_auto_merge=skip_auto_merge, automerge_limit=automerge_limit + ) + assert not os.path.isdir(_output), "Output database name must not be a directory" + assert _check_for_valid_dbs( + _input + ), "RocpdImportData error, invalid SQLite3 database provided" + _connection = libpyrocpd.connect(_output) + _connection.execute("PRAGMA foreign_keys = ON") + _table_info = _create_temp_views(_connection, _input) + _create_meta_views(_connection) + return (_connection, _input, _table_info) + + class RocpdImportData(libpyrocpd.RocpdImportData): - def __init__(self, input): + def __init__( + self, input, skip_auto_merge=False, automerge_limit=None, dbname=":memory:" + ): + from . import package + + if automerge_limit is None: + automerge_limit = package.IDEAL_NUMBER_OF_DATABASE_FILES + if isinstance(input, RocpdImportData): super(RocpdImportData, self).__init__(input) self.table_info = input.table_info @@ -48,15 +73,13 @@ class RocpdImportData(libpyrocpd.RocpdImportData): raise ValueError( "RocpdImportData does not accept existing sqlite3 connections" ) - elif isinstance(input, str): - _connection = libpyrocpd.connect(input) - _filenames = [input] - elif isinstance(input, list) and len(input) > 0 and isinstance(input[0], str): - _connection = libpyrocpd.connect(":memory:") - _filenames = input[:] - _connection.execute("PRAGMA foreign_keys = ON") - self.table_info = _create_temp_views(_connection, input) - _create_meta_views(_connection) + elif isinstance(input, str) or ( + isinstance(input, list) and len(input) > 0 and isinstance(input[0], str) + ): + _connection, _filenames, _table_info = internal_init( + input, dbname, skip_auto_merge, automerge_limit + ) + self.table_info = _table_info else: raise ValueError( f"input is unsupported type. Expected sqlite3.Connection, string, or (non-empty) list of strings. type={type(input).__name__}" @@ -75,6 +98,22 @@ class RocpdImportData(libpyrocpd.RocpdImportData): return self.connection.__exit__(exc_type, exc, tb) +def _is_sqlite_db(file_path): + with open(file_path, "rb") as f: + header = f.read(16) + return header == b"SQLite format 3\x00" + + +def _check_for_valid_dbs(input_files) -> bool: + # check the list of .db files to confirm they are SQLite3 DBs + for file in input_files: + sqlite_db = _is_sqlite_db(file) + if not sqlite_db: + print(f"Error: {file} is not an SQLite3 database. File not supported.") + return False + return True + + def execute_statement(conn, statement, is_script=False): if isinstance(conn, RocpdImportData): _conn = conn.connection diff --git a/projects/rocprofiler-sdk/source/lib/python/rocpd/merge.py b/projects/rocprofiler-sdk/source/lib/python/rocpd/merge.py new file mode 100644 index 0000000000..929ce05318 --- /dev/null +++ b/projects/rocprofiler-sdk/source/lib/python/rocpd/merge.py @@ -0,0 +1,362 @@ +#!/usr/bin/env python3 +############################################################################### +# MIT License +# +# Copyright (c) 2025 Advanced Micro Devices, Inc. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +############################################################################### + +import argparse +import os +import sqlite3 +import time + +from typing import List, Dict, Iterable, Optional, Callable, Any + + +def merge_sqlite_dbs( + sources: Iterable[str], + dest_path: str, + on_log: Optional[Callable[[str], None]] = None, +) -> None: + """ + Merge multiple SQLite databases into a single destination database. + + Parameters + ---------- + sources : Iterable[str] + Paths to source databases. + dest_path : str + Path to destination database. + on_log : Optional[Callable[[str], None]] + Logger function; defaults to None. Pass `print` to generate logs. + """ + + def log(msg: str) -> None: + if on_log: + on_log(f" {msg}") + + sources = list(sources) + if not sources: + raise ValueError("No source databases provided") + + # Prepare output directory + output_dir = os.path.dirname(os.path.abspath(dest_path)) or os.getcwd() + os.makedirs(output_dir, exist_ok=True) + + # Remove existing file + if os.path.isfile(dest_path): + os.remove(dest_path) + + uuids = [] + views = [] + data_views = [] + schema_versions = [] + + with sqlite3.connect(str(dest_path)) as conn: + conn.execute("PRAGMA journal_mode = WAL;") + conn.execute("PRAGMA synchronous = NORMAL;") + conn.execute("PRAGMA foreign_keys = OFF;") # defer FK checks until end + + # One big atomic transaction + with conn: + # Attach sources one by one + for i, src in enumerate(sources, 1): + alias = f"src{i}" + conn.execute(f"ATTACH DATABASE ? AS {alias}", (src,)) + print(f"Adding {src}") + log(f"Attached {src} AS {alias}") + + # UUIDs and schema version + _uuids = [ + itr[0] + for itr in conn.execute( + f"SELECT value FROM {alias}.rocpd_metadata WHERE tag='uuid'", + ).fetchall() + ] + uuids += [itr for itr in _uuids if itr not in uuids] + + _schema_versions = [ + itr[0] + for itr in conn.execute( + f"SELECT value FROM {alias}.rocpd_metadata WHERE tag='schema_version'", + ).fetchall() + ] + schema_versions += _schema_versions + + # Helper: fetch rows from attached sqlite_master + def fetch_master(_alias: str, kind: str): + cur = conn.execute( + f""" + SELECT name, sql + FROM {_alias}.sqlite_master + WHERE type = ? AND name NOT LIKE 'sqlite_%' + ORDER BY name + """, + (kind,), + ) + return cur.fetchall() + + # Track dest tables to detect collisions quickly + existing_tables = { + row[0] + for row in conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'" + ) + } + + # 1) Create tables + for name, create_sql in fetch_master(alias, "table"): + if name in existing_tables: + raise AssertionError( + f"Table name collision for '{name}' from {alias}; " + "assumption of globally-unique table names violated." + ) + if not create_sql: + continue + log(f"Creating table {name}") + conn.execute(create_sql) + existing_tables.add(name) + + # 2) Copy table data + tbls = [name for name, _ in fetch_master(alias, "table")] + print(f"Tables found: {len(tbls)}") + for name in tbls: + log(f"Inserting rows into {name} from {alias}.{name}") + rows = conn.execute(f'SELECT * FROM {alias}."{name}"').fetchall() + if rows: + col_count = len(rows[0]) + placeholders = ", ".join(["?"] * col_count) + conn.executemany( + f'INSERT INTO "{name}" VALUES ({placeholders})', rows + ) + + # 3) Recreate indexes (make idempotent with IF NOT EXISTS) + def inject_if_not_exists_in_index_sql(sql: str) -> str: + # Naive, but works for standard forms produced by sqlite_master + # Handles UNIQUE and non-UNIQUE: + # "CREATE INDEX name ON ..." or "CREATE UNIQUE INDEX name ON ..." + sql_stripped = sql.strip() + if sql_stripped.upper().startswith("CREATE UNIQUE INDEX"): + return sql_stripped.replace( + "CREATE UNIQUE INDEX", "CREATE UNIQUE INDEX IF NOT EXISTS", 1 + ) + if sql_stripped.upper().startswith("CREATE INDEX"): + return sql_stripped.replace( + "CREATE INDEX", "CREATE INDEX IF NOT EXISTS", 1 + ) + return sql + + existing_indexes = { + row[0] + for row in conn.execute( + "SELECT name FROM sqlite_master WHERE type='index' AND name NOT LIKE 'sqlite_%'" + ) + } + for name, create_sql in fetch_master(alias, "index"): + if not create_sql: + continue + if name in existing_indexes: + log(f"Index {name} exists; skipping or using IF NOT EXISTS") + # Try to create with IF NOT EXISTS to avoid collision + sql2 = inject_if_not_exists_in_index_sql(create_sql) + conn.execute(sql2) + existing_indexes.add(name) + + # 4) Recreate triggers (skip on name conflict) + existing_triggers = { + row[0] + for row in conn.execute( + "SELECT name FROM sqlite_master WHERE type='trigger'" + ) + } + for name, create_sql in fetch_master(alias, "trigger"): + if not create_sql: + continue + if name in existing_triggers: + log(f"Trigger {name} exists; skipping") + continue + log(f"Creating trigger {name}") + conn.execute(create_sql) + existing_triggers.add(name) + + # 5) Recreate views (skip on name conflict) + existing_views = { + row[0] + for row in conn.execute( + "SELECT name FROM sqlite_master WHERE type='view'" + ) + } + for name, create_sql in fetch_master(alias, "view"): + if not create_sql: + continue + if name in existing_views: + log(f"View {name} exists; skipping") + continue + # If the view name does not start with "rocpd_", collect it for later recreation + if not name.startswith("rocpd_") and not any( + name == _name for _name, _ in data_views + ): + data_views.append((name, create_sql)) + existing_views.add(name) + + views += [itr for itr in list(existing_views) if itr.startswith("rocpd_")] + + conn.commit() + conn.execute(f"DETACH DATABASE {alias}") + log(f"Detached {alias}") + + # Check the schema versions. Merge only occurs if all the DBs are the same schema version. + unique_versions = list(set(schema_versions)) + if len(unique_versions) != 1: + raise RuntimeError(f"Multiple schema versions found: {unique_versions}") + + # Re-enable FKs and run a quick FK check + conn.execute("PRAGMA foreign_keys = ON;") + # Optional: enforce integrity + # try: + # conn.execute("PRAGMA quick_check;") + # except sqlite3.DatabaseError as e: + # log(f"SQLite3 quick_check reported an issue: {e}") + + uuids = sorted(list(set(uuids))) # unique set of uuids + views = sorted(list(set(views))) # unique set of views + + # Create UNION views by listing all tables + existing_tables = { + row[0] + for row in conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'" + ) + } + + # Then UNION all the tables starting with the view name + for vitr in views: + matching_tables = [ + titr for titr in existing_tables if titr.startswith(f"{vitr}_") + ] + tables_union = " UNION ALL ".join( + [f"SELECT * FROM {titr}" for titr in matching_tables] + ) + conn.execute(f"CREATE VIEW {vitr} AS {tables_union}") + conn.commit() + + # Now that the rocpd_ views are created, re-create the data-views using all the data + for _, sql_view in data_views: + conn.execute(sql_view) + conn.commit() + + +# +# Command-line interface functions +# +def add_args(parser): + """Add arguments for merger.""" + + io_options = parser.add_argument_group("I/O options") + + io_options.add_argument( + "-o", + "--output-file", + help="Sets the base output file name", + default=os.environ.get("ROCPD_OUTPUT_NAME", "merged"), + type=str, + required=False, + ) + io_options.add_argument( + "-d", + "--output-path", + help="Sets the output path where the output files will be saved (default path: `./rocpd-output-data`)", + default=os.environ.get("ROCPD_OUTPUT_PATH", "./rocpd-output-data"), + type=str, + required=False, + ) + + def process_args(input, args): + valid_args = ["output_file", "output_path"] + ret = {} + for itr in valid_args: + if hasattr(args, itr): + val = getattr(args, itr) + if val is not None: + ret[itr] = val + return ret + + return process_args + + +def execute(inputs: List[str], **kwargs: Dict[str, Any]) -> str: + + start_time = time.time() + + input_files = inputs + try: + from . import package + + input_files = package.flatten_rocpd_yaml_input_file(inputs, skip_auto_merge=True) + except Exception as e: + print(f"Import error trying to use package, fallback to use inputs: {e}") + + output_path = kwargs.get("output_path") + output_filename = kwargs.get("output_file") + if not output_filename.endswith(".db"): + output_filename += ".db" + output = os.path.join(output_path, output_filename) + + merge_sqlite_dbs(input_files, output) + + elapsed_time = time.time() - start_time + + print(f"Merge completed successfully! Output saved to: {output}") + print(f"Time: {elapsed_time:.2f} sec") + return str(output) + + +def main(argv=None) -> int: + """Main entry point for command line execution.""" + + from . import output_config + + parser = argparse.ArgumentParser( + description="Generate merged database from rocPD databases" + ) + + required_params = parser.add_argument_group("Required options") + + required_params.add_argument( + "-i", + "--input", + required=True, + type=output_config.check_file_exists, + nargs="+", + help="Path to the input ROCpd database files", + ) + + process_args = add_args(parser) + + args = parser.parse_args(argv) + + merge_args = process_args(args) + + execute(args.input, **merge_args) + + +if __name__ == "__main__": + main() diff --git a/projects/rocprofiler-sdk/source/lib/python/rocpd/otf2.py b/projects/rocprofiler-sdk/source/lib/python/rocpd/otf2.py index 33714e8a0f..d00b45038d 100644 --- a/projects/rocprofiler-sdk/source/lib/python/rocpd/otf2.py +++ b/projects/rocprofiler-sdk/source/lib/python/rocpd/otf2.py @@ -24,7 +24,6 @@ ############################################################################### from .importer import RocpdImportData -from .time_window import apply_time_window from . import output_config from . import libpyrocpd @@ -33,12 +32,10 @@ def write_otf2(importData, config): return libpyrocpd.write_otf2(importData, config) -def execute(input, config=None, window_args=None, **kwargs): +def execute(input, config=None, **kwargs): importData = RocpdImportData(input) - apply_time_window(importData, **window_args) - config = ( output_config.output_config(**kwargs) if config is None @@ -62,27 +59,23 @@ def add_args(parser): # default=False, # ) - return [] + def process_args(input, args): + valid_args = [] + ret = {} + for itr in valid_args: + if hasattr(args, itr): + val = getattr(args, itr) + if val is not None: + ret[itr] = val + return ret - -def process_args(args, valid_args): - - ret = {} - for itr in valid_args: - if hasattr(args, itr): - val = getattr(args, itr) - if val is not None: - ret[itr] = val - return ret + return process_args def main(argv=None): import argparse - from .time_window import add_args as add_args_time_window - from .time_window import process_args as process_args_time_window - from .output_config import add_args as add_args_output_config - from .output_config import process_args as process_args_output_config - from .output_config import add_generic_args, process_generic_args + from . import time_window + from . import output_config parser = argparse.ArgumentParser( description="Convert rocPD to OTF2 format", allow_abbrev=False @@ -99,21 +92,23 @@ def main(argv=None): help="Input path and filename to one or more database(s), separated by spaces", ) - valid_out_config_args = add_args_output_config(parser) - valid_otf2_args = add_args(parser) - valid_generic_args = add_generic_args(parser) - valid_time_window_args = add_args_time_window(parser) + process_out_config_args = output_config.add_args(parser) + process_otf2_args = add_args(parser) + process_generic_args = output_config.add_generic_args(parser) + process_time_window_args = time_window.add_args(parser) args = parser.parse_args(argv) - out_cfg_args = process_args_output_config(args, valid_out_config_args) - generic_out_cfg_args = process_generic_args(args, valid_generic_args) - window_args = process_args_time_window(args, valid_time_window_args) - otf2_args = process_args(args, valid_otf2_args) + input = RocpdImportData(args.input) + + out_cfg_args = process_out_config_args(input, args) + generic_out_cfg_args = process_generic_args(input, args) + otf2_args = process_otf2_args(input, args) + process_time_window_args(input, args) all_args = {**out_cfg_args, **otf2_args, **generic_out_cfg_args} - execute(args.input, window_args=window_args, **all_args) + execute(input, **all_args) if __name__ == "__main__": diff --git a/projects/rocprofiler-sdk/source/lib/python/rocpd/output_config.py b/projects/rocprofiler-sdk/source/lib/python/rocpd/output_config.py index d0877384c1..1a7aa466aa 100644 --- a/projects/rocprofiler-sdk/source/lib/python/rocpd/output_config.py +++ b/projects/rocprofiler-sdk/source/lib/python/rocpd/output_config.py @@ -36,7 +36,7 @@ except Exception: from . import libpyrocpd -__all__ = ["format_path", "output_config", "add_args", "process_args"] +__all__ = ["format_path", "output_config", "add_args", "add_generic_args"] def _generate_attribute_docs(data): @@ -73,12 +73,9 @@ class output_config(libpyrocpd.output_config): self.update(**kwargs) def update(self, **kwargs): - _strict = kwargs.get("strict", True) - # _verbose = kwargs.get("log-level", "config") + _strict = kwargs.get("strict", False) for key, itr in kwargs.items(): if hasattr(self, key): - # if _verbose in ("info", "trace", "config"): - # print(f" - output_config.{key} = {itr}") if key == "agent_index_value": if itr == "absolute": setattr(self, key, libpyrocpd.agent_indexing.node) @@ -97,10 +94,29 @@ def format_path(path, tag=os.path.basename(sys.executable)): return libpyrocpd.format_path(path, tag) +def sanitize_input_list(input: list): + sanitized_list = [] + for items in input: + if isinstance(items, list): + sanitized_list.extend(sanitize_input_list(items)) + else: + sanitized_list.append(items) + return sanitized_list + + def check_file_exists(filename): - if not os.path.exists(filename): - raise argparse.ArgumentTypeError(f"File '{filename}' does not exist.") - return filename + import glob + + # Check for wildcards passed in + if any(char in filename for char in ["*", "?", "["]): + expanded_files = glob.glob(filename) + if not expanded_files: + raise argparse.ArgumentTypeError(f"File '{filename}' does not exist.") + return sanitize_input_list(expanded_files) + else: + if not os.path.exists(filename): + raise argparse.ArgumentTypeError(f"File '{filename}' does not exist.") + return filename def add_args(parser): @@ -125,6 +141,33 @@ def add_args(parser): required=False, ) + io_options.add_argument( + "--automerge-limit", + help="Change database auto-merge limit (default: 1, auto-merge to 1 DB. max: 8)", + type=int, + required=False, + ) + + def process_args(input, args): + valid_args = ["output_file", "output_path"] + ret = {} + for itr in valid_args: + if hasattr(args, itr): + val = getattr(args, itr) + if itr == "output_format": + ret[itr] = val + elif itr == "output_path" and val is not None: + ret[itr] = format_path(val) + elif val is not None: + ret[itr] = val + return ret + + return process_args + + +def add_generic_args(parser): + """Add generic arguments that apply to multiple output formats.""" + kernel_naming_options = parser.add_argument_group("Kernel naming options") kernel_naming_options.add_argument( @@ -134,27 +177,6 @@ def add_args(parser): default=False, ) - return ["output_file", "output_path", "kernel_rename"] - - -def process_args(args, valid_args): - - ret = {} - for itr in valid_args: - if hasattr(args, itr): - val = getattr(args, itr) - if itr == "output_format": - ret[itr] = val - elif itr == "output_path" and val is not None: - ret[itr] = format_path(val) - elif val is not None: - ret[itr] = val - return ret - - -def add_generic_args(parser): - """Add generic arguments that apply to multiple output formats.""" - generic_options = parser.add_argument_group("Generic options") generic_options.add_argument( @@ -167,16 +189,14 @@ def add_generic_args(parser): default="relative", ) - return [ - "agent_index_value", - ] + def process_args(input, args): + valid_args = ["agent_index_value", "kernel_rename"] + ret = {} + for itr in valid_args: + if hasattr(args, itr): + val = getattr(args, itr) + if val is not None: + ret[itr] = val + return ret - -def process_generic_args(args, valid_args): - ret = {} - for itr in valid_args: - if hasattr(args, itr): - val = getattr(args, itr) - if val is not None: - ret[itr] = val - return ret + return process_args diff --git a/projects/rocprofiler-sdk/source/lib/python/rocpd/package.py b/projects/rocprofiler-sdk/source/lib/python/rocpd/package.py new file mode 100644 index 0000000000..aea4a1da24 --- /dev/null +++ b/projects/rocprofiler-sdk/source/lib/python/rocpd/package.py @@ -0,0 +1,511 @@ +#!/usr/bin/env python3 +############################################################################### +# MIT License +# +# Copyright (c) 2025 Advanced Micro Devices, Inc. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +############################################################################### + +import os +import shutil +import datetime +import yaml +import argparse +from . import output_config + +rocpd_package_version = "1.0" +rocpd_metadata_param_version = "rocpd_package_version" + +IDEAL_NUMBER_OF_DATABASE_FILES = 1 +MAX_LIMIT_OF_DATABASE_FILES = 8 + + +def prepare_output_folder(output_path, consolidate) -> str: + """ + Prepares the output folder path with appropriate .rpdb extension. + + When consolidating to current directory, generates a timestamped folder. + Otherwise, ensures the provided path has .rpdb extension. + + Args: + output_path (str): The path to the output folder. + consolidate (bool): Whether to consolidate output files. + + Returns: + str: The output folder path with .rpdb extension. + """ + # Current directory with consolidation - generate timestamped folder + if output_path == os.getcwd(): + if consolidate: + date_str = datetime.datetime.now().strftime("%Y%m%d-%H%M%S") + output_path = f"rocpd-{date_str}.rpdb" + else: + # Custom path provided - ensure it has .rpdb extension + if not output_path.endswith(".rpdb"): + output_path = f"{output_path}.rpdb" + return output_path + + +def flatten_rocpd_yaml_input_file(input, **kwargs) -> list: + """ + Processes input files and returns a flattened list of database files. + + Handles multiple input types: + - YAML files containing rocpd metadata + - .rpdb folders with index.yaml + - Direct database files + - Directories containing .db files + - Wildcard patterns + + Optionally merges databases if count exceeds threshold. + + Args: + input (list of str): List of input file paths (YAML, DB, or .rpdb folder). + skip_auto_merge (bool): If True, skip automatic merging of multiple databases. + + Returns: + list: List of database file paths. + """ + import glob + + def parse_yaml_file(yaml_path, base_dir=None): + """ + Parse a rocprofiler-sdk YAML file and extract database file paths. + + Args: + yaml_path (str): Path to the YAML file. + base_dir (str): Base directory for resolving relative paths. + + Returns: + list: Expanded list of database file paths. + """ + with open(yaml_path, "r") as f: + meta = yaml.safe_load(f) + rocpd_meta = meta.get("rocprofiler-sdk", {}).get("rocpd", {}) + + # Check version compatibility + version = rocpd_meta.get(rocpd_metadata_param_version, "0") + if version < rocpd_package_version: + print( + f"Warning: {yaml_path} is using an outdated version of rocpd package ({version})." + ) + + # Determine working directory for relative paths + cwd = ( + base_dir if base_dir is not None else rocpd_meta.get("path", os.getcwd()) + ) + + # Get database file list from YAML + dbs = rocpd_meta.get("files", []) + if isinstance(dbs, str): + dbs = [dbs] + + # Expand each database path (handle wildcards and relative paths) + files = [] + for db in dbs: + db_path = os.path.join(cwd, db) if not os.path.isabs(db) else db + if _contains_wildcard(db_path): + files.extend(glob.glob(db_path)) + else: + files.append(db_path) + + return files + + def _contains_wildcard(path): + """Check if path contains wildcard characters.""" + return "*" in path or "?" in path or "[" in path + + def _process_rpdb_folder(item): + """Process .rpdb folder and extract database files.""" + index_yaml = os.path.join(item, "index.yaml") + if os.path.isfile(index_yaml): + return parse_yaml_file(index_yaml, base_dir=os.path.abspath(item)) + else: + # No index.yaml, search for .db files directly + return glob.glob(os.path.join(item, "*.db")) + + def _process_yaml_file(item): + """Process YAML file and extract database files.""" + base_dir = os.path.dirname(item) + return parse_yaml_file(item, base_dir) + + def _process_directory(item): + """Process directory and find .db files.""" + return glob.glob(os.path.join(item, "*.db")) + + def _process_file_or_pattern(item): + """Process individual file or wildcard pattern.""" + if _contains_wildcard(item): + return glob.glob(item) + else: + return [item] + + # Sanitize and categorize input + sanitized_input = output_config.sanitize_input_list(input) + + # Process each input item based on its type + input_files = [] + for item in sanitized_input: + if item.endswith(".rpdb") and os.path.isdir(item): + input_files.extend(_process_rpdb_folder(item)) + elif item.endswith((".yaml", ".yml")): + input_files.extend(_process_yaml_file(item)) + elif os.path.isdir(item): + input_files.extend(_process_directory(item)) + else: + input_files.extend(_process_file_or_pattern(item)) + + # Validate all files exist + num_dbs = len(input_files) + print(f"Found {num_dbs} database files.") + + for db in input_files: + if not os.path.exists(db): + print(f"Warning: Input database file not found: {db}. Exiting.") + return [] + + # Optionally merge databases if count exceeds threshold + skip_auto_merge = kwargs.get("skip_auto_merge", False) + auto_merge_max_limit = kwargs.get("automerge_limit", IDEAL_NUMBER_OF_DATABASE_FILES) + + # Check if user tried to exceed MAX_LIMIT of DBs. Conservatively set to 8. SQLite limit is 10. + if auto_merge_max_limit > MAX_LIMIT_OF_DATABASE_FILES: + print( + f"SQLite has a database attach limit of 10. Max auto-merge limit of {MAX_LIMIT_OF_DATABASE_FILES} set." + ) + auto_merge_max_limit = MAX_LIMIT_OF_DATABASE_FILES + + if skip_auto_merge: + print("Skip auto merge and packaging.") + return input_files + + if num_dbs > auto_merge_max_limit: + print( + f"More than {auto_merge_max_limit} database files found. " + f"It is recommended to merge and package databases" + ) + merged_files = merge_and_repackage( + input_files, max_limit=auto_merge_max_limit, **kwargs + ) + print(f"Reduced to {len(merged_files)} database files.") + return merged_files + + return input_files + + +def merge_and_repackage( + input_files, max_limit=IDEAL_NUMBER_OF_DATABASE_FILES, **kwargs +) -> list: + """ + Merges and repackages database files into batches to reduce file count. + + If the number of input files is within the limit, returns them unchanged. + Otherwise, merges files into batches and creates a timestamped .rpdb folder + with the merged databases and metadata file. + + Args: + input_files (list of str): List of database file paths to merge. + max_limit (int): Maximum number of output files desired (default: 1). + + Returns: + list: List of merged database file paths. + """ + import uuid + + original_num_dbs = len(input_files) + + # Early return if already within limit + if original_num_dbs <= max_limit: + print( + f"Number of database files ({original_num_dbs}) is within the limit ({max_limit}). " + f"No merging needed." + ) + return input_files + + # Calculate batch size for merging + batch_size = _calculate_batch_size(original_num_dbs, max_limit) + print( + f"Original number of DBs: {original_num_dbs}, " + f"Target number of DBs to merge per batch: {batch_size}" + ) + + # Prepare output folder for merged databases + unique_str = uuid.uuid4() + merged_output_folder = prepare_output_folder(os.getcwd(), consolidate=True) + os.makedirs(merged_output_folder, exist_ok=True) + + # Process databases in batches + merged_files = _process_batches( + input_files, batch_size, merged_output_folder, unique_str, **kwargs + ) + + # Display merged file list + for item in merged_files: + print(f"Reduced file list: {item}") + + # Create metadata file for the merged databases + create_metadata_file(merged_files, output_path=merged_output_folder) + + print( + f"\033[1;34mMerge and repackage completed. " + f"Output files are located in: {merged_output_folder}\033[0m" + ) + + return merged_files + + +def _calculate_batch_size(total_files, max_output_files): + """ + Calculate how many input files should be merged per batch. + + Args: + total_files (int): Total number of input files. + max_output_files (int): Desired maximum number of output files. + + Returns: + int: Number of files to merge per batch. + """ + # Use ceiling division to ensure we don't exceed max_output_files + return (total_files // max_output_files) + (total_files % max_output_files > 0) + + +def _process_batches(input_files, batch_size, output_folder, unique_str, **kwargs): + """ + Process database files in batches, merging or copying as needed. + + Args: + input_files (list): List of input database file paths. + batch_size (int): Number of files per batch. + output_folder (str): Directory to store merged files. + unique_str: Unique identifier for merged filenames. + + Returns: + list: List of merged/copied database file paths. + """ + + merged_files = [] + total_files = len(input_files) + + for batch_index, i in enumerate(range(0, total_files, batch_size)): + batch_files = input_files[i : i + batch_size] + merged_filename = f"merged_db_{batch_index}_{unique_str}.db" + + merged_path = _merge_a_batch( + batch_files, output_folder, merged_filename, **kwargs + ) + merged_files.append(merged_path) + + return merged_files + + +def _merge_a_batch(batch_files, output_folder, output_filename, **kwargs): + """ + Merge multiple files or copy a single file to the output folder. + + Args: + batch_files (list): List of database files in this batch. + output_folder (str): Destination folder. + output_filename (str): Name for the output file. + + Returns: + str: Path to the merged or copied file. + """ + from . import merge + + dest_file = os.path.join(output_folder, output_filename) + + if len(batch_files) > 1: + # Multiple files: merge them + args = {"output_path": output_folder, "output_file": output_filename} + return str(merge.execute(batch_files, **args)) + else: + # Single file: just copy it (optimization) + # Because this is auto-merge, we want to just copy the file, don't just move it for the user. + shutil.copy2(batch_files[0], dest_file) + return str(dest_file) + + +def create_metadata_file(db_files, output_path=".", metadata_filename="index.yaml"): + """ + Creates a metadata file in a custom YAML format for rocprofiler-sdk/rocpd. + + Args: + db_files (list of str): List of absolute or relative paths to SQL database files. + output_path (str): Directory to write the metadata file. + metadata_filename (str): Name of the metadata file to create. + + Returns: + str: Path to the created metadata file. + """ + # Ensure output directory exists + os.makedirs(output_path, exist_ok=True) + + # Compute relative paths + rel_paths = [os.path.relpath(db_file, output_path) for db_file in db_files] + + # Compose the YAML structure + metadata = { + "rocprofiler-sdk": { + "rocpd": { + rocpd_metadata_param_version: rocpd_package_version, + # "source": "rocprofv3", # omitting source, not sure why we need this, and how we determine the source as rocprof-sys, for example. + "path": ".", + "files": ( + rel_paths + if len(rel_paths) > 1 + else (rel_paths[0] if rel_paths else "") + ), + } + } + } + + metadata_path = os.path.join(output_path, metadata_filename) + with open(metadata_path, "w") as f: + yaml.safe_dump(metadata, f, default_flow_style=False) + + return metadata_path + + +def add_args(parser): + """Add arguments for package.""" + + package_options = parser.add_argument_group("Package options") + + package_options.add_argument( + "-c", + "--consolidate", + action="store_true", + help="Consolidate (move) database files into a new folder and generate metadata file pointing to that folder", + ) + + package_options.add_argument( + "--copy", + action="store_true", + help="Copy database files instead of moving them", + ) + + package_options.add_argument( + "-d", + "--output-path", + help="Sets the name of output folder (default : current directory)", + # default=os.environ.get("ROCPD_OUTPUT_PATH", "./rocpd-output-data"), + type=str, + required=False, + ) + + def process_args(input, args): + valid_args = [ + "consolidate", + "copy", + "output_path", + ] + ret = {} + for itr in valid_args: + if hasattr(args, itr): + val = getattr(args, itr) + if val is not None: + ret[itr] = val + return ret + + return process_args + + +def execute(input_files, **kwargs): + import glob + + output_path_kw = kwargs.get("output_path", os.getcwd()) + consolidate = kwargs.get("consolidate", False) + copy_instead_of_move = kwargs.get("copy", False) + + output_path = prepare_output_folder(output_path_kw, consolidate) + db_files = output_config.sanitize_input_list(input_files) + + # check if a folder is provided, if so, search for *.db + expanded_files = [] + for itr in db_files: + if os.path.isdir(itr): + expanded_files.extend(glob.glob(os.path.join(itr, "*.db"))) + else: + expanded_files.append(itr) + db_files = expanded_files + + if consolidate: + # Create a new folder with current date and time + os.makedirs(output_path, exist_ok=True) + consolidated_files = [] + for db_file in db_files: + dest_file = os.path.join(output_path, os.path.basename(db_file)) + # Only copy if source and destination are not the same file + if os.path.abspath(db_file) != os.path.abspath(dest_file): + if copy_instead_of_move: + shutil.copy2(db_file, dest_file) + else: + shutil.move(db_file, dest_file) + consolidated_files.append(dest_file) + db_files = consolidated_files + + metadata_path = create_metadata_file(db_files, output_path) + + print(f"rocPD package created at: {metadata_path}") + + +def main(argv=None): + """ + Main function to create a metadata file and .rpdb package + + Consolidates to a .rpdb package if --consolidate is specified. + """ + + parser = argparse.ArgumentParser( + description="Package database files into .rpdb output" + ) + + required_params = parser.add_argument_group("Required options") + + required_params.add_argument( + "-i", + "--input", + required=True, + type=output_config.check_file_exists, + nargs="+", + help="Input path and filename to one or more database(s). Wildcards accepted, as well as .rpdb folders", + ) + + process_args = add_args(parser) + + args = parser.parse_args(argv) + + input_files = flatten_rocpd_yaml_input_file( + args.input, skip_auto_merge=True, copy=args.copy + ) + + package_args = process_args(None, args) + + # error check for databases before trying to use the data + if not input_files: + print("Error, no databases found\n") + return + + execute(input_files, **package_args) + + +# This is the entry point for the script. +if __name__ == "__main__": + main() diff --git a/projects/rocprofiler-sdk/source/lib/python/rocpd/pftrace.py b/projects/rocprofiler-sdk/source/lib/python/rocpd/pftrace.py index a2650062ae..0661784ba9 100644 --- a/projects/rocprofiler-sdk/source/lib/python/rocpd/pftrace.py +++ b/projects/rocprofiler-sdk/source/lib/python/rocpd/pftrace.py @@ -24,7 +24,6 @@ ############################################################################### from .importer import RocpdImportData -from .time_window import apply_time_window from . import output_config from . import libpyrocpd @@ -33,12 +32,10 @@ def write_pftrace(importData, config): return libpyrocpd.write_perfetto(importData, config) -def execute(input, config=None, window_args=None, **kwargs): +def execute(input, config=None, **kwargs): importData = RocpdImportData(input) - apply_time_window(importData, **window_args) - config = ( output_config.output_config(**kwargs) if config is None @@ -90,33 +87,30 @@ def add_args(parser): default=False, ) - return [ - "perfetto_backend", - "perfetto_buffer_fill_policy", - "perfetto_buffer_size", - "perfetto_shmem_size_hint", - "group_by_queue", - ] + def process_args(input, args): + valid_args = [ + "perfetto_backend", + "perfetto_buffer_fill_policy", + "perfetto_buffer_size", + "perfetto_shmem_size_hint", + "group_by_queue", + ] + ret = {} + for itr in valid_args: + if hasattr(args, itr): + val = getattr(args, itr) + if val is not None: + ret[itr] = val + return ret - -def process_args(args, valid_args): - - ret = {} - for itr in valid_args: - if hasattr(args, itr): - val = getattr(args, itr) - if val is not None: - ret[itr] = val - return ret + return process_args def main(argv=None): import argparse from .time_window import add_args as add_args_time_window - from .time_window import process_args as process_args_time_window from .output_config import add_args as add_args_output_config - from .output_config import process_args as process_args_output_config - from .output_config import add_generic_args, process_generic_args + from .output_config import add_generic_args parser = argparse.ArgumentParser( description="Convert rocPD to Perfetto file", allow_abbrev=False @@ -133,17 +127,18 @@ def main(argv=None): help="Input path and filename to one or more database(s), separated by spaces", ) - valid_out_config_args = add_args_output_config(parser) - valid_pftrace_args = add_args(parser) - valid_generic_args = add_generic_args(parser) - valid_time_window_args = add_args_time_window(parser) + process_out_config_args = add_args_output_config(parser) + process_pftrace_args = add_args(parser) + process_generic_args = add_generic_args(parser) + process_time_window_args = add_args_time_window(parser) args = parser.parse_args(argv) + input = RocpdImportData(args.input) - out_cfg_args = process_args_output_config(args, valid_out_config_args) - pftrace_args = process_args(args, valid_pftrace_args) - generic_out_cfg_args = process_generic_args(args, valid_generic_args) - window_args = process_args_time_window(args, valid_time_window_args) + out_cfg_args = process_out_config_args(input, args) + pftrace_args = process_pftrace_args(input, args) + generic_out_cfg_args = process_generic_args(input, args) + process_time_window_args(input, args) all_args = { **pftrace_args, @@ -152,8 +147,7 @@ def main(argv=None): } execute( - args.input, - window_args=window_args, + input, **all_args, ) diff --git a/projects/rocprofiler-sdk/source/lib/python/rocpd/query.py b/projects/rocprofiler-sdk/source/lib/python/rocpd/query.py index c4099f2994..b7f945ab83 100644 --- a/projects/rocprofiler-sdk/source/lib/python/rocpd/query.py +++ b/projects/rocprofiler-sdk/source/lib/python/rocpd/query.py @@ -32,7 +32,15 @@ from datetime import datetime from . import output_config from . import libpyrocpd from .importer import RocpdImportData -from .time_window import apply_time_window + +__all__ = [ + "export_sqlite_query", + "send_report_email", + "zip_files", + "add_args", + "execute", + "main", +] def export_sqlite_query( @@ -439,34 +447,18 @@ def add_args(parser): "--template-path", help="Path to a Jinja2 HTML template for the dashboard" ) - return [ - "query", - "script", - "email_to", - "email_from", - "email_subject", - "smtp_server", - "smtp_port", - "smtp_user", - "smtp_password", - "inline_preview", - "zip_attachments", - "format", - "template_path", - ] + def process_args(input, args): + ret = {} + return ret + + return process_args -def process_args(args, valid_args): - # do not add any of the arguments to the output config dict - ret = {} - return ret +def execute(input, args, config=None, **kwargs): - -def execute(input, args, config=None, window_args=None, **kwargs): - - importData = RocpdImportData(input) - - apply_time_window(importData, **window_args) + importData = RocpdImportData( + input, automerge_limit=getattr(args, "automerge_limit", None) + ) config = ( output_config.output_config(**kwargs) @@ -524,11 +516,8 @@ def execute(input, args, config=None, window_args=None, **kwargs): def main(argv=None): import argparse - from .time_window import add_args as add_args_time_window - from .time_window import process_args as process_args_time_window - from .output_config import add_args as add_args_output_config - from .output_config import process_args as process_args_output_config - from .output_config import add_generic_args, process_generic_args + from . import time_window + from . import output_config parser = argparse.ArgumentParser( description="Generate report for rocpd query", allow_abbrev=False @@ -545,17 +534,19 @@ def main(argv=None): help="Input path and filename to one or more database(s), separated by spaces", ) - valid_out_config_args = add_args_output_config(parser) - valid_generic_args = add_generic_args(parser) - valid_time_window_args = add_args_time_window(parser) - valid_query_args = add_args(parser) + process_out_config_args = output_config.add_args(parser) + process_generic_args = output_config.add_generic_args(parser) + process_time_window_args = time_window.add_args(parser) + process_query_args = add_args(parser) args = parser.parse_args(argv) - out_cfg_args = process_args_output_config(args, valid_out_config_args) - generic_out_cfg_args = process_generic_args(args, valid_generic_args) - window_args = process_args_time_window(args, valid_time_window_args) - query_args = process_args(args, valid_query_args) + input = RocpdImportData(args.input) + + out_cfg_args = process_out_config_args(input, args) + generic_out_cfg_args = process_generic_args(input, args) + query_args = process_query_args(input, args) + process_time_window_args(input, args) all_args = { **query_args, @@ -564,9 +555,8 @@ def main(argv=None): } execute( - args.input, + input, args, - window_args=window_args, **all_args, ) diff --git a/projects/rocprofiler-sdk/source/lib/python/rocpd/summary.py b/projects/rocprofiler-sdk/source/lib/python/rocpd/summary.py index 6f4c4dfb9b..7f02419511 100644 --- a/projects/rocprofiler-sdk/source/lib/python/rocpd/summary.py +++ b/projects/rocprofiler-sdk/source/lib/python/rocpd/summary.py @@ -30,7 +30,19 @@ import math from typing import Any, List, Tuple from .importer import RocpdImportData, execute_statement from .query import export_sqlite_query -from . import output_config + +__all__ = [ + "generate_all_summaries", + "generate_summary_query", + "generate_domain_query", + "create_domain_query", + "create_summary_queries", + "create_summary_region_queries", + "export_query", + "add_args", + "execute", + "main", +] def check_function_availability(connection, function_name): @@ -471,12 +483,10 @@ def generate_all_summaries(connection: RocpdImportData, **kwargs: Any) -> None: # # Command-line interface functions # +def add_args(parser): + """Add arguments for summary.""" - -def add_io_args(parser): - """Add input/output arguments for summary.""" io_options = parser.add_argument_group("I/O options") - io_options.add_argument( "-f", "--format", @@ -486,28 +496,7 @@ def add_io_args(parser): type=str, required=False, ) - io_options.add_argument( - "-o", - "--output-file", - help="Sets the base output file name", - default=os.environ.get("ROCPD_OUTPUT_NAME", ""), - type=str, - required=False, - ) - io_options.add_argument( - "-d", - "--output-path", - help="Sets the output path where the output files will be saved (default path: `./rocpd-output-data`)", - default=os.environ.get("ROCPD_OUTPUT_PATH", "./rocpd-output-data"), - type=str, - required=False, - ) - return ["format", "output_file", "output_path"] - - -def add_args(parser): - """Add arguments for summary.""" summary_options = parser.add_argument_group("Summary options") summary_options.add_argument( "--domain-summary", @@ -528,26 +517,25 @@ def add_args(parser): help="Specify region categories to include in the summary (example: HIP, HSA, RCCL, ROCDECODE, ROCJPEG, MARKER). If not specified, categories will be automatically retrieved from the database.", ) - return ["domain_summary", "summary_by_rank", "region_categories"] + def process_args(input, args): + valid_args = ["format", "domain_summary", "summary_by_rank", "region_categories"] + + ret = {} + for itr in valid_args: + if hasattr(args, itr): + val = getattr(args, itr) + if val is not None: + ret[itr] = val + return ret + + return process_args -def process_args(args, valid_args): +def execute(input, **kwargs: Any) -> RocpdImportData: - ret = {} - for itr in valid_args: - if hasattr(args, itr): - val = getattr(args, itr) - if val is not None: - ret[itr] = val - return ret - - -def execute(input, window_args=None, **kwargs: Any) -> RocpdImportData: - from .time_window import apply_time_window - - importData = RocpdImportData(input) - - apply_time_window(importData, **window_args) + importData = RocpdImportData( + input, automerge_limit=getattr(kwargs, "automerge_limit", None) + ) generate_all_summaries(importData, **kwargs) @@ -556,12 +544,10 @@ def execute(input, window_args=None, **kwargs: Any) -> RocpdImportData: def main(argv=None) -> int: """Main entry point for command line execution.""" - from .time_window import add_args as add_args_time_window - from .time_window import process_args as process_args_time_window + from . import time_window + from . import output_config - parser = argparse.ArgumentParser( - description="Create ROCpd database summary region views" - ) + parser = argparse.ArgumentParser(description="Generate summary views from rocPD data") required_params = parser.add_argument_group("Required options") required_params.add_argument( @@ -573,21 +559,22 @@ def main(argv=None) -> int: help="Input path and filename to one or more database(s), separated by spaces", ) - valid_io_args = add_io_args(parser) - valid_summary_args = add_args(parser) - valid_time_window_args = add_args_time_window(parser) + process_outcfg_args = output_config.add_args(parser) + process_summary_args = add_args(parser) + process_time_window_args = time_window.add_args(parser) args = parser.parse_args(argv) - summary_args = process_args(args, valid_summary_args) - io_args = output_config.process_args(args, valid_io_args) - window_args = process_args_time_window(args, valid_time_window_args) + input = RocpdImportData(args.input) + + summary_args = process_summary_args(input, args) + io_args = process_outcfg_args(input, args) + process_time_window_args(input, args) all_args = {**summary_args, **io_args} execute( args.input, - window_args=window_args, **all_args, ) diff --git a/projects/rocprofiler-sdk/source/lib/python/rocpd/time_window.py b/projects/rocprofiler-sdk/source/lib/python/rocpd/time_window.py index 79aec8c095..05b2610c7e 100644 --- a/projects/rocprofiler-sdk/source/lib/python/rocpd/time_window.py +++ b/projects/rocprofiler-sdk/source/lib/python/rocpd/time_window.py @@ -26,10 +26,12 @@ import argparse import sqlite3 from argparse import ArgumentParser -from typing import Optional, Tuple, Dict, Any, List +from typing import Optional, Tuple, Dict, Any from .importer import RocpdImportData, execute_statement +__all__ = ["apply_time_window", "execute", "add_args", "main"] + def get_marker_timestamp( connection: sqlite3.Connection, marker_name: str, marker_type: str = "start" @@ -265,7 +267,7 @@ def apply_time_window(connection: RocpdImportData, **kwargs: Any) -> None: # # Command-line interface functions # -def add_args(parser: ArgumentParser) -> List[str]: +def add_args(parser: ArgumentParser): """Add time slice arguments to an existing parser.""" tw_options = parser.add_argument_group("Time window options") @@ -307,18 +309,21 @@ def add_args(parser: ArgumentParser) -> List[str]: default=True, ) - return ["start", "end", "inclusive", "start_marker", "end_marker"] + def process_args(input, args): + valid_args = ["start", "end", "inclusive", "start_marker", "end_marker"] + ret = {} + for itr in valid_args: + if hasattr(args, itr): + val = getattr(args, itr) + if val is not None: + ret[itr] = val + if ret and input is not None: + apply_time_window(input, **ret) -def process_args(args, valid_args): + return ret - ret = {} - for itr in valid_args: - if hasattr(args, itr): - val = getattr(args, itr) - if val is not None: - ret[itr] = val - return ret + return process_args def execute(input_rpd: str, **kwargs: Any) -> RocpdImportData: diff --git a/projects/rocprofiler-sdk/source/lib/python/utilities.cmake b/projects/rocprofiler-sdk/source/lib/python/utilities.cmake index 1bc9171a3e..8f51fe971b 100644 --- a/projects/rocprofiler-sdk/source/lib/python/utilities.cmake +++ b/projects/rocprofiler-sdk/source/lib/python/utilities.cmake @@ -170,8 +170,10 @@ function(rocprofiler_rocpd_python_bindings _VERSION) importer.py __init__.py __main__.py + merge.py output_config.py otf2.py + package.py pftrace.py query.py schema.py diff --git a/projects/rocprofiler-sdk/tests/rocprofv3/rocpd/CMakeLists.txt b/projects/rocprofiler-sdk/tests/rocprofv3/rocpd/CMakeLists.txt index b413b42d7e..75b8dd37b4 100644 --- a/projects/rocprofiler-sdk/tests/rocprofv3/rocpd/CMakeLists.txt +++ b/projects/rocprofiler-sdk/tests/rocprofv3/rocpd/CMakeLists.txt @@ -48,8 +48,18 @@ add_test( --runtime-trace --kernel-rename --output-config --pmc SQ_WAVES -- $ 500 2) +# Can remove this test if MULTIPROC is re-enabled and rely on the multiproc databases for +# packaging +add_test( + NAME rocprofv3-test-rocpd-execute2 + COMMAND + $ -d + ${CMAKE_CURRENT_BINARY_DIR}/rocpd-input-data -o out2 --output-format rocpd + --runtime-trace --kernel-rename --pmc SQ_WAVES -- + $ 500 2) + set_tests_properties( - rocprofv3-test-rocpd-execute + rocprofv3-test-rocpd-execute rocprofv3-test-rocpd-execute2 PROPERTIES TIMEOUT 120 LABELS @@ -140,7 +150,7 @@ set_tests_properties( ######################################################################################### # -# perfetto generate +# Perfetto generation # ######################################################################################### @@ -194,7 +204,7 @@ set_tests_properties( ######################################################################################### # -# CSV generate +# CSV generation # ######################################################################################### @@ -224,7 +234,7 @@ set_tests_properties( ######################################################################################### # -# Summary generate +# Summary generation # ######################################################################################### add_test( @@ -311,3 +321,211 @@ set_tests_properties( "${ROCPROFILER_DEFAULT_FAIL_REGEX}" FIXTURES_REQUIRED rocprofv3-test-rocpd-generation) + +######################################################################################### +# +# Package generation +# +######################################################################################### + +add_test( + NAME rocprofv3-test-rocpd-package-generation + COMMAND + ${Python3_EXECUTABLE} -m rocpd package --consolidate -d + ${CMAKE_CURRENT_BINARY_DIR}/rocpd-input-data/test_package -i + ${CMAKE_CURRENT_BINARY_DIR}/rocpd-input-data/out*.db) + +set_tests_properties( + rocprofv3-test-rocpd-package-generation + PROPERTIES TIMEOUT + 120 + LABELS + "integration-tests;rocpd" + ENVIRONMENT + "${rocprofv3-rocpd-env}" + DEPENDS + "rocprofv3-test-rocpd-execute" + FAIL_REGULAR_EXPRESSION + "${ROCPROFILER_DEFAULT_FAIL_REGEX}" + FIXTURES_REQUIRED + rocprofv3-test-rocpd) + +add_test( + NAME rocprofv3-test-rocpd-package-generation-multiproc + COMMAND + ${Python3_EXECUTABLE} -m rocpd package --consolidate -d + ${CMAKE_CURRENT_BINARY_DIR}/rocpd-input-data-multiproc/test_package_multi -i + ${CMAKE_CURRENT_BINARY_DIR}/rocpd-input-data-multiproc/out*.db) + +set_tests_properties( + rocprofv3-test-rocpd-package-generation-multiproc + PROPERTIES TIMEOUT + 120 + LABELS + "integration-tests;rocpd" + ENVIRONMENT + "${rocprofv3-rocpd-env}" + DEPENDS + "rocprofv3-test-rocpd-execute-multiproc" + FAIL_REGULAR_EXPRESSION + "${ROCPROFILER_DEFAULT_FAIL_REGEX}" + DISABLED + "${MULTIPROC_IS_DISABLED}" + FIXTURES_REQUIRED + rocprofv3-test-rocpd-multiproc) + +######################################################################################### +# +# Query SQL string using package +# +######################################################################################### + +add_test( + NAME rocprofv3-test-rocpd-query-using-package + COMMAND + ${Python3_EXECUTABLE} -m rocpd query -i + ${CMAKE_CURRENT_BINARY_DIR}/rocpd-input-data/test_package.rpdb --query + "SELECT id, type FROM rocpd_info_agent LIMIT 10;") + +set_tests_properties( + rocprofv3-test-rocpd-query-using-package + PROPERTIES TIMEOUT + 120 + LABELS + "integration-tests;rocpd" + ENVIRONMENT + "${rocprofv3-rocpd-env}" + DEPENDS + "rocprofv3-test-rocpd-package-generation" + FAIL_REGULAR_EXPRESSION + "${ROCPROFILER_DEFAULT_FAIL_REGEX}" + FIXTURES_REQUIRED + rocprofv3-test-rocpd) + +add_test( + NAME rocprofv3-test-rocpd-query-using-package-multiproc + COMMAND + ${Python3_EXECUTABLE} -m rocpd query -i + ${CMAKE_CURRENT_BINARY_DIR}/rocpd-input-data-multiproc/test_package_multi.rpdb + --query "SELECT id, type FROM rocpd_info_agent LIMIT 10;") + +set_tests_properties( + rocprofv3-test-rocpd-query-using-package-multiproc + PROPERTIES TIMEOUT + 120 + LABELS + "integration-tests;rocpd" + ENVIRONMENT + "${rocprofv3-rocpd-env}" + DEPENDS + "rocprofv3-test-rocpd-package-generation-multiproc" + FAIL_REGULAR_EXPRESSION + "${ROCPROFILER_DEFAULT_FAIL_REGEX}" + DISABLED + "${MULTIPROC_IS_DISABLED}" + FIXTURES_REQUIRED + rocprofv3-test-rocpd-multiproc) + +######################################################################################### +# +# Merge database generation with package +# +######################################################################################### + +add_test( + NAME rocprofv3-test-rocpd-merge-generation-using-package + COMMAND + ${Python3_EXECUTABLE} -m rocpd merge -d + ${CMAKE_CURRENT_BINARY_DIR}/rocpd-input-data -o merged_db -i + ${CMAKE_CURRENT_BINARY_DIR}/rocpd-input-data/test_package.rpdb) + +set_tests_properties( + rocprofv3-test-rocpd-merge-generation-using-package + PROPERTIES TIMEOUT + 120 + LABELS + "integration-tests;rocpd" + ENVIRONMENT + "${rocprofv3-rocpd-env}" + DEPENDS + "rocprofv3-test-rocpd-package-generation" + FAIL_REGULAR_EXPRESSION + "${ROCPROFILER_DEFAULT_FAIL_REGEX}" + FIXTURES_REQUIRED + rocprofv3-test-rocpd) + +add_test( + NAME rocprofv3-test-rocpd-merge-generation-using-package-multiproc + COMMAND + ${Python3_EXECUTABLE} -m rocpd merge -d + ${CMAKE_CURRENT_BINARY_DIR}/rocpd-input-data-multiproc -o merged_db_multi.db -i + ${CMAKE_CURRENT_BINARY_DIR}/rocpd-input-data-multiproc/test_package_multi.rpdb) + +set_tests_properties( + rocprofv3-test-rocpd-merge-generation-using-package-multiproc + PROPERTIES TIMEOUT + 120 + LABELS + "integration-tests;rocpd" + ENVIRONMENT + "${rocprofv3-rocpd-env}" + DEPENDS + "rocprofv3-test-rocpd-package-generation-multiproc" + FAIL_REGULAR_EXPRESSION + "${ROCPROFILER_DEFAULT_FAIL_REGEX}" + DISABLED + "${MULTIPROC_IS_DISABLED}" + FIXTURES_REQUIRED + rocprofv3-test-rocpd-multiproc) + +######################################################################################### +# +# Query SQL string using merged database +# +######################################################################################### + +add_test( + NAME rocprofv3-test-rocpd-query-using-mergedDB + COMMAND + ${Python3_EXECUTABLE} -m rocpd query -i + ${CMAKE_CURRENT_BINARY_DIR}/rocpd-input-data/merged_db.db --query + "SELECT id, type FROM rocpd_info_agent LIMIT 10;") + +set_tests_properties( + rocprofv3-test-rocpd-query-using-mergedDB + PROPERTIES TIMEOUT + 120 + LABELS + "integration-tests;rocpd" + ENVIRONMENT + "${rocprofv3-rocpd-env}" + DEPENDS + "rocprofv3-test-rocpd-merge-generation-using-package" + FAIL_REGULAR_EXPRESSION + "${ROCPROFILER_DEFAULT_FAIL_REGEX}" + FIXTURES_REQUIRED + rocprofv3-test-rocpd) + +add_test( + NAME rocprofv3-test-rocpd-query-using-mergedDB-multiproc + COMMAND + ${Python3_EXECUTABLE} -m rocpd query -i + ${CMAKE_CURRENT_BINARY_DIR}/rocpd-input-data-multiproc/merged_db_multi.db --query + "SELECT id, type FROM rocpd_info_agent LIMIT 10;") + +set_tests_properties( + rocprofv3-test-rocpd-query-using-mergedDB-multiproc + PROPERTIES TIMEOUT + 120 + LABELS + "integration-tests;rocpd" + ENVIRONMENT + "${rocprofv3-rocpd-env}" + DEPENDS + "rocprofv3-test-rocpd-merge-generation-using-package-multiproc" + FAIL_REGULAR_EXPRESSION + "${ROCPROFILER_DEFAULT_FAIL_REGEX}" + DISABLED + "${MULTIPROC_IS_DISABLED}" + FIXTURES_REQUIRED + rocprofv3-test-rocpd-multiproc)