[core] Avoid extra memcpy when spilling fused objects #63653
_write_multiple_objects concatenated the header and the object buffer into one bytes object before f.write(), which memcpy'd the entire payload. For large objects this Python-side copy rivaled the actual disk I/O (~273 ms of Python logic vs ~251 ms of I/O at 128 MB). Write the small fixed header and the buffer as two separate writes instead; the on-disk layout is byte-identical, so readers and offset accounting are unchanged.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Signed-off-by: Yuanzhuo Yang <yuanzhuoyang@gmail.com>
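To make the "byte-identical layout" claim concrete, here is a minimal sketch comparing the two write patterns. It assumes an in-memory `io.BytesIO` as a stand-in for the spill file, and the header bytes and helper names (`write_concatenated`, `write_separately`) are hypothetical, not taken from `external_storage.py`.

```python
import io


def write_concatenated(f, header: bytes, buf) -> int:
    # Old pattern: `+` builds a brand-new bytes object, copying the whole buffer.
    payload = header + memoryview(buf)
    return f.write(payload)


def write_separately(f, header: bytes, buf) -> int:
    # New pattern: two writes; f.write() consumes the buffer via the buffer protocol.
    written = f.write(header)
    if len(buf):
        written += f.write(memoryview(buf))
    return written


# Hypothetical header and object buffer, for illustration only.
header = b"\x10\x00\x00\x00hypothetical-header"
buf = bytearray(b"x" * 1024)

old, new = io.BytesIO(), io.BytesIO()
old_len = write_concatenated(old, header, buf)
new_len = write_separately(new, header, buf)
```

Both sinks end up with the same bytes and the same total length, which is why readers and offset accounting should not need to change.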
9c7c5ea to f281e42
Code Review
This pull request optimizes the _write_multiple_objects function in python/ray/_private/external_storage.py by splitting the write operation into two separate writes: one for the header and one for the object buffer. This avoids an expensive memory copy of the entire object payload that would occur when concatenating them. There are no review comments to address, and I have no additional feedback to provide.
Kunchd left a comment
Looks good. Thanks for the insightful PR description! I left a small nit.
    payload = (
    # Write the small fixed header (lengths + owner address + metadata)
    # and the object buffer as two separate writes. Concatenating them
    # into one bytes object would memcpy the entire object payload.
nit: The comment here is unnecessary. Comments should only clarify or justify the code for future readers. Future readers wouldn't see the concatenation option, and likely wouldn't question why we don't concatenate here.
However, the comment is helpful for code review. I typically just leave a comment in the PR for these clarifications.
Signed-off-by: Yuanzhuo Yang <yuanzhuo.yang1@anyscale.com>
## Description

`ExternalStorage._write_multiple_objects` builds the spill payload as `header + memoryview(buf)` and then calls `f.write(payload)`. The `+` concatenation allocates a new `bytes` object and copies the whole object buffer into it before writing, which is an avoidable Python-side memcpy.

This PR writes the header and the buffer as two separate writes instead:

```python
f.write(header)
if buf_len:
    f.write(memoryview(buf))
```

`f.write()` is a [buffer protocol][bp] consumer, so `f.write(memoryview(buf))` reads the buffer's memory directly, with no intermediate `bytes` copy. The `+` operator, by contrast, must materialize a new concatenated `bytes` object, which copies the whole buffer. The on-disk layout is byte-identical, so readers and offset accounting are unchanged.

[bp]: https://docs.python.org/3/c-api/buffer.html

## Related issues

N/A

## Additional information

We instrument one spill and split it into three segments, derived from three raw measurements:

- `T_python`: total duration of the Python `spill_objects` call in the IO worker (the whole `external_storage.spill_objects`). Measured by wrapping the delegated call with `perf_counter_ns`: [`external_storage.py` L673-L676][tp].
- `T_io`: time inside `f.write` / `f.flush` within that call (actual disk writes). Accumulated around the two writes ([`external_storage.py` L207-L211][tio-w]) and the flush ([L224-L226][tio-f]).
- `T_total`: measured in the raylet C++ with `steady_clock` around the `SpillObjects` RPC, recorded in the reply callback, so it covers the full spill including the RPC and `T_python`: start at [`local_object_manager.cc` L377][tt0], recorded at [L388-L392][tt1].
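The Python-side measurements (`T_python` and `T_io`) could be collected roughly as follows. This is a sketch under assumptions, not the instrumentation from the linked branch: `timed_spill` is a hypothetical helper, the sink is an in-memory `io.BytesIO`, and the headers are placeholder bytes.

```python
import io
import time


def timed_spill(f, objects):
    """Write (header, buf) pairs to f and return (t_python_ns, t_io_ns)."""
    t_io = 0
    start = time.perf_counter_ns()
    for header, buf in objects:
        # Accumulate only the time spent inside the two writes.
        w0 = time.perf_counter_ns()
        f.write(header)
        if len(buf):
            f.write(memoryview(buf))
        t_io += time.perf_counter_ns() - w0
    # The flush also counts as I/O time.
    f0 = time.perf_counter_ns()
    f.flush()
    t_io += time.perf_counter_ns() - f0
    t_python = time.perf_counter_ns() - start
    return t_python, t_io


# Placeholder objects: four 64 KiB buffers with tiny hypothetical headers.
objects = [(b"hdr%d" % i, bytearray(1 << 16)) for i in range(4)]
sink = io.BytesIO()
t_python, t_io = timed_spill(sink, objects)
python_logic = t_python - t_io  # everything in the call that is not a write or flush
```

Because both counters come from the same monotonic clock in the same process, the difference `t_python - t_io` is meaningful without any clock alignment.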
[tp]: https://github.com/ShockYoungCHN/ray/blob/14bc999dec/python/ray/_private/external_storage.py#L673-L676
[tio-w]: https://github.com/ShockYoungCHN/ray/blob/14bc999dec/python/ray/_private/external_storage.py#L207-L211
[tio-f]: https://github.com/ShockYoungCHN/ray/blob/14bc999dec/python/ray/_private/external_storage.py#L224-L226
[tt0]: https://github.com/ShockYoungCHN/ray/blob/14bc999dec/src/ray/raylet/local_object_manager.cc#L377
[tt1]: https://github.com/ShockYoungCHN/ray/blob/14bc999dec/src/ray/raylet/local_object_manager.cc#L388-L392

From these:

- **I/O** = `T_io`: the disk writes.
- **Python logic** = `T_python − T_io`: the rest of the Python work, i.e. building each object's header, the payload concatenation this PR removes, and URL/offset bookkeeping. (Objects are already serialized in plasma, so no serialization happens here.)
- **gRPC + GIL** = `T_total − T_python`: the raylet→IO-worker RPC round trip plus GIL acquisition, i.e. everything outside Python.

Durations are measured within each process (`steady_clock` in C++, `perf_counter_ns` in Python) and differenced, so no cross-process clock alignment is needed.

Profiling the filesystem spill path (object size swept 64 KB–128 MB), the concatenation copy scales with object size and rivals disk I/O for large objects. After the fix the Python-side time is flat and large-object spill is ~2× faster:

| size  | T_total before → after        |
|-------|-------------------------------|
| 32MB  | 38,911 → 21,300 µs (−45%)     |
| 64MB  | 73,299 → 38,568 µs (−47%)     |
| 128MB | 525,784 → 256,012 µs (−51%)   |

Small objects (gRPC+GIL dominated) are unchanged, as expected.

**Before / after** (% of spill time: I/O vs gRPC+GIL vs Python logic): [charts not reproduced here]

Benchmark harness and raw data are on the `reform-spill` branch under `spill-bench/`.
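The segment arithmetic and the percentages in the table can be reproduced with a few lines. The helper names below are hypothetical, and the inputs to `split_segments` are synthetic numbers chosen only to show the subtraction; the `table` values are the `T_total` figures quoted above.

```python
def split_segments(t_total_us, t_python_us, t_io_us):
    """Derive the three reported segments from the three raw measurements."""
    return {
        "io": t_io_us,
        "python_logic": t_python_us - t_io_us,
        "grpc_gil": t_total_us - t_python_us,
    }


def percent_change(before, after):
    """Relative change from before to after, rounded to a whole percent."""
    return round(100 * (after - before) / before)


# Synthetic inputs (not measured data): T_total=100, T_python=80, T_io=30.
segments = split_segments(100, 80, 30)

# T_total before/after in microseconds, as quoted in the table.
table = {
    "32MB": (38_911, 21_300),
    "64MB": (73_299, 38_568),
    "128MB": (525_784, 256_012),
}
changes = {size: percent_change(b, a) for size, (b, a) in table.items()}
```

The three segments always sum back to `T_total`, which is a quick sanity check on any set of raw measurements.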
### PR history

The single concatenated write was not a performance choice and is safe to undo:

- **ray-project#12087** (fusion, 2020) originally wrote the header and buffer as *separate* `f.write` calls, the same pattern this PR restores.
- **ray-project#14703** merged them into one `f.write(payload)` so it could wrap the write in `try/except IOError` and use the single write's return value for the recorded size.
- **ray-project#17016** removed that `try/except` handling. Since then the code is just `written_bytes = f.write(payload); assert written_bytes == payload_len`, so the only thing the concatenation still does is force the avoidable copy.

---------

Signed-off-by: Yuanzhuo Yang <yuanzhuoyang@gmail.com>
Signed-off-by: Yuanzhuo Yang <yuanzhuo.yang1@anyscale.com>
Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
Co-authored-by: Yuanzhuo Yang <yuanzhuo.yang1@anyscale.com>