Skip to content

Commit 4811215

Browse files
[Feat][Executor] Introduce RayExecutorV2 (vllm-project#36836)
Signed-off-by: Jeffrey Wang <jeffreywang@anyscale.com>
1 parent a455214 commit 4811215

14 files changed

Lines changed: 1603 additions & 30 deletions

‎.buildkite/test_areas/distributed.yaml‎

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,20 @@ steps:
224224
commands:
225225
- ./.buildkite/scripts/run-multi-node-test.sh /vllm-workspace/tests 2 2 $IMAGE_TAG "VLLM_TEST_SAME_HOST=0 torchrun --nnodes 2 --nproc-per-node=2 --rdzv_backend=c10d --rdzv_endpoint=192.168.10.10 distributed/test_same_node.py | grep 'Same node test passed' && NUM_NODES=2 torchrun --nnodes 2 --nproc-per-node=2 --rdzv_backend=c10d --rdzv_endpoint=192.168.10.10 distributed/test_node_count.py | grep 'Node count test passed' && python3 ../examples/offline_inference/data_parallel.py -dp=2 -tp=1 --dp-num-nodes=2 --dp-node-rank=0 --dp-master-addr=192.168.10.10 --dp-master-port=12345 --enforce-eager --trust-remote-code && VLLM_MULTI_NODE=1 pytest -v -s distributed/test_multi_node_assignment.py && VLLM_MULTI_NODE=1 pytest -v -s distributed/test_pipeline_parallel.py" "VLLM_TEST_SAME_HOST=0 torchrun --nnodes 2 --nproc-per-node=2 --rdzv_backend=c10d --rdzv_endpoint=192.168.10.10 distributed/test_same_node.py | grep 'Same node test passed' && NUM_NODES=2 torchrun --nnodes 2 --nproc-per-node=2 --rdzv_backend=c10d --rdzv_endpoint=192.168.10.10 distributed/test_node_count.py | grep 'Node count test passed' && python3 ../examples/offline_inference/data_parallel.py -dp=2 -tp=1 --dp-num-nodes=2 --dp-node-rank=1 --dp-master-addr=192.168.10.10 --dp-master-port=12345 --enforce-eager --trust-remote-code"
226226

227+
- label: MessageQueue TCP Multi-Node (2 GPUs)
228+
timeout_in_minutes: 10
229+
working_dir: "/vllm-workspace/tests"
230+
num_devices: 1
231+
num_nodes: 2
232+
no_plugin: true
233+
optional: true
234+
source_file_dependencies:
235+
- vllm/distributed/device_communicators/shm_broadcast.py
236+
- vllm/distributed/parallel_state.py
237+
- tests/distributed/test_mq_tcp_multinode.py
238+
commands:
239+
- ./.buildkite/scripts/run-multi-node-test.sh /vllm-workspace/tests 2 1 $IMAGE_TAG "torchrun --nnodes 2 --nproc-per-node=1 --rdzv_backend=c10d --rdzv_endpoint=192.168.10.10 distributed/test_mq_tcp_multinode.py" "torchrun --nnodes 2 --nproc-per-node=1 --rdzv_backend=c10d --rdzv_endpoint=192.168.10.10 distributed/test_mq_tcp_multinode.py"
240+
227241
- label: Distributed NixlConnector PD accuracy (4 GPUs)
228242
timeout_in_minutes: 30
229243
working_dir: "/vllm-workspace/tests"
@@ -294,3 +308,23 @@ steps:
294308
commands:
295309
- pytest -v -s distributed/test_pp_cudagraph.py
296310
- pytest -v -s distributed/test_pipeline_parallel.py
311+
312+
- label: RayExecutorV2 (4 GPUs)
313+
timeout_in_minutes: 60
314+
working_dir: "/vllm-workspace/tests"
315+
num_devices: 4
316+
source_file_dependencies:
317+
- vllm/v1/executor/ray_executor_v2.py
318+
- vllm/v1/executor/abstract.py
319+
- vllm/v1/executor/multiproc_executor.py
320+
- tests/distributed/test_ray_v2_executor.py
321+
- tests/distributed/test_ray_v2_executor_e2e.py
322+
- tests/distributed/test_pipeline_parallel.py
323+
- tests/basic_correctness/test_basic_correctness.py
324+
commands:
325+
- export VLLM_USE_RAY_V2_EXECUTOR_BACKEND=1
326+
- export NCCL_CUMEM_HOST_ENABLE=0
327+
- pytest -v -s distributed/test_ray_v2_executor.py
328+
- pytest -v -s distributed/test_ray_v2_executor_e2e.py
329+
- pytest -v -s distributed/test_pipeline_parallel.py -k "ray"
330+
- TARGET_TEST_SUITE=L4 pytest -v -s basic_correctness/test_basic_correctness.py -k "ray"

‎tests/distributed/conftest.py‎

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# SPDX-License-Identifier: Apache-2.0
22
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
3+
import os
34
import random
45

56
import msgspec
@@ -166,3 +167,31 @@ def close(self):
166167
self.sub.close()
167168
for replay in self.replay_sockets:
168169
replay.close()
170+
171+
172+
@pytest.fixture
173+
def enable_ray_v2_backend():
174+
"""Set env vars for the Ray V2 executor backend and shut down Ray
175+
between tests."""
176+
import ray
177+
178+
saved = {
179+
"VLLM_USE_RAY_V2_EXECUTOR_BACKEND": os.environ.get(
180+
"VLLM_USE_RAY_V2_EXECUTOR_BACKEND"
181+
),
182+
"VLLM_ENABLE_V1_MULTIPROCESSING": os.environ.get(
183+
"VLLM_ENABLE_V1_MULTIPROCESSING"
184+
),
185+
}
186+
os.environ["VLLM_USE_RAY_V2_EXECUTOR_BACKEND"] = "1"
187+
os.environ["VLLM_ENABLE_V1_MULTIPROCESSING"] = "0"
188+
if ray.is_initialized():
189+
ray.shutdown()
190+
try:
191+
yield
192+
finally:
193+
if ray.is_initialized():
194+
ray.shutdown()
195+
os.environ.update({k: v for k, v in saved.items() if v is not None})
196+
for key in (k for k, v in saved.items() if v is None):
197+
os.environ.pop(key, None)
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
# SPDX-License-Identifier: Apache-2.0
2+
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
3+
"""
4+
Multi-node integration test for MessageQueue TCP fallback.
5+
6+
Verifies that when writer and readers span separate nodes (Docker containers
7+
with isolated /dev/shm), `create_from_process_group` correctly detects
8+
cross-node ranks via `in_the_same_node_as()` and falls back to ZMQ TCP
9+
transport — and that data actually arrives.
10+
"""
11+
12+
import numpy as np
13+
import torch.distributed as dist
14+
15+
from vllm.distributed.device_communicators.shm_broadcast import MessageQueue
16+
from vllm.distributed.parallel_state import in_the_same_node_as
17+
18+
19+
def main():
20+
dist.init_process_group(backend="gloo")
21+
22+
rank = dist.get_rank()
23+
world_size = dist.get_world_size()
24+
assert world_size >= 2, (
25+
f"Need at least 2 ranks across nodes, got world_size={world_size}"
26+
)
27+
28+
# Verify that in_the_same_node_as detects cross-node correctly
29+
status = in_the_same_node_as(dist.group.WORLD, source_rank=0)
30+
local_count = sum(status)
31+
print(
32+
f"[Rank {rank}] in_the_same_node_as(source=0): {status} "
33+
f"(local={local_count}/{world_size})"
34+
)
35+
# With 2 Docker containers (1 proc each), rank 0 and rank 1
36+
# should be on different nodes.
37+
assert local_count < world_size, (
38+
f"Expected cross-node ranks but all {world_size} ranks appear local."
39+
)
40+
41+
# Create MessageQueue
42+
writer_rank = 0
43+
mq = MessageQueue.create_from_process_group(
44+
dist.group.WORLD,
45+
max_chunk_bytes=1024 * 1024, # 1 MiB
46+
max_chunks=10,
47+
writer_rank=writer_rank,
48+
)
49+
50+
# Verify the transport path selection
51+
if rank == writer_rank:
52+
print(
53+
f"[Rank {rank}] Writer: n_local_reader={mq.n_local_reader}, "
54+
f"n_remote_reader={mq.n_remote_reader}"
55+
)
56+
assert mq.n_remote_reader > 0, (
57+
"Writer should have at least 1 remote (TCP) reader in a multi-node setup."
58+
)
59+
else:
60+
if status[rank]:
61+
assert mq._is_local_reader, (
62+
f"Rank {rank} is on the same node as writer but is not a local reader."
63+
)
64+
print(f"[Rank {rank}] Reader: local (shared memory)")
65+
else:
66+
assert mq._is_remote_reader, (
67+
f"Rank {rank} is on a different node but is not a remote (TCP) reader."
68+
)
69+
print(f"[Rank {rank}] Reader: remote (TCP)")
70+
71+
# Test data transfer: simple objects
72+
dist.barrier()
73+
if rank == writer_rank:
74+
mq.enqueue("hello_from_node0")
75+
else:
76+
msg = mq.dequeue(timeout=10)
77+
assert msg == "hello_from_node0"
78+
dist.barrier()
79+
print(f"[Rank {rank}] Simple object test passed")
80+
81+
# Test data transfer: numpy arrays
82+
np.random.seed(42)
83+
arrays = [
84+
np.random.randint(0, 100, size=np.random.randint(100, 5000)) for _ in range(100)
85+
]
86+
87+
dist.barrier()
88+
if rank == writer_rank:
89+
for arr in arrays:
90+
mq.enqueue(arr)
91+
else:
92+
for i, expected in enumerate(arrays):
93+
received = mq.dequeue(timeout=10)
94+
assert np.array_equal(expected, received), (
95+
f"Array mismatch at index {i}: "
96+
f"expected shape {expected.shape}, got shape {received.shape}"
97+
)
98+
dist.barrier()
99+
print(f"[Rank {rank}] Numpy array test passed")
100+
101+
# Test data transfer: large payload (> max_chunk_bytes)
102+
dist.barrier()
103+
big_array = np.zeros(200_000, dtype=np.int64) # ~1.6 MiB > 1 MiB chunk
104+
if rank == writer_rank:
105+
mq.enqueue(big_array)
106+
else:
107+
received = mq.dequeue(timeout=10)
108+
assert np.array_equal(big_array, received)
109+
dist.barrier()
110+
print(f"[Rank {rank}] Large payload test passed")
111+
112+
# Done -- cleanup
113+
dist.barrier()
114+
print(f"[Rank {rank}] All MessageQueue TCP multi-node tests passed!")
115+
dist.destroy_process_group()
116+
117+
118+
if __name__ == "__main__":
119+
main()

0 commit comments

Comments
 (0)