Files
amd-strix-halo-vllm-toolboxes/benchmarks/vllm_cluster_bench.py
T

274 rader
8.2 KiB
Python
Normal vy Historik

#!/usr/bin/env python3
import subprocess, time, json, sys, os, requests, argparse, re
from pathlib import Path
# ``models`` supplies the shared benchmark configuration read by the
# module-level settings below, so it has to be importable right away.
try:
    import models
except ImportError:
    # Not on the default search path (e.g. when installed under /opt):
    # retry with the current working directory appended.
    sys.path.append(os.getcwd())
    try:
        import models
    except ImportError:
        # Last resort for local development: the sibling ``scripts`` directory.
        # An ImportError raised here is deliberately left to propagate.
        sys.path.append(str(Path(__file__).parent.parent / "scripts"))
        import models
# =========================
# ⚙️ GLOBAL SETTINGS
# =========================
# Cluster configuration: 2x Strix Halo, tensor-parallel size 2.
# TP=2 was specifically requested for the cluster test.
CLUSTER_TP = 2
# Default value passed as --gpu-memory-utilization when a model's table entry
# does not provide its own "gpu_util".
GPU_UTIL = "0.90"
# Throughput-benchmark parameters (imported from models.py).
OFF_NUM_PROMPTS = models.OFF_NUM_PROMPTS
OFF_FORCED_OUTPUT = models.OFF_FORCED_OUTPUT
DEFAULT_BATCH_TOKENS = models.DEFAULT_BATCH_TOKENS
# Output directory for the default-backend results; created at import time.
RESULTS_DIR = Path("benchmark_results")
RESULTS_DIR.mkdir(exist_ok=True)
# The model table and the list of models to run are reused from models.py
# (shared with the main benchmark script) rather than copied into this file.
MODEL_TABLE = models.MODEL_TABLE
MODELS_TO_RUN = models.MODELS_TO_RUN
# =========================
# UTILS (Adapted for Cluster)
# =========================
# =========================
# CLUSTER MANAGER INTEGRATION
# =========================
try:
    import cluster_manager
except ImportError:
    # Fall back to the sibling ``scripts`` directory; a second failure propagates.
    sys.path.append(str(Path(__file__).parent.parent / "scripts"))
    import cluster_manager

# Cluster node addresses; each can be overridden through the environment.
HEAD_IP = os.environ.get("VLLM_HEAD_IP", "192.168.100.1")
WORKER_IP = os.environ.get("VLLM_WORKER_IP", "192.168.100.2")
def log(msg):
    """Print ``msg`` on stdout, preceded by a blank line and the ``[CLUSTER-BENCH]`` tag.

    The output is flushed immediately. The benchmark child process writes to
    the same stdout, and without a flush these lines can appear out of order
    whenever stdout is redirected to a file or pipe.
    """
    print(f"\n[CLUSTER-BENCH] {msg}", flush=True)
def restart_cluster():
    """Stop the Ray cluster and bring it back up from a clean state.

    The sequence is: stop the cluster, start the head node, pause briefly,
    start the worker node, then wait for the cluster to come up. If any of
    the start/wait steps reports failure, an error is logged and the process
    exits with status 1.
    """
    log("Restarting Ray Cluster (Clean State)...")

    # Step 1: stop whatever is running (best effort; result is not checked).
    cluster_manager.stop_cluster()

    # Step 2: head node.
    head_started = cluster_manager.setup_head_node(HEAD_IP)
    if not head_started:
        log("ERROR: Failed to start HEAD node.")
        sys.exit(1)

    # Step 3: worker node, after giving the head a moment.
    time.sleep(5)
    worker_started = cluster_manager.setup_worker_node(WORKER_IP, HEAD_IP)
    if not worker_started:
        log("ERROR: Failed to start WORKER node.")
        sys.exit(1)

    # Step 4: wait for the cluster to report ready.
    cluster_ready = cluster_manager.wait_for_cluster()
    if not cluster_ready:
        log("ERROR: Cluster failed to initialize.")
        sys.exit(1)

    log("Cluster Ready.")
def get_net_iface():
    """Return the network interface reported by ``cluster_manager.get_net_iface``."""
    iface = cluster_manager.get_net_iface()
    return iface
def get_local_ip(iface):
    """Return the local IP for ``iface`` as reported by ``cluster_manager.get_local_ip``."""
    local_ip = cluster_manager.get_local_ip(iface)
    return local_ip
def nuke_vllm_cache():
    """Ask ``cluster_manager`` to clear the vLLM cache on the head and worker nodes."""
    # Explicit IPs are passed because `ray status` might return hex IDs,
    # which cannot be used as SSH targets.
    target_nodes = [HEAD_IP, WORKER_IP]
    cluster_manager.nuke_vllm_cache_cluster(nodes=target_nodes)
