2023-03-20 13:29:28 -05:00
#!/usr/bin/env python3
2023-03-06 06:20:21 -06:00
#
2024-11-14 11:27:38 -06:00
# Copyright (C) Advanced Micro Devices. All rights reserved.
2023-03-06 06:20:21 -06:00
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
# the Software, and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
2023-04-19 23:58:33 -05:00
import csv
2023-03-06 06:20:21 -06:00
import json
2023-04-19 23:58:33 -05:00
import re
2023-03-06 06:20:21 -06:00
import time
2023-12-06 16:57:26 -06:00
from typing import Dict
2023-03-06 06:20:21 -06:00
from enum import Enum
from amdsmi_helpers import AMDSMIHelpers
class AMDSMILogger ( ) :
2025-07-11 11:37:23 -05:00
def __init__ ( self , format = ' human_readable ' , destination = ' stdout ' , helpers = None ) - > None :
2023-03-06 06:20:21 -06:00
self . output = { }
self . multiple_device_output = [ ]
self . watch_output = [ ]
self . format = format # csv, json, or human_readable
self . destination = destination # stdout, path to a file (append)
2023-12-14 07:18:38 -06:00
self . table_title = " "
2024-06-09 16:31:34 -05:00
self . table_header = " "
self . secondary_table_title = " "
self . secondary_table_header = " "
2025-03-20 17:23:01 -05:00
self . warning_message = " "
2025-07-11 11:37:23 -05:00
if helpers is None :
# If helpers is not provided, create a new instance
self . helpers = AMDSMIHelpers ( )
else :
self . helpers = helpers
2025-04-12 01:54:57 -05:00
self . _cper_exit_message = True
2025-04-23 00:08:43 -05:00
self . store_cpu_json_output = [ ]
self . store_core_json_output = [ ]
self . store_gpu_json_output = [ ]
2025-06-03 12:08:25 -05:00
self . store_xgmi_metric_json_output = [ ]
2025-10-29 13:03:43 -05:00
self . store_xgmi_source_status_json_output = [ ]
2025-06-03 12:08:25 -05:00
self . store_xgmi_link_status_json_output = [ ]
2025-06-04 14:52:27 -05:00
self . store_current_partition_json_output = [ ]
self . store_memory_partition_json_output = [ ]
self . store_partition_profiles_json_output = [ ]
self . store_partition_resources_json_output = [ ]
2023-03-06 06:20:21 -06:00
class LoggerFormat(Enum):
    """Enum for logger formats.

    Each member's value is the string that the logger's format attribute is
    compared against by is_json_format, is_csv_format and
    is_human_readable_format.
    """
    json = ' json '
    csv = ' csv '
    human_readable = ' human_readable '
2023-05-18 15:53:48 -05:00
class CsvStdoutBuilder:
    """In-memory write target that collects CSV rows destined for stdout.

    It offers the write() method a csv writer calls, so a csv.DictWriter can
    emit into it instead of into a file; str() yields the collected text.
    """

    def __init__(self):
        # Rows in the order they were written
        self.csv_string = list()

    def write(self, row):
        """Record one already-formatted row."""
        self.csv_string += [row]

    def __str__(self):
        """Return all recorded rows joined into a single string."""
        separator = ' '
        return separator.join(self.csv_string)
