
[Data] Support strict=False mode for StreamingRepartition#60295

Merged
alexeykudinkin merged 33 commits into
ray-project:masterfrom
machichima:streamingrepartition-strict-false
Feb 23, 2026

Conversation

@machichima commented Jan 19, 2026

Contributor

Description

Currently, the StreamingRepartition operator is effectively always strict=True. We want to relax this by adding a non-strict mode, with the following guarantees:

  • Strict mode: guarantees that all output blocks, except possibly the last one, have exactly target_num_rows rows.
  • Non-strict mode: provides a more relaxed guarantee: it can produce up to 1 block smaller than target_num_rows per input block (i.e., it does not stitch rows across input blocks).

Non-strict mode will be the default, and it allows StreamingRepartition to be fused into the preceding operator.
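
To make the two guarantees concrete, here is a minimal sketch that models blocks as plain row counts. It does not use Ray; `repartition_strict` and `repartition_non_strict` are hypothetical helper names, and placing the short block last within each input block is an assumption made only for illustration.

```python
from typing import List


def repartition_strict(input_blocks: List[int], target: int) -> List[int]:
    """Row counts of output blocks when rows are stitched across input blocks."""
    total = sum(input_blocks)
    full, remainder = divmod(total, target)
    # Only the final block may be short.
    return [target] * full + ([remainder] if remainder else [])


def repartition_non_strict(input_blocks: List[int], target: int) -> List[int]:
    """Row counts of output blocks when each input block is split on its own."""
    out: List[int] = []
    for rows in input_blocks:
        full, remainder = divmod(rows, target)
        out.extend([target] * full)
        if remainder:
            # At most one short block per input block; no stitching.
            out.append(remainder)
    return out


input_blocks = [50, 50, 50]
strict_sizes = repartition_strict(input_blocks, target=20)
non_strict_sizes = repartition_non_strict(input_blocks, target=20)
print(strict_sizes)      # [20, 20, 20, 20, 20, 20, 20, 10]
print(non_strict_sizes)  # [20, 20, 10, 20, 20, 10, 20, 20, 10]
```

In this toy model the non-strict variant never needs rows from a neighbouring input block, which is the property that lets it run inside the same task as the preceding operator.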

Related issues

Closes #60026

Additional information

  • Added strict: bool = False parameter to repartition()
  • Added mode-specific bundler selection in _get_fused_streaming_repartition_operator() and plan_streaming_repartition_op():
    - Strict: uses ref_bundler=StreamingRepartitionRefBundler
    - Non-strict: uses ref_bundler=None (default BlockRefBundler)
  • Added unit tests
Signed-off-by: machichima <nary12321@gmail.com>
@machichima machichima requested a review from a team as a code owner January 19, 2026 12:00

@gemini-code-assist gemini-code-assist Bot left a comment

Contributor

Code Review

This pull request introduces a strict parameter to StreamingRepartition, allowing for a non-strict mode. In non-strict mode, repartitioning doesn't stitch blocks, which enables more operator fusion opportunities. The changes are well-implemented across the logical planning, fusion rules, and physical planning layers. The default for repartition is now non-strict, which is a good choice for performance. The added tests are comprehensive and cover both the new non-strict behavior and the fusion logic. My main feedback is to add documentation for the new strict parameter in the user-facing Dataset.repartition method to ensure users understand how to use it.

Comment thread python/ray/data/dataset.py
@machichima

Contributor Author

@owenowenisme PTAL. Thank you!

@ray-gardener (bot) added the data (Ray Data-related issues) and community-contribution (Contributed by the community) labels on Jan 19, 2026
@machichima machichima force-pushed the streamingrepartition-strict-false branch from 6cfbfc5 to 04964bc Compare January 19, 2026 21:25

@owenowenisme owenowenisme left a comment

Member

test_operator_fusion is failing. Could you please take a look?

Comment thread python/ray/data/_internal/logical/rules/combine_shuffles.py
input_physical_dag,
data_context,
name=op.name,
compute_strategy=compute,

Member

I think we need min_rows_per_bundle = op.target_num_rows_per_block here if strict=False?

Contributor Author

Updated in 89965d0

@machichima machichima Jan 26, 2026

Contributor Author

Seems like when we set min_rows_per_bundle here, the BlockRefBundler will try to stitch the output:

return list(output_buffer), _merge_ref_bundles(*output_buffer)

Therefore, I think we should keep it as None here to prevent stitching. With None, BlockRefBundler short-circuits and returns each bundle unchanged:

if self._min_rows_per_bundle is None:
    # Short-circuit if no bundle row target was defined.
    assert len(self._bundle_buffer) == 1
    bundle = self._bundle_buffer[0]
    self._bundle_buffer = []
    self._bundle_buffer_size = 0
    self._bundle_buffer_size_bytes = 0
    return [bundle], bundle
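
The difference between the two paths can be sketched with a toy bundler that tracks only row counts. `ToyBundler` and `drain` are hypothetical stand-ins written for this illustration, not Ray's BlockRefBundler; the sketch only assumes the behaviour described in this thread (pass-through when no row target is set, merging buffered bundles otherwise).

```python
from typing import List, Optional, Tuple


class ToyBundler:
    """Hypothetical stand-in for a block-ref bundler; a bundle is just a row count."""

    def __init__(self, min_rows_per_bundle: Optional[int] = None):
        self._min_rows = min_rows_per_bundle
        self._buffer: List[int] = []

    def add(self, rows: int) -> None:
        self._buffer.append(rows)

    def has_bundle(self) -> bool:
        if not self._buffer:
            return False
        if self._min_rows is None:
            return True
        return sum(self._buffer) >= self._min_rows

    def get_next(self) -> Tuple[List[int], int]:
        if self._min_rows is None:
            # Pass-through: emit the single buffered bundle unchanged.
            bundle = self._buffer.pop(0)
            return [bundle], bundle
        # Otherwise merge buffered bundles until the row target is reached.
        taken: List[int] = []
        while self._buffer and sum(taken) < self._min_rows:
            taken.append(self._buffer.pop(0))
        return taken, sum(taken)


def drain(bundler: ToyBundler, incoming: List[int]) -> List[int]:
    """Feed row counts in one at a time and collect the emitted bundle sizes."""
    out: List[int] = []
    for rows in incoming:
        bundler.add(rows)
        while bundler.has_bundle():
            out.append(bundler.get_next()[1])
    return out


passthrough = drain(ToyBundler(None), [10, 10, 10, 10])
stitched = drain(ToyBundler(25), [10, 10, 10, 10])
print(passthrough)  # [10, 10, 10, 10]
print(stitched)     # [30]; the last 10 rows stay buffered in this toy
```

With a row target set, three 10-row bundles are merged into one 30-row bundle, which is the stitching that non-strict mode is meant to avoid.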

Comment on lines +1693 to +1698
strict: If ``True``, ``repartition`` guarantees that all output blocks,
except for the last one, will have exactly ``target_num_rows_per_block`` rows.
If ``False``, ``repartition`` is more relaxed and may produce blocks smaller
than ``target_num_rows_per_block`` without stitching them together.
This parameter is only used with ``target_num_rows_per_block``.
Defaults to ``False``.

Member

It might be better to say that, if strict is False, it produces at most 1 block smaller than target_num_rows_per_block per input block.

Contributor Author

Updated in f748b79



@pytest.mark.parametrize("batch_size", [30, 35, 45])
def test_streaming_repartition_fusion_non_strict(

Member

I think the fusion test should be in python/ray/data/tests/test_operator_fusion.py

Contributor Author

There are existing tests for fusion and streaming repartition in this file, so I think we can keep this one here since it aligns with them. WDYT?

def test_streaming_repartition_fusion_output_shape(

ref_bundler = StreamingRepartitionRefBundler(batch_size)
# No further fusion because StreamingRepartitionRefBundler is stateful
# and maintains internal buffering state across bundles.
supports_fusion = False

Member

Will this prevent fusion when batch_size == target_num_rows_per_block?

@machichima machichima Jan 23, 2026

Contributor Author

Yes, but I think that's intended: the original code (strict mode) hard-coded supports_fusion=False to prevent further fusion:

            # For now, we don't want to over-fuse StreamingRepartition with other map operators,
            # so the result operator does not support further fusion.
            supports_fusion=False,
Comment on lines +425 to +428
strict: If True, guarantees that all output blocks, except for the last one,
will have exactly target_num_rows_per_block rows. If False, is more relaxed
and may produce blocks smaller than target_num_rows_per_block without
stitching them together. Defaults to False.

Member

Ditto with the comment in dataset.py

Contributor Author

Updated in f748b79

Comment thread python/ray/data/_internal/logical/rules/combine_shuffles.py
Comment thread python/ray/data/_internal/logical/rules/operator_fusion.py
ref_bundler = StreamingRepartitionRefBundler(batch_size)
# No further fusion because StreamingRepartitionRefBundler is stateful
# and maintains internal buffering state across bundles.
supports_fusion = False

Contributor

We shouldn't be blocking subsequent fusion like that.

Let's add a test that we're able to fuse multiple ops like this:

  • Map > Map > SR
  • Map > SR > SR

@machichima machichima Feb 7, 2026

Contributor Author

Since the comment is on line 338 (supports_fusion=False), I want to confirm: do we want to support fusion in strict mode, or just add tests for non-strict mode? I think it's the latter?

Contributor Author

The Map > SR > SR case cannot work here because after the first Map > SR fusion, the logical operator becomes AbstractUDFMap rather than MapBatches.

logical_op = AbstractUDFMap(
    name,
    input_op,
    up_logical_op.fn,
    can_modify_num_rows=up_logical_op.can_modify_num_rows,
    fn_args=up_logical_op.fn_args,
    fn_kwargs=up_logical_op.fn_kwargs,
    fn_constructor_args=up_logical_op.fn_constructor_args,
    fn_constructor_kwargs=up_logical_op.fn_constructor_kwargs,
    min_rows_per_bundled_input=batch_size,
    compute=compute,
    ray_remote_args_fn=ray_remote_args_fn,
    ray_remote_args=ray_remote_args,
)
self._op_map[op] = logical_op

The current implementation only allows MapBatches > SR fusion:

and isinstance(self._op_map[upstream_ops[0]], MapBatches)

To support Map > SR > SR fusion, we will need more changes, which I think is a bit out of scope of this PR.

Contributor Author

Updated in:

Contributor

Let's keep it MapBatches then. Map > SR > SR needs to work

Contributor Author

I looked into it more. It seems Map > SR > SR already works, but CombineShuffles._combine() merges the two SRs into one, so the result is just Map > SR.

Updated the test in 8552ff9
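
As a rough illustration of why Map > SR > SR ends up as Map > SR, here is a toy rewrite over a linear plan. The `Map` and `SR` dataclasses and `combine_adjacent_sr` are hypothetical stand-ins, not the CombineShuffles rule itself, and keeping the downstream SR's settings is an assumption of this sketch.

```python
from dataclasses import dataclass
from typing import List, Union


@dataclass(frozen=True)
class Map:
    name: str


@dataclass(frozen=True)
class SR:
    target_num_rows_per_block: int
    strict: bool = False


Op = Union[Map, SR]


def combine_adjacent_sr(plan: List[Op]) -> List[Op]:
    """Collapse back-to-back SR ops into one (assumption: the downstream SR wins)."""
    out: List[Op] = []
    for op in plan:
        if isinstance(op, SR) and out and isinstance(out[-1], SR):
            out[-1] = op  # replace the upstream SR with the downstream one
        else:
            out.append(op)
    return out


plan: List[Op] = [Map("f"), SR(20), SR(50)]
combined = combine_adjacent_sr(plan)
print(combined)
```

After this rewrite only one SR remains, so a later fusion pass sees a plain Map > SR pair.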

Contributor

@machichima but this makes it order dependent -- my point is we should avoid setting supports_fusion=False for the resulting operator

Contributor Author

Got it! I updated in 210b634 to set supports_fusion=True for both of them

machichima and others added 4 commits February 7, 2026 14:07
Comment thread python/ray/data/tests/test_operator_fusion.py Outdated

@cursor cursor Bot left a comment


Cursor Bugbot has reviewed your changes and found 1 potential issue.

Comment thread python/ray/data/tests/test_operator_fusion.py Outdated

Comment on lines +302 to +305
# StreamingRepartition can only fuse in non-strict mode.
# In strict mode, it does not support further fusion.
  if isinstance(up_logical_op, StreamingRepartition):
-     return False
+     return not up_logical_op._strict

Contributor

We actually don't want to fuse SR > Map, because that would reduce parallelism for Map (I believe we have a test for that).

Contributor Author

Updated in 90153fc (also updated the test)

Comment on lines +365 to +367
# In non-strict mode, use min_rows_per_bundle to ensure creating batches with batch_size.
# In strict mode, ref_bundler handles bundling, so do not set min_rows_per_bundle.
min_rows = None if down_logical_op._strict else batch_size

Contributor

Ah, we should clean up that parameter by replacing it with the bundler (no need to do that in this PR; we can do it separately).

Comment on lines 799 to 801
Case 1: map_batches -> map_batches -> streaming_repartition(strict=True) -> map_batches -> map_batches
Result: map -> (map -> s_r)-> (map -> map)
The fused (map -> s_r) doesn't fuse further with surrounding maps.

Contributor

@machichima this is the case we've talked about:

Should be (Map -> Map -> SR) -> (Map -> Map)
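
The requested grouping can be sketched as a toy pass over a linear plan of op names. `fuse` is a hypothetical helper, not the actual fusion rule: it only encodes the two points made in this thread, namely that an SR may fuse into the maps before it and that nothing fuses onto the output of an SR.

```python
from typing import List


def fuse(plan: List[str]) -> List[List[str]]:
    """Group a linear plan of "Map"/"SR" ops into fused stages."""
    groups: List[List[str]] = []
    for op in plan:
        # Start a new stage at the beginning of the plan, or right after an SR
        # (SR > Map is not fused, so Map keeps its own parallelism).
        if not groups or groups[-1][-1] == "SR":
            groups.append([op])
        else:
            groups[-1].append(op)
    return groups


fused = fuse(["Map", "Map", "SR", "Map", "Map"])
print(fused)  # [['Map', 'Map', 'SR'], ['Map', 'Map']]
```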

Contributor Author

Updated in 90153fc


@cursor cursor Bot left a comment


Cursor Bugbot has reviewed your changes and found 1 potential issue.

  # For now, we don't want to over-fuse StreamingRepartition with other map operators,
  # so the result operator does not support further fusion.
- supports_fusion=False,
+ supports_fusion=True,


Strict mode loses StreamingRepartitionRefBundler during further fusion

Medium Severity

Setting supports_fusion=True unconditionally allows the fused MapBatches→StreamingRepartition operator (in strict mode) to be further fused with upstream MapBatches via _get_fused_map_operator during the map fusion phase. That generic method doesn't preserve the ref_bundler, replacing StreamingRepartitionRefBundler with a BlockRefBundler. Unlike StreamingRepartitionRefBundler, which slices input blocks to guarantee exact-multiple row counts per task, BlockRefBundler simply accumulates blocks until a minimum threshold, potentially sending non-multiple row counts. This causes tasks to produce partial output blocks, breaking the strict mode guarantee that all blocks (except the last) have exactly target_num_rows_per_block rows. Non-strict mode is unaffected since it already uses ref_bundler=None.
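
The failure mode described above can be reproduced with row counts alone. The helpers below are hypothetical stand-ins for the two bundling strategies (threshold accumulation versus slicing to exact multiples), written under the assumption that each task then splits its input into target-sized blocks; they are not the real BlockRefBundler or StreamingRepartitionRefBundler.

```python
from typing import List

TARGET = 20


def split_task_output(task_rows: int, target: int) -> List[int]:
    """Blocks a single task emits when it splits its input into target-sized blocks."""
    full, remainder = divmod(task_rows, target)
    return [target] * full + ([remainder] if remainder else [])


def bundle_by_threshold(blocks: List[int], min_rows: int) -> List[int]:
    """Accumulate whole blocks until at least min_rows are buffered."""
    bundles: List[int] = []
    current = 0
    for rows in blocks:
        current += rows
        if current >= min_rows:
            bundles.append(current)
            current = 0
    if current:
        bundles.append(current)
    return bundles


def bundle_by_exact_multiple(blocks: List[int], target: int) -> List[int]:
    """Slice input so every bundle except possibly the last is a multiple of target."""
    bundles: List[int] = []
    carry = 0
    for rows in blocks:
        carry += rows
        emit = (carry // target) * target
        if emit:
            bundles.append(emit)
            carry -= emit
    if carry:
        bundles.append(carry)
    return bundles


blocks = [15, 15, 15, 15]
threshold_out = [
    size
    for bundle in bundle_by_threshold(blocks, TARGET)
    for size in split_task_output(bundle, TARGET)
]
exact_out = [
    size
    for bundle in bundle_by_exact_multiple(blocks, TARGET)
    for size in split_task_output(bundle, TARGET)
]
print(threshold_out)  # [20, 10, 20, 10]
print(exact_out)      # [20, 20, 20]
```

With threshold accumulation each task receives 30 rows and emits a 10-row block in the middle of the stream, which violates the strict guarantee; slicing to exact multiples avoids that.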


Contributor

@machichima PTAL ^

@machichima machichima Feb 20, 2026

Contributor Author

This review comment is related to your previous comment: #60295 (comment)

In the original codebase, we set supports_fusion=False for strict mode.

# For now, we don't want to over-fuse StreamingRepartition with other map operators,
# so the result operator does not support further fusion.
supports_fusion=False,

I changed it to supports_fusion=True for both strict and non-strict mode in 210b634, which breaks the CI test.

I want to confirm whether we should:

  1. change it back to supports_fusion=False for strict mode
  2. update the test to make CI pass, and keep supports_fusion=True for both strict and non-strict

Contributor

@machichima yeah, we want to keep supports_fusion=True; we just need to fix the fusion to make sure that the appropriate bundler is preserved.

Contributor Author

Sure! Updated in 6d25a27

@alexeykudinkin alexeykudinkin left a comment

Contributor

@machichima changes LGTM!

We just need to address the last comment from Bugbot and 1 test failure and we should be good to go


@cursor cursor Bot left a comment


Cursor Bugbot has reviewed your changes and found 1 potential issue.

Comment thread python/ray/data/_internal/logical/rules/operator_fusion.py
- Case 1: map_batches -> map_batches -> streaming_repartition -> map_batches -> map_batches
- Result: map -> (map -> s_r)-> (map -> map)
- The fused (map -> s_r) doesn't fuse further with surrounding maps.
+ Case 1: map_batches -> map_batches -> streaming_repartition(strict=True) -> map_batches -> map_batches

Contributor

This shouldn't fuse regardless of whether it's strict (otherwise we might decrease parallelism in either mode).

@alexeykudinkin alexeykudinkin merged commit 35b297f into ray-project:master Feb 23, 2026
6 checks passed
@alexeykudinkin

Contributor

Thanks for bringing this over the finish line @machichima!


Labels

claude-code-assisted
community-contribution (Contributed by the community)
data (Ray Data-related issues)
go (add ONLY when ready to merge, run all tests)

4 participants