[data] Fix multi-input operator object store memory attribution to upstream operators #61208

Merged
alexeykudinkin merged 9 commits into ray-project:master from justinvyu:fix-union-operator-deadlock-preserve-order on Feb 24, 2026

@justinvyu justinvyu commented Feb 20, 2026

Contributor

Description

  • Fix resource attribution for multi-input operators (union, zip): the resource manager previously charged each upstream operator the total internal inqueue bytes (across all inputs) of the downstream operator, double-counting memory for multi-input operators. Now, OpRuntimeMetrics tracks per-input inqueues, so each upstream is only charged for the blocks it actually produced.
  • This fixes a deadlock with preserve_order=True: a fast input inflates the union's total inqueue across inputs, which incorrectly backpressures the slow input that the union is waiting on (strict round-robin), so neither side can make progress.
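The per-input accounting described above can be sketched roughly as follows. This is a minimal illustration, not the code from this PR: `PerInputInqueueTracker` and its method signatures are hypothetical stand-ins, and the byte counts are taken from the example in this description.

```python
from collections import defaultdict


class PerInputInqueueTracker:
    """Hypothetical stand-in for per-input inqueue accounting (not Ray's API)."""

    def __init__(self):
        # Maps input index -> bytes currently buffered from that input.
        self._bytes_by_input = defaultdict(int)

    def on_input_queued(self, input_index, size_bytes):
        self._bytes_by_input[input_index] += size_bytes

    def on_input_dequeued(self, input_index, size_bytes):
        self._bytes_by_input[input_index] -= size_bytes

    def total_bytes(self):
        return sum(self._bytes_by_input.values())

    def bytes_for_input(self, input_index):
        return self._bytes_by_input[input_index]


tracker = PerInputInqueueTracker()
tracker.on_input_queued(0, 10)  # input 0: op_A (slow)
tracker.on_input_queued(1, 90)  # input 1: op_B (fast)

# Old behavior: every upstream is charged the aggregate inqueue.
aggregate_charges = {i: tracker.total_bytes() for i in (0, 1)}
# New behavior: every upstream is charged only for its own input.
per_input_charges = {i: tracker.bytes_for_input(i) for i in (0, 1)}

print(aggregate_charges)  # {0: 100, 1: 100}
print(per_input_charges)  # {0: 10, 1: 90}
```

Keying the counters by input index keeps the aggregate available as a simple sum, so callers that only need the total are unaffected in this sketch.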
Before:

    op_A (slow) ───┐
                   ├──▶ UnionOp [inqueue: A=10B, B=90B, total=100B]
    op_B (fast) ───┘

    Resource manager charges each upstream for the AGGREGATE inqueue:
      op_A owes: 100B (total inqueue) ← wrong, A only contributed 10B
      op_B owes: 100B (total inqueue) ← wrong, B only contributed 90B

    Both get backpressured → op_A can't produce → union waits for A → deadlock

After:

    op_A (slow) ───┐
                   ├──▶ UnionOp [inqueue[0]: 10B, inqueue[1]: 90B]
    op_B (fast) ───┘

    Resource manager charges each upstream for its OWN per-input inqueue:
      op_A owes: 10B  (inqueue[0]) ← correct
      op_B owes: 90B  (inqueue[1]) ← correct

    op_A stays unthrottled → produces blocks → union makes progress ✓
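The before/after behavior can also be reproduced with a toy simulation. This is only a sketch under stated assumptions: the per-upstream budget (50B), the block size (10B), and the slow operator's period are made-up values, and `simulate` is a hypothetical helper that does not model Ray's actual resource manager or scheduler.

```python
def simulate(per_input, steps=40, budget=50, block=10, slow_period=8):
    """Return how many blocks a strict round-robin union consumes.

    per_input=False charges each upstream the aggregate inqueue (old behavior);
    per_input=True charges each upstream only its own inqueue (new behavior).
    All parameters are illustrative assumptions.
    """
    inqueue = [0, 0]  # bytes buffered per input: [op_A, op_B]
    turn = 0          # strict round-robin: the union must read input `turn` next
    consumed = 0
    for step in range(steps):
        for idx in (0, 1):
            # op_A (idx 0) is ready only once every `slow_period` steps;
            # op_B (idx 1) is ready on every step.
            ready = True if idx == 1 else step % slow_period == slow_period - 1
            charged = inqueue[idx] if per_input else sum(inqueue)
            if ready and charged < budget:  # not backpressured
                inqueue[idx] += block
        # The union only consumes from the input whose turn it is.
        if inqueue[turn] >= block:
            inqueue[turn] -= block
            turn = 1 - turn
            consumed += 1
    return consumed


blocks_with_aggregate = simulate(per_input=False)
blocks_with_per_input = simulate(per_input=True)
print(blocks_with_aggregate)  # 0: op_B fills the budget before op_A is ready
print(blocks_with_per_input)  # greater than 0: op_A is never throttled by op_B
```

With aggregate charging, op_B reaches the budget while the union is still waiting for op_A's first block, so op_A is throttled as soon as it becomes ready and nothing is ever consumed.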

Followup work

This PR only solves the preserve_order case where union input queues build up. However, the default case can still run into problems:

  • The default case doesn't use input queues at all -- it routes inputs straight to the output queue, so input attribution is lost immediately because all the blocks get mixed together there.

preserve_order=False case illustrated:

  Step 1: Blocks arrive; the union puts them directly in its internal output buffer
                                                       
    op_A ──▶ union._output_buffer: [A₁]                           
    op_B ──▶ union._output_buffer: [A₁, B₁, B₂, B₃, B₄]
                                                                                                                                               
    (on_input_queued is never called; on_output_queued is called instead, so there is no input attribution)
                                                                                                                                               
  Step 2: Executor eagerly drains union into external outqueue (every step)                                                                    
                                                                                                                                               
    while op.has_next():                 
        op_state.add_output(op.get_next())                                                                                                     
                                         
    union._output_buffer: []
    union external outqueue: [A₁, B₁, B₂, B₃, B₄] = 100B  ← charged to union

  Step 3: Downstream is backpressured, stops consuming — blocks pile up

    op_A ──▶ union external outqueue: 100B  ← all charged to union
    op_B ──▶                                   (no per-input info anywhere)
                      │
                downstream (backpressured, not dispatching)

  Step 4: If union is ineligible, full 100B rolls up to BOTH upstreams

    op_A usage += 100B  (but A only contributed 10B)
    op_B usage += 100B  (but B only contributed 90B)

  With preserve_order=False, blocks never touch the inqueues at all: they go straight to the output buffer, get eagerly drained to the external
  outqueue, and at that point no information remains about which input produced them.
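The four steps above can be sketched in a few lines. This is an illustration only: the individual block sizes are assumptions chosen to add up to the 10B / 90B split used in this description, and the variable names are hypothetical rather than Ray internals.

```python
from collections import deque

# (producer, size_bytes) in arrival order; sizes are assumed for illustration.
arrivals = [("op_A", 10), ("op_B", 20), ("op_B", 20), ("op_B", 20), ("op_B", 30)]

# Step 1: blocks skip the inqueues and land in one shared output buffer.
# Only the size is kept, so the producer is forgotten at this point.
output_buffer = deque()
for _producer, size in arrivals:
    output_buffer.append(size)

# Step 2: the executor eagerly drains the buffer into the external outqueue.
external_outqueue = []
while output_buffer:
    external_outqueue.append(output_buffer.popleft())

# Step 3: downstream is backpressured, so everything stays charged to the union.
union_usage = sum(external_outqueue)

# Step 4: if the union is ineligible, the full usage rolls up to both upstreams.
rolled_up = {upstream: union_usage for upstream in ("op_A", "op_B")}

# What each upstream actually contributed, recomputed from the arrival log.
actual = {}
for producer, size in arrivals:
    actual[producer] = actual.get(producer, 0) + size

print(rolled_up)  # {'op_A': 100, 'op_B': 100}
print(actual)     # {'op_A': 10, 'op_B': 90}
```

Correct attribution here would require the output path to carry the producing input along with each block, which is the information this sketch drops in step 1.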
justinvyu and others added 4 commits February 19, 2026 11:52
…queue attribution

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Signed-off-by: Justin Yu <justinvyu@anyscale.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Signed-off-by: Justin Yu <justinvyu@anyscale.com>
…ued calls

Signed-off-by: Justin Yu <justinvyu@anyscale.com>
Signed-off-by: Justin Yu <justinvyu@anyscale.com>
@justinvyu justinvyu requested a review from a team as a code owner February 20, 2026 20:31

@gemini-code-assist gemini-code-assist Bot left a comment

Contributor

Code Review

This pull request correctly fixes a memory attribution issue for multi-input operators like Union and Zip. By introducing per-input inqueues in OpRuntimeMetrics and updating the resource manager to use this fine-grained information, the change prevents double-counting of memory and resolves a potential deadlock scenario. The changes are well-structured, backward-compatible, and include a new unit test that validates the fix. I have one suggestion to improve code conciseness.

Comment thread python/ray/data/_internal/execution/resource_manager.py Outdated
Signed-off-by: Justin Yu <justinvyu@anyscale.com>
Comment thread python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py Outdated
Signed-off-by: Justin Yu <justinvyu@anyscale.com>
@goutamvenkat-anyscale goutamvenkat-anyscale added the "data" (Ray Data-related issues) and "go" (add ONLY when ready to merge, run all tests) labels Feb 20, 2026
…al_inqueue_for_input

Signed-off-by: Justin Yu <justinvyu@anyscale.com>

@cursor cursor Bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Comment thread python/ray/data/_internal/execution/resource_manager.py
@alexeykudinkin alexeykudinkin merged commit c11c103 into ray-project:master Feb 24, 2026
6 checks passed