Skip to content

[Data] Shut down executor when DataIterator exits early#62949

Merged
justinvyu merged 13 commits into
ray-project:masterfrom
xinyuangui2:fix/dataset-iterator-early-exit-cleanup
May 15, 2026
Merged

[Data] Shut down executor when DataIterator exits early#62949
justinvyu merged 13 commits into
ray-project:masterfrom
xinyuangui2:fix/dataset-iterator-early-exit-cleanup

Conversation

@xinyuangui2

@xinyuangui2 xinyuangui2 commented Apr 26, 2026

Copy link
Copy Markdown
Contributor

When user code breaks out of iter_batches() early (break in a for-loop), the dataset was not marked as finished. The streaming executor's worker thread kept producing blocks that piled up in the object store, and the executor held resources that could starve other datasets (e.g. a validation dataset waiting to run).

The inner _ClosingIterator only shuts down the executor through its __del__, which is non-deterministic — generator references may linger, leaving the executor alive long after iteration stops.

Single deterministic shutdown path: _on_iteration_end

Add a try/finally around yield from batch_iterator in DataIterator._iter_batches, calling a new _on_iteration_end(executor) hook on StopIteration, GeneratorExit (early break), and exceptions.

  • The default implementation calls executor.shutdown(force=False) (idempotent), covering the local iter_batches() path.
  • StreamSplitDataIterator overrides it to fire client_disengaged(epoch, split_idx) on the remote SplitCoordinator, since its executor lives on the actor and the iterator returns executor=None.

Both run synchronously on the consumer's thread, so cleanup happens the moment the consumer stops pulling.

Why GeneratorExit matters here

_iter_batches is a generator (it yield froms the batch iterator). When the consumer abandons the generator early — either by break-ing out of the for-loop, or by letting the generator go out of scope — Python's generator protocol throws GeneratorExit into it at the current yield point. Without the try/finally, that GeneratorExit unwinds straight out and the cleanup after yield from never runs; the executor keeps producing blocks until _ClosingIterator.__del__ eventually fires, which can be arbitrarily delayed. With the try/finally, GeneratorExit triggers _on_iteration_end synchronously on the consumer's thread, giving us one deterministic shutdown path for all three exit modes.

Why the consumer thread, not gen_blocks's finally

StreamSplitDataIterator's inner gen_blocks generator runs inside make_async_gen's filling worker thread. On early break, that worker exits via the interrupted_event path without explicitly closing the generator — its cleanup is then GC-bound and can be arbitrarily delayed under CI load (10+ s, causing flakes). The consumer-side hook avoids that entirely.

SplitCoordinator cleanup

On each client_disengaged(epoch_id, split_idx) call, the coordinator:

  • Clears that split's prefetched-bytes accounting and drops its buffered RefBundle so remaining splits don't suffer stale backpressure or pinned memory.
  • Tracks disengaged splits in a set; once every split has disengaged, shuts the executor down. Shutdown is dispatched outside the lock since StreamingExecutor.shutdown joins the scheduling thread for up to 2 s.
  • Idempotent per (epoch_id, split_idx) as defense against duplicate/stale notifications (e.g. notifications arriving after the next epoch has started).

StreamSplitDataIterator._active_epoch is a plain attribute (no lock) — split iterators are passed to @ray.remote tasks, and a threading.Lock would make them unpicklable. The single-consumer convention plus the strict happens-before ordering (start_epoch → first yield → consumer exit → _on_iteration_end) makes this safe.

Lingering iter() reference caveat

There is a known case the try/finally cannot cover:

it = iter(ds.iter_batches())
for i, batch in enumerate(it):
    if i == 0:
        break
# the iterator sticks around until ``it`` goes out of scope;
# only ``it.close()`` guarantees the ``finally`` runs.

When the user keeps an explicit reference to the iterator, Python doesn't fire GeneratorExit on break — cleanup is deferred until the reference is dropped. Some libraries (e.g. PyTorch Lightning's batch fetchers) do this internally and the user can't control it. There's no general fix in code, so the docstrings on Dataset.iter_batches and DataIterator.iter_batches document the caveat and recommend calling it.close() for callers who hold the iterator reference themselves. it.close() is Python's built-in generator method — it raises GeneratorExit at the current yield point and runs our finally.

Test plan

Local iterator path (test_iterator.py):

  • test_iter_batches_early_exit_shuts_down_executorbreak after first batch; assert executor._shutdown is True.
  • test_iter_batches_full_iteration_shuts_down_executor — natural exhaustion still shuts down (regression guard).
  • test_iter_batches_exception_shuts_down_executor — exception inside the consumer loop still shuts down.
  • test_iter_batches_close_on_held_iterator_shuts_down_executor — caller holds an iter() reference; it.close() triggers shutdown (covers the documented escape hatch for libraries like Lightning).

