[Data] Propagate Schema in _shuffle_block Empty Block Case to ColumnNotFound Error in Chained Left Joins#61507

Merged
alexeykudinkin merged 13 commits into ray-project:master from rayhhome:join-empty on Mar 16, 2026

@rayhhome rayhhome commented Mar 5, 2026

Contributor

Description

Per #60013, chained left joins fail with ColumnNotFoundError when the first join produces empty intermediate blocks. #60520 attempted the fix, but a refined reproduction script shows it does not resolve the underlying issue. This PR proposes a targeted fix and a deterministic regression test.

Root cause

Using the example in #60013, when the streaming executor feeds the second join's input, the first block delivered can have zero rows. The bug is then triggered through the following sequence:

  1. _do_add_input_inner sees that this is the first block for input sequence 0 (or 1), so it submits a _shuffle_block task with send_empty_blocks=True and immediately sets self._has_schemas_broadcasted[input_index] = True;
  2. The remote _shuffle_block worker task takes the early return of (empty_metadata, {}). No aggregator.submit() calls are ever made and the schema never reaches any aggregator;
  3. All subsequent blocks are submitted with send_empty_blocks=False. Aggregators with no non-empty data are never contacted at all, leaving their bucket queue empty;
  4. At finalization, drain_queue() returns [] for those partitions, so _combine([]) builds an ArrowBlockBuilder with no add_block() calls and produces an empty table with no columns;
  5. When JoiningAggregation.finalize() calls pa.Table.join() on this columnless table, it raises ColumnNotFoundError as observed.
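
The sequence above can be sketched with a small pure-Python model. This is an illustration under simplifying assumptions, not Ray's code: ToyShuffle, NUM_PARTITIONS, SCHEMA, and the tuple-based representation of schemas and rows are hypothetical stand-ins for the operator, the aggregator pool, and Arrow tables.

```python
NUM_PARTITIONS = 4  # hypothetical partition count for this toy model


class ToyShuffle:
    """Simplified stand-in for the hash-shuffle operator (not Ray's implementation)."""

    def __init__(self):
        # partition_id -> list of (schema, rows) shards received by that aggregator
        self.queues = {pid: [] for pid in range(NUM_PARTITIONS)}
        self.has_schemas_broadcasted = False

    def shuffle_block(self, schema, rows, send_empty_blocks):
        if not rows:
            return  # pre-fix early return: nothing is submitted, not even the schema
        parts = {}
        for row in rows:
            parts.setdefault(row[0] % NUM_PARTITIONS, []).append(row)
        # The first block is broadcast to every partition; later blocks only
        # go to partitions that actually received rows.
        targets = range(NUM_PARTITIONS) if send_empty_blocks else parts
        for pid in targets:
            self.queues[pid].append((schema, parts.get(pid, [])))

    def add_input(self, schema, rows):
        send_empty_blocks = not self.has_schemas_broadcasted
        self.has_schemas_broadcasted = True  # flipped before the task has sent anything
        self.shuffle_block(schema, rows, send_empty_blocks)

    def finalize(self, pid):
        shards = self.queues[pid]
        schema = shards[0][0] if shards else ()  # no shards: no schema to recover
        return schema, [row for _, part in shards for row in part]


SCHEMA = ("id", "name")
op = ToyShuffle()
op.add_input(SCHEMA, [])                    # the empty block arrives first
op.add_input(SCHEMA, [(0, "a"), (4, "b")])  # both keys land in partition 0
print(op.finalize(0))  # partition 0 has rows and a schema
print(op.finalize(1))  # partition 1 was never contacted, so it is columnless
```

In this model, swapping the two add_input calls lets the broadcast happen on the non-empty block, and every partition then finalizes with the full schema, which matches the order-dependent behaviour described above.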

Why #60520 does not fix this issue

#60520 modifies ArrowBlockBuilder.build() to use a stored self._schema when len(tables) == 0. However, self._schema is only populated inside add_block() calls. When partition_shards is [] in _combine(...), add_block() is never called, so self._schema remains None and the fallback has nothing to use.
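
A minimal sketch of why a build-time fallback cannot help here, assuming a builder that learns its schema only from added blocks. ToyBlockBuilder and combine are hypothetical simplifications written for this illustration; they are not the real ArrowBlockBuilder or _combine.

```python
class ToyBlockBuilder:
    """Hypothetical simplified builder: the schema is learned only in add_block()."""

    def __init__(self):
        self._schema = None
        self._tables = []

    def add_block(self, schema, rows):
        if self._schema is None:
            self._schema = schema
        if rows:
            self._tables.append(rows)

    def build(self):
        if not self._tables:
            # Fallback in the spirit of #60520: reuse the stored schema, if any.
            return (self._schema or (), [])
        return (self._schema, [row for table in self._tables for row in table])


def combine(partition_shards):
    builder = ToyBlockBuilder()
    for schema, rows in partition_shards:
        builder.add_block(schema, rows)
    return builder.build()


print(combine([(("id", "name"), [])]))  # one empty shard: schema survives
print(combine([]))                      # no shards at all: nothing to fall back on
```

The fallback only helps once at least one schema-carrying shard has reached the builder, which is why the fix has to happen on the sending side.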

This fix

In _shuffle_block, when block.num_rows == 0 and send_empty_blocks=True, explicitly broadcast schema-carrying empty tables to every aggregator before returning. This mirrors the broadcast logic for non-empty blocks, which ensures every aggregator holds at least one schema-carrying block and thus finalizes correctly.
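
As a rough illustration of the explicit broadcast, the toy function below sends a zero-row, schema-carrying placeholder to every partition when the triggering block is empty. The function name, the queues dictionary, and the tuple-based schema are assumptions made for this sketch and do not correspond to Ray's actual signatures.

```python
def shuffle_block_with_broadcast(queues, schema, rows, send_empty_blocks, num_partitions):
    """Toy version of the proposed fix; queues maps partition_id -> list of (schema, rows)."""
    if not rows:
        if send_empty_blocks:
            # Explicit schema broadcast: every partition gets a zero-row placeholder.
            for pid in range(num_partitions):
                queues.setdefault(pid, []).append((schema, []))
        return
    parts = {}
    for row in rows:
        parts.setdefault(row[0] % num_partitions, []).append(row)
    targets = range(num_partitions) if send_empty_blocks else parts
    for pid in targets:
        queues.setdefault(pid, []).append((schema, parts.get(pid, [])))


queues = {}
shuffle_block_with_broadcast(queues, ("id", "name"), [], True, 3)
print(queues)  # all three partitions now hold a schema-carrying empty shard
```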

Alternative fix

Deleting the entire early-return if branch in _shuffle_block would also eliminate the issue. However, since the bug only affects the edge case where the first incoming block is empty, removing the full early-return branch risks performance degradation.

Related issues

Fixes #60013 and follows up on #60520.

Additional information

The original reproduction script in #60013 occasionally misses the error due to the uncertain order of blocks fed to the second join. To force the bug, add the following lines to the reproduction script:

    ...
    shapes = [b.shape for b in blocks]
    print(f"Columns flattened via map_batches: {flatten_columns}")
    print("Block shapes after first join:", shapes)

    # ----- Add the following lines -----
    # Force the bug
    # The streaming executor delivers blocks in completion order, so non-empty
    # partitions finish faster and arrive first, letting schema broadcast succeed
    # silently.  Reconstructing the dataset with empty blocks at the front
    # guarantees that _shuffle_block() sees a zero-row block as the very first
    # block for the left input sequence, triggering the premature
    # _has_schemas_broadcasted flag and the resulting (0,0) empty-table bug.
    import pyarrow as pa
    empty   = [b for b in blocks if b.num_rows == 0]
    nonempty = [b for b in blocks if b.num_rows > 0]
    assert empty, "No empty blocks found — cannot reproduce the bug with this dataset."
    print(f"Reordering: {len(empty)} empty blocks first, then {len(nonempty)} non-empty.")
    ds_joined = ray.data.from_arrow(empty + nonempty)
    print("Block shapes after reordering:", [b.shape for b in (ray.get(ref) for ref in ds_joined.get_internal_block_refs())])
    # ----------------------------------

    # Create mapping table
    # Use some of the location_ids for the mapping
    shared_location_ids = location_ids[: max(1, len(location_ids) // 3)]
    ...

The augmented script forces the order of blocks so that the first block going into the second join is always empty.

The new test case in test_join.py places the empty block in a list fed to from_arrow, preserving the block order and ensuring that the second join will always see the empty block first. The bug fires reliably on every run before the fix.

rayhhome added 2 commits March 4, 2026 15:36
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
@rayhhome rayhhome self-assigned this Mar 5, 2026
Copilot AI review requested due to automatic review settings March 5, 2026 00:49
@rayhhome rayhhome requested a review from a team as a code owner March 5, 2026 00:49
@rayhhome rayhhome added the data Ray Data-related issues label Mar 5, 2026

@cursor cursor Bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Comment thread python/ray/data/_internal/execution/operators/hash_shuffle.py Outdated

Copilot AI left a comment

Contributor

Pull request overview

Fixes a Ray Data hash-shuffle join edge case where a first empty-row block prevents schema broadcast to aggregators, producing columnless empty tables that later trigger ColumnNotFoundError in chained left joins.

Changes:

  • Update _shuffle_block() to broadcast schema-carrying empty tables to all aggregators when the first received block is empty and send_empty_blocks=True.
  • Add a deterministic regression test that forces the “empty block arrives first” ordering to reproduce the chained left-join failure.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated no comments.

File | Description
python/ray/data/_internal/execution/operators/hash_shuffle.py | Ensures schema propagation even when the first shuffled block has num_rows == 0 and schema broadcast is required.
python/ray/data/tests/test_join.py | Adds a regression test that deterministically reproduces and validates the fix for chained left-outer joins with empty intermediate blocks.
Comments suppressed due to low confidence (1)

python/ray/data/_internal/execution/operators/hash_shuffle.py:295

  • In the block.num_rows == 0 and send_empty_blocks branch, this broadcasts by calling _create_empty_table(block.schema) and ray.put(...) once per partition. For large num_partitions this creates many identical objects in the object store and duplicates logic that already exists in the non-empty path. Consider reusing the existing partition loop by only early-returning when send_empty_blocks is false, or at least ray.put() a single schema-carrying empty table once and reuse the same ObjectRef for all aggregator.submit calls.
    if block.num_rows == 0:
        if send_empty_blocks:
            # NOTE: Perform the schema broadcast explicitly
            #       Every aggregator gets a (0, N) schema-carrying placeholder even when
            #       the triggering block itself is empty.
            pending = []
            for partition_id in range(pool.num_partitions):
                aggregator = pool.get_aggregator_for_partition(partition_id)
                partition_ref = ray.put(_create_empty_table(block.schema))
                pending.append(
                    aggregator.submit.remote(input_index, partition_id, partition_ref)
                )
            # Wait for all schema-carrier blocks to be accepted before returning.
            # This mirrors the synchronization in the normal (non-empty) path and
            # ensures aggregators hold the schema before finalize() is called.
            while pending:
                _, pending = ray.wait(pending, num_returns=len(pending), timeout=1)


@gemini-code-assist gemini-code-assist Bot left a comment

Contributor

Code Review

This pull request provides a well-reasoned fix for a ColumnNotFoundError that occurs during chained left joins with empty intermediate blocks. The approach of explicitly broadcasting a schema-carrying empty block to all aggregators is a solid solution to ensure schema propagation. The included regression test is excellent, as it deterministically reproduces the issue and verifies the fix. I have one suggestion to simplify the waiting logic by using ray.get() instead of a while loop with ray.wait(), which would make the code more concise and robust in handling task failures. Overall, this is a high-quality contribution.
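
The difference between polling for completion and collecting results can be shown with an analogy in the standard library. This sketch uses concurrent.futures rather than Ray, so it only illustrates the general pattern behind the suggestion; the submit function and its failure on partition 2 are invented for the example.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def submit(partition_id):
    # Hypothetical task: partition 2 fails to accept its block.
    if partition_id == 2:
        raise RuntimeError(f"partition {partition_id} rejected the block")
    return partition_id


error = None
with ThreadPoolExecutor(max_workers=4) as pool:
    pending = [pool.submit(submit, pid) for pid in range(4)]

    # Polling style: the loop exits once every task is done, failed or not,
    # and nothing in it surfaces the failure.
    not_done = set(pending)
    while not_done:
        _, not_done = wait(not_done, timeout=1)

    # Collecting results re-raises the task's exception in the caller.
    try:
        results = [future.result() for future in pending]
    except RuntimeError as exc:
        error = str(exc)

print(error)
```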

Note: Security Review did not run due to the size of the PR.

Comment thread python/ray/data/_internal/execution/operators/hash_shuffle.py Outdated
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>

@iamjustinhsu iamjustinhsu left a comment

Contributor

nice

Comment thread python/ray/data/tests/test_join.py Outdated
Comment thread python/ray/data/tests/test_join.py Outdated
@rayhhome rayhhome added the go add ONLY when ready to merge, run all tests label Mar 7, 2026
Comment thread python/ray/data/tests/test_join.py Outdated
Comment thread python/ray/data/tests/test_join.py Outdated
Comment on lines +278 to +279
    if block.num_rows == 0:
        if send_empty_blocks:

Member

Deleting the entire early-return if branch in _shuffle_block would also eliminate the issue. However, since the bug only affects the edge case where the first incoming block is empty, removing the full early-return branch risks performance degradation.

Rather than removing this branch entirely, I'm wondering if you could also fix this issue by just doing if block.num_rows == 0 and not send_empty_blocks:. An advantage of this over the current implementation is that it'd be much simpler.

Do we know what the actual performance impact of that approach would be? In general, I think we should err on the side of simplicity over performance, especially if it's unclear whether there's an actual performance bottleneck.

Contributor Author

Makes sense; I don't yet have direct evidence of the performance impact. Going with the suggested if block.num_rows == 0 and not send_empty_blocks: change since it's cleaner and still keeps the early return for everything other than the edge case.
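
A toy sketch of the narrowed condition, assuming the regular partition loop already sends a shard to every partition when send_empty_blocks is set. The function, the queues dictionary, and the tuple-based schema are hypothetical and only model the control flow discussed in this thread.

```python
def shuffle_block(queues, schema, rows, send_empty_blocks, num_partitions):
    """Toy model of the narrowed early return (not the actual Ray code)."""
    if not rows and not send_empty_blocks:
        return  # fast path: nothing to send and no schema broadcast owed
    parts = {}
    for row in rows:
        parts.setdefault(row[0] % num_partitions, []).append(row)
    # An empty first block falls through to here with parts == {}, so each
    # partition receives a zero-row shard that still carries the schema.
    targets = range(num_partitions) if send_empty_blocks else parts
    for pid in targets:
        queues.setdefault(pid, []).append((schema, parts.get(pid, [])))


first = {}
shuffle_block(first, ("id", "name"), [], True, 3)   # empty first block: broadcast
later = {}
shuffle_block(later, ("id", "name"), [], False, 3)  # empty later block: skipped
print(first, later)
```

No separate broadcast branch is needed in this variant, which is the simplicity argument made in the review.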

Comment thread python/ray/data/_internal/execution/operators/hash_shuffle.py Outdated
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
@alexeykudinkin alexeykudinkin merged commit b5b334f into ray-project:master Mar 16, 2026
6 checks passed
@peterxcli peterxcli mentioned this pull request Apr 22, 2026
richardliaw pushed a commit that referenced this pull request May 6, 2026
## Description

```sql
SELECT n_name, SUM(l_extendedprice * (1 - l_discount)) AS revenue
FROM customer, orders, lineitem, supplier, nation, region
	WHERE c_custkey = o_custkey
		AND l_orderkey = o_orderkey
		AND l_suppkey = s_suppkey
		AND c_nationkey = s_nationkey
		AND s_nationkey = n_nationkey
		AND n_regionkey = r_regionkey
		AND r_name = 'ASIA'
		AND o_orderdate >= DATE '1994-01-01'
		AND o_orderdate <  DATE '1995-01-01'
	GROUP BY n_name
	ORDER BY revenue DESC;
```

#61354 reported that the
`pyarrow.lib.ArrowInvalid: No match or multiple matches for key field
reference FieldRef.Name(c_nationkey) on right side of the join`
exception is thrown when running TPC-H Q5; the recently merged
#61507 resolves it.

The root cause before the fix
[PR](#61507) was:

1. In a Q5-derived run, `nation_region` materializes as two blocks:
one `(5 rows, 2 cols)` block and one `(0 rows, 2 cols)` block with
schema `['n_nationkey', 'n_name']`.
2. `_shuffle_block` returned early for any empty block, even when that
empty block was the first block for the input sequence and schema still
needed to be broadcast.
3. As a result, some aggregators never received schema for that join
side. When those aggregators finalized, they combined an empty list of
blocks for that side, effectively producing a columnless table.
4. The downstream `pyarrow.Table.join(...)` then failed to resolve the
join key.

## Related issues

Closes: #61354

Related to #60013
Related to #61507

---------

Signed-off-by: peterxcli <peterxcli@gmail.com>
Signed-off-by: You-Cheng Lin <106612301+owenowenisme@users.noreply.github.com>
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Co-authored-by: You-Cheng Lin <106612301+owenowenisme@users.noreply.github.com>
Co-authored-by: You-Cheng Lin <mses010108@gmail.com>
Lucas61000 pushed a commit to Lucas61000/ray that referenced this pull request May 15, 2026
…otFound Error in Chained Left Joins (ray-project#61507)

Lucas61000 pushed a commit to Lucas61000/ray that referenced this pull request May 15, 2026
@rayhhome rayhhome deleted the join-empty branch July 2, 2026 21:10
Labels

data Ray Data-related issues
go add ONLY when ready to merge, run all tests

5 participants