[serve][2/3] Add choose_replica/dispatch to AsyncioRouter #63254

Merged
kouroshHakha merged 12 commits into master from decouple-routing-primitives-2 on May 11, 2026

@jeffreywang88 jeffreywang88 commented May 9, 2026

Contributor

Description

This PR implements the choose_replica and dispatch flow on AsyncioRouter by consuming the primitives introduced by #63252.

  • AsyncioRouter.choose_replica
    • @asynccontextmanager that picks a replica via the request router, calls replica.reserve_slot(), feeds the returned num_ongoing_requests into on_new_queue_len_info, and yields a ReplicaSelection.
    • Retries on ActorDiedError, ActorUnavailableError, or replica-side accepted=False.
    • __aexit__ calls release_slot if the selection wasn't dispatched, which frees the replica's semaphore and refreshes the cache from the actor's reported queue length.
  • AsyncioRouter.dispatch / _dispatch_to_marked_selection
    • Verifies the replica is still in curr_replicas, injects _reserved_slot_token into the request metadata, and sends with with_rejection=False (the replica recognizes the token on the way in and skips re-acquiring the semaphore).
    • Releases the slot on dispatch failure or cancellation.
    • _release_slot_if_still_reserved is registered as a done callback for every dispatched result.
  • Reserved-slots metric on RouterMetricsManager (inc_reserved_slots / dec_reserved_slots).
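
The reserve-then-dispatch flow described above can be sketched with a toy model. This is only an illustration under stated assumptions: ToyReplica, ToySelection, and the module-level choose_replica / dispatch functions are hypothetical stand-ins, not the AsyncioRouter implementation, and the toy frees the slot on the caller side after the request finishes, whereas in the PR the replica consumes the reservation via the token.

```python
import asyncio
from contextlib import asynccontextmanager
from dataclasses import dataclass


class ToyReplica:
    """Hypothetical stand-in for a replica with a bounded number of slots."""

    def __init__(self, capacity: int) -> None:
        self.capacity = capacity
        self.in_use = 0

    def reserve_slot(self) -> bool:
        # Returns False (the accepted=False case) when the replica is full.
        if self.in_use >= self.capacity:
            return False
        self.in_use += 1
        return True

    def release_slot(self) -> None:
        self.in_use -= 1


@dataclass
class ToySelection:
    replica: ToyReplica
    dispatched: bool = False


@asynccontextmanager
async def choose_replica(replicas):
    # Try replicas in order until one accepts the reservation.
    for replica in replicas:
        if replica.reserve_slot():
            break
    else:
        raise RuntimeError("no replica accepted the reservation")
    selection = ToySelection(replica)
    try:
        yield selection
    finally:
        # Release only if the caller never dispatched.
        if not selection.dispatched:
            replica.release_slot()


async def dispatch(selection: ToySelection, payload: str) -> str:
    if selection.dispatched:
        raise RuntimeError("selection was already dispatched")
    selection.dispatched = True
    try:
        await asyncio.sleep(0)  # placeholder for the real send
        return f"handled {payload}"
    finally:
        selection.replica.release_slot()  # request finished; free the slot


async def demo():
    full, free = ToyReplica(capacity=0), ToyReplica(capacity=1)
    async with choose_replica([full, free]) as sel:
        reply = await dispatch(sel, "ping")
    async with choose_replica([full, free]):
        pass  # walk away without dispatching
    return reply, free.in_use
```

Running demo() exercises both exit shapes: one selection that is dispatched and one that is abandoned. In both cases the toy replica ends with no slots in use.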

What's still missing in this PR (coming in #63255)

  • Router abstract base doesn't declare choose_replica / dispatch, so SingletonThreadRouter and CurrentLoopRouter don't implement them. Callers can only use AsyncioRouter directly.
  • DeploymentHandle has no choose_replica / dispatch, so there's no public API yet.
  • local_testing_mode LocalRouter has no choose/dispatch placeholder.

Tests

  • Refusal paths: test_choose_replica_raises_deployment_unavailable, test_choose_replica_raises_backpressure.
  • Happy path (parametrized streaming + non-streaming): test_choose_and_dispatch.
  • Slot release on every exit shape: test_choose_without_dispatch_releases_slot, test_choose_with_exception_releases_slot, test_release_failure_does_not_leak_reserved_slots_metric (gauge stays accurate even when release_slot itself raises), test_dispatch_failure_is_not_masked_by_release_failure (original dispatch error survives a release-time failure).
  • Retry semantics: test_choose_replica_retries_when_reservation_rejected (replica returns accepted=False → retry on a different replica).
  • Queue-length cache invariants: test_cache_updated_on_choose_replica, test_cache_decremented_on_choose_without_dispatch, test_concurrent_choose_replica_updates_cache.
  • Reserved-slots gauge: test_reserved_slots_gauge_increments_and_decrements asserts the 0 → 1 → 0 transition on both the dispatch and the no-dispatch path, via both the internal counter and the underlying Gauge.set() calls.
  • Dispatch-time failure modes: test_dispatch_replica_unavailable (replica disappears between reserve and dispatch), test_multiple_dispatch_calls_fail (double-dispatch raises).
  • Sequential reuse: test_multiple_sequential_selections.
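
As a rough illustration of the gauge assertion style mentioned in the list above (internal counter plus the recorded set() calls), here is a minimal sketch using unittest.mock. ReservedSlotsCounter and run_reservation are hypothetical names for this example only; the real test works against RouterMetricsManager and is not reproduced here.

```python
from unittest.mock import Mock, call


class ReservedSlotsCounter:
    """Hypothetical counter that mirrors its value into a gauge-like object."""

    def __init__(self, gauge) -> None:
        self._gauge = gauge
        self.value = 0

    def inc(self) -> None:
        self.value += 1
        self._gauge.set(self.value)

    def dec(self) -> None:
        self.value -= 1
        self._gauge.set(self.value)


def run_reservation(counter: ReservedSlotsCounter, dispatch: bool) -> None:
    counter.inc()
    try:
        if dispatch:
            pass  # placeholder for sending the request
    finally:
        counter.dec()


def check_gauge_transitions(dispatch: bool) -> None:
    gauge = Mock()
    counter = ReservedSlotsCounter(gauge)
    run_reservation(counter, dispatch)
    # Internal counter is back to zero...
    assert counter.value == 0
    # ...and the gauge saw exactly the 0 -> 1 -> 0 transition.
    assert gauge.set.call_args_list == [call(1), call(0)]
```

Checking both the counter and the recorded calls catches the case where the counter is correct but the gauge was never updated, or the reverse.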

Related issues

RFC: #59792
Original PR: #60865

@gemini-code-assist gemini-code-assist Bot left a comment

Contributor

Code Review

This pull request implements a two-step routing process in the Ray Serve router, introducing choose_replica for slot reservation and dispatch for request execution. It adds metrics to track reserved slots and updates the queue length cache logic to handle cancellations. The review feedback recommends improving the robustness of slot release logic by using try...finally blocks to ensure metrics are correctly updated and to avoid masking original exceptions when a dispatch fails.

Comment thread python/ray/serve/_private/router.py Outdated
Comment thread python/ray/serve/_private/router.py Outdated
@jeffreywang88 jeffreywang88 force-pushed the decouple-routing-primitives-2 branch from 82037f1 to f1a3114 Compare May 9, 2026 07:39
@jeffreywang88 jeffreywang88 changed the title [Serve] Add choose_replica/dispatch on AsyncioRouter May 9, 2026
@jeffreywang88 jeffreywang88 added the go (add ONLY when ready to merge, run all tests) and serve (Ray Serve Related Issue) labels May 9, 2026
@jeffreywang88 jeffreywang88 marked this pull request as ready for review May 9, 2026 07:49
@jeffreywang88 jeffreywang88 requested a review from a team as a code owner May 9, 2026 07:49
@jeffreywang88 jeffreywang88 changed the title [Serve] Add choose_replica/dispatch to AsyncioRouter May 9, 2026
Comment thread python/ray/serve/_private/router.py Outdated
await self._resolve_request_arguments(pr)

is_retry = False
while True:

@jeffreywang88 jeffreywang88 May 9, 2026

Contributor Author

Note: Backoff occurs at replica = await self.request_router._choose_replica_for_request(pr, is_retry=True).

Comment on lines +1431 to +1434
if RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE:
    self._metrics_manager.inc_num_running_requests_for_replica(
        replica.replica_id
    )

@jeffreywang88 jeffreywang88 May 9, 2026

Contributor Author

Note: This mirrors the assign_request path. Once dispatch sends the request, it should be counted as a running handle-side request until the done callback fires. dispatch increments this metric, and _process_finished_request() decrements it.

Comment thread python/ray/serve/_private/router.py
Comment thread python/ray/serve/_private/router.py Outdated
Base automatically changed from decouple-routing-primitives-1 to master May 10, 2026 06:30
jeffreywang88 and others added 9 commits May 10, 2026 07:58
Co-Authored-By: machichima <nary12321@gmail.com>
Signed-off-by: Jeffrey Wang <jeffreywang@anyscale.com>
…accepted always ignored

Signed-off-by: Jeffrey Wang <jeffreywang@anyscale.com>
Signed-off-by: Jeffrey Wang <jeffreywang@anyscale.com>
Signed-off-by: Jeffrey Wang <jeffreywang@anyscale.com>
Signed-off-by: Jeffrey Wang <jeffreywang@anyscale.com>
Signed-off-by: Jeffrey Wang <jeffreywang@anyscale.com>
Signed-off-by: Jeffrey Wang <jeffreywang@anyscale.com>
Signed-off-by: Jeffrey Wang <jeffreywang@anyscale.com>
Signed-off-by: Jeffrey Wang <jeffreywang@anyscale.com>
@jeffreywang88 jeffreywang88 force-pushed the decouple-routing-primitives-2 branch from 8db5cab to 6afface Compare May 10, 2026 08:10

@cursor cursor Bot left a comment

Cursor Bugbot has reviewed your changes and found 2 potential issues.

There are 3 total unresolved issues (including 1 from previous review).

Reviewed by Cursor Bugbot for commit 6afface. Configure here.

except BaseException:
    # Dispatch failed; release the reservation before re-raising.
    await self._release_slot_and_refresh_cache(selection, replica, force=True)
    raise

Cleanup can mask original error with CancelledError

Medium Severity

In _dispatch_to_marked_selection, the except BaseException handler awaits _release_slot_and_refresh_cache before re-raising the original error. However, _release_slot_and_refresh_cache only catches Exception, not BaseException. If asyncio.CancelledError (a BaseException subclass since Python 3.8) is raised during the await selection._release_slot(force=force) call, it propagates through the handler and the raise on the next line never executes, so the original dispatch error is lost. The docstring of _release_slot_and_refresh_cache explicitly promises it "swallows release failures so callers can put this in a finally without masking other errors," but this guarantee doesn't hold for CancelledError.
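
The masking behavior can be reproduced in isolation. The sketch below is not the router code: DispatchError, the two release_* coroutines, and outcome are hypothetical names, and the CancelledError is raised directly to stand in for a cancellation arriving while the release is awaited. Whether swallowing cancellation in a cleanup helper is acceptable is a design decision this example does not settle.

```python
import asyncio


class DispatchError(Exception):
    """Hypothetical error standing in for a failed send."""


async def release_catching_exception() -> None:
    try:
        # Stand-in for cancellation arriving while the release is awaited.
        raise asyncio.CancelledError()
    except Exception:
        pass  # not reached: CancelledError is not an Exception subclass


async def release_catching_base_exception() -> None:
    try:
        raise asyncio.CancelledError()
    except BaseException:
        pass  # swallows the cancellation inside the cleanup helper


async def dispatch(release) -> None:
    try:
        raise DispatchError("send failed")
    except BaseException:
        await release()
        raise  # re-raises DispatchError only if release() returned normally


def outcome(release) -> str:
    """Return the name of the exception type that escapes dispatch()."""
    try:
        asyncio.run(dispatch(release))
    except BaseException as exc:
        return type(exc).__name__
    return "ok"
```

With the Exception-only helper, the caller sees the cancellation and the dispatch error survives only as chained context; with the BaseException helper, the original dispatch error is what propagates.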

if pr.metadata.is_streaming:
    return FakeReplicaResult(self._replica_id, is_generator_object=True)
else:
    return FakeReplicaResult(self._replica_id, is_generator_object=False)

Unused send_request_with_slot method in FakeReplica

Low Severity

FakeReplica.send_request_with_slot is defined but never called anywhere in the codebase. It appears to be leftover scaffolding that was superseded by the approach of injecting _reserved_slot_token into RequestMetadata and going through try_send_request instead.

@kouroshHakha kouroshHakha left a comment

Contributor

The two-step routing primitive is clean and well-reasoned. The extraction of _register_completion_callback, _register_decrement_queue_len_cache_callback, and _release_slot_and_refresh_cache removes real warts from _route_and_send_request_once/broadcast, and the test suite is thorough — especially the gauge invariants and the dispatch-failure-masked-by-release-failure case.

Three things I would like addressed before merge, in descending priority:

  1. Slot leak on ActorUnavailableError in RunningReplica.reserve_slot — same class of bug as the existing CancelledError cleanup.
  2. on_request_completed called even when no request was dispatched — currently a no-op in the base class, but sets a wrong contract for future subclasses.
  3. Extra release_slot RPC on every dispatched request completion — performance, not correctness.

Plus a low-severity gauge-leak nit flagged by Cursor bot.

Note

This review was co-written with AI assistance (Claude Code).

Comment thread python/ray/serve/_private/request_router/replica_wrapper.py
Comment on lines +1276 to +1282
if (
    not selection._completion_callback_registered
    and self.request_router
):
    self.request_router.on_request_completed(
        replica.replica_id, request_meta.internal_request_id
    )

Contributor

Important: on_request_completed is fired here whenever _completion_callback_registered is False, which includes the plain no-dispatch exit path (caller held the slot and walked away without ever sending). on_request_completed is documented as "called when a request to a replica has completed," but no request was ever dispatched in that case. on_request_routed was never called either, so any future router implementation that pairs those two hooks to track capacity tokens would see a spurious on_request_completed with no matching on_request_routed.

Fix: guard the call behind selection._dispatched so it only fires when a request was actually sent:

if (
    selection._dispatched
    and not selection._completion_callback_registered
    and self.request_router
):
    self.request_router.on_request_completed(
        replica.replica_id, request_meta.internal_request_id
    )

This still covers the dispatch-raised-before-callback-registered case (_dispatched=True, _completion_callback_registered=False), and stops calling the hook when no request was ever sent.
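
To make the contract concern concrete, here is a small sketch of a router that pairs the two hooks. TokenTrackingRouter and exit_selection are hypothetical, and the hooks are simplified to take no arguments (the real ones receive a replica ID and request ID). The third scenario in the usage note assumes on_request_routed had already fired before the dispatch failed.

```python
class TokenTrackingRouter:
    """Hypothetical router that pairs the two hooks to count in-flight requests."""

    def __init__(self) -> None:
        self.in_flight = 0

    def on_request_routed(self) -> None:
        self.in_flight += 1

    def on_request_completed(self) -> None:
        self.in_flight -= 1


def exit_selection(
    router: TokenTrackingRouter,
    dispatched: bool,
    callback_registered: bool,
    guarded: bool,
) -> None:
    """Mimic the cleanup on context exit, with or without the dispatched guard."""
    should_fire = not callback_registered
    if guarded:
        should_fire = should_fire and dispatched
    if should_fire:
        router.on_request_completed()


def in_flight_after(dispatched: bool, routed: bool, guarded: bool) -> int:
    router = TokenTrackingRouter()
    if routed:
        router.on_request_routed()
    exit_selection(router, dispatched, callback_registered=False, guarded=guarded)
    return router.in_flight
```

Without the guard, the no-dispatch exit drives the counter to -1. With the guard it stays at 0, and a dispatch that failed before its completion callback was registered still balances out.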

Comment thread python/ray/serve/_private/router.py Outdated
Comment on lines +1360 to +1366
result.add_done_callback(
    lambda _: self._event_loop.call_soon_threadsafe(
        lambda: self._event_loop.create_task(
            self._release_slot_if_still_reserved(selection)
        )
    )
)

Contributor

Important (suggestion): _release_slot_if_still_reserved is registered as a done-callback on every dispatched result and unconditionally calls selection._release_slot(force=True), which sends release_slot.remote(slot_token) to the replica actor, including on the happy path where the slot was already consumed when the request arrived. The actor handles this idempotently, so correctness is maintained, but it is one extra actor RPC per dispatched request. At high throughput (e.g. 10k req/s) that is 10k unnecessary RPCs/s across the replicas.

The cancellation safety is real: if the request is cancelled or fails before the replica processes it, the slot needs releasing. One option that avoids the overhead in the common case:

def _maybe_release_slot(fut, selection=selection):
    if fut.cancelled():
        self._event_loop.call_soon_threadsafe(
            lambda: self._event_loop.create_task(
                self._release_slot_if_still_reserved(selection)
            )
        )
result.add_done_callback(_maybe_release_slot)

Alternatively, if the simplicity tradeoff is preferred, the docstring on _release_slot_if_still_reserved should say "fires on every result completion; is a no-op if the slot was already consumed" rather than "if a dispatched request was cancelled before starting."
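
For reference, the behavior of a cancelled()-gated done callback can be checked with a plain concurrent.futures.Future. This is an assumption-laden stand-in: the result object in the PR is a ReplicaResult, not a concurrent.futures.Future, and the released list and make_callback helper are hypothetical.

```python
from concurrent.futures import Future

released = []  # names of futures whose callback decided to release


def make_callback(name: str):
    def _maybe_release(fut: Future) -> None:
        # Only the cancelled outcome triggers a release in this variant.
        if fut.cancelled():
            released.append(name)

    return _maybe_release


ok = Future()
ok.add_done_callback(make_callback("ok"))
ok.set_result("done")

failed = Future()
failed.add_done_callback(make_callback("failed"))
failed.set_exception(RuntimeError("boom"))

cancelled = Future()
cancelled.add_done_callback(make_callback("cancelled"))
cancelled.cancel()
```

Only the cancelled future lands in released. A future that completes with an exception does not, so if a request can fail before the replica consumes the token, a cancelled()-only check would need an extra branch for that case.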

Comment thread python/ray/serve/_private/router.py Outdated
replica, slot_token = await self._pick_and_reserve_replica(pr)

# Increment reserved slots metric (after queue metric is decremented)
self._metrics_manager.inc_reserved_slots()

Contributor

Suggestion (low severity): inc_reserved_slots() is called inside with wrap_request_assignment but the matching dec_reserved_slots() is in the separate try/finally at line 1283. The gap (lines 1254-1266, the ReplicaSelection(...) dataclass construction) is synchronous Python and cannot raise in practice. But if _reserved_slots_gauge.set() inside inc_reserved_slots ever throws, the gauge leaks for the process lifetime. Moving inc_reserved_slots() to just inside the try: at line 1267 closes this gap at no cost.
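
A toy version of the placement issue, assuming the increment bumps an internal counter before writing to the gauge (that ordering is an assumption of this sketch, not something confirmed from the PR). FlakyGauge, ReservedSlots, and the two reserve_* functions are hypothetical.

```python
class FlakyGauge:
    """Hypothetical gauge whose first set() fails, to simulate a metrics error."""

    def __init__(self) -> None:
        self.fail_next = True

    def set(self, value: int) -> None:
        if self.fail_next:
            self.fail_next = False
            raise RuntimeError("gauge backend error")


class ReservedSlots:
    def __init__(self, gauge: FlakyGauge) -> None:
        self.count = 0
        self._gauge = gauge

    def inc(self) -> None:
        self.count += 1  # bumped before the gauge write (assumed ordering)
        self._gauge.set(self.count)

    def dec(self) -> None:
        self.count -= 1
        self._gauge.set(self.count)


def reserve_inc_outside_try(slots: ReservedSlots) -> None:
    slots.inc()  # if this raises, the finally below is never entered
    try:
        pass  # placeholder for the body that uses the reservation
    finally:
        slots.dec()


def reserve_inc_inside_try(slots: ReservedSlots) -> None:
    try:
        slots.inc()
    finally:
        slots.dec()  # runs even when inc() raised part-way through


def count_after(reserve) -> int:
    slots = ReservedSlots(FlakyGauge())
    try:
        reserve(slots)
    except RuntimeError:
        pass
    return slots.count
```

With the increment outside the try, the counter stays at 1 after the failure; inside the try it returns to 0. If the increment could fail before bumping the counter, the inside-try form would over-decrement instead, so the ordering assumption matters.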

Signed-off-by: Jeffrey Wang <jeffreywang@anyscale.com>
…tching

Signed-off-by: Jeffrey Wang <jeffreywang@anyscale.com>
…d_slots exception

Signed-off-by: Jeffrey Wang <jeffreywang@anyscale.com>
@kouroshHakha kouroshHakha merged commit 85c03be into master May 11, 2026
6 checks passed
@kouroshHakha kouroshHakha deleted the decouple-routing-primitives-2 branch May 11, 2026 06:12
Lucas61000 pushed a commit to Lucas61000/ray that referenced this pull request May 15, 2026
…project#63254)

Signed-off-by: Jeffrey Wang <jeffreywang@anyscale.com>
Co-authored-by: machichima <nary12321@gmail.com>

Labels

go (add ONLY when ready to merge, run all tests), serve (Ray Serve Related Issue)

2 participants