From 588773f9bf787cb53d84b47499b91a751ef9d3e1 Mon Sep 17 00:00:00 2001 From: vedithal-amd Date: Tue, 23 Dec 2025 13:12:18 -0500 Subject: [PATCH] [rocprofiler-compute] Fix for multi process workload profiling (#2418) * Fix for multi process workload profiling Native counter collection tool updates: * Do not dump empty counter data for a process * Use PID instead of UUID for dumped csv files to facilitate correlation * Handle merging multiple pairs of rocpd (from sdk tool) and csv (from native tool) files * Handle merging multiple pairs of csv (from sdk tool) and csv (from native tool) files Rocpd output format updates: * Merge multiple rocpd databases into a single csv * Reset dispatch id and kernel id for unique dispatches and unique kernels respectively * Retain multiple rocpd databases per run for multi process workloads * Add test case for multiprocess profiling using rocflop workload * Add rocflop * Fix native counter csv to rocprofv3 csv conversion * Use kernel_id instead of dispatch_id to correlate native counter csv and kernel trace csv * python formatting using ruff 0.14 instead of 0.13 --- projects/rocprofiler-compute/CHANGELOG.md | 3 + projects/rocprofiler-compute/CMakeLists.txt | 1 + .../rocprofiler-compute/sample/rocflop.cpp | 681 ++++++++++++++++++ .../src/lib/rocprofiler_compute_tool.cpp | 54 +- .../src/rocprof_compute_tui/widgets/charts.py | 6 +- .../src/utils/analysis_orm.py | 3 +- .../src/utils/mi_gpu_spec.py | 3 +- .../rocprofiler-compute/src/utils/parser.py | 20 +- .../src/utils/rocpd_data.py | 31 +- projects/rocprofiler-compute/src/utils/tty.py | 3 +- .../rocprofiler-compute/src/utils/utils.py | 187 ++--- .../rocprofiler-compute/tests/CMakeLists.txt | 8 + .../tests/test_profile_general.py | 24 + 13 files changed, 880 insertions(+), 144 deletions(-) create mode 100644 projects/rocprofiler-compute/sample/rocflop.cpp diff --git a/projects/rocprofiler-compute/CHANGELOG.md b/projects/rocprofiler-compute/CHANGELOG.md index 4ce51c86f2..e55041e700 100644 --- a/projects/rocprofiler-compute/CHANGELOG.md +++ b/projects/rocprofiler-compute/CHANGELOG.md @@ -45,6 +45,9 @@ Full documentation for ROCm Compute Profiler is available at [https://rocm.docs. * Fix the check to prevent showing table where a column is full of N/A * Improve detection of empty values when metric evalulation fails due to counter data missing +* Fix the wrong logic in native counter csv to rocprofv3 csv conversion + * Use kernel_id instead of dispatch_id to correlate native counter csv and kernel trace csv + ### Removed * Removed "VL1 Lat" metric for AMD Instinct MI300 series GPUs, due to MI300 series not supporting TCP_TCP_LATENCY_sum counter. diff --git a/projects/rocprofiler-compute/CMakeLists.txt b/projects/rocprofiler-compute/CMakeLists.txt index bedea3c56f..ab1d4866a8 100644 --- a/projects/rocprofiler-compute/CMakeLists.txt +++ b/projects/rocprofiler-compute/CMakeLists.txt @@ -748,6 +748,7 @@ if(INSTALL_TESTS) tests/hip_dynamic_shared tests/laplace_eqn tests/mat_mul_max + tests/rocflop DESTINATION ${CMAKE_INSTALL_LIBEXECDIR}/${PROJECT_NAME}/tests COMPONENT tests ) diff --git a/projects/rocprofiler-compute/sample/rocflop.cpp b/projects/rocprofiler-compute/sample/rocflop.cpp new file mode 100644 index 0000000000..a2fe1438ee --- /dev/null +++ b/projects/rocprofiler-compute/sample/rocflop.cpp @@ -0,0 +1,681 @@ +// Copied from https://github.com/benrichard-amd/rocflop/tree/82f197e12314bab694fc70451a2b495b4f51bf90 + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using float16 = _Float16; + +// Vector types. Useful for packed math (where supported) and MFMA inputs. +template +using vecT = T __attribute__((ext_vector_type(Rank))); + +template using vec4 = vecT; +template using vec8 = vecT; + + +// Kernels + + +template __global__ void fma_throughput(vec4* buffer, int count) +{ + const T k = 1.0; + + const int grid_size = gridDim.x * blockDim.x; + const int tid = blockDim.x * blockIdx.x + threadIdx.x; + + vec4* ptr = buffer; + + vec4 value0 = ptr[0 * grid_size + tid]; + vec4 value1 = ptr[1 * grid_size + tid]; + vec4 value2 = ptr[2 * grid_size + tid]; + vec4 value3 = ptr[3 * grid_size + tid]; + + for(int j = 0; j < count; j++) { + for(int j = 0; j < 64; j++) { + + // 16 FMA ops + value0 = value0 * value0 + k; + value1 = value1 * value1 + k; + value2 = value2 * value2 + k; + value3 = value3 * value3 + k; + } + } + + ptr[tid] = value0 + value1 + value2 + value3; +} + +__global__ void matmul_fp16_throughput(vec4* inputs, vec4* outputs, int count) +{ + int grid_size = gridDim.x * blockDim.x; + int tid = blockDim.x * blockIdx.x + threadIdx.x; + + vec4* ptr = inputs; + + vec4 value0 = ptr[0 * grid_size + tid]; + vec4 value1 = ptr[1 * grid_size + tid]; + vec4 value2 = ptr[2 * grid_size + tid]; + vec4 value3 = ptr[3 * grid_size + tid]; + + vec4 accum0; + vec4 accum1; + vec4 accum2; + vec4 accum3; + for(int i = 0; i < count; i++) { + for(int j = 0; j < 64; j++) { + // 4 MFMA ops + accum0 = __builtin_amdgcn_mfma_f32_16x16x16f16(value0, value0, accum0, 0, 0, 0); + accum1 = __builtin_amdgcn_mfma_f32_16x16x16f16(value1, value1, accum1, 0, 0, 0); + accum2 = __builtin_amdgcn_mfma_f32_16x16x16f16(value2, value2, accum2, 0, 0, 0); + accum3 = __builtin_amdgcn_mfma_f32_16x16x16f16(value3, value3, accum3, 0, 0, 0); + } + } + + outputs[tid] = accum0 + accum1 + accum2 + accum3; +} + +__global__ void sparse_matmul_fp16_throughput(vec4* input0, vec8* input1, vec4* outputs, int count) +{ + int grid_size = gridDim.x * blockDim.x; + int tid = blockDim.x * blockIdx.x + threadIdx.x; + + vec4* x_ptr = input0; + vec8* y_ptr = input1; + + vec4 x0 = x_ptr[0 * grid_size + tid]; + vec4 x1 = x_ptr[1 * grid_size + tid]; + vec4 x2 = x_ptr[2 * grid_size + tid]; + vec4 x3 = x_ptr[3 * grid_size + tid]; + + vec8 y0 = y_ptr[0 * grid_size + tid]; + vec8 y1 = y_ptr[1 * grid_size + tid]; + vec8 y2 = y_ptr[2 * grid_size + tid]; + vec8 y3 = y_ptr[3 * grid_size + tid]; + + vec4 accum0; + vec4 accum1; + vec4 accum2; + vec4 accum3; + + for(int i = 0; i < count; i++) { + for(int j = 0; j < 64; j++) { + // 4 SMFMAC ops + accum0 = __builtin_amdgcn_smfmac_f32_16x16x32_f16(x0, y0, accum0, 0, 0, 0); + accum1 = __builtin_amdgcn_smfmac_f32_16x16x32_f16(x1, y1, accum1, 0, 0, 0); + accum2 = __builtin_amdgcn_smfmac_f32_16x16x32_f16(x2, y2, accum2, 0, 0, 0); + accum3 = __builtin_amdgcn_smfmac_f32_16x16x32_f16(x3, y3, accum3, 0, 0, 0); + } + } + + outputs[tid] = accum0 + accum1 + accum2 + accum3; +} + +__global__ void matmul_fp32_throughput(float* inputs, vec4* outputs, int count) +{ + int grid_size = gridDim.x * blockDim.x; + int tid = blockDim.x * blockIdx.x + threadIdx.x; + + float* ptr = inputs; + + float value0 = ptr[0 * grid_size + tid]; + float value1 = ptr[1 * grid_size + tid]; + float value2 = ptr[2 * grid_size + tid]; + float value3 = ptr[2 * grid_size + tid]; + + vec4 accum0; + vec4 accum1; + vec4 accum2; + vec4 accum3; + for(int i = 0; i < count; i++) { + for(int j = 0; j < 64; j++) { + // 4 MFMA ops + accum0 = __builtin_amdgcn_mfma_f32_16x16x4f32(value0, value0, accum0, 0, 0, 0); + accum1 = __builtin_amdgcn_mfma_f32_16x16x4f32(value1, value1, accum1, 0, 0, 0); + accum2 = __builtin_amdgcn_mfma_f32_16x16x4f32(value2, value2, accum2, 0, 0, 0); + accum3 = __builtin_amdgcn_mfma_f32_16x16x4f32(value3, value3, accum3, 0, 0, 0); + } + } + + outputs[tid] = accum0 + accum1 + accum2 + accum3; +} + +void HIP_CALL(hipError_t err) +{ + if(err != hipSuccess) { + std::cout << "HIP Error: " << (int)err << " " << hipGetErrorString(err) << std::endl; + exit(1); + } +} + +struct GCNArch { + int major; + int minor; + int rev; +}; + +GCNArch get_gcn_arch(int device) +{ + hipDeviceProp_t props; + + HIP_CALL(hipGetDeviceProperties(&props, device)); + + // Example: gfx908:sramecc+:xnack- + std::string arch_full(props.gcnArchName); + + // Extract number e.g. "908" + std::string gfx_str = arch_full.substr(3, arch_full.find_first_of(':')); + + int gfx_num = std::stoi(gfx_str, nullptr, 16); + + GCNArch arch; + arch.major = (gfx_num & 0xff00) >> 8; + arch.minor = (gfx_num & 0x00f0) >> 4; + arch.rev = (gfx_num & 0x000f); + + return arch; +} + +enum : uint32_t { + VALU_FP32 = 1 << 0, + VALU_FP16 = 1 << 1, + VALU_FP64 = 1 << 2, + MATRIX_FP16 = 1 << 3, + MATRIX_FP32 = 1 << 4, + SMATRIX_FP16 = 1 << 5, + VALU_INT32 = 1 << 6, + + ALL = (uint32_t)-1 +}; + +// Timer for measuring kernel duration +class HIPTimer { + +private: + hipEvent_t m_start; + hipEvent_t m_stop; + +public: + HIPTimer() + { + HIP_CALL(hipEventCreate(&m_start)); + HIP_CALL(hipEventCreate(&m_stop)); + } + + void start() + { + HIP_CALL(hipEventRecord(m_start)); + } + + void stop() + { + HIP_CALL(hipEventRecord(m_stop)); + } + + double elapsed() + { + float ms; + HIP_CALL(hipEventElapsedTime(&ms, m_start, m_stop)); + + return (double)ms / 1000.0; + } +}; + +// Host code + +template double fma_throughput_test(int device, int count, int runs = 1) +{ + vec4* buffer = nullptr; + + hipDeviceProp_t props; + HIP_CALL(hipGetDeviceProperties(&props, device)); + + int blocks = props.multiProcessorCount * 512; + int threads_per_block = 64; + int total_threads = blocks * threads_per_block; + + HIP_CALL(hipMalloc(&buffer, sizeof(vec4) * total_threads * 4)); + + HIPTimer t; + t.start(); + for(int i = 0; i < runs; i++) { + fma_throughput<<>>(buffer, count); + } + t.stop(); + HIP_CALL(hipDeviceSynchronize()); + + double elapsed = t.elapsed(); + double ops = (double)total_threads * count * 64 * 16 * runs; + double flops = (double)ops * 2.0 / elapsed; + + HIP_CALL(hipFree(buffer)); + + return flops; +} + +template double matmul_throughput_test(int device, int count, int runs = 1) +{ + const int wave_size = 64; + int k; + int m; + int n; + + if(std::is_same::value) { + m = 16; + n = 16; + k = 16; + } else if(std::is_same::value) { + m = 16; + n = 16; + k = 4; + } else { + assert(false); + } + + int ops_per_matmul = k * m * n * 2; + + void* buffer = nullptr; + void* accum = nullptr; + + hipDeviceProp_t props; + HIP_CALL(hipGetDeviceProperties(&props, device)); + + int blocks = props.multiProcessorCount * 512; + int threads_per_block = wave_size; + int total_threads = blocks * threads_per_block; + + HIP_CALL(hipMalloc(&buffer, 4 * sizeof(matT) * m * k * total_threads)); + HIP_CALL(hipMalloc(&accum, sizeof(accumT) * m * n * total_threads)); + + HIPTimer t; + t.start(); + for(int i = 0; i < runs; i++) { + if(std::is_same::value && std::is_same::value) { + matmul_fp16_throughput<<>>((vec4*)buffer, (vec4*)accum, count); + } else if(std::is_same::value && std::is_same::value) { + matmul_fp32_throughput<<>>((float*)buffer, (vec4*)accum, count); + } + } + t.stop(); + HIP_CALL(hipDeviceSynchronize()); + + double elapsed = t.elapsed(); + double ops = (double)blocks * count * 64 * 4 * runs; + double flops = (double)ops * ops_per_matmul / elapsed; + + HIP_CALL(hipFree(buffer)); + HIP_CALL(hipFree(accum)); + + return flops; +} + +template double sparse_matmul_throughput_test(int device, int count, int runs = 1) +{ + const int wave_size = 64; + int k; + int m; + int n; + + if(std::is_same::value) { + m = 16; + n = 16; + k = 32; + } else { + assert(false); + } + + int ops_per_matmul = k * m * n * 2; + + void* buffer1 = nullptr; + void* buffer2 = nullptr; + void* accum = nullptr; + + hipDeviceProp_t props; + HIP_CALL(hipGetDeviceProperties(&props, device)); + + int blocks = props.multiProcessorCount * 512; + int threads_per_block = wave_size; + int total_threads = blocks * threads_per_block; + + HIP_CALL(hipMalloc(&buffer1, 4 * sizeof(matT) * m * k * total_threads)); + HIP_CALL(hipMalloc(&buffer2, 8 * sizeof(matT) * n * k * total_threads)); + HIP_CALL(hipMalloc(&accum, sizeof(accumT) * m * n * total_threads)); + + HIPTimer t; + t.start(); + for(int i = 0; i < runs; i++) { + if(std::is_same::value && std::is_same::value) { + sparse_matmul_fp16_throughput<<>>((vec4*)buffer1, + (vec8*)buffer2, (vec4*)accum, count); + } + } + t.stop(); + HIP_CALL(hipDeviceSynchronize()); + + double elapsed = t.elapsed(); + double ops = (double)blocks * count * 64 * 4 * runs; + double flops = (double)ops * ops_per_matmul / elapsed; + + HIP_CALL(hipFree(buffer1)); + HIP_CALL(hipFree(buffer2)); + HIP_CALL(hipFree(accum)); + + return flops; +} + +struct Result { + int device = -1; + double valu_fp16 = 0; + double valu_fp32 = 0; + double valu_fp64 = 0; + double valu_int32 = 0; + double mfma_fp16 = 0; + double mfma_fp32 = 0; + double smfmac_fp16 = 0; + + // Used for sorting + bool operator<(const Result& other) { + return device < other.device; + } +}; + +void print_result(const Result& res, uint32_t mask) +{ + if(mask & VALU_FP16) { + printf("VALU FP16: %8.2f TFLOPS\n", res.valu_fp16 / 1e12); + } + if(mask & VALU_FP32) { + printf("VALU FP32: %8.2f TFLOPS\n", res.valu_fp32 / 1e12); + } + if(mask & VALU_FP64) { + printf("VALU FP64: %8.2f TFLOPS\n", res.valu_fp64 / 1e12); + } + if(mask & VALU_INT32) { + printf("VALU INT32: %8.2f TIOPS\n", res.valu_int32 / 1e12); + } + if(mask & MATRIX_FP16) { + printf("MFMA FP16: %8.2f TFLOPS\n", res.mfma_fp16 / 1e12); + } + if(mask & MATRIX_FP32) { + printf("MFMA FP32: %8.2f TFLOPS\n", res.mfma_fp32 / 1e12); + } + if(mask & SMATRIX_FP16) { + printf("SMFMAC FP16: %8.2f TFLOPS\n", res.smfmac_fp16 / 1e12); + + } +} + +Result run_tests(int device, int runs, uint32_t mask) +{ + int device_count; + + HIP_CALL(hipGetDeviceCount(&device_count)); + + if(device >= device_count) { + std::cout << "Device " << device << " does not exist. Skipping..." << std::endl; + exit(1); + } + + HIP_CALL(hipSetDevice(device)); + GCNArch arch = get_gcn_arch(device); + + Result res = {.device = device}; + + if(mask & VALU_FP16) { + res.valu_fp16 = fma_throughput_test(device, 4096, runs); + } + + if(mask & VALU_FP32) { + res.valu_fp32 = fma_throughput_test(device, 4096, runs); + } + + if(mask & VALU_FP64) { + res.valu_fp64 = fma_throughput_test(device, 4096, runs); + } + + if(mask & VALU_INT32) { + res.valu_int32 = fma_throughput_test(device, 4096, runs); + } + + if(mask & MATRIX_FP16) { + if(arch.major == 0x9 && (arch.minor >= 0x4 || (arch.minor == 0 && arch.rev >= 8))) { + res.mfma_fp16 = matmul_throughput_test(device, 4096, runs); + } else { + res.mfma_fp16 = 0; + } + } + + if(mask & MATRIX_FP32) { + if(arch.major == 0x9 && (arch.minor >= 0x4 || (arch.minor == 0 && arch.rev >= 8))) { + res.mfma_fp32 = matmul_throughput_test(device, 4096, runs); + } else { + res.mfma_fp32 = 0; + } + } + + if(mask & SMATRIX_FP16) { + if(arch.major == 9 && arch.minor >= 4) { + res.smfmac_fp16 = sparse_matmul_throughput_test(device, 4096, runs); + } else { + res.smfmac_fp16 = 0; + } + } + return res; +} + +// Use fork() followed by exec() to run child process. For some reason +// rocprof does not pick up the child processes when only fork() is +// used. +pid_t fork_process(int device, int runs, uint32_t mask, int fd) +{ + pid_t pid = fork(); + + if(pid != 0) { + return pid; + } + + std::string str_device = std::to_string(device); + std::string str_runs = std::to_string(runs); + std::string str_mask = std::to_string(mask); + std::string str_fd = std::to_string(fd); + + char* const args[] = { + (char*)"CHILD", + (char*)str_device.c_str(), + (char*)str_runs.c_str(), + (char*)str_mask.c_str(), + (char*)str_fd.c_str(), + NULL + }; + + execv("/proc/self/exe", args); + std::cout << "execv() failed: " << std::strerror(errno) << std::endl; + exit(1); +} + +void run(std::vector& devices, int runs, uint32_t mask) +{ + std::vector pids; + + // We will receive results from the child processes using a pipe + int fd[2]; + + if(pipe(fd)) { + std::cout << std::strerror(errno) << std::endl; + exit(1); + } + + // Start a new process for each GPU + for(auto d : devices) { + pid_t pid = fork_process(d, runs, mask, fd[1]); + + pids.push_back(pid); + } + + // Wait for all processes to finish + for(auto pid : pids) { + int status; + waitpid(pid, &status, 0); + } + + // Set the read to non-blocking + int flags = fcntl(fd[0], F_GETFL, 0); + fcntl(fd[0], F_SETFL, flags | O_NONBLOCK); + + // Read records from pipe + std::vector results(pids.size()); + int count = read(fd[0], results.data(), results.size() * sizeof(Result)) / sizeof(Result); + + results.resize(count); + + // Sort results by GPU id + std::sort(results.begin(), results.end()); + + // Print results + for(auto r : results) { + std::cout << std::endl << "GPU " << r.device << std::endl; + print_result(r, mask); + } + + Result total; + for(auto r : results) { + total.valu_fp16 += r.valu_fp16; + total.valu_fp32 += r.valu_fp32; + total.valu_fp64 += r.valu_fp64; + total.valu_int32 += r.valu_int32; + total.mfma_fp16 += r.mfma_fp16; + total.mfma_fp32 += r.mfma_fp32; + total.smfmac_fp16 += r.smfmac_fp16; + } + std::cout << std::endl << "System total" << std::endl; + print_result(total, mask); +} + + +void usage() +{ + std::cout << "--device ID Use device with the given numerical ID" << std::endl; + std::cout << "--devices IDS | ALL Comma-separated list of device Ids (e.g., 1,2,3)" << std::endl; + std::cout << " ALL for all devices" << std::endl; + std::cout << "--runs RUNS Number of times each kernel is dispatched" << std::endl; + + std::cout << "--fp16 Run FP16 (VALU) test" << std::endl; + std::cout << "--fp32 Run FP32 (VALU) test" << std::endl; + std::cout << "--fp64 Run FP64 (VALU) test" << std::endl; + std::cout << "--matfp16 Run FP16 (MFMA) test" << std::endl; + std::cout << "--matfp32 Run FP32 (MFMA) test" << std::endl; + std::cout << "--smatfp16 Run FP16 (SMFMAC) test" << std::endl; +} + +int main(int argc, char** argv) +{ + if(std::string(argv[0]) == "CHILD") { + int device = atoi(argv[1]); + int runs = atoi(argv[2]); + uint32_t mask = atoi(argv[3]); + int fd = atoi(argv[4]); + + Result res = run_tests(device, runs, mask); + + write(fd, &res, sizeof(res)); + return 0; + } + + int runs = 1; + + uint32_t mask = 0; + bool all_devices = false; + std::vector devices; + int device_count; + int device = 0; + + HIP_CALL(hipGetDeviceCount(&device_count)); + + int i = 1; + while(i < argc) { + std::string arg = std::string(argv[i]); + + if(arg == "--help") { + usage(); + return 0; + } else if(arg == "--device") { + devices.push_back(atoi(argv[i + 1])); + // Skip next + i++; + } else if(arg == "--devices") { + // Parse comma-separated string of numbers + std::string s(argv[i + 1]); + + if(s == "all" || s == "ALL") { + all_devices = true; + } else { + std::stringstream ss(s); + std::string r; + while(getline(ss, r, ',')) { + devices.push_back(std::stoi(r)); + } + } + // Skip next + i++; + } else if(arg == "--runs") { + runs = atoi(argv[i + 1]); + + // Skip next + i++; + } else if(arg == "--fp32") { + mask |= VALU_FP32; + } else if(arg == "--fp64") { + mask |= VALU_FP64; + } else if(arg == "--fp16") { + mask |= VALU_FP16; + } else if(arg == "--int32") { + mask |= VALU_INT32; + } else if(arg == "--matfp16") { + mask |= MATRIX_FP16; + } else if(arg == "--matfp32") { + mask |= MATRIX_FP32; + } else if(arg == "--smatfp16") { + mask |= SMATRIX_FP16; + } else { + std::cout << "Invalid argument '" << arg << "'" << std::endl; + std::cout << std::endl; + usage(); + return 1; + } + + i++; + } + + if(all_devices) { + for(int i = 0; i < device_count; i++ ){ + devices.push_back(i); + } + } + + // Verify device ID's + for(auto d : devices) { + if(d >= device_count) { + std::cout << "Invalid device ordinal: " << d << std::endl; + return 1; + } + } + + if(devices.size() == 0) { + devices.push_back(0); + } + + if(mask == 0) { + mask = ALL; + } + + run(devices, runs, mask); + + return 0; +} + + diff --git a/projects/rocprofiler-compute/src/lib/rocprofiler_compute_tool.cpp b/projects/rocprofiler-compute/src/lib/rocprofiler_compute_tool.cpp index ce33ff4d87..f9ffc95b1c 100644 --- a/projects/rocprofiler-compute/src/lib/rocprofiler_compute_tool.cpp +++ b/projects/rocprofiler-compute/src/lib/rocprofiler_compute_tool.cpp @@ -77,11 +77,11 @@ for the agent and returns a pointer to it. #include #include #include -#include #include #include #include #include +#include #include #include @@ -148,7 +148,7 @@ struct counter_info_record_t { // Tool data struct, now includes a vector of counter_info_record_t struct tool_data_t { std::mutex mut{}; - std::unique_ptr output_stream{nullptr}; + std::string output_filename{}; std::unordered_map counter_id_name_map{}; std::string requested_counters{}; std::string kernel_filter_include_regex{}; @@ -614,14 +614,28 @@ void generate_output(tool_data_t *tool_data) { }), tool_data->counter_records.end()); } - + if (tool_data->counter_records.empty()) { + return; + } // Write collected counter records and clean up - if (auto &os = tool_data->output_stream) { + if (!tool_data->output_filename.empty()) { + std::ofstream ofs(tool_data->output_filename); + if (!ofs.is_open()) { + std::cerr << "Failed to open output file: " << tool_data->output_filename + << std::endl; + return; + } + // Write header at the beginning of the file + ofs << "dispatch_id,gpu_id,kernel_id,lds_per_workgroup," + "counter_id,counter_name,counter_value\n"; for (const auto &r : tool_data->counter_records) - *os << r.dispatch_id << ',' << r.agent_id << "," << r.kernel_id << ',' + ofs << r.dispatch_id << ',' << r.agent_id << "," << r.kernel_id << ',' << r.LDS_memory_size << ',' << r.counter_id << ',' << r.counter_name << ',' << r.counter_value << '\n'; - os->flush(); + ofs.flush(); + std::clog << "[rocprofiler-compute] [" << __FUNCTION__ + << "] Counter collection data has been written to: " + << tool_data->output_filename << std::endl; } } @@ -638,18 +652,13 @@ void tool_fini(void *user_data) { } // namespace -std::unique_ptr create_tool_data(rocprofiler_client_id_t *id) { +std::unique_ptr +create_tool_data(rocprofiler_client_id_t * /*id*/) { auto tool_data = std::make_unique(); - // Generate a unique output filename using a random hex string (no libuuid - // dependency) - std::random_device rd; - std::mt19937 gen(rd()); - std::uniform_int_distribution dis(0, 0xFFFFFFFF); - std::stringstream filename_ss; - filename_ss << std::hex << dis(gen); + // Generate a unique output filename using the process ID std::string base_filename = - "counter_collection_" + filename_ss.str().substr(0, 8) + ".csv"; + std::to_string(getpid()) + "_native_counter_collection.csv"; // Require ROCPROF_OUTPUT_PATH to be set, otherwise error out std::string filename; @@ -664,20 +673,7 @@ std::unique_ptr create_tool_data(rocprofiler_client_id_t *id) { // Use the generated base filename along with ROCPROF_OUTPUT_PATH filename += base_filename; - // Set output stream to file - auto ofs = std::make_unique(filename); - if (!ofs->is_open()) { - throw std::runtime_error("Failed to open output file: " + filename); - } - tool_data->output_stream = std::move(ofs); - // Write header at the beginning of the file - *tool_data->output_stream << "dispatch_id,gpu_id,kernel_id,lds_per_workgroup," - "counter_id,counter_name,counter_value\n"; - tool_data->output_stream->flush(); - - // Write to clog the path of the logging file - std::clog << id->name << " [" << __FUNCTION__ - << "] Logging counter collection to: " << filename << std::endl; + tool_data->output_filename = filename; // Store ROCPROF env. vars. in tool_data diff --git a/projects/rocprofiler-compute/src/rocprof_compute_tui/widgets/charts.py b/projects/rocprofiler-compute/src/rocprof_compute_tui/widgets/charts.py index fa12e7d197..41086ddf96 100644 --- a/projects/rocprofiler-compute/src/rocprof_compute_tui/widgets/charts.py +++ b/projects/rocprofiler-compute/src/rocprof_compute_tui/widgets/charts.py @@ -61,7 +61,8 @@ def simple_bar(df: pd.DataFrame, title: Optional[str] = None) -> Optional[str]: if "Metric" in df.columns and "Avg" in df.columns: metric_dict = ( - pd.DataFrame([df["Metric"], df["Avg"]]) + pd + .DataFrame([df["Metric"], df["Avg"]]) .replace("", 0) .replace(float("inf"), -1) # It should not happen .replace(float("-inf"), -1) @@ -258,7 +259,8 @@ def px_simple_multi_bar( for group, metric in nested_bar.items(): dfigs.append( - px.bar( + px + .bar( title=group, x=metric.values(), y=metric.keys(), diff --git a/projects/rocprofiler-compute/src/utils/analysis_orm.py b/projects/rocprofiler-compute/src/utils/analysis_orm.py index d6d8839db3..f2315fc768 100644 --- a/projects/rocprofiler-compute/src/utils/analysis_orm.py +++ b/projects/rocprofiler-compute/src/utils/analysis_orm.py @@ -219,7 +219,8 @@ def get_views() -> list[TextClause]: select( Kernel.kernel_name, (Dispatch.end_timestamp - Dispatch.start_timestamp).label("duration"), - func.row_number() + func + .row_number() .over( partition_by=Kernel.kernel_name, order_by=Dispatch.end_timestamp - Dispatch.start_timestamp, diff --git a/projects/rocprofiler-compute/src/utils/mi_gpu_spec.py b/projects/rocprofiler-compute/src/utils/mi_gpu_spec.py index 82a4e22beb..42a8810cb7 100644 --- a/projects/rocprofiler-compute/src/utils/mi_gpu_spec.py +++ b/projects/rocprofiler-compute/src/utils/mi_gpu_spec.py @@ -132,7 +132,8 @@ class MIGPUSpecs: cls._all_gpu_models.append(curr_gpu_model) cls._gpu_model_dict[curr_gpu_arch].append(curr_gpu_model) cls._num_xcds_dict[curr_gpu_model] = ( - models.get("partition_mode", {}) + models + .get("partition_mode", {}) .get("compute_partition_mode", {}) .get("num_xcds", {}) ) diff --git a/projects/rocprofiler-compute/src/utils/parser.py b/projects/rocprofiler-compute/src/utils/parser.py index 8e0f764314..4ae6c9759a 100755 --- a/projects/rocprofiler-compute/src/utils/parser.py +++ b/projects/rocprofiler-compute/src/utils/parser.py @@ -580,7 +580,8 @@ def gen_counter_list(formula: str) -> tuple[bool, list[str]]: return visited, counters try: tree = ast.parse( - formula.replace("$normUnit", "SQ_WAVES") + formula + .replace("$normUnit", "SQ_WAVES") .replace("$denom", "SQ_WAVES") .replace( "$numActiveCUs", @@ -1606,9 +1607,9 @@ def load_pc_sampling_data_per_kernel( pc_sample_instructions = search_key_in_json(file_name, "pc_sample_instructions") df["instruction"] = ( df["inst_index"].apply( - lambda x: pc_sample_instructions[x] - if x < len(pc_sample_instructions) - else None + lambda x: ( + pc_sample_instructions[x] if x < len(pc_sample_instructions) else None + ) ) if pc_sample_instructions else None @@ -1618,9 +1619,11 @@ def load_pc_sampling_data_per_kernel( pc_sample_comments = search_key_in_json(file_name, "pc_sample_comments") df["source_line"] = ( df["inst_index"].apply( - lambda x: f".../{Path(pc_sample_comments[x]).name}" - if x < len(pc_sample_comments) - else None + lambda x: ( + f".../{Path(pc_sample_comments[x]).name}" + if x < len(pc_sample_comments) + else None + ) ) if pc_sample_comments else None @@ -1719,7 +1722,8 @@ def load_pc_sampling_data( # Group by Instruction_Comment and aggregate grouped_counts = ( - merged_df.groupby("Instruction_Comment") + merged_df + .groupby("Instruction_Comment") .agg( count=("Instruction_Comment", "count"), instruction=("Instruction", "first"), diff --git a/projects/rocprofiler-compute/src/utils/rocpd_data.py b/projects/rocprofiler-compute/src/utils/rocpd_data.py index 564073786e..9fa4bfb5ac 100644 --- a/projects/rocprofiler-compute/src/utils/rocpd_data.py +++ b/projects/rocprofiler-compute/src/utils/rocpd_data.py @@ -38,6 +38,7 @@ COUNTERS_COLLECTION_QUERY = """ SELECT agent_id as GPU_ID, dispatch_id as Dispatch_ID, + pid as PID, grid_size as Grid_Size, workgroup_size as Workgroup_Size, lds_block_size as LDS_Per_Workgroup, @@ -61,24 +62,28 @@ TABLE_NAME_PREFIX_QUERY = ( INSERT_QUERY = "INSERT INTO {table_name} ({columns}) VALUES ({placeholders})" -def convert_db_to_csv( - db_path: str, +def convert_dbs_to_csv( + db_paths: list[str], csv_file_path: str, ) -> None: """ - Read rocpd database and write to CSV file + Read rocpd databases and write to CSV file """ - # Read counters_collection view from the database and write to CSV + # Read counters_collection view from the databases and write to CSV try: - with closing(sqlite3.connect(db_path)) as conn: - with closing(conn.execute(COUNTERS_COLLECTION_QUERY)) as cursor: - with open(csv_file_path, "w", newline="") as csvfile: - writer = csv.writer(csvfile) - writer.writerow([ - description[0] for description in cursor.description - ]) - for row in cursor: - writer.writerow(row) + with open(csv_file_path, "w", newline="") as csvfile: + writer = csv.writer(csvfile) + header_written = False + for db_path in db_paths: + with closing(sqlite3.connect(db_path)) as conn: + with closing(conn.execute(COUNTERS_COLLECTION_QUERY)) as cursor: + if not header_written: + writer.writerow([ + description[0] for description in cursor.description + ]) + header_written = True + for row in cursor: + writer.writerow(row) except OSError as e: console_error(f"Database error while converting to CSV: {e}") except Exception as e: diff --git a/projects/rocprofiler-compute/src/utils/tty.py b/projects/rocprofiler-compute/src/utils/tty.py index e41b2565c0..0a4a188054 100644 --- a/projects/rocprofiler-compute/src/utils/tty.py +++ b/projects/rocprofiler-compute/src/utils/tty.py @@ -426,7 +426,8 @@ def format_table_output( and "Value" in df.columns ): mem_data = ( - pd.DataFrame([df["Metric"], df["Value"]]) + pd + .DataFrame([df["Metric"], df["Value"]]) .transpose() .set_index("Metric") .to_dict()["Value"] diff --git a/projects/rocprofiler-compute/src/utils/utils.py b/projects/rocprofiler-compute/src/utils/utils.py index 053847929e..d360fbdd2b 100644 --- a/projects/rocprofiler-compute/src/utils/utils.py +++ b/projects/rocprofiler-compute/src/utils/utils.py @@ -885,24 +885,48 @@ def run_prof( rocprof_cmd == "rocprofiler-sdk" and options["ROCPROF_COUNTER_COLLECTION"] == "0" ): - # Update rocpd database with counter csv created by native tool - rocpd_data.update_rocpd_pmc_events( - pd.read_csv(glob.glob(workload_dir + "/out/pmc_1/*.csv")[0]), - glob.glob(workload_dir + "/out/pmc_1/*/*.db")[0], - ) + for db_name in glob.glob(workload_dir + "/out/pmc_1/*/*.db"): + pid = Path(db_name).stem.split("_")[0] + rocpd_data.update_rocpd_pmc_events( + pd.read_csv( + f"{workload_dir}/out/pmc_1/{pid}_native_counter_collection.csv" + ), + db_name, + ) + console_debug(f"Updated rocpd db {db_name} with native tool counters.") # Write results_fbase.csv - rocpd_data.convert_db_to_csv( - glob.glob(workload_dir + "/out/pmc_1/*/*.db")[0], + rocpd_data.convert_dbs_to_csv( + glob.glob(workload_dir + "/out/pmc_1/*/*.db"), workload_dir + f"/results_{fbase}.csv", ) + combined_df = pd.read_csv(workload_dir + f"/results_{fbase}.csv") + # Reset Dispatch_ID based on PID, Kernel_Name, Grid_Size, + # Workgroup_Size, LDS_Per_Workgroup + combined_df["Dispatch_ID"] = combined_df.groupby( + ["PID", "Kernel_Name", "Grid_Size", "Workgroup_Size", "LDS_Per_Workgroup"], + sort=False, + ).ngroup() + # Reset Kernel_ID based on Kernel_Name, Grid_Size, + # Workgroup_Size, LDS_Per_Workgroup + combined_df["Kernel_ID"] = combined_df.groupby( + ["Kernel_Name", "Grid_Size", "Workgroup_Size", "LDS_Per_Workgroup"], + sort=False, + ).ngroup() + # Drop PID since its not required + combined_df = combined_df.drop(columns=["PID"]) + combined_df.to_csv(workload_dir + f"/results_{fbase}.csv", index=False) + if retain_rocpd_output: - shutil.copyfile( - glob.glob(workload_dir + "/out/pmc_1/*/*.db")[0], - workload_dir + "/" + fbase + ".db", - ) - console_warning( - f"Retaining large raw rocpd database: {workload_dir}/{fbase}.db" - ) + for db_path in glob.glob(workload_dir + "/out/pmc_1/*/*.db"): + pid = Path(db_path).stem.split("_")[0] + shutil.copyfile( + db_path, + workload_dir + f"/{fbase}_{pid}.db", + ) + console_warning( + f"Retaining large raw rocpd database: " + f"{workload_dir}/{fbase}_{pid}.db" + ) # Remove temp directory shutil.rmtree(workload_dir + "/" + "out") return @@ -1064,81 +1088,66 @@ def convert_native_counter_collection_csv(workload_dir: str) -> None: trace to write counter collection csv in rocprofiler-sdk format for further processing to pmc_perf.csv file """ - counter_data = pd.read_csv( - glob.glob(f"{workload_dir}/out/pmc_1/*.csv")[0], index_col=False - ) - # Group by on counter_data based on dispatch_id and - # counter_id and sum the counter_value - counter_data = counter_data.groupby( - ["dispatch_id", "counter_name"], as_index=False - ).agg({"counter_value": "sum"}) - kernel_data_filename = glob.glob(f"{workload_dir}/out/pmc_1/*/*_kernel_trace.csv")[ - 0 - ] - kernel_data = pd.read_csv(kernel_data_filename) - rocprofv3_counter_data = pd.DataFrame({ - "Correlation_Id": counter_data["dispatch_id"], - "Dispatch_Id": counter_data["dispatch_id"], - "Agent_Id": kernel_data.iloc[counter_data["dispatch_id"] - 1][ - "Agent_Id" - ].values, - "Queue_Id": kernel_data.iloc[counter_data["dispatch_id"] - 1][ - "Queue_Id" - ].values, - "Process_Id": kernel_data.iloc[counter_data["dispatch_id"] - 1][ - "Thread_Id" - ].values, - "Thread_Id": kernel_data.iloc[counter_data["dispatch_id"] - 1][ - "Thread_Id" - ].values, - "Grid_Size": ( - kernel_data.iloc[counter_data["dispatch_id"] - 1][ - ["Grid_Size_X", "Grid_Size_Y", "Grid_Size_Z"] - ] - .prod(axis=1) - .values - ), - "Kernel_Id": kernel_data.iloc[counter_data["dispatch_id"] - 1][ - "Kernel_Id" - ].values, - "Kernel_Name": kernel_data.iloc[counter_data["dispatch_id"] - 1][ - "Kernel_Name" - ].values, - "Workgroup_Size": ( - kernel_data.iloc[counter_data["dispatch_id"] - 1][ - ["Workgroup_Size_X", "Workgroup_Size_Y", "Workgroup_Size_Z"] - ] - .prod(axis=1) - .values - ), - "LDS_Block_Size": kernel_data.iloc[counter_data["dispatch_id"] - 1][ - "LDS_Block_Size" - ].values, - "Scratch_Size": kernel_data.iloc[counter_data["dispatch_id"] - 1][ - "Scratch_Size" - ].values, - "VGPR_Count": kernel_data.iloc[counter_data["dispatch_id"] - 1][ - "VGPR_Count" - ].values, - "Accum_VGPR_Count": kernel_data.iloc[counter_data["dispatch_id"] - 1][ - "Accum_VGPR_Count" - ].values, - "SGPR_Count": kernel_data.iloc[counter_data["dispatch_id"] - 1][ - "SGPR_Count" - ].values, - "Counter_Name": counter_data["counter_name"], - "Counter_Value": counter_data["counter_value"], - "Start_Timestamp": kernel_data.iloc[counter_data["dispatch_id"] - 1][ - "Start_Timestamp" - ].values, - "End_Timestamp": kernel_data.iloc[counter_data["dispatch_id"] - 1][ - "End_Timestamp" - ].values, - }) - rocprofv3_counter_data.to_csv( - kernel_data_filename.replace("kernel_trace", "counter_collection"), - index=False, - ) + for native_filename in glob.glob( + f"{workload_dir}/out/pmc_1/*_native_counter_collection.csv" + ): + counter_data = pd.read_csv(native_filename, index_col=False) + # Group by on dispatch_id and counter_id and sum the counter_value, + # Other rows in group have the same value, so take the first one + groupby_cols = ["dispatch_id", "counter_name"] + agg_dict = { + col: "first" for col in counter_data.columns if col not in groupby_cols + } + # Overwrite counter_value aggregation to sum + agg_dict["counter_value"] = "sum" + counter_data = counter_data.groupby(groupby_cols, as_index=False).agg(agg_dict) + + pid = Path(native_filename).stem.split("_")[0] + kernel_data_filename = glob.glob( + f"{workload_dir}/out/pmc_1/*/{pid}_kernel_trace.csv" + )[0] + kernel_data = pd.read_csv(kernel_data_filename) + + # Merge counter_data with kernel_data on kernel_id + merged_data = pd.merge( + counter_data, + kernel_data, + left_on="kernel_id", + right_on="Kernel_Id", + how="left", + ) + + rocprofv3_counter_data = pd.DataFrame({ + "Correlation_Id": merged_data["dispatch_id"], + "Dispatch_Id": merged_data["dispatch_id"], + "Agent_Id": merged_data["Agent_Id"], + "Queue_Id": merged_data["Queue_Id"], + "Process_Id": merged_data["Thread_Id"], + "Thread_Id": merged_data["Thread_Id"], + "Grid_Size": ( + merged_data[["Grid_Size_X", "Grid_Size_Y", "Grid_Size_Z"]].prod(axis=1) + ), + "Kernel_Id": merged_data["Kernel_Id"], + "Kernel_Name": merged_data["Kernel_Name"], + "Workgroup_Size": ( + merged_data[ + ["Workgroup_Size_X", "Workgroup_Size_Y", "Workgroup_Size_Z"] + ].prod(axis=1) + ), + "LDS_Block_Size": merged_data["LDS_Block_Size"], + "Scratch_Size": merged_data["Scratch_Size"], + "VGPR_Count": merged_data["VGPR_Count"], + "Accum_VGPR_Count": merged_data["Accum_VGPR_Count"], + "SGPR_Count": merged_data["SGPR_Count"], + "Counter_Name": merged_data["counter_name"], + "Counter_Value": merged_data["counter_value"], + "Start_Timestamp": merged_data["Start_Timestamp"], + "End_Timestamp": merged_data["End_Timestamp"], + }) + rocprofv3_counter_data.to_csv( + kernel_data_filename.replace("kernel_trace", "counter_collection"), + index=False, + ) def process_rocprofv3_output(workload_dir: str, using_native_tool: bool) -> list[str]: diff --git a/projects/rocprofiler-compute/tests/CMakeLists.txt b/projects/rocprofiler-compute/tests/CMakeLists.txt index 60bb856f14..24bce39d50 100644 --- a/projects/rocprofiler-compute/tests/CMakeLists.txt +++ b/projects/rocprofiler-compute/tests/CMakeLists.txt @@ -67,3 +67,11 @@ set_target_properties( laplace_eqn PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAKE_SOURCE_DIR}/tests ) + +set(ROCFLOP_SOURCES ../sample/rocflop.cpp) +set_source_files_properties(${ROCFLOP_SOURCES} PROPERTIES LANGUAGE HIP) +add_executable(rocflop ${ROCFLOP_SOURCES}) +set_target_properties( + rocflop + PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAKE_SOURCE_DIR}/tests +) diff --git a/projects/rocprofiler-compute/tests/test_profile_general.py b/projects/rocprofiler-compute/tests/test_profile_general.py index 21d21cb71e..9e19468a8b 100644 --- a/projects/rocprofiler-compute/tests/test_profile_general.py +++ b/projects/rocprofiler-compute/tests/test_profile_general.py @@ -68,6 +68,7 @@ config["app_mat_mul_max"] = ["./tests/mat_mul_max"] config["app_hip_dynamic_shared"] = ["./tests/hip_dynamic_shared"] config["app_laplace_eqn"] = ["./tests/laplace_eqn", "-i", "5000"] config["app_laplace_eqn_iter"] = ["./tests/laplace_eqn", "-i", "15000"] +config["rocflop"] = ["./tests/rocflop", "--device", "0"] config["cleanup"] = True config["COUNTER_LOGGING"] = False config["METRIC_COMPARE"] = False @@ -637,6 +638,29 @@ def test_path(binary_handler_profile_rocprof_compute): test_utils.clean_output_dir(config["cleanup"], workload_dir) +@pytest.mark.path +def test_path_rocflop( + binary_handler_profile_rocprof_compute, +): + # Test whether multiprocess workloads like rocflop are handled correctly + workload_dir = test_utils.get_output_dir() + options = ["--block", "2.1.1"] + _ = binary_handler_profile_rocprof_compute( + config, + workload_dir, + options, + check_success=True, + roof=False, + app_name="rocflop", + ) + pmc_perf_df = test_utils.check_csv_files(workload_dir, num_devices, num_kernels)[ + "pmc_perf.csv" + ] + # Ensure non zero length of df + assert len(pmc_perf_df) > 0 + test_utils.clean_output_dir(config["cleanup"], workload_dir) + + @pytest.mark.path def test_path_no_native(binary_handler_profile_rocprof_compute): workload_dir = test_utils.get_output_dir()