2023-03-06 06:20:21 -06:00
def is_json_format ( self ) :
return self . format == self . LoggerFormat . json . value
def is_csv_format ( self ) :
return self . format == self . LoggerFormat . csv . value
def is_human_readable_format ( self ) :
return self . format == self . LoggerFormat . human_readable . value
2024-06-08 05:36:01 -05:00
2025-02-04 09:30:12 -06:00
def clear_multiple_devices_output ( self ) :
2023-12-07 07:33:17 -08:00
self . multiple_device_output . clear ( )
2023-03-06 06:20:21 -06:00
2024-06-08 05:36:01 -05:00
2025-08-21 12:35:18 -05:00
def get_cper_exit_message ( self ) :
""" Get the cper exit message
2025-04-12 01:54:57 -05:00
params:
2025-08-21 12:35:18 -05:00
None
2025-04-12 01:54:57 -05:00
return:
cper_exit_message (bool) - True if cper exit message is set
"""
return self . _cper_exit_message
def set_cper_exit_message ( self , flag : bool ) :
""" Set the cper exit message
params:
flag (bool) - True if cper exit message is set
return:
Nothing
"""
self . _cper_exit_message = flag
2023-04-19 23:58:33 -05:00
def _capitalize_keys ( self , input_dict ) :
output_dict = { }
for key in input_dict . keys ( ) :
# Capitalize key if it is a string
if isinstance ( key , str ) :
cap_key = key . upper ( )
else :
cap_key = key
if isinstance ( input_dict [ key ] , dict ) :
output_dict [ cap_key ] = self . _capitalize_keys ( input_dict [ key ] )
elif isinstance ( input_dict [ key ] , list ) :
cap_key_list = [ ]
for data in input_dict [ key ] :
if isinstance ( data , dict ) :
cap_key_list . append ( self . _capitalize_keys ( data ) )
else :
cap_key_list . append ( data )
output_dict [ cap_key ] = cap_key_list
else :
output_dict [ cap_key ] = input_dict [ key ]
return output_dict
2025-01-15 20:28:45 -06:00
def _convert_json_to_tabular ( self , json_object : Dict [ str , any ] , dynamic = False ) :
# TODO make dynamic - convert other python CLI outputs to use (as needed)
# Update: using dynamic=true provides dynamic re-sizing based on key name length
2024-06-09 16:31:34 -05:00
table_values = ' '
stored_gpu = ' '
stored_timestamp = ' '
for key , value in json_object . items ( ) :
string_value = str ( value )
2025-01-15 20:28:45 -06:00
if key == ' partition_id ' :
# Special case for partition_id: 8 partitions + 7 comma + 2 spaces = 17
table_values + = string_value . ljust ( 17 )
continue
key_length = len ( key ) + 2
if dynamic and len ( key ) > 0 :
stored_gpu = string_value
table_values + = string_value . ljust ( key_length )
elif key == ' gpu ' :
2024-06-09 16:31:34 -05:00
stored_gpu = string_value
table_values + = string_value . rjust ( 3 )
2025-08-06 16:03:06 -05:00
elif key == ' xcp ' :
stored_gpu = string_value
table_values + = string_value . rjust ( 5 )
2024-06-09 16:31:34 -05:00
elif key == ' timestamp ' :
stored_timestamp = string_value
table_values + = string_value . rjust ( 10 ) + ' '
elif key == ' power_usage ' :
table_values + = string_value . rjust ( 7 )
2025-03-26 17:45:08 -05:00
elif key == ' max_power ' :
table_values + = string_value . rjust ( 9 )
2025-02-26 22:34:36 -06:00
elif key in ( ' hotspot_temperature ' , ' memory_temperature ' ) :
table_values + = string_value . rjust ( 8 )
elif key in ( ' gfx ' , ' mem ' ) :
table_values + = string_value . rjust ( 7 )
elif key in ( ' gfx_clk ' ) :
table_values + = string_value . rjust ( 10 )
2025-08-06 16:03:06 -05:00
elif key in ( ' vram_usage ' ) :
table_values + = string_value . rjust ( 16 )
2025-02-26 22:34:36 -06:00
elif key in ( ' mem_clock ' , ' vram_used ' ) :
2024-06-09 16:31:34 -05:00
table_values + = string_value . rjust ( 11 )
2025-03-26 13:12:41 -05:00
elif key in ( ' vram_total ' , ' vram_free ' ) :
table_values + = string_value . rjust ( 12 )
elif key == ' vram_percent ' :
table_values + = string_value . rjust ( 9 )
2025-02-26 22:34:36 -06:00
elif key in ( ' encoder ' , ' decoder ' ) :
table_values + = string_value . rjust ( 7 )
2024-12-06 12:18:21 -06:00
elif key in ( ' vclock ' , ' dclock ' ) :
2024-12-20 09:59:15 -06:00
table_values + = string_value . rjust ( 10 )
2025-03-26 13:12:41 -05:00
elif key in ( ' single_bit_ecc ' , ' double_bit_ecc ' , ' pcie_bw ' ) :
2024-06-09 16:31:34 -05:00
table_values + = string_value . rjust ( 12 )
2025-02-26 22:34:36 -06:00
elif key in ( ' pcie_replay ' ) :
2024-06-09 16:31:34 -05:00
table_values + = string_value . rjust ( 13 )
# Only for handling topology tables
elif ' gpu_ ' in key :
table_values + = string_value . ljust ( 13 )
# Only for handling xgmi tables
elif key == " gpu# " :
table_values + = string_value . ljust ( 7 )
elif key == " bdf " :
2024-11-07 16:35:17 -06:00
table_values + = string_value . ljust ( 14 )
2024-06-09 16:31:34 -05:00
elif " bdf_ " in key :
table_values + = string_value . ljust ( 13 )
elif key == " bit_rate " :
2024-11-07 16:35:17 -06:00
table_values + = string_value . ljust ( 10 )
2024-06-09 16:31:34 -05:00
elif key == " max_bandwidth " :
2024-11-07 16:35:17 -06:00
table_values + = string_value . ljust ( 15 )
2024-06-09 16:31:34 -05:00
elif key == " link_type " :
2024-11-07 16:35:17 -06:00
table_values + = string_value . ljust ( 11 )
elif key == " link_status " :
for i in value :
table_values + = str ( i ) . ljust ( 3 )
2024-06-09 16:31:34 -05:00
elif key == " RW " :
2024-11-07 16:35:17 -06:00
table_values + = string_value . ljust ( 57 )
2025-01-08 16:35:53 -06:00
elif key in ( ' pviol ' , ' tviol ' ) :
table_values + = string_value . rjust ( 7 )
elif key == " tviol_active " :
table_values + = string_value . rjust ( 14 )
elif key == " phot_tviol " :
table_values + = string_value . rjust ( 12 )
elif key == " vr_tviol " :
table_values + = string_value . rjust ( 10 )
elif key == " hbm_tviol " :
table_values + = string_value . rjust ( 11 )
2025-01-08 22:07:23 -06:00
elif key == " gfx_clkviol " :
table_values + = string_value . rjust ( 13 )
2025-08-06 16:03:06 -05:00
elif key in ( " gfxclk_pviol " , " gfxclk_tviol " , " gfxclk_totalviol " , " low_utilviol " ) :
table_values + = string_value . rjust ( 58 )
2024-06-09 16:31:34 -05:00
elif key == " process_list " :
#Add an additional padding between the first instance of GPU and NAME
table_values + = ' '
for process_dict in value :
if process_dict [ ' process_info ' ] == " No running processes detected " :
# Add N/A for empty process_info
2025-05-30 19:37:58 -05:00
table_values + = " N/A " . rjust ( 17 ) + " N/A " . rjust ( 9 ) + " N/A " . rjust ( 10 ) + \
" N/A " . rjust ( 10 ) + " N/A " . rjust ( 10 ) + " N/A " . rjust ( 10 ) + \
2025-10-28 14:49:03 -05:00
" N/A " . rjust ( 9 ) + " N/A " . rjust ( 10 ) + ' \n '
2024-06-09 16:31:34 -05:00
else :
2025-05-29 20:35:27 -05:00
#Fix this herre
2024-06-09 16:31:34 -05:00
for process_key , process_value in process_dict [ ' process_info ' ] . items ( ) :
string_process_value = str ( process_value )
if process_key == " name " :
# Truncate name if too long
2025-08-21 11:41:03 -05:00
if string_process_value == " " or string_process_value == " N/A " :
2024-06-09 16:31:34 -05:00
process_name = " N/A "
2025-08-21 11:41:03 -05:00
else :
process_name = string_process_value . split ( ' / ' ) [ - 1 ] [ : 17 ]
2025-05-30 19:37:58 -05:00
table_values + = process_name . rjust ( 17 )
2024-06-09 16:31:34 -05:00
elif process_key == " pid " :
table_values + = string_process_value . rjust ( 9 )
elif process_key == " memory_usage " :
for memory_key , memory_value in process_value . items ( ) :
table_values + = str ( memory_value ) . rjust ( 10 )
2025-05-30 19:37:58 -05:00
elif process_key == " mem_usage " :
table_values + = string_process_value . rjust ( 10 )
2025-05-29 20:35:27 -05:00
elif process_key == " cu_occupancy " :
table_values + = string_process_value . rjust ( 9 )
2025-10-28 14:49:03 -05:00
elif process_key == " evicted_time " :
table_values + = string_process_value . rjust ( 9 )
2024-06-09 16:31:34 -05:00
# Add the stored gpu and stored timestamp to the next line
table_values + = ' \n '
if stored_timestamp :
table_values + = stored_timestamp . ljust ( 10 ) + ' '
table_values + = stored_gpu . rjust ( 3 ) + ' '
# Remove excess two values after a new line in table_values
table_values = table_values [ : table_values . rfind ( ' \n ' ) ]
table_values + = ' \n '
2025-11-24 13:12:09 -06:00
# Board temperature key patterns
elif any ( pattern in key for pattern in [ ' IBC ' , ' OAM ' , ' RETIMER ' , ' UBB ' , ' HSC ' , ' VR ' , ' VDDCR ' , ' NODE ' , ' VDD ' , ' HBM ' ] ) :
table_values + = string_value . rjust ( max ( ( len ( key ) + 2 ) , 7 ) )
2024-06-09 16:31:34 -05:00
# Default spacing
else :
table_values + = string_value . rjust ( 10 )
return table_values . rstrip ( )
2023-12-06 16:57:26 -06:00
2024-06-09 16:31:34 -05:00
def _convert_json_to_human_readable ( self , json_object : Dict [ str , any ] ) :
2023-04-19 23:58:33 -05:00
# First Capitalize all keys in the json object
capitalized_json = self . _capitalize_keys ( json_object )
2024-02-21 06:46:02 -06:00
# Increase tabbing for device arguments by pulling them out of the main dictionary and assiging them to an empty string
tabbed_dictionary = { }
for key , value in capitalized_json . items ( ) :
if key not in [ " GPU " , " CPU " , " CORE " ] :
tabbed_dictionary [ key ] = value
2025-04-28 12:28:59 -05:00
# Filter out N/A values under clock
if key == " CLOCK " :
valid_clock_data = { }
if isinstance ( value , dict ) : # Ensure value is a dictionary
for clock_key , clock_data in value . items ( ) :
if isinstance ( clock_data , dict ) : # Ensure clock_data is a dictionary
non_na = {
clock_key : clock_value
for clock_key , clock_value in clock_data . items ( )
if clock_value != " N/A "
}
if non_na :
valid_clock_data [ clock_key ] = non_na
2025-04-29 11:44:21 -05:00
else : # Handle single-tier clock_data
2025-04-28 12:28:59 -05:00
valid_clock_data [ clock_key ] = clock_data
2025-04-29 11:44:21 -05:00
else : # Handle non-dictionary clock data
2025-04-28 12:28:59 -05:00
valid_clock_data = value
2025-05-02 13:06:36 -05:00
# Add a single "N/A" if valid_clock_data is empty
if not valid_clock_data :
valid_clock_data = " N/A "
2025-04-28 12:28:59 -05:00
tabbed_dictionary [ key ] = valid_clock_data
2024-02-21 06:46:02 -06:00
for key , value in tabbed_dictionary . items ( ) :
del capitalized_json [ key ]
capitalized_json [ " AMDSMI_SPACING_REMOVAL " ] = tabbed_dictionary
2024-11-18 10:10:13 -06:00
# Convert the capitalized JSON to a YAML-like string
yaml_output = self . custom_dump ( capitalized_json )
2023-04-19 23:58:33 -05:00
# Remove a key line if it is a spacer
yaml_output = yaml_output . replace ( " AMDSMI_SPACING_REMOVAL: \n " , " " )
yaml_output = yaml_output . replace ( " ' " , " " ) # Remove ''
2024-06-08 05:36:01 -05:00
# Remove process_info indicies for Host parity:
yaml_output = re . sub ( r ' PROCESS_INFO_[0-9]+: ' , ' PROCESS_INFO: ' , yaml_output )
2023-04-19 23:58:33 -05:00
clean_yaml_output = ' '
for line in yaml_output . splitlines ( ) :
line = line . split ( ' : ' )
# Remove dashes and increase tabbing split key
line [ 0 ] = line [ 0 ] . replace ( " - " , " " , 1 )
line [ 0 ] = line [ 0 ] . replace ( " " , " " )
# Join cleaned output
line = ' : ' . join ( line ) + ' \n '
clean_yaml_output + = line
return clean_yaml_output
2024-11-18 10:10:13 -06:00
def custom_dump ( self , data , indent = 0 ) :
""" Converts a Python dictionary to a YAML-like string. """
yaml_string = " "
for key , value in data . items ( ) :
if isinstance ( value , dict ) :
yaml_string + = " " * indent + f " { key } : \n " + self . custom_dump ( value , indent + 1 )
elif isinstance ( value , list ) :
2025-05-27 19:19:43 -05:00
if not value :
yaml_string + = " " * indent + f " { key } : N/A \n "
2025-06-06 13:51:58 -05:00
else :
2025-05-27 19:19:43 -05:00
yaml_string + = " " * indent + f " { key } : \n "
2024-11-18 10:10:13 -06:00
for item in value :
2024-12-09 09:43:03 -06:00
if isinstance ( item , dict ) :
yaml_string + = self . custom_dump ( item , indent + 1 )
else : # If the list is not a dictionary, print it as a string
yaml_string + = " " * ( indent + 1 ) + f " - { item } \n "
2024-11-18 10:10:13 -06:00
else :
yaml_string + = " " * indent + f " { key } : { value } \n "
return yaml_string
2023-04-19 23:58:33 -05:00
2023-04-21 15:10:38 -05:00
def flatten_dict ( self , target_dict , topology_override = False ) :
2023-04-19 23:58:33 -05:00
""" This will flatten a dictionary out to a single level of key value stores
removing key ' s with dictionaries and wrapping each value to in a list
ex:
{
' usage ' : {
' gfx_usage ' : 0,
' mem_usage ' : 0,
' mm_usage_list ' : [22,0,0]
}
}
to:
{
' gfx_usage ' : 0,
' mem_usage ' : 0,
' mm_usage_list ' : [22,0,0]}
}
Args:
target_dict (dict): Dictionary to flatten
"""
output_dict = { }
# First flatten out values
# separetly handle ras and process and firmware
# If there are multi values, and the values are all dicts
# Then flatten the sub values with parent key
for key , value in target_dict . items ( ) :
if isinstance ( value , dict ) :
# Check number of items in the dict
2023-04-21 15:10:38 -05:00
if len ( value . values ( ) ) > 1 or topology_override :
2023-04-19 23:58:33 -05:00
value_with_parent_key = { }
for parent_key , child_dict in value . items ( ) :
if isinstance ( child_dict , dict ) :
2023-04-21 08:02:53 -05:00
if parent_key in ( ' gfx ' ) :
for child_key , value1 in child_dict . items ( ) :
value_with_parent_key [ child_key ] = value1
else :
for child_key , value1 in child_dict . items ( ) :
value_with_parent_key [ parent_key + ' _ ' + child_key ] = value1
2023-04-19 23:58:33 -05:00
else :
2023-04-21 15:10:38 -05:00
if topology_override :
value_with_parent_key [ key + ' _ ' + parent_key ] = child_dict
else :
value_with_parent_key [ parent_key ] = child_dict
2023-04-19 23:58:33 -05:00
value = value_with_parent_key
output_dict . update ( self . flatten_dict ( value ) . items ( ) )
else :
output_dict [ key ] = value
return output_dict
2023-03-06 06:20:21 -06:00
def store_output ( self , device_handle , argument , data ) :
2023-09-24 02:03:51 -05:00
""" Convert device handle to gpu id and store output
2023-03-06 06:20:21 -06:00
params:
device_handle - device handle object to the target device output
argument (str) - key to store data
data (dict | list) - Data store against argument
return:
Nothing
"""
2023-04-21 08:02:53 -05:00
gpu_id = self . helpers . get_gpu_id_from_device_handle ( device_handle )
2023-09-24 02:03:51 -05:00
self . _store_output_amdsmi ( gpu_id = gpu_id , argument = argument , data = data )
2023-03-06 06:20:21 -06:00
2023-12-07 07:33:17 -08:00
def store_cpu_output ( self , device_handle , argument , data ) :
""" Convert device handle to cpu id and store output
params:
device_handle - device handle object to the target device output
argument (str) - key to store data
data (dict | list) - Data store against argument
return:
Nothing
"""
cpu_id = self . helpers . get_cpu_id_from_device_handle ( device_handle )
self . _store_cpu_output_amdsmi ( cpu_id = cpu_id , argument = argument , data = data )
def store_core_output ( self , device_handle , argument , data ) :
""" Convert device handle to core id and store output
params:
device_handle - device handle object to the target device output
argument (str) - key to store data
data (dict | list) - Data store against argument
return:
Nothing
"""
core_id = self . helpers . get_core_id_from_device_handle ( device_handle )
self . _store_core_output_amdsmi ( core_id = core_id , argument = argument , data = data )
2024-02-21 03:48:09 -06:00
2023-12-07 07:33:17 -08:00
def _store_core_output_amdsmi ( self , core_id , argument , data ) :
if argument == ' timestamp ' : # Make sure timestamp is the first element in the output
self . output [ ' timestamp ' ] = int ( time . time ( ) )
if self . is_json_format ( ) or self . is_human_readable_format ( ) :
self . output [ ' core ' ] = int ( core_id )
if argument == ' values ' and isinstance ( data , dict ) :
self . output . update ( data )
else :
self . output [ argument ] = data
elif self . is_csv_format ( ) :
self . output [ ' core ' ] = int ( core_id )
if argument == ' values ' or isinstance ( data , dict ) :
flat_dict = self . flatten_dict ( data )
self . output . update ( flat_dict )
else :
self . output [ argument ] = data
else :
2025-08-21 12:35:18 -05:00
raise ValueError ( " Invalid output format: expected json, csv, or human_readable " )
2023-12-07 07:33:17 -08:00
def _store_cpu_output_amdsmi ( self , cpu_id , argument , data ) :
if argument == ' timestamp ' : # Make sure timestamp is the first element in the output
self . output [ ' timestamp ' ] = int ( time . time ( ) )
if self . is_json_format ( ) or self . is_human_readable_format ( ) :
self . output [ ' cpu ' ] = int ( cpu_id )
if argument == ' values ' and isinstance ( data , dict ) :
self . output . update ( data )
else :
self . output [ argument ] = data
elif self . is_csv_format ( ) :
self . output [ ' cpu ' ] = int ( cpu_id )
if argument == ' values ' or isinstance ( data , dict ) :
flat_dict = self . flatten_dict ( data )
self . output . update ( flat_dict )
else :
self . output [ argument ] = data
else :
2025-08-21 12:35:18 -05:00
raise ValueError ( " Invalid output format: expected json, csv, or human_readable " )
2023-12-07 07:33:17 -08:00
2023-03-06 06:20:21 -06:00
def _store_output_amdsmi ( self , gpu_id , argument , data ) :
2023-04-21 08:02:53 -05:00
if argument == ' timestamp ' : # Make sure timestamp is the first element in the output
self . output [ ' timestamp ' ] = int ( time . time ( ) )
2023-03-06 06:20:21 -06:00
if self . is_json_format ( ) or self . is_human_readable_format ( ) :
self . output [ ' gpu ' ] = int ( gpu_id )
if argument == ' values ' and isinstance ( data , dict ) :
self . output . update ( data )
else :
self . output [ argument ] = data
elif self . is_csv_format ( ) :
2023-04-19 23:58:33 -05:00
self . output [ ' gpu ' ] = int ( gpu_id )
if argument == ' values ' or isinstance ( data , dict ) :
flat_dict = self . flatten_dict ( data )
self . output . update ( flat_dict )
else :
self . output [ argument ] = data
2023-03-06 06:20:21 -06:00
else :
2025-08-21 12:35:18 -05:00
raise ValueError ( " Invalid output format: expected json, csv, or human_readable " )
2023-03-06 06:20:21 -06:00
def store_multiple_device_output ( self ) :
""" Store the current output into the multiple_device_output
then clear the current output
params:
None
return:
Nothing
"""
2023-04-19 23:58:33 -05:00
if not self . output :
return
2023-04-21 08:02:53 -05:00
output = { }
for key , value in self . output . items ( ) :
output [ key ] = value
2023-03-06 06:20:21 -06:00
2023-04-21 08:02:53 -05:00
self . multiple_device_output . append ( output )
2023-04-19 23:58:33 -05:00
self . output = { }
2023-03-06 06:20:21 -06:00
2023-04-21 08:02:53 -05:00
def store_watch_output ( self , multiple_device_enabled = False ) :
2023-03-06 06:20:21 -06:00
""" Add the current output or multiple_devices_output
params:
2023-04-21 08:02:53 -05:00
multiple_device_enabled (bool) - True if watching multiple devices
2023-03-06 06:20:21 -06:00
return:
Nothing
"""
2023-04-21 08:02:53 -05:00
if multiple_device_enabled :
for output in self . multiple_device_output :
self . watch_output . append ( output )
2023-03-06 06:20:21 -06:00
2023-04-21 08:02:53 -05:00
self . multiple_device_output = [ ]
else :
output = { }
2023-03-06 06:20:21 -06:00
2023-04-21 08:02:53 -05:00
for key , value in self . output . items ( ) :
output [ key ] = value
self . watch_output . append ( output )
2023-03-06 06:20:21 -06:00
2023-04-21 08:02:53 -05:00
self . output = { }
2025-01-15 20:28:45 -06:00
def print_output ( self , multiple_device_enabled = False , watching_output = False , tabular = False , dual_csv_output = False , dynamic = False ) :
2023-03-06 06:20:21 -06:00
""" Print current output acording to format and then destination
params:
2023-04-21 08:02:53 -05:00
multiple_device_enabled (bool) - True if printing output from
2023-03-06 06:20:21 -06:00
multiple devices
2023-04-21 08:02:53 -05:00
watching_output (bool) - True if printing watch output
2025-01-15 20:28:45 -06:00
dynamic (bool) - Defaults to False. True turns on dynamic resizing for
left justified table output
2023-03-06 06:20:21 -06:00
return:
Nothing
"""
if self . is_json_format ( ) :
2023-04-21 08:02:53 -05:00
self . _print_json_output ( multiple_device_enabled = multiple_device_enabled ,
watching_output = watching_output )
2023-03-06 06:20:21 -06:00
elif self . is_csv_format ( ) :
2024-06-09 16:31:34 -05:00
if dual_csv_output :
self . _print_dual_csv_output ( multiple_device_enabled = multiple_device_enabled ,
watching_output = watching_output )
else :
self . _print_csv_output ( multiple_device_enabled = multiple_device_enabled ,
watching_output = watching_output )
2023-03-06 06:20:21 -06:00
elif self . is_human_readable_format ( ) :
2024-06-09 16:31:34 -05:00
# If tabular output is enabled, redirect to _print_tabular_output
if tabular :
2025-01-15 20:28:45 -06:00
self . _print_tabular_output ( multiple_device_enabled = multiple_device_enabled , watching_output = watching_output , dynamic = dynamic )
2024-06-09 16:31:34 -05:00
else :
self . _print_human_readable_output ( multiple_device_enabled = multiple_device_enabled ,
2024-06-24 10:35:34 -05:00
watching_output = watching_output )
2023-03-06 06:20:21 -06:00
2023-04-21 08:02:53 -05:00
def _print_json_output ( self , multiple_device_enabled = False , watching_output = False ) :
if multiple_device_enabled :
2023-04-19 23:58:33 -05:00
json_output = self . multiple_device_output
else :
2024-02-22 03:18:10 -06:00
json_output = [ self . output ]
2023-04-19 23:58:33 -05:00
2023-03-06 06:20:21 -06:00
if self . destination == ' stdout ' :
2023-04-21 08:02:53 -05:00
if json_output :
json_std_output = json . dumps ( json_output , indent = 4 )
2023-04-19 23:58:33 -05:00
print ( json_std_output )
2023-03-06 06:20:21 -06:00
else : # Write output to file
2023-04-21 08:02:53 -05:00
if watching_output : # Flush the full JSON output to the file on watch command completion
2024-07-12 08:41:25 -05:00
with self . destination . open ( ' w ' , encoding = " utf-8 " ) as output_file :
2023-03-06 06:20:21 -06:00
json . dump ( self . watch_output , output_file , indent = 4 )
else :
2024-07-12 08:41:25 -05:00
with self . destination . open ( ' a ' , encoding = " utf-8 " ) as output_file :
2023-04-19 23:58:33 -05:00
json . dump ( json_output , output_file , indent = 4 )
2023-03-06 06:20:21 -06:00
2025-04-23 00:08:43 -05:00
def combine_arrays_to_json ( self ) :
2025-06-03 12:08:25 -05:00
combined_json = { }
if self . store_cpu_json_output :
combined_json [ " cpu_data " ] = self . store_cpu_json_output
if self . store_core_json_output :
combined_json [ " core_data " ] = self . store_core_json_output
if self . store_gpu_json_output :
combined_json [ " gpu_data " ] = self . store_gpu_json_output
if self . store_xgmi_metric_json_output :
combined_json [ " xgmi_metric " ] = self . store_xgmi_metric_json_output
2025-10-29 13:03:43 -05:00
if self . store_xgmi_source_status_json_output :
combined_json [ " link_port_status " ] = self . store_xgmi_source_status_json_output
2025-06-03 12:08:25 -05:00
if self . store_xgmi_link_status_json_output :
combined_json [ " link_status " ] = self . store_xgmi_link_status_json_output
2025-06-04 14:52:27 -05:00
if self . store_current_partition_json_output :
combined_json [ " current_partition " ] = self . store_current_partition_json_output
if self . store_memory_partition_json_output :
combined_json [ " memory_partition " ] = self . store_memory_partition_json_output
if self . store_partition_profiles_json_output :
combined_json [ " partition_profiles " ] = self . store_partition_profiles_json_output
if self . store_partition_resources_json_output :
combined_json [ " partition_resources " ] = self . store_partition_resources_json_output
2025-06-03 12:08:25 -05:00
2025-12-19 13:10:42 -06:00
if self . destination == ' stdout ' :
json_std_output = json . dumps ( combined_json , indent = 4 )
print ( json_std_output )
else :
with open ( self . destination , ' w ' , encoding = " utf-8 " ) as output_file :
json . dump ( combined_json , output_file , indent = 4 )
2025-04-23 00:08:43 -05:00
2023-04-21 08:02:53 -05:00
def _print_csv_output ( self , multiple_device_enabled = False , watching_output = False ) :
if multiple_device_enabled :
2023-04-19 23:58:33 -05:00
stored_csv_output = self . multiple_device_output
else :
if not isinstance ( self . output , list ) :
stored_csv_output = [ self . output ]
2023-09-24 21:25:35 -05:00
if stored_csv_output :
csv_keys = set ( )
for output in stored_csv_output :
for key in output :
csv_keys . add ( key )
for index , output_dict in enumerate ( stored_csv_output ) :
remaining_keys = csv_keys - set ( output_dict . keys ( ) )
for key in remaining_keys :
stored_csv_output [ index ] [ key ] = " N/A "
2023-03-06 06:20:21 -06:00
if self . destination == ' stdout ' :
2023-04-21 08:02:53 -05:00
if stored_csv_output :
2023-09-24 21:25:35 -05:00
# Get the header as a list of the first element to maintain order
2023-04-19 23:58:33 -05:00
csv_header = stored_csv_output [ 0 ] . keys ( )
2023-04-21 08:02:53 -05:00
csv_stdout_output = self . CsvStdoutBuilder ( )
writer = csv . DictWriter ( csv_stdout_output , csv_header )
2023-04-19 23:58:33 -05:00
writer . writeheader ( )
writer . writerows ( stored_csv_output )
2023-04-21 08:02:53 -05:00
print ( str ( csv_stdout_output ) )
else :
if watching_output :
2024-07-12 08:41:25 -05:00
with self . destination . open ( ' w ' , newline = ' ' , encoding = " utf-8 " ) as output_file :
2023-04-21 08:02:53 -05:00
if self . watch_output :
2023-09-24 21:25:35 -05:00
csv_keys = set ( )
for output in self . watch_output :
for key in output :
csv_keys . add ( key )
for index , output_dict in enumerate ( self . watch_output ) :
remaining_keys = csv_keys - set ( output_dict . keys ( ) )
for key in remaining_keys :
self . watch_output [ index ] [ key ] = " N/A "
# Get the header as a list of the first element to maintain order
2023-04-21 08:02:53 -05:00
csv_header = self . watch_output [ 0 ] . keys ( )
writer = csv . DictWriter ( output_file , csv_header )
writer . writeheader ( )
writer . writerows ( self . watch_output )
else :
2024-07-12 08:41:25 -05:00
with self . destination . open ( ' a ' , newline = ' ' , encoding = " utf-8 " ) as output_file :
2023-09-24 21:25:35 -05:00
# Get the header as a list of the first element to maintain order
2023-04-21 08:02:53 -05:00
csv_header = stored_csv_output [ 0 ] . keys ( )
writer = csv . DictWriter ( output_file , csv_header )
writer . writeheader ( )
writer . writerows ( stored_csv_output )
2023-03-06 06:20:21 -06:00
2024-06-09 16:31:34 -05:00
def _print_dual_csv_output ( self , multiple_device_enabled = False , watching_output = False ) :
if multiple_device_enabled :
stored_csv_output = self . multiple_device_output
else :
if not isinstance ( self . output , list ) :
stored_csv_output = [ self . output ]
primary_csv_output = [ ]
secondary_csv_output = [ ]
if stored_csv_output :
# Split stored_csv_output into primary_csv and secondary_csv
for output_dict in stored_csv_output :
if ' process_list ' in output_dict :
# Add a new entry for each process_info
for process_info_dict in output_dict [ ' process_list ' ] :
secondary_output_dict = { }
if watching_output :
secondary_output_dict [ ' timestamp ' ] = output_dict [ ' timestamp ' ]
secondary_output_dict [ ' gpu ' ] = output_dict [ ' gpu ' ]
if isinstance ( process_info_dict [ " process_info " ] , dict ) :
for process_field , process_value in process_info_dict [ " process_info " ] . items ( ) :
if isinstance ( process_value , dict ) :
for key , value in process_value . items ( ) :
secondary_output_dict [ key ] = value
else :
secondary_output_dict [ process_field ] = process_value
else :
# Handle no process found case
secondary_output_dict [ " process_info " ] = process_info_dict [ " process_info " ]
secondary_csv_output . append ( secondary_output_dict )
primary_output_dict = { }
for key , value in output_dict . items ( ) :
if key != ' process_list ' :
primary_output_dict [ key ] = value
primary_csv_output . append ( primary_output_dict )
# Ensure uniform data within primary and secondary csv outputs
if primary_csv_output :
primary_keys = set ( )
for output in primary_csv_output :
for key in output :
primary_keys . add ( key )
# insert empty data to align with keys that may not exist
for index , output_dict in enumerate ( primary_csv_output ) :
remaining_keys = primary_keys - set ( output_dict . keys ( ) )
for key in remaining_keys :
primary_csv_output [ index ] [ key ] = " N/A "
if secondary_csv_output :
secondary_keys = set ( )
for output in secondary_csv_output :
for key in output :
secondary_keys . add ( key )
# insert empty data to align with keys that may not exist
for index , output_dict in enumerate ( secondary_csv_output ) :
remaining_keys = secondary_keys - set ( output_dict . keys ( ) )
for key in remaining_keys :
secondary_csv_output [ index ] [ key ] = " N/A "
if self . destination == ' stdout ' :
if primary_csv_output :
# Get the header as a list of the first element to maintain order
csv_header = primary_csv_output [ 0 ] . keys ( )
csv_stdout_output = self . CsvStdoutBuilder ( )
writer = csv . DictWriter ( csv_stdout_output , csv_header )
writer . writeheader ( )
writer . writerows ( primary_csv_output )
print ( str ( csv_stdout_output ) )
if secondary_csv_output :
# Get the header as a list of the first element to maintain order
csv_header = secondary_csv_output [ 0 ] . keys ( )
csv_stdout_output = self . CsvStdoutBuilder ( )
writer = csv . DictWriter ( csv_stdout_output , csv_header )
writer . writeheader ( )
writer . writerows ( secondary_csv_output )
print ( str ( csv_stdout_output ) )
if watching_output :
print ( )
else :
if watching_output :
2024-07-12 08:41:25 -05:00
with self . destination . open ( ' w ' , newline = ' ' , encoding = " utf-8 " ) as output_file :
2024-06-09 16:31:34 -05:00
primary_csv_output = [ ]
secondary_csv_output = [ ]
if self . watch_output :
# Split watch_output into primary_csv and secondary_csv
for output_dict in self . watch_output :
if ' process_list ' in output_dict :
# Add a new entry for each process_info
for process_info_dict in output_dict [ ' process_list ' ] :
secondary_output_dict = { }
if watching_output :
secondary_output_dict [ ' timestamp ' ] = output_dict [ ' timestamp ' ]
secondary_output_dict [ ' gpu ' ] = output_dict [ ' gpu ' ]
if isinstance ( process_info_dict [ " process_info " ] , dict ) :
for process_field , process_value in process_info_dict [ " process_info " ] . items ( ) :
if isinstance ( process_value , dict ) :
for key , value in process_value . items ( ) :
secondary_output_dict [ key ] = value
else :
secondary_output_dict [ process_field ] = process_value
else :
# Handle no process found case
secondary_output_dict [ " process_info " ] = process_info_dict [ " process_info " ]
secondary_csv_output . append ( secondary_output_dict )
primary_output_dict = { }
for key , value in output_dict . items ( ) :
if key != ' process_list ' :
primary_output_dict [ key ] = value
primary_csv_output . append ( primary_output_dict )
# Ensure uniform data within primary and secondary csv outputs
if primary_csv_output :
primary_keys = set ( )
for output in primary_csv_output :
for key in output :
primary_keys . add ( key )
# insert empty data to align with keys that may not exist
for index , output_dict in enumerate ( primary_csv_output ) :
remaining_keys = primary_keys - set ( output_dict . keys ( ) )
for key in remaining_keys :
primary_csv_output [ index ] [ key ] = " N/A "
if secondary_csv_output :
secondary_keys = set ( )
for output in secondary_csv_output :
for key in output :
secondary_keys . add ( key )
# insert empty data to align with keys that may not exist
for index , output_dict in enumerate ( secondary_csv_output ) :
remaining_keys = secondary_keys - set ( output_dict . keys ( ) )
for key in remaining_keys :
secondary_csv_output [ index ] [ key ] = " N/A "
if primary_csv_output :
# Get the header as a list of the first element to maintain order
csv_header = primary_csv_output [ 0 ] . keys ( )
writer = csv . DictWriter ( output_file , csv_header )
writer . writeheader ( )
writer . writerows ( primary_csv_output )
if secondary_csv_output :
output_file . write ( " \n " )
csv_header = secondary_csv_output [ 0 ] . keys ( )
writer = csv . DictWriter ( output_file , csv_header )
writer . writeheader ( )
writer . writerows ( secondary_csv_output )
else :
2024-07-12 08:41:25 -05:00
with self . destination . open ( ' a ' , newline = ' ' , encoding = " utf-8 " ) as output_file :
2024-06-09 16:31:34 -05:00
if primary_csv_output :
# Get the header as a list of the first element to maintain order
csv_header = primary_csv_output [ 0 ] . keys ( )
writer = csv . DictWriter ( output_file , csv_header )
writer . writeheader ( )
writer . writerows ( primary_csv_output )
if secondary_csv_output :
output_file . write ( " \n " )
csv_header = secondary_csv_output [ 0 ] . keys ( )
writer = csv . DictWriter ( output_file , csv_header )
writer . writeheader ( )
writer . writerows ( secondary_csv_output )
2023-12-06 16:57:26 -06:00
def _print_human_readable_output(self, multiple_device_enabled=False, watching_output=False, tabular=False):
    """Render the stored output as human-readable text and emit it.

    Args:
        multiple_device_enabled: Render every entry of
            self.multiple_device_output (one per line) instead of self.output.
        watching_output: With a file destination, rewrite the file from
            self.watch_output instead of appending the current output.
        tabular: Delegate the whole call to _print_tabular_output.
    """
    # Tabular requests are handled entirely by the tabular printer.
    if tabular:
        self._print_tabular_output(multiple_device_enabled=multiple_device_enabled,
                                   watching_output=watching_output)
        return

    if multiple_device_enabled:
        rendered = ''.join(
            self._convert_json_to_human_readable(device_output) + '\n'
            for device_output in self.multiple_device_output
        )
    else:
        rendered = '' + self._convert_json_to_human_readable(self.output)

    if self.destination == 'stdout':
        try:
            # Unicode printing can fail when the locale is not configured.
            print(rendered)
        except UnicodeEncodeError:
            # Fall back to ASCII, dropping characters that cannot be encoded.
            print(rendered.encode('ascii', 'ignore').decode('ascii'))
        return

    if not watching_output:
        with self.destination.open('a', encoding="utf-8") as output_file:
            output_file.write(rendered + '\n')
        return

    # Watching: replace the file content with the full accumulated history.
    with self.destination.open('w', encoding="utf-8") as output_file:
        history = ''.join(
            self._convert_json_to_human_readable(entry) for entry in self.watch_output
        )
        output_file.write(history + '\n')
