Functional Tests for Ext-Profiler Plugin (#2007)

* Add functional tests for CSV Tuner Plugin

* Updated directory structure

* Updated and renamed directories

* Updated csv conf files

* Added tests for ext-profiler

* Updated readme

* Updated readme
Этот коммит содержится в:
Kapil S. Pawar
2025-11-18 11:20:39 -06:00
коммит произвёл GitHub
родитель 461e61d10e
Коммит c7f400dbff
11 изменённых файлов: 3096 добавлений и 17 удалений
+3
Просмотреть файл
@@ -18,3 +18,6 @@ venv/
# Ignore build artifacts
build/
# Ignore profiler dump folders
profiler_dumps/
+65 -12
Просмотреть файл
@@ -1,8 +1,12 @@
# RCCL CSV Tuner Plugin Tests
# RCCL Plugin Tests
## Description
This directory contains automated tests for the RCCL (ROCm Communication Collectives Library) CSV Tuner Plugin. The test suite validates the functionality of the CSV-based tuning plugin across different collective operations (AllReduce, Broadcast, Reduce, AllGather, and ReduceScatter) and various configuration scenarios.
This directory contains automated tests for RCCL (ROCm Communication Collectives Library) plugins:
1. **CSV Tuner Plugin**: Validates the functionality of the CSV-based tuning plugin across different collective operations (AllReduce, Broadcast, Reduce, AllGather, and ReduceScatter) and various configuration scenarios.
2. **Profiler Plugin**: Validates the profiler plugin that captures detailed runtime events for collective and P2P operations, including Group, Collective, P2P, ProxyOp, ProxyStep, and GPU kernel events.
The tests are written in Python using the pytest framework, making it easy to run, maintain, and extend the test coverage.
@@ -16,6 +20,7 @@ ext-plugins/
├── .gitignore # Git ignore rules for Python/pytest artifacts
├── venv/ # Python virtual environment (created after setup)
├── logs/ # Test execution logs and output files
├── profiler_dumps/ # Profiler plugin output (JSON trace files)
├── assets/ # Test configuration files and assets
│ └── csv_confs/ # CSV configuration files for testing
│ ├── incorrect_values_config.conf
@@ -27,12 +32,20 @@ ext-plugins/
│ └── valid_config_without_wildcards.conf
└── tests/ # Test suite directory
├── conftest.py # Pytest fixtures and shared test configuration
── ext-tuner/ # CSV Tuner Plugin specific tests
── ext-tuner/ # CSV Tuner Plugin specific tests
│ ├── test_allreduce.py
│ ├── test_broadcast.py
│ ├── test_reduce.py
│ ├── test_allgather.py
│ └── test_reducescatter.py
└── ext-profiler/ # Profiler Plugin specific tests
├── test_allreduce.py
├── test_broadcast.py
├── test_reduce.py
├── test_allgather.py
── test_reducescatter.py
── test_alltoall.py
├── test_reducescatter.py
└── test_sendrecv.py
```
## Installation & Setup
@@ -52,17 +65,21 @@ RCCL_TESTS_DIR = "path/to/rccl-tests"
Replace these placeholder paths with your actual installation directories before running the tests.
### Building the RCCL CSV Tuner Plugin
### Building the RCCL Plugins
Before running the tests, you need to build the RCCL CSV tuner plugin library. The plugin is located in the `ext-tuner/example` directory.
Before running the tests, you need to build the RCCL plugin libraries.
#### Step 1: Navigate to the plugin directory
#### Building the CSV Tuner Plugin
The CSV tuner plugin is located in the `ext-tuner/example` directory.
**Step 1: Navigate to the plugin directory**
```bash
cd rccl/ext-tuner/example
```
#### Step 2: Build the plugin
**Step 2: Build the plugin**
```bash
make
@@ -70,6 +87,31 @@ make
This will compile the plugin and create `libnccl-tuner-example.so` in the same directory.
#### Building the Profiler Plugin
The profiler plugin is located in the `ext-plugins/example` directory.
**Step 1: Navigate to the plugin directory**
```bash
cd rccl/test/ext-plugins/example
```
**Step 2: Build the plugin**
```bash
make
```
This will compile the plugin and create `libnccl-profiler.so` in the same directory. The profiler plugin captures detailed runtime events including:
- **Group events**: High-level operation grouping
- **Collective events**: AllReduce, Broadcast, Reduce, ReduceScatter operations
- **P2P events**: Send, Recv, AllGather, AllToAll operations
- **ProxyOp events**: Network proxy operations (ScheduleSend, ScheduleRecv, ProgressSend, ProgressRecv)
- **ProxyStep events**: Detailed network steps (SendWait, RecvWait, GPU waits)
- **ProxyCtrl events**: Proxy thread control (Append, Sleep)
- **GPU events**: Kernel channel execution
### Step 1: Create Virtual Environment
Create a Python virtual environment to isolate the test dependencies:
@@ -82,7 +124,7 @@ python3 -m venv venv
Activate the virtual environment using the appropriate command for your shell:
**On Linux:**
**On Linux/Mac (bash/zsh):**
```bash
source venv/bin/activate
```
@@ -119,18 +161,27 @@ pytest -v --cache-clear
Tests are organized using pytest markers. You can run specific groups of tests:
**Run CSV Plugin tests:**
**Run plugin-specific tests:**
```bash
pytest -m mark.ext_tuner --cache-clear
pytest -m ext_tuner --cache-clear # CSV Tuner Plugin tests only
pytest -m ext_profiler --cache-clear # Profiler Plugin tests only
```
**Run tests for specific collective operations:**
**Run tests for specific collective operations (across all plugins):**
```bash
pytest -m allreduce --cache-clear # AllReduce tests
pytest -m broadcast --cache-clear # Broadcast tests
pytest -m reduce --cache-clear # Reduce tests
pytest -m allgather --cache-clear # AllGather tests
pytest -m reducescatter --cache-clear # ReduceScatter tests
pytest -m alltoall --cache-clear # AllToAll tests (profiler only)
pytest -m sendrecv --cache-clear # SendRecv tests (profiler only)
```
**Combine markers to run specific tests:**
```bash
pytest -m "ext_profiler and allreduce" --cache-clear # Profiler AllReduce tests only
pytest -m "ext_tuner and broadcast" --cache-clear # Tuner Broadcast tests only
```
### Run Tests with Log Output
@@ -157,3 +208,5 @@ pytest --verbose --tb=short
```
- **Log Files**: Test execution logs are stored in the `logs/` directory for later review and debugging.
- **Profiler Output**: Profiler plugin tests generate JSON trace files in the `profiler_dumps/` directory. These files contain detailed event traces that can be analyzed for debugging or performance analysis. The directory is automatically cleaned before each test session by the pytest fixture.
+6 -2
Просмотреть файл
@@ -2,11 +2,15 @@
[pytest]
markers =
ext_tuner: marks tests related to CSV Tuner Plugin
ext_profiler: marks tests related to Profiler Plugin
allreduce: marks tests related to AllReduce collective
broadcast: marks tests related to Broadcast collective
reduce: marks tests related to Reduce collective
allgather: marks tests related to AllGather collective
reducescatter: marks tests related to ReduceScatter collective
multinode: marks tests related to multi-node configuration
alltoall: marks tests related to AllToAll collective
sendrecv: marks tests related to SendRecv collective
multinode: marks tests related to multi-node operations
testpaths = tests
testpaths = tests
addopts = --import-mode=importlib
+117 -3
Просмотреть файл
@@ -9,6 +9,8 @@ import pytest
import subprocess
import re
from types import SimpleNamespace
import json
import glob
WORKDIR = os.getcwd()
@@ -16,10 +18,14 @@ RCCL_INSTALL_DIR = "path/to/rccl"
OMPI_INSTALL_DIR = "path/to/ompi/install"
RCCL_TESTS_DIR = "path/to/rccl-tests"
# Plugin Paths
# Ext-Tuner Paths
PLUGIN_DIR = f"{RCCL_INSTALL_DIR}/ext-tuner/example"
PLUGIN_SO = f"{PLUGIN_DIR}/libnccl-tuner-example.so"
# Ext-Profiler Paths
PROFILER_DIR = f"{RCCL_INSTALL_DIR}/ext-profiler/example"
PROFILER_SO = f"{PROFILER_DIR}/librccl-profiler.so"
# CSV Configs
VALID_CONFIG_WITH_WILDCARDS = os.path.join(WORKDIR, "assets/csv_confs/valid_config_with_wildcards.conf")
VALID_CONFIG_WITHOUT_WILDCARDS = os.path.join(WORKDIR, "assets/csv_confs/valid_config_without_wildcards.conf")
@@ -32,6 +38,9 @@ MULTINODE_CONFIG = os.path.join(WORKDIR, "assets/csv_confs/multinode_config.conf
LOGDIR = os.path.join(WORKDIR, "logs")
os.makedirs(LOGDIR, exist_ok=True)
PROFILER_DUMP_DIR = os.path.join(WORKDIR, "profiler_dumps")
os.makedirs(PROFILER_DUMP_DIR, exist_ok=True)
# Helper Functions
def get_avg_bus_bandwidth(log_content: str):
"""Extract average bus bandwidth from RCCL test log"""
@@ -81,6 +90,74 @@ def get_available_nodes():
except (subprocess.CalledProcessError, FileNotFoundError):
return []
def validate_json_trace(trace_file):
"""Validate that a trace file is valid JSON and follows chrome trace format"""
if not os.path.exists(trace_file):
return False, "File does not exist"
try:
with open(trace_file, 'r') as f:
data = json.load(f)
# Basic validation for chrome trace format
if not isinstance(data, list):
return False, "Trace must be a JSON array"
# Check for at least one valid event
if len(data) > 1: # More than just the closing empty object
# Validate some events have required fields
valid_events = 0
for event in data:
if isinstance(event, dict) and 'name' in event and 'ph' in event:
valid_events += 1
if valid_events == 0:
return False, "No valid trace events found"
return True, f"Valid JSON trace with {len(data)} entries"
except json.JSONDecodeError as e:
return False, f"Invalid JSON: {e}"
except Exception as e:
return False, f"Error reading file: {e}"
def check_event_in_log(log_file, event_string):
"""Check if a specific event string appears in the log file"""
if not os.path.exists(log_file):
return False
with open(log_file, 'r') as f:
content = f.read()
return event_string in content
def count_events_in_trace(trace_file, event_name=None, category=None):
"""Count events in a trace file, optionally filtered by name or category"""
if not os.path.exists(trace_file):
return 0
try:
with open(trace_file, 'r') as f:
data = json.load(f)
count = 0
for event in data:
if not isinstance(event, dict):
continue
match = True
if event_name and event.get('name') != event_name:
match = False
if category and event.get('cat') != category:
match = False
if match and 'name' in event: # Valid event
count += 1
return count
except:
return 0
# Pytest Fixture
@pytest.fixture(scope="session")
def paths():
@@ -92,6 +169,8 @@ def paths():
PLUGIN_DIR=PLUGIN_DIR,
PLUGIN_SO=PLUGIN_SO,
RCCL_TESTS_DIR=RCCL_TESTS_DIR,
PROFILER_DIR=PROFILER_DIR,
PROFILER_SO=PROFILER_SO,
# CSV Configs
VALID_CONFIG_WITH_WILDCARDS=VALID_CONFIG_WITH_WILDCARDS,
VALID_CONFIG_WITHOUT_WILDCARDS=VALID_CONFIG_WITHOUT_WILDCARDS,
@@ -101,9 +180,44 @@ def paths():
SINGLENODE_CONFIG=SINGLENODE_CONFIG,
MULTINODE_CONFIG=MULTINODE_CONFIG,
LOGDIR=LOGDIR,
# Helper Functions
PROFILER_DUMP_DIR=PROFILER_DUMP_DIR,
# Helper Functions for Ext-Tuner
get_avg_bus_bandwidth=get_avg_bus_bandwidth,
check_node_interface=check_node_interface,
find_common_interface=find_common_interface,
get_available_nodes=get_available_nodes,
)
# Helper Functions for Ext-Profiler
validate_json_trace=validate_json_trace,
check_event_in_log=check_event_in_log,
count_events_in_trace=count_events_in_trace,
)
def pytest_runtest_setup(item):
"""Check plugin availability before running each test"""
# Check for ext_tuner marker
if item.get_closest_marker("ext_tuner"):
if not os.path.exists(PLUGIN_SO):
pytest.skip(f"Tuner plugin library not found at: {PLUGIN_SO}")
# Check for ext_profiler marker
if item.get_closest_marker("ext_profiler"):
if not os.path.exists(PROFILER_SO):
pytest.skip(f"Profiler plugin library not found at: {PROFILER_SO}")
@pytest.fixture(scope="session", autouse=True)
def clear_profiler_dump(request):
"""Automatically clear profiler dump folder once before ext_profiler tests"""
# Check if any test in the session has ext_profiler marker
has_profiler_tests = any(
item.get_closest_marker("ext_profiler")
for item in request.session.items
)
if has_profiler_tests:
# Clear all JSON files in the profiler dump directory (including subdirectories)
pattern = os.path.join(PROFILER_DUMP_DIR, "**", "*.json")
for trace_file in glob.glob(pattern, recursive=True):
try:
os.remove(trace_file)
except OSError:
pass # Ignore errors if file doesn't exist or can't be removed
+414
Просмотреть файл
@@ -0,0 +1,414 @@
# *************************************************************************
# * Copyright (c) 2025 Advanced Micro Devices, Inc. All rights reserved.
# *
# * See LICENSE.txt for license information
# ************************************************************************
import os
import subprocess
import glob
import pytest
@pytest.mark.ext_profiler
@pytest.mark.allgather
def test_profiler_initialization(paths):
"""Test profiler functionality with AllGather operations."""
dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "allgather_profiler_dumps")
os.makedirs(dump_dir, exist_ok=True)
dump_file_base = os.path.join(dump_dir, "profiler_initialization")
# Remove any existing trace files
trace_pattern = f"{dump_file_base}*.json"
for f in glob.glob(trace_pattern):
os.remove(f)
env = os.environ.copy()
env.update({
"PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}",
"LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}",
"HSA_NO_SCRATCH_RECLAIM": "1",
"NCCL_PROFILER_PLUGIN": paths.PROFILER_SO,
"NCCL_PROFILE_EVENT_MASK": "3", # Group (1) + Coll (2) = 3
"NCCL_PROFILE_DUMP_FILE": dump_file_base,
"NCCL_DEBUG": "INFO",
})
args = [
f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", "4",
"--mca", "pml", "ucx",
"--mca", "btl", "^vader,openib",
f"{paths.RCCL_TESTS_DIR}/build/all_gather_perf",
"-b", "1",
"-e", "8M",
"-f", "2",
"-g", "1",
]
log_dir = os.path.join(paths.LOGDIR, "allgather_ext_profiler_test_logs")
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, "profiler_initialization.log")
with open(log_file, "w") as logfile:
result = subprocess.run(
args,
env=env,
stdout=logfile,
stderr=subprocess.STDOUT,
universal_newlines=True
)
assert result.returncode == 0, f"AllGather test failed, see {log_file}"
# Verify plugin initialized
assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \
f"Plugin should have initialized. Check {log_file}"
# Verify trace files were created (one per rank)
trace_files = glob.glob(trace_pattern)
assert len(trace_files) == 4, \
f"Should have 4 trace files (one per rank), found {len(trace_files)}: {trace_files}"
# Validate each trace file
for trace_file in trace_files:
is_valid, message = paths.validate_json_trace(trace_file)
assert is_valid, f"Trace file {trace_file} validation failed: {message}"
# Check for Group events
group_events = paths.count_events_in_trace(trace_file, category="GROUP")
assert group_events > 0, f"Should have Group events in {trace_file}"
# Check for AllGather collective events
allgather_events = paths.count_events_in_trace(trace_file, event_name="AllGather")
assert allgather_events > 0, f"Should have AllGather events in {trace_file}"
@pytest.mark.ext_profiler
@pytest.mark.allgather
def test_invalid_mask_value(paths):
"""Test profiler behavior with invalid event mask (0 = no events)"""
dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "allgather_profiler_dumps")
os.makedirs(dump_dir, exist_ok=True)
dump_file_base = os.path.join(dump_dir, "invalid_mask_value_profiling")
# Remove any existing trace files
trace_pattern = f"{dump_file_base}*.json"
for f in glob.glob(trace_pattern):
os.remove(f)
env = os.environ.copy()
env.update({
"PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}",
"LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}",
"HSA_NO_SCRATCH_RECLAIM": "1",
"NCCL_PROFILER_PLUGIN": paths.PROFILER_SO,
"NCCL_PROFILE_EVENT_MASK": "0", # Invalid: no events enabled
"NCCL_PROFILE_DUMP_FILE": dump_file_base,
"NCCL_DEBUG": "INFO",
})
args = [
f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", "4",
"--mca", "pml", "ucx",
"--mca", "btl", "^vader,openib",
f"{paths.RCCL_TESTS_DIR}/build/all_gather_perf",
"-b", "1",
"-e", "8M",
"-f", "2",
"-g", "1",
]
log_dir = os.path.join(paths.LOGDIR, "allgather_ext_profiler_test_logs")
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, "invalid_mask_value_profiling.log")
with open(log_file, "w") as logfile:
result = subprocess.run(
args,
env=env,
stdout=logfile,
stderr=subprocess.STDOUT,
universal_newlines=True
)
assert result.returncode == 0, f"AllGather test should still succeed even with invalid mask, see {log_file}"
# Verify plugin initialized
assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \
f"Plugin should have initialized even with mask=0. Check {log_file}"
# Verify trace files were created (one per rank)
trace_files = glob.glob(trace_pattern)
assert len(trace_files) == 4, \
f"Should have 4 trace files (one per rank), found {len(trace_files)}: {trace_files}"
# Validate each trace file - with mask=0, trace files should be nearly empty
# They should contain valid JSON but no actual profiling events
for trace_file in trace_files:
is_valid, message = paths.validate_json_trace(trace_file)
assert is_valid, f"Trace file {trace_file} should still be valid JSON: {message}"
# With mask=0, there should be no Group or Collective events
group_events = paths.count_events_in_trace(trace_file, category="GROUP")
assert group_events == 0, f"Should have no Group events with mask=0 in {trace_file}, found {group_events}"
allgather_events = paths.count_events_in_trace(trace_file, event_name="AllGather")
assert allgather_events == 0, f"Should have no AllGather events with mask=0 in {trace_file}, found {allgather_events}"
@pytest.mark.ext_profiler
@pytest.mark.allgather
def test_single_node_detailed_profiling(paths):
"""Test profiler with single-node AllGather using full event mask (255) across wide message range"""
dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "allgather_profiler_dumps")
os.makedirs(dump_dir, exist_ok=True)
dump_file_base = os.path.join(dump_dir, "single_node_detailed_profiling")
# Remove any existing trace files
trace_pattern = f"{dump_file_base}*.json"
for f in glob.glob(trace_pattern):
os.remove(f)
env = os.environ.copy()
env.update({
"PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}",
"LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}",
"HSA_NO_SCRATCH_RECLAIM": "1",
"NCCL_PROFILER_PLUGIN": paths.PROFILER_SO,
"NCCL_PROFILE_EVENT_MASK": "255", # All events: Group (1) + Coll (2) + P2P (4) + ProxyOp (8) + ProxyStep (16) + ProxyCtrl (32) + KernelCh (64) + NetPlugin (128) = 255
"NCCL_PROFILE_DUMP_FILE": dump_file_base,
"NCCL_DEBUG": "INFO",
})
args = [
f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", "8",
"--bind-to", "none",
"--mca", "pml", "ucx",
"--mca", "btl", "^vader,openib",
f"{paths.RCCL_TESTS_DIR}/build/all_gather_perf",
"-b", "8",
"-e", "128M",
"-f", "2",
"-g", "1",
]
log_dir = os.path.join(paths.LOGDIR, "allgather_ext_profiler_test_logs")
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, "single_node_detailed_profiling.log")
with open(log_file, "w") as logfile:
result = subprocess.run(
args,
env=env,
stdout=logfile,
stderr=subprocess.STDOUT,
universal_newlines=True
)
assert result.returncode == 0, f"Single-node detailed AllGather profiling test failed, see {log_file}"
# Verify plugin initialized
assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \
f"Plugin should have initialized. Check {log_file}"
# Verify trace files were created (one per rank)
trace_files = glob.glob(trace_pattern)
assert len(trace_files) == 8, \
f"Should have 8 trace files (one per rank), found {len(trace_files)}: {trace_files}"
# Validate each trace file
for trace_file in trace_files:
is_valid, message = paths.validate_json_trace(trace_file)
assert is_valid, f"Trace file {trace_file} validation failed: {message}"
# With NCCL_PROFILE_EVENT_MASK=255, we capture all event types
# However, single-node behavior differs significantly from multi-node
# Check for Group events
group_events = paths.count_events_in_trace(trace_file, category="GROUP")
assert group_events > 0, \
f"Should have Group events in {trace_file}, found {group_events}"
# Check for P2P events
p2p_events = paths.count_events_in_trace(trace_file, category="P2P")
assert p2p_events > 0, \
f"Should have P2P events (AllGather uses Send/Recv) in {trace_file}, found {p2p_events}"
# Verify Send and Recv events
send_events = paths.count_events_in_trace(trace_file, event_name="Send")
recv_events = paths.count_events_in_trace(trace_file, event_name="Recv")
assert send_events > 0, f"Should have Send events in {trace_file}, found {send_events}"
assert recv_events > 0, f"Should have Recv events in {trace_file}, found {recv_events}"
# Verify GPU kernel channel events exist
kernel_events = paths.count_events_in_trace(trace_file, category="GPU")
assert kernel_events > 0, \
f"Should have GPU (KernelCh) events in {trace_file}, found {kernel_events}"
# Verify ProxyCtrl events exist
proxy_ctrl_events = paths.count_events_in_trace(trace_file, category="PROXY")
assert proxy_ctrl_events > 0, \
f"Should have PROXY (ProxyCtrl) events in {trace_file}, found {proxy_ctrl_events}"
append_events = paths.count_events_in_trace(trace_file, event_name="Append")
sleep_events = paths.count_events_in_trace(trace_file, event_name="Sleep")
assert append_events > 0 or sleep_events > 0, \
f"Should have ProxyCtrl events in {trace_file}, found Append={append_events}, Sleep={sleep_events}"
# We should NOT see ProxyOp network events (ScheduleSend/Recv, ProgressSend/Recv)
schedule_send_events = paths.count_events_in_trace(trace_file, event_name="ScheduleSend")
schedule_recv_events = paths.count_events_in_trace(trace_file, event_name="ScheduleRecv")
assert schedule_send_events == 0, \
f"Single-node should have NO ScheduleSend events (no network) in {trace_file}, found {schedule_send_events}"
assert schedule_recv_events == 0, \
f"Single-node should have NO ScheduleRecv events (no network) in {trace_file}, found {schedule_recv_events}"
# Should also NOT see ProxyStep network events (RecvWait, SendWait, etc.)
net_events = paths.count_events_in_trace(trace_file, category="NET")
assert net_events == 0, \
f"Single-node should have NO NET (ProxyStep) events in {trace_file}, found {net_events}"
# Verify trace file exists and has content
trace_file_size = os.path.getsize(trace_file)
assert trace_file_size > 0, \
f"Trace file {trace_file} is empty"
@pytest.mark.ext_profiler
@pytest.mark.allgather
def test_multinode_detailed_profiling(paths):
"""Test profiler with multi-node AllGather operations using full event mask (255)"""
# Get available nodes using the shared function
nodelist = paths.get_available_nodes()
# Skip test if no nodes available (SLURM not available) or less than 2 nodes
if not nodelist:
pytest.skip("Multinode test requires SLURM allocation")
if len(nodelist) < 2:
pytest.skip(f"Multinode test requires at least 2 nodes, found {len(nodelist)}: {nodelist}")
# Check for common network interface across all nodes
common_interface = paths.find_common_interface(nodelist)
if common_interface is None:
pytest.skip(f"Multinode test requires all nodes to have the same network interface (eth0 or eth1).")
# Build host specification string (8 processes per node)
host_spec = ",".join([f"{node}:8" for node in nodelist])
total_processes = len(nodelist) * 8
print(f"Using host specification: {host_spec}")
dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "allgather_profiler_dumps")
os.makedirs(dump_dir, exist_ok=True)
dump_file_base = os.path.join(dump_dir, "multinode_detailed_profiling")
# Remove any existing trace files
trace_pattern = f"{dump_file_base}*.json"
for f in glob.glob(trace_pattern):
os.remove(f)
env = os.environ.copy()
env.update({
"PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}",
"LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}",
"HSA_NO_SCRATCH_RECLAIM": "1",
"NCCL_IGNORE_CPU_AFFINITY": "1",
"NCCL_PROFILER_PLUGIN": paths.PROFILER_SO,
"NCCL_PROFILE_EVENT_MASK": "255", # All events: Group (1) + Coll (2) + P2P (4) + ProxyOp (8) + ProxyStep (16) + ProxyCtrl (32) + KernelCh (64) + NetPlugin (128) = 255
"NCCL_PROFILE_DUMP_FILE": dump_file_base,
"NCCL_DEBUG": "INFO",
"NCCL_SOCKET_IFNAME": common_interface,
"NCCL_DMABUF_ENABLE": "1",
})
args = [
f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", f"{total_processes}",
"--host", host_spec,
"--mca", "pml", "ucx",
"--mca", "btl", "^vader,openib",
f"{paths.RCCL_TESTS_DIR}/build/all_gather_perf",
"-b", "8",
"-e", "128M",
"-f", "2",
"-g", "1",
]
log_dir = os.path.join(paths.LOGDIR, "allgather_ext_profiler_test_logs")
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, "multinode_detailed_profiling.log")
with open(log_file, "w") as logfile:
result = subprocess.run(
args,
env=env,
stdout=logfile,
stderr=subprocess.STDOUT,
universal_newlines=True
)
assert result.returncode == 0, f"Multi-node AllGather profiling test failed, see {log_file}"
# Verify plugin initialized
assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \
f"Plugin should have initialized. Check {log_file}"
# Verify trace files were created (one per rank)
trace_files = glob.glob(trace_pattern)
assert len(trace_files) == total_processes, \
f"Should have {total_processes} trace files (one per rank), found {len(trace_files)}: {trace_files}"
# Validate each trace file
for trace_file in trace_files:
is_valid, message = paths.validate_json_trace(trace_file)
assert is_valid, f"Trace file {trace_file} validation failed: {message}"
# With NCCL_PROFILE_EVENT_MASK=255, we should capture all event types
# Check for Group events (one per AllGather call)
group_events = paths.count_events_in_trace(trace_file, category="GROUP")
assert group_events > 0, f"Should have Group events in {trace_file}, found {group_events}"
# Check for P2P events
p2p_events = paths.count_events_in_trace(trace_file, category="P2P")
assert p2p_events > 0, \
f"Should have P2P events (AllGather uses Send/Recv) in {trace_file}, found {p2p_events}"
# For multi-node tests, verify ProxyOp events exist
proxy_events = paths.count_events_in_trace(trace_file, category="PROXY")
assert proxy_events > 0, \
f"Should have Proxy events in {trace_file}, found {proxy_events}"
# Check for Send and Recv operations
schedule_send_events = paths.count_events_in_trace(trace_file, event_name="ScheduleSend")
schedule_recv_events = paths.count_events_in_trace(trace_file, event_name="ScheduleRecv")
assert schedule_send_events > 0 or schedule_recv_events > 0, \
f"Should have ScheduleSend or ScheduleRecv events in {trace_file}, found Send={schedule_send_events}, Recv={schedule_recv_events}"
# Verify NET (ProxyStep) events exist
net_events = paths.count_events_in_trace(trace_file, category="NET")
assert net_events > 0, \
f"Should have NET events in {trace_file}, found {net_events}"
# Verify GPU kernel channel events exist
kernel_events = paths.count_events_in_trace(trace_file, category="GPU")
assert kernel_events > 0, \
f"Should have GPU (KernelCh) events in {trace_file}, found {kernel_events}"
# Verify ProxyCtrl events exist
append_events = paths.count_events_in_trace(trace_file, event_name="Append")
sleep_events = paths.count_events_in_trace(trace_file, event_name="Sleep")
assert append_events > 0 or sleep_events > 0, \
f"Should have ProxyCtrl events in {trace_file}, found Append={append_events}, Sleep={sleep_events}"
# Verify trace file exists and has content
trace_file_size = os.path.getsize(trace_file)
assert trace_file_size > 0, \
f"Trace file {trace_file} is empty"
+416
Просмотреть файл
@@ -0,0 +1,416 @@
# *************************************************************************
# * Copyright (c) 2025 Advanced Micro Devices, Inc. All rights reserved.
# *
# * See LICENSE.txt for license information
# ************************************************************************
import os
import subprocess
import pytest
import glob
@pytest.mark.ext_profiler
@pytest.mark.allreduce
def test_profiler_initialization(paths):
"""Test profiler functionality with AllReduce operations."""
dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "allreduce_profiler_dumps")
os.makedirs(dump_dir, exist_ok=True)
dump_file_base = os.path.join(dump_dir, "profiler_initialization")
# Remove any existing trace files
trace_pattern = f"{dump_file_base}*.json"
for f in glob.glob(trace_pattern):
os.remove(f)
env = os.environ.copy()
env.update({
"PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}",
"LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}",
"HSA_NO_SCRATCH_RECLAIM": "1",
"NCCL_PROFILER_PLUGIN": paths.PROFILER_SO,
"NCCL_PROFILE_EVENT_MASK": "3", # Group (1) + Coll (2) = 3
"NCCL_PROFILE_DUMP_FILE": dump_file_base,
"NCCL_DEBUG": "INFO",
})
args = [
f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", "4",
"--mca", "pml", "ucx",
"--mca", "btl", "^vader,openib",
f"{paths.RCCL_TESTS_DIR}/build/all_reduce_perf",
"-b", "1",
"-e", "8M",
"-f", "2",
"-g", "1",
]
log_dir = os.path.join(paths.LOGDIR, "allreduce_ext_profiler_test_logs")
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, "profiler_initialization.log")
with open(log_file, "w") as logfile:
result = subprocess.run(
args,
env=env,
stdout=logfile,
stderr=subprocess.STDOUT,
universal_newlines=True
)
assert result.returncode == 0, f"AllReduce profiling test failed, see {log_file}"
# Verify plugin initialized
assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \
f"Plugin should have initialized. Check {log_file}"
# Verify trace files were created (one per rank)
trace_files = glob.glob(trace_pattern)
assert len(trace_files) == 4, \
f"Should have 4 trace files (one per rank), found {len(trace_files)}: {trace_files}"
# Validate each trace file
for trace_file in trace_files:
is_valid, message = paths.validate_json_trace(trace_file)
assert is_valid, f"Trace file {trace_file} validation failed: {message}"
# Check for Group events
group_events = paths.count_events_in_trace(trace_file, category="GROUP")
assert group_events > 0, f"Should have Group events in {trace_file}"
# Check for AllReduce events
allreduce_events = paths.count_events_in_trace(trace_file, event_name="AllReduce")
assert allreduce_events > 0, f"Should have AllReduce events in {trace_file}"
@pytest.mark.ext_profiler
@pytest.mark.allreduce
def test_invalid_mask_value(paths):
"""Test profiler behavior with invalid event mask (0 = no events)"""
dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "allreduce_profiler_dumps")
os.makedirs(dump_dir, exist_ok=True)
dump_file_base = os.path.join(dump_dir, "invalid_mask_value_profiling")
# Remove any existing trace files
trace_pattern = f"{dump_file_base}*.json"
for f in glob.glob(trace_pattern):
os.remove(f)
env = os.environ.copy()
env.update({
"PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}",
"LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}",
"HSA_NO_SCRATCH_RECLAIM": "1",
"NCCL_PROFILER_PLUGIN": paths.PROFILER_SO,
"NCCL_PROFILE_EVENT_MASK": "0", # Invalid: no events enabled
"NCCL_PROFILE_DUMP_FILE": dump_file_base,
"NCCL_DEBUG": "INFO",
})
args = [
f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", "4",
"--mca", "pml", "ucx",
"--mca", "btl", "^vader,openib",
f"{paths.RCCL_TESTS_DIR}/build/all_reduce_perf",
"-b", "1",
"-e", "8M",
"-f", "2",
"-g", "1",
]
log_dir = os.path.join(paths.LOGDIR, "allreduce_ext_profiler_test_logs")
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, "invalid_mask_value_profiling.log")
with open(log_file, "w") as logfile:
result = subprocess.run(
args,
env=env,
stdout=logfile,
stderr=subprocess.STDOUT,
universal_newlines=True
)
assert result.returncode == 0, f"AllReduce test should still succeed even with invalid mask, see {log_file}"
# Verify plugin initialized (it should still initialize)
assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \
f"Plugin should have initialized even with mask=0. Check {log_file}"
# Verify trace files were created (one per rank)
trace_files = glob.glob(trace_pattern)
assert len(trace_files) == 4, \
f"Should have 4 trace files (one per rank), found {len(trace_files)}: {trace_files}"
# Validate each trace file - with mask=0, trace files should be nearly empty
# They should contain valid JSON but no actual profiling events
for trace_file in trace_files:
is_valid, message = paths.validate_json_trace(trace_file)
assert is_valid, f"Trace file {trace_file} should still be valid JSON: {message}"
# With mask=0, there should be no Group or Collective events
group_events = paths.count_events_in_trace(trace_file, category="GROUP")
assert group_events == 0, f"Should have no Group events with mask=0 in {trace_file}, found {group_events}"
allreduce_events = paths.count_events_in_trace(trace_file, event_name="AllReduce")
assert allreduce_events == 0, f"Should have no AllReduce events with mask=0 in {trace_file}, found {allreduce_events}"
@pytest.mark.ext_profiler
@pytest.mark.allreduce
def test_single_node_detailed_profiling(paths):
"""Test profiler with single-node AllReduce using full event mask (255) across wide message range"""
dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "allreduce_profiler_dumps")
os.makedirs(dump_dir, exist_ok=True)
dump_file_base = os.path.join(dump_dir, "single_node_detailed_profiling")
# Remove any existing trace files
trace_pattern = f"{dump_file_base}*.json"
for f in glob.glob(trace_pattern):
os.remove(f)
env = os.environ.copy()
env.update({
"PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}",
"LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}",
"HSA_NO_SCRATCH_RECLAIM": "1",
"NCCL_PROFILER_PLUGIN": paths.PROFILER_SO,
"NCCL_PROFILE_EVENT_MASK": "255", # All events: Group (1) + Coll (2) + P2P (4) + ProxyOp (8) + ProxyStep (16) + ProxyCtrl (32) + KernelCh (64) + NetPlugin (128) = 255
"NCCL_PROFILE_DUMP_FILE": dump_file_base,
"NCCL_DEBUG": "INFO",
})
args = [
f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", "8",
"--mca", "pml", "ucx",
"--mca", "btl", "^vader,openib",
f"{paths.RCCL_TESTS_DIR}/build/all_reduce_perf",
"-b", "8",
"-e", "128M",
"-f", "2",
"-g", "1",
]
log_dir = os.path.join(paths.LOGDIR, "allreduce_ext_profiler_test_logs")
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, "single_node_detailed_profiling.log")
with open(log_file, "w") as logfile:
result = subprocess.run(
args,
env=env,
stdout=logfile,
stderr=subprocess.STDOUT,
universal_newlines=True
)
assert result.returncode == 0, f"Single-node detailed AllReduce profiling test failed, see {log_file}"
# Verify plugin initialized
assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \
f"Plugin should have initialized. Check {log_file}"
# Verify trace files were created (one per rank)
trace_files = glob.glob(trace_pattern)
assert len(trace_files) == 8, \
f"Should have 8 trace files (one per rank), found {len(trace_files)}: {trace_files}"
# Validate each trace file
for trace_file in trace_files:
is_valid, message = paths.validate_json_trace(trace_file)
assert is_valid, f"Trace file {trace_file} validation failed: {message}"
# With NCCL_PROFILE_EVENT_MASK=255, we capture all event types
# However, single-node behavior differs significantly from multi-node
# Check for Group events
group_events = paths.count_events_in_trace(trace_file, category="GROUP")
assert group_events > 0, \
f"Should have Group events in {trace_file}, found {group_events}"
# Check for AllReduce events
allreduce_events = paths.count_events_in_trace(trace_file, event_name="AllReduce")
assert allreduce_events > 0, \
f"Should have AllReduce events in {trace_file}, found {allreduce_events}"
# With KernelCh enabled (bit 6), we should see GPU kernel channel events
kernel_events = paths.count_events_in_trace(trace_file, category="GPU")
assert kernel_events > 0, \
f"Should have GPU (KernelCh) events in {trace_file}, found {kernel_events}"
# With ProxyCtrl enabled (bit 5), we should see Append/Sleep events
proxy_ctrl_events = paths.count_events_in_trace(trace_file, category="PROXY")
assert proxy_ctrl_events > 0, \
f"Should have PROXY (ProxyCtrl) events in {trace_file}, found {proxy_ctrl_events}"
append_events = paths.count_events_in_trace(trace_file, event_name="Append")
sleep_events = paths.count_events_in_trace(trace_file, event_name="Sleep")
assert append_events > 0 or sleep_events > 0, \
f"Should have ProxyCtrl events (Append or Sleep) in {trace_file}, found Append={append_events}, Sleep={sleep_events}"
# We should NOT see ProxyOp network events (ScheduleSend/Recv, ProgressSend/Recv)
schedule_send_events = paths.count_events_in_trace(trace_file, event_name="ScheduleSend")
schedule_recv_events = paths.count_events_in_trace(trace_file, event_name="ScheduleRecv")
assert schedule_send_events == 0, \
f"Single-node should have NO ScheduleSend events (no network) in {trace_file}, found {schedule_send_events}"
assert schedule_recv_events == 0, \
f"Single-node should have NO ScheduleRecv events (no network) in {trace_file}, found {schedule_recv_events}"
# Should also NOT see ProxyStep network events (RecvWait, SendWait, etc.)
net_events = paths.count_events_in_trace(trace_file, category="NET")
assert net_events == 0, \
f"Single-node should have NO NET (ProxyStep) events in {trace_file}, found {net_events}"
# Verify trace file exists and has content
trace_file_size = os.path.getsize(trace_file)
assert trace_file_size > 0, \
f"Trace file {trace_file} is empty"
@pytest.mark.ext_profiler
@pytest.mark.allreduce
@pytest.mark.multinode
def test_multinode_detailed_profiling(paths):
"""Test profiler with multi-node AllReduce operations using full event mask (255)"""
# Get available nodes using the shared function
nodelist = paths.get_available_nodes()
# Skip test if no nodes available (SLURM not available) or less than 2 nodes
if len(nodelist) == 0:
pytest.skip("No nodes available")
elif len(nodelist) < 2:
pytest.skip(f"Multinode test requires at least 2 nodes, but only {len(nodelist)} available: {nodelist}")
# Check for common network interface across all nodes
common_interface = paths.find_common_interface(nodelist)
if common_interface is None:
pytest.skip(f"Multinode test requires all nodes to have the same network interface (eth0 or eth1).")
# Build host specification string (8 processes per node)
host_spec = ",".join([f"{node}:8" for node in nodelist])
total_processes = len(nodelist) * 8
print(f"Using host specification: {host_spec}")
dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "allreduce_profiler_dumps")
os.makedirs(dump_dir, exist_ok=True)
dump_file_base = os.path.join(dump_dir, "multinode_detailed_profiling")
# Remove any existing trace files
trace_pattern = f"{dump_file_base}*.json"
for f in glob.glob(trace_pattern):
os.remove(f)
env = os.environ.copy()
env.update({
"PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}",
"LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}",
"HSA_NO_SCRATCH_RECLAIM": "1",
"NCCL_IGNORE_CPU_AFFINITY": "1",
"NCCL_PROFILER_PLUGIN": paths.PROFILER_SO,
"NCCL_PROFILE_EVENT_MASK": "255", # All events: Group (1) + Coll (2) + P2P (4) + ProxyOp (8) + ProxyStep (16) + ProxyCtrl (32) + KernelCh (64) + NetPlugin (128) = 255
"NCCL_PROFILE_DUMP_FILE": dump_file_base,
"NCCL_DEBUG": "INFO",
"NCCL_SOCKET_IFNAME": common_interface,
"NCCL_DMABUF_ENABLE": "1",
})
args = [
f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", f"{total_processes}",
"--host", host_spec,
"--mca", "pml", "ucx",
"--mca", "btl", "^vader,openib",
f"{paths.RCCL_TESTS_DIR}/build/all_reduce_perf",
"-b", "8",
"-e", "128M",
"-f", "2",
"-g", "1",
]
log_dir = os.path.join(paths.LOGDIR, "allreduce_ext_profiler_test_logs")
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, "multinode_detailed_profiling.log")
with open(log_file, "w") as logfile:
result = subprocess.run(
args,
env=env,
stdout=logfile,
stderr=subprocess.STDOUT,
universal_newlines=True
)
assert result.returncode == 0, f"Multi-node AllReduce profiling test failed, see {log_file}"
# Verify plugin initialized
assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \
f"Plugin should have initialized. Check {log_file}"
# Verify trace files were created (one per rank)
trace_files = glob.glob(trace_pattern)
assert len(trace_files) == total_processes, \
f"Should have {total_processes} trace files (one per rank), found {len(trace_files)}: {trace_files}"
# Validate each trace file
for trace_file in trace_files:
is_valid, message = paths.validate_json_trace(trace_file)
assert is_valid, f"Trace file {trace_file} validation failed: {message}"
# With NCCL_PROFILE_EVENT_MASK=255, we should capture all event types
# Check for Group events (one per AllReduce call)
group_events = paths.count_events_in_trace(trace_file, category="GROUP")
assert group_events > 0, f"Should have Group events in {trace_file}, found {group_events}"
# Check for AllReduce events
allreduce_events = paths.count_events_in_trace(trace_file, event_name="AllReduce")
assert allreduce_events > 0, \
f"Should have AllReduce events in {trace_file}, found {allreduce_events}"
# For multi-node tests, verify ProxyOp events exist
proxy_events = paths.count_events_in_trace(trace_file, category="PROXY")
assert proxy_events > 0, \
f"Should have Proxy events in {trace_file}, found {proxy_events}"
# With ProxyOp enabled (bit 3), check for Send and Recv operations
schedule_send_events = paths.count_events_in_trace(trace_file, event_name="ScheduleSend")
schedule_recv_events = paths.count_events_in_trace(trace_file, event_name="ScheduleRecv")
assert schedule_send_events > 0, \
f"Should have ScheduleSend events in {trace_file}, found {schedule_send_events}"
assert schedule_recv_events > 0, \
f"Should have ScheduleRecv events in {trace_file}, found {schedule_recv_events}"
# With ProxyStep enabled (bit 4), verify network step events exist
net_events = paths.count_events_in_trace(trace_file, category="NET")
assert net_events > 0, \
f"Should have NET events in {trace_file}, found {net_events}"
# Check for specific ProxyStep events
recv_wait_events = paths.count_events_in_trace(trace_file, event_name="RecvWait")
send_wait_events = paths.count_events_in_trace(trace_file, event_name="SendWait")
assert recv_wait_events > 0, \
f"Should have RecvWait events in {trace_file}, found {recv_wait_events}"
assert send_wait_events > 0, \
f"Should have SendWait events in {trace_file}, found {send_wait_events}"
# With KernelCh enabled (bit 6), we should see GPU kernel channel events
kernel_events = paths.count_events_in_trace(trace_file, category="GPU")
assert kernel_events > 0, \
f"Should have GPU (KernelCh) events in {trace_file}, found {kernel_events}"
# With ProxyCtrl enabled (bit 5), we should see Append/Sleep events
append_events = paths.count_events_in_trace(trace_file, event_name="Append")
sleep_events = paths.count_events_in_trace(trace_file, event_name="Sleep")
assert append_events > 0 or sleep_events > 0, \
f"Should have ProxyCtrl events (Append or Sleep) in {trace_file}, found Append={append_events}, Sleep={sleep_events}"
# Verify trace file exists and has content
trace_file_size = os.path.getsize(trace_file)
assert trace_file_size > 0, \
f"Trace file {trace_file} is empty"
+420
Просмотреть файл
@@ -0,0 +1,420 @@
# *************************************************************************
# * Copyright (c) 2025 Advanced Micro Devices, Inc. All rights reserved.
# *
# * See LICENSE.txt for license information
# ************************************************************************
import os
import subprocess
import glob
import pytest
@pytest.mark.ext_profiler
@pytest.mark.alltoall
def test_profiler_initialization(paths):
"""Test profiler functionality with AllToAll operations."""
dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "alltoall_profiler_dumps")
os.makedirs(dump_dir, exist_ok=True)
dump_file_base = os.path.join(dump_dir, "profiler_initialization")
# Remove any existing trace files
trace_pattern = f"{dump_file_base}*.json"
for f in glob.glob(trace_pattern):
os.remove(f)
env = os.environ.copy()
env.update({
"PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}",
"LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}",
"HSA_NO_SCRATCH_RECLAIM": "1",
"NCCL_PROFILER_PLUGIN": paths.PROFILER_SO,
"NCCL_PROFILE_EVENT_MASK": "7", # Group (1) + Coll (2) + P2P (4) = 7 (AllToAll is P2P)
"NCCL_PROFILE_DUMP_FILE": dump_file_base,
"NCCL_DEBUG": "INFO",
})
args = [
f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", "4",
"--mca", "pml", "ucx",
"--mca", "btl", "^vader,openib",
f"{paths.RCCL_TESTS_DIR}/build/alltoall_perf",
"-b", "1",
"-e", "8M",
"-f", "2",
"-g", "1",
]
log_dir = os.path.join(paths.LOGDIR, "alltoall_ext_profiler_test_logs")
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, "profiler_initialization.log")
with open(log_file, "w") as logfile:
result = subprocess.run(
args,
env=env,
stdout=logfile,
stderr=subprocess.STDOUT,
universal_newlines=True
)
assert result.returncode == 0, f"AllToAll test failed, see {log_file}"
# Verify plugin initialized
assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \
f"Plugin should have initialized. Check {log_file}"
# Verify trace files were created (one per rank)
trace_files = glob.glob(trace_pattern)
assert len(trace_files) == 4, \
f"Should have 4 trace files (one per rank), found {len(trace_files)}: {trace_files}"
# Validate each trace file
for trace_file in trace_files:
is_valid, message = paths.validate_json_trace(trace_file)
assert is_valid, f"Trace file {trace_file} validation failed: {message}"
# Check for Group events
group_events = paths.count_events_in_trace(trace_file, category="GROUP")
assert group_events > 0, f"Should have Group events in {trace_file}"
# Check for P2P events (AllToAll is implemented as Send/Recv P2P operations)
p2p_events = paths.count_events_in_trace(trace_file, category="P2P")
assert p2p_events > 0, f"Should have P2P events (AllToAll uses Send/Recv) in {trace_file}"
# Verify Send and Recv events exist
send_events = paths.count_events_in_trace(trace_file, event_name="Send")
recv_events = paths.count_events_in_trace(trace_file, event_name="Recv")
assert send_events > 0, f"Should have Send events in {trace_file}"
assert recv_events > 0, f"Should have Recv events in {trace_file}"
@pytest.mark.ext_profiler
@pytest.mark.alltoall
def test_invalid_mask_value(paths):
"""Test profiler behavior with invalid event mask (0 = no events)"""
dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "alltoall_profiler_dumps")
os.makedirs(dump_dir, exist_ok=True)
dump_file_base = os.path.join(dump_dir, "invalid_mask_value_profiling")
# Remove any existing trace files
trace_pattern = f"{dump_file_base}*.json"
for f in glob.glob(trace_pattern):
os.remove(f)
env = os.environ.copy()
env.update({
"PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}",
"LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}",
"HSA_NO_SCRATCH_RECLAIM": "1",
"NCCL_PROFILER_PLUGIN": paths.PROFILER_SO,
"NCCL_PROFILE_EVENT_MASK": "0", # Invalid: no events enabled
"NCCL_PROFILE_DUMP_FILE": dump_file_base,
"NCCL_DEBUG": "INFO",
})
args = [
f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", "4",
"--mca", "pml", "ucx",
"--mca", "btl", "^vader,openib",
f"{paths.RCCL_TESTS_DIR}/build/alltoall_perf",
"-b", "1",
"-e", "8M",
"-f", "2",
"-g", "1",
]
log_dir = os.path.join(paths.LOGDIR, "alltoall_ext_profiler_test_logs")
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, "invalid_mask_value_profiling.log")
with open(log_file, "w") as logfile:
result = subprocess.run(
args,
env=env,
stdout=logfile,
stderr=subprocess.STDOUT,
universal_newlines=True
)
assert result.returncode == 0, f"AllToAll test should still succeed even with invalid mask, see {log_file}"
# Verify plugin initialized
assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \
f"Plugin should have initialized even with mask=0. Check {log_file}"
# Verify trace files were created (one per rank)
trace_files = glob.glob(trace_pattern)
assert len(trace_files) == 4, \
f"Should have 4 trace files (one per rank), found {len(trace_files)}: {trace_files}"
# Validate each trace file - with mask=0, trace files should be nearly empty
# They should contain valid JSON but no actual profiling events
for trace_file in trace_files:
is_valid, message = paths.validate_json_trace(trace_file)
assert is_valid, f"Trace file {trace_file} should still be valid JSON: {message}"
# With mask=0, there should be no Group or Collective events
group_events = paths.count_events_in_trace(trace_file, category="GROUP")
assert group_events == 0, f"Should have no Group events with mask=0 in {trace_file}, found {group_events}"
alltoall_events = paths.count_events_in_trace(trace_file, event_name="AllToAll")
assert alltoall_events == 0, f"Should have no AllToAll events with mask=0 in {trace_file}, found {alltoall_events}"
@pytest.mark.ext_profiler
@pytest.mark.alltoall
def test_single_node_detailed_profiling(paths):
"""Test profiler with single-node AllToAll using full event mask (255) across wide message range"""
dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "alltoall_profiler_dumps")
os.makedirs(dump_dir, exist_ok=True)
dump_file_base = os.path.join(dump_dir, "single_node_detailed_profiling")
# Remove any existing trace files
trace_pattern = f"{dump_file_base}*.json"
for f in glob.glob(trace_pattern):
os.remove(f)
env = os.environ.copy()
env.update({
"PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}",
"LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}",
"HSA_NO_SCRATCH_RECLAIM": "1",
"NCCL_PROFILER_PLUGIN": paths.PROFILER_SO,
"NCCL_PROFILE_EVENT_MASK": "255", # All events: Group (1) + Coll (2) + P2P (4) + ProxyOp (8) + ProxyStep (16) + ProxyCtrl (32) + KernelCh (64) + NetPlugin (128) = 255
"NCCL_PROFILE_DUMP_FILE": dump_file_base,
"NCCL_DEBUG": "INFO",
})
args = [
f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", "8",
"--bind-to", "none",
"--mca", "pml", "ucx",
"--mca", "btl", "^vader,openib",
f"{paths.RCCL_TESTS_DIR}/build/alltoall_perf",
"-b", "8",
"-e", "128M",
"-f", "2",
"-g", "1",
]
log_dir = os.path.join(paths.LOGDIR, "alltoall_ext_profiler_test_logs")
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, "single_node_detailed_profiling.log")
with open(log_file, "w") as logfile:
result = subprocess.run(
args,
env=env,
stdout=logfile,
stderr=subprocess.STDOUT,
universal_newlines=True
)
assert result.returncode == 0, f"Single-node detailed AllToAll profiling test failed, see {log_file}"
# Verify plugin initialized
assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \
f"Plugin should have initialized. Check {log_file}"
# Verify trace files were created (one per rank)
trace_files = glob.glob(trace_pattern)
assert len(trace_files) == 8, \
f"Should have 8 trace files (one per rank), found {len(trace_files)}: {trace_files}"
# Validate each trace file
for trace_file in trace_files:
is_valid, message = paths.validate_json_trace(trace_file)
assert is_valid, f"Trace file {trace_file} validation failed: {message}"
# With NCCL_PROFILE_EVENT_MASK=255, we capture all event types
# However, single-node behavior differs significantly from multi-node
# Check for Group events
group_events = paths.count_events_in_trace(trace_file, category="GROUP")
assert group_events > 0, \
f"Should have Group events in {trace_file}, found {group_events}"
# Check for P2P events (AllToAll is implemented as Send/Recv operations)
p2p_events = paths.count_events_in_trace(trace_file, category="P2P")
assert p2p_events > 0, \
f"Should have P2P events (AllToAll uses Send/Recv) in {trace_file}, found {p2p_events}"
# Verify Send and Recv events
send_events = paths.count_events_in_trace(trace_file, event_name="Send")
recv_events = paths.count_events_in_trace(trace_file, event_name="Recv")
assert send_events > 0, f"Should have Send events in {trace_file}, found {send_events}"
assert recv_events > 0, f"Should have Recv events in {trace_file}, found {recv_events}"
# Verify GPU kernel channel events exist
kernel_events = paths.count_events_in_trace(trace_file, category="GPU")
assert kernel_events > 0, \
f"Should have GPU (KernelCh) events in {trace_file}, found {kernel_events}"
# Verify ProxyCtrl events exist
proxy_ctrl_events = paths.count_events_in_trace(trace_file, category="PROXY")
assert proxy_ctrl_events > 0, \
f"Should have PROXY (ProxyCtrl) events in {trace_file}, found {proxy_ctrl_events}"
append_events = paths.count_events_in_trace(trace_file, event_name="Append")
sleep_events = paths.count_events_in_trace(trace_file, event_name="Sleep")
assert append_events > 0 or sleep_events > 0, \
f"Should have ProxyCtrl events in {trace_file}, found Append={append_events}, Sleep={sleep_events}"
# We should NOT see ProxyOp network events (ScheduleSend/Recv, ProgressSend/Recv)
schedule_send_events = paths.count_events_in_trace(trace_file, event_name="ScheduleSend")
schedule_recv_events = paths.count_events_in_trace(trace_file, event_name="ScheduleRecv")
assert schedule_send_events == 0, \
f"Single-node should have NO ScheduleSend events (no network) in {trace_file}, found {schedule_send_events}"
assert schedule_recv_events == 0, \
f"Single-node should have NO ScheduleRecv events (no network) in {trace_file}, found {schedule_recv_events}"
# Should also NOT see ProxyStep network events (RecvWait, SendWait, etc.)
net_events = paths.count_events_in_trace(trace_file, category="NET")
assert net_events == 0, \
f"Single-node should have NO NET (ProxyStep) events in {trace_file}, found {net_events}"
# Verify trace file exists and has content
trace_file_size = os.path.getsize(trace_file)
assert trace_file_size > 0, \
f"Trace file {trace_file} is empty"
@pytest.mark.ext_profiler
@pytest.mark.alltoall
def test_multinode_detailed_profiling(paths):
"""Test profiler with multi-node AllToAll operations using full event mask (255)"""
# Get available nodes using the shared function
nodelist = paths.get_available_nodes()
# Skip test if no nodes available (SLURM not available) or less than 2 nodes
if not nodelist:
pytest.skip("Multinode test requires SLURM allocation")
if len(nodelist) < 2:
pytest.skip(f"Multinode test requires at least 2 nodes, found {len(nodelist)}: {nodelist}")
# Check for common network interface across all nodes
common_interface = paths.find_common_interface(nodelist)
if common_interface is None:
pytest.skip(f"Multinode test requires all nodes to have the same network interface (eth0 or eth1).")
# Build host specification string (8 processes per node)
host_spec = ",".join([f"{node}:8" for node in nodelist])
total_processes = len(nodelist) * 8
print(f"Using host specification: {host_spec}")
dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "alltoall_profiler_dumps")
os.makedirs(dump_dir, exist_ok=True)
dump_file_base = os.path.join(dump_dir, "multinode_detailed_profiling")
# Remove any existing trace files
trace_pattern = f"{dump_file_base}*.json"
for f in glob.glob(trace_pattern):
os.remove(f)
env = os.environ.copy()
env.update({
"PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}",
"LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}",
"HSA_NO_SCRATCH_RECLAIM": "1",
"NCCL_IGNORE_CPU_AFFINITY": "1",
"NCCL_PROFILER_PLUGIN": paths.PROFILER_SO,
"NCCL_PROFILE_EVENT_MASK": "255", # All events: Group (1) + Coll (2) + P2P (4) + ProxyOp (8) + ProxyStep (16) + ProxyCtrl (32) + KernelCh (64) + NetPlugin (128) = 255
"NCCL_PROFILE_DUMP_FILE": dump_file_base,
"NCCL_DEBUG": "INFO",
"NCCL_SOCKET_IFNAME": common_interface,
"NCCL_DMABUF_ENABLE": "1",
})
args = [
f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", f"{total_processes}",
"--host", host_spec,
"--mca", "pml", "ucx",
"--mca", "btl", "^vader,openib",
f"{paths.RCCL_TESTS_DIR}/build/alltoall_perf",
"-b", "8",
"-e", "128M",
"-f", "2",
"-g", "1",
]
log_dir = os.path.join(paths.LOGDIR, "alltoall_ext_profiler_test_logs")
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, "multinode_detailed_profiling.log")
with open(log_file, "w") as logfile:
result = subprocess.run(
args,
env=env,
stdout=logfile,
stderr=subprocess.STDOUT,
universal_newlines=True
)
assert result.returncode == 0, f"Multi-node AllToAll profiling test failed, see {log_file}"
# Verify plugin initialized
assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \
f"Plugin should have initialized. Check {log_file}"
# Verify trace files were created (one per rank)
trace_files = glob.glob(trace_pattern)
assert len(trace_files) == total_processes, \
f"Should have {total_processes} trace files (one per rank), found {len(trace_files)}: {trace_files}"
# Validate each trace file
for trace_file in trace_files:
is_valid, message = paths.validate_json_trace(trace_file)
assert is_valid, f"Trace file {trace_file} validation failed: {message}"
# With NCCL_PROFILE_EVENT_MASK=255, we should capture all event types
# Check for Group events (one per AllToAll call)
group_events = paths.count_events_in_trace(trace_file, category="GROUP")
assert group_events > 0, f"Should have Group events in {trace_file}, found {group_events}"
# Check for P2P events (AllToAll is implemented as Send/Recv operations)
p2p_events = paths.count_events_in_trace(trace_file, category="P2P")
assert p2p_events > 0, \
f"Should have P2P events (AllToAll uses Send/Recv) in {trace_file}, found {p2p_events}"
# For multi-node tests, verify ProxyOp events exist
proxy_events = paths.count_events_in_trace(trace_file, category="PROXY")
assert proxy_events > 0, \
f"Should have Proxy events in {trace_file}, found {proxy_events}"
# Check for Send and Recv operations
schedule_send_events = paths.count_events_in_trace(trace_file, event_name="ScheduleSend")
schedule_recv_events = paths.count_events_in_trace(trace_file, event_name="ScheduleRecv")
assert schedule_send_events > 0 or schedule_recv_events > 0, \
f"Should have ScheduleSend or ScheduleRecv events in {trace_file}, found Send={schedule_send_events}, Recv={schedule_recv_events}"
# Verify NET (ProxyStep) events exist
net_events = paths.count_events_in_trace(trace_file, category="NET")
assert net_events > 0, \
f"Should have NET events in {trace_file}, found {net_events}"
# Verify GPU kernel channel events exist
kernel_events = paths.count_events_in_trace(trace_file, category="GPU")
assert kernel_events > 0, \
f"Should have GPU (KernelCh) events in {trace_file}, found {kernel_events}"
# Verify ProxyCtrl events exist
append_events = paths.count_events_in_trace(trace_file, event_name="Append")
sleep_events = paths.count_events_in_trace(trace_file, event_name="Sleep")
assert append_events > 0 or sleep_events > 0, \
f"Should have ProxyCtrl events in {trace_file}, found Append={append_events}, Sleep={sleep_events}"
# Verify trace file exists and has content
trace_file_size = os.path.getsize(trace_file)
assert trace_file_size > 0, \
f"Trace file {trace_file} is empty"
+408
Просмотреть файл
@@ -0,0 +1,408 @@
# *************************************************************************
# * Copyright (c) 2025 Advanced Micro Devices, Inc. All rights reserved.
# *
# * See LICENSE.txt for license information
# ************************************************************************
import os
import subprocess
import glob
import pytest
@pytest.mark.ext_profiler
@pytest.mark.broadcast
def test_profiler_initialization(paths):
"""Test profiler functionality with Broadcast operations."""
dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "broadcast_profiler_dumps")
os.makedirs(dump_dir, exist_ok=True)
dump_file_base = os.path.join(dump_dir, "profiler_initialization")
# Remove any existing trace files
trace_pattern = f"{dump_file_base}*.json"
for f in glob.glob(trace_pattern):
os.remove(f)
env = os.environ.copy()
env.update({
"PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}",
"LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}",
"HSA_NO_SCRATCH_RECLAIM": "1",
"NCCL_PROFILER_PLUGIN": paths.PROFILER_SO,
"NCCL_PROFILE_EVENT_MASK": "3", # Group (1) + Coll (2) = 3
"NCCL_PROFILE_DUMP_FILE": dump_file_base,
"NCCL_DEBUG": "INFO",
})
args = [
f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", "4",
"--mca", "pml", "ucx",
"--mca", "btl", "^vader,openib",
f"{paths.RCCL_TESTS_DIR}/build/broadcast_perf",
"-b", "1",
"-e", "8M",
"-f", "2",
"-g", "1",
]
log_dir = os.path.join(paths.LOGDIR, "broadcast_ext_profiler_test_logs")
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, "profiler_initialization.log")
with open(log_file, "w") as logfile:
result = subprocess.run(
args,
env=env,
stdout=logfile,
stderr=subprocess.STDOUT,
universal_newlines=True
)
assert result.returncode == 0, f"Broadcast test failed, see {log_file}"
# Verify plugin initialized
assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \
f"Plugin should have initialized. Check {log_file}"
# Verify trace files were created (one per rank)
trace_files = glob.glob(trace_pattern)
assert len(trace_files) == 4, \
f"Should have 4 trace files (one per rank), found {len(trace_files)}: {trace_files}"
# Validate each trace file
for trace_file in trace_files:
is_valid, message = paths.validate_json_trace(trace_file)
assert is_valid, f"Trace file {trace_file} validation failed: {message}"
# Check for Group events
group_events = paths.count_events_in_trace(trace_file, category="GROUP")
assert group_events > 0, f"Should have Group events in {trace_file}"
# Check for Broadcast events
broadcast_events = paths.count_events_in_trace(trace_file, event_name="Broadcast")
assert broadcast_events > 0, f"Should have Broadcast events in {trace_file}"
@pytest.mark.ext_profiler
@pytest.mark.broadcast
def test_invalid_mask_value(paths):
"""Test profiler behavior with invalid event mask (0 = no events)"""
dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "broadcast_profiler_dumps")
os.makedirs(dump_dir, exist_ok=True)
dump_file_base = os.path.join(dump_dir, "invalid_mask_value_profiling")
# Remove any existing trace files
trace_pattern = f"{dump_file_base}*.json"
for f in glob.glob(trace_pattern):
os.remove(f)
env = os.environ.copy()
env.update({
"PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}",
"LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}",
"HSA_NO_SCRATCH_RECLAIM": "1",
"NCCL_PROFILER_PLUGIN": paths.PROFILER_SO,
"NCCL_PROFILE_EVENT_MASK": "0", # Invalid: no events enabled
"NCCL_PROFILE_DUMP_FILE": dump_file_base,
"NCCL_DEBUG": "INFO",
})
args = [
f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", "4",
"--mca", "pml", "ucx",
"--mca", "btl", "^vader,openib",
f"{paths.RCCL_TESTS_DIR}/build/broadcast_perf",
"-b", "1",
"-e", "8M",
"-f", "2",
"-g", "1",
]
log_dir = os.path.join(paths.LOGDIR, "broadcast_ext_profiler_test_logs")
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, "invalid_mask_value_profiling.log")
with open(log_file, "w") as logfile:
result = subprocess.run(
args,
env=env,
stdout=logfile,
stderr=subprocess.STDOUT,
universal_newlines=True
)
assert result.returncode == 0, f"Broadcast test should still succeed even with invalid mask, see {log_file}"
# Verify plugin initialized
assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \
f"Plugin should have initialized even with mask=0. Check {log_file}"
# Verify trace files were created (one per rank)
trace_files = glob.glob(trace_pattern)
assert len(trace_files) == 4, \
f"Should have 4 trace files (one per rank), found {len(trace_files)}: {trace_files}"
# Validate each trace file - with mask=0, trace files should be nearly empty
# They should contain valid JSON but no actual profiling events
for trace_file in trace_files:
is_valid, message = paths.validate_json_trace(trace_file)
assert is_valid, f"Trace file {trace_file} should still be valid JSON: {message}"
# With mask=0, there should be no Group or Collective events
group_events = paths.count_events_in_trace(trace_file, category="GROUP")
assert group_events == 0, f"Should have no Group events with mask=0 in {trace_file}, found {group_events}"
broadcast_events = paths.count_events_in_trace(trace_file, event_name="Broadcast")
assert broadcast_events == 0, f"Should have no Broadcast events with mask=0 in {trace_file}, found {broadcast_events}"
@pytest.mark.ext_profiler
@pytest.mark.broadcast
def test_single_node_detailed_profiling(paths):
"""Test profiler with single-node Broadcast using full event mask (255) across wide message range"""
dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "broadcast_profiler_dumps")
os.makedirs(dump_dir, exist_ok=True)
dump_file_base = os.path.join(dump_dir, "single_node_detailed_profiling")
# Remove any existing trace files
trace_pattern = f"{dump_file_base}*.json"
for f in glob.glob(trace_pattern):
os.remove(f)
env = os.environ.copy()
env.update({
"PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}",
"LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}",
"HSA_NO_SCRATCH_RECLAIM": "1",
"NCCL_PROFILER_PLUGIN": paths.PROFILER_SO,
"NCCL_PROFILE_EVENT_MASK": "255", # All events: Group (1) + Coll (2) + P2P (4) + ProxyOp (8) + ProxyStep (16) + ProxyCtrl (32) + KernelCh (64) + NetPlugin (128) = 255
"NCCL_PROFILE_DUMP_FILE": dump_file_base,
"NCCL_DEBUG": "INFO",
})
args = [
f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", "8",
"--bind-to", "none",
"--mca", "pml", "ucx",
"--mca", "btl", "^vader,openib",
f"{paths.RCCL_TESTS_DIR}/build/broadcast_perf",
"-b", "8",
"-e", "128M",
"-f", "2",
"-g", "1",
]
log_dir = os.path.join(paths.LOGDIR, "broadcast_ext_profiler_test_logs")
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, "single_node_detailed_profiling.log")
with open(log_file, "w") as logfile:
result = subprocess.run(
args,
env=env,
stdout=logfile,
stderr=subprocess.STDOUT,
universal_newlines=True
)
assert result.returncode == 0, f"Single-node detailed Broadcast profiling test failed, see {log_file}"
# Verify plugin initialized
assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \
f"Plugin should have initialized. Check {log_file}"
# Verify trace files were created (one per rank)
trace_files = glob.glob(trace_pattern)
assert len(trace_files) == 8, \
f"Should have 8 trace files (one per rank), found {len(trace_files)}: {trace_files}"
# Validate each trace file
for trace_file in trace_files:
is_valid, message = paths.validate_json_trace(trace_file)
assert is_valid, f"Trace file {trace_file} validation failed: {message}"
# With NCCL_PROFILE_EVENT_MASK=255, we capture all event types
# However, single-node behavior differs significantly from multi-node
# Check for Group events
group_events = paths.count_events_in_trace(trace_file, category="GROUP")
assert group_events > 0, \
f"Should have Group events in {trace_file}, found {group_events}"
# Check for Broadcast events
broadcast_events = paths.count_events_in_trace(trace_file, event_name="Broadcast")
assert broadcast_events > 0, \
f"Should have Broadcast events in {trace_file}, found {broadcast_events}"
# With KernelCh enabled (bit 6), we should see GPU kernel channel events
kernel_events = paths.count_events_in_trace(trace_file, category="GPU")
assert kernel_events > 0, \
f"Should have GPU (KernelCh) events in {trace_file}, found {kernel_events}"
# With ProxyCtrl enabled (bit 5), we should see Append/Sleep events
proxy_ctrl_events = paths.count_events_in_trace(trace_file, category="PROXY")
assert proxy_ctrl_events > 0, \
f"Should have PROXY (ProxyCtrl) events in {trace_file}, found {proxy_ctrl_events}"
append_events = paths.count_events_in_trace(trace_file, event_name="Append")
sleep_events = paths.count_events_in_trace(trace_file, event_name="Sleep")
assert append_events > 0 or sleep_events > 0, \
f"Should have ProxyCtrl events (Append or Sleep) in {trace_file}, found Append={append_events}, Sleep={sleep_events}"
# We should NOT see ProxyOp network events (ScheduleSend/Recv, ProgressSend/Recv)
schedule_send_events = paths.count_events_in_trace(trace_file, event_name="ScheduleSend")
schedule_recv_events = paths.count_events_in_trace(trace_file, event_name="ScheduleRecv")
assert schedule_send_events == 0, \
f"Single-node should have NO ScheduleSend events (no network) in {trace_file}, found {schedule_send_events}"
assert schedule_recv_events == 0, \
f"Single-node should have NO ScheduleRecv events (no network) in {trace_file}, found {schedule_recv_events}"
# Should also NOT see ProxyStep network events (RecvWait, SendWait, etc.)
net_events = paths.count_events_in_trace(trace_file, category="NET")
assert net_events == 0, \
f"Single-node should have NO NET (ProxyStep) events in {trace_file}, found {net_events}"
# Verify trace file exists and has content
trace_file_size = os.path.getsize(trace_file)
assert trace_file_size > 0, \
f"Trace file {trace_file} is empty"
@pytest.mark.ext_profiler
@pytest.mark.broadcast
@pytest.mark.multinode
def test_multinode_detailed_profiling(paths):
"""Test profiler with multi-node Broadcast operations using full event mask (255)"""
# Get available nodes using the shared function
nodelist = paths.get_available_nodes()
# Skip test if no nodes available (SLURM not available) or less than 2 nodes
if not nodelist:
pytest.skip("Multinode test requires SLURM allocation")
if len(nodelist) < 2:
pytest.skip(f"Multinode test requires at least 2 nodes, found {len(nodelist)}: {nodelist}")
# Check for common network interface across all nodes
common_interface = paths.find_common_interface(nodelist)
if common_interface is None:
pytest.skip(f"Multinode test requires all nodes to have the same network interface (eth0 or eth1).")
# Build host specification string (8 processes per node)
host_spec = ",".join([f"{node}:8" for node in nodelist])
total_processes = len(nodelist) * 8
print(f"Using host specification: {host_spec}")
dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "broadcast_profiler_dumps")
os.makedirs(dump_dir, exist_ok=True)
dump_file_base = os.path.join(dump_dir, "multinode_detailed_profiling")
# Remove any existing trace files
trace_pattern = f"{dump_file_base}*.json"
for f in glob.glob(trace_pattern):
os.remove(f)
env = os.environ.copy()
env.update({
"PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}",
"LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}",
"HSA_NO_SCRATCH_RECLAIM": "1",
"NCCL_IGNORE_CPU_AFFINITY": "1",
"NCCL_PROFILER_PLUGIN": paths.PROFILER_SO,
"NCCL_PROFILE_EVENT_MASK": "255", # All events: Group (1) + Coll (2) + P2P (4) + ProxyOp (8) + ProxyStep (16) + ProxyCtrl (32) + KernelCh (64) + NetPlugin (128) = 255
"NCCL_PROFILE_GROUP_POOL_SIZE": "128",
"NCCL_PROFILE_COLL_POOL_SIZE": "128",
"NCCL_PROFILE_P2P_POOL_SIZE": "2048",
"NCCL_PROFILE_PROXY_CTRL_POOL_SIZE": "256",
"NCCL_PROFILE_PROXY_DETACH_POOL_SIZE": "1024",
"NCCL_PROFILE_DUMP_FILE": dump_file_base,
"NCCL_DEBUG": "INFO",
"NCCL_SOCKET_IFNAME": common_interface,
"NCCL_DMABUF_ENABLE": "1",
})
args = [
f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", f"{total_processes}",
"--host", host_spec,
"--mca", "pml", "ucx",
"--mca", "btl", "^vader,openib",
f"{paths.RCCL_TESTS_DIR}/build/broadcast_perf",
"-b", "8",
"-e", "128M",
"-f", "2",
"-g", "1",
]
log_dir = os.path.join(paths.LOGDIR, "broadcast_ext_profiler_test_logs")
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, "multinode_detailed_profiling.log")
with open(log_file, "w") as logfile:
result = subprocess.run(
args,
env=env,
stdout=logfile,
stderr=subprocess.STDOUT,
universal_newlines=True
)
assert result.returncode == 0, f"Multi-node Broadcast profiling test failed, see {log_file}"
# Verify plugin initialized
assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \
f"Plugin should have initialized. Check {log_file}"
# Verify trace files were created (one per rank)
trace_files = glob.glob(trace_pattern)
assert len(trace_files) == total_processes, \
f"Should have {total_processes} trace files (one per rank), found {len(trace_files)}: {trace_files}"
# Validate each trace file
for trace_file in trace_files:
is_valid, message = paths.validate_json_trace(trace_file)
assert is_valid, f"Trace file {trace_file} validation failed: {message}"
# With NCCL_PROFILE_EVENT_MASK=255, we should capture all event types
# Check for Group events (one per Broadcast call)
group_events = paths.count_events_in_trace(trace_file, category="GROUP")
assert group_events > 0, f"Should have Group events in {trace_file}, found {group_events}"
# Check for Broadcast events
broadcast_events = paths.count_events_in_trace(trace_file, event_name="Broadcast")
assert broadcast_events > 0, \
f"Should have Broadcast events in {trace_file}, found {broadcast_events}"
# For multi-node tests, verify ProxyOp events exist
proxy_events = paths.count_events_in_trace(trace_file, category="PROXY")
assert proxy_events > 0, \
f"Should have Proxy events in {trace_file}, found {proxy_events}"
# With ProxyStep enabled (bit 4), verify network step events exist
net_events = paths.count_events_in_trace(trace_file, category="NET")
assert net_events > 0, \
f"Should have NET events in {trace_file}, found {net_events}"
# With KernelCh enabled (bit 6), we should see GPU kernel channel events
kernel_events = paths.count_events_in_trace(trace_file, category="GPU")
assert kernel_events > 0, \
f"Should have GPU (KernelCh) events in {trace_file}, found {kernel_events}"
# With ProxyCtrl enabled (bit 5), we should see Append/Sleep events
append_events = paths.count_events_in_trace(trace_file, event_name="Append")
sleep_events = paths.count_events_in_trace(trace_file, event_name="Sleep")
assert append_events > 0 or sleep_events > 0, \
f"Should have ProxyCtrl events (Append or Sleep) in {trace_file}, found Append={append_events}, Sleep={sleep_events}"
# Verify trace file exists and has content
trace_file_size = os.path.getsize(trace_file)
assert trace_file_size > 0, \
f"Trace file {trace_file} is empty"
+413
Просмотреть файл
@@ -0,0 +1,413 @@
# *************************************************************************
# * Copyright (c) 2025 Advanced Micro Devices, Inc. All rights reserved.
# *
# * See LICENSE.txt for license information
# ************************************************************************
import os
import subprocess
import glob
import pytest
@pytest.mark.ext_profiler
@pytest.mark.reduce
def test_profiler_initialization(paths):
"""Test profiler functionality with Reduce operations."""
dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "reduce_profiler_dumps")
os.makedirs(dump_dir, exist_ok=True)
dump_file_base = os.path.join(dump_dir, "profiler_initialization")
# Remove any existing trace files
trace_pattern = f"{dump_file_base}*.json"
for f in glob.glob(trace_pattern):
os.remove(f)
env = os.environ.copy()
env.update({
"PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}",
"LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}",
"HSA_NO_SCRATCH_RECLAIM": "1",
"NCCL_PROFILER_PLUGIN": paths.PROFILER_SO,
"NCCL_PROFILE_EVENT_MASK": "3", # Group (1) + Coll (2) = 3
"NCCL_PROFILE_DUMP_FILE": dump_file_base,
"NCCL_DEBUG": "INFO",
})
args = [
f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", "4",
"--mca", "pml", "ucx",
"--mca", "btl", "^vader,openib",
f"{paths.RCCL_TESTS_DIR}/build/reduce_perf",
"-b", "1",
"-e", "8M",
"-f", "2",
"-g", "1",
]
log_dir = os.path.join(paths.LOGDIR, "reduce_ext_profiler_test_logs")
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, "profiler_initialization.log")
with open(log_file, "w") as logfile:
result = subprocess.run(
args,
env=env,
stdout=logfile,
stderr=subprocess.STDOUT,
universal_newlines=True
)
assert result.returncode == 0, f"Reduce test failed, see {log_file}"
# Verify plugin initialized
assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \
f"Plugin should have initialized. Check {log_file}"
# Verify trace files were created (one per rank)
trace_files = glob.glob(trace_pattern)
assert len(trace_files) == 4, \
f"Should have 4 trace files (one per rank), found {len(trace_files)}: {trace_files}"
# Validate each trace file
for trace_file in trace_files:
is_valid, message = paths.validate_json_trace(trace_file)
assert is_valid, f"Trace file {trace_file} validation failed: {message}"
# Check for Group events
group_events = paths.count_events_in_trace(trace_file, category="GROUP")
assert group_events > 0, f"Should have Group events in {trace_file}"
# Check for Reduce collective events
reduce_events = paths.count_events_in_trace(trace_file, event_name="Reduce")
assert reduce_events > 0, f"Should have Reduce events in {trace_file}"
@pytest.mark.ext_profiler
@pytest.mark.reduce
def test_invalid_mask_value(paths):
"""Test profiler behavior with invalid event mask (0 = no events)"""
dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "reduce_profiler_dumps")
os.makedirs(dump_dir, exist_ok=True)
dump_file_base = os.path.join(dump_dir, "invalid_mask_value_profiling")
# Remove any existing trace files
trace_pattern = f"{dump_file_base}*.json"
for f in glob.glob(trace_pattern):
os.remove(f)
env = os.environ.copy()
env.update({
"PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}",
"LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}",
"HSA_NO_SCRATCH_RECLAIM": "1",
"NCCL_PROFILER_PLUGIN": paths.PROFILER_SO,
"NCCL_PROFILE_EVENT_MASK": "0", # Invalid: no events enabled
"NCCL_PROFILE_DUMP_FILE": dump_file_base,
"NCCL_DEBUG": "INFO",
})
args = [
f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", "4",
"--mca", "pml", "ucx",
"--mca", "btl", "^vader,openib",
f"{paths.RCCL_TESTS_DIR}/build/reduce_perf",
"-b", "1",
"-e", "8M",
"-f", "2",
"-g", "1",
]
log_dir = os.path.join(paths.LOGDIR, "reduce_ext_profiler_test_logs")
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, "invalid_mask_value_profiling.log")
with open(log_file, "w") as logfile:
result = subprocess.run(
args,
env=env,
stdout=logfile,
stderr=subprocess.STDOUT,
universal_newlines=True
)
assert result.returncode == 0, f"Reduce test should still succeed even with invalid mask, see {log_file}"
# Verify plugin initialized
assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \
f"Plugin should have initialized even with mask=0. Check {log_file}"
# Verify trace files were created (one per rank)
trace_files = glob.glob(trace_pattern)
assert len(trace_files) == 4, \
f"Should have 4 trace files (one per rank), found {len(trace_files)}: {trace_files}"
# Validate each trace file - with mask=0, trace files should be nearly empty
# They should contain valid JSON but no actual profiling events
for trace_file in trace_files:
is_valid, message = paths.validate_json_trace(trace_file)
assert is_valid, f"Trace file {trace_file} should still be valid JSON: {message}"
# With mask=0, there should be no Group or Collective events
group_events = paths.count_events_in_trace(trace_file, category="GROUP")
assert group_events == 0, f"Should have no Group events with mask=0 in {trace_file}, found {group_events}"
reduce_events = paths.count_events_in_trace(trace_file, event_name="Reduce")
assert reduce_events == 0, f"Should have no Reduce events with mask=0 in {trace_file}, found {reduce_events}"
@pytest.mark.ext_profiler
@pytest.mark.reduce
def test_single_node_detailed_profiling(paths):
"""Test profiler with single-node Reduce using full event mask (255) across wide message range"""
dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "reduce_profiler_dumps")
os.makedirs(dump_dir, exist_ok=True)
dump_file_base = os.path.join(dump_dir, "single_node_detailed_profiling")
# Remove any existing trace files
trace_pattern = f"{dump_file_base}*.json"
for f in glob.glob(trace_pattern):
os.remove(f)
env = os.environ.copy()
env.update({
"PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}",
"LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}",
"HSA_NO_SCRATCH_RECLAIM": "1",
"NCCL_PROFILER_PLUGIN": paths.PROFILER_SO,
"NCCL_PROFILE_EVENT_MASK": "255", # All events: Group (1) + Coll (2) + P2P (4) + ProxyOp (8) + ProxyStep (16) + ProxyCtrl (32) + KernelCh (64) + NetPlugin (128) = 255
"NCCL_PROFILE_DUMP_FILE": dump_file_base,
"NCCL_DEBUG": "INFO",
})
args = [
f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", "8",
"--bind-to", "none",
"--mca", "pml", "ucx",
"--mca", "btl", "^vader,openib",
f"{paths.RCCL_TESTS_DIR}/build/reduce_perf",
"-b", "8",
"-e", "128M",
"-f", "2",
"-g", "1",
]
log_dir = os.path.join(paths.LOGDIR, "reduce_ext_profiler_test_logs")
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, "single_node_detailed_profiling.log")
with open(log_file, "w") as logfile:
result = subprocess.run(
args,
env=env,
stdout=logfile,
stderr=subprocess.STDOUT,
universal_newlines=True
)
assert result.returncode == 0, f"Single-node detailed Reduce profiling test failed, see {log_file}"
# Verify plugin initialized
assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \
f"Plugin should have initialized. Check {log_file}"
# Verify trace files were created (one per rank)
trace_files = glob.glob(trace_pattern)
assert len(trace_files) == 8, \
f"Should have 8 trace files (one per rank), found {len(trace_files)}: {trace_files}"
# Validate each trace file
for trace_file in trace_files:
is_valid, message = paths.validate_json_trace(trace_file)
assert is_valid, f"Trace file {trace_file} validation failed: {message}"
# With NCCL_PROFILE_EVENT_MASK=255, we capture all event types
# However, single-node behavior differs significantly from multi-node
# Check for Group events
group_events = paths.count_events_in_trace(trace_file, category="GROUP")
assert group_events > 0, \
f"Should have Group events in {trace_file}, found {group_events}"
# Check for Reduce events
reduce_events = paths.count_events_in_trace(trace_file, event_name="Reduce")
assert reduce_events > 0, \
f"Should have Reduce events in {trace_file}, found {reduce_events}"
# With KernelCh enabled (bit 6), we should see GPU kernel channel events
kernel_events = paths.count_events_in_trace(trace_file, category="GPU")
assert kernel_events > 0, \
f"Should have GPU (KernelCh) events in {trace_file}, found {kernel_events}"
# With ProxyCtrl enabled (bit 5), we should see Append/Sleep events
proxy_ctrl_events = paths.count_events_in_trace(trace_file, category="PROXY")
assert proxy_ctrl_events > 0, \
f"Should have PROXY (ProxyCtrl) events in {trace_file}, found {proxy_ctrl_events}"
append_events = paths.count_events_in_trace(trace_file, event_name="Append")
sleep_events = paths.count_events_in_trace(trace_file, event_name="Sleep")
assert append_events > 0 or sleep_events > 0, \
f"Should have ProxyCtrl events (Append or Sleep) in {trace_file}, found Append={append_events}, Sleep={sleep_events}"
# We should NOT see ProxyOp network events (ScheduleSend/Recv, ProgressSend/Recv)
schedule_send_events = paths.count_events_in_trace(trace_file, event_name="ScheduleSend")
schedule_recv_events = paths.count_events_in_trace(trace_file, event_name="ScheduleRecv")
assert schedule_send_events == 0, \
f"Single-node should have NO ScheduleSend events (no network) in {trace_file}, found {schedule_send_events}"
assert schedule_recv_events == 0, \
f"Single-node should have NO ScheduleRecv events (no network) in {trace_file}, found {schedule_recv_events}"
# Should also NOT see ProxyStep network events (RecvWait, SendWait, etc.)
net_events = paths.count_events_in_trace(trace_file, category="NET")
assert net_events == 0, \
f"Single-node should have NO NET (ProxyStep) events in {trace_file}, found {net_events}"
# Verify trace file exists and has content
trace_file_size = os.path.getsize(trace_file)
assert trace_file_size > 0, \
f"Trace file {trace_file} is empty"
@pytest.mark.ext_profiler
@pytest.mark.reduce
def test_multinode_detailed_profiling(paths):
"""Test profiler with multi-node Reduce operations using full event mask (255)"""
# Get available nodes using the shared function
nodelist = paths.get_available_nodes()
# Skip test if no nodes available (SLURM not available) or less than 2 nodes
if not nodelist:
pytest.skip("Multinode test requires SLURM allocation")
if len(nodelist) < 2:
pytest.skip(f"Multinode test requires at least 2 nodes, found {len(nodelist)}: {nodelist}")
# Check for common network interface across all nodes
common_interface = paths.find_common_interface(nodelist)
if common_interface is None:
pytest.skip(f"Multinode test requires all nodes to have the same network interface (eth0 or eth1).")
# Build host specification string (8 processes per node)
host_spec = ",".join([f"{node}:8" for node in nodelist])
total_processes = len(nodelist) * 8
print(f"Using host specification: {host_spec}")
dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "reduce_profiler_dumps")
os.makedirs(dump_dir, exist_ok=True)
dump_file_base = os.path.join(dump_dir, "multinode_detailed_profiling")
# Remove any existing trace files
trace_pattern = f"{dump_file_base}*.json"
for f in glob.glob(trace_pattern):
os.remove(f)
env = os.environ.copy()
env.update({
"PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}",
"LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}",
"HSA_NO_SCRATCH_RECLAIM": "1",
"NCCL_IGNORE_CPU_AFFINITY": "1",
"NCCL_PROFILER_PLUGIN": paths.PROFILER_SO,
"NCCL_PROFILE_EVENT_MASK": "255", # All events: Group (1) + Coll (2) + P2P (4) + ProxyOp (8) + ProxyStep (16) + ProxyCtrl (32) + KernelCh (64) + NetPlugin (128) = 255
"NCCL_PROFILE_GROUP_POOL_SIZE": "128",
"NCCL_PROFILE_COLL_POOL_SIZE": "128",
"NCCL_PROFILE_P2P_POOL_SIZE": "2048",
"NCCL_PROFILE_PROXY_CTRL_POOL_SIZE": "256",
"NCCL_PROFILE_PROXY_DETACH_POOL_SIZE": "1024",
"NCCL_PROFILE_DUMP_FILE": dump_file_base,
"NCCL_DEBUG": "INFO",
"NCCL_SOCKET_IFNAME": common_interface,
"NCCL_DMABUF_ENABLE": "1",
})
args = [
f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", f"{total_processes}",
"--host", host_spec,
"--mca", "pml", "ucx",
"--mca", "btl", "^vader,openib",
f"{paths.RCCL_TESTS_DIR}/build/reduce_perf",
"-b", "8",
"-e", "128M",
"-f", "2",
"-g", "1",
]
log_dir = os.path.join(paths.LOGDIR, "reduce_ext_profiler_test_logs")
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, "multinode_detailed_profiling.log")
with open(log_file, "w") as logfile:
result = subprocess.run(
args,
env=env,
stdout=logfile,
stderr=subprocess.STDOUT,
universal_newlines=True
)
assert result.returncode == 0, f"Multi-node Reduce profiling test failed, see {log_file}"
# Verify plugin initialized
assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \
f"Plugin should have initialized. Check {log_file}"
# Verify trace files were created (one per rank)
trace_files = glob.glob(trace_pattern)
assert len(trace_files) == total_processes, \
f"Should have {total_processes} trace files (one per rank), found {len(trace_files)}: {trace_files}"
# Validate each trace file
for trace_file in trace_files:
is_valid, message = paths.validate_json_trace(trace_file)
assert is_valid, f"Trace file {trace_file} validation failed: {message}"
# With NCCL_PROFILE_EVENT_MASK=255, we should capture all event types
# Check for Group events (one per Reduce call)
group_events = paths.count_events_in_trace(trace_file, category="GROUP")
assert group_events > 0, f"Should have Group events in {trace_file}, found {group_events}"
# Check for Reduce events
reduce_events = paths.count_events_in_trace(trace_file, event_name="Reduce")
assert reduce_events > 0, \
f"Should have Reduce events in {trace_file}, found {reduce_events}"
# For multi-node tests, verify ProxyOp events exist
proxy_events = paths.count_events_in_trace(trace_file, category="PROXY")
assert proxy_events > 0, \
f"Should have Proxy events in {trace_file}, found {proxy_events}"
# With ProxyOp enabled (bit 3), check for Send and Recv operations
schedule_send_events = paths.count_events_in_trace(trace_file, event_name="ScheduleSend")
schedule_recv_events = paths.count_events_in_trace(trace_file, event_name="ScheduleRecv")
assert schedule_send_events > 0 or schedule_recv_events > 0, \
f"Should have ScheduleSend or ScheduleRecv events in {trace_file}, found Send={schedule_send_events}, Recv={schedule_recv_events}"
# With ProxyStep enabled (bit 4), verify network step events exist
net_events = paths.count_events_in_trace(trace_file, category="NET")
assert net_events > 0, \
f"Should have NET events in {trace_file}, found {net_events}"
# With KernelCh enabled (bit 6), we should see GPU kernel channel events
kernel_events = paths.count_events_in_trace(trace_file, category="GPU")
assert kernel_events > 0, \
f"Should have GPU (KernelCh) events in {trace_file}, found {kernel_events}"
# With ProxyCtrl enabled (bit 5), we should see Append/Sleep events
append_events = paths.count_events_in_trace(trace_file, event_name="Append")
sleep_events = paths.count_events_in_trace(trace_file, event_name="Sleep")
assert append_events > 0 or sleep_events > 0, \
f"Should have ProxyCtrl events (Append or Sleep) in {trace_file}, found Append={append_events}, Sleep={sleep_events}"
# Verify trace file exists and has content
trace_file_size = os.path.getsize(trace_file)
assert trace_file_size > 0, \
f"Trace file {trace_file} is empty"
+408
Просмотреть файл
@@ -0,0 +1,408 @@
# *************************************************************************
# * Copyright (c) 2025 Advanced Micro Devices, Inc. All rights reserved.
# *
# * See LICENSE.txt for license information
# ************************************************************************
import os
import subprocess
import glob
import pytest
@pytest.mark.ext_profiler
@pytest.mark.reducescatter
def test_profiler_initialization(paths):
"""Test profiler functionality with ReduceScatter operations."""
dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "reducescatter_profiler_dumps")
os.makedirs(dump_dir, exist_ok=True)
dump_file_base = os.path.join(dump_dir, "profiler_initialization")
# Remove any existing trace files
trace_pattern = f"{dump_file_base}*.json"
for f in glob.glob(trace_pattern):
os.remove(f)
env = os.environ.copy()
env.update({
"PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}",
"LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}",
"HSA_NO_SCRATCH_RECLAIM": "1",
"NCCL_PROFILER_PLUGIN": paths.PROFILER_SO,
"NCCL_PROFILE_EVENT_MASK": "3", # Group (1) + Coll (2) = 3
"NCCL_PROFILE_DUMP_FILE": dump_file_base,
"NCCL_DEBUG": "INFO",
})
args = [
f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", "4",
"--mca", "pml", "ucx",
"--mca", "btl", "^vader,openib",
f"{paths.RCCL_TESTS_DIR}/build/reduce_scatter_perf",
"-b", "1",
"-e", "8M",
"-f", "2",
"-g", "1",
]
log_dir = os.path.join(paths.LOGDIR, "reducescatter_ext_profiler_test_logs")
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, "profiler_initialization.log")
with open(log_file, "w") as logfile:
result = subprocess.run(
args,
env=env,
stdout=logfile,
stderr=subprocess.STDOUT,
universal_newlines=True
)
assert result.returncode == 0, f"ReduceScatter test failed, see {log_file}"
# Verify plugin initialized
assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \
f"Plugin should have initialized. Check {log_file}"
# Verify trace files were created (one per rank)
trace_files = glob.glob(trace_pattern)
assert len(trace_files) == 4, \
f"Should have 4 trace files (one per rank), found {len(trace_files)}: {trace_files}"
# Validate each trace file
for trace_file in trace_files:
is_valid, message = paths.validate_json_trace(trace_file)
assert is_valid, f"Trace file {trace_file} validation failed: {message}"
# Check for Group events
group_events = paths.count_events_in_trace(trace_file, category="GROUP")
assert group_events > 0, f"Should have Group events in {trace_file}"
# Check for ReduceScatter collective events
reducescatter_events = paths.count_events_in_trace(trace_file, event_name="ReduceScatter")
assert reducescatter_events > 0, f"Should have ReduceScatter events in {trace_file}"
@pytest.mark.ext_profiler
@pytest.mark.reducescatter
def test_invalid_mask_value(paths):
"""Test profiler behavior with invalid event mask (0 = no events)"""
dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "reducescatter_profiler_dumps")
os.makedirs(dump_dir, exist_ok=True)
dump_file_base = os.path.join(dump_dir, "invalid_mask_value_profiling")
# Remove any existing trace files
trace_pattern = f"{dump_file_base}*.json"
for f in glob.glob(trace_pattern):
os.remove(f)
env = os.environ.copy()
env.update({
"PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}",
"LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}",
"HSA_NO_SCRATCH_RECLAIM": "1",
"NCCL_PROFILER_PLUGIN": paths.PROFILER_SO,
"NCCL_PROFILE_EVENT_MASK": "0", # Invalid: no events enabled
"NCCL_PROFILE_DUMP_FILE": dump_file_base,
"NCCL_DEBUG": "INFO",
})
args = [
f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", "4",
"--mca", "pml", "ucx",
"--mca", "btl", "^vader,openib",
f"{paths.RCCL_TESTS_DIR}/build/reduce_scatter_perf",
"-b", "1",
"-e", "8M",
"-f", "2",
"-g", "1",
]
log_dir = os.path.join(paths.LOGDIR, "reducescatter_ext_profiler_test_logs")
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, "invalid_mask_value_profiling.log")
with open(log_file, "w") as logfile:
result = subprocess.run(
args,
env=env,
stdout=logfile,
stderr=subprocess.STDOUT,
universal_newlines=True
)
assert result.returncode == 0, f"ReduceScatter test should still succeed even with invalid mask, see {log_file}"
# Verify plugin initialized
assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \
f"Plugin should have initialized even with mask=0. Check {log_file}"
# Verify trace files were created (one per rank)
trace_files = glob.glob(trace_pattern)
assert len(trace_files) == 4, \
f"Should have 4 trace files (one per rank), found {len(trace_files)}: {trace_files}"
# Validate each trace file - with mask=0, trace files should be nearly empty
# They should contain valid JSON but no actual profiling events
for trace_file in trace_files:
is_valid, message = paths.validate_json_trace(trace_file)
assert is_valid, f"Trace file {trace_file} should still be valid JSON: {message}"
# With mask=0, there should be no Group or Collective events
group_events = paths.count_events_in_trace(trace_file, category="GROUP")
assert group_events == 0, f"Should have no Group events with mask=0 in {trace_file}, found {group_events}"
reducescatter_events = paths.count_events_in_trace(trace_file, event_name="ReduceScatter")
assert reducescatter_events == 0, f"Should have no ReduceScatter events with mask=0 in {trace_file}, found {reducescatter_events}"
@pytest.mark.ext_profiler
@pytest.mark.reducescatter
def test_single_node_detailed_profiling(paths):
"""Test profiler with single-node ReduceScatter using full event mask (255) across wide message range"""
dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "reducescatter_profiler_dumps")
os.makedirs(dump_dir, exist_ok=True)
dump_file_base = os.path.join(dump_dir, "single_node_detailed_profiling")
# Remove any existing trace files
trace_pattern = f"{dump_file_base}*.json"
for f in glob.glob(trace_pattern):
os.remove(f)
env = os.environ.copy()
env.update({
"PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}",
"LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}",
"HSA_NO_SCRATCH_RECLAIM": "1",
"NCCL_PROFILER_PLUGIN": paths.PROFILER_SO,
"NCCL_PROFILE_EVENT_MASK": "255", # All events: Group (1) + Coll (2) + P2P (4) + ProxyOp (8) + ProxyStep (16) + ProxyCtrl (32) + KernelCh (64) + NetPlugin (128) = 255
"NCCL_PROFILE_DUMP_FILE": dump_file_base,
"NCCL_DEBUG": "INFO",
})
args = [
f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", "8",
"--bind-to", "none",
"--mca", "pml", "ucx",
"--mca", "btl", "^vader,openib",
f"{paths.RCCL_TESTS_DIR}/build/reduce_scatter_perf",
"-b", "8",
"-e", "128M",
"-f", "2",
"-g", "1",
]
log_dir = os.path.join(paths.LOGDIR, "reducescatter_ext_profiler_test_logs")
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, "single_node_detailed_profiling.log")
with open(log_file, "w") as logfile:
result = subprocess.run(
args,
env=env,
stdout=logfile,
stderr=subprocess.STDOUT,
universal_newlines=True
)
assert result.returncode == 0, f"Single-node detailed ReduceScatter profiling test failed, see {log_file}"
# Verify plugin initialized
assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \
f"Plugin should have initialized. Check {log_file}"
# Verify trace files were created (one per rank)
trace_files = glob.glob(trace_pattern)
assert len(trace_files) == 8, \
f"Should have 8 trace files (one per rank), found {len(trace_files)}: {trace_files}"
# Validate each trace file
for trace_file in trace_files:
is_valid, message = paths.validate_json_trace(trace_file)
assert is_valid, f"Trace file {trace_file} validation failed: {message}"
# With NCCL_PROFILE_EVENT_MASK=255, we capture all event types
# However, single-node behavior differs significantly from multi-node
# Check for Group events
group_events = paths.count_events_in_trace(trace_file, category="GROUP")
assert group_events > 0, \
f"Should have Group events in {trace_file}, found {group_events}"
# Check for ReduceScatter events
reducescatter_events = paths.count_events_in_trace(trace_file, event_name="ReduceScatter")
assert reducescatter_events > 0, \
f"Should have ReduceScatter events in {trace_file}, found {reducescatter_events}"
# Verify GPU kernel channel events exist
kernel_events = paths.count_events_in_trace(trace_file, category="GPU")
assert kernel_events > 0, \
f"Should have GPU (KernelCh) events in {trace_file}, found {kernel_events}"
# Verify ProxyCtrl events exist
proxy_ctrl_events = paths.count_events_in_trace(trace_file, category="PROXY")
assert proxy_ctrl_events > 0, \
f"Should have PROXY (ProxyCtrl) events in {trace_file}, found {proxy_ctrl_events}"
append_events = paths.count_events_in_trace(trace_file, event_name="Append")
sleep_events = paths.count_events_in_trace(trace_file, event_name="Sleep")
assert append_events > 0 or sleep_events > 0, \
f"Should have ProxyCtrl events in {trace_file}, found Append={append_events}, Sleep={sleep_events}"
# We should NOT see ProxyOp network events (ScheduleSend/Recv, ProgressSend/Recv)
schedule_send_events = paths.count_events_in_trace(trace_file, event_name="ScheduleSend")
schedule_recv_events = paths.count_events_in_trace(trace_file, event_name="ScheduleRecv")
assert schedule_send_events == 0, \
f"Single-node should have NO ScheduleSend events (no network) in {trace_file}, found {schedule_send_events}"
assert schedule_recv_events == 0, \
f"Single-node should have NO ScheduleRecv events (no network) in {trace_file}, found {schedule_recv_events}"
# Should also NOT see ProxyStep network events (RecvWait, SendWait, etc.)
net_events = paths.count_events_in_trace(trace_file, category="NET")
assert net_events == 0, \
f"Single-node should have NO NET (ProxyStep) events in {trace_file}, found {net_events}"
# Verify trace file exists and has content
trace_file_size = os.path.getsize(trace_file)
assert trace_file_size > 0, \
f"Trace file {trace_file} is empty"
@pytest.mark.ext_profiler
@pytest.mark.reducescatter
def test_multinode_detailed_profiling(paths):
"""Test profiler with multi-node ReduceScatter operations using full event mask (255)"""
# Get available nodes using the shared function
nodelist = paths.get_available_nodes()
# Skip test if no nodes available (SLURM not available) or less than 2 nodes
if not nodelist:
pytest.skip("Multinode test requires SLURM allocation")
if len(nodelist) < 2:
pytest.skip(f"Multinode test requires at least 2 nodes, found {len(nodelist)}: {nodelist}")
# Check for common network interface across all nodes
common_interface = paths.find_common_interface(nodelist)
if common_interface is None:
pytest.skip(f"Multinode test requires all nodes to have the same network interface (eth0 or eth1).")
# Build host specification string (8 processes per node)
host_spec = ",".join([f"{node}:8" for node in nodelist])
total_processes = len(nodelist) * 8
print(f"Using host specification: {host_spec}")
dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "reducescatter_profiler_dumps")
os.makedirs(dump_dir, exist_ok=True)
dump_file_base = os.path.join(dump_dir, "multinode_detailed_profiling")
# Remove any existing trace files
trace_pattern = f"{dump_file_base}*.json"
for f in glob.glob(trace_pattern):
os.remove(f)
env = os.environ.copy()
env.update({
"PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}",
"LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}",
"HSA_NO_SCRATCH_RECLAIM": "1",
"NCCL_IGNORE_CPU_AFFINITY": "1",
"NCCL_PROFILER_PLUGIN": paths.PROFILER_SO,
"NCCL_PROFILE_EVENT_MASK": "255", # All events: Group (1) + Coll (2) + P2P (4) + ProxyOp (8) + ProxyStep (16) + ProxyCtrl (32) + KernelCh (64) + NetPlugin (128) = 255
"NCCL_PROFILE_DUMP_FILE": dump_file_base,
"NCCL_DEBUG": "INFO",
"NCCL_SOCKET_IFNAME": common_interface,
"NCCL_DMABUF_ENABLE": "1",
})
args = [
f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", f"{total_processes}",
"--host", host_spec,
"--mca", "pml", "ucx",
"--mca", "btl", "^vader,openib",
f"{paths.RCCL_TESTS_DIR}/build/reduce_scatter_perf",
"-b", "8",
"-e", "128M",
"-f", "2",
"-g", "1",
]
log_dir = os.path.join(paths.LOGDIR, "reducescatter_ext_profiler_test_logs")
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, "multinode_detailed_profiling.log")
with open(log_file, "w") as logfile:
result = subprocess.run(
args,
env=env,
stdout=logfile,
stderr=subprocess.STDOUT,
universal_newlines=True
)
assert result.returncode == 0, f"Multi-node ReduceScatter profiling test failed, see {log_file}"
# Verify plugin initialized
assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \
f"Plugin should have initialized. Check {log_file}"
# Verify trace files were created (one per rank)
trace_files = glob.glob(trace_pattern)
assert len(trace_files) == total_processes, \
f"Should have {total_processes} trace files (one per rank), found {len(trace_files)}: {trace_files}"
# Validate each trace file
for trace_file in trace_files:
is_valid, message = paths.validate_json_trace(trace_file)
assert is_valid, f"Trace file {trace_file} validation failed: {message}"
# With NCCL_PROFILE_EVENT_MASK=255, we should capture all event types
# Check for Group events (one per ReduceScatter call)
group_events = paths.count_events_in_trace(trace_file, category="GROUP")
assert group_events > 0, f"Should have Group events in {trace_file}, found {group_events}"
# Check for ReduceScatter events
reducescatter_events = paths.count_events_in_trace(trace_file, event_name="ReduceScatter")
assert reducescatter_events > 0, \
f"Should have ReduceScatter events in {trace_file}, found {reducescatter_events}"
# For multi-node tests, verify ProxyOp events exist
proxy_events = paths.count_events_in_trace(trace_file, category="PROXY")
assert proxy_events > 0, \
f"Should have Proxy events in {trace_file}, found {proxy_events}"
# Check for Send and Recv operations
schedule_send_events = paths.count_events_in_trace(trace_file, event_name="ScheduleSend")
schedule_recv_events = paths.count_events_in_trace(trace_file, event_name="ScheduleRecv")
assert schedule_send_events > 0 or schedule_recv_events > 0, \
f"Should have ScheduleSend or ScheduleRecv events in {trace_file}, found Send={schedule_send_events}, Recv={schedule_recv_events}"
# Verify NET (ProxyStep) events exist
net_events = paths.count_events_in_trace(trace_file, category="NET")
assert net_events > 0, \
f"Should have NET events in {trace_file}, found {net_events}"
# Verify GPU kernel channel events exist
kernel_events = paths.count_events_in_trace(trace_file, category="GPU")
assert kernel_events > 0, \
f"Should have GPU (KernelCh) events in {trace_file}, found {kernel_events}"
# Verify ProxyCtrl events exist
append_events = paths.count_events_in_trace(trace_file, event_name="Append")
sleep_events = paths.count_events_in_trace(trace_file, event_name="Sleep")
assert append_events > 0 or sleep_events > 0, \
f"Should have ProxyCtrl events in {trace_file}, found Append={append_events}, Sleep={sleep_events}"
# Verify trace file exists and has content
trace_file_size = os.path.getsize(trace_file)
assert trace_file_size > 0, \
f"Trace file {trace_file} is empty"
+426
Просмотреть файл
@@ -0,0 +1,426 @@
# *************************************************************************
# * Copyright (c) 2025 Advanced Micro Devices, Inc. All rights reserved.
# *
# * See LICENSE.txt for license information
# ************************************************************************
import os
import subprocess
import glob
import pytest
@pytest.mark.ext_profiler
@pytest.mark.sendrecv
def test_profiler_initialization(paths):
"""Test profiler functionality with SendRecv operations.
Note: SendRecv is a P2P operation, so we check for Send/Recv P2P events."""
dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "sendrecv_profiler_dumps")
os.makedirs(dump_dir, exist_ok=True)
dump_file_base = os.path.join(dump_dir, "profiler_initialization")
# Remove any existing trace files
trace_pattern = f"{dump_file_base}*.json"
for f in glob.glob(trace_pattern):
os.remove(f)
env = os.environ.copy()
env.update({
"PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}",
"LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}",
"HSA_NO_SCRATCH_RECLAIM": "1",
"NCCL_PROFILER_PLUGIN": paths.PROFILER_SO,
"NCCL_PROFILE_EVENT_MASK": "7", # Group (1) + Coll (2) + P2P (4) = 7 (SendRecv is P2P)
"NCCL_PROFILE_DUMP_FILE": dump_file_base,
"NCCL_DEBUG": "INFO",
})
args = [
f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", "4",
"--mca", "pml", "ucx",
"--mca", "btl", "^vader,openib",
f"{paths.RCCL_TESTS_DIR}/build/sendrecv_perf",
"-b", "1",
"-e", "8M",
"-f", "2",
"-g", "1",
]
log_dir = os.path.join(paths.LOGDIR, "sendrecv_ext_profiler_test_logs")
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, "profiler_initialization.log")
with open(log_file, "w") as logfile:
result = subprocess.run(
args,
env=env,
stdout=logfile,
stderr=subprocess.STDOUT,
universal_newlines=True
)
assert result.returncode == 0, f"SendRecv test failed, see {log_file}"
# Verify plugin initialized
assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \
f"Plugin should have initialized. Check {log_file}"
# Verify trace files were created (one per rank)
trace_files = glob.glob(trace_pattern)
assert len(trace_files) == 4, \
f"Should have 4 trace files (one per rank), found {len(trace_files)}: {trace_files}"
# Validate each trace file
for trace_file in trace_files:
is_valid, message = paths.validate_json_trace(trace_file)
assert is_valid, f"Trace file {trace_file} validation failed: {message}"
# Check for Group events
group_events = paths.count_events_in_trace(trace_file, category="GROUP")
assert group_events > 0, f"Should have Group events in {trace_file}"
# Check for P2P events (SendRecv is implemented as Send/Recv P2P operations)
p2p_events = paths.count_events_in_trace(trace_file, category="P2P")
assert p2p_events > 0, f"Should have P2P events (SendRecv uses Send/Recv) in {trace_file}"
# Verify Send and Recv events exist
send_events = paths.count_events_in_trace(trace_file, event_name="Send")
recv_events = paths.count_events_in_trace(trace_file, event_name="Recv")
assert send_events > 0, f"Should have Send events in {trace_file}"
assert recv_events > 0, f"Should have Recv events in {trace_file}"
@pytest.mark.ext_profiler
@pytest.mark.sendrecv
def test_invalid_mask_value(paths):
"""Test profiler behavior with invalid event mask (0 = no events)"""
dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "sendrecv_profiler_dumps")
os.makedirs(dump_dir, exist_ok=True)
dump_file_base = os.path.join(dump_dir, "invalid_mask_value_profiling")
# Remove any existing trace files
trace_pattern = f"{dump_file_base}*.json"
for f in glob.glob(trace_pattern):
os.remove(f)
env = os.environ.copy()
env.update({
"PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}",
"LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}",
"HSA_NO_SCRATCH_RECLAIM": "1",
"NCCL_PROFILER_PLUGIN": paths.PROFILER_SO,
"NCCL_PROFILE_EVENT_MASK": "0", # Invalid: no events enabled
"NCCL_PROFILE_DUMP_FILE": dump_file_base,
"NCCL_DEBUG": "INFO",
})
args = [
f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", "4",
"--mca", "pml", "ucx",
"--mca", "btl", "^vader,openib",
f"{paths.RCCL_TESTS_DIR}/build/sendrecv_perf",
"-b", "1",
"-e", "8M",
"-f", "2",
"-g", "1",
]
log_dir = os.path.join(paths.LOGDIR, "sendrecv_ext_profiler_test_logs")
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, "invalid_mask_value_profiling.log")
with open(log_file, "w") as logfile:
result = subprocess.run(
args,
env=env,
stdout=logfile,
stderr=subprocess.STDOUT,
universal_newlines=True
)
assert result.returncode == 0, f"SendRecv test should still succeed even with invalid mask, see {log_file}"
# Verify plugin initialized
assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \
f"Plugin should have initialized even with mask=0. Check {log_file}"
# Verify trace files were created (one per rank)
trace_files = glob.glob(trace_pattern)
assert len(trace_files) == 4, \
f"Should have 4 trace files (one per rank), found {len(trace_files)}: {trace_files}"
# Validate each trace file - with mask=0, trace files should be nearly empty
# They should contain valid JSON but no actual profiling events
for trace_file in trace_files:
is_valid, message = paths.validate_json_trace(trace_file)
assert is_valid, f"Trace file {trace_file} should still be valid JSON: {message}"
# With mask=0, there should be no Group or P2P events
group_events = paths.count_events_in_trace(trace_file, category="GROUP")
assert group_events == 0, f"Should have no Group events with mask=0 in {trace_file}, found {group_events}"
p2p_events = paths.count_events_in_trace(trace_file, category="P2P")
assert p2p_events == 0, f"Should have no P2P events (SendRecv) with mask=0 in {trace_file}, found {p2p_events}"
@pytest.mark.ext_profiler
@pytest.mark.sendrecv
def test_single_node_detailed_profiling(paths):
"""Test profiler with single-node SendRecv using full event mask (255) across wide message range"""
dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "sendrecv_profiler_dumps")
os.makedirs(dump_dir, exist_ok=True)
dump_file_base = os.path.join(dump_dir, "single_node_detailed_profiling")
# Remove any existing trace files
trace_pattern = f"{dump_file_base}*.json"
for f in glob.glob(trace_pattern):
os.remove(f)
env = os.environ.copy()
env.update({
"PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}",
"LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}",
"HSA_NO_SCRATCH_RECLAIM": "1",
"NCCL_PROFILER_PLUGIN": paths.PROFILER_SO,
"NCCL_PROFILE_EVENT_MASK": "255", # All events: Group (1) + Coll (2) + P2P (4) + ProxyOp (8) + ProxyStep (16) + ProxyCtrl (32) + KernelCh (64) + NetPlugin (128) = 255
"NCCL_PROFILE_DUMP_FILE": dump_file_base,
"NCCL_DEBUG": "INFO",
})
args = [
f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", "8",
"--bind-to", "none",
"--mca", "pml", "ucx",
"--mca", "btl", "^vader,openib",
f"{paths.RCCL_TESTS_DIR}/build/sendrecv_perf",
"-b", "8",
"-e", "128M",
"-f", "2",
"-g", "1",
]
log_dir = os.path.join(paths.LOGDIR, "sendrecv_ext_profiler_test_logs")
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, "single_node_detailed_profiling.log")
with open(log_file, "w") as logfile:
result = subprocess.run(
args,
env=env,
stdout=logfile,
stderr=subprocess.STDOUT,
universal_newlines=True
)
assert result.returncode == 0, f"Single-node detailed SendRecv profiling test failed, see {log_file}"
# Verify plugin initialized
assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \
f"Plugin should have initialized. Check {log_file}"
# Verify trace files were created (one per rank)
trace_files = glob.glob(trace_pattern)
assert len(trace_files) == 8, \
f"Should have 8 trace files (one per rank), found {len(trace_files)}: {trace_files}"
# Validate each trace file
for trace_file in trace_files:
is_valid, message = paths.validate_json_trace(trace_file)
assert is_valid, f"Trace file {trace_file} validation failed: {message}"
# With NCCL_PROFILE_EVENT_MASK=255, we capture all event types
# However, single-node behavior differs significantly from multi-node
# Check for Group events
group_events = paths.count_events_in_trace(trace_file, category="GROUP")
assert group_events > 0, \
f"Should have Group events in {trace_file}, found {group_events}"
# Check for P2P events (SendRecv is implemented as Send/Recv operations)
p2p_events = paths.count_events_in_trace(trace_file, category="P2P")
assert p2p_events > 0, \
f"Should have P2P events (SendRecv uses Send/Recv) in {trace_file}, found {p2p_events}"
# Verify Send and Recv events
send_events = paths.count_events_in_trace(trace_file, event_name="Send")
recv_events = paths.count_events_in_trace(trace_file, event_name="Recv")
assert send_events > 0, f"Should have Send events in {trace_file}, found {send_events}"
assert recv_events > 0, f"Should have Recv events in {trace_file}, found {recv_events}"
# Verify GPU kernel channel events exist
kernel_events = paths.count_events_in_trace(trace_file, category="GPU")
assert kernel_events > 0, \
f"Should have GPU (KernelCh) events in {trace_file}, found {kernel_events}"
# Verify ProxyCtrl events exist
proxy_ctrl_events = paths.count_events_in_trace(trace_file, category="PROXY")
assert proxy_ctrl_events > 0, \
f"Should have PROXY (ProxyCtrl) events in {trace_file}, found {proxy_ctrl_events}"
append_events = paths.count_events_in_trace(trace_file, event_name="Append")
sleep_events = paths.count_events_in_trace(trace_file, event_name="Sleep")
assert append_events > 0 or sleep_events > 0, \
f"Should have ProxyCtrl events in {trace_file}, found Append={append_events}, Sleep={sleep_events}"
# We should NOT see ProxyOp network events (ScheduleSend/Recv, ProgressSend/Recv)
schedule_send_events = paths.count_events_in_trace(trace_file, event_name="ScheduleSend")
schedule_recv_events = paths.count_events_in_trace(trace_file, event_name="ScheduleRecv")
assert schedule_send_events == 0, \
f"Single-node should have NO ScheduleSend events (no network) in {trace_file}, found {schedule_send_events}"
assert schedule_recv_events == 0, \
f"Single-node should have NO ScheduleRecv events (no network) in {trace_file}, found {schedule_recv_events}"
# Should also NOT see ProxyStep network events (RecvWait, SendWait, etc.)
net_events = paths.count_events_in_trace(trace_file, category="NET")
assert net_events == 0, \
f"Single-node should have NO NET (ProxyStep) events in {trace_file}, found {net_events}"
# Verify trace file exists and has content
trace_file_size = os.path.getsize(trace_file)
assert trace_file_size > 0, \
f"Trace file {trace_file} is empty"
@pytest.mark.ext_profiler
@pytest.mark.sendrecv
def test_multinode_detailed_profiling(paths):
"""Test profiler with multi-node SendRecv operations using full event mask (255)"""
# Get available nodes using the shared function
nodelist = paths.get_available_nodes()
# Skip test if no nodes available (SLURM not available) or less than 2 nodes
if not nodelist:
pytest.skip("Multinode test requires SLURM allocation")
if len(nodelist) < 2:
pytest.skip(f"Multinode test requires at least 2 nodes, found {len(nodelist)}: {nodelist}")
# Check for common network interface across all nodes
common_interface = paths.find_common_interface(nodelist)
if common_interface is None:
pytest.skip(f"Multinode test requires all nodes to have the same network interface (eth0 or eth1).")
# Build host specification string (8 processes per node)
host_spec = ",".join([f"{node}:8" for node in nodelist])
total_processes = len(nodelist) * 8
print(f"Using host specification: {host_spec}")
dump_dir = os.path.join(paths.PROFILER_DUMP_DIR, "sendrecv_profiler_dumps")
os.makedirs(dump_dir, exist_ok=True)
dump_file_base = os.path.join(dump_dir, "multinode_detailed_profiling")
# Remove any existing trace files
trace_pattern = f"{dump_file_base}*.json"
for f in glob.glob(trace_pattern):
os.remove(f)
env = os.environ.copy()
env.update({
"PATH": f"{paths.OMPI_INSTALL_DIR}/bin:{env.get('PATH', '')}",
"LD_LIBRARY_PATH": f"{paths.RCCL_INSTALL_DIR}:{paths.OMPI_INSTALL_DIR}/lib:{paths.PROFILER_DIR}:{env.get('LD_LIBRARY_PATH', '')}",
"HSA_NO_SCRATCH_RECLAIM": "1",
"NCCL_IGNORE_CPU_AFFINITY": "1",
"NCCL_PROFILER_PLUGIN": paths.PROFILER_SO,
"NCCL_PROFILE_EVENT_MASK": "255", # All events: Group (1) + Coll (2) + P2P (4) + ProxyOp (8) + ProxyStep (16) + ProxyCtrl (32) + KernelCh (64) + NetPlugin (128) = 255
"NCCL_PROFILE_DUMP_FILE": dump_file_base,
"NCCL_DEBUG": "INFO",
"NCCL_SOCKET_IFNAME": common_interface,
"NCCL_DMABUF_ENABLE": "1",
})
args = [
f"{paths.OMPI_INSTALL_DIR}/bin/mpirun", "-np", f"{total_processes}",
"--host", host_spec,
"--mca", "pml", "ucx",
"--mca", "btl", "^vader,openib",
f"{paths.RCCL_TESTS_DIR}/build/sendrecv_perf",
"-b", "8",
"-e", "128M",
"-f", "2",
"-g", "1",
]
log_dir = os.path.join(paths.LOGDIR, "sendrecv_ext_profiler_test_logs")
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, "multinode_detailed_profiling.log")
with open(log_file, "w") as logfile:
result = subprocess.run(
args,
env=env,
stdout=logfile,
stderr=subprocess.STDOUT,
universal_newlines=True
)
assert result.returncode == 0, f"Multi-node SendRecv profiling test failed, see {log_file}"
# Verify plugin initialized
assert paths.check_event_in_log(log_file, "PROFILER/Plugin: init"), \
f"Plugin should have initialized. Check {log_file}"
# Verify trace files were created (one per rank)
trace_files = glob.glob(trace_pattern)
assert len(trace_files) == total_processes, \
f"Should have {total_processes} trace files (one per rank), found {len(trace_files)}: {trace_files}"
# Validate each trace file
for trace_file in trace_files:
is_valid, message = paths.validate_json_trace(trace_file)
assert is_valid, f"Trace file {trace_file} validation failed: {message}"
# With NCCL_PROFILE_EVENT_MASK=255, we should capture all event types
# Check for Group events (one per SendRecv call)
group_events = paths.count_events_in_trace(trace_file, category="GROUP")
assert group_events > 0, f"Should have Group events in {trace_file}, found {group_events}"
# Check for P2P events (SendRecv is implemented as Send/Recv operations)
p2p_events = paths.count_events_in_trace(trace_file, category="P2P")
assert p2p_events > 0, \
f"Should have P2P events (SendRecv uses Send/Recv) in {trace_file}, found {p2p_events}"
# For multi-node tests, verify ProxyOp events exist
proxy_events = paths.count_events_in_trace(trace_file, category="PROXY")
assert proxy_events > 0, \
f"Should have Proxy events in {trace_file}, found {proxy_events}"
# Check for Send and Recv operations (only for ranks with network activity)
# Note: In SendRecv, not all ranks participate in network communication
schedule_send_events = paths.count_events_in_trace(trace_file, event_name="ScheduleSend")
schedule_recv_events = paths.count_events_in_trace(trace_file, event_name="ScheduleRecv")
# Only check if rank has significant proxy activity (> 100 events indicates network participation)
if proxy_events > 100:
assert schedule_send_events > 0 or schedule_recv_events > 0, \
f"Rank with {proxy_events} proxy events should have ScheduleSend or ScheduleRecv in {trace_file}, found Send={schedule_send_events}, Recv={schedule_recv_events}"
# Verify NET (ProxyStep) events exist (only for ranks with network activity)
net_events = paths.count_events_in_trace(trace_file, category="NET")
if proxy_events > 100:
assert net_events > 0, \
f"Rank with {proxy_events} proxy events should have NET events in {trace_file}, found {net_events}"
# Verify GPU kernel channel events exist
kernel_events = paths.count_events_in_trace(trace_file, category="GPU")
assert kernel_events > 0, \
f"Should have GPU (KernelCh) events in {trace_file}, found {kernel_events}"
# Verify ProxyCtrl events exist
append_events = paths.count_events_in_trace(trace_file, event_name="Append")
sleep_events = paths.count_events_in_trace(trace_file, event_name="Sleep")
assert append_events > 0 or sleep_events > 0, \
f"Should have ProxyCtrl events in {trace_file}, found Append={append_events}, Sleep={sleep_events}"
# Verify trace file exists and has content
trace_file_size = os.path.getsize(trace_file)
assert trace_file_size > 0, \
f"Trace file {trace_file} is empty"