[Serve] Add broadcast API for deployment handles that broadcasts the same RPC across all live replicas of a deployment #61472
Signed-off-by: bittoby <bittoby@users.noreply.github.com>
0f0d5ff to 887ef8f
Code Review
This pull request introduces a broadcast() method to DeploymentHandle, allowing a method to be called on all replicas of a deployment. This is a valuable feature for operations like cache clearing or state synchronization. However, the current implementation introduces significant security and stability concerns. Specifically, DeploymentBroadcastResponse.results() can block the asyncio event loop and may crash when used with CurrentLoopRouter. More critically, the broadcast() implementation in the router bypasses max_queued_requests and replica-side rejection, posing a risk of Denial of Service by allowing an attacker to overwhelm all replicas. Additionally, the broadcast implementation in the local testing router is blocking, which is inconsistent with its asynchronous interface. The main AsyncioRouter implementation of broadcast also fails to resolve arguments from other handle calls and does not propagate the tracing context, which are significant omissions.
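The event-loop concern raised in this review can be illustrated with a toy model. The sketch below is not Ray Serve's code: `ToyBroadcastResponse` and `start_toy_broadcast` are hypothetical names, and the guard in `results()` is only one possible way to avoid stalling a running loop.

```python
import asyncio
import concurrent.futures
import threading


class ToyBroadcastResponse:
    """Hypothetical stand-in for a broadcast response; not Ray Serve's class."""

    def __init__(self, future: concurrent.futures.Future):
        self._future = future

    def results(self, timeout_s=None):
        # Blocking on a future from inside a running event loop would stall
        # every other task on that loop, so refuse instead of blocking.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running in this thread: a blocking wait is safe.
            return self._future.result(timeout=timeout_s)
        raise RuntimeError(
            "results() called from a running event loop; use results_async()"
        )

    async def results_async(self):
        # wrap_future lets the loop keep serving other tasks while waiting.
        return await asyncio.wrap_future(self._future)


def start_toy_broadcast(replica_values):
    """Resolve a future from a background thread, imitating a router thread."""
    future = concurrent.futures.Future()
    threading.Thread(
        target=future.set_result, args=(list(replica_values),)
    ).start()
    return ToyBroadcastResponse(future)
```

Synchronous callers would use `start_toy_broadcast([1, 2]).results(timeout_s=5)`, while code already running inside a coroutine would `await` the `results_async()` path instead.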
…ve-broadcast-deployment-handle
…, and event loop safety Signed-off-by: bittoby <bittoby@users.noreply.github.com>
…ve-broadcast-deployment-handle
Signed-off-by: bittoby <bittoby@users.noreply.github.com>
…rg copies, and request counter Signed-off-by: bittoby <bittoby@users.noreply.github.com>
…ve-broadcast-deployment-handle
Signed-off-by: bittoby <bittoby@users.noreply.github.com>
@kouroshHakha @bveeramani could you please review this PR? Any feedback is welcome.
Hey @bittoby, thanks for the PR! I don't have context on the Serve codebase, so I can't review this. I'll defer to the Serve code owners.
Deferring to @abrarsheikh for review.
…ve-broadcast-deployment-handle
Signed-off-by: bittoby <bittoby@users.noreply.github.com>
8a92e5c to 0ba79be
@abrarsheikh could you please review this PR? I'd appreciate any feedback.
@bittoby can you please fix the lint issues and test coverage failures?
…ve-broadcast-deployment-handle
…ponse Signed-off-by: bittoby <bittoby@users.noreply.github.com>
1b93735 to 4dc3f7f
Updated! Please review again, @eicherseiji.
eicherseiji left a comment
It looks good to me, but I would like @abrarsheikh's feedback as well. @bittoby please fix lint.
…ve-broadcast-deployment-handle
Signed-off-by: bittoby <bittoby@users.noreply.github.com>
70208f2 to b23256e
…ve-broadcast-deployment-handle
c9b5357 to 6a46e00
@cursoragent can you update the serve docs to reflect this feature?
Hi @kouroshHakha, what else do I need to update? It has been a long time since I submitted this PR. Would you please merge it? Thanks.
3b80c4c to 63a58aa
…docs
- Cache the coroutine outcome in _scheduled_future for the local testing path (loop=None) so a second call to results() replays the cached exception instead of crashing with "cannot reuse already awaited coroutine".
- Add warning admonition to broadcast() docstring about bypassing max_queued_requests backpressure.
- Add note to DeploymentBroadcastResponse docstring clarifying that results only include replicas live at call time.
- Add WORKER_STARTUP_FAILED to ERROR_TYPE to fix proto enum sync.
Made-with: Cursor
Signed-off-by: Kourosh Hakhamaneshi <kourosh@anyscale.com>
63a58aa to a6952c7
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Reviewed by Cursor Bugbot for commit a6952c7.
finally:
    tmp_loop.close()
self._scheduled_future = cached_future
self._replica_results = self._scheduled_future.result(timeout=timeout_s)
Timeout ignored during local-testing-mode initial fetch
Low Severity
In _fetch_replica_results_sync, when self._loop is None (local testing mode), tmp_loop.run_until_complete(self._coro) blocks indefinitely without honoring timeout_s. The timeout is only applied to self._scheduled_future.result(timeout=timeout_s), but by that point the future is already resolved. This means results(timeout_s=...) can hang forever in local testing if the broadcast coroutine itself blocks.
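One way the timeout could be honored on a temporary loop is to wrap the coroutine in `asyncio.wait_for` before handing it to `run_until_complete`. This is a minimal sketch under that assumption, not the fix adopted in the PR; `run_coro_with_timeout` is a hypothetical helper name.

```python
import asyncio


def run_coro_with_timeout(coro, timeout_s=None):
    """Run `coro` on a temporary loop, giving up after `timeout_s` seconds."""
    tmp_loop = asyncio.new_event_loop()
    try:
        # wait_for cancels the inner coroutine and raises a timeout error
        # once the deadline passes; timeout_s=None waits indefinitely.
        return tmp_loop.run_until_complete(
            asyncio.wait_for(coro, timeout=timeout_s)
        )
    finally:
        tmp_loop.close()
```

This only helps when the coroutine yields control at an `await`. A coroutine that blocks synchronously (for example with `time.sleep`) never returns control to the loop, so the deadline cannot fire.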
Thanks @kouroshHakha 👍
…same RPC across all live replicas of a deployment (ray-project#61472) Signed-off-by: bittoby <bittoby@users.noreply.github.com> Signed-off-by: Kourosh Hakhamaneshi <kourosh@anyscale.com> Co-authored-by: bittoby <bittoby@users.noreply.github.com> Co-authored-by: Kourosh Hakhamaneshi <kourosh@anyscale.com>


Description
Adds a broadcast() method to DeploymentHandle that calls a method on every replica of a deployment. This is useful when you need to hit all replicas (e.g. clearing a cache, reloading config, or resetting state) without relying on private internals.

Currently there's no public way to do this. Users have to dig into _router._asyncio_router._request_router to get replica references, which is fragile and breaks across versions.

Related issues
Fixes #55654
Additional information
API:
What changed:
- DeploymentHandle.broadcast(method_name, *args, **kwargs): public method that bypasses load balancing and sends to all replicas directly
- DeploymentBroadcastResponse: response wrapper with results() (sync) and results_async() (async) to collect results from every replica
- Router.broadcast(): abstract method implemented across all router subclasses (SingletonThreadRouter, CurrentLoopRouter, LocalRouter)

Follow-up fixes (co-author: @kouroshHakha):
- DeploymentBroadcastResponse._fetch_replica_results_sync: in local testing mode (loop=None), if the broadcast coroutine raised an exception, calling results() a second time would crash with RuntimeError: cannot reuse already awaited coroutine. The fix caches the outcome in a concurrent.futures.Future so retries replay the cached exception.
- Added a .. warning:: admonition to the broadcast() docstring noting it bypasses max_queued_requests backpressure and is intended for infrequent control-plane operations only.
- Added a note to the DeploymentBroadcastResponse docstring clarifying results only include replicas live at the time broadcast() was called.
- Test class TestDeploymentBroadcastResponseLocalTesting.
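The caching idea behind the first follow-up fix can be sketched in isolation. The class below is a hypothetical illustration (`CachedCoroutineResult` is not a name from the PR) of running a coroutine once on a temporary loop and storing its outcome, success or failure, in a `concurrent.futures.Future` so that later calls replay it.

```python
import asyncio
import concurrent.futures


class CachedCoroutineResult:
    """Hypothetical helper: run a coroutine once, replay its outcome after."""

    def __init__(self, coro):
        self._coro = coro
        self._cached = None  # set to a concurrent.futures.Future after the run

    def result(self):
        if self._cached is None:
            cached = concurrent.futures.Future()
            tmp_loop = asyncio.new_event_loop()
            try:
                cached.set_result(tmp_loop.run_until_complete(self._coro))
            except Exception as exc:
                # Store the failure so later calls re-raise it instead of
                # awaiting the already-consumed coroutine a second time.
                cached.set_exception(exc)
            finally:
                tmp_loop.close()
            self._cached = cached
        return self._cached.result()
```

Without the cached future, a second call would try to run the same coroutine object again, which Python rejects with `RuntimeError: cannot reuse already awaited coroutine`.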