[rocprofiler-compute] Data imputation strategy for iteration multiplexing (#2468)

* Data imputation strategy for iteration multiplexing

* Implement data imputation methodology to handle missing counter values
  in case of iteration multiplexing

* Enable dispatch filtering with iteration multiplexing since we are no
  longer merging dispatches

* Bugfix to prevent check for missing counter values when using csv
  format when profiling with iteration multiplexing

* Move warning and info message in case of iteration multiplexing to
  sanitize function which comes earlier in analyze mode

* Address review comments

* Fix typo in documentation

* Move profiling config init. after path check in sanitize()

* Graceful handling of dispatches with all counters empty within data
  imputation logic

* Improve info message for iteration multiplexing based analysis

* Ensure proper error message when trying to run iteration multiplexing with attach/detach

* fix test case
このコミットが含まれているのは:
vedithal-amd
2026-01-08 12:01:51 -05:00
committed by GitHub
コミット 769d3dd67a
11個のファイルの変更119行の追加126行の削除
-5
ファイルの表示
@@ -346,11 +346,6 @@ Show System Speed-of-Light and CS_Busy blocks only
this case, ``1`` is the ID for System Speed-of-Light and ``5.1.0`` the ID for
GPU Busy Cycles metric.
.. note::
Dispatch filtering via ``-d`` or ``--dispatch`` is not supported for profiling
data collected with ``--iteration-multiplexing`` option.
Filter kernels
First, list the top kernels in your application using `--list-stats`.
+2 -1
ファイルの表示
@@ -42,5 +42,6 @@ The analyze options for attach/detach are completely compatible with the non-att
.. note::
* Live Attach Detach feature is currently in BETA version. To enable Live/Attach Detach, you need to have the correct supported proper version of ROCprofiler-SDK and rocprofiler-register.
* To make the Live Attach/Detach feature work, you must use "--block" or a single path to limit the number of counter input files to one. This limitation will be removed in a later version with implementations such as Iteration Multiplexing.
* Live Attach/Detach does not work with --iteration-multiplexing option. This is because --iteration-multiplexing uses native counter collection tool which currently does not support attach/detach feature.
* To make the Live Attach/Detach feature work, you must restrict the number of counter input files (which determine number of application runs) to one. This can be achieved with options such as: "--block", "--set".
* Due to the limitation of ROCprofiler-SDK, the attach can now only happen before Heterogeneous System Architecture (HSA) initialization. HSA initialization happens before the execution of the first HIP kernel call. It only happens once to save all the kernels' function signature, such as the function name and other launch parameters. Attaching after this stage misses all crucial information of the HIP kernel and makes it impossible to store the output. This limitation will be solved in later releases of ROCprofiler-SDK.
+4 -4
ファイルの表示
@@ -703,6 +703,10 @@ By default, if no policy is specified, ROCm Compute Profiler uses the ``kernel_l
Iteration multiplexing is only supported when using ROCm Compute Profiler with
the native counter collection tool. Ensure that ``--no-native-tool`` is not used in your profiling command.
* Do not use ``--attach-pid`` with ``--iteration-multiplexing``.
Iteration multiplexing is only supported when using ROCm Compute Profiler with
the native counter collection tool. Ensure that ``--attach-pid`` is not used in your profiling command.
* Ensure that your workload runs for enough iterations to cover all counter subsets.
When using iteration multiplexing, the total number of iterations, for each kernel (for ``kernel`` policy)
or for each unique kernel and launch parameters combination (for ``kernel_launch_params`` policy),
@@ -786,7 +790,3 @@ Iteration multiplexing feature comes with some caveats to be considered when pro
* **Non-deterministic workloads**
Workloads which dispatch kernels with non-deterministic names and launch parameters may trigger warnings for insufficient dispatch counts because iteration multiplexing identifies unique kernels by their names and optionally by their launch parameters; this is especially true of large AI workloads that dispatch kernels non-deterministically based on the model layers being used for the current input, and in such cases kernel filtering of common kernels is recommended.
* **Cannot use with dispatch filtering**
It is not possible to use dispatch filtering mentioned in :ref:`Filtering <filtering>` with iteration multiplexing, because iteration multiplexing merges counters across dispatches, making it impossible to isolate specific dispatches for profiling and analysis, so attempting to combine them will result in an error.
-2
ファイルの表示
@@ -271,7 +271,6 @@ Examples:
required=False,
nargs="?",
choices=[
# "simple",
"kernel",
"kernel_launch_params",
],
@@ -279,7 +278,6 @@ Examples:
help=(
"\t\t\tChoose the iteration multiplexing policy: "
"(DEFAULT: kernel_launch_params).\n"
# "\t\t\t simple (i.e. Round robin over all kernel dispatches\n"
"\t\t\t kernel (i.e. Round robin counters over kernel calls with "
"unique kernel names.)\n"
"\t\t\t kernel_launch_params (i.e. Round robin counters over "
+40 -45
ファイルの表示
@@ -49,8 +49,8 @@ from utils.roofline_calc import validate_roofline_csv
from utils.utils import (
get_panel_alias,
get_uuid,
impute_counters_iteration_multiplex,
is_workload_empty,
merge_counters_iteration_multiplex,
merge_counters_spatial_multiplex,
)
@@ -86,7 +86,6 @@ class OmniAnalyze_Base:
self.__args = args
self._runs: OrderedDict[str, schema.Workload] = OrderedDict()
self._arch_configs: dict[str, schema.ArchConfig] = {}
self._profiling_config: dict[str, Any] = {}
self.__supported_archs = supported_archs
self._output: Optional[TextIO] = None
self.__socs: Optional[dict[str, OmniSoC_Base]] = None
@@ -94,6 +93,9 @@ class OmniAnalyze_Base:
def get_args(self) -> argparse.Namespace:
return self.__args
def get_profiling_config(self) -> dict[str, Any]:
return self._profiling_config
def set_soc(self, omni_socs: dict[str, OmniSoC_Base]) -> None:
self.__socs = omni_socs
@@ -105,10 +107,10 @@ class OmniAnalyze_Base:
return merge_counters_spatial_multiplex(df)
@demarcate
def iteration_multiplex_merge_counters(
def iteration_multiplex_impute_counters(
self, df: pd.DataFrame, policy: str
) -> pd.DataFrame:
return merge_counters_iteration_multiplex(df, policy)
return impute_counters_iteration_multiplex(df, policy)
@demarcate
def generate_configs(
@@ -214,6 +216,7 @@ class OmniAnalyze_Base:
@demarcate
def load_options(self, normalization_filter: Optional[str]) -> None:
args = self.get_args()
profiling_config = self.get_profiling_config()
target_filter = normalization_filter or args.normal_unit
for arch_config in self._arch_configs.values():
@@ -221,7 +224,7 @@ class OmniAnalyze_Base:
arch_config.dfs,
arch_config.dfs_type,
target_filter,
self._profiling_config,
profiling_config,
)
# Error checking for multiple runs and multiple kernel filters
if args.gpu_kernel and (len(args.path) != len(args.gpu_kernel)):
@@ -320,6 +323,7 @@ class OmniAnalyze_Base:
def sanitize(self) -> None:
"""Perform sanitization of inputs"""
args = self.get_args()
if args.tui:
return
@@ -348,10 +352,17 @@ class OmniAnalyze_Base:
console_error("analysis", "You cannot provide the same path twice.")
seen_paths.add(dir_info[0])
self._profiling_config: dict[str, Any] = file_io.load_profiling_config(
args.path[0][0]
)
profiling_config = self.get_profiling_config()
for dir_info in args.path:
if not any([
args.nodes,
args.list_nodes,
args.spatial_multiplexing,
profiling_config.get("iteration_multiplexing"),
]):
is_workload_empty(dir_info[0])
@@ -396,6 +407,30 @@ class OmniAnalyze_Base:
"Please choose a different name."
)
# Check if any kernel's counters are missing due to iteration multiplexing
if (
profiling_config.get("iteration_multiplexing") is not None
and profiling_config.get("kernels_with_missing_counters") is not None
):
missing_kernels = profiling_config.get("kernels_with_missing_counters")
console_warning(
"analysis",
(
"The following kernels have missing counter data "
"due to iteration multiplexing and should be filtered out: "
f"{', '.join(missing_kernels)}"
),
)
if profiling_config.get("iteration_multiplexing") is not None:
console_log(
"analysis",
(
"Profiling data was collected using iteration multiplexing.\n\t"
"Metrics are calculated based on partially available counter data."
),
)
# ----------------------------------------------------
# Required methods to be implemented by child classes
# ----------------------------------------------------
@@ -415,37 +450,6 @@ class OmniAnalyze_Base:
elif args.output_format == "stdout":
self._output = sys.stdout
# Read profiling config
self._profiling_config = file_io.load_profiling_config(args.path[0][0])
# Check dispatch filtering isn't used with iteration multiplexing
if (
self._profiling_config.get("iteration_multiplexing") is not None
and args.gpu_dispatch_id
):
console_error(
"analysis",
"Dispatch filtering (-d/--dispatch) cannot be used "
"with profiling data collected with iteration multiplexing.",
)
# Check if any kernel's counters are missing due to iteration multiplexing
if (
self._profiling_config.get("iteration_multiplexing") is not None
and self._profiling_config.get("kernels_with_missing_counters") is not None
):
missing_kernels = self._profiling_config.get(
"kernels_with_missing_counters"
)
console_warning(
"analysis",
(
"The following kernels have missing counter data "
"due to iteration multiplexing and should be filtered out: "
f"{', '.join(missing_kernels)}"
),
)
# initalize runs
self._runs = self.initalize_runs()
@@ -473,12 +477,3 @@ class OmniAnalyze_Base:
def run_analysis(self) -> None:
"""Run analysis."""
console_debug("analysis", "generating analysis")
if self._profiling_config.get("iteration_multiplexing") is not None:
console_log(
"analysis",
(
"Profiling data was collected using iteration multiplexing. "
"Some metrics may represent aggregated values "
"across multiple iterations."
),
)
+1 -1
ファイルの表示
@@ -61,7 +61,7 @@ class cli_analysis(OmniAnalyze_Base):
)
if self._profiling_config.get("iteration_multiplexing") is not None:
workload.raw_pmc = self.iteration_multiplex_merge_counters(
workload.raw_pmc = self.iteration_multiplex_impute_counters(
workload.raw_pmc,
policy=self._profiling_config["iteration_multiplexing"],
)
+1 -1
ファイルの表示
@@ -237,7 +237,7 @@ class db_analysis(OmniAnalyze_Base):
)
if self._profiling_config.get("iteration_multiplexing") is not None:
raw_pmc = self.iteration_multiplex_merge_counters(
raw_pmc = self.iteration_multiplex_impute_counters(
raw_pmc,
policy=self._profiling_config["iteration_multiplexing"],
)
+3 -1
ファイルの表示
@@ -355,7 +355,9 @@ class webui_analysis(OmniAnalyze_Base):
)
if self._profiling_config.get("iteration_multiplexing") is not None:
self._runs[self.dest_dir].raw_pmc = self.iteration_multiplex_merge_counters(
self._runs[
self.dest_dir
].raw_pmc = self.iteration_multiplex_impute_counters(
self._runs[self.dest_dir].raw_pmc,
policy=self._profiling_config["iteration_multiplexing"],
)
+6
ファイルの表示
@@ -104,6 +104,12 @@ class RocProfCompute_Base:
"Please remove one of these options."
)
if args.attach_pid and args.iteration_multiplexing is not None:
console_error(
"--attach-pid cannot be used with --iteration-multiplexing. "
"Please remove one of these options."
)
# verify correct formatting for application binary
args.remaining = args.remaining[1:]
if args.remaining:
+61 -65
ファイルの表示
@@ -1373,13 +1373,12 @@ def reverse_multi_index_df_pmc(
return dfs, coll_levels
def merge_counters_iteration_multiplex(
def impute_counters_iteration_multiplex(
df_multi_index: pd.DataFrame,
policy: str,
) -> pd.DataFrame:
"""
For iteration multiplexing, this merges counter values for the kernel collected
over multiple iterations.
Perform data imputation for missing counter values due to iteration multiplexing.
"""
non_counter_column_index = [
"Dispatch_ID",
@@ -1396,25 +1395,17 @@ def merge_counters_iteration_multiplex(
"End_Timestamp",
"Kernel_ID",
]
result_dfs: list[pd.DataFrame] = []
# TODO: will need to optimize to avoid this conversion to single index format
# and do merge directly on multi-index dataframe
dfs, coll_levels = reverse_multi_index_df_pmc(df_multi_index)
for df in dfs:
kernel_name_column_name = "Kernel_Name"
if "Kernel_Name" not in df and "Name" in df:
kernel_name_column_name = "Name"
# Find the values in Kernel_Name that occur more than once
# Group by unique kernel configurations
unique_occurences = (
df.groupby(kernel_name_column_name)
df.groupby("Kernel_Name")
if policy == "kernel"
else df.groupby(
[
kernel_name_column_name,
"Kernel_Name",
"Grid_Size",
"Workgroup_Size",
"LDS_Per_Workgroup",
@@ -1423,64 +1414,69 @@ def merge_counters_iteration_multiplex(
)
)
# Define a list to store the merged rows
result_data: list[dict[str, Any]] = []
counter_columns = [
col for col in df.columns if col not in non_counter_column_index
]
# Collect imputed groups as dataframes
group_dfs = []
pd.set_option("display.max_columns", None)
for _, group in unique_occurences:
# Identify counter buckets
counter_groups: set[frozenset[str]] = set()
for _, row in group.iterrows():
# Set of counter column names with non empty values
cols_frozenset = frozenset(
row[counter_columns][row[counter_columns].notna()].index
)
# If no counters found for this dispatch, continue
if not cols_frozenset:
continue
# Since counter buckets are repeated in round robin fashion,
# we can stop once we see a repeated bucket
if cols_frozenset in counter_groups:
break
counter_groups.add(cols_frozenset)
# Reset Dispatch_ID
dispatch_id_counter = 0
# If no counters found for this group, continue
if not counter_groups:
continue
for name, group in unique_occurences:
# Create a dictionary to store the merged row for the current group
merged_row: dict[str, Any] = {}
# Iterate over subgroups of dispatches containing
# all counters and impute missing values
subgroup_size = len(counter_groups)
all_counters = {
counter for counter_group in counter_groups for counter in counter_group
}
# Collect imputed sub-groups as dataframes
subgroup_dfs = []
for i in range(0, len(group), subgroup_size):
subgroup = group.iloc[i : i + subgroup_size]
# Process non-counter columns
for col in non_counter_column_index:
if col == "End_Timestamp":
# For End_Timestamp, calculate the median delta time
delta_time = group[col] - group["Start_Timestamp"]
merged_row[col] = group["Start_Timestamp"] + delta_time.median()
if col == "Dispatch_ID":
# Assign new Dispatch_ID
merged_row[col] = dispatch_id_counter
dispatch_id_counter += 1
elif pd.api.types.is_numeric_dtype(group[col]):
# For other non-counter numeric columns, take the median value
merged_row[col] = group[col].median()
if pd.api.types.is_integer_dtype(group[col]):
merged_row[col] = merged_row[col].astype(int)
else:
# For other non-counter non-numeric columns,
# take the first occurrence (0th row)
# Only Kernel_Name should be non-numeric here
merged_row[col] = group.iloc[0][col]
# Build imputation mapping once for all counters in this subgroup
fill_values = {}
for counter in all_counters:
valid_mask = subgroup[counter].notna()
if valid_mask.any():
# Get the first valid value for this counter
fill_values[counter] = subgroup.loc[valid_mask, counter].iloc[0]
# Process counter columns (assumed to be all columns not in
# non_counter_column_index)
counter_columns = [
col for col in group.columns if col not in non_counter_column_index
]
for counter_col in counter_columns:
# For counter columns, calculate median only across non-NaN values
# Preserve original data type
valid_values = group[counter_col].dropna()
if not valid_values.empty:
median_value = valid_values.median()
# Preserve original data type - check if all
# non-null values are integers
if (valid_values == valid_values.astype(int)).all():
merged_row[counter_col] = int(median_value)
else:
merged_row[counter_col] = median_value
else:
merged_row[counter_col] = None
# Apply all fills at once using vectorized fillna
if fill_values:
subgroup = subgroup.fillna(fill_values)
# Append the merged row to the result list
result_data.append(merged_row)
subgroup_dfs.append(subgroup)
# Create a new DataFrame from the merged rows
result_dfs.append(pd.DataFrame(result_data))
# Concatenate all subgroups for this group
if subgroup_dfs:
# Add the imputed group dataframe
group_dfs.append(pd.concat(subgroup_dfs, ignore_index=True))
# Create a new dataframe by concatenating all groups
result_dfs.append(
pd.concat(group_dfs, ignore_index=True)
if group_dfs
else pd.DataFrame(df.columns)
)
final_df = pd.concat(result_dfs, keys=coll_levels, axis=1, copy=False)
return final_df
+1 -1
ファイルの表示
@@ -1677,7 +1677,7 @@ def test_iteration_multiplexing(binary_handler_analyze_rocprof_compute):
"--path",
workload_dir,
])
assert code == 1
assert code == 0
# Test without dispatch filtering
code = binary_handler_analyze_rocprof_compute([