Files
2025-09-24 09:07:20 -07:00

183 γραμμές
8.4 KiB
Python

# Copyright © Advanced Micro Devices, Inc., or its affiliates.
# SPDX-License-Identifier: MIT
import os,time
from rdc_bootstrap import *
from RdcUtil import RdcUtil
from typing import Dict
# Field ids used by RdcReader when the caller does not pass ``field_ids``.
# Each id is listed once: RdcReader.process() reads every entry for every GPU,
# so a repeated id would be fetched and handled twice per polling cycle.
default_field_ids = [
    rdc_field_t.RDC_FI_GPU_MEMORY_USAGE,
    rdc_field_t.RDC_FI_GPU_MEMORY_TOTAL,
    rdc_field_t.RDC_FI_GPU_MM_ENC_UTIL,
    rdc_field_t.RDC_FI_GPU_MM_DEC_UTIL,
    rdc_field_t.RDC_FI_GPU_MEMORY_ACTIVITY,
    rdc_field_t.RDC_FI_GPU_MEMORY_MAX_BANDWIDTH,
    rdc_field_t.RDC_FI_GPU_MEMORY_CUR_BANDWIDTH,
    rdc_field_t.RDC_FI_OAM_ID,
    rdc_field_t.RDC_FI_POWER_USAGE,
    rdc_field_t.RDC_FI_GPU_CLOCK,
    rdc_field_t.RDC_FI_GPU_UTIL,
    rdc_field_t.RDC_FI_GPU_TEMP,
]
# Default per-field multipliers used by RdcReader: a raw field value is
# multiplied by the factor registered for its field id before it is handled.
# The trailing comment on each entry names the unit of the converted value.
default_unit_coverter = {
    rdc_field_t.RDC_FI_GPU_MEMORY_USAGE: 1e-6,  # MegaBytes
    rdc_field_t.RDC_FI_GPU_MEMORY_TOTAL: 1e-6,  # MegaBytes
    rdc_field_t.RDC_FI_POWER_USAGE: 1e-6,  # Watts
    rdc_field_t.RDC_FI_GPU_CLOCK: 1e-6,  # MHz
    rdc_field_t.RDC_FI_GPU_TEMP: 1e-3,  # degree
}
class RdcReader:
    """Poll RDC for the latest value of a set of fields on a set of GPUs.

    On construction the reader initialises RDC, either starts it embedded or
    connects to a standalone endpoint, creates a GPU group and a field group,
    and starts watching the fields.  Each call to ``process()`` then reads the
    latest value of every field for every GPU and passes it to
    ``handle_field()``.

    ``handle_field()`` prints the value and ``process_other_fields()`` does
    nothing; both are separate methods so they can be overridden.

    To run RDC in embedded mode, pass ``ip_port=None``.
    """

    # Value stored in / compared with ``rdc_entity_info_t.device_type``
    # (labelled RDC_DEVICE_TYPE_GPU in the original code).
    _DEVICE_TYPE_GPU = 0
    # Value stored in / compared with ``rdc_entity_info_t.entity_role`` for a
    # GPU partition (labelled RDC_DEVICE_ROLE_PARTITION and
    # RDC_DEVICE_ROLE_PARTITION_INSTANCE in the original code).
    _ENTITY_ROLE_PARTITION = 1

    def __init__(
        self,
        ip_port="localhost:50051",
        field_ids=default_field_ids,
        unit_converter: Dict[int, float] = default_unit_coverter,
        update_freq=10000000,
        max_keep_age=3600.0,
        max_keep_samples=1000,
        field_group_name="rdc_reader_field_group",
        gpu_group_name="rdc_reader_gpu_group",
        gpu_indexes=None,
        root_ca="/etc/rdc/client/certs/rdc_cacert.pem",
        client_cert="/etc/rdc/client/certs/rdc_client_cert.pem",
        client_key="/etc/rdc/client/private/rdc_client_cert.key",
    ):
        """Initialise RDC, create the groups and start watching the fields.

        Args:
            ip_port: Address handed to ``rdc.rdc_connect`` (standalone mode).
                Any falsy value (``None``, ``""``) starts RDC embedded instead.
            field_ids: Field ids placed in the field group and read by
                ``process()``.
            unit_converter: Mapping of field id to a multiplier applied to the
                raw value in ``process()``; ``None`` disables conversion.
            update_freq: Forwarded unchanged to ``rdc.rdc_field_watch``.
            max_keep_age: Forwarded unchanged to ``rdc.rdc_field_watch``.
            max_keep_samples: Forwarded unchanged to ``rdc.rdc_field_watch``.
            field_group_name: Name of the field group (encoded to bytes).
            gpu_group_name: Name of the GPU group (encoded to bytes).
            gpu_indexes: Iterable of GPU identifiers, or ``None`` to use every
                index returned by ``RdcUtil.get_all_gpu_indexes``.  Each entry
                is converted with ``str()``; entries accepted by
                ``rdc.rdc_is_partition_string`` are resolved to an entity
                index, all others are converted with ``int()``.
            root_ca: Path of the root CA file (standalone mode only).
            client_cert: Path of the client certificate (standalone mode only).
            client_key: Path of the client key (standalone mode only).
                If any of the three paths is ``None``, no file is read and
                ``None`` is passed for all three credentials.

        Raises:
            Exception: If RDC initialisation, embedded start, connection,
                partition-string parsing or the field watch fails.
        """
        result = rdc.rdc_init(0)
        if rdc_status_t(result) != rdc_status_t.RDC_ST_OK:
            raise Exception("RdcReader init fail: " + str(result))

        self.rdc_util = RdcUtil()
        self.unit_converter = unit_converter
        self.rdc_handle = c_void_p()

        # A falsy ip_port selects embedded mode; anything else is standalone.
        self.is_standalone = bool(ip_port)
        if self.is_standalone:
            self._connect_standalone(ip_port, root_ca, client_cert, client_key)
        else:
            self._start_embedded()

        # Create the GPU group
        self.gpu_group_name = gpu_group_name.encode()
        self.gpu_indexes = self._resolve_gpu_indexes(gpu_indexes)
        self.gpu_group_id, _ = self.rdc_util.create_gpu_group(
            self.rdc_handle, self.gpu_group_name, self.gpu_indexes)

        # Create the field group
        self.field_ids = field_ids
        self.field_group_name = field_group_name.encode()
        self.field_group_id, _ = self.rdc_util.create_field_group(
            self.rdc_handle, self.field_group_name, self.field_ids)

        # Watch the fields
        self.update_freq = update_freq
        self.max_keep_age = max_keep_age
        self.max_keep_samples = max_keep_samples
        # Unwatch first to clean up what is left from the last run; the result
        # is deliberately ignored.
        rdc.rdc_field_unwatch(self.rdc_handle, self.gpu_group_id, self.field_group_id)
        self._watch_fields()

    def _start_embedded(self):
        """Start RDC embedded in this process, storing the handle in ``self.rdc_handle``."""
        result = rdc.rdc_start_embedded(
            rdc_operation_mode_t.RDC_OPERATION_MODE_AUTO, self.rdc_handle)
        if rdc_status_t(result) != rdc_status_t.RDC_ST_OK:
            raise Exception("RdcReader start as embedded fail: " + str(result))

    def _connect_standalone(self, ip_port, root_ca, client_cert, client_key):
        """Connect to the endpoint at ``ip_port``, with credentials when all three paths are given."""
        with_auth = not (root_ca is None or client_cert is None or client_key is None)
        if with_auth:
            root_ca_str = self.rdc_util.read_file(root_ca)
            client_cert_str = self.rdc_util.read_file(client_cert)
            client_key_str = self.rdc_util.read_file(client_key)
        else:
            root_ca_str = client_cert_str = client_key_str = None

        result = rdc.rdc_connect(ip_port.encode('utf-8'), self.rdc_handle,
                                 root_ca_str, client_cert_str, client_key_str)
        if rdc_status_t(result) != rdc_status_t.RDC_ST_OK:
            raise Exception("RdcReader standalone auth(" + str(with_auth) + ") connect to "
                            + ip_port + " fail: " + str(result))

    def _resolve_gpu_indexes(self, gpu_indexes):
        """Return the list of GPU/entity indexes to put into the GPU group."""
        if gpu_indexes is None:
            return self.rdc_util.get_all_gpu_indexes(self.rdc_handle)

        resolved = []
        for idx in gpu_indexes:
            idx_str = str(idx)
            encoded = idx_str.encode("utf-8")
            if not rdc.rdc_is_partition_string(encoded):
                resolved.append(int(idx_str))
                continue

            # Partition string: split it into physical GPU and partition index,
            # then ask RDC for the matching entity index.
            phys_gpu = ctypes.c_uint32()
            part_idx = ctypes.c_uint32()
            rc = rdc.rdc_parse_partition_string(
                encoded, ctypes.byref(phys_gpu), ctypes.byref(part_idx))
            if not rc:
                raise Exception("Rdc failed to parse partition string")
            info = rdc_entity_info_t()
            info.device_type = self._DEVICE_TYPE_GPU
            info.entity_role = self._ENTITY_ROLE_PARTITION
            info.instance_index = part_idx
            info.device_index = phys_gpu
            resolved.append(rdc.rdc_get_entity_index_from_info(info))
        return resolved

    def _watch_fields(self):
        """Start watching the field group on the GPU group; raise if RDC rejects it."""
        result = rdc.rdc_field_watch(
            self.rdc_handle, self.gpu_group_id, self.field_group_id,
            self.update_freq, self.max_keep_age, self.max_keep_samples)
        if rdc_status_t(result) != rdc_status_t.RDC_ST_OK:
            raise Exception("RdcReader fail to watch group " + str(self.gpu_group_id)
                            + ", field group " + str(self.field_group_id) + ":" + str(result))

    def _convert_value(self, fid, value):
        """Apply the unit multiplier for ``fid`` (if any) and mirror doubles into ``l_int``."""
        field_type = value.type.value
        if self.unit_converter is not None and fid in self.unit_converter:
            scale = self.unit_converter[fid]
            if field_type == rdc_field_type_t.INTEGER:
                value.value.l_int = int(value.value.l_int * scale)
            elif field_type == rdc_field_type_t.DOUBLE:
                value.value.dbl = int(value.value.dbl * scale)
        # Handlers read ``l_int``, so a double value is truncated into it.
        if field_type == rdc_field_type_t.DOUBLE:
            value.value.l_int = int(value.value.dbl)

    # Process the fields periodically
    def process(self):
        """Read the latest value of every field on every GPU and handle it.

        Values that RDC does not return with ``RDC_ST_OK`` are skipped.
        ``process_other_fields()`` is always called afterwards.  If there was
        something to read but not a single read succeeded,
        ``try_reconnect()`` is called.
        """
        has_succeed = False
        for gindex in self.gpu_indexes:
            for fid in self.field_ids:
                value = rdc_field_value()
                result = rdc.rdc_field_get_latest_value(self.rdc_handle, gindex, fid, value)
                if rdc_status_t(result) != rdc_status_t.RDC_ST_OK:
                    continue
                self._convert_value(fid, value)
                self.handle_field(gindex, value)
                has_succeed = True

        self.process_other_fields()

        if len(self.gpu_indexes) != 0 and len(self.field_ids) != 0 and not has_succeed:
            self.try_reconnect()

    def process_other_fields(self):
        """Hook called once per ``process()`` cycle; does nothing here."""
        pass

    def try_reconnect(self):
        """Re-create the groups and re-watch the fields (standalone mode only).

        Does nothing in embedded mode.  Any exception is printed rather than
        propagated, so a failed attempt does not stop the polling loop.
        """
        if not self.is_standalone:
            return
        try:
            # When rdcd restarts, the GPU and field groups need to be re-created.
            self.gpu_group_id, gpu_group_created = self.rdc_util.create_gpu_group(
                self.rdc_handle, self.gpu_group_name, self.gpu_indexes)
            self.field_group_id, field_group_created = self.rdc_util.create_field_group(
                self.rdc_handle, self.field_group_name, self.field_ids)
            # A restarted rdcd requires the group to be watched again.
            if gpu_group_created or field_group_created:
                self._watch_fields()
        except Exception as e:
            print(e)

    def handle_field(self, gpu_index, value):
        """Print one field value as ``<ts> <gpu>:<field id> <field name>:<value>``.

        ``<gpu>`` is ``g<device>.<instance>`` for a partition entity and the
        plain device index otherwise.
        """
        info = rdc.rdc_get_info_from_entity_index(gpu_index)
        if info.entity_role == self._ENTITY_ROLE_PARTITION:
            gpu_str = f"g{info.device_index}.{info.instance_index}"
        else:
            gpu_str = str(info.device_index)
        field_name = self.rdc_util.field_id_string(value.field_id)
        print("%d %s:%d %s:%d" % (value.ts, gpu_str, value.field_id.value, field_name, value.value.l_int))
def _run_embedded_reader():
    """Poll an embedded-mode RdcReader once per second, forever."""
    # ip_port=None selects embedded mode.
    embedded_reader = RdcReader(ip_port=None, update_freq=1000000)
    while True:
        time.sleep(1)
        embedded_reader.process()


if __name__ == '__main__':
    _run_embedded_reader()