[rocpd] Adding merge and package submodules for rocpd (#164)

* adding ROCpd database merge

* adding ROCpd database merge concatenating all tables

* update merge script

  - copy all tables from files

* fix merge format

* Add package submodule, initial POC.  Need to refine

* Minor fixes and clean up duplicated code in package.py

* Revamp metadata layout, add wildcard and .rpdb parsing

* Add auto merge & package when > 5 DBs, add examples, don't use auto_merge when using sub-commands merge & package

* - Extend package/yaml inputs to all rocpd modules
- Improve handling more corner cases for bad input files when parsing input parameters (bad yaml files, bad .rpdb folder, folders as input)
- Changed to use UUID in merged filename instead of the time, in auto-merge algorithm

* Minor text fixes for consistancy between modules

* Add more wildcard support and add package, merge tests

* Make changes based on review suggestions

* Move parsing packages into importer.py, simplified adding required params to a function

* fix package test by flattening input list before processing

* Integrate merge.py changes from Jonathan to add name-collision checks, recreating indexes, foreign key check (disabled for now, due to processing time)

* Rework rocpd.<submodule>.{add_args,process_args}

- add_args function returns a functor which accepts input and args
- time_window functor returned from add_args automatically applies time windowing of input

* change merge&package limit to 1, merge should create data views

* Move files by default instead of making copies

- copying can be enabled by passing "copy=True" or --copy cmdline argument

* refactor package to make the logic cleaner, set merge limit back to 5

* Allow automerge-limit param to override limit, change default back to 1.  Tests updated to use query, much quicker

* Update --help instructions for package

---------

Co-authored-by: acanadas <acanadas@amd.com>
Co-authored-by: a-canadasruiz <Araceli.CanadasRuiz@amd.com>
Co-authored-by: Young Hui <young.hui@amd.com>
Co-authored-by: Jonathan R. Madsen <jonathanrmadsen@gmail.com>
Цей коміт міститься в:
systems-assistant[bot]
2025-11-12 17:07:12 -05:00
зафіксовано GitHub
джерело f58393108f
коміт 061948a5ec
13 змінених файлів з 1512 додано та 330 видалено
+146 -82
Переглянути файл
@@ -39,8 +39,10 @@ def main(argv=None, config=None):
"""
import argparse
from . import csv
from . import merge
from . import otf2
from . import output_config
from . import package
from . import pftrace
from . import query
from . import summary
@@ -64,6 +66,34 @@ Example usage:
Convert 2 databases, output CSV, OTF2, and perfetto trace formats
$ rocpd convert -i db{3,4}.db --output-format csv otf2 pftrace
"""
merge_examples = """
Example usage:
Merge the three databases and output to a folder called merged3DBs
$ rocpd merge -i db0.db db1.db db2.db -d merged3DBs
Merge all the databases from the node0 folder and output to the node0_output folder, with filename called largeMerged.db
$ rocpd merge -i node0/*.db -d node0_output -o largeMerged
"""
package_examples = """
Example usage:
Index the three databases into a metadata file (index.yaml) in the current folder, just reference the databases where they are on the filesystem
$ rocpd package -i node0/db0.db node1/db1.db node2/db2.db
Package and copy/consolidate all the databases into a my_MPI_run_1.rpdb folder so it can be managed easier
$ rocpd package -i node0/db0.db node1/db1.db node2/db2.db -d my_MPI_run_1 --consolidate --copy
Package and copy/consolidate all the databases from my_MPI_run_1.rpdb folder append node5/db5.db and make new folder
$ rocpd package -i my_MPI_run_1.rpdb node5/db5.db -d my_MPI_run_1_append_5 --consolidate --copy
Use my_MPI_run_1.rpdb folder and move/consolidate node7/db7.db and re-use same .rpdb folder
$ rocpd package -i my_MPI_run_1.rpdb node7/db7.db -d my_MPI_run_1 --consolidate
"""
query_examples = """
@@ -94,6 +124,7 @@ Example usage:
$ rocpd summary -i db{0,1}.db --region-categories HIP MARKERS --domain-summary --format html
"""
input_help_string = "Input path and filename to one or more database(s). Wildcards accepted, as well as .rpdb folders"
# Add the subparsers
parser = argparse.ArgumentParser(
@@ -109,6 +140,18 @@ Example usage:
help="Print the version information and exit",
)
def add_required_args(_parser):
_required_params = _parser.add_argument_group("Required options")
_required_params.add_argument(
"-i",
"--input",
required=True,
type=output_config.check_file_exists,
nargs="+",
help=input_help_string,
)
return _required_params
subparsers = parser.add_subparsers(dest="command")
converter = subparsers.add_parser(
"convert",
@@ -118,6 +161,22 @@ Example usage:
epilog=convert_examples,
)
merger = subparsers.add_parser(
"merge",
description="Generate merged database from rocPD databases",
allow_abbrev=False,
formatter_class=argparse.RawTextHelpFormatter,
epilog=merge_examples,
)
packager = subparsers.add_parser(
"package",
description="Package database files into .rpdb output",
allow_abbrev=False,
formatter_class=argparse.RawTextHelpFormatter,
epilog=package_examples,
)
query_reporter = subparsers.add_parser(
"query",
description="Generate output on a query",
@@ -138,15 +197,7 @@ Example usage:
return val.lower().replace("perfetto", "pftrace")
# add required options for each subparser
converter_required_params = converter.add_argument_group("Required options")
converter_required_params.add_argument(
"-i",
"--input",
required=True,
type=output_config.check_file_exists,
nargs="+",
help="Input path and filename to one or more database(s)",
)
converter_required_params = add_required_args(converter)
converter_required_params.add_argument(
"-f",
"--output-format",
@@ -158,43 +209,39 @@ Example usage:
required=True,
)
query_required_params = query_reporter.add_argument_group("Required options")
query_required_params.add_argument(
"-i",
"--input",
required=True,
type=output_config.check_file_exists,
nargs="+",
help="Input path and filename to one or more database(s)",
)
summary_required_params = generate_summary.add_argument_group("Required options")
summary_required_params.add_argument(
"-i",
"--input",
required=True,
type=output_config.check_file_exists,
nargs="+",
help="Input path and filename to one or more database(s)",
)
add_required_args(merger)
add_required_args(packager)
add_required_args(query_reporter)
add_required_args(generate_summary)
# converter: add args from any sub-modules
valid_out_config_args = output_config.add_args(converter)
valid_generic_args = output_config.add_generic_args(converter)
valid_pftrace_args = pftrace.add_args(converter)
valid_csv_args = csv.add_args(converter)
valid_otf2_args = otf2.add_args(converter)
valid_time_window_args = time_window.add_args(converter)
process_converter_args = []
process_converter_args.append(output_config.add_args(converter))
process_converter_args.append(output_config.add_generic_args(converter))
process_converter_args.append(pftrace.add_args(converter))
process_converter_args.append(csv.add_args(converter))
process_converter_args.append(otf2.add_args(converter))
process_converter_args.append(time_window.add_args(converter))
# merge: subparser args
process_merger_args = []
process_merger_args.append(merge.add_args(merger))
# package: subparser args
process_packager_args = []
process_packager_args.append(package.add_args(packager))
# query: subparser args
valid_out_config_args = output_config.add_args(query_reporter)
valid_query_args = query.add_args(query_reporter)
valid_time_window_args = time_window.add_args(query_reporter)
process_query_reporter_args = []
process_query_reporter_args.append(output_config.add_args(query_reporter))
process_query_reporter_args.append(query.add_args(query_reporter))
process_query_reporter_args.append(time_window.add_args(query_reporter))
# summary: subparser args
valid_io_args = summary.add_io_args(generate_summary)
valid_summary_args = summary.add_args(generate_summary)
valid_time_window_args = time_window.add_args(generate_summary)
process_generate_summary_args = []
process_generate_summary_args.append(output_config.add_args(generate_summary))
process_generate_summary_args.append(summary.add_args(generate_summary))
process_generate_summary_args.append(time_window.add_args(generate_summary))
# parse the command line arguments
args = parser.parse_args(argv)
@@ -213,30 +260,18 @@ Example usage:
# if the user requested converter, process the conversion
if args.command == "convert":
# process the args
out_cfg_args = output_config.process_args(args, valid_out_config_args)
generic_out_cfg_args = output_config.process_generic_args(
args, valid_generic_args
# construct the rocpd import data object
input = RocpdImportData(
args.input,
automerge_limit=getattr(
args, "automerge_limit", package.IDEAL_NUMBER_OF_DATABASE_FILES
),
)
pftrace_args = pftrace.process_args(args, valid_pftrace_args)
csv_args = csv.process_args(args, valid_csv_args)
otf2_args = otf2.process_args(args, valid_otf2_args)
window_args = time_window.process_args(args, valid_time_window_args)
# now start processing the data. Import the data and merge the views
importData = RocpdImportData(args.input)
all_args = {}
for pitr in process_converter_args:
all_args.update(pitr(input, args))
# adjust the time window view of the data
if window_args is not None:
time_window.apply_time_window(importData, **window_args)
all_args = {
**out_cfg_args,
**generic_out_cfg_args,
**pftrace_args,
**csv_args,
**otf2_args,
}
# setup the config args
config = (
output_config.output_config(**all_args)
@@ -254,42 +289,71 @@ Example usage:
for out_format in args.output_format:
if out_format in format_handlers:
print(f"Converting database(s) to {out_format} format:")
format_handlers[out_format](importData, config)
format_handlers[out_format](input, config)
else:
print(f"Warning: Unsupported output format '{out_format}'")
# if the user requested merge module, execute the merge
elif args.command == "merge":
# no construction of the import data object
input = None
# merge subparser args
merge_args = {}
for pitr in process_merger_args:
merge_args.update(pitr(input, args))
merge.execute(args.input, **merge_args)
# if the user requested package module, package up the database
elif args.command == "package":
# construct the rocpd import data object
input = None
# package subparser args
packager_args = {}
for pitr in process_packager_args:
packager_args.update(pitr(input, args))
package.execute(args.input, **packager_args)
# if the user requested query module, execute the query
elif args.command == "query":
# query subparser args
query_args = query.process_args(args, valid_query_args)
out_cfg_args = output_config.process_args(args, valid_out_config_args)
window_args = time_window.process_args(args, valid_time_window_args)
# construct the rocpd import data object
input = RocpdImportData(
args.input,
automerge_limit=getattr(
args, "automerge_limit", package.IDEAL_NUMBER_OF_DATABASE_FILES
),
)
all_args = {**query_args, **out_cfg_args}
# query subparser args
query_args = {}
for pitr in process_query_reporter_args:
query_args.update(pitr(input, args))
query.execute(
args.input,
input,
args,
window_args=window_args,
**all_args,
**query_args,
)
# if the user requested a summary, generate the views
elif args.command == "summary":
# construct the rocpd import data object
input = RocpdImportData(
args.input,
automerge_limit=getattr(
args, "automerge_limit", package.IDEAL_NUMBER_OF_DATABASE_FILES
),
)
# summary subparser args
summary_args = summary.process_args(args, valid_summary_args)
io_args = output_config.process_args(args, valid_io_args)
window_args = time_window.process_args(args, valid_time_window_args)
summary_args = {}
for pitr in process_generate_summary_args:
summary_args.update(pitr(input, args))
# now start processing the data. Import the data and merge the views
importData = RocpdImportData(args.input)
# adjust the time window view of the data
if window_args is not None:
time_window.apply_time_window(importData, **window_args)
all_args = {**summary_args, **io_args}
summary.generate_all_summaries(importData, **all_args)
summary.generate_all_summaries(input, **summary_args)
print("Done. Exiting...")
+18 -23
Переглянути файл
@@ -28,7 +28,6 @@ import re
from .importer import RocpdImportData
from .query import export_sqlite_query
from .time_window import apply_time_window
from . import output_config
from . import libpyrocpd
@@ -413,12 +412,10 @@ def write_csv(importData, config):
write_scratch_memory_csv(importData, config)
def execute(input, config=None, window_args=None, **kwargs):
def execute(input, config=None, **kwargs):
importData = RocpdImportData(input)
apply_time_window(importData, **window_args)
config = (
output_config.output_config(**kwargs)
if config is None
@@ -431,21 +428,17 @@ def execute(input, config=None, window_args=None, **kwargs):
def add_args(parser):
"""Add csv arguments."""
return []
def process_args(input, args):
ret = {}
return ret
def process_args(args, valid_args):
ret = {}
return ret
return process_args
def main(argv=None):
import argparse
from .time_window import add_args as add_args_time_window
from .time_window import process_args as process_args_time_window
from .output_config import add_args as add_args_output_config
from .output_config import process_args as process_args_output_config
from .output_config import add_generic_args, process_generic_args
from . import time_window
from . import output_config
parser = argparse.ArgumentParser(
description="Convert rocPD to CSV files",
@@ -464,17 +457,19 @@ def main(argv=None):
help="Input path and filename to one or more database(s), separated by spaces",
)
valid_out_config_args = add_args_output_config(parser)
valid_generic_args = add_generic_args(parser)
valid_time_window_args = add_args_time_window(parser)
valid_csv_args = add_args(parser)
process_out_config_args = output_config.add_args(parser)
process_generic_args = output_config.add_generic_args(parser)
process_time_window_args = time_window.add_args(parser)
process_csv_args = add_args(parser)
args = parser.parse_args(argv)
out_cfg_args = process_args_output_config(args, valid_out_config_args)
generic_out_cfg_args = process_generic_args(args, valid_generic_args)
window_args = process_args_time_window(args, valid_time_window_args)
csv_args = process_args(args, valid_csv_args)
input = RocpdImportData(args.input)
out_cfg_args = process_out_config_args(input, args)
generic_out_cfg_args = process_generic_args(input, args)
csv_args = process_csv_args(input, args)
process_time_window_args(input, args)
all_args = {
**out_cfg_args,
@@ -482,7 +477,7 @@ def main(argv=None):
**csv_args,
}
execute(args.input, window_args=window_args, **all_args)
execute(input, **all_args)
if __name__ == "__main__":
+49 -10
Переглянути файл
@@ -28,6 +28,7 @@
#
import sys
import os
import sqlite3
from .schema import RocpdSchema
@@ -36,9 +37,33 @@ from . import libpyrocpd
__all__ = ["RocpdImportData", "execute_statement"]
def internal_init(_input, _output, skip_auto_merge, automerge_limit):
from . import package
_input = package.flatten_rocpd_yaml_input_file(
_input, skip_auto_merge=skip_auto_merge, automerge_limit=automerge_limit
)
assert not os.path.isdir(_output), "Output database name must not be a directory"
assert _check_for_valid_dbs(
_input
), "RocpdImportData error, invalid SQLite3 database provided"
_connection = libpyrocpd.connect(_output)
_connection.execute("PRAGMA foreign_keys = ON")
_table_info = _create_temp_views(_connection, _input)
_create_meta_views(_connection)
return (_connection, _input, _table_info)
class RocpdImportData(libpyrocpd.RocpdImportData):
def __init__(self, input):
def __init__(
self, input, skip_auto_merge=False, automerge_limit=None, dbname=":memory:"
):
from . import package
if automerge_limit is None:
automerge_limit = package.IDEAL_NUMBER_OF_DATABASE_FILES
if isinstance(input, RocpdImportData):
super(RocpdImportData, self).__init__(input)
self.table_info = input.table_info
@@ -48,15 +73,13 @@ class RocpdImportData(libpyrocpd.RocpdImportData):
raise ValueError(
"RocpdImportData does not accept existing sqlite3 connections"
)
elif isinstance(input, str):
_connection = libpyrocpd.connect(input)
_filenames = [input]
elif isinstance(input, list) and len(input) > 0 and isinstance(input[0], str):
_connection = libpyrocpd.connect(":memory:")
_filenames = input[:]
_connection.execute("PRAGMA foreign_keys = ON")
self.table_info = _create_temp_views(_connection, input)
_create_meta_views(_connection)
elif isinstance(input, str) or (
isinstance(input, list) and len(input) > 0 and isinstance(input[0], str)
):
_connection, _filenames, _table_info = internal_init(
input, dbname, skip_auto_merge, automerge_limit
)
self.table_info = _table_info
else:
raise ValueError(
f"input is unsupported type. Expected sqlite3.Connection, string, or (non-empty) list of strings. type={type(input).__name__}"
@@ -75,6 +98,22 @@ class RocpdImportData(libpyrocpd.RocpdImportData):
return self.connection.__exit__(exc_type, exc, tb)
def _is_sqlite_db(file_path):
with open(file_path, "rb") as f:
header = f.read(16)
return header == b"SQLite format 3\x00"
def _check_for_valid_dbs(input_files) -> bool:
# check the list of .db files to confirm they are SQLite3 DBs
for file in input_files:
sqlite_db = _is_sqlite_db(file)
if not sqlite_db:
print(f"Error: {file} is not an SQLite3 database. File not supported.")
return False
return True
def execute_statement(conn, statement, is_script=False):
if isinstance(conn, RocpdImportData):
_conn = conn.connection
+362
Переглянути файл
@@ -0,0 +1,362 @@
#!/usr/bin/env python3
###############################################################################
# MIT License
#
# Copyright (c) 2025 Advanced Micro Devices, Inc.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
###############################################################################
import argparse
import os
import sqlite3
import time
from typing import List, Dict, Iterable, Optional, Callable, Any
def merge_sqlite_dbs(
sources: Iterable[str],
dest_path: str,
on_log: Optional[Callable[[str], None]] = None,
) -> None:
"""
Merge multiple SQLite databases into a single destination database.
Parameters
----------
sources : Iterable[str]
Paths to source databases.
dest_path : str
Path to destination database.
on_log : Optional[Callable[[str], None]]
Logger function; defaults to None. Pass `print` to generate logs.
"""
def log(msg: str) -> None:
if on_log:
on_log(f" {msg}")
sources = list(sources)
if not sources:
raise ValueError("No source databases provided")
# Prepare output directory
output_dir = os.path.dirname(os.path.abspath(dest_path)) or os.getcwd()
os.makedirs(output_dir, exist_ok=True)
# Remove existing file
if os.path.isfile(dest_path):
os.remove(dest_path)
uuids = []
views = []
data_views = []
schema_versions = []
with sqlite3.connect(str(dest_path)) as conn:
conn.execute("PRAGMA journal_mode = WAL;")
conn.execute("PRAGMA synchronous = NORMAL;")
conn.execute("PRAGMA foreign_keys = OFF;") # defer FK checks until end
# One big atomic transaction
with conn:
# Attach sources one by one
for i, src in enumerate(sources, 1):
alias = f"src{i}"
conn.execute(f"ATTACH DATABASE ? AS {alias}", (src,))
print(f"Adding {src}")
log(f"Attached {src} AS {alias}")
# UUIDs and schema version
_uuids = [
itr[0]
for itr in conn.execute(
f"SELECT value FROM {alias}.rocpd_metadata WHERE tag='uuid'",
).fetchall()
]
uuids += [itr for itr in _uuids if itr not in uuids]
_schema_versions = [
itr[0]
for itr in conn.execute(
f"SELECT value FROM {alias}.rocpd_metadata WHERE tag='schema_version'",
).fetchall()
]
schema_versions += _schema_versions
# Helper: fetch rows from attached sqlite_master
def fetch_master(_alias: str, kind: str):
cur = conn.execute(
f"""
SELECT name, sql
FROM {_alias}.sqlite_master
WHERE type = ? AND name NOT LIKE 'sqlite_%'
ORDER BY name
""",
(kind,),
)
return cur.fetchall()
# Track dest tables to detect collisions quickly
existing_tables = {
row[0]
for row in conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'"
)
}
# 1) Create tables
for name, create_sql in fetch_master(alias, "table"):
if name in existing_tables:
raise AssertionError(
f"Table name collision for '{name}' from {alias}; "
"assumption of globally-unique table names violated."
)
if not create_sql:
continue
log(f"Creating table {name}")
conn.execute(create_sql)
existing_tables.add(name)
# 2) Copy table data
tbls = [name for name, _ in fetch_master(alias, "table")]
print(f"Tables found: {len(tbls)}")
for name in tbls:
log(f"Inserting rows into {name} from {alias}.{name}")
rows = conn.execute(f'SELECT * FROM {alias}."{name}"').fetchall()
if rows:
col_count = len(rows[0])
placeholders = ", ".join(["?"] * col_count)
conn.executemany(
f'INSERT INTO "{name}" VALUES ({placeholders})', rows
)
# 3) Recreate indexes (make idempotent with IF NOT EXISTS)
def inject_if_not_exists_in_index_sql(sql: str) -> str:
# Naive, but works for standard forms produced by sqlite_master
# Handles UNIQUE and non-UNIQUE:
# "CREATE INDEX name ON ..." or "CREATE UNIQUE INDEX name ON ..."
sql_stripped = sql.strip()
if sql_stripped.upper().startswith("CREATE UNIQUE INDEX"):
return sql_stripped.replace(
"CREATE UNIQUE INDEX", "CREATE UNIQUE INDEX IF NOT EXISTS", 1
)
if sql_stripped.upper().startswith("CREATE INDEX"):
return sql_stripped.replace(
"CREATE INDEX", "CREATE INDEX IF NOT EXISTS", 1
)
return sql
existing_indexes = {
row[0]
for row in conn.execute(
"SELECT name FROM sqlite_master WHERE type='index' AND name NOT LIKE 'sqlite_%'"
)
}
for name, create_sql in fetch_master(alias, "index"):
if not create_sql:
continue
if name in existing_indexes:
log(f"Index {name} exists; skipping or using IF NOT EXISTS")
# Try to create with IF NOT EXISTS to avoid collision
sql2 = inject_if_not_exists_in_index_sql(create_sql)
conn.execute(sql2)
existing_indexes.add(name)
# 4) Recreate triggers (skip on name conflict)
existing_triggers = {
row[0]
for row in conn.execute(
"SELECT name FROM sqlite_master WHERE type='trigger'"
)
}
for name, create_sql in fetch_master(alias, "trigger"):
if not create_sql:
continue
if name in existing_triggers:
log(f"Trigger {name} exists; skipping")
continue
log(f"Creating trigger {name}")
conn.execute(create_sql)
existing_triggers.add(name)
# 5) Recreate views (skip on name conflict)
existing_views = {
row[0]
for row in conn.execute(
"SELECT name FROM sqlite_master WHERE type='view'"
)
}
for name, create_sql in fetch_master(alias, "view"):
if not create_sql:
continue
if name in existing_views:
log(f"View {name} exists; skipping")
continue
# If the view name does not start with "rocpd_", collect it for later recreation
if not name.startswith("rocpd_") and not any(
name == _name for _name, _ in data_views
):
data_views.append((name, create_sql))
existing_views.add(name)
views += [itr for itr in list(existing_views) if itr.startswith("rocpd_")]
conn.commit()
conn.execute(f"DETACH DATABASE {alias}")
log(f"Detached {alias}")
# Check the schema versions. Merge only occurs if all the DBs are the same schema version.
unique_versions = list(set(schema_versions))
if len(unique_versions) != 1:
raise RuntimeError(f"Multiple schema versions found: {unique_versions}")
# Re-enable FKs and run a quick FK check
conn.execute("PRAGMA foreign_keys = ON;")
# Optional: enforce integrity
# try:
# conn.execute("PRAGMA quick_check;")
# except sqlite3.DatabaseError as e:
# log(f"SQLite3 quick_check reported an issue: {e}")
uuids = sorted(list(set(uuids))) # unique set of uuids
views = sorted(list(set(views))) # unique set of views
# Create UNION views by listing all tables
existing_tables = {
row[0]
for row in conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'"
)
}
# Then UNION all the tables starting with the view name
for vitr in views:
matching_tables = [
titr for titr in existing_tables if titr.startswith(f"{vitr}_")
]
tables_union = " UNION ALL ".join(
[f"SELECT * FROM {titr}" for titr in matching_tables]
)
conn.execute(f"CREATE VIEW {vitr} AS {tables_union}")
conn.commit()
# Now that the rocpd_ views are created, re-create the data-views using all the data
for _, sql_view in data_views:
conn.execute(sql_view)
conn.commit()
#
# Command-line interface functions
#
def add_args(parser):
"""Add arguments for merger."""
io_options = parser.add_argument_group("I/O options")
io_options.add_argument(
"-o",
"--output-file",
help="Sets the base output file name",
default=os.environ.get("ROCPD_OUTPUT_NAME", "merged"),
type=str,
required=False,
)
io_options.add_argument(
"-d",
"--output-path",
help="Sets the output path where the output files will be saved (default path: `./rocpd-output-data`)",
default=os.environ.get("ROCPD_OUTPUT_PATH", "./rocpd-output-data"),
type=str,
required=False,
)
def process_args(input, args):
valid_args = ["output_file", "output_path"]
ret = {}
for itr in valid_args:
if hasattr(args, itr):
val = getattr(args, itr)
if val is not None:
ret[itr] = val
return ret
return process_args
def execute(inputs: List[str], **kwargs: Dict[str, Any]) -> str:
start_time = time.time()
input_files = inputs
try:
from . import package
input_files = package.flatten_rocpd_yaml_input_file(inputs, skip_auto_merge=True)
except Exception as e:
print(f"Import error trying to use package, fallback to use inputs: {e}")
output_path = kwargs.get("output_path")
output_filename = kwargs.get("output_file")
if not output_filename.endswith(".db"):
output_filename += ".db"
output = os.path.join(output_path, output_filename)
merge_sqlite_dbs(input_files, output)
elapsed_time = time.time() - start_time
print(f"Merge completed successfully! Output saved to: {output}")
print(f"Time: {elapsed_time:.2f} sec")
return str(output)
def main(argv=None) -> int:
"""Main entry point for command line execution."""
from . import output_config
parser = argparse.ArgumentParser(
description="Generate merged database from rocPD databases"
)
required_params = parser.add_argument_group("Required options")
required_params.add_argument(
"-i",
"--input",
required=True,
type=output_config.check_file_exists,
nargs="+",
help="Path to the input ROCpd database files",
)
process_args = add_args(parser)
args = parser.parse_args(argv)
merge_args = process_args(args)
execute(args.input, **merge_args)
if __name__ == "__main__":
main()
+24 -29
Переглянути файл
@@ -24,7 +24,6 @@
###############################################################################
from .importer import RocpdImportData
from .time_window import apply_time_window
from . import output_config
from . import libpyrocpd
@@ -33,12 +32,10 @@ def write_otf2(importData, config):
return libpyrocpd.write_otf2(importData, config)
def execute(input, config=None, window_args=None, **kwargs):
def execute(input, config=None, **kwargs):
importData = RocpdImportData(input)
apply_time_window(importData, **window_args)
config = (
output_config.output_config(**kwargs)
if config is None
@@ -62,27 +59,23 @@ def add_args(parser):
# default=False,
# )
return []
def process_args(input, args):
valid_args = []
ret = {}
for itr in valid_args:
if hasattr(args, itr):
val = getattr(args, itr)
if val is not None:
ret[itr] = val
return ret
def process_args(args, valid_args):
ret = {}
for itr in valid_args:
if hasattr(args, itr):
val = getattr(args, itr)
if val is not None:
ret[itr] = val
return ret
return process_args
def main(argv=None):
import argparse
from .time_window import add_args as add_args_time_window
from .time_window import process_args as process_args_time_window
from .output_config import add_args as add_args_output_config
from .output_config import process_args as process_args_output_config
from .output_config import add_generic_args, process_generic_args
from . import time_window
from . import output_config
parser = argparse.ArgumentParser(
description="Convert rocPD to OTF2 format", allow_abbrev=False
@@ -99,21 +92,23 @@ def main(argv=None):
help="Input path and filename to one or more database(s), separated by spaces",
)
valid_out_config_args = add_args_output_config(parser)
valid_otf2_args = add_args(parser)
valid_generic_args = add_generic_args(parser)
valid_time_window_args = add_args_time_window(parser)
process_out_config_args = output_config.add_args(parser)
process_otf2_args = add_args(parser)
process_generic_args = output_config.add_generic_args(parser)
process_time_window_args = time_window.add_args(parser)
args = parser.parse_args(argv)
out_cfg_args = process_args_output_config(args, valid_out_config_args)
generic_out_cfg_args = process_generic_args(args, valid_generic_args)
window_args = process_args_time_window(args, valid_time_window_args)
otf2_args = process_args(args, valid_otf2_args)
input = RocpdImportData(args.input)
out_cfg_args = process_out_config_args(input, args)
generic_out_cfg_args = process_generic_args(input, args)
otf2_args = process_otf2_args(input, args)
process_time_window_args(input, args)
all_args = {**out_cfg_args, **otf2_args, **generic_out_cfg_args}
execute(args.input, window_args=window_args, **all_args)
execute(input, **all_args)
if __name__ == "__main__":
+61 -41
Переглянути файл
@@ -36,7 +36,7 @@ except Exception:
from . import libpyrocpd
__all__ = ["format_path", "output_config", "add_args", "process_args"]
__all__ = ["format_path", "output_config", "add_args", "add_generic_args"]
def _generate_attribute_docs(data):
@@ -73,12 +73,9 @@ class output_config(libpyrocpd.output_config):
self.update(**kwargs)
def update(self, **kwargs):
_strict = kwargs.get("strict", True)
# _verbose = kwargs.get("log-level", "config")
_strict = kwargs.get("strict", False)
for key, itr in kwargs.items():
if hasattr(self, key):
# if _verbose in ("info", "trace", "config"):
# print(f" - output_config.{key} = {itr}")
if key == "agent_index_value":
if itr == "absolute":
setattr(self, key, libpyrocpd.agent_indexing.node)
@@ -97,10 +94,29 @@ def format_path(path, tag=os.path.basename(sys.executable)):
return libpyrocpd.format_path(path, tag)
def sanitize_input_list(input: list):
sanitized_list = []
for items in input:
if isinstance(items, list):
sanitized_list.extend(sanitize_input_list(items))
else:
sanitized_list.append(items)
return sanitized_list
def check_file_exists(filename):
if not os.path.exists(filename):
raise argparse.ArgumentTypeError(f"File '{filename}' does not exist.")
return filename
import glob
# Check for wildcards passed in
if any(char in filename for char in ["*", "?", "["]):
expanded_files = glob.glob(filename)
if not expanded_files:
raise argparse.ArgumentTypeError(f"File '{filename}' does not exist.")
return sanitize_input_list(expanded_files)
else:
if not os.path.exists(filename):
raise argparse.ArgumentTypeError(f"File '{filename}' does not exist.")
return filename
def add_args(parser):
@@ -125,6 +141,33 @@ def add_args(parser):
required=False,
)
io_options.add_argument(
"--automerge-limit",
help="Change database auto-merge limit (default: 1, auto-merge to 1 DB. max: 8)",
type=int,
required=False,
)
def process_args(input, args):
valid_args = ["output_file", "output_path"]
ret = {}
for itr in valid_args:
if hasattr(args, itr):
val = getattr(args, itr)
if itr == "output_format":
ret[itr] = val
elif itr == "output_path" and val is not None:
ret[itr] = format_path(val)
elif val is not None:
ret[itr] = val
return ret
return process_args
def add_generic_args(parser):
"""Add generic arguments that apply to multiple output formats."""
kernel_naming_options = parser.add_argument_group("Kernel naming options")
kernel_naming_options.add_argument(
@@ -134,27 +177,6 @@ def add_args(parser):
default=False,
)
return ["output_file", "output_path", "kernel_rename"]
def process_args(args, valid_args):
ret = {}
for itr in valid_args:
if hasattr(args, itr):
val = getattr(args, itr)
if itr == "output_format":
ret[itr] = val
elif itr == "output_path" and val is not None:
ret[itr] = format_path(val)
elif val is not None:
ret[itr] = val
return ret
def add_generic_args(parser):
"""Add generic arguments that apply to multiple output formats."""
generic_options = parser.add_argument_group("Generic options")
generic_options.add_argument(
@@ -167,16 +189,14 @@ def add_generic_args(parser):
default="relative",
)
return [
"agent_index_value",
]
def process_args(input, args):
valid_args = ["agent_index_value", "kernel_rename"]
ret = {}
for itr in valid_args:
if hasattr(args, itr):
val = getattr(args, itr)
if val is not None:
ret[itr] = val
return ret
def process_generic_args(args, valid_args):
ret = {}
for itr in valid_args:
if hasattr(args, itr):
val = getattr(args, itr)
if val is not None:
ret[itr] = val
return ret
return process_args
+511
Переглянути файл
@@ -0,0 +1,511 @@
#!/usr/bin/env python3
###############################################################################
# MIT License
#
# Copyright (c) 2025 Advanced Micro Devices, Inc.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
###############################################################################
import os
import shutil
import datetime
import yaml
import argparse
from . import output_config
rocpd_package_version = "1.0"
rocpd_metadata_param_version = "rocpd_package_version"
IDEAL_NUMBER_OF_DATABASE_FILES = 1
MAX_LIMIT_OF_DATABASE_FILES = 8
def prepare_output_folder(output_path, consolidate) -> str:
"""
Prepares the output folder path with appropriate .rpdb extension.
When consolidating to current directory, generates a timestamped folder.
Otherwise, ensures the provided path has .rpdb extension.
Args:
output_path (str): The path to the output folder.
consolidate (bool): Whether to consolidate output files.
Returns:
str: The output folder path with .rpdb extension.
"""
# Current directory with consolidation - generate timestamped folder
if output_path == os.getcwd():
if consolidate:
date_str = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
output_path = f"rocpd-{date_str}.rpdb"
else:
# Custom path provided - ensure it has .rpdb extension
if not output_path.endswith(".rpdb"):
output_path = f"{output_path}.rpdb"
return output_path
def flatten_rocpd_yaml_input_file(input, **kwargs) -> list:
"""
Processes input files and returns a flattened list of database files.
Handles multiple input types:
- YAML files containing rocpd metadata
- .rpdb folders with index.yaml
- Direct database files
- Directories containing .db files
- Wildcard patterns
Optionally merges databases if count exceeds threshold.
Args:
input (list of str): List of input file paths (YAML, DB, or .rpdb folder).
skip_auto_merge (bool): If True, skip automatic merging of multiple databases.
Returns:
list: List of database file paths.
"""
import glob
def parse_yaml_file(yaml_path, base_dir=None):
"""
Parse a rocprofiler-sdk YAML file and extract database file paths.
Args:
yaml_path (str): Path to the YAML file.
base_dir (str): Base directory for resolving relative paths.
Returns:
list: Expanded list of database file paths.
"""
with open(yaml_path, "r") as f:
meta = yaml.safe_load(f)
rocpd_meta = meta.get("rocprofiler-sdk", {}).get("rocpd", {})
# Check version compatibility
version = rocpd_meta.get(rocpd_metadata_param_version, "0")
if version < rocpd_package_version:
print(
f"Warning: {yaml_path} is using an outdated version of rocpd package ({version})."
)
# Determine working directory for relative paths
cwd = (
base_dir if base_dir is not None else rocpd_meta.get("path", os.getcwd())
)
# Get database file list from YAML
dbs = rocpd_meta.get("files", [])
if isinstance(dbs, str):
dbs = [dbs]
# Expand each database path (handle wildcards and relative paths)
files = []
for db in dbs:
db_path = os.path.join(cwd, db) if not os.path.isabs(db) else db
if _contains_wildcard(db_path):
files.extend(glob.glob(db_path))
else:
files.append(db_path)
return files
def _contains_wildcard(path):
"""Check if path contains wildcard characters."""
return "*" in path or "?" in path or "[" in path
def _process_rpdb_folder(item):
"""Process .rpdb folder and extract database files."""
index_yaml = os.path.join(item, "index.yaml")
if os.path.isfile(index_yaml):
return parse_yaml_file(index_yaml, base_dir=os.path.abspath(item))
else:
# No index.yaml, search for .db files directly
return glob.glob(os.path.join(item, "*.db"))
def _process_yaml_file(item):
"""Process YAML file and extract database files."""
base_dir = os.path.dirname(item)
return parse_yaml_file(item, base_dir)
def _process_directory(item):
"""Process directory and find .db files."""
return glob.glob(os.path.join(item, "*.db"))
def _process_file_or_pattern(item):
"""Process individual file or wildcard pattern."""
if _contains_wildcard(item):
return glob.glob(item)
else:
return [item]
# Sanitize and categorize input
sanitized_input = output_config.sanitize_input_list(input)
# Process each input item based on its type
input_files = []
for item in sanitized_input:
if item.endswith(".rpdb") and os.path.isdir(item):
input_files.extend(_process_rpdb_folder(item))
elif item.endswith((".yaml", ".yml")):
input_files.extend(_process_yaml_file(item))
elif os.path.isdir(item):
input_files.extend(_process_directory(item))
else:
input_files.extend(_process_file_or_pattern(item))
# Validate all files exist
num_dbs = len(input_files)
print(f"Found {num_dbs} database files.")
for db in input_files:
if not os.path.exists(db):
print(f"Warning: Input database file not found: {db}. Exiting.")
return []
# Optionally merge databases if count exceeds threshold
skip_auto_merge = kwargs.get("skip_auto_merge", False)
auto_merge_max_limit = kwargs.get("automerge_limit", IDEAL_NUMBER_OF_DATABASE_FILES)
# Check if user tried to exceed MAX_LIMIT of DBs. Conservatively set to 8. SQLite limit is 10.
if auto_merge_max_limit > MAX_LIMIT_OF_DATABASE_FILES:
print(
f"SQLite has a database attach limit of 10. Max auto-merge limit of {MAX_LIMIT_OF_DATABASE_FILES} set."
)
auto_merge_max_limit = MAX_LIMIT_OF_DATABASE_FILES
if skip_auto_merge:
print("Skip auto merge and packaging.")
return input_files
if num_dbs > auto_merge_max_limit:
print(
f"More than {auto_merge_max_limit} database files found. "
f"It is recommended to merge and package databases"
)
merged_files = merge_and_repackage(
input_files, max_limit=auto_merge_max_limit, **kwargs
)
print(f"Reduced to {len(merged_files)} database files.")
return merged_files
return input_files
def merge_and_repackage(
input_files, max_limit=IDEAL_NUMBER_OF_DATABASE_FILES, **kwargs
) -> list:
"""
Merges and repackages database files into batches to reduce file count.
If the number of input files is within the limit, returns them unchanged.
Otherwise, merges files into batches and creates a timestamped .rpdb folder
with the merged databases and metadata file.
Args:
input_files (list of str): List of database file paths to merge.
max_limit (int): Maximum number of output files desired (default: 1).
Returns:
list: List of merged database file paths.
"""
import uuid
original_num_dbs = len(input_files)
# Early return if already within limit
if original_num_dbs <= max_limit:
print(
f"Number of database files ({original_num_dbs}) is within the limit ({max_limit}). "
f"No merging needed."
)
return input_files
# Calculate batch size for merging
batch_size = _calculate_batch_size(original_num_dbs, max_limit)
print(
f"Original number of DBs: {original_num_dbs}, "
f"Target number of DBs to merge per batch: {batch_size}"
)
# Prepare output folder for merged databases
unique_str = uuid.uuid4()
merged_output_folder = prepare_output_folder(os.getcwd(), consolidate=True)
os.makedirs(merged_output_folder, exist_ok=True)
# Process databases in batches
merged_files = _process_batches(
input_files, batch_size, merged_output_folder, unique_str, **kwargs
)
# Display merged file list
for item in merged_files:
print(f"Reduced file list: {item}")
# Create metadata file for the merged databases
create_metadata_file(merged_files, output_path=merged_output_folder)
print(
f"\033[1;34mMerge and repackage completed. "
f"Output files are located in: {merged_output_folder}\033[0m"
)
return merged_files
def _calculate_batch_size(total_files, max_output_files):
"""
Calculate how many input files should be merged per batch.
Args:
total_files (int): Total number of input files.
max_output_files (int): Desired maximum number of output files.
Returns:
int: Number of files to merge per batch.
"""
# Use ceiling division to ensure we don't exceed max_output_files
return (total_files // max_output_files) + (total_files % max_output_files > 0)
def _process_batches(input_files, batch_size, output_folder, unique_str, **kwargs):
"""
Process database files in batches, merging or copying as needed.
Args:
input_files (list): List of input database file paths.
batch_size (int): Number of files per batch.
output_folder (str): Directory to store merged files.
unique_str: Unique identifier for merged filenames.
Returns:
list: List of merged/copied database file paths.
"""
merged_files = []
total_files = len(input_files)
for batch_index, i in enumerate(range(0, total_files, batch_size)):
batch_files = input_files[i : i + batch_size]
merged_filename = f"merged_db_{batch_index}_{unique_str}.db"
merged_path = _merge_a_batch(
batch_files, output_folder, merged_filename, **kwargs
)
merged_files.append(merged_path)
return merged_files
def _merge_a_batch(batch_files, output_folder, output_filename, **kwargs):
"""
Merge multiple files or copy a single file to the output folder.
Args:
batch_files (list): List of database files in this batch.
output_folder (str): Destination folder.
output_filename (str): Name for the output file.
Returns:
str: Path to the merged or copied file.
"""
from . import merge
dest_file = os.path.join(output_folder, output_filename)
if len(batch_files) > 1:
# Multiple files: merge them
args = {"output_path": output_folder, "output_file": output_filename}
return str(merge.execute(batch_files, **args))
else:
# Single file: just copy it (optimization)
# Because this is auto-merge, we want to just copy the file, don't just move it for the user.
shutil.copy2(batch_files[0], dest_file)
return str(dest_file)
def create_metadata_file(db_files, output_path=".", metadata_filename="index.yaml"):
"""
Creates a metadata file in a custom YAML format for rocprofiler-sdk/rocpd.
Args:
db_files (list of str): List of absolute or relative paths to SQL database files.
output_path (str): Directory to write the metadata file.
metadata_filename (str): Name of the metadata file to create.
Returns:
str: Path to the created metadata file.
"""
# Ensure output directory exists
os.makedirs(output_path, exist_ok=True)
# Compute relative paths
rel_paths = [os.path.relpath(db_file, output_path) for db_file in db_files]
# Compose the YAML structure
metadata = {
"rocprofiler-sdk": {
"rocpd": {
rocpd_metadata_param_version: rocpd_package_version,
# "source": "rocprofv3", # omitting source, not sure why we need this, and how we determine the source as rocprof-sys, for example.
"path": ".",
"files": (
rel_paths
if len(rel_paths) > 1
else (rel_paths[0] if rel_paths else "")
),
}
}
}
metadata_path = os.path.join(output_path, metadata_filename)
with open(metadata_path, "w") as f:
yaml.safe_dump(metadata, f, default_flow_style=False)
return metadata_path
def add_args(parser):
"""Add arguments for package."""
package_options = parser.add_argument_group("Package options")
package_options.add_argument(
"-c",
"--consolidate",
action="store_true",
help="Consolidate (move) database files into a new folder and generate metadata file pointing to that folder",
)
package_options.add_argument(
"--copy",
action="store_true",
help="Copy database files instead of moving them",
)
package_options.add_argument(
"-d",
"--output-path",
help="Sets the name of output folder (default : current directory)",
# default=os.environ.get("ROCPD_OUTPUT_PATH", "./rocpd-output-data"),
type=str,
required=False,
)
def process_args(input, args):
valid_args = [
"consolidate",
"copy",
"output_path",
]
ret = {}
for itr in valid_args:
if hasattr(args, itr):
val = getattr(args, itr)
if val is not None:
ret[itr] = val
return ret
return process_args
def execute(input_files, **kwargs):
import glob
output_path_kw = kwargs.get("output_path", os.getcwd())
consolidate = kwargs.get("consolidate", False)
copy_instead_of_move = kwargs.get("copy", False)
output_path = prepare_output_folder(output_path_kw, consolidate)
db_files = output_config.sanitize_input_list(input_files)
# check if a folder is provided, if so, search for *.db
expanded_files = []
for itr in db_files:
if os.path.isdir(itr):
expanded_files.extend(glob.glob(os.path.join(itr, "*.db")))
else:
expanded_files.append(itr)
db_files = expanded_files
if consolidate:
# Create a new folder with current date and time
os.makedirs(output_path, exist_ok=True)
consolidated_files = []
for db_file in db_files:
dest_file = os.path.join(output_path, os.path.basename(db_file))
# Only copy if source and destination are not the same file
if os.path.abspath(db_file) != os.path.abspath(dest_file):
if copy_instead_of_move:
shutil.copy2(db_file, dest_file)
else:
shutil.move(db_file, dest_file)
consolidated_files.append(dest_file)
db_files = consolidated_files
metadata_path = create_metadata_file(db_files, output_path)
print(f"rocPD package created at: {metadata_path}")
def main(argv=None):
"""
Main function to create a metadata file and .rpdb package
Consolidates to a .rpdb package if --consolidate is specified.
"""
parser = argparse.ArgumentParser(
description="Package database files into .rpdb output"
)
required_params = parser.add_argument_group("Required options")
required_params.add_argument(
"-i",
"--input",
required=True,
type=output_config.check_file_exists,
nargs="+",
help="Input path and filename to one or more database(s). Wildcards accepted, as well as .rpdb folders",
)
process_args = add_args(parser)
args = parser.parse_args(argv)
input_files = flatten_rocpd_yaml_input_file(
args.input, skip_auto_merge=True, copy=args.copy
)
package_args = process_args(None, args)
# error check for databases before trying to use the data
if not input_files:
print("Error, no databases found\n")
return
execute(input_files, **package_args)
# This is the entry point for the script.
if __name__ == "__main__":
main()
+28 -34
Переглянути файл
@@ -24,7 +24,6 @@
###############################################################################
from .importer import RocpdImportData
from .time_window import apply_time_window
from . import output_config
from . import libpyrocpd
@@ -33,12 +32,10 @@ def write_pftrace(importData, config):
return libpyrocpd.write_perfetto(importData, config)
def execute(input, config=None, window_args=None, **kwargs):
def execute(input, config=None, **kwargs):
importData = RocpdImportData(input)
apply_time_window(importData, **window_args)
config = (
output_config.output_config(**kwargs)
if config is None
@@ -90,33 +87,30 @@ def add_args(parser):
default=False,
)
return [
"perfetto_backend",
"perfetto_buffer_fill_policy",
"perfetto_buffer_size",
"perfetto_shmem_size_hint",
"group_by_queue",
]
def process_args(input, args):
valid_args = [
"perfetto_backend",
"perfetto_buffer_fill_policy",
"perfetto_buffer_size",
"perfetto_shmem_size_hint",
"group_by_queue",
]
ret = {}
for itr in valid_args:
if hasattr(args, itr):
val = getattr(args, itr)
if val is not None:
ret[itr] = val
return ret
def process_args(args, valid_args):
ret = {}
for itr in valid_args:
if hasattr(args, itr):
val = getattr(args, itr)
if val is not None:
ret[itr] = val
return ret
return process_args
def main(argv=None):
import argparse
from .time_window import add_args as add_args_time_window
from .time_window import process_args as process_args_time_window
from .output_config import add_args as add_args_output_config
from .output_config import process_args as process_args_output_config
from .output_config import add_generic_args, process_generic_args
from .output_config import add_generic_args
parser = argparse.ArgumentParser(
description="Convert rocPD to Perfetto file", allow_abbrev=False
@@ -133,17 +127,18 @@ def main(argv=None):
help="Input path and filename to one or more database(s), separated by spaces",
)
valid_out_config_args = add_args_output_config(parser)
valid_pftrace_args = add_args(parser)
valid_generic_args = add_generic_args(parser)
valid_time_window_args = add_args_time_window(parser)
process_out_config_args = add_args_output_config(parser)
process_pftrace_args = add_args(parser)
process_generic_args = add_generic_args(parser)
process_time_window_args = add_args_time_window(parser)
args = parser.parse_args(argv)
input = RocpdImportData(args.input)
out_cfg_args = process_args_output_config(args, valid_out_config_args)
pftrace_args = process_args(args, valid_pftrace_args)
generic_out_cfg_args = process_generic_args(args, valid_generic_args)
window_args = process_args_time_window(args, valid_time_window_args)
out_cfg_args = process_out_config_args(input, args)
pftrace_args = process_pftrace_args(input, args)
generic_out_cfg_args = process_generic_args(input, args)
process_time_window_args(input, args)
all_args = {
**pftrace_args,
@@ -152,8 +147,7 @@ def main(argv=None):
}
execute(
args.input,
window_args=window_args,
input,
**all_args,
)
+31 -41
Переглянути файл
@@ -32,7 +32,15 @@ from datetime import datetime
from . import output_config
from . import libpyrocpd
from .importer import RocpdImportData
from .time_window import apply_time_window
__all__ = [
"export_sqlite_query",
"send_report_email",
"zip_files",
"add_args",
"execute",
"main",
]
def export_sqlite_query(
@@ -439,34 +447,18 @@ def add_args(parser):
"--template-path", help="Path to a Jinja2 HTML template for the dashboard"
)
return [
"query",
"script",
"email_to",
"email_from",
"email_subject",
"smtp_server",
"smtp_port",
"smtp_user",
"smtp_password",
"inline_preview",
"zip_attachments",
"format",
"template_path",
]
def process_args(input, args):
ret = {}
return ret
return process_args
def process_args(args, valid_args):
# do not add any of the arguments to the output config dict
ret = {}
return ret
def execute(input, args, config=None, **kwargs):
def execute(input, args, config=None, window_args=None, **kwargs):
importData = RocpdImportData(input)
apply_time_window(importData, **window_args)
importData = RocpdImportData(
input, automerge_limit=getattr(args, "automerge_limit", None)
)
config = (
output_config.output_config(**kwargs)
@@ -524,11 +516,8 @@ def execute(input, args, config=None, window_args=None, **kwargs):
def main(argv=None):
import argparse
from .time_window import add_args as add_args_time_window
from .time_window import process_args as process_args_time_window
from .output_config import add_args as add_args_output_config
from .output_config import process_args as process_args_output_config
from .output_config import add_generic_args, process_generic_args
from . import time_window
from . import output_config
parser = argparse.ArgumentParser(
description="Generate report for rocpd query", allow_abbrev=False
@@ -545,17 +534,19 @@ def main(argv=None):
help="Input path and filename to one or more database(s), separated by spaces",
)
valid_out_config_args = add_args_output_config(parser)
valid_generic_args = add_generic_args(parser)
valid_time_window_args = add_args_time_window(parser)
valid_query_args = add_args(parser)
process_out_config_args = output_config.add_args(parser)
process_generic_args = output_config.add_generic_args(parser)
process_time_window_args = time_window.add_args(parser)
process_query_args = add_args(parser)
args = parser.parse_args(argv)
out_cfg_args = process_args_output_config(args, valid_out_config_args)
generic_out_cfg_args = process_generic_args(args, valid_generic_args)
window_args = process_args_time_window(args, valid_time_window_args)
query_args = process_args(args, valid_query_args)
input = RocpdImportData(args.input)
out_cfg_args = process_out_config_args(input, args)
generic_out_cfg_args = process_generic_args(input, args)
query_args = process_query_args(input, args)
process_time_window_args(input, args)
all_args = {
**query_args,
@@ -564,9 +555,8 @@ def main(argv=None):
}
execute(
args.input,
input,
args,
window_args=window_args,
**all_args,
)
+42 -55
Переглянути файл
@@ -30,7 +30,19 @@ import math
from typing import Any, List, Tuple
from .importer import RocpdImportData, execute_statement
from .query import export_sqlite_query
from . import output_config
__all__ = [
"generate_all_summaries",
"generate_summary_query",
"generate_domain_query",
"create_domain_query",
"create_summary_queries",
"create_summary_region_queries",
"export_query",
"add_args",
"execute",
"main",
]
def check_function_availability(connection, function_name):
@@ -471,12 +483,10 @@ def generate_all_summaries(connection: RocpdImportData, **kwargs: Any) -> None:
#
# Command-line interface functions
#
def add_args(parser):
"""Add arguments for summary."""
def add_io_args(parser):
"""Add input/output arguments for summary."""
io_options = parser.add_argument_group("I/O options")
io_options.add_argument(
"-f",
"--format",
@@ -486,28 +496,7 @@ def add_io_args(parser):
type=str,
required=False,
)
io_options.add_argument(
"-o",
"--output-file",
help="Sets the base output file name",
default=os.environ.get("ROCPD_OUTPUT_NAME", ""),
type=str,
required=False,
)
io_options.add_argument(
"-d",
"--output-path",
help="Sets the output path where the output files will be saved (default path: `./rocpd-output-data`)",
default=os.environ.get("ROCPD_OUTPUT_PATH", "./rocpd-output-data"),
type=str,
required=False,
)
return ["format", "output_file", "output_path"]
def add_args(parser):
"""Add arguments for summary."""
summary_options = parser.add_argument_group("Summary options")
summary_options.add_argument(
"--domain-summary",
@@ -528,26 +517,25 @@ def add_args(parser):
help="Specify region categories to include in the summary (example: HIP, HSA, RCCL, ROCDECODE, ROCJPEG, MARKER). If not specified, categories will be automatically retrieved from the database.",
)
return ["domain_summary", "summary_by_rank", "region_categories"]
def process_args(input, args):
valid_args = ["format", "domain_summary", "summary_by_rank", "region_categories"]
ret = {}
for itr in valid_args:
if hasattr(args, itr):
val = getattr(args, itr)
if val is not None:
ret[itr] = val
return ret
return process_args
def process_args(args, valid_args):
def execute(input, **kwargs: Any) -> RocpdImportData:
ret = {}
for itr in valid_args:
if hasattr(args, itr):
val = getattr(args, itr)
if val is not None:
ret[itr] = val
return ret
def execute(input, window_args=None, **kwargs: Any) -> RocpdImportData:
from .time_window import apply_time_window
importData = RocpdImportData(input)
apply_time_window(importData, **window_args)
importData = RocpdImportData(
input, automerge_limit=getattr(kwargs, "automerge_limit", None)
)
generate_all_summaries(importData, **kwargs)
@@ -556,12 +544,10 @@ def execute(input, window_args=None, **kwargs: Any) -> RocpdImportData:
def main(argv=None) -> int:
"""Main entry point for command line execution."""
from .time_window import add_args as add_args_time_window
from .time_window import process_args as process_args_time_window
from . import time_window
from . import output_config
parser = argparse.ArgumentParser(
description="Create ROCpd database summary region views"
)
parser = argparse.ArgumentParser(description="Generate summary views from rocPD data")
required_params = parser.add_argument_group("Required options")
required_params.add_argument(
@@ -573,21 +559,22 @@ def main(argv=None) -> int:
help="Input path and filename to one or more database(s), separated by spaces",
)
valid_io_args = add_io_args(parser)
valid_summary_args = add_args(parser)
valid_time_window_args = add_args_time_window(parser)
process_outcfg_args = output_config.add_args(parser)
process_summary_args = add_args(parser)
process_time_window_args = time_window.add_args(parser)
args = parser.parse_args(argv)
summary_args = process_args(args, valid_summary_args)
io_args = output_config.process_args(args, valid_io_args)
window_args = process_args_time_window(args, valid_time_window_args)
input = RocpdImportData(args.input)
summary_args = process_summary_args(input, args)
io_args = process_outcfg_args(input, args)
process_time_window_args(input, args)
all_args = {**summary_args, **io_args}
execute(
args.input,
window_args=window_args,
**all_args,
)
+16 -11
Переглянути файл
@@ -26,10 +26,12 @@
import argparse
import sqlite3
from argparse import ArgumentParser
from typing import Optional, Tuple, Dict, Any, List
from typing import Optional, Tuple, Dict, Any
from .importer import RocpdImportData, execute_statement
__all__ = ["apply_time_window", "execute", "add_args", "main"]
def get_marker_timestamp(
connection: sqlite3.Connection, marker_name: str, marker_type: str = "start"
@@ -265,7 +267,7 @@ def apply_time_window(connection: RocpdImportData, **kwargs: Any) -> None:
#
# Command-line interface functions
#
def add_args(parser: ArgumentParser) -> List[str]:
def add_args(parser: ArgumentParser):
"""Add time slice arguments to an existing parser."""
tw_options = parser.add_argument_group("Time window options")
@@ -307,18 +309,21 @@ def add_args(parser: ArgumentParser) -> List[str]:
default=True,
)
return ["start", "end", "inclusive", "start_marker", "end_marker"]
def process_args(input, args):
valid_args = ["start", "end", "inclusive", "start_marker", "end_marker"]
ret = {}
for itr in valid_args:
if hasattr(args, itr):
val = getattr(args, itr)
if val is not None:
ret[itr] = val
if ret and input is not None:
apply_time_window(input, **ret)
def process_args(args, valid_args):
return ret
ret = {}
for itr in valid_args:
if hasattr(args, itr):
val = getattr(args, itr)
if val is not None:
ret[itr] = val
return ret
return process_args
def execute(input_rpd: str, **kwargs: Any) -> RocpdImportData:
+2
Переглянути файл
@@ -170,8 +170,10 @@ function(rocprofiler_rocpd_python_bindings _VERSION)
importer.py
__init__.py
__main__.py
merge.py
output_config.py
otf2.py
package.py
pftrace.py
query.py
schema.py
+222 -4
Переглянути файл
@@ -48,8 +48,18 @@ add_test(
--runtime-trace --kernel-rename --output-config --pmc SQ_WAVES --
$<TARGET_FILE:reproducible-dispatch-count> 500 2)
# Can remove this test if MULTIPROC is re-enabled and rely on the multiproc databases for
# packaging
add_test(
NAME rocprofv3-test-rocpd-execute2
COMMAND
$<TARGET_FILE:rocprofiler-sdk::rocprofv3> -d
${CMAKE_CURRENT_BINARY_DIR}/rocpd-input-data -o out2 --output-format rocpd
--runtime-trace --kernel-rename --pmc SQ_WAVES --
$<TARGET_FILE:reproducible-dispatch-count> 500 2)
set_tests_properties(
rocprofv3-test-rocpd-execute
rocprofv3-test-rocpd-execute rocprofv3-test-rocpd-execute2
PROPERTIES TIMEOUT
120
LABELS
@@ -140,7 +150,7 @@ set_tests_properties(
#########################################################################################
#
# perfetto generate
# Perfetto generation
#
#########################################################################################
@@ -194,7 +204,7 @@ set_tests_properties(
#########################################################################################
#
# CSV generate
# CSV generation
#
#########################################################################################
@@ -224,7 +234,7 @@ set_tests_properties(
#########################################################################################
#
# Summary generate
# Summary generation
#
#########################################################################################
add_test(
@@ -311,3 +321,211 @@ set_tests_properties(
"${ROCPROFILER_DEFAULT_FAIL_REGEX}"
FIXTURES_REQUIRED
rocprofv3-test-rocpd-generation)
#########################################################################################
#
# Package generation
#
#########################################################################################
add_test(
NAME rocprofv3-test-rocpd-package-generation
COMMAND
${Python3_EXECUTABLE} -m rocpd package --consolidate -d
${CMAKE_CURRENT_BINARY_DIR}/rocpd-input-data/test_package -i
${CMAKE_CURRENT_BINARY_DIR}/rocpd-input-data/out*.db)
set_tests_properties(
rocprofv3-test-rocpd-package-generation
PROPERTIES TIMEOUT
120
LABELS
"integration-tests;rocpd"
ENVIRONMENT
"${rocprofv3-rocpd-env}"
DEPENDS
"rocprofv3-test-rocpd-execute"
FAIL_REGULAR_EXPRESSION
"${ROCPROFILER_DEFAULT_FAIL_REGEX}"
FIXTURES_REQUIRED
rocprofv3-test-rocpd)
add_test(
NAME rocprofv3-test-rocpd-package-generation-multiproc
COMMAND
${Python3_EXECUTABLE} -m rocpd package --consolidate -d
${CMAKE_CURRENT_BINARY_DIR}/rocpd-input-data-multiproc/test_package_multi -i
${CMAKE_CURRENT_BINARY_DIR}/rocpd-input-data-multiproc/out*.db)
set_tests_properties(
rocprofv3-test-rocpd-package-generation-multiproc
PROPERTIES TIMEOUT
120
LABELS
"integration-tests;rocpd"
ENVIRONMENT
"${rocprofv3-rocpd-env}"
DEPENDS
"rocprofv3-test-rocpd-execute-multiproc"
FAIL_REGULAR_EXPRESSION
"${ROCPROFILER_DEFAULT_FAIL_REGEX}"
DISABLED
"${MULTIPROC_IS_DISABLED}"
FIXTURES_REQUIRED
rocprofv3-test-rocpd-multiproc)
#########################################################################################
#
# Query SQL string using package
#
#########################################################################################
add_test(
NAME rocprofv3-test-rocpd-query-using-package
COMMAND
${Python3_EXECUTABLE} -m rocpd query -i
${CMAKE_CURRENT_BINARY_DIR}/rocpd-input-data/test_package.rpdb --query
"SELECT id, type FROM rocpd_info_agent LIMIT 10;")
set_tests_properties(
rocprofv3-test-rocpd-query-using-package
PROPERTIES TIMEOUT
120
LABELS
"integration-tests;rocpd"
ENVIRONMENT
"${rocprofv3-rocpd-env}"
DEPENDS
"rocprofv3-test-rocpd-package-generation"
FAIL_REGULAR_EXPRESSION
"${ROCPROFILER_DEFAULT_FAIL_REGEX}"
FIXTURES_REQUIRED
rocprofv3-test-rocpd)
add_test(
NAME rocprofv3-test-rocpd-query-using-package-multiproc
COMMAND
${Python3_EXECUTABLE} -m rocpd query -i
${CMAKE_CURRENT_BINARY_DIR}/rocpd-input-data-multiproc/test_package_multi.rpdb
--query "SELECT id, type FROM rocpd_info_agent LIMIT 10;")
set_tests_properties(
rocprofv3-test-rocpd-query-using-package-multiproc
PROPERTIES TIMEOUT
120
LABELS
"integration-tests;rocpd"
ENVIRONMENT
"${rocprofv3-rocpd-env}"
DEPENDS
"rocprofv3-test-rocpd-package-generation-multiproc"
FAIL_REGULAR_EXPRESSION
"${ROCPROFILER_DEFAULT_FAIL_REGEX}"
DISABLED
"${MULTIPROC_IS_DISABLED}"
FIXTURES_REQUIRED
rocprofv3-test-rocpd-multiproc)
#########################################################################################
#
# Merge database generation with package
#
#########################################################################################
add_test(
NAME rocprofv3-test-rocpd-merge-generation-using-package
COMMAND
${Python3_EXECUTABLE} -m rocpd merge -d
${CMAKE_CURRENT_BINARY_DIR}/rocpd-input-data -o merged_db -i
${CMAKE_CURRENT_BINARY_DIR}/rocpd-input-data/test_package.rpdb)
set_tests_properties(
rocprofv3-test-rocpd-merge-generation-using-package
PROPERTIES TIMEOUT
120
LABELS
"integration-tests;rocpd"
ENVIRONMENT
"${rocprofv3-rocpd-env}"
DEPENDS
"rocprofv3-test-rocpd-package-generation"
FAIL_REGULAR_EXPRESSION
"${ROCPROFILER_DEFAULT_FAIL_REGEX}"
FIXTURES_REQUIRED
rocprofv3-test-rocpd)
add_test(
NAME rocprofv3-test-rocpd-merge-generation-using-package-multiproc
COMMAND
${Python3_EXECUTABLE} -m rocpd merge -d
${CMAKE_CURRENT_BINARY_DIR}/rocpd-input-data-multiproc -o merged_db_multi.db -i
${CMAKE_CURRENT_BINARY_DIR}/rocpd-input-data-multiproc/test_package_multi.rpdb)
set_tests_properties(
rocprofv3-test-rocpd-merge-generation-using-package-multiproc
PROPERTIES TIMEOUT
120
LABELS
"integration-tests;rocpd"
ENVIRONMENT
"${rocprofv3-rocpd-env}"
DEPENDS
"rocprofv3-test-rocpd-package-generation-multiproc"
FAIL_REGULAR_EXPRESSION
"${ROCPROFILER_DEFAULT_FAIL_REGEX}"
DISABLED
"${MULTIPROC_IS_DISABLED}"
FIXTURES_REQUIRED
rocprofv3-test-rocpd-multiproc)
#########################################################################################
#
# Query SQL string using merged database
#
#########################################################################################
add_test(
NAME rocprofv3-test-rocpd-query-using-mergedDB
COMMAND
${Python3_EXECUTABLE} -m rocpd query -i
${CMAKE_CURRENT_BINARY_DIR}/rocpd-input-data/merged_db.db --query
"SELECT id, type FROM rocpd_info_agent LIMIT 10;")
set_tests_properties(
rocprofv3-test-rocpd-query-using-mergedDB
PROPERTIES TIMEOUT
120
LABELS
"integration-tests;rocpd"
ENVIRONMENT
"${rocprofv3-rocpd-env}"
DEPENDS
"rocprofv3-test-rocpd-merge-generation-using-package"
FAIL_REGULAR_EXPRESSION
"${ROCPROFILER_DEFAULT_FAIL_REGEX}"
FIXTURES_REQUIRED
rocprofv3-test-rocpd)
add_test(
NAME rocprofv3-test-rocpd-query-using-mergedDB-multiproc
COMMAND
${Python3_EXECUTABLE} -m rocpd query -i
${CMAKE_CURRENT_BINARY_DIR}/rocpd-input-data-multiproc/merged_db_multi.db --query
"SELECT id, type FROM rocpd_info_agent LIMIT 10;")
set_tests_properties(
rocprofv3-test-rocpd-query-using-mergedDB-multiproc
PROPERTIES TIMEOUT
120
LABELS
"integration-tests;rocpd"
ENVIRONMENT
"${rocprofv3-rocpd-env}"
DEPENDS
"rocprofv3-test-rocpd-merge-generation-using-package-multiproc"
FAIL_REGULAR_EXPRESSION
"${ROCPROFILER_DEFAULT_FAIL_REGEX}"
DISABLED
"${MULTIPROC_IS_DISABLED}"
FIXTURES_REQUIRED
rocprofv3-test-rocpd-multiproc)