[data] Fix iter_batches spilling (2/n): Replace inner format/collate make_async_gen with iter_threaded #63682
Code Review
This pull request replaces the usage of make_async_gen with a new iter_threaded utility in iter_batches.py and implements it in util.py to handle multi-threaded iteration over a shared base iterator. The reviewer suggested checking if the generator has stopped (stopped.is_set()) inside _locked_next under the lock to prevent background workers from fetching more items after the consumer has already stopped.
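The reviewer's suggestion can be illustrated with a minimal sketch. This is not the code in `util.py`: the name `iter_threaded_sketch`, the `_Failure` wrapper, and the polling `_put` helper are assumptions made for illustration. Only the general shape is taken from the discussion here (a shared base iterator behind a lock, a `stopped` event, a single bounded output queue).

```python
import queue
import threading
from typing import Callable, Iterator, TypeVar

T = TypeVar("T")
U = TypeVar("U")

_DONE = object()  # sentinel: source exhausted or worker finished


class _Failure:
    """Hypothetical wrapper that carries a worker exception to the consumer."""

    def __init__(self, exc: BaseException) -> None:
        self.exc = exc


def iter_threaded_sketch(
    base_iter: Iterator[T],
    fn: Callable[[T], U],
    num_workers: int,
    output_buffer_size: int,
) -> Iterator[U]:
    lock = threading.Lock()
    stopped = threading.Event()
    out: queue.Queue = queue.Queue(maxsize=output_buffer_size)

    def _locked_next():
        with lock:
            # Checked under the lock, so no worker pulls a new item
            # from the shared iterator after the consumer has stopped.
            if stopped.is_set():
                return _DONE
            return next(base_iter, _DONE)

    def _put(item) -> bool:
        # Bounded put that gives up once the consumer has stopped.
        while not stopped.is_set():
            try:
                out.put(item, timeout=0.05)
                return True
            except queue.Full:
                pass
        return False

    def _worker() -> None:
        try:
            while True:
                item = _locked_next()
                if item is _DONE or not _put(fn(item)):
                    break
        except Exception as exc:
            _put(_Failure(exc))
        finally:
            _put(_DONE)  # tell the consumer this worker is finished

    threads = [threading.Thread(target=_worker, daemon=True) for _ in range(num_workers)]
    for t in threads:
        t.start()
    finished = 0
    try:
        while finished < num_workers:
            item = out.get()
            if item is _DONE:
                finished += 1
            elif isinstance(item, _Failure):
                raise item.exc
            else:
                yield item
    finally:
        # Runs on exhaustion, on error, and when the consumer closes the generator.
        stopped.set()
        for t in threads:
            t.join()
```

With the check inside `_locked_next`, the number of items pulled from the source after an early exit is limited to what was already in flight or queued when `stopped` was set. Results are yielded in completion order, not input order.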
Investigation: Inner collate buffer size (latency variance vs hidden memory)
Training ingest regression test results: (table not preserved)

The decision:

1. The memory trade: still a 2× reduction vs master
The change adds N − 1 hidden Plasma slots back, but the cut vs master is what matters. At N=4: master holds 16, PR (buf=N) holds 8, which is still a 2× reduction. The shipping config gives up part of the maximum theoretical reduction (4N → N+1 would have been ~3.2× at N=4) in exchange for the H2D …

2. Latency variance reduction
The inner output buffer is the H2D fan-out smoother. When the outer worker pulls from inner and runs …
Measured on the memory variant: next-batch steady 53.77 ms (± 48.75) with buf=1 → 1.25 ms (± 0.17) with buf=N. That's a 43× mean reduction and a 287× stdev reduction. It even beats master's 1.67 ms. The …

3. The benchmark under-represents the benefit: heavy collate would gain more
The test uses the default … Users with non-trivial collate (custom preprocessing inside collate_fn, … So the H2D-smoothing latency win we measured (cheap collate) is the low end of what …

Net decision
Still a 2× reduction in untracked Plasma vs master (4N → 2N). Ship.
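
For reference, the arithmetic behind the numbers quoted in this comment can be reproduced with a short sketch. The function name `hidden_slots` and the dictionary keys are illustrative only; the formulas (4N for master, N+1 for buf=1, 2N for buf=N) and the latency figures are the ones stated above.

```python
def hidden_slots(n: int) -> dict:
    """Hidden (untracked) batch slots for N threadpool workers, per the comment above."""
    return {
        "master": 4 * n,  # make_async_gen with the default buffer_size=1
        "buf=1": n + 1,   # N workers + output buffer of size 1
        "buf=N": 2 * n,   # N workers + output buffer of size N (shipping config)
    }


slots = hidden_slots(4)
added_back = slots["buf=N"] - slots["buf=1"]           # N - 1 slots added back
reduction_shipped = slots["master"] / slots["buf=N"]   # 2.0
reduction_max = slots["master"] / slots["buf=1"]       # 3.2

# Next-batch latency on the memory variant: buf=1 vs buf=N (ms).
mean_ratio = 53.77 / 1.25    # about 43
stdev_ratio = 48.75 / 0.17   # about 287

print(slots, added_back, reduction_shipped, reduction_max)
print(round(mean_ratio), round(stdev_ratio))
```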
Why master's …
rayhhome left a comment:
Implementation and tests both LGTM!
…e `make_async_gen` with `iter_threaded` (ray-project#63682) Replaces the inner format/collate `make_async_gen` with `iter_threaded` from ray-project#63660, cutting untracked object store memory pinned batches from ~16 to ~8 (2× reduction). - `_format_in_threadpool` runs format + collate across a threadpool via `make_async_gen(num_workers=min(4, prefetch_batches), preserve_ordering=False)`. With the default `buffer_size=1`, this allocates one shared input queue of size `(buffer_size + 1) * num_workers` and `num_workers` per-worker output queues of size `buffer_size` — for `num_workers=4`, that is **8 (input) + 4 (in-flight in workers) + 4 (output) ≈ 16** batches buffered inside the threadpool, none of which are visible to the resource manager. - These buffered batches are pre-format `pa.Table.slice()` views that pin their **full** source blocks in the object store (`pa.Table.slice` is zero-copy and references the entire underlying buffer). They keep blocks pinned in shared memory even after the distributed reference counter considers them out of scope, which is the accounting decoupling that contributes to streaming-split underestimation and spilling. - Replace with `iter_threaded(..., num_workers=num_threadpool_workers, output_buffer_size=num_threadpool_workers)` from PR 1 (generalized in this stack to take a required `fn` and `num_workers`). Workers share `batch_iter` under a lock and funnel results through a single bounded queue sized to match the worker count — enough depth to keep workers from blocking on each other's `put()` when collate is non-trivial. In-flight is now bounded to **~2 × num_workers ≈ 8** (workers + shared output buffer) — roughly a 2× reduction in untracked pinned batches. --------- Signed-off-by: Justin Yu <justinvyu@anyscale.com> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
…e behind `preserve_order` (#63792) Part of the iter_batches consumer pipeline cleanup (#63660, #63682). Gates restore_original_order behind `DataContext.execution_options.preserve_order` (default off). When one format/collate worker lags, the reorder buffer grows with the other workers' completed batches, and ready batches aren't allowed to be yielded; this PR skips the reorder step when ordering isn't required. Recovers next-batch latency from PR1+2's regressed 113 ms steady back to 23 ms (lower than master's 32 ms), with no other regressions. --------- Signed-off-by: Justin Yu <justinvyu@anyscale.com> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
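
The reorder stall described in this commit message can be illustrated with a small trace. This is a hypothetical model, not the `restore_original_order` implementation: `trace_reorder` is an invented helper that assumes batches carry their input index and are released only once every earlier index has arrived.

```python
from typing import Dict, List, Sequence, Tuple


def trace_reorder(arrival_order: Sequence[int]) -> Tuple[List[int], int]:
    """Return (batches released after each arrival, peak reorder-buffer size)."""
    pending: Dict[int, str] = {}
    next_idx = 0  # index of the next batch allowed to be yielded
    peak = 0
    released_per_arrival: List[int] = []
    for idx in arrival_order:
        pending[idx] = f"batch-{idx}"
        peak = max(peak, len(pending))
        released = 0
        # Release the contiguous run starting at next_idx, if any.
        while next_idx in pending:
            pending.pop(next_idx)
            next_idx += 1
            released += 1
        released_per_arrival.append(released)
    return released_per_arrival, peak


# In-order arrival: every batch is released immediately.
print(trace_reorder([0, 1, 2, 3, 4]))  # ([1, 1, 1, 1, 1], 1)
# The worker holding batch 0 lags: three ready batches wait behind it.
print(trace_reorder([1, 2, 3, 0, 4]))  # ([0, 0, 0, 4, 1], 4)
```

In the second trace the consumer sees nothing for three arrivals and then a burst of four, which is the kind of next-batch latency tail that skipping the reorder step avoids when ordering is not required.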

Description
Replaces the inner format/collate `make_async_gen` with `iter_threaded` from #63660, cutting untracked object store memory pinned batches from ~16 to ~8 (2× reduction).
- `_format_in_threadpool` runs format + collate across a threadpool via `make_async_gen(num_workers=min(4, prefetch_batches), preserve_ordering=False)`. With the default `buffer_size=1`, this allocates one shared input queue of size `(buffer_size + 1) * num_workers` and `num_workers` per-worker output queues of size `buffer_size`. For `num_workers=4`, that is 8 (input) + 4 (in-flight in workers) + 4 (output) ≈ 16 batches buffered inside the threadpool, none of which are visible to the resource manager.
- These buffered batches are pre-format `pa.Table.slice()` views that pin their full source blocks in the object store (`pa.Table.slice` is zero-copy and references the entire underlying buffer). They keep blocks pinned in shared memory even after the distributed reference counter considers them out of scope, which is the accounting decoupling that contributes to streaming-split underestimation and spilling.
- Replace with `iter_threaded(..., num_workers=num_threadpool_workers, output_buffer_size=num_threadpool_workers)` from PR 1 (generalized in this stack to take a required `fn` and `num_workers`). Workers share `batch_iter` under a lock and funnel results through a single bounded queue sized to match the worker count, which is enough depth to keep workers from blocking on each other's `put()` when collate is non-trivial. In-flight is now bounded to ~2 × num_workers ≈ 8 (workers + shared output buffer), roughly a 2× reduction in untracked pinned batches.
Problem illustrated
See problems 2 and 3 below. This PR mitigates problem 2 by reducing the impact of problem 3 -- store fewer refs to shared memory in unnecessary buffers.
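
The buffer accounting in the description can be written out as a small calculation. The function names below are illustrative, not part of Ray; the queue sizes are the ones stated in the description.

```python
def make_async_gen_buffered(prefetch_batches: int, buffer_size: int = 1) -> dict:
    """Batches held inside the threadpool on master, per the description."""
    num_workers = min(4, prefetch_batches)
    return {
        "input_queue": (buffer_size + 1) * num_workers,  # one shared input queue
        "in_flight": num_workers,                        # one batch per worker
        "output_queues": num_workers * buffer_size,      # per-worker output queues
    }


def iter_threaded_buffered(prefetch_batches: int) -> dict:
    """Batches held inside the threadpool with this PR, per the description."""
    num_workers = min(4, prefetch_batches)
    return {
        "in_flight": num_workers,      # one batch per worker
        "output_queue": num_workers,   # output_buffer_size == num_workers
    }


before = make_async_gen_buffered(prefetch_batches=4)
after = iter_threaded_buffered(prefetch_batches=4)
print(before, sum(before.values()))  # total 16
print(after, sum(after.values()))    # total 8
```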
Release test results
backpressure_benchmark
- backpressure_benchmark.single_node
- backpressure_benchmark.multi_node
training_ingest_regression_test
- training_ingest_regression_test.peak_object_store_memory (sleep=2s, prefetch_batches=4, num_workers=4)
- training_ingest_regression_test.throughput (sleep=0s, prefetch_batches=4, num_workers=4)
Headline wins per variant:
- `peak_object_store_memory` variant: −41% peak Plasma (136 → 80 GiB), throughput tied, next-batch latency improved (1.67 → 1.25 ms), runtime +8% (within the noise window for this consumer-bound run).
- `throughput` variant: −36% peak Plasma (115 → 74 GiB), +20% steady throughput (1017 → 1222 rows/s), step avg down 23%, next-batch latency regression (32 → 113 ms), which is the remaining tail that PR3 (`restore_original_order` gating) is meant to fix.
Follow-up work
- `2 * num_workers` batches still pin their source blocks without the distributed reference counter seeing them. With huge batches, large `prefetch_batches`, and many workers per node, this can still be too much. Follow-up PRs should fix the accounting gap so that no "hidden buffers" remain on the consumer side.
- iter_batches (3/n): Gate `restore_original_order` in iter_batches consumer pipeline behind `preserve_order` #63792 needs to be merged as a follow-up. With this PR, `restore_original_order` introduces a slight regression: the `make_async_gen` round-robin pull roughly kept the batch output order the same as the input order, so `restore_original_order` was a no-op most of the time before. Now the threadpool has no ordering guarantees, so the `restore_original_order` stall is more significant.
Appendix
Alternative considered: detaching blocks to heap memory
I also prototyped copying resolved Arrow blocks out of Plasma into process heap (via `pa.concat_arrays` on each column) inside `prefetch_batches_locally`, then immediately `del`'ing the `ObjectRef`. This would eliminate untracked pinning entirely: the resolved block becomes heap-backed and the Plasma segment can be reclaimed as soon as the ref drops.
Decided against it for this PR:
- … noop collate → torch finalize `.to("cuda")` flow, finalize's H2D transfer is itself a copy that severs the Plasma dependency. An upstream heap copy duplicates that data movement (Plasma → CPU heap → GPU) for no extra unpinning benefit, just slower per-batch latency.
The bounded-buffer approach in this PR gets most of the win (2× reduction in pinned batches) with no copy cost, and leaves the principled fix on the table.