2024-06-09 16:31:34 -05:00
2025-01-15 20:28:45 -06:00
def _print_tabular_output(self, multiple_device_enabled=False, watching_output=False, dynamic=False):
    """Render the stored output as a primary table plus an optional process table.

    The primary table holds every key of each device dict except
    'process_list'; the secondary table holds the 'process_list' of each
    device that has one.  Titles, the warning message and headers come from
    self.table_title, self.warning_message, self.table_header,
    self.secondary_table_title and self.secondary_table_header.

    Args:
        multiple_device_enabled: Use self.multiple_device_output when it is
            non-empty; otherwise self.output is rendered as a single device.
        watching_output: Tag process rows with the device timestamp.  On
            stdout an extra blank block is printed after the tables; with a
            file destination the file is rewritten from self.watch_output.
        dynamic: Forwarded to _convert_json_to_tabular for primary rows.
    """
    if multiple_device_enabled and self.multiple_device_output:
        device_outputs = self.multiple_device_output
    else:
        device_outputs = [self.output]
    primary_table, secondary_table = self._build_tabular_tables(
        device_outputs, watching_output, dynamic)

    if self.destination == 'stdout':
        # Each table gets its own ASCII fallback so that a failure while
        # printing a later table never re-prints one already shown.
        self._print_with_ascii_fallback(primary_table)
        if secondary_table:
            self._print_with_ascii_fallback(secondary_table)
        if watching_output:
            print("\n")
        return

    if watching_output:
        # Write all stored watched output to the file, replacing its content.
        with self.destination.open('w', encoding="utf-8") as output_file:
            primary_table, secondary_table = self._build_tabular_tables(
                self.watch_output, True, dynamic)
            output_file.write(primary_table)
            if secondary_table:
                output_file.write("\n" + secondary_table)
    else:
        # Append the singular output to the file.
        with self.destination.open('a', encoding="utf-8") as output_file:
            output_file.write(primary_table + '\n')
            output_file.write(secondary_table)


