#!/usr/bin/env python3 # MIT License # # Copyright (c) 2024-2025 Advanced Micro Devices, Inc. All rights reserved. # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. import sys import ctypes import json def fatal_error(msg, exit_code=1): sys.stderr.write(f"Fatal error: {msg}\n") sys.stderr.flush() sys.exit(exit_code) def build_counter_string(obj): counter_str = "\n".join( ["{:20}:\t{}".format(key, itr) for key, itr in obj.get_as_dict().items()] ) counter_str += "\n" + "{:20}:\t".format("Dimensions") counter_str += " ".join(dim.__str__() for dim in obj.dimensions) return counter_str class dimension: columns = ["Dimension_Id", "Dimension_Name", "Dimension_Instances"] def __init__(self, dimension_id, dimension_name, dimension_instances): self.id = dimension_id self.name = dimension_name self.instances = dimension_instances def get_as_dict(self): return dict(zip((self.columns), [self.id, self.name, self.instances])) def __str__(self): dimension = "{}[0:{}]".format( self.get_as_dict()["Dimension_Name"], self.get_as_dict()["Dimension_Instances"] - 1, ) return dimension class counter: columns = ["Counter_Name", "Description"] def __init__( self, counter_handle, counter_name, counter_description, counter_dimensions, is_hw_constant, ): self.name = counter_name self.counter_handle = counter_handle self.description = counter_description self.dimensions = counter_dimensions self.is_hw_constant = is_hw_constant def get_as_dict(self): return dict(zip((self.columns), [self.name, self.description])) def __str__(self): return "\n".join( ["{:20}:\t{}".format(key, itr) for key, itr in self.get_as_dict().items()] ) class derived_counter(counter): columns = ["Counter_Name", "Description", "Expression"] def __init__( self, counter_handle, counter_name, counter_description, counter_expression, counter_dimensions, is_hw_constant, ): super().__init__( counter_handle, counter_name, counter_description, counter_dimensions, is_hw_constant, ) self.expression = counter_expression def get_as_dict(self): return dict(zip((self.columns), [self.name, self.description, self.expression])) def __str__(self): return build_counter_string(self) class basic_counter(counter): columns = ["Counter_Name", "Description", "Block"] def __init__( self, counter_handle, counter_name, counter_description, counter_block, counter_dimensions, is_hw_constant, ): super().__init__( counter_handle, counter_name, counter_description, counter_dimensions, is_hw_constant, ) self.block = counter_block def get_as_dict(self): return dict(zip((self.columns), [self.name, self.description, self.block])) def __str__(self): return build_counter_string(self) class pc_config: columns = ["Method", "Unit", "Min_Interval", "Max_Interval", "Flags"] def __init__(self, config_method, config_unit, min_interval, max_interval, flags): self.method = self.get_method_string(config_method.value) self.unit = self.get_unit_string(config_unit.value) self.min_interval = min_interval self.max_interval = max_interval self.flags = flags def __str__(self): return "\n".join( [ " {:20}:\t{}".format( key, itr if key == "Method" or key == "Unit" else self.get_value(key, itr), ) for key, itr in self.get_as_dict().items() ] ) @staticmethod def get_value(key, itr): if key == "Min_Interval" or key == "Max_Interval": return itr.value elif key == "Flags": if itr.value == 1: return "interval pow2" else: return "none" else: fatal_error("Incorrect key") @staticmethod def get_method_string(key): method_map = {1: "stochastic", 2: "host_trap"} return method_map[key] @staticmethod def get_unit_string(key): unit_map = {1: "instructions", 2: "cycle", 3: "time"} return unit_map[key] def get_as_dict(self): return dict( zip( (self.columns), [ self.method, self.unit, self.min_interval, self.max_interval, self.flags, ], ) ) class loadLibrary: libname = None c_lib = None def get_library(): get_library.libname = None if loadLibrary.c_lib is None: loadLibrary.c_lib = ctypes.CDLL(loadLibrary.libname) return loadLibrary.c_lib def get_string_value(str_ptr): return ctypes.cast(str_ptr, ctypes.c_char_p).value.decode("utf-8") def get_agent_info(agent_handle): lib = get_library() lib.agent_info.argtypes = [ctypes.c_ulong, ctypes.POINTER(ctypes.c_char_p)] agent_info_str = ctypes.c_char_p() lib.agent_info(agent_handle, ctypes.byref(agent_info_str)) return json.loads(agent_info_str.value.decode("utf-8")) def get_number_of_counters(agent_handle): lib = get_library() lib.get_number_of_counters.restype = ctypes.c_ulong lib.get_number_of_counters.argtypes = [ctypes.c_ulong] return lib.get_number_of_agent_counters(agent_handle) def get_number_agents(): lib = get_library() lib.get_number_of_agents.restype = ctypes.c_ulong return lib.get_number_of_agents() def get_agent_handles(): lib = get_library() num_agents = get_number_agents() lib.agent_handles.argtypes = [ctypes.c_ulong * num_agents, ctypes.c_ulong] agent_handles_arr = (ctypes.c_ulong * num_agents)() lib.agent_handles(agent_handles_arr, num_agents) return list(agent_handles_arr) def get_agent_info_map(): agent_info_map = {} agents = get_agent_handles() for agent in agents: agent_info_map[agent] = get_agent_info(agent) return agent_info_map def get_number_of_agent_counters(agent_handle): lib = get_library() lib.get_number_of_agent_counters.argtypes = [ctypes.c_ulong] return lib.get_number_of_agent_counters(agent_handle) def get_agent_counter_handles(agent_handle): lib = get_library() num_counters = get_number_of_agent_counters(agent_handle) lib.agent_counter_handles.argtypes = [ ctypes.c_ulong * num_counters, ctypes.c_ulong, ctypes.c_ulong, ] counter_handles = (ctypes.c_ulong * num_counters)() lib.agent_counter_handles(counter_handles, agent_handle, num_counters) return list(counter_handles) def get_dimensions(counter_handle): lib = get_library() lib.get_number_of_dimensions.argtypes = [ctypes.c_ulong] lib.get_number_of_dimensions.restype = ctypes.c_ulong num_dims = lib.get_number_of_dimensions(counter_handle) lib.counter_dimension_ids.argtypes = [ ctypes.c_ulong, ctypes.c_ulong * num_dims, ctypes.c_uint, ] dims_ids = (ctypes.c_ulong * num_dims)() lib.counter_dimension.argtypes = [ ctypes.c_ulong, ctypes.c_ulong, ctypes.POINTER(ctypes.c_char_p), ctypes.POINTER(ctypes.c_uint), ] lib.counter_dimension_ids(counter_handle, dims_ids, num_dims) dimensions = [] for dim_id in list(dims_ids): dimension_name = ctypes.c_char_p() dimension_instance = ctypes.c_uint() lib.counter_dimension( counter_handle, dim_id, ctypes.byref(dimension_name), ctypes.byref(dimension_instance), ) dim = dimension( dim_id, get_string_value(dimension_name), dimension_instance.value ) dimensions.append(dim) return dimensions def get_counters(): agent_counters = {} agents = get_agent_handles() agent_counters = {} agent_info_map = get_agent_info_map() for agent in agents: if agent_info_map[agent]["type"] != 2: continue agent_counters.setdefault(agent, []) counters = get_agent_counter_handles(agent) if counters: for counter_id in list(counters): counter_info = get_counter_info(counter_id) agent_counters[agent].append(counter_info) return agent_counters def get_pc_sample_configs(): agent_pc_sample_config = {} agents = get_agent_handles() for agent in agents: configs = get_pc_sample_config(agent) if len(configs) > 0: agent_pc_sample_config[agent] = configs return agent_pc_sample_config def get_counter_info(counter_handle): lib = get_library() lib.counter_info.argtypes = [ ctypes.c_ulong, ctypes.POINTER(ctypes.c_char_p), ctypes.POINTER(ctypes.c_char_p), ctypes.POINTER(ctypes.c_uint), ctypes.POINTER(ctypes.c_uint), ] counter_name = ctypes.c_char_p() counter_description = ctypes.c_char_p() is_derived = ctypes.c_uint() is_hw_constant = ctypes.c_uint() lib.counter_info( counter_handle, ctypes.byref(counter_name), ctypes.byref(counter_description), ctypes.byref(is_derived), ctypes.byref(is_hw_constant), ) if is_derived.value == 1: lib.counter_expression.argtypes = [ ctypes.c_ulong, ctypes.POINTER(ctypes.c_char_p), ] expression = ctypes.c_char_p() lib.counter_expression(counter_handle, ctypes.byref(expression)) dimensions = get_dimensions(counter_handle) return derived_counter( counter_handle, get_string_value(counter_name), get_string_value(counter_description), get_string_value(expression), dimensions, is_hw_constant, ) elif not is_hw_constant.value: lib.counter_block.argtypes = [ctypes.c_ulong, ctypes.POINTER(ctypes.c_char_p)] block = ctypes.c_char_p() lib.counter_block(counter_handle, ctypes.byref(block)) dimensions = get_dimensions(counter_handle) return basic_counter( counter_handle, get_string_value(counter_name), get_string_value(counter_description), get_string_value(block), dimensions, is_hw_constant, ) else: return counter( counter_handle, get_string_value(counter_name), get_string_value(counter_description), [], is_hw_constant.value, ) def get_number_of_pc_sample_configs(agent_handle): lib = get_library() lib.get_number_of_pc_sample_configs.argtypes = [ctypes.c_ulong] lib.get_number_of_pc_sample_configs.restype = ctypes.c_ulong return lib.get_number_of_pc_sample_configs(agent_handle) def get_pc_sample_config(agent_handle): lib = get_library() num_configs = get_number_of_pc_sample_configs(agent_handle) lib.pc_sample_config.argtypes = [ ctypes.c_ulong, ctypes.c_ulong, ctypes.POINTER(ctypes.c_ulong), ctypes.POINTER(ctypes.c_ulong), ctypes.POINTER(ctypes.c_ulong), ctypes.POINTER(ctypes.c_ulong), ctypes.POINTER(ctypes.c_ulong), ] pc_configs = [] for config in range(0, num_configs): method = (ctypes.c_ulong)() unit = (ctypes.c_ulong)() max_interval = (ctypes.c_ulong)() min_interval = (ctypes.c_ulong)() flags = (ctypes.c_ulong)() lib.pc_sample_config( agent_handle, config, ctypes.byref(method), ctypes.byref(unit), ctypes.byref(min_interval), ctypes.byref(max_interval), flags, ) pc_configs.append( pc_config( method, unit, min_interval, max_interval, flags, ) ) return pc_configs def check_pmc(agent_counter): lib = get_library() def get_counter_names(counter_ids): counter_names = [] for counter_id in counter_ids: counter = get_counter_info(counter_id) if counter.counter_handle == counter_id: counter_names.append(counter.name) return counter_names def get_agent_name(agent_id): agent_info_map = get_agent_info_map() for agent, info in agent_info_map.items(): if agent == agent_id: return info["name"] for agent, counter_ids in agent_counter.items(): num_counters = len(counter_ids) counters = (ctypes.c_ulong * num_counters)(*counter_ids) lib.is_counter_set.argtypes = [ ctypes.c_ulong * num_counters, ctypes.c_ulong, ctypes.c_ulong, ] lib.is_counter_set.restype = ctypes.c_bool if lib.is_counter_set(counters, agent, num_counters) is False: fatal_error( "{} not collected on agent {}".format( " ".join(get_counter_names(counter_ids)), get_agent_name(agent) ) ) return True