diff --git a/test/ext-plugins/.gitignore b/test/ext-plugins/.gitignore index 2a2778dbca..00b72b7b70 100644 --- a/test/ext-plugins/.gitignore +++ b/test/ext-plugins/.gitignore @@ -18,3 +18,6 @@ venv/ # Ignore build artifacts build/ + +# Ignore profiler dump folders +profiler_dumps/ diff --git a/test/ext-plugins/README.md b/test/ext-plugins/README.md index 01b2a68b1e..f35d212cc1 100644 --- a/test/ext-plugins/README.md +++ b/test/ext-plugins/README.md @@ -1,8 +1,12 @@ -# RCCL CSV Tuner Plugin Tests +# RCCL Plugin Tests ## Description -This directory contains automated tests for the RCCL (ROCm Communication Collectives Library) CSV Tuner Plugin. The test suite validates the functionality of the CSV-based tuning plugin across different collective operations (AllReduce, Broadcast, Reduce, AllGather, and ReduceScatter) and various configuration scenarios. +This directory contains automated tests for RCCL (ROCm Communication Collectives Library) plugins: + +1. **CSV Tuner Plugin**: Validates the functionality of the CSV-based tuning plugin across different collective operations (AllReduce, Broadcast, Reduce, AllGather, and ReduceScatter) and various configuration scenarios. + +2. **Profiler Plugin**: Validates the profiler plugin that captures detailed runtime events for collective and P2P operations, including Group, Collective, P2P, ProxyOp, ProxyStep, and GPU kernel events. The tests are written in Python using the pytest framework, making it easy to run, maintain, and extend the test coverage. 
@@ -16,6 +20,7 @@ ext-plugins/ ├── .gitignore # Git ignore rules for Python/pytest artifacts ├── venv/ # Python virtual environment (created after setup) ├── logs/ # Test execution logs and output files +├── profiler_dumps/ # Profiler plugin output (JSON trace files) ├── assets/ # Test configuration files and assets │ └── csv_confs/ # CSV configuration files for testing │ ├── incorrect_values_config.conf @@ -27,12 +32,20 @@ ext-plugins/ │ └── valid_config_without_wildcards.conf └── tests/ # Test suite directory ├── conftest.py # Pytest fixtures and shared test configuration - └── ext-tuner/ # CSV Tuner Plugin specific tests + ├── ext-tuner/ # CSV Tuner Plugin specific tests + │ ├── test_allreduce.py + │ ├── test_broadcast.py + │ ├── test_reduce.py + │ ├── test_allgather.py + │ └── test_reducescatter.py + └── ext-profiler/ # Profiler Plugin specific tests ├── test_allreduce.py ├── test_broadcast.py ├── test_reduce.py ├── test_allgather.py - └── test_reducescatter.py + ├── test_alltoall.py + ├── test_reducescatter.py + └── test_sendrecv.py ``` ## Installation & Setup @@ -52,17 +65,21 @@ RCCL_TESTS_DIR = "path/to/rccl-tests" Replace these placeholder paths with your actual installation directories before running the tests. -### Building the RCCL CSV Tuner Plugin +### Building the RCCL Plugins -Before running the tests, you need to build the RCCL CSV tuner plugin library. The plugin is located in the `ext-tuner/example` directory. +Before running the tests, you need to build the RCCL plugin libraries. -#### Step 1: Navigate to the plugin directory +#### Building the CSV Tuner Plugin + +The CSV tuner plugin is located in the `ext-tuner/example` directory. + +**Step 1: Navigate to the plugin directory** ```bash cd rccl/ext-tuner/example ``` -#### Step 2: Build the plugin +**Step 2: Build the plugin** ```bash make @@ -70,6 +87,31 @@ make This will compile the plugin and create `libnccl-tuner-example.so` in the same directory. 
+#### Building the Profiler Plugin + +The profiler plugin is located in the `ext-profiler/example` directory. + +**Step 1: Navigate to the plugin directory** + +```bash +cd rccl/ext-profiler/example +``` + +**Step 2: Build the plugin** + +```bash +make +``` + +This will compile the plugin and create `librccl-profiler.so` in the same directory. The profiler plugin captures detailed runtime events including: +- **Group events**: High-level operation grouping +- **Collective events**: AllReduce, Broadcast, Reduce, ReduceScatter operations +- **P2P events**: Send, Recv, AllGather, AllToAll operations +- **ProxyOp events**: Network proxy operations (ScheduleSend, ScheduleRecv, ProgressSend, ProgressRecv) +- **ProxyStep events**: Detailed network steps (SendWait, RecvWait, GPU waits) +- **ProxyCtrl events**: Proxy thread control (Append, Sleep) +- **GPU events**: Kernel channel execution + ### Step 1: Create Virtual Environment Create a Python virtual environment to isolate the test dependencies: @@ -82,7 +124,7 @@ python3 -m venv venv Activate the virtual environment using the appropriate command for your shell: -**On Linux:** +**On Linux/Mac (bash/zsh):** ```bash source venv/bin/activate ``` @@ -119,18 +161,27 @@ pytest -v --cache-clear Tests are organized using pytest markers. 
You can run specific groups of tests: -**Run CSV Plugin tests:** +**Run plugin-specific tests:** ```bash -pytest -m mark.ext_tuner --cache-clear +pytest -m ext_tuner --cache-clear # CSV Tuner Plugin tests only +pytest -m ext_profiler --cache-clear # Profiler Plugin tests only ``` -**Run tests for specific collective operations:** +**Run tests for specific collective operations (across all plugins):** ```bash pytest -m allreduce --cache-clear # AllReduce tests pytest -m broadcast --cache-clear # Broadcast tests pytest -m reduce --cache-clear # Reduce tests pytest -m allgather --cache-clear # AllGather tests pytest -m reducescatter --cache-clear # ReduceScatter tests +pytest -m alltoall --cache-clear # AllToAll tests (profiler only) +pytest -m sendrecv --cache-clear # SendRecv tests (profiler only) +``` + +**Combine markers to run specific tests:** +```bash +pytest -m "ext_profiler and allreduce" --cache-clear # Profiler AllReduce tests only +pytest -m "ext_tuner and broadcast" --cache-clear # Tuner Broadcast tests only ``` ### Run Tests with Log Output @@ -157,3 +208,5 @@ pytest --verbose --tb=short ``` - **Log Files**: Test execution logs are stored in the `logs/` directory for later review and debugging. + +- **Profiler Output**: Profiler plugin tests generate JSON trace files in the `profiler_dumps/` directory. These files contain detailed event traces that can be analyzed for debugging or performance analysis. Existing JSON trace files under this directory (including subdirectories) are removed automatically by a pytest fixture at the start of any test session that contains profiler tests. 
diff --git a/test/ext-plugins/pytest.ini b/test/ext-plugins/pytest.ini index 61ad276a54..cf468b032d 100644 --- a/test/ext-plugins/pytest.ini +++ b/test/ext-plugins/pytest.ini @@ -2,11 +2,15 @@ [pytest] markers = ext_tuner: marks tests related to CSV Tuner Plugin + ext_profiler: marks tests related to Profiler Plugin allreduce: marks tests related to AllReduce collective broadcast: marks tests related to Broadcast collective reduce: marks tests related to Reduce collective allgather: marks tests related to AllGather collective reducescatter: marks tests related to ReduceScatter collective - multinode: marks tests related to multi-node configuration + alltoall: marks tests related to AllToAll collective + sendrecv: marks tests related to SendRecv collective + multinode: marks tests related to multi-node operations -testpaths = tests \ No newline at end of file +testpaths = tests +addopts = --import-mode=importlib diff --git a/test/ext-plugins/tests/conftest.py b/test/ext-plugins/tests/conftest.py index a2b02837c9..2a7f3ef737 100644 --- a/test/ext-plugins/tests/conftest.py +++ b/test/ext-plugins/tests/conftest.py @@ -9,6 +9,8 @@ import pytest import subprocess import re from types import SimpleNamespace +import json +import glob WORKDIR = os.getcwd() @@ -16,10 +18,14 @@ RCCL_INSTALL_DIR = "path/to/rccl" OMPI_INSTALL_DIR = "path/to/ompi/install" RCCL_TESTS_DIR = "path/to/rccl-tests" -# Plugin Paths +# Ext-Tuner Paths PLUGIN_DIR = f"{RCCL_INSTALL_DIR}/ext-tuner/example" PLUGIN_SO = f"{PLUGIN_DIR}/libnccl-tuner-example.so" +# Ext-Profiler Paths +PROFILER_DIR = f"{RCCL_INSTALL_DIR}/ext-profiler/example" +PROFILER_SO = f"{PROFILER_DIR}/librccl-profiler.so" + # CSV Configs VALID_CONFIG_WITH_WILDCARDS = os.path.join(WORKDIR, "assets/csv_confs/valid_config_with_wildcards.conf") VALID_CONFIG_WITHOUT_WILDCARDS = os.path.join(WORKDIR, "assets/csv_confs/valid_config_without_wildcards.conf") @@ -32,6 +38,9 @@ MULTINODE_CONFIG = os.path.join(WORKDIR, 
"assets/csv_confs/multinode_config.conf LOGDIR = os.path.join(WORKDIR, "logs") os.makedirs(LOGDIR, exist_ok=True) +PROFILER_DUMP_DIR = os.path.join(WORKDIR, "profiler_dumps") +os.makedirs(PROFILER_DUMP_DIR, exist_ok=True) + # Helper Functions def get_avg_bus_bandwidth(log_content: str): """Extract average bus bandwidth from RCCL test log""" @@ -81,6 +90,74 @@ def get_available_nodes(): except (subprocess.CalledProcessError, FileNotFoundError): return [] +def validate_json_trace(trace_file): + """Validate that a trace file is valid JSON and follows chrome trace format""" + + if not os.path.exists(trace_file): + return False, "File does not exist" + + try: + with open(trace_file, 'r') as f: + data = json.load(f) + + # Basic validation for chrome trace format + if not isinstance(data, list): + return False, "Trace must be a JSON array" + + # Check for at least one valid event + if len(data) > 1: # More than just the closing empty object + # Validate some events have required fields + valid_events = 0 + for event in data: + if isinstance(event, dict) and 'name' in event and 'ph' in event: + valid_events += 1 + + if valid_events == 0: + return False, "No valid trace events found" + + return True, f"Valid JSON trace with {len(data)} entries" + except json.JSONDecodeError as e: + return False, f"Invalid JSON: {e}" + except Exception as e: + return False, f"Error reading file: {e}" + +def check_event_in_log(log_file, event_string): + """Check if a specific event string appears in the log file""" + if not os.path.exists(log_file): + return False + + with open(log_file, 'r') as f: + content = f.read() + return event_string in content + +def count_events_in_trace(trace_file, event_name=None, category=None): + """Count events in a trace file, optionally filtered by name or category""" + + if not os.path.exists(trace_file): + return 0 + + try: + with open(trace_file, 'r') as f: + data = json.load(f) + + count = 0 + for event in data: + if not isinstance(event, dict): + continue 
+ + match = True + if event_name and event.get('name') != event_name: + match = False + if category and event.get('cat') != category: + match = False + + if match and 'name' in event: # Valid event + count += 1 + + return count + except: + return 0 + # Pytest Fixture @pytest.fixture(scope="session") def paths(): @@ -92,6 +169,8 @@ def paths(): PLUGIN_DIR=PLUGIN_DIR, PLUGIN_SO=PLUGIN_SO, RCCL_TESTS_DIR=RCCL_TESTS_DIR, + PROFILER_DIR=PROFILER_DIR, + PROFILER_SO=PROFILER_SO, # CSV Configs VALID_CONFIG_WITH_WILDCARDS=VALID_CONFIG_WITH_WILDCARDS, VALID_CONFIG_WITHOUT_WILDCARDS=VALID_CONFIG_WITHOUT_WILDCARDS, @@ -101,9 +180,44 @@ def paths(): SINGLENODE_CONFIG=SINGLENODE_CONFIG, MULTINODE_CONFIG=MULTINODE_CONFIG, LOGDIR=LOGDIR, - # Helper Functions + PROFILER_DUMP_DIR=PROFILER_DUMP_DIR, + # Helper Functions for Ext-Tuner get_avg_bus_bandwidth=get_avg_bus_bandwidth, check_node_interface=check_node_interface, find_common_interface=find_common_interface, get_available_nodes=get_available_nodes, - ) \ No newline at end of file + # Helper Functions for Ext-Profiler + validate_json_trace=validate_json_trace, + check_event_in_log=check_event_in_log, + count_events_in_trace=count_events_in_trace, + ) + +def pytest_runtest_setup(item): + """Check plugin availability before running each test""" + # Check for ext_tuner marker + if item.get_closest_marker("ext_tuner"): + if not os.path.exists(PLUGIN_SO): + pytest.skip(f"Tuner plugin library not found at: {PLUGIN_SO}") + + # Check for ext_profiler marker + if item.get_closest_marker("ext_profiler"): + if not os.path.exists(PROFILER_SO): + pytest.skip(f"Profiler plugin library not found at: {PROFILER_SO}") + +@pytest.fixture(scope="session", autouse=True) +def clear_profiler_dump(request): + """Automatically clear profiler dump folder once before ext_profiler tests""" + # Check if any test in the session has ext_profiler marker + has_profiler_tests = any( + item.get_closest_marker("ext_profiler") + for item in request.session.items + 
) + + if has_profiler_tests: + # Clear all JSON files in the profiler dump directory (including subdirectories) + pattern = os.path.join(PROFILER_DUMP_DIR, "**", "*.json") + for trace_file in glob.glob(pattern, recursive=True): + try: + os.remove(trace_file) + except OSError: + pass # Ignore errors if file doesn't exist or can't be removed diff --git a/test/ext-plugins/tests/ext-profiler/test_allgather.py b/test/ext-plugins/tests/ext-profiler/test_allgather.py new file mode 100644 index 0000000000..3ea8dfe86d --- /dev/null +++ b/test/ext-plugins/tests/ext-profiler/test_allgather.py @@ -0,0 +1,414 @@ +# ************************************************************************* +# * Copyright (c) 2025 Advanced Micro Devices, Inc. All rights reserved. +# * +# * See LICENSE.txt for license information +# ************************************************************************ + +import os +import subprocess +import glob +import pytest + +@pytest.mark.ext_profiler +@pytest.mark.allgather +def test_profiler_initialization(paths): + """Test profiler functionality with AllGather operations.""" + + dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "allgather_profiler_dumps") + os.makedirs(dump_dir, exist_ok=True) + + dump_file_base = os.path.join(dump_dir, "profiler_initialization") + + # Remove any existing trace files + trace_pattern = f"{dump_file_base}*.json" + for f in glob.glob(trace_pattern): + os.remove(f) + + env = os.environ.copy() + env.update({ + "PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}", + "LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}", + "HSA_NO_SCRATCH_RECLAIM": "1", + "NCCL_PROFILER_PLUGIN": paths.PROFILER_SO, + "NCCL_PROFILE_EVENT_MASK": "3", # Group (1) + Coll (2) = 3 + "NCCL_PROFILE_DUMP_FILE": dump_file_base, + "NCCL_DEBUG": "INFO", + }) + + args = [ + f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", "4", + "--mca", "pml", "ucx", + "--mca", "btl", 
"^vader,openib", + f"{paths.RCCL_TESTS_DIR}/build/all_gather_perf", + "-b", "1", + "-e", "8M", + "-f", "2", + "-g", "1", + ] + + log_dir = os.path.join(paths.LOGDIR, "allgather_ext_profiler_test_logs") + os.makedirs(log_dir, exist_ok=True) + + log_file = os.path.join(log_dir, "profiler_initialization.log") + with open(log_file, "w") as logfile: + result = subprocess.run( + args, + env=env, + stdout=logfile, + stderr=subprocess.STDOUT, + universal_newlines=True + ) + + assert result.returncode == 0, f"AllGather test failed, see {log_file}" + + # Verify plugin initialized + assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \ + f"Plugin should have initialized. Check {log_file}" + + # Verify trace files were created (one per rank) + trace_files = glob.glob(trace_pattern) + assert len(trace_files) == 4, \ + f"Should have 4 trace files (one per rank), found {len(trace_files)}: {trace_files}" + + # Validate each trace file + for trace_file in trace_files: + is_valid, message = paths.validate_json_trace(trace_file) + assert is_valid, f"Trace file {trace_file} validation failed: {message}" + + # Check for Group events + group_events = paths.count_events_in_trace(trace_file, category="GROUP") + assert group_events > 0, f"Should have Group events in {trace_file}" + + # Check for AllGather collective events + allgather_events = paths.count_events_in_trace(trace_file, event_name="AllGather") + assert allgather_events > 0, f"Should have AllGather events in {trace_file}" + + +@pytest.mark.ext_profiler +@pytest.mark.allgather +def test_invalid_mask_value(paths): + """Test profiler behavior with invalid event mask (0 = no events)""" + + dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "allgather_profiler_dumps") + os.makedirs(dump_dir, exist_ok=True) + + dump_file_base = os.path.join(dump_dir, "invalid_mask_value_profiling") + + # Remove any existing trace files + trace_pattern = f"{dump_file_base}*.json" + for f in glob.glob(trace_pattern): + os.remove(f) + + 
env = os.environ.copy() + env.update({ + "PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}", + "LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}", + "HSA_NO_SCRATCH_RECLAIM": "1", + "NCCL_PROFILER_PLUGIN": paths.PROFILER_SO, + "NCCL_PROFILE_EVENT_MASK": "0", # Invalid: no events enabled + "NCCL_PROFILE_DUMP_FILE": dump_file_base, + "NCCL_DEBUG": "INFO", + }) + + args = [ + f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", "4", + "--mca", "pml", "ucx", + "--mca", "btl", "^vader,openib", + f"{paths.RCCL_TESTS_DIR}/build/all_gather_perf", + "-b", "1", + "-e", "8M", + "-f", "2", + "-g", "1", + ] + + log_dir = os.path.join(paths.LOGDIR, "allgather_ext_profiler_test_logs") + os.makedirs(log_dir, exist_ok=True) + + log_file = os.path.join(log_dir, "invalid_mask_value_profiling.log") + with open(log_file, "w") as logfile: + result = subprocess.run( + args, + env=env, + stdout=logfile, + stderr=subprocess.STDOUT, + universal_newlines=True + ) + + assert result.returncode == 0, f"AllGather test should still succeed even with invalid mask, see {log_file}" + + # Verify plugin initialized + assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \ + f"Plugin should have initialized even with mask=0. 
Check {log_file}" + + # Verify trace files were created (one per rank) + trace_files = glob.glob(trace_pattern) + assert len(trace_files) == 4, \ + f"Should have 4 trace files (one per rank), found {len(trace_files)}: {trace_files}" + + # Validate each trace file - with mask=0, trace files should be nearly empty + # They should contain valid JSON but no actual profiling events + for trace_file in trace_files: + is_valid, message = paths.validate_json_trace(trace_file) + assert is_valid, f"Trace file {trace_file} should still be valid JSON: {message}" + + # With mask=0, there should be no Group or Collective events + group_events = paths.count_events_in_trace(trace_file, category="GROUP") + assert group_events == 0, f"Should have no Group events with mask=0 in {trace_file}, found {group_events}" + + allgather_events = paths.count_events_in_trace(trace_file, event_name="AllGather") + assert allgather_events == 0, f"Should have no AllGather events with mask=0 in {trace_file}, found {allgather_events}" + + +@pytest.mark.ext_profiler +@pytest.mark.allgather +def test_single_node_detailed_profiling(paths): + """Test profiler with single-node AllGather using full event mask (255) across wide message range""" + + dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "allgather_profiler_dumps") + os.makedirs(dump_dir, exist_ok=True) + + dump_file_base = os.path.join(dump_dir, "single_node_detailed_profiling") + + # Remove any existing trace files + trace_pattern = f"{dump_file_base}*.json" + for f in glob.glob(trace_pattern): + os.remove(f) + + env = os.environ.copy() + env.update({ + "PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}", + "LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}", + "HSA_NO_SCRATCH_RECLAIM": "1", + "NCCL_PROFILER_PLUGIN": paths.PROFILER_SO, + "NCCL_PROFILE_EVENT_MASK": "255", # All events: Group (1) + Coll (2) + P2P (4) + ProxyOp (8) + ProxyStep (16) + ProxyCtrl 
(32) + KernelCh (64) + NetPlugin (128) = 255 + "NCCL_PROFILE_DUMP_FILE": dump_file_base, + "NCCL_DEBUG": "INFO", + }) + + args = [ + f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", "8", + "--bind-to", "none", + "--mca", "pml", "ucx", + "--mca", "btl", "^vader,openib", + f"{paths.RCCL_TESTS_DIR}/build/all_gather_perf", + "-b", "8", + "-e", "128M", + "-f", "2", + "-g", "1", + ] + + log_dir = os.path.join(paths.LOGDIR, "allgather_ext_profiler_test_logs") + os.makedirs(log_dir, exist_ok=True) + + log_file = os.path.join(log_dir, "single_node_detailed_profiling.log") + with open(log_file, "w") as logfile: + result = subprocess.run( + args, + env=env, + stdout=logfile, + stderr=subprocess.STDOUT, + universal_newlines=True + ) + + assert result.returncode == 0, f"Single-node detailed AllGather profiling test failed, see {log_file}" + + # Verify plugin initialized + assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \ + f"Plugin should have initialized. Check {log_file}" + + # Verify trace files were created (one per rank) + trace_files = glob.glob(trace_pattern) + assert len(trace_files) == 8, \ + f"Should have 8 trace files (one per rank), found {len(trace_files)}: {trace_files}" + + # Validate each trace file + for trace_file in trace_files: + is_valid, message = paths.validate_json_trace(trace_file) + assert is_valid, f"Trace file {trace_file} validation failed: {message}" + + # With NCCL_PROFILE_EVENT_MASK=255, we capture all event types + # However, single-node behavior differs significantly from multi-node + + # Check for Group events + group_events = paths.count_events_in_trace(trace_file, category="GROUP") + assert group_events > 0, \ + f"Should have Group events in {trace_file}, found {group_events}" + + # Check for P2P events + p2p_events = paths.count_events_in_trace(trace_file, category="P2P") + assert p2p_events > 0, \ + f"Should have P2P events (AllGather uses Send/Recv) in {trace_file}, found {p2p_events}" + + # Verify Send and Recv events + 
send_events = paths.count_events_in_trace(trace_file, event_name="Send") + recv_events = paths.count_events_in_trace(trace_file, event_name="Recv") + assert send_events > 0, f"Should have Send events in {trace_file}, found {send_events}" + assert recv_events > 0, f"Should have Recv events in {trace_file}, found {recv_events}" + + # Verify GPU kernel channel events exist + kernel_events = paths.count_events_in_trace(trace_file, category="GPU") + assert kernel_events > 0, \ + f"Should have GPU (KernelCh) events in {trace_file}, found {kernel_events}" + + # Verify ProxyCtrl events exist + proxy_ctrl_events = paths.count_events_in_trace(trace_file, category="PROXY") + assert proxy_ctrl_events > 0, \ + f"Should have PROXY (ProxyCtrl) events in {trace_file}, found {proxy_ctrl_events}" + + append_events = paths.count_events_in_trace(trace_file, event_name="Append") + sleep_events = paths.count_events_in_trace(trace_file, event_name="Sleep") + assert append_events > 0 or sleep_events > 0, \ + f"Should have ProxyCtrl events in {trace_file}, found Append={append_events}, Sleep={sleep_events}" + + # We should NOT see ProxyOp network events (ScheduleSend/Recv, ProgressSend/Recv) + schedule_send_events = paths.count_events_in_trace(trace_file, event_name="ScheduleSend") + schedule_recv_events = paths.count_events_in_trace(trace_file, event_name="ScheduleRecv") + assert schedule_send_events == 0, \ + f"Single-node should have NO ScheduleSend events (no network) in {trace_file}, found {schedule_send_events}" + assert schedule_recv_events == 0, \ + f"Single-node should have NO ScheduleRecv events (no network) in {trace_file}, found {schedule_recv_events}" + + # Should also NOT see ProxyStep network events (RecvWait, SendWait, etc.) 
+ net_events = paths.count_events_in_trace(trace_file, category="NET") + assert net_events == 0, \ + f"Single-node should have NO NET (ProxyStep) events in {trace_file}, found {net_events}" + + # Verify trace file exists and has content + trace_file_size = os.path.getsize(trace_file) + assert trace_file_size > 0, \ + f"Trace file {trace_file} is empty" + + +@pytest.mark.ext_profiler +@pytest.mark.allgather +def test_multinode_detailed_profiling(paths): + """Test profiler with multi-node AllGather operations using full event mask (255)""" + + # Get available nodes using the shared function + nodelist = paths.get_available_nodes() + + # Skip test if no nodes available (SLURM not available) or less than 2 nodes + if not nodelist: + pytest.skip("Multinode test requires SLURM allocation") + + if len(nodelist) < 2: + pytest.skip(f"Multinode test requires at least 2 nodes, found {len(nodelist)}: {nodelist}") + + # Check for common network interface across all nodes + common_interface = paths.find_common_interface(nodelist) + if common_interface is None: + pytest.skip(f"Multinode test requires all nodes to have the same network interface (eth0 or eth1).") + + # Build host specification string (8 processes per node) + host_spec = ",".join([f"{node}:8" for node in nodelist]) + total_processes = len(nodelist) * 8 + print(f"Using host specification: {host_spec}") + + dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "allgather_profiler_dumps") + os.makedirs(dump_dir, exist_ok=True) + + dump_file_base = os.path.join(dump_dir, "multinode_detailed_profiling") + + # Remove any existing trace files + trace_pattern = f"{dump_file_base}*.json" + for f in glob.glob(trace_pattern): + os.remove(f) + + env = os.environ.copy() + env.update({ + "PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}", + "LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}", + "HSA_NO_SCRATCH_RECLAIM": "1", + 
"NCCL_IGNORE_CPU_AFFINITY": "1", + "NCCL_PROFILER_PLUGIN": paths.PROFILER_SO, + "NCCL_PROFILE_EVENT_MASK": "255", # All events: Group (1) + Coll (2) + P2P (4) + ProxyOp (8) + ProxyStep (16) + ProxyCtrl (32) + KernelCh (64) + NetPlugin (128) = 255 + "NCCL_PROFILE_DUMP_FILE": dump_file_base, + "NCCL_DEBUG": "INFO", + "NCCL_SOCKET_IFNAME": common_interface, + "NCCL_DMABUF_ENABLE": "1", + }) + + args = [ + f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", f"{total_processes}", + "--host", host_spec, + "--mca", "pml", "ucx", + "--mca", "btl", "^vader,openib", + f"{paths.RCCL_TESTS_DIR}/build/all_gather_perf", + "-b", "8", + "-e", "128M", + "-f", "2", + "-g", "1", + ] + + log_dir = os.path.join(paths.LOGDIR, "allgather_ext_profiler_test_logs") + os.makedirs(log_dir, exist_ok=True) + + log_file = os.path.join(log_dir, "multinode_detailed_profiling.log") + with open(log_file, "w") as logfile: + result = subprocess.run( + args, + env=env, + stdout=logfile, + stderr=subprocess.STDOUT, + universal_newlines=True + ) + + assert result.returncode == 0, f"Multi-node AllGather profiling test failed, see {log_file}" + + # Verify plugin initialized + assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \ + f"Plugin should have initialized. 
Check {log_file}" + + # Verify trace files were created (one per rank) + trace_files = glob.glob(trace_pattern) + assert len(trace_files) == total_processes, \ + f"Should have {total_processes} trace files (one per rank), found {len(trace_files)}: {trace_files}" + + # Validate each trace file + for trace_file in trace_files: + is_valid, message = paths.validate_json_trace(trace_file) + assert is_valid, f"Trace file {trace_file} validation failed: {message}" + + # With NCCL_PROFILE_EVENT_MASK=255, we should capture all event types + + # Check for Group events (one per AllGather call) + group_events = paths.count_events_in_trace(trace_file, category="GROUP") + assert group_events > 0, f"Should have Group events in {trace_file}, found {group_events}" + + # Check for P2P events + p2p_events = paths.count_events_in_trace(trace_file, category="P2P") + assert p2p_events > 0, \ + f"Should have P2P events (AllGather uses Send/Recv) in {trace_file}, found {p2p_events}" + + # For multi-node tests, verify ProxyOp events exist + proxy_events = paths.count_events_in_trace(trace_file, category="PROXY") + assert proxy_events > 0, \ + f"Should have Proxy events in {trace_file}, found {proxy_events}" + + # Check for Send and Recv operations + schedule_send_events = paths.count_events_in_trace(trace_file, event_name="ScheduleSend") + schedule_recv_events = paths.count_events_in_trace(trace_file, event_name="ScheduleRecv") + assert schedule_send_events > 0 or schedule_recv_events > 0, \ + f"Should have ScheduleSend or ScheduleRecv events in {trace_file}, found Send={schedule_send_events}, Recv={schedule_recv_events}" + + # Verify NET (ProxyStep) events exist + net_events = paths.count_events_in_trace(trace_file, category="NET") + assert net_events > 0, \ + f"Should have NET events in {trace_file}, found {net_events}" + + # Verify GPU kernel channel events exist + kernel_events = paths.count_events_in_trace(trace_file, category="GPU") + assert kernel_events > 0, \ + f"Should have GPU 
(KernelCh) events in {trace_file}, found {kernel_events}" + + # Verify ProxyCtrl events exist + append_events = paths.count_events_in_trace(trace_file, event_name="Append") + sleep_events = paths.count_events_in_trace(trace_file, event_name="Sleep") + assert append_events > 0 or sleep_events > 0, \ + f"Should have ProxyCtrl events in {trace_file}, found Append={append_events}, Sleep={sleep_events}" + + # Verify trace file exists and has content + trace_file_size = os.path.getsize(trace_file) + assert trace_file_size > 0, \ + f"Trace file {trace_file} is empty" + diff --git a/test/ext-plugins/tests/ext-profiler/test_allreduce.py b/test/ext-plugins/tests/ext-profiler/test_allreduce.py new file mode 100644 index 0000000000..eaa4ac508d --- /dev/null +++ b/test/ext-plugins/tests/ext-profiler/test_allreduce.py @@ -0,0 +1,416 @@ +# ************************************************************************* +# * Copyright (c) 2025 Advanced Micro Devices, Inc. All rights reserved. +# * +# * See LICENSE.txt for license information +# ************************************************************************ + +import os +import subprocess +import pytest +import glob + +@pytest.mark.ext_profiler +@pytest.mark.allreduce +def test_profiler_initialization(paths): + """Test profiler functionality with AllReduce operations.""" + + dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "allreduce_profiler_dumps") + os.makedirs(dump_dir, exist_ok=True) + + dump_file_base = os.path.join(dump_dir, "profiler_initialization") + + # Remove any existing trace files + trace_pattern = f"{dump_file_base}*.json" + for f in glob.glob(trace_pattern): + os.remove(f) + + env = os.environ.copy() + env.update({ + "PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}", + "LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}", + "HSA_NO_SCRATCH_RECLAIM": "1", + "NCCL_PROFILER_PLUGIN": paths.PROFILER_SO, + 
"NCCL_PROFILE_EVENT_MASK": "3", # Group (1) + Coll (2) = 3 + "NCCL_PROFILE_DUMP_FILE": dump_file_base, + "NCCL_DEBUG": "INFO", + }) + + args = [ + f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", "4", + "--mca", "pml", "ucx", + "--mca", "btl", "^vader,openib", + f"{paths.RCCL_TESTS_DIR}/build/all_reduce_perf", + "-b", "1", + "-e", "8M", + "-f", "2", + "-g", "1", + ] + + log_dir = os.path.join(paths.LOGDIR, "allreduce_ext_profiler_test_logs") + os.makedirs(log_dir, exist_ok=True) + + log_file = os.path.join(log_dir, "profiler_initialization.log") + with open(log_file, "w") as logfile: + result = subprocess.run( + args, + env=env, + stdout=logfile, + stderr=subprocess.STDOUT, + universal_newlines=True + ) + + assert result.returncode == 0, f"AllReduce profiling test failed, see {log_file}" + + # Verify plugin initialized + assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \ + f"Plugin should have initialized. Check {log_file}" + + # Verify trace files were created (one per rank) + trace_files = glob.glob(trace_pattern) + assert len(trace_files) == 4, \ + f"Should have 4 trace files (one per rank), found {len(trace_files)}: {trace_files}" + + # Validate each trace file + for trace_file in trace_files: + is_valid, message = paths.validate_json_trace(trace_file) + assert is_valid, f"Trace file {trace_file} validation failed: {message}" + + # Check for Group events + group_events = paths.count_events_in_trace(trace_file, category="GROUP") + assert group_events > 0, f"Should have Group events in {trace_file}" + + # Check for AllReduce events + allreduce_events = paths.count_events_in_trace(trace_file, event_name="AllReduce") + assert allreduce_events > 0, f"Should have AllReduce events in {trace_file}" + + +@pytest.mark.ext_profiler +@pytest.mark.allreduce +def test_invalid_mask_value(paths): + """Test profiler behavior with invalid event mask (0 = no events)""" + + dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "allreduce_profiler_dumps") + 
os.makedirs(dump_dir, exist_ok=True) + + dump_file_base = os.path.join(dump_dir, "invalid_mask_value_profiling") + + # Remove any existing trace files + trace_pattern = f"{dump_file_base}*.json" + for f in glob.glob(trace_pattern): + os.remove(f) + + env = os.environ.copy() + env.update({ + "PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}", + "LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}", + "HSA_NO_SCRATCH_RECLAIM": "1", + "NCCL_PROFILER_PLUGIN": paths.PROFILER_SO, + "NCCL_PROFILE_EVENT_MASK": "0", # Invalid: no events enabled + "NCCL_PROFILE_DUMP_FILE": dump_file_base, + "NCCL_DEBUG": "INFO", + }) + + args = [ + f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", "4", + "--mca", "pml", "ucx", + "--mca", "btl", "^vader,openib", + f"{paths.RCCL_TESTS_DIR}/build/all_reduce_perf", + "-b", "1", + "-e", "8M", + "-f", "2", + "-g", "1", + ] + + log_dir = os.path.join(paths.LOGDIR, "allreduce_ext_profiler_test_logs") + os.makedirs(log_dir, exist_ok=True) + + log_file = os.path.join(log_dir, "invalid_mask_value_profiling.log") + with open(log_file, "w") as logfile: + result = subprocess.run( + args, + env=env, + stdout=logfile, + stderr=subprocess.STDOUT, + universal_newlines=True + ) + + assert result.returncode == 0, f"AllReduce test should still succeed even with invalid mask, see {log_file}" + + # Verify plugin initialized (it should still initialize) + assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \ + f"Plugin should have initialized even with mask=0. 
Check {log_file}" + + # Verify trace files were created (one per rank) + trace_files = glob.glob(trace_pattern) + assert len(trace_files) == 4, \ + f"Should have 4 trace files (one per rank), found {len(trace_files)}: {trace_files}" + + # Validate each trace file - with mask=0, trace files should be nearly empty + # They should contain valid JSON but no actual profiling events + for trace_file in trace_files: + is_valid, message = paths.validate_json_trace(trace_file) + assert is_valid, f"Trace file {trace_file} should still be valid JSON: {message}" + + # With mask=0, there should be no Group or Collective events + group_events = paths.count_events_in_trace(trace_file, category="GROUP") + assert group_events == 0, f"Should have no Group events with mask=0 in {trace_file}, found {group_events}" + + allreduce_events = paths.count_events_in_trace(trace_file, event_name="AllReduce") + assert allreduce_events == 0, f"Should have no AllReduce events with mask=0 in {trace_file}, found {allreduce_events}" + + +@pytest.mark.ext_profiler +@pytest.mark.allreduce +def test_single_node_detailed_profiling(paths): + """Test profiler with single-node AllReduce using full event mask (255) across wide message range""" + + dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "allreduce_profiler_dumps") + os.makedirs(dump_dir, exist_ok=True) + + dump_file_base = os.path.join(dump_dir, "single_node_detailed_profiling") + + # Remove any existing trace files + trace_pattern = f"{dump_file_base}*.json" + for f in glob.glob(trace_pattern): + os.remove(f) + + env = os.environ.copy() + env.update({ + "PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}", + "LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}", + "HSA_NO_SCRATCH_RECLAIM": "1", + "NCCL_PROFILER_PLUGIN": paths.PROFILER_SO, + "NCCL_PROFILE_EVENT_MASK": "255", # All events: Group (1) + Coll (2) + P2P (4) + ProxyOp (8) + ProxyStep (16) + ProxyCtrl 
(32) + KernelCh (64) + NetPlugin (128) = 255 + "NCCL_PROFILE_DUMP_FILE": dump_file_base, + "NCCL_DEBUG": "INFO", + }) + + args = [ + f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", "8", + "--mca", "pml", "ucx", + "--mca", "btl", "^vader,openib", + f"{paths.RCCL_TESTS_DIR}/build/all_reduce_perf", + "-b", "8", + "-e", "128M", + "-f", "2", + "-g", "1", + ] + + log_dir = os.path.join(paths.LOGDIR, "allreduce_ext_profiler_test_logs") + os.makedirs(log_dir, exist_ok=True) + + log_file = os.path.join(log_dir, "single_node_detailed_profiling.log") + with open(log_file, "w") as logfile: + result = subprocess.run( + args, + env=env, + stdout=logfile, + stderr=subprocess.STDOUT, + universal_newlines=True + ) + + assert result.returncode == 0, f"Single-node detailed AllReduce profiling test failed, see {log_file}" + + # Verify plugin initialized + assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \ + f"Plugin should have initialized. Check {log_file}" + + # Verify trace files were created (one per rank) + trace_files = glob.glob(trace_pattern) + assert len(trace_files) == 8, \ + f"Should have 8 trace files (one per rank), found {len(trace_files)}: {trace_files}" + + # Validate each trace file + for trace_file in trace_files: + is_valid, message = paths.validate_json_trace(trace_file) + assert is_valid, f"Trace file {trace_file} validation failed: {message}" + + # With NCCL_PROFILE_EVENT_MASK=255, we capture all event types + # However, single-node behavior differs significantly from multi-node + + # Check for Group events + group_events = paths.count_events_in_trace(trace_file, category="GROUP") + assert group_events > 0, \ + f"Should have Group events in {trace_file}, found {group_events}" + + # Check for AllReduce events + allreduce_events = paths.count_events_in_trace(trace_file, event_name="AllReduce") + assert allreduce_events > 0, \ + f"Should have AllReduce events in {trace_file}, found {allreduce_events}" + + # With KernelCh enabled (bit 6), we should 
see GPU kernel channel events + kernel_events = paths.count_events_in_trace(trace_file, category="GPU") + assert kernel_events > 0, \ + f"Should have GPU (KernelCh) events in {trace_file}, found {kernel_events}" + + # With ProxyCtrl enabled (bit 5), we should see Append/Sleep events + proxy_ctrl_events = paths.count_events_in_trace(trace_file, category="PROXY") + assert proxy_ctrl_events > 0, \ + f"Should have PROXY (ProxyCtrl) events in {trace_file}, found {proxy_ctrl_events}" + + append_events = paths.count_events_in_trace(trace_file, event_name="Append") + sleep_events = paths.count_events_in_trace(trace_file, event_name="Sleep") + assert append_events > 0 or sleep_events > 0, \ + f"Should have ProxyCtrl events (Append or Sleep) in {trace_file}, found Append={append_events}, Sleep={sleep_events}" + + # We should NOT see ProxyOp network events (ScheduleSend/Recv, ProgressSend/Recv) + schedule_send_events = paths.count_events_in_trace(trace_file, event_name="ScheduleSend") + schedule_recv_events = paths.count_events_in_trace(trace_file, event_name="ScheduleRecv") + assert schedule_send_events == 0, \ + f"Single-node should have NO ScheduleSend events (no network) in {trace_file}, found {schedule_send_events}" + assert schedule_recv_events == 0, \ + f"Single-node should have NO ScheduleRecv events (no network) in {trace_file}, found {schedule_recv_events}" + + # Should also NOT see ProxyStep network events (RecvWait, SendWait, etc.) 
+ net_events = paths.count_events_in_trace(trace_file, category="NET") + assert net_events == 0, \ + f"Single-node should have NO NET (ProxyStep) events in {trace_file}, found {net_events}" + + # Verify trace file exists and has content + trace_file_size = os.path.getsize(trace_file) + assert trace_file_size > 0, \ + f"Trace file {trace_file} is empty" + + +@pytest.mark.ext_profiler +@pytest.mark.allreduce +@pytest.mark.multinode +def test_multinode_detailed_profiling(paths): + """Test profiler with multi-node AllReduce operations using full event mask (255)""" + + # Get available nodes using the shared function + nodelist = paths.get_available_nodes() + + # Skip test if no nodes available (SLURM not available) or less than 2 nodes + if len(nodelist) == 0: + pytest.skip("No nodes available") + elif len(nodelist) < 2: + pytest.skip(f"Multinode test requires at least 2 nodes, but only {len(nodelist)} available: {nodelist}") + + # Check for common network interface across all nodes + common_interface = paths.find_common_interface(nodelist) + if common_interface is None: + pytest.skip(f"Multinode test requires all nodes to have the same network interface (eth0 or eth1).") + + # Build host specification string (8 processes per node) + host_spec = ",".join([f"{node}:8" for node in nodelist]) + total_processes = len(nodelist) * 8 + print(f"Using host specification: {host_spec}") + + dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "allreduce_profiler_dumps") + os.makedirs(dump_dir, exist_ok=True) + + dump_file_base = os.path.join(dump_dir, "multinode_detailed_profiling") + + # Remove any existing trace files + trace_pattern = f"{dump_file_base}*.json" + for f in glob.glob(trace_pattern): + os.remove(f) + + env = os.environ.copy() + env.update({ + "PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}", + "LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}", + "HSA_NO_SCRATCH_RECLAIM": "1", 
+ "NCCL_IGNORE_CPU_AFFINITY": "1", + "NCCL_PROFILER_PLUGIN": paths.PROFILER_SO, + "NCCL_PROFILE_EVENT_MASK": "255", # All events: Group (1) + Coll (2) + P2P (4) + ProxyOp (8) + ProxyStep (16) + ProxyCtrl (32) + KernelCh (64) + NetPlugin (128) = 255 + "NCCL_PROFILE_DUMP_FILE": dump_file_base, + "NCCL_DEBUG": "INFO", + "NCCL_SOCKET_IFNAME": common_interface, + "NCCL_DMABUF_ENABLE": "1", + }) + + args = [ + f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", f"{total_processes}", + "--host", host_spec, + "--mca", "pml", "ucx", + "--mca", "btl", "^vader,openib", + f"{paths.RCCL_TESTS_DIR}/build/all_reduce_perf", + "-b", "8", + "-e", "128M", + "-f", "2", + "-g", "1", + ] + + log_dir = os.path.join(paths.LOGDIR, "allreduce_ext_profiler_test_logs") + os.makedirs(log_dir, exist_ok=True) + + log_file = os.path.join(log_dir, "multinode_detailed_profiling.log") + with open(log_file, "w") as logfile: + result = subprocess.run( + args, + env=env, + stdout=logfile, + stderr=subprocess.STDOUT, + universal_newlines=True + ) + + assert result.returncode == 0, f"Multi-node AllReduce profiling test failed, see {log_file}" + + # Verify plugin initialized + assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \ + f"Plugin should have initialized. 
Check {log_file}" + + # Verify trace files were created (one per rank) + trace_files = glob.glob(trace_pattern) + assert len(trace_files) == total_processes, \ + f"Should have {total_processes} trace files (one per rank), found {len(trace_files)}: {trace_files}" + + # Validate each trace file + for trace_file in trace_files: + is_valid, message = paths.validate_json_trace(trace_file) + assert is_valid, f"Trace file {trace_file} validation failed: {message}" + + # With NCCL_PROFILE_EVENT_MASK=255, we should capture all event types + + # Check for Group events (one per AllReduce call) + group_events = paths.count_events_in_trace(trace_file, category="GROUP") + assert group_events > 0, f"Should have Group events in {trace_file}, found {group_events}" + + # Check for AllReduce events + allreduce_events = paths.count_events_in_trace(trace_file, event_name="AllReduce") + assert allreduce_events > 0, \ + f"Should have AllReduce events in {trace_file}, found {allreduce_events}" + + # For multi-node tests, verify ProxyOp events exist + proxy_events = paths.count_events_in_trace(trace_file, category="PROXY") + assert proxy_events > 0, \ + f"Should have Proxy events in {trace_file}, found {proxy_events}" + + # With ProxyOp enabled (bit 3), check for Send and Recv operations + schedule_send_events = paths.count_events_in_trace(trace_file, event_name="ScheduleSend") + schedule_recv_events = paths.count_events_in_trace(trace_file, event_name="ScheduleRecv") + assert schedule_send_events > 0, \ + f"Should have ScheduleSend events in {trace_file}, found {schedule_send_events}" + assert schedule_recv_events > 0, \ + f"Should have ScheduleRecv events in {trace_file}, found {schedule_recv_events}" + + # With ProxyStep enabled (bit 4), verify network step events exist + net_events = paths.count_events_in_trace(trace_file, category="NET") + assert net_events > 0, \ + f"Should have NET events in {trace_file}, found {net_events}" + + # Check for specific ProxyStep events + 
recv_wait_events = paths.count_events_in_trace(trace_file, event_name="RecvWait") + send_wait_events = paths.count_events_in_trace(trace_file, event_name="SendWait") + assert recv_wait_events > 0, \ + f"Should have RecvWait events in {trace_file}, found {recv_wait_events}" + assert send_wait_events > 0, \ + f"Should have SendWait events in {trace_file}, found {send_wait_events}" + + # With KernelCh enabled (bit 6), we should see GPU kernel channel events + kernel_events = paths.count_events_in_trace(trace_file, category="GPU") + assert kernel_events > 0, \ + f"Should have GPU (KernelCh) events in {trace_file}, found {kernel_events}" + + # With ProxyCtrl enabled (bit 5), we should see Append/Sleep events + append_events = paths.count_events_in_trace(trace_file, event_name="Append") + sleep_events = paths.count_events_in_trace(trace_file, event_name="Sleep") + assert append_events > 0 or sleep_events > 0, \ + f"Should have ProxyCtrl events (Append or Sleep) in {trace_file}, found Append={append_events}, Sleep={sleep_events}" + + # Verify trace file exists and has content + trace_file_size = os.path.getsize(trace_file) + assert trace_file_size > 0, \ + f"Trace file {trace_file} is empty" \ No newline at end of file diff --git a/test/ext-plugins/tests/ext-profiler/test_alltoall.py b/test/ext-plugins/tests/ext-profiler/test_alltoall.py new file mode 100644 index 0000000000..c2ed845ece --- /dev/null +++ b/test/ext-plugins/tests/ext-profiler/test_alltoall.py @@ -0,0 +1,420 @@ +# ************************************************************************* +# * Copyright (c) 2025 Advanced Micro Devices, Inc. All rights reserved. 
+# *
+# * See LICENSE.txt for license information
+# ************************************************************************
+
+import os
+import subprocess
+import glob
+import pytest
+
+
+@pytest.mark.ext_profiler
+@pytest.mark.alltoall
+def test_profiler_initialization(paths):
+    """Check that the profiler plugin loads and traces a 4-rank AllToAll run.
+
+    Launches ``alltoall_perf`` through ``mpirun`` with
+    ``NCCL_PROFILE_EVENT_MASK=7`` and asserts that:
+
+    * the run exits with status 0 and the plugin init message is in the log,
+    * exactly four ``<dump_file_base>*.json`` trace files were written,
+    * every trace passes ``paths.validate_json_trace`` and contains GROUP,
+      P2P, Send and Recv events.
+
+    Args:
+        paths: Project fixture providing install directories, the profiler
+            plugin location, and the log/trace helper functions used below.
+    """
+
+    dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "alltoall_profiler_dumps")
+    os.makedirs(dump_dir, exist_ok=True)
+
+    dump_file_base = os.path.join(dump_dir, "profiler_initialization")
+
+    # Remove traces left by a previous run so the file count below is exact
+    trace_pattern = f"{dump_file_base}*.json"
+    for stale_trace in glob.glob(trace_pattern):
+        os.remove(stale_trace)
+
+    env = os.environ.copy()
+    env.update({
+        "PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}",
+        "LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}",
+        "HSA_NO_SCRATCH_RECLAIM": "1",
+        "NCCL_PROFILER_PLUGIN": paths.PROFILER_SO,
+        "NCCL_PROFILE_EVENT_MASK": "7",  # Group (1) + Coll (2) + P2P (4) = 7 (AllToAll is P2P)
+        "NCCL_PROFILE_DUMP_FILE": dump_file_base,
+        "NCCL_DEBUG": "INFO",
+    })
+
+    args = [
+        f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", "4",
+        "--mca", "pml", "ucx",
+        "--mca", "btl", "^vader,openib",
+        f"{paths.RCCL_TESTS_DIR}/build/alltoall_perf",
+        "-b", "1",
+        "-e", "8M",
+        "-f", "2",
+        "-g", "1",
+    ]
+
+    log_dir = os.path.join(paths.LOGDIR, "alltoall_ext_profiler_test_logs")
+    os.makedirs(log_dir, exist_ok=True)
+
+    # stdout and stderr both go to the log file that the checks below read
+    log_file = os.path.join(log_dir, "profiler_initialization.log")
+    with open(log_file, "w") as logfile:
+        result = subprocess.run(
+            args,
+            env=env,
+            stdout=logfile,
+            stderr=subprocess.STDOUT,
+            universal_newlines=True
+        )
+
+    assert result.returncode == 0, f"AllToAll test failed, see {log_file}"
+
+    # Verify plugin initialized
+    assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \
+        f"Plugin should have initialized. Check {log_file}"
+
+    # Verify trace files were created (one per rank)
+    trace_files = glob.glob(trace_pattern)
+    assert len(trace_files) == 4, \
+        f"Should have 4 trace files (one per rank), found {len(trace_files)}: {trace_files}"
+
+    # Validate each trace file
+    for trace_file in trace_files:
+        is_valid, message = paths.validate_json_trace(trace_file)
+        assert is_valid, f"Trace file {trace_file} validation failed: {message}"
+
+        # Check for Group events
+        group_events = paths.count_events_in_trace(trace_file, category="GROUP")
+        assert group_events > 0, f"Should have Group events in {trace_file}"
+
+        # Check for P2P events (AllToAll is implemented as Send/Recv P2P operations)
+        p2p_events = paths.count_events_in_trace(trace_file, category="P2P")
+        assert p2p_events > 0, f"Should have P2P events (AllToAll uses Send/Recv) in {trace_file}"
+
+        # Verify Send and Recv events exist
+        send_events = paths.count_events_in_trace(trace_file, event_name="Send")
+        recv_events = paths.count_events_in_trace(trace_file, event_name="Recv")
+        assert send_events > 0, f"Should have Send events in {trace_file}"
+        assert recv_events > 0, f"Should have Recv events in {trace_file}"
+
+
+@pytest.mark.ext_profiler
+@pytest.mark.alltoall
+def test_invalid_mask_value(paths):
+    """Check that an event mask of 0 yields valid but event-free traces.
+
+    Launches the same 4-rank ``alltoall_perf`` run as
+    ``test_profiler_initialization`` but with ``NCCL_PROFILE_EVENT_MASK=0``.
+    The run must still succeed, the plugin must still log its init message,
+    and each of the four trace files must be valid JSON containing no GROUP,
+    AllToAll or P2P events.
+
+    Args:
+        paths: Project fixture providing install directories, the profiler
+            plugin location, and the log/trace helper functions used below.
+    """
+
+    dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "alltoall_profiler_dumps")
+    os.makedirs(dump_dir, exist_ok=True)
+
+    dump_file_base = os.path.join(dump_dir, "invalid_mask_value_profiling")
+
+    # Remove traces left by a previous run so the file count below is exact
+    trace_pattern = f"{dump_file_base}*.json"
+    for stale_trace in glob.glob(trace_pattern):
+        os.remove(stale_trace)
+
+    env = os.environ.copy()
+    env.update({
+        "PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}",
+        "LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}",
+        "HSA_NO_SCRATCH_RECLAIM": "1",
+        "NCCL_PROFILER_PLUGIN": paths.PROFILER_SO,
+        "NCCL_PROFILE_EVENT_MASK": "0",  # Invalid: no events enabled
+        "NCCL_PROFILE_DUMP_FILE": dump_file_base,
+        "NCCL_DEBUG": "INFO",
+    })
+
+    args = [
+        f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", "4",
+        "--mca", "pml", "ucx",
+        "--mca", "btl", "^vader,openib",
+        f"{paths.RCCL_TESTS_DIR}/build/alltoall_perf",
+        "-b", "1",
+        "-e", "8M",
+        "-f", "2",
+        "-g", "1",
+    ]
+
+    log_dir = os.path.join(paths.LOGDIR, "alltoall_ext_profiler_test_logs")
+    os.makedirs(log_dir, exist_ok=True)
+
+    # stdout and stderr both go to the log file that the checks below read
+    log_file = os.path.join(log_dir, "invalid_mask_value_profiling.log")
+    with open(log_file, "w") as logfile:
+        result = subprocess.run(
+            args,
+            env=env,
+            stdout=logfile,
+            stderr=subprocess.STDOUT,
+            universal_newlines=True
+        )
+
+    assert result.returncode == 0, f"AllToAll test should still succeed even with invalid mask, see {log_file}"
+
+    # Verify plugin initialized
+    assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \
+        f"Plugin should have initialized even with mask=0. Check {log_file}"
+
+    # Verify trace files were created (one per rank)
+    trace_files = glob.glob(trace_pattern)
+    assert len(trace_files) == 4, \
+        f"Should have 4 trace files (one per rank), found {len(trace_files)}: {trace_files}"
+
+    # Validate each trace file - with mask=0, trace files should be nearly empty
+    # They should contain valid JSON but no actual profiling events
+    for trace_file in trace_files:
+        is_valid, message = paths.validate_json_trace(trace_file)
+        assert is_valid, f"Trace file {trace_file} should still be valid JSON: {message}"
+
+        # With mask=0, there should be no Group or Collective events
+        group_events = paths.count_events_in_trace(trace_file, category="GROUP")
+        assert group_events == 0, f"Should have no Group events with mask=0 in {trace_file}, found {group_events}"
+
+        alltoall_events = paths.count_events_in_trace(trace_file, event_name="AllToAll")
+        assert alltoall_events == 0, f"Should have no AllToAll events with mask=0 in {trace_file}, found {alltoall_events}"
+
+        # test_profiler_initialization looks for P2P/Send/Recv events rather
+        # than an "AllToAll" event, so the name check above may never match
+        # anything. Checking the P2P category makes the negative test able
+        # to fail if events are recorded despite mask=0.
+        p2p_events = paths.count_events_in_trace(trace_file, category="P2P")
+        assert p2p_events == 0, f"Should have no P2P events with mask=0 in {trace_file}, found {p2p_events}"
+
+
+@pytest.mark.ext_profiler
+@pytest.mark.alltoall
+def test_single_node_detailed_profiling(paths):
+    """Run an 8-rank single-node AllToAll with the full event mask (255).
+
+    Launches ``alltoall_perf`` (8 B to 128 MB, ``--bind-to none``) and, for
+    each of the eight per-rank traces, expects GROUP, P2P, Send, Recv, GPU
+    and PROXY events plus at least one Append or Sleep event. It also expects
+    zero ScheduleSend, ScheduleRecv and NET events, which this test treats as
+    network-only activity that a single-node run should not produce.
+
+    Args:
+        paths: Project fixture providing install directories, the profiler
+            plugin location, and the log/trace helper functions used below.
+    """
+
+    dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "alltoall_profiler_dumps")
+    os.makedirs(dump_dir, exist_ok=True)
+
+    dump_file_base = os.path.join(dump_dir, "single_node_detailed_profiling")
+
+    # Remove traces left by a previous run so the file count below is exact
+    trace_pattern = f"{dump_file_base}*.json"
+    for stale_trace in glob.glob(trace_pattern):
+        os.remove(stale_trace)
+
+    env = os.environ.copy()
+    env.update({
+        "PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}",
+        "LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}",
+        "HSA_NO_SCRATCH_RECLAIM": "1",
+        "NCCL_PROFILER_PLUGIN": paths.PROFILER_SO,
+        "NCCL_PROFILE_EVENT_MASK": "255",  # All events: Group (1) + Coll (2) + P2P (4) + ProxyOp (8) + ProxyStep (16) + ProxyCtrl (32) + KernelCh (64) + NetPlugin (128) = 255
+        "NCCL_PROFILE_DUMP_FILE": dump_file_base,
+        "NCCL_DEBUG": "INFO",
+    })
+
+    args = [
+        f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", "8",
+        "--bind-to", "none",
+        "--mca", "pml", "ucx",
+        "--mca", "btl", "^vader,openib",
+        f"{paths.RCCL_TESTS_DIR}/build/alltoall_perf",
+        "-b", "8",
+        "-e", "128M",
+        "-f", "2",
+        "-g", "1",
+    ]
+
+    log_dir = os.path.join(paths.LOGDIR, "alltoall_ext_profiler_test_logs")
+    os.makedirs(log_dir, exist_ok=True)
+
+    # stdout and stderr both go to the log file that the checks below read
+    log_file = os.path.join(log_dir, "single_node_detailed_profiling.log")
+    with open(log_file, "w") as logfile:
+        result = subprocess.run(
+            args,
+            env=env,
+            stdout=logfile,
+            stderr=subprocess.STDOUT,
+            universal_newlines=True
+        )
+
+    assert result.returncode == 0, f"Single-node detailed AllToAll profiling test failed, see {log_file}"
+
+    # Verify plugin initialized
+    assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \
+        f"Plugin should have initialized. Check {log_file}"
+
+    # Verify trace files were created (one per rank)
+    trace_files = glob.glob(trace_pattern)
+    assert len(trace_files) == 8, \
+        f"Should have 8 trace files (one per rank), found {len(trace_files)}: {trace_files}"
+
+    # Validate each trace file
+    for trace_file in trace_files:
+        is_valid, message = paths.validate_json_trace(trace_file)
+        assert is_valid, f"Trace file {trace_file} validation failed: {message}"
+
+        # With NCCL_PROFILE_EVENT_MASK=255, we capture all event types
+        # However, single-node behavior differs significantly from multi-node
+
+        # Check for Group events
+        group_events = paths.count_events_in_trace(trace_file, category="GROUP")
+        assert group_events > 0, \
+            f"Should have Group events in {trace_file}, found {group_events}"
+
+        # Check for P2P events (AllToAll is implemented as Send/Recv operations)
+        p2p_events = paths.count_events_in_trace(trace_file, category="P2P")
+        assert p2p_events > 0, \
+            f"Should have P2P events (AllToAll uses Send/Recv) in {trace_file}, found {p2p_events}"
+
+        # Verify Send and Recv events
+        send_events = paths.count_events_in_trace(trace_file, event_name="Send")
+        recv_events = paths.count_events_in_trace(trace_file, event_name="Recv")
+        assert send_events > 0, f"Should have Send events in {trace_file}, found {send_events}"
+        assert recv_events > 0, f"Should have Recv events in {trace_file}, found {recv_events}"
+
+        # Verify GPU kernel channel events exist
+        kernel_events = paths.count_events_in_trace(trace_file, category="GPU")
+        assert kernel_events > 0, \
+            f"Should have GPU (KernelCh) events in {trace_file}, found {kernel_events}"
+
+        # Verify ProxyCtrl events exist
+        proxy_ctrl_events = paths.count_events_in_trace(trace_file, category="PROXY")
+        assert proxy_ctrl_events > 0, \
+            f"Should have PROXY (ProxyCtrl) events in {trace_file}, found {proxy_ctrl_events}"
+
+        append_events = paths.count_events_in_trace(trace_file, event_name="Append")
+        sleep_events = paths.count_events_in_trace(trace_file, event_name="Sleep")
+        assert append_events > 0 or sleep_events > 0, \
+            f"Should have ProxyCtrl events in {trace_file}, found Append={append_events}, Sleep={sleep_events}"
+
+        # We should NOT see ProxyOp network events (ScheduleSend/Recv, ProgressSend/Recv)
+        schedule_send_events = paths.count_events_in_trace(trace_file, event_name="ScheduleSend")
+        schedule_recv_events = paths.count_events_in_trace(trace_file, event_name="ScheduleRecv")
+        assert schedule_send_events == 0, \
+            f"Single-node should have NO ScheduleSend events (no network) in {trace_file}, found {schedule_send_events}"
+        assert schedule_recv_events == 0, \
+            f"Single-node should have NO ScheduleRecv events (no network) in {trace_file}, found {schedule_recv_events}"
+
+        # Should also NOT see ProxyStep network events (RecvWait, SendWait, etc.)
+        net_events = paths.count_events_in_trace(trace_file, category="NET")
+        assert net_events == 0, \
+            f"Single-node should have NO NET (ProxyStep) events in {trace_file}, found {net_events}"
+
+        # Verify the trace file is not empty on disk
+        trace_file_size = os.path.getsize(trace_file)
+        assert trace_file_size > 0, \
+            f"Trace file {trace_file} is empty"
+
+
+@pytest.mark.ext_profiler
+@pytest.mark.alltoall
+@pytest.mark.multinode
+def test_multinode_detailed_profiling(paths):
+    """Run a multi-node AllToAll (8 ranks per node) with the full event mask (255).
+
+    The test is skipped when ``paths.get_available_nodes()`` returns no nodes
+    or fewer than two, or when ``paths.find_common_interface`` finds no
+    interface shared by all nodes. Otherwise it launches ``alltoall_perf``
+    across the nodes and expects one trace per rank, each containing GROUP,
+    P2P, PROXY, NET and GPU events, at least one ScheduleSend or ScheduleRecv
+    event, and at least one Append or Sleep event.
+
+    Args:
+        paths: Project fixture providing install directories, the profiler
+            plugin location, node discovery, and the log/trace helpers.
+    """
+
+    # Get available nodes using the shared function
+    nodelist = paths.get_available_nodes()
+
+    # Skip test if no nodes available (SLURM not available) or less than 2 nodes
+    if not nodelist:
+        pytest.skip("Multinode test requires SLURM allocation")
+
+    if len(nodelist) < 2:
+        pytest.skip(f"Multinode test requires at least 2 nodes, found {len(nodelist)}: {nodelist}")
+
+    # Check for common network interface across all nodes
+    common_interface = paths.find_common_interface(nodelist)
+    if common_interface is None:
+        pytest.skip("Multinode test requires all nodes to have the same network interface (eth0 or eth1).")
+
+    # Build host specification string (8 processes per node)
+    host_spec = ",".join(f"{node}:8" for node in nodelist)
+    total_processes = len(nodelist) * 8
+    print(f"Using host specification: {host_spec}")
+
+    dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "alltoall_profiler_dumps")
+    os.makedirs(dump_dir, exist_ok=True)
+
+    dump_file_base = os.path.join(dump_dir, "multinode_detailed_profiling")
+
+    # Remove traces left by a previous run so the file count below is exact
+    trace_pattern = f"{dump_file_base}*.json"
+    for stale_trace in glob.glob(trace_pattern):
+        os.remove(stale_trace)
+
+    env = os.environ.copy()
+    env.update({
+        "PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}",
+        "LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}",
+        "HSA_NO_SCRATCH_RECLAIM": "1",
+        "NCCL_IGNORE_CPU_AFFINITY": "1",
+        "NCCL_PROFILER_PLUGIN": paths.PROFILER_SO,
+        "NCCL_PROFILE_EVENT_MASK": "255",  # All events: Group (1) + Coll (2) + P2P (4) + ProxyOp (8) + ProxyStep (16) + ProxyCtrl (32) + KernelCh (64) + NetPlugin (128) = 255
+        "NCCL_PROFILE_DUMP_FILE": dump_file_base,
+        "NCCL_DEBUG": "INFO",
+        "NCCL_SOCKET_IFNAME": common_interface,
+        "NCCL_DMABUF_ENABLE": "1",
+    })
+
+    args = [
+        f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", f"{total_processes}",
+        "--host", host_spec,
+        "--mca", "pml", "ucx",
+        "--mca", "btl", "^vader,openib",
+        f"{paths.RCCL_TESTS_DIR}/build/alltoall_perf",
+        "-b", "8",
+        "-e", "128M",
+        "-f", "2",
+        "-g", "1",
+    ]
+
+    log_dir = os.path.join(paths.LOGDIR, "alltoall_ext_profiler_test_logs")
+    os.makedirs(log_dir, exist_ok=True)
+
+    # stdout and stderr both go to the log file that the checks below read
+    log_file = os.path.join(log_dir, "multinode_detailed_profiling.log")
+    with open(log_file, "w") as logfile:
+        result = subprocess.run(
+            args,
+            env=env,
+            stdout=logfile,
+            stderr=subprocess.STDOUT,
+            universal_newlines=True
+        )
+
+    assert result.returncode == 0, f"Multi-node AllToAll profiling test failed, see {log_file}"
+
+    # Verify plugin initialized
+    assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \
+        f"Plugin should have initialized. Check {log_file}"
+
+    # Verify trace files were created (one per rank)
+    # NOTE(review): the glob runs on the launching host, so this presumably
+    # relies on the dump directory being visible from every node - confirm.
+    trace_files = glob.glob(trace_pattern)
+    assert len(trace_files) == total_processes, \
+        f"Should have {total_processes} trace files (one per rank), found {len(trace_files)}: {trace_files}"
+
+    # Validate each trace file
+    for trace_file in trace_files:
+        is_valid, message = paths.validate_json_trace(trace_file)
+        assert is_valid, f"Trace file {trace_file} validation failed: {message}"
+
+        # With NCCL_PROFILE_EVENT_MASK=255, we should capture all event types
+
+        # Check for Group events (one per AllToAll call)
+        group_events = paths.count_events_in_trace(trace_file, category="GROUP")
+        assert group_events > 0, f"Should have Group events in {trace_file}, found {group_events}"
+
+        # Check for P2P events (AllToAll is implemented as Send/Recv operations)
+        p2p_events = paths.count_events_in_trace(trace_file, category="P2P")
+        assert p2p_events > 0, \
+            f"Should have P2P events (AllToAll uses Send/Recv) in {trace_file}, found {p2p_events}"
+
+        # For multi-node tests, verify ProxyOp events exist
+        proxy_events = paths.count_events_in_trace(trace_file, category="PROXY")
+        assert proxy_events > 0, \
+            f"Should have Proxy events in {trace_file}, found {proxy_events}"
+
+        # Check for Send and Recv operations (either direction is enough)
+        schedule_send_events = paths.count_events_in_trace(trace_file, event_name="ScheduleSend")
+        schedule_recv_events = paths.count_events_in_trace(trace_file, event_name="ScheduleRecv")
+        assert schedule_send_events > 0 or schedule_recv_events > 0, \
+            f"Should have ScheduleSend or ScheduleRecv events in {trace_file}, found Send={schedule_send_events}, Recv={schedule_recv_events}"
+
+        # Verify NET (ProxyStep) events exist
+        net_events = paths.count_events_in_trace(trace_file, category="NET")
+        assert net_events > 0, \
+            f"Should have NET events in {trace_file}, found {net_events}"
+
+        # Verify GPU kernel channel events exist
+        kernel_events = paths.count_events_in_trace(trace_file, category="GPU")
+        assert kernel_events > 0, \
+            f"Should have GPU (KernelCh) events in {trace_file}, found {kernel_events}"
+
+        # Verify ProxyCtrl events exist
+        append_events = paths.count_events_in_trace(trace_file, event_name="Append")
+        sleep_events = paths.count_events_in_trace(trace_file, event_name="Sleep")
+        assert append_events > 0 or sleep_events > 0, \
+            f"Should have ProxyCtrl events in {trace_file}, found Append={append_events}, Sleep={sleep_events}"
+
+        # Verify the trace file is not empty on disk
+        trace_file_size = os.path.getsize(trace_file)
+        assert trace_file_size > 0, \
+            f"Trace file {trace_file} is empty"
+
diff --git a/test/ext-plugins/tests/ext-profiler/test_broadcast.py b/test/ext-plugins/tests/ext-profiler/test_broadcast.py
new file mode 100644
index 0000000000..a2604d1117
--- /dev/null
+++ b/test/ext-plugins/tests/ext-profiler/test_broadcast.py
@@ -0,0 +1,408 @@
+# *************************************************************************
+# * Copyright (c) 2025 Advanced Micro Devices, Inc. All rights reserved.
+# * +# * See LICENSE.txt for license information +# ************************************************************************ + +import os +import subprocess +import glob +import pytest + +@pytest.mark.ext_profiler +@pytest.mark.broadcast +def test_profiler_initialization(paths): + """Test profiler functionality with Broadcast operations.""" + + dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "broadcast_profiler_dumps") + os.makedirs(dump_dir, exist_ok=True) + + dump_file_base = os.path.join(dump_dir, "profiler_initialization") + + # Remove any existing trace files + trace_pattern = f"{dump_file_base}*.json" + for f in glob.glob(trace_pattern): + os.remove(f) + + env = os.environ.copy() + env.update({ + "PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}", + "LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}", + "HSA_NO_SCRATCH_RECLAIM": "1", + "NCCL_PROFILER_PLUGIN": paths.PROFILER_SO, + "NCCL_PROFILE_EVENT_MASK": "3", # Group (1) + Coll (2) = 3 + "NCCL_PROFILE_DUMP_FILE": dump_file_base, + "NCCL_DEBUG": "INFO", + }) + + args = [ + f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", "4", + "--mca", "pml", "ucx", + "--mca", "btl", "^vader,openib", + f"{paths.RCCL_TESTS_DIR}/build/broadcast_perf", + "-b", "1", + "-e", "8M", + "-f", "2", + "-g", "1", + ] + + log_dir = os.path.join(paths.LOGDIR, "broadcast_ext_profiler_test_logs") + os.makedirs(log_dir, exist_ok=True) + + log_file = os.path.join(log_dir, "profiler_initialization.log") + with open(log_file, "w") as logfile: + result = subprocess.run( + args, + env=env, + stdout=logfile, + stderr=subprocess.STDOUT, + universal_newlines=True + ) + + assert result.returncode == 0, f"Broadcast test failed, see {log_file}" + + # Verify plugin initialized + assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \ + f"Plugin should have initialized. 
Check {log_file}" + + # Verify trace files were created (one per rank) + trace_files = glob.glob(trace_pattern) + assert len(trace_files) == 4, \ + f"Should have 4 trace files (one per rank), found {len(trace_files)}: {trace_files}" + + # Validate each trace file + for trace_file in trace_files: + is_valid, message = paths.validate_json_trace(trace_file) + assert is_valid, f"Trace file {trace_file} validation failed: {message}" + + # Check for Group events + group_events = paths.count_events_in_trace(trace_file, category="GROUP") + assert group_events > 0, f"Should have Group events in {trace_file}" + + # Check for Broadcast events + broadcast_events = paths.count_events_in_trace(trace_file, event_name="Broadcast") + assert broadcast_events > 0, f"Should have Broadcast events in {trace_file}" + + +@pytest.mark.ext_profiler +@pytest.mark.broadcast +def test_invalid_mask_value(paths): + """Test profiler behavior with invalid event mask (0 = no events)""" + + dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "broadcast_profiler_dumps") + os.makedirs(dump_dir, exist_ok=True) + + dump_file_base = os.path.join(dump_dir, "invalid_mask_value_profiling") + + # Remove any existing trace files + trace_pattern = f"{dump_file_base}*.json" + for f in glob.glob(trace_pattern): + os.remove(f) + + env = os.environ.copy() + env.update({ + "PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}", + "LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}", + "HSA_NO_SCRATCH_RECLAIM": "1", + "NCCL_PROFILER_PLUGIN": paths.PROFILER_SO, + "NCCL_PROFILE_EVENT_MASK": "0", # Invalid: no events enabled + "NCCL_PROFILE_DUMP_FILE": dump_file_base, + "NCCL_DEBUG": "INFO", + }) + + args = [ + f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", "4", + "--mca", "pml", "ucx", + "--mca", "btl", "^vader,openib", + f"{paths.RCCL_TESTS_DIR}/build/broadcast_perf", + "-b", "1", + "-e", "8M", + "-f", "2", + "-g", "1", + ] + + log_dir = 
os.path.join(paths.LOGDIR, "broadcast_ext_profiler_test_logs") + os.makedirs(log_dir, exist_ok=True) + + log_file = os.path.join(log_dir, "invalid_mask_value_profiling.log") + with open(log_file, "w") as logfile: + result = subprocess.run( + args, + env=env, + stdout=logfile, + stderr=subprocess.STDOUT, + universal_newlines=True + ) + + assert result.returncode == 0, f"Broadcast test should still succeed even with invalid mask, see {log_file}" + + # Verify plugin initialized + assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \ + f"Plugin should have initialized even with mask=0. Check {log_file}" + + # Verify trace files were created (one per rank) + trace_files = glob.glob(trace_pattern) + assert len(trace_files) == 4, \ + f"Should have 4 trace files (one per rank), found {len(trace_files)}: {trace_files}" + + # Validate each trace file - with mask=0, trace files should be nearly empty + # They should contain valid JSON but no actual profiling events + for trace_file in trace_files: + is_valid, message = paths.validate_json_trace(trace_file) + assert is_valid, f"Trace file {trace_file} should still be valid JSON: {message}" + + # With mask=0, there should be no Group or Collective events + group_events = paths.count_events_in_trace(trace_file, category="GROUP") + assert group_events == 0, f"Should have no Group events with mask=0 in {trace_file}, found {group_events}" + + broadcast_events = paths.count_events_in_trace(trace_file, event_name="Broadcast") + assert broadcast_events == 0, f"Should have no Broadcast events with mask=0 in {trace_file}, found {broadcast_events}" + + +@pytest.mark.ext_profiler +@pytest.mark.broadcast +def test_single_node_detailed_profiling(paths): + """Test profiler with single-node Broadcast using full event mask (255) across wide message range""" + + dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "broadcast_profiler_dumps") + os.makedirs(dump_dir, exist_ok=True) + + dump_file_base = os.path.join(dump_dir, 
"single_node_detailed_profiling") + + # Remove any existing trace files + trace_pattern = f"{dump_file_base}*.json" + for f in glob.glob(trace_pattern): + os.remove(f) + + env = os.environ.copy() + env.update({ + "PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}", + "LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}", + "HSA_NO_SCRATCH_RECLAIM": "1", + "NCCL_PROFILER_PLUGIN": paths.PROFILER_SO, + "NCCL_PROFILE_EVENT_MASK": "255", # All events: Group (1) + Coll (2) + P2P (4) + ProxyOp (8) + ProxyStep (16) + ProxyCtrl (32) + KernelCh (64) + NetPlugin (128) = 255 + "NCCL_PROFILE_DUMP_FILE": dump_file_base, + "NCCL_DEBUG": "INFO", + }) + + args = [ + f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", "8", + "--bind-to", "none", + "--mca", "pml", "ucx", + "--mca", "btl", "^vader,openib", + f"{paths.RCCL_TESTS_DIR}/build/broadcast_perf", + "-b", "8", + "-e", "128M", + "-f", "2", + "-g", "1", + ] + + log_dir = os.path.join(paths.LOGDIR, "broadcast_ext_profiler_test_logs") + os.makedirs(log_dir, exist_ok=True) + + log_file = os.path.join(log_dir, "single_node_detailed_profiling.log") + with open(log_file, "w") as logfile: + result = subprocess.run( + args, + env=env, + stdout=logfile, + stderr=subprocess.STDOUT, + universal_newlines=True + ) + + assert result.returncode == 0, f"Single-node detailed Broadcast profiling test failed, see {log_file}" + + # Verify plugin initialized + assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \ + f"Plugin should have initialized. 
Check {log_file}" + + # Verify trace files were created (one per rank) + trace_files = glob.glob(trace_pattern) + assert len(trace_files) == 8, \ + f"Should have 8 trace files (one per rank), found {len(trace_files)}: {trace_files}" + + # Validate each trace file + for trace_file in trace_files: + is_valid, message = paths.validate_json_trace(trace_file) + assert is_valid, f"Trace file {trace_file} validation failed: {message}" + + # With NCCL_PROFILE_EVENT_MASK=255, we capture all event types + # However, single-node behavior differs significantly from multi-node + + # Check for Group events + group_events = paths.count_events_in_trace(trace_file, category="GROUP") + assert group_events > 0, \ + f"Should have Group events in {trace_file}, found {group_events}" + + # Check for Broadcast events + broadcast_events = paths.count_events_in_trace(trace_file, event_name="Broadcast") + assert broadcast_events > 0, \ + f"Should have Broadcast events in {trace_file}, found {broadcast_events}" + + # With KernelCh enabled (bit 6), we should see GPU kernel channel events + kernel_events = paths.count_events_in_trace(trace_file, category="GPU") + assert kernel_events > 0, \ + f"Should have GPU (KernelCh) events in {trace_file}, found {kernel_events}" + + # With ProxyCtrl enabled (bit 5), we should see Append/Sleep events + proxy_ctrl_events = paths.count_events_in_trace(trace_file, category="PROXY") + assert proxy_ctrl_events > 0, \ + f"Should have PROXY (ProxyCtrl) events in {trace_file}, found {proxy_ctrl_events}" + + append_events = paths.count_events_in_trace(trace_file, event_name="Append") + sleep_events = paths.count_events_in_trace(trace_file, event_name="Sleep") + assert append_events > 0 or sleep_events > 0, \ + f"Should have ProxyCtrl events (Append or Sleep) in {trace_file}, found Append={append_events}, Sleep={sleep_events}" + + # We should NOT see ProxyOp network events (ScheduleSend/Recv, ProgressSend/Recv) + schedule_send_events = 
paths.count_events_in_trace(trace_file, event_name="ScheduleSend") + schedule_recv_events = paths.count_events_in_trace(trace_file, event_name="ScheduleRecv") + assert schedule_send_events == 0, \ + f"Single-node should have NO ScheduleSend events (no network) in {trace_file}, found {schedule_send_events}" + assert schedule_recv_events == 0, \ + f"Single-node should have NO ScheduleRecv events (no network) in {trace_file}, found {schedule_recv_events}" + + # Should also NOT see ProxyStep network events (RecvWait, SendWait, etc.) + net_events = paths.count_events_in_trace(trace_file, category="NET") + assert net_events == 0, \ + f"Single-node should have NO NET (ProxyStep) events in {trace_file}, found {net_events}" + + # Verify trace file exists and has content + trace_file_size = os.path.getsize(trace_file) + assert trace_file_size > 0, \ + f"Trace file {trace_file} is empty" + + +@pytest.mark.ext_profiler +@pytest.mark.broadcast +@pytest.mark.multinode +def test_multinode_detailed_profiling(paths): + """Test profiler with multi-node Broadcast operations using full event mask (255)""" + + # Get available nodes using the shared function + nodelist = paths.get_available_nodes() + + # Skip test if no nodes available (SLURM not available) or less than 2 nodes + if not nodelist: + pytest.skip("Multinode test requires SLURM allocation") + + if len(nodelist) < 2: + pytest.skip(f"Multinode test requires at least 2 nodes, found {len(nodelist)}: {nodelist}") + + # Check for common network interface across all nodes + common_interface = paths.find_common_interface(nodelist) + if common_interface is None: + pytest.skip(f"Multinode test requires all nodes to have the same network interface (eth0 or eth1).") + + # Build host specification string (8 processes per node) + host_spec = ",".join([f"{node}:8" for node in nodelist]) + total_processes = len(nodelist) * 8 + print(f"Using host specification: {host_spec}") + + dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, 
"broadcast_profiler_dumps") + os.makedirs(dump_dir, exist_ok=True) + + dump_file_base = os.path.join(dump_dir, "multinode_detailed_profiling") + + # Remove any existing trace files + trace_pattern = f"{dump_file_base}*.json" + for f in glob.glob(trace_pattern): + os.remove(f) + + env = os.environ.copy() + env.update({ + "PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}", + "LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}", + "HSA_NO_SCRATCH_RECLAIM": "1", + "NCCL_IGNORE_CPU_AFFINITY": "1", + "NCCL_PROFILER_PLUGIN": paths.PROFILER_SO, + "NCCL_PROFILE_EVENT_MASK": "255", # All events: Group (1) + Coll (2) + P2P (4) + ProxyOp (8) + ProxyStep (16) + ProxyCtrl (32) + KernelCh (64) + NetPlugin (128) = 255 + "NCCL_PROFILE_GROUP_POOL_SIZE": "128", + "NCCL_PROFILE_COLL_POOL_SIZE": "128", + "NCCL_PROFILE_P2P_POOL_SIZE": "2048", + "NCCL_PROFILE_PROXY_CTRL_POOL_SIZE": "256", + "NCCL_PROFILE_PROXY_DETACH_POOL_SIZE": "1024", + "NCCL_PROFILE_DUMP_FILE": dump_file_base, + "NCCL_DEBUG": "INFO", + "NCCL_SOCKET_IFNAME": common_interface, + "NCCL_DMABUF_ENABLE": "1", + }) + + args = [ + f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", f"{total_processes}", + "--host", host_spec, + "--mca", "pml", "ucx", + "--mca", "btl", "^vader,openib", + f"{paths.RCCL_TESTS_DIR}/build/broadcast_perf", + "-b", "8", + "-e", "128M", + "-f", "2", + "-g", "1", + ] + + log_dir = os.path.join(paths.LOGDIR, "broadcast_ext_profiler_test_logs") + os.makedirs(log_dir, exist_ok=True) + + log_file = os.path.join(log_dir, "multinode_detailed_profiling.log") + with open(log_file, "w") as logfile: + result = subprocess.run( + args, + env=env, + stdout=logfile, + stderr=subprocess.STDOUT, + universal_newlines=True + ) + + assert result.returncode == 0, f"Multi-node Broadcast profiling test failed, see {log_file}" + + # Verify plugin initialized + assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \ + f"Plugin 
should have initialized. Check {log_file}" + + # Verify trace files were created (one per rank) + trace_files = glob.glob(trace_pattern) + assert len(trace_files) == total_processes, \ + f"Should have {total_processes} trace files (one per rank), found {len(trace_files)}: {trace_files}" + + # Validate each trace file + for trace_file in trace_files: + is_valid, message = paths.validate_json_trace(trace_file) + assert is_valid, f"Trace file {trace_file} validation failed: {message}" + + # With NCCL_PROFILE_EVENT_MASK=255, we should capture all event types + + # Check for Group events (one per Broadcast call) + group_events = paths.count_events_in_trace(trace_file, category="GROUP") + assert group_events > 0, f"Should have Group events in {trace_file}, found {group_events}" + + # Check for Broadcast events + broadcast_events = paths.count_events_in_trace(trace_file, event_name="Broadcast") + assert broadcast_events > 0, \ + f"Should have Broadcast events in {trace_file}, found {broadcast_events}" + + # For multi-node tests, verify ProxyOp events exist + proxy_events = paths.count_events_in_trace(trace_file, category="PROXY") + assert proxy_events > 0, \ + f"Should have Proxy events in {trace_file}, found {proxy_events}" + + # With ProxyStep enabled (bit 4), verify network step events exist + net_events = paths.count_events_in_trace(trace_file, category="NET") + assert net_events > 0, \ + f"Should have NET events in {trace_file}, found {net_events}" + + # With KernelCh enabled (bit 6), we should see GPU kernel channel events + kernel_events = paths.count_events_in_trace(trace_file, category="GPU") + assert kernel_events > 0, \ + f"Should have GPU (KernelCh) events in {trace_file}, found {kernel_events}" + + # With ProxyCtrl enabled (bit 5), we should see Append/Sleep events + append_events = paths.count_events_in_trace(trace_file, event_name="Append") + sleep_events = paths.count_events_in_trace(trace_file, event_name="Sleep") + assert append_events > 0 or 
sleep_events > 0, \ + f"Should have ProxyCtrl events (Append or Sleep) in {trace_file}, found Append={append_events}, Sleep={sleep_events}" + + # Verify trace file exists and has content + trace_file_size = os.path.getsize(trace_file) + assert trace_file_size > 0, \ + f"Trace file {trace_file} is empty" + diff --git a/test/ext-plugins/tests/ext-profiler/test_reduce.py b/test/ext-plugins/tests/ext-profiler/test_reduce.py new file mode 100644 index 0000000000..d33bb3e08e --- /dev/null +++ b/test/ext-plugins/tests/ext-profiler/test_reduce.py @@ -0,0 +1,413 @@ +# ************************************************************************* +# * Copyright (c) 2025 Advanced Micro Devices, Inc. All rights reserved. +# * +# * See LICENSE.txt for license information +# ************************************************************************ + +import os +import subprocess +import glob +import pytest + +@pytest.mark.ext_profiler +@pytest.mark.reduce +def test_profiler_initialization(paths): + """Test profiler functionality with Reduce operations.""" + + dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "reduce_profiler_dumps") + os.makedirs(dump_dir, exist_ok=True) + + dump_file_base = os.path.join(dump_dir, "profiler_initialization") + + # Remove any existing trace files + trace_pattern = f"{dump_file_base}*.json" + for f in glob.glob(trace_pattern): + os.remove(f) + + env = os.environ.copy() + env.update({ + "PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}", + "LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}", + "HSA_NO_SCRATCH_RECLAIM": "1", + "NCCL_PROFILER_PLUGIN": paths.PROFILER_SO, + "NCCL_PROFILE_EVENT_MASK": "3", # Group (1) + Coll (2) = 3 + "NCCL_PROFILE_DUMP_FILE": dump_file_base, + "NCCL_DEBUG": "INFO", + }) + + args = [ + f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", "4", + "--mca", "pml", "ucx", + "--mca", "btl", "^vader,openib", + 
f"{paths.RCCL_TESTS_DIR}/build/reduce_perf", + "-b", "1", + "-e", "8M", + "-f", "2", + "-g", "1", + ] + + log_dir = os.path.join(paths.LOGDIR, "reduce_ext_profiler_test_logs") + os.makedirs(log_dir, exist_ok=True) + + log_file = os.path.join(log_dir, "profiler_initialization.log") + with open(log_file, "w") as logfile: + result = subprocess.run( + args, + env=env, + stdout=logfile, + stderr=subprocess.STDOUT, + universal_newlines=True + ) + + assert result.returncode == 0, f"Reduce test failed, see {log_file}" + + # Verify plugin initialized + assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \ + f"Plugin should have initialized. Check {log_file}" + + # Verify trace files were created (one per rank) + trace_files = glob.glob(trace_pattern) + assert len(trace_files) == 4, \ + f"Should have 4 trace files (one per rank), found {len(trace_files)}: {trace_files}" + + # Validate each trace file + for trace_file in trace_files: + is_valid, message = paths.validate_json_trace(trace_file) + assert is_valid, f"Trace file {trace_file} validation failed: {message}" + + # Check for Group events + group_events = paths.count_events_in_trace(trace_file, category="GROUP") + assert group_events > 0, f"Should have Group events in {trace_file}" + + # Check for Reduce collective events + reduce_events = paths.count_events_in_trace(trace_file, event_name="Reduce") + assert reduce_events > 0, f"Should have Reduce events in {trace_file}" + + +@pytest.mark.ext_profiler +@pytest.mark.reduce +def test_invalid_mask_value(paths): + """Test profiler behavior with invalid event mask (0 = no events)""" + + dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "reduce_profiler_dumps") + os.makedirs(dump_dir, exist_ok=True) + + dump_file_base = os.path.join(dump_dir, "invalid_mask_value_profiling") + + # Remove any existing trace files + trace_pattern = f"{dump_file_base}*.json" + for f in glob.glob(trace_pattern): + os.remove(f) + + env = os.environ.copy() + env.update({ + "PATH": 
f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}", + "LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}", + "HSA_NO_SCRATCH_RECLAIM": "1", + "NCCL_PROFILER_PLUGIN": paths.PROFILER_SO, + "NCCL_PROFILE_EVENT_MASK": "0", # Invalid: no events enabled + "NCCL_PROFILE_DUMP_FILE": dump_file_base, + "NCCL_DEBUG": "INFO", + }) + + args = [ + f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", "4", + "--mca", "pml", "ucx", + "--mca", "btl", "^vader,openib", + f"{paths.RCCL_TESTS_DIR}/build/reduce_perf", + "-b", "1", + "-e", "8M", + "-f", "2", + "-g", "1", + ] + + log_dir = os.path.join(paths.LOGDIR, "reduce_ext_profiler_test_logs") + os.makedirs(log_dir, exist_ok=True) + + log_file = os.path.join(log_dir, "invalid_mask_value_profiling.log") + with open(log_file, "w") as logfile: + result = subprocess.run( + args, + env=env, + stdout=logfile, + stderr=subprocess.STDOUT, + universal_newlines=True + ) + + assert result.returncode == 0, f"Reduce test should still succeed even with invalid mask, see {log_file}" + + # Verify plugin initialized + assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \ + f"Plugin should have initialized even with mask=0. 
Check {log_file}" + + # Verify trace files were created (one per rank) + trace_files = glob.glob(trace_pattern) + assert len(trace_files) == 4, \ + f"Should have 4 trace files (one per rank), found {len(trace_files)}: {trace_files}" + + # Validate each trace file - with mask=0, trace files should be nearly empty + # They should contain valid JSON but no actual profiling events + for trace_file in trace_files: + is_valid, message = paths.validate_json_trace(trace_file) + assert is_valid, f"Trace file {trace_file} should still be valid JSON: {message}" + + # With mask=0, there should be no Group or Collective events + group_events = paths.count_events_in_trace(trace_file, category="GROUP") + assert group_events == 0, f"Should have no Group events with mask=0 in {trace_file}, found {group_events}" + + reduce_events = paths.count_events_in_trace(trace_file, event_name="Reduce") + assert reduce_events == 0, f"Should have no Reduce events with mask=0 in {trace_file}, found {reduce_events}" + + +@pytest.mark.ext_profiler +@pytest.mark.reduce +def test_single_node_detailed_profiling(paths): + """Test profiler with single-node Reduce using full event mask (255) across wide message range""" + + dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "reduce_profiler_dumps") + os.makedirs(dump_dir, exist_ok=True) + + dump_file_base = os.path.join(dump_dir, "single_node_detailed_profiling") + + # Remove any existing trace files + trace_pattern = f"{dump_file_base}*.json" + for f in glob.glob(trace_pattern): + os.remove(f) + + env = os.environ.copy() + env.update({ + "PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}", + "LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}", + "HSA_NO_SCRATCH_RECLAIM": "1", + "NCCL_PROFILER_PLUGIN": paths.PROFILER_SO, + "NCCL_PROFILE_EVENT_MASK": "255", # All events: Group (1) + Coll (2) + P2P (4) + ProxyOp (8) + ProxyStep (16) + ProxyCtrl (32) + KernelCh (64) + 
NetPlugin (128) = 255 + "NCCL_PROFILE_DUMP_FILE": dump_file_base, + "NCCL_DEBUG": "INFO", + }) + + args = [ + f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", "8", + "--bind-to", "none", + "--mca", "pml", "ucx", + "--mca", "btl", "^vader,openib", + f"{paths.RCCL_TESTS_DIR}/build/reduce_perf", + "-b", "8", + "-e", "128M", + "-f", "2", + "-g", "1", + ] + + log_dir = os.path.join(paths.LOGDIR, "reduce_ext_profiler_test_logs") + os.makedirs(log_dir, exist_ok=True) + + log_file = os.path.join(log_dir, "single_node_detailed_profiling.log") + with open(log_file, "w") as logfile: + result = subprocess.run( + args, + env=env, + stdout=logfile, + stderr=subprocess.STDOUT, + universal_newlines=True + ) + + assert result.returncode == 0, f"Single-node detailed Reduce profiling test failed, see {log_file}" + + # Verify plugin initialized + assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \ + f"Plugin should have initialized. Check {log_file}" + + # Verify trace files were created (one per rank) + trace_files = glob.glob(trace_pattern) + assert len(trace_files) == 8, \ + f"Should have 8 trace files (one per rank), found {len(trace_files)}: {trace_files}" + + # Validate each trace file + for trace_file in trace_files: + is_valid, message = paths.validate_json_trace(trace_file) + assert is_valid, f"Trace file {trace_file} validation failed: {message}" + + # With NCCL_PROFILE_EVENT_MASK=255, we capture all event types + # However, single-node behavior differs significantly from multi-node + + # Check for Group events + group_events = paths.count_events_in_trace(trace_file, category="GROUP") + assert group_events > 0, \ + f"Should have Group events in {trace_file}, found {group_events}" + + # Check for Reduce events + reduce_events = paths.count_events_in_trace(trace_file, event_name="Reduce") + assert reduce_events > 0, \ + f"Should have Reduce events in {trace_file}, found {reduce_events}" + + # With KernelCh enabled (bit 6), we should see GPU kernel channel 
events + kernel_events = paths.count_events_in_trace(trace_file, category="GPU") + assert kernel_events > 0, \ + f"Should have GPU (KernelCh) events in {trace_file}, found {kernel_events}" + + # With ProxyCtrl enabled (bit 5), we should see Append/Sleep events + proxy_ctrl_events = paths.count_events_in_trace(trace_file, category="PROXY") + assert proxy_ctrl_events > 0, \ + f"Should have PROXY (ProxyCtrl) events in {trace_file}, found {proxy_ctrl_events}" + + append_events = paths.count_events_in_trace(trace_file, event_name="Append") + sleep_events = paths.count_events_in_trace(trace_file, event_name="Sleep") + assert append_events > 0 or sleep_events > 0, \ + f"Should have ProxyCtrl events (Append or Sleep) in {trace_file}, found Append={append_events}, Sleep={sleep_events}" + + # We should NOT see ProxyOp network events (ScheduleSend/Recv, ProgressSend/Recv) + schedule_send_events = paths.count_events_in_trace(trace_file, event_name="ScheduleSend") + schedule_recv_events = paths.count_events_in_trace(trace_file, event_name="ScheduleRecv") + assert schedule_send_events == 0, \ + f"Single-node should have NO ScheduleSend events (no network) in {trace_file}, found {schedule_send_events}" + assert schedule_recv_events == 0, \ + f"Single-node should have NO ScheduleRecv events (no network) in {trace_file}, found {schedule_recv_events}" + + # Should also NOT see ProxyStep network events (RecvWait, SendWait, etc.) 
+ net_events = paths.count_events_in_trace(trace_file, category="NET") + assert net_events == 0, \ + f"Single-node should have NO NET (ProxyStep) events in {trace_file}, found {net_events}" + + # Verify trace file exists and has content + trace_file_size = os.path.getsize(trace_file) + assert trace_file_size > 0, \ + f"Trace file {trace_file} is empty" + + +@pytest.mark.ext_profiler +@pytest.mark.reduce +def test_multinode_detailed_profiling(paths): + """Test profiler with multi-node Reduce operations using full event mask (255)""" + + # Get available nodes using the shared function + nodelist = paths.get_available_nodes() + + # Skip test if no nodes available (SLURM not available) or less than 2 nodes + if not nodelist: + pytest.skip("Multinode test requires SLURM allocation") + + if len(nodelist) < 2: + pytest.skip(f"Multinode test requires at least 2 nodes, found {len(nodelist)}: {nodelist}") + + # Check for common network interface across all nodes + common_interface = paths.find_common_interface(nodelist) + if common_interface is None: + pytest.skip(f"Multinode test requires all nodes to have the same network interface (eth0 or eth1).") + + # Build host specification string (8 processes per node) + host_spec = ",".join([f"{node}:8" for node in nodelist]) + total_processes = len(nodelist) * 8 + print(f"Using host specification: {host_spec}") + + dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "reduce_profiler_dumps") + os.makedirs(dump_dir, exist_ok=True) + + dump_file_base = os.path.join(dump_dir, "multinode_detailed_profiling") + + # Remove any existing trace files + trace_pattern = f"{dump_file_base}*.json" + for f in glob.glob(trace_pattern): + os.remove(f) + + env = os.environ.copy() + env.update({ + "PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}", + "LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}", + "HSA_NO_SCRATCH_RECLAIM": "1", + "NCCL_IGNORE_CPU_AFFINITY": 
"1", + "NCCL_PROFILER_PLUGIN": paths.PROFILER_SO, + "NCCL_PROFILE_EVENT_MASK": "255", # All events: Group (1) + Coll (2) + P2P (4) + ProxyOp (8) + ProxyStep (16) + ProxyCtrl (32) + KernelCh (64) + NetPlugin (128) = 255 + "NCCL_PROFILE_GROUP_POOL_SIZE": "128", + "NCCL_PROFILE_COLL_POOL_SIZE": "128", + "NCCL_PROFILE_P2P_POOL_SIZE": "2048", + "NCCL_PROFILE_PROXY_CTRL_POOL_SIZE": "256", + "NCCL_PROFILE_PROXY_DETACH_POOL_SIZE": "1024", + "NCCL_PROFILE_DUMP_FILE": dump_file_base, + "NCCL_DEBUG": "INFO", + "NCCL_SOCKET_IFNAME": common_interface, + "NCCL_DMABUF_ENABLE": "1", + }) + + args = [ + f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", f"{total_processes}", + "--host", host_spec, + "--mca", "pml", "ucx", + "--mca", "btl", "^vader,openib", + f"{paths.RCCL_TESTS_DIR}/build/reduce_perf", + "-b", "8", + "-e", "128M", + "-f", "2", + "-g", "1", + ] + + log_dir = os.path.join(paths.LOGDIR, "reduce_ext_profiler_test_logs") + os.makedirs(log_dir, exist_ok=True) + + log_file = os.path.join(log_dir, "multinode_detailed_profiling.log") + with open(log_file, "w") as logfile: + result = subprocess.run( + args, + env=env, + stdout=logfile, + stderr=subprocess.STDOUT, + universal_newlines=True + ) + + assert result.returncode == 0, f"Multi-node Reduce profiling test failed, see {log_file}" + + # Verify plugin initialized + assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \ + f"Plugin should have initialized. 
Check {log_file}" + + # Verify trace files were created (one per rank) + trace_files = glob.glob(trace_pattern) + assert len(trace_files) == total_processes, \ + f"Should have {total_processes} trace files (one per rank), found {len(trace_files)}: {trace_files}" + + # Validate each trace file + for trace_file in trace_files: + is_valid, message = paths.validate_json_trace(trace_file) + assert is_valid, f"Trace file {trace_file} validation failed: {message}" + + # With NCCL_PROFILE_EVENT_MASK=255, we should capture all event types + + # Check for Group events (one per Reduce call) + group_events = paths.count_events_in_trace(trace_file, category="GROUP") + assert group_events > 0, f"Should have Group events in {trace_file}, found {group_events}" + + # Check for Reduce events + reduce_events = paths.count_events_in_trace(trace_file, event_name="Reduce") + assert reduce_events > 0, \ + f"Should have Reduce events in {trace_file}, found {reduce_events}" + + # For multi-node tests, verify ProxyOp events exist + proxy_events = paths.count_events_in_trace(trace_file, category="PROXY") + assert proxy_events > 0, \ + f"Should have Proxy events in {trace_file}, found {proxy_events}" + + # With ProxyOp enabled (bit 3), check for Send and Recv operations + schedule_send_events = paths.count_events_in_trace(trace_file, event_name="ScheduleSend") + schedule_recv_events = paths.count_events_in_trace(trace_file, event_name="ScheduleRecv") + assert schedule_send_events > 0 or schedule_recv_events > 0, \ + f"Should have ScheduleSend or ScheduleRecv events in {trace_file}, found Send={schedule_send_events}, Recv={schedule_recv_events}" + + # With ProxyStep enabled (bit 4), verify network step events exist + net_events = paths.count_events_in_trace(trace_file, category="NET") + assert net_events > 0, \ + f"Should have NET events in {trace_file}, found {net_events}" + + # With KernelCh enabled (bit 6), we should see GPU kernel channel events + kernel_events = 
paths.count_events_in_trace(trace_file, category="GPU") + assert kernel_events > 0, \ + f"Should have GPU (KernelCh) events in {trace_file}, found {kernel_events}" + + # With ProxyCtrl enabled (bit 5), we should see Append/Sleep events + append_events = paths.count_events_in_trace(trace_file, event_name="Append") + sleep_events = paths.count_events_in_trace(trace_file, event_name="Sleep") + assert append_events > 0 or sleep_events > 0, \ + f"Should have ProxyCtrl events (Append or Sleep) in {trace_file}, found Append={append_events}, Sleep={sleep_events}" + + # Verify trace file exists and has content + trace_file_size = os.path.getsize(trace_file) + assert trace_file_size > 0, \ + f"Trace file {trace_file} is empty" + diff --git a/test/ext-plugins/tests/ext-profiler/test_reducescatter.py b/test/ext-plugins/tests/ext-profiler/test_reducescatter.py new file mode 100644 index 0000000000..d4777bec76 --- /dev/null +++ b/test/ext-plugins/tests/ext-profiler/test_reducescatter.py @@ -0,0 +1,408 @@ +# ************************************************************************* +# * Copyright (c) 2025 Advanced Micro Devices, Inc. All rights reserved. 
+# * +# * See LICENSE.txt for license information +# ************************************************************************ + +import os +import subprocess +import glob +import pytest + +@pytest.mark.ext_profiler +@pytest.mark.reducescatter +def test_profiler_initialization(paths): + """Test profiler functionality with ReduceScatter operations.""" + + dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "reducescatter_profiler_dumps") + os.makedirs(dump_dir, exist_ok=True) + + dump_file_base = os.path.join(dump_dir, "profiler_initialization") + + # Remove any existing trace files + trace_pattern = f"{dump_file_base}*.json" + for f in glob.glob(trace_pattern): + os.remove(f) + + env = os.environ.copy() + env.update({ + "PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}", + "LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}", + "HSA_NO_SCRATCH_RECLAIM": "1", + "NCCL_PROFILER_PLUGIN": paths.PROFILER_SO, + "NCCL_PROFILE_EVENT_MASK": "3", # Group (1) + Coll (2) = 3 + "NCCL_PROFILE_DUMP_FILE": dump_file_base, + "NCCL_DEBUG": "INFO", + }) + + args = [ + f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", "4", + "--mca", "pml", "ucx", + "--mca", "btl", "^vader,openib", + f"{paths.RCCL_TESTS_DIR}/build/reduce_scatter_perf", + "-b", "1", + "-e", "8M", + "-f", "2", + "-g", "1", + ] + + log_dir = os.path.join(paths.LOGDIR, "reducescatter_ext_profiler_test_logs") + os.makedirs(log_dir, exist_ok=True) + + log_file = os.path.join(log_dir, "profiler_initialization.log") + with open(log_file, "w") as logfile: + result = subprocess.run( + args, + env=env, + stdout=logfile, + stderr=subprocess.STDOUT, + universal_newlines=True + ) + + assert result.returncode == 0, f"ReduceScatter test failed, see {log_file}" + + # Verify plugin initialized + assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \ + f"Plugin should have initialized. 
Check {log_file}" + + # Verify trace files were created (one per rank) + trace_files = glob.glob(trace_pattern) + assert len(trace_files) == 4, \ + f"Should have 4 trace files (one per rank), found {len(trace_files)}: {trace_files}" + + # Validate each trace file + for trace_file in trace_files: + is_valid, message = paths.validate_json_trace(trace_file) + assert is_valid, f"Trace file {trace_file} validation failed: {message}" + + # Check for Group events + group_events = paths.count_events_in_trace(trace_file, category="GROUP") + assert group_events > 0, f"Should have Group events in {trace_file}" + + # Check for ReduceScatter collective events + reducescatter_events = paths.count_events_in_trace(trace_file, event_name="ReduceScatter") + assert reducescatter_events > 0, f"Should have ReduceScatter events in {trace_file}" + + +@pytest.mark.ext_profiler +@pytest.mark.reducescatter +def test_invalid_mask_value(paths): + """Test profiler behavior with invalid event mask (0 = no events)""" + + dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "reducescatter_profiler_dumps") + os.makedirs(dump_dir, exist_ok=True) + + dump_file_base = os.path.join(dump_dir, "invalid_mask_value_profiling") + + # Remove any existing trace files + trace_pattern = f"{dump_file_base}*.json" + for f in glob.glob(trace_pattern): + os.remove(f) + + env = os.environ.copy() + env.update({ + "PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}", + "LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}", + "HSA_NO_SCRATCH_RECLAIM": "1", + "NCCL_PROFILER_PLUGIN": paths.PROFILER_SO, + "NCCL_PROFILE_EVENT_MASK": "0", # Invalid: no events enabled + "NCCL_PROFILE_DUMP_FILE": dump_file_base, + "NCCL_DEBUG": "INFO", + }) + + args = [ + f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", "4", + "--mca", "pml", "ucx", + "--mca", "btl", "^vader,openib", + f"{paths.RCCL_TESTS_DIR}/build/reduce_scatter_perf", + "-b", "1", + "-e", "8M", 
+ "-f", "2", + "-g", "1", + ] + + log_dir = os.path.join(paths.LOGDIR, "reducescatter_ext_profiler_test_logs") + os.makedirs(log_dir, exist_ok=True) + + log_file = os.path.join(log_dir, "invalid_mask_value_profiling.log") + with open(log_file, "w") as logfile: + result = subprocess.run( + args, + env=env, + stdout=logfile, + stderr=subprocess.STDOUT, + universal_newlines=True + ) + + assert result.returncode == 0, f"ReduceScatter test should still succeed even with invalid mask, see {log_file}" + + # Verify plugin initialized + assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \ + f"Plugin should have initialized even with mask=0. Check {log_file}" + + # Verify trace files were created (one per rank) + trace_files = glob.glob(trace_pattern) + assert len(trace_files) == 4, \ + f"Should have 4 trace files (one per rank), found {len(trace_files)}: {trace_files}" + + # Validate each trace file - with mask=0, trace files should be nearly empty + # They should contain valid JSON but no actual profiling events + for trace_file in trace_files: + is_valid, message = paths.validate_json_trace(trace_file) + assert is_valid, f"Trace file {trace_file} should still be valid JSON: {message}" + + # With mask=0, there should be no Group or Collective events + group_events = paths.count_events_in_trace(trace_file, category="GROUP") + assert group_events == 0, f"Should have no Group events with mask=0 in {trace_file}, found {group_events}" + + reducescatter_events = paths.count_events_in_trace(trace_file, event_name="ReduceScatter") + assert reducescatter_events == 0, f"Should have no ReduceScatter events with mask=0 in {trace_file}, found {reducescatter_events}" + + +@pytest.mark.ext_profiler +@pytest.mark.reducescatter +def test_single_node_detailed_profiling(paths): + """Test profiler with single-node ReduceScatter using full event mask (255) across wide message range""" + + dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "reducescatter_profiler_dumps") + 
os.makedirs(dump_dir, exist_ok=True) + + dump_file_base = os.path.join(dump_dir, "single_node_detailed_profiling") + + # Remove any existing trace files + trace_pattern = f"{dump_file_base}*.json" + for f in glob.glob(trace_pattern): + os.remove(f) + + env = os.environ.copy() + env.update({ + "PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}", + "LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}", + "HSA_NO_SCRATCH_RECLAIM": "1", + "NCCL_PROFILER_PLUGIN": paths.PROFILER_SO, + "NCCL_PROFILE_EVENT_MASK": "255", # All events: Group (1) + Coll (2) + P2P (4) + ProxyOp (8) + ProxyStep (16) + ProxyCtrl (32) + KernelCh (64) + NetPlugin (128) = 255 + "NCCL_PROFILE_DUMP_FILE": dump_file_base, + "NCCL_DEBUG": "INFO", + }) + + args = [ + f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", "8", + "--bind-to", "none", + "--mca", "pml", "ucx", + "--mca", "btl", "^vader,openib", + f"{paths.RCCL_TESTS_DIR}/build/reduce_scatter_perf", + "-b", "8", + "-e", "128M", + "-f", "2", + "-g", "1", + ] + + log_dir = os.path.join(paths.LOGDIR, "reducescatter_ext_profiler_test_logs") + os.makedirs(log_dir, exist_ok=True) + + log_file = os.path.join(log_dir, "single_node_detailed_profiling.log") + with open(log_file, "w") as logfile: + result = subprocess.run( + args, + env=env, + stdout=logfile, + stderr=subprocess.STDOUT, + universal_newlines=True + ) + + assert result.returncode == 0, f"Single-node detailed ReduceScatter profiling test failed, see {log_file}" + + # Verify plugin initialized + assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \ + f"Plugin should have initialized. 
Check {log_file}" + + # Verify trace files were created (one per rank) + trace_files = glob.glob(trace_pattern) + assert len(trace_files) == 8, \ + f"Should have 8 trace files (one per rank), found {len(trace_files)}: {trace_files}" + + # Validate each trace file + for trace_file in trace_files: + is_valid, message = paths.validate_json_trace(trace_file) + assert is_valid, f"Trace file {trace_file} validation failed: {message}" + + # With NCCL_PROFILE_EVENT_MASK=255, we capture all event types + # However, single-node behavior differs significantly from multi-node + + # Check for Group events + group_events = paths.count_events_in_trace(trace_file, category="GROUP") + assert group_events > 0, \ + f"Should have Group events in {trace_file}, found {group_events}" + + # Check for ReduceScatter events + reducescatter_events = paths.count_events_in_trace(trace_file, event_name="ReduceScatter") + assert reducescatter_events > 0, \ + f"Should have ReduceScatter events in {trace_file}, found {reducescatter_events}" + + # Verify GPU kernel channel events exist + kernel_events = paths.count_events_in_trace(trace_file, category="GPU") + assert kernel_events > 0, \ + f"Should have GPU (KernelCh) events in {trace_file}, found {kernel_events}" + + # Verify ProxyCtrl events exist + proxy_ctrl_events = paths.count_events_in_trace(trace_file, category="PROXY") + assert proxy_ctrl_events > 0, \ + f"Should have PROXY (ProxyCtrl) events in {trace_file}, found {proxy_ctrl_events}" + + append_events = paths.count_events_in_trace(trace_file, event_name="Append") + sleep_events = paths.count_events_in_trace(trace_file, event_name="Sleep") + assert append_events > 0 or sleep_events > 0, \ + f"Should have ProxyCtrl events in {trace_file}, found Append={append_events}, Sleep={sleep_events}" + + # We should NOT see ProxyOp network events (ScheduleSend/Recv, ProgressSend/Recv) + schedule_send_events = paths.count_events_in_trace(trace_file, event_name="ScheduleSend") + schedule_recv_events = 
paths.count_events_in_trace(trace_file, event_name="ScheduleRecv") + assert schedule_send_events == 0, \ + f"Single-node should have NO ScheduleSend events (no network) in {trace_file}, found {schedule_send_events}" + assert schedule_recv_events == 0, \ + f"Single-node should have NO ScheduleRecv events (no network) in {trace_file}, found {schedule_recv_events}" + + # Should also NOT see ProxyStep network events (RecvWait, SendWait, etc.) + net_events = paths.count_events_in_trace(trace_file, category="NET") + assert net_events == 0, \ + f"Single-node should have NO NET (ProxyStep) events in {trace_file}, found {net_events}" + + # Verify trace file exists and has content + trace_file_size = os.path.getsize(trace_file) + assert trace_file_size > 0, \ + f"Trace file {trace_file} is empty" + + +@pytest.mark.ext_profiler +@pytest.mark.reducescatter +def test_multinode_detailed_profiling(paths): + """Test profiler with multi-node ReduceScatter operations using full event mask (255)""" + + # Get available nodes using the shared function + nodelist = paths.get_available_nodes() + + # Skip test if no nodes available (SLURM not available) or less than 2 nodes + if not nodelist: + pytest.skip("Multinode test requires SLURM allocation") + + if len(nodelist) < 2: + pytest.skip(f"Multinode test requires at least 2 nodes, found {len(nodelist)}: {nodelist}") + + # Check for common network interface across all nodes + common_interface = paths.find_common_interface(nodelist) + if common_interface is None: + pytest.skip(f"Multinode test requires all nodes to have the same network interface (eth0 or eth1).") + + # Build host specification string (8 processes per node) + host_spec = ",".join([f"{node}:8" for node in nodelist]) + total_processes = len(nodelist) * 8 + print(f"Using host specification: {host_spec}") + + dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "reducescatter_profiler_dumps") + os.makedirs(dump_dir, exist_ok=True) + + dump_file_base = os.path.join(dump_dir, 
"multinode_detailed_profiling") + + # Remove any existing trace files + trace_pattern = f"{dump_file_base}*.json" + for f in glob.glob(trace_pattern): + os.remove(f) + + env = os.environ.copy() + env.update({ + "PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}", + "LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}", + "HSA_NO_SCRATCH_RECLAIM": "1", + "NCCL_IGNORE_CPU_AFFINITY": "1", + "NCCL_PROFILER_PLUGIN": paths.PROFILER_SO, + "NCCL_PROFILE_EVENT_MASK": "255", # All events: Group (1) + Coll (2) + P2P (4) + ProxyOp (8) + ProxyStep (16) + ProxyCtrl (32) + KernelCh (64) + NetPlugin (128) = 255 + "NCCL_PROFILE_DUMP_FILE": dump_file_base, + "NCCL_DEBUG": "INFO", + "NCCL_SOCKET_IFNAME": common_interface, + "NCCL_DMABUF_ENABLE": "1", + }) + + args = [ + f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", f"{total_processes}", + "--host", host_spec, + "--mca", "pml", "ucx", + "--mca", "btl", "^vader,openib", + f"{paths.RCCL_TESTS_DIR}/build/reduce_scatter_perf", + "-b", "8", + "-e", "128M", + "-f", "2", + "-g", "1", + ] + + log_dir = os.path.join(paths.LOGDIR, "reducescatter_ext_profiler_test_logs") + os.makedirs(log_dir, exist_ok=True) + + log_file = os.path.join(log_dir, "multinode_detailed_profiling.log") + with open(log_file, "w") as logfile: + result = subprocess.run( + args, + env=env, + stdout=logfile, + stderr=subprocess.STDOUT, + universal_newlines=True + ) + + assert result.returncode == 0, f"Multi-node ReduceScatter profiling test failed, see {log_file}" + + # Verify plugin initialized + assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \ + f"Plugin should have initialized. 
Check {log_file}" + + # Verify trace files were created (one per rank) + trace_files = glob.glob(trace_pattern) + assert len(trace_files) == total_processes, \ + f"Should have {total_processes} trace files (one per rank), found {len(trace_files)}: {trace_files}" + + # Validate each trace file + for trace_file in trace_files: + is_valid, message = paths.validate_json_trace(trace_file) + assert is_valid, f"Trace file {trace_file} validation failed: {message}" + + # With NCCL_PROFILE_EVENT_MASK=255, we should capture all event types + + # Check for Group events (one per ReduceScatter call) + group_events = paths.count_events_in_trace(trace_file, category="GROUP") + assert group_events > 0, f"Should have Group events in {trace_file}, found {group_events}" + + # Check for ReduceScatter events + reducescatter_events = paths.count_events_in_trace(trace_file, event_name="ReduceScatter") + assert reducescatter_events > 0, \ + f"Should have ReduceScatter events in {trace_file}, found {reducescatter_events}" + + # For multi-node tests, verify ProxyOp events exist + proxy_events = paths.count_events_in_trace(trace_file, category="PROXY") + assert proxy_events > 0, \ + f"Should have Proxy events in {trace_file}, found {proxy_events}" + + # Check for Send and Recv operations + schedule_send_events = paths.count_events_in_trace(trace_file, event_name="ScheduleSend") + schedule_recv_events = paths.count_events_in_trace(trace_file, event_name="ScheduleRecv") + assert schedule_send_events > 0 or schedule_recv_events > 0, \ + f"Should have ScheduleSend or ScheduleRecv events in {trace_file}, found Send={schedule_send_events}, Recv={schedule_recv_events}" + + # Verify NET (ProxyStep) events exist + net_events = paths.count_events_in_trace(trace_file, category="NET") + assert net_events > 0, \ + f"Should have NET events in {trace_file}, found {net_events}" + + # Verify GPU kernel channel events exist + kernel_events = paths.count_events_in_trace(trace_file, category="GPU") + assert 
kernel_events > 0, \ + f"Should have GPU (KernelCh) events in {trace_file}, found {kernel_events}" + + # Verify ProxyCtrl events exist + append_events = paths.count_events_in_trace(trace_file, event_name="Append") + sleep_events = paths.count_events_in_trace(trace_file, event_name="Sleep") + assert append_events > 0 or sleep_events > 0, \ + f"Should have ProxyCtrl events in {trace_file}, found Append={append_events}, Sleep={sleep_events}" + + # Verify trace file exists and has content + trace_file_size = os.path.getsize(trace_file) + assert trace_file_size > 0, \ + f"Trace file {trace_file} is empty" + diff --git a/test/ext-plugins/tests/ext-profiler/test_sendrecv.py b/test/ext-plugins/tests/ext-profiler/test_sendrecv.py new file mode 100644 index 0000000000..064b8ff882 --- /dev/null +++ b/test/ext-plugins/tests/ext-profiler/test_sendrecv.py @@ -0,0 +1,426 @@ +# ************************************************************************* +# * Copyright (c) 2025 Advanced Micro Devices, Inc. All rights reserved. +# * +# * See LICENSE.txt for license information +# ************************************************************************ + +import os +import subprocess +import glob +import pytest + +@pytest.mark.ext_profiler +@pytest.mark.sendrecv +def test_profiler_initialization(paths): + """Test profiler functionality with SendRecv operations. 
+ + Note: SendRecv is a P2P operation, so we check for Send/Recv P2P events.""" + + dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "sendrecv_profiler_dumps") + os.makedirs(dump_dir, exist_ok=True) + + dump_file_base = os.path.join(dump_dir, "profiler_initialization") + + # Remove any existing trace files + trace_pattern = f"{dump_file_base}*.json" + for f in glob.glob(trace_pattern): + os.remove(f) + + env = os.environ.copy() + env.update({ + "PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}", + "LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}", + "HSA_NO_SCRATCH_RECLAIM": "1", + "NCCL_PROFILER_PLUGIN": paths.PROFILER_SO, + "NCCL_PROFILE_EVENT_MASK": "7", # Group (1) + Coll (2) + P2P (4) = 7 (SendRecv is P2P) + "NCCL_PROFILE_DUMP_FILE": dump_file_base, + "NCCL_DEBUG": "INFO", + }) + + args = [ + f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", "4", + "--mca", "pml", "ucx", + "--mca", "btl", "^vader,openib", + f"{paths.RCCL_TESTS_DIR}/build/sendrecv_perf", + "-b", "1", + "-e", "8M", + "-f", "2", + "-g", "1", + ] + + log_dir = os.path.join(paths.LOGDIR, "sendrecv_ext_profiler_test_logs") + os.makedirs(log_dir, exist_ok=True) + + log_file = os.path.join(log_dir, "profiler_initialization.log") + with open(log_file, "w") as logfile: + result = subprocess.run( + args, + env=env, + stdout=logfile, + stderr=subprocess.STDOUT, + universal_newlines=True + ) + + assert result.returncode == 0, f"SendRecv test failed, see {log_file}" + + # Verify plugin initialized + assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \ + f"Plugin should have initialized. 
Check {log_file}" + + # Verify trace files were created (one per rank) + trace_files = glob.glob(trace_pattern) + assert len(trace_files) == 4, \ + f"Should have 4 trace files (one per rank), found {len(trace_files)}: {trace_files}" + + # Validate each trace file + for trace_file in trace_files: + is_valid, message = paths.validate_json_trace(trace_file) + assert is_valid, f"Trace file {trace_file} validation failed: {message}" + + # Check for Group events + group_events = paths.count_events_in_trace(trace_file, category="GROUP") + assert group_events > 0, f"Should have Group events in {trace_file}" + + # Check for P2P events (SendRecv is implemented as Send/Recv P2P operations) + p2p_events = paths.count_events_in_trace(trace_file, category="P2P") + assert p2p_events > 0, f"Should have P2P events (SendRecv uses Send/Recv) in {trace_file}" + + # Verify Send and Recv events exist + send_events = paths.count_events_in_trace(trace_file, event_name="Send") + recv_events = paths.count_events_in_trace(trace_file, event_name="Recv") + assert send_events > 0, f"Should have Send events in {trace_file}" + assert recv_events > 0, f"Should have Recv events in {trace_file}" + + +@pytest.mark.ext_profiler +@pytest.mark.sendrecv +def test_invalid_mask_value(paths): + """Test profiler behavior with invalid event mask (0 = no events)""" + + dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "sendrecv_profiler_dumps") + os.makedirs(dump_dir, exist_ok=True) + + dump_file_base = os.path.join(dump_dir, "invalid_mask_value_profiling") + + # Remove any existing trace files + trace_pattern = f"{dump_file_base}*.json" + for f in glob.glob(trace_pattern): + os.remove(f) + + env = os.environ.copy() + env.update({ + "PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}", + "LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}", + "HSA_NO_SCRATCH_RECLAIM": "1", + "NCCL_PROFILER_PLUGIN": paths.PROFILER_SO, + 
"NCCL_PROFILE_EVENT_MASK": "0", # Invalid: no events enabled + "NCCL_PROFILE_DUMP_FILE": dump_file_base, + "NCCL_DEBUG": "INFO", + }) + + args = [ + f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", "4", + "--mca", "pml", "ucx", + "--mca", "btl", "^vader,openib", + f"{paths.RCCL_TESTS_DIR}/build/sendrecv_perf", + "-b", "1", + "-e", "8M", + "-f", "2", + "-g", "1", + ] + + log_dir = os.path.join(paths.LOGDIR, "sendrecv_ext_profiler_test_logs") + os.makedirs(log_dir, exist_ok=True) + + log_file = os.path.join(log_dir, "invalid_mask_value_profiling.log") + with open(log_file, "w") as logfile: + result = subprocess.run( + args, + env=env, + stdout=logfile, + stderr=subprocess.STDOUT, + universal_newlines=True + ) + + assert result.returncode == 0, f"SendRecv test should still succeed even with invalid mask, see {log_file}" + + # Verify plugin initialized + assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \ + f"Plugin should have initialized even with mask=0. Check {log_file}" + + # Verify trace files were created (one per rank) + trace_files = glob.glob(trace_pattern) + assert len(trace_files) == 4, \ + f"Should have 4 trace files (one per rank), found {len(trace_files)}: {trace_files}" + + # Validate each trace file - with mask=0, trace files should be nearly empty + # They should contain valid JSON but no actual profiling events + for trace_file in trace_files: + is_valid, message = paths.validate_json_trace(trace_file) + assert is_valid, f"Trace file {trace_file} should still be valid JSON: {message}" + + # With mask=0, there should be no Group or P2P events + group_events = paths.count_events_in_trace(trace_file, category="GROUP") + assert group_events == 0, f"Should have no Group events with mask=0 in {trace_file}, found {group_events}" + + p2p_events = paths.count_events_in_trace(trace_file, category="P2P") + assert p2p_events == 0, f"Should have no P2P events (SendRecv) with mask=0 in {trace_file}, found {p2p_events}" + + 
# Ranks launched on one node; also the per-node slot count in the multi-node run.
_SENDRECV_RANKS_PER_NODE = 8

# A rank whose trace holds more PROXY events than this is treated as having
# taken part in network traffic. Heuristic threshold: ranks below it are not
# required to show ScheduleSend/ScheduleRecv or NET events.
_NETWORK_ACTIVE_PROXY_EVENTS = 100


def _prepare_sendrecv_dump(paths, test_name):
    """Create the SendRecv dump directory and delete stale traces.

    Args:
        paths: The ``paths`` fixture; only ``PROFILER_DUMP_DIR`` is read here.
        test_name: Base name used for this test's trace files.

    Returns:
        ``(dump_file_base, trace_pattern)`` where ``trace_pattern`` is the
        glob matching every JSON trace written under ``dump_file_base``.
    """
    dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "sendrecv_profiler_dumps")
    os.makedirs(dump_dir, exist_ok=True)

    dump_file_base = os.path.join(dump_dir, test_name)
    trace_pattern = f"{dump_file_base}*.json"

    # Traces left by an earlier run would inflate the per-rank file count.
    for stale_trace in glob.glob(trace_pattern):
        os.remove(stale_trace)

    return dump_file_base, trace_pattern


def _sendrecv_full_mask_env(paths, dump_file_base, extra_env=None):
    """Return a copy of the current environment configured for mask=255 profiling.

    Args:
        paths: The ``paths`` fixture supplying install/plugin locations.
        dump_file_base: Value exported as ``NCCL_PROFILE_DUMP_FILE``.
        extra_env: Optional mapping of additional variables to set.
    """
    env = os.environ.copy()
    env.update({
        "PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}",
        "LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}",
        "HSA_NO_SCRATCH_RECLAIM": "1",
        "NCCL_PROFILER_PLUGIN": paths.PROFILER_SO,
        # All events: Group (1) + Coll (2) + P2P (4) + ProxyOp (8) + ProxyStep (16)
        # + ProxyCtrl (32) + KernelCh (64) + NetPlugin (128) = 255
        "NCCL_PROFILE_EVENT_MASK": "255",
        "NCCL_PROFILE_DUMP_FILE": dump_file_base,
        "NCCL_DEBUG": "INFO",
    })
    if extra_env:
        env.update(extra_env)
    return env


def _run_sendrecv_perf(paths, launcher_opts, env, test_name):
    """Run ``sendrecv_perf`` under mpirun, sending stdout and stderr to a log file.

    Args:
        paths: The ``paths`` fixture supplying install and log locations.
        launcher_opts: mpirun options placed right after the executable
            (rank count, binding or host list).
        env: Environment passed to the child process.
        test_name: Base name of the log file.

    Returns:
        ``(returncode, log_file)``.
    """
    args = [
        f"{paths.OMPI_INSTALL_DIR}/bin/mpirun",
        *launcher_opts,
        "--mca", "pml", "ucx",
        "--mca", "btl", "^vader,openib",
        f"{paths.RCCL_TESTS_DIR}/build/sendrecv_perf",
        "-b", "8",
        "-e", "128M",
        "-f", "2",
        "-g", "1",
    ]

    log_dir = os.path.join(paths.LOGDIR, "sendrecv_ext_profiler_test_logs")
    os.makedirs(log_dir, exist_ok=True)

    log_file = os.path.join(log_dir, f"{test_name}.log")
    with open(log_file, "w") as logfile:
        result = subprocess.run(
            args,
            env=env,
            stdout=logfile,
            stderr=subprocess.STDOUT,
            universal_newlines=True
        )
    return result.returncode, log_file


def _collect_sendrecv_traces(paths, log_file, trace_pattern, expected_ranks):
    """Assert the plugin initialised and one trace per rank exists; return the traces.

    The list is sorted so that the first failing trace reported by a test does
    not depend on filesystem enumeration order.
    """
    assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \
        f"Plugin should have initialized. Check {log_file}"

    trace_files = sorted(glob.glob(trace_pattern))
    assert len(trace_files) == expected_ranks, \
        f"Should have {expected_ranks} trace files (one per rank), found {len(trace_files)}: {trace_files}"
    return trace_files


def _assert_trace_well_formed(paths, trace_file):
    """Assert that a trace file is non-empty and passes ``paths.validate_json_trace``."""
    # Checked before parsing: an empty file then fails with a clear message
    # instead of a generic validation error.
    assert os.path.getsize(trace_file) > 0, \
        f"Trace file {trace_file} is empty"

    is_valid, message = paths.validate_json_trace(trace_file)
    assert is_valid, f"Trace file {trace_file} validation failed: {message}"


@pytest.mark.ext_profiler
@pytest.mark.sendrecv
def test_single_node_detailed_profiling(paths):
    """Test profiler with single-node SendRecv using full event mask (255) across wide message range"""

    test_name = "single_node_detailed_profiling"
    dump_file_base, trace_pattern = _prepare_sendrecv_dump(paths, test_name)
    env = _sendrecv_full_mask_env(paths, dump_file_base)

    returncode, log_file = _run_sendrecv_perf(
        paths,
        ["-np", f"{_SENDRECV_RANKS_PER_NODE}", "--bind-to", "none"],
        env,
        test_name,
    )
    assert returncode == 0, f"Single-node detailed SendRecv profiling test failed, see {log_file}"

    trace_files = _collect_sendrecv_traces(
        paths, log_file, trace_pattern, _SENDRECV_RANKS_PER_NODE
    )

    for trace_file in trace_files:
        _assert_trace_well_formed(paths, trace_file)

        # With NCCL_PROFILE_EVENT_MASK=255, we capture all event types
        # However, single-node behavior differs significantly from multi-node

        # Check for Group events
        group_events = paths.count_events_in_trace(trace_file, category="GROUP")
        assert group_events > 0, \
            f"Should have Group events in {trace_file}, found {group_events}"

        # Check for P2P events (SendRecv is implemented as Send/Recv operations)
        p2p_events = paths.count_events_in_trace(trace_file, category="P2P")
        assert p2p_events > 0, \
            f"Should have P2P events (SendRecv uses Send/Recv) in {trace_file}, found {p2p_events}"

        # Verify Send and Recv events
        send_events = paths.count_events_in_trace(trace_file, event_name="Send")
        recv_events = paths.count_events_in_trace(trace_file, event_name="Recv")
        assert send_events > 0, f"Should have Send events in {trace_file}, found {send_events}"
        assert recv_events > 0, f"Should have Recv events in {trace_file}, found {recv_events}"

        # Verify GPU kernel channel events exist
        kernel_events = paths.count_events_in_trace(trace_file, category="GPU")
        assert kernel_events > 0, \
            f"Should have GPU (KernelCh) events in {trace_file}, found {kernel_events}"

        # Verify ProxyCtrl events exist
        proxy_ctrl_events = paths.count_events_in_trace(trace_file, category="PROXY")
        assert proxy_ctrl_events > 0, \
            f"Should have PROXY (ProxyCtrl) events in {trace_file}, found {proxy_ctrl_events}"

        append_events = paths.count_events_in_trace(trace_file, event_name="Append")
        sleep_events = paths.count_events_in_trace(trace_file, event_name="Sleep")
        assert append_events > 0 or sleep_events > 0, \
            f"Should have ProxyCtrl events in {trace_file}, found Append={append_events}, Sleep={sleep_events}"

        # We should NOT see ProxyOp network events (ScheduleSend/Recv, ProgressSend/Recv)
        schedule_send_events = paths.count_events_in_trace(trace_file, event_name="ScheduleSend")
        schedule_recv_events = paths.count_events_in_trace(trace_file, event_name="ScheduleRecv")
        assert schedule_send_events == 0, \
            f"Single-node should have NO ScheduleSend events (no network) in {trace_file}, found {schedule_send_events}"
        assert schedule_recv_events == 0, \
            f"Single-node should have NO ScheduleRecv events (no network) in {trace_file}, found {schedule_recv_events}"

        # Should also NOT see ProxyStep network events (RecvWait, SendWait, etc.)
        net_events = paths.count_events_in_trace(trace_file, category="NET")
        assert net_events == 0, \
            f"Single-node should have NO NET (ProxyStep) events in {trace_file}, found {net_events}"


@pytest.mark.ext_profiler
@pytest.mark.sendrecv
def test_multinode_detailed_profiling(paths):
    """Test profiler with multi-node SendRecv operations using full event mask (255)"""

    # Get available nodes using the shared function
    nodelist = paths.get_available_nodes()

    # Skip test if no nodes available (SLURM not available) or less than 2 nodes
    if not nodelist:
        pytest.skip("Multinode test requires SLURM allocation")

    if len(nodelist) < 2:
        pytest.skip(f"Multinode test requires at least 2 nodes, found {len(nodelist)}: {nodelist}")

    # Check for common network interface across all nodes
    common_interface = paths.find_common_interface(nodelist)
    if common_interface is None:
        pytest.skip("Multinode test requires all nodes to have the same network interface (eth0 or eth1).")

    # Build host specification string (8 processes per node)
    host_spec = ",".join(f"{node}:{_SENDRECV_RANKS_PER_NODE}" for node in nodelist)
    total_processes = len(nodelist) * _SENDRECV_RANKS_PER_NODE
    print(f"Using host specification: {host_spec}")

    test_name = "multinode_detailed_profiling"
    dump_file_base, trace_pattern = _prepare_sendrecv_dump(paths, test_name)
    env = _sendrecv_full_mask_env(
        paths,
        dump_file_base,
        extra_env={
            "NCCL_IGNORE_CPU_AFFINITY": "1",
            "NCCL_SOCKET_IFNAME": common_interface,
            "NCCL_DMABUF_ENABLE": "1",
        },
    )

    returncode, log_file = _run_sendrecv_perf(
        paths,
        ["-np", f"{total_processes}", "--host", host_spec],
        env,
        test_name,
    )
    assert returncode == 0, f"Multi-node SendRecv profiling test failed, see {log_file}"

    trace_files = _collect_sendrecv_traces(paths, log_file, trace_pattern, total_processes)

    for trace_file in trace_files:
        _assert_trace_well_formed(paths, trace_file)

        # With NCCL_PROFILE_EVENT_MASK=255, we should capture all event types

        # Check for Group events (one per SendRecv call)
        group_events = paths.count_events_in_trace(trace_file, category="GROUP")
        assert group_events > 0, f"Should have Group events in {trace_file}, found {group_events}"

        # Check for P2P events (SendRecv is implemented as Send/Recv operations)
        p2p_events = paths.count_events_in_trace(trace_file, category="P2P")
        assert p2p_events > 0, \
            f"Should have P2P events (SendRecv uses Send/Recv) in {trace_file}, found {p2p_events}"

        # For multi-node tests, verify ProxyOp events exist
        proxy_events = paths.count_events_in_trace(trace_file, category="PROXY")
        assert proxy_events > 0, \
            f"Should have Proxy events in {trace_file}, found {proxy_events}"

        # Note: In SendRecv, not all ranks participate in network communication,
        # so the network-specific checks apply only to ranks with significant
        # proxy activity.
        network_active = proxy_events > _NETWORK_ACTIVE_PROXY_EVENTS

        # Check for Send and Recv operations (only for ranks with network activity)
        schedule_send_events = paths.count_events_in_trace(trace_file, event_name="ScheduleSend")
        schedule_recv_events = paths.count_events_in_trace(trace_file, event_name="ScheduleRecv")
        if network_active:
            assert schedule_send_events > 0 or schedule_recv_events > 0, \
                f"Rank with {proxy_events} proxy events should have ScheduleSend or ScheduleRecv in {trace_file}, found Send={schedule_send_events}, Recv={schedule_recv_events}"

        # Verify NET (ProxyStep) events exist (only for ranks with network activity)
        net_events = paths.count_events_in_trace(trace_file, category="NET")
        if network_active:
            assert net_events > 0, \
                f"Rank with {proxy_events} proxy events should have NET events in {trace_file}, found {net_events}"

        # Verify GPU kernel channel events exist
        kernel_events = paths.count_events_in_trace(trace_file, category="GPU")
        assert kernel_events > 0, \
            f"Should have GPU (KernelCh) events in {trace_file}, found {kernel_events}"

        # Verify ProxyCtrl events exist
        append_events = paths.count_events_in_trace(trace_file, event_name="Append")
        sleep_events = paths.count_events_in_trace(trace_file, event_name="Sleep")
        assert append_events > 0 or sleep_events > 0, \
            f"Should have ProxyCtrl events in {trace_file}, found Append={append_events}, Sleep={sleep_events}"