def _build_tabular_tables(self, device_outputs, include_timestamp, dynamic):
    """Return (primary_table, secondary_table) strings, headings included."""
    primary_rows = []
    secondary_rows = []
    for device_output in device_outputs:
        if 'process_list' in device_output:
            process_table_dict = {}
            if include_timestamp:
                process_table_dict['timestamp'] = device_output['timestamp']
            process_table_dict['gpu'] = device_output['gpu']
            process_table_dict['process_list'] = device_output['process_list']
            secondary_rows.append(self._convert_json_to_tabular(process_table_dict) + '\n')
        primary_table_output = {
            key: value for key, value in device_output.items() if key != 'process_list'}
        primary_rows.append(
            self._convert_json_to_tabular(primary_table_output, dynamic=dynamic) + '\n')

    # Trailing whitespace is stripped before the headings are attached.
    primary_table = ''.join(primary_rows).rstrip()
    secondary_table = ''.join(secondary_rows).rstrip()

    if primary_table:
        primary_table_heading = ''
        if self.table_title:
            primary_table_heading = self.table_title + ':\n'
        if self.warning_message:
            # The warning sits between the title and the column header.
            primary_table_heading += self.warning_message + '\n'
        primary_table_heading += self.table_header + '\n'
        primary_table = primary_table_heading + primary_table

    # Currently only the process list uses the secondary table.
    if secondary_table:
        secondary_table_heading = ''
        if self.secondary_table_title:
            secondary_table_heading = '\n' + self.secondary_table_title + ':\n'
        secondary_table_heading += self.secondary_table_header + '\n'
        secondary_table = secondary_table_heading + secondary_table

    return primary_table, secondary_table


