Skip to content

[data] Fix iter_batches spilling (2/n): Replace inner format/collate make_async_gen with iter_threaded#63682

Merged
justinvyu merged 10 commits into
ray-project:masterfrom
justinvyu:justinvyu/replace-inner-make-async-gen
Jun 4, 2026
Merged

[data] Fix iter_batches spilling (2/n): Replace inner format/collate make_async_gen with iter_threaded#63682
justinvyu merged 10 commits into
ray-project:masterfrom
justinvyu:justinvyu/replace-inner-make-async-gen

Conversation

@justinvyu

@justinvyu justinvyu commented May 27, 2026

Copy link
Copy Markdown
Contributor

Description

Replaces the inner format/collate make_async_gen with iter_threaded from #63660, cutting untracked object store memory pinned batches from ~16 to ~8 (2× reduction).

  • _format_in_threadpool runs format + collate across a threadpool via make_async_gen(num_workers=min(4, prefetch_batches), preserve_ordering=False). With the default buffer_size=1, this allocates one shared input queue of size (buffer_size + 1) * num_workers and num_workers per-worker output queues of size buffer_size — for num_workers=4, that is 8 (input) + 4 (in-flight in workers) + 4 (output) ≈ 16 batches buffered inside the threadpool, none of which are visible to the resource manager.
  • These buffered batches are pre-format pa.Table.slice() views that pin their full source blocks in the object store (pa.Table.slice is zero-copy and references the entire underlying buffer). They keep blocks pinned in shared memory even after the distributed reference counter considers them out of scope, which is the accounting decoupling that contributes to streaming-split underestimation and spilling.
  • Replace with iter_threaded(..., num_workers=num_threadpool_workers, output_buffer_size=num_threadpool_workers) from PR 1 (generalized in this stack to take a required fn and num_workers). Workers share batch_iter under a lock and funnel results through a single bounded queue sized to match the worker count — enough depth to keep workers from blocking on each other's put() when collate is non-trivial. In-flight is now bounded to ~2 × num_workers ≈ 8 (workers + shared output buffer) — roughly a 2× reduction in untracked pinned batches.

Problem illustrated

See problems 2 and 3 below. This PR mitigates problem 2 by reducing the impact of problem 3 -- store fewer refs to shared memory in unnecessary buffers.

image

Release test results

backpressure_benchmark

backpressure_benchmark.single_node

Metric Master PR1 PR1+2
runtime (s) 1039.86 1035.94 1035.63
peak obj store (GB) 9.50 7.63 7.13
util peak 0.9896 0.7943 0.7422
spilled (GB) 5.88 0.0 0.0

backpressure_benchmark.multi_node

Metric Master PR1 PR1+2
runtime (s) 144.47 142.11 141.77
peak obj store (GB) 72.25 63.88 57.38
util peak 0.8362 0.7393 0.6641
spilled (GB) 63.88 14.00 0.0

training_ingest_regression_test

training_ingest_regression_test.peak_object_store_memory (sleep=2s, prefetch_batches=4, num_workers=4)

Metric Master PR1+2
total runtime (s) 133.67 ± 2.76 145.07 ± 4.54
throughput steady (rows/s) 460.52 ± 3.97 467.80 ± 5.88
next-batch steady (ms) 1.67 ± 0.22 1.25 ± 0.17
step avg (ms) 2213.92 ± 22.32 2179.76 ± 25.41
peak obj store (GiB) 136.19 ± 0.32 79.89 ± 0.27
peak obj store util 0.693 0.640
spilled (GiB) 0.0 0.0

training_ingest_regression_test.throughput (sleep=0s, prefetch_batches=4, num_workers=4)

Metric Master PR1+2
total runtime (s) 120.88 ± 1.19 119.07 ± 3.89
throughput steady (rows/s) 1017.33 ± 75.40 1222.37 ± 24.52
next-batch steady (ms) 32.54 ± 28.99 113.17 ± 26.13
step avg (ms) 923.61 ± 61.14 711.28 ± 11.51
peak obj store (GiB) 115.27 ± 12.44 74.06 ± 1.32
peak obj store util 0.587 0.593
spilled (GiB) 0.0 0.0

Headline wins per variant:

  • peak_object_store_memory variant: −41% peak Plasma (136 → 80 GiB), throughput tied, next-batch latency improved (1.67 → 1.25 ms), runtime +8% (within noise window for this consumer-bound run).
  • throughput variant: −36% peak Plasma (115 → 74 GiB), +20% throughput steady (1017 → 1222 rows/s), step avg dropped −23%, next-batch latency regression (32 → 113 ms) — the remaining tail PR3 (restore_original_order
    gating) is meant to fix.

