431 řádky
19 KiB
Python
431 řádky
19 KiB
Python
|
|
#!/usr/bin/env python3
|
||
|
|
"""
|
||
|
|
NCCL Tuner Configuration Optimizer
|
||
|
|
|
||
|
|
Reads a CSV file containing performance data across different tuning parameters
|
||
|
|
and generates optimal NCCL tuner configurations based on the best performing
|
||
|
|
combinations.
|
||
|
|
|
||
|
|
By default, creates growing size ranges that interpolate between the actual data sizes
|
||
|
|
for each unique dimension (node count, rank count combination). This ensures that
|
||
|
|
different cluster configurations get their own optimized size boundaries, as
|
||
|
|
performance characteristics often vary significantly between topologies.
|
||
|
|
|
||
|
|
Each dimension gets its own set of ranges starting from 0 and extending to the maximum
|
||
|
|
size for that dimension, with boundaries at midpoints between consecutive data sizes.
|
||
|
|
|
||
|
|
CSV Input Format:
|
||
|
|
collective,size_bytes,algorithm,protocol,channels,nodes,ranks,pipeOps,regBuff,bandwidth_gbps,latency_us
|
||
|
|
|
||
|
|
Output Format (NCCL Tuner Config):
|
||
|
|
collective_type,min_bytes,max_bytes,algorithm,protocol,channels,nNodes,nRanks,numPipeOps,regBuff
|
||
|
|
|
||
|
|
Usage Examples:
|
||
|
|
# Auto-create dimension-specific interpolated ranges (default)
|
||
|
|
python3 optimize_config.py data.csv
|
||
|
|
|
||
|
|
# Use custom size ranges (applied to all topologies)
|
||
|
|
python3 optimize_config.py data.csv --size-ranges "0-1024,1025-65536,65537-1048576"
|
||
|
|
|
||
|
|
# Use hardcoded default ranges (applied to all topologies)
|
||
|
|
python3 optimize_config.py data.csv --no-auto-ranges
|
||
|
|
"""
|
||
|
|
|
||
|
|
import csv
|
||
|
|
import argparse
|
||
|
|
import sys
|
||
|
|
import os
|
||
|
|
from collections import defaultdict
|
||
|
|
from typing import Dict, List, Tuple, Any
|
||
|
|
|
||
|
|
class PerformanceData:
|
||
|
|
def __init__(self, row: Dict[str, str]):
|
||
|
|
self.collective = row['collective']
|
||
|
|
self.size_bytes = int(row['size_bytes'])
|
||
|
|
self.algorithm = row['algorithm']
|
||
|
|
self.protocol = row['protocol']
|
||
|
|
self.channels = int(row['channels']) if row['channels'] != '-1' else -1
|
||
|
|
self.nodes = int(row['nodes']) if row['nodes'] != '-1' else -1
|
||
|
|
self.ranks = int(row['ranks']) if row['ranks'] != '-1' else -1
|
||
|
|
self.pipeOps = int(row['pipeOps']) if row['pipeOps'] != '-1' else -1
|
||
|
|
self.regBuff = int(row['regBuff']) if row['regBuff'] != '-1' else -1
|
||
|
|
|
||
|
|
# Performance metrics
|
||
|
|
self.bandwidth_gbps = float(row.get('bandwidth_gbps', 0)) # Higher is better
|
||
|
|
self.latency_us = float(row.get('latency_us', 0)) # Lower is better
|
||
|
|
|
||
|
|
def get_config_key(self) -> Tuple:
|
||
|
|
"""Generate a key for grouping similar configurations"""
|
||
|
|
return (self.collective, self.nodes, self.ranks, self.pipeOps, self.regBuff)
|
||
|
|
|
||
|
|
def get_size_range_key(self, topology_size_ranges: Dict[Tuple[int, int], List[Tuple[int, int]]]) -> Tuple[int, int]:
|
||
|
|
"""Find which size range this data point belongs to for its dimension"""
|
||
|
|
topology_key = (self.nodes, self.ranks)
|
||
|
|
|
||
|
|
# Get size ranges for this dimension, or fall back to default
|
||
|
|
if topology_key in topology_size_ranges:
|
||
|
|
size_ranges = topology_size_ranges[topology_key]
|
||
|
|
elif (-1, -1) in topology_size_ranges:
|
||
|
|
size_ranges = topology_size_ranges[(-1, -1)]
|
||
|
|
else:
|
||
|
|
# Fallback to first available dimension ranges
|
||
|
|
size_ranges = next(iter(topology_size_ranges.values()))
|
||
|
|
|
||
|
|
for min_size, max_size in size_ranges:
|
||
|
|
if min_size <= self.size_bytes <= max_size:
|
||
|
|
return (min_size, max_size)
|
||
|
|
# If no range found, create a single-point range
|
||
|
|
return (self.size_bytes, self.size_bytes)
|
||
|
|
|
||
|
|
class ConfigOptimizer:
|
||
|
|
def __init__(self, optimization_metric: str = 'latency_us'):
|
||
|
|
self.optimization_metric = optimization_metric
|
||
|
|
# Default size ranges - will be overridden by auto-detection
|
||
|
|
self.size_ranges = [
|
||
|
|
(0, 1024),
|
||
|
|
(1025, 64*1024),
|
||
|
|
(64*1024+1, 1024*1024),
|
||
|
|
(1024*1024+1, 16*1024*1024),
|
||
|
|
(16*1024*1024+1, 4*1024*1024*1024-1)
|
||
|
|
]
|
||
|
|
self.auto_size_ranges = True
|
||
|
|
|
||
|
|
def set_size_ranges(self, ranges: List[Tuple[int, int]]):
|
||
|
|
"""Set custom size ranges for optimization"""
|
||
|
|
self.size_ranges = ranges
|
||
|
|
self.auto_size_ranges = False
|
||
|
|
|
||
|
|
def auto_determine_size_ranges(self, data: List[PerformanceData]) -> Dict[Tuple[int, int], List[Tuple[int, int]]]:
|
||
|
|
"""Create growing size ranges for each unique (nodes, ranks) dimension"""
|
||
|
|
if not data:
|
||
|
|
return {(-1, -1): self.size_ranges}
|
||
|
|
|
||
|
|
# Group data by dimension (nodes, ranks)
|
||
|
|
topology_data = defaultdict(list)
|
||
|
|
for item in data:
|
||
|
|
topology_key = (item.nodes, item.ranks)
|
||
|
|
topology_data[topology_key].append(item)
|
||
|
|
|
||
|
|
topology_ranges = {}
|
||
|
|
|
||
|
|
for topology_key, items in topology_data.items():
|
||
|
|
nodes, ranks = topology_key
|
||
|
|
|
||
|
|
# Extract unique sizes for this dimension and sort them
|
||
|
|
unique_sizes = sorted(set(item.size_bytes for item in items))
|
||
|
|
|
||
|
|
if len(unique_sizes) <= 1:
|
||
|
|
# Only one size, create a single range from 0 to that size
|
||
|
|
size = unique_sizes[0] if unique_sizes else 0
|
||
|
|
ranges = [(0, size)]
|
||
|
|
else:
|
||
|
|
# Create growing ranges that interpolate between data points
|
||
|
|
ranges = []
|
||
|
|
|
||
|
|
for i, size in enumerate(unique_sizes):
|
||
|
|
if i == 0:
|
||
|
|
# First range: 0 to midpoint between first and second size
|
||
|
|
if len(unique_sizes) > 1:
|
||
|
|
next_size = unique_sizes[i + 1]
|
||
|
|
max_size = (size + next_size) // 2
|
||
|
|
else:
|
||
|
|
max_size = size
|
||
|
|
min_size = 0
|
||
|
|
elif i == len(unique_sizes) - 1:
|
||
|
|
# Last range: previous max + 1 to current size (and beyond)
|
||
|
|
min_size = ranges[-1][1] + 1
|
||
|
|
max_size = size
|
||
|
|
else:
|
||
|
|
# Intermediate ranges: previous max + 1 to midpoint with next size
|
||
|
|
min_size = ranges[-1][1] + 1
|
||
|
|
next_size = unique_sizes[i + 1]
|
||
|
|
max_size = (size + next_size) // 2
|
||
|
|
|
||
|
|
ranges.append((min_size, max_size))
|
||
|
|
|
||
|
|
topology_ranges[topology_key] = ranges
|
||
|
|
|
||
|
|
print(f"Dimension {nodes} nodes, {ranks} ranks: {len(ranges)} size ranges from {len(unique_sizes)} unique sizes:")
|
||
|
|
for i, (min_size, max_size) in enumerate(ranges):
|
||
|
|
# Count data points that fall in this range for this dimension
|
||
|
|
count = sum(1 for item in items if min_size <= item.size_bytes <= max_size)
|
||
|
|
actual_sizes = sorted(set(item.size_bytes for item in items if min_size <= item.size_bytes <= max_size))
|
||
|
|
if actual_sizes:
|
||
|
|
size_list = ', '.join(f"{s:,}" for s in actual_sizes[:3])
|
||
|
|
if len(actual_sizes) > 3:
|
||
|
|
size_list += f", ... (+{len(actual_sizes)-3} more)"
|
||
|
|
print(f" Range {i+1}: {min_size:,} - {max_size:,} bytes ({count} data points, sizes: {size_list})")
|
||
|
|
|
||
|
|
return topology_ranges
|
||
|
|
|
||
|
|
def load_data(self, csv_file: str) -> List[PerformanceData]:
|
||
|
|
"""Load performance data from CSV file"""
|
||
|
|
data = []
|
||
|
|
try:
|
||
|
|
with open(csv_file, 'r') as f:
|
||
|
|
reader = csv.DictReader(f)
|
||
|
|
for row in reader:
|
||
|
|
try:
|
||
|
|
data.append(PerformanceData(row))
|
||
|
|
except (ValueError, KeyError) as e:
|
||
|
|
print(f"Warning: Skipping invalid row: {row} - {e}")
|
||
|
|
except FileNotFoundError:
|
||
|
|
print(f"Error: File {csv_file} not found")
|
||
|
|
sys.exit(1)
|
||
|
|
except Exception as e:
|
||
|
|
print(f"Error reading {csv_file}: {e}")
|
||
|
|
sys.exit(1)
|
||
|
|
|
||
|
|
print(f"Loaded {len(data)} performance data points")
|
||
|
|
|
||
|
|
# Auto-determine size ranges if enabled
|
||
|
|
if self.auto_size_ranges and data:
|
||
|
|
self.topology_size_ranges = self.auto_determine_size_ranges(data)
|
||
|
|
else:
|
||
|
|
# Use default ranges for all topologies
|
||
|
|
self.topology_size_ranges = {(-1, -1): self.size_ranges}
|
||
|
|
|
||
|
|
return data
|
||
|
|
|
||
|
|
def is_better(self, new_data: PerformanceData, current_best: PerformanceData) -> bool:
|
||
|
|
"""Determine if new_data is better than current_best"""
|
||
|
|
if self.optimization_metric == 'bandwidth_gbps':
|
||
|
|
return new_data.bandwidth_gbps > current_best.bandwidth_gbps
|
||
|
|
elif self.optimization_metric == 'latency_us':
|
||
|
|
return new_data.latency_us < current_best.latency_us
|
||
|
|
else:
|
||
|
|
# Default to latency
|
||
|
|
return new_data.latency_us < current_best.latency_us
|
||
|
|
|
||
|
|
def optimize_configurations(self, data: List[PerformanceData]) -> List[str]:
|
||
|
|
"""Find optimal configurations and return as NCCL config strings"""
|
||
|
|
# Group data by configuration key and size range
|
||
|
|
grouped_data = defaultdict(lambda: defaultdict(list))
|
||
|
|
|
||
|
|
for item in data:
|
||
|
|
config_key = item.get_config_key()
|
||
|
|
size_range = item.get_size_range_key(self.topology_size_ranges)
|
||
|
|
grouped_data[config_key][size_range].append(item)
|
||
|
|
|
||
|
|
# Store optimal configurations before combining ranges
|
||
|
|
optimal_configs = []
|
||
|
|
|
||
|
|
for config_key, size_ranges_dict in grouped_data.items():
|
||
|
|
collective, nodes, ranks, pipeOps, regBuff = config_key
|
||
|
|
|
||
|
|
for (min_size, max_size), items in size_ranges_dict.items():
|
||
|
|
if not items:
|
||
|
|
continue
|
||
|
|
|
||
|
|
# Find the best performing configuration for this size range
|
||
|
|
best_item = items[0]
|
||
|
|
for item in items[1:]:
|
||
|
|
if self.is_better(item, best_item):
|
||
|
|
best_item = item
|
||
|
|
|
||
|
|
# Store the optimal configuration with its range
|
||
|
|
optimal_configs.append({
|
||
|
|
'collective': collective,
|
||
|
|
'min_size': min_size,
|
||
|
|
'max_size': max_size,
|
||
|
|
'algorithm': best_item.algorithm,
|
||
|
|
'protocol': best_item.protocol,
|
||
|
|
'channels': best_item.channels,
|
||
|
|
'nodes': best_item.nodes,
|
||
|
|
'ranks': best_item.ranks,
|
||
|
|
'pipeOps': best_item.pipeOps,
|
||
|
|
'regBuff': best_item.regBuff,
|
||
|
|
'metric_value': getattr(best_item, self.optimization_metric)
|
||
|
|
})
|
||
|
|
|
||
|
|
# Combine sequential ranges with identical tunings
|
||
|
|
combined_configs = self.combine_sequential_ranges(optimal_configs)
|
||
|
|
|
||
|
|
# Generate config strings
|
||
|
|
configs = []
|
||
|
|
for config in combined_configs:
|
||
|
|
config_str = f"{config['collective']},{config['min_size']},{config['max_size']},{config['algorithm']},{config['protocol']},{config['channels']},{config['nodes']},{config['ranks']},{config['pipeOps']},{config['regBuff']}"
|
||
|
|
configs.append(config_str)
|
||
|
|
|
||
|
|
print(f"Optimal for {config['collective']} [{config['min_size']}-{config['max_size']}] nodes={config['nodes']} ranks={config['ranks']}: "
|
||
|
|
f"{config['algorithm']}/{config['protocol']} channels={config['channels']} "
|
||
|
|
f"({self.optimization_metric}={config['metric_value']:.3f})")
|
||
|
|
|
||
|
|
return configs
|
||
|
|
|
||
|
|
def combine_sequential_ranges(self, configs: List[Dict]) -> List[Dict]:
|
||
|
|
"""Combine sequential ranges that have identical tuning parameters"""
|
||
|
|
if not configs:
|
||
|
|
return configs
|
||
|
|
|
||
|
|
# Group by collective and topology (nodes, ranks)
|
||
|
|
topology_groups = defaultdict(list)
|
||
|
|
for config in configs:
|
||
|
|
topology_key = (config['collective'], config['nodes'], config['ranks'],
|
||
|
|
config['pipeOps'], config['regBuff'])
|
||
|
|
topology_groups[topology_key].append(config)
|
||
|
|
|
||
|
|
combined_configs = []
|
||
|
|
|
||
|
|
for topology_key, topology_configs in topology_groups.items():
|
||
|
|
# Sort by min_size to ensure proper ordering
|
||
|
|
topology_configs.sort(key=lambda x: x['min_size'])
|
||
|
|
|
||
|
|
# Group by tuning parameters (algorithm, protocol, channels)
|
||
|
|
tuning_groups = defaultdict(list)
|
||
|
|
for config in topology_configs:
|
||
|
|
tuning_key = (config['algorithm'], config['protocol'], config['channels'])
|
||
|
|
tuning_groups[tuning_key].append(config)
|
||
|
|
|
||
|
|
# For each tuning group, combine sequential ranges
|
||
|
|
for tuning_key, tuning_configs in tuning_groups.items():
|
||
|
|
if not tuning_configs:
|
||
|
|
continue
|
||
|
|
|
||
|
|
# Sort by min_size
|
||
|
|
tuning_configs.sort(key=lambda x: x['min_size'])
|
||
|
|
|
||
|
|
# Combine sequential ranges
|
||
|
|
current_config = tuning_configs[0].copy()
|
||
|
|
|
||
|
|
for next_config in tuning_configs[1:]:
|
||
|
|
# Check if ranges are adjacent or overlapping
|
||
|
|
if current_config['max_size'] + 1 >= next_config['min_size']:
|
||
|
|
# Extend the current range
|
||
|
|
current_config['max_size'] = max(current_config['max_size'], next_config['max_size'])
|
||
|
|
# Update metric value to the better one
|
||
|
|
if self.optimization_metric == 'bandwidth_gbps':
|
||
|
|
if next_config['metric_value'] > current_config['metric_value']:
|
||
|
|
current_config['metric_value'] = next_config['metric_value']
|
||
|
|
else: # latency_us or default
|
||
|
|
if next_config['metric_value'] < current_config['metric_value']:
|
||
|
|
current_config['metric_value'] = next_config['metric_value']
|
||
|
|
else:
|
||
|
|
# Gap between ranges, save current and start new one
|
||
|
|
combined_configs.append(current_config)
|
||
|
|
current_config = next_config.copy()
|
||
|
|
|
||
|
|
# Add the last configuration
|
||
|
|
combined_configs.append(current_config)
|
||
|
|
|
||
|
|
# Sort final configs by collective, nodes, ranks, then min_size
|
||
|
|
combined_configs.sort(key=lambda x: (x['collective'], x['nodes'], x['ranks'], x['min_size']))
|
||
|
|
|
||
|
|
original_count = len(configs)
|
||
|
|
combined_count = len(combined_configs)
|
||
|
|
if combined_count < original_count:
|
||
|
|
print(f"Combined {original_count} ranges into {combined_count} ranges "
|
||
|
|
f"(reduced by {original_count - combined_count})")
|
||
|
|
|
||
|
|
return combined_configs
|
||
|
|
|
||
|
|
def append_to_config_file(self, configs: List[str], config_file: str, add_header: bool = True):
|
||
|
|
"""Append optimized configurations to NCCL tuner config file"""
|
||
|
|
try:
|
||
|
|
# Create directory if it doesn't exist
|
||
|
|
config_dir = os.path.dirname(config_file)
|
||
|
|
if config_dir and not os.path.exists(config_dir):
|
||
|
|
os.makedirs(config_dir)
|
||
|
|
print(f"Created directory: {config_dir}")
|
||
|
|
|
||
|
|
# Check if file exists and has content
|
||
|
|
file_exists = os.path.exists(config_file)
|
||
|
|
add_separator = False
|
||
|
|
|
||
|
|
if file_exists:
|
||
|
|
with open(config_file, 'r') as f:
|
||
|
|
content = f.read().strip()
|
||
|
|
add_separator = len(content) > 0
|
||
|
|
print(f"Appending to existing file: {config_file}")
|
||
|
|
else:
|
||
|
|
print(f"Creating new file: {config_file}")
|
||
|
|
|
||
|
|
with open(config_file, 'a') as f:
|
||
|
|
if add_separator:
|
||
|
|
f.write("\n\n")
|
||
|
|
|
||
|
|
if add_header:
|
||
|
|
f.write(f"# Optimized configurations generated by optimize_config.py\n")
|
||
|
|
f.write(f"# Optimization metric: {self.optimization_metric}\n")
|
||
|
|
f.write(f"# Format: collective_type,min_bytes,max_bytes,algorithm,protocol,channels,nNodes,nRanks,numPipeOps,regBuff\n")
|
||
|
|
|
||
|
|
for config in configs:
|
||
|
|
f.write(f"{config}\n")
|
||
|
|
|
||
|
|
if file_exists:
|
||
|
|
print(f"Appended {len(configs)} optimized configurations to {config_file}")
|
||
|
|
else:
|
||
|
|
print(f"Created {config_file} with {len(configs)} optimized configurations")
|
||
|
|
|
||
|
|
except PermissionError:
|
||
|
|
print(f"Error: Permission denied writing to {config_file}")
|
||
|
|
print("Try running with appropriate permissions or choose a different output location")
|
||
|
|
sys.exit(1)
|
||
|
|
except OSError as e:
|
||
|
|
print(f"Error: Cannot create/write to {config_file}: {e}")
|
||
|
|
print("Check that the path is valid and you have write permissions")
|
||
|
|
sys.exit(1)
|
||
|
|
except Exception as e:
|
||
|
|
print(f"Unexpected error writing to {config_file}: {e}")
|
||
|
|
sys.exit(1)
|
||
|
|
|
||
|
|
def main():
|
||
|
|
parser = argparse.ArgumentParser(description="Optimize NCCL tuner configurations from performance data")
|
||
|
|
parser.add_argument("csv_file", help="Input CSV file with performance data")
|
||
|
|
parser.add_argument("-o", "--output", default="nccl_tuner.conf",
|
||
|
|
help="Output NCCL tuner config file (default: nccl_tuner.conf)")
|
||
|
|
parser.add_argument("-m", "--metric", choices=['bandwidth_gbps', 'latency_us'],
|
||
|
|
default='latency_us', help="Optimization metric (default: latency_us)")
|
||
|
|
parser.add_argument("--no-header", action="store_true",
|
||
|
|
help="Don't add header comments to output file")
|
||
|
|
parser.add_argument("--dry-run", action="store_true",
|
||
|
|
help="Print configurations without writing to file")
|
||
|
|
parser.add_argument("--no-auto-ranges", action="store_true",
|
||
|
|
help="Disable automatic size range determination (use default ranges)")
|
||
|
|
parser.add_argument("--size-ranges", type=str,
|
||
|
|
help="Custom size ranges as comma-separated pairs: 'min1-max1,min2-max2,...'")
|
||
|
|
|
||
|
|
args = parser.parse_args()
|
||
|
|
|
||
|
|
optimizer = ConfigOptimizer(args.metric)
|
||
|
|
|
||
|
|
# Handle size range configuration
|
||
|
|
if args.size_ranges:
|
||
|
|
# Parse custom size ranges
|
||
|
|
try:
|
||
|
|
ranges = []
|
||
|
|
for range_str in args.size_ranges.split(','):
|
||
|
|
min_size, max_size = map(int, range_str.split('-'))
|
||
|
|
ranges.append((min_size, max_size))
|
||
|
|
optimizer.set_size_ranges(ranges)
|
||
|
|
print(f"Using custom size ranges: {ranges}")
|
||
|
|
except ValueError:
|
||
|
|
print("Error: Invalid size ranges format. Use 'min1-max1,min2-max2,...'")
|
||
|
|
sys.exit(1)
|
||
|
|
elif args.no_auto_ranges:
|
||
|
|
# Disable auto-ranging
|
||
|
|
optimizer.auto_size_ranges = False
|
||
|
|
print("Using default hardcoded size ranges")
|
||
|
|
else:
|
||
|
|
# Auto-ranging is enabled by default - creates one bucket per unique size
|
||
|
|
optimizer.auto_size_ranges = True
|
||
|
|
print("Auto-ranging enabled: will create one bucket per unique size in data")
|
||
|
|
|
||
|
|
# Load and optimize data
|
||
|
|
data = optimizer.load_data(args.csv_file)
|
||
|
|
if not data:
|
||
|
|
print("No valid data found in CSV file")
|
||
|
|
sys.exit(1)
|
||
|
|
|
||
|
|
configs = optimizer.optimize_configurations(data)
|
||
|
|
|
||
|
|
if args.dry_run:
|
||
|
|
print("\nGenerated configurations:")
|
||
|
|
for config in configs:
|
||
|
|
print(config)
|
||
|
|
else:
|
||
|
|
optimizer.append_to_config_file(configs, args.output, not args.no_header)
|
||
|
|
|
||
|
|
if __name__ == "__main__":
|
||
|
|
main()
|