Skip to content

[llm] Fix P/D direct streaming OpenAI routing#63679

Merged
elliot-barn merged 12 commits into
ray-project:masterfrom
eicherseiji:seiji/fix-pd-openai-app-routing
May 29, 2026
Merged

[llm] Fix P/D direct streaming OpenAI routing#63679
elliot-barn merged 12 commits into
ray-project:masterfrom
eicherseiji:seiji/fix-pd-openai-app-routing

Conversation

@eicherseiji

@eicherseiji eicherseiji commented May 27, 2026

Copy link
Copy Markdown
Contributor

Description

With direct streaming, the engine-native ASGI app is served straight from the LLM replica and the separate OpenAiIngress deployment is dropped. For prefill/decode (P/D) this broke routing: the engine-native /v1/chat/completions and /v1/completions routes hit the local decode engine and skipped remote prefill.

This PR routes direct-streaming HTTP on the decode server through P/D orchestration:

  • PDOrchestratorMixin.__serve_build_asgi_app__ starts from the engine-native app and re-points only /v1/chat/completions and /v1/completions at the orchestrator (remote prefill -> local decode). Other routes stay engine-native.
  • Local decode goes through super().chat / super().completions, so the standard LLMServer pipeline (request id, LoRA multiplex, batch_output_stream) runs.
  • The session-id header is propagated to the prefill handle via .options(session_id=...) so a proxy's -/_ rewrite cannot silently drop affinity. New session_id_from_headers helper replaces the inlined matcher.
  • _prepare_prefill_request also pins max_completion_tokens=1 and clears stream_options so the one-token prefill request is well-formed for chat.

NIXL side-channel port

The side-channel port must be predictable to the remote peer that starts the handshake. The old default (get_open_port()) picked a random ephemeral port the peer can't know. The default is now a fixed base (20000) plus the existing _compute_port_offset(), so replicas land on reproducible, non-colliding ports. NIXL_SIDE_CHANNEL_PORT_BASE stays as the override.

Test plan

  • test_prepare_prefill_request_limits_chat_to_one_token
  • test_direct_streaming_http_runs_pd_orchestration (chat/completions)
  • test_default_side_channel_port_uses_configured_base

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces support for direct streaming in the Prefill/Decode (P/D) disaggregation architecture by implementing __serve_build_asgi_app__ to route HTTP traffic through P/D orchestration. It also ensures that session IDs are preserved across the decode-to-prefill hop and adds corresponding unit tests. Feedback highlights two critical issues: bypassing the parent class methods for local decode introduces a performance regression for non-direct streaming paths due to missing batching logic, and the custom ASGI ingress class fails to pass lora_paths to its parent constructor, which breaks dynamic LoRA model discovery.

Comment thread python/ray/llm/_internal/serve/serving_patterns/prefill_decode/pd_server.py Outdated
Comment thread python/ray/llm/_internal/serve/serving_patterns/prefill_decode/pd_server.py Outdated
@eicherseiji eicherseiji force-pushed the seiji/fix-pd-openai-app-routing branch from d8e47cf to fbd9daf Compare May 27, 2026 22:30
@eicherseiji eicherseiji marked this pull request as ready for review May 28, 2026 02:32
@eicherseiji eicherseiji requested review from a team as code owners May 28, 2026 02:32
@eicherseiji eicherseiji added the go add ONLY when ready to merge, run all tests label May 28, 2026
@eicherseiji eicherseiji marked this pull request as draft May 28, 2026 03:41
Signed-off-by: Seiji Eicher <seiji@anyscale.com>
…ng in P/D ingress

- Write the resolved model id back onto the request in the P/D direct-
  streaming ingress so an omitted model defaults to a concrete id before
  orchestration, instead of discarding _get_model_id's return value.
- Add session_id_from_headers() to http_util and reuse it in both the
  OpenAiIngress and the P/D orchestrator, removing the duplicated
  session-id header matching.
- Tests: parametrize the prefill-hop test over chat/completions, assert
  the /v1/models/{id} retrieve route for parity, and drop a redundant
  intermediate and a no-op fake handle option.

Signed-off-by: Seiji Eicher <seiji@anyscale.com>
The custom _PDDirectStreamingIngress omitted lora_paths when constructing
OpenAiIngress, so GET /v1/models would not enumerate dynamic LoRA adapters
on the P/D direct-streaming path. Build lora_paths from the decode config's
lora_config like the standard ingress builder does.

