Improve kokkos trace handling. (#573)

* Improve kokkos trace is handling.

* Fix formatting.

* Syntax fix and adding comments.

[ROCm/rocprofiler-compute commit: 10e1d57bfb]
This commit is contained in:
xuchen-amd
2025-02-22 14:07:14 -05:00
committato da GitHub
parent 8c43eec752
commit 69f0312b7b
2 ha cambiato i file con 129 aggiunte e 80 eliminazioni
@@ -77,6 +77,26 @@ class RocProfCompute_Base:
out = self.__args.path + "/pmc_perf.csv"
files = glob.glob(self.__args.path + "/" + "pmc_perf_*.csv")
files.extend(glob.glob(self.__args.path + "/" + "SQ_*.csv"))
if self.get_args().hip_trace:
# remove hip api trace ouputs from this list
files = [
f
for f in files
if not re.compile(r"^.*_hip_api_trace\.csv$").match(
os.path.basename(f)
)
]
if self.get_args().kokkos_trace:
# remove marker api trace ouputs from this list
files = [
f
for f in files
if not re.compile(r"^.*_marker_api_trace\.csv$").match(
os.path.basename(f)
)
]
elif type(self.__args.path) == list:
files = self.__args.path
else:
@@ -624,94 +624,36 @@ def run_prof(
console_error(output, exit=False)
console_error("Profiling execution failed.")
results_files = []
if rocprof_cmd.endswith("v2"):
# rocprofv2 has separate csv files for each process
results_files = glob.glob(workload_dir + "/out/pmc_1/results_*.csv")
# Combine results into single CSV file
combined_results = pd.concat(
[pd.read_csv(f) for f in results_files], ignore_index=True
elif rocprof_cmd.endswith("v3"):
# rocprofv3 requires additional processing for each process
results_files = process_rocprofv3_output(
format_rocprof_output, workload_dir, is_timestamps
)
# Overwrite column to ensure unique IDs.
combined_results["Dispatch_ID"] = range(0, len(combined_results))
combined_results.to_csv(
workload_dir + "/out/pmc_1/results_" + fbase + ".csv", index=False
)
if rocprof_cmd.endswith("v3"):
results_files_csv = {}
if format_rocprof_output == "json":
results_files_json = glob.glob(workload_dir + "/out/pmc_1/*/*.json")
for json_file in results_files_json:
csv_file = pathlib.Path(json_file).with_suffix(".csv")
v3_json_to_csv(json_file, csv_file)
results_files_csv = glob.glob(workload_dir + "/out/pmc_1/*/*.csv")
elif format_rocprof_output == "csv":
counter_info_csvs = glob.glob(
workload_dir + "/out/pmc_1/*/*_counter_collection.csv"
# kokkos trace output processing for --kokkos-trace
# TODO: as rocprofv3 --kokkos-trace feature improves, rocprof-compute should make updates accordingly
if "--kokkos-trace" in options:
console_debug(
"[run_prof] --kokkos-trace detected, handling *_marker_api_trace.csv outputs."
)
existing_counter_files_csv = [
d for d in counter_info_csvs if path(d).is_file()
]
process_kokkos_trace_output(workload_dir, fbase)
# TODO: add hip trace output processing
if len(existing_counter_files_csv) > 0:
for counter_file in existing_counter_files_csv:
current_dir = str(path(counter_file).parent)
agent_info_filepath = str(
path(current_dir).joinpath(
path(counter_file).name.replace(
"_counter_collection", "_agent_info"
)
)
)
if not path(agent_info_filepath).is_file():
raise ValueError(
'{} has no coresponding "agent info" file'.format(
counter_file
)
)
# Combine results into single CSV file
combined_results = pd.concat(
[pd.read_csv(f) for f in results_files], ignore_index=True
)
converted_csv_file = str(
path(current_dir).joinpath(
path(counter_file).name.replace(
"_counter_collection", "_converted"
)
)
)
# Overwrite column to ensure unique IDs.
combined_results["Dispatch_ID"] = range(0, len(combined_results))
v3_counter_csv_to_v2_csv(
counter_file, agent_info_filepath, converted_csv_file
)
results_files_csv = glob.glob(
workload_dir + "/out/pmc_1/*/*_converted.csv"
)
elif is_timestamps:
# when the input is timestamps, we know counter csv file is not generated and will instead parse kernel trace file
results_files_csv = glob.glob(
workload_dir + "/out/pmc_1/*/*_kernel_trace.csv"
)
else:
# when the input is not for timestamps, and counter csv file is not generated, we assume failed rocprof run and will completely bypass the file generation and merging for current pmc
return
else:
console_error("The output file of rocprofv3 can only support json or csv!!!")
# Combine results into single CSV file
combined_results = pd.concat(
[pd.read_csv(f) for f in results_files_csv], ignore_index=True
)
# Overwrite column to ensure unique IDs.
combined_results["Dispatch_ID"] = range(0, len(combined_results))
combined_results.to_csv(
workload_dir + "/out/pmc_1/results_" + fbase + ".csv", index=False
)
combined_results.to_csv(
workload_dir + "/out/pmc_1/results_" + fbase + ".csv", index=False
)
if new_env:
# flatten tcc for applicable mi300 input
@@ -757,6 +699,93 @@ def run_prof(
df.to_csv(workload_dir + "/" + fbase + ".csv", index=False)
def process_rocprofv3_output(rocprof_output, workload_dir, is_timestamps):
"""
rocprofv3 specific output processing.
takes care of json or csv formats, for csv format, additional processing is performed.
"""
results_files_csv = {}
if rocprof_output == "json":
results_files_json = glob.glob(workload_dir + "/out/pmc_1/*/*.json")
for json_file in results_files_json:
csv_file = pathlib.Path(json_file).with_suffix(".csv")
v3_json_to_csv(json_file, csv_file)
results_files_csv = glob.glob(workload_dir + "/out/pmc_1/*/*.csv")
elif rocprof_output == "csv":
counter_info_csvs = glob.glob(
workload_dir + "/out/pmc_1/*/*_counter_collection.csv"
)
existing_counter_files_csv = [d for d in counter_info_csvs if path(d).is_file()]
if len(existing_counter_files_csv) > 0:
for counter_file in existing_counter_files_csv:
current_dir = str(path(counter_file).parent)
agent_info_filepath = str(
path(current_dir).joinpath(
path(counter_file).name.replace(
"_counter_collection", "_agent_info"
)
)
)
if not path(agent_info_filepath).is_file():
raise ValueError(
'{} has no coresponding "agent info" file'.format(counter_file)
)
converted_csv_file = str(
path(current_dir).joinpath(
path(counter_file).name.replace(
"_counter_collection", "_converted"
)
)
)
v3_counter_csv_to_v2_csv(
counter_file, agent_info_filepath, converted_csv_file
)
results_files_csv = glob.glob(workload_dir + "/out/pmc_1/*/*_converted.csv")
elif is_timestamps:
# when the input is timestamps, we know counter csv file is not generated and will instead parse kernel trace file
results_files_csv = glob.glob(
workload_dir + "/out/pmc_1/*/*_kernel_trace.csv"
)
else:
# when the input is not for timestamps, and counter csv file is not generated, we assume failed rocprof run and will completely bypass the file generation and merging for current pmc
console_error("No counter csv files generated, rocprofv3 run failed!!!")
else:
console_error("The output file of rocprofv3 can only support json or csv!!!")
return results_files_csv
def process_kokkos_trace_output(workload_dir, fbase):
# marker api trace csv files are generated for each process
marker_api_trace_csvs = glob.glob(
workload_dir + "/out/pmc_1/*/*_marker_api_trace.csv"
)
existing_marker_files_csv = [d for d in marker_api_trace_csvs if path(d).is_file()]
# concate and output marker api trace info
combined_results = pd.concat(
[pd.read_csv(f) for f in existing_marker_files_csv], ignore_index=True
)
combined_results.to_csv(
workload_dir + "/out/pmc_1/results_" + fbase + "_marker_api_trace.csv",
index=False,
)
if path(workload_dir + "/out").exists():
shutil.copyfile(
workload_dir + "/out/pmc_1/results_" + fbase + "_marker_api_trace.csv",
workload_dir + "/" + fbase + "_marker_api_trace.csv",
)
def replace_timestamps(workload_dir):
df_stamps = pd.read_csv(workload_dir + "/timestamps.csv")
if "Start_Timestamp" in df_stamps.columns and "End_Timestamp" in df_stamps.columns: