[rocpd] Use SQL queries instead of views in summary generator (#311)
* Use queries instead of views in summary.py * Export queries when created * Remove HIP and HSA from output * Fix domain query * Export summary queries in the main function * Fix comments and variable names * Change syntax for old python versions --------- Co-authored-by: Young Hui <young.hui@amd.com>
Этот коммит содержится в:
коммит произвёл
GitHub
родитель
902ec4d3ad
Коммит
646e4d211a
@@ -69,7 +69,7 @@ def get_temp_view_names(connection: RocpdImportData) -> List[str]:
|
||||
return [
|
||||
v[0]
|
||||
for v in execute_statement(
|
||||
connection, "SELECT name FROM sqlite_temp_master WHERE type='view';"
|
||||
connection, "SELECT name FROM sqlite_temp_master WHERE type='view'"
|
||||
).fetchall()
|
||||
]
|
||||
|
||||
@@ -81,30 +81,34 @@ def get_temp_view_columns(connection: RocpdImportData, view_name: str) -> List[s
|
||||
return [row[1] for row in cursor.fetchall()]
|
||||
|
||||
|
||||
def make_temp_view_query(view_name, query) -> str:
|
||||
return "CREATE TEMPORARY VIEW IF NOT EXISTS `{}` AS {}".format(view_name, query)
|
||||
|
||||
|
||||
def export_view(
|
||||
connection: RocpdImportData, view_name, output_format, output_path, filename=""
|
||||
def export_query(
|
||||
connection: RocpdImportData,
|
||||
output_path,
|
||||
output_file,
|
||||
output_format,
|
||||
query_name,
|
||||
query,
|
||||
) -> None:
|
||||
"""Write the contents of a SQL view to an output format."""
|
||||
"""Write the contents of a SQL query to an output format."""
|
||||
|
||||
query = "SELECT * FROM `{}`".format(view_name)
|
||||
query_one = "SELECT * FROM `{}` LIMIT 1".format(view_name)
|
||||
query_not_empty = f"""
|
||||
SELECT EXISTS (
|
||||
{query}
|
||||
)
|
||||
"""
|
||||
|
||||
# just return if view is empty
|
||||
if not connection.execute(query_one).fetchone():
|
||||
# just return if the result is empty
|
||||
if not connection.execute(query_not_empty).fetchone()[0]:
|
||||
return
|
||||
|
||||
# prepare the output filename
|
||||
if not filename:
|
||||
output_filename = view_name
|
||||
if not output_file:
|
||||
output_filename = query_name
|
||||
else:
|
||||
output_filename = f"{filename}_{view_name}"
|
||||
output_filename = f"{output_file}_{query_name}"
|
||||
|
||||
if output_format == "console":
|
||||
print(f"\n{view_name.upper()}:")
|
||||
print(f"\n{query_name.upper()}:")
|
||||
|
||||
# call query module to export. query will append the extension
|
||||
export_path = os.path.join(output_path, output_filename)
|
||||
@@ -115,10 +119,11 @@ def export_view(
|
||||
|
||||
def generate_summary_query(
|
||||
view_name: str,
|
||||
view_query="",
|
||||
name_column="name",
|
||||
by_rank=False,
|
||||
) -> Tuple[str, str]:
|
||||
"""Generate the SQL statement to create a summary view."""
|
||||
"""Generate the SQL statement to create a summary query."""
|
||||
|
||||
if by_rank:
|
||||
view_suffix = "_summary_by_rank"
|
||||
@@ -148,8 +153,19 @@ def generate_summary_query(
|
||||
|
||||
full_view_name = f"{view_name}{view_suffix}"
|
||||
|
||||
view_select = (
|
||||
f"""
|
||||
{view_name} AS (
|
||||
{view_query}
|
||||
),
|
||||
"""
|
||||
if view_query
|
||||
else ""
|
||||
)
|
||||
|
||||
summary_query = f"""
|
||||
WITH
|
||||
{view_select}
|
||||
avg_data AS (
|
||||
SELECT
|
||||
{group_by_columns.replace(name_column, f"{name_column} AS name")},
|
||||
@@ -192,14 +208,16 @@ def generate_summary_query(
|
||||
aggregated_data AD
|
||||
{total_duration_join}
|
||||
ORDER BY
|
||||
{"AD.pid," if by_rank else ""} AD.total_duration DESC;
|
||||
{"AD.pid," if by_rank else ""} AD.total_duration DESC
|
||||
"""
|
||||
|
||||
return (full_view_name, summary_query)
|
||||
|
||||
|
||||
def generate_domain_query(connection: RocpdImportData, by_rank=False) -> Tuple[str, str]:
|
||||
"""Generate the SQL statement for domain summary by doing union over all summary views."""
|
||||
def generate_domain_query(
|
||||
connection: RocpdImportData, summary_queries, by_rank=False
|
||||
) -> Tuple[str, str]:
|
||||
"""Generate the SQL statement for domain summary by doing union over all summary queries."""
|
||||
|
||||
if by_rank:
|
||||
view_suffix = "_summary_by_rank"
|
||||
@@ -218,20 +236,27 @@ def generate_domain_query(connection: RocpdImportData, by_rank=False) -> Tuple[s
|
||||
join_condition = "CROSS JOIN total_duration TD"
|
||||
order_by = 'ORDER BY GD."DURATION (nsec)" DESC'
|
||||
|
||||
summary_views = [
|
||||
itr for itr in get_temp_view_names(connection) if itr.endswith(view_suffix)
|
||||
summary_dictionary = {
|
||||
query_name: query
|
||||
for query_name, query in summary_queries.items()
|
||||
if query_name.endswith(view_suffix)
|
||||
}
|
||||
|
||||
if len(summary_dictionary) < 1:
|
||||
return ()
|
||||
|
||||
summary_selects = [
|
||||
f"{query_name} AS ({query}) ," for query_name, query in summary_dictionary.items()
|
||||
]
|
||||
|
||||
if len(summary_views) < 1:
|
||||
return view_name
|
||||
|
||||
union_selects = [
|
||||
f" SELECT '{s.replace(view_suffix, '').upper()}' as domain, * FROM {s} "
|
||||
for s in summary_views
|
||||
f" SELECT '{query_name.replace(view_suffix, '').upper()}' as domain, * FROM {query_name} "
|
||||
for query_name, query in summary_dictionary.items()
|
||||
]
|
||||
|
||||
domain_select = f"""
|
||||
WITH
|
||||
{f"".join(summary_selects)}
|
||||
all_domains AS (
|
||||
{f" UNION ALL ".join(union_selects)}
|
||||
),
|
||||
@@ -268,14 +293,14 @@ def generate_domain_query(connection: RocpdImportData, by_rank=False) -> Tuple[s
|
||||
FROM
|
||||
grouped_domains GD
|
||||
{join_condition}
|
||||
{order_by};
|
||||
{order_by}
|
||||
"""
|
||||
|
||||
return (view_name, domain_select)
|
||||
|
||||
|
||||
def create_summary_views(connection: RocpdImportData, by_rank=False) -> None:
|
||||
"""Create summary views for eligible temporary views in the database."""
|
||||
def create_summary_queries(connection: RocpdImportData, by_rank=False):
|
||||
"""Create summary queries for eligible temporary views in the database."""
|
||||
|
||||
NAME_COLUMN_MAP = {
|
||||
"memory_allocations": "type",
|
||||
@@ -287,6 +312,8 @@ def create_summary_views(connection: RocpdImportData, by_rank=False) -> None:
|
||||
|
||||
views = get_temp_view_names(connection)
|
||||
|
||||
queries = {}
|
||||
|
||||
for view_name in views:
|
||||
if any(pattern in view_name for pattern in avoid_view_pattern):
|
||||
continue
|
||||
@@ -295,30 +322,33 @@ def create_summary_views(connection: RocpdImportData, by_rank=False) -> None:
|
||||
if not required_columns.issubset(columns):
|
||||
continue
|
||||
|
||||
# Create regular summary view
|
||||
summary_view_name, summary_query = generate_summary_query(
|
||||
view_name, name_column=NAME_COLUMN_MAP.get(view_name, "name")
|
||||
# Create regular summary query
|
||||
summary_query_name, summary_query = generate_summary_query(
|
||||
view_name, "", name_column=NAME_COLUMN_MAP.get(view_name, "name")
|
||||
)
|
||||
connection.execute(make_temp_view_query(summary_view_name, summary_query))
|
||||
queries[summary_query_name] = summary_query
|
||||
|
||||
# Create per-rank summary
|
||||
# Create per-rank summary query
|
||||
if by_rank:
|
||||
per_rank_view_name, summary_by_rank_query = generate_summary_query(
|
||||
per_rank_query_name, summary_by_rank_query = generate_summary_query(
|
||||
view_name,
|
||||
"",
|
||||
name_column=NAME_COLUMN_MAP.get(view_name, "name"),
|
||||
by_rank=True,
|
||||
)
|
||||
connection.execute(
|
||||
make_temp_view_query(per_rank_view_name, summary_by_rank_query)
|
||||
)
|
||||
queries[per_rank_query_name] = summary_by_rank_query
|
||||
|
||||
return queries
|
||||
|
||||
|
||||
def create_summary_region_views(
|
||||
connection: RocpdImportData, by_rank=False, region_categories=None
|
||||
) -> None:
|
||||
"""Create summary and region views"""
|
||||
def create_summary_region_queries(
|
||||
connection: RocpdImportData,
|
||||
by_rank=False,
|
||||
region_categories=None,
|
||||
):
|
||||
"""Create summary and region queries"""
|
||||
|
||||
query = "SELECT DISTINCT(category) FROM regions_and_samples;"
|
||||
query = "SELECT DISTINCT(category) FROM regions_and_samples"
|
||||
categories = execute_statement(connection, query).fetchall()
|
||||
|
||||
if region_categories is None:
|
||||
@@ -331,77 +361,73 @@ def create_summary_region_views(
|
||||
if "MARKER" not in cat.upper()
|
||||
}
|
||||
|
||||
queries = {}
|
||||
|
||||
for k, v in category_map.items():
|
||||
if len(v) > 0:
|
||||
conditions = [f"category LIKE '{c}'" for c in v]
|
||||
temp_region_view = f"""
|
||||
CREATE TEMPORARY VIEW IF NOT EXISTS `{k}` AS
|
||||
region_query = f"""
|
||||
SELECT *
|
||||
FROM regions_and_samples
|
||||
WHERE {" OR ".join(conditions)};
|
||||
WHERE {" OR ".join(conditions)}
|
||||
"""
|
||||
|
||||
connection.execute(temp_region_view)
|
||||
# Create regular summary query
|
||||
summary_query_name, summary_query = generate_summary_query(k, region_query)
|
||||
queries[summary_query_name] = summary_query
|
||||
|
||||
# Create regular summary view
|
||||
summary_view_name, summary_query = generate_summary_query(k)
|
||||
connection.execute(make_temp_view_query(summary_view_name, summary_query))
|
||||
|
||||
# Create per-rank summary view
|
||||
# Create per-rank summary query
|
||||
if by_rank:
|
||||
per_rank_view_name, summary_by_rank_query = generate_summary_query(
|
||||
k, by_rank=True
|
||||
)
|
||||
connection.execute(
|
||||
make_temp_view_query(per_rank_view_name, summary_by_rank_query)
|
||||
per_rank_query_name, summary_by_rank_query = generate_summary_query(
|
||||
k, region_query, by_rank=True
|
||||
)
|
||||
queries[per_rank_query_name] = summary_by_rank_query
|
||||
|
||||
# Markers
|
||||
if "MARKER" not in region_categories:
|
||||
return
|
||||
return queries
|
||||
|
||||
view_name = "markers"
|
||||
markers_create = f"""
|
||||
CREATE TEMPORARY VIEW IF NOT EXISTS `{view_name}` AS
|
||||
markers_query_name = "markers"
|
||||
markers_query = """
|
||||
SELECT JSON_EXTRACT(extdata, '$.message') AS marker_name, *
|
||||
FROM regions_and_samples
|
||||
WHERE category LIKE 'MARKER_%'
|
||||
"""
|
||||
connection.execute(markers_create)
|
||||
|
||||
# Create regular summary view
|
||||
summary_view_name, summary_query = generate_summary_query(
|
||||
view_name, name_column="marker_name"
|
||||
# Create regular summary query
|
||||
summary_query_name, summary_query = generate_summary_query(
|
||||
markers_query_name, markers_query, name_column="marker_name"
|
||||
)
|
||||
connection.execute(make_temp_view_query(summary_view_name, summary_query))
|
||||
queries[summary_query_name] = summary_query
|
||||
|
||||
# Create per-rank summary view
|
||||
# Create per-rank summary query
|
||||
if by_rank:
|
||||
per_rank_view_name, summary_by_rank_query = generate_summary_query(
|
||||
view_name, name_column="marker_name", by_rank=True
|
||||
)
|
||||
connection.execute(
|
||||
make_temp_view_query(per_rank_view_name, summary_by_rank_query)
|
||||
per_rank_query_name, summary_by_rank_query = generate_summary_query(
|
||||
markers_query_name, markers_query, name_column="marker_name", by_rank=True
|
||||
)
|
||||
queries[per_rank_query_name] = summary_by_rank_query
|
||||
|
||||
return queries
|
||||
|
||||
|
||||
def create_domain_view(connection: RocpdImportData, by_rank=False) -> str:
|
||||
"""Create a domain summary view by aggregating all summary views."""
|
||||
def create_domain_query(connection: RocpdImportData, summary_queries, by_rank=False):
|
||||
"""Create a domain summary query by aggregating all summary queries."""
|
||||
|
||||
view_name, domain_query = generate_domain_query(connection, by_rank=by_rank)
|
||||
result = generate_domain_query(connection, summary_queries, by_rank=by_rank)
|
||||
if not result:
|
||||
return {}
|
||||
|
||||
# Create the domain summary view
|
||||
connection.execute(make_temp_view_query(view_name, domain_query))
|
||||
query_name, query = result
|
||||
|
||||
return view_name
|
||||
return {query_name: query}
|
||||
|
||||
|
||||
def generate_all_summaries(connection: RocpdImportData, **kwargs: Any) -> None:
|
||||
"""Generate all summary views and write them to CSV files."""
|
||||
"""Generate all summaries and export them to selected format."""
|
||||
|
||||
domain_summary = kwargs.get("domain_summary", False)
|
||||
by_rank = kwargs.get("summary_by_rank", False)
|
||||
filename = kwargs.get("output_file", "")
|
||||
output_file = kwargs.get("output_file", "")
|
||||
output_path = kwargs.get("output_path", "./rocpd-output-data")
|
||||
region_categories = kwargs.get("region_categories", None)
|
||||
output_format = kwargs.get("format", "console")
|
||||
@@ -417,34 +443,29 @@ def generate_all_summaries(connection: RocpdImportData, **kwargs: Any) -> None:
|
||||
),
|
||||
)
|
||||
|
||||
# create the temporary summary views
|
||||
create_summary_views(connection, by_rank)
|
||||
create_summary_region_views(connection, by_rank, region_categories=region_categories)
|
||||
summary_queries = {}
|
||||
|
||||
# Create the summary queries
|
||||
summary_queries.update(create_summary_queries(connection, by_rank))
|
||||
summary_queries.update(
|
||||
create_summary_region_queries(
|
||||
connection, by_rank, region_categories=region_categories
|
||||
)
|
||||
)
|
||||
|
||||
if domain_summary:
|
||||
create_domain_view(connection)
|
||||
summary_queries.update(create_domain_query(connection, summary_queries))
|
||||
# Create domain summary per rank only if both domain_summary and summary_by_rank are enabled
|
||||
if by_rank:
|
||||
create_domain_view(connection, by_rank=True)
|
||||
summary_queries.update(
|
||||
create_domain_query(connection, summary_queries, by_rank=True)
|
||||
)
|
||||
|
||||
# Write regular summary views
|
||||
print("\nSummary files:")
|
||||
summary_views = [
|
||||
itr for itr in get_temp_view_names(connection) if itr.endswith("_summary")
|
||||
]
|
||||
for v in summary_views:
|
||||
export_view(connection, v, output_format, output_path, filename)
|
||||
|
||||
# Write per-rank summary views if flag is set
|
||||
if by_rank:
|
||||
print("\nSummary files by rank:")
|
||||
summary_by_rank_views = [
|
||||
itr
|
||||
for itr in get_temp_view_names(connection)
|
||||
if itr.endswith("_summary_by_rank")
|
||||
]
|
||||
for v in summary_by_rank_views:
|
||||
export_view(connection, v, output_format, output_path, filename)
|
||||
# Export all summary queries
|
||||
for query_name, query in summary_queries.items():
|
||||
export_query(
|
||||
connection, output_path, output_file, output_format, query_name, query
|
||||
)
|
||||
|
||||
|
||||
#
|
||||
|
||||
Ссылка в новой задаче
Block a user