#!/usr/bin/env python3
"""Throughput benchmark for vLLM on a multi-node Ray cluster (TP=CLUSTER_TP).

For every model in ``MODELS_TO_RUN`` (imported from ``run_vllm_bench``) this
script runs ``vllm bench throughput`` twice: once with the default
environment and once with the ROCm-attention environment overrides. It then
prints a side-by-side tokens/s summary read back from the JSON result files.
"""
import argparse
import json
import os
import shutil
import subprocess
import sys
import time
from pathlib import Path

import requests

# =========================
# ⚙️ GLOBAL SETTINGS
# =========================
# CLUSTER CONFIG: 2x Strix Halo (TP=2).
# TP=2 was specifically requested for testing on the cluster.
CLUSTER_TP = 2
GPU_UTIL = "0.90"

# THROUGHPUT CONFIG (same values as run_vllm_bench)
OFF_NUM_PROMPTS = 200
OFF_FORCED_OUTPUT = "512"
DEFAULT_BATCH_TOKENS = "8192"

RESULTS_DIR = Path("cluster_benchmark_results")
RESULTS_DIR.mkdir(exist_ok=True)

# Output directory of the ROCm-attention run. Shared by the writer
# (run_cluster_throughput) and the reader (print_summary) so they cannot drift.
ROCM_ATTN_RESULTS_DIR = Path("benchmark_results_rocm_attn/benchmark_results")

SHAREGPT_URL = (
    "https://huggingface.co/datasets/anon8231489123/ShareGPT_Vicuna_unfiltered"
    "/resolve/main/ShareGPT_V3_unfiltered_cleaned_split.json"
)

# Reuse the model table from the main benchmark script, which is expected to
# live in the same directory as this file.
try:
    from run_vllm_bench import MODEL_TABLE, MODELS_TO_RUN
except ImportError:
    # Fallback when launched from another working directory: make this file's
    # directory importable and retry (abspath so a bare filename still works).
    sys.path.append(os.path.dirname(os.path.abspath(__file__)))
    from run_vllm_bench import MODEL_TABLE, MODELS_TO_RUN


# =========================
# UTILS (Adapted for Cluster)
# =========================
def log(msg):
    """Print *msg* with the ``[CLUSTER-BENCH]`` prefix after a blank line."""
    print(f"\n[CLUSTER-BENCH] {msg}")


def check_ray_status(min_nodes=2):
    """Return True if ``ray status`` lists at least *min_nodes* active nodes.

    Counts the lines starting with ``1 node_`` inside the "Active:" section of
    the ``ray status`` output (the section ends at "Pending:" or
    "Recent failures:"). Returns False if ``ray`` cannot be run, times out or
    exits non-zero.
    """
    try:
        res = subprocess.run(
            ["ray", "status"],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            timeout=60,
        )
    except (OSError, subprocess.SubprocessError):
        # ray not installed / not on PATH, or the call timed out.
        return False
    if res.returncode != 0:
        return False

    active_nodes = 0
    in_active_section = False
    for line in res.stdout.splitlines():
        if "Active:" in line:
            in_active_section = True
            continue
        if "Pending:" in line or "Recent failures:" in line:
            in_active_section = False
        if in_active_section and line.strip().startswith("1 node_"):
            active_nodes += 1
    return active_nodes >= min_nodes


def get_net_iface(ip_prefix="192.168.100"):
    """Return the name of the interface that serves the cluster network.

    Scans ``ip -o addr show`` for the first address line containing
    *ip_prefix* (plain substring match; the default assumes the standard
    192.168.100.x setup from start_vllm_cluster.py). Returns ``"eth0"`` if no
    line matches or ``ip`` cannot be run.
    """
    try:
        out = subprocess.check_output(["ip", "-o", "addr", "show"], text=True)
    except (OSError, subprocess.SubprocessError):
        return "eth0"  # Fallback
    for line in out.splitlines():
        # Line format: "2: eth0    inet 192.168.100.1/24 ..."
        if ip_prefix in line:
            parts = line.split()
            if len(parts) >= 2:
                return parts[1]  # Interface name
    return "eth0"  # Fallback


def get_local_ip(iface):
    """Return the first IPv4 address of *iface*, or ``"127.0.0.1"``.

    The fallback is used when ``ip`` fails (e.g. the interface does not exist)
    and also when the interface has no IPv4 address.
    """
    try:
        out = subprocess.check_output(
            ["ip", "-o", "-4", "addr", "show", iface], text=True
        )
    except (OSError, subprocess.SubprocessError):
        return "127.0.0.1"
    for line in out.splitlines():
        # Line format: "2: eth0    inet 192.168.100.1/24 brd ..."; the
        # 4th field is ADDR/PREFIXLEN.
        parts = line.split()
        if len(parts) >= 4:
            return parts[3].split("/", 1)[0]
    return "127.0.0.1"