Streaming-split path (test_streaming_integration.py):

  • test_streaming_split_early_exit_shuts_down_executor — both splits break early; uses a slow per-row map so natural completion takes ~28 s, proving the executor shuts down via the consumer-thread hook (10 s wait).
  • test_streaming_split_partial_early_exit_keeps_executor — only one split breaks early; the other consumes all remaining batches (executor stays alive); shutdown happens once the second split also disengages.

Existing iterator + streaming-integration tests still pass locally.

When user code breaks out of `iter_batches()` early (e.g. `break` in a
for-loop, or Lightning's `limit_train_batches`), the dataset was not
marked as finished. The streaming executor's worker thread kept
producing blocks that piled up in the object store, and the executor
held resources that could starve other datasets (e.g. a validation
dataset waiting to run).

The inner `_ClosingIterator` only shuts down the executor through its
`__del__`, which is non-deterministic — generator references may
linger, leaving the executor alive long after iteration stops.

Add a `try/finally` around `yield from batch_iterator` in
`DataIterator._iter_batches` so the executor is shut down eagerly on
StopIteration, GeneratorExit (early break), and exceptions.
`StreamingExecutor.shutdown` is idempotent, so this is safe alongside
the existing natural-completion shutdown path. `StreamSplitDataIterator`
returns `executor=None`, so its multi-shard semantics are unaffected.

Test plan: three regression tests covering early break, full
iteration, and exception paths, each asserting `executor._shutdown`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces eager shutdown of the streaming executor during batch iteration by wrapping the process in a try-finally block, ensuring resources are released even on early exits or exceptions. Corresponding unit tests were added to validate shutdown behavior across various scenarios. Feedback was provided regarding the placement of the iteration time metric recording, which currently includes the shutdown duration and may lead to inaccurate performance data.

Comment thread python/ray/data/iterator.py Outdated
xinyuangui2 and others added 2 commits April 27, 2026 15:04
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Signed-off-by: Xinyuan <43737116+xinyuangui2@users.noreply.github.com>
@xinyuangui2 xinyuangui2 marked this pull request as ready for review April 27, 2026 22:20
@xinyuangui2 xinyuangui2 requested a review from a team as a code owner April 27, 2026 22:20
@xinyuangui2 xinyuangui2 added the go add ONLY when ready to merge, run all tests label Apr 27, 2026
@xinyuangui2 xinyuangui2 requested a review from justinvyu April 27, 2026 22:22
@ray-gardener ray-gardener Bot added the data Ray Data-related issues label Apr 28, 2026
Comment thread python/ray/data/iterator.py Outdated
yield from batch_iterator
if stats:
stats.iter_total_s.add(time.perf_counter() - time_start)
finally:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw, finally doesn't trigger in this case:

it = iter(ds.iter_batches())
for i, batch in enumerate(it):
    if i == 0:
        break

# the iterator sticks around until it goes out of scope
it.close()  # only way to guarantee that finally happens

I can't think of a good way to solve this case so we should just followup with a warning on our docs to not keep a iterator reference around and to just stick with the for loop.

One other problem is that other libraries like lightning could keep an iter() reference around which is outside of user control:

https://github.com/Lightning-AI/pytorch-lightning/blob/0e20e15f2376f4f356470b08875639a945c43334/src/lightning/pytorch/loops/fetchers.py#L79-L83

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is fine to keep this "leak". Since users keep the iterator reference, they might use this later.

xinyuangui2 and others added 2 commits April 29, 2026 17:12
The previous fix only covered the local `ds.iter_batches()` path because
`StreamSplitDataIterator._to_ref_bundle_iterator` returns ``executor=None`` —
the executor lives on the remote ``SplitCoordinator`` actor.

Add a ``try/finally`` around the inner ``gen_blocks`` generator that
notifies the coordinator (``client_disengaged``) when a split stops
consuming for an epoch, whether due to normal exhaustion, early ``break``,
or an exception. The coordinator clears that split's prefetch state and,
once every split has disengaged, shuts down the executor so it stops
producing blocks into the object store. Shutdown is dispatched outside
the lock since ``StreamingExecutor.shutdown`` joins the scheduling thread
for up to 2s.

Also document the lingering-``iter()``-reference caveat: holding a
reference to ``iter(ds.iter_batches())`` defers cleanup until that
reference is dropped, since the for-loop can't fire ``GeneratorExit``.
Recommend ``it.close()`` for callers (including libraries like PyTorch
Lightning) that keep iterator references internally.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The previous commit's ``gen_blocks`` ``finally`` doesn't fire promptly on
early ``break``: ``gen_blocks`` runs inside ``make_async_gen``'s filling
worker thread, which on interrupt exits without explicitly closing the
generator — its cleanup is then bound to GC, which can be arbitrarily
delayed (10+ s in CI under load, causing
``test_streaming_split_early_exit_shuts_down_executor`` to flake).

Move the disengagement signal to a consumer-thread hook:

* New ``DataIterator._on_iteration_end`` hook (default no-op), called
  from ``_iter_batches``'s ``finally`` on the consumer thread.
* ``StreamSplitDataIterator`` records the active epoch in
  ``self._active_epoch`` once ``start_epoch`` returns and overrides
  ``_on_iteration_end`` to fire ``client_disengaged`` synchronously on
  the consumer thread (no lock — plain attribute access keeps the
  iterator picklable, since users pass split iterators to
  ``@ray.remote`` tasks).
* ``client_disengaged`` is now idempotent per ``(epoch, split_idx)``
  via a ``_disengaged_splits_in_epoch`` set, so the consumer-thread hook
  and ``gen_blocks``'s own (eventual) ``finally`` can both fire it
  without double-counting.

Strengthen the test to cover this case: a slow per-row map makes natural
completion ~28 s, so a 10 s ``wait_for_condition`` would time out unless
the shutdown is actually triggered by the consumer-thread hook.

Also rename the test-only helper ``is_executor_shutdown`` →
``_is_executor_shutdown`` per review.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Comment thread python/ray/data/_internal/iterator/stream_split_iterator.py Outdated
Comment on lines +514 to +515
def client_disengaged(self, epoch_id: int, split_idx: int) -> None:
"""Called by a split iterator when it stops consuming for ``epoch_id``.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this work if you have N threads on the split coordinator occupied by calls to SplitCoord.get()?

(We technically have N+1 threads to allow the controller to shutdown the data executor in the aborted run case, but are we also relying on this 1 extra thread here?)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the N pending SplitCoord.get() tasks, do we expect those to all error out once the executor shuts down? Can those errors propagate up to the user and possibly crash the user code?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed with @justinvyu offline, the consumer thread is not likely to propagate the error out.

Comment thread python/ray/data/iterator.py Outdated
# ``shutdown`` is idempotent.
if executor is not None:
executor.shutdown(force=False)
# ``StreamSplitDataIterator`` uses this hook to disengage

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@goutamvenkat-anyscale tried adding a similar "client finished" remote call but it was in the gen_blocks try/finally which is not guaranteed to run immediately if the client breaks out in the main thread because it the gen_blocks thread would only exit once it gets to the next yield call. And after that it might need to wait for gc as well.

The try/finally here should always happen immediately (except for the leaked iterator reference case).

xinyuangui2 and others added 2 commits April 30, 2026 19:22
Three follow-up changes from PR review:

1. Move executor shutdown into the ``_on_iteration_end`` hook so
   ``_iter_batches``'s ``finally`` has a single cleanup line. The hook
   now takes ``executor`` and the default implementation calls
   ``executor.shutdown(force=False)``. ``StreamSplitDataIterator``'s
   override accepts the arg (always ``None`` for that path) and
   continues to fire ``client_disengaged``.

2. Remove ``gen_blocks``'s ``try/finally`` in ``StreamSplitDataIterator``.
   That was a redundant second path: the consumer-thread
   ``_on_iteration_end`` hook already covers every exit mode promptly,
   and ``gen_blocks``'s ``finally`` was GC-bound on early ``break``
   anyway. ``_on_iteration_end`` now also resets ``_active_epoch`` to
   ``None`` (previously done in the removed ``finally``). Updated
   surrounding comments — ``client_disengaged``'s idempotency now
   guards only against duplicate/stale notifications, not dual-path
   firing.

3. Add ``test_iter_batches_close_on_held_iterator_shuts_down_executor``
   covering the documented escape hatch: when the caller holds an
   ``iter()`` reference, ``it.close()`` triggers the ``finally`` and
   shuts the executor down.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

@justinvyu justinvyu left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needed to clarify the problem we are solving in this PR. We only solve this following case in the PR, not the pytorch lightning limit_train_batches which falls under the "iterator leak" category which we don't have control over. I removed the mention of limit_train_batches the PR description.

import ray.data
from ray.train import ScalingConfig
import time
from ray.train.v2.api.data_parallel_trainer import DataParallelTrainer

epochs = 2
train_dataloader = ray.data.from_items([{"x": 1} for _ in range(1000)])
val_dataloader = ray.data.from_items([{"x": 2} for _ in range(1000)])


def train_func():
    train_dataloader = ray.train.get_dataset_shard("train").iter_batches(batch_size=100)
    val_dataloader = ray.train.get_dataset_shard("val").iter_batches(batch_size=100)

    for epoch in range(epochs):
        print("Train epoch", epoch)
        for batch in train_dataloader:
            ...
            if epoch == 1:
                time.sleep(10)
        print("Val epoch", epoch)
        for i, batch in enumerate(val_dataloader):
            if i == 1:
                break
        import gc

        gc.collect()


trainer = DataParallelTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=2),
    datasets={"train": train_dataloader, "val": val_dataloader},
)
trainer.fit()

# Val epoch 0 executor hangs around for the entire duration of training epoch 1
# Only shuts down when validation epoch 1 starts.
# See that the val epoch 0 execution takes a long time to finish since it needs to wait for the long training epoch 1 to finish.
# (pid=32682) ✔️  Dataset val_4_0 execution finished in 51.27 seconds: : 620 row [00:51, 12.1 row/s]
Comment thread python/ray/data/iterator.py Outdated
Comment thread python/ray/data/_internal/iterator/stream_split_iterator.py Outdated
Comment thread python/ray/data/_internal/iterator/stream_split_iterator.py Outdated
Comment thread python/ray/data/dataset.py Outdated
xinyuangui2 and others added 3 commits May 13, 2026 16:05
…hecks

- Rename ``client_disengaged`` → ``notify_split_finished``,
  ``_disengaged_splits_in_epoch`` → ``_finished_splits``, and
  ``_unfinished_clients_in_epoch`` → ``_num_unarrived_splits_at_barrier``
  to clarify the role of each coordinator-side counter.
- Drop the per-``(epoch, split_idx)`` idempotency early-return in
  ``notify_split_finished``: ``set.add`` plus idempotent bookkeeping
  already make duplicate notifications harmless. Rewrite the
  stale-epoch comment to explain the realistic interleaving (fire-and-
  forget notify vs. ``_try_start_new_epoch`` from other splits).
- Replace the "call ``it.close()``" Lightning advice in the
  ``Dataset.iter_batches`` and ``DataIterator.iter_batches`` docstrings
  with a ``ds.limit(n)`` recommendation, which users can apply without
  customizing Lightning internals.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
``_finished_splits`` is a set of split indices in ``[0, n)``, so
``len(...) == self._n`` is equivalent to ``>=`` and reads more
clearly as "every split has finished."

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@xinyuangui2 xinyuangui2 requested a review from justinvyu May 14, 2026 01:09
Comment thread python/ray/data/_internal/iterator/stream_split_iterator.py Outdated
Comment thread python/ray/data/iterator.py Outdated
Comment thread python/ray/data/_internal/iterator/stream_split_iterator.py Outdated
Signed-off-by: Xinyuan <43737116+xinyuangui2@users.noreply.github.com>
Signed-off-by: Xinyuan <43737116+xinyuangui2@users.noreply.github.com>
@justinvyu justinvyu enabled auto-merge (squash) May 14, 2026 23:32
@justinvyu justinvyu merged commit b569195 into ray-project:master May 15, 2026
7 checks passed
TruongQuangPhat pushed a commit to cyhapun/ray-fix-issue that referenced this pull request May 27, 2026
…62949)

When user code breaks out of `iter_batches()` early (`break` in a
for-loop), the dataset was not marked as finished. The streaming
executor's worker thread kept producing blocks that piled up in the
object store, and the executor held resources that could starve other
datasets (e.g. a validation dataset waiting to run).

The inner `_ClosingIterator` only shuts down the executor through its
`__del__`, which is non-deterministic — generator references may linger,
leaving the executor alive long after iteration stops.

Add a `try/finally` around `yield from batch_iterator` in
`DataIterator._iter_batches`, calling a new
`_on_iteration_end(executor)` hook on `StopIteration`, `GeneratorExit`
(early break), and exceptions.

- The default implementation calls `executor.shutdown(force=False)`
(idempotent), covering the local `iter_batches()` path.
- `StreamSplitDataIterator` overrides it to fire
`client_disengaged(epoch, split_idx)` on the remote `SplitCoordinator`,
since its executor lives on the actor and the iterator returns
`executor=None`.

Both run synchronously on the **consumer's thread**, so cleanup happens
the moment the consumer stops pulling.

---------

Signed-off-by: Xinyuan <43737116+xinyuangui2@users.noreply.github.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Signed-off-by: phattruong <23120318@student.hcmus.edu.vn>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

data Ray Data-related issues go add ONLY when ready to merge, run all tests

2 participants