[Data] Support strict=False mode for StreamingRepartition#60295
[Data] Support strict=False mode for StreamingRepartition#60295alexeykudinkin merged 33 commits into
Conversation
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
There was a problem hiding this comment.
Code Review
This pull request introduces a strict parameter to StreamingRepartition, allowing for a non-strict mode. In non-strict mode, repartitioning doesn't stitch blocks, which enables more operator fusion opportunities. The changes are well-implemented across the logical planning, fusion rules, and physical planning layers. The default for repartition is now non-strict, which is a good choice for performance. The added tests are comprehensive and cover both the new non-strict behavior and the fusion logic. My main feedback is to add documentation for the new strict parameter in the user-facing Dataset.repartition method to ensure users understand how to use it.
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
|
@owenowenisme PTAL. Thank you! |
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
6cfbfc5 to
04964bc
Compare
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
…artition-strict-false Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
owenowenisme
left a comment
There was a problem hiding this comment.
test_operator_fusion is failing could you please take a look?
| input_physical_dag, | ||
| data_context, | ||
| name=op.name, | ||
| compute_strategy=compute, |
There was a problem hiding this comment.
I think we need min_rows_per_bundle = op.target_num_rows_per_block here if strict=False?
There was a problem hiding this comment.
Seems like when we set min_rows_per_bundle here, the BlockRefBundler will try to stitch the output:
Therefor, I think we should keep it as None here to prevent stitching
ray/python/ray/data/_internal/execution/operators/map_operator.py
Lines 828 to 835 in 68d01c4
| strict: If ``True``, ``repartition`` guarantees that all output blocks, | ||
| except for the last one, will have exactly ``target_num_rows_per_block`` rows. | ||
| If ``False``, ``repartition`` is more relaxed and may produce blocks smaller | ||
| than ``target_num_rows_per_block`` without stitching them together. | ||
| This parameter is only used with ``target_num_rows_per_block``. | ||
| Defaults to ``False``. |
There was a problem hiding this comment.
Might be better to say that will only produce at most 1 block that is < target_num_rows_per_block per input block if strict is false.
|
|
||
|
|
||
| @pytest.mark.parametrize("batch_size", [30, 35, 45]) | ||
| def test_streaming_repartition_fusion_non_strict( |
There was a problem hiding this comment.
I think fusion test should be in python/ray/data/tests/test_operator_fusion.py
There was a problem hiding this comment.
There's existing fusion and streaming repartition related test in this file, I think we can put this here as it align with existing tests. WDYT?
| ref_bundler = StreamingRepartitionRefBundler(batch_size) | ||
| # No further fusion because StreamingRepartitionRefBundler is stateful | ||
| # and maintains internal buffering state across bundles. | ||
| supports_fusion = False |
There was a problem hiding this comment.
Will this prevent fusion when batch_size == target_num_rows_per_block ?
There was a problem hiding this comment.
Yes, but I think it's intended. As the original code (strict mode) hard-coded supports_fusion=False to prevent further fusion
# For now, we don't want to over-fuse StreamingRepartition with other map operators,
# so the result operator does not support further fusion.
supports_fusion=False,| strict: If True, guarantees that all output blocks, except for the last one, | ||
| will have exactly target_num_rows_per_block rows. If False, is more relaxed | ||
| and may produce blocks smaller than target_num_rows_per_block without | ||
| stitching them together. Defaults to False. |
There was a problem hiding this comment.
Ditto with the comment in dataset.py
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
…artition-strict-false Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
| ref_bundler = StreamingRepartitionRefBundler(batch_size) | ||
| # No further fusion because StreamingRepartitionRefBundler is stateful | ||
| # and maintains internal buffering state across bundles. | ||
| supports_fusion = False |
There was a problem hiding this comment.
We'd not be blocking any subsequent fusion like that
Let's add a test that we're able to fuse multiple ops like this:
- Map > Map > SR
- Map > SR > SR
There was a problem hiding this comment.
While the comment is on line 338 (supports_fusion=False), I want to make sure do we want to support fusion for strict mode? Or just add test for non-strict mode? I think it's the latter one?
There was a problem hiding this comment.
The Map > SR > SR case cannot work here because after the first Map > SR fusion, the logical operator becomes AbstractUDFMap rather than MapBatches.
ray/python/ray/data/_internal/logical/rules/operator_fusion.py
Lines 355 to 369 in f3d444a
The current implementation only allows MapBatches > SR fusion:
To support Map > SR > SR fusion, we will need more changes, which I think is a bit out of scope of this PR.
There was a problem hiding this comment.
Let's keep it MapBatches then. Map > SR > SR needs to work
There was a problem hiding this comment.
I look into it more, seems like Map > SR > SR already worked, but it's CombineShuffles._combine() combining two SR into one, so the result will just be Map > SR
Updated the test in 8552ff9
There was a problem hiding this comment.
@machichima but this makes it order dependent -- my point is we should avoid setting supports_fusion=False for the resulting operator
There was a problem hiding this comment.
Got it! I updated in 210b634 to set supports_fusion=True for both of them
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
| ref_bundler = StreamingRepartitionRefBundler(batch_size) | ||
| # No further fusion because StreamingRepartitionRefBundler is stateful | ||
| # and maintains internal buffering state across bundles. | ||
| supports_fusion = False |
There was a problem hiding this comment.
@machichima but this makes it order dependent -- my point is we should avoid setting supports_fusion=False for the resulting operator
| # StreamingRepartition can only fuse in non-strict mode. | ||
| # In strict mode, it does not support further fusion. | ||
| if isinstance(up_logical_op, StreamingRepartition): | ||
| return False | ||
| return not up_logical_op._strict |
There was a problem hiding this comment.
We actually don't want to fuse SR > Map, b/c that will reduce parallelism for Map (i believe we'd have the test for that)
There was a problem hiding this comment.
Updated in 90153fc (also update the test as well)
| # In non-strict mode, use min_rows_per_bundle to ensure creating batches with batch_size. | ||
| # In strict mode, ref_bundler handles bundling, so do not set min_rows_per_bundle. | ||
| min_rows = None if down_logical_op._strict else batch_size |
There was a problem hiding this comment.
Ah, we'd clean up that parameter replacing it with the bundler (don't need to do that in this PR we can do it separately)
| Case 1: map_batches -> map_batches -> streaming_repartition(strict=True) -> map_batches -> map_batches | ||
| Result: map -> (map -> s_r)-> (map -> map) | ||
| The fused (map -> s_r) doesn't fuse further with surrounding maps. |
There was a problem hiding this comment.
@machichima this is the case we've talked about:
Should be (Map -> Map -> SR) -> (Map -> Map)
…artition-strict-false Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
| # For now, we don't want to over-fuse StreamingRepartition with other map operators, | ||
| # so the result operator does not support further fusion. | ||
| supports_fusion=False, | ||
| supports_fusion=True, |
There was a problem hiding this comment.
Strict mode loses StreamingRepartitionRefBundler during further fusion
Medium Severity
Setting supports_fusion=True unconditionally allows the fused MapBatches→StreamingRepartition operator (in strict mode) to be further fused with upstream MapBatches via _get_fused_map_operator during the map fusion phase. That generic method doesn't preserve the ref_bundler, replacing StreamingRepartitionRefBundler with a BlockRefBundler. Unlike StreamingRepartitionRefBundler, which slices input blocks to guarantee exact-multiple row counts per task, BlockRefBundler simply accumulates blocks until a minimum threshold, potentially sending non-multiple row counts. This causes tasks to produce partial output blocks, breaking the strict mode guarantee that all blocks (except the last) have exactly target_num_rows_per_block rows. Non-strict mode is unaffected since it already uses ref_bundler=None.
Additional Locations (1)
There was a problem hiding this comment.
This review comment is related to your previous comment: #60295 (comment)
In the original codebase, we set supports_fusion=False for strict mode.
ray/python/ray/data/_internal/logical/rules/operator_fusion.py
Lines 344 to 346 in eabc0ac
I updated to supports_fusion=True for both strict and non-strict mode in 210b634 that breaks the CI test.
Want to confirm if we want to:
- change it back to
supports_fusion=Falsefor strict mode - update the test to make CI pass, and keep
supports_fusion=Truefor both strict and non-strict
There was a problem hiding this comment.
@machichima yeah, we want to keep supports_fusion=True, we just need to fix the fusion to make sure that appropriate bundler is preserved.
alexeykudinkin
left a comment
There was a problem hiding this comment.
@machichima changes LGTM!
We just need to address the last comment from Bugbot and 1 test failure and we should be good to go
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
| Case 1: map_batches -> map_batches -> streaming_repartition -> map_batches -> map_batches | ||
| Result: map -> (map -> s_r)-> (map -> map) | ||
| The fused (map -> s_r) doesn't fuse further with surrounding maps. | ||
| Case 1: map_batches -> map_batches -> streaming_repartition(strict=True) -> map_batches -> map_batches |
There was a problem hiding this comment.
This shouldn't fuse irrespective of whether it's strict or not (otherwise we might decrease parallelism regardless of whether it's strict or not)
|
Thanks for bringing this over the finish line @machichima! |
- Expand Table 1 from 239 to 264 entries (add Dashboard + Core commits) - Rebuild Table 1 in correct branch commit order - Add fork point analysis (d60d131) - Add Revert pair documentation - Add 2.54.x cherry-pick correspondence table - Add Build/compilation risk analysis (Bazel 7, gRPC) - Add PR ray-project#60295, PR ray-project#61821, and Issue ray-project#63544 detailed analysis - Add Table 2: all 344 missing Core/Data/Dashboard commits - Add Table 3: 80 commits not covered by Table 1 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>


Description
Currently, StreamingRepartition operator is essentially
strict=True. We want to relax this to allow non-strict mode with following guarantees:target_num_rowstarget_num_rowsblocks per input block (ie it wouldn’t do any stitching)This mode will be the default mode and would allow StreamingRepartition to be fused into previous operator
Related issues
Closes #60026
Additional information
strict: bool = Falseparameter to repartition()_get_fused_streaming_repartition_operator()andplan_streaming_repartition_op():- Strict: uses
ref_bundler=StreamingRepartitionRefBundler- Non-strict: uses
ref_bundler=None(defaultBlockRefBundler)