def get_dataset():
    """Return the path of the local ShareGPT dataset, downloading it if absent.

    Returns:
        The dataset path as a string, or ``None`` when the download fails
        (the caller then falls back to random prompts).
    """
    data_path = Path("ShareGPT_V3_unfiltered_cleaned_split.json")
    if data_path.exists():
        return str(data_path)

    log("Downloading ShareGPT dataset...")
    url = "https://huggingface.co/datasets/anon8231489123/ShareGPT_Vicuna_unfiltered/resolve/main/ShareGPT_V3_unfiltered_cleaned_split.json"

    # Stream into a sidecar file and rename it only once the transfer has
    # completed. Writing directly to ``data_path`` would leave a truncated
    # file behind on failure, and the ``exists()`` check above would then
    # accept it as the full dataset on every later call.
    partial_path = data_path.with_name(data_path.name + ".part")
    try:
        # The context manager releases the connection even if writing fails.
        with requests.get(url, stream=True, timeout=15) as r:
            r.raise_for_status()
            with open(partial_path, "wb") as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
        partial_path.replace(data_path)
        return str(data_path)
    except Exception as e:
        log(f"WARNING: ShareGPT download failed ({e}). using RANDOM.")
        # Best-effort cleanup of the incomplete download.
        try:
            if partial_path.exists():
                partial_path.unlink()
        except OSError:
            pass
        return None
def get_cluster_env():
    """Build the environment mapping used to launch the benchmark subprocess.

    Starts from a copy of the current process environment and overlays the
    cluster networking variables (intended to match start_vllm_cluster.py).

    Returns:
        A dict suitable for the ``env`` argument of ``subprocess.run``.
    """
    # Detect the interface first, then the local IP bound to it.
    iface = get_net_iface()
    local_ip = get_local_ip(iface)

    cluster_env = os.environ.copy()
    cluster_env.update({
        # Critical cluster settings.
        "RAY_EXPERIMENTAL_NOSET_ROCR_VISIBLE_DEVICES": "1",
        "VLLM_HOST_IP": local_ip,
        "NCCL_SOCKET_IFNAME": iface,
        "GLOO_SOCKET_IFNAME": iface,
        # RCCL-specific settings.
        "NCCL_IB_GID_INDEX": "1",
        "NCCL_IB_DISABLE": "0",
        "NCCL_NET_GDR_LEVEL": "0",
        # RDMA stability tweaks, added as a fix for high-throughput models
        # such as Gemma 3.
        # NOTE(review): the original note describes 23 as ~32 seconds and a
        # retry count of 7 as an increase over the default - confirm both
        # against the NCCL/RCCL documentation.
        "NCCL_IB_TIMEOUT": "23",
        "NCCL_IB_RETRY_CNT": "7",
    })
    return cluster_env
def get_model_args(model):
    """Return the vLLM command-line arguments for running ``model`` on the cluster.

    Per-model settings are read from ``MODEL_TABLE``. A model that is missing
    from the table, or whose entry lacks ``max_num_seqs``, falls back to
    ``"32"``; GPU memory utilisation falls back to ``GPU_UTIL``.

    Args:
        model: Model identifier passed to ``--model``.

    Returns:
        A list of strings to append to the ``vllm`` command.
    """
    config = MODEL_TABLE.get(model, {})
    util = config.get("gpu_util", GPU_UTIL)
    # Values are coerced with str() so that numeric table entries cannot
    # produce a non-string argv element.
    cmd = [
        "--model", model,
        "--gpu-memory-utilization", str(util),
        "--dtype", "auto",
        "--tensor-parallel-size", str(CLUSTER_TP),
        # .get() rather than indexing: an entry without this key previously
        # raised KeyError even though unknown models already defaulted to 32.
        "--max-num-seqs", str(config.get("max_num_seqs", "32")),
        "--distributed-executor-backend", "ray",
    ]
    # Optional context length.
    if "ctx" in config:
        cmd.extend(["--max-model-len", str(config["ctx"])])
    if config.get("trust_remote"):
        cmd.append("--trust-remote-code")
    if config.get("enforce_eager"):
        cmd.append("--enforce-eager")
    return cmd
