From 769d3dd67a35fb706dc1b8ebcc20612cbe4ad9cd Mon Sep 17 00:00:00 2001 From: vedithal-amd Date: Thu, 8 Jan 2026 12:01:51 -0500 Subject: [PATCH] [rocprofiler-compute] Data imputation strategy for iteration multiplexing (#2468) * Data imputation strategy for iteration multiplexing * Implement data imputation methodology to handle missing counter values in case of iteration multiplexing * Enable dispatch filtering with iteration multiplexing since we are no longer merging dispatches * Bugfix to prevent check for missing counter values when using csv format when profiling with iteration multiplexing * Move warning and info message in case of iteration multiplexing to sanitize function which comes earlier in analyze mode * Address review comments * Fix typo in documentation * Move profiling config init. after path check in sanitize() * Graceful handling of dispatches with all counters empty within data imputation logic * Improve info message for iteration multiplexing based analysis * Ensure proper error message when trying to run iteration multiplexing with attach/detach * fix test case --- .../docs/how-to/analyze/cli.rst | 5 - .../docs/how-to/live_attach_detach.rst | 3 +- .../docs/how-to/profile/mode.rst | 8 +- projects/rocprofiler-compute/src/argparser.py | 2 - .../rocprof_compute_analyze/analysis_base.py | 85 ++++++------ .../rocprof_compute_analyze/analysis_cli.py | 2 +- .../rocprof_compute_analyze/analysis_db.py | 2 +- .../rocprof_compute_analyze/analysis_webui.py | 4 +- .../rocprof_compute_profile/profiler_base.py | 6 + .../rocprofiler-compute/src/utils/utils.py | 126 +++++++++--------- .../tests/test_analyze_commands.py | 2 +- 11 files changed, 119 insertions(+), 126 deletions(-) diff --git a/projects/rocprofiler-compute/docs/how-to/analyze/cli.rst b/projects/rocprofiler-compute/docs/how-to/analyze/cli.rst index 22a57c5408..984d226aaa 100644 --- a/projects/rocprofiler-compute/docs/how-to/analyze/cli.rst +++ b/projects/rocprofiler-compute/docs/how-to/analyze/cli.rst @@ -346,11 +346,6 @@ Show System Speed-of-Light and CS_Busy blocks only this case, ``1`` is the ID for System Speed-of-Light and ``5.1.0`` the ID for GPU Busy Cycles metric. -.. note:: - - Dispatch filtering via ``-d`` or ``--dispatch`` is not supported for profiling - data collected with ``--iteration-multiplexing`` option. - Filter kernels First, list the top kernels in your application using `--list-stats`. diff --git a/projects/rocprofiler-compute/docs/how-to/live_attach_detach.rst b/projects/rocprofiler-compute/docs/how-to/live_attach_detach.rst index 9ffae34b93..ec61972a0b 100644 --- a/projects/rocprofiler-compute/docs/how-to/live_attach_detach.rst +++ b/projects/rocprofiler-compute/docs/how-to/live_attach_detach.rst @@ -42,5 +42,6 @@ The analyze options for attach/detach are completely compatible with the non-att .. note:: * Live Attach Detach feature is currently in BETA version. To enable Live/Attach Detach, you need to have the correct supported proper version of ROCprofiler-SDK and rocprofiler-register. - * To make the Live Attach/Detach feature work, you must use "--block" or a single path to limit the number of counter input files to one. This limitation will be removed in a later version with implementations such as Iteration Multiplexing. + * Live Attach/Detach does not work with --iteration-multiplexing option. This is because --iteration-multiplexing uses native counter collection tool which currently does not support attach/detach feature. + * To make the Live Attach/Detach feature work, you must restrict the number of counter input files (which determine number of application runs) to one. This can be achieved with options such as: "--block", "--set". * Due to the limitation of ROCprofiler-SDK, the attach can now only happen before Heterogeneous System Architecture (HSA) initialization. HSA initialization happens before the execution of the first HIP kernel call. It only happens once to save all the kernels' function signature, such as the function name and other launch parameters. Attaching after this stage misses all crucial information of the HIP kernel and makes it impossible to store the output. This limitation will be solved in later releases of ROCprofiler-SDK. diff --git a/projects/rocprofiler-compute/docs/how-to/profile/mode.rst b/projects/rocprofiler-compute/docs/how-to/profile/mode.rst index 95cf6a1b6d..bb20084291 100644 --- a/projects/rocprofiler-compute/docs/how-to/profile/mode.rst +++ b/projects/rocprofiler-compute/docs/how-to/profile/mode.rst @@ -703,6 +703,10 @@ By default, if no policy is specified, ROCm Compute Profiler uses the ``kernel_l Iteration multiplexing is only supported when using ROCm Compute Profiler with the native counter collection tool. Ensure that ``--no-native-tool`` is not used in your profiling command. + * Do not use ``--attach-pid`` with ``--iteration-multiplexing``. + Iteration multiplexing is only supported when using ROCm Compute Profiler with + the native counter collection tool. Ensure that ``--attach-pid`` is not used in your profiling command. + * Ensure that your workload runs for enough iterations to cover all counter subsets. When using iteration multiplexing, the total number of iterations, for each kernel (for ``kernel`` policy) or for each unique kernel and launch parameters combination (for ``kernel_launch_params`` policy), @@ -786,7 +790,3 @@ Iteration multiplexing feature comes with some caveats to be considered when pro * **Non-deterministic workloads** Workloads which dispatch kernels with non-deterministic names and launch parameters may trigger warnings for insufficient dispatch counts because iteration multiplexing identifies unique kernels by their names and optionally by their launch parameters; this is especially true of large AI workloads that dispatch kernels non-deterministically based on the model layers being used for the current input, and in such cases kernel filtering of common kernels is recommended. - -* **Cannot use with dispatch filtering** - - It is not possible to use dispatch filtering mentioned in :ref:`Filtering ` with iteration multiplexing, because iteration multiplexing merges counters across dispatches, making it impossible to isolate specific dispatches for profiling and analysis, so attempting to combine them will result in an error. \ No newline at end of file diff --git a/projects/rocprofiler-compute/src/argparser.py b/projects/rocprofiler-compute/src/argparser.py index 3b89f87dbf..9d118986a0 100644 --- a/projects/rocprofiler-compute/src/argparser.py +++ b/projects/rocprofiler-compute/src/argparser.py @@ -271,7 +271,6 @@ Examples: required=False, nargs="?", choices=[ - # "simple", "kernel", "kernel_launch_params", ], @@ -279,7 +278,6 @@ Examples: help=( "\t\t\tChoose the iteration multiplexing policy: " "(DEFAULT: kernel_launch_params).\n" - # "\t\t\t simple (i.e. Round robin over all kernel dispatches\n" "\t\t\t kernel (i.e. Round robin counters over kernel calls with " "unique kernel names.)\n" "\t\t\t kernel_launch_params (i.e. Round robin counters over " diff --git a/projects/rocprofiler-compute/src/rocprof_compute_analyze/analysis_base.py b/projects/rocprofiler-compute/src/rocprof_compute_analyze/analysis_base.py index 3256b17226..7cfe42403d 100644 --- a/projects/rocprofiler-compute/src/rocprof_compute_analyze/analysis_base.py +++ b/projects/rocprofiler-compute/src/rocprof_compute_analyze/analysis_base.py @@ -49,8 +49,8 @@ from utils.roofline_calc import validate_roofline_csv from utils.utils import ( get_panel_alias, get_uuid, + impute_counters_iteration_multiplex, is_workload_empty, - merge_counters_iteration_multiplex, merge_counters_spatial_multiplex, ) @@ -86,7 +86,6 @@ class OmniAnalyze_Base: self.__args = args self._runs: OrderedDict[str, schema.Workload] = OrderedDict() self._arch_configs: dict[str, schema.ArchConfig] = {} - self._profiling_config: dict[str, Any] = {} self.__supported_archs = supported_archs self._output: Optional[TextIO] = None self.__socs: Optional[dict[str, OmniSoC_Base]] = None @@ -94,6 +93,9 @@ class OmniAnalyze_Base: def get_args(self) -> argparse.Namespace: return self.__args + def get_profiling_config(self) -> dict[str, Any]: + return self._profiling_config + def set_soc(self, omni_socs: dict[str, OmniSoC_Base]) -> None: self.__socs = omni_socs @@ -105,10 +107,10 @@ class OmniAnalyze_Base: return merge_counters_spatial_multiplex(df) @demarcate - def iteration_multiplex_merge_counters( + def iteration_multiplex_impute_counters( self, df: pd.DataFrame, policy: str ) -> pd.DataFrame: - return merge_counters_iteration_multiplex(df, policy) + return impute_counters_iteration_multiplex(df, policy) @demarcate def generate_configs( @@ -214,6 +216,7 @@ class OmniAnalyze_Base: @demarcate def load_options(self, normalization_filter: Optional[str]) -> None: args = self.get_args() + profiling_config = self.get_profiling_config() target_filter = normalization_filter or args.normal_unit for arch_config in self._arch_configs.values(): @@ -221,7 +224,7 @@ class OmniAnalyze_Base: arch_config.dfs, arch_config.dfs_type, target_filter, - self._profiling_config, + profiling_config, ) # Error checking for multiple runs and multiple kernel filters if args.gpu_kernel and (len(args.path) != len(args.gpu_kernel)): @@ -320,6 +323,7 @@ class OmniAnalyze_Base: def sanitize(self) -> None: """Perform sanitization of inputs""" args = self.get_args() + if args.tui: return @@ -348,10 +352,17 @@ class OmniAnalyze_Base: console_error("analysis", "You cannot provide the same path twice.") seen_paths.add(dir_info[0]) + self._profiling_config: dict[str, Any] = file_io.load_profiling_config( + args.path[0][0] + ) + profiling_config = self.get_profiling_config() + + for dir_info in args.path: if not any([ args.nodes, args.list_nodes, args.spatial_multiplexing, + profiling_config.get("iteration_multiplexing"), ]): is_workload_empty(dir_info[0]) @@ -396,6 +407,30 @@ class OmniAnalyze_Base: "Please choose a different name." ) + # Check if any kernel's counters are missing due to iteration multiplexing + if ( + profiling_config.get("iteration_multiplexing") is not None + and profiling_config.get("kernels_with_missing_counters") is not None + ): + missing_kernels = profiling_config.get("kernels_with_missing_counters") + console_warning( + "analysis", + ( + "The following kernels have missing counter data " + "due to iteration multiplexing and should be filtered out: " + f"{', '.join(missing_kernels)}" + ), + ) + + if profiling_config.get("iteration_multiplexing") is not None: + console_log( + "analysis", + ( + "Profiling data was collected using iteration multiplexing.\n\t" + "Metrics are calculated based on partially available counter data." + ), + ) + # ---------------------------------------------------- # Required methods to be implemented by child classes # ---------------------------------------------------- @@ -415,37 +450,6 @@ class OmniAnalyze_Base: elif args.output_format == "stdout": self._output = sys.stdout - # Read profiling config - self._profiling_config = file_io.load_profiling_config(args.path[0][0]) - - # Check dispatch filtering isn't used with iteration multiplexing - if ( - self._profiling_config.get("iteration_multiplexing") is not None - and args.gpu_dispatch_id - ): - console_error( - "analysis", - "Dispatch filtering (-d/--dispatch) cannot be used " - "with profiling data collected with iteration multiplexing.", - ) - - # Check if any kernel's counters are missing due to iteration multiplexing - if ( - self._profiling_config.get("iteration_multiplexing") is not None - and self._profiling_config.get("kernels_with_missing_counters") is not None - ): - missing_kernels = self._profiling_config.get( - "kernels_with_missing_counters" - ) - console_warning( - "analysis", - ( - "The following kernels have missing counter data " - "due to iteration multiplexing and should be filtered out: " - f"{', '.join(missing_kernels)}" - ), - ) - # initalize runs self._runs = self.initalize_runs() @@ -473,12 +477,3 @@ class OmniAnalyze_Base: def run_analysis(self) -> None: """Run analysis.""" console_debug("analysis", "generating analysis") - if self._profiling_config.get("iteration_multiplexing") is not None: - console_log( - "analysis", - ( - "Profiling data was collected using iteration multiplexing. " - "Some metrics may represent aggregated values " - "across multiple iterations." - ), - ) diff --git a/projects/rocprofiler-compute/src/rocprof_compute_analyze/analysis_cli.py b/projects/rocprofiler-compute/src/rocprof_compute_analyze/analysis_cli.py index 451f2bc72b..f2a170a635 100644 --- a/projects/rocprofiler-compute/src/rocprof_compute_analyze/analysis_cli.py +++ b/projects/rocprofiler-compute/src/rocprof_compute_analyze/analysis_cli.py @@ -61,7 +61,7 @@ class cli_analysis(OmniAnalyze_Base): ) if self._profiling_config.get("iteration_multiplexing") is not None: - workload.raw_pmc = self.iteration_multiplex_merge_counters( + workload.raw_pmc = self.iteration_multiplex_impute_counters( workload.raw_pmc, policy=self._profiling_config["iteration_multiplexing"], ) diff --git a/projects/rocprofiler-compute/src/rocprof_compute_analyze/analysis_db.py b/projects/rocprofiler-compute/src/rocprof_compute_analyze/analysis_db.py index 4baacd950c..05d6d0fdd0 100644 --- a/projects/rocprofiler-compute/src/rocprof_compute_analyze/analysis_db.py +++ b/projects/rocprofiler-compute/src/rocprof_compute_analyze/analysis_db.py @@ -237,7 +237,7 @@ class db_analysis(OmniAnalyze_Base): ) if self._profiling_config.get("iteration_multiplexing") is not None: - raw_pmc = self.iteration_multiplex_merge_counters( + raw_pmc = self.iteration_multiplex_impute_counters( raw_pmc, policy=self._profiling_config["iteration_multiplexing"], ) diff --git a/projects/rocprofiler-compute/src/rocprof_compute_analyze/analysis_webui.py b/projects/rocprofiler-compute/src/rocprof_compute_analyze/analysis_webui.py index 3b70528036..40df31abc4 100644 --- a/projects/rocprofiler-compute/src/rocprof_compute_analyze/analysis_webui.py +++ b/projects/rocprofiler-compute/src/rocprof_compute_analyze/analysis_webui.py @@ -355,7 +355,9 @@ class webui_analysis(OmniAnalyze_Base): ) if self._profiling_config.get("iteration_multiplexing") is not None: - self._runs[self.dest_dir].raw_pmc = self.iteration_multiplex_merge_counters( + self._runs[ + self.dest_dir + ].raw_pmc = self.iteration_multiplex_impute_counters( self._runs[self.dest_dir].raw_pmc, policy=self._profiling_config["iteration_multiplexing"], ) diff --git a/projects/rocprofiler-compute/src/rocprof_compute_profile/profiler_base.py b/projects/rocprofiler-compute/src/rocprof_compute_profile/profiler_base.py index ab1915d837..9bae23bf14 100644 --- a/projects/rocprofiler-compute/src/rocprof_compute_profile/profiler_base.py +++ b/projects/rocprofiler-compute/src/rocprof_compute_profile/profiler_base.py @@ -104,6 +104,12 @@ class RocProfCompute_Base: "Please remove one of these options." ) + if args.attach_pid and args.iteration_multiplexing is not None: + console_error( + "--attach-pid cannot be used with --iteration-multiplexing. " + "Please remove one of these options." + ) + # verify correct formatting for application binary args.remaining = args.remaining[1:] if args.remaining: diff --git a/projects/rocprofiler-compute/src/utils/utils.py b/projects/rocprofiler-compute/src/utils/utils.py index 54dc7bcfdd..a6c5e01484 100644 --- a/projects/rocprofiler-compute/src/utils/utils.py +++ b/projects/rocprofiler-compute/src/utils/utils.py @@ -1373,13 +1373,12 @@ def reverse_multi_index_df_pmc( return dfs, coll_levels -def merge_counters_iteration_multiplex( +def impute_counters_iteration_multiplex( df_multi_index: pd.DataFrame, policy: str, ) -> pd.DataFrame: """ - For iteration multiplexing, this merges counter values for the kernel collected - over multiple iterations. + Perform data imputation for missing counter values due to iteration multiplexing. """ non_counter_column_index = [ "Dispatch_ID", @@ -1396,25 +1395,17 @@ def merge_counters_iteration_multiplex( "End_Timestamp", "Kernel_ID", ] - result_dfs: list[pd.DataFrame] = [] - - # TODO: will need to optimize to avoid this conversion to single index format - # and do merge directly on multi-index dataframe dfs, coll_levels = reverse_multi_index_df_pmc(df_multi_index) for df in dfs: - kernel_name_column_name = "Kernel_Name" - if "Kernel_Name" not in df and "Name" in df: - kernel_name_column_name = "Name" - - # Find the values in Kernel_Name that occur more than once + # Group by unique kernel configurations unique_occurences = ( - df.groupby(kernel_name_column_name) + df.groupby("Kernel_Name") if policy == "kernel" else df.groupby( [ - kernel_name_column_name, + "Kernel_Name", "Grid_Size", "Workgroup_Size", "LDS_Per_Workgroup", @@ -1423,64 +1414,69 @@ def merge_counters_iteration_multiplex( ) ) - # Define a list to store the merged rows - result_data: list[dict[str, Any]] = [] + counter_columns = [ + col for col in df.columns if col not in non_counter_column_index + ] + # Collect imputed groups as dataframes + group_dfs = [] - pd.set_option("display.max_columns", None) + for _, group in unique_occurences: + # Identify counter buckets + counter_groups: set[frozenset[str]] = set() + for _, row in group.iterrows(): + # Set of counter column names with non empty values + cols_frozenset = frozenset( + row[counter_columns][row[counter_columns].notna()].index + ) + # If no counters found for this dispatch, continue + if not cols_frozenset: + continue + # Since counter buckets are repeated in round robin fashion, + # we can stop once we see a repeated bucket + if cols_frozenset in counter_groups: + break + counter_groups.add(cols_frozenset) - # Reset Dispatch_ID - dispatch_id_counter = 0 + # If no counters found for this group, continue + if not counter_groups: + continue - for name, group in unique_occurences: - # Create a dictionary to store the merged row for the current group - merged_row: dict[str, Any] = {} + # Iterate over subgroups of dispatches containing + # all counters and impute missing values + subgroup_size = len(counter_groups) + all_counters = { + counter for counter_group in counter_groups for counter in counter_group + } + # Collect imputed sub-groups as dataframes + subgroup_dfs = [] + for i in range(0, len(group), subgroup_size): + subgroup = group.iloc[i : i + subgroup_size] - # Process non-counter columns - for col in non_counter_column_index: - if col == "End_Timestamp": - # For End_Timestamp, calculate the median delta time - delta_time = group[col] - group["Start_Timestamp"] - merged_row[col] = group["Start_Timestamp"] + delta_time.median() - if col == "Dispatch_ID": - # Assign new Dispatch_ID - merged_row[col] = dispatch_id_counter - dispatch_id_counter += 1 - elif pd.api.types.is_numeric_dtype(group[col]): - # For other non-counter numeric columns, take the median value - merged_row[col] = group[col].median() - if pd.api.types.is_integer_dtype(group[col]): - merged_row[col] = merged_row[col].astype(int) - else: - # For other non-counter non-numeric columns, - # take the first occurrence (0th row) - # Only Kernel_Name should be non-numeric here - merged_row[col] = group.iloc[0][col] + # Build imputation mapping once for all counters in this subgroup + fill_values = {} + for counter in all_counters: + valid_mask = subgroup[counter].notna() + if valid_mask.any(): + # Get the first valid value for this counter + fill_values[counter] = subgroup.loc[valid_mask, counter].iloc[0] - # Process counter columns (assumed to be all columns not in - # non_counter_column_index) - counter_columns = [ - col for col in group.columns if col not in non_counter_column_index - ] - for counter_col in counter_columns: - # For counter columns, calculate median only across non-NaN values - # Preserve original data type - valid_values = group[counter_col].dropna() - if not valid_values.empty: - median_value = valid_values.median() - # Preserve original data type - check if all - # non-null values are integers - if (valid_values == valid_values.astype(int)).all(): - merged_row[counter_col] = int(median_value) - else: - merged_row[counter_col] = median_value - else: - merged_row[counter_col] = None + # Apply all fills at once using vectorized fillna + if fill_values: + subgroup = subgroup.fillna(fill_values) - # Append the merged row to the result list - result_data.append(merged_row) + subgroup_dfs.append(subgroup) - # Create a new DataFrame from the merged rows - result_dfs.append(pd.DataFrame(result_data)) + # Concatenate all subgroups for this group + if subgroup_dfs: + # Add the imputed group dataframe + group_dfs.append(pd.concat(subgroup_dfs, ignore_index=True)) + + # Create a new dataframe by concatenating all groups + result_dfs.append( + pd.concat(group_dfs, ignore_index=True) + if group_dfs + else pd.DataFrame(df.columns) + ) final_df = pd.concat(result_dfs, keys=coll_levels, axis=1, copy=False) return final_df diff --git a/projects/rocprofiler-compute/tests/test_analyze_commands.py b/projects/rocprofiler-compute/tests/test_analyze_commands.py index 775a3346d9..ba9bd7152f 100644 --- a/projects/rocprofiler-compute/tests/test_analyze_commands.py +++ b/projects/rocprofiler-compute/tests/test_analyze_commands.py @@ -1677,7 +1677,7 @@ def test_iteration_multiplexing(binary_handler_analyze_rocprof_compute): "--path", workload_dir, ]) - assert code == 1 + assert code == 0 # Test without dispatch filtering code = binary_handler_analyze_rocprof_compute([