Skip to content

[Data] Introduce BlockEntry on RefBundle in place of (ref, metadata) tuples#63654

Merged
goutamvenkat-anyscale merged 3 commits into
ray-project:masterfrom
goutamvenkat-anyscale:block-entry-foundation
Jun 1, 2026
Merged

[Data] Introduce BlockEntry on RefBundle in place of (ref, metadata) tuples#63654
goutamvenkat-anyscale merged 3 commits into
ray-project:masterfrom
goutamvenkat-anyscale:block-entry-foundation

Conversation

@goutamvenkat-anyscale

Copy link
Copy Markdown
Contributor

Replaces the legacy RefBundle.blocks: Tuple[Tuple[ObjectRef, BlockMetadata], ...] shape with Tuple[BlockEntry, ...] where BlockEntry is a frozen dataclass with named ref / metadata fields. Construction is strict: 2-tuples are rejected with an actionable assertion.

Why:

  • Named fields make every call site self-describing, eliminating the ad-hoc destructuring spread across the codebase.
  • Reserves room for the per-block delivery shape to grow without disturbing every call site (e.g. forecasts about the next consumer, to be added in a follow-up PR).

Mechanically:

  • Add BlockEntry next to BlockSlice in python/ray/data/_internal/execution/interfaces/ref_bundle.py, retype RefBundle.blocks, and update internal accessors (block_refs, metadata, num_rows, size_bytes, slice, merge_ref_bundles, __hash__, __str__).
  • Re-export BlockEntry from the interfaces package.
  • Migrate every production destructure of bundle.blocks and indexed access (op_runtime_metrics, executor operators, planner exchange, iterator, dataset, plan, equalize, …) to use entry.ref / entry.metadata.
  • Convert at boundaries when handing data to legacy BlockPartition helpers in ray.data._internal.split.
  • Migrate every RefBundle(blocks=[(ref, meta), …], …) construction site (datasources, planner, executors, tests) to use BlockEntry(ref, meta).
  • Update docstring example in dataset.iter_internal_ref_bundles.

Tests:

  • New BlockEntry tests in tests/test_ref_bundle.py cover direct construction, rejection of legacy 2-tuples, and accessor parity.
  • Existing tests for queues, op metrics, bundler, deduping schema, split, repartition, batching, dataset iter, and operators pass unchanged.
@goutamvenkat-anyscale goutamvenkat-anyscale requested a review from a team as a code owner May 27, 2026 00:23

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request refactors RefBundle to use a new BlockEntry dataclass (wrapping an ObjectRef and BlockMetadata) instead of the legacy (ObjectRef, BlockMetadata) 2-tuple, updating all construction and access sites across Ray Data. The review feedback suggests updating a misleading comment in ref_bundle.py that incorrectly states legacy 2-tuples are still accepted and normalized, and recommends adding runtime type assertions to BlockEntry to prevent silent type errors.

Comment thread python/ray/data/_internal/execution/interfaces/ref_bundle.py Outdated
Comment on lines +44 to +45
ref: ObjectRef[Block]
metadata: BlockMetadata

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The previous implementation of RefBundle.__post_init__ performed runtime type assertions to ensure that the block references were instances of ray.ObjectRef and the metadata was an instance of BlockMetadata. Since BlockEntry is a standard Python dataclass, these type annotations are not enforced at runtime, which could lead to silent type errors or harder-to-debug failures down the line.

Consider adding a __post_init__ method to BlockEntry to restore these runtime type checks.

    ref: ObjectRef[Block]
    metadata: BlockMetadata

    def __post_init__(self):
        import ray
        assert isinstance(self.ref, ray.ObjectRef), f"Expected ray.ObjectRef, got {type(self.ref).__name__}"
        assert isinstance(self.metadata, BlockMetadata), f"Expected BlockMetadata, got {type(self.metadata).__name__}"
