[Data] Introduce BlockEntry on RefBundle in place of (ref, metadata) tuples #63654
Code Review
This pull request refactors RefBundle to use a new BlockEntry dataclass (wrapping an ObjectRef and BlockMetadata) instead of the legacy (ObjectRef, BlockMetadata) 2-tuple, updating all construction and access sites across Ray Data. The review feedback suggests updating a misleading comment in ref_bundle.py that incorrectly states legacy 2-tuples are still accepted and normalized, and recommends adding runtime type assertions to BlockEntry to prevent silent type errors.
    ref: ObjectRef[Block]
    metadata: BlockMetadata
The previous implementation of RefBundle.__post_init__ performed runtime type assertions to ensure that the block references were instances of ray.ObjectRef and the metadata was an instance of BlockMetadata. Since BlockEntry is a standard Python dataclass, these type annotations are not enforced at runtime, which could lead to silent type errors or harder-to-debug failures down the line.
Consider adding a __post_init__ method to BlockEntry to restore these runtime type checks.
    ref: ObjectRef[Block]
    metadata: BlockMetadata

    def __post_init__(self):
        import ray

        assert isinstance(self.ref, ray.ObjectRef), f"Expected ray.ObjectRef, got {type(self.ref).__name__}"
        assert isinstance(self.metadata, BlockMetadata), f"Expected BlockMetadata, got {type(self.metadata).__name__}"
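To make the point about unenforced annotations concrete, here is a minimal sketch that runs without Ray. `FakeRef`, `FakeMetadata`, `UncheckedEntry`, and `CheckedEntry` are hypothetical stand-ins invented for this illustration; they are not the classes in `ref_bundle.py`, and the assertion messages are only examples.

```python
from dataclasses import dataclass


class FakeRef:
    """Hypothetical stand-in for ray.ObjectRef."""


class FakeMetadata:
    """Hypothetical stand-in for BlockMetadata."""


@dataclass(frozen=True)
class UncheckedEntry:
    # Annotations alone are not validated when the dataclass is constructed.
    ref: FakeRef
    metadata: FakeMetadata


@dataclass(frozen=True)
class CheckedEntry:
    ref: FakeRef
    metadata: FakeMetadata

    def __post_init__(self):
        # Runtime checks in the spirit of the review suggestion.
        assert isinstance(self.ref, FakeRef), (
            f"Expected FakeRef, got {type(self.ref).__name__}"
        )
        assert isinstance(self.metadata, FakeMetadata), (
            f"Expected FakeMetadata, got {type(self.metadata).__name__}"
        )


# Wrong types are accepted silently without __post_init__ checks.
unchecked = UncheckedEntry("not a ref", 123)

# With the checks, the same mistake fails at construction time.
try:
    CheckedEntry("not a ref", FakeMetadata())
    rejected = False
    message = ""
except AssertionError as exc:
    rejected = True
    message = str(exc)
```

Note that `assert` statements are skipped when Python runs with `-O`, so checks written this way are a debugging aid rather than a hard guarantee.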
    current_window_size += block_ref_and_md[1].num_rows
    for entry in next_ref_bundle.blocks:
        sliding_window.append((entry.ref, entry.metadata))
        current_window_size += entry.metadata.num_rows
    prefetcher.prefetch_blocks(
        [block_ref for block_ref, _ in list(sliding_window)]
I think you don't need to unravel the dataclass into a tuple. Instead, after appending the entry to sliding_window, we can do:
    [entry.ref for entry in list(sliding_window)]
@@ -31,6 +31,20 @@ def num_rows(self) -> int:
            return self.end_offset - self.start_offset


    @dataclass(frozen=True)
Thoughts on adding slots=True? I think this would reduce the memory footprint. We pass ref bundles through the streaming split coordinator, and converting this to a dataclass might add serialization overhead.
        for entry in bundle.blocks:
            left_blocks_with_metadata.append((entry.ref, entry.metadata))
    right_blocks_with_metadata = []
    for bundle in right_input:
        for block, meta in bundle.blocks:
            right_blocks_with_metadata.append((block, meta))
        for entry in bundle.blocks:
            right_blocks_with_metadata.append((entry.ref, entry.metadata))
Same here: I wonder if it's possible to keep the entry instead of making it a tuple, and use .ref and .metadata where needed.
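The pattern suggested in these two comments can be sketched as follows: keep entries as entries in local code, read `.ref` and `.metadata` where needed, and build tuples only where a legacy helper demands them. `Entry`, `Meta`, and `legacy_total_rows` are hypothetical stand-ins for this illustration, and refs are plain strings so the snippet runs without Ray.

```python
from collections import deque
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class Meta:
    """Hypothetical stand-in for BlockMetadata."""
    num_rows: int


@dataclass(frozen=True)
class Entry:
    """Hypothetical stand-in for BlockEntry."""
    ref: str  # a real entry would hold an ObjectRef
    metadata: Meta


def legacy_total_rows(pairs: List[Tuple[str, Meta]]) -> int:
    """Hypothetical legacy helper that still expects (ref, metadata) tuples."""
    return sum(meta.num_rows for _, meta in pairs)


sliding_window = deque()
current_window_size = 0
for entry in [Entry("ref-a", Meta(10)), Entry("ref-b", Meta(5))]:
    # Store the entry itself; no unpacking into a tuple.
    sliding_window.append(entry)
    current_window_size += entry.metadata.num_rows

# Read the named field at the point of use.
refs_to_prefetch = [entry.ref for entry in sliding_window]

# Convert to tuples only at the boundary with tuple-based code.
pairs = [(entry.ref, entry.metadata) for entry in sliding_window]
total_rows = legacy_total_rows(pairs)
```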
…tuples

Replaces the legacy `RefBundle.blocks: Tuple[Tuple[ObjectRef, BlockMetadata], ...]` shape with `Tuple[BlockEntry, ...]` where `BlockEntry` is a frozen dataclass with named `ref` / `metadata` fields. Construction is strict: 2-tuples are rejected with an actionable assertion.

Why:
- Named fields make every call site self-describing, eliminating the ad-hoc destructuring spread across the codebase.
- Reserves room for the per-block delivery shape to grow without disturbing every call site (e.g. forecasts about the next consumer, to be added in a follow-up PR).

Mechanically:
- Add `BlockEntry` next to `BlockSlice` in `python/ray/data/_internal/execution/interfaces/ref_bundle.py`, retype `RefBundle.blocks`, and update internal accessors (`block_refs`, `metadata`, `num_rows`, `size_bytes`, `slice`, `merge_ref_bundles`, `__hash__`, `__str__`).
- Re-export `BlockEntry` from the interfaces package.
- Migrate every production destructure of `bundle.blocks` and indexed access (op_runtime_metrics, executor operators, planner exchange, iterator, dataset, plan, equalize, …) to use `entry.ref` / `entry.metadata`.
- Convert at boundaries when handing data to legacy `BlockPartition` helpers in `ray.data._internal.split`.
- Migrate every `RefBundle(blocks=[(ref, meta), …], …)` construction site (datasources, planner, executors, tests) to use `BlockEntry(ref, meta)`.
- Update docstring example in `dataset.iter_internal_ref_bundles`.

Tests:
- New BlockEntry tests in `tests/test_ref_bundle.py` cover direct construction, rejection of legacy 2-tuples, and accessor parity.
- Existing tests for queues, op metrics, bundler, deduping schema, split, repartition, batching, dataset iter, and operators pass unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Goutam <goutam@anyscale.com>
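A minimal sketch of what "strict construction" could look like, assuming a container that validates its entries in `__post_init__`. `Entry` and `MiniBundle` are hypothetical names for this illustration, the metadata is reduced to a row count, and the assertion text is invented; the actual `RefBundle` check and message may differ.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class Entry:
    """Hypothetical stand-in for BlockEntry."""
    ref: str       # a real entry would hold an ObjectRef
    metadata: int  # stand-in for BlockMetadata: just a row count here


@dataclass
class MiniBundle:
    """Hypothetical, heavily simplified stand-in for RefBundle."""
    blocks: Tuple[Entry, ...]

    def __post_init__(self):
        self.blocks = tuple(self.blocks)
        for block in self.blocks:
            # Reject legacy 2-tuples with a message that says how to fix them.
            assert isinstance(block, Entry), (
                f"Expected Entry, got {type(block).__name__}; "
                "wrap the pair as Entry(ref, metadata)"
            )

    def num_rows(self) -> int:
        # Accessors read named fields instead of indexing into tuples.
        return sum(entry.metadata for entry in self.blocks)


bundle = MiniBundle(blocks=[Entry("ref-a", 3), Entry("ref-b", 4)])

try:
    MiniBundle(blocks=[("ref-a", 3)])
    legacy_rejected = False
except AssertionError:
    legacy_rejected = True
```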
- Keep BlockEntry through local code rather than unraveling into (ref, metadata) tuples in stream_split_iterator, iter_batches prefetcher, and zip operator. Convert to tuples only at the _split_at_indices boundary.
- Add slots=True to BlockEntry for cheaper allocation/attribute access.
- Restore pre-refactor no-op semantics in test_parquet_roundtrip's metadata loop (the original expression was never wrapped in assert; the migration accidentally tightened it and exposed a pre-existing size_bytes discrepancy unrelated to BlockEntry).
- Fix test_object_store_usage to construct a BlockEntry instead of a 2-tuple.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Goutam <goutam@anyscale.com>
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Reviewed by Cursor Bugbot for commit ed12ae0.
    # pyrefly: ignore[no-matching-overload]
    BlockAccessor.for_block(
        ray.get(entry.ref)
    ).size_bytes() == entry.metadata.size_bytes
Test comparison missing assert, check is a no-op
Medium Severity
The metadata size comparison at BlockAccessor.for_block(ray.get(entry.ref)).size_bytes() == entry.metadata.size_bytes is a bare expression whose boolean result is silently discarded. Unlike the analogous fixes in test_csv.py and test_json.py where assert was properly added in this PR, the parquet test is missing the assert, making this "Test metadata ops" check a complete no-op.
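The difference between a bare comparison and an asserted one can be shown with a small sketch. The two helper functions are hypothetical and exist only for this illustration; they are not part of the Ray test suite.

```python
def check_without_assert(actual: int, expected: int) -> None:
    # The comparison is evaluated, but its boolean result is thrown away,
    # so a mismatch can never fail the test.
    actual == expected


def check_with_assert(actual: int, expected: int) -> None:
    # Wrapping the comparison in assert turns a mismatch into a failure.
    assert actual == expected, f"size mismatch: {actual} != {expected}"


# A mismatch passes silently through the bare expression.
check_without_assert(100, 120)
bare_expression_passed = True

# The same mismatch is caught once assert is added.
try:
    check_with_assert(100, 120)
    assert_caught_mismatch = False
except AssertionError:
    assert_caught_mismatch = True
```

Linters typically flag the first form as a useless or pointless comparison, which is one way to catch it before review.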
Signed-off-by: Goutam <goutam@anyscale.com>
…ata)` tuples (ray-project#63654)

Replaces the legacy `RefBundle.blocks: Tuple[Tuple[ObjectRef, BlockMetadata], ...]` shape with `Tuple[BlockEntry, ...]` where `BlockEntry` is a frozen dataclass with named `ref` / `metadata` fields. Construction is strict: 2-tuples are rejected with an actionable assertion.

Why:
- Named fields make every call site self-describing, eliminating the ad-hoc destructuring spread across the codebase.
- Reserves room for the per-block delivery shape to grow without disturbing every call site (e.g. forecasts about the next consumer, to be added in a follow-up PR).

Mechanically:
- Add `BlockEntry` next to `BlockSlice` in `python/ray/data/_internal/execution/interfaces/ref_bundle.py`, retype `RefBundle.blocks`, and update internal accessors (`block_refs`, `metadata`, `num_rows`, `size_bytes`, `slice`, `merge_ref_bundles`, `__hash__`, `__str__`).
- Re-export `BlockEntry` from the interfaces package.
- Migrate every production destructure of `bundle.blocks` and indexed access (op_runtime_metrics, executor operators, planner exchange, iterator, dataset, plan, equalize, …) to use `entry.ref` / `entry.metadata`.
- Convert at boundaries when handing data to legacy `BlockPartition` helpers in `ray.data._internal.split`.
- Migrate every `RefBundle(blocks=[(ref, meta), …], …)` construction site (datasources, planner, executors, tests) to use `BlockEntry(ref, meta)`.
- Update docstring example in `dataset.iter_internal_ref_bundles`.

Tests:
- New BlockEntry tests in `tests/test_ref_bundle.py` cover direct construction, rejection of legacy 2-tuples, and accessor parity.
- Existing tests for queues, op metrics, bundler, deduping schema, split, repartition, batching, dataset iter, and operators pass unchanged.

---------

Signed-off-by: Goutam <goutam@anyscale.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