Signed-off-by: Seiji Eicher <seiji@anyscale.com>
@eicherseiji eicherseiji force-pushed the seiji/fix-pd-openai-app-routing branch from be24049 to 3140572 Compare May 28, 2026 06:06
When VLLM_NIXL_SIDE_CHANNEL_PORT is unset, use a default base plus
_compute_port_offset() (DP rank if set; else Serve replica rank ×
num_devices). Replicas of one deployment land on non-overlapping
side-channel ports without any coordination state.

Operators running multiple PD deployments on a single node should
set NIXL_SIDE_CHANNEL_PORT_BASE per deployment to non-overlapping
ranges.

This replaces the earlier file-locked port-block allocator at
/tmp/ray_serve_nixl_side_channel_ports.next, which had a TOCTOU
window between port-check and vLLM bind, a world-writable state
file, and a loop-bound bug when the persisted next-start was below
the configured base.

Signed-off-by: Seiji Eicher <seiji@anyscale.com>
- Hoist _PDDirectStreamingIngress from a method-local class to a
  module-scope class (via lazy factory) so it's testable in isolation
  and not redefined on every __serve_build_asgi_app__ invocation.

- Restore super().chat / super().completions for the local decode hop
  in _pd_handle_request. The previous bypass to self.engine.chat
  dropped batch_output_stream=True and manually re-implemented two of
  _run_request's three concerns. Going through super() runs the full
  LLMServer request pipeline (request_id propagation, LoRA multiplex
  resolution, batched streaming) on the decode side.

- Consolidate the unit-level decode-hop test and the ASGI-routes-exist
  test into one HTTP-driven regression test
  (test_direct_streaming_http_runs_pd_orchestration). It drives a
  request via TestClient through PDDecodeServer.__serve_build_asgi_app__
  and asserts the prefill handle was invoked with the session-id from
  the request header and that the decode call received the prefill's
  kv_transfer_params. Parametrized over chat and completions.

Signed-off-by: Seiji Eicher <seiji@anyscale.com>
@eicherseiji eicherseiji force-pushed the seiji/fix-pd-openai-app-routing branch from 3140572 to 39a6cec Compare May 28, 2026 06:14
… swap

Replace the in-process OpenAiIngress shim with the same model plain
LLMServer direct streaming uses: start from the engine-native ASGI app
and re-point only /v1/chat/completions and /v1/completions at the P/D
orchestrator (self.chat / self.completions). Every other route stays
engine-native, so P/D direct streaming has the same HTTP surface as
non-P/D direct streaming instead of reintroducing the OpenAiIngress
deployment that direct streaming exists to eliminate.

- Drop _PDDirectStreamingIngress, _make_pd_direct_streaming_ingress_cls,
  and _build_pd_lora_paths.
- Add _strip_routes (remove an engine route by path) and
  _pd_http_response (shape the orchestrator's generator into a JSON or
  SSE response, reusing the ingress module's response helpers).
- /health now comes from the engine-native app, matching non-P/D
  direct streaming.
- Test attaches a started MockVLLMEngine so the engine-native base app
  builds; assertions unchanged.

Signed-off-by: Seiji Eicher <seiji@anyscale.com>
@eicherseiji eicherseiji marked this pull request as ready for review May 28, 2026 22:29
Replace the hand-rolled _AsyncIterator class with a 2-line async
generator, and collapse _FakePrefillHandle._method's nested class into
a SimpleNamespace closure. Behavior-preserving test cleanup.

Signed-off-by: Seiji Eicher <seiji@anyscale.com>
@ray-gardener ray-gardener Bot added the serve Ray Serve Related Issue label May 29, 2026
…ample

The dp_pd_example doc test colocates prefill and decode on one node, but
neither set NIXL_SIDE_CHANNEL_PORT_BASE, so both fell back to the same
default base and their NIXL side-channel listeners collided
(zmq Address already in use on :20000/:20002), hanging the example to a
test timeout. Assign non-overlapping sub-ephemeral bases (15000/16000),
matching the multi-node integration test convention.

Signed-off-by: Seiji Eicher <seiji@anyscale.com>
@eicherseiji eicherseiji requested a review from a team as a code owner May 29, 2026 05:07
build_pd_openai_app now defaults prefill and decode to non-overlapping
NIXL side-channel port bases (20000 / 22000) so their colocated replicas
on a single node don't bind the same port. _compute_port_offset() already
fans replicas out within each base; this separates the two engines. Uses
setdefault, so an explicit NIXL_SIDE_CHANNEL_PORT_BASE still wins.

Drop the now-redundant explicit bases from dp_pd_example so it exercises
the default.

Signed-off-by: Seiji Eicher <seiji@anyscale.com>

@jeffreywang88 jeffreywang88 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice approach :)


