diff --git a/projects/rocprofiler-compute/CHANGELOG.md b/projects/rocprofiler-compute/CHANGELOG.md index 9031b4bc63..d85873a698 100644 --- a/projects/rocprofiler-compute/CHANGELOG.md +++ b/projects/rocprofiler-compute/CHANGELOG.md @@ -5,6 +5,10 @@ Full documentation for ROCm Compute Profiler is available at [https://rocm.docs. ## Unreleased ### Added +* Add support for multi-kernel applications' pc sampling. + * PC Sampling's outputs' instructions are displayed with the name of the kernel that individual instruction belongs to. + * Single kernel selection is supported so that the pc samples of selected kernel can be displayed. + * Add `--list-blocks ` option to general options to list available IP blocks on specified arch (similar to `--list-metrics`), cannot be used with `--block`. * Added `config_delta/gfx950_diff.yaml` to analysis config yamls to track the revision between a gfx9 architecture against the latest supported architecture gfx950 diff --git a/projects/rocprofiler-compute/docs/how-to/pc_sampling.rst b/projects/rocprofiler-compute/docs/how-to/pc_sampling.rst index 64aff4ae2a..c2d7ff9cd1 100644 --- a/projects/rocprofiler-compute/docs/how-to/pc_sampling.rst +++ b/projects/rocprofiler-compute/docs/how-to/pc_sampling.rst @@ -71,4 +71,5 @@ Selecting single kernel sorting by PC count: .. note:: * PC sampling feature is currently in BETA version. To enable PC sampling, you have to explicitly enable it with block index 21. + * PC sampling now only shows assembly instructions collected in our record of pc samples and not all instructions of compiled code are represented. * To associate PC sampling info back to HIP source code, you need to build the profiling target app with ``-g`` to keep the symbols. Otherwise, PC sampling info will be only associated with assembly lines. diff --git a/projects/rocprofiler-compute/src/utils/parser.py b/projects/rocprofiler-compute/src/utils/parser.py index 6c9ef031e2..a5b27ffd6a 100755 --- a/projects/rocprofiler-compute/src/utils/parser.py +++ b/projects/rocprofiler-compute/src/utils/parser.py @@ -27,7 +27,6 @@ import argparse import ast import json import re -import sys import warnings from pathlib import Path from typing import Any, Optional, Union @@ -1233,20 +1232,35 @@ def search_pc_sampling_record( records: Union[list[dict], dict], ) -> Optional[list[tuple]]: """ - Search PC sampling records, and group and sort them - """ + Search PC sampling records. - # NB: - # The field stall_reason is vailid only for HW stochastic pc sampling. - # TODO: might save wavefront count for HW stochastic pc sampling? + Group by (code_object_id, code_object_offset, inst_index), and aggregate + counts, stall reasons, and dispatch IDs. + + Returns: + A sorted list of tuples: + ( + code_object_id, + code_object_offset, + inst_index, + total_count, + count_issued, + count_stalled, + sorted_stall_reasons, + sorted_dispatch_ids, + ) + """ if not records: console_warning("PC sampling: no pc sampling record found!") return None + # records should always be a list of dict + if isinstance(records, dict): + records = [records] + rocp_inst_not_issued_prefix_len = len(PC_SAMPLING_NOT_ISSUE_PREFIX) - grouped_data = {} stall_reason_keys = { "NONE": 0, # No instruction available in the instruction cache. @@ -1265,82 +1279,74 @@ def search_pc_sampling_record( "LAST": 0, } - # Populate grouped_data + grouped_data: dict[tuple, list] = {} + for item in records: - record = item["record"] + record = item.get("record", {}) pc_info = record.get("pc", {}) code_object_id = pc_info.get("code_object_id") code_object_offset = pc_info.get("code_object_offset") inst_index = item.get("inst_index") + dispatch_id = record.get("dispatch_id") if None in (code_object_id, code_object_offset, inst_index): continue - # Create composite key - key = (code_object_id, code_object_offset) + key = (code_object_id, code_object_offset, inst_index) snapshot = record.get("snapshot", {}) - issued = record.get("wave_issued") + issued = record.get("wave_issued", False) if key not in grouped_data: - grouped_data[key] = [0, 0, 0, inst_index, {}] + grouped_data[key] = [0, 0, 0, {}, set()] + + entry = grouped_data[key] # Update counts - entry = grouped_data[key] - entry[0] += 1 # count - entry[3] = inst_index # inst_index + entry[0] += 1 # total_count + if issued: + entry[1] += 1 # count_issued + else: + entry[2] += 1 # count_stalled + stall_reason = snapshot.get("stall_reason") + if stall_reason and len(stall_reason) > rocp_inst_not_issued_prefix_len: + reason_key = stall_reason[rocp_inst_not_issued_prefix_len:] + if reason_key in stall_reason_keys: + entry[3][reason_key] = entry[3].get(reason_key, 0) + 1 - # Process snapshot data - if snapshot: - if issued: - entry[1] += 1 # count_issued - else: - entry[2] += 1 # count_stalled - - # Process stall reason only when stalled - stall_reason = snapshot.get("stall_reason") - if stall_reason: - # Extract reason key with bounds checking - if len(stall_reason) > rocp_inst_not_issued_prefix_len: - reason_key = stall_reason[rocp_inst_not_issued_prefix_len:] - # Only track known stall reasons - if reason_key in stall_reason_keys: - stall_reasons = entry[4] - stall_reasons[reason_key] = ( - stall_reasons.get(reason_key, 0) + 1 - ) + # Add dispatch_id if valid + if dispatch_id is not None: + entry[4].add(dispatch_id) if not grouped_data: console_warning("PC sampling: no pc sampling record found!") return None - # Convert to sorted list of tuples: - # (code_object_id, inst_index, code_object_offset, count) + # Prepare sorted output list sorted_counts = sorted( [ ( code_object_id, - info["inst_index"], - offset, - info["count"], - info["count_issued"], - info["count_stalled"], - # For info["stall_reason"], remove the zero entries, - # sorting the remaining items by their values in descending order + code_object_offset, + inst_index, + info[0], # total_count + info[1], # count_issued + info[2], # count_stalled sorted( - ((k, v) for k, v in info["stall_reason"].items() if v > 0), + ((k, v) for k, v in info[3].items() if v > 0), key=lambda item: item[1], reverse=True, - ), + ), # sorted stall reasons + sorted(info[4]), # sorted dispatch_ids list ) - for code_object_id, offsets in grouped_data.items() - for offset, info in offsets.items() + for ( + code_object_id, + code_object_offset, + inst_index, + ), info in grouped_data.items() ], - key=lambda x: ( - x[0], - x[2], - ), # Sort by code_object_id, then by code_object_offset + key=lambda x: (x[0], x[1], x[2]), ) return sorted_counts @@ -1348,7 +1354,11 @@ def search_pc_sampling_record( @demarcate def load_pc_sampling_data_per_kernel( - method: str, file_name: Path, kernel_name: str, sorting_type: str + method: str, + file_name: Path, + csv_file_name: Path, + kernel_name: str, + sorting_type: str, ) -> pd.DataFrame: """ Load PC sampling raw data from json file with given method and kernel name, @@ -1367,48 +1377,21 @@ def load_pc_sampling_data_per_kernel( :return: The counted and reordering pc sampling info. :rtype: pd.DataFrame: """ - kernel_info_list = search_key_in_json(file_name, "kernel_symbols") - - kernel_info = {} - if kernel_info_list: - for item in kernel_info_list: - if ( - item["formatted_kernel_name"] == kernel_name - or item["demangled_kernel_name"] == kernel_name - or item["truncated_kernel_name"] == kernel_name - ): - # kernel_info["kernel_id"] = item["kernel_id"] - kernel_info["code_object_id"] = item["code_object_id"] - kernel_info["entry_byte_offset"] = item["kernel_code_entry_byte_offset"] - break - - if not kernel_info: - console_warning(f"PC sampling: can not find the kernel {kernel_name}") - return pd.DataFrame() - else: - console_debug(f"PC sampling: kernel {kernel_info}") - - filtered_sorted_list = sorted( - [ - item - for item in kernel_info_list - if item["code_object_id"] == kernel_info["code_object_id"] - ], - key=lambda x: x["kernel_code_entry_byte_offset"], + # Load kernel trace CSV with kernel info + kernel_trace_df = pd.read_csv( + csv_file_name, usecols=["Dispatch_Id", "Kernel_Id", "Kernel_Name"] + ) + console_debug( + f"PC sampling: loaded kernel trace with {len(kernel_trace_df)} entries" ) - for i, item in enumerate(filtered_sorted_list): - if item["kernel_code_entry_byte_offset"] == kernel_info["entry_byte_offset"]: - next_index = i + 1 - if next_index < len(filtered_sorted_list): # Ensure the next item exists - next_item = filtered_sorted_list[next_index] - kernel_info["potential_end_offset"] = next_item[ - "kernel_code_entry_byte_offset" - ] - else: - kernel_info["potential_end_offset"] = sys.maxsize - break + # Filter kernels matching requested kernel_name + matching_kernels = kernel_trace_df[kernel_trace_df["Kernel_Name"] == kernel_name] + if matching_kernels.empty: + console_warning(f"PC sampling: cannot find kernel '{kernel_name}' in CSV") + return pd.DataFrame() + # Extract raw PC sampling records from JSON pc_sample_key_loc = ( search_key_in_json(file_name, "pc_sample_host_trap") if method == "host_trap" @@ -1419,90 +1402,145 @@ def load_pc_sampling_data_per_kernel( console_warning("PC sampling: can not find pc sample.") return pd.DataFrame() - df = pd.DataFrame( - search_pc_sampling_record(pc_sample_key_loc), - columns=[ - "code_object_id", - "inst_index", - "offset", - "count", - "count_issued", - "count_stalled", - "stall_reason", - ], + # Get processed sampling data grouped by (code_object_id, offset, inst_index) + records = search_pc_sampling_record(pc_sample_key_loc) + if not records: + console_warning("PC sampling: no records found in PC sampling data.") + return pd.DataFrame() + + # Flatten records by dispatch_id to create one row per dispatch ID + rows = [] + for ( + code_object_id, + offset, + inst_index, + count, + count_issued, + count_stalled, + stall_reasons, + dispatch_ids, + ) in records: + for dispatch_id in dispatch_ids: + rows.append({ + "dispatch_id": dispatch_id, + "code_object_id": code_object_id, + "offset": offset, + "inst_index": inst_index, + "count": count, + "count_issued": count_issued, + "count_stalled": count_stalled, + "stall_reason": stall_reasons, + }) + + df = pd.DataFrame(rows) + if df.empty: + console_warning("PC sampling: no records found after flattening dispatch IDs.") + return df + + # Map dispatch_id to kernel info (Kernel_Id and Kernel_Name) + dispatch_to_kernel = kernel_trace_df.set_index("Dispatch_Id")[ + ["Kernel_Id", "Kernel_Name"] + ] + + # Map dispatch_id to kernel info (Kernel_Id and Kernel_Name) + df["kernel_id"] = df["dispatch_id"].map(dispatch_to_kernel["Kernel_Id"]) + df["kernel_name"] = df["dispatch_id"].map(dispatch_to_kernel["Kernel_Name"]) + + # Drop dispatch_id + df.drop(columns=["dispatch_id"], inplace=True) + + def merge_stall_reasons( + stall_reason_series: list[Optional[list[tuple[str, int]]]], + ) -> list[tuple[str, int]]: + """ + Function to merge stall_reason lists (list of dicts -> merged & sorted dict) + """ + merged_counts = {} + + for entry in stall_reason_series: + if not entry: + continue + # Each entry is a list of (key, count) tuples + for k, v in entry: + if v > 0: + merged_counts[k] = merged_counts.get(k, 0) + v + + # Return sorted list of tuples by descending count + return sorted(merged_counts.items(), key=lambda item: item[1], reverse=True) + + # Group and aggregate + df = df.groupby(["code_object_id", "offset", "kernel_id"], as_index=False).agg({ + "inst_index": "first", + "count": "sum", + "count_issued": "sum", + "count_stalled": "sum", + "stall_reason": merge_stall_reasons, + "kernel_name": "first", + }) + + # Filter DataFrame to only include rows matching the requested kernel_name + df = df[df["kernel_name"] == kernel_name] + + # Convert offset column to hex string for display, keep original numeric for sorting + df["offset"] = df["offset"].apply(lambda x: hex(x)) + + # Load PC sampling instructions from JSON (if available) + pc_sample_instructions = search_key_in_json(file_name, "pc_sample_instructions") + df["instruction"] = ( + df["inst_index"].apply( + lambda x: pc_sample_instructions[x] + if x < len(pc_sample_instructions) + else None + ) + if pc_sample_instructions + else None ) - df = df[ - (df["code_object_id"] == kernel_info["code_object_id"]) - & (df["offset"] > kernel_info["entry_byte_offset"]) - & (df["offset"] < kernel_info["potential_end_offset"]) - ][ + # Load source code comments (if available) + pc_sample_comments = search_key_in_json(file_name, "pc_sample_comments") + df["source_line"] = ( + df["inst_index"].apply( + lambda x: f".../{Path(pc_sample_comments[x]).name}" + if x < len(pc_sample_comments) + else None + ) + if pc_sample_comments + else None + ) + + # Sorting and returning relevant columns depending on method and sorting_type + if sorting_type == "offset": + df_sorted = df.sort_values(by=["code_object_id", "offset"]) + elif sorting_type == "count": + df_sorted = df.sort_values(by=["count"], ascending=False) + else: + console_error( + 'Error: pc sampling sorting_type must be either "offset" or "count".' + ) + return pd.DataFrame() + + columns_to_return = ( [ - "inst_index", + "source_line", + "instruction", + "code_object_id", + "offset", + "count", + ] + if method == "host_trap" + else [ + "source_line", + "instruction", + "code_object_id", "offset", "count", "count_issued", "count_stalled", "stall_reason", ] - ] + ) - df["offset"] = df["offset"].apply(lambda x: hex(x)) - - # Add instruction and source line information - pc_sample_instructions = search_key_in_json(file_name, "pc_sample_instructions") - if pc_sample_instructions: - df["instruction"] = df["inst_index"].apply( - lambda x: ( - pc_sample_instructions[x] if x < len(pc_sample_instructions) else None - ) - ) - - pc_sample_comments = search_key_in_json(file_name, "pc_sample_comments") - if pc_sample_comments: - df["source_line"] = df["inst_index"].apply( - lambda x: ( - f".../{Path(pc_sample_comments[x]).name}" - if x < len(pc_sample_comments) - else None - ) - ) - - # Return sorted data based on sorting type - if sorting_type == "offset": - return ( - df[["source_line", "instruction", "offset", "count"]] - if method == "host_trap" - else df[ - [ - "source_line", - "instruction", - "offset", - "count", - "count_issued", - "count_stalled", - "stall_reason", - ] - ] - ) - else: # sort by "count" - return ( - df[["source_line", "instruction", "offset", "count"]].sort_values( - by="count", ascending=False - ) - if method == "host_trap" - else df[ - [ - "source_line", - "instruction", - "offset", - "count", - "count_issued", - "count_stalled", - "stall_reason", - ] - ].sort_values(by="count", ascending=False) - ) + return df_sorted[columns_to_return] # might support sort by stall reason in the future @@ -1526,6 +1564,14 @@ def load_pc_sampling_data( # - Alternatively, we could check pc_sampling_method in json stochastic_path = Path(dir_path) / f"{file_prefix}_pc_sampling_stochastic.csv" host_trap_path = Path(dir_path) / f"{file_prefix}_pc_sampling_host_trap.csv" + json_file_path = Path(dir_path) / f"{file_prefix}_results.json" + csv_kernel_trace_file_path = Path(dir_path) / f"{file_prefix}_kernel_trace.csv" + + if not csv_kernel_trace_file_path.exists(): + console_error( + f"PC sampling: can not read {csv_kernel_trace_file_path}", exit=False + ) + return pd.DataFrame() if stochastic_path.exists(): pc_sampling_method = "stochastic" @@ -1541,24 +1587,44 @@ def load_pc_sampling_data( # No kernel filter, return grouped and sorted csv dir_pathectly if not workload.filter_kernel_ids: + # Load instruction CSV df = pd.read_csv(csv_file_path) - # Group by 'Instruction_Comment' and count occurrences + + # Load kernel trace CSV + kernel_trace_df = pd.read_csv(csv_kernel_trace_file_path) + + # Merge on Correlation_Id (instruction CSV) and Dispatch_Id (kernel trace CSV) + merged_df = df.merge( + kernel_trace_df[["Dispatch_Id", "Kernel_Name", "Kernel_Id"]], + how="left", + left_on="Correlation_Id", + right_on="Dispatch_Id", + ) + + # Group by Instruction_Comment and aggregate grouped_counts = ( - df.groupby("Instruction_Comment") + merged_df.groupby("Instruction_Comment") .agg( count=("Instruction_Comment", "count"), instruction=("Instruction", "first"), + Kernel_Id=("Kernel_Id", "first"), + Kernel_Name=("Kernel_Name", "first"), ) .reset_index() .rename(columns={"Instruction_Comment": "source_line"}) ) - - grouped_counts = grouped_counts[["source_line", "instruction", "count"]] + grouped_counts = grouped_counts[ + [ + "source_line", + "Kernel_Name", + "instruction", + "count", + ] + ] grouped_counts["source_line"] = grouped_counts["source_line"].apply( - lambda x: f".../{Path(x).name}" + lambda x: f".../{Path(x).name}" if isinstance(x, str) and x else x ) - # Sort by the count of occurrences return grouped_counts.sort_values(by="count", ascending=False) elif len(workload.filter_kernel_ids) > 1: @@ -1570,8 +1636,6 @@ def load_pc_sampling_data( return pd.DataFrame() elif len(workload.filter_kernel_ids) == 1: - # NB: the default file name is subject to changes from rocprofv3/rocprofiler_sdk - json_file_path = Path(dir_path) / f"{file_prefix}_results.json" if not json_file_path.exists(): console_error(f"PC sampling: can not read {json_file_path}", exit=False) return pd.DataFrame() @@ -1580,11 +1644,25 @@ def load_pc_sampling_data( # We should find better way to remove the dependency on kernel_top_table kernel_top_df = workload.dfs[PMC_KERNEL_TOP_TABLE_ID] file = Path(dir_path) / str(kernel_top_df.loc[0, "from_csv"]) - kernel_name = pd.read_csv(file).loc[ - workload.filter_kernel_ids[0], "Kernel_Name" - ] + kernel_index = workload.filter_kernel_ids[0] + + kernel_df = pd.read_csv(file) + + if kernel_index >= len(kernel_df): + console_warning( + f"Kernel index {kernel_index} is out of bounds. " + f"kernel_top CSV has only {len(kernel_df)} rows." + ) + return pd.DataFrame() + + kernel_name = kernel_df.iloc[kernel_index]["Kernel_Name"] + return load_pc_sampling_data_per_kernel( - pc_sampling_method, json_file_path, kernel_name, sorting_type + pc_sampling_method, + json_file_path, + csv_kernel_trace_file_path, + kernel_name, + sorting_type, ) else: console_warning("PC sampling: No data") diff --git a/projects/rocprofiler-compute/src/utils/utils.py b/projects/rocprofiler-compute/src/utils/utils.py index 750c5f2353..deb562707d 100644 --- a/projects/rocprofiler-compute/src/utils/utils.py +++ b/projects/rocprofiler-compute/src/utils/utils.py @@ -977,6 +977,7 @@ def pc_sampling_prof( "ROCPROF_PC_SAMPLING_UNIT": unit, "ROCPROF_PC_SAMPLING_INTERVAL": str(interval), "ROCPROF_PC_SAMPLING_METHOD": method, + "ROCPROF_KERNEL_TRACE": "1", } new_env = os.environ.copy() for key, value in options.items(): @@ -988,6 +989,7 @@ def pc_sampling_prof( ) else: options = [ + "--kernel-trace", "--pc-sampling-beta-enabled", "--pc-sampling-method", method,