Follow-up work

  1. This PR bounds untracked pinned object-store memory but does not eliminate it — ~2 * num_workers batches still pin their source blocks without the distributed reference counter seeing them. With huge batches, large prefetch_batches, and many workers per node, this can still be too much. Follow-up PRs should fix the accounting gap to have no remaining "hidden buffers" on the consumer side.
  2. [data] Fix iter_batches (3/n): Gate restore_original_order in iter_batches consumer pipeline behind preserve_order #63792 needs to be merged as a followup. This is because restore_original_order introduces a slight regression because make_async_gen round-robin pull roughly kept the batch output order the same as the input order. This meant that restore_original_order was a no-op most of the time before. Now, the threadpool doesn't have any ordering guarantees, so the restore_original_order stall is more significant.

Appendix

Alternative considered: detaching blocks to heap memory

I also prototyped copying resolved Arrow blocks out of Plasma into process heap (via pa.concat_arrays on each column) inside prefetch_batches_locally, then immediately del'ing the ObjectRef. This would eliminate untracked pinning entirely — the resolved block becomes heap-backed and the Plasma segment can be reclaimed as soon as the ref drops.

Decided against it for this PR:

  • Caps pipeline throughput at single-thread DRAM bandwidth. The copy becomes a serial stage running at ~10 GB/s on a typical x86 cloud VM (~13 ms per 128 MB block). It contends with format and the GPU H2D transfer for the same DRAM bus, so extra prefetching can't hide it — the stage is bandwidth-bound, not latency-bound, and parallel scaling tops out at ~2–3 threads on most cloud VMs.
  • Pure added latency on the common GPU path. For the default noop collate → torch finalize.to("cuda") flow, finalize's H2D transfer is itself a copy that severs the Plasma dependency. An upstream heap copy duplicates that data movement (Plasma → CPU heap → GPU) for no extra unpinning benefit — just slower per-batch latency.

The bounded-buffer approach in this PR gets most of the win (3× reduction in pinned batches) with no copy cost, and leaves the principled fix on the table.

justinvyu and others added 6 commits May 26, 2026 18:07
…terator

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Signed-off-by: Justin Yu <justinvyu@anyscale.com>
…sumer exit

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Signed-off-by: Justin Yu <justinvyu@anyscale.com>
…kers

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Justin Yu <justinvyu@anyscale.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Justin Yu <justinvyu@anyscale.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Justin Yu <justinvyu@anyscale.com>
@justinvyu justinvyu requested a review from a team as a code owner May 27, 2026 23:06

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request replaces the usage of make_async_gen with a new iter_threaded utility in iter_batches.py and implements it in util.py to handle multi-threaded iteration over a shared base iterator. The reviewer suggested checking if the generator has stopped (stopped.is_set()) inside _locked_next under the lock to prevent background workers from fetching more items after the consumer has already stopped.

Comment thread python/ray/data/_internal/block_batching/util.py
@justinvyu

Copy link
Copy Markdown
Contributor Author

Single node backpressure_training_prefetch release test results: object store memory usage 7.0GiB -> 5.5GiB

Before:
Screenshot 2026-05-27 at 4 16 51 PM

After:
Screenshot 2026-05-27 at 4 16 06 PM

@bveeramani bveeramani self-assigned this Jun 2, 2026
…s) -> more in-flight rather than bursty collate pipeline

Signed-off-by: Justin Yu <justinvyu@anyscale.com>
@justinvyu

Copy link
Copy Markdown
Contributor Author

@cursor cursor Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes using default effort and found 2 potential issues.

Fix All in Cursor

Reviewed by Cursor Bugbot for commit b793d97. Configure here.

Comment thread python/ray/data/_internal/block_batching/util.py
Comment thread python/ray/data/_internal/block_batching/iter_batches.py
justinvyu added 2 commits June 3, 2026 11:30
Signed-off-by: Justin Yu <justinvyu@anyscale.com>
Signed-off-by: Justin Yu <justinvyu@anyscale.com>
@justinvyu

justinvyu commented Jun 3, 2026

Copy link
Copy Markdown
Contributor Author

Investigation: Inner collate buffer size — latency variance vs hidden memory

Training ingest regression test results:

