[Data] Revisit OutputSplitter semantic to avoid unnecessary buffer accumulation #60237

Merged
alexeykudinkin merged 2 commits into ray-project:master from iamjustinhsu:jhsu/out-split-opt on Jan 16, 2026

@iamjustinhsu iamjustinhsu commented Jan 16, 2026

Description

Currently, OutputSplitter dispatches only the blocks in excess of its baseline of N * 2 buffered blocks (where N is the number of workers). Holding that many blocks back makes the buffer accumulate data unnecessarily.

This change inverts that semantic to:

  • Dispatch blocks to the next outstanding receiver as soon as they become available
  • Force dispatch when the buffer exceeds its max-size threshold (ensuring the buffer never grows past its max size)
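
The two rules above can be sketched with a toy model. This is not Ray's OutputSplitter: the class `EagerSplitter`, its methods, the reading of "outstanding receiver" as "a receiver with an unserved request", and the round-robin choice for forced dispatch are all assumptions made for illustration.

```python
from collections import deque
from typing import Deque, List


class EagerSplitter:
    """Toy model of the dispatch rules (hypothetical, not Ray's implementation)."""

    def __init__(self, num_receivers: int, max_buffer_size: int) -> None:
        self.max_buffer_size = max_buffer_size
        self.buffer: Deque[str] = deque()
        # Receivers that asked for a block and have not been served yet.
        self.waiting: Deque[int] = deque()
        self.outputs: List[List[str]] = [[] for _ in range(num_receivers)]
        self._next_forced = 0  # round-robin cursor for forced dispatch (assumption)

    def request(self, receiver: int) -> None:
        self.waiting.append(receiver)
        self._dispatch()

    def add_block(self, block: str) -> None:
        self.buffer.append(block)
        self._dispatch()

    def _dispatch(self) -> None:
        # Rule 1: serve outstanding receivers as soon as blocks are available.
        while self.buffer and self.waiting:
            self.outputs[self.waiting.popleft()].append(self.buffer.popleft())
        # Rule 2: force dispatch so the buffer never stays above its max size.
        while len(self.buffer) > self.max_buffer_size:
            self.outputs[self._next_forced].append(self.buffer.popleft())
            self._next_forced = (self._next_forced + 1) % len(self.outputs)


splitter = EagerSplitter(num_receivers=2, max_buffer_size=2)
for name in ("a", "b", "c"):
    splitter.add_block(name)  # the third block pushes the buffer over its limit
splitter.request(1)           # receiver 1 is served immediately from the buffer
print(splitter.outputs, list(splitter.buffer))  # [['a'], ['b']] ['c']
```

In this model the buffer only holds blocks while no receiver is waiting, and the max-size check bounds how much it can hold in that case.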

…cumulation

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
@iamjustinhsu iamjustinhsu requested a review from a team as a code owner January 16, 2026 19:58

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request refactors the OutputSplitter to use a maximum buffer size instead of a minimum, which is a great change to prevent unnecessary buffer accumulation and improve streaming performance. The new dispatching logic appears to correctly prioritize locality while ensuring liveness.

However, there are a few critical issues that need to be addressed:

  • Merge Conflict: The file python/ray/data/tests/test_operators.py contains unresolved merge conflict markers (<<<<<<<, =======, >>>>>>>) which will cause the build to fail.
  • Unrelated Changes: The same test file also includes a large number of changes unrelated to the OutputSplitter (e.g., new tests for MapOperator). To keep this PR focused and easy to review, please move these changes to a separate pull request.
  • Argument Parsing Error: The benchmark script release/nightly_tests/dataset/streaming_split_benchmark.py has a duplicate definition for the --equal-split argument, which will cause a runtime error. The corresponding YAML configuration in release/release_data_tests.yaml also uses this flag incorrectly.

I've left specific comments on these issues. Once they are resolved, this will be a solid improvement.

Comment thread python/ray/data/tests/test_operators.py Outdated
Comment thread release/nightly_tests/dataset/streaming_split_benchmark.py Outdated
Comment thread release/release_data_tests.yaml Outdated
preferred_loc = self._locality_hints[target_index]
preferred_loc = self._locality_hints[target_output_index]

# TODO make this more efficient (adding inverse hash-map)

medium

The TODO here correctly identifies a potential performance bottleneck. The current implementation iterates through the entire buffer (O(N)) to find a bundle with preferred locality. If the buffer size becomes large, this linear scan could impact performance.

Consider implementing the suggestion in the TODO by using an inverted index (e.g., a dictionary mapping location -> List[RefBundle]) to achieve O(1) lookups for preferred bundles. This would make the locality optimization much more efficient.
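
A minimal sketch of such an inverted index follows. It is not the PR's code: `LocalityIndexedBuffer`, the integer bundle ids, and the single location string per bundle are simplifying assumptions (a real RefBundle may carry more than one preferred location). Bundles taken through the FIFO fallback are removed lazily from the per-location queues, which keeps both paths amortized O(1).

```python
from collections import defaultdict, deque
from typing import Deque, Dict, Optional


class LocalityIndexedBuffer:
    """FIFO buffer with a location -> bundle ids index (hypothetical sketch)."""

    def __init__(self) -> None:
        # Insertion-ordered map of live bundles: bundle id -> location.
        self._live: Dict[int, str] = {}
        # Inverted index: location -> ids of bundles stored at that location.
        self._by_loc: Dict[str, Deque[int]] = defaultdict(deque)

    def add(self, bundle_id: int, location: str) -> None:
        # Assumes bundle ids are unique.
        self._live[bundle_id] = location
        self._by_loc[location].append(bundle_id)

    def pop(self, preferred_loc: Optional[str]) -> Optional[int]:
        queue = self._by_loc.get(preferred_loc)
        # Lazily discard ids that were already handed out via the FIFO fallback.
        while queue and queue[0] not in self._live:
            queue.popleft()
        if queue:
            bundle_id = queue.popleft()          # locality hit
        elif self._live:
            bundle_id = next(iter(self._live))   # fall back to the oldest bundle
        else:
            return None                          # buffer is empty
        del self._live[bundle_id]
        return bundle_id


buf = LocalityIndexedBuffer()
buf.add(1, "node-a")
buf.add(2, "node-b")
buf.add(3, "node-a")
print(buf.pop("node-b"))  # 2 (locality hit)
print(buf.pop("node-c"))  # 1 (no bundle on node-c, oldest bundle is used)
print(buf.pop("node-a"))  # 3 (bundle 1 is skipped because it was already dispatched)
```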

@cursor cursor Bot left a comment

Cursor Bugbot has reviewed your changes and found 2 potential issues.

Comment thread python/ray/data/tests/test_operators.py Outdated
Comment thread release/nightly_tests/dataset/streaming_split_benchmark.py Outdated
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
@iamjustinhsu iamjustinhsu changed the title [Data] Revisit OutputSplitter semantic to avoid unnecessary buffer ac… Jan 16, 2026

@cursor cursor Bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.

assert not self._buffer, "All bundles should have been dispatched"
return

if not self._buffer:

Missing forced dispatch when equal=False without locality hints

Medium Severity

When equal=False and _locality_hints is falsy, the new all_inputs_done logic skips the explicit forced dispatch and, if the buffer is non-empty, falls through to the finalize distribution code. That code is designed only for equal=True mode and assumes the output distribution needs equalization. Running it with equal=False can cause the assert remainder >= 0 check to fail, because greedy dispatching produces uneven distributions where sum(allocation) may exceed buffer_size.
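
The arithmetic behind this failure mode can be illustrated with a short sketch. It assumes the finalize step tops every output up to the current maximum and treats whatever is left in the buffer as the remainder; the function `equalizing_allocation` and its parameter names are hypothetical and are not taken from the PR.

```python
from typing import List, Tuple


def equalizing_allocation(rows_sent: List[int], buffer_rows: int) -> Tuple[List[int], int]:
    """Rows each output needs to catch up to the fullest one, plus the leftover."""
    target = max(rows_sent)
    allocation = [target - n for n in rows_sent]
    remainder = buffer_rows - sum(allocation)
    return allocation, remainder


# Greedy (equal=False) dispatch left output 0 far ahead of output 1,
# and only 3 rows remain in the buffer.
allocation, remainder = equalizing_allocation(rows_sent=[10, 2], buffer_rows=3)
print(allocation, remainder)  # [0, 8] -5
```

With a negative remainder, a check of the form `assert remainder >= 0` would fail, which matches the scenario the comment describes.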

@iamjustinhsu iamjustinhsu added the go add ONLY when ready to merge, run all tests label Jan 16, 2026
@alexeykudinkin alexeykudinkin enabled auto-merge (squash) January 16, 2026 21:57
@alexeykudinkin alexeykudinkin merged commit 67f6397 into ray-project:master Jan 16, 2026
8 checks passed
@iamjustinhsu iamjustinhsu deleted the jhsu/out-split-opt branch January 16, 2026 22:38