From 91b6dbc2706f0539a06be091e97eb62120590ad3 Mon Sep 17 00:00:00 2001 From: Donato Capitella Date: Sun, 22 Feb 2026 20:07:34 +0000 Subject: [PATCH] feat: Display environment variables and allow to choose between RoCE/Ethernet and show RCCL debug information --- benchmarks/vllm_cluster_bench.py | 45 +++++++++---- scripts/cluster_manager.py | 99 +++++++++++++++++++++++++---- scripts/start_vllm_cluster.py | 105 ++++++++++++++++++++++++------- 3 files changed, 205 insertions(+), 44 deletions(-) diff --git a/benchmarks/vllm_cluster_bench.py b/benchmarks/vllm_cluster_bench.py index ced0ac0..126e1d2 100755 --- a/benchmarks/vllm_cluster_bench.py +++ b/benchmarks/vllm_cluster_bench.py @@ -23,6 +23,8 @@ except ImportError: # User requested specifically to test with TP=2 on the cluster. CLUSTER_TP = 2 GPU_UTIL = "0.90" +FORCE_ETH = False +FORCE_DEBUG_NCCL = False # THROUGHPUT CONFIG (Imported from models.py) OFF_NUM_PROMPTS = models.OFF_NUM_PROMPTS @@ -66,6 +68,15 @@ def log(msg): print(f"\n[CLUSTER-BENCH] {msg}") def restart_cluster(): log("Restarting Ray Cluster (Clean State)...") + # Push config to env so cluster_manager picks it up for daemon injection + os.environ["NCCL_IB_DISABLE"] = "1" if FORCE_ETH else "0" + if FORCE_DEBUG_NCCL: + os.environ["NCCL_DEBUG"] = "INFO" + os.environ["NCCL_DEBUG_SUBSYS"] = "INIT,NET" + else: + os.environ.pop("NCCL_DEBUG", None) + os.environ.pop("NCCL_DEBUG_SUBSYS", None) + # 1. Stop Cluster (Best Effort) cluster_manager.stop_cluster() @@ -130,13 +141,17 @@ def get_cluster_env(): env["GLOO_SOCKET_IFNAME"] = rdma_iface # RCCL specific env["NCCL_IB_GID_INDEX"] = "1" - env["NCCL_IB_DISABLE"] = "0" + env["NCCL_IB_DISABLE"] = "1" if FORCE_ETH else "0" env["NCCL_NET_GDR_LEVEL"] = "0" # Stability for RDMA (Fix for high-throughput models like Gemma 3) env["NCCL_IB_TIMEOUT"] = "23" # ~32 seconds (default is 18/~1s) env["NCCL_IB_RETRY_CNT"] = "7" # Default is 3, increase for lossy networks + if FORCE_DEBUG_NCCL: + env["NCCL_DEBUG"] = "INFO" + env["NCCL_DEBUG_SUBSYS"] = "INIT,NET" + return env def get_model_args(model): @@ -166,7 +181,8 @@ def get_model_args(model): def get_benchmark_output_file(model, output_dir): model_safe = model.replace("/", "_") output_dir_path = Path(output_dir) - return output_dir_path / f"{model_safe}_cluster_tp{CLUSTER_TP}_throughput.json" + eth_suffix = "_eth" if FORCE_ETH else "" + return output_dir_path / f"{model_safe}_cluster_tp{CLUSTER_TP}{eth_suffix}_throughput.json" def run_bench_set(model, backend_name, output_dir, extra_env=None): output_dir_path = Path(output_dir) @@ -244,7 +260,9 @@ def run_cluster_throughput(model): def print_summary(): - print(f"\n{'MODEL (TP=2)':<50} | {'Triton':<8} | {'ROCm':<8}") + eth_suffix = "_eth" if FORCE_ETH else "" + title_suffix = " (Ethernet ONLY)" if FORCE_ETH else "" + print(f"\n{f'MODEL (TP=2){title_suffix}':<50} | {'Triton':<8} | {'ROCm':<8}") print("-" * 75) for m in MODELS_TO_RUN: @@ -252,14 +270,14 @@ def print_summary(): # Default try: - p1 = RESULTS_DIR / f"{msafe}_cluster_tp{CLUSTER_TP}_throughput.json" + p1 = RESULTS_DIR / f"{msafe}_cluster_tp{CLUSTER_TP}{eth_suffix}_throughput.json" d1 = json.loads(p1.read_text()) val1 = f"{d1.get('tokens_per_second', 0):.1f}" except: val1 = "N/A" # ROCm try: - p2 = Path("benchmark_results_rocm") / f"{msafe}_cluster_tp{CLUSTER_TP}_throughput.json" + p2 = Path("benchmark_results_rocm") / f"{msafe}_cluster_tp{CLUSTER_TP}{eth_suffix}_throughput.json" d2 = json.loads(p2.read_text()) val2 = f"{d2.get('tokens_per_second', 0):.1f}" except: val2 = "N/A" @@ -269,14 +287,19 @@ def print_summary(): print("-" * 75) if __name__ == "__main__": - # if not check_ray_status(): - # log("ERROR: Ray Cluster not ready. Please start it with 'start-vllm-cluster' first.") - # sys.exit(1) - # We now handle this by restarting the cluster ourselves. - pass + parser = argparse.ArgumentParser(description="VLLM Cluster Benchmark") + parser.add_argument("--eth-only", action="store_true", help="Run benchmark using only Ethernet (disable RDMA/RoCE)") + parser.add_argument("--debug-nccl", action="store_true", help="Enable NCCL Debug logging (INFO level for Transport tracking)") + args = parser.parse_args() + + FORCE_ETH = args.eth_only + FORCE_DEBUG_NCCL = args.debug_nccl - log("Ray Cluster Detected. Starting Benchmarks (Dual Backend)...") + if FORCE_ETH: + log("Note: Ethernet ONLY mode enabled. RDMA/RoCE disabled.") + if FORCE_DEBUG_NCCL: + log("Note: NCCL Debug mode enabled (Transport Logging).") log("Note: Eager Mode (--enforce-eager) is ENABLED for cluster stability.") for m in MODELS_TO_RUN: diff --git a/scripts/cluster_manager.py b/scripts/cluster_manager.py index 11d796b..676ca65 100644 --- a/scripts/cluster_manager.py +++ b/scripts/cluster_manager.py @@ -31,34 +31,76 @@ def get_subnet_from_ip(ip): parts = ip.split('.') return f"{parts[0]}.{parts[1]}.{parts[2]}.0/24" -def stop_cluster(nodes=None): +def stop_cluster(worker_ip=None): """ - Stops Ray on the given nodes (list of IPs). - If nodes is None, does nothing (caller should identify nodes first if needed, - but typically for a clean start we might just rely on 'ray stop' on each setup). - Actually, to be safe, we can try to stop local ray. + Stops Ray locally and on the worker node if provided. """ + print("Stopping Ray cluster locally...") subprocess.run(["ray", "stop", "--force"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + + if worker_ip: + print(f"Stopping Ray cluster on worker ({worker_ip})...") + ssh_cmd = [ + "ssh", "-o", "StrictHostKeyChecking=no", worker_ip, + "toolbox", "run", "-c", "vllm", "--", "ray", "stop", "--force" + ] + try: + subprocess.run(ssh_cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + except subprocess.CalledProcessError as e: + print(f"Warning: Failed to stop worker node completely: {e}") def setup_worker_node(worker_ip, head_ip): subnet = get_subnet_from_ip(worker_ip) - # Script to run on worker + # Read overrides from current env + nccl_disable_val = os.getenv("NCCL_IB_DISABLE", "0") + nccl_debug_val = os.getenv("NCCL_DEBUG", "") + script = f""" source /etc/profile - # Silece the kill command + # Silence the kill command ray stop --force > /dev/null 2>&1 || true + + # Calculate Interface dynamically + RDMA_IFACE=$(ip -o addr show to {subnet} | awk '{{print $2}}' | head -n1) + + echo "\\n--- Ray Worker Environment ({worker_ip}) ---" + echo "export RAY_DISABLE_METRICS=1" + echo "export RAY_EXPERIMENTAL_NOSET_ROCR_VISIBLE_DEVICES=1" + echo "export RAY_memory_monitor_refresh_ms=0" + echo "export VLLM_HOST_IP={worker_ip}" + echo "export RDMA_IFACE=$RDMA_IFACE" + echo "export NCCL_SOCKET_IFNAME=$RDMA_IFACE" + echo "export GLOO_SOCKET_IFNAME=$RDMA_IFACE" + echo "export NCCL_IB_TIMEOUT=23" + echo "export NCCL_IB_RETRY_CNT=7" + echo "export NCCL_IB_DISABLE={nccl_disable_val}" + export RAY_DISABLE_METRICS=1 export RAY_EXPERIMENTAL_NOSET_ROCR_VISIBLE_DEVICES=1 export RAY_memory_monitor_refresh_ms=0 export VLLM_HOST_IP={worker_ip} - export RDMA_IFACE=$(ip -o addr show to {subnet} | awk '{{print $2}}' | head -n1) + export RDMA_IFACE=$RDMA_IFACE export NCCL_SOCKET_IFNAME=$RDMA_IFACE export GLOO_SOCKET_IFNAME=$RDMA_IFACE # Stability for RDMA export NCCL_IB_TIMEOUT=23 export NCCL_IB_RETRY_CNT=7 - echo "Starting Ray Worker on {worker_ip} connecting to {head_ip}..." + export NCCL_IB_DISABLE={nccl_disable_val} + """ + if nccl_debug_val: + script += f""" + echo "export NCCL_DEBUG={nccl_debug_val}" + echo "export NCCL_DEBUG_SUBSYS=INIT,NET" + export NCCL_DEBUG={nccl_debug_val} + export NCCL_DEBUG_SUBSYS=INIT,NET + """ + + script += f""" + echo "\\nStarting Ray Worker on {worker_ip} connecting to {head_ip}..." + if [ "{nccl_disable_val}" = "1" ]; then + echo "Note: Worker is configured with NCCL_IB_DISABLE=1 (Ethernet Forced)" + fi ray start --address='{head_ip}:6379' --num-gpus=1 --num-cpus=8 --disable-usage-stats """ @@ -83,20 +125,55 @@ def setup_head_node(head_ip): print(f"Setting up Head Node ({head_ip})...") + # Read overrides from current env + nccl_disable_val = os.getenv("NCCL_IB_DISABLE", "0") + nccl_debug_val = os.getenv("NCCL_DEBUG", "") + script = f""" # Silence the kill command ray stop --force > /dev/null 2>&1 || true + + # Calculate Interface dynamically + RDMA_IFACE=$(ip -o addr show to {subnet} | awk '{{print $2}}' | head -n1) + + echo "\\n--- Ray Head Environment ({head_ip}) ---" + echo "export RAY_DISABLE_METRICS=1" + echo "export RAY_EXPERIMENTAL_NOSET_ROCR_VISIBLE_DEVICES=1" + echo "export RAY_memory_monitor_refresh_ms=0" + echo "export VLLM_HOST_IP={head_ip}" + echo "export RDMA_IFACE=$RDMA_IFACE" + echo "export NCCL_SOCKET_IFNAME=$RDMA_IFACE" + echo "export GLOO_SOCKET_IFNAME=$RDMA_IFACE" + echo "export NCCL_IB_TIMEOUT=23" + echo "export NCCL_IB_RETRY_CNT=7" + echo "export NCCL_IB_DISABLE={nccl_disable_val}" + export RAY_DISABLE_METRICS=1 export RAY_EXPERIMENTAL_NOSET_ROCR_VISIBLE_DEVICES=1 export RAY_memory_monitor_refresh_ms=0 export VLLM_HOST_IP={head_ip} - export RDMA_IFACE=$(ip -o addr show to {subnet} | awk '{{print $2}}' | head -n1) + export RDMA_IFACE=$RDMA_IFACE export NCCL_SOCKET_IFNAME=$RDMA_IFACE export GLOO_SOCKET_IFNAME=$RDMA_IFACE # Stability for RDMA export NCCL_IB_TIMEOUT=23 export NCCL_IB_RETRY_CNT=7 - echo "Starting Ray Head on {head_ip}..." + export NCCL_IB_DISABLE={nccl_disable_val} + """ + + if nccl_debug_val: + script += f""" + echo "export NCCL_DEBUG={nccl_debug_val}" + echo "export NCCL_DEBUG_SUBSYS=INIT,NET" + export NCCL_DEBUG={nccl_debug_val} + export NCCL_DEBUG_SUBSYS=INIT,NET + """ + + script += f""" + echo "\\nStarting Ray Head on {head_ip}..." + if [ "{nccl_disable_val}" = "1" ]; then + echo "Note: Head is configured with NCCL_IB_DISABLE=1 (Ethernet Forced)" + fi ray start --head --port=6379 --node-ip-address={head_ip} --num-gpus=1 --num-cpus=8 --disable-usage-stats --include-dashboard=false """ diff --git a/scripts/start_vllm_cluster.py b/scripts/start_vllm_cluster.py index e31c72e..577be8c 100755 --- a/scripts/start_vllm_cluster.py +++ b/scripts/start_vllm_cluster.py @@ -266,7 +266,6 @@ def configure_and_launch_vllm(model_idx, head_ip): env["VLLM_HOST_IP"] = head_ip env["NCCL_SOCKET_IFNAME"] = rdma_iface env["NCCL_IB_GID_INDEX"] = "1" - env["NCCL_IB_DISABLE"] = "0" env["NCCL_NET_GDR_LEVEL"] = "0" # Also need this for Ray backend? @@ -297,7 +296,20 @@ def configure_and_launch_vllm(model_idx, head_ip): print(f" Config: TP={current_tp} | Seqs={current_seqs} | Ctx={current_ctx}") if use_eager: print(" Note: Eager Mode Enabled (Recommended for Cluster Stability)") - print(f" Command: {' '.join(cmd)}") + + print("\n --- Environment Variables ---") + vars_to_print = [ + "RAY_EXPERIMENTAL_NOSET_ROCR_VISIBLE_DEVICES", + "VLLM_HOST_IP", + "NCCL_SOCKET_IFNAME", + "NCCL_IB_GID_INDEX", + "NCCL_NET_GDR_LEVEL" + ] + for k in vars_to_print: + if k in env: + print(f" export {k}={env[k]}") + + print(f"\n Command: {' '.join(cmd)}") print("="*60 + "\n") # Exec @@ -335,21 +347,24 @@ def main(): # Main Menu # 1. Configure IPs # 2. Start Cluster (Ray) - # 3. Start VLLM - # 4. Exit + # 3. Stop Ray Cluster + # 4. Ray Cluster Status + # 5. Launch VLLM Serve + # 6. Exit choice = run_dialog([ "--clear", "--backtitle", "AMD VLLM RCCL Cluster Manager", "--title", "Main Menu", - "--menu", "Select Action:", "15", "60", "5", + "--menu", "Select Action:", "16", "60", "6", "1", f"Configure IPs (Head: {head_ip}, Worker: {worker_ip})", "2", "Start Ray Cluster", - "3", "Ray Cluster Status", - "4", "Launch VLLM Serve", - "5", "Exit" + "3", "Stop Ray Cluster", + "4", "Ray Cluster Status", + "5", "Launch VLLM Serve", + "6", "Exit" ]) - if not choice or choice == "5": + if not choice or choice == "6": subprocess.run(["clear"]) sys.exit(0) @@ -359,25 +374,71 @@ def main(): head_ip, worker_ip = res elif choice == "2": - subprocess.run(["clear"]) - print("= Starting Ray Cluster Setup =") - # 1. Start Head - if setup_head_node(head_ip): - print("Head node started successfully. Waiting 5s before worker connection...") - time.sleep(5) - # 2. Start Worker - if setup_worker_node(worker_ip, head_ip): - # 3. Wait for full cluster - wait_for_cluster() - input("Press Enter to continue...") + force_ethernet = False + enable_nccl_debug = False + + while True: + eth_status = "YES" if force_ethernet else "NO" + debug_status = "YES" if enable_nccl_debug else "NO" + + c_choice = run_dialog([ + "--clear", "--backtitle", "AMD VLLM RCCL Cluster Manager", + "--title", "Cluster Network Configuration", + "--menu", "Set Network Parameters before starting Ray:", "15", "65", "3", + "1", f"Force Ethernet (Disable RDMA/RoCE): {eth_status}", + "2", f"Enable NCCL Debug Logging: {debug_status}", + "3", "START CLUSTER" + ]) + if not c_choice: break + + if c_choice == "1": + force_ethernet = not force_ethernet + elif c_choice == "2": + enable_nccl_debug = not enable_nccl_debug + elif c_choice == "3": + os.environ["NCCL_IB_DISABLE"] = "1" if force_ethernet else "0" + if enable_nccl_debug: + os.environ["NCCL_DEBUG"] = "INFO" + os.environ["NCCL_DEBUG_SUBSYS"] = "INIT,NET" + else: + os.environ.pop("NCCL_DEBUG", None) + os.environ.pop("NCCL_DEBUG_SUBSYS", None) + + subprocess.run(["clear"]) + print("= Starting Ray Cluster Setup =") + # 1. Start Head + if setup_head_node(head_ip): + print("Head node started successfully. Waiting 5s before worker connection...") + time.sleep(5) + # 2. Start Worker + if setup_worker_node(worker_ip, head_ip): + # 3. Wait for full cluster + wait_for_cluster() + input("Press Enter to continue...") + break - elif choice == "3": - subprocess.run(["clear"]) print("= Ray Cluster Status =") subprocess.run(["ray", "status"]) input("\nPress Enter to continue...") + elif choice == "3": + subprocess.run(["clear"]) + print("= Stopping Ray Cluster =") + cluster_manager.stop_cluster(worker_ip) + input("\nPress Enter to continue...") + elif choice == "4": + subprocess.run(["clear"]) + print("= Ray Cluster Status =") + res = subprocess.run(["ray", "status"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + if res.returncode != 0: + print("\n[!] Cluster is Offline or Unreachable.") + print("Please start the cluster first via Option 2 (Start Ray Cluster).") + else: + print(res.stdout) + input("\nPress Enter to continue...") + + elif choice == "5": # Select Model menu_items = [] for i, m_id in enumerate(MODELS_TO_RUN):