Master PR1+2 (inner=1) PR1+2 (inner=N=4)
memory variant (sleep=2s)
peak (GiB) 136.19 ± 0.32 73.16 ± 0.16 79.89 ± 0.27
util 0.693 0.586 0.640
throughput steady (rows/s) 460.5 463.4 467.8 ± 5.9
next-batch steady (ms) 1.67 ± 0.22 53.77 ± 48.75 1.25 ± 0.17
step avg (ms) 2213.9 2154.5 2179.8
throughput variant (no sleep)
peak (GiB) 115.27 ± 12.44 66.47 ± 0.76 74.06 ± 1.32
util 0.587 0.533 0.593
throughput steady (rows/s) 1017.3 1247.7 1222.4 ± 24.5
next-batch steady (ms) 32.54 ± 28.99 119.39 ± 26.19 113.17 ± 26.13
step avg (ms) 923.6 692.9 711.3

The decision: inner = num_threadpool_workers (=N) on the inner iter_threaded's output_buffer_size.

1. The memory trade — still a 2× reduction vs master

The change adds N − 1 hidden Plasma slots back, but the cut vs master is what matters.

Design In-flight items in format threadpool
Master (make_async_gen with buffer_size=1) input queue (2N) + workers (N) + N output queues (N) = 4N
PR (iter_threaded with output_buffer_size=1) workers (N) + output (1) = N + 1
PR (iter_threaded with output_buffer_size=N) ← shipping workers (N) + output (N) = 2N

At N=4: master holds 16, PR (buf=N) holds 8 — still a 2× reduction. The shipping config gives up about half of the maximum theoretical reduction (4N → N+1 would have been ~16×) in exchange for the H2D
smoothing benefits below. At 588 MiB/batch × N=4 workers × 4 trainers ≈ ~9 GiB cluster-wide extra Plasma. Small relative to the ~60 GiB master → PR reduction we observe.

2. Latency variance reduction

The inner output buffer is the H2D fan-out smoother. When the outer worker pulls from inner and runs finalize_fn, each call kicks off an async H2D on the GPU copy engine. With:

  • buf=1: outer worker pulls 1 batch, finalizes, kicks 1 H2D, pushes to outer queue. Has to wait for inner workers to produce the next batch before kicking the next H2D. H2Ds get spaced out — when the consumer
    asks, the next batch's H2D is freshly kicked, not done yet. forward() stalls on H2D → loss.item() outlier.
  • buf=N: inner workers can run ahead and accumulate up to N completed batches in the buffer. The outer worker can then rapid-fire N finalize calls back-to-back, queuing N H2Ds on the copy engine in a burst. By
    the time the consumer pulls each batch, its H2D has had time to complete.

Measured on the memory variant: next-batch steady 53.77 ms (± 48.75) with buf=1 → 1.25 ms (± 0.17) with buf=N. That's a 43× mean reduction and 287× stdev reduction. Even master's 1.67 ms is beat. The
latency improvement is huge and the stdev collapse means the consumer sees a smooth stream instead of bursty wave-then-quiet.

3. The benchmark under-represents the benefit — heavy collate would gain more

The test uses the default DefaultCollateFn with pin_memory=False, which is essentially zero-copy (arrow_batch_to_tensors returns torch.as_tensor(numpy_view) — no CPU work). Per-batch collate work is
~microseconds.

Users with non-trivial collate (custom preprocessing inside collate_fn, pin_memory=True triggering a memcpy, per-row transforms, etc.) put real CPU work per batch — easily tens to hundreds of ms each. In that
regime:

  • buf=1: as soon as one worker finishes collate and puts, the next worker that finishes blocks on the full output queue. Workers serialize on put() even though their compute could happen in parallel. The N-way
    parallelism degrades to ~1-way effective throughput.
  • buf=N: workers can each put their finished batch without blocking → all N collates run truly concurrently → throughput stays at the format threadpool's nominal N × (1/T_collate).

So the H2D-smoothing latency win we measured (cheap collate) is the low end of what buf=N buys. The throughput-preservation benefit for heavy-collate users isn't visible in our benchmark but is structurally
larger.

Net decision

buf=N adds ~9 GiB cluster Plasma vs buf=1 — well under 10% of the budget — and buys:

  • 43× next-batch latency mean reduction, 287× stdev reduction (measured)
  • Insurance against throughput collapse for users with heavy collate (not measured here, but structurally guaranteed)
  • Matches master's effective buffer depth so no user-side regression possible

Still a 2× reduction in untracked Plasma vs master (4N → 2N). Ship.

@justinvyu

justinvyu commented Jun 3, 2026

Copy link
Copy Markdown
Contributor Author

Why master's step_avg is larger than PR1+2's (pageable H2D blocks compute)

My release test doesn't pin batches into non-pageable memory, so the H2D's non_blocking=True flag is a no-op, and the copies execute on the same default stream as compute — so they fully block model computation such as forward/backward/optim_step instead of pipelining on the copy engine.