def _strip_routes(app, path: str) -> None:
"""Remove the engine-native APIRoute(s) registered at ``path``."""
from fastapi.routing import APIRoute

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: move import to top

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

applicable for other imports in this PR

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done — hoisted these (and the other function-local imports in this file) to the module top in def44d2ff6.

]


async def _pd_http_response(gen):

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you add a return type annotation?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done — annotated -> Response in def44d2ff6.

"remote_port": None,
}
prefill_request.max_tokens = 1
if hasattr(prefill_request, "max_completion_tokens"):

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prior to this PR, for a chat request that specified max_completion_tokens, the prefill engine could generate more than a token? this isn't directly related to PD - direct streaming compatibility, is it?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Chat requests are bounded by max_completion_tokens, not max_tokens, so before this the prefill engine could emit up to the user's max_completion_tokens instead of a single token. It's a latent correctness fix (prefill must produce exactly one token), not specific to direct streaming — it just surfaced here because the direct-streaming chat path exercises it.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha, why doesn't non-direct-streaming exercise this path?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does, actually. Both paths route chat through the same PDDecodeServer.chat -> _pd_handle_request -> _prepare_prefill_request (non-direct via the OpenAiIngress handle call, direct via the engine-native ASGI app), and pre-PR that helper only set max_tokens = 1.

The over-generation only triggers when a client sends max_completion_tokens. vLLM resolves the chat cap as max_completion_tokens or max_tokens (to_sampling_params), so a client-supplied max_completion_tokens silently overrides our forced max_tokens = 1 and prefill generates up to that many tokens. Clients sending only the legacy max_tokens, or nothing, were capped at 1 correctly, which is why it went unnoticed on the non-direct path. Completions never had the issue since CompletionRequest has no max_completion_tokens field.

So it's a shared latent fix, not direct-streaming-specific. The direct-streaming chat test added here is just the first coverage that exercises it.

Comment on lines +83 to +84
_openai_json_wrapper,
_peek_at_generator,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: now that we're using these private symbols cross-module, consider moving them to a shared ingress utils module.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in def44d2ff6 — moved NON_STREAMING_RESPONSE_TYPES, _sanitize_chat_completion_request, _peek_at_generator, and _openai_json_wrapper (plus the supporting _apply_openai_json_format and stream type aliases) into a new core/ingress/utils.py. Both ingress.py and pd_server.py now import them from there.

)

self._set_side_channel_port()
self._set_side_channel_host()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this reordering preventing any issues?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reverted in def44d2ff6_set_side_channel_host and _set_side_channel_port don't depend on each other, so the reorder was incidental. Restored the original order to keep the diff minimal.

Signed-off-by: Seiji Eicher <seiji@anyscale.com>
if vllm_envs.is_set("VLLM_NIXL_SIDE_CHANNEL_PORT"):
return

