Files
rocm-systems/source/python/gui/source/parser.py
T
David Galiffi d07bf508a9 Rename Omnitrace to ROCm Systems Profiler (#4)
The Omnitrace program is being renamed. 

Full name: "ROCm Systems Profiler"
Package name: "rocprofiler-systems"
Binary / Library names: "rocprof-sys-*"

---------
Co-authored-by: Xuan Chen <xuchen@amd.com>
Signed-off-by: David Galiffi <David.Galiffi@amd.com>
2024-10-15 11:20:40 -04:00

874 wiersze
30 KiB
Python

#!/usr/bin/env python3
# MIT License
#
# Copyright (c) 2023-2024 Advanced Micro Devices, Inc. All Rights Reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from __future__ import absolute_import
__author__ = "AMD Research"
__copyright__ = "Copyright 2023-2024, Advanced Micro Devices, Inc."
__license__ = "MIT"
__maintainer__ = "AMD Research"
__status__ = "Development"
import sys
import json
import jsondiff
import re
import pandas as pd
import numpy as np
import math
from collections import OrderedDict
import os
import glob
num_stddev = 1.0
def set_num_stddev(_value):
global num_stddev
num_stddev = _value
def mean(_data):
return sum(_data) / float(len(_data)) if len(_data) > 0 else 0.0
def stddev(_data):
if len(_data) == 0:
return 0.0
_mean = mean(_data)
_variance = sum([((x - _mean) ** 2) for x in _data]) / float(len(_data))
return float(num_stddev) * math.sqrt(_variance)
class validation(object):
def __init__(self, _exp_re, _pp_re, _virt, _expected, _tolerance):
self.experiment_filter = re.compile(_exp_re)
self.progress_pt_filter = re.compile(_pp_re)
self.virtual_speedup = int(_virt)
self.program_speedup = float(_expected)
self.tolerance = float(_tolerance)
def validate(
self,
_exp_name,
_pp_name,
_virt_speedup,
_prog_speedup,
_prog_speedup_stddev,
_base_speedup_stddev,
):
if (
not re.search(self.experiment_filter, _exp_name)
or not re.search(self.progress_pt_filter, _pp_name)
or _virt_speedup != self.virtual_speedup
):
return None
_tolerance = self.tolerance
if _base_speedup_stddev > 2.0 * self.tolerance:
sys.stderr.write(
f" [{_exp_name}][{_pp_name}][{_virt_speedup}] base speedup has stddev > 2 * tolerance (+/- {_base_speedup_stddev:.3f}). Relaxing validation...\n"
)
_tolerance += math.sqrt(_base_speedup_stddev)
elif _prog_speedup_stddev > 2.0 * self.tolerance:
sys.stderr.write(
f" [{_exp_name}][{_pp_name}][{_virt_speedup}] program speedup has stddev > 2 * tolerance (+/- {_prog_speedup_stddev:.3f}). Relaxing validation...\n"
)
_tolerance += math.sqrt(_prog_speedup_stddev)
return _prog_speedup >= (self.program_speedup - _tolerance) and _prog_speedup <= (
self.program_speedup + _tolerance
)
class throughput_point(object):
def __init__(self, _speedup):
self.speedup = _speedup
self.delta = []
self.duration = []
def __iadd__(self, _data):
self.delta += [float(_data[0])]
self.duration += [float(_data[1])]
def __len__(self):
return len(self.duration)
def __eq__(self, rhs):
return self.speedup == rhs.speedup
def __neq__(self, rhs):
return not self == rhs
def __lt__(self, rhs):
return self.speedup < rhs.speedup
def get_data(self):
return [x / y for x, y in zip(self.duration, self.delta)]
def mean(self):
return sum(self.duration) / sum(self.delta)
class latency_point(object):
def __init__(self, _speedup):
self.speedup = _speedup
self.arrivals = []
self.departures = []
self.duration = []
def __iadd__(self, _data):
self.arrivals += [float(_data[0])]
self.departures += [float(_data[1])]
self.duration += [float(_data[2])]
def __len__(self):
return len(self.duration)
def __eq__(self, rhs):
return self.speedup == rhs.speedup
def __neq__(self, rhs):
return not self == rhs
def __lt__(self, rhs):
return self.speedup < rhs.speedup
def get_data(self):
return [y / x for x, y in zip(self.arrivals, self.duration)]
def get_difference(self):
_duration = sum(self.duration)
return [x / _duration for x in self.duration]
def mean(self):
rate = sum(self.arrivals) / sum(self.duration)
return sum(self.get_difference()) / rate
class line_speedup(object):
def __init__(self, _name="", _prog="", _exp_data=None, _exp_base=None):
self.name = _name
self.prog = _prog
self.data = _exp_data
self.base = _exp_base
def virtual_speedup(self):
if self.data is None or self.base is None:
return 0.0
return self.data.speedup
def compute_speedup(self):
if self.data is None or self.base is None:
return 0.0
return ((self.base.mean() - self.data.mean()) / self.base.mean()) * 100
def compute_speedup_stddev(self):
if self.data is None or self.base is None:
return 0.0
_data = []
_base = self.base.mean()
for ditr in self.data.get_data():
_data += [((_base - ditr) / _base) * 100]
return stddev(_data)
def get_name(self):
return ":".join(
[
os.path.basename(x) if os.path.isfile(x) else x
for x in self.name.split(":")
]
)
def __str__(self):
if self.data is None or self.base is None:
return f"{self.name}"
_line_speedup = self.compute_speedup()
_line_stddev = self.compute_speedup_stddev() # 3 stddev == 99.87%
_name = self.get_name()
return f"[{_name}][{self.prog}][{self.data.speedup:3}] speedup: {_line_speedup:6.1f} +/- {_line_stddev:6.2f} %"
def __eq__(self, rhs):
return (
self.name == rhs.name
and self.prog == rhs.prog
and self.data == rhs.data
and self.base == rhs.base
)
def __neq__(self, rhs):
return not self == rhs
def __lt__(self, rhs):
if self.name != rhs.name:
return self.name < rhs.name
elif self.prog != rhs.prog:
return self.prog < rhs.prog
elif self.data != rhs.data:
return self.data < rhs.data
elif self.base != rhs.base:
return self.base < rhs.base
return False
class experiment_progress(object):
def __init__(self, _data):
self.data = _data
def get_impact(self):
speedup_c = [x.compute_speedup() for x in self.data]
speedup_v = [x.virtual_speedup() for x in self.data]
impact = []
for i in range(len(self.data) - 1):
x = speedup_v[i + 1] - speedup_v[i]
y = [speedup_c[i], speedup_c[i + 1]]
y_min = min(y)
y_max = max(y)
a_low = x * y_min
a_upp = 0.5 * x * (y_max - y_min)
impact += [a_low + a_upp]
# impact = [x.compute_speedup() for x in self.data]
return [sum(impact), mean(impact), stddev(impact)]
def __len__(self):
return len(self.data)
def __str__(self):
_impact_v = self.get_impact()
_name = self.data[0].get_name()
_prog = self.data[0].prog
_impact = [
f"[{_name}][{_prog}][sum] impact: {_impact_v[0]:6.1f}",
f"[{_name}][{_prog}][avg] impact: {_impact_v[1]:6.1f} +/- {_impact_v[2]:6.2f}",
]
return "\n".join([f"{x}" for x in self.data] + _impact)
def __lt__(self, rhs):
self.data.sort()
return self.get_impact()[0] < rhs.get_impact()[0]
def process_samples(data, _data):
if not _data:
return data
for record in _data["rocprofsys"]["causal"]["records"]:
for samp in record["samples"]:
_info = samp["info"]
_count = samp["count"]
_func = _info["dfunc"]
if _func not in data:
data[_func] = 0
data[_func] += _count
for dwarf_entry in _info["dwarf_info"]:
_name = "{}:{}".format(dwarf_entry["file"], dwarf_entry["line"])
if _name not in data:
data[_name] = 0
data[_name] += _count
return data
def process_data(data, _data, experiments, progress_points):
def find_or_insert(_data, _value, _type):
if _value not in _data:
if _type == "throughput":
_data[_value] = throughput_point(_value)
elif _type == "latency":
_data[_value] = latency_point(_value)
else:
raise RuntimeError(f"unknown data type: {_type}")
return _data[_value]
if not _data:
return data
_selection_filter = re.compile(experiments)
_progresspt_filter = re.compile(progress_points)
for record in _data["rocprofsys"]["causal"]["records"]:
for exp in record["experiments"]:
_speedup = exp["virtual_speedup"]
_duration = exp["duration"]
_file = exp["selection"]["info"]["file"]
_line = exp["selection"]["info"]["line"]
_func = exp["selection"]["info"]["dfunc"]
_sym_addr = exp["selection"]["symbol_address"]
_selected = ":".join([_file, f"{_line}"]) if _sym_addr == 0 else _func
if not re.search(_selection_filter, _selected):
continue
if _selected not in data:
data[_selected] = {}
for pts in exp["progress_points"]:
_name = pts["name"]
if not re.search(_progresspt_filter, _name):
continue
if _name not in data[_selected]:
data[_selected][_name] = {}
if "delta" in pts:
_delt = pts["delta"]
if _delt > 0:
itr = find_or_insert(
data[_selected][_name], _speedup, "throughput"
)
itr += [_delt, _duration]
elif "arrival" in pts and pts["arrival"] > 0:
itr = find_or_insert(data[_selected][_name], _speedup, "latency")
itr += [pts["arrival"], pts["departure"], _duration]
else:
_delt = pts["laps"]
if _delt > 0:
itr = find_or_insert(data[_selected][_name], _speedup)
itr += [_delt, _duration]
return data
def compute_speedups(runs, speedups=[], num_points=0, validate=[], debug=False):
out = pd.DataFrame()
data = {}
for workload in runs:
_data = runs[workload]
for selected, pitr in _data.items():
if selected not in data:
data[selected] = {}
for progpt, ditr in pitr.items():
data[selected][progpt] = OrderedDict(sorted(ditr.items()))
ret = []
for selected, pitr in _data.items():
for progpt, ditr in pitr.items():
if 0 not in ditr.keys():
print(f"missing baseline data for {progpt} in {selected}...")
continue
for speedup, itr in ditr.items():
if len(speedups) > 0 and speedup not in speedups:
continue
if speedup != itr.speedup:
raise ValueError(f"in {selected}: {speedup} != {itr.speedup}")
_val = line_speedup(selected, progpt, itr, ditr[0])
ret.append(_val)
ret.sort()
_last_name = None
_last_prog = None
result = []
for itr in ret:
if itr.name != _last_name or itr.prog != _last_prog:
result.append([])
result[-1].append(itr)
_last_name = itr.name
_last_prog = itr.prog
_data = []
for itr in result:
experiment_prog = experiment_progress(itr)
_data.append(experiment_prog)
if len(itr) != 0:
impact = experiment_prog.get_impact()
for itrx in itr:
speedup = itrx.compute_speedup()
line_stddev = itrx.compute_speedup_stddev()
if speedup <= 200 and speedup >= -100:
out = pd.concat(
[
out,
pd.DataFrame(
{
"idx": [(itrx.prog, itrx.name)],
"progress points": [itrx.prog],
"point": [itrx.name],
"line speedup": [itrx.virtual_speedup()],
"program speedup": [speedup],
"speedup err": line_stddev,
"impact sum": impact[0],
"impact avg": impact[1],
"impact err": float(impact[2]),
"workload": workload,
}
),
],
ignore_index=True,
)
_data.sort()
if debug:
for itr in _data:
if len(itr) < num_points:
continue
print("")
print(f"{itr}")
sys.stdout.flush()
validations = get_validations(validate)
expected_validations = len(validations)
correct_validations = 0
if expected_validations > 0:
print(f"\nPerforming {expected_validations} validations...\n")
for eitr in _data:
_experiment = eitr.data[0].get_name()
_progresspt = eitr.data[0].prog
_base_speedup_stddev = eitr.data[0].compute_speedup_stddev()
for ditr in eitr.data:
_virt_speedup = ditr.virtual_speedup()
_prog_speedup = ditr.compute_speedup()
_prog_speedup_stddev = ditr.compute_speedup_stddev()
for vitr in validations:
_v = vitr.validate(
_experiment,
_progresspt,
_virt_speedup,
_prog_speedup,
_prog_speedup_stddev,
_base_speedup_stddev,
)
if _v is None:
continue
if _v is True:
correct_validations += 1
else:
sys.stderr.write(
f" [{_experiment}][{_progresspt}][{_virt_speedup}] failed validation: {_prog_speedup:8.3f} != {vitr.program_speedup} +/- {vitr.tolerance}\n"
)
if expected_validations != correct_validations:
sys.stderr.flush()
sys.stderr.write(
f"\nCausal profiling predictions not validated. Expected {expected_validations}, found {correct_validations}\n"
)
sys.stderr.flush()
sys.exit(-1)
elif expected_validations > 0:
print(f"Causal profiling predictions validated: {expected_validations}")
return out
def get_validations(validate):
data = []
_len = len(validate)
if _len == 0:
return data
elif _len % 5 != 0:
raise ValueError(
"validation requires format: {0}experiment regex{1} {0}progress-point regex{1} {0}virtual-speedup{1} {0}expected-speedup{1} {0}tolerance{1} (i.e. 5 args per validation. There are {2} extra/missing arguments".format(
"{", "}", _len % 5
)
)
v = validate
for i in range(int(_len / 5)):
off = 5 * i
data.append(
validation(v[off + 0], v[off + 1], v[off + 2], v[off + 3], v[off + 4])
)
return data
def compute_sorts(_data):
Max_speedup_order = _data.sort_values(
by="program speedup", ascending=False
).point.unique()
Min_speedup_order = _data.sort_values(
by="program speedup", ascending=True
).point.unique()
point_counts = _data.point.value_counts()
# speedups = pd.DataFrame(_data["Line Speedup"].unique(), columns=["Line Speedup"])
_data["max speedup"] = np.nan
_data["min speedup"] = np.nan
_data["point count"] = np.nan
for index in _data.index:
_data.at[index, "max speedup"] = np.where(
Max_speedup_order == _data.at[index, "point"]
)[0][0]
_data.at[index, "min speedup"] = np.where(
Min_speedup_order == _data.at[index, "point"]
)[0][0]
_data.at[index, "point count"] = point_counts[_data.at[index, "point"]]
return _data
def is_valid_data_point(data):
return not math.isnan(data) and not math.isinf(data)
def get_value(splice):
if "type" in splice:
# Something might happen here later
print("type found in coz file, finish adding code")
sys.exit(1)
if "difference" in splice or "speedup" in splice:
return float(splice[1])
else:
return splice[1]
def add_throughput(df, experiment, value):
ix = (experiment["selected"], value["name"], experiment["speedup"])
df_points = list(df.index)
if ix not in df_points:
new_row = pd.DataFrame(
data={
# "selected" : [experiment["selected"]],
# "speedup" : [float(experiment["speedup"])],
"delta": [float(value["delta"])],
"duration": [float(experiment["duration"])],
"type": ["throughput"],
},
index=[ix],
)
df = pd.concat([new_row, df])
else:
df["delta"][ix] = df["delta"][ix] + float(value["delta"])
df["duration"][ix] = df["duration"][ix] + float(experiment["duration"])
return df
def add_latency(df, experiment, value):
ix = (experiment["selected"], experiment["speedup"])
df_points = list(df.index)
if ix not in df_points:
df = pd.concat(
[
df,
pd.Dataframe(
data={
"selected": [experiment["selected"]],
"point": [value["name"]],
"speedup": [float(experiment["speedup"])],
"arrivals": [float(value["arrivals"])],
"departures": [float(value["departures"])],
"duration": [0],
"type": ["latency"],
}
),
]
)
if value.duration == 0:
df["difference"] = value.difference
else:
duration = df["duration"][ix] + float(experiment["duration"])
df["difference"][ix] = df["difference"][ix] * df["duration"][ix] / duration
df["difference"][ix] = (
df["difference"]
+ (float(value["difference"]) * float(experiment["duration"])) / duration
)
df["duration"] = df["duration"] + float(experiment["duration"])
return df
def parse_files(
files,
experiments=".*",
progress_points=".*",
speedups=[],
num_points=0,
validate=[],
verbose=0,
cli=False,
):
result_df = pd.DataFrame()
sample_df = pd.DataFrame()
def name_wo_ext(x):
return x.replace(".json", "").replace(".coz", "")
json_files = [x for x in filter(lambda y: y.endswith(".json"), files)]
coz_files = [x for x in filter(lambda y: y.endswith(".coz"), files)]
read_files = []
file_names = []
# prefer JSON files first
files = json_files + coz_files
for file in files:
if verbose >= 3:
print(f"Potentially reading causal profile: '{file}'...")
_base_name = name_wo_ext(file)
# do not read in a COZ file if the JSON already read
if _base_name in read_files:
continue
if verbose >= 3 or (verbose >= 1 and not cli):
print(f"Reading causal profile: '{file}'...")
if file.endswith(".json"):
with open(file, "r") as j:
_data = json.load(j)
dict_data = {}
# make sure the JSON is an rocprof-sys causal JSON
if "rocprofsys" not in _data or "causal" not in _data["rocprofsys"]:
continue
dict_data[file] = process_data({}, _data, experiments, progress_points)
samps = process_samples({}, _data)
sample_df = pd.concat(
[
sample_df,
pd.DataFrame(
[
{"location": loc, "count": count}
for loc, count in sorted(samps.items())
]
),
]
)
result_df = pd.concat(
[
result_df,
compute_sorts(
compute_speedups(
dict_data,
speedups,
num_points,
validate,
verbose >= 3 or cli,
)
),
]
)
read_files.append(_base_name)
file_names.append(file)
elif file.endswith(".coz"):
try:
f = open(file, "r")
except IOError as e:
sys.stderr.write(f"{e}\n")
continue
lines = f.readlines()
# Parse lines
experiment = None
samps = {}
data = pd.DataFrame()
for line in lines:
if line != "\n":
data_type = ""
value = {}
sections = line.split("\t")
if len(sections) == 0:
continue
value["type"] = sections[0]
for section in sections:
splice = section.split("\n")[0].split("=")
if len(splice) > 1:
val = get_value(splice)
value[splice[0]] = val
else:
data_type = splice[0]
if data_type == "experiment":
experiment = value
elif data_type == "throughput-point" or data_type == "progress-point":
experiment["speedup"] = 100 * experiment["speedup"]
data = add_throughput(data, experiment, value)
elif data_type == "latency-point":
data = add_latency(data, experiment, value)
elif data_type == "samples":
if value["location"] not in samps:
samps[value["location"]] = 0
samps[value["location"]] += int(value["count"])
elif data_type not in ["startup", "shutdown", "runtime"]:
print("Invalid profile")
result_df = pd.concat(
[
result_df,
get_speedup_data(data.sort_index()),
]
)
sample_df = pd.concat(
[
sample_df,
pd.DataFrame(
[
{"location": loc, "count": count}
for loc, count in sorted(samps.items())
]
),
]
)
read_files.append(_base_name)
file_names.append(file)
return result_df, sample_df, file_names
def parse_uploaded_file(file_name, file, experiments=".*", progress_points=".*"):
data = pd.DataFrame()
if "{" in file:
dict_data = {}
_data = json.loads(file)
dict_data = {
file_name: process_data(dict_data, _data, experiments, progress_points)
}
samps = process_samples({}, _data)
sample_data = pd.DataFrame(
[{"location": loc, "count": count} for loc, count in sorted(samps.items())]
)
data = compute_sorts(compute_speedups(dict_data))
return data, sample_data
else:
# coz
for line in file.split("\n"):
if len(line) > 0:
data_type = ""
value = {}
sections = line.split("\t")
value["type"] = sections[0]
for section in sections:
splice = section.split("\n")[0].split("=")
if len(splice) > 1:
value[splice[0]] = get_value(splice)
else:
data_type = splice[0]
if data_type == "experiment":
experiment = value
elif data_type == "throughput-point" or data_type == "progress-point":
experiment["speedup"] = 100 * experiment["speedup"]
data = add_throughput(data, experiment, value)
elif data_type == "latency-point":
data = add_latency(data, experiment, value)
elif data_type not in ["startup", "shutdown", "samples", "runtime"]:
print("Invalid profile")
# sys.exit(1)
data = get_speedup_data(data.sort_index())
return data
def get_data_point(data):
val = ""
if math.isclose(data.delta, 0, rel_tol=1e-09, abs_tol=0.0):
return np.nan
if data["type"] == "throughput":
val = data.duration / data.delta
return val
elif data["type"] == "latency":
arrivalRate = data.arrivals / data.duration
val = data.difference / arrivalRate
else:
print("invalid datapoint")
val = np.inf
if math.isinf(val):
return np.nan
else:
return val
def get_speedup_data(data):
baseline_data_point = get_data_point(data.iloc[0])
speedup_df = pd.DataFrame()
curr_selected = ""
for index, row in data.iterrows():
if curr_selected != index[0]:
curr_selected = index[0]
baseline_data_point = get_data_point(row)
maximize = True
if row["type"] == "latency":
maximize = False
if not math.isnan(baseline_data_point):
# for speedup in row:
data_point = get_data_point(row)
if not math.isnan(data_point):
progress_speedup = (
baseline_data_point - data_point
) / baseline_data_point
# speedup = row["speedup"]
speedup = row.name[2]
name = row.name[0]
if not maximize:
# We are trying to *minimize* this progress point, so negate the speedup.
progress_speedup = -progress_speedup
if progress_speedup >= -1 and progress_speedup <= 2:
speedup_df = pd.concat(
[
pd.DataFrame.from_dict(
data={
"point": [name],
"speedup": [speedup],
"progress_speedup": [100 * progress_speedup],
}
),
speedup_df,
]
)
speedup_df = speedup_df.sort_values(by=["speedup"])
return speedup_df
def find_causal_files(workload_path, verbose, recursive):
input_files = []
def find_causal_files_helper(inp, _files):
_input_files_tmp = []
for itr in _files:
if os.path.isfile(itr) and itr.endswith(".json"):
with open(itr, "r") as f:
inp_data = json.load(f)
if (
"rocprofsys" not in inp_data.keys()
or "causal" not in inp_data["rocprofsys"].keys()
):
if verbose >= 2:
print(f"{itr} is not a causal profile")
continue
_input_files_tmp += [itr]
elif os.path.isfile(itr) and itr.endswith(".coz"):
_input_files_tmp += [itr]
return _input_files_tmp
for inp in workload_path:
if verbose == 3:
print("find_causal_files inp:", inp)
if os.path.exists(inp):
if os.path.isdir(inp):
_files = glob.glob(os.path.join(inp, "**"), recursive=recursive)
_input_files_tmp = find_causal_files_helper(inp, _files)
if len(_input_files_tmp) == 0:
raise ValueError(f"No causal profiles found in {inp}")
else:
input_files += _input_files_tmp
elif os.path.isfile(inp):
input_files += [inp]
else:
_files = glob.glob(inp, recursive=recursive)
_input_files_tmp = find_causal_files_helper(inp, _files)
if len(_input_files_tmp) == 0:
raise ValueError(f"No causal profiles found in {inp}")
else:
input_files += _input_files_tmp
return input_files
def metadata_diff(json1, json2):
res = jsondiff.diff(json1, json2)
if res:
print("Diff found")
else:
print("Same")