diff --git a/Dockerfile b/Dockerfile index 50620b9..210e7af 100644 --- a/Dockerfile +++ b/Dockerfile @@ -112,6 +112,7 @@ COPY scripts/cluster_manager.py /opt/cluster_manager.py COPY scripts/models.py /opt/models.py COPY benchmarks/max_context_results.json /opt/max_context_results.json +COPY benchmarks/bench_utils.py /opt/bench_utils.py COPY benchmarks/run_vllm_bench.py /opt/run_vllm_bench.py COPY benchmarks/vllm_cluster_bench.py /opt/vllm_cluster_bench.py COPY benchmarks/find_max_context.py /opt/find_max_context.py diff --git a/benchmarks/bench_utils.py b/benchmarks/bench_utils.py new file mode 100644 index 0000000..2a3787c --- /dev/null +++ b/benchmarks/bench_utils.py @@ -0,0 +1,14 @@ +import subprocess +import tempfile + +def run_dialog(args): + """Runs dialog and returns stderr (selection line). Returns None if user cancelled.""" + with tempfile.NamedTemporaryFile(mode="w+") as tf: + cmd = ["dialog"] + args + try: + # We don't trap stdout since dialog renders to TTY and writes choice to stderr + subprocess.run(cmd, stderr=tf, check=True) + tf.seek(0) + return tf.read().strip() + except subprocess.CalledProcessError: + return None # User cancelled/pressed ESC diff --git a/benchmarks/run_vllm_bench.py b/benchmarks/run_vllm_bench.py index cf7bc27..35029de 100644 --- a/benchmarks/run_vllm_bench.py +++ b/benchmarks/run_vllm_bench.py @@ -2,6 +2,12 @@ import subprocess, time, json, sys, os, requests, argparse from pathlib import Path +try: + import bench_utils +except ImportError: + sys.path.append(str(Path(__file__).parent)) + import bench_utils + # ========================= # ⚙️ GLOBAL SETTINGS @@ -89,38 +95,43 @@ def get_dataset(): -def get_model_args(model, tp_size): +def get_model_args(model, tp_size, overrides=None): config = MODEL_TABLE.get(model, {"max_num_seqs": "32"}) + overrides = overrides or {} # Allow per-model GPU utilization override - util = config.get("gpu_util", GPU_UTIL) + util = overrides.get("gpu_util", config.get("gpu_util", GPU_UTIL)) + max_seq_override = overrides.get("max_num_seqs", config.get("max_num_seqs", "32")) cmd = [ "--model", model, - "--gpu-memory-utilization", util, + "--gpu-memory-utilization", str(util), "--dtype", "auto", "--tensor-parallel-size", str(tp_size), - "--max-num-seqs", config["max_num_seqs"] + "--max-num-seqs", str(max_seq_override) ] # Optional: if a model really needs a hard limit, we can still support "ctx" in config, # but by default we rely on auto. - if "ctx" in config: - cmd.extend(["--max-model-len", config["ctx"]]) + if "ctx" in overrides or "ctx" in config: + cmd.extend(["--max-model-len", str(overrides.get("ctx", config.get("ctx")))]) if config.get("trust_remote"): cmd.append("--trust-remote-code") if config.get("enforce_eager"): cmd.append("--enforce-eager") return cmd -def run_throughput(model, tp_size, backend_name="Default", output_dir=RESULTS_DIR, extra_env=None): +def run_throughput(model, tp_size, backend_name="Default", output_dir=RESULTS_DIR, extra_env=None, overrides=None): if tp_size not in MODEL_TABLE[model]["valid_tp"]: return + overrides = overrides or {} model_safe = model.replace("/", "_") output_dir_path = Path(output_dir) output_dir_path.mkdir(parents=True, exist_ok=True) - output_file = output_dir_path / f"{model_safe}_tp{tp_size}_throughput.json" + tag = overrides.get("tag", "").strip() + tag_suffix = f"_{tag}" if tag else "" + output_file = output_dir_path / f"{model_safe}_tp{tp_size}{tag_suffix}_throughput.json" if output_file.exists(): log(f"SKIP {model} (TP={tp_size} | {backend_name})") @@ -130,13 +141,13 @@ def run_throughput(model, tp_size, backend_name="Default", output_dir=RESULTS_DI dataset_args = ["--dataset-name", "sharegpt", "--dataset-path", dataset_path] if dataset_path else ["--input-len", "1024"] # Retrieve Model-Specific Batch Tokens - batch_tokens = MODEL_TABLE[model].get("max_tokens", DEFAULT_BATCH_TOKENS) + batch_tokens = str(overrides.get("max_tokens", MODEL_TABLE[model].get("max_tokens", DEFAULT_BATCH_TOKENS))) log(f"START {model} (TP={tp_size} | {backend_name}) [Batch: {batch_tokens}]...") kill_vllm() nuke_vllm_cache() - cmd = ["vllm", "bench", "throughput"] + get_model_args(model, tp_size) + cmd = ["vllm", "bench", "throughput"] + get_model_args(model, tp_size, overrides) cmd.extend([ "--num-prompts", str(OFF_NUM_PROMPTS), "--max-num-batched-tokens", batch_tokens, @@ -197,6 +208,7 @@ def print_summary(tps): if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--tp", type=int, nargs="+", default=[1]) + parser.add_argument("--tui", action="store_true", help="Launch interactive configuration UI") args = parser.parse_args() gpu_count = get_gpu_count() @@ -207,17 +219,86 @@ if __name__ == "__main__": log(f"Requested TP={args.tp} but only {gpu_count} GPU(s) detected. Nothing to run.") sys.exit(0) + selected_models = MODELS_TO_RUN + + if args.tui: + # TUI Model Selection + checklist_args = [ + "--clear", "--backtitle", "AMD vLLM Benchmark Launcher", + "--title", "Model Selection", + "--checklist", "Select models to benchmark:", "20", "65", "10" + ] + + for m in MODELS_TO_RUN: + m_name = m.split("/")[-1] + # All selected "on" by default + checklist_args.extend([m, m_name, "on"]) + + choice = bench_utils.run_dialog(checklist_args) + + if choice is None: + subprocess.run(["clear"]) + print("Cancelled by user.") + sys.exit(0) + + # Parse space-separated quoted output from dialog checklist + import shlex + selected_models = [m for m in shlex.split(choice)] + + if not selected_models: + subprocess.run(["clear"]) + print("No models selected. Exiting.") + sys.exit(0) + kill_vllm() for tp in valid_tp_args: - for m in MODELS_TO_RUN: + for m in selected_models: + overrides = {} + if args.tui: + config = MODEL_TABLE.get(m, {}) + default_seqs = config.get("max_num_seqs", "32") + default_tokens = config.get("max_tokens", DEFAULT_BATCH_TOKENS) + default_util = config.get("gpu_util", GPU_UTIL) + default_ctx = config.get("ctx", "auto") + + form_args = [ + "--clear", "--backtitle", f"AMD vLLM Benchmark Configuration (TP: {tp})", + "--title", f"Tune Parameters: {m.split('/')[-1]}", + "--form", "Edit the options below. Leave tag empty for no suffix.", + "15", "70", "5", + "Max Concurrent Seqs:", "1", "1", str(default_seqs), "1", "25", "15", "0", + "Max Batched Tokens:", "2", "1", str(default_tokens), "2", "25", "15", "0", + "GPU Utilization (0-1):", "3", "1", str(default_util), "3", "25", "15", "0", + "Max Context Length:", "4", "1", str(default_ctx), "4", "25", "15", "0", + "Filename Tag (Optional):", "5", "1", "", "5", "25", "15", "0" + ] + + form_res = bench_utils.run_dialog(form_args) + if form_res is None: + subprocess.run(["clear"]) + print(f"Skipping {m} (TP={tp}) due to user cancellation.") + continue + + lines = form_res.splitlines() + if len(lines) >= 5: + overrides["max_num_seqs"] = lines[0].strip() + overrides["max_tokens"] = lines[1].strip() + overrides["gpu_util"] = lines[2].strip() + + ctx_val = lines[3].strip() + if ctx_val and ctx_val.lower() != "auto": + overrides["ctx"] = ctx_val + + overrides["tag"] = lines[4].strip() + # 1. Default (Triton) - run_throughput(m, tp, "Default", RESULTS_DIR) + run_throughput(m, tp, "Default", RESULTS_DIR, overrides=overrides) # 2. ROCm Attention # We force this via CLI argument --attention-backend ROCM_ATTN below # No specific env vars needed if forcing backend. rocm_env = {} print(f"[DEBUG] Forcing ROCm Env: {rocm_env} + CLI: --attention-backend ROCM_ATTN") - run_throughput(m, tp, "ROCm-Attn", "benchmark_results_rocm", rocm_env) + run_throughput(m, tp, "ROCm-Attn", "benchmark_results_rocm", rocm_env, overrides=overrides) print_summary(valid_tp_args) diff --git a/benchmarks/vllm_cluster_bench.py b/benchmarks/vllm_cluster_bench.py index 126e1d2..0ac8caf 100755 --- a/benchmarks/vllm_cluster_bench.py +++ b/benchmarks/vllm_cluster_bench.py @@ -2,6 +2,12 @@ import subprocess, time, json, sys, os, requests, argparse, re from pathlib import Path +try: + import bench_utils +except ImportError: + sys.path.append(str(Path(__file__).parent)) + import bench_utils + # Import models immediately to access globals try: import models @@ -100,7 +106,8 @@ def restart_cluster(): log("Cluster Ready.") def get_net_iface(): - return cluster_manager.get_net_iface() + prefix = ".".join(HEAD_IP.split('.')[:3]) + return cluster_manager.get_net_iface(prefix) def get_local_ip(iface): return cluster_manager.get_local_ip(iface) @@ -154,22 +161,24 @@ def get_cluster_env(): return env -def get_model_args(model): +def get_model_args(model, overrides=None): config = MODEL_TABLE.get(model, {"max_num_seqs": "32"}) - util = config.get("gpu_util", GPU_UTIL) + overrides = overrides or {} + util = overrides.get("gpu_util", config.get("gpu_util", GPU_UTIL)) + max_seq_override = overrides.get("max_num_seqs", config.get("max_num_seqs", "32")) cmd = [ "--model", model, - "--gpu-memory-utilization", util, + "--gpu-memory-utilization", str(util), "--dtype", "auto", "--tensor-parallel-size", str(CLUSTER_TP), - "--max-num-seqs", config["max_num_seqs"], + "--max-num-seqs", str(max_seq_override), "--distributed-executor-backend", "ray" ] # Optional ctx - if "ctx" in config: - cmd.extend(["--max-model-len", config["ctx"]]) + if "ctx" in overrides or "ctx" in config: + cmd.extend(["--max-model-len", str(overrides.get("ctx", config.get("ctx")))]) if config.get("trust_remote"): cmd.append("--trust-remote-code") @@ -178,17 +187,20 @@ def get_model_args(model): return cmd -def get_benchmark_output_file(model, output_dir): +def get_benchmark_output_file(model, output_dir, tag=""): model_safe = model.replace("/", "_") output_dir_path = Path(output_dir) eth_suffix = "_eth" if FORCE_ETH else "" - return output_dir_path / f"{model_safe}_cluster_tp{CLUSTER_TP}{eth_suffix}_throughput.json" + tag_suffix = f"_{tag}" if tag else "" + return output_dir_path / f"{model_safe}_cluster_tp{CLUSTER_TP}{eth_suffix}{tag_suffix}_throughput.json" -def run_bench_set(model, backend_name, output_dir, extra_env=None): +def run_bench_set(model, backend_name, output_dir, extra_env=None, overrides=None): output_dir_path = Path(output_dir) output_dir_path.mkdir(parents=True, exist_ok=True) + overrides = overrides or {} - output_file = get_benchmark_output_file(model, output_dir) + tag = overrides.get("tag", "").strip() + output_file = get_benchmark_output_file(model, output_dir, tag) if output_file.exists(): log(f"SKIP {model} [{backend_name}] (Result exists)") @@ -197,13 +209,13 @@ def run_bench_set(model, backend_name, output_dir, extra_env=None): dataset_path = get_dataset() dataset_args = ["--dataset-name", "sharegpt", "--dataset-path", dataset_path] if dataset_path else ["--input-len", "1024"] - batch_tokens = MODEL_TABLE[model].get("max_tokens", DEFAULT_BATCH_TOKENS) + batch_tokens = str(overrides.get("max_tokens", MODEL_TABLE.get(model, {}).get("max_tokens", DEFAULT_BATCH_TOKENS))) log(f"START {model} [TP={CLUSTER_TP} | {backend_name}]...") - nuke_vllm_cache() + nuke_vllm_cache(HEAD_IP) - cmd = ["vllm", "bench", "throughput"] + get_model_args(model) + cmd = ["vllm", "bench", "throughput"] + get_model_args(model, overrides) cmd.extend([ "--num-prompts", str(OFF_NUM_PROMPTS), "--max-num-batched-tokens", batch_tokens, @@ -234,20 +246,24 @@ def run_bench_set(model, backend_name, output_dir, extra_env=None): except Exception as e: log(f"ERROR: System error: {e}") -def run_cluster_throughput(model): +def run_cluster_throughput(model, overrides=None): + overrides = overrides or {} + tag = overrides.get("tag", "").strip() + # 1. Default Run (Triton) - if get_benchmark_output_file(model, RESULTS_DIR).exists(): + if get_benchmark_output_file(model, RESULTS_DIR, tag).exists(): log(f"SKIP {model} [Default] (Result exists)") else: restart_cluster() run_bench_set( model, "Default", - RESULTS_DIR + RESULTS_DIR, + overrides=overrides ) # 2. ROCm Attention Run - if get_benchmark_output_file(model, "benchmark_results_rocm").exists(): + if get_benchmark_output_file(model, "benchmark_results_rocm", tag).exists(): log(f"SKIP {model} [ROCm-Attn] (Result exists)") else: restart_cluster() @@ -255,7 +271,8 @@ def run_cluster_throughput(model): model, "ROCm-Attn", "benchmark_results_rocm", - extra_env={} + extra_env={}, + overrides=overrides ) @@ -290,11 +307,73 @@ if __name__ == "__main__": parser = argparse.ArgumentParser(description="VLLM Cluster Benchmark") parser.add_argument("--eth-only", action="store_true", help="Run benchmark using only Ethernet (disable RDMA/RoCE)") parser.add_argument("--debug-nccl", action="store_true", help="Enable NCCL Debug logging (INFO level for Transport tracking)") + parser.add_argument("--tui", action="store_true", help="Launch interactive configuration UI") args = parser.parse_args() FORCE_ETH = args.eth_only FORCE_DEBUG_NCCL = args.debug_nccl + selected_models = MODELS_TO_RUN + + if args.tui: + # 1. Cluster IPs Configuration + form_args = [ + "--clear", "--backtitle", "AMD VLLM Cluster Configuration", + "--title", "Cluster Network Details", + "--form", "Verify Head and Worker IPs for this run:", + "10", "60", "2", + "Head Node IP:", "1", "1", HEAD_IP, "1", "20", "20", "0", + "Worker Node IP:", "2", "1", WORKER_IP, "2", "20", "20", "0" + ] + res = bench_utils.run_dialog(form_args) + if res is None: + subprocess.run(["clear"]) + print("Cancelled by user.") + sys.exit(0) + + lines = res.splitlines() + if len(lines) >= 2: + HEAD_IP = lines[0].strip() + WORKER_IP = lines[1].strip() + os.environ["VLLM_HEAD_IP"] = HEAD_IP + os.environ["VLLM_WORKER_IP"] = WORKER_IP + + # 2. Network Options (ETH / Debug) + eth_status = "on" if FORCE_ETH else "off" + debug_status = "on" if FORCE_DEBUG_NCCL else "off" + check_args = [ + "--title", "Network Overrides", + "--checklist", "Select custom backend flags:", "10", "60", "2", + "ETH_ONLY", "Force Ethernet (Disable RDMA/RoCE)", eth_status, + "DEBUG_NCCL", "Enable NCCL debug logs", debug_status + ] + flags_res = bench_utils.run_dialog(check_args) + if flags_res is not None: + FORCE_ETH = "ETH_ONLY" in flags_res + FORCE_DEBUG_NCCL = "DEBUG_NCCL" in flags_res + + # 3. Model Selection + checklist_args = [ + "--title", "Model Selection", + "--checklist", "Select models to benchmark:", "20", "65", "10" + ] + for m in MODELS_TO_RUN: + m_name = m.split("/")[-1] + checklist_args.extend([m, m_name, "on"]) + + choice = bench_utils.run_dialog(checklist_args) + if choice is None: + subprocess.run(["clear"]) + print("Cancelled by user.") + sys.exit(0) + + import shlex + selected_models = [m for m in shlex.split(choice)] + if not selected_models: + subprocess.run(["clear"]) + print("No models selected. Exiting.") + sys.exit(0) + log("Ray Cluster Detected. Starting Benchmarks (Dual Backend)...") if FORCE_ETH: log("Note: Ethernet ONLY mode enabled. RDMA/RoCE disabled.") @@ -302,7 +381,45 @@ if __name__ == "__main__": log("Note: NCCL Debug mode enabled (Transport Logging).") log("Note: Eager Mode (--enforce-eager) is ENABLED for cluster stability.") - for m in MODELS_TO_RUN: - run_cluster_throughput(m) + for m in selected_models: + overrides = {} + if args.tui: + config = MODEL_TABLE.get(m, {}) + default_seqs = config.get("max_num_seqs", "32") + default_tokens = config.get("max_tokens", DEFAULT_BATCH_TOKENS) + default_util = config.get("gpu_util", GPU_UTIL) + default_ctx = config.get("ctx", "auto") + + form_args = [ + "--clear", "--backtitle", f"AMD VLLM Cluster Benchmark Configuration (TP: {CLUSTER_TP})", + "--title", f"Tune Parameters: {m.split('/')[-1]}", + "--form", "Edit cluster model options. Leave tag empty for no suffix.", + "15", "70", "5", + "Max Concurrent Seqs:", "1", "1", str(default_seqs), "1", "25", "15", "0", + "Max Batched Tokens:", "2", "1", str(default_tokens), "2", "25", "15", "0", + "GPU Utilization (0-1):", "3", "1", str(default_util), "3", "25", "15", "0", + "Max Context Length:", "4", "1", str(default_ctx), "4", "25", "15", "0", + "Filename Tag (Optional):", "5", "1", "", "5", "25", "15", "0" + ] + + form_res = bench_utils.run_dialog(form_args) + if form_res is None: + subprocess.run(["clear"]) + print(f"Skipping {m} due to user cancellation.") + continue + + lines = form_res.splitlines() + if len(lines) >= 5: + overrides["max_num_seqs"] = lines[0].strip() + overrides["max_tokens"] = lines[1].strip() + overrides["gpu_util"] = lines[2].strip() + + ctx_val = lines[3].strip() + if ctx_val and ctx_val.lower() != "auto": + overrides["ctx"] = ctx_val + + overrides["tag"] = lines[4].strip() + + run_cluster_throughput(m, overrides=overrides) print_summary() diff --git a/scripts/cluster_manager.py b/scripts/cluster_manager.py index 676ca65..8f625ee 100644 --- a/scripts/cluster_manager.py +++ b/scripts/cluster_manager.py @@ -2,13 +2,17 @@ import subprocess import time import os -def get_net_iface(ip_prefix="192.168.100"): +def get_net_iface(ip_prefix=None): """ Auto-detects the interface that serves the cluster network. - Assumes standard 192.168.100.x setup from start_vllm_cluster.py + Assumes standard 192.168.100.x setup from start_vllm_cluster.py, but parameterizable. """ + if ip_prefix is None: + head_ip = os.getenv("VLLM_HEAD_IP", "192.168.100.1") + ip_prefix = ".".join(head_ip.split('.')[:3]) + try: - # ip -o addr show | grep 192.168.100 + # ip -o addr show | grep cmd = f"ip -o addr show | grep {ip_prefix}" res = subprocess.check_output(cmd, shell=True, text=True).strip() # Output format: 2: eth0 inet 192.168.100.1/24 ... diff --git a/scripts/start_vllm_cluster.py b/scripts/start_vllm_cluster.py index 577be8c..5b2c64d 100755 --- a/scripts/start_vllm_cluster.py +++ b/scripts/start_vllm_cluster.py @@ -96,12 +96,13 @@ def check_ray_status(): def wait_for_cluster(): return cluster_manager.wait_for_cluster() -def nuke_vllm_cache(): +def nuke_vllm_cache(head_ip): # Only nukes local cache on the head node for now, or use cluster nuke? # The original script just did local nuke. # cluster_manager has nuke_vllm_cache_on_node and nuke_vllm_cache_cluster # Let's use the local ip one effectively - rdma = cluster_manager.get_net_iface() + prefix = ".".join(head_ip.split('.')[:3]) + rdma = cluster_manager.get_net_iface(prefix) local = cluster_manager.get_local_ip(rdma) cluster_manager.nuke_vllm_cache_on_node(local, is_local=True) @@ -244,7 +245,7 @@ def configure_and_launch_vllm(model_idx, head_ip): subprocess.run(["clear"]) if clear_cache: - nuke_vllm_cache() + nuke_vllm_cache(head_ip) # Environment Setup # We need to set these variables in the current process before exec or pass them in env @@ -340,8 +341,8 @@ def main(): check_dependencies() # Default IPs - head_ip = "192.168.100.1" - worker_ip = "192.168.100.2" + head_ip = os.getenv("VLLM_HEAD_IP", "192.168.100.1") + worker_ip = os.getenv("VLLM_WORKER_IP", "192.168.100.2") while True: # Main Menu