See GPU traces below:
Screenshot 2026-06-03 at 4 00 48 PM

Master has WAY more Memcpy H2D Pageable->Device happening directly on the default compute stream. So actual computation couldn’t happen concurrently with this influx of h2d operations. PR1+2 reduces this to just 1 synchronous H2D happening.

TODO: fixes for followups.

  • Pinning memory in a thread so that GPU can do H2D non-pageable -> device.
  • Creating another cuda stream to actually do this H2D non-pageable -> device concurrently with default stream computation.

@rayhhome rayhhome left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implementation and tests both LGTM!

@justinvyu justinvyu enabled auto-merge (squash) June 4, 2026 02:15
@github-actions github-actions Bot added the go add ONLY when ready to merge, run all tests label Jun 4, 2026
@justinvyu justinvyu merged commit 2c6dd5c into ray-project:master Jun 4, 2026
8 checks passed
rueian pushed a commit to rueian/ray that referenced this pull request Jun 4, 2026
…e `make_async_gen` with `iter_threaded` (ray-project#63682)

Replaces the inner format/collate `make_async_gen` with `iter_threaded`
from ray-project#63660, cutting untracked object store memory pinned batches from
~16 to ~8 (2× reduction).

- `_format_in_threadpool` runs format + collate across a threadpool via
`make_async_gen(num_workers=min(4, prefetch_batches),
preserve_ordering=False)`. With the default `buffer_size=1`, this
allocates one shared input queue of size `(buffer_size + 1) *
num_workers` and `num_workers` per-worker output queues of size
`buffer_size` — for `num_workers=4`, that is **8 (input) + 4 (in-flight
in workers) + 4 (output) ≈ 16** batches buffered inside the threadpool,
none of which are visible to the resource manager.
- These buffered batches are pre-format `pa.Table.slice()` views that
pin their **full** source blocks in the object store (`pa.Table.slice`
is zero-copy and references the entire underlying buffer). They keep
blocks pinned in shared memory even after the distributed reference
counter considers them out of scope, which is the accounting decoupling
that contributes to streaming-split underestimation and spilling.
- Replace with `iter_threaded(..., num_workers=num_threadpool_workers,
output_buffer_size=num_threadpool_workers)` from PR 1 (generalized in
this stack to take a required `fn` and `num_workers`). Workers share
`batch_iter` under a lock and funnel results through a single bounded
queue sized to match the worker count — enough depth to keep workers
from blocking on each other's `put()` when collate is non-trivial.
In-flight is now bounded to **~2 × num_workers ≈ 8** (workers + shared
output buffer) — roughly a 2× reduction in untracked pinned batches.

---------

Signed-off-by: Justin Yu <justinvyu@anyscale.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
justinvyu added a commit that referenced this pull request Jun 4, 2026
…e behind `preserve_order` (#63792)

Part of the iter_batches consumer pipeline cleanup (#63660, #63682).
Gates restore_original_order behind
`DataContext.execution_options.preserve_order` (default off). When one
format/collate worker lags, the reorder buffer grows with the other
workers' completed batches, and ready batches aren't allowed to be
yielded; this PR skips the reorder step when ordering isn't required.
Recovers next-batch latency from PR1+2's regressed 113 ms steady back to
23 ms (lower than master's 32 ms), with no other regressions.

---------

Signed-off-by: Justin Yu <justinvyu@anyscale.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
@justinvyu justinvyu deleted the justinvyu/replace-inner-make-async-gen branch June 4, 2026 17:30
edoakes pushed a commit to edoakes/ray that referenced this pull request Jun 5, 2026
…e `make_async_gen` with `iter_threaded` (ray-project#63682)

Replaces the inner format/collate `make_async_gen` with `iter_threaded`
from ray-project#63660, cutting untracked object store memory pinned batches from
~16 to ~8 (2× reduction).

- `_format_in_threadpool` runs format + collate across a threadpool via
`make_async_gen(num_workers=min(4, prefetch_batches),
preserve_ordering=False)`. With the default `buffer_size=1`, this
allocates one shared input queue of size `(buffer_size + 1) *
num_workers` and `num_workers` per-worker output queues of size
`buffer_size` — for `num_workers=4`, that is **8 (input) + 4 (in-flight
in workers) + 4 (output) ≈ 16** batches buffered inside the threadpool,
none of which are visible to the resource manager.
- These buffered batches are pre-format `pa.Table.slice()` views that
pin their **full** source blocks in the object store (`pa.Table.slice`
is zero-copy and references the entire underlying buffer). They keep
blocks pinned in shared memory even after the distributed reference
counter considers them out of scope, which is the accounting decoupling
that contributes to streaming-split underestimation and spilling.
- Replace with `iter_threaded(..., num_workers=num_threadpool_workers,
output_buffer_size=num_threadpool_workers)` from PR 1 (generalized in
this stack to take a required `fn` and `num_workers`). Workers share
`batch_iter` under a lock and funnel results through a single bounded
queue sized to match the worker count — enough depth to keep workers
from blocking on each other's `put()` when collate is non-trivial.
In-flight is now bounded to **~2 × num_workers ≈ 8** (workers + shared
output buffer) — roughly a 2× reduction in untracked pinned batches.

---------

Signed-off-by: Justin Yu <justinvyu@anyscale.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
edoakes pushed a commit to edoakes/ray that referenced this pull request Jun 5, 2026
…e behind `preserve_order` (ray-project#63792)

Part of the iter_batches consumer pipeline cleanup (ray-project#63660, ray-project#63682).
Gates restore_original_order behind
`DataContext.execution_options.preserve_order` (default off). When one
format/collate worker lags, the reorder buffer grows with the other
workers' completed batches, and ready batches aren't allowed to be
yielded; this PR skips the reorder step when ordering isn't required.
Recovers next-batch latency from PR1+2's regressed 113 ms steady back to
23 ms (lower than master's 32 ms), with no other regressions.

---------

Signed-off-by: Justin Yu <justinvyu@anyscale.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
@justinvyu

Copy link
Copy Markdown
Contributor Author

Note: iter_torch_batches perf also improved significantly -- TODO: figure out why this is the case, since that test has no GPU work done in the iter_batches loop.

Screenshot 2026-06-08 at 4 38 01 PM
@justinvyu

Copy link
Copy Markdown
Contributor Author
Screenshot 2026-06-30 at 8 50 10 AM
limarkdcunha pushed a commit to limarkdcunha/ray that referenced this pull request Jun 30, 2026
…e `make_async_gen` with `iter_threaded` (ray-project#63682)

Replaces the inner format/collate `make_async_gen` with `iter_threaded`
from ray-project#63660, cutting untracked object store memory pinned batches from
~16 to ~8 (2× reduction).

- `_format_in_threadpool` runs format + collate across a threadpool via
`make_async_gen(num_workers=min(4, prefetch_batches),
preserve_ordering=False)`. With the default `buffer_size=1`, this
allocates one shared input queue of size `(buffer_size + 1) *
num_workers` and `num_workers` per-worker output queues of size
`buffer_size` — for `num_workers=4`, that is **8 (input) + 4 (in-flight
in workers) + 4 (output) ≈ 16** batches buffered inside the threadpool,
none of which are visible to the resource manager.
- These buffered batches are pre-format `pa.Table.slice()` views that
pin their **full** source blocks in the object store (`pa.Table.slice`
is zero-copy and references the entire underlying buffer). They keep
blocks pinned in shared memory even after the distributed reference
counter considers them out of scope, which is the accounting decoupling
that contributes to streaming-split underestimation and spilling.
- Replace with `iter_threaded(..., num_workers=num_threadpool_workers,
output_buffer_size=num_threadpool_workers)` from PR 1 (generalized in
this stack to take a required `fn` and `num_workers`). Workers share
`batch_iter` under a lock and funnel results through a single bounded
queue sized to match the worker count — enough depth to keep workers
from blocking on each other's `put()` when collate is non-trivial.
In-flight is now bounded to **~2 × num_workers ≈ 8** (workers + shared
output buffer) — roughly a 2× reduction in untracked pinned batches.

---------

Signed-off-by: Justin Yu <justinvyu@anyscale.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
limarkdcunha pushed a commit to limarkdcunha/ray that referenced this pull request Jun 30, 2026
…e behind `preserve_order` (ray-project#63792)

Part of the iter_batches consumer pipeline cleanup (ray-project#63660, ray-project#63682).
Gates restore_original_order behind
`DataContext.execution_options.preserve_order` (default off). When one
format/collate worker lags, the reorder buffer grows with the other
workers' completed batches, and ready batches aren't allowed to be
yielded; this PR skips the reorder step when ordering isn't required.
Recovers next-batch latency from PR1+2's regressed 113 ms steady back to
23 ms (lower than master's 32 ms), with no other regressions.

---------

Signed-off-by: Justin Yu <justinvyu@anyscale.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

data Ray Data-related issues go add ONLY when ready to merge, run all tests

3 participants