[Data] Release pinned blocks after dataset execution (executor shutdown, terminal ops, OutputSplitter)#62456

Merged
goutamvenkat-anyscale merged 5 commits into master from marwan/improve-epoch-end-cleanup on Apr 16, 2026

Conversation

@marwan116

@marwan116 marwan116 commented Apr 9, 2026

Contributor

Summary

Dataset.streaming_split() can keep ObjectRefs alive across epochs because the split coordinator holds an iterator that closes over the executor topology, and because OutputSplitter can leave the metrics “shadow” input queue out of sync with the real buffer after equal=True truncation. This PR fixes both issues.

What was going wrong

  1. Topology + queues across epochs
    The coordinator keeps _output_iterator until the next epoch replaces it. Anything reachable from the old executor’s topology (including OpState queues) can keep blocks pinned in the object store until that reference chain is dropped and the queues are drained.

  2. Truncated remainder + metrics
    After equalizing splits, a bare _buffer.clear() emptied the real buffer without on_input_dequeued, so OpRuntimeMetrics could still hold RefBundles for the truncated remainder.

  3. Terminal OutputSplitter and mark_execution_finished
    mark_execution_finished() (which clears internal queues via the mixin) was not run for operators with no downstream deps, so terminal splitters did not follow the same lifecycle path as other ops.
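
The second failure mode can be illustrated with a toy model. This is a minimal sketch, not Ray's implementation: `ToyMetrics`, `ToySplitter`, `on_input_queued`, `bare_clear`, and `clear_through_metrics` are hypothetical names chosen for illustration, and plain strings stand in for `RefBundle`s. Only the idea of an `on_input_dequeued` callback is taken from the description above.

```python
from collections import deque


class ToyMetrics:
    """Hypothetical stand-in for a metrics object that shadows an input queue."""

    def __init__(self):
        self.shadow = []  # bundles the metrics object still references

    def on_input_queued(self, bundle):
        self.shadow.append(bundle)

    def on_input_dequeued(self, bundle):
        self.shadow.remove(bundle)


class ToySplitter:
    """Hypothetical operator with a real buffer plus a metrics shadow of it."""

    def __init__(self):
        self.buffer = deque()
        self.metrics = ToyMetrics()

    def add_input(self, bundle):
        self.buffer.append(bundle)
        self.metrics.on_input_queued(bundle)

    def bare_clear(self):
        # Empties the real buffer only; the shadow keeps its references.
        self.buffer.clear()

    def clear_through_metrics(self):
        # Drains one bundle at a time so the shadow is updated as well.
        while self.buffer:
            self.metrics.on_input_dequeued(self.buffer.popleft())
```

In this model, `bare_clear()` leaves every bundle reachable from `metrics.shadow`, while `clear_through_metrics()` releases both copies of the reference.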

What we changed

  • OutputSplitter.all_inputs_done: replace _buffer.clear() with clear_internal_input_queue() so truncation drains through metrics and releases refs consistently.
  • update_operator_states: treat fully completed terminal ops like split sinks as finished for execution and run mark_execution_finished() when output_dependencies is empty.
  • StreamingExecutor.shutdown: after op.shutdown(), call _clear_topology_queues_post_shutdown(force) to clear internal queues and topology OpState queues; for the DAG sink with num_output_splits > 1, skip clearing the external output queues on cooperative (force=False) shutdown so other splits keep their bundles; force=True still clears everything (e.g. epoch rollover in SplitCoordinator._try_start_new_epoch).
  • Tests: test_output_split_shutdown_preserves_sibling_split_queues covers fast vs slow split consumers.
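
The shutdown policy in the third bullet can be sketched roughly as follows. This is an illustration under stated assumptions, not the code in `streaming_executor.py`: `ToyOpState`, its fields, and `clear_queues_post_shutdown` are hypothetical, and lists of strings stand in for queues of bundles.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class ToyOpState:
    """Hypothetical per-operator state; the field names are illustrative only."""

    name: str
    internal_queue: List[str] = field(default_factory=list)
    output_queue: List[str] = field(default_factory=list)
    num_output_splits: int = 1
    is_sink: bool = False


def clear_queues_post_shutdown(states: List[ToyOpState], force: bool) -> None:
    """Clear queues after shutdown, sparing a split sink's outputs unless forced."""
    for state in states:
        # Internal queues are always cleared.
        state.internal_queue.clear()
        # On cooperative shutdown (force=False), a sink feeding several splits
        # keeps its output queue so slower consumers can still read it.
        keep_outputs = (
            state.is_sink and state.num_output_splits > 1 and not force
        )
        if not keep_outputs:
            state.output_queue.clear()
```

With `force=False` the sink's remaining output bundles survive for sibling splits; with `force=True` (the epoch-rollover case described above) everything is dropped.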

Links (for reviewers)

@marwan116 marwan116 requested a review from a team as a code owner April 9, 2026 03:30

@gemini-code-assist gemini-code-assist Bot left a comment

Contributor

Code Review

This pull request updates the shutdown method in StreamingExecutor to clear internal and operator queues, which helps release ObjectRefs and prevents blocks from lingering in the object store. A review comment suggests also clearing state.active_tasks so that references held by completed tasks are released promptly.

Comment thread python/ray/data/_internal/execution/streaming_executor.py Outdated
@marwan116 marwan116 force-pushed the marwan/improve-epoch-end-cleanup branch 4 times, most recently from aeef7e9 to f76e8d8 Compare April 9, 2026 04:01
Comment thread test_epoch_cleanup.py Outdated
@marwan116 marwan116 force-pushed the marwan/improve-epoch-end-cleanup branch from f76e8d8 to a38027c Compare April 9, 2026 04:07
@ray-gardener ray-gardener Bot added the data Ray Data-related issues label Apr 9, 2026
@marwan116 marwan116 marked this pull request as draft April 9, 2026 14:28
@marwan116 marwan116 changed the title [Data] Clear operator queues on executor shutdown to prevent lingering object store memory Apr 9, 2026
@marwan116 marwan116 force-pushed the marwan/improve-epoch-end-cleanup branch from d5dbb8f to ef3e9aa Compare April 9, 2026 14:54
@marwan116 marwan116 marked this pull request as ready for review April 9, 2026 14:55
@marwan116 marwan116 force-pushed the marwan/improve-epoch-end-cleanup branch from ef3e9aa to 1b3ec6b Compare April 9, 2026 15:15

@cursor cursor Bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Reviewed by Cursor Bugbot for commit 1b3ec6b68f74358ca2e3e81e37e1b716a07fcf37. Configure here.

Comment thread python/ray/data/_internal/execution/streaming_executor.py Outdated
@marwan116 marwan116 marked this pull request as draft April 9, 2026 15:30
@marwan116 marwan116 force-pushed the marwan/improve-epoch-end-cleanup branch from a97312c to 2664b13 Compare April 9, 2026 15:42
@marwan116 marwan116 marked this pull request as ready for review April 9, 2026 15:52
Comment thread python/ray/data/_internal/execution/streaming_executor.py Outdated
@marwan116 marwan116 force-pushed the marwan/improve-epoch-end-cleanup branch 3 times, most recently from 323d3fb to d425f0d Compare April 13, 2026 14:36
…g object store memory

Signed-off-by: Marwan Sarieddine <sarieddine.marwan@gmail.com>
…lease pinned ObjectRefs

Signed-off-by: Marwan Sarieddine <sarieddine.marwan@gmail.com>
…nternal queues

Signed-off-by: Marwan Sarieddine <sarieddine.marwan@gmail.com>
@marwan116 marwan116 force-pushed the marwan/improve-epoch-end-cleanup branch from d425f0d to dd0c146 Compare April 13, 2026 14:42
Comment thread python/ray/data/_internal/execution/streaming_executor.py Outdated
…ingering object store memory

Signed-off-by: Marwan Sarieddine <sarieddine.marwan@gmail.com>
@iamjustinhsu iamjustinhsu self-assigned this Apr 14, 2026
@marwan116 marwan116 force-pushed the marwan/improve-epoch-end-cleanup branch from 22bffaa to bfa289c Compare April 15, 2026 01:00
@marwan116 marwan116 force-pushed the marwan/improve-epoch-end-cleanup branch from bfa289c to 6ab3178 Compare April 15, 2026 01:01
…ingering object store memory

Signed-off-by: Marwan Sarieddine <sarieddine.marwan@gmail.com>
@marwan116 marwan116 force-pushed the marwan/improve-epoch-end-cleanup branch from 6ab3178 to 1fa5084 Compare April 15, 2026 01:02
@marwan116 marwan116 changed the title [Data] Address Leak in streaming_split across epoch boundaries Apr 15, 2026
@marwan116 marwan116 changed the title [Data] Address leak in streaming_split across epoch boundaries Apr 15, 2026

@iamjustinhsu iamjustinhsu left a comment

Contributor

thanks! lgtm

@iamjustinhsu iamjustinhsu added the go add ONLY when ready to merge, run all tests label Apr 15, 2026
@goutamvenkat-anyscale goutamvenkat-anyscale merged commit caaa50f into master Apr 16, 2026
8 checks passed
@goutamvenkat-anyscale goutamvenkat-anyscale deleted the marwan/improve-epoch-end-cleanup branch April 16, 2026 16:22
shaun0927 added a commit to shaun0927/ray that referenced this pull request Apr 17, 2026
…perators

After ray-project#62456, update_operator_states calls op.mark_execution_finished()
whenever the new terminal_completed predicate is true. terminal_completed
stays True for the rest of execution once the terminal operator has
completed, so mark_execution_finished() is invoked on every subsequent
scheduler iteration. The base method is idempotent, but the
InternalQueueOperatorMixin override walks the (typically already-empty)
internal input and output queues each call.

Guard with has_execution_finished() so the work runs exactly once.

Signed-off-by: JunghwanNA <70629228+shaun0927@users.noreply.github.com>
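
The guard described in this commit message can be sketched with a toy scheduler loop. This is a minimal illustration, not the referenced commit's code: `ToyOp`, `queue_walks`, `update_states`, and the `guarded` flag are hypothetical; only the method names `mark_execution_finished` and `has_execution_finished` come from the message above.

```python
class ToyOp:
    """Hypothetical operator that counts how often its queues are walked."""

    def __init__(self):
        self._finished = False
        self.queue_walks = 0

    def has_execution_finished(self):
        return self._finished

    def mark_execution_finished(self):
        # Stands in for an override that walks internal queues on each call.
        self.queue_walks += 1
        self._finished = True


def update_states(op, terminal_completed, guarded):
    """One scheduler iteration; `guarded` toggles the has_execution_finished check."""
    if terminal_completed and (not guarded or not op.has_execution_finished()):
        op.mark_execution_finished()


def run(iterations, guarded):
    op = ToyOp()
    for _ in range(iterations):
        # terminal_completed stays True once the terminal operator completes.
        update_states(op, terminal_completed=True, guarded=guarded)
    return op.queue_walks
```

Without the guard, the queue walk repeats on every iteration after completion; with it, the walk happens once.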
HLDKNotFound pushed a commit to chichic21039/ray that referenced this pull request Apr 22, 2026
…wn, terminal ops, OutputSplitter) (ray-project#62456)

### Summary

`Dataset.streaming_split()` can keep `ObjectRef`s alive across epochs
because the split coordinator holds an iterator that closes over the
executor topology, and because `OutputSplitter` can leave the metrics
“shadow” input queue out of sync with the real buffer after `equal=True`
truncation. This PR fixes both issues.

### What was going wrong

1. **Topology + queues across epochs**  
The coordinator keeps `_output_iterator` until the next epoch replaces
it. Anything reachable from the old executor’s topology (including
`OpState` queues) can pin the object store until that chain is dropped
and queues are drained.

2. **Truncated remainder + metrics**  
After equalizing splits, a bare `_buffer.clear()` emptied the real
buffer without `on_input_dequeued`, so `OpRuntimeMetrics` could still
hold `RefBundle`s for the truncated remainder.

3. **Terminal `OutputSplitter` and `mark_execution_finished`**  
`mark_execution_finished()` (which clears internal queues via the mixin)
was not run for operators with no downstream deps, so terminal splitters
did not follow the same lifecycle path as other ops.

### What we changed

- **`OutputSplitter.all_inputs_done`**: replace `_buffer.clear()` with
`clear_internal_input_queue()` so truncation drains through metrics and
releases refs consistently.
- **`update_operator_states`**: treat fully completed terminal ops like
split sinks as finished for execution and run
`mark_execution_finished()` when `output_dependencies` is empty.
- **`StreamingExecutor.shutdown`**: after `op.shutdown()`, call
`_clear_topology_queues_post_shutdown(force)` to clear internal queues
and topology `OpState` queues; **for the DAG sink with
`num_output_splits > 1`, skip clearing the external output queues on
cooperative (`force=False`) shutdown** so other splits keep their
bundles; **`force=True` still clears everything** (e.g. epoch rollover
in `SplitCoordinator._try_start_new_epoch`).
- **Tests**: `test_output_split_shutdown_preserves_sibling_split_queues`
covers fast vs slow split consumers.

### Links (for reviewers)

- Cooperative shutdown from first exhausted split:
[`streaming_executor.py`
(`_ClosingIterator.get_next`)](https://github.com/ray-project/ray/blob/master/python/ray/data/_internal/execution/streaming_executor.py#L688-L710)
- Forced teardown on epoch advance: [`stream_split_iterator.py`
(`_try_start_new_epoch`)](https://github.com/ray-project/ray/blob/master/python/ray/data/_internal/iterator/stream_split_iterator.py#L216-L228)
- Barrier / when the next epoch starts (not tied to first consumer
finishing): [`start_epoch` /
`_barrier`](https://github.com/ray-project/ray/blob/master/python/ray/data/_internal/iterator/stream_split_iterator.py#L205-L214),
[`_barrier` →
`_try_start_new_epoch`](https://github.com/ray-project/ray/blob/master/python/ray/data/_internal/iterator/stream_split_iterator.py#L357-L382)

---------

Signed-off-by: Marwan Sarieddine <sarieddine.marwan@gmail.com>
Lucas61000 pushed a commit to Lucas61000/ray that referenced this pull request May 15, 2026
…wn, terminal ops, OutputSplitter) (ray-project#62456)


Labels

data (Ray Data-related issues), go (add ONLY when ready to merge, run all tests)

3 participants