@goutamvenkat-anyscale goutamvenkat-anyscale added data Ray Data-related issues go add ONLY when ready to merge, run all tests labels May 27, 2026
@goutamvenkat-anyscale goutamvenkat-anyscale changed the title [Data] Introduce BlockEntry on RefBundle in place of (ref, metadata) tuples May 27, 2026
Comment thread python/ray/data/tests/datasource/test_csv.py Outdated
@goutamvenkat-anyscale goutamvenkat-anyscale force-pushed the block-entry-foundation branch 2 times, most recently from 8b0af92 to 6e320f9 Compare May 27, 2026 07:18
current_window_size += block_ref_and_md[1].num_rows
for entry in next_ref_bundle.blocks:
sliding_window.append((entry.ref, entry.metadata))
current_window_size += entry.metadata.num_rows
prefetcher.prefetch_blocks(
[block_ref for block_ref, _ in list(sliding_window)]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think u don't need to unravel the dataclass into a tuple, and instead, after appending to sliding_window, we do

[entry.ref for entry in list(sliding_window)]
@@ -31,6 +31,20 @@ def num_rows(self) -> int:
return self.end_offset - self.start_offset


@dataclass(frozen=True)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thoughts on adding slots=True? I think this will reduce the memory size, since i know we like to pass ref bundles in the streaming split coordinator and we might have more serialization overhead by converting this to a dataclass

Comment on lines +166 to +171
for entry in bundle.blocks:
left_blocks_with_metadata.append((entry.ref, entry.metadata))
right_blocks_with_metadata = []
for bundle in right_input:
for block, meta in bundle.blocks:
right_blocks_with_metadata.append((block, meta))
for entry in bundle.blocks:
right_blocks_with_metadata.append((entry.ref, entry.metadata))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think here too i wonder if it's possible to not make it a tuple and instead do .metadata and .ref when needed

Comment thread python/ray/data/_internal/iterator/stream_split_iterator.py Outdated
goutamvenkat-anyscale and others added 2 commits May 27, 2026 15:28
…tuples

Replaces the legacy `RefBundle.blocks: Tuple[Tuple[ObjectRef, BlockMetadata], ...]`
shape with `Tuple[BlockEntry, ...]` where ``BlockEntry`` is a frozen
dataclass with named ``ref`` / ``metadata`` fields. Construction is strict:
2-tuples are rejected with an actionable assertion.

Why:
- Named fields make every call site self-describing, eliminating the
  ad-hoc destructuring spread across the codebase.
- Reserves room for the per-block delivery shape to grow without
  disturbing every call site (e.g. forecasts about the next consumer,
  to be added in a follow-up PR).

Mechanically:
- Add `BlockEntry` next to `BlockSlice` in
  `python/ray/data/_internal/execution/interfaces/ref_bundle.py`,
  retype `RefBundle.blocks`, and update internal accessors
  (`block_refs`, `metadata`, `num_rows`, `size_bytes`, `slice`,
  `merge_ref_bundles`, `__hash__`, `__str__`).
- Re-export `BlockEntry` from the interfaces package.
- Migrate every production destructure of `bundle.blocks` and indexed
  access (op_runtime_metrics, executor operators, planner exchange,
  iterator, dataset, plan, equalize, …) to use `entry.ref` /
  `entry.metadata`.
- Convert at boundaries when handing data to legacy `BlockPartition`
  helpers in `ray.data._internal.split`.
- Migrate every `RefBundle(blocks=[(ref, meta), …], …)` construction
  site (datasources, planner, executors, tests) to use
  `BlockEntry(ref, meta)`.
- Update docstring example in `dataset.iter_internal_ref_bundles`.

Tests:
- New BlockEntry tests in `tests/test_ref_bundle.py` cover direct
  construction, rejection of legacy 2-tuples, and accessor parity.
- Existing tests for queues, op metrics, bundler, deduping schema,
  split, repartition, batching, dataset iter, and operators pass
  unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Goutam <goutam@anyscale.com>
- Keep BlockEntry through local code rather than unraveling into (ref, metadata)
  tuples in stream_split_iterator, iter_batches prefetcher, and zip operator.
  Convert to tuples only at the _split_at_indices boundary.
- Add slots=True to BlockEntry for cheaper allocation/attribute access.
- Restore pre-refactor no-op semantics in test_parquet_roundtrip's metadata
  loop (the original expression was never wrapped in assert; the migration
  accidentally tightened it and exposed a pre-existing size_bytes discrepancy
  unrelated to BlockEntry).
- Fix test_object_store_usage to construct a BlockEntry instead of a 2-tuple.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Goutam <goutam@anyscale.com>

@cursor cursor Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Fix All in Cursor

Reviewed by Cursor Bugbot for commit ed12ae0. Configure here.

# pyrefly: ignore[no-matching-overload]
BlockAccessor.for_block(
ray.get(entry.ref)
).size_bytes() == entry.metadata.size_bytes

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test comparison missing assert, check is a no-op

Medium Severity

The metadata size comparison at BlockAccessor.for_block(ray.get(entry.ref)).size_bytes() == entry.metadata.size_bytes is a bare expression whose boolean result is silently discarded. Unlike the analogous fixes in test_csv.py and test_json.py where assert was properly added in this PR, the parquet test is missing the assert, making this "Test metadata ops" check a complete no-op.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit ed12ae0. Configure here.

Signed-off-by: Goutam <goutam@anyscale.com>
@goutamvenkat-anyscale goutamvenkat-anyscale enabled auto-merge (squash) June 1, 2026 19:05
@goutamvenkat-anyscale goutamvenkat-anyscale merged commit 886ae8b into ray-project:master Jun 1, 2026
7 checks passed
@goutamvenkat-anyscale goutamvenkat-anyscale deleted the block-entry-foundation branch June 1, 2026 20:00
rueian pushed a commit to rueian/ray that referenced this pull request Jun 4, 2026
…ata)` tuples (ray-project#63654)

Replaces the legacy `RefBundle.blocks: Tuple[Tuple[ObjectRef,
BlockMetadata], ...]` shape with `Tuple[BlockEntry, ...]` where
``BlockEntry`` is a frozen dataclass with named ``ref`` / ``metadata``
fields. Construction is strict: 2-tuples are rejected with an actionable
assertion.

Why:
- Named fields make every call site self-describing, eliminating the
ad-hoc destructuring spread across the codebase.
- Reserves room for the per-block delivery shape to grow without
disturbing every call site (e.g. forecasts about the next consumer, to
be added in a follow-up PR).

Mechanically:
- Add `BlockEntry` next to `BlockSlice` in
`python/ray/data/_internal/execution/interfaces/ref_bundle.py`, retype
`RefBundle.blocks`, and update internal accessors (`block_refs`,
`metadata`, `num_rows`, `size_bytes`, `slice`, `merge_ref_bundles`,
`__hash__`, `__str__`).
- Re-export `BlockEntry` from the interfaces package.
- Migrate every production destructure of `bundle.blocks` and indexed
access (op_runtime_metrics, executor operators, planner exchange,
iterator, dataset, plan, equalize, …) to use `entry.ref` /
`entry.metadata`.
- Convert at boundaries when handing data to legacy `BlockPartition`
helpers in `ray.data._internal.split`.
- Migrate every `RefBundle(blocks=[(ref, meta), …], …)` construction
site (datasources, planner, executors, tests) to use `BlockEntry(ref,
meta)`.
- Update docstring example in `dataset.iter_internal_ref_bundles`.

Tests:
- New BlockEntry tests in `tests/test_ref_bundle.py` cover direct
construction, rejection of legacy 2-tuples, and accessor parity.
- Existing tests for queues, op metrics, bundler, deduping schema,
split, repartition, batching, dataset iter, and operators pass
unchanged.

---------

Signed-off-by: Goutam <goutam@anyscale.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
limarkdcunha pushed a commit to limarkdcunha/ray that referenced this pull request Jun 30, 2026
…ata)` tuples (ray-project#63654)

Replaces the legacy `RefBundle.blocks: Tuple[Tuple[ObjectRef,
BlockMetadata], ...]` shape with `Tuple[BlockEntry, ...]` where
``BlockEntry`` is a frozen dataclass with named ``ref`` / ``metadata``
fields. Construction is strict: 2-tuples are rejected with an actionable
assertion.

Why:
- Named fields make every call site self-describing, eliminating the
ad-hoc destructuring spread across the codebase.
- Reserves room for the per-block delivery shape to grow without
disturbing every call site (e.g. forecasts about the next consumer, to
be added in a follow-up PR).

Mechanically:
- Add `BlockEntry` next to `BlockSlice` in
`python/ray/data/_internal/execution/interfaces/ref_bundle.py`, retype
`RefBundle.blocks`, and update internal accessors (`block_refs`,
`metadata`, `num_rows`, `size_bytes`, `slice`, `merge_ref_bundles`,
`__hash__`, `__str__`).
- Re-export `BlockEntry` from the interfaces package.
- Migrate every production destructure of `bundle.blocks` and indexed
access (op_runtime_metrics, executor operators, planner exchange,
iterator, dataset, plan, equalize, …) to use `entry.ref` /
`entry.metadata`.
- Convert at boundaries when handing data to legacy `BlockPartition`
helpers in `ray.data._internal.split`.
- Migrate every `RefBundle(blocks=[(ref, meta), …], …)` construction
site (datasources, planner, executors, tests) to use `BlockEntry(ref,
meta)`.
- Update docstring example in `dataset.iter_internal_ref_bundles`.

Tests:
- New BlockEntry tests in `tests/test_ref_bundle.py` cover direct
construction, rejection of legacy 2-tuples, and accessor parity.
- Existing tests for queues, op metrics, bundler, deduping schema,
split, repartition, batching, dataset iter, and operators pass
unchanged.

---------

Signed-off-by: Goutam <goutam@anyscale.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

data Ray Data-related issues go add ONLY when ready to merge, run all tests

2 participants