diff --git a/Dockerfile b/Dockerfile index d891f44..a6714d3 100644 --- a/Dockerfile +++ b/Dockerfile @@ -127,6 +127,7 @@ COPY scripts/99-toolbox-banner.sh /etc/profile.d/99-toolbox-banner.sh COPY scripts/zz-venv-last.sh /etc/profile.d/zz-venv-last.sh COPY scripts/start_vllm.py /opt/start-vllm COPY scripts/start_vllm_cluster.py /opt/start-vllm-cluster +COPY scripts/cluster_manager.py /opt/cluster_manager.py COPY scripts/models.py /opt/models.py COPY benchmarks/max_context_results.json /opt/max_context_results.json COPY benchmarks/run_vllm_bench.py /opt/run_vllm_bench.py @@ -135,6 +136,7 @@ COPY benchmarks/find_max_context.py /opt/find_max_context.py COPY rdma_cluster/compare_eth_vs_rdma.sh /opt/compare_eth_vs_rdma.sh COPY scripts/configure_cluster.sh /opt/configure_cluster.sh RUN chmod +x /opt/configure_cluster.sh + RUN chmod +x /opt/start-vllm /opt/start-vllm-cluster /opt/vllm_cluster_bench.py /opt/compare_eth_vs_rdma.sh /opt/find_max_context.py /opt/run_vllm_bench.py && \ ln -s /opt/start-vllm /usr/local/bin/start-vllm && \ ln -s /opt/start-vllm-cluster /usr/local/bin/start-vllm-cluster && \ diff --git a/benchmarks/vllm_cluster_bench.py b/benchmarks/vllm_cluster_bench.py index 56e4db7..755bb6c 100755 --- a/benchmarks/vllm_cluster_bench.py +++ b/benchmarks/vllm_cluster_bench.py @@ -45,107 +45,56 @@ MODELS_TO_RUN = models.MODELS_TO_RUN # UTILS (Adapted for Cluster) # ========================= + +# ========================= +# CLUSTER MANAGER INTEGRATION +# ========================= +try: + import cluster_manager +except ImportError: + sys.path.append(str(Path(__file__).parent.parent / "scripts")) + import cluster_manager + +# Defaults for Cluster +HEAD_IP = os.getenv("VLLM_HEAD_IP", "192.168.100.1") +WORKER_IP = os.getenv("VLLM_WORKER_IP", "192.168.100.2") + def log(msg): print(f"\n[CLUSTER-BENCH] {msg}") -def get_ray_nodes(): - """Returns a list of active Ray node IPs.""" - try: - res = subprocess.run(["ray", "status"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) - if res.returncode != 0: - return [] - - nodes = [] - in_active_section = False - for line in res.stdout.splitlines(): - if "Active:" in line: - in_active_section = True - continue - if "Pending:" in line or "Recent failures:" in line: - in_active_section = False - - if in_active_section: - # Look for "1 node_" pattern - # Existing logic checked for startswith("1 node_") - # We use regex to be robust and capture the IP - match = re.search(r"node_(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})", line) - if match: - nodes.append(match.group(1)) - - return nodes - except: - return [] +def restart_cluster(): + log("Restarting Ray Cluster (Clean State)...") + + # 1. Stop Cluster (Best Effort) + cluster_manager.stop_cluster() + + # 2. Start Head + if not cluster_manager.setup_head_node(HEAD_IP): + log("ERROR: Failed to start HEAD node.") + sys.exit(1) + + # 3. Start Worker + # Give head a moment + time.sleep(5) + if not cluster_manager.setup_worker_node(WORKER_IP, HEAD_IP): + log("ERROR: Failed to start WORKER node.") + sys.exit(1) + + # 4. Wait + if not cluster_manager.wait_for_cluster(): + log("ERROR: Cluster failed to initialize.") + sys.exit(1) + + log("Cluster Ready.") -def check_ray_status(): - """Checks if Ray cluster is active with at least 2 nodes.""" - nodes = get_ray_nodes() - return len(nodes) >= 2 - -def get_net_iface(ip_prefix="192.168.100"): - """ - Auto-detects the interface that serves the cluster network. - Assumes standard 192.168.100.x setup from start_vllm_cluster.py - """ - try: - # ip -o addr show | grep 192.168.100 - cmd = f"ip -o addr show | grep {ip_prefix}" - res = subprocess.check_output(cmd, shell=True, text=True).strip() - # Output format: 2: eth0 inet 192.168.100.1/24 ... - parts = res.split() - if len(parts) >= 2: - return parts[1] # Interface name - except: - pass - return "eth0" # Fallback +def get_net_iface(): + return cluster_manager.get_net_iface() def get_local_ip(iface): - try: - cmd = f"ip -o -4 addr show {iface} | awk '{{print $4}}' | cut -d/ -f1" - return subprocess.check_output(cmd, shell=True, text=True).strip() - except: - return "127.0.0.1" - -def nuke_vllm_cache_on_node(ip, is_local=False): - """Clears vLLM cache on a specific node.""" - cmd_str = f"Locally" if is_local else f"on {ip}" - print(f"Clearing vLLM cache {cmd_str}...", end="", flush=True) - - try: - if is_local: - cache = Path.home() / ".cache" / "vllm" - if cache.exists(): - subprocess.run(["rm", "-rf", str(cache)], check=True) - cache.mkdir(parents=True, exist_ok=True) - else: - # Remote SSH - ssh_cmd = [ - "ssh", "-o", "StrictHostKeyChecking=no", ip, - "rm -rf ~/.cache/vllm && mkdir -p ~/.cache/vllm" - ] - subprocess.run(ssh_cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) - - print(" Done.") - except Exception as e: - print(f" Failed ({e}).") + return cluster_manager.get_local_ip(iface) def nuke_vllm_cache(): - """Clears vLLM cache on ALL cluster nodes.""" - nodes = get_ray_nodes() - rdma_iface = get_net_iface() - local_ip = get_local_ip(rdma_iface) - - # If no nodes found (unexpected if we are running bench), try just local - if not nodes: - nuke_vllm_cache_on_node(local_ip, is_local=True) - return + cluster_manager.nuke_vllm_cache_cluster() - for node_ip in nodes: - # Check if node is local - # Simple string match, but IPs might vary (localhost vs 192.168...) - # We trust get_local_ip returns the IP used in the cluster (192.168.100.x) - is_local = (node_ip == local_ip) or (node_ip == "127.0.0.1") - nuke_vllm_cache_on_node(node_ip, is_local) - - time.sleep(2) def get_dataset(): # Same as original @@ -261,7 +210,8 @@ def run_bench_set(model, backend_name, output_dir, extra_env=None): log(f"ERROR: System error: {e}") def run_cluster_throughput(model): - # 1. Default Run (Triton usually, unless global envs set) + # 1. Default Run (Triton) + restart_cluster() run_bench_set( model, "Default", @@ -269,6 +219,7 @@ def run_cluster_throughput(model): ) # 2. ROCm Attention Run + restart_cluster() run_bench_set( model, "ROCm-Attn", @@ -279,6 +230,7 @@ def run_cluster_throughput(model): } ) + def print_summary(): print(f"\n{'MODEL (TP=2)':<50} | {'Triton':<8} | {'ROCm':<8}") print("-" * 75) @@ -305,9 +257,12 @@ def print_summary(): print("-" * 75) if __name__ == "__main__": - if not check_ray_status(): - log("ERROR: Ray Cluster not ready. Please start it with 'start-vllm-cluster' first.") - sys.exit(1) + # if not check_ray_status(): + # log("ERROR: Ray Cluster not ready. Please start it with 'start-vllm-cluster' first.") + # sys.exit(1) + # We now handle this by restarting the cluster ourselves. + pass + log("Ray Cluster Detected. Starting Benchmarks (Dual Backend)...") diff --git a/scripts/cluster_manager.py b/scripts/cluster_manager.py new file mode 100644 index 0000000..a8af722 --- /dev/null +++ b/scripts/cluster_manager.py @@ -0,0 +1,204 @@ +import subprocess +import time +import os + +def get_net_iface(ip_prefix="192.168.100"): + """ + Auto-detects the interface that serves the cluster network. + Assumes standard 192.168.100.x setup from start_vllm_cluster.py + """ + try: + # ip -o addr show | grep 192.168.100 + cmd = f"ip -o addr show | grep {ip_prefix}" + res = subprocess.check_output(cmd, shell=True, text=True).strip() + # Output format: 2: eth0 inet 192.168.100.1/24 ... + parts = res.split() + if len(parts) >= 2: + return parts[1] # Interface name + except: + pass + return "eth0" # Fallback + +def get_local_ip(iface): + try: + cmd = f"ip -o -4 addr show {iface} | awk '{{print $4}}' | cut -d/ -f1" + return subprocess.check_output(cmd, shell=True, text=True).strip() + except: + return "127.0.0.1" + +def get_subnet_from_ip(ip): + """Accurately gets the /24 subnet string for the given IP.""" + parts = ip.split('.') + return f"{parts[0]}.{parts[1]}.{parts[2]}.0/24" + +def stop_cluster(nodes=None): + """ + Stops Ray on the given nodes (list of IPs). + If nodes is None, does nothing (caller should identify nodes first if needed, + but typically for a clean start we might just rely on 'ray stop' on each setup). + Actually, to be safe, we can try to stop local ray. + """ + subprocess.run(["ray", "stop", "--force"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + +def setup_worker_node(worker_ip, head_ip): + subnet = get_subnet_from_ip(worker_ip) + + # Script to run on worker + script = f""" + source /etc/profile + # Silece the kill command + ray stop --force > /dev/null 2>&1 || true + export RAY_DISABLE_METRICS=1 + export RAY_EXPERIMENTAL_NOSET_ROCR_VISIBLE_DEVICES=1 + export RAY_memory_monitor_refresh_ms=0 + export VLLM_HOST_IP={worker_ip} + export RDMA_IFACE=$(ip -o addr show to {subnet} | awk '{{print $2}}' | head -n1) + export NCCL_SOCKET_IFNAME=$RDMA_IFACE + export GLOO_SOCKET_IFNAME=$RDMA_IFACE + # Stability for RDMA + export NCCL_IB_TIMEOUT=23 + export NCCL_IB_RETRY_CNT=7 + echo "Starting Ray Worker on {worker_ip} connecting to {head_ip}..." + ray start --address='{head_ip}:6379' --num-gpus=1 --num-cpus=8 --disable-usage-stats + """ + + print(f"Setting up Worker Node ({worker_ip})...") + + # Use bash -s to read script from stdin + # Command: ssh user@host "toolbox run -c vllm -- bash -s" + ssh_cmd = [ + "ssh", "-o", "StrictHostKeyChecking=no", worker_ip, + "toolbox run -c vllm -- bash -s" + ] + + try: + subprocess.run(ssh_cmd, input=script.encode(), check=True) + return True + except subprocess.CalledProcessError as e: + print(f"Failed to setup worker: {e}") + return False + +def setup_head_node(head_ip): + subnet = get_subnet_from_ip(head_ip) + + print(f"Setting up Head Node ({head_ip})...") + + script = f""" + # Silence the kill command + ray stop --force > /dev/null 2>&1 || true + export RAY_DISABLE_METRICS=1 + export RAY_EXPERIMENTAL_NOSET_ROCR_VISIBLE_DEVICES=1 + export RAY_memory_monitor_refresh_ms=0 + export VLLM_HOST_IP={head_ip} + export RDMA_IFACE=$(ip -o addr show to {subnet} | awk '{{print $2}}' | head -n1) + export NCCL_SOCKET_IFNAME=$RDMA_IFACE + export GLOO_SOCKET_IFNAME=$RDMA_IFACE + # Stability for RDMA + export NCCL_IB_TIMEOUT=23 + export NCCL_IB_RETRY_CNT=7 + echo "Starting Ray Head on {head_ip}..." + ray start --head --port=6379 --node-ip-address={head_ip} --num-gpus=1 --num-cpus=8 --disable-usage-stats + """ + + try: + # Run locally + subprocess.run(["bash", "-s"], input=script.encode(), check=True) + return True + except subprocess.CalledProcessError as e: + print(f"Failed to setup head: {e}") + return False + +def get_ray_nodes(): + """Returns a list of active Ray node IPs.""" + try: + res = subprocess.run(["ray", "status"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + if res.returncode != 0: + return [] + + nodes = [] + in_active_section = False + import re + for line in res.stdout.splitlines(): + if "Active:" in line: + in_active_section = True + continue + if "Pending:" in line or "Recent failures:" in line: + in_active_section = False + + if in_active_section: + match = re.search(r"node_(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})", line) + if match: + nodes.append(match.group(1)) + + return nodes + except: + return [] + +def check_ray_status(): + """Returns (active_nodes, total_gpus) parsing 'ray status' output roughly.""" + nodes = get_ray_nodes() + # Assume 1 GPU per node for now as per strix halo setup + return len(nodes), len(nodes) + +def wait_for_cluster(expected_nodes=2, timeout=60): + print(f"Waiting for Ray cluster to initialize (expecting {expected_nodes} nodes)...") + for i in range(timeout): + nodes, gpus = check_ray_status() + if i % 5 == 0: + print(f"Check {i}/{timeout}: Active Nodes={nodes}") + if nodes >= expected_nodes: + print("Cluster is Ready!") + time.sleep(2) + return True + time.sleep(1) + + print("Timeout waiting for cluster.") + return False + +def nuke_vllm_cache_on_node(ip, is_local=False): + """Clears vLLM cache on a specific node.""" + cmd_str = f"Locally" if is_local else f"on {ip}" + print(f"Clearing vLLM cache {cmd_str}...", end="", flush=True) + + try: + if is_local: + from pathlib import Path + cache = Path.home() / ".cache" / "vllm" + if cache.exists(): + subprocess.run(["rm", "-rf", str(cache)], check=True) + cache.mkdir(parents=True, exist_ok=True) + else: + # Remote SSH + ssh_cmd = [ + "ssh", "-o", "StrictHostKeyChecking=no", ip, + "rm -rf ~/.cache/vllm && mkdir -p ~/.cache/vllm" + ] + subprocess.run(ssh_cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + + print(" Done.") + except Exception as e: + print(f" Failed ({e}).") + +def nuke_vllm_cache_cluster(): + """Clears vLLM cache on ALL cluster nodes.""" + nodes = get_ray_nodes() + # Assuming we are running on Head, which is one of the nodes. + # We need to detect which IP is "local" + # Or just run 'ray stop' first? + # The requirement is often to clear cache BEFORE start or between runs. + # If ray is down, 'get_ray_nodes' returns empty. + # So this is best used when cluster is UP. + + rdma_iface = get_net_iface() + local_ip = get_local_ip(rdma_iface) + + if not nodes: + # Fallback to just local? + nuke_vllm_cache_on_node(local_ip, is_local=True) + return + + for node_ip in nodes: + is_local = (node_ip == local_ip) or (node_ip == "127.0.0.1") + nuke_vllm_cache_on_node(node_ip, is_local) + + time.sleep(2) diff --git a/scripts/start_vllm_cluster.py b/scripts/start_vllm_cluster.py index d6bb561..4477665 100755 --- a/scripts/start_vllm_cluster.py +++ b/scripts/start_vllm_cluster.py @@ -98,159 +98,40 @@ def run_dialog(args): def show_info(title, msg): run_dialog(["--title", title, "--msgbox", msg, "12", "60"]) + +# Import Shared Cluster Manager +try: + import cluster_manager +except ImportError: + # Try importing from current directory if script is run directly + sys.path.append(str(Path(__file__).parent)) + import cluster_manager + +# Delegate Functions to Cluster Manager def get_subnet_from_ip(ip): - """Accurately gets the /24 subnet string for the given IP.""" - parts = ip.split('.') - return f"{parts[0]}.{parts[1]}.{parts[2]}.0/24" - -def setup_ips_dialog(current_head, current_worker): - """ - Uses dialog --form to let user edit Head and Worker IPs simultaneously. - Returns (new_head, new_worker) or None if cancelled. - """ - # Layout: - # Label 1 (Head) at 1,1 - # Input 1 at 1,20 - # Label 2 (Worker) at 2,1 - # Input 2 at 2,20 - - cmd = [ - "dialog", - "--title", "Configure Cluster IPs", - "--form", "Edit the IP addresses for the Cluster nodes:", - "10", "60", "2", - "Head Node IP:", "1", "1", current_head, "1", "20", "20", "0", - "Worker Node IP:", "2", "1", current_worker, "2", "20", "20", "0" - ] - - try: - # dialog --form outputs to stderr: "field1\nfield2\n..." - res = subprocess.run(cmd, stderr=subprocess.PIPE, check=True, text=True) - lines = res.stderr.strip().split('\n') - if len(lines) >= 2: - return lines[0], lines[1] - except subprocess.CalledProcessError: - return None - return None - -def setup_worker_node(worker_ip, head_ip): - subnet = get_subnet_from_ip(worker_ip) - - # Script to run on worker - script = f""" - source /etc/profile - # Silece the kill command - ray stop --force > /dev/null 2>&1 || true - export RAY_DISABLE_METRICS=1 - export RAY_EXPERIMENTAL_NOSET_ROCR_VISIBLE_DEVICES=1 - export RAY_memory_monitor_refresh_ms=0 - export VLLM_HOST_IP={worker_ip} - export RDMA_IFACE=$(ip -o addr show to {subnet} | awk '{{print $2}}' | head -n1) - export NCCL_SOCKET_IFNAME=$RDMA_IFACE - export GLOO_SOCKET_IFNAME=$RDMA_IFACE - # Stability for RDMA - export NCCL_IB_TIMEOUT=23 - export NCCL_IB_RETRY_CNT=7 - echo "Starting Ray Worker on {worker_ip} connecting to {head_ip}..." - ray start --address='{head_ip}:6379' --num-gpus=1 --num-cpus=8 --disable-usage-stats - """ - - print(f"Setting up Worker Node ({worker_ip})...") - - # Use bash -s to read script from stdin - # Command: ssh user@host "toolbox run -c vllm -- bash -s" - ssh_cmd = [ - "ssh", "-o", "StrictHostKeyChecking=no", worker_ip, - "toolbox run -c vllm -- bash -s" - ] - - try: - subprocess.run(ssh_cmd, input=script.encode(), check=True) - return True - except subprocess.CalledProcessError as e: - print(f"Failed to setup worker: {e}") - return False - -def setup_head_node(head_ip): - subnet = get_subnet_from_ip(head_ip) - - print(f"Setting up Head Node ({head_ip})...") - - script = f""" - # Silence the kill command - ray stop --force > /dev/null 2>&1 || true - export RAY_DISABLE_METRICS=1 - export RAY_EXPERIMENTAL_NOSET_ROCR_VISIBLE_DEVICES=1 - export RAY_memory_monitor_refresh_ms=0 - export VLLM_HOST_IP={head_ip} - export RDMA_IFACE=$(ip -o addr show to {subnet} | awk '{{print $2}}' | head -n1) - export NCCL_SOCKET_IFNAME=$RDMA_IFACE - export GLOO_SOCKET_IFNAME=$RDMA_IFACE - # Stability for RDMA - export NCCL_IB_TIMEOUT=23 - export NCCL_IB_RETRY_CNT=7 - echo "Starting Ray Head on {head_ip}..." - ray start --head --port=6379 --node-ip-address={head_ip} --num-gpus=1 --num-cpus=8 --disable-usage-stats - """ - - try: - # Run locally - subprocess.run(["bash", "-s"], input=script.encode(), check=True) - return True - except subprocess.CalledProcessError as e: - print(f"Failed to setup head: {e}") - return False + return cluster_manager.get_subnet_from_ip(ip) def check_ray_status(): - """Returns (active_nodes, total_gpus) parsing 'ray status' output roughly.""" - try: - res = subprocess.run(["ray", "status"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) - if res.returncode != 0: - return 0, 0 - - output = res.stdout - active_nodes = 0 - in_active_section = False - for line in output.splitlines(): - if "Active:" in line: - in_active_section = True - continue - if "Pending:" in line or "Recent failures:" in line: - in_active_section = False - - if in_active_section and line.strip().startswith("1 node_"): - active_nodes += 1 - - return active_nodes, 2 # Assume 2 GPUs as per success criteria - except: - return 0, 0 + return cluster_manager.check_ray_status() def wait_for_cluster(): - print("Waiting for Ray cluster to initialize (expecting 2 nodes)...") - for i in range(30): - nodes, gpus = check_ray_status() - print(f"Check {i+1}/30: Active Nodes={nodes}") - if nodes >= 2: - print("Cluster is Ready!") - time.sleep(2) - return True - time.sleep(2) - - print("Timeout waiting for cluster.") - return False + return cluster_manager.wait_for_cluster() def nuke_vllm_cache(): - """Removes vLLM cache directory.""" - cache = Path.home() / ".cache" / "vllm" - if cache.exists(): - try: - print(f"Clearing vLLM cache at {cache}...", end="", flush=True) - subprocess.run(["rm", "-rf", str(cache)], check=True) - cache.mkdir(parents=True, exist_ok=True) - print(" Done.") - time.sleep(1) - except Exception as e: - print(f" Failed: {e}") + # Only nukes local cache on the head node for now, or use cluster nuke? + # The original script just did local nuke. + # cluster_manager has nuke_vllm_cache_on_node and nuke_vllm_cache_cluster + # Let's use the local ip one effectively + rdma = cluster_manager.get_net_iface() + local = cluster_manager.get_local_ip(rdma) + cluster_manager.nuke_vllm_cache_on_node(local, is_local=True) + +def setup_worker_node(worker_ip, head_ip): + return cluster_manager.setup_worker_node(worker_ip, head_ip) + +def setup_head_node(head_ip): + return cluster_manager.setup_head_node(head_ip) + def get_verified_config(model_id, tp_size, max_seqs): """Reads max_context_results.json."""