Commit 58af3fc

jeffreywang88, claude, kouroshHakha, edoakes, and aslonnie authored
Cherry picking to 2.55 (#62517)
## Description
- #62323
- #62330
- #62366

## Related issues
> Link related issues: "Fixes #1234", "Closes #1234", or "Related to #1234".

## Additional information
> Optional: Add implementation details, API changes, usage examples, screenshots, etc.

---------

Signed-off-by: Jeffrey Wang <jeffreywang@anyscale.com>
Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-authored-by: kourosh hakhamaneshi <31483498+kouroshHakha@users.noreply.github.com>
Co-authored-by: Edward Oakes <ed.nmi.oakes@gmail.com>
Co-authored-by: Lonnie Liu <95255098+aslonnie@users.noreply.github.com>
1 parent 2ecf83a commit 58af3fc

17 files changed

Lines changed: 2500 additions & 12 deletions

‎doc/BUILD.bazel‎

Lines changed: 1 addition & 0 deletions
@@ -302,6 +302,7 @@ py_test_run_all_subdirectory(
         "source/serve/doc_code/cross_node_parallelism_example.py",
         "source/serve/doc_code/llm/llm_yaml_config_example.py",
         "source/serve/doc_code/llm/qwen_example.py",
+        "source/serve/doc_code/capacity_queue_request_router_app.py",
     ],
     extra_srcs = [],
     tags = [

‎doc/source/serve/advanced-guides/custom-request-router.md‎

Lines changed: 119 additions & 0 deletions
@@ -20,6 +20,7 @@ cover the following:
 - Utility mixins for request routing
 - Define a complex throughput-aware request router
 - Deploy an app with the throughput-aware request router
+- Experimental: Define a centralized capacity queue request router


 (simple-uniform-request-router)=
@@ -158,6 +159,124 @@ updated routing stats by looking up the `routing_stats` attribute of the running
replicas and use it in the routing policy.

(capacity-queue-request-router)=
## Experimental: Define a centralized capacity queue request router

In the previous examples, each router bases its routing decisions on its own locally visible state of the target replicas.
This view is **eventually consistent**, not strongly consistent, because each router learns about replicas only from the updates
that the Serve controller frequently broadcasts. Under high concurrency with multiple routers, this information can drift from reality,
and several routers can pick the same replica simultaneously, which causes transient load imbalance or triggers rejections and retries.
For some applications this results in lower throughput. A **centralized** approach avoids this: a single actor tracks per-replica
in-flight counts, and every router acquires a *capacity token* before forwarding a request. Each token guarantees that the target
replica has room, so the rejection protocol is no longer needed in normal operation.

This example demonstrates how to implement such a routing policy. The example has three pieces:

1. An importable **`CapacityQueue`** actor that tracks per-replica capacity and hands out
   tokens using a least-loaded selection strategy.
2. An importable **`CapacityQueueRouter`** custom request router that acquires a token before
   routing and releases it when the request completes. A real application can run multiple
   `CapacityQueueRouter` instances, each keeping track of its own view of replica state.
   The centralized `CapacityQueue` actor keeps their local information synchronized with
   reality.
3. A **deployment** that ties them together using a
   [deployment actor](../api/doc/ray.serve.config.DeploymentActorConfig.rst) for the queue and a
   [`RequestRouterConfig`](../api/doc/ray.serve.config.RequestRouterConfig.rst) for the router.

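To make the token idea concrete, the following is a minimal, single-process sketch of least-loaded token handout.
It is not the `CapacityQueue` implementation from this commit: the class name `ToyCapacityQueue` and its methods
`register`, `acquire`, and `release` are hypothetical names chosen for illustration, and the real actor also deals
with timeouts, token TTLs, and concurrency.

```python
import uuid
from typing import Dict, Optional, Tuple


class ToyCapacityQueue:
    """Illustrative stand-in only; NOT Ray's `CapacityQueue` actor."""

    def __init__(self) -> None:
        self._capacity: Dict[str, int] = {}   # replica_id -> max_ongoing_requests
        self._in_flight: Dict[str, int] = {}  # replica_id -> tokens currently held
        self._tokens: Dict[str, str] = {}     # token -> replica_id

    def register(self, replica_id: str, max_ongoing_requests: int) -> None:
        self._capacity[replica_id] = max_ongoing_requests
        self._in_flight.setdefault(replica_id, 0)

    def acquire(self) -> Optional[Tuple[str, str]]:
        """Return (token, replica_id) for the least-loaded replica, or None if all are full."""
        candidates = [
            r for r in self._capacity if self._in_flight[r] < self._capacity[r]
        ]
        if not candidates:
            return None
        replica_id = min(candidates, key=lambda r: self._in_flight[r])
        token = uuid.uuid4().hex
        self._tokens[token] = replica_id
        self._in_flight[replica_id] += 1
        return token, replica_id

    def release(self, token: str) -> None:
        """Give the slot back; unknown tokens are ignored."""
        replica_id = self._tokens.pop(token, None)
        if replica_id is not None and replica_id in self._in_flight:
            self._in_flight[replica_id] -= 1


queue = ToyCapacityQueue()
queue.register("replica-a", max_ongoing_requests=1)
queue.register("replica-b", max_ongoing_requests=1)
first = queue.acquire()
second = queue.acquire()
third = queue.acquire()   # both replicas are full, so this is None
queue.release(first[0])
fourth = queue.acquire()  # the freed slot on the first replica is handed out again
```

Because every token is issued by one component, two routers can never both believe that the same free slot is theirs,
which is the property the locally cached view cannot provide.
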
(deploy-app-with-capacity-queue-router)=
### Deploy an app with the capacity queue router

The deployment wires the pieces together: a `DeploymentActorConfig` for the capacity queue
and a `RequestRouterConfig` pointing at the custom router:

```{literalinclude} ../doc_code/capacity_queue_request_router_app.py
:start-after: __begin_deploy_app_with_capacity_queue_router__
:end-before: __end_deploy_app_with_capacity_queue_router__
:language: python
```

When the app starts:

1. The Serve controller creates the `CapacityQueue` deployment actor **before**
   any replicas start. `CapacityQueue` subscribes to replica updates via long poll.
2. As the controller starts replicas, it sends deployment-target updates. The
   queue's long-poll callback automatically registers each replica with its
   `max_ongoing_requests` capacity and unregisters replicas that are removed
   during scale-down or crash recovery.
3. The `CapacityQueueRouter` running in each proxy discovers the singleton `CapacityQueue`
   deployment actor, acquires a token for every incoming request, and routes to the replica
   identified by the token.
4. When the request completes, `CapacityQueueRouter.on_request_completed` fires and the token is
   released back to the queue.

Because the queue is a deployment actor, the controller handles its lifecycle
automatically: health checks, cleanup on app deletion, and versioning during
rolling updates.

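Step 2 amounts to reconciling the queue's registered replicas against each update from the controller. The sketch
below shows that reconciliation in isolation. The function name `apply_target_update` and the payload shape, a list of
`(replica_id, max_ongoing_requests)` pairs, are assumptions made for illustration; the source doesn't show the actual
long-poll payload or callback signature.

```python
from typing import Dict, Iterable, Set, Tuple


def apply_target_update(
    registered: Dict[str, int],
    running_replicas: Iterable[Tuple[str, int]],
) -> Tuple[Set[str], Set[str]]:
    """Reconcile `registered` (replica_id -> capacity) with the latest update.

    `running_replicas` is a hypothetical payload of
    (replica_id, max_ongoing_requests) pairs. Returns (added, removed) IDs.
    """
    latest = dict(running_replicas)
    added = set(latest) - set(registered)
    removed = set(registered) - set(latest)
    for replica_id in removed:
        del registered[replica_id]  # scale-down or crashed replica
    for replica_id, capacity in latest.items():
        registered[replica_id] = capacity  # new replica or refreshed capacity
    return added, removed


registered: Dict[str, int] = {}
apply_target_update(registered, [("r1", 5), ("r2", 5)])
added, removed = apply_target_update(registered, [("r2", 5), ("r3", 5)])
print(sorted(added), sorted(removed))  # ['r3'] ['r1']
```
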
### Fault tolerance

The `CapacityQueueRouter` handles failures gracefully:

- **Queue unavailable**: if the queue actor is dead, not yet discovered, or
  returns an error, the router retries with exponential backoff and falls back to
  power-of-two-choices after `MAX_FAULT_RETRIES` consecutive failures.
  Requests never raise exceptions due to queue issues.
- **Capacity exhausted**: when all replicas are at capacity, the router
  backs off and retries until capacity frees up.
- **Queue restart**: a restarted queue has no knowledge of pre-crash
  in-flight counts and may temporarily over-provision, that is, issue tokens
  for replicas that are already full. This self-heals: replicas reject excess
  requests, and the router intentionally does not release the tokens of
  rejected requests, ratcheting up `in_flight` on the queue until it
  matches reality. `token_ttl_s` (if configured) auto-reclaims any
  remaining leaked tokens.
- **Replica death**: the controller sends a long-poll update, the queue
  unregisters the dead replica, and tokens are only issued for live replicas.

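The queue-unavailable path can be sketched as capped exponential backoff followed by a power-of-two-choices fallback.
This is an illustrative approximation, not the router's actual code: the helpers `backoff_delays`,
`power_of_two_choices`, and `choose_replica` are hypothetical, `ConnectionError` stands in for whatever error the real
router observes, and the parameter values mirror the example deployment in this commit.

```python
import random
from typing import Callable, Dict, List


def backoff_delays(
    initial_backoff_s: float,
    backoff_multiplier: float,
    max_backoff_s: float,
    attempts: int,
) -> List[float]:
    """Delay before retry i: initial * multiplier**i, capped at max_backoff_s."""
    return [
        min(initial_backoff_s * backoff_multiplier**i, max_backoff_s)
        for i in range(attempts)
    ]


def power_of_two_choices(queue_lens: Dict[str, int], rng: random.Random) -> str:
    """Sample up to two replicas at random and keep the one with the shorter queue."""
    candidates = rng.sample(sorted(queue_lens), k=min(2, len(queue_lens)))
    return min(candidates, key=lambda replica_id: queue_lens[replica_id])


def choose_replica(
    acquire: Callable[[], str],
    queue_lens: Dict[str, int],
    max_fault_retries: int,
    rng: random.Random,
) -> str:
    """Ask the queue first; fall back after `max_fault_retries` consecutive faults."""
    for _ in range(max_fault_retries):
        try:
            return acquire()
        except ConnectionError:
            continue  # a real router would sleep for the next backoff delay here
    return power_of_two_choices(queue_lens, rng)


def broken_queue() -> str:
    raise ConnectionError("queue actor unavailable")


delays = backoff_delays(0.05, 2.0, 1.0, attempts=6)
print(delays)  # [0.05, 0.1, 0.2, 0.4, 0.8, 1.0]
fallback = choose_replica(broken_queue, {"r1": 3, "r2": 0}, 3, random.Random(0))
print(fallback)  # r2
```

The cap matters because, without it, a long outage would push retry delays far past the time the queue needs to recover.
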
### Usage
The centralized capacity queue request router can bring performance benefits, particularly in supply-constrained deployments, for example with `max_ongoing_requests` set to `1` or `2`.

### Benchmark

#### Benchmark Setup
- Deployment topology: Client -> `ParentDeployment` -> `ChildDeployment`. Request router selection is applied to both deployments,
  controlling how parent replicas are selected by the HTTP proxy and how child replicas are selected by the parent's `DeploymentHandle`.
- Scale: small (8 replicas), medium (32 replicas), large (128 replicas), xlarge (512 replicas).
- Workload: Replica processing latency is drawn from an exponential distribution with a mean of 1s, capped at 10s.
- `max_ongoing_requests` is set to `2`.
- Load generation: Closed-loop load generation keeps replicas consistently saturated at `max_ongoing_requests` concurrency.
- Warmup: 10s; metrics within the warmup window are discarded entirely.

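The workload item above can be reproduced with the standard library. The sketch below draws processing latencies from
an exponential distribution with mean 1s and caps them at 10s. The constant and function names are illustrative, and the
benchmark's actual sampling code isn't part of this commit view.

```python
import random

MEAN_LATENCY_S = 1.0   # mean of the exponential distribution
LATENCY_CAP_S = 10.0   # samples above this value are clamped


def sample_processing_latency_s(rng: random.Random) -> float:
    # random.expovariate takes the rate parameter, which is 1 / mean.
    return min(rng.expovariate(1.0 / MEAN_LATENCY_S), LATENCY_CAP_S)


rng = random.Random(0)
samples = [sample_processing_latency_s(rng) for _ in range(10_000)]
print(f"mean={sum(samples) / len(samples):.2f}s max={max(samples):.2f}s")
```
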
#### Benchmark Metrics
- Throughput: Requests per second, i.e. `num_requests / duration`.
- Utilization: The fraction of a replica's total processing capacity that was consumed by actual work during the experiment.
  Concretely, `sum(replica_processing_latency_s) / (duration_s * max_ongoing_requests)`. For GPU deployments, utilization serves as
  a proxy for GPU utilization.
- Latency: The client-side end-to-end latency, covering the full round trip:
  client -> `ParentDeployment` -> `ChildDeployment` -> `ParentDeployment` -> client.

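As a worked example of the throughput and utilization formulas above, the following sketch evaluates them on made-up
numbers for a single replica. The latency values and the 5s duration are invented for illustration and are not
benchmark results.

```python
from typing import Sequence


def throughput_rps(num_requests: int, duration_s: float) -> float:
    return num_requests / duration_s


def utilization(
    replica_processing_latency_s: Sequence[float],
    duration_s: float,
    max_ongoing_requests: int,
) -> float:
    # Busy time divided by the total slot-time the replica could have served.
    return sum(replica_processing_latency_s) / (duration_s * max_ongoing_requests)


# Made-up per-request processing times (seconds) observed on one replica.
latencies = [1.0, 2.0, 0.5, 1.5, 3.0]
print(throughput_rps(len(latencies), duration_s=5.0))                  # 1.0
print(utilization(latencies, duration_s=5.0, max_ongoing_requests=2))  # 0.8
```

With `max_ongoing_requests=2`, a 5s window offers 10 slot-seconds, and the 8 seconds of processing fill 80% of them.
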
#### Normal Situation
Under normal (fault-free) conditions, `CapacityQueueRouter` yields higher throughput and utilization and lower latency.

```{image} ../images/capacity-queue-router-normal.png
:align: center
:width: 800px
```

#### Fault Situation
A fault is simulated by killing the `CapacityQueue` actor. Upon recovery, performance converges toward its pre-fault level.

```{image} ../images/capacity-queue-router-fault.png
:align: center
:width: 800px
```

:::{note}
If you see the following error while the `CapacityQueue` actor is faulty and routing decisions fall back to the power-of-two-choices router,
set `RAY_SERVE_QUEUE_LENGTH_RESPONSE_DEADLINE_S` to a higher value.

> Failed to get queue length from Replica(id='...', deployment='ParentDeployment', app='...') within 0.1s.

:::

:::{warning}
## Gotchas and limitations

doc/source/serve/doc_code/capacity_queue_request_router_app.py

Lines changed: 60 additions & 0 deletions
@@ -0,0 +1,60 @@
# flake8: noqa

# __begin_deploy_app_with_capacity_queue_router__
import ray
from ray import serve
from ray.serve.config import DeploymentActorConfig, RequestRouterConfig
from ray.serve.context import _get_internal_replica_context

from ray.serve.experimental.capacity_queue import (
    CapacityQueue,
)


@serve.deployment(
    deployment_actors=[
        DeploymentActorConfig(
            name="capacity_queue",
            actor_class=CapacityQueue,
            init_kwargs={
                "acquire_timeout_s": 0.5,
                "token_ttl_s": 5,
                # The queue subscribes to controller updates for this deployment
                # so it automatically registers/unregisters replicas.
                "deployment_id_name": "CapacityQueueApp",
                "deployment_id_app": "default",
            },
            actor_options={"num_cpus": 0},
        ),
    ],
    request_router_config=RequestRouterConfig(
        request_router_class=(
            "ray.serve.experimental.capacity_queue_router:CapacityQueueRouter"
        ),
        request_router_kwargs={
            "capacity_queue_actor_name": "capacity_queue",
            # Fall back to Pow2 after this many consecutive CapacityQueue faults.
            "max_fault_retries": 3,
        },
        # Backoff between retries when the CapacityQueue is unavailable or capacity is exhausted.
        initial_backoff_s=0.05,
        backoff_multiplier=2.0,
        max_backoff_s=1.0,
    ),
    num_replicas=3,
    max_ongoing_requests=5,
    ray_actor_options={"num_cpus": 0},
)
class CapacityQueueApp:
    def __init__(self):
        context = _get_internal_replica_context()
        self.replica_id = context.replica_id

    async def __call__(self):
        return self.replica_id


handle = serve.run(CapacityQueueApp.bind())
response = handle.remote().result()
print(f"Response from CapacityQueueApp: {response}")
# __end_deploy_app_with_capacity_queue_router__

‎python/ray/serve/experimental/__init__.py‎

Whitespace-only changes.
