feat: Introduce vLLM cluster benchmarking and setup scripts, and expand the list of models for local benchmarks.
Dieser Commit ist enthalten in:
+3
-1
@@ -126,9 +126,11 @@ COPY scripts/01-rocm-env-for-triton.sh /etc/profile.d/01-rocm-env-for-triton.sh
|
|||||||
COPY scripts/99-toolbox-banner.sh /etc/profile.d/99-toolbox-banner.sh
|
COPY scripts/99-toolbox-banner.sh /etc/profile.d/99-toolbox-banner.sh
|
||||||
COPY scripts/zz-venv-last.sh /etc/profile.d/zz-venv-last.sh
|
COPY scripts/zz-venv-last.sh /etc/profile.d/zz-venv-last.sh
|
||||||
COPY scripts/start_vllm.py /usr/local/bin/start-vllm
|
COPY scripts/start_vllm.py /usr/local/bin/start-vllm
|
||||||
|
COPY scripts/start_vllm_cluster.py /usr/local/bin/start-vllm-cluster
|
||||||
COPY benchmarks/max_context_results.json /opt/max_context_results.json
|
COPY benchmarks/max_context_results.json /opt/max_context_results.json
|
||||||
COPY benchmarks/run_vllm_bench.py /opt/run_vllm_bench.py
|
COPY benchmarks/run_vllm_bench.py /opt/run_vllm_bench.py
|
||||||
RUN chmod 0644 /etc/profile.d/*.sh && chmod +x /usr/local/bin/start-vllm && chmod 0644 /opt/max_context_results.json
|
COPY benchmarks/vllm_cluster_bench.py /opt/vllm_cluster_bench.py
|
||||||
|
RUN chmod 0644 /etc/profile.d/*.sh && chmod +x /usr/local/bin/start-vllm && chmod +x /usr/local/bin/start-vllm-cluster && chmod +x /opt/vllm_cluster_bench.py && chmod 0644 /opt/max_context_results.json
|
||||||
RUN chmod 0644 /etc/profile.d/*.sh
|
RUN chmod 0644 /etc/profile.d/*.sh
|
||||||
RUN printf 'ulimit -S -c 0\n' > /etc/profile.d/90-nocoredump.sh && chmod 0644 /etc/profile.d/90-nocoredump.sh
|
RUN printf 'ulimit -S -c 0\n' > /etc/profile.d/90-nocoredump.sh && chmod 0644 /etc/profile.d/90-nocoredump.sh
|
||||||
|
|
||||||
|
|||||||
@@ -91,11 +91,11 @@ MODEL_TABLE = {
|
|||||||
}
|
}
|
||||||
|
|
||||||
MODELS_TO_RUN = [
|
MODELS_TO_RUN = [
|
||||||
#"meta-llama/Meta-Llama-3.1-8B-Instruct",
|
"meta-llama/Meta-Llama-3.1-8B-Instruct",
|
||||||
#"google/gemma-3-12b-it",
|
"google/gemma-3-12b-it",
|
||||||
#"Qwen/Qwen3-14B-AWQ",
|
"Qwen/Qwen3-14B-AWQ",
|
||||||
#"openai/gpt-oss-20b",
|
"openai/gpt-oss-20b",
|
||||||
#"openai/gpt-oss-120b",
|
"openai/gpt-oss-120b",
|
||||||
"cpatonn/Qwen3-Coder-30B-A3B-Instruct-GPTQ-4bit",
|
"cpatonn/Qwen3-Coder-30B-A3B-Instruct-GPTQ-4bit",
|
||||||
"dazipe/Qwen3-Next-80B-A3B-Instruct-GPTQ-Int4A16",
|
"dazipe/Qwen3-Next-80B-A3B-Instruct-GPTQ-Int4A16",
|
||||||
]
|
]
|
||||||
@@ -112,14 +112,6 @@ def get_gpu_count():
|
|||||||
# Output format: "GPU[0] : Device Name: ..."
|
# Output format: "GPU[0] : Device Name: ..."
|
||||||
res = subprocess.run(["rocm-smi", "--showid"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
|
res = subprocess.run(["rocm-smi", "--showid"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
|
||||||
if res.returncode == 0:
|
if res.returncode == 0:
|
||||||
# Filter specifically for the target GPU as requested
|
|
||||||
# target_gpu = "AMD Radeon AI PRO R9700"
|
|
||||||
# count = 0
|
|
||||||
# for line in res.stdout.strip().split('\n'):
|
|
||||||
# if "Device Name" in line and target_gpu in line:
|
|
||||||
# count += 1
|
|
||||||
|
|
||||||
# return count if count > 0 else 1
|
|
||||||
return 1 # Force return 1 for Strix Halo APU
|
return 1 # Force return 1 for Strix Halo APU
|
||||||
else:
|
else:
|
||||||
log("rocm-smi failed, defaulting to 1 GPU (Hardcoded Fallback)")
|
log("rocm-smi failed, defaulting to 1 GPU (Hardcoded Fallback)")
|
||||||
|
|||||||
Ausführbare Datei
+239
@@ -0,0 +1,239 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
import subprocess, time, json, sys, os, requests, argparse
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
# =========================
|
||||||
|
# ⚙️ GLOBAL SETTINGS
|
||||||
|
# =========================
|
||||||
|
|
||||||
|
# CLUSTER CONFIG: 2x Strix Halo (TP=2)
|
||||||
|
# User requested specifically to test with TP=2 on the cluster.
|
||||||
|
# Tensor-parallel size passed to vLLM as --tensor-parallel-size.
CLUSTER_TP = 2
# Default --gpu-memory-utilization value; a per-model "gpu_util" entry in
# MODEL_TABLE takes precedence over it.
GPU_UTIL = "0.90"

# THROUGHPUT CONFIG (Same as run_vllm_bench)
# Value passed as --num-prompts.
OFF_NUM_PROMPTS = 200
# Value passed as --output-len.
OFF_FORCED_OUTPUT = "512"
# --max-num-batched-tokens value used when a model has no "max_tokens" entry.
DEFAULT_BATCH_TOKENS = "8192"

# Result JSON files are written here. The path is relative to the current
# working directory and the directory is created as soon as this module loads.
RESULTS_DIR = Path("cluster_benchmark_results")
RESULTS_DIR.mkdir(exist_ok=True)
|
||||||
|
|
||||||
|
# Reuse the model table and model list from the main benchmark script
# (run_vllm_bench.py) instead of duplicating them here.
# The import below assumes that script is importable as-is; if it is not,
# this file's own directory is appended to sys.path and the import is retried.
|
||||||
|
|
||||||
|
# Pull the shared model configuration from run_vllm_bench.py. When the plain
# import fails, retry once with this file's directory added to sys.path.
try:
    from run_vllm_bench import MODEL_TABLE, MODELS_TO_RUN
except ImportError:
    _bench_dir = os.path.dirname(__file__)
    sys.path.append(_bench_dir)
    from run_vllm_bench import MODEL_TABLE, MODELS_TO_RUN
|
||||||
|
|
||||||
|
# =========================
|
||||||
|
# UTILS (Adapted for Cluster)
|
||||||
|
# =========================
|
||||||
|
|
||||||
|
def log(msg):
    """Print *msg* after a blank line, tagged with the cluster-benchmark prefix."""
    tagged = f"\n[CLUSTER-BENCH] {msg}"
    print(tagged)
|
||||||
|
|
||||||
|
def _count_active_ray_nodes(status_text):
    """Count the ``1 node_<id>`` entries in the "Active:" section of `ray status` output."""
    count = 0
    in_active_section = False
    for raw_line in status_text.splitlines():
        if "Active:" in raw_line:
            in_active_section = True
            continue
        if "Pending:" in raw_line or "Recent failures:" in raw_line:
            # The active-node listing ends where one of these headings starts.
            in_active_section = False
            continue
        if in_active_section and raw_line.strip().startswith("1 node_"):
            count += 1
    return count


def check_ray_status(min_nodes=2):
    """Check whether a Ray cluster with at least *min_nodes* active nodes is running.

    Runs ``ray status`` and counts the node entries listed under "Active:".

    Args:
        min_nodes: Minimum number of active nodes required (default 2).

    Returns:
        True when ``ray status`` succeeds and reports enough active nodes;
        False when the command is missing, fails, or reports too few nodes.
    """
    try:
        res = subprocess.run(
            ["ray", "status"],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
    except (OSError, subprocess.SubprocessError, ValueError):
        # OSError: `ray` is not installed / not executable.
        # ValueError: output could not be decoded as text.
        return False

    if res.returncode != 0:
        return False

    return _count_active_ray_nodes(res.stdout) >= min_nodes
|
||||||
|
|
||||||
|
def get_net_iface(ip_prefix="192.168.100"):
    """Return the name of the interface whose address starts with *ip_prefix*.

    Parses ``ip -o addr show`` directly instead of piping it through ``grep``
    in a shell, so the prefix is never interpreted by a shell and is matched
    only against the address column, on whole-octet boundaries.

    Args:
        ip_prefix: Leading octets of the cluster network, e.g. "192.168.100".

    Returns:
        The first matching interface name, or "eth0" when nothing matches or
        the ``ip`` command cannot be run.
    """
    fallback = "eth0"
    try:
        listing = subprocess.check_output(["ip", "-o", "addr", "show"], text=True)
    except (OSError, subprocess.SubprocessError, ValueError):
        return fallback

    prefix = ip_prefix.rstrip(".")
    for entry in listing.splitlines():
        # One-line format: "<idx>: <iface> <family> <addr>/<len> ..."
        fields = entry.split()
        if len(fields) < 4:
            continue
        host = fields[3].split("/")[0]
        if host == prefix or host.startswith(prefix + "."):
            return fields[1]
    return fallback
|
||||||
|
|
||||||
|
def get_local_ip(iface):
    """Return the first IPv4 address configured on *iface*.

    The previous shell pipeline reported the exit status of ``cut``, so a
    failing ``ip`` call produced an empty string instead of the fallback.
    The command is now run without a shell and its output parsed here.

    Args:
        iface: Network interface name, e.g. "eth0".

    Returns:
        The address without its prefix length, or "127.0.0.1" when the
        interface has no IPv4 address or the ``ip`` command fails.
    """
    fallback = "127.0.0.1"
    try:
        listing = subprocess.check_output(
            ["ip", "-o", "-4", "addr", "show", iface], text=True
        )
    except (OSError, subprocess.SubprocessError, ValueError):
        return fallback

    for entry in listing.splitlines():
        # One-line format: "<idx>: <iface> inet <addr>/<len> ..."
        fields = entry.split()
        if len(fields) >= 4:
            return fields[3].split("/")[0]
    return fallback
|
||||||
|
|
||||||
|
def nuke_vllm_cache(settle_seconds=2):
    """Delete ``~/.cache/vllm`` and recreate it as an empty directory.

    Does nothing when the cache path does not exist. Failures are reported
    on stdout instead of being silently ignored; they never raise.

    Args:
        settle_seconds: Pause after a successful wipe (default 2 seconds).
    """
    import shutil  # local import: not among this module's top-level imports

    cache = Path.home() / ".cache" / "vllm"
    if not cache.exists():
        return

    print("Clearing vLLM cache...", end="", flush=True)
    try:
        if cache.is_symlink() or not cache.is_dir():
            # Mirror `rm -rf`: a symlink (or stray file) is removed itself,
            # the link target is left untouched.
            cache.unlink()
        else:
            shutil.rmtree(cache)
        cache.mkdir(parents=True, exist_ok=True)
    except OSError as exc:
        print(f" Failed: {exc}")
        return

    print(" Done.")
    time.sleep(settle_seconds)
|
||||||
|
|
||||||
|
def get_dataset():
    """Return the path of the ShareGPT dataset, downloading it if necessary.

    The file is looked up in (and downloaded to) the current working
    directory. The download is written to a ``.part`` file and renamed only
    after it completed, so an interrupted transfer can no longer leave a
    truncated dataset that later runs would mistake for a valid one.

    Returns:
        The dataset path as a string, or None when the download failed (the
        caller then falls back to random prompts).
    """
    data_path = Path("ShareGPT_V3_unfiltered_cleaned_split.json")
    if data_path.exists():
        return str(data_path)

    log("Downloading ShareGPT dataset...")
    url = "https://huggingface.co/datasets/anon8231489123/ShareGPT_Vicuna_unfiltered/resolve/main/ShareGPT_V3_unfiltered_cleaned_split.json"
    partial_path = data_path.with_name(data_path.name + ".part")
    try:
        # The context manager releases the connection even on errors.
        with requests.get(url, stream=True, timeout=15) as r:
            r.raise_for_status()
            with open(partial_path, "wb") as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
        partial_path.replace(data_path)
        return str(data_path)
    except Exception as e:
        # Best effort: the benchmark can still run on random prompts.
        try:
            if partial_path.exists():
                partial_path.unlink()
        except OSError:
            pass
        log(f"WARNING: ShareGPT download failed ({e}). using RANDOM.")
        return None
|
||||||
|
|
||||||
|
def get_cluster_env():
    """Build the environment for a cluster benchmark subprocess.

    Starts from a copy of the current process environment and overlays the
    Ray / vLLM / NCCL settings, using the auto-detected cluster interface
    and its local IPv4 address.

    Returns:
        A new dict; ``os.environ`` itself is not modified.
    """
    iface = get_net_iface()
    local_ip = get_local_ip(iface)

    env = dict(os.environ)
    env.update({
        # Critical Cluster Envs (Match start_vllm_cluster.py)
        "RAY_EXPERIMENTAL_NOSET_ROCR_VISIBLE_DEVICES": "1",
        "VLLM_HOST_IP": local_ip,
        "NCCL_SOCKET_IFNAME": iface,
        "GLOO_SOCKET_IFNAME": iface,
        # RCCL specific
        "NCCL_IB_GID_INDEX": "1",
        "NCCL_IB_DISABLE": "0",
        "NCCL_NET_GDR_LEVEL": "0",
    })
    return env
|
||||||
|
|
||||||
|
def get_model_args(model):
    """Build the vLLM engine arguments shared by all cluster runs of *model*.

    Per-model settings come from MODEL_TABLE. A model missing from the table,
    or an entry missing "max_num_seqs", falls back to 32 sequences instead of
    raising KeyError. All values are converted to strings so the list can be
    passed straight to subprocess.

    Args:
        model: Model identifier, e.g. "org/name".

    Returns:
        A list of command-line argument strings (without the ``vllm`` prefix).
    """
    config = MODEL_TABLE.get(model, {})
    util = config.get("gpu_util", GPU_UTIL)
    max_seqs = config.get("max_num_seqs", "32")

    cmd = [
        "--model", model,
        "--gpu-memory-utilization", str(util),
        "--dtype", "auto",
        "--tensor-parallel-size", str(CLUSTER_TP),
        "--max-num-seqs", str(max_seqs),
        "--distributed-executor-backend", "ray",
    ]

    # Optional ctx
    if "ctx" in config:
        cmd.extend(["--max-model-len", str(config["ctx"])])

    if config.get("trust_remote"):
        cmd.append("--trust-remote-code")

    # FORCED EAGER as per request for cluster stability
    cmd.append("--enforce-eager")

    return cmd
|
||||||
|
|
||||||
|
def run_cluster_throughput(model):
    """Run ``vllm bench throughput`` for *model* on the Ray cluster.

    The run is skipped when the model's "valid_tp" list excludes CLUSTER_TP
    or when a result file for it already exists. A model that is missing
    from MODEL_TABLE is benchmarked with default settings (matching
    get_model_args) instead of raising KeyError. Failures are logged, never
    raised, so the remaining models still run.

    Args:
        model: Model identifier, e.g. "org/name".
    """
    config = MODEL_TABLE.get(model, {})

    # Skip if the cluster TP size is not valid for this model
    valid_tp = config.get("valid_tp")
    if valid_tp is not None and CLUSTER_TP not in valid_tp:
        log(f"SKIP {model} (Support TP={valid_tp}, Cluster is TP={CLUSTER_TP})")
        return

    model_safe = model.replace("/", "_")
    output_file = RESULTS_DIR / f"{model_safe}_cluster_tp{CLUSTER_TP}_throughput.json"

    if output_file.exists():
        log(f"SKIP {model} (Result exists)")
        return

    dataset_path = get_dataset()
    if dataset_path:
        dataset_args = ["--dataset-name", "sharegpt", "--dataset-path", dataset_path]
    else:
        # No dataset available: use a fixed input length instead.
        dataset_args = ["--input-len", "1024"]

    batch_tokens = str(config.get("max_tokens", DEFAULT_BATCH_TOKENS))

    log(f"START Cluster Bench {model} [TP={CLUSTER_TP} | Eager=True]...")

    # Nuke cache between runs to be safe
    nuke_vllm_cache()

    cmd = ["vllm", "bench", "throughput"] + get_model_args(model)
    cmd.extend([
        "--num-prompts", str(OFF_NUM_PROMPTS),
        "--max-num-batched-tokens", batch_tokens,
        "--output-len", OFF_FORCED_OUTPUT,
        "--output-json", str(output_file),
        "--disable-log-stats",
    ])
    cmd.extend(dataset_args)

    env = get_cluster_env()
    # Model-specific variables override the cluster defaults; subprocess
    # requires every environment value to be a string.
    env.update({str(k): str(v) for k, v in config.get("env", {}).items()})

    try:
        log(f"Command: {' '.join(cmd)}")
        subprocess.run(cmd, check=True, env=env)
    except subprocess.CalledProcessError as e:
        log(f"ERROR: Cluster Benchmark failed for {model} (Exit {e.returncode})")
    except Exception as e:
        # Per-model boundary: e.g. `vllm` missing from PATH.
        log(f"ERROR: System error: {e}")
|
||||||
|
|
||||||
|
def print_summary():
    """Print a table with the measured tokens/second of every model.

    Reads each model's result JSON from RESULTS_DIR. A model without a
    readable result is shown as "SKIP" when its "valid_tp" list excludes
    CLUSTER_TP, otherwise as "N/A". Models missing from MODEL_TABLE are
    shown as "N/A" rather than raising KeyError.
    """
    header = f"MODEL (Cluster TP={CLUSTER_TP})"
    print(f"\n{header:<50} | {'TOK/S':<10}")
    print("-" * 65)

    for m in MODELS_TO_RUN:
        msafe = m.replace("/", "_")
        result_file = RESULTS_DIR / f"{msafe}_cluster_tp{CLUSTER_TP}_throughput.json"
        try:
            tdata = json.loads(result_file.read_text())
            tok_s = f"{tdata.get('tokens_per_second', 0):.1f}"
        except (OSError, ValueError, TypeError, AttributeError):
            # Missing/unreadable file, invalid JSON, or an unexpected shape.
            valid_tp = MODEL_TABLE.get(m, {}).get("valid_tp")
            if valid_tp is not None and CLUSTER_TP not in valid_tp:
                tok_s = "SKIP"
            else:
                tok_s = "N/A"

        name_cell = m.split('/')[-1]
        print(f"{name_cell:<50} | {tok_s:<10}")

    print("-" * 65)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
    # Benchmarks only make sense on a running cluster; bail out early otherwise.
    if check_ray_status():
        log("Ray Cluster Detected. Starting Benchmarks...")
    else:
        log("ERROR: Ray Cluster not ready. Please start it with 'start-vllm-cluster' first.")
        sys.exit(1)

    for model_name in MODELS_TO_RUN:
        run_cluster_throughput(model_name)

    print_summary()
|
||||||
@@ -92,6 +92,8 @@ printf 'Repo : https://github.com/kyuz0/amd-strix-halo-vllm-toolboxes\n'
|
|||||||
printf 'Image : docker.io/kyuz0/vllm-therock-gfx1151:latest\n\n'
|
printf 'Image : docker.io/kyuz0/vllm-therock-gfx1151:latest\n\n'
|
||||||
printf 'Included:\n'
|
printf 'Included:\n'
|
||||||
printf ' - %-16s → %s\n' "start-vllm (TUI)" "Interactive launcher: Model select, Multi-GPU & Cache handling"
|
printf ' - %-16s → %s\n' "start-vllm (TUI)" "Interactive launcher: Model select, Multi-GPU & Cache handling"
|
||||||
|
printf ' - %-16s → %s\n' "start-vllm-cluster" "Cluster launcher: Setup Ray Head/Worker & Launch vLLM RCCL"
|
||||||
|
printf ' - %-16s → %s\n' "vllm-cluster-bench" "Cluster Benchmark: TP=2, Auto-detected Env, JSON Results"
|
||||||
printf ' - %-16s → %s\n' "vLLM server" "vllm serve meta-llama/Meta-Llama-3.1-8B-Instruct"
|
printf ' - %-16s → %s\n' "vLLM server" "vllm serve meta-llama/Meta-Llama-3.1-8B-Instruct"
|
||||||
printf ' - %-16s → %s\n' "API test" "curl localhost:8000/v1/chat/completions"
|
printf ' - %-16s → %s\n' "API test" "curl localhost:8000/v1/chat/completions"
|
||||||
echo
|
echo
|
||||||
|
|||||||
Ausführbare Datei
+501
@@ -0,0 +1,501 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
import json
|
||||||
|
import shutil
|
||||||
|
import tempfile
|
||||||
|
import subprocess
|
||||||
|
import time
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
# Add benchmarks dir to path to import config
|
||||||
|
# Directories that may hold run_vllm_bench.py and max_context_results.json.
SCRIPT_DIR = Path(__file__).parent.resolve()
BENCH_DIR = SCRIPT_DIR.parent / "benchmarks"
OPT_DIR = Path("/opt")

# /opt (container layout) is preferred when it holds the benchmark module;
# otherwise fall back to the repository's benchmarks directory.
_config_dir = OPT_DIR if (OPT_DIR / "run_vllm_bench.py").exists() else BENCH_DIR
sys.path.append(str(_config_dir))

try:
    from run_vllm_bench import MODEL_TABLE, MODELS_TO_RUN
except ImportError:
    print("Error: Could not import run_vllm_bench.py config.")
    sys.exit(1)

# Same preference order for the verified-context results file.
_opt_results = OPT_DIR / "max_context_results.json"
RESULTS_FILE = _opt_results if _opt_results.exists() else BENCH_DIR / "max_context_results.json"

# Bind address and port of the vLLM server; overridable via the environment.
HOST = os.environ.get("HOST", "0.0.0.0")
PORT = os.environ.get("PORT", "8000")
|
||||||
|
|
||||||
|
def get_discovered_models():
    """Return the models that have a successful entry in the results file.

    Only models that are also present in MODEL_TABLE are kept, sorted by
    name. The imported MODELS_TO_RUN list is returned unchanged when the
    results file is missing, cannot be processed, or yields no usable model.
    """
    if not RESULTS_FILE.exists():
        return MODELS_TO_RUN

    try:
        with open(RESULTS_FILE, "r") as handle:
            records = json.load(handle)

        succeeded = {rec["model"] for rec in records if rec.get("status") == "success"}
        known = [model for model in sorted(succeeded) if model in MODEL_TABLE]
        if known:
            return known
    except Exception as e:
        print(f"Warning: Model discovery failed ({e}). Using default list.")

    return MODELS_TO_RUN


# Replace the imported default list with the discovered one.
MODELS_TO_RUN = get_discovered_models()
|
||||||
|
|
||||||
|
def check_dependencies():
    """Exit with status 1 unless ``dialog``, ``ssh`` and ``ray`` are all on PATH."""
    required_tools = ("dialog", "ssh", "ray")
    missing = [tool for tool in required_tools if shutil.which(tool) is None]
    if not missing:
        return

    print(f"Error: Missing dependencies: {', '.join(missing)}.")
    print("Please install them (e.g., sudo dnf install dialog openssh-clients).")
    print("Ensure 'ray' is in your PATH (pip install ray).")
    sys.exit(1)
|
||||||
|
|
||||||
|
def run_dialog(args):
    """Run ``dialog`` with *args* and return what it wrote to stderr.

    Returns:
        The stripped stderr text (the user's selection), or None when dialog
        exited with a non-zero status, e.g. because the user cancelled.
    """
    try:
        finished = subprocess.run(
            ["dialog", *args],
            stderr=subprocess.PIPE,
            text=True,
            check=True,
        )
    except subprocess.CalledProcessError:
        return None  # User cancelled
    return finished.stderr.strip()
|
||||||
|
|
||||||
|
def show_info(title, msg):
    """Show *msg* in a 12x60 dialog message box titled *title*."""
    box_args = ["--title", title, "--msgbox", msg, "12", "60"]
    run_dialog(box_args)
|
||||||
|
|
||||||
|
def get_subnet_from_ip(ip, prefix_len=24):
    """Return the subnet containing *ip* in CIDR notation.

    The address is validated before use because the result is interpolated
    into shell commands by the callers.

    Args:
        ip: Dotted-quad IPv4 address; surrounding whitespace is ignored.
        prefix_len: Network prefix length (default 24).

    Returns:
        The network as a string, e.g. "192.168.100.0/24".

    Raises:
        ValueError: If *ip* is not a valid IPv4 address or *prefix_len* is
            not a valid prefix length.
    """
    import ipaddress  # local import: not among this module's top-level imports

    address = ipaddress.IPv4Address(ip.strip())
    # strict=False masks the host bits instead of rejecting them.
    network = ipaddress.IPv4Network(f"{address}/{prefix_len}", strict=False)
    return str(network)
|
||||||
|
|
||||||
|
def setup_ips_dialog(current_head, current_worker):
    """Let the user edit the head and worker IPs in a single dialog form.

    Both values are validated as IPv4 addresses because they are later
    interpolated into shell scripts; invalid input is rejected with a
    message box and the previous values stay in effect.

    Args:
        current_head: Head node IP shown as the initial value.
        current_worker: Worker node IP shown as the initial value.

    Returns:
        ``(new_head, new_worker)``, or None if the dialog was cancelled or
        the input was incomplete or invalid.
    """
    import ipaddress  # local import: not among this module's top-level imports

    # Form layout: labels in column 1, 20-character inputs in column 20,
    # head on row 1 and worker on row 2.
    cmd = [
        "dialog",
        "--title", "Configure Cluster IPs",
        "--form", "Edit the IP addresses for the Cluster nodes:",
        "10", "60", "2",
        "Head Node IP:", "1", "1", current_head, "1", "20", "20", "0",
        "Worker Node IP:", "2", "1", current_worker, "2", "20", "20", "0",
    ]

    try:
        # dialog --form outputs to stderr: "field1\nfield2\n..."
        res = subprocess.run(cmd, stderr=subprocess.PIPE, check=True, text=True)
    except subprocess.CalledProcessError:
        return None

    fields = [line.strip() for line in res.stderr.strip().split('\n')]
    if len(fields) < 2:
        return None

    new_head, new_worker = fields[0], fields[1]
    try:
        ipaddress.IPv4Address(new_head)
        ipaddress.IPv4Address(new_worker)
    except ValueError:
        show_info("Invalid IP", "Both fields must contain a valid IPv4 address. Keeping the previous values.")
        return None

    return new_head, new_worker
|
||||||
|
|
||||||
|
def setup_worker_node(worker_ip, head_ip):
    """Start a Ray worker on *worker_ip* over SSH and join it to the head node.

    A setup script is piped to ``bash -s`` inside the ``vllm`` toolbox on the
    worker. It stops any running Ray instance, exports the Ray / vLLM /
    socket-interface variables and runs ``ray start``.

    Args:
        worker_ip: Address of the worker, used as SSH target and VLLM_HOST_IP.
        head_ip: Address of the Ray head node (port 6379).

    Returns:
        True when the remote script succeeded; False when the worker IP is
        malformed, ssh could not be started, or the script failed.
    """
    try:
        subnet = get_subnet_from_ip(worker_ip)
    except (ValueError, IndexError) as e:
        print(f"Invalid worker IP {worker_ip!r}: {e}")
        return False

    # Script to run on worker
    script = f"""
source /etc/profile
# Silence the kill command
ray stop --force > /dev/null 2>&1 || true
export RAY_EXPERIMENTAL_NOSET_ROCR_VISIBLE_DEVICES=1
export RAY_memory_monitor_refresh_ms=0
export VLLM_HOST_IP={worker_ip}
export RDMA_IFACE=$(ip -o addr show to {subnet} | awk '{{print $2}}' | head -n1)
export NCCL_SOCKET_IFNAME=$RDMA_IFACE
export GLOO_SOCKET_IFNAME=$RDMA_IFACE
echo "Starting Ray Worker on {worker_ip} connecting to {head_ip}..."
ray start --address='{head_ip}:6379' --num-gpus=1 --num-cpus=8 --disable-usage-stats
"""

    print(f"Setting up Worker Node ({worker_ip})...")

    # Use bash -s to read script from stdin
    # Command: ssh user@host "toolbox run -c vllm -- bash -s"
    # NOTE(review): StrictHostKeyChecking=no disables host key verification;
    # consider "accept-new" if the cluster network is not fully trusted.
    ssh_cmd = [
        "ssh", "-o", "StrictHostKeyChecking=no", worker_ip,
        "toolbox run -c vllm -- bash -s",
    ]

    try:
        subprocess.run(ssh_cmd, input=script.encode(), check=True)
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers a missing or non-executable ssh binary.
        print(f"Failed to setup worker: {e}")
        return False
    return True
|
||||||
|
|
||||||
|
def setup_head_node(head_ip):
    """Start the Ray head node on the local machine.

    A setup script is piped to a local ``bash -s``. It stops any running Ray
    instance, exports the Ray / vLLM / socket-interface variables and runs
    ``ray start --head`` on port 6379.

    Args:
        head_ip: Address the head node advertises (also VLLM_HOST_IP).

    Returns:
        True when the script succeeded; False when the IP is malformed, bash
        could not be started, or the script failed.
    """
    try:
        subnet = get_subnet_from_ip(head_ip)
    except (ValueError, IndexError) as e:
        print(f"Invalid head IP {head_ip!r}: {e}")
        return False

    print(f"Setting up Head Node ({head_ip})...")

    script = f"""
# Silence the kill command
ray stop --force > /dev/null 2>&1 || true
export RAY_EXPERIMENTAL_NOSET_ROCR_VISIBLE_DEVICES=1
export RAY_memory_monitor_refresh_ms=0
export VLLM_HOST_IP={head_ip}
export RDMA_IFACE=$(ip -o addr show to {subnet} | awk '{{print $2}}' | head -n1)
export NCCL_SOCKET_IFNAME=$RDMA_IFACE
export GLOO_SOCKET_IFNAME=$RDMA_IFACE
echo "Starting Ray Head on {head_ip}..."
ray start --head --port=6379 --node-ip-address={head_ip} --num-gpus=1 --num-cpus=8 --disable-usage-stats
"""

    try:
        # Run locally
        subprocess.run(["bash", "-s"], input=script.encode(), check=True)
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers a missing or non-executable bash binary.
        print(f"Failed to setup head: {e}")
        return False
    return True
|
||||||
|
|
||||||
|
def check_ray_status():
    """Return ``(active_nodes, total_gpus)`` parsed roughly from ``ray status``.

    Active nodes are the ``1 node_<id>`` lines listed under "Active:". The
    GPU total is not parsed: it is reported as 2 whenever the command
    succeeds.

    Returns:
        ``(0, 0)`` when ``ray status`` cannot be run or exits non-zero,
        otherwise ``(active_nodes, 2)``.
    """
    try:
        res = subprocess.run(
            ["ray", "status"],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
    except (OSError, subprocess.SubprocessError, ValueError):
        # OSError: `ray` is not installed / not executable.
        # ValueError: output could not be decoded as text.
        return 0, 0

    if res.returncode != 0:
        return 0, 0

    active_nodes = 0
    in_active_section = False
    for line in res.stdout.splitlines():
        if "Active:" in line:
            in_active_section = True
            continue
        if "Pending:" in line or "Recent failures:" in line:
            # The active-node listing ends where one of these headings starts.
            in_active_section = False
            continue
        if in_active_section and line.strip().startswith("1 node_"):
            active_nodes += 1

    return active_nodes, 2  # Assume 2 GPUs as per success criteria
|
||||||
|
|
||||||
|
def wait_for_cluster(expected_nodes=2, attempts=30, poll_seconds=2):
    """Poll ``ray status`` until enough nodes are active or the attempts run out.

    Args:
        expected_nodes: Number of active nodes to wait for (default 2).
        attempts: Maximum number of status checks (default 30).
        poll_seconds: Pause between checks in seconds (default 2).

    Returns:
        True once at least *expected_nodes* nodes are active, False on timeout.
    """
    print(f"Waiting for Ray cluster to initialize (expecting {expected_nodes} nodes)...")
    for attempt in range(1, attempts + 1):
        nodes, _gpus = check_ray_status()
        print(f"Check {attempt}/{attempts}: Active Nodes={nodes}")
        if nodes >= expected_nodes:
            print("Cluster is Ready!")
            time.sleep(2)  # short pause before handing control back
            return True
        time.sleep(poll_seconds)

    print("Timeout waiting for cluster.")
    return False
|
||||||
|
|
||||||
|
def nuke_vllm_cache(settle_seconds=1):
    """Delete ``~/.cache/vllm`` and recreate it as an empty directory.

    Does nothing when the cache path does not exist. Failures are reported
    on stdout and never raise.

    Args:
        settle_seconds: Pause after a successful wipe (default 1 second).
    """
    cache = Path.home() / ".cache" / "vllm"
    if not cache.exists():
        return

    print(f"Clearing vLLM cache at {cache}...", end="", flush=True)
    try:
        if cache.is_symlink() or not cache.is_dir():
            # Mirror `rm -rf`: a symlink (or stray file) is removed itself,
            # the link target is left untouched.
            cache.unlink()
        else:
            shutil.rmtree(cache)
        cache.mkdir(parents=True, exist_ok=True)
    except OSError as e:
        print(f" Failed: {e}")
        return

    print(" Done.")
    time.sleep(settle_seconds)
|
||||||
|
|
||||||
|
def get_verified_config(model_id, tp_size, max_seqs):
    """Look up the best verified context length and GPU utilization for a model.

    Searches RESULTS_FILE for successful entries matching the model, TP size
    and sequence count, and picks the one with the highest utilization
    (ties broken by the larger context).

    Args:
        model_id: Model identifier as stored in the results file.
        tp_size: Tensor-parallel size to match.
        max_seqs: Concurrent-sequence count to match.

    Returns:
        A dict with keys "ctx" and "util". When no entry matches or the file
        cannot be read or has an unexpected shape, the default is returned:
        the model's MODEL_TABLE "ctx" (8192 if absent) and util 0.90.
    """
    default_config = {
        "ctx": int(MODEL_TABLE.get(model_id, {}).get("ctx", 8192)),
        "util": 0.90,
    }

    if not RESULTS_FILE.exists():
        return default_config

    try:
        with open(RESULTS_FILE, "r") as f:
            data = json.load(f)

        matches = [
            r for r in data
            if r["model"] == model_id
            and r["tp"] == tp_size
            and r["max_seqs"] == max_seqs
            and r["status"] == "success"
        ]
        if not matches:
            return default_config

        best = max(matches, key=lambda r: (float(r["util"]), r["max_context_1_user"]))
        return {
            "ctx": best["max_context_1_user"],
            "util": float(best["util"]),
        }
    except (OSError, ValueError, KeyError, TypeError):
        # Unreadable file, invalid JSON, or records with an unexpected shape.
        return default_config
|
||||||
|
|
||||||
|
def _detect_subnet_iface(subnet):
    """Return the first interface with an address in *subnet*, or None if none is found."""
    try:
        listing = subprocess.check_output(
            ["ip", "-o", "addr", "show", "to", subnet], text=True
        )
    except (OSError, subprocess.SubprocessError, ValueError):
        return None
    for entry in listing.splitlines():
        # One-line format: "<idx>: <iface> <family> <addr>/<len> ..."
        fields = entry.split()
        if len(fields) >= 2:
            return fields[1]
    return None


def configure_and_launch_vllm(model_idx, head_ip):
    """Show the launch-parameter menu for a model, then exec ``vllm serve``.

    The menu starts from the verified context length and GPU utilization for
    TP=2 and one concurrent request. Invalid numeric input is ignored and
    the previous value kept. On launch the current process is replaced by
    vLLM, so the function returns only if the user cancels or exec fails.

    Args:
        model_idx: Index into MODELS_TO_RUN of the model to serve.
        head_ip: Head node IP; used for VLLM_HOST_IP and interface detection.

    Returns:
        False when the user cancelled the menu or ``vllm`` could not be
        executed. Does not return on a successful launch.
    """
    model_id = MODELS_TO_RUN[model_idx]
    name = model_id.split("/")[-1]

    # Defaults
    current_tp = 2  # Forced default for Cluster
    current_seqs = 1

    # Lookup Config
    verified = get_verified_config(model_id, current_tp, current_seqs)
    current_ctx = verified["ctx"]
    current_util = verified["util"]

    clear_cache = False
    use_eager = True  # Default True for cluster as per request ("enforce-eager")
    trust_remote = True  # Default True as per request

    while True:
        cache_status = "YES" if clear_cache else "NO"
        eager_status = "YES" if use_eager else "NO"
        trust_status = "YES" if trust_remote else "NO"

        menu_args = [
            "--clear", "--backtitle", f"AMD VLLM CLUSTER Launcher (Head: {head_ip})",
            "--title", f"Configuration: {name}",
            "--menu", "Customize Launch Parameters:", "22", "65", "9",
            "1", f"Tensor Parallelism: {current_tp} (Fixed)",
            "2", f"Concurrent Requests: {current_seqs}",
            "3", f"Context Length: {current_ctx}",
            "4", f"GPU Utilization: {current_util}",
            "5", f"Trust Remote Code: {trust_status}",
            "6", f"Erase vLLM Cache: {cache_status}",
            "7", f"Force Eager Mode: {eager_status}",
            "8", "LAUNCH SERVER",
        ]

        choice = run_dialog(menu_args)
        if not choice:
            return False

        if choice == "1":
            new_tp = run_dialog([
                "--title", "Tensor Parallelism",
                "--rangebox", "Set TP Size:", "10", "40", "1", "8", str(current_tp),
            ])
            if new_tp:
                try:
                    current_tp = int(new_tp)
                except ValueError:
                    pass

        elif choice == "2":
            new_seqs = run_dialog([
                "--title", "Concurrent Requests",
                "--inputbox", "Enter Max Concurrent Requests (or 'auto'):", "10", "40", str(current_seqs),
            ])
            if new_seqs:
                if new_seqs.lower().strip() == "auto":
                    current_seqs = "auto"
                else:
                    try:
                        current_seqs = int(new_seqs)
                    except ValueError:
                        pass

        elif choice == "3":
            new_ctx = run_dialog([
                "--title", "Context Length",
                "--inputbox", "Enter Context Length (or 'auto'):", "10", "40", str(current_ctx),
            ])
            if new_ctx:
                if new_ctx.lower().strip() == "auto":
                    current_ctx = "auto"
                else:
                    try:
                        current_ctx = int(new_ctx)
                    except ValueError:
                        pass

        elif choice == "4":
            new_util = run_dialog([
                "--title", "GPU Utilization",
                "--inputbox", "Enter GPU Utilization (0.1 - 1.0):", "10", "40", str(current_util),
            ])
            if new_util:
                # Keep the previous value on non-numeric or out-of-range
                # input instead of crashing the TUI.
                try:
                    candidate = float(new_util)
                except ValueError:
                    candidate = None
                if candidate is not None and 0.0 < candidate <= 1.0:
                    current_util = candidate

        elif choice == "5":
            trust_remote = not trust_remote

        elif choice == "6":
            clear_cache = not clear_cache

        elif choice == "7":
            use_eager = not use_eager

        elif choice == "8":
            break

    # Build Command
    subprocess.run(["clear"])

    if clear_cache:
        nuke_vllm_cache()

    # Detect the interface serving the head node's subnet. An empty result
    # now triggers the fallback as well; the former shell pipeline exited 0
    # even when nothing matched.
    try:
        subnet = get_subnet_from_ip(head_ip)
    except (ValueError, IndexError):
        subnet = None
    rdma_iface = _detect_subnet_iface(subnet) if subnet else None
    if not rdma_iface:
        rdma_iface = "eth0"  # Fallback
        print("Warning: Could not detect RDMA IFACE, defaulting to eth0")

    print(f"Detected RDMA Interface: {rdma_iface}")

    env = os.environ.copy()
    env["RAY_EXPERIMENTAL_NOSET_ROCR_VISIBLE_DEVICES"] = "1"
    env["VLLM_HOST_IP"] = head_ip
    env["NCCL_SOCKET_IFNAME"] = rdma_iface
    # Kept in sync with the Ray head/worker setup scripts, which export the
    # same interface for GLOO.
    env["GLOO_SOCKET_IFNAME"] = rdma_iface
    env["NCCL_IB_GID_INDEX"] = "1"
    env["NCCL_IB_DISABLE"] = "0"
    env["NCCL_NET_GDR_LEVEL"] = "0"

    cmd = [
        "vllm", "serve", model_id,
        "--host", HOST,
        "--port", PORT,
        "--tensor-parallel-size", str(current_tp),
        "--gpu-memory-utilization", str(current_util),
        "--distributed-executor-backend", "ray",
        "--dtype", "auto",
    ]

    # "auto" means: omit the flag and let vLLM choose.
    if str(current_seqs) != "auto":
        cmd.extend(["--max-num-seqs", str(current_seqs)])

    if str(current_ctx) != "auto":
        cmd.extend(["--max-model-len", str(current_ctx)])

    if trust_remote:
        cmd.append("--trust-remote-code")
    if use_eager:
        cmd.append("--enforce-eager")

    print("\n" + "=" * 60)
    print(f" Launching VLLM Cluster on Head: {head_ip}")
    print(f" Model: {name}")
    print(f" Config: TP={current_tp} | Seqs={current_seqs} | Ctx={current_ctx}")
    print(f" Command: {' '.join(cmd)}")
    print("=" * 60 + "\n")

    # Exec: replaces the current process on success.
    try:
        os.execvpe("vllm", cmd, env)
    except OSError as e:
        print(f"Failed to launch vllm: {e}")
        input("Press Enter to continue...")
        return False
|
||||||
|
|
||||||
|
def main():
    """Run the interactive cluster-manager menu until the user exits.

    Menu actions: edit the head/worker IPs, start the Ray cluster (head,
    then worker, then wait for both nodes), show ``ray status``, or pick a
    model and launch ``vllm serve``. Cancelling or choosing Exit clears the
    screen and terminates the process with status 0.
    """
    check_dependencies()

    # Default IPs
    head_ip = "192.168.100.1"
    worker_ip = "192.168.100.2"

    while True:
        action = run_dialog([
            "--clear", "--backtitle", "AMD VLLM RCCL Cluster Manager",
            "--title", "Main Menu",
            "--menu", "Select Action:", "15", "60", "5",
            "1", f"Configure IPs (Head: {head_ip}, Worker: {worker_ip})",
            "2", "Start Ray Cluster",
            "3", "Ray Cluster Status",
            "4", "Launch VLLM Serve",
            "5", "Exit",
        ])

        if not action or action == "5":
            subprocess.run(["clear"])
            sys.exit(0)

        if action == "1":
            updated = setup_ips_dialog(head_ip, worker_ip)
            if updated:
                head_ip, worker_ip = updated
            continue

        if action == "2":
            subprocess.run(["clear"])
            print("= Starting Ray Cluster Setup =")
            # Head first, then the worker, then wait for the full cluster.
            head_started = setup_head_node(head_ip)
            if head_started:
                print("Head node started successfully. Waiting 5s before worker connection...")
                time.sleep(5)
                worker_started = setup_worker_node(worker_ip, head_ip)
                if worker_started:
                    wait_for_cluster()
            input("Press Enter to continue...")
            continue

        if action == "3":
            subprocess.run(["clear"])
            print("= Ray Cluster Status =")
            subprocess.run(["ray", "status"])
            input("\nPress Enter to continue...")
            continue

        if action == "4":
            # Menu entries are (index, short model name) pairs.
            menu_items = []
            for position, model_id in enumerate(MODELS_TO_RUN):
                menu_items.append(str(position))
                menu_items.append(model_id.split("/")[-1])

            picked = run_dialog(
                ["--title", "Select Model",
                 "--menu", "Choose a model to serve:", "20", "60", "10"]
                + menu_items
            )
            if picked:
                # On a successful launch execvpe replaces this process, so
                # control does not come back here.
                configure_and_launch_vllm(int(picked), head_ip)


if __name__ == "__main__":
    main()
|
||||||
In neuem Issue referenzieren
Einen Benutzer sperren