def nuke_vllm_cache():
    """Delete and recreate ``~/.cache/vllm`` on the local machine.

    This is best-effort: a failure is reported on stdout but does not abort
    the benchmark.

    NOTE(review): only the local filesystem is touched. The other Ray
    node(s) presumably keep their cache unless $HOME is shared; confirm.
    """
    cache = Path.home() / ".cache" / "vllm"
    if not cache.exists():
        return
    print("Clearing vLLM cache...", end="", flush=True)
    try:
        if cache.is_symlink():
            # Like `rm -rf link`: remove the link itself, not its target.
            cache.unlink()
        else:
            shutil.rmtree(cache)
        cache.mkdir(parents=True, exist_ok=True)
    except OSError as e:
        print(f" Failed ({e}).")
        return
    print(" Done.")
    time.sleep(2)


def get_dataset():
    """Return the local ShareGPT dataset path, downloading it if missing.

    The download is streamed into a ``.part`` file that is renamed only once
    it completes. An interrupted download therefore never leaves a truncated
    JSON that later runs would mistake for the real dataset.

    Returns None if the download fails; the caller then falls back to random
    prompts.
    """
    data_path = Path("ShareGPT_V3_unfiltered_cleaned_split.json")
    if data_path.exists():
        return str(data_path)

    log("Downloading ShareGPT dataset...")
    tmp_path = data_path.with_name(data_path.name + ".part")
    try:
        with requests.get(SHAREGPT_URL, stream=True, timeout=15) as r:
            r.raise_for_status()
            with open(tmp_path, "wb") as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
        tmp_path.replace(data_path)
        return str(data_path)
    except (requests.RequestException, OSError) as e:
        # Best-effort cleanup of the partial file; its absence is fine.
        try:
            tmp_path.unlink()
        except OSError:
            pass
        log(f"WARNING: ShareGPT download failed ({e}). using RANDOM.")
        return None


def get_cluster_env():
    """Return a copy of ``os.environ`` extended with the cluster networking vars.

    The cluster interface is auto-detected with get_net_iface(), and its IPv4
    address is exported as VLLM_HOST_IP. The values are meant to match
    start_vllm_cluster.py.
    """
    rdma_iface = get_net_iface()
    host_ip = get_local_ip(rdma_iface)

    env = os.environ.copy()

    # Critical cluster envs (match start_vllm_cluster.py)
    env["RAY_EXPERIMENTAL_NOSET_ROCR_VISIBLE_DEVICES"] = "1"
    env["VLLM_HOST_IP"] = host_ip
    env["NCCL_SOCKET_IFNAME"] = rdma_iface
    env["GLOO_SOCKET_IFNAME"] = rdma_iface

    # RCCL specific
    env["NCCL_IB_GID_INDEX"] = "1"
    env["NCCL_IB_DISABLE"] = "0"
    env["NCCL_NET_GDR_LEVEL"] = "0"

    return env


def get_model_args(model):
    """Build the vLLM engine CLI arguments for *model* from MODEL_TABLE.

    Models missing from the table, or missing a key, get these defaults:
    ``max_num_seqs="32"`` and ``gpu_util=GPU_UTIL``. Config values are passed
    through ``str()`` so int entries in the table are accepted.
    """
    config = MODEL_TABLE.get(model, {})

    cmd = [
        "--model", model,
        "--gpu-memory-utilization", str(config.get("gpu_util", GPU_UTIL)),
        "--dtype", "auto",
        "--tensor-parallel-size", str(CLUSTER_TP),
        "--max-num-seqs", str(config.get("max_num_seqs", "32")),
        "--distributed-executor-backend", "ray",
    ]

    # Optional context-length cap
    if "ctx" in config:
        cmd.extend(["--max-model-len", str(config["ctx"])])

    if config.get("trust_remote"):
        cmd.append("--trust-remote-code")

    # Respect the config's eager-mode flag (apples-to-apples with TP=1 runs)
    if config.get("enforce_eager"):
        cmd.append("--enforce-eager")

    return cmd


def _result_filename(model):
    """Return the JSON result filename used for *model* at CLUSTER_TP."""
    model_safe = model.replace("/", "_")
    return f"{model_safe}_cluster_tp{CLUSTER_TP}_throughput.json"


