[serve][2/3] Add choose_replica/dispatch to AsyncioRouter #63254
Code Review
This pull request implements a two-step routing process in the Ray Serve router, introducing choose_replica for slot reservation and dispatch for request execution. It adds metrics to track reserved slots and updates the queue length cache logic to handle cancellations. The review feedback recommends improving the robustness of slot release logic by using try...finally blocks to ensure metrics are correctly updated and to avoid masking original exceptions when a dispatch fails.
82037f1 to
f1a3114
Compare
    await self._resolve_request_arguments(pr)

    is_retry = False
    while True:
Note: Backoff occurs at replica = await self.request_router._choose_replica_for_request(pr, is_retry=True).
    if RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE:
        self._metrics_manager.inc_num_running_requests_for_replica(
            replica.replica_id
        )
Note: This mirrors assign_request path. Once dispatch sends the request, it should be counted as a running handle-side request until the done callback fires. dispatch increments this metric, and _process_finished_request() decrements it.
Co-Authored-By: machichima <nary12321@gmail.com> Signed-off-by: Jeffrey Wang <jeffreywang@anyscale.com>
…accepted always ignored Signed-off-by: Jeffrey Wang <jeffreywang@anyscale.com>
8db5cab to
6afface
Compare
Cursor Bugbot has reviewed your changes and found 2 potential issues.
There are 3 total unresolved issues (including 1 from previous review).
Reviewed by Cursor Bugbot for commit 6afface. Configure here.
    except BaseException:
        # Dispatch failed; release the reservation before re-raising.
        await self._release_slot_and_refresh_cache(selection, replica, force=True)
        raise
Cleanup can mask original error with CancelledError
Medium Severity
In _dispatch_to_marked_selection, the except BaseException handler awaits _release_slot_and_refresh_cache before re-raising the original error. However, _release_slot_and_refresh_cache only catches Exception, not BaseException. If asyncio.CancelledError (a BaseException subclass since Python 3.8) is raised during the await selection._release_slot(force=force) call, it propagates through the handler and the raise on the next line never executes, so the original dispatch error is lost. The docstring of _release_slot_and_refresh_cache explicitly promises it "swallows release failures so callers can put this in a finally without masking other errors," but this guarantee doesn't hold for CancelledError.
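The masking described above can be reproduced outside Ray. The following is a minimal sketch, not the router code: DispatchError, release_cancelled, and the other function names are hypothetical stand-ins, and the release step is simply assumed to raise CancelledError.

```python
import asyncio


class DispatchError(RuntimeError):
    """Hypothetical stand-in for the original dispatch failure."""


async def release_cancelled():
    # Hypothetical release step that is cancelled while it runs.
    raise asyncio.CancelledError()


async def release_swallowing_exception_only():
    try:
        await release_cancelled()
    except Exception:
        # CancelledError derives from BaseException, so it is NOT caught here.
        pass


async def dispatch_with_cleanup():
    try:
        raise DispatchError("dispatch failed")
    except BaseException:
        await release_swallowing_exception_only()
        raise  # not reached when the release raises CancelledError


async def observed_error_type():
    """Return the type of the exception that actually reaches the caller."""
    try:
        await dispatch_with_cleanup()
    except BaseException as exc:
        return type(exc)
    return None
```

Running asyncio.run(observed_error_type()) reports CancelledError rather than DispatchError, which is the behavior the comment is flagging.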
    if pr.metadata.is_streaming:
        return FakeReplicaResult(self._replica_id, is_generator_object=True)
    else:
        return FakeReplicaResult(self._replica_id, is_generator_object=False)
Unused send_request_with_slot method in FakeReplica
Low Severity
FakeReplica.send_request_with_slot is defined but never called anywhere in the codebase. It appears to be leftover scaffolding that was superseded by the approach of injecting _reserved_slot_token into RequestMetadata and going through try_send_request instead.
kouroshHakha left a comment
The two-step routing primitive is clean and well-reasoned. The extraction of _register_completion_callback, _register_decrement_queue_len_cache_callback, and _release_slot_and_refresh_cache removes real warts from _route_and_send_request_once/broadcast, and the test suite is thorough — especially the gauge invariants and the dispatch-failure-masked-by-release-failure case.
Three things I would like addressed before merge, in descending priority:
- Slot leak on ActorUnavailableError in RunningReplica.reserve_slot: same class of bug as the existing CancelledError cleanup.
- on_request_completed called even when no request was dispatched: currently a no-op in the base class, but sets a wrong contract for future subclasses.
- Extra release_slot RPC on every dispatched request completion: performance, not correctness.
Plus a low-severity gauge-leak nit flagged by Cursor bot.
Note
This review was co-written with AI assistance (Claude Code).
    if (
        not selection._completion_callback_registered
        and self.request_router
    ):
        self.request_router.on_request_completed(
            replica.replica_id, request_meta.internal_request_id
        )
Important — on_request_completed is fired here whenever _completion_callback_registered is False, which includes the plain no-dispatch exit path (caller held the slot and walked away without ever sending). on_request_completed is documented as "called when a request to a replica has completed," but no request was ever dispatched in that case. on_request_routed was never called either, so any future router implementation that pairs those two hooks to track capacity tokens would see a spurious on_request_completed with no matching on_request_routed.
Fix: guard the call behind selection._dispatched so it only fires when a request was actually sent:
    if (
        selection._dispatched
        and not selection._completion_callback_registered
        and self.request_router
    ):
        self.request_router.on_request_completed(
            replica.replica_id, request_meta.internal_request_id
        )

This still covers the dispatch-raised-before-callback-registered case (_dispatched=True, _completion_callback_registered=False), and stops calling the hook when no request was ever sent.
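To make the contract concern concrete, here is a small sketch of a router that pairs the two hooks. PairingRouter and finish_selection are hypothetical names for illustration only; only the hook names on_request_routed and on_request_completed come from the PR.

```python
class PairingRouter:
    """Hypothetical router that pairs the two hooks to track in-flight requests."""

    def __init__(self):
        self.in_flight = set()

    def on_request_routed(self, replica_id, request_id):
        self.in_flight.add((replica_id, request_id))

    def on_request_completed(self, replica_id, request_id):
        # Raises KeyError when there was no matching on_request_routed call.
        self.in_flight.remove((replica_id, request_id))


def finish_selection(router, replica_id, request_id, *, dispatched, callback_registered):
    """Exit-path logic with the suggested guard on `dispatched`."""
    if dispatched and not callback_registered and router:
        router.on_request_completed(replica_id, request_id)
```

With the guard, the no-dispatch exit path never reaches on_request_completed, so a subclass like this one would not see an unmatched completion.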
    result.add_done_callback(
        lambda _: self._event_loop.call_soon_threadsafe(
            lambda: self._event_loop.create_task(
                self._release_slot_if_still_reserved(selection)
            )
        )
    )
Important (suggestion) — _release_slot_if_still_reserved is registered as a done-callback on every dispatched result and unconditionally calls selection._release_slot(force=True), which sends release_slot.remote(slot_token) to the replica actor — including the happy path where the slot was already consumed when the request arrived. The actor handles this idempotently, so correctness is maintained, but it is one extra actor RPC per dispatched request. At high throughput (e.g. 10k req/s) that is 10k unnecessary RPCs/s to each replica.
The cancellation safety is real: if the request is cancelled or fails before the replica processes it, the slot needs releasing. One option that avoids the overhead in the common case:
    def _maybe_release_slot(fut, selection=selection):
        if fut.cancelled():
            self._event_loop.call_soon_threadsafe(
                lambda: self._event_loop.create_task(
                    self._release_slot_if_still_reserved(selection)
                )
            )

    result.add_done_callback(_maybe_release_slot)

Alternatively, if the simplicity tradeoff is preferred, the docstring on _release_slot_if_still_reserved should say "fires on every result completion; is a no-op if the slot was already consumed" rather than "if a dispatched request was cancelled before starting."
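A standalone version of the conditional callback, using concurrent.futures.Future in place of the replica result, might look like the sketch below. release_unless_succeeded is a hypothetical helper, and treating any exception as a reason to release is an assumption that goes one step beyond the fut.cancelled() check in the suggestion; it relies on the release being idempotent, as described above.

```python
from concurrent.futures import Future


def release_unless_succeeded(release, slot_token):
    """Build a done-callback that releases the slot only on cancel or failure.

    `release` is any callable taking the slot token (hypothetical stand-in
    for the release RPC).
    """

    def _on_done(fut):
        # Check cancelled() first: exception() raises on a cancelled future.
        if fut.cancelled() or fut.exception() is not None:
            release(slot_token)

    return _on_done


released = []

ok = Future()
ok.add_done_callback(release_unless_succeeded(released.append, "ok-token"))
ok.set_result("done")  # happy path: no release call

cancelled = Future()
cancelled.add_done_callback(release_unless_succeeded(released.append, "cancel-token"))
cancelled.cancel()  # pending future: cancel succeeds and the callback fires
```

After this runs, released contains only "cancel-token", so the happy path costs no extra release call.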
    replica, slot_token = await self._pick_and_reserve_replica(pr)

    # Increment reserved slots metric (after queue metric is decremented)
    self._metrics_manager.inc_reserved_slots()
Suggestion (low severity) — inc_reserved_slots() is called inside with wrap_request_assignment but the matching dec_reserved_slots() is in the separate try/finally at line 1283. The gap (lines 1254-1266, the ReplicaSelection(...) dataclass construction) is synchronous Python and cannot raise in practice. But if _reserved_slots_gauge.set() inside inc_reserved_slots ever throws, the gauge leaks for the process lifetime. Moving inc_reserved_slots() to just inside the try: at line 1267 closes this gap at no cost.
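The difference between the two placements can be shown with a toy metrics object. FlakyGauge and Metrics are hypothetical, and the sketch assumes the counter is bumped before the gauge is set, which may not match RouterMetricsManager exactly.

```python
class FlakyGauge:
    """Hypothetical gauge whose first set() call raises."""

    def __init__(self):
        self.calls = 0
        self.value = 0

    def set(self, value):
        self.calls += 1
        if self.calls == 1:
            raise RuntimeError("gauge backend unavailable")
        self.value = value


class Metrics:
    """Toy stand-in: increments an internal counter, then mirrors it to a gauge."""

    def __init__(self, gauge):
        self.reserved = 0
        self._gauge = gauge

    def inc_reserved_slots(self):
        self.reserved += 1
        self._gauge.set(self.reserved)

    def dec_reserved_slots(self):
        self.reserved -= 1
        self._gauge.set(self.reserved)


def reserve_inc_outside_try(metrics):
    metrics.inc_reserved_slots()  # a failure here skips the finally below
    try:
        pass  # slot is held here
    finally:
        metrics.dec_reserved_slots()


def reserve_inc_inside_try(metrics):
    try:
        metrics.inc_reserved_slots()  # a failure here still reaches finally
        # slot is held here
    finally:
        metrics.dec_reserved_slots()
```

In this sketch the first variant leaves the counter at 1 after a failed inc, while the second returns it to 0.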
…tching Signed-off-by: Jeffrey Wang <jeffreywang@anyscale.com>
…d_slots exception Signed-off-by: Jeffrey Wang <jeffreywang@anyscale.com>
…project#63254) Signed-off-by: Jeffrey Wang <jeffreywang@anyscale.com> Co-authored-by: machichima <nary12321@gmail.com>


Description
This PR implements the choose_replica and dispatch flow on AsyncioRouter by consuming primitives introduced by #63252.

AsyncioRouter.choose_replica
- @asynccontextmanager that picks a replica via the request router, calls replica.reserve_slot(), feeds the returned num_ongoing_requests into on_new_queue_len_info, and yields a ReplicaSelection.
- ActorDiedError, ActorUnavailableError, or replica-side accepted=False.
- __aexit__ calls release_slot if the selection wasn't dispatched, which frees the replica's semaphore and refreshes the cache from the actor's reported queue length.

AsyncioRouter.dispatch / _dispatch_to_marked_selection
- curr_replicas, injects _reserved_slot_token into the request metadata, sends with with_rejection=False (the replica recognizes the token on the way in and skips re-acquiring the semaphore).
- _release_slot_if_still_reserved is registered as a done callback for every dispatched result.
- RouterMetricsManager (inc_reserved_slots / dec_reserved_slots).

What's still missing in this PR (coming in #63255)
- Router abstract base doesn't declare choose_replica / dispatch, so SingletonThreadRouter and CurrentLoopRouter don't implement them. Callers can only use AsyncioRouter directly.
- DeploymentHandle has no choose_replica / dispatch; there's no public API yet.
- local_testing_mode LocalRouter has no choose/dispatch placeholder.

Tests
- test_choose_replica_raises_deployment_unavailable, test_choose_replica_raises_backpressure.
- test_choose_and_dispatch.
- test_choose_without_dispatch_releases_slot, test_choose_with_exception_releases_slot, test_release_failure_does_not_leak_reserved_slots_metric (gauge stays accurate even when release_slot itself raises), test_dispatch_failure_is_not_masked_by_release_failure (original dispatch error survives a release-time failure).
- test_choose_replica_retries_when_reservation_rejected (replica returns accepted=False → retry on a different replica).
- test_cache_updated_on_choose_replica, test_cache_decremented_on_choose_without_dispatch, test_concurrent_choose_replica_updates_cache.
- test_reserved_slots_gauge_increments_and_decrements asserts the 0 → 1 → 0 transition on both the dispatch and the no-dispatch path, via both the internal counter and the underlying Gauge.set() calls.
- test_dispatch_replica_unavailable (replica disappears between reserve and dispatch), test_multiple_dispatch_calls_fail (double-dispatch raises).
- test_multiple_sequential_selections.

Related issues
RFC: #59792
Original PR: #60865
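For readers new to the two-step flow in the description, here is a toy sketch of reserve, yield, and release-if-not-dispatched. ToyReplica, ToyRouter, and Selection are hypothetical and share only the method names choose_replica, dispatch, reserve_slot, and release_slot with the PR. The real router also handles backoff, queue-length caching, and metrics, none of which is modeled here.

```python
import asyncio
import itertools
from contextlib import asynccontextmanager
from dataclasses import dataclass


class ToyReplica:
    """Hypothetical replica with a fixed number of reservable slots."""

    def __init__(self, replica_id, capacity):
        self.replica_id = replica_id
        self.capacity = capacity
        self.reserved = set()  # tokens reserved but not yet consumed
        self._tokens = itertools.count()

    async def reserve_slot(self):
        if len(self.reserved) >= self.capacity:
            return None  # plays the role of accepted=False
        token = f"{self.replica_id}-{next(self._tokens)}"
        self.reserved.add(token)
        return token

    async def release_slot(self, token):
        self.reserved.discard(token)  # idempotent

    async def handle(self, token, payload):
        self.reserved.remove(token)  # the token is consumed on arrival
        return f"{self.replica_id} handled {payload}"


@dataclass
class Selection:
    replica: ToyReplica
    slot_token: str
    dispatched: bool = False


class ToyRouter:
    def __init__(self, replicas):
        self.replicas = replicas

    @asynccontextmanager
    async def choose_replica(self):
        # Try replicas in order until one accepts the reservation.
        for replica in self.replicas:
            token = await replica.reserve_slot()
            if token is not None:
                break
        else:
            raise RuntimeError("no replica accepted the reservation")
        selection = Selection(replica, token)
        try:
            yield selection
        finally:
            if not selection.dispatched:
                await replica.release_slot(token)

    async def dispatch(self, selection, payload):
        if selection.dispatched:
            raise RuntimeError("selection was already dispatched")
        selection.dispatched = True
        return await selection.replica.handle(selection.slot_token, payload)


async def demo():
    full = ToyReplica("r0", capacity=0)
    free = ToyReplica("r1", capacity=1)
    router = ToyRouter([full, free])

    async with router.choose_replica():
        held_during = len(free.reserved)  # slot held, nothing dispatched
    released_after = len(free.reserved)  # released on exit

    async with router.choose_replica() as selection:
        result = await router.dispatch(selection, "ping")
    return held_during, released_after, result
```

asyncio.run(demo()) exercises both the no-dispatch path, where the slot is released on exit, and the dispatch path, where the token is consumed by the replica.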