[Data] Shut down executor when DataIterator exits early#62949
Conversation
When user code breaks out of `iter_batches()` early (e.g. `break` in a for-loop, or Lightning's `limit_train_batches`), the dataset was not marked as finished. The streaming executor's worker thread kept producing blocks that piled up in the object store, and the executor held resources that could starve other datasets (e.g. a validation dataset waiting to run). The inner `_ClosingIterator` only shuts down the executor through its `__del__`, which is non-deterministic — generator references may linger, leaving the executor alive long after iteration stops. Add a `try/finally` around `yield from batch_iterator` in `DataIterator._iter_batches` so the executor is shut down eagerly on StopIteration, GeneratorExit (early break), and exceptions. `StreamingExecutor.shutdown` is idempotent, so this is safe alongside the existing natural-completion shutdown path. `StreamSplitDataIterator` returns `executor=None`, so its multi-shard semantics are unaffected. Test plan: three regression tests covering early break, full iteration, and exception paths, each asserting `executor._shutdown`. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Code Review
This pull request introduces eager shutdown of the streaming executor during batch iteration by wrapping the process in a try-finally block, ensuring resources are released even on early exits or exceptions. Corresponding unit tests were added to validate shutdown behavior across various scenarios. Feedback was provided regarding the placement of the iteration time metric recording, which currently includes the shutdown duration and may lead to inaccurate performance data.
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Signed-off-by: Xinyuan <43737116+xinyuangui2@users.noreply.github.com>
| yield from batch_iterator | ||
| if stats: | ||
| stats.iter_total_s.add(time.perf_counter() - time_start) | ||
| finally: |
There was a problem hiding this comment.
Btw, finally doesn't trigger in this case:
it = iter(ds.iter_batches())
for i, batch in enumerate(it):
if i == 0:
break
# the iterator sticks around until it goes out of scope
it.close() # only way to guarantee that finally happensI can't think of a good way to solve this case so we should just followup with a warning on our docs to not keep a iterator reference around and to just stick with the for loop.
One other problem is that other libraries like lightning could keep an iter() reference around which is outside of user control:
There was a problem hiding this comment.
I think it is fine to keep this "leak". Since users keep the iterator reference, they might use this later.
The previous fix only covered the local `ds.iter_batches()` path because `StreamSplitDataIterator._to_ref_bundle_iterator` returns ``executor=None`` — the executor lives on the remote ``SplitCoordinator`` actor. Add a ``try/finally`` around the inner ``gen_blocks`` generator that notifies the coordinator (``client_disengaged``) when a split stops consuming for an epoch, whether due to normal exhaustion, early ``break``, or an exception. The coordinator clears that split's prefetch state and, once every split has disengaged, shuts down the executor so it stops producing blocks into the object store. Shutdown is dispatched outside the lock since ``StreamingExecutor.shutdown`` joins the scheduling thread for up to 2s. Also document the lingering-``iter()``-reference caveat: holding a reference to ``iter(ds.iter_batches())`` defers cleanup until that reference is dropped, since the for-loop can't fire ``GeneratorExit``. Recommend ``it.close()`` for callers (including libraries like PyTorch Lightning) that keep iterator references internally. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The previous commit's ``gen_blocks`` ``finally`` doesn't fire promptly on early ``break``: ``gen_blocks`` runs inside ``make_async_gen``'s filling worker thread, which on interrupt exits without explicitly closing the generator — its cleanup is then bound to GC, which can be arbitrarily delayed (10+ s in CI under load, causing ``test_streaming_split_early_exit_shuts_down_executor`` to flake). Move the disengagement signal to a consumer-thread hook: * New ``DataIterator._on_iteration_end`` hook (default no-op), called from ``_iter_batches``'s ``finally`` on the consumer thread. * ``StreamSplitDataIterator`` records the active epoch in ``self._active_epoch`` once ``start_epoch`` returns and overrides ``_on_iteration_end`` to fire ``client_disengaged`` synchronously on the consumer thread (no lock — plain attribute access keeps the iterator picklable, since users pass split iterators to ``@ray.remote`` tasks). * ``client_disengaged`` is now idempotent per ``(epoch, split_idx)`` via a ``_disengaged_splits_in_epoch`` set, so the consumer-thread hook and ``gen_blocks``'s own (eventual) ``finally`` can both fire it without double-counting. Strengthen the test to cover this case: a slow per-row map makes natural completion ~28 s, so a 10 s ``wait_for_condition`` would time out unless the shutdown is actually triggered by the consumer-thread hook. Also rename the test-only helper ``is_executor_shutdown`` → ``_is_executor_shutdown`` per review. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
| def client_disengaged(self, epoch_id: int, split_idx: int) -> None: | ||
| """Called by a split iterator when it stops consuming for ``epoch_id``. |
There was a problem hiding this comment.
How does this work if you have N threads on the split coordinator occupied by calls to SplitCoord.get()?
(We technically have N+1 threads to allow the controller to shutdown the data executor in the aborted run case, but are we also relying on this 1 extra thread here?)
There was a problem hiding this comment.
For the N pending SplitCoord.get() tasks, do we expect those to all error out once the executor shuts down? Can those errors propagate up to the user and possibly crash the user code?
There was a problem hiding this comment.
Discussed with @justinvyu offline, the consumer thread is not likely to propagate the error out.
| # ``shutdown`` is idempotent. | ||
| if executor is not None: | ||
| executor.shutdown(force=False) | ||
| # ``StreamSplitDataIterator`` uses this hook to disengage |
There was a problem hiding this comment.
@goutamvenkat-anyscale tried adding a similar "client finished" remote call but it was in the gen_blocks try/finally which is not guaranteed to run immediately if the client breaks out in the main thread because it the gen_blocks thread would only exit once it gets to the next yield call. And after that it might need to wait for gc as well.
The try/finally here should always happen immediately (except for the leaked iterator reference case).
Three follow-up changes from PR review: 1. Move executor shutdown into the ``_on_iteration_end`` hook so ``_iter_batches``'s ``finally`` has a single cleanup line. The hook now takes ``executor`` and the default implementation calls ``executor.shutdown(force=False)``. ``StreamSplitDataIterator``'s override accepts the arg (always ``None`` for that path) and continues to fire ``client_disengaged``. 2. Remove ``gen_blocks``'s ``try/finally`` in ``StreamSplitDataIterator``. That was a redundant second path: the consumer-thread ``_on_iteration_end`` hook already covers every exit mode promptly, and ``gen_blocks``'s ``finally`` was GC-bound on early ``break`` anyway. ``_on_iteration_end`` now also resets ``_active_epoch`` to ``None`` (previously done in the removed ``finally``). Updated surrounding comments — ``client_disengaged``'s idempotency now guards only against duplicate/stale notifications, not dual-path firing. 3. Add ``test_iter_batches_close_on_held_iterator_shuts_down_executor`` covering the documented escape hatch: when the caller holds an ``iter()`` reference, ``it.close()`` triggers the ``finally`` and shuts the executor down. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
justinvyu
left a comment
There was a problem hiding this comment.
Needed to clarify the problem we are solving in this PR. We only solve this following case in the PR, not the pytorch lightning limit_train_batches which falls under the "iterator leak" category which we don't have control over. I removed the mention of limit_train_batches the PR description.
import ray.data
from ray.train import ScalingConfig
import time
from ray.train.v2.api.data_parallel_trainer import DataParallelTrainer
epochs = 2
train_dataloader = ray.data.from_items([{"x": 1} for _ in range(1000)])
val_dataloader = ray.data.from_items([{"x": 2} for _ in range(1000)])
def train_func():
train_dataloader = ray.train.get_dataset_shard("train").iter_batches(batch_size=100)
val_dataloader = ray.train.get_dataset_shard("val").iter_batches(batch_size=100)
for epoch in range(epochs):
print("Train epoch", epoch)
for batch in train_dataloader:
...
if epoch == 1:
time.sleep(10)
print("Val epoch", epoch)
for i, batch in enumerate(val_dataloader):
if i == 1:
break
import gc
gc.collect()
trainer = DataParallelTrainer(
train_func,
scaling_config=ScalingConfig(num_workers=2),
datasets={"train": train_dataloader, "val": val_dataloader},
)
trainer.fit()
# Val epoch 0 executor hangs around for the entire duration of training epoch 1
# Only shuts down when validation epoch 1 starts.
# See that the val epoch 0 execution takes a long time to finish since it needs to wait for the long training epoch 1 to finish.
# (pid=32682) ✔️ Dataset val_4_0 execution finished in 51.27 seconds: : 620 row [00:51, 12.1 row/s]…hecks - Rename ``client_disengaged`` → ``notify_split_finished``, ``_disengaged_splits_in_epoch`` → ``_finished_splits``, and ``_unfinished_clients_in_epoch`` → ``_num_unarrived_splits_at_barrier`` to clarify the role of each coordinator-side counter. - Drop the per-``(epoch, split_idx)`` idempotency early-return in ``notify_split_finished``: ``set.add`` plus idempotent bookkeeping already make duplicate notifications harmless. Rewrite the stale-epoch comment to explain the realistic interleaving (fire-and- forget notify vs. ``_try_start_new_epoch`` from other splits). - Replace the "call ``it.close()``" Lightning advice in the ``Dataset.iter_batches`` and ``DataIterator.iter_batches`` docstrings with a ``ds.limit(n)`` recommendation, which users can apply without customizing Lightning internals. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
``_finished_splits`` is a set of split indices in ``[0, n)``, so ``len(...) == self._n`` is equivalent to ``>=`` and reads more clearly as "every split has finished." Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Xinyuan <43737116+xinyuangui2@users.noreply.github.com>
Signed-off-by: Xinyuan <43737116+xinyuangui2@users.noreply.github.com>
…62949) When user code breaks out of `iter_batches()` early (`break` in a for-loop), the dataset was not marked as finished. The streaming executor's worker thread kept producing blocks that piled up in the object store, and the executor held resources that could starve other datasets (e.g. a validation dataset waiting to run). The inner `_ClosingIterator` only shuts down the executor through its `__del__`, which is non-deterministic — generator references may linger, leaving the executor alive long after iteration stops. Add a `try/finally` around `yield from batch_iterator` in `DataIterator._iter_batches`, calling a new `_on_iteration_end(executor)` hook on `StopIteration`, `GeneratorExit` (early break), and exceptions. - The default implementation calls `executor.shutdown(force=False)` (idempotent), covering the local `iter_batches()` path. - `StreamSplitDataIterator` overrides it to fire `client_disengaged(epoch, split_idx)` on the remote `SplitCoordinator`, since its executor lives on the actor and the iterator returns `executor=None`. Both run synchronously on the **consumer's thread**, so cleanup happens the moment the consumer stops pulling. --------- Signed-off-by: Xinyuan <43737116+xinyuangui2@users.noreply.github.com> Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Signed-off-by: phattruong <23120318@student.hcmus.edu.vn>
When user code breaks out of
iter_batches()early (breakin a for-loop), the dataset was not marked as finished. The streaming executor's worker thread kept producing blocks that piled up in the object store, and the executor held resources that could starve other datasets (e.g. a validation dataset waiting to run).The inner
_ClosingIteratoronly shuts down the executor through its__del__, which is non-deterministic — generator references may linger, leaving the executor alive long after iteration stops.Single deterministic shutdown path:
_on_iteration_endAdd a
try/finallyaroundyield from batch_iteratorinDataIterator._iter_batches, calling a new_on_iteration_end(executor)hook onStopIteration,GeneratorExit(early break), and exceptions.executor.shutdown(force=False)(idempotent), covering the localiter_batches()path.StreamSplitDataIteratoroverrides it to fireclient_disengaged(epoch, split_idx)on the remoteSplitCoordinator, since its executor lives on the actor and the iterator returnsexecutor=None.Both run synchronously on the consumer's thread, so cleanup happens the moment the consumer stops pulling.
Why
GeneratorExitmatters here_iter_batchesis a generator (ityield froms the batch iterator). When the consumer abandons the generator early — either bybreak-ing out of the for-loop, or by letting the generator go out of scope — Python's generator protocol throwsGeneratorExitinto it at the currentyieldpoint. Without thetry/finally, thatGeneratorExitunwinds straight out and the cleanup afteryield fromnever runs; the executor keeps producing blocks until_ClosingIterator.__del__eventually fires, which can be arbitrarily delayed. With thetry/finally,GeneratorExittriggers_on_iteration_endsynchronously on the consumer's thread, giving us one deterministic shutdown path for all three exit modes.Why the consumer thread, not
gen_blocks'sfinallyStreamSplitDataIterator's innergen_blocksgenerator runs insidemake_async_gen's filling worker thread. On earlybreak, that worker exits via theinterrupted_eventpath without explicitly closing the generator — its cleanup is then GC-bound and can be arbitrarily delayed under CI load (10+ s, causing flakes). The consumer-side hook avoids that entirely.SplitCoordinatorcleanupOn each
client_disengaged(epoch_id, split_idx)call, the coordinator:RefBundleso remaining splits don't suffer stale backpressure or pinned memory.StreamingExecutor.shutdownjoins the scheduling thread for up to 2 s.(epoch_id, split_idx)as defense against duplicate/stale notifications (e.g. notifications arriving after the next epoch has started).StreamSplitDataIterator._active_epochis a plain attribute (no lock) — split iterators are passed to@ray.remotetasks, and athreading.Lockwould make them unpicklable. The single-consumer convention plus the strict happens-before ordering (start_epoch→ firstyield→ consumer exit →_on_iteration_end) makes this safe.Lingering
iter()reference caveatThere is a known case the
try/finallycannot cover:When the user keeps an explicit reference to the iterator, Python doesn't fire
GeneratorExitonbreak— cleanup is deferred until the reference is dropped. Some libraries (e.g. PyTorch Lightning's batch fetchers) do this internally and the user can't control it. There's no general fix in code, so the docstrings onDataset.iter_batchesandDataIterator.iter_batchesdocument the caveat and recommend callingit.close()for callers who hold the iterator reference themselves.it.close()is Python's built-in generator method — it raisesGeneratorExitat the currentyieldpoint and runs ourfinally.Test plan
Local iterator path (
test_iterator.py):test_iter_batches_early_exit_shuts_down_executor—breakafter first batch; assertexecutor._shutdown is True.test_iter_batches_full_iteration_shuts_down_executor— natural exhaustion still shuts down (regression guard).test_iter_batches_exception_shuts_down_executor— exception inside the consumer loop still shuts down.test_iter_batches_close_on_held_iterator_shuts_down_executor— caller holds aniter()reference;it.close()triggers shutdown (covers the documented escape hatch for libraries like Lightning).Streaming-split path (
test_streaming_integration.py):test_streaming_split_early_exit_shuts_down_executor— both splits break early; uses a slow per-row map so natural completion takes ~28 s, proving the executor shuts down via the consumer-thread hook (10 s wait).test_streaming_split_partial_early_exit_keeps_executor— only one split breaks early; the other consumes all remaining batches (executor stays alive); shutdown happens once the second split also disengages.Existing iterator + streaming-integration tests still pass locally.