def _str_env(mapping):
    """Return *mapping* with all values converted to ``str`` (subprocess needs str)."""
    return {k: str(v) for k, v in mapping.items()}


def run_bench_set(model, backend_name, output_dir, extra_env=None):
    """Run one ``vllm bench throughput`` pass for *model*.

    The pass is skipped if its result file already exists in *output_dir*.
    *backend_name* is only used for logging. *extra_env* is layered on top of
    the cluster env and the model's own ``env`` entry from MODEL_TABLE.

    Failures are logged, not raised, so that a sweep over many models keeps
    going.
    """
    config = MODEL_TABLE.get(model, {})
    output_dir_path = Path(output_dir)
    output_dir_path.mkdir(parents=True, exist_ok=True)
    output_file = output_dir_path / _result_filename(model)

    if output_file.exists():
        log(f"SKIP {model} [{backend_name}] (Result exists)")
        return

    dataset_path = get_dataset()
    if dataset_path:
        dataset_args = ["--dataset-name", "sharegpt", "--dataset-path", dataset_path]
    else:
        dataset_args = ["--input-len", "1024"]
    batch_tokens = str(config.get("max_tokens", DEFAULT_BATCH_TOKENS))

    log(f"START {model} [TP={CLUSTER_TP} | {backend_name}]...")
    nuke_vllm_cache()

    cmd = ["vllm", "bench", "throughput"] + get_model_args(model)
    cmd.extend([
        "--num-prompts", str(OFF_NUM_PROMPTS),
        "--max-num-batched-tokens", batch_tokens,
        "--output-len", OFF_FORCED_OUTPUT,
        "--output-json", str(output_file),
        "--disable-log-stats",
    ])
    cmd.extend(dataset_args)

    env = get_cluster_env()
    # Model-specific envs, then run-specific envs (e.g. ROCm attention) on top.
    env.update(_str_env(config.get("env", {})))
    if extra_env:
        env.update(_str_env(extra_env))

    try:
        log(f"Command: {' '.join(cmd)}")
        subprocess.run(cmd, check=True, env=env)
    except subprocess.CalledProcessError as e:
        log(f"ERROR: Failed {model} [{backend_name}] (Exit {e.returncode})")
    except Exception as e:  # per-model boundary: log and move on to the next run
        log(f"ERROR: System error: {e}")


def run_cluster_throughput(model):
    """Benchmark *model* with the default backend, then with ROCm attention."""
    # 1. Default run (usually Triton, unless global envs say otherwise)
    run_bench_set(model, "Default", RESULTS_DIR)

    # 2. ROCm attention run
    run_bench_set(
        model,
        "ROCm-Attn",
        ROCM_ATTN_RESULTS_DIR,
        extra_env={
            "VLLM_V1_USE_PREFILL_DECODE_ATTENTION": "1",
            "VLLM_USE_TRITON_FLASH_ATTN": "0",
        },
    )


def _read_tokens_per_second(path):
    """Return ``tokens_per_second`` from the JSON at *path* as ``"%.1f"``.

    Returns ``"N/A"`` if the file is missing, not valid JSON, or holds a
    non-numeric value.
    """
    try:
        data = json.loads(Path(path).read_text())
        return f"{data.get('tokens_per_second', 0):.1f}"
    except (OSError, ValueError, TypeError, AttributeError):
        return "N/A"


def print_summary():
    """Print a tokens/s table: default run vs ROCm-attention run, per model."""
    print(f"\n{f'MODEL (TP={CLUSTER_TP})':<50} | {'Triton':<8} | {'ROCm':<8}")
    print("-" * 75)
    for m in MODELS_TO_RUN:
        result_name = _result_filename(m)
        val1 = _read_tokens_per_second(RESULTS_DIR / result_name)
        val2 = _read_tokens_per_second(ROCM_ATTN_RESULTS_DIR / result_name)
        name_cell = m.split("/")[-1]
        print(f"{name_cell:<50} | {val1:<8} | {val2:<8}")
    print("-" * 75)


def main():
    """Verify the Ray cluster is up, benchmark every model, print the summary."""
    if not check_ray_status():
        log("ERROR: Ray Cluster not ready. Please start it with 'start-vllm-cluster' first.")
        sys.exit(1)

    log("Ray Cluster Detected. Starting Benchmarks (Dual Backend)...")
    for m in MODELS_TO_RUN:
        run_cluster_throughput(m)

    print_summary()


if __name__ == "__main__":
    main()