Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
3a3a250
Implement RayExecutorV2 & tested on a single-node
jeffreywang88 Mar 10, 2026
df75664
Enable multinode
jeffreywang88 Mar 12, 2026
bbaa21b
Fix pre-commit
jeffreywang88 Mar 16, 2026
2541f2d
Fix RayExecutorV2 monitor thread self-join
jeffreywang88 Mar 16, 2026
c3ad8e5
Remove unnecessary changes
Mar 17, 2026
300d0ae
Extract bundle sorting to a utility
jeffreywang88 Mar 17, 2026
11d32eb
Fix linter
jeffreywang88 Mar 17, 2026
5795f1d
Enable async scheduling
jeffreywang88 Mar 18, 2026
7128074
Address CR feedback
jeffreywang88 Mar 19, 2026
e7a3c1f
Address test feedback
jeffreywang88 Mar 19, 2026
5b4119a
Merge branch 'main' into ray
jeffreywang88 Mar 19, 2026
ec2730d
Iterate over world_size
jeffreywang88 Mar 19, 2026
ca95900
Fix tests and linters
jeffreywang88 Mar 19, 2026
139c02a
Respect VLLM_RAY_BUNDLE_INDICES
jeffreywang88 Mar 22, 2026
7657031
Adjust DP rank for ray executor backend
jeffreywang88 Mar 23, 2026
6c1ea7e
Apply DP local-rank device offset for RayExecutorV2 workers
jeffreywang88 Mar 23, 2026
d040317
Support DP
jeffreywang88 Mar 24, 2026
a76acc9
Fix linter
jeffreywang88 Mar 24, 2026
c9f0a39
Lazily initialize RayWorkerProc
jeffreywang88 Mar 25, 2026
29c7426
Propagate env var; add tests
jeffreywang88 Mar 25, 2026
aae5938
Add nsight profiling and non-GPU device support to RayExecutorV2
jeffreywang88 Mar 25, 2026
25eaf8e
Fix AsyncLLMActor async detection in e2e tests
jeffreywang88 Mar 26, 2026
6717ca2
Fix AsyncLLMActor async detection in e2e tests
jeffreywang88 Mar 26, 2026
cfba15e
Fix test
jeffreywang88 Mar 26, 2026
476501b
Fix test
jeffreywang88 Mar 26, 2026
c7aa661
Fix wrong PYTHONPATH in Ray workers
jeffreywang88 Mar 26, 2026
e0fd321
CR feedback round 1
jeffreywang88 Mar 30, 2026
af21cdd
CR feedback round 2
jeffreywang88 Mar 30, 2026
ad8f6d0
Only apply blacklist & propagate env with setdefault
jeffreywang88 Mar 30, 2026
605a347
Merge branch 'main' into ray
jeffreywang88 Mar 31, 2026
7586204
CR feedback round 3
jeffreywang88 Mar 31, 2026
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Extract bundle sorting to a utility
Signed-off-by: Jeffrey Wang <jeffreywang@anyscale.com>
  • Loading branch information
jeffreywang88 committed Mar 17, 2026
commit 300d0ae3ecc18a465579ce36eeeca37603d9e3d7
94 changes: 94 additions & 0 deletions tests/utils_/test_ray_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project

from unittest.mock import MagicMock, patch

import pytest

NODE_A = "node_a"
NODE_B = "node_b"
NODE_C = "node_c"


@pytest.mark.parametrize(
"bundles_to_node_id,bundle_specs,world_size,expected",
[
pytest.param(
{0: NODE_C, 1: NODE_A, 2: NODE_B, 3: NODE_C, 4: NODE_A,
5: NODE_B},
[{"GPU": 1}] * 6,
6,
[(1, NODE_A), (4, NODE_A), (2, NODE_B), (5, NODE_B),
(0, NODE_C), (3, NODE_C)],
),
pytest.param(
{0: NODE_B, 1: NODE_B, 2: NODE_A, 3: NODE_A},
[{"GPU": 1}] * 4,
4,
[(2, NODE_A), (3, NODE_A), (0, NODE_B), (1, NODE_B)],
),
pytest.param(
{0: NODE_C, 1: NODE_B, 2: NODE_C, 3: NODE_B},
[{"GPU": 1}] * 4,
4,
[(1, NODE_B), (3, NODE_B), (0, NODE_C), (2, NODE_C)],
),
pytest.param(
{0: NODE_A, 1: NODE_A, 2: NODE_A},
[{"GPU": 1}] * 3,
3,
[(0, NODE_A), (1, NODE_A), (2, NODE_A)],
),
pytest.param(
{0: NODE_A},
[{"GPU": 1}],
1,
[(0, NODE_A)],
),
pytest.param(
{},
[],
0,
[],
),
pytest.param(
{0: NODE_A, 1: NODE_B, 2: NODE_A, 3: NODE_B},
[{"GPU": 1}] * 4,
2,
[(0, NODE_A), (1, NODE_B)],
),
pytest.param(
{0: NODE_A, 1: NODE_B, 2: NODE_A},
[{"CPU": 1}, {"GPU": 1}, {"GPU": 1}],
2,
[(2, NODE_A), (1, NODE_B)],
),
],
)
def test_get_bundles_sorted_by_node(
bundles_to_node_id, bundle_specs, world_size, expected
):
mock_pg = MagicMock()
mock_pg.bundle_specs = bundle_specs

