314 γραμμές
10 KiB
Python
Εκτελέσιμο Αρχείο
314 γραμμές
10 KiB
Python
Εκτελέσιμο Αρχείο
#!/usr/bin/env python3
|
|
import subprocess, time, json, sys, os, requests, argparse, re
|
|
from pathlib import Path
|
|
|
|
# =========================
# ⚙️ GLOBAL SETTINGS
# =========================

# Cluster layout: 2x Strix Halo, benchmarked with tensor parallelism of 2
# (TP=2 was specifically requested for the cluster run).
CLUSTER_TP = 2
# Default for --gpu-memory-utilization when a model entry has no "gpu_util".
GPU_UTIL = "0.90"

# Throughput settings (kept the same as run_vllm_bench).
OFF_NUM_PROMPTS = 200            # passed as --num-prompts
OFF_FORCED_OUTPUT = "512"        # passed as --output-len
DEFAULT_BATCH_TOKENS = "8192"    # --max-num-batched-tokens fallback

# Results of the default pass are written here; the directory is created
# relative to the current working directory as soon as the module is loaded.
RESULTS_DIR = Path("cluster_benchmark_results")
RESULTS_DIR.mkdir(exist_ok=True)
|
|
|
|
# The model table is reused from the main benchmark script via the `models`
# module instead of being copied here. The import is attempted three times:
#   1. with sys.path as it is,
#   2. after appending the current working directory (e.g. when run from /opt),
#   3. after appending the "scripts" directory located one level above this
#      file's directory (local development layout).
# If the last attempt fails too, the ImportError propagates.
try:
    import models
except ImportError:
    sys.path.append(os.getcwd())
    try:
        import models
    except ImportError:
        sys.path.append(str(Path(__file__).parent.parent / "scripts"))
        import models

# Per-model settings and the list of models to benchmark.
MODEL_TABLE = models.MODEL_TABLE
MODELS_TO_RUN = models.MODELS_TO_RUN
|
|
|
|
# =========================
|
|
# UTILS (Adapted for Cluster)
|
|
# =========================
|
|
|
|
def log(msg):
    """Print ``msg`` after a blank line, prefixed with the ``[CLUSTER-BENCH]`` tag."""
    tagged = f"\n[CLUSTER-BENCH] {msg}"
    print(tagged)