@staticmethod
def _print_with_ascii_fallback(text):
    """Print text; if stdout cannot encode it, print it with non-ASCII dropped."""
    try:
        # Printing as unicode may fail if the locale is not set properly.
        print(text)
    except UnicodeEncodeError:
        print(text.encode('ascii', 'ignore').decode('ascii'))
2025-05-29 17:14:58 -05:00
def print_default_output ( self , output : Dict ) :
# Print the default summary view to stdout as three boxed sections:
# a version/platform banner, a two-line entry per GPU, and a process table.
# Reads output['version_info'] (keys 'amd-smi', 'rocm version',
# 'amdgpu version', 'fw pldm version', 'vbios version', 'kernel version'),
# output['gpu_info_list'] and output['processes']. Nothing is returned.
# some template lines
default_line_1 = " +------------------------------------------------------------------------------+ "
2025-06-13 16:43:56 -05:00
default_line_2 = " |-------------------------------------+----------------------------------------| "
default_line_3 = " |=====================================+========================================| "
default_line_4 = " +-------------------------------------+----------------------------------------+ "
2025-06-02 17:15:15 -05:00
default_line_5 = " |==============================================================================| "
2025-05-29 17:14:58 -05:00
# print the version information first
2025-05-30 18:40:18 -05:00
amd_smi_version = str ( output [ ' version_info ' ] [ ' amd-smi ' ] )
2025-12-04 23:23:34 -06:00
# Shorten over-long version strings, marking the cut with an ellipsis.
# NOTE(review): the banner below formats this value in a 40-wide field, and
# width-only format specs pad but do not truncate - confirm a 60-char value
# cannot break the box alignment.
if len ( amd_smi_version ) > 60 :
amd_smi_version = amd_smi_version [ : 57 ] + " ... "
2025-05-29 17:14:58 -05:00
rocm_version = " N/A "
# 'rocm version' is indexed as a pair: element 0 is tested for truthiness,
# element 1 is used as the version value.
if output [ ' version_info ' ] [ ' rocm version ' ] [ 0 ] :
2025-05-30 18:40:18 -05:00
rocm_version = str ( output [ ' version_info ' ] [ ' rocm version ' ] [ 1 ] ) . ljust ( 8 )
# 'amdgpu version' is either the string "N/A" or a mapping that is indexed
# with 'driver_version' below.
driver_version = output [ ' version_info ' ] [ ' amdgpu version ' ]
if driver_version == " N/A " :
amdgpu_version = " N/A " . ljust ( 8 )
else :
2025-08-06 10:36:22 -05:00
# Example driver version string for amdgpu: 6.8.0-60 : 'Linuxversion6.8.0-60-generic(buildd@lcy02-amd64-098)(x86_64-linux-gnu-gcc-12(Ubuntu12.3.0-1ubuntu1~22.04)12.3.0,GNUld(GNUBinutilsforUbuntu)2.38)#63~22.04.1-UbuntuSMPPREEMPT_DYNAMICTueApr2219:00:15UTC2'
# Extract version before "-generic" if it exists
if ' -generic ' in driver_version [ ' driver_version ' ] :
# Extract version using regex to find pattern like "6.8.0-60"
match = re . search ( r ' ( \ d+ \ . \ d+ \ . \ d+- \ d+) ' , driver_version [ ' driver_version ' ] )
if match :
2025-12-04 23:23:34 -06:00
amdgpu_version = match . group ( 1 ) [ : 80 ]
2025-08-06 10:36:22 -05:00
else :
2025-12-04 23:23:34 -06:00
amdgpu_version = " N/A "
2025-08-06 10:36:22 -05:00
else :
2025-12-04 23:23:34 -06:00
amdgpu_version = str ( driver_version [ ' driver_version ' ] ) [ : 80 ]
2025-08-21 17:58:39 -05:00
fw_pldm_version = str ( output [ ' version_info ' ] [ ' fw pldm version ' ] )
vbios_version = str ( output [ ' version_info ' ] [ ' vbios version ' ] )
2026-01-15 13:11:58 -06:00
kernel_version = str ( output [ ' version_info ' ] [ ' kernel version ' ] )
2025-05-29 17:14:58 -05:00
# print GPU info
print ( default_line_1 )
2025-12-04 23:23:34 -06:00
# Split the version line into 3 lines, each wrapping to the same width
print ( " | AMD-SMI {0:40s} {1:19s} | " . format ( amd_smi_version . ljust ( 40 ) , " " ) )
2026-01-15 13:11:58 -06:00
# Print amdgpu or kernel version based on availability, if neither then don't print
if amdgpu_version . strip ( ) != " N/A " :
2025-12-04 23:23:34 -06:00
print ( " | amdgpu Version: {0:40s} {1:19s} | " . format ( amdgpu_version , " " ) )
2026-01-15 13:11:58 -06:00
elif kernel_version . strip ( ) != " N/A " :
print ( " | OS kernel Version: {0:40s} {1:19s} | " . format ( kernel_version , " " ) )
2025-12-04 23:23:34 -06:00
# NOTE(review): rocm_version is padded with ljust(8) when set, so this
# comparison only filters the untouched "N/A" default assigned above.
if rocm_version != " N/A " :
print ( " | ROCm Version: {0:40s} {1:19s} | " . format ( rocm_version , " " ) )
# only print if the version is not "N/A"
if vbios_version != " N/A " :
print ( " | VBIOS Version: {0:22s} {1:35s} | " . format ( vbios_version , " " ) )
if fw_pldm_version != " N/A " :
print ( " | FW PLDM: {0:15s} {1:42s} | " . format ( fw_pldm_version , " " ) )
# The platform string comes from self.helpers.os_info() and is cut to the
# field width by the format spec's precision.
print ( " | Platform: {0:25.25s} {1:34s} | " . format ( str ( self . helpers . os_info ( ) ) , " " ) )
2025-05-29 17:14:58 -05:00
print ( default_line_2 )
2025-06-13 16:43:56 -05:00
print ( " | BDF GPU-Name | Mem-Uti Temp UEC Power-Usage | " )
print ( " | GPU HIP-ID OAM-ID Partition-Mode | GFX-Uti Fan Mem-Usage | " )
2025-05-29 17:14:58 -05:00
print ( default_line_3 )
# line_count / end are used so that a separator line is printed between GPU
# entries but not after the last one.
line_count = 0
end = len ( output [ ' gpu_info_list ' ] ) - 1
for gpu_info in output [ ' gpu_info_list ' ] :
bdf = str ( gpu_info [ ' bdf ' ] ) . ljust ( 12 )
market_name = str ( gpu_info [ ' market_name ' ] )
# Long market names keep their last 19 characters, prefixed with an ellipsis.
if len ( market_name ) > 22 :
market_name = ( " ... " + market_name [ - 19 : ] )
market_name = market_name . rjust ( 22 )
# Each metric below is either the string "N/A" or a value that gets a unit
# suffix appended before being padded to its column width.
mem_util = gpu_info [ ' mem_util ' ]
if mem_util != " N/A " :
mem_util = str ( mem_util ) + " % "
2025-06-13 16:43:56 -05:00
mem_util = mem_util . ljust ( 5 )
2025-05-29 17:14:58 -05:00
temp = gpu_info [ ' temp ' ]
if temp != " N/A " :
temp = str ( temp ) + " \u00b0 C "
temp = temp . rjust ( 6 )
2025-06-13 16:43:56 -05:00
u_ecc = str ( gpu_info [ ' uncorr_ecc ' ] ) . ljust ( 5 )
2025-05-29 17:14:58 -05:00
# power_usage, when available, is indexed with 'current_power' and
# 'power_limit'; a limit of 0 is left out of the displayed text.
power_usage = gpu_info [ ' power_usage ' ]
if power_usage != " N/A " :
2025-12-08 21:36:45 -06:00
power_limit = gpu_info [ ' power_usage ' ] [ ' power_limit ' ]
if power_limit != 0 :
power_limit = f " / { power_limit } "
else :
power_limit = " "
power_usage = f " { gpu_info [ ' power_usage ' ] [ ' current_power ' ] } { power_limit } W "
2025-06-13 16:43:56 -05:00
power_usage = str ( power_usage ) . rjust ( 13 )
2025-05-29 17:14:58 -05:00
gpu_id = str ( gpu_info [ ' gpu_id ' ] ) . rjust ( 3 )
hip_id = str ( gpu_info [ ' hip_id ' ] ) . rjust ( 6 )
2025-06-13 16:43:56 -05:00
oam_id = str ( gpu_info [ ' oam_id ' ] ) . rjust ( 6 )
2025-05-29 17:14:58 -05:00
partition_modes = str ( gpu_info [ ' partition_mode ' ] ) . rjust ( 14 )
gfx_util = gpu_info [ ' gfx_util ' ]
if gfx_util != " N/A " :
gfx_util = str ( gfx_util ) + " % "
2025-06-13 16:43:56 -05:00
gfx_util = gfx_util . ljust ( 5 )
2025-05-29 17:14:58 -05:00
fan = gpu_info [ ' fan ' ]
if fan != " N/A " :
fan = str ( fan ) + " % "
2025-06-13 16:43:56 -05:00
fan = fan . rjust ( 6 )
2025-05-29 17:14:58 -05:00
mem_usage = gpu_info [ ' mem_usage ' ]
if mem_usage != " N/A " :
2025-12-09 21:49:06 -06:00
# Support both VRAM and GTT memory types for APU-aware display
# The GTT pair is checked first, so it wins when both pairs are present.
if ' used_gtt ' in mem_usage and ' total_gtt ' in mem_usage :
# GTT memory selected (likely APU)
mem_usage = f " { gpu_info [ ' mem_usage ' ] [ ' used_gtt ' ] } / { gpu_info [ ' mem_usage ' ] [ ' total_gtt ' ] } MB "
elif ' used_vram ' in mem_usage and ' total_vram ' in mem_usage :
# VRAM memory selected (standard or APU with more VRAM)
mem_usage = f " { gpu_info [ ' mem_usage ' ] [ ' used_vram ' ] } / { gpu_info [ ' mem_usage ' ] [ ' total_vram ' ] } MB "
else :
# Fallback if neither format is found
mem_usage = " N/A "
2025-06-13 16:43:56 -05:00
mem_usage = mem_usage . rjust ( 21 )
# Width and precision are both given in the specs below, so every cell is
# padded and truncated to exactly its column width.
print ( " | {0:12.12s} {1:22.22s} | {2:5.5s} {3:6.6s} {4:5.5s} {5:13.13s} | " . format ( bdf , market_name , mem_util , temp , u_ecc , power_usage ) )
print ( " | {0:3.3s} {1:6.6s} {2:6.6s} {3:14.14s} | {4:5.5s} {5:6.6s} {6:21.21s} | " . format ( gpu_id , hip_id , oam_id , partition_modes , gfx_util , fan , mem_usage ) )
2025-05-29 17:14:58 -05:00
if line_count < end :
print ( default_line_2 )
line_count + = 1
print ( default_line_4 )
2025-06-02 17:15:15 -05:00
# print process list of all GPUs last
print ( default_line_1 )
print ( " | Processes: | " )
2025-06-17 19:59:34 -05:00
print ( " | GPU PID Process Name GTT_MEM VRAM_MEM MEM_USAGE CU % | " )
2025-06-02 17:15:15 -05:00
print ( default_line_5 )
2025-07-07 11:26:10 -05:00
# Set when any listed process has the name "N/A"; triggers the hint printed
# after the table.
elevated_permission_error = False
2025-06-02 17:15:15 -05:00
if len ( output [ ' processes ' ] ) != 0 :
for process in output [ ' processes ' ] :
gpu_id = str ( process [ ' gpu ' ] ) . rjust ( 4 )
pid = str ( process [ ' pid ' ] ) . rjust ( 9 )
2025-08-04 13:19:50 -05:00
if str ( process [ ' name ' ] ) == " N/A " :
process_name = " N/A " . ljust ( 19 )
else :
# Only the text after the last '/' of the name is shown.
process_name = str ( process [ ' name ' ] ) . split ( ' / ' ) [ - 1 ] . ljust ( 19 )
2025-06-13 16:43:56 -05:00
gtt_mem = str ( process [ ' gtt ' ] ) . rjust ( 8 )
vram_mem = str ( process [ ' vram ' ] ) . rjust ( 8 )
2025-06-02 17:15:15 -05:00
mem_usage = str ( process [ ' mem_usage ' ] ) . rjust ( 9 )
2025-06-18 19:37:01 -05:00
# CU occupancy is shown as current_cu / total_num_cu as a percentage rounded
# to one decimal place.
# NOTE(review): a total_num_cu of 0 would raise ZeroDivisionError here -
# confirm whether that value can occur.
if process [ ' cu_occupancy ' ] [ ' total_num_cu ' ] != " N/A " and process [ ' cu_occupancy ' ] [ ' current_cu ' ] != " N/A " :
cu_occupancy = ( str ( round ( process [ ' cu_occupancy ' ] [ ' current_cu ' ] / process [ ' cu_occupancy ' ] [ ' total_num_cu ' ] * 100 , 1 ) ) + " % " ) . rjust ( 7 )
2025-06-17 19:59:34 -05:00
else :
cu_occupancy = " N/A "
2025-10-30 12:37:21 -05:00
print ( " | {0:4.4s} {1:9.9s} {2:19.19s} {3:8.8s} {4:8.8s} {5:9.9s} {6:7.7s} | " . format (
gpu_id , pid , process_name , gtt_mem , vram_mem , mem_usage , cu_occupancy ) )
2025-07-07 11:26:10 -05:00
if process [ ' name ' ] == " N/A " :
elevated_permission_error = True
2025-06-02 17:15:15 -05:00
else :
print ( " | No running processes found | " )
2025-07-07 11:26:10 -05:00
print ( default_line_1 )
if elevated_permission_error :
print ( " Process Name may require elevated permissions. " )