base_port = int(

@eicherseiji eicherseiji May 29, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The random base port may be sequential, e.g. 20000 for P and 20001 for D. For a wide-EP PD setup, this causes collisions when vLLM tries incrementing from the base port (now PD rank 1 collides with D rank 0)

logger = get_logger(__name__)

# Disjoint default NIXL side-channel port bases so colocated prefill/decode replicas don't collide.
_DEFAULT_NIXL_PORT_BASE_PREFILL = 20000

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we make these configurable via env-vars?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Decided to keep these as inline defaults rather than env vars. The base is already overridable per-deployment via experimental_configs={"NIXL_SIDE_CHANNEL_PORT_BASE": ...}, which is the right scope if someone needs to relocate a range; an env var would be a third surface for the same knob. In 9df4befb0d I also dropped the named _DEFAULT_* constants (they read like configurable tunables when they're just default values) — the builder now shifts only the decode base (22000) and prefill inherits the connector default (20000).

…d ingress utils

- pd_server: hoist function-local fastapi/starlette/ingress imports to module top.
- pd_server: annotate _pd_http_response return type as Response.
- Extract shared OpenAI ingress helpers into core/ingress/utils.py
  (NON_STREAMING_RESPONSE_TYPES, _sanitize_chat_completion_request,
  _peek_at_generator, _openai_json_wrapper, plus supporting
  _apply_openai_json_format and stream type aliases); ingress.py and
  pd_server now import them from utils.
- Revert the incidental host/port reorder in NixlConnectorBackend.setup.

Signed-off-by: Seiji Eicher <seiji@anyscale.com>
The _DEFAULT_* module constants read like configurable tunables when
they're just inline default values, so replace them with commented
literals. build_pd_openai_app now shifts only the decode base (22000);
prefill inherits the connector default (20000). Drops the duplicated
20000 across the connector and builder.

Signed-off-by: Seiji Eicher <seiji@anyscale.com>
@eicherseiji eicherseiji requested a review from jeffreywang88 May 29, 2026 05:59
@eicherseiji eicherseiji changed the title Fix P/D direct streaming OpenAI routing May 29, 2026

@jeffreywang88 jeffreywang88 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚢

@dstrodtman dstrodtman left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stamp

@eicherseiji

Copy link
Copy Markdown
Contributor Author
@elliot-barn elliot-barn merged commit a5ce0e0 into ray-project:master May 29, 2026
5 of 6 checks passed
rueian pushed a commit to rueian/ray that referenced this pull request Jun 4, 2026
## Description

With direct streaming, the engine-native ASGI app is served straight
from the LLM replica and the separate `OpenAiIngress` deployment is
dropped. For prefill/decode (P/D) this broke routing: the engine-native
`/v1/chat/completions` and `/v1/completions` routes hit the local decode
engine and skipped remote prefill.

This PR routes direct-streaming HTTP on the decode server through P/D
orchestration:

- `PDOrchestratorMixin.__serve_build_asgi_app__` starts from the
engine-native app and re-points only `/v1/chat/completions` and
`/v1/completions` at the orchestrator (remote prefill -> local decode).
Other routes stay engine-native.
- Local decode goes through `super().chat` / `super().completions`, so
the standard `LLMServer` pipeline (request id, LoRA multiplex,
`batch_output_stream`) runs.
- The session-id header is propagated to the prefill handle via
`.options(session_id=...)` so a proxy's `-`/`_` rewrite cannot silently
drop affinity. New `session_id_from_headers` helper replaces the inlined
matcher.
- `_prepare_prefill_request` also pins `max_completion_tokens=1` and
clears `stream_options` so the one-token prefill request is well-formed
for chat.

### NIXL side-channel port

The side-channel port must be predictable to the remote peer that starts
the handshake. The old default (`get_open_port()`) picked a random
ephemeral port the peer can't know. The default is now a fixed base
(`20000`) plus the existing `_compute_port_offset()`, so replicas land
on reproducible, non-colliding ports. `NIXL_SIDE_CHANNEL_PORT_BASE`
stays as the override.

## Test plan
- [x] `test_prepare_prefill_request_limits_chat_to_one_token`
- [x] `test_direct_streaming_http_runs_pd_orchestration`
(chat/completions)
- [x] `test_default_side_channel_port_uses_configured_base`

---------

Signed-off-by: Seiji Eicher <seiji@anyscale.com>
limarkdcunha pushed a commit to limarkdcunha/ray that referenced this pull request Jun 30, 2026
## Description

With direct streaming, the engine-native ASGI app is served straight
from the LLM replica and the separate `OpenAiIngress` deployment is
dropped. For prefill/decode (P/D) this broke routing: the engine-native
`/v1/chat/completions` and `/v1/completions` routes hit the local decode
engine and skipped remote prefill.

This PR routes direct-streaming HTTP on the decode server through P/D
orchestration:

- `PDOrchestratorMixin.__serve_build_asgi_app__` starts from the
engine-native app and re-points only `/v1/chat/completions` and
`/v1/completions` at the orchestrator (remote prefill -> local decode).
Other routes stay engine-native.
- Local decode goes through `super().chat` / `super().completions`, so
the standard `LLMServer` pipeline (request id, LoRA multiplex,
`batch_output_stream`) runs.
- The session-id header is propagated to the prefill handle via
`.options(session_id=...)` so a proxy's `-`/`_` rewrite cannot silently
drop affinity. New `session_id_from_headers` helper replaces the inlined
matcher.
- `_prepare_prefill_request` also pins `max_completion_tokens=1` and
clears `stream_options` so the one-token prefill request is well-formed
for chat.

### NIXL side-channel port

The side-channel port must be predictable to the remote peer that starts
the handshake. The old default (`get_open_port()`) picked a random
ephemeral port the peer can't know. The default is now a fixed base
(`20000`) plus the existing `_compute_port_offset()`, so replicas land
on reproducible, non-colliding ports. `NIXL_SIDE_CHANNEL_PORT_BASE`
stays as the override.

## Test plan
- [x] `test_prepare_prefill_request_limits_chat_to_one_token`
- [x] `test_direct_streaming_http_runs_pd_orchestration`
(chat/completions)
- [x] `test_default_side_channel_port_uses_configured_base`

---------

Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

go add ONLY when ready to merge, run all tests serve Ray Serve Related Issue

4 participants