mock_ctx = MagicMock()
mock_ctx.get_node_id.return_value = NODE_A

with (
patch(
"vllm.v1.executor.ray_utils.placement_group_table",
return_value={"bundles_to_node_id": bundles_to_node_id},
),
patch("vllm.v1.executor.ray_utils.ray") as mock_ray,
patch(
"vllm.v1.executor.ray_utils.current_platform"
) as mock_platform,
):
mock_ray.get_runtime_context.return_value = mock_ctx
mock_platform.ray_device_key = "GPU"

from vllm.v1.executor.ray_utils import get_bundles_sorted_by_node

result = get_bundles_sorted_by_node(mock_pg, world_size)

assert result == expected
43 changes: 10 additions & 33 deletions vllm/v1/executor/ray_executor_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
WorkerProc,
)
from vllm.v1.executor.ray_utils import (
get_bundles_sorted_by_node,
initialize_ray_cluster,
ray,
)
Expand Down Expand Up @@ -170,35 +171,11 @@ def _init_executor(self) -> None:
)

# Step 2: Query PG table, sort bundles, assign ranks
pg_table = ray.util.placement_group_table(placement_group)
bundle_to_node = pg_table["bundles_to_node_id"]

# Prefer driver node; group by node for TP locality
bundle_to_node_id = []
assert placement_group is not None
bundle_specs = placement_group.bundle_specs
assert bundle_specs is not None
for i, bundle in enumerate(bundle_specs):
ray_device_key = current_platform.ray_device_key
if not ray_device_key:
raise ValueError(
f"current platform {current_platform.device_name}"
" does not support ray."
)

if bundle.get(ray_device_key):
node_id = bundle_to_node.get(i) or bundle_to_node.get(str(i))
bundle_to_node_id.append((i, node_id))

bundle_to_node_id = bundle_to_node_id[: self.world_size]
bundle_to_node_id = get_bundles_sorted_by_node(
placement_group, self.world_size
)
driver_node = ray.get_runtime_context().get_node_id()

def _sort_key(item):
_, node_id = item
return (0 if node_id == driver_node else 1, node_id)

bundle_to_node_id.sort(key=_sort_key)

# Assign each worker a local rank
node_rank_counter: dict[str, int] = defaultdict(int)
bundle_assignments: list[dict[str, Any]] = []
Comment thread
jeffreywang88 marked this conversation as resolved.
Expand Down Expand Up @@ -340,9 +317,9 @@ def _should_stop() -> bool:
return not executor or executor.shutting_down

def monitor_workers():
# TODO (jeffreywang): Is there a better way?
# Poll with timeout; a blocking ray.wait() would segfault
# if Ray is torn down while this thread is waiting.
# Poll with a timeout rather than blocking on ray.wait()
# because a blocking call would segfault if Ray is torn down
# while this thread is inside it.
while not _should_stop() and ray.is_initialized():
try:
done, _ = ray.wait(run_refs, num_returns=1, timeout=5.0)
Expand Down Expand Up @@ -378,9 +355,9 @@ def _join_monitor_thread(self) -> None:

Must be called before tearing down Ray resources — the monitor
may be inside ray.wait() which would segfault if Ray is shut
down underneath it. When the monitor itself calls shutdown()
(on worker death), we skip the join because the thread is
about to return anyway.
down underneath it. When the monitor itself calls shutdown()
on worker death, we skip the join because the thread is about
to return anyway.
"""
monitor = getattr(self, "_monitor_thread", None)
if (
Expand Down
38 changes: 38 additions & 0 deletions vllm/v1/executor/ray_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,44 @@ def _verify_bundles(
)


def get_bundles_sorted_by_node(
Comment thread
jeffreywang88 marked this conversation as resolved.
placement_group: "PlacementGroup",
world_size: int,
) -> list[tuple[int, str]]:
"""
Return GPU bundle indices paired with node IDs, sorted driver-first.

This utility has to be invoked from the driver node.
"""
pg_data = placement_group_table(placement_group)
bundle_to_node = pg_data["bundles_to_node_id"]

ray_device_key = current_platform.ray_device_key
if not ray_device_key:
raise ValueError(
f"current platform {current_platform.device_name}"
" does not support ray."
)

bundle_specs = placement_group.bundle_specs
assert bundle_specs is not None
bundle_to_node_id: list[tuple[int, str]] = []
for i, bundle in enumerate(bundle_specs):
Comment thread
jeffreywang88 marked this conversation as resolved.
Outdated
if bundle.get(ray_device_key):
node_id = bundle_to_node.get(i) or bundle_to_node.get(str(i))
Comment thread
jeffreywang88 marked this conversation as resolved.
Outdated
bundle_to_node_id.append((i, node_id))

bundle_to_node_id = bundle_to_node_id[:world_size]
Comment thread
jeffreywang88 marked this conversation as resolved.
Outdated
driver_node = ray.get_runtime_context().get_node_id()

def _sort_key(item):
_, node_id = item
return (0 if node_id == driver_node else 1, node_id)

bundle_to_node_id.sort(key=_sort_key)
return bundle_to_node_id


def _wait_until_pg_ready(current_placement_group: "PlacementGroup"):
"""Wait until a placement group is ready.

Expand Down
Loading