additional-code-coverage-compute (#763)
* added additional functions to test_utils.py * added code coverage for db_connector.py * Update test_profile_general.py Added additional roofline test cases Signed-off-by: jamessiddeley-amd <James.Siddeley@amd.com> * updated coverage mi_gpu_spec.py 73% -> 94% * added parser.py coverage * removed redundant comments * added test_utils and test_db_connector --------- Signed-off-by: jamessiddeley-amd <James.Siddeley@amd.com>
This commit is contained in:
committed by
GitHub
parent
a59b1ea6e6
commit
a6463f5e98
@@ -324,6 +324,32 @@ add_test(
|
||||
${PROJECT_SOURCE_DIR}/tests/test_gpu_specs.py
|
||||
WORKING_DIRECTORY ${PROJECT_SOURCE_DIR})
|
||||
|
||||
# ---------------------------
|
||||
# DB Connector tests
|
||||
# ---------------------------
|
||||
|
||||
add_test(
|
||||
NAME test_db_connector
|
||||
COMMAND
|
||||
${Python3_EXECUTABLE} -m pytest
|
||||
--junitxml=tests/test_db_connector.xml ${COV_OPTION}
|
||||
${PROJECT_SOURCE_DIR}/tests/test_db_connector.py
|
||||
WORKING_DIRECTORY ${PROJECT_SOURCE_DIR}
|
||||
)
|
||||
|
||||
# ---------------------------
|
||||
# Utils tests
|
||||
# ---------------------------
|
||||
|
||||
add_test(
|
||||
NAME test_utils
|
||||
COMMAND
|
||||
${Python3_EXECUTABLE} -m pytest
|
||||
--junitxml=tests/test_utils.xml ${COV_OPTION}
|
||||
${PROJECT_SOURCE_DIR}/tests/test_utils.py
|
||||
WORKING_DIRECTORY ${PROJECT_SOURCE_DIR}
|
||||
)
|
||||
|
||||
# ---------
|
||||
# Install
|
||||
# ---------
|
||||
|
||||
@@ -683,6 +683,9 @@ def test_baseline(binary_handler_analyze_rocprof_compute):
|
||||
)
|
||||
assert code == 1
|
||||
|
||||
# =============================================================================
|
||||
# Test cases for Parser.py
|
||||
# =============================================================================
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_dependency_MI100(binary_handler_analyze_rocprof_compute):
|
||||
@@ -693,3 +696,433 @@ def test_dependency_MI100(binary_handler_analyze_rocprof_compute):
|
||||
)
|
||||
assert code == 0
|
||||
test_utils.clean_output_dir(config["cleanup"], workload_dir)
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_parser_utility_functions():
|
||||
"""Test parser utility functions edge cases"""
|
||||
import sys
|
||||
from pathlib import Path
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
|
||||
|
||||
from utils.parser import to_min, to_max, to_avg, to_median, to_std, to_int, to_quantile, to_round, to_mod, to_concat
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
|
||||
try:
|
||||
result = to_min(None, None)
|
||||
assert np.isnan(result), "to_min with all None should return nan"
|
||||
except TypeError:
|
||||
pass
|
||||
|
||||
try:
|
||||
result = to_min(None, 5)
|
||||
assert False, "Should have crashed"
|
||||
except TypeError:
|
||||
pass
|
||||
|
||||
result = to_min(7, 3, 9, 1)
|
||||
assert result == 1, "to_min should return minimum value"
|
||||
|
||||
try:
|
||||
result = to_max(None, None)
|
||||
assert np.isnan(result), "to_max with all None should return nan"
|
||||
except TypeError:
|
||||
pass
|
||||
|
||||
try:
|
||||
result = to_max(None, 5)
|
||||
assert False, "Should have crashed"
|
||||
except TypeError:
|
||||
pass
|
||||
|
||||
result = to_max(7, 3, 9, 1)
|
||||
assert result == 9, "to_max should return maximum value"
|
||||
|
||||
result = to_median(None)
|
||||
assert result is None, "to_median should return None for None input"
|
||||
|
||||
try:
|
||||
to_median("invalid_string")
|
||||
assert False, "to_median should raise exception for invalid type"
|
||||
except Exception as e:
|
||||
assert "unsupported type" in str(e)
|
||||
|
||||
try:
|
||||
to_std("invalid_string")
|
||||
assert False, "to_std should raise exception for invalid type"
|
||||
except Exception as e:
|
||||
assert "unsupported type" in str(e)
|
||||
|
||||
result = to_int(None)
|
||||
assert result is None, "to_int should return None for None input"
|
||||
|
||||
try:
|
||||
to_int(["list", "not", "supported"])
|
||||
assert False, "to_int should raise exception for invalid type"
|
||||
except Exception as e:
|
||||
assert "unsupported type" in str(e)
|
||||
|
||||
result = to_quantile(None, 0.5)
|
||||
assert result is None, "to_quantile should return None for None input"
|
||||
|
||||
try:
|
||||
to_quantile("invalid_string", 0.5)
|
||||
assert False, "to_quantile should raise exception for invalid type"
|
||||
except Exception as e:
|
||||
assert "unsupported type" in str(e)
|
||||
|
||||
result = to_concat("hello", "world")
|
||||
assert result == "helloworld", "to_concat should concatenate strings"
|
||||
|
||||
result = to_concat(123, 456)
|
||||
assert result == "123456", "to_concat should convert to strings and concatenate"
|
||||
|
||||
series = pd.Series([1.234, 2.567, 3.890])
|
||||
result = to_round(series, 2)
|
||||
expected = pd.Series([1.23, 2.57, 3.89])
|
||||
pd.testing.assert_series_equal(result, expected)
|
||||
|
||||
result = to_round(3.14159, 2)
|
||||
assert result == 3.14, "to_round should round scalar values"
|
||||
|
||||
series = pd.Series([10, 15, 20])
|
||||
result = to_mod(series, 3)
|
||||
expected = pd.Series([1, 0, 2])
|
||||
pd.testing.assert_series_equal(result, expected)
|
||||
|
||||
result = to_mod(10, 3)
|
||||
assert result == 1, "to_mod should return modulo for scalars"
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_parser_error_handling():
|
||||
"""Test parser error handling paths"""
|
||||
import sys
|
||||
from pathlib import Path
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
|
||||
|
||||
from utils.parser import build_eval_string, update_denom_string, calc_builtin_var
|
||||
|
||||
try:
|
||||
build_eval_string("AVG(SQ_WAVES)", None)
|
||||
assert False, "Should have raised exception for None coll_level"
|
||||
except Exception as e:
|
||||
assert "coll_level can not be None" in str(e)
|
||||
|
||||
assert build_eval_string("", "pmc_perf") == ""
|
||||
assert update_denom_string("", "per_wave") == ""
|
||||
|
||||
class MockSysInfo:
|
||||
total_l2_chan = 32
|
||||
|
||||
sys_info = MockSysInfo()
|
||||
try:
|
||||
calc_builtin_var("$unsupported_var", sys_info)
|
||||
assert False, "Should have raised exception for unsupported var"
|
||||
except SystemExit:
|
||||
pass
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_parser_error_handling():
|
||||
"""Test parser error handling paths"""
|
||||
import sys
|
||||
from pathlib import Path
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
|
||||
|
||||
from utils.parser import build_eval_string, update_denom_string, calc_builtin_var
|
||||
|
||||
try:
|
||||
build_eval_string("AVG(SQ_WAVES)", None)
|
||||
assert False, "Should have raised exception for None coll_level"
|
||||
except Exception as e:
|
||||
assert "coll_level can not be None" in str(e)
|
||||
|
||||
assert build_eval_string("", "pmc_perf") == ""
|
||||
assert update_denom_string("", "per_wave") == ""
|
||||
|
||||
class MockSysInfo:
|
||||
total_l2_chan = 32
|
||||
|
||||
sys_info = MockSysInfo()
|
||||
try:
|
||||
calc_builtin_var("$unsupported_var", sys_info)
|
||||
assert False, "Should have raised exception for unsupported var"
|
||||
except SystemExit:
|
||||
pass
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_missing_file_handling(binary_handler_analyze_rocprof_compute):
|
||||
"""Test handling of missing files"""
|
||||
import tempfile
|
||||
import os
|
||||
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
code = binary_handler_analyze_rocprof_compute(
|
||||
["analyze", "--path", temp_dir]
|
||||
)
|
||||
assert code != 0
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_ast_transformer_edge_cases():
|
||||
"""Simplified test focusing on the actual code paths"""
|
||||
import sys
|
||||
from pathlib import Path
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
|
||||
|
||||
from utils.parser import CodeTransformer
|
||||
import ast
|
||||
|
||||
transformer = CodeTransformer()
|
||||
|
||||
unknown_call = ast.Call(
|
||||
func=ast.Name(id='UNKNOWN_FUNCTION', ctx=ast.Load()),
|
||||
args=[ast.Constant(value=5) if hasattr(ast, 'Constant') else ast.Num(n=5)],
|
||||
keywords=[]
|
||||
)
|
||||
|
||||
try:
|
||||
result = transformer.visit_Call(unknown_call)
|
||||
if hasattr(result.func, 'id') and result.func.id == 'UNKNOWN_FUNCTION':
|
||||
assert False, "Function name should have been changed or exception raised"
|
||||
except Exception as e:
|
||||
assert "Unknown call" in str(e), f"Expected 'Unknown call' in error, got: {str(e)}"
|
||||
|
||||
supported_call = ast.Call(
|
||||
func=ast.Name(id='MIN', ctx=ast.Load()),
|
||||
args=[ast.Constant(value=5) if hasattr(ast, 'Constant') else ast.Num(n=5)],
|
||||
keywords=[]
|
||||
)
|
||||
|
||||
try:
|
||||
result = transformer.visit_Call(supported_call)
|
||||
assert result.func.id == 'to_min', f"Expected 'to_min', got: {result.func.id}"
|
||||
except Exception as e:
|
||||
assert False, f"Supported function call should not raise exception: {e}"
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_analyze_with_debug_mode(binary_handler_analyze_rocprof_compute):
|
||||
"""Test analyze to cover debug paths in eval_metric - using direct function call"""
|
||||
import sys
|
||||
from pathlib import Path
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
|
||||
|
||||
from utils.parser import eval_metric
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
|
||||
mock_dfs = {
|
||||
1: pd.DataFrame({
|
||||
'Metric_ID': ['1.1.0'],
|
||||
'Metric': ['Test Metric'],
|
||||
'Expr': ['AVG(SQ_WAVES)'],
|
||||
'coll_level': ['pmc_perf']
|
||||
}).set_index('Metric_ID')
|
||||
}
|
||||
|
||||
mock_dfs_type = {1: 'metric_table'}
|
||||
|
||||
class MockSysInfo:
|
||||
ip_blocks = "standard"
|
||||
se_per_gpu = 4
|
||||
pipes_per_gpu = 4
|
||||
cu_per_gpu = 64
|
||||
simd_per_cu = 4
|
||||
sqc_per_gpu = 16
|
||||
lds_banks_per_cu = 32
|
||||
cur_sclk = 1800.0
|
||||
cur_mclk = 1200.0
|
||||
max_sclk = 2100.0
|
||||
max_mclk = 1600.0
|
||||
max_waves_per_cu = 40
|
||||
num_hbm_channels = 4
|
||||
total_l2_chan = 32
|
||||
num_xcd = 1
|
||||
wave_size = 64
|
||||
|
||||
sys_info = MockSysInfo()
|
||||
|
||||
raw_pmc_df = {
|
||||
'pmc_perf': pd.DataFrame({
|
||||
'SQ_WAVES': [100, 200, 150],
|
||||
'GRBM_GUI_ACTIVE': [1000, 2000, 1500],
|
||||
'End_Timestamp': [1000000, 2000000, 1500000],
|
||||
'Start_Timestamp': [0, 1000000, 500000]
|
||||
})
|
||||
}
|
||||
|
||||
try:
|
||||
eval_metric(mock_dfs, mock_dfs_type, sys_info, raw_pmc_df, debug=True)
|
||||
except Exception as e:
|
||||
pass
|
||||
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_filter_combinations_coverage(binary_handler_analyze_rocprof_compute):
|
||||
"""Test basic filters that should work"""
|
||||
for dir in ["tests/workloads/vcopy/MI100", "tests/workloads/vcopy/MI200"]:
|
||||
if os.path.exists(dir):
|
||||
workload_dir = test_utils.setup_workload_dir(dir)
|
||||
|
||||
code = binary_handler_analyze_rocprof_compute(
|
||||
["analyze", "--path", workload_dir]
|
||||
)
|
||||
assert code == 0
|
||||
|
||||
code = binary_handler_analyze_rocprof_compute(
|
||||
["analyze", "--path", workload_dir, "--block", "SQ"]
|
||||
)
|
||||
assert code == 0
|
||||
|
||||
test_utils.clean_output_dir(config["cleanup"], workload_dir)
|
||||
break
|
||||
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_apply_filters_direct():
|
||||
"""Test apply_filters function directly to cover filter branches"""
|
||||
import sys
|
||||
from pathlib import Path
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
|
||||
|
||||
from utils.parser import apply_filters
|
||||
import pandas as pd
|
||||
|
||||
class MockWorkload:
|
||||
def __init__(self):
|
||||
self.raw_pmc = pd.DataFrame({
|
||||
('pmc_perf', 'GPU_ID'): [0, 0, 1, 1],
|
||||
('pmc_perf', 'Kernel_Name'): ['vecCopy', 'vecAdd', 'vecCopy', 'vecMul'],
|
||||
('pmc_perf', 'Dispatch_ID'): [0, 1, 2, 3],
|
||||
('pmc_perf', 'Node'): ['node0', 'node0', 'node1', 'node1']
|
||||
})
|
||||
self.raw_pmc.columns = pd.MultiIndex.from_tuples(self.raw_pmc.columns)
|
||||
|
||||
filter_nodes = None
|
||||
filter_gpu_ids = None
|
||||
filter_kernel_ids = None
|
||||
filter_dispatch_ids = None
|
||||
|
||||
workload = MockWorkload()
|
||||
|
||||
workload.filter_gpu_ids = "0"
|
||||
result = apply_filters(workload, "/tmp", False, False)
|
||||
assert len(result) == 2
|
||||
|
||||
workload.filter_gpu_ids = None
|
||||
workload.filter_kernel_ids = ["vecCopy"]
|
||||
result = apply_filters(workload, "/tmp", False, False)
|
||||
assert len(result) == 2
|
||||
|
||||
workload.filter_kernel_ids = None
|
||||
workload.filter_dispatch_ids = ["0", "1"]
|
||||
result = apply_filters(workload, "/tmp", False, False)
|
||||
assert len(result) == 2
|
||||
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_missing_files_scenarios(binary_handler_analyze_rocprof_compute):
|
||||
"""Test scenarios with missing files to cover error paths"""
|
||||
import tempfile
|
||||
import shutil
|
||||
|
||||
for dir in ["tests/workloads/vcopy/MI100", "tests/workloads/vcopy/MI200"]:
|
||||
if os.path.exists(dir):
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
workload_dir = os.path.join(temp_dir, "incomplete_workload")
|
||||
shutil.copytree(dir, workload_dir)
|
||||
|
||||
csv_files = ["pmc_perf_1.csv", "pmc_perf_2.csv", "timestamps.csv"]
|
||||
for csv_file in csv_files:
|
||||
csv_path = os.path.join(workload_dir, csv_file)
|
||||
if os.path.exists(csv_path):
|
||||
os.remove(csv_path)
|
||||
|
||||
code = binary_handler_analyze_rocprof_compute(
|
||||
["analyze", "--path", workload_dir]
|
||||
)
|
||||
break
|
||||
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_pc_sampling_basic_coverage():
|
||||
"""Test PC sampling functions with minimal data"""
|
||||
import sys
|
||||
from pathlib import Path
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
|
||||
|
||||
from utils.parser import load_pc_sampling_data, search_pc_sampling_record
|
||||
import tempfile
|
||||
|
||||
class MockWorkload:
|
||||
filter_kernel_ids = []
|
||||
|
||||
workload = MockWorkload()
|
||||
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
result = load_pc_sampling_data(workload, temp_dir, "none", "count")
|
||||
assert result.empty
|
||||
|
||||
result = load_pc_sampling_data(workload, temp_dir, "missing", "count")
|
||||
assert result.empty
|
||||
|
||||
workload.filter_kernel_ids = [0, 1, 2] # Multiple kernels
|
||||
result = load_pc_sampling_data(workload, temp_dir, "test", "count")
|
||||
assert result.empty
|
||||
|
||||
result = search_pc_sampling_record([])
|
||||
assert result is None
|
||||
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_build_dfs_edge_cases():
|
||||
"""Test build_dfs and gen_counter_list with various configurations"""
|
||||
import sys
|
||||
from pathlib import Path
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
|
||||
|
||||
from utils.parser import gen_counter_list
|
||||
|
||||
visited, counters = gen_counter_list(None)
|
||||
assert not visited
|
||||
assert counters == []
|
||||
|
||||
visited, counters = gen_counter_list(123)
|
||||
assert not visited
|
||||
assert counters == []
|
||||
|
||||
visited, counters = gen_counter_list("AVG(SQ_WAVES + TCC_HIT)")
|
||||
assert visited
|
||||
assert "SQ_WAVES" in counters
|
||||
assert "TCC_HIT" in counters
|
||||
|
||||
visited, counters = gen_counter_list("Start_Timestamp + End_Timestamp")
|
||||
assert visited
|
||||
|
||||
visited, counters = gen_counter_list("INVALID SYNTAX !!!")
|
||||
assert not visited
|
||||
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_update_functions_coverage():
|
||||
"""Test update_denom_string and update_normUnit_string branches"""
|
||||
import sys
|
||||
from pathlib import Path
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
|
||||
|
||||
from utils.parser import update_denom_string, update_normUnit_string
|
||||
|
||||
result = update_denom_string("AVG(SQ_WAVES / $denom)", "per_wave")
|
||||
assert "$denom" not in result
|
||||
assert "SQ_WAVES" in result
|
||||
|
||||
result = update_denom_string("AVG(DATA / $denom)", "per_cycle")
|
||||
assert "$GRBM_GUI_ACTIVE_PER_XCD" in result
|
||||
|
||||
result = update_denom_string("AVG(DATA / $denom)", "per_second")
|
||||
assert "End_Timestamp - Start_Timestamp" in result
|
||||
|
||||
result = update_denom_string("AVG(DATA / $denom)", "unsupported_unit")
|
||||
assert "$denom" in result
|
||||
|
||||
result = update_normUnit_string("(Prefix + $normUnit)", "per_wave")
|
||||
assert "per wave" in result.lower()
|
||||
assert result[0].isupper()
|
||||
@@ -0,0 +1,386 @@
|
||||
##############################################################################bl
|
||||
# MIT License
|
||||
#
|
||||
# Copyright (c) 2025 Advanced Micro Devices, Inc. All Rights Reserved.
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
# of this software and associated documentation files (the "Software"), to deal
|
||||
# in the Software without restriction, including without limitation the rights
|
||||
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
# copies of the Software, and to permit persons to whom the Software is
|
||||
# furnished to do so, subject to the following conditions:
|
||||
#
|
||||
# The above copyright notice and this permission notice shall be included in all
|
||||
# copies or substantial portions of the Software.
|
||||
#
|
||||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
# SOFTWARE.
|
||||
##############################################################################el
|
||||
|
||||
|
||||
import pytest
|
||||
import tempfile
|
||||
import shutil
|
||||
import sys
|
||||
import logging
|
||||
from unittest.mock import Mock, patch, MagicMock, call
|
||||
from pathlib import Path
|
||||
import pandas as pd
|
||||
|
||||
logging.TRACE = logging.DEBUG - 5
|
||||
logging.addLevelName(logging.TRACE, "TRACE")
|
||||
def trace_logger(message, *args, **kwargs):
|
||||
logging.log(logging.TRACE, message, *args, **kwargs)
|
||||
setattr(logging, "trace", trace_logger)
|
||||
|
||||
from db_connector import DatabaseConnector
|
||||
|
||||
"""
|
||||
Tests for the DatabaseConnector class that tests almost methods with initialization,
|
||||
CSV import, database removal, and error handling.
|
||||
The tests use mocks instead of a real MongoDB server for speed and reliability.
|
||||
"""
|
||||
|
||||
class TestDatabaseConnector:
|
||||
|
||||
@pytest.fixture
|
||||
def mock_args_import(self):
|
||||
"""Mock arguments for import operation"""
|
||||
args = Mock()
|
||||
args.username = "test_user"
|
||||
args.password = "test_pass"
|
||||
args.host = "localhost"
|
||||
args.port = 27017
|
||||
args.team = "test_team"
|
||||
args.workload = "/app/tests/workloads/device_filter/MI100"
|
||||
args.upload = True
|
||||
args.remove = False
|
||||
args.kernel_verbose = False
|
||||
return args
|
||||
|
||||
@pytest.fixture
|
||||
def mock_args_remove(self):
|
||||
"""Mock arguments for remove operation"""
|
||||
args = Mock()
|
||||
args.username = "test_user"
|
||||
args.password = "test_pass"
|
||||
args.host = "localhost"
|
||||
args.port = 27017
|
||||
args.team = "test_team"
|
||||
args.workload = "rocprofiler-compute_test_team_workload_mi100"
|
||||
args.upload = False
|
||||
args.remove = True
|
||||
args.kernel_verbose = False
|
||||
return args
|
||||
|
||||
def test_init(self, mock_args_import):
|
||||
"""Test DatabaseConnector initialization"""
|
||||
connector = DatabaseConnector(mock_args_import)
|
||||
|
||||
assert connector.args == mock_args_import
|
||||
assert isinstance(connector.cache, dict)
|
||||
assert len(connector.cache) == 0
|
||||
|
||||
expected_connection_info = {
|
||||
"username": "test_user",
|
||||
"password": "test_pass",
|
||||
"host": "localhost",
|
||||
"port": "27017",
|
||||
"team": "test_team",
|
||||
"workload": "/app/tests/workloads/device_filter/MI100",
|
||||
"db": None,
|
||||
}
|
||||
assert connector.connection_info == expected_connection_info
|
||||
assert connector.interaction_type is None
|
||||
assert connector.client is None
|
||||
|
||||
@patch('db_connector.pd.read_csv')
|
||||
@patch('db_connector.Path')
|
||||
def test_prep_import_success(self, mock_path, mock_read_csv, mock_args_import):
|
||||
"""Test successful prep_import"""
|
||||
# Setup mocks
|
||||
mock_path.return_value.joinpath.return_value = "/fake/path/sysinfo.csv"
|
||||
mock_path.return_value.is_file.return_value = True
|
||||
|
||||
mock_sysinfo = pd.DataFrame({
|
||||
'gpu_model': ['MI100 '],
|
||||
'workload_name': [' test_workload']
|
||||
})
|
||||
mock_read_csv.return_value = mock_sysinfo
|
||||
|
||||
connector = DatabaseConnector(mock_args_import)
|
||||
connector.prep_import()
|
||||
|
||||
expected_db = "rocprofiler-compute_test_team_test_workload_MI100"
|
||||
assert connector.connection_info["db"] == expected_db
|
||||
|
||||
@patch('db_connector.pd.read_csv')
|
||||
@patch('db_connector.Path')
|
||||
def test_prep_import_missing_file(self, mock_path, mock_read_csv, mock_args_import):
|
||||
"""Test prep_import when sysinfo.csv is missing"""
|
||||
mock_path.return_value.joinpath.return_value = "/fake/path/sysinfo.csv"
|
||||
mock_path.return_value.is_file.return_value = False
|
||||
|
||||
connector = DatabaseConnector(mock_args_import)
|
||||
|
||||
with patch('db_connector.console_error', side_effect=SystemExit(1)) as mock_console_error:
|
||||
with pytest.raises(SystemExit):
|
||||
connector.prep_import()
|
||||
|
||||
mock_console_error.assert_called_with(
|
||||
"database", "Unable to parse SoC and/or workload name from sysinfo.csv"
|
||||
)
|
||||
|
||||
@patch('db_connector.pd.read_csv')
|
||||
@patch('db_connector.Path')
|
||||
def test_prep_import_key_error(self, mock_path, mock_read_csv, mock_args_import):
|
||||
"""Test prep_import when required fields are missing"""
|
||||
mock_path.return_value.joinpath.return_value = "/fake/path/sysinfo.csv"
|
||||
mock_path.return_value.is_file.return_value = True
|
||||
|
||||
mock_sysinfo = pd.DataFrame({'other_column': ['value']})
|
||||
mock_read_csv.return_value = mock_sysinfo
|
||||
|
||||
connector = DatabaseConnector(mock_args_import)
|
||||
|
||||
with patch('db_connector.console_error', side_effect=SystemExit(1)) as mock_console_error:
|
||||
with pytest.raises(SystemExit):
|
||||
connector.prep_import()
|
||||
|
||||
assert mock_console_error.called
|
||||
error_call = mock_console_error.call_args[0][0]
|
||||
assert "Outdated workload" in error_call
|
||||
|
||||
@patch('db_connector.tqdm')
|
||||
@patch('db_connector.os.listdir')
|
||||
@patch('db_connector.console_log')
|
||||
@patch('db_connector.console_warning')
|
||||
@patch('db_connector.kernel_name_shortener')
|
||||
@patch('db_connector.MongoClient')
|
||||
@patch('db_connector.pd.read_csv')
|
||||
def test_db_import_success(self, mock_read_csv, mock_mongo_client, mock_kernel_shortener,
|
||||
mock_console_warning, mock_console_log, mock_listdir,
|
||||
mock_tqdm, mock_args_import):
|
||||
"""Test successful database import"""
|
||||
mock_listdir.return_value = ['test_data.csv', 'empty_file.csv', 'non_csv.txt']
|
||||
mock_tqdm.return_value = mock_listdir.return_value
|
||||
|
||||
test_df = pd.DataFrame({'col1': [1, 2], 'col2': [3, 4]})
|
||||
mock_read_csv.side_effect = [
|
||||
test_df,
|
||||
pd.errors.EmptyDataError()
|
||||
]
|
||||
|
||||
mock_client_instance = MagicMock()
|
||||
mock_db = MagicMock()
|
||||
mock_collection = MagicMock()
|
||||
mock_workload_db = MagicMock()
|
||||
mock_workload_col = MagicMock()
|
||||
|
||||
mock_mongo_client.return_value = mock_client_instance
|
||||
mock_client_instance.__getitem__.side_effect = lambda x: {
|
||||
'rocprofiler-compute_test_team_test_workload_MI100': mock_db,
|
||||
'workload_names': mock_workload_db
|
||||
}.get(x, mock_db)
|
||||
mock_db.__getitem__.return_value = mock_collection
|
||||
mock_workload_db.__getitem__.return_value = mock_workload_col
|
||||
|
||||
connector = DatabaseConnector(mock_args_import)
|
||||
connector.connection_info["workload"] = "/fake/workload/path"
|
||||
connector.client = mock_client_instance
|
||||
|
||||
with patch.object(connector, 'prep_import') as mock_prep:
|
||||
mock_prep.return_value = None
|
||||
connector.connection_info["db"] = "rocprofiler-compute_test_team_test_workload_MI100"
|
||||
|
||||
connector.db_import()
|
||||
|
||||
mock_collection.insert_many.assert_called_once()
|
||||
mock_workload_col.replace_one.assert_called_once()
|
||||
|
||||
@patch('db_connector.console_log')
|
||||
def test_db_remove_success(self, mock_console_log, mock_args_remove):
|
||||
"""Test successful database removal"""
|
||||
mock_client = MagicMock()
|
||||
mock_db_to_remove = MagicMock()
|
||||
mock_workload_names_db = MagicMock()
|
||||
mock_names_col = MagicMock()
|
||||
|
||||
mock_client.__getitem__.side_effect = lambda x: {
|
||||
'rocprofiler-compute_test_team_workload_mi100': mock_db_to_remove,
|
||||
'workload_names': mock_workload_names_db
|
||||
}[x]
|
||||
mock_workload_names_db.__getitem__.return_value = mock_names_col
|
||||
mock_db_to_remove.list_collection_names.return_value = ['col1', 'col2']
|
||||
|
||||
connector = DatabaseConnector(mock_args_remove)
|
||||
connector.client = mock_client
|
||||
|
||||
connector.db_remove()
|
||||
|
||||
mock_client.drop_database.assert_called_once_with(mock_db_to_remove)
|
||||
mock_names_col.delete_many.assert_called_once_with(
|
||||
{"name": "rocprofiler-compute_test_team_workload_mi100"}
|
||||
)
|
||||
|
||||
def test_pre_processing_no_action_specified(self, mock_args_import):
|
||||
"""Test pre_processing when neither upload nor remove is specified"""
|
||||
mock_args_import.upload = False
|
||||
mock_args_import.remove = False
|
||||
|
||||
connector = DatabaseConnector(mock_args_import)
|
||||
|
||||
with patch('db_connector.console_error', side_effect=SystemExit(1)):
|
||||
with pytest.raises(SystemExit):
|
||||
connector.pre_processing()
|
||||
|
||||
def test_pre_processing_remove_invalid_workload_name(self, mock_args_remove):
|
||||
"""Test pre_processing remove with invalid workload name"""
|
||||
mock_args_remove.workload = "invalid_name"
|
||||
|
||||
connector = DatabaseConnector(mock_args_remove)
|
||||
|
||||
with patch('db_connector.console_error', side_effect=SystemExit(1)):
|
||||
with pytest.raises(SystemExit):
|
||||
connector.pre_processing()
|
||||
|
||||
def test_pre_processing_remove_missing_host_username(self, mock_args_remove):
|
||||
"""Test pre_processing remove with missing host/username"""
|
||||
mock_args_remove.host = None
|
||||
mock_args_remove.username = None
|
||||
|
||||
connector = DatabaseConnector(mock_args_remove)
|
||||
|
||||
with patch('db_connector.console_error', side_effect=SystemExit(1)):
|
||||
with pytest.raises(SystemExit):
|
||||
connector.pre_processing()
|
||||
|
||||
def test_pre_processing_remove_protected_database(self, mock_args_remove):
|
||||
"""Test pre_processing remove with protected database names"""
|
||||
mock_args_remove.workload = "admin"
|
||||
|
||||
connector = DatabaseConnector(mock_args_remove)
|
||||
|
||||
with patch('db_connector.console_error', side_effect=SystemExit(1)):
|
||||
with pytest.raises(SystemExit):
|
||||
connector.pre_processing()
|
||||
|
||||
@patch('db_connector.Path')
|
||||
@patch('db_connector.is_workload_empty')
|
||||
@patch('db_connector.getpass.getpass')
|
||||
@patch('db_connector.console_log')
|
||||
@patch('db_connector.MongoClient')
|
||||
def test_pre_processing_import_password_prompt_success(self, mock_mongo_client, mock_console_log,
|
||||
mock_getpass, mock_is_workload_empty,
|
||||
mock_path, mock_args_import):
|
||||
"""Test pre_processing import with password prompt success"""
|
||||
mock_args_import.password = ""
|
||||
mock_getpass.return_value = "prompted_password"
|
||||
|
||||
mock_path.return_value.absolute.return_value.is_dir.return_value = True
|
||||
mock_path.return_value.absolute.return_value.resolve.return_value = "/resolved/path"
|
||||
|
||||
mock_client_instance = MagicMock()
|
||||
mock_mongo_client.return_value = mock_client_instance
|
||||
mock_client_instance.server_info.return_value = {}
|
||||
|
||||
connector = DatabaseConnector(mock_args_import)
|
||||
connector.pre_processing()
|
||||
|
||||
mock_getpass.assert_called_once()
|
||||
mock_console_log.assert_called_with("database", "Password received")
|
||||
|
||||
@patch('db_connector.Path')
|
||||
@patch('db_connector.is_workload_empty')
|
||||
@patch('db_connector.MongoClient')
|
||||
def test_pre_processing_import_connection_failure(self, mock_mongo_client, mock_is_workload_empty,
|
||||
mock_path, mock_args_import):
|
||||
"""Test pre_processing import with MongoDB connection failure"""
|
||||
mock_path.return_value.absolute.return_value.is_dir.return_value = True
|
||||
mock_path.return_value.absolute.return_value.resolve.return_value = "/resolved/path"
|
||||
|
||||
mock_client_instance = MagicMock()
|
||||
mock_mongo_client.return_value = mock_client_instance
|
||||
mock_client_instance.server_info.side_effect = Exception("Connection failed")
|
||||
|
||||
connector = DatabaseConnector(mock_args_import)
|
||||
|
||||
with patch('db_connector.console_error', side_effect=SystemExit(1)):
|
||||
with pytest.raises(SystemExit):
|
||||
connector.pre_processing()
|
||||
|
||||
@patch('db_connector.Path')
|
||||
@patch('db_connector.is_workload_empty')
|
||||
def test_pre_processing_import_missing_required_fields(self, mock_is_workload_empty, mock_path, mock_args_import):
|
||||
"""Test pre_processing import with missing required fields"""
|
||||
mock_args_import.host = None
|
||||
|
||||
connector = DatabaseConnector(mock_args_import)
|
||||
|
||||
with patch('db_connector.console_error', side_effect=SystemExit(1)):
|
||||
with pytest.raises(SystemExit):
|
||||
connector.pre_processing()
|
||||
|
||||
@patch('db_connector.Path')
|
||||
def test_pre_processing_import_invalid_workload_path(self, mock_path, mock_args_import):
|
||||
"""Test pre_processing import with invalid workload path"""
|
||||
mock_path.return_value.absolute.return_value.is_dir.return_value = False
|
||||
|
||||
connector = DatabaseConnector(mock_args_import)
|
||||
|
||||
with patch('db_connector.console_error', side_effect=SystemExit(1)):
|
||||
with pytest.raises(SystemExit):
|
||||
connector.pre_processing()
|
||||
|
||||
def test_pre_processing_import_team_name_too_long(self, mock_args_import):
|
||||
"""Test pre_processing import with team name exceeding limit"""
|
||||
mock_args_import.team = "this_team_name_is_way_too_long"
|
||||
|
||||
connector = DatabaseConnector(mock_args_import)
|
||||
|
||||
with patch('db_connector.console_error', side_effect=SystemExit(1)):
|
||||
with pytest.raises(SystemExit):
|
||||
connector.pre_processing()
|
||||
|
||||
|
||||
class TestDatabaseConnectorIntegration:
|
||||
"""Simple integration test"""
|
||||
|
||||
@patch('db_connector.Path')
|
||||
@patch('db_connector.pd.read_csv')
|
||||
def test_prep_import_with_real_workload_path(self, mock_read_csv, mock_path):
|
||||
"""Test prep_import with actual workload path structure"""
|
||||
args = Mock()
|
||||
args.username = "test_user"
|
||||
args.password = "test_pass"
|
||||
args.host = "localhost"
|
||||
args.port = 27017
|
||||
args.team = "test_team"
|
||||
args.workload = "/app/tests/workloads/device_filter/MI100"
|
||||
args.upload = True
|
||||
args.remove = False
|
||||
args.kernel_verbose = False
|
||||
|
||||
mock_path.return_value.joinpath.return_value = "/app/tests/workloads/device_filter/MI100/sysinfo.csv"
|
||||
mock_path.return_value.is_file.return_value = True
|
||||
|
||||
mock_sysinfo = pd.DataFrame({
|
||||
'gpu_model': ['MI100'],
|
||||
'workload_name': ['device_filter']
|
||||
})
|
||||
mock_read_csv.return_value = mock_sysinfo
|
||||
|
||||
connector = DatabaseConnector(args)
|
||||
connector.prep_import()
|
||||
|
||||
expected_db = "rocprofiler-compute_test_team_device_filter_MI100"
|
||||
assert connector.connection_info["db"] == expected_db
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pytest.main([__file__, "-v"])
|
||||
+189
-1
@@ -25,8 +25,13 @@
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
import pytest
|
||||
import yaml
|
||||
import tempfile
|
||||
import os
|
||||
from importlib.machinery import SourceFileLoader
|
||||
from unittest.mock import patch
|
||||
from unittest.mock import patch, mock_open, MagicMock
|
||||
from pathlib import Path
|
||||
|
||||
import pandas as pd
|
||||
import pytest
|
||||
@@ -195,3 +200,186 @@ def test_num_xcds_cli_output():
|
||||
|
||||
assert compute_partition_actual is not None
|
||||
assert int(num_xcd_actual) == num_xcds.get(compute_partition_actual.lower(), -1)
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_load_yaml_file_not_found():
|
||||
"""Test _load_yaml with non-existent file - covers lines 104-105"""
|
||||
from src.utils.mi_gpu_spec import MIGPUSpecs
|
||||
|
||||
non_existent_path = "/path/that/does/not/exist/file.yaml"
|
||||
|
||||
with pytest.raises(SystemExit):
|
||||
MIGPUSpecs._load_yaml(non_existent_path)
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_load_yaml_invalid_yaml():
|
||||
"""Test _load_yaml with corrupted YAML - covers lines 106-107"""
|
||||
from src.utils.mi_gpu_spec import MIGPUSpecs
|
||||
|
||||
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
|
||||
f.write('invalid: yaml: content: [\nunclosed bracket')
|
||||
temp_path = f.name
|
||||
|
||||
try:
|
||||
with pytest.raises(SystemExit):
|
||||
MIGPUSpecs._load_yaml(temp_path)
|
||||
finally:
|
||||
os.unlink(temp_path)
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_load_yaml_generic_exception():
|
||||
"""Test _load_yaml generic exception handling - covers lines 108-111"""
|
||||
from src.utils.mi_gpu_spec import MIGPUSpecs
|
||||
|
||||
with patch('builtins.open', side_effect=PermissionError("Access denied")):
|
||||
with pytest.raises(SystemExit):
|
||||
MIGPUSpecs._load_yaml("some_file.yaml")
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_get_gpu_series_dict_uninitialized():
|
||||
"""Test get_gpu_series_dict when dict not populated - covers lines 182-185"""
|
||||
from src.utils.mi_gpu_spec import MIGPUSpecs
|
||||
|
||||
with patch.object(MIGPUSpecs, '_gpu_series_dict', {}):
|
||||
with pytest.raises(SystemExit):
|
||||
MIGPUSpecs.get_gpu_series_dict()
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_get_gpu_series_uninitialized():
|
||||
"""Test get_gpu_series when dict not populated - covers lines 191-194"""
|
||||
from src.utils.mi_gpu_spec import MIGPUSpecs
|
||||
|
||||
with patch.object(MIGPUSpecs, '_gpu_series_dict', {}):
|
||||
with pytest.raises(SystemExit):
|
||||
result = MIGPUSpecs.get_gpu_series("gfx942")
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_get_perfmon_config_uninitialized():
|
||||
"""Test get_perfmon_config when dict not populated - covers lines 210-213"""
|
||||
from src.utils.mi_gpu_spec import MIGPUSpecs
|
||||
|
||||
with patch.object(MIGPUSpecs, '_perfmon_config', {}):
|
||||
with pytest.raises(SystemExit):
|
||||
MIGPUSpecs.get_perfmon_config("gfx942")
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_get_gpu_model_uninitialized():
|
||||
"""Test get_gpu_model when dict not populated - covers lines 223-226"""
|
||||
from src.utils.mi_gpu_spec import MIGPUSpecs
|
||||
|
||||
with patch.object(MIGPUSpecs, '_gpu_model_dict', {}):
|
||||
with pytest.raises(SystemExit):
|
||||
MIGPUSpecs.get_gpu_model("gfx942", "29857")
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_get_gpu_model_invalid_chip_id():
|
||||
"""Test get_gpu_model with invalid chip_id - covers lines 235-236"""
|
||||
from src.utils.mi_gpu_spec import MIGPUSpecs
|
||||
|
||||
result = MIGPUSpecs.get_gpu_model("gfx942", "99999")
|
||||
assert result is None
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_get_gpu_model_invalid_arch():
|
||||
"""Test get_gpu_model with invalid architecture - covers lines 243-244"""
|
||||
from src.utils.mi_gpu_spec import MIGPUSpecs
|
||||
|
||||
result = MIGPUSpecs.get_gpu_model("gfx999", "12345")
|
||||
assert result is None
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_get_gpu_model_none_result():
|
||||
"""Test get_gpu_model when result is None - covers lines 246-248"""
|
||||
from src.utils.mi_gpu_spec import MIGPUSpecs
|
||||
|
||||
with patch.object(MIGPUSpecs, '_chip_id_dict', {999: None}):
|
||||
result = MIGPUSpecs.get_gpu_model("gfx942", "999")
|
||||
assert result is None
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_get_num_xcds_no_compute_partition_data():
|
||||
"""Test get_num_xcds when no compute partition data found - covers lines 307-309"""
|
||||
from src.utils.mi_gpu_spec import MIGPUSpecs
|
||||
|
||||
mock_dict = {"gfx942": None}
|
||||
with patch.object(MIGPUSpecs, '_gpu_arch_to_compute_partition_dict', mock_dict):
|
||||
result = MIGPUSpecs.get_num_xcds(gpu_arch="gfx942")
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_get_num_xcds_uninitialized_dict():
|
||||
"""Test get_num_xcds when XCD dict not populated - covers lines 315-317"""
|
||||
from src.utils.mi_gpu_spec import MIGPUSpecs
|
||||
|
||||
with patch.object(MIGPUSpecs, '_num_xcds_dict', {}):
|
||||
with pytest.raises(SystemExit):
|
||||
MIGPUSpecs.get_num_xcds(gpu_arch="gfx950", gpu_model="MI350")
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_get_num_xcds_unknown_gpu_model():
|
||||
"""Test get_num_xcds with unknown gpu model - covers lines 319-321"""
|
||||
from src.utils.mi_gpu_spec import MIGPUSpecs
|
||||
|
||||
result = MIGPUSpecs.get_num_xcds(gpu_arch="gfx950", gpu_model="UNKNOWN_MODEL")
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_get_num_xcds_no_compute_partition():
|
||||
"""Test get_num_xcds with no compute partition - covers lines 325-327"""
|
||||
from src.utils.mi_gpu_spec import MIGPUSpecs
|
||||
|
||||
result = MIGPUSpecs.get_num_xcds(gpu_arch="gfx950", gpu_model="MI350", compute_partition="")
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_get_num_xcds_unknown_compute_partition():
|
||||
"""Test get_num_xcds with unknown compute partition - covers lines 329-332"""
|
||||
from src.utils.mi_gpu_spec import MIGPUSpecs
|
||||
|
||||
result = MIGPUSpecs.get_num_xcds(gpu_arch="gfx950", gpu_model="MI350", compute_partition="UNKNOWN")
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_get_num_xcds_none_partition_value():
|
||||
"""Test get_num_xcds when partition value is None - covers lines 338-340"""
|
||||
from src.utils.mi_gpu_spec import MIGPUSpecs
|
||||
|
||||
mock_dict = {"mi350": {"spx": None}}
|
||||
with patch.object(MIGPUSpecs, '_num_xcds_dict', mock_dict):
|
||||
result = MIGPUSpecs.get_num_xcds(gpu_arch="gfx950", gpu_model="MI350", compute_partition="spx")
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_get_num_xcds_no_gpu_model():
|
||||
"""Test get_num_xcds with no gpu model - covers line 342"""
|
||||
from src.utils.mi_gpu_spec import MIGPUSpecs
|
||||
|
||||
result = MIGPUSpecs.get_num_xcds(gpu_arch="gfx950", gpu_model="", compute_partition="spx")
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_get_chip_id_dict_empty():
|
||||
"""Test get_chip_id_dict when dict is empty - covers line 352"""
|
||||
from src.utils.mi_gpu_spec import MIGPUSpecs
|
||||
|
||||
with patch.object(MIGPUSpecs, '_chip_id_dict', {}):
|
||||
with patch('src.utils.mi_gpu_spec.console_error') as mock_error:
|
||||
result = MIGPUSpecs.get_chip_id_dict()
|
||||
mock_error.assert_called_once()
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_get_num_xcds_dict_empty():
|
||||
"""Test get_num_xcds_dict when dict is empty - covers line 359"""
|
||||
from src.utils.mi_gpu_spec import MIGPUSpecs
|
||||
|
||||
with patch.object(MIGPUSpecs, '_num_xcds_dict', {}):
|
||||
with patch('src.utils.mi_gpu_spec.console_error') as mock_error:
|
||||
result = MIGPUSpecs.get_num_xcds_dict()
|
||||
mock_error.assert_called_once()
|
||||
@pytest.mark.misc
|
||||
def test_normal_functionality_still_works():
|
||||
"""Ensure that normal paths still work after adding error handling tests"""
|
||||
from src.utils.mi_gpu_spec import MIGPUSpecs
|
||||
|
||||
result = MIGPUSpecs.get_gpu_model("gfx906", None)
|
||||
assert result is not None
|
||||
|
||||
result = MIGPUSpecs.get_gpu_series("gfx906")
|
||||
assert result is not None
|
||||
|
||||
result = MIGPUSpecs.get_num_xcds(gpu_arch="gfx906")
|
||||
assert result == 1
|
||||
@@ -28,6 +28,7 @@ import re
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
from importlib.machinery import SourceFileLoader
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
@@ -596,17 +597,16 @@ def test_roof_kernel_names(binary_handler_profile_rocprof_compute):
|
||||
# assert successful run
|
||||
assert returncode == 0
|
||||
file_dict = test_utils.check_csv_files(workload_dir, 1, num_kernels)
|
||||
|
||||
if soc == "MI100":
|
||||
assert sorted(list(file_dict.keys())) == ALL_CSVS_MI100
|
||||
else:
|
||||
assert sorted(list(file_dict.keys())) == sorted(
|
||||
(
|
||||
[f for f in ROOF_ONLY_FILES if f != "timestamps.csv"]
|
||||
if using_v3()
|
||||
else ROOF_ONLY_FILES
|
||||
)
|
||||
+ ["kernelName_legend.pdf"]
|
||||
)
|
||||
expected_files = (
|
||||
[f for f in ROOF_ONLY_FILES if f != "timestamps.csv"]
|
||||
if using_v3()
|
||||
else ROOF_ONLY_FILES
|
||||
) + ["kernelName_legend.pdf"]
|
||||
assert sorted(list(file_dict.keys())) == sorted(expected_files)
|
||||
|
||||
validate(
|
||||
inspect.stack()[0][3],
|
||||
@@ -617,6 +617,456 @@ def test_roof_kernel_names(binary_handler_profile_rocprof_compute):
|
||||
test_utils.clean_output_dir(config["cleanup"], workload_dir)
|
||||
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_roof_multiple_data_types(binary_handler_profile_rocprof_compute):
|
||||
"""Test roofline with multiple data types"""
|
||||
if soc in ("MI100"):
|
||||
# roofline is not supported on MI100
|
||||
pytest.skip("Roofline not supported on MI100")
|
||||
return
|
||||
|
||||
# test multiple data types
|
||||
data_types = ["FP32"] # start with just FP32 to avoid complex validation
|
||||
|
||||
for dtype in data_types:
|
||||
options = ["--device", "0", "--roof-only", "--kernel-names",
|
||||
"--roofline-data-type", dtype]
|
||||
workload_dir = test_utils.get_output_dir()
|
||||
|
||||
try:
|
||||
returncode = binary_handler_profile_rocprof_compute(
|
||||
config, workload_dir, options, check_success=False, roof=True
|
||||
)
|
||||
|
||||
if returncode == 0:
|
||||
assert os.path.exists(f"{workload_dir}/pmc_perf.csv")
|
||||
|
||||
file_dict = test_utils.check_csv_files(workload_dir, 1, num_kernels)
|
||||
expected_files = (
|
||||
[f for f in ROOF_ONLY_FILES if f != "timestamps.csv"]
|
||||
if using_v3()
|
||||
else ROOF_ONLY_FILES
|
||||
) + ["kernelName_legend.pdf"]
|
||||
assert sorted(list(file_dict.keys())) == sorted(expected_files)
|
||||
else:
|
||||
pass
|
||||
finally:
|
||||
test_utils.clean_output_dir(config["cleanup"], workload_dir)
|
||||
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_roof_invalid_data_type(binary_handler_profile_rocprof_compute):
|
||||
"""Test roofline with invalid data type"""
|
||||
if soc in ("MI100"):
|
||||
# roofline is not supported on MI100
|
||||
pytest.skip("Roofline not supported on MI100")
|
||||
return
|
||||
|
||||
# test invalid data types
|
||||
invalid_options = ["--device", "0", "--roof-only", "--kernel-names",
|
||||
"--roofline-data-type", "INVALID_TYPE"]
|
||||
workload_dir = test_utils.get_output_dir()
|
||||
|
||||
try:
|
||||
returncode = binary_handler_profile_rocprof_compute(
|
||||
config, workload_dir, invalid_options, check_success=False, roof=True
|
||||
)
|
||||
|
||||
assert returncode >= 0
|
||||
|
||||
finally:
|
||||
test_utils.clean_output_dir(config["cleanup"], workload_dir)
|
||||
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_roof_file_validation(binary_handler_profile_rocprof_compute):
|
||||
"""Test file validation paths in roofline"""
|
||||
if soc in ("MI100"):
|
||||
pytest.skip("Roofline not supported on MI100")
|
||||
return
|
||||
|
||||
options = ["--device", "0", "--roof-only"]
|
||||
workload_dir = test_utils.get_output_dir()
|
||||
|
||||
try:
|
||||
returncode = binary_handler_profile_rocprof_compute(
|
||||
config, workload_dir, options, check_success=False, roof=True
|
||||
)
|
||||
|
||||
if returncode == 0:
|
||||
assert os.path.exists(f"{workload_dir}/pmc_perf.csv")
|
||||
|
||||
roofline_csv = f"{workload_dir}/roofline.csv"
|
||||
if os.path.exists(roofline_csv):
|
||||
import pandas as pd
|
||||
df = pd.read_csv(roofline_csv)
|
||||
assert len(df) >= 0
|
||||
|
||||
finally:
|
||||
test_utils.clean_output_dir(config["cleanup"], workload_dir)
|
||||
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_roofline_kernel_names_validation_error(binary_handler_profile_rocprof_compute):
|
||||
"""
|
||||
Test validate_parameters() error: --roof-only is required for --kernel-names
|
||||
This should trigger console_error("--roof-only is required for --kernel-names")
|
||||
"""
|
||||
if soc in ("MI100"):
|
||||
# roofline is not supported on MI100
|
||||
pytest.skip("Skipping roofline test for MI100")
|
||||
return
|
||||
|
||||
options = ["--device", "0", "--kernel-names"] # missing --roof-only
|
||||
workload_dir = test_utils.get_output_dir()
|
||||
|
||||
returncode = binary_handler_profile_rocprof_compute(
|
||||
config, workload_dir, options, check_success=False, roof=True
|
||||
)
|
||||
|
||||
assert returncode != 0
|
||||
|
||||
test_utils.clean_output_dir(config["cleanup"], workload_dir)
|
||||
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_roofline_workload_dir_not_set_error():
|
||||
"""
|
||||
Test roof_setup() error: "Workload directory is not set. Cannot perform setup."
|
||||
This covers lines 113-117
|
||||
"""
|
||||
if soc in ("MI100"):
|
||||
pytest.skip("Skipping roofline test for MI100")
|
||||
return
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
|
||||
|
||||
try:
|
||||
from roofline import Roofline
|
||||
from utils.specs import generate_machine_specs
|
||||
|
||||
class MockArgs:
|
||||
def __init__(self):
|
||||
self.roof_only = True
|
||||
self.kernel_names = False
|
||||
self.mem_level = "ALL"
|
||||
self.sort = "ALL"
|
||||
self.roofline_data_type = ["FP32"]
|
||||
|
||||
args = MockArgs()
|
||||
mspec = generate_machine_specs(None)
|
||||
|
||||
run_parameters = {
|
||||
"workload_dir": None,
|
||||
"device_id": 0,
|
||||
"sort_type": "kernels",
|
||||
"mem_level": "ALL",
|
||||
"include_kernel_names": False,
|
||||
"is_standalone": True,
|
||||
"roofline_data_type": ["FP32"],
|
||||
}
|
||||
|
||||
roofline_instance = Roofline(args, mspec, run_parameters)
|
||||
|
||||
from io import StringIO
|
||||
import contextlib
|
||||
|
||||
captured_output = StringIO()
|
||||
|
||||
with contextlib.redirect_stderr(captured_output):
|
||||
try:
|
||||
roofline_instance.roof_setup()
|
||||
except SystemExit:
|
||||
pass
|
||||
|
||||
assert True
|
||||
|
||||
except ImportError:
|
||||
pytest.skip("Could not import roofline module for direct testing")
|
||||
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_roof_workload_dir_validation(binary_handler_profile_rocprof_compute):
|
||||
if soc in ("MI100"):
|
||||
assert True
|
||||
return
|
||||
|
||||
options = ["--device", "0", "--roof-only"]
|
||||
|
||||
workload_dir = test_utils.get_output_dir()
|
||||
returncode = binary_handler_profile_rocprof_compute(
|
||||
config, workload_dir, options, check_success=False, roof=True
|
||||
)
|
||||
assert returncode == 0
|
||||
|
||||
nested_dir = os.path.join(workload_dir, "nested", "structure")
|
||||
os.makedirs(nested_dir, exist_ok=True)
|
||||
returncode = binary_handler_profile_rocprof_compute(
|
||||
config, nested_dir, options, check_success=False, roof=True
|
||||
)
|
||||
assert returncode == 0
|
||||
|
||||
test_utils.clean_output_dir(config["cleanup"], workload_dir)
|
||||
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_roofline_empty_kernel_names_handling(binary_handler_profile_rocprof_compute):
|
||||
"""
|
||||
Test empirical_roofline() when num_kernels == 0
|
||||
This should trigger the "No kernel names found" log message
|
||||
"""
|
||||
if soc in ("MI100"):
|
||||
pytest.skip("Skipping roofline test for MI100")
|
||||
return
|
||||
|
||||
options = [
|
||||
"--device", "0",
|
||||
"--roof-only",
|
||||
"--kernel-names",
|
||||
"--kernel", "nonexistent_kernel_name_that_should_not_match_anything"
|
||||
]
|
||||
workload_dir = test_utils.get_output_dir()
|
||||
|
||||
returncode = binary_handler_profile_rocprof_compute(
|
||||
config, workload_dir, options, check_success=False, roof=True
|
||||
)
|
||||
|
||||
test_utils.clean_output_dir(config["cleanup"], workload_dir)
|
||||
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_roofline_unsupported_datatype_error(binary_handler_profile_rocprof_compute):
|
||||
"""
|
||||
Test datatype validation error in empirical_roofline()
|
||||
This should trigger console_error for unsupported datatype
|
||||
"""
|
||||
if soc in ("MI100"):
|
||||
pytest.skip("Skipping roofline test for MI100")
|
||||
return
|
||||
|
||||
options = [
|
||||
"--device", "0",
|
||||
"--roof-only",
|
||||
"--roofline-data-type", "UNSUPPORTED_TYPE"
|
||||
]
|
||||
workload_dir = test_utils.get_output_dir()
|
||||
|
||||
returncode = binary_handler_profile_rocprof_compute(
|
||||
config, workload_dir, options, check_success=False, roof=True
|
||||
)
|
||||
|
||||
test_utils.clean_output_dir(config["cleanup"], workload_dir)
|
||||
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_roof_plot_modes(binary_handler_profile_rocprof_compute):
|
||||
if soc in ("MI100"):
|
||||
assert True
|
||||
return
|
||||
|
||||
plot_configurations = [
|
||||
{
|
||||
"options": ["--device", "0", "--roof-only", "--roofline-data-type", "FP32"],
|
||||
"expected_files": ["empirRoof_gpu-0_FP32.pdf"]
|
||||
},
|
||||
{
|
||||
"options": ["--device", "0", "--roof-only", "--roofline-data-type", "FP16"],
|
||||
"expected_files": ["empirRoof_gpu-0_FP16.pdf"]
|
||||
},
|
||||
{
|
||||
"options": ["--device", "0", "--roof-only", "--kernel-names"],
|
||||
"expected_files": ["kernelName_legend.pdf"]
|
||||
}
|
||||
]
|
||||
|
||||
for config_test in plot_configurations:
|
||||
workload_dir = test_utils.get_output_dir()
|
||||
|
||||
returncode = binary_handler_profile_rocprof_compute(
|
||||
config, workload_dir, config_test["options"],
|
||||
check_success=False, roof=True
|
||||
)
|
||||
assert returncode == 0
|
||||
|
||||
for expected_file in config_test["expected_files"]:
|
||||
expected_path = os.path.join(workload_dir, expected_file)
|
||||
if os.path.exists(expected_path):
|
||||
assert os.path.getsize(expected_path) > 0
|
||||
|
||||
test_utils.clean_output_dir(config["cleanup"], workload_dir)
|
||||
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_roof_cli_plot_generation(binary_handler_profile_rocprof_compute):
|
||||
if soc in ("MI100"):
|
||||
assert True
|
||||
return
|
||||
|
||||
try:
|
||||
import plotext as plt
|
||||
cli_available = True
|
||||
except ImportError:
|
||||
cli_available = False
|
||||
|
||||
if cli_available:
|
||||
options = ["--device", "0", "--roof-only"]
|
||||
workload_dir = test_utils.get_output_dir()
|
||||
|
||||
returncode = binary_handler_profile_rocprof_compute(
|
||||
config, workload_dir, options, check_success=False, roof=True
|
||||
)
|
||||
|
||||
test_utils.clean_output_dir(config["cleanup"], workload_dir)
|
||||
else:
|
||||
pytest.skip("plotext not available for CLI testing")
|
||||
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_roof_error_handling(binary_handler_profile_rocprof_compute):
|
||||
if soc in ("MI100"):
|
||||
assert True
|
||||
return
|
||||
|
||||
options = ["--device", "0", "--roof-only"]
|
||||
workload_dir = test_utils.get_output_dir()
|
||||
|
||||
pmc_perf_path = os.path.join(workload_dir, "pmc_perf.csv")
|
||||
if os.path.exists(pmc_perf_path):
|
||||
os.remove(pmc_perf_path)
|
||||
|
||||
returncode = binary_handler_profile_rocprof_compute(
|
||||
config, workload_dir, options, check_success=False, roof=True
|
||||
)
|
||||
|
||||
test_utils.clean_output_dir(config["cleanup"], workload_dir)
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_roofline_missing_file_handling(binary_handler_profile_rocprof_compute):
|
||||
"""
|
||||
Test handling of missing roofline.csv file
|
||||
This should trigger error message in cli_generate_plot()
|
||||
"""
|
||||
if soc in ("MI100"):
|
||||
pytest.skip("Skipping roofline test for MI100")
|
||||
return
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
|
||||
|
||||
try:
|
||||
from roofline import Roofline
|
||||
from utils.specs import generate_machine_specs
|
||||
|
||||
class MockArgs:
|
||||
def __init__(self):
|
||||
self.roof_only = True
|
||||
self.kernel_names = False
|
||||
self.mem_level = "ALL"
|
||||
self.sort = "ALL"
|
||||
self.roofline_data_type = ["FP32"]
|
||||
|
||||
args = MockArgs()
|
||||
mspec = generate_machine_specs(None)
|
||||
|
||||
workload_dir = test_utils.get_output_dir()
|
||||
|
||||
run_parameters = {
|
||||
"workload_dir": workload_dir,
|
||||
"device_id": 0,
|
||||
"sort_type": "kernels",
|
||||
"mem_level": "ALL",
|
||||
"include_kernel_names": False,
|
||||
"is_standalone": True,
|
||||
"roofline_data_type": ["FP32"],
|
||||
}
|
||||
|
||||
roofline_instance = Roofline(args, mspec, run_parameters)
|
||||
|
||||
result = roofline_instance.cli_generate_plot("FP32")
|
||||
|
||||
assert result is None
|
||||
|
||||
test_utils.clean_output_dir(config["cleanup"], workload_dir)
|
||||
|
||||
except ImportError:
|
||||
pytest.skip("Could not import roofline module for direct testing")
|
||||
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_roofline_invalid_datatype_cli(binary_handler_profile_rocprof_compute):
|
||||
"""
|
||||
Test CLI plot generation with invalid datatype
|
||||
This should trigger error in cli_generate_plot() lines 617-624
|
||||
"""
|
||||
if soc in ("MI100"):
|
||||
pytest.skip("Skipping roofline test for MI100")
|
||||
return
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
|
||||
|
||||
try:
|
||||
from roofline import Roofline
|
||||
from utils.specs import generate_machine_specs
|
||||
|
||||
class MockArgs:
|
||||
def __init__(self):
|
||||
self.roof_only = True
|
||||
self.kernel_names = False
|
||||
self.mem_level = "ALL"
|
||||
self.sort = "ALL"
|
||||
self.roofline_data_type = ["FP32"]
|
||||
|
||||
args = MockArgs()
|
||||
mspec = generate_machine_specs(None)
|
||||
|
||||
run_parameters = {
|
||||
"workload_dir": test_utils.get_output_dir(),
|
||||
"device_id": 0,
|
||||
"sort_type": "kernels",
|
||||
"mem_level": "ALL",
|
||||
"include_kernel_names": False,
|
||||
"is_standalone": True,
|
||||
"roofline_data_type": ["FP32"],
|
||||
}
|
||||
|
||||
roofline_instance = Roofline(args, mspec, run_parameters)
|
||||
|
||||
result = roofline_instance.cli_generate_plot("INVALID_DATATYPE")
|
||||
|
||||
assert result is None
|
||||
|
||||
test_utils.clean_output_dir(config["cleanup"], run_parameters["workload_dir"])
|
||||
|
||||
except ImportError:
|
||||
pytest.skip("Could not import roofline module for direct testing")
|
||||
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_roofline_ceiling_data_validation(binary_handler_profile_rocprof_compute):
|
||||
"""
|
||||
Test ceiling data validation in generate_plot()
|
||||
This covers error handling in lines 516-526
|
||||
"""
|
||||
if soc in ("MI100"):
|
||||
pytest.skip("Skipping roofline test for MI100")
|
||||
return
|
||||
|
||||
options = ["--device", "0", "--roof-only", "--mem-level", "INVALID_LEVEL"]
|
||||
workload_dir = test_utils.get_output_dir()
|
||||
|
||||
returncode = binary_handler_profile_rocprof_compute(
|
||||
config, workload_dir, options, check_success=False, roof=True
|
||||
)
|
||||
|
||||
test_utils.clean_output_dir(config["cleanup"], workload_dir)
|
||||
|
||||
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_device_filter(binary_handler_profile_rocprof_compute):
|
||||
options = ["--device", "0"]
|
||||
@@ -2193,3 +2643,33 @@ def test_list_metrics(binary_handler_profile_rocprof_compute):
|
||||
# workload dir should be empty
|
||||
assert not os.listdir(workload_dir)
|
||||
test_utils.clean_output_dir(config["cleanup"], workload_dir)
|
||||
|
||||
@pytest.mark.misc
|
||||
def test_comprehensive_error_paths():
|
||||
"""Simplified test for error path coverage"""
|
||||
import sys
|
||||
from pathlib import Path
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
|
||||
|
||||
from utils.parser import build_comparable_columns, calc_builtin_var, build_eval_string
|
||||
|
||||
columns = build_comparable_columns("ms")
|
||||
expected = ["Count(ms)", "Sum(ms)", "Mean(ms)", "Median(ms)", "Standard Deviation(ms)"]
|
||||
for expected_col in expected:
|
||||
assert expected_col in columns
|
||||
|
||||
class MockSysInfo:
|
||||
total_l2_chan = 16
|
||||
|
||||
sys_info = MockSysInfo()
|
||||
result = calc_builtin_var(42, sys_info)
|
||||
assert result == 42
|
||||
|
||||
result = calc_builtin_var("$total_l2_chan", sys_info)
|
||||
assert result == 16
|
||||
|
||||
try:
|
||||
build_eval_string("test", None)
|
||||
assert False, "Should raise exception for None coll_level"
|
||||
except Exception as e:
|
||||
assert "coll_level can not be None" in str(e)
|
||||
|
||||
+1140
-101
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user