def run_bench_set(model, backend_name, output_dir, extra_env=None):
    """Run one ``vllm bench throughput`` pass for ``model`` and store its JSON result.

    The run is skipped when the result file already exists in ``output_dir``.
    Failures of the benchmark process are logged, not raised.

    Args:
        model: Model identifier; ``/`` is replaced by ``_`` in the file name.
        backend_name: Label used in log messages. The value ``"ROCm-Attn"``
            additionally selects the ``ROCM_ATTN`` attention backend.
        output_dir: Directory for the result file (created if missing).
        extra_env: Optional mapping of environment overrides applied last.
    """
    model_safe = model.replace("/", "_")
    output_dir_path = Path(output_dir)
    output_dir_path.mkdir(parents=True, exist_ok=True)
    output_file = output_dir_path / f"{model_safe}_cluster_tp{CLUSTER_TP}_throughput.json"

    if output_file.exists():
        log(f"SKIP {model} [{backend_name}] (Result exists)")
        return

    dataset_path = get_dataset()
    if dataset_path:
        dataset_args = ["--dataset-name", "sharegpt", "--dataset-path", dataset_path]
    else:
        # No dataset available: fall back to a fixed input length.
        dataset_args = ["--input-len", "1024"]

    # Tolerate models missing from MODEL_TABLE, consistent with
    # get_model_args(); indexing the table directly raised KeyError here.
    model_config = MODEL_TABLE.get(model, {})
    batch_tokens = model_config.get("max_tokens", DEFAULT_BATCH_TOKENS)

    log(f"START {model} [TP={CLUSTER_TP} | {backend_name}]...")
    nuke_vllm_cache()

    cmd = ["vllm", "bench", "throughput"] + get_model_args(model)
    # Every value is coerced with str(): a numeric setting would otherwise
    # fail in ' '.join(cmd) below and be reported only as a "System error".
    cmd.extend([
        "--num-prompts", str(OFF_NUM_PROMPTS),
        "--max-num-batched-tokens", str(batch_tokens),
        "--output-len", str(OFF_FORCED_OUTPUT),
        "--output-json", str(output_file),
        "--disable-log-stats",
    ])
    cmd.extend(dataset_args)
    if backend_name == "ROCm-Attn":
        cmd.extend(["--attention-backend", "ROCM_ATTN"])

    # Environment precedence (later wins): cluster defaults, then the model's
    # own "env" entry, then run-specific overrides.
    env = get_cluster_env()
    env.update(model_config.get("env", {}))
    if extra_env:
        env.update(extra_env)

    try:
        log(f"Command: {' '.join(cmd)}")
        subprocess.run(cmd, check=True, env=env)
    except subprocess.CalledProcessError as e:
        log(f"ERROR: Failed {model} [{backend_name}] (Exit {e.returncode})")
    except Exception as e:
        log(f"ERROR: System error: {e}")
def _cluster_result_exists(model, output_dir):
    """Return True if the throughput result file for ``model`` is already in ``output_dir``."""
    result_name = f"{model.replace('/', '_')}_cluster_tp{CLUSTER_TP}_throughput.json"
    return (Path(output_dir) / result_name).exists()


def run_cluster_throughput(model):
    """Benchmark ``model`` twice: with the default backend, then with ROCm attention.

    Each pass that actually runs is preceded by a full cluster restart. A
    pass whose result file already exists is skipped before the restart,
    so re-running a partially completed sweep does not restart the cluster
    for work that will not be done.

    Args:
        model: Model identifier to benchmark.
    """
    passes = (
        # (backend label, output directory, extra environment overrides)
        ("Default", RESULTS_DIR, None),  # 1. default run (Triton)
        ("ROCm-Attn", "benchmark_results_rocm", {}),  # 2. ROCm attention run
    )
    for backend_name, output_dir, extra_env in passes:
        if _cluster_result_exists(model, output_dir):
            log(f"SKIP {model} [{backend_name}] (Result exists)")
            continue
        restart_cluster()
        run_bench_set(model, backend_name, output_dir, extra_env=extra_env)
def _format_throughput(result_path):
    """Return ``tokens_per_second`` from ``result_path`` as ``"x.y"``, or ``"N/A"``.

    ``"N/A"`` covers a missing or unreadable file, invalid JSON, and a value
    that cannot be formatted as a number. A missing key is reported as 0.0.
    """
    try:
        data = json.loads(result_path.read_text())
        return f"{data.get('tokens_per_second', 0):.1f}"
    except (OSError, ValueError, TypeError, AttributeError):
        # Only the expected failure modes are handled; a bare ``except`` would
        # also have swallowed KeyboardInterrupt and SystemExit.
        return "N/A"


def print_summary():
    """Print a table of tokens/second per model for the default and ROCm runs."""
    # The header follows CLUSTER_TP instead of a hard-coded "TP=2".
    header = f"MODEL (TP={CLUSTER_TP})"
    print(f"\n{header:<50} | {'Triton':<8} | {'ROCm':<8}")
    print("-" * 75)
    for m in MODELS_TO_RUN:
        result_name = f"{m.replace('/', '_')}_cluster_tp{CLUSTER_TP}_throughput.json"
        val1 = _format_throughput(RESULTS_DIR / result_name)  # default run
        val2 = _format_throughput(Path("benchmark_results_rocm") / result_name)  # ROCm run
        # Show only the final path component of the model identifier.
        name_cell = m.split('/')[-1]
        print(f"{name_cell:<50} | {val1:<8} | {val2:<8}")
    print("-" * 75)
if __name__ == "__main__":
    # There is no Ray readiness check here any more: every benchmark pass
    # restarts the cluster itself.
    log("Ray Cluster Detected. Starting Benchmarks (Dual Backend)...")
    for model_name in MODELS_TO_RUN:
        run_cluster_throughput(model_name)
    print_summary()