[llm] Fix P/D direct streaming OpenAI routing#63679
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces support for direct streaming in the Prefill/Decode (P/D) disaggregation architecture by implementing __serve_build_asgi_app__ to route HTTP traffic through P/D orchestration. It also ensures that session IDs are preserved across the decode-to-prefill hop and adds corresponding unit tests. Feedback highlights two critical issues: bypassing the parent class methods for local decode introduces a performance regression for non-direct streaming paths due to missing batching logic, and the custom ASGI ingress class fails to pass lora_paths to its parent constructor, which breaks dynamic LoRA model discovery.
d8e47cf to
fbd9daf
Compare
Signed-off-by: Seiji Eicher <seiji@anyscale.com>
…ng in P/D ingress
- Write the resolved model id back onto the request in the P/D direct-
streaming ingress so an omitted model defaults to a concrete id before
orchestration, instead of discarding _get_model_id's return value.
- Add session_id_from_headers() to http_util and reuse it in both the
OpenAiIngress and the P/D orchestrator, removing the duplicated
session-id header matching.
- Tests: parametrize the prefill-hop test over chat/completions, assert
the /v1/models/{id} retrieve route for parity, and drop a redundant
intermediate and a no-op fake handle option.
Signed-off-by: Seiji Eicher <seiji@anyscale.com>
The custom _PDDirectStreamingIngress omitted lora_paths when constructing OpenAiIngress, so GET /v1/models would not enumerate dynamic LoRA adapters on the P/D direct-streaming path. Build lora_paths from the decode config's lora_config like the standard ingress builder does. Signed-off-by: Seiji Eicher <seiji@anyscale.com>
be24049 to
3140572
Compare
When VLLM_NIXL_SIDE_CHANNEL_PORT is unset, use a default base plus _compute_port_offset() (DP rank if set; else Serve replica rank × num_devices). Replicas of one deployment land on non-overlapping side-channel ports without any coordination state. Operators running multiple PD deployments on a single node should set NIXL_SIDE_CHANNEL_PORT_BASE per deployment to non-overlapping ranges. This replaces the earlier file-locked port-block allocator at /tmp/ray_serve_nixl_side_channel_ports.next, which had a TOCTOU window between port-check and vLLM bind, a world-writable state file, and a loop-bound bug when the persisted next-start was below the configured base. Signed-off-by: Seiji Eicher <seiji@anyscale.com>
- Hoist _PDDirectStreamingIngress from a method-local class to a module-scope class (via lazy factory) so it's testable in isolation and not redefined on every __serve_build_asgi_app__ invocation. - Restore super().chat / super().completions for the local decode hop in _pd_handle_request. The previous bypass to self.engine.chat dropped batch_output_stream=True and manually re-implemented two of _run_request's three concerns. Going through super() runs the full LLMServer request pipeline (request_id propagation, LoRA multiplex resolution, batched streaming) on the decode side. - Consolidate the unit-level decode-hop test and the ASGI-routes-exist test into one HTTP-driven regression test (test_direct_streaming_http_runs_pd_orchestration). It drives a request via TestClient through PDDecodeServer.__serve_build_asgi_app__ and asserts the prefill handle was invoked with the session-id from the request header and that the decode call received the prefill's kv_transfer_params. Parametrized over chat and completions. Signed-off-by: Seiji Eicher <seiji@anyscale.com>
3140572 to
39a6cec
Compare
… swap Replace the in-process OpenAiIngress shim with the same model plain LLMServer direct streaming uses: start from the engine-native ASGI app and re-point only /v1/chat/completions and /v1/completions at the P/D orchestrator (self.chat / self.completions). Every other route stays engine-native, so P/D direct streaming has the same HTTP surface as non-P/D direct streaming instead of reintroducing the OpenAiIngress deployment that direct streaming exists to eliminate. - Drop _PDDirectStreamingIngress, _make_pd_direct_streaming_ingress_cls, and _build_pd_lora_paths. - Add _strip_routes (remove an engine route by path) and _pd_http_response (shape the orchestrator's generator into a JSON or SSE response, reusing the ingress module's response helpers). - /health now comes from the engine-native app, matching non-P/D direct streaming. - Test attaches a started MockVLLMEngine so the engine-native base app builds; assertions unchanged. Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Replace the hand-rolled _AsyncIterator class with a 2-line async generator, and collapse _FakePrefillHandle._method's nested class into a SimpleNamespace closure. Behavior-preserving test cleanup. Signed-off-by: Seiji Eicher <seiji@anyscale.com>
…ample The dp_pd_example doc test colocates prefill and decode on one node, but neither set NIXL_SIDE_CHANNEL_PORT_BASE, so both fell back to the same default base and their NIXL side-channel listeners collided (zmq Address already in use on :20000/:20002), hanging the example to a test timeout. Assign non-overlapping sub-ephemeral bases (15000/16000), matching the multi-node integration test convention. Signed-off-by: Seiji Eicher <seiji@anyscale.com>
build_pd_openai_app now defaults prefill and decode to non-overlapping NIXL side-channel port bases (20000 / 22000) so their colocated replicas on a single node don't bind the same port. _compute_port_offset() already fans replicas out within each base; this separates the two engines. Uses setdefault, so an explicit NIXL_SIDE_CHANNEL_PORT_BASE still wins. Drop the now-redundant explicit bases from dp_pd_example so it exercises the default. Signed-off-by: Seiji Eicher <seiji@anyscale.com>
|
|
||
| def _strip_routes(app, path: str) -> None: | ||
| """Remove the engine-native APIRoute(s) registered at ``path``.""" | ||
| from fastapi.routing import APIRoute |
There was a problem hiding this comment.
nit: move import to top
There was a problem hiding this comment.
applicable for other imports in this PR
There was a problem hiding this comment.
Done — hoisted these (and the other function-local imports in this file) to the module top in def44d2ff6.
| ] | ||
|
|
||
|
|
||
| async def _pd_http_response(gen): |
There was a problem hiding this comment.
could you add a return type annotation?
There was a problem hiding this comment.
Done — annotated -> Response in def44d2ff6.
| "remote_port": None, | ||
| } | ||
| prefill_request.max_tokens = 1 | ||
| if hasattr(prefill_request, "max_completion_tokens"): |
There was a problem hiding this comment.
prior to this PR, for a chat request that specified max_completion_tokens, the prefill engine could generate more than a token? this isn't directly related to PD - direct streaming compatibility, is it?
There was a problem hiding this comment.
Yes. Chat requests are bounded by max_completion_tokens, not max_tokens, so before this the prefill engine could emit up to the user's max_completion_tokens instead of a single token. It's a latent correctness fix (prefill must produce exactly one token), not specific to direct streaming — it just surfaced here because the direct-streaming chat path exercises it.
There was a problem hiding this comment.
Gotcha, why doesn't non-direct-streaming exercise this path?
There was a problem hiding this comment.
It does, actually. Both paths route chat through the same PDDecodeServer.chat -> _pd_handle_request -> _prepare_prefill_request (non-direct via the OpenAiIngress handle call, direct via the engine-native ASGI app), and pre-PR that helper only set max_tokens = 1.
The over-generation only triggers when a client sends max_completion_tokens. vLLM resolves the chat cap as max_completion_tokens or max_tokens (to_sampling_params), so a client-supplied max_completion_tokens silently overrides our forced max_tokens = 1 and prefill generates up to that many tokens. Clients sending only the legacy max_tokens, or nothing, were capped at 1 correctly, which is why it went unnoticed on the non-direct path. Completions never had the issue since CompletionRequest has no max_completion_tokens field.
So it's a shared latent fix, not direct-streaming-specific. The direct-streaming chat test added here is just the first coverage that exercises it.
| _openai_json_wrapper, | ||
| _peek_at_generator, |
There was a problem hiding this comment.
nit: now that we're using these private symbols cross-module, consider moving them to a shared ingress utils module.
There was a problem hiding this comment.
Done in def44d2ff6 — moved NON_STREAMING_RESPONSE_TYPES, _sanitize_chat_completion_request, _peek_at_generator, and _openai_json_wrapper (plus the supporting _apply_openai_json_format and stream type aliases) into a new core/ingress/utils.py. Both ingress.py and pd_server.py now import them from there.
| ) | ||
|
|
||
| self._set_side_channel_port() | ||
| self._set_side_channel_host() |
There was a problem hiding this comment.
is this reordering preventing any issues?
There was a problem hiding this comment.
Reverted in def44d2ff6 — _set_side_channel_host and _set_side_channel_port don't depend on each other, so the reorder was incidental. Restored the original order to keep the diff minimal.
Signed-off-by: Seiji Eicher <seiji@anyscale.com>
| if vllm_envs.is_set("VLLM_NIXL_SIDE_CHANNEL_PORT"): | ||
| return | ||
|
|
||
| base_port = int( |
There was a problem hiding this comment.
The random base port may be sequential, e.g. 20000 for P and 20001 for D. For a wide-EP PD setup, this causes collisions when vLLM tries incrementing from the base port (now PD rank 1 collides with D rank 0)
| logger = get_logger(__name__) | ||
|
|
||
| # Disjoint default NIXL side-channel port bases so colocated prefill/decode replicas don't collide. | ||
| _DEFAULT_NIXL_PORT_BASE_PREFILL = 20000 |
There was a problem hiding this comment.
should we make these configurable via env-vars?
There was a problem hiding this comment.
Decided to keep these as inline defaults rather than env vars. The base is already overridable per-deployment via experimental_configs={"NIXL_SIDE_CHANNEL_PORT_BASE": ...}, which is the right scope if someone needs to relocate a range; an env var would be a third surface for the same knob. In 9df4befb0d I also dropped the named _DEFAULT_* constants (they read like configurable tunables when they're just default values) — the builder now shifts only the decode base (22000) and prefill inherits the connector default (20000).
…d ingress utils - pd_server: hoist function-local fastapi/starlette/ingress imports to module top. - pd_server: annotate _pd_http_response return type as Response. - Extract shared OpenAI ingress helpers into core/ingress/utils.py (NON_STREAMING_RESPONSE_TYPES, _sanitize_chat_completion_request, _peek_at_generator, _openai_json_wrapper, plus supporting _apply_openai_json_format and stream type aliases); ingress.py and pd_server now import them from utils. - Revert the incidental host/port reorder in NixlConnectorBackend.setup. Signed-off-by: Seiji Eicher <seiji@anyscale.com>
The _DEFAULT_* module constants read like configurable tunables when they're just inline default values, so replace them with commented literals. build_pd_openai_app now shifts only the decode base (22000); prefill inherits the connector default (20000). Drops the duplicated 20000 across the connector and builder. Signed-off-by: Seiji Eicher <seiji@anyscale.com>
|
llm_serve_multi_node_direct_streaming failure is unrelated |
## Description With direct streaming, the engine-native ASGI app is served straight from the LLM replica and the separate `OpenAiIngress` deployment is dropped. For prefill/decode (P/D) this broke routing: the engine-native `/v1/chat/completions` and `/v1/completions` routes hit the local decode engine and skipped remote prefill. This PR routes direct-streaming HTTP on the decode server through P/D orchestration: - `PDOrchestratorMixin.__serve_build_asgi_app__` starts from the engine-native app and re-points only `/v1/chat/completions` and `/v1/completions` at the orchestrator (remote prefill -> local decode). Other routes stay engine-native. - Local decode goes through `super().chat` / `super().completions`, so the standard `LLMServer` pipeline (request id, LoRA multiplex, `batch_output_stream`) runs. - The session-id header is propagated to the prefill handle via `.options(session_id=...)` so a proxy's `-`/`_` rewrite cannot silently drop affinity. New `session_id_from_headers` helper replaces the inlined matcher. - `_prepare_prefill_request` also pins `max_completion_tokens=1` and clears `stream_options` so the one-token prefill request is well-formed for chat. ### NIXL side-channel port The side-channel port must be predictable to the remote peer that starts the handshake. The old default (`get_open_port()`) picked a random ephemeral port the peer can't know. The default is now a fixed base (`20000`) plus the existing `_compute_port_offset()`, so replicas land on reproducible, non-colliding ports. `NIXL_SIDE_CHANNEL_PORT_BASE` stays as the override. ## Test plan - [x] `test_prepare_prefill_request_limits_chat_to_one_token` - [x] `test_direct_streaming_http_runs_pd_orchestration` (chat/completions) - [x] `test_default_side_channel_port_uses_configured_base` --------- Signed-off-by: Seiji Eicher <seiji@anyscale.com>
## Description With direct streaming, the engine-native ASGI app is served straight from the LLM replica and the separate `OpenAiIngress` deployment is dropped. For prefill/decode (P/D) this broke routing: the engine-native `/v1/chat/completions` and `/v1/completions` routes hit the local decode engine and skipped remote prefill. This PR routes direct-streaming HTTP on the decode server through P/D orchestration: - `PDOrchestratorMixin.__serve_build_asgi_app__` starts from the engine-native app and re-points only `/v1/chat/completions` and `/v1/completions` at the orchestrator (remote prefill -> local decode). Other routes stay engine-native. - Local decode goes through `super().chat` / `super().completions`, so the standard `LLMServer` pipeline (request id, LoRA multiplex, `batch_output_stream`) runs. - The session-id header is propagated to the prefill handle via `.options(session_id=...)` so a proxy's `-`/`_` rewrite cannot silently drop affinity. New `session_id_from_headers` helper replaces the inlined matcher. - `_prepare_prefill_request` also pins `max_completion_tokens=1` and clears `stream_options` so the one-token prefill request is well-formed for chat. ### NIXL side-channel port The side-channel port must be predictable to the remote peer that starts the handshake. The old default (`get_open_port()`) picked a random ephemeral port the peer can't know. The default is now a fixed base (`20000`) plus the existing `_compute_port_offset()`, so replicas land on reproducible, non-colliding ports. `NIXL_SIDE_CHANNEL_PORT_BASE` stays as the override. ## Test plan - [x] `test_prepare_prefill_request_limits_chat_to_one_token` - [x] `test_direct_streaming_http_runs_pd_orchestration` (chat/completions) - [x] `test_default_side_channel_port_uses_configured_base` --------- Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Description
With direct streaming, the engine-native ASGI app is served straight from the LLM replica and the separate
OpenAiIngressdeployment is dropped. For prefill/decode (P/D) this broke routing: the engine-native/v1/chat/completionsand/v1/completionsroutes hit the local decode engine and skipped remote prefill.This PR routes direct-streaming HTTP on the decode server through P/D orchestration:
PDOrchestratorMixin.__serve_build_asgi_app__starts from the engine-native app and re-points only/v1/chat/completionsand/v1/completionsat the orchestrator (remote prefill -> local decode). Other routes stay engine-native.super().chat/super().completions, so the standardLLMServerpipeline (request id, LoRA multiplex,batch_output_stream) runs..options(session_id=...)so a proxy's-/_rewrite cannot silently drop affinity. Newsession_id_from_headershelper replaces the inlined matcher._prepare_prefill_requestalso pinsmax_completion_tokens=1and clearsstream_optionsso the one-token prefill request is well-formed for chat.NIXL side-channel port
The side-channel port must be predictable to the remote peer that starts the handshake. The old default (
get_open_port()) picked a random ephemeral port the peer can't know. The default is now a fixed base (20000) plus the existing_compute_port_offset(), so replicas land on reproducible, non-colliding ports.NIXL_SIDE_CHANNEL_PORT_BASEstays as the override.Test plan
test_prepare_prefill_request_limits_chat_to_one_tokentest_direct_streaming_http_runs_pd_orchestration(chat/completions)test_default_side_channel_port_uses_configured_base