[Data] Release pinned blocks after dataset execution (executor shutdown, terminal ops, OutputSplitter) #62456
Merged
Code Review
This pull request updates the shutdown method in StreamingExecutor to clear internal and operator queues, which helps release ObjectRef references and prevent memory leaks in the object store. A review comment suggests also clearing state.active_tasks to ensure that references held by completed tasks are also promptly released.
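The effect described in this review can be illustrated with a minimal sketch in plain Python that uses no Ray APIs. `FakeRef` and `FakeOpState` are hypothetical stand-ins for `ObjectRef` and the executor's per-operator state, and the real classes differ. The sketch only shows that an object stays alive while a queue still holds a reference to it, and becomes collectable once the queue is cleared.

```python
import gc
import weakref
from collections import deque


class FakeRef:
    """Hypothetical stand-in for an ObjectRef (not a Ray class)."""

    def __init__(self, name: str) -> None:
        self.name = name


class FakeOpState:
    """Hypothetical stand-in for per-operator state that owns a queue."""

    def __init__(self) -> None:
        self.output_queue: deque = deque()

    def shutdown(self) -> None:
        # Clearing the queue drops the strong references it was holding.
        self.output_queue.clear()


state = FakeOpState()
ref = FakeRef("block-0")
probe = weakref.ref(ref)  # lets us observe when the object is collected

state.output_queue.append(ref)
del ref  # the queue now holds the only strong reference

gc.collect()
alive_before_shutdown = probe() is not None

state.shutdown()
gc.collect()
alive_after_shutdown = probe() is not None
```

In a real cluster the referenced data lives in the object store rather than on the Python heap, so this is an analogy for reference lifetime, not a measurement of object store memory.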
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Reviewed by Cursor Bugbot for commit 1b3ec6b68f74358ca2e3e81e37e1b716a07fcf37.
iamjustinhsu reviewed on Apr 9, 2026
…g object store memory Signed-off-by: Marwan Sarieddine <sarieddine.marwan@gmail.com>
…lease pinned ObjectRefs Signed-off-by: Marwan Sarieddine <sarieddine.marwan@gmail.com>
…nternal queues Signed-off-by: Marwan Sarieddine <sarieddine.marwan@gmail.com>
…ingering object store memory Signed-off-by: Marwan Sarieddine <sarieddine.marwan@gmail.com>
…ingering object store memory Signed-off-by: Marwan Sarieddine <sarieddine.marwan@gmail.com>
shaun0927 added a commit to shaun0927/ray that referenced this pull request on Apr 17, 2026

…perators

After ray-project#62456, update_operator_states calls op.mark_execution_finished() whenever the new terminal_completed predicate is true. terminal_completed stays True for the rest of execution once the terminal operator has completed, so mark_execution_finished() is invoked on every subsequent scheduler iteration. The base method is idempotent, but the InternalQueueOperatorMixin override walks the (typically already-empty) internal input and output queues each call. Guard with has_execution_finished() so the work runs exactly once.

Signed-off-by: JunghwanNA <70629228+shaun0927@users.noreply.github.com>
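The guard described in this commit message can be sketched as follows. This is a simplified illustration, not Ray's code: `BaseOp`, `QueueOp`, `update_state`, and the `queue_walks` counter are hypothetical names introduced here, and only the method names `mark_execution_finished()` and `has_execution_finished()` come from the message above.

```python
from collections import deque


class BaseOp:
    """Hypothetical operator base class (not Ray's PhysicalOperator)."""

    def __init__(self) -> None:
        self._execution_finished = False

    def has_execution_finished(self) -> bool:
        return self._execution_finished

    def mark_execution_finished(self) -> None:
        # Idempotent: setting the flag again has no further effect.
        self._execution_finished = True


class QueueOp(BaseOp):
    """Hypothetical operator whose override also walks its queue."""

    def __init__(self) -> None:
        super().__init__()
        self.internal_queue = deque(["bundle-0", "bundle-1"])
        self.queue_walks = 0  # counts how often the override does its work

    def mark_execution_finished(self) -> None:
        super().mark_execution_finished()
        self.queue_walks += 1
        self.internal_queue.clear()


def update_state(op: BaseOp, terminal_completed: bool) -> None:
    """One scheduler iteration, reduced to the guarded call."""
    if terminal_completed and not op.has_execution_finished():
        op.mark_execution_finished()


op = QueueOp()
for _ in range(5):  # terminal_completed stays True on every iteration
    update_state(op, terminal_completed=True)
```

Without the `has_execution_finished()` check, `queue_walks` would grow by one per scheduler iteration even though the queue is already empty after the first call.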
HLDKNotFound pushed a commit to chichic21039/ray that referenced this pull request on Apr 22, 2026

…wn, terminal ops, OutputSplitter) (ray-project#62456)

### Summary

`Dataset.streaming_split()` can keep `ObjectRef`s alive across epochs because the split coordinator holds an iterator that closes over the executor topology, and because `OutputSplitter` can leave the metrics “shadow” input queue out of sync with the real buffer after `equal=True` truncation. This PR fixes both issues.

### What was going wrong

1. **Topology + queues across epochs**
   The coordinator keeps `_output_iterator` until the next epoch replaces it. Anything reachable from the old executor’s topology (including `OpState` queues) can pin the object store until that chain is dropped and queues are drained.
2. **Truncated remainder + metrics**
   After equalizing splits, a bare `_buffer.clear()` emptied the real buffer without `on_input_dequeued`, so `OpRuntimeMetrics` could still hold `RefBundle`s for the truncated remainder.
3. **Terminal `OutputSplitter` and `mark_execution_finished`**
   `mark_execution_finished()` (which clears internal queues via the mixin) was not run for operators with no downstream deps, so terminal splitters did not follow the same lifecycle path as other ops.

### What we changed

- **`OutputSplitter.all_inputs_done`**: replace `_buffer.clear()` with `clear_internal_input_queue()` so truncation drains through metrics and releases refs consistently.
- **`update_operator_states`**: treat fully completed terminal ops like split sinks as finished for execution and run `mark_execution_finished()` when `output_dependencies` is empty.
- **`StreamingExecutor.shutdown`**: after `op.shutdown()`, call `_clear_topology_queues_post_shutdown(force)` to clear internal queues and topology `OpState` queues; **for the DAG sink with `num_output_splits > 1`, skip clearing the external output queues on cooperative (`force=False`) shutdown** so other splits keep their bundles; **`force=True` still clears everything** (e.g. epoch rollover in `SplitCoordinator._try_start_new_epoch`).
- **Tests**: `test_output_split_shutdown_preserves_sibling_split_queues` covers fast vs slow split consumers.

### Links (for reviewers)

- Cooperative shutdown from first exhausted split: [`streaming_executor.py` (`_ClosingIterator.get_next`)](https://github.com/ray-project/ray/blob/master/python/ray/data/_internal/execution/streaming_executor.py#L688-L710)
- Forced teardown on epoch advance: [`stream_split_iterator.py` (`_try_start_new_epoch`)](https://github.com/ray-project/ray/blob/master/python/ray/data/_internal/iterator/stream_split_iterator.py#L216-L228)
- Barrier / when the next epoch starts (not tied to first consumer finishing): [`start_epoch` / `_barrier`](https://github.com/ray-project/ray/blob/master/python/ray/data/_internal/iterator/stream_split_iterator.py#L205-L214), [`_barrier` → `_try_start_new_epoch`](https://github.com/ray-project/ray/blob/master/python/ray/data/_internal/iterator/stream_split_iterator.py#L357-L382)

---------

Signed-off-by: Marwan Sarieddine <sarieddine.marwan@gmail.com>
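The "truncated remainder + metrics" problem described above can be sketched in plain Python. `Metrics`, `Splitter`, `clear_bare`, and the `shadow_queue` attribute are hypothetical simplifications introduced for illustration. Only the names `on_input_dequeued` and `clear_internal_input_queue` come from the description, and `on_input_queued` is an assumed counterpart. The real `OpRuntimeMetrics` and `OutputSplitter` are more involved.

```python
from collections import deque


class Metrics:
    """Hypothetical metrics tracker that mirrors the queued inputs."""

    def __init__(self) -> None:
        self.shadow_queue: list = []

    def on_input_queued(self, bundle: str) -> None:
        self.shadow_queue.append(bundle)

    def on_input_dequeued(self, bundle: str) -> None:
        self.shadow_queue.remove(bundle)


class Splitter:
    """Hypothetical operator with a real buffer and a metrics mirror."""

    def __init__(self) -> None:
        self.metrics = Metrics()
        self._buffer: deque = deque()

    def add_input(self, bundle: str) -> None:
        self._buffer.append(bundle)
        self.metrics.on_input_queued(bundle)

    def clear_bare(self) -> None:
        # Old behaviour: empties the buffer but never notifies metrics.
        self._buffer.clear()

    def clear_internal_input_queue(self) -> None:
        # New behaviour: every dropped bundle is reported as dequeued.
        while self._buffer:
            self.metrics.on_input_dequeued(self._buffer.popleft())


leaky = Splitter()
fixed = Splitter()
for splitter in (leaky, fixed):
    splitter.add_input("remainder-0")
    splitter.add_input("remainder-1")

leaky.clear_bare()
fixed.clear_internal_input_queue()
```

After the bare clear, the mirror in `leaky.metrics` still references both bundles, which is the kind of lingering reference the PR description attributes to the truncated remainder.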
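The cooperative versus forced shutdown rule in the `StreamingExecutor.shutdown` bullet above can be sketched as a small decision function. `SinkState` and `clear_queues_post_shutdown` are hypothetical names, and the actual `_clear_topology_queues_post_shutdown` may be structured differently. The sketch only encodes the rule as stated: external output queues of a multi-split sink survive a `force=False` shutdown and are cleared on `force=True`.

```python
from collections import deque
from typing import Deque, List


class SinkState:
    """Hypothetical sink state with one external output queue per split."""

    def __init__(self, num_output_splits: int) -> None:
        self.num_output_splits = num_output_splits
        self.internal_queue: Deque[str] = deque()
        self.output_queues: List[Deque[str]] = [
            deque() for _ in range(num_output_splits)
        ]


def clear_queues_post_shutdown(sink: SinkState, force: bool) -> None:
    """Clear queues after shutdown, sparing sibling splits when cooperative."""
    # Internal queues are always cleared.
    sink.internal_queue.clear()
    # Keep external outputs only for a multi-split sink on cooperative shutdown.
    keep_outputs = sink.num_output_splits > 1 and not force
    if not keep_outputs:
        for queue in sink.output_queues:
            queue.clear()


def make_sink() -> SinkState:
    sink = SinkState(num_output_splits=2)
    sink.internal_queue.append("pending")
    sink.output_queues[1].append("bundle-for-slow-split")
    return sink


cooperative = make_sink()
clear_queues_post_shutdown(cooperative, force=False)

forced = make_sink()
clear_queues_post_shutdown(forced, force=True)
```

Here the cooperative case models a fast consumer finishing first while a slower sibling still has a bundle waiting, and the forced case models an epoch rollover where nothing should be kept.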
Lucas61000 pushed a commit to Lucas61000/ray that referenced this pull request on May 15, 2026

…wn, terminal ops, OutputSplitter) (ray-project#62456)
