Files
rocm-systems/projects/rocprofiler-compute/tests/test_profile_general.py
T
ggottipa-amd 77f7541755 [rocprofiler-compute] Adding --torch-trace option for SWDEV-559789 (#2089)
* Adding --torch-operator option in rocprof-compute. Creates csv file for
each operator that has gpu activity, showing operator to counter values
mapping.

* --torch-operators flag added to rocprofiler-sdk

* Adding ctest for --torch-operators.

* Adding pytest markers.

* Corrections in ctest and message logging.

* Apply suggestion from @Copilot

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Apply suggestion from @Copilot

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Apply suggestion from @Copilot

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Apply suggestion from @Copilot

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Apply suggestion from @Copilot

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Apply suggestion from @Copilot

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Adding a check for pytorch installation only when --torch-operators is passed.

* moving inject_roctx.py into src/utils.

* rebase

* Updating docs and changelog.

* Update projects/rocprofiler-compute/src/argparser.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update projects/rocprofiler-compute/src/utils/inject_roctx.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Removing special characters.

* Minor corrections.

* Setting default value for torch_operators_enabled.

* Updating the number of files according to the number of passes.

* Adding rocpd support.

* Adding a warning message to be shown when profiling a non-python workload.

* copilot suggestions, rocpd+native tool fix

* Fixed the incorrect usage of dispatch_id as event_id in the function update_rocpd_pmc_events()

* ruff format fix

* ruff formating

* Deleting torch_trace.csvs after consolidating the operator data.

* Removing checks since *torch_trace.csv files are deleted.

* Fixing file deletion.

* Update projects/rocprofiler-compute/src/utils/inject_roctx.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update projects/rocprofiler-compute/src/rocprof_compute_profile/profiler_base.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update projects/rocprofiler-compute/src/utils/utils.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update projects/rocprofiler-compute/tests/test_profile_general.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Using default options in the testcase.

* Adding test for overhead measurement.

* Corrections in docs.

* doc updates.

* Update projects/rocprofiler-compute/src/utils/inject_roctx.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Handling potential empty frames.

* Corrected the test cases.

* Changing the flag to --torch-trace

* Fixed helper_app path issues

* Path issues

* process_torch_trace_output() now takes csv file paths as input + allows default usage.

* Replaced pandas with sqlite3

* Adding marker_trace extraction to rocpd_data.py

* Allowing all workloads to use --torch-trace option. Assuming the workload is user verified.

* Modified help section for the flag.

* Added difference in runtimes for longest running kernels in each profiling runs to overhead measurements.

* Update projects/rocprofiler-compute/src/rocprof_compute_profile/profiler_base.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update projects/rocprofiler-compute/src/rocprof_compute_profile/profiler_base.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Removed the accesses to the tables.

* Ruff fixes.

* ruff

* Ruff Fixes

* Adding getattr for args.torch_trace to handle mock args.

* Fix for 'Missing guid in counter collection data - in csv mode'

* Sending output_format to process_torch_trace_output

* Warning for self contained binaries.

* Ruff

* Ruff

* Measuring longest_running_kernel_baseline instead of worst_kernel_increase, very small kernel runtimes are blowing up the worst_kernel_increase metric.

* Minor fixes in input arguments

* Ruff

* Loging PyTorch version

* Fix ruff formatting for PyTorch version logging

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-01-27 19:50:25 +05:30

2995 строки
97 KiB
Python

##############################################################################
# MIT License
#
# Copyright (c) 2025 Advanced Micro Devices, Inc. All Rights Reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
##############################################################################
import importlib.util
import inspect
import os
import re
import sqlite3
import subprocess
import sys
import time
from pathlib import Path
import numpy as np
import pandas as pd
import pytest
import test_utils
from scipy.stats import zscore
# Runtime config options
config = {}
config["kernel_name_1"] = "vecCopy"
config["app_1"] = ["./tests/vcopy", "-n", "1048576", "-b", "256", "-i", "3"]
config["app_occupancy"] = ["./tests/occupancy"]
config["app_mat_mul_max"] = ["./tests/mat_mul_max"]
config["app_hip_dynamic_shared"] = ["./tests/hip_dynamic_shared"]
config["app_laplace_eqn"] = ["./tests/laplace_eqn", "-i", "5000"]
config["app_laplace_eqn_iter"] = ["./tests/laplace_eqn", "-i", "15000"]
config["rocflop"] = ["./tests/rocflop", "--device", "0"]
config["cleanup"] = True
config["COUNTER_LOGGING"] = False
config["METRIC_COMPARE"] = False
config["METRIC_LOGGING"] = False
arch_config = {}
num_kernels = 3
num_devices = 1
attach_detach_interval_msec_no_delay = 1000
attach_detach_interval_msec_with_delay = 60000
DEFAULT_ABS_DIFF = 15
DEFAULT_REL_DIFF = 50
MAX_REOCCURING_COUNT = 28
CSVS = sorted([
"pmc_perf.csv",
"sysinfo.csv",
])
ROOF_ONLY_FILES = sorted([
"empirRoof_gpu-0_FP32.html",
"pmc_perf.csv",
"roofline.csv",
"sysinfo.csv",
])
PC_SAMPLING_HOST_TRAP_FILES = sorted([
"pmc_perf.csv",
"ps_file_agent_info.csv",
"ps_file_kernel_trace.csv",
"ps_file_pc_sampling_host_trap.csv",
"ps_file_results.json",
"sysinfo.csv",
])
PC_SAMPLING_STOCHASTIC_FILES = sorted([
"pmc_perf.csv",
"ps_file_agent_info.csv",
"ps_file_kernel_trace.csv",
"ps_file_pc_sampling_stochastic.csv",
"ps_file_results.json",
"sysinfo.csv",
])
METRIC_THRESHOLDS = {
"2.1.12": {"absolute": 0, "relative": 8},
"3.1.1": {"absolute": 0, "relative": 10},
"3.1.10": {"absolute": 0, "relative": 10},
"3.1.11": {"absolute": 0, "relative": 1},
"3.1.12": {"absolute": 0, "relative": 1},
"3.1.13": {"absolute": 0, "relative": 1},
"5.1.0": {"absolute": 0, "relative": 15},
"5.2.0": {"absolute": 0, "relative": 15},
"6.1.4": {"absolute": 4, "relative": 0},
"6.1.5": {"absolute": 0, "relative": 1},
"6.1.0": {"absolute": 0, "relative": 15},
"6.1.3": {"absolute": 0, "relative": 11},
"6.2.12": {"absolute": 0, "relative": 1},
"6.2.13": {"absolute": 0, "relative": 1},
"7.1.0": {"absolute": 0, "relative": 1},
"7.1.1": {"absolute": 0, "relative": 1},
"7.1.2": {"absolute": 0, "relative": 1},
"7.1.5": {"absolute": 0, "relative": 1},
"7.1.6": {"absolute": 0, "relative": 1},
"7.1.7": {"absolute": 0, "relative": 1},
"7.2.1": {"absolute": 0, "relative": 10},
"7.2.3": {"absolute": 0, "relative": 12},
"7.2.6": {"absolute": 0, "relative": 1},
"10.1.4": {"absolute": 0, "relative": 1},
"10.1.5": {"absolute": 0, "relative": 1},
"10.1.6": {"absolute": 0, "relative": 1},
"10.1.7": {"absolute": 0, "relative": 1},
"10.3.4": {"absolute": 0, "relative": 1},
"10.3.5": {"absolute": 0, "relative": 1},
"10.3.6": {"absolute": 0, "relative": 1},
"11.2.1": {"absolute": 0, "relative": 1},
"11.2.4": {"absolute": 0, "relative": 5},
"13.2.0": {"absolute": 0, "relative": 1},
"13.2.2": {"absolute": 0, "relative": 1},
"14.2.0": {"absolute": 0, "relative": 1},
"14.2.5": {"absolute": 0, "relative": 1},
"14.2.7": {"absolute": 0, "relative": 1},
"14.2.8": {"absolute": 0, "relative": 1},
"15.1.4": {"absolute": 0, "relative": 1},
"15.1.5": {"absolute": 0, "relative": 1},
"15.1.6": {"absolute": 0, "relative": 1},
"15.1.7": {"absolute": 0, "relative": 1},
"15.2.4": {"absolute": 0, "relative": 1},
"15.2.5": {"absolute": 0, "relative": 1},
"16.1.0": {"absolute": 0, "relative": 1},
"16.1.3": {"absolute": 0, "relative": 1},
"16.3.0": {"absolute": 0, "relative": 1},
"16.3.1": {"absolute": 0, "relative": 1},
"16.3.2": {"absolute": 0, "relative": 1},
"16.3.5": {"absolute": 0, "relative": 1},
"16.3.6": {"absolute": 0, "relative": 1},
"16.3.7": {"absolute": 0, "relative": 1},
"16.3.9": {"absolute": 0, "relative": 1},
"16.3.10": {"absolute": 0, "relative": 1},
"16.3.11": {"absolute": 0, "relative": 1},
"16.4.3": {"absolute": 0, "relative": 1},
"16.4.4": {"absolute": 0, "relative": 1},
"16.5.0": {"absolute": 0, "relative": 1},
"17.3.3": {"absolute": 0, "relative": 1},
"17.3.6": {"absolute": 0, "relative": 1},
"18.1.0": {"absolute": 0, "relative": 1},
"18.1.1": {"absolute": 0, "relative": 1},
"18.1.2": {"absolute": 0, "relative": 1},
"18.1.3": {"absolute": 0, "relative": 1},
"18.1.5": {"absolute": 0, "relative": 1},
"18.1.6": {"absolute": 1, "relative": 0},
}
# check for parallel resource allocation
test_utils.check_resource_allocation()
def counter_compare(test_name, errors_pd, baseline_df, run_df, threshold=5):
# iterate data one row at a time
for idx_1 in run_df.index:
run_row = run_df.iloc[idx_1]
baseline_row = baseline_df.iloc[idx_1]
if not run_row["KernelName"] == baseline_row["KernelName"]:
print("Kernel/dispatch mismatch")
assert 0
kernel_name = run_row["KernelName"]
gpu_id = run_row["gpu-id"]
differences = {}
for pmc_counter in run_row.index:
if "Ns" in pmc_counter or "id" in pmc_counter or "[" in pmc_counter:
# print("skipping "+pmc_counter)
continue
# assert 0
if not pmc_counter in list(baseline_df.columns):
print("error: pmc mismatch! " + pmc_counter + " is not in baseline_df")
continue
run_data = run_row[pmc_counter]
baseline_data = baseline_row[pmc_counter]
if isinstance(run_data, str) and isinstance(baseline_data, str):
if run_data not in baseline_data:
print(baseline_data)
else:
# relative difference
if not run_data == 0:
diff = round(100 * abs(baseline_data - run_data) / run_data, 2)
if diff > threshold:
print("[" + pmc_counter + "] diff is :" + str(diff) + "%")
if pmc_counter not in differences.keys():
print(
"[" + pmc_counter + "] not found in ",
list(differences.keys()),
)
differences[pmc_counter] = [diff]
else:
# Why are we here?
print(
"Why did we get here?!?!? errors_pd[idx_1]:",
list(differences.keys()),
)
differences[pmc_counter].append(diff)
else:
# if 0 show absolute difference
diff = round(baseline_data - run_data, 2)
if diff > threshold:
print(
str(idx_1) + "[" + pmc_counter + "] diff is :" + str(diff)
)
differences["kernel_name"] = [kernel_name]
differences["test_name"] = [test_name]
differences["gpu-id"] = [gpu_id]
errors_pd = pd.concat([errors_pd, pd.DataFrame.from_dict(differences)])
return errors_pd
soc = test_utils.gpu_soc()
os.environ["ROCPROF"] = "rocprofiler-sdk"
Baseline_dir = str(Path("tests/workloads/vcopy/" + soc).resolve())
def log_counter(file_dict, test_name):
for file in file_dict.keys():
if file == "pmc_perf.csv" or "SQ" in file:
# read file in Baseline
df_1 = pd.read_csv(Baseline_dir + "/" + file, index_col=0)
# get corresponding file from current test run
df_2 = file_dict[file]
errors = counter_compare(test_name, pd.DataFrame(), df_1, df_2, 5)
if not errors.empty:
if Path(
Baseline_dir + "/" + file.split(".")[0] + "_error_log.csv"
).exists():
error_log = pd.read_csv(
Baseline_dir + "/" + file.split(".")[0] + "_error_log.csv",
index_col=0,
)
new_error_log = pd.concat([error_log, errors])
new_error_log = new_error_log.reindex(
sorted(new_error_log.columns), axis=1
)
new_error_log = new_error_log.sort_values(
by=["test_name", "kernel_name", "gpu-id"]
)
new_error_log.to_csv(
Baseline_dir + "/" + file.split(".")[0] + "_error_log.csv"
)
else:
errors.to_csv(
Baseline_dir + "/" + file.split(".")[0] + "_error_log.csv"
)
def baseline_compare_metric(test_name, workload_dir, args=[]):
t = subprocess.Popen(
[
sys.executable,
"src/rocprof_compute",
"analyze",
"--path",
Baseline_dir,
]
+ args
+ ["--path", workload_dir, "--report-diff", "-1"],
stdout=subprocess.PIPE,
)
captured_output = t.communicate(timeout=1300)[0].decode("utf-8")
print(captured_output)
assert t.returncode == 0
if "DEBUG ERROR" in captured_output:
error_df = pd.DataFrame()
if Path(Baseline_dir + "/metric_error_log.csv").exists():
error_df = pd.read_csv(
Baseline_dir + "/metric_error_log.csv",
index_col=0,
)
output_metric_errors = re.findall(r"(\')([0-9.]*)(\')", captured_output)
high_diff_metrics = [x[1] for x in output_metric_errors]
for metric in high_diff_metrics:
metric_info = re.findall(
r"(^"
+ metric
+ (
r")(?: *)([()0-9A-Za-z- ]+ )"
r"(?: *)([0-9.-]*)"
r"(?: *)([0-9.-]*)"
r"(?: *)\(([-0-9.]*)%\)"
r"(?: *)([-0-9.e]*)"
),
captured_output,
flags=re.MULTILINE,
)
if len(metric_info):
metric_info = metric_info[0]
metric_idx = metric_info[0]
metric_name = metric_info[1].strip()
baseline_val = metric_info[-3]
current_val = metric_info[-4]
relative_diff = float(metric_info[-2])
absolute_diff = float(metric_info[-1])
if relative_diff > -99:
if metric_idx in METRIC_THRESHOLDS.keys():
# print(metric_idx+" is in FIXED_METRICS")
threshold_type = (
"absolute"
if METRIC_THRESHOLDS[metric_idx]["absolute"]
> METRIC_THRESHOLDS[metric_idx]["relative"]
else "relative"
)
isValid = (
(
abs(absolute_diff)
<= METRIC_THRESHOLDS[metric_idx]["absolute"]
)
if (threshold_type == "absolute")
else (
abs(relative_diff)
<= METRIC_THRESHOLDS[metric_idx]["relative"]
)
)
if not isValid:
print(
"index "
+ metric_idx
+ " "
+ threshold_type
+ " difference is supposed to be "
+ str(METRIC_THRESHOLDS[metric_idx][threshold_type])
+ ", absolute diff:",
absolute_diff,
"relative diff: ",
relative_diff,
)
assert 0
continue
# Used for debugging metric lists
if config["METRIC_LOGGING"] and (
(
abs(relative_diff) <= abs(DEFAULT_REL_DIFF)
or (abs(absolute_diff) <= abs(DEFAULT_ABS_DIFF))
)
and (False if baseline_val == "" else float(baseline_val) > 0)
):
# print("logging...")
# print(metric_info)
new_error = pd.DataFrame.from_dict({
"Index": [metric_idx],
"Metric": [metric_name],
"Percent Difference": [relative_diff],
"Absolute Difference": [absolute_diff],
"Baseline": [baseline_val],
"Current": [current_val],
"Test Name": [test_name],
})
error_df = pd.concat([error_df, new_error])
counts = error_df.groupby(["Index"]).cumcount()
reoccurring_metrics = error_df.loc[
counts > MAX_REOCCURING_COUNT
]
reoccurring_metrics["counts"] = counts[
counts > MAX_REOCCURING_COUNT
]
if reoccurring_metrics.any(axis=None):
with pd.option_context(
"display.max_rows",
None,
"display.max_columns",
None,
# 'display.precision', 3,
):
print(
"These metrics appear alot\n",
reoccurring_metrics,
)
# print(list(reoccurring_metrics["Index"]))
# log into csv
if not error_df.empty:
error_df.to_csv(Baseline_dir + "/metric_error_log.csv")
def validate(test_name, workload_dir, file_dict, args=[]):
if config["COUNTER_LOGGING"]:
log_counter(file_dict, test_name)
if config["METRIC_COMPARE"]:
baseline_compare_metric(test_name, workload_dir, args)
def are_stochastic_counters_similar(test_dfs, baseline_df):
"""
Compares multiple test dataframes against a baseline dataframe to check
if the stochastic counter values are similar. Returns True if all test dataframes
have similar counter values to the baseline, otherwise returns False.
"""
group_labels = [
"Kernel_Name",
"Grid_Size",
"Workgroup_Size",
"LDS_Per_Workgroup",
"Counter_Name",
]
baseline_grouped = baseline_df.groupby(group_labels)
tests_grouped = [df.groupby(group_labels) for df in test_dfs]
baseline_group_keys = set(baseline_grouped.groups.keys())
tests_group_keys = [set(group.groups.keys()) for group in tests_grouped]
# Check if all test dataframes have the same group keys as the baseline
if not all(baseline_group_keys == keys for keys in tests_group_keys):
return False
stochastic_counter_patterns = list(
map(
re.compile,
[
".*REQ_sum$",
".*REQ_.*_sum$",
".*READ_sum$",
".*WRITE_sum$",
],
)
)
for group_key, baseline_group in baseline_grouped:
test_groups = [
test_grouped.get_group(group_key) for test_grouped in tests_grouped
]
baseline_counters = baseline_group["Counter_Value"]
test_counters_list = [test_group["Counter_Value"] for test_group in test_groups]
counter_name = group_key[4]
# Warmup values aren't ignored as they do not significantly impact
# the analysis for stochastic counters and leaves too few data points
# for baseline.
if any(
re.match(pattern, counter_name) for pattern in stochastic_counter_patterns
):
# Remove outliers using Z-score method
z_score_threshold = 2.0
test_z_scores_list = [
np.abs(zscore(test_counters)) for test_counters in test_counters_list
]
test_counters_list_trimmed = [
test_counters[test_z_scores < z_score_threshold]
for test_counters, test_z_scores in zip(
test_counters_list, test_z_scores_list
)
]
baseline_mean = baseline_counters.mean()
baseline_std = baseline_counters.std()
upper_bound = baseline_mean + 3 * baseline_std
lower_bound = baseline_mean - 3 * baseline_std
for test_counters in test_counters_list_trimmed:
if test_counters.between(lower_bound, upper_bound).all() is False:
return False
return True
def are_deterministic_counters_equal(test_dfs, baseline_df):
"""
Compares multiple test dataframes against a baseline dataframe to check
if the deterministic counter values are equal. Returns True if all test dataframes
have equal counter values to the baseline, otherwise returns False.
"""
group_labels = [
"Kernel_Name",
"Grid_Size",
"Workgroup_Size",
"LDS_Per_Workgroup",
"Counter_Name",
]
baseline_grouped = baseline_df.groupby(group_labels)
tests_grouped = [df.groupby(group_labels) for df in test_dfs]
baseline_group_keys = set(baseline_grouped.groups.keys())
tests_group_keys = [set(group.groups.keys()) for group in tests_grouped]
# Check if all test dataframes have the same group keys as the baseline
if not all(baseline_group_keys == keys for keys in tests_group_keys):
return False
# series prior to MI350 use CSN, MI350 uses CS{0,1,2,3}
deterministic_counter_patterns = list(
map(
re.compile,
[
"SQ_INSTS_.*",
"SPI_CS\\d_NUM_THREADGROUPS",
"SPI_CSN_NUM_THREADGROUPS",
"SPI_CS\\d_WAVE",
"SPI_CSN_WAVE",
"SQ_WAVES",
],
)
)
for group_key, baseline_group in baseline_grouped:
test_groups = [
test_grouped.get_group(group_key) for test_grouped in tests_grouped
]
baseline_counters = baseline_group["Counter_Value"]
test_counters_list = [test_group["Counter_Value"] for test_group in test_groups]
counter_name = group_key[4]
if any(
re.match(pattern, counter_name)
for pattern in deterministic_counter_patterns
):
if (
all([
test_counters.unique().size == 1
for test_counters in test_counters_list
])
and baseline_counters.unique().size == 1
and all([
test_counters.values[0] == baseline_counters.values[0]
for test_counters in test_counters_list
])
):
continue
return False
return True
# --
# Start of profiling tests
# --
@pytest.mark.path
def test_path(binary_handler_profile_rocprof_compute):
workload_dir = test_utils.get_output_dir()
binary_handler_profile_rocprof_compute(config, workload_dir)
file_dict = test_utils.check_csv_files(workload_dir, num_devices, num_kernels)
if soc == "MI100":
assert sorted(list(file_dict.keys())) == CSVS
elif soc == "MI200":
assert sorted(list(file_dict.keys())) == CSVS
elif "MI300" in soc:
assert sorted(list(file_dict.keys())) == CSVS
elif "MI350" in soc:
assert sorted(list(file_dict.keys())) == CSVS
else:
print(f"This test is not supported for {soc}")
assert 0
validate(inspect.stack()[0][3], workload_dir, file_dict)
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.path
def test_path_rocflop(binary_handler_profile_rocprof_compute):
# Test whether multiprocess workloads like rocflop are handled correctly
workload_dir = test_utils.get_output_dir()
options = ["--block", "2.1.1"]
_ = binary_handler_profile_rocprof_compute(
config,
workload_dir,
options,
check_success=True,
roof=False,
app_name="rocflop",
)
pmc_perf_df = test_utils.check_csv_files(workload_dir, num_devices, num_kernels)[
"pmc_perf.csv"
]
# Ensure non zero length of df
assert len(pmc_perf_df) > 0
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.path
def test_path_no_native(binary_handler_profile_rocprof_compute):
workload_dir = test_utils.get_output_dir()
options = ["--no-native-tool"]
binary_handler_profile_rocprof_compute(config, workload_dir, options)
file_dict = test_utils.check_csv_files(workload_dir, num_devices, num_kernels)
if soc == "MI100":
assert sorted(list(file_dict.keys())) == CSVS
elif soc == "MI200":
assert sorted(list(file_dict.keys())) == CSVS
elif "MI300" in soc:
assert sorted(list(file_dict.keys())) == CSVS
elif "MI350" in soc:
assert sorted(list(file_dict.keys())) == CSVS
else:
print(f"This test is not supported for {soc}")
assert 0
validate(inspect.stack()[0][3], workload_dir, file_dict)
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.path
def test_path_rocpd(
binary_handler_profile_rocprof_compute, binary_handler_analyze_rocprof_compute
):
workload_dir = test_utils.get_output_dir()
options = ["--format-rocprof-output", "rocpd"]
binary_handler_profile_rocprof_compute(config, workload_dir, options)
assert (Path(workload_dir) / "pmc_perf.csv").exists()
assert test_utils.check_file_pattern(
"format_rocprof_output: rocpd", f"{workload_dir}/profiling_config.yaml"
)
assert test_utils.check_file_pattern("Counter_Name", f"{workload_dir}/pmc_perf.csv")
code = binary_handler_analyze_rocprof_compute(["analyze", "--path", workload_dir])
assert code == 0
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.path
def test_path_csv(
binary_handler_profile_rocprof_compute, binary_handler_analyze_rocprof_compute
):
workload_dir = test_utils.get_output_dir()
options = ["--format-rocprof-output", "csv"]
binary_handler_profile_rocprof_compute(config, workload_dir, options)
file_dict = test_utils.check_csv_files(workload_dir, num_devices, num_kernels)
all_csvs_mi100 = sorted([
"SQC_DCACHE_INFLIGHT_LEVEL.csv",
"SQC_ICACHE_INFLIGHT_LEVEL.csv",
"SQ_IFETCH_LEVEL.csv",
"SQ_INST_LEVEL_LDS.csv",
"SQ_LEVEL_WAVES.csv",
"pmc_perf.csv",
"pmc_perf_0.csv",
"pmc_perf_1.csv",
"pmc_perf_2.csv",
"pmc_perf_3.csv",
"pmc_perf_4.csv",
"pmc_perf_5.csv",
"pmc_perf_6.csv",
"sysinfo.csv",
])
all_csvs_mi200 = sorted([
"SQC_DCACHE_INFLIGHT_LEVEL.csv",
"SQC_ICACHE_INFLIGHT_LEVEL.csv",
"SQ_IFETCH_LEVEL.csv",
"SQ_INST_LEVEL_LDS.csv",
"SQ_INST_LEVEL_SMEM.csv",
"SQ_INST_LEVEL_VMEM.csv",
"SQ_LEVEL_WAVES.csv",
"pmc_perf.csv",
"pmc_perf_0.csv",
"pmc_perf_1.csv",
"pmc_perf_2.csv",
"pmc_perf_3.csv",
"pmc_perf_4.csv",
"pmc_perf_5.csv",
"sysinfo.csv",
])
all_csvs_mi300 = sorted([
"SQC_DCACHE_INFLIGHT_LEVEL.csv",
"SQC_ICACHE_INFLIGHT_LEVEL.csv",
"SQ_IFETCH_LEVEL.csv",
"SQ_INST_LEVEL_LDS.csv",
"SQ_INST_LEVEL_SMEM.csv",
"SQ_INST_LEVEL_VMEM.csv",
"SQ_LEVEL_WAVES.csv",
"pmc_perf.csv",
"pmc_perf_0.csv",
"pmc_perf_1.csv",
"pmc_perf_2.csv",
"pmc_perf_3.csv",
"pmc_perf_4.csv",
"pmc_perf_5.csv",
"sysinfo.csv",
])
all_csvs_mi350 = sorted([
"SQC_DCACHE_INFLIGHT_LEVEL.csv",
"SQC_ICACHE_INFLIGHT_LEVEL.csv",
"SQ_IFETCH_LEVEL.csv",
"SQ_INST_LEVEL_LDS.csv",
"SQ_INST_LEVEL_SMEM.csv",
"SQ_INST_LEVEL_VMEM.csv",
"SQ_LEVEL_WAVES.csv",
"pmc_perf.csv",
"pmc_perf_0.csv",
"pmc_perf_1.csv",
"pmc_perf_2.csv",
"pmc_perf_3.csv",
"pmc_perf_4.csv",
"pmc_perf_5.csv",
"pmc_perf_6.csv",
"pmc_perf_7.csv",
"pmc_perf_8.csv",
"pmc_perf_9.csv",
"pmc_perf_10.csv",
"pmc_perf_11.csv",
"pmc_perf_12.csv",
"sysinfo.csv",
])
if soc == "MI100":
assert sorted(list(file_dict.keys())) == all_csvs_mi100
elif soc == "MI200":
assert sorted(list(file_dict.keys())) == all_csvs_mi200
elif "MI300" in soc:
assert sorted(list(file_dict.keys())) == all_csvs_mi300
elif "MI350" in soc:
assert sorted(list(file_dict.keys())) == all_csvs_mi350
else:
print(f"This test is not supported for {soc}")
assert 0
validate(inspect.stack()[0][3], workload_dir, file_dict)
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.roofline_1
def test_roof_basic_validation(binary_handler_profile_rocprof_compute):
"""
Test basic roofline HTML generation with full validation pipeline.
This test runs the complete validation flow including counter logging
and metric comparison (if enabled in config). Validates that roofline HTMLs
are generated with the integrated multi-subplot layout (roofline plot +
plot points table + kernel names table).
"""
if soc in ("MI100"):
# roofline is not supported on MI100
assert True
# Do not continue testing
return
options = ["--device", "0", "--roof-only"]
workload_dir = test_utils.get_output_dir()
returncode = binary_handler_profile_rocprof_compute(
config, workload_dir, options, check_success=False, roof=True
)
# assert successful run
assert returncode == 0
file_dict = test_utils.check_csv_files(workload_dir, 1, num_kernels)
assert sorted(list(file_dict.keys())) == ROOF_ONLY_FILES
validate(
inspect.stack()[0][3],
workload_dir,
file_dict,
)
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.roofline_1
def test_roof_multiple_data_types(binary_handler_profile_rocprof_compute):
"""Test roofline with multiple data types"""
if soc in ("MI100"):
# roofline is not supported on MI100
pytest.skip("Roofline not supported on MI100")
return
# test multiple data types
data_types = ["FP32"] # start with just FP32 to avoid complex validation
for dtype in data_types:
options = [
"--device",
"0",
"--roof-only",
"--roofline-data-type",
dtype,
]
workload_dir = test_utils.get_output_dir()
try:
returncode = binary_handler_profile_rocprof_compute(
config, workload_dir, options, check_success=False, roof=True
)
if returncode == 0:
assert os.path.exists(f"{workload_dir}/pmc_perf.csv")
file_dict = test_utils.check_csv_files(workload_dir, 1, num_kernels)
assert sorted(list(file_dict.keys())) == ROOF_ONLY_FILES
else:
pass
finally:
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.roofline_1
def test_roof_invalid_data_type(binary_handler_profile_rocprof_compute):
"""Test roofline with invalid data type"""
if soc in ("MI100"):
# roofline is not supported on MI100
pytest.skip("Roofline not supported on MI100")
return
# test invalid data types
invalid_options = [
"--device",
"0",
"--roof-only",
"--roofline-data-type",
"INVALID_TYPE",
]
workload_dir = test_utils.get_output_dir()
try:
returncode = binary_handler_profile_rocprof_compute(
config, workload_dir, invalid_options, check_success=False, roof=True
)
assert returncode >= 0
finally:
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.roofline_1
def test_roof_file_validation(binary_handler_profile_rocprof_compute):
"""Test file validation paths in roofline"""
if soc in ("MI100"):
pytest.skip("Roofline not supported on MI100")
return
options = ["--device", "0", "--roof-only"]
workload_dir = test_utils.get_output_dir()
try:
returncode = binary_handler_profile_rocprof_compute(
config, workload_dir, options, check_success=False, roof=True
)
if returncode == 0:
assert os.path.exists(f"{workload_dir}/pmc_perf.csv")
roofline_csv = f"{workload_dir}/roofline.csv"
if os.path.exists(roofline_csv):
import pandas as pd
df = pd.read_csv(roofline_csv)
assert len(df) >= 0
finally:
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.roofline_1
def test_roof_rocpd(binary_handler_profile_rocprof_compute):
if soc == "MI100":
pytest.skip("Roofline not supported on MI100")
return
workload_dir = test_utils.get_output_dir()
options = ["--device", "0", "--roof-only", "--format-rocprof-output", "rocpd"]
binary_handler_profile_rocprof_compute(config, workload_dir, options, roof=True)
assert (Path(workload_dir) / "pmc_perf.csv").exists()
assert (Path(workload_dir) / "roofline.csv").exists()
assert test_utils.check_file_pattern(
"format_rocprof_output: rocpd", f"{workload_dir}/profiling_config.yaml"
)
assert test_utils.check_file_pattern("Counter_Name", f"{workload_dir}/pmc_perf.csv")
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.misc
def test_analyze_rocpd(
binary_handler_profile_rocprof_compute, binary_handler_analyze_rocprof_compute
):
workload_dir = test_utils.get_output_dir()
options = ["--device", "0", "--format-rocprof-output", "rocpd"]
binary_handler_profile_rocprof_compute(config, workload_dir, options, roof=True)
db_name = "test"
code = binary_handler_analyze_rocprof_compute([
"analyze",
"--output-format",
"db",
"--output-name",
f"{db_name}",
"--path",
workload_dir,
])
assert code == 0
assert os.path.isfile(f"{db_name}.db")
# Open the sqlite database and assert the schema
# Import Kernel from analysis_orm.py
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from utils.analysis_orm import (
Dispatch,
Kernel,
Metadata,
MetricDefinition,
MetricValue,
RooflineData,
Workload,
)
table_name_map = {
"compute_workload": Workload,
"compute_metric_definition": MetricDefinition,
"compute_roofline_data": RooflineData,
"compute_dispatch": Dispatch,
"compute_kernel": Kernel,
"compute_metric_value": MetricValue,
"compute_metadata": Metadata,
}
def check_cols(table_name, orm_obj):
conn = sqlite3.connect(f"{db_name}.db")
cursor = conn.cursor()
cursor.execute(f"PRAGMA table_info('{table_name}');")
columns = cursor.fetchall()
column_names = [column[1] for column in columns]
expected_columns = [col.name for col in orm_obj.__table__.columns]
assert column_names == expected_columns
conn.close()
for table_name, orm_obj in table_name_map.items():
check_cols(table_name, orm_obj)
os.remove(f"{db_name}.db")
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.roofline_1
def test_roofline_workload_dir_not_set_error():
"""
Test roof_setup() error: "Workload directory is not set. Cannot perform setup."
This covers lines 113-117
"""
if soc in ("MI100"):
pytest.skip("Skipping roofline test for MI100")
return
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
try:
from roofline import Roofline
from utils.specs import generate_machine_specs
class MockArgs:
def __init__(self):
self.roof_only = True
self.mem_level = "ALL"
self.sort = "ALL"
self.roofline_data_type = ["FP32"]
args = MockArgs()
mspec = generate_machine_specs(None, None)
run_parameters = {
"workload_dir": None,
"device_id": 0,
"sort_type": "kernels",
"mem_level": "ALL",
"is_standalone": True,
"roofline_data_type": ["FP32"],
}
roofline_instance = Roofline(args, mspec, run_parameters)
import contextlib
from io import StringIO
captured_output = StringIO()
with contextlib.redirect_stderr(captured_output):
try:
roofline_instance.roof_setup()
except SystemExit:
pass
assert True
except ImportError:
pytest.skip("Could not import roofline module for direct testing")
@pytest.mark.roofline_1
def test_roof_workload_dir_validation(binary_handler_profile_rocprof_compute):
if soc in ("MI100"):
assert True
return
options = ["--device", "0", "--roof-only"]
workload_dir = test_utils.get_output_dir()
returncode = binary_handler_profile_rocprof_compute(
config, workload_dir, options, check_success=False, roof=True
)
assert returncode == 0
nested_dir = os.path.join(workload_dir, "nested", "structure")
os.makedirs(nested_dir, exist_ok=True)
returncode = binary_handler_profile_rocprof_compute(
config, nested_dir, options, check_success=False, roof=True
)
assert returncode == 0
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.roofline_1
def test_roofline_empty_kernel_names_handling(binary_handler_profile_rocprof_compute):
"""
Test roofline behavior when kernel filter doesn't match any
kernels during profiling.
When profiling with a non-matching kernel filter, the workload
still executes and profiling data is collected for all kernels.
However, when roofline attempts to filter the collected data
to match the requested kernel name, it finds no match and
produces an error.
"""
if soc in ("MI100"):
pytest.skip("Skipping roofline test for MI100")
return
options = [
"--device",
"0",
"--roof-only",
"--kernel",
"nonexistent_kernel_name_that_should_not_match_anything",
]
workload_dir = test_utils.get_output_dir()
returncode = binary_handler_profile_rocprof_compute(
config, workload_dir, options, check_success=False, roof=True
)
assert returncode == 1, f"Expected error (returncode=1), got {returncode}"
html_files = list(Path(workload_dir).glob("empirRoof_*.html"))
assert len(html_files) == 0, (
"No roofline HTML should be generated when no kernels match"
)
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.roofline_1
def test_roofline_kernel_filter(binary_handler_profile_rocprof_compute):
"""
Test roofline multi-attempt profiling with `--kernel`
Expect to be able to re-profile from same workload if kernels are valid.
(Validity of --kernels tested in test_roofline_kernel_filter_error_handling already)
"""
if soc in ("MI100"):
pytest.skip("Skipping roofline test for MI100")
return
options = [
"--device",
"0",
"--roof-only",
]
workload_dir = test_utils.get_output_dir()
returncode = binary_handler_profile_rocprof_compute( # noqa: F841
config, workload_dir, options, check_success=True, roof=True
)
# Don't clean output dir, use same workload
options.extend(["--kernel", config["kernel_name_1"]])
returncode = binary_handler_profile_rocprof_compute( # noqa: F841
config, workload_dir, options, check_success=True, roof=True
)
# Test nonexistent kernel on roof profile using existing profiling data
# Since already profiled, throw error if non-existent kernel requested for roofline
options.append("nonexistent_kernel_name_that_should_not_match_anything")
returncode = binary_handler_profile_rocprof_compute( # noqa: F841
config, workload_dir, options, check_success=False, roof=True
)
assert returncode == 1
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.roofline_1
def test_roofline_unsupported_datatype_error(binary_handler_profile_rocprof_compute):
"""
Test datatype validation error in empirical_roofline()
This should trigger console_error for unsupported datatype
"""
if soc in ("MI100"):
pytest.skip("Skipping roofline test for MI100")
return
options = [
"--device",
"0",
"--roof-only",
"--roofline-data-type",
"UNSUPPORTED_TYPE",
]
workload_dir = test_utils.get_output_dir()
returncode = binary_handler_profile_rocprof_compute( # noqa: F841
config, workload_dir, options, check_success=False, roof=True
)
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.roofline_2
@pytest.mark.parametrize(
"options,expected_files,test_id",
[
(
["--device", "0", "--roof-only", "--roofline-data-type", "FP32"],
["empirRoof_gpu-0_FP32.html"],
"FP32_datatype",
),
(
["--device", "0", "--roof-only", "--roofline-data-type", "FP16"],
["empirRoof_gpu-0_FP16.html"],
"FP16_datatype",
),
(
["--device", "0", "--roof-only", "--kernel", "KERNEL_NAME_PLACEHOLDER"],
["EXPECTED_FILE_PLACEHOLDER"],
"kernel_filter",
),
],
ids=["FP32_datatype", "FP16_datatype", "kernel_filter"],
)
def test_roof_plot_modes(
binary_handler_profile_rocprof_compute, options, expected_files, test_id
):
if soc in ("MI100"):
pytest.skip("Skipping roofline test for MI100")
return
# Handle dynamic kernel name substitution for the kernel_filter test case
options = [
config["kernel_name_1"] if opt == "KERNEL_NAME_PLACEHOLDER" else opt
for opt in options
]
# Test `--kernel` filtering outputs are present and labelled correctly
filter_empirRoof = "empirRoof_gpu-0_" + config["kernel_name_1"]
expected_files = [
filter_empirRoof if f == "EXPECTED_FILE_PLACEHOLDER" else f
for f in expected_files
]
workload_dir = test_utils.get_output_dir(param_id=test_id)
returncode = binary_handler_profile_rocprof_compute(
config, workload_dir, options, check_success=False, roof=True
)
assert returncode == 0
for expected_file in expected_files:
expected_path = os.path.join(workload_dir, expected_file)
if os.path.exists(expected_path):
assert os.path.getsize(expected_path) > 0
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.roofline_2
def test_roof_cli_plot_generation(binary_handler_profile_rocprof_compute):
if soc in ("MI100"):
assert True
return
try:
import plotext as plt # noqa: F401
cli_available = True
except ImportError:
cli_available = False
if cli_available:
options = ["--device", "0", "--roof-only"]
workload_dir = test_utils.get_output_dir()
returncode = binary_handler_profile_rocprof_compute( # noqa: F841
config, workload_dir, options, check_success=False, roof=True
)
test_utils.clean_output_dir(config["cleanup"], workload_dir)
else:
pytest.skip("plotext not available for CLI testing")
@pytest.mark.roofline_2
def test_roof_error_handling(binary_handler_profile_rocprof_compute):
if soc in ("MI100"):
assert True
return
options = ["--device", "0", "--roof-only"]
workload_dir = test_utils.get_output_dir()
pmc_perf_path = os.path.join(workload_dir, "pmc_perf.csv")
if os.path.exists(pmc_perf_path):
os.remove(pmc_perf_path)
returncode = binary_handler_profile_rocprof_compute( # noqa: F841
config, workload_dir, options, check_success=False, roof=True
)
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.roofline_2
def test_roofline_missing_file_handling(binary_handler_profile_rocprof_compute):
"""
Test handling of missing roofline.csv file
This should trigger error message in cli_generate_plot()
"""
if soc in ("MI100"):
pytest.skip("Skipping roofline test for MI100")
return
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
try:
from roofline import Roofline
from utils.schema import Workload
from utils.specs import generate_machine_specs
class MockArgs:
def __init__(self):
self.roof_only = True
self.mem_level = "ALL"
self.sort = "ALL"
self.roofline_data_type = ["FP32"]
args = MockArgs()
mspec = generate_machine_specs(None, None)
workload = Workload()
workload_dir = test_utils.get_output_dir()
run_parameters = {
"workload_dir": workload_dir,
"device_id": 0,
"sort_type": "kernels",
"mem_level": "ALL",
"is_standalone": True,
"roofline_data_type": ["FP32"],
}
roofline_instance = Roofline(args, mspec, run_parameters)
result = roofline_instance.cli_generate_plot(
"FP32", workload, config, arch_config
)
assert result is None
test_utils.clean_output_dir(config["cleanup"], workload_dir)
except ImportError:
pytest.skip("Could not import roofline module for direct testing")
@pytest.mark.roofline_2
def test_roofline_invalid_datatype_cli(binary_handler_profile_rocprof_compute):
"""
Test CLI plot generation with invalid datatype
This should trigger error in cli_generate_plot() lines 617-624
"""
if soc in ("MI100"):
pytest.skip("Skipping roofline test for MI100")
return
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
try:
from roofline import Roofline
from utils.schema import Workload
from utils.specs import generate_machine_specs
class MockArgs:
def __init__(self):
self.roof_only = True
self.mem_level = "ALL"
self.sort = "ALL"
self.roofline_data_type = ["FP32"]
args = MockArgs()
mspec = generate_machine_specs(None, None)
workload = Workload()
run_parameters = {
"workload_dir": test_utils.get_output_dir(),
"device_id": 0,
"sort_type": "kernels",
"mem_level": "ALL",
"is_standalone": True,
"roofline_data_type": ["FP32"],
}
roofline_instance = Roofline(args, mspec, run_parameters)
result = roofline_instance.cli_generate_plot(
"INVALID_DATATYPE", workload, config, arch_config
)
assert result is None
test_utils.clean_output_dir(config["cleanup"], run_parameters["workload_dir"])
except ImportError:
pytest.skip("Could not import roofline module for direct testing")
@pytest.mark.roofline_2
def test_roofline_ceiling_data_validation(binary_handler_profile_rocprof_compute):
"""
Test ceiling data validation in generate_plot()
This covers error handling in lines 516-526
"""
if soc in ("MI100"):
pytest.skip("Skipping roofline test for MI100")
return
options = ["--device", "0", "--roof-only", "--mem-level", "INVALID_LEVEL"]
workload_dir = test_utils.get_output_dir()
returncode = binary_handler_profile_rocprof_compute( # noqa: F841
config, workload_dir, options, check_success=False, roof=True
)
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.roofline_2
def test_roofline_plot_points_data_generation():
"""
Test that plot points data structure is correctly generated with:
- Symbol assignments
- AI values (FLOPs/Byte)
- Performance values (GFLOPs/s)
- Memory/Compute bound status
- Cache level information
"""
if soc in ("MI100"):
pytest.skip("Skipping roofline test for MI100")
return
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
try:
from roofline import Roofline
from utils.specs import generate_machine_specs
class MockArgs:
def __init__(self):
self.roof_only = True
self.mem_level = "ALL"
self.sort = "ALL"
self.roofline_data_type = ["FP32"]
args = MockArgs()
mspec = generate_machine_specs(None, None)
mock_ai_data = {
"ai_l1": [[0.5, 1.2], [100.0, 150.0]],
"ai_l2": [[0.3, 0.8], [80.0, 120.0]],
"ai_hbm": [[0.1, 0.4], [50.0, 90.0]],
"kernelNames": ["kernel_A", "kernel_B"],
}
mock_ceiling_data = {
"l1": [[0.01, 10], [10, 1000], 100],
"l2": [[0.01, 10], [10, 800], 80],
"hbm": [[0.01, 10], [10, 500], 50],
"valu": [[1, 100], [200, 200], 200],
"mfma": [[1, 100], [500, 500], 500],
}
plot_points_data = []
cache_colors = {
"ai_l1": "blue",
"ai_l2": "green",
"ai_hbm": "red",
}
roofline_instance = Roofline(args, mspec)
for cache_level in ["ai_l1", "ai_l2", "ai_hbm"]:
if cache_level in mock_ai_data:
x_vals = mock_ai_data[cache_level][0]
y_vals = mock_ai_data[cache_level][1]
num_kernels = len(mock_ai_data["kernelNames"])
for i in range(min(len(x_vals), num_kernels)):
if x_vals[i] > 0 and y_vals[i] > 0:
status = roofline_instance._determine_kernel_bound_status(
ai_value=x_vals[i],
performance=y_vals[i],
cache_level=cache_level,
ceiling_data=mock_ceiling_data,
)
plot_points_data.append({
"symbol": None,
"color": cache_colors.get(cache_level, "gray"),
"cache_level": cache_level.replace("ai_", "", 1).upper(),
"ai": f"{x_vals[i]:.2f}",
"performance": f"{y_vals[i]:.2f}",
"status": status,
"kernel_idx": i,
})
assert len(plot_points_data) > 0, "Plot points data should not be empty"
for point in plot_points_data:
assert "cache_level" in point
assert "ai" in point
assert "performance" in point
assert "status" in point
assert "kernel_idx" in point
assert "color" in point
assert point["cache_level"] in ["L1", "L2", "HBM"]
assert point["status"] in ["Memory Bound", "Compute Bound", "Unknown"]
assert isinstance(point["ai"], str)
assert isinstance(point["performance"], str)
except ImportError:
pytest.skip("Could not import roofline module for direct testing")
@pytest.mark.roofline_2
def test_roofline_bound_status_calculation():
"""
Test _determine_kernel_bound_status() correctly classifies kernels as
Memory Bound or Compute Bound based on their AI and performance vs ceilings.
"""
if soc in ("MI100"):
pytest.skip("Skipping roofline test for MI100")
return
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
try:
from roofline import Roofline
from utils.specs import generate_machine_specs
class MockArgs:
def __init__(self):
self.roof_only = True
self.mem_level = "ALL"
self.sort = "ALL"
self.roofline_data_type = ["FP32"]
args = MockArgs()
mspec = generate_machine_specs(None, None)
roofline_instance = Roofline(args, mspec)
ceiling_data = {
"hbm": [[0.01, 10], [10, 1000], 100],
"valu": [[1, 100], [200, 200], 200],
"mfma": [[1, 100], [500, 500], 500],
}
status1 = roofline_instance._determine_kernel_bound_status(
ai_value=1.0,
performance=100.0,
cache_level="ai_hbm",
ceiling_data=ceiling_data,
)
assert status1 == "Memory Bound", f"Expected Memory Bound, got {status1}"
status2 = roofline_instance._determine_kernel_bound_status(
ai_value=5.0,
performance=150.0,
cache_level="ai_hbm",
ceiling_data=ceiling_data,
)
assert status2 == "Compute Bound", f"Expected Compute Bound, got {status2}"
status3 = roofline_instance._determine_kernel_bound_status(
ai_value=1.0,
performance=100.0,
cache_level="ai_l1",
ceiling_data=ceiling_data,
)
assert status3 == "Unknown", f"Expected Unknown, got {status3}"
bad_ceiling_data = {
"hbm": [100],
}
status4 = roofline_instance._determine_kernel_bound_status(
ai_value=1.0,
performance=100.0,
cache_level="ai_hbm",
ceiling_data=bad_ceiling_data,
)
assert status4 == "Unknown", f"Expected Unknown for bad data, got {status4}"
except ImportError:
pytest.skip("Could not import roofline module for direct testing")
@pytest.mark.roofline_2
def test_roofline_many_kernels_dynamic_height(binary_handler_profile_rocprof_compute):
"""
Test roofline HTML generation with many kernels (10+) to verify:
- Dynamic height calculation works
- HTML is generated successfully
- File size is reasonable
Note: This test uses a regular workload but validates the HTML structure
can handle the multi-subplot layout properly.
"""
if soc in ("MI100"):
pytest.skip("Skipping roofline test for MI100")
return
options = ["--device", "0", "--roof-only"]
workload_dir = test_utils.get_output_dir()
returncode = binary_handler_profile_rocprof_compute(
config, workload_dir, options, check_success=False, roof=True
)
assert returncode == 0, "Roofline profiling should succeed"
html_files = list(Path(workload_dir).glob("empirRoof_*.html"))
assert len(html_files) > 0, "At least one roofline HTML should be generated"
for html_file in html_files:
assert html_file.exists(), f"HTML file {html_file} should exist"
file_size = html_file.stat().st_size
# HTML should be larger than 10KB (has content) but less than 50MB (reasonable)
assert file_size > 10000, (
f"HTML {html_file} too small ({file_size} bytes), may be malformed"
)
assert file_size < 50000000, (
f"HTML {html_file} too large ({file_size} bytes), may have issues"
)
file_dict = test_utils.check_csv_files(workload_dir, 1, num_kernels)
assert sorted(list(file_dict.keys())) == ROOF_ONLY_FILES
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.misc
def test_device_filter(binary_handler_profile_rocprof_compute):
options = ["--device", "0"]
workload_dir = test_utils.get_output_dir()
binary_handler_profile_rocprof_compute(config, workload_dir, options)
file_dict = test_utils.check_csv_files(workload_dir, 1, num_kernels)
if soc == "MI100":
assert sorted(list(file_dict.keys())) == CSVS
elif soc == "MI200":
assert sorted(list(file_dict.keys())) == CSVS
elif "MI300" in soc:
assert sorted(list(file_dict.keys())) == CSVS
elif "MI350" in soc:
assert sorted(list(file_dict.keys())) == CSVS
else:
print(f"Testing isn't supported yet for {soc}")
assert 0
# TODO - verify expected device id in results
validate(
inspect.stack()[0][3],
workload_dir,
file_dict,
)
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.kernel_execution
def test_kernel(binary_handler_profile_rocprof_compute):
options = ["--kernel", config["kernel_name_1"]]
workload_dir = test_utils.get_output_dir()
binary_handler_profile_rocprof_compute(config, workload_dir, options)
file_dict = test_utils.check_csv_files(workload_dir, num_devices, num_kernels)
if soc == "MI100":
assert sorted(list(file_dict.keys())) == CSVS
elif soc == "MI200":
assert sorted(list(file_dict.keys())) == CSVS
elif "MI300" in soc:
assert sorted(list(file_dict.keys())) == CSVS
elif "MI350" in soc:
assert sorted(list(file_dict.keys())) == CSVS
else:
print(f"Testing isn't supported yet for {soc}")
assert 0
validate(
inspect.stack()[0][3],
workload_dir,
file_dict,
)
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.dispatch
def test_dispatch_0(binary_handler_profile_rocprof_compute):
options = ["--dispatch", "1"]
workload_dir = test_utils.get_output_dir()
binary_handler_profile_rocprof_compute(config, workload_dir, options)
file_dict = test_utils.check_csv_files(workload_dir, num_devices, 1)
if soc == "MI100":
assert sorted(list(file_dict.keys())) == CSVS
elif soc == "MI200":
assert sorted(list(file_dict.keys())) == CSVS
elif "MI300" in soc:
assert sorted(list(file_dict.keys())) == CSVS
elif "MI350" in soc:
assert sorted(list(file_dict.keys())) == CSVS
else:
print(f"Testing isn't supported yet for {soc}")
assert 0
validate(
inspect.stack()[0][3],
workload_dir,
file_dict,
[
"--dispatch",
"1",
],
)
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.dispatch
def test_dispatch_0_1(binary_handler_profile_rocprof_compute):
options = ["--dispatch", "1:2"]
workload_dir = test_utils.get_output_dir()
binary_handler_profile_rocprof_compute(config, workload_dir, options)
file_dict = test_utils.check_csv_files(workload_dir, num_devices, 2)
if soc == "MI100":
assert sorted(list(file_dict.keys())) == CSVS
elif soc == "MI200":
assert sorted(list(file_dict.keys())) == CSVS
elif "MI300" in soc:
assert sorted(list(file_dict.keys())) == CSVS
elif "MI350" in soc:
assert sorted(list(file_dict.keys())) == CSVS
else:
print(f"Testing isn't supported yet for {soc}")
assert 0
validate(
inspect.stack()[0][3],
workload_dir,
file_dict,
["--dispatch", "1", "2"],
)
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.dispatch
def test_dispatch_2(binary_handler_profile_rocprof_compute):
options = ["--dispatch", "1"]
workload_dir = test_utils.get_output_dir()
binary_handler_profile_rocprof_compute(config, workload_dir, options)
file_dict = test_utils.check_csv_files(workload_dir, num_devices, 1)
if soc == "MI100":
assert sorted(list(file_dict.keys())) == CSVS
elif soc == "MI200":
assert sorted(list(file_dict.keys())) == CSVS
elif "MI300" in soc:
assert sorted(list(file_dict.keys())) == CSVS
elif "MI350" in soc:
assert sorted(list(file_dict.keys())) == CSVS
else:
print(f"Testing isn't supported yet for {soc}")
assert 0
validate(
inspect.stack()[0][3],
workload_dir,
file_dict,
[
"--dispatch",
"1",
],
)
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.join
def test_join_type_grid(binary_handler_profile_rocprof_compute):
options = ["--join-type", "grid"]
workload_dir = test_utils.get_output_dir()
binary_handler_profile_rocprof_compute(config, workload_dir, options)
file_dict = test_utils.check_csv_files(workload_dir, num_devices, num_kernels)
if soc == "MI100":
assert sorted(list(file_dict.keys())) == CSVS
elif soc == "MI200":
assert sorted(list(file_dict.keys())) == CSVS
elif "MI300" in soc:
assert sorted(list(file_dict.keys())) == CSVS
elif "MI350" in soc:
assert sorted(list(file_dict.keys())) == CSVS
else:
print(f"Testing isn't supported yet for {soc}")
assert 0
validate(
inspect.stack()[0][3],
workload_dir,
file_dict,
)
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.join
def test_join_type_kernel(binary_handler_profile_rocprof_compute):
options = ["--join-type", "kernel"]
workload_dir = test_utils.get_output_dir()
binary_handler_profile_rocprof_compute(config, workload_dir, options)
file_dict = test_utils.check_csv_files(workload_dir, num_devices, num_kernels)
if soc == "MI100":
assert sorted(list(file_dict.keys())) == CSVS
elif soc == "MI200":
assert sorted(list(file_dict.keys())) == CSVS
elif "MI300" in soc:
assert sorted(list(file_dict.keys())) == CSVS
elif "MI350" in soc:
assert sorted(list(file_dict.keys())) == CSVS
else:
print(f"Testing isn't supported yet for {soc}")
assert 0
validate(
inspect.stack()[0][3],
workload_dir,
file_dict,
)
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.sort
def test_roof_sort_dispatches(binary_handler_profile_rocprof_compute):
# only test 1 device for roofline
if soc in ("MI100"):
# roofline is not supported on MI100
assert True
# Do not continue testing
return
options = ["--device", "0", "--roof-only", "--sort", "dispatches"]
workload_dir = test_utils.get_output_dir()
returncode = binary_handler_profile_rocprof_compute(
config, workload_dir, options, check_success=False, roof=True
)
# assert successful run
assert returncode == 0
file_dict = test_utils.check_csv_files(workload_dir, 1, num_kernels)
assert sorted(list(file_dict.keys())) == ROOF_ONLY_FILES
validate(
inspect.stack()[0][3],
workload_dir,
file_dict,
)
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.sort
def test_roof_sort_kernels(binary_handler_profile_rocprof_compute):
# only test 1 device for roofline
if soc in ("MI100"):
# roofline is not supported on MI100
assert True
# Do not continue testing
return
options = ["--device", "0", "--roof-only", "--sort", "kernels"]
workload_dir = test_utils.get_output_dir()
returncode = binary_handler_profile_rocprof_compute(
config, workload_dir, options, check_success=False, roof=True
)
# assert successful run
assert returncode == 0
file_dict = test_utils.check_csv_files(workload_dir, 1, num_kernels)
assert sorted(list(file_dict.keys())) == ROOF_ONLY_FILES
validate(
inspect.stack()[0][3],
workload_dir,
file_dict,
)
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.mem
def test_roof_mem_levels_vL1D(binary_handler_profile_rocprof_compute):
# only test 1 device for roofline
if soc in ("MI100"):
# roofline is not supported on MI100
assert True
# Do not continue testing
return
options = ["--device", "0", "--roof-only", "--mem-level", "vL1D"]
workload_dir = test_utils.get_output_dir()
returncode = binary_handler_profile_rocprof_compute(
config, workload_dir, options, check_success=False, roof=True
)
# assert successful run
assert returncode == 0
file_dict = test_utils.check_csv_files(workload_dir, 1, num_kernels)
assert sorted(list(file_dict.keys())) == ROOF_ONLY_FILES
validate(
inspect.stack()[0][3],
workload_dir,
file_dict,
)
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.mem
def test_roof_mem_levels_LDS(binary_handler_profile_rocprof_compute):
# only test 1 device for roofline
if soc in ("MI100"):
# roofline is not supported on MI100
assert True
# Do not continue testing
return
options = ["--device", "0", "--roof-only", "--mem-level", "LDS"]
workload_dir = test_utils.get_output_dir()
returncode = binary_handler_profile_rocprof_compute(
config, workload_dir, options, check_success=False, roof=True
)
# assert successful run
assert returncode == 0
file_dict = test_utils.check_csv_files(workload_dir, 1, num_kernels)
assert sorted(list(file_dict.keys())) == ROOF_ONLY_FILES
validate(
inspect.stack()[0][3],
workload_dir,
file_dict,
)
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.section
def test_lds_section(binary_handler_profile_rocprof_compute):
options = ["--block", "12"]
workload_dir = test_utils.get_output_dir()
_ = binary_handler_profile_rocprof_compute(
config, workload_dir, options, check_success=True, roof=False
)
file_dict = test_utils.check_csv_files(workload_dir, 1, num_kernels)
validate(
inspect.stack()[0][3],
workload_dir,
file_dict,
)
assert test_utils.check_file_pattern(
"- '12'", f"{workload_dir}/profiling_config.yaml"
)
assert test_utils.check_file_pattern("SQ_INSTS_LDS", f"{workload_dir}/pmc_perf.csv")
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.section
def test_instmix_memchart_section(binary_handler_profile_rocprof_compute):
options = ["--block", "10", "3"]
workload_dir = test_utils.get_output_dir()
_ = binary_handler_profile_rocprof_compute(
config, workload_dir, options, check_success=True, roof=False
)
file_dict = test_utils.check_csv_files(workload_dir, 1, num_kernels)
validate(
inspect.stack()[0][3],
workload_dir,
file_dict,
)
assert test_utils.check_file_pattern(
"- '10'", f"{workload_dir}/profiling_config.yaml"
)
assert test_utils.check_file_pattern(
"- '3'", f"{workload_dir}/profiling_config.yaml"
)
assert test_utils.check_file_pattern(
"TA_FLAT_WAVEFRONTS", f"{workload_dir}/pmc_perf.csv"
)
assert test_utils.check_file_pattern(
"SQC_TC_DATA_READ_REQ", f"{workload_dir}/pmc_perf.csv"
)
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.section
def test_lds_sol_section(binary_handler_profile_rocprof_compute):
options = ["--block", "12.1"]
workload_dir = test_utils.get_output_dir()
_ = binary_handler_profile_rocprof_compute(
config, workload_dir, options, check_success=True, roof=False
)
file_dict = test_utils.check_csv_files(workload_dir, 1, num_kernels)
validate(
inspect.stack()[0][3],
workload_dir,
file_dict,
)
assert test_utils.check_file_pattern(
"- '12.1'", f"{workload_dir}/profiling_config.yaml"
)
assert test_utils.check_file_pattern(
"SQ_ACTIVE_INST_LDS", f"{workload_dir}/pmc_perf.csv"
)
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.section
def test_instmix_section_global_write_kernel(binary_handler_profile_rocprof_compute):
options = ["-k", "global_write", "--block", "10"]
custom_config = dict(config)
custom_config["kernel_name_1"] = "global_write"
custom_config["app_1"] = ["./tests/vmem"]
num_kernels = 1
workload_dir = test_utils.get_output_dir()
_ = binary_handler_profile_rocprof_compute(
custom_config, workload_dir, options, check_success=True, roof=False
)
file_dict = test_utils.check_csv_files(workload_dir, 1, num_kernels)
validate(
inspect.stack()[0][3],
workload_dir,
file_dict,
)
assert test_utils.check_file_pattern(
"- '10'", f"{workload_dir}/profiling_config.yaml"
)
assert test_utils.check_file_pattern(
"- global_write", f"{workload_dir}/profiling_config.yaml"
)
assert test_utils.check_file_pattern(
"TA_FLAT_WAVEFRONTS", f"{workload_dir}/pmc_perf.csv"
)
assert test_utils.check_file_pattern("global_write", f"{workload_dir}/pmc_perf.csv")
assert not test_utils.check_file_pattern(
"global_read", f"{workload_dir}/pmc_perf.csv"
)
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.section
def test_list_metrics(binary_handler_profile_rocprof_compute):
options = ["--list-metrics", "gfx90a"]
workload_dir = test_utils.get_output_dir()
_ = binary_handler_profile_rocprof_compute(
config, workload_dir, options, check_success=True, roof=False
)
# workload dir should not exist
assert not Path(workload_dir).exists()
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.section
def test_list_metrics_with_block(binary_handler_profile_rocprof_compute):
options = ["--list-metrics", "gfx90a", "--block", "10"]
workload_dir = test_utils.get_output_dir()
code = binary_handler_profile_rocprof_compute(
config, workload_dir, options, check_success=False, roof=False
)
# Should return code 1 since --block cannot be used with --list-metrics
assert code == 1
# workload dir should not exist
assert not Path(workload_dir).exists()
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.section
def test_list_available_metrics(binary_handler_profile_rocprof_compute, capsys):
options = ["--list-available-metrics"]
workload_dir = test_utils.get_output_dir()
_ = binary_handler_profile_rocprof_compute(
config, workload_dir, options, check_success=True, roof=False
)
# workload dir should not exist
assert not Path(workload_dir).exists()
test_utils.clean_output_dir(config["cleanup"], workload_dir)
# Test output
output = capsys.readouterr().out
assert "0 -> Top Stats" in output
assert "1 -> System Info" in output
@pytest.mark.section
def test_list_available_metrics_with_block(
binary_handler_profile_rocprof_compute, capsys
):
options = ["--list-available-metrics", "--block", "10"]
workload_dir = test_utils.get_output_dir()
code = binary_handler_profile_rocprof_compute(
config, workload_dir, options, check_success=False, roof=False
)
# Should return code 1 since --block cannot be used with --list-available-metrics
assert code == 1
# workload dir should not exist
assert not Path(workload_dir).exists()
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.path
def test_comprehensive_error_paths():
"""Simplified test for error path coverage"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from utils.parser import (
build_comparable_columns,
build_eval_string,
calc_builtin_var,
)
columns = build_comparable_columns("ms")
expected = [
"Count(ms)",
"Sum(ms)",
"Mean(ms)",
"Median(ms)",
"Standard Deviation(ms)",
]
for expected_col in expected:
assert expected_col in columns
class MockSysInfo:
total_l2_chan = 16
sys_info = MockSysInfo()
result = calc_builtin_var(42, sys_info)
assert result == 42
result = calc_builtin_var("$total_l2_chan", sys_info)
assert result == 16
try:
build_eval_string("test", None, config={})
assert False, "Should raise exception for None coll_level"
except Exception as e:
assert "coll_level can not be None" in str(e)
@pytest.mark.pc_sampling
def test_pc_sampling_host_trap(binary_handler_profile_rocprof_compute):
if soc in ("MI100"):
assert True
return
options = [
"--block",
"21",
"--pc-sampling-method",
"host_trap",
"--pc-sampling-interval",
"256",
]
workload_dir = test_utils.get_output_dir()
_ = binary_handler_profile_rocprof_compute(
config,
workload_dir,
options,
check_success=True,
roof=False,
app_name="app_mat_mul_max",
)
file_dict = test_utils.check_csv_files(workload_dir, num_devices, 1)
assert sorted(list(file_dict.keys())) == sorted(PC_SAMPLING_HOST_TRAP_FILES)
validate(inspect.stack()[0][3], workload_dir, file_dict)
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.pc_sampling
def test_pc_sampling_stochastic(binary_handler_profile_rocprof_compute):
if soc in ("MI100") or soc in ("MI200"):
assert True
return
options = [
"--block",
"21",
"--pc-sampling-method",
"stochastic",
"--pc-sampling-interval",
"1048576",
]
workload_dir = test_utils.get_output_dir()
_ = binary_handler_profile_rocprof_compute(
config,
workload_dir,
options,
check_success=True,
roof=False,
app_name="app_mat_mul_max",
)
file_dict = test_utils.check_csv_files(workload_dir, num_devices, 1)
assert sorted(list(file_dict.keys())) == sorted(PC_SAMPLING_STOCHASTIC_FILES)
validate(inspect.stack()[0][3], workload_dir, file_dict)
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.live_attach_detach
def test_live_attach_detach_block(binary_handler_profile_rocprof_compute):
options = ["--block", "3.1.1", "4.1.1", "5.1.1"]
workload_dir = test_utils.get_output_dir()
# TODO: temp fix for sdk defautly disable attach/detach,
# remove after it sets default to enable
env = os.environ.copy()
env["ROCP_TOOL_ATTACH"] = "1"
process_workload = None
try:
# Start workload
process_workload = subprocess.Popen(config["app_hip_dynamic_shared"], env=env)
time.sleep(5) # Give workload time to start
attach_detach = {
"attach_pid": process_workload.pid,
"attach-duration-msec": attach_detach_interval_msec_no_delay,
}
# Run profiler (might fail / timeout / throw)
binary_handler_profile_rocprof_compute(
config,
workload_dir,
options,
check_success=True,
roof=False,
app_name="app_hip_dynamic_shared",
attach_detach_para=attach_detach,
)
finally:
if process_workload and process_workload.poll() is None:
print(f"[finally] killing workload pid={process_workload.pid}")
process_workload.kill()
process_workload.wait()
# Validate results
file_dict = test_utils.check_csv_files(workload_dir, 1, num_kernels)
validate(inspect.stack()[0][3], workload_dir, file_dict)
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.skip(
reason="Temporarily disabled: \
waiting for SDK fix for no outputfile with thread sleeping"
)
@pytest.mark.live_attach_detach
def test_live_attach_detach_block_thread_sleep(binary_handler_profile_rocprof_compute):
options = ["--block", "3.1.1", "4.1.1", "5.1.1"]
workload_dir = test_utils.get_output_dir()
# TODO: temp fix for sdk defautly disable attach/detach,
# remove after it sets default to enable
env = os.environ.copy()
env["ROCP_TOOL_ATTACH"] = "1"
process_workload = None
try:
# Start workload with sleep mode enabled
process_workload = subprocess.Popen(
[*config["app_hip_dynamic_shared"], "--enable-sleep"], env=env
)
time.sleep(5) # Give workload time to start
attach_detach = {
"attach_pid": process_workload.pid,
"attach-duration-msec": attach_detach_interval_msec_with_delay,
}
# Main profiling call (can fail or hang)
binary_handler_profile_rocprof_compute(
config,
workload_dir,
options,
check_success=True,
roof=False,
app_name="app_hip_dynamic_shared",
attach_detach_para=attach_detach,
)
finally:
if process_workload and process_workload.poll() is None:
print(f"[finally] killing workload pid={process_workload.pid}")
process_workload.kill()
process_workload.wait()
# Validate output
file_dict = test_utils.check_csv_files(workload_dir, 1, num_kernels)
validate(
inspect.stack()[0][3],
workload_dir,
file_dict,
)
# Check profiling_config.yaml block entries
config_file = f"{workload_dir}/profiling_config.yaml"
assert test_utils.check_file_pattern("- 3.1.1", config_file)
assert test_utils.check_file_pattern("- 4.1.1", config_file)
assert test_utils.check_file_pattern("- 5.1.1", config_file)
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.live_attach_detach
def test_live_attach_detach_singlepass_launch_stats(
binary_handler_profile_rocprof_compute,
):
options = ["--set", "launch_stats"]
workload_dir = test_utils.get_output_dir()
# TODO: temp fix for sdk defautly disable attach/detach,
# remove after it sets default to enable
env = os.environ.copy()
env["ROCP_TOOL_ATTACH"] = "1"
process_workload = None
try:
# Start workload
process_workload = subprocess.Popen(config["app_hip_dynamic_shared"], env=env)
time.sleep(5) # Give workload time to start
attach_detach = {
"attach_pid": process_workload.pid,
"attach-duration-msec": attach_detach_interval_msec_no_delay,
}
# Profiling step (may fail)
binary_handler_profile_rocprof_compute(
config,
workload_dir,
options,
check_success=True,
roof=False,
app_name="app_hip_dynamic_shared",
attach_detach_para=attach_detach,
)
finally:
if process_workload and process_workload.poll() is None:
print(f"[finally] killing workload pid={process_workload.pid}")
process_workload.kill()
process_workload.wait()
# Validate CSVs & output correctness
file_dict = test_utils.check_csv_files(workload_dir, 1, num_kernels)
validate(
inspect.stack()[0][3],
workload_dir,
file_dict,
)
# Check that launch-stat sets were applied
config_file = f"{workload_dir}/profiling_config.yaml"
for tag in [
"7.1.0",
"7.1.1",
"7.1.2",
"7.1.5",
"7.1.6",
"7.1.7",
"7.1.8",
"7.1.9",
]:
assert test_utils.check_file_pattern(f"- {tag}", config_file)
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.sets_func
class TestSetsIntegration:
def test_memory_throughput_set(self, binary_handler_profile_rocprof_compute):
options = ["--set", "mem_thruput"]
workload_dir = test_utils.get_output_dir()
binary_handler_profile_rocprof_compute(
config,
workload_dir,
options,
check_success=True,
roof=False,
)
assert test_utils.get_num_pmc_file(workload_dir) == 1
memory_metrics = ["16.1.2", "17.1.0"]
for metric_id in memory_metrics:
assert metric_id in open(Path(workload_dir) / "log.txt").read(), (
f"Expected memory metric {metric_id} not found"
)
test_utils.clean_output_dir(config["cleanup"], workload_dir)
def test_launch_stats_set(self, binary_handler_profile_rocprof_compute):
options = ["--set", "launch_stats"]
workload_dir = test_utils.get_output_dir()
binary_handler_profile_rocprof_compute(
config,
workload_dir,
options,
check_success=True,
roof=False,
)
assert test_utils.get_num_pmc_file(workload_dir) == 1
test_utils.clean_output_dir(config["cleanup"], workload_dir)
def test_compute_thruput_util_set(self, binary_handler_profile_rocprof_compute):
options = ["--set", "compute_thruput_util"]
workload_dir = test_utils.get_output_dir()
binary_handler_profile_rocprof_compute(
config,
workload_dir,
options,
check_success=True,
roof=False,
)
assert test_utils.get_num_pmc_file(workload_dir) == 1
assert test_utils.check_file_pattern(
"- 11.2.3", f"{workload_dir}/profiling_config.yaml"
)
test_utils.clean_output_dir(config["cleanup"], workload_dir)
def test_compute_thruput_flops_set(self, binary_handler_profile_rocprof_compute):
options = ["--set", "compute_thruput_flops"]
workload_dir = test_utils.get_output_dir()
binary_handler_profile_rocprof_compute(
config,
workload_dir,
options,
check_success=True,
roof=False,
)
assert test_utils.get_num_pmc_file(workload_dir) == 1
test_utils.clean_output_dir(config["cleanup"], workload_dir)
def test_invalid_set_error_handling(self, binary_handler_profile_rocprof_compute):
options = ["--set", "nonexistent_set"]
workload_dir = test_utils.get_output_dir()
returncode = binary_handler_profile_rocprof_compute(
config,
workload_dir,
options,
check_success=False,
roof=False,
)
assert returncode == 1
test_utils.clean_output_dir(config["cleanup"], workload_dir)
def test_set_and_block_mutual_exclusion(
self, binary_handler_profile_rocprof_compute
):
options = ["--set", "compute_thruput_util", "--block", "12"]
workload_dir = test_utils.get_output_dir()
returncode = binary_handler_profile_rocprof_compute(
config, workload_dir, options, check_success=False, roof=False
)
assert returncode == 1
test_utils.clean_output_dir(config["cleanup"], workload_dir)
def test_list_sets_functionality(self, binary_handler_profile_rocprof_compute):
options = ["--list-sets"]
workload_dir = test_utils.get_output_dir()
binary_handler_profile_rocprof_compute(
config,
workload_dir,
options,
check_success=False,
roof=False,
)
# workload dir should not exist
assert not Path(workload_dir).exists()
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.iteration_multiplexing_1
def test_profiler_options(binary_handler_profile_rocprof_compute):
options = ["--no-native-tool", "--iteration-multiplexing"]
workload_dir = test_utils.get_output_dir()
code = binary_handler_profile_rocprof_compute(
config, workload_dir, options, check_success=False, roof=False
)
assert code == 1
@pytest.mark.iteration_multiplexing_1
def test_iteration_multiplexing(binary_handler_profile_rocprof_compute):
options = ["--iteration-multiplexing"]
workload_dir = test_utils.get_output_dir()
_ = binary_handler_profile_rocprof_compute(
config, workload_dir, options, check_success=True, roof=False
)
file_dict = test_utils.check_csv_files(workload_dir, num_devices, num_kernels)
if soc == "MI100":
assert sorted(list(file_dict.keys())) == CSVS
elif soc == "MI200":
assert sorted(list(file_dict.keys())) == CSVS
elif "MI300" in soc:
assert sorted(list(file_dict.keys())) == CSVS
elif "MI350" in soc:
assert sorted(list(file_dict.keys())) == CSVS
else:
print(f"Testing isn't supported yet for {soc}")
assert 0
validate(
inspect.stack()[0][3],
workload_dir,
file_dict,
)
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.iteration_multiplexing_1
def test_iteration_multiplexing_kernel(binary_handler_profile_rocprof_compute):
options = ["--iteration-multiplexing", "kernel"]
workload_dir = test_utils.get_output_dir()
_ = binary_handler_profile_rocprof_compute(
config, workload_dir, options, check_success=True, roof=False
)
file_dict = test_utils.check_csv_files(workload_dir, num_devices, num_kernels)
if soc == "MI100":
assert sorted(list(file_dict.keys())) == CSVS
elif soc == "MI200":
assert sorted(list(file_dict.keys())) == CSVS
elif "MI300" in soc:
assert sorted(list(file_dict.keys())) == CSVS
elif "MI350" in soc:
assert sorted(list(file_dict.keys())) == CSVS
else:
print(f"Testing isn't supported yet for {soc}")
assert 0
validate(
inspect.stack()[0][3],
workload_dir,
file_dict,
)
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.iteration_multiplexing_1
def test_iteration_multiplexing_kernel_launch_params(
binary_handler_profile_rocprof_compute,
):
options = ["--iteration-multiplexing", "kernel_launch_params"]
workload_dir = test_utils.get_output_dir()
_ = binary_handler_profile_rocprof_compute(
config, workload_dir, options, check_success=True, roof=False
)
file_dict = test_utils.check_csv_files(workload_dir, num_devices, num_kernels)
if soc == "MI100":
assert sorted(list(file_dict.keys())) == CSVS
elif soc == "MI200":
assert sorted(list(file_dict.keys())) == CSVS
elif "MI300" in soc:
assert sorted(list(file_dict.keys())) == CSVS
elif "MI350" in soc:
assert sorted(list(file_dict.keys())) == CSVS
else:
print(f"Testing isn't supported yet for {soc}")
assert 0
validate(
inspect.stack()[0][3],
workload_dir,
file_dict,
)
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@pytest.mark.iteration_multiplexing_2
def test_iteration_multiplexing_deterministic_counter_accuracy(
binary_handler_profile_rocprof_compute,
):
# These metrics should cover the deterministic counters being checked
options = ["--block", "6.1.5", "6.1.6", "7.2.2", "10.1"]
workload_dir = test_utils.get_output_dir(param_id="no_iter_mplx")
_ = binary_handler_profile_rocprof_compute(
config,
workload_dir,
options,
check_success=True,
roof=False,
app_name="app_laplace_eqn",
)
counters_no_multiplexing = test_utils.check_csv_files(
workload_dir, num_devices, num_kernels
)["pmc_perf.csv"]
test_utils.clean_output_dir(config["cleanup"], workload_dir)
options = [
"--block",
"6.1.5",
"6.1.6",
"7.2.2",
"10.1",
"--iteration-multiplexing",
"kernel",
]
workload_dir = test_utils.get_output_dir(param_id="iter_mplx_kernel")
_ = binary_handler_profile_rocprof_compute(
config,
workload_dir,
options,
check_success=True,
roof=False,
app_name="app_laplace_eqn_iter",
)
counters_kernel = test_utils.check_csv_files(
workload_dir, num_devices, num_kernels
)["pmc_perf.csv"]
test_utils.clean_output_dir(config["cleanup"], workload_dir)
options = [
"--block",
"6.1.5",
"6.1.6",
"7.2.2",
"10.1",
"--iteration-multiplexing",
"kernel_launch_params",
]
workload_dir = test_utils.get_output_dir(param_id="iter_mplx_params")
_ = binary_handler_profile_rocprof_compute(
config,
workload_dir,
options,
check_success=True,
roof=False,
app_name="app_laplace_eqn_iter",
)
counters_kernel_launch_params = test_utils.check_csv_files(
workload_dir, num_devices, num_kernels
)["pmc_perf.csv"]
test_utils.clean_output_dir(config["cleanup"], workload_dir)
assert are_deterministic_counters_equal(
[counters_kernel, counters_kernel_launch_params], counters_no_multiplexing
)
@pytest.mark.iteration_multiplexing_stochastic
def test_iteration_multiplexing_stochastic_counter_accuracy(
binary_handler_profile_rocprof_compute,
):
workload_dir = test_utils.get_output_dir(param_id="no_iter_mplx")
# These metrics should cover the L1 cache stochastic counters
options = ["--block", "16.1", "16.3"]
_ = binary_handler_profile_rocprof_compute(
config,
workload_dir,
options,
check_success=True,
roof=False,
app_name="app_laplace_eqn",
)
counters_no_multiplexing = test_utils.check_csv_files(
workload_dir, num_devices, num_kernels
)["pmc_perf.csv"]
test_utils.clean_output_dir(config["cleanup"], workload_dir)
options = ["--block", "16.1", "16.3", "--iteration-multiplexing", "kernel"]
workload_dir = test_utils.get_output_dir(param_id="iter_mplx_kernel")
_ = binary_handler_profile_rocprof_compute(
config,
workload_dir,
options,
check_success=True,
roof=False,
app_name="app_laplace_eqn_iter",
)
counters_kernel = test_utils.check_csv_files(
workload_dir, num_devices, num_kernels
)["pmc_perf.csv"]
test_utils.clean_output_dir(config["cleanup"], workload_dir)
options = [
"--block",
"16.1",
"16.3",
"--iteration-multiplexing",
"kernel_launch_params",
]
workload_dir = test_utils.get_output_dir(param_id="iter_mplx_params")
_ = binary_handler_profile_rocprof_compute(
config,
workload_dir,
options,
check_success=True,
roof=False,
app_name="app_laplace_eqn_iter",
)
counters_kernel_launch_params = test_utils.check_csv_files(
workload_dir, num_devices, num_kernels
)["pmc_perf.csv"]
test_utils.clean_output_dir(config["cleanup"], workload_dir)
assert are_stochastic_counters_similar(
[counters_kernel, counters_kernel_launch_params], counters_no_multiplexing
)
# Not part of automated test runs since testing all counters is expensive
def test_iteration_multiplexing_all_counter_accuracy(
binary_handler_profile_rocprof_compute,
):
workload_dir = test_utils.get_output_dir(param_id="no_iter_mplx")
_ = binary_handler_profile_rocprof_compute(
config,
workload_dir,
check_success=True,
roof=False,
app_name="app_laplace_eqn",
)
counters_no_multiplexing = test_utils.check_csv_files(
workload_dir, num_devices, num_kernels
)["pmc_perf.csv"]
test_utils.clean_output_dir(config["cleanup"], workload_dir)
options = ["--iteration-multiplexing", "kernel"]
workload_dir = test_utils.get_output_dir(param_id="iter_mplx_kernel")
_ = binary_handler_profile_rocprof_compute(
config,
workload_dir,
options,
check_success=True,
roof=False,
app_name="app_laplace_eqn_iter",
)
counters_kernel = test_utils.check_csv_files(
workload_dir, num_devices, num_kernels
)["pmc_perf.csv"]
test_utils.clean_output_dir(config["cleanup"], workload_dir)
options = ["--iteration-multiplexing", "kernel_launch_params"]
workload_dir = test_utils.get_output_dir(param_id="iter_mplx_params")
_ = binary_handler_profile_rocprof_compute(
config,
workload_dir,
options,
check_success=True,
roof=False,
app_name="app_laplace_eqn_iter",
)
counters_kernel_launch_params = test_utils.check_csv_files(
workload_dir, num_devices, num_kernels
)["pmc_perf.csv"]
test_utils.clean_output_dir(config["cleanup"], workload_dir)
assert are_deterministic_counters_equal(
[counters_kernel, counters_kernel_launch_params], counters_no_multiplexing
)
assert are_stochastic_counters_similar(
[counters_kernel, counters_kernel_launch_params], counters_no_multiplexing
)
skip_if_no_torch = pytest.mark.skipif(
importlib.util.find_spec("torch") is None, reason="torch is required for this test"
)
@skip_if_no_torch
def test_torch_trace_profile(binary_handler_profile_rocprof_compute):
"""
Test profiling a PyTorch application with --torch-trace option.
Verifies that all required files are generated and counter values are valid.
NOTE: Not included in the test suite since this requires PyTorch installation.
"""
workload_dir = test_utils.get_output_dir(param_id="torch_ops")
Path(workload_dir).mkdir(parents=True, exist_ok=True)
torch_app_path = Path(workload_dir) / "test_torch_app.py"
torch_app_code = """
import torch
import torch.nn as nn
import torch.nn.functional as F
class SimpleNet(nn.Module):
def __init__(self):
super(SimpleNet, self).__init__()
self.fc1 = nn.Linear(10, 20)
self.fc2 = nn.Linear(20, 10)
def forward(self, x):
x = self.fc1(x)
x = F.relu(x)
x = self.fc2(x)
return x
if __name__ == "__main__":
if not torch.cuda.is_available():
import sys
print("GPU is required for this test. Exiting.")
sys.exit(1)
model = SimpleNet()
model = model.cuda()
x = torch.randn(5, 10).cuda()
# Run a few iterations
for epoch in range(1):
output = model(x)
loss = output.sum()
loss.backward()
print("Training completed")
"""
with open(torch_app_path, "w") as f:
f.write(torch_app_code)
config["torch_test_app"] = ["python3", str(torch_app_path)]
# Profile with --torch-trace option
options = [
"--torch-trace",
]
returncode = binary_handler_profile_rocprof_compute(
config,
workload_dir,
options,
check_success=True,
app_name="torch_test_app",
)
assert returncode == 0, "Profiling the torch application failed"
# Verify files are generated
# 1. Check basic CSV files
num_devices = config.get("num_devices", 1)
file_dict = test_utils.check_csv_files(workload_dir, num_devices, 1)
assert "pmc_perf.csv" in file_dict, "pmc_perf.csv not generated"
# 2. Check torch trace directory
torch_trace_dir = Path(workload_dir) / "torch_trace"
assert torch_trace_dir.exists(), "torch_trace directory not created"
assert torch_trace_dir.is_dir(), "torch_trace is not a directory"
# 3. Check per-operator CSV files exist
operator_csv_files = list(torch_trace_dir.glob("*.csv"))
assert len(operator_csv_files) > 0, "No per-operator CSV files generated"
# 4. Verify per-operator CSV structure
for op_csv in operator_csv_files:
op_df = pd.read_csv(op_csv)
assert len(op_df) > 0, f"Per-operator CSV {op_csv.name} is empty"
test_utils.clean_output_dir(config["cleanup"], workload_dir)
@skip_if_no_torch
def test_torch_trace_overhead(binary_handler_profile_rocprof_compute):
"""
Measure overhead introduced by --torch-trace flag.
Compares execution time with and without the flag to ensure overhead is acceptable.
NOTE: Not included in the test suite since this requires PyTorch installation.
"""
helper_dir = Path(test_utils.get_output_dir(param_id="torch_helper_script"))
helper_dir.mkdir(parents=True, exist_ok=True)
torch_app_path = helper_dir / "test_torch_app.py"
torch_app_code = """
import torch
import torch.nn as nn
import torch.nn.functional as F
class SimpleNet(nn.Module):
def __init__(self):
super(SimpleNet, self).__init__()
self.fc1 = nn.Linear(10, 20)
self.fc2 = nn.Linear(20, 10)
def forward(self, x):
x = self.fc1(x)
x = F.relu(x)
x = self.fc2(x)
return x
if __name__ == "__main__":
if not torch.cuda.is_available():
import sys
print("GPU is required for this test. Exiting.")
sys.exit(1)
model = SimpleNet()
model = model.cuda()
x = torch.randn(5, 10).cuda()
# Run a few iterations
for epoch in range(1):
output = model(x)
loss = output.sum()
loss.backward()
print("Training completed")
"""
with open(torch_app_path, "w") as f:
f.write(torch_app_code)
config["torch_test_app"] = ["python3", str(torch_app_path)]
# Run WITHOUT --torch-trace (baseline)
workload_dir_baseline = test_utils.get_output_dir(param_id="torch_baseline")
start_baseline = time.time()
returncode_baseline = binary_handler_profile_rocprof_compute(
config,
workload_dir_baseline,
[], # No torch-trace flag
check_success=True,
roof=False,
app_name="torch_test_app",
)
baseline_time = time.time() - start_baseline
assert returncode_baseline == 0, "Baseline profiling failed"
# Read baseline timestamps
baseline_df = pd.read_csv(f"{workload_dir_baseline}/pmc_perf.csv")
baseline_kernel_duration_total = (
baseline_df["End_Timestamp"].max() - baseline_df["Start_Timestamp"].min()
)
test_utils.clean_output_dir(config["cleanup"], workload_dir_baseline)
# Run WITH --torch-trace
workload_dir_with_flag = test_utils.get_output_dir(param_id="torch_with_ops")
start_with_flag = time.time()
returncode_with_flag = binary_handler_profile_rocprof_compute(
config,
workload_dir_with_flag,
["--torch-trace"],
check_success=True,
roof=False,
app_name="torch_test_app",
)
with_flag_time = time.time() - start_with_flag
assert returncode_with_flag == 0, "Profiling with torch-trace failed"
# Read with-flag timestamps
with_flag_df = pd.read_csv(f"{workload_dir_with_flag}/pmc_perf.csv")
with_flag_kernel_duration_total = (
with_flag_df["End_Timestamp"].max() - with_flag_df["Start_Timestamp"].min()
)
longest_running_kernel_baseline = (
baseline_df["End_Timestamp"] - baseline_df["Start_Timestamp"]
).max()
longest_running_kernel_with_flag = (
with_flag_df["End_Timestamp"] - with_flag_df["Start_Timestamp"]
).max()
# Calculate overheads
longest_running_kernel_overhead = (
(longest_running_kernel_with_flag - longest_running_kernel_baseline)
/ longest_running_kernel_baseline
) * 100
wall_clock_overhead = ((with_flag_time - baseline_time) / baseline_time) * 100
kernel_overhead = (
(with_flag_kernel_duration_total - baseline_kernel_duration_total)
/ baseline_kernel_duration_total
) * 100
print(f"\n{'=' * 70}")
print("Performance Overhead Analysis:")
print(f" Longest running kernel overhead: {longest_running_kernel_overhead:.1f}%")
print(f" Baseline wall-clock time: {baseline_time:.2f}s")
print(f" With --torch-trace time: {with_flag_time:.2f}s")
print(f" Wall-clock overhead: {wall_clock_overhead:.1f}%")
print(f" Baseline kernel duration: {baseline_kernel_duration_total:.0f} ns")
print(f" With flag kernel duration: {with_flag_kernel_duration_total:.0f} ns")
print(f" Kernel execution overhead: {kernel_overhead:.1f}%")
print(f"{'=' * 70}\n")
# Verify torch trace directory was created
torch_trace_dir = Path(workload_dir_with_flag) / "torch_trace"
assert torch_trace_dir.exists(), "torch_trace directory should be created"
operator_csv_files = list(torch_trace_dir.glob("*.csv"))
assert len(operator_csv_files) > 0, "Operator CSV files should be generated"
test_utils.clean_output_dir(config["cleanup"], workload_dir_with_flag)
# Assert overhead is reasonable (< 100% wall-clock, < 50% kernel)
assert wall_clock_overhead < 100, (
f"Wall-clock overhead too high: {wall_clock_overhead:.1f}%"
)
assert kernel_overhead < 50, (
f"Kernel execution overhead too high: {kernel_overhead:.1f}%"
)
assert longest_running_kernel_overhead < 50, (
f"longest running kernel increase too high: "
f"{longest_running_kernel_overhead:.1f}%"
)