|
|
|
|
def get_ray_nodes():
    """Return the IPs of the nodes that ``ray status`` lists as active.

    Runs ``ray status`` and scans the lines between the ``Active:`` header
    and the next ``Pending:`` / ``Recent failures:`` header for
    ``node_<IPv4>`` entries.

    Returns:
        list[str]: The captured IPv4 strings in output order. The list is
        empty when the command cannot be started, exits with a non-zero
        status, or lists no matching node.
    """
    node_ip_re = re.compile(r"node_(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})")

    # Only the subprocess call is guarded, and only against the errors it can
    # raise (missing binary, OS failure, undecodable output). Ctrl-C and
    # SystemExit are no longer swallowed.
    try:
        res = subprocess.run(
            ["ray", "status"],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
    except (OSError, ValueError, subprocess.SubprocessError):
        return []

    if res.returncode != 0:
        return []

    nodes = []
    in_active_section = False
    for line in res.stdout.splitlines():
        if "Active:" in line:
            in_active_section = True
            continue
        if "Pending:" in line or "Recent failures:" in line:
            in_active_section = False
            continue
        if in_active_section:
            # A regex (rather than a fixed "1 node_" prefix check) both
            # tolerates layout differences and captures the IP.
            match = node_ip_re.search(line)
            if match:
                nodes.append(match.group(1))

    return nodes
|
|
|
|
def check_ray_status():
    """Return True when ``get_ray_nodes`` reports two or more active nodes."""
    active_count = len(get_ray_nodes())
    return active_count >= 2
|
|
|
|
def get_net_iface(ip_prefix="192.168.100"):
    """Return the name of the interface that serves the cluster network.

    Lists all addresses with ``ip -o addr show`` and returns the interface of
    the first entry whose address starts with ``ip_prefix``. The default
    prefix assumes the standard 192.168.100.x setup from
    start_vllm_cluster.py.

    Args:
        ip_prefix: Leading part of the address to look for.

    Returns:
        str: The interface name, or ``"eth0"`` when ``ip`` cannot be run or
        no address matches.
    """
    # No shell: the prefix is compared in Python, so it is matched literally
    # ("." is not a wildcard) and only against the address column, never
    # against the broadcast field or the middle of an unrelated address.
    try:
        listing = subprocess.check_output(["ip", "-o", "addr", "show"], text=True)
    except (OSError, ValueError, subprocess.SubprocessError):
        return "eth0"  # Fallback

    for line in listing.splitlines():
        # One-line format: "2: eth0    inet 192.168.100.1/24 ..."
        fields = line.split()
        if len(fields) >= 4 and fields[3].startswith(ip_prefix):
            return fields[1]  # Interface name

    return "eth0"  # Fallback
|
|
|
|
def get_local_ip(iface):
    """Return the first IPv4 address configured on ``iface``.

    Runs ``ip -o -4 addr show dev <iface>`` and takes the address column of
    the first entry, without the ``/prefixlen`` suffix.

    Args:
        iface: Interface name, e.g. the value returned by ``get_net_iface``.

    Returns:
        str: The IPv4 address, or ``"127.0.0.1"`` when ``ip`` cannot be run,
        reports an error (such as an unknown device), or lists no address.
    """
    # Run without a shell: in a "ip | awk | cut" pipeline the exit status
    # comes from `cut`, so a failing `ip` would yield "" instead of the
    # loopback fallback. Parsing here also returns a single address even when
    # the interface carries several.
    try:
        listing = subprocess.check_output(
            ["ip", "-o", "-4", "addr", "show", "dev", iface],
            text=True,
        )
    except (OSError, ValueError, subprocess.SubprocessError):
        return "127.0.0.1"

    for line in listing.splitlines():
        # One-line format: "2: eth0    inet 192.168.100.1/24 ..."
        fields = line.split()
        if len(fields) >= 4:
            address = fields[3].split("/", 1)[0]
            if address:
                return address

    return "127.0.0.1"
|
|
|
|
def nuke_vllm_cache_on_node(ip, is_local=False):
    """Empty the ``~/.cache/vllm`` directory on one node.

    Args:
        ip: Address of the node; used as the SSH target for remote nodes and
            in the progress message.
        is_local: When true, the cache under the current user's home
            directory is removed directly instead of over SSH.

    The outcome is printed on the same line as the progress message; any
    failure is reported there rather than raised.
    """
    where = "Locally" if is_local else f"on {ip}"
    print(f"Clearing vLLM cache {where}...", end="", flush=True)

    try:
        if not is_local:
            # Remote node: remove and recreate the directory in one SSH call.
            remote_cmd = "rm -rf ~/.cache/vllm && mkdir -p ~/.cache/vllm"
            subprocess.run(
                ["ssh", "-o", "StrictHostKeyChecking=no", ip, remote_cmd],
                check=True,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
        else:
            cache_dir = Path.home() / ".cache" / "vllm"
            if cache_dir.exists():
                subprocess.run(["rm", "-rf", str(cache_dir)], check=True)
            # Always leave an (empty) cache directory behind.
            cache_dir.mkdir(parents=True, exist_ok=True)

        print(" Done.")
    except Exception as err:
        print(f" Failed ({err}).")
|
|
|
|
def nuke_vllm_cache():
    """Clear the vLLM cache on every node reported by ``get_ray_nodes``.

    When no node is reported, only the local cache is cleared. Otherwise
    each node is cleared in turn (locally when its IP equals this host's
    cluster IP or 127.0.0.1, over SSH otherwise), followed by a two-second
    pause.
    """
    cluster_nodes = get_ray_nodes()
    local_ip = get_local_ip(get_net_iface())

    # No nodes found (unexpected while a benchmark is running): local only.
    if not cluster_nodes:
        nuke_vllm_cache_on_node(local_ip, is_local=True)
        return

    for node_ip in cluster_nodes:
        # Plain string comparison; this relies on get_local_ip returning the
        # same address the cluster uses for this host (192.168.100.x).
        runs_here = node_ip in (local_ip, "127.0.0.1")
        nuke_vllm_cache_on_node(node_ip, runs_here)

    time.sleep(2)
|
|
|
|
def get_dataset():
    """Return the path of the ShareGPT dataset, downloading it if necessary.

    The file is looked up, and stored, in the current working directory.

    Returns:
        str | None: Relative path of the dataset file, or ``None`` when it is
        not present and the download failed (the caller then falls back to
        random prompts).
    """
    data_path = Path("ShareGPT_V3_unfiltered_cleaned_split.json")
    if data_path.exists():
        return str(data_path)

    log("Downloading ShareGPT dataset...")
    url = "https://huggingface.co/datasets/anon8231489123/ShareGPT_Vicuna_unfiltered/resolve/main/ShareGPT_V3_unfiltered_cleaned_split.json"

    # Download into a side file and rename it once complete. An interrupted
    # transfer must not leave a truncated file at data_path, because the
    # existence check above would accept it on the next run.
    partial_path = data_path.with_name(data_path.name + ".part")
    try:
        with requests.get(url, stream=True, timeout=15) as r:
            r.raise_for_status()
            with open(partial_path, "wb") as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
        partial_path.replace(data_path)
        return str(data_path)
    except Exception as e:
        log(f"WARNING: ShareGPT download failed ({e}). using RANDOM.")
        try:
            partial_path.unlink()
        except OSError:
            # Nothing was written, or it cannot be removed; either way the
            # final path stays absent, which is all the caller relies on.
            pass
        return None
|
|
|
|
def get_cluster_env():
    """Build the environment used to launch a benchmark on the cluster.

    Returns:
        dict: A copy of ``os.environ`` with the cluster variables added; the
        interface comes from ``get_net_iface`` and the host IP from
        ``get_local_ip``.
    """
    rdma_iface = get_net_iface()
    host_ip = get_local_ip(rdma_iface)

    env = dict(os.environ)
    env.update({
        # Critical cluster settings (kept in line with start_vllm_cluster.py).
        "RAY_EXPERIMENTAL_NOSET_ROCR_VISIBLE_DEVICES": "1",
        "VLLM_HOST_IP": host_ip,
        "NCCL_SOCKET_IFNAME": rdma_iface,
        "GLOO_SOCKET_IFNAME": rdma_iface,
        # RCCL specific
        "NCCL_IB_GID_INDEX": "1",
        "NCCL_IB_DISABLE": "0",
        "NCCL_NET_GDR_LEVEL": "0",
    })
    return env
|
|
|
|
def get_model_args(model):
    """Build the model-related vLLM command-line arguments for ``model``.

    Settings are read from the model's ``MODEL_TABLE`` entry when present:
    ``gpu_util`` (default ``GPU_UTIL``), ``max_num_seqs`` (default ``"32"``),
    ``ctx`` (optional, becomes ``--max-model-len``) and ``trust_remote``
    (optional, adds ``--trust-remote-code``).

    Args:
        model: Model identifier passed to ``--model``.

    Returns:
        list[str]: Argument list, without the leading ``vllm`` command.
    """
    config = MODEL_TABLE.get(model, {})
    util = config.get("gpu_util", GPU_UTIL)
    # The "32" default applies to table entries that omit the key as well,
    # not only to models missing from the table (which used to be the only
    # case that did not raise KeyError).
    max_num_seqs = config.get("max_num_seqs", "32")

    # Values are passed through str() so that a table entry written as a
    # number still yields a valid argv element.
    cmd = [
        "--model", model,
        "--gpu-memory-utilization", str(util),
        "--dtype", "auto",
        "--tensor-parallel-size", str(CLUSTER_TP),
        "--max-num-seqs", str(max_num_seqs),
        "--distributed-executor-backend", "ray",
    ]

    # Optional context length
    if "ctx" in config:
        cmd.extend(["--max-model-len", str(config["ctx"])])

    if config.get("trust_remote"):
        cmd.append("--trust-remote-code")

    # ALWAYS enforce eager mode for cluster benchmarks (TP=2): distributed
    # graph capture is unstable / prone to hangs on the Strix Halo cluster.
    cmd.append("--enforce-eager")

    return cmd
|
|
|
|
def run_bench_set(model, backend_name, output_dir, extra_env=None):
    """Run one ``vllm bench throughput`` pass for ``model``.

    The pass is skipped when its result file already exists in
    ``output_dir``. Otherwise the vLLM cache is cleared and the benchmark is
    launched with the cluster environment, overlaid first by the model's
    ``env`` entry from ``MODEL_TABLE`` and then by ``extra_env``.

    Args:
        model: Model identifier; "/" is replaced by "_" in the file name.
        backend_name: Label used in log messages only.
        output_dir: Directory for the result JSON; created when missing.
        extra_env: Optional mapping of additional environment variables.

    Returns:
        None. A failing benchmark is logged, not raised.
    """
    model_safe = model.replace("/", "_")
    output_dir_path = Path(output_dir)
    output_dir_path.mkdir(parents=True, exist_ok=True)

    output_file = output_dir_path / f"{model_safe}_cluster_tp{CLUSTER_TP}_throughput.json"

    if output_file.exists():
        log(f"SKIP {model} [{backend_name}] (Result exists)")
        return

    dataset_path = get_dataset()
    if dataset_path:
        dataset_args = ["--dataset-name", "sharegpt", "--dataset-path", dataset_path]
    else:
        dataset_args = ["--input-len", "1024"]

    # Tolerate models that have no MODEL_TABLE entry, as get_model_args does,
    # instead of raising KeyError here.
    model_config = MODEL_TABLE.get(model, {})
    batch_tokens = model_config.get("max_tokens", DEFAULT_BATCH_TOKENS)

    log(f"START {model} [TP={CLUSTER_TP} | {backend_name}]...")

    nuke_vllm_cache()

    cmd = ["vllm", "bench", "throughput"] + get_model_args(model)
    cmd.extend([
        "--num-prompts", str(OFF_NUM_PROMPTS),
        "--max-num-batched-tokens", str(batch_tokens),
        "--output-len", OFF_FORCED_OUTPUT,
        "--output-json", str(output_file),
        "--disable-log-stats",
    ])
    cmd.extend(dataset_args)

    env = get_cluster_env()

    # Model specific envs
    env.update(model_config.get("env", {}))

    # Run specific envs (e.g. ROCm attention) take precedence over both.
    if extra_env:
        env.update(extra_env)

    try:
        log(f"Command: {' '.join(cmd)}")
        subprocess.run(cmd, check=True, env=env)
    except subprocess.CalledProcessError as e:
        log(f"ERROR: Failed {model} [{backend_name}] (Exit {e.returncode})")
    except Exception as e:
        log(f"ERROR: System error: {e}")
|
|
|
|
def run_cluster_throughput(model):
    """Benchmark ``model`` twice: with default settings and with ROCm attention.

    The first pass writes to ``RESULTS_DIR``; the second writes to
    ``benchmark_results_rocm_attn/benchmark_results`` and runs with two
    additional environment variables.
    """
    # 1. Default run (usually Triton, unless global envs say otherwise)
    run_bench_set(model, "Default", RESULTS_DIR)

    # 2. ROCm attention run
    rocm_attn_env = {
        "VLLM_V1_USE_PREFILL_DECODE_ATTENTION": "1",
        "VLLM_USE_TRITON_FLASH_ATTN": "0",
    }
    run_bench_set(
        model,
        "ROCm-Attn",
        "benchmark_results_rocm_attn/benchmark_results",
        extra_env=rocm_attn_env,
    )
|
|
|
|
def _format_throughput(result_file):
    """Return the ``tokens_per_second`` value in ``result_file`` as ``"%.1f"`` text, or ``"N/A"``."""
    try:
        data = json.loads(result_file.read_text())
        return f"{data.get('tokens_per_second', 0):.1f}"
    except (OSError, ValueError, TypeError, AttributeError):
        # Missing or unreadable file, invalid JSON, or an unexpected JSON
        # shape: the cell is shown as not available.
        return "N/A"


def print_summary():
    """Print a table of tokens-per-second results for every model in ``MODELS_TO_RUN``.

    One row per model, with one column for the default pass (read from
    ``RESULTS_DIR``) and one for the ROCm attention pass (read from
    ``benchmark_results_rocm_attn/benchmark_results``). Results that are
    missing or unreadable are shown as ``N/A``.
    """
    # The header uses CLUSTER_TP so it cannot drift from the result file names.
    header = f"MODEL (TP={CLUSTER_TP})"
    print(f"\n{header:<50} | {'Triton':<8} | {'ROCm':<8}")
    print("-" * 75)

    rocm_dir = Path("benchmark_results_rocm_attn/benchmark_results")

    for m in MODELS_TO_RUN:
        msafe = m.replace("/", "_")
        file_name = f"{msafe}_cluster_tp{CLUSTER_TP}_throughput.json"

        val1 = _format_throughput(RESULTS_DIR / file_name)  # Default
        val2 = _format_throughput(rocm_dir / file_name)     # ROCm

        name_cell = m.split('/')[-1]
        print(f"{name_cell:<50} | {val1:<8} | {val2:<8}")

    print("-" * 75)
|
|
|
|
if __name__ == "__main__":
    # The benchmarks need a running Ray cluster; stop early when it is absent.
    cluster_ready = check_ray_status()
    if not cluster_ready:
        log("ERROR: Ray Cluster not ready. Please start it with 'start-vllm-cluster' first.")
        sys.exit(1)

    log("Ray Cluster Detected. Starting Benchmarks (Dual Backend)...")

    # Both passes for every model, then the combined table.
    for model_name in MODELS_TO_RUN:
        run_cluster_throughput(model_name)

    print_summary()
|