[Data] Propagate Schema in _shuffle_block Empty Block Case to ColumnNotFound Error in Chained Left Joins#61507
Conversation
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
There was a problem hiding this comment.
Pull request overview
Fixes a Ray Data hash-shuffle join edge case where a first empty-row block prevents schema broadcast to aggregators, producing columnless empty tables that later trigger ColumnNotFoundError in chained left joins.
Changes:
- Update
_shuffle_block()to broadcast schema-carrying empty tables to all aggregators when the first received block is empty andsend_empty_blocks=True. - Add a deterministic regression test that forces the “empty block arrives first” ordering to reproduce the chained left-join failure.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
python/ray/data/_internal/execution/operators/hash_shuffle.py |
Ensures schema propagation even when the first shuffled block has num_rows == 0 and schema broadcast is required. |
python/ray/data/tests/test_join.py |
Adds a regression test that deterministically reproduces and validates the fix for chained left-outer joins with empty intermediate blocks. |
Comments suppressed due to low confidence (1)
python/ray/data/_internal/execution/operators/hash_shuffle.py:295
- In the
block.num_rows == 0 and send_empty_blocksbranch, this broadcasts by calling_create_empty_table(block.schema)andray.put(...)once per partition. For largenum_partitionsthis creates many identical objects in the object store and duplicates logic that already exists in the non-empty path. Consider reusing the existing partition loop by only early-returning whensend_empty_blocksis false, or at leastray.put()a single schema-carrying empty table once and reuse the sameObjectReffor allaggregator.submitcalls.
if block.num_rows == 0:
if send_empty_blocks:
# NOTE: Perform the schema broadcast explicitly
# Every aggregator gets a (0, N) schema-carrying placeholder even when
# the triggering block itself is empty.
pending = []
for partition_id in range(pool.num_partitions):
aggregator = pool.get_aggregator_for_partition(partition_id)
partition_ref = ray.put(_create_empty_table(block.schema))
pending.append(
aggregator.submit.remote(input_index, partition_id, partition_ref)
)
# Wait for all schema-carrier blocks to be accepted before returning.
# This mirrors the synchronization in the normal (non-empty) path and
# ensures aggregators hold the schema before finalize() is called.
while pending:
_, pending = ray.wait(pending, num_returns=len(pending), timeout=1)
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
… of schema Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
There was a problem hiding this comment.
Code Review
This pull request provides a well-reasoned fix for a ColumnNotFoundError that occurs during chained left joins with empty intermediate blocks. The approach of explicitly broadcasting a schema-carrying empty block to all aggregators is a solid solution to ensure schema propagation. The included regression test is excellent, as it deterministically reproduces the issue and verifies the fix. I have one suggestion to simplify the waiting logic by using ray.get() instead of a while loop with ray.wait(), which would make the code more concise and robust in handling task failures. Overall, this is a high-quality contribution.
Note: Security Review did not run due to the size of the PR.
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
| if block.num_rows == 0: | ||
| if send_empty_blocks: |
There was a problem hiding this comment.
Deleting the entire early-return if branch in _shuffle_block would also eliminate the issue. However, since the bug only affects the edge case where the first incoming block is empty, removing the full early-return branch risks performance degradation.
Rather than removing this branch entirely, I'm wondering if you could also fix this issue by just doing if block.num_rows == 0 and not send_empty_blocks:. An advantage of this over the current implementation is that it'd be much simpler.
Do we know what the actual performance of impact of that approach would be? In general, I think we should err on the side of simplicity over performance, especially if it's unclear if there's an actual performance bottleneck
There was a problem hiding this comment.
Makes sense; I don't yet have direct evidence of the performance impact. Going with the suggested if block.num_rows == 0 and not send_empty_blocks: change since it's cleaner and also accounts for the early return for everything other than the edge case.
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
## Description ```sql SELECT n_name, SUM(l_extendedprice * (1 - l_discount)) AS revenue FROM customer, orders, lineitem, supplier, nation, region WHERE c_custkey = o_custkey AND l_orderkey = o_orderkey AND l_suppkey = s_suppkey AND c_nationkey = s_nationkey AND s_nationkey = n_nationkey AND n_regionkey = r_regionkey AND r_name = 'ASIA' AND o_orderdate >= DATE '1994-01-01' AND o_orderdate < DATE '1995-01-01' GROUP BY n_name ORDER BY revenue DESC; ``` #61354 reported the `pyarrow.lib.ArrowInvalid: No match or multiple matches for key field reference FieldRef.Name(c_nationkey) on right side of the join` exception is thrown when running tpch q5, but the recent merged #61507 solve it. And the root cause before the fix [pr](#61507) is: 1. In a Q5-derived run, `nation_region` materialize as two blocks: one `(5 rows, 2 cols)` block and one `(0 rows, 2 cols)` block with schema `['n_nationkey', 'n_name']`. 2. `_shuffle_block` returned early for any empty block, even when that empty block was the first block for the input sequence and schema still needed to be broadcast. 3. As a result, some aggregators never received schema for that join side. When those aggregators finalized, they combined an empty list of blocks for that side, effectively producing a columnless table. 4. The downstream `pyarrow.Table.join(...)` then failed to resolve the join key ## Related issues Closes: #61354 Related to #60013 Related to #61507 --------- Signed-off-by: peterxcli <peterxcli@gmail.com> Signed-off-by: You-Cheng Lin <106612301+owenowenisme@users.noreply.github.com> Signed-off-by: You-Cheng Lin <mses010108@gmail.com> Co-authored-by: You-Cheng Lin <106612301+owenowenisme@users.noreply.github.com> Co-authored-by: You-Cheng Lin <mses010108@gmail.com>
…otFound Error in Chained Left Joins (ray-project#61507) ## Description Per ray-project#60013, chained left joins fail with `ColumnNotFoundError` when the first join produces empty intermediate blocks. ray-project#60520 attempted the fix, but a refined reproduction script shows it does not resolve the underlying issue. This PR proposes a targeted fix and a deterministic regression test. ### Root cause Using the example in ray-project#60013, when the streaming executor feeds the second join's input, the first block delivered can have zero rows. The bug is then triggered through the following sequence: 1. `_do_add_input_inner` sees that this is the first block for input sequence 0 (or 1), so it submits a `_shuffle_block` task with `send_empty_blocks=True` and immediately sets `self._has_schemas_broadcasted[input_index] = True`; 2. Remote `_shuffle_block` worker tasks **triggers an early return of `(empty_metadata, {})`**. No `aggregator.submit()` calls are ever made and the schema never reaches any aggregator; 3. All subsequent blocks are submitted with `send_empty_blocks=False`. Qggregators with no non-empty data are never contacted at all, leaving their bucket queue empty; 4. At finalization, `drain_queue()` returns `[]` for those partitions, so `_combine([])` builds an `ArrowblockBuilder` with no `add_block()` calls and produces an empty table with no columns; 5. When `JoiningAggregation.finalize()` calls `pa.Table.join()` on this columnless table, it raises `ColumnNotFoundError` as observed. ### Why ray-project#60520 does not fix this issue ray-project#60520 modifies `ArrowBlockBuilder.build()` to use a stored `self._schema` when `len(tables) == 0`. However, `self._schema` is only populated inside`add_block()` calls. When `partition_shards` is `[]` in `_combine(...)`, `self._schema` remains `None`. ### This fix In `_shuffle_block`, when `block.num_rows == 0` and `send_empty_blocks=True`, explicitly broadcast schema-carrying empty tables to every aggregator before returning. This mirrors the broadcast logic for non-empty blocks, which ensures every aggregator holds at least one schema-carrying block and thus finalizes correctly. ### Alternative fix Deleting the entire early-return if branch in `_shuffle_block` would also eliminate the issue. However, since the bug only affects the edge case where the first incoming block is empty, removing the full early-return branch risks performance degradation. ## Related issues Fixes ray-project#60013 and follows up on ray-project#60520. ## Additional information The original reproduction script in ray-project#60013 occasionally misses the error due to the uncertain order of blocks fed to the second join. To force the bug, add the following lines to the reproduction script: ``` ... shapes = [b.shape for b in blocks] print(f"Columns flattened via map_batches: {flatten_columns}") print("Block shapes after first join:", shapes) # ----- Add the following lines ----- # Force the bug # The streaming executor delivers blocks in completion order, so non-empty # partitions finish faster and arrive first, letting schema broadcast succeed # silently. Reconstructing the dataset with empty blocks at the front # guarantees that _shuffle_block() sees a zero-row block as the very first # block for the left input sequence, triggering the premature # _has_schemas_broadcasted flag and the resulting (0,0) empty-table bug. import pyarrow as pa empty = [b for b in blocks if b.num_rows == 0] nonempty = [b for b in blocks if b.num_rows > 0] assert empty, "No empty blocks found — cannot reproduce the bug with this dataset." print(f"Reordering: {len(empty)} empty blocks first, then {len(nonempty)} non-empty.") ds_joined = ray.data.from_arrow(empty + nonempty) print("Block shapes after reordering:", [b.shape for b in (ray.get(ref) for ref in ds_joined.get_internal_block_refs())]) # ---------------------------------- # Create mapping table # Use some of the location_ids for the mapping shared_location_ids = location_ids[: max(1, len(location_ids) // 3)] ... ``` The augmented script forces the order of blocks so that the first block going into the second join is always empty. The new test case in `test_join.py` places the empty block in a list fed to `from_arrow`, preserving the block order and ensuring that the second join will always see the empty block first. The bug fires reliably on every run before the fix. --------- Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
## Description ```sql SELECT n_name, SUM(l_extendedprice * (1 - l_discount)) AS revenue FROM customer, orders, lineitem, supplier, nation, region WHERE c_custkey = o_custkey AND l_orderkey = o_orderkey AND l_suppkey = s_suppkey AND c_nationkey = s_nationkey AND s_nationkey = n_nationkey AND n_regionkey = r_regionkey AND r_name = 'ASIA' AND o_orderdate >= DATE '1994-01-01' AND o_orderdate < DATE '1995-01-01' GROUP BY n_name ORDER BY revenue DESC; ``` ray-project#61354 reported the `pyarrow.lib.ArrowInvalid: No match or multiple matches for key field reference FieldRef.Name(c_nationkey) on right side of the join` exception is thrown when running tpch q5, but the recent merged ray-project#61507 solve it. And the root cause before the fix [pr](ray-project#61507) is: 1. In a Q5-derived run, `nation_region` materialize as two blocks: one `(5 rows, 2 cols)` block and one `(0 rows, 2 cols)` block with schema `['n_nationkey', 'n_name']`. 2. `_shuffle_block` returned early for any empty block, even when that empty block was the first block for the input sequence and schema still needed to be broadcast. 3. As a result, some aggregators never received schema for that join side. When those aggregators finalized, they combined an empty list of blocks for that side, effectively producing a columnless table. 4. The downstream `pyarrow.Table.join(...)` then failed to resolve the join key ## Related issues Closes: ray-project#61354 Related to ray-project#60013 Related to ray-project#61507 --------- Signed-off-by: peterxcli <peterxcli@gmail.com> Signed-off-by: You-Cheng Lin <106612301+owenowenisme@users.noreply.github.com> Signed-off-by: You-Cheng Lin <mses010108@gmail.com> Co-authored-by: You-Cheng Lin <106612301+owenowenisme@users.noreply.github.com> Co-authored-by: You-Cheng Lin <mses010108@gmail.com>
Description
Per #60013, chained left joins fail with
ColumnNotFoundErrorwhen the first join produces empty intermediate blocks. #60520 attempted the fix, but a refined reproduction script shows it does not resolve the underlying issue. This PR proposes a targeted fix and a deterministic regression test.Root cause
Using the example in #60013, when the streaming executor feeds the second join's input, the first block delivered can have zero rows. The bug is then triggered through the following sequence:
_do_add_input_innersees that this is the first block for input sequence 0 (or 1), so it submits a_shuffle_blocktask withsend_empty_blocks=Trueand immediately setsself._has_schemas_broadcasted[input_index] = True;_shuffle_blockworker tasks triggers an early return of(empty_metadata, {}). Noaggregator.submit()calls are ever made and the schema never reaches any aggregator;send_empty_blocks=False. Qggregators with no non-empty data are never contacted at all, leaving their bucket queue empty;drain_queue()returns[]for those partitions, so_combine([])builds anArrowblockBuilderwith noadd_block()calls and produces an empty table with no columns;JoiningAggregation.finalize()callspa.Table.join()on this columnless table, it raisesColumnNotFoundErroras observed.Why #60520 does not fix this issue
#60520 modifies
ArrowBlockBuilder.build()to use a storedself._schemawhenlen(tables) == 0. However,self._schemais only populated insideadd_block()calls. Whenpartition_shardsis[]in_combine(...),self._schemaremainsNone.This fix
In
_shuffle_block, whenblock.num_rows == 0andsend_empty_blocks=True, explicitly broadcast schema-carrying empty tables to every aggregator before returning. This mirrors the broadcast logic for non-empty blocks, which ensures every aggregator holds at least one schema-carrying block and thus finalizes correctly.Alternative fix
Deleting the entire early-return if branch in
_shuffle_blockwould also eliminate the issue. However, since the bug only affects the edge case where the first incoming block is empty, removing the full early-return branch risks performance degradation.Related issues
Fixes #60013 and follows up on #60520.
Additional information
The original reproduction script in #60013 occasionally misses the error due to the uncertain order of blocks fed to the second join. To force the bug, add the following lines to the reproduction script:
The augmented script forces the order of blocks so that the first block going into the second join is always empty.
The new test case in
test_join.pyplaces the empty block in a list fed tofrom_arrow, preserving the block order and ensuring that the second join will always see the empty block first. The bug fires reliably on every run before the fix.