[Data] Remove column renaming from the read stage #63384
The read stage no longer renames columns: column renaming is always carried by an ``AliasExpr`` in a ``Project`` operator above the read. This eliminates the predicate-rebinding dance between the renamed and original column namespaces and makes projection pushdown a single clean rule:

* ``ReadFiles`` drops its ``column_renames`` field, its ``get_column_renames`` method, and the rename application in its ``infer_schema``.
* ``Read`` drops its ``get_column_renames`` (was a passthrough to the datasource).
* ``_DatasourceProjectionPushdownMixin`` drops ``get_column_renames``, ``_apply_rename``, and ``_apply_rename_to_tables``. ``apply_projection`` now normalizes any incoming rename entries to identity and ``_combine_projection_map`` collapses to a set intersection.
* ``LogicalOperatorSupportsPredicatePushdown.get_column_renames`` is removed from the interface.
* V1 Parquet/Iceberg datasources stop carrying the rename plumbing through their read functions.
* ``ProjectionPushdown._push_projection_into_read_op`` is simplified to always push a pure-prune ``projection_map`` and to keep the ``Project`` operator above the read whenever there are renames or computed expressions. Pure-prune projections (no renames, no computed exprs) are still discarded.
* ``PredicatePushdown._try_push_down_predicate`` drops the rebinding branch that translated predicates between renamed and original column namespaces, since there is now only one namespace at the read.
* ``eval_projection``'s star-expansion places rename ``AliasExpr``s back into their source column's position so the output preserves the on-disk column order.

A kept ``Project`` on top of a read incurs no runtime overhead: physical operator fusion merges them into a single ``MapOperator`` (e.g., ``TaskPoolMapOperator[ReadFilesParquetV2->Project]``), and the rename runs inline within the same task that reads the file.
Tests updated:

* ``test_rename_with_partition_residual_filter``: the residual ``Filter`` now references original on-disk column names (it sits between the rename ``Project`` and ``ReadFiles``).
* ``test_pushdown_with_rename_and_filter``: the plan now expects a ``Project[Project]`` above the read.
* ``test_rename_select_filter_combinations``: pure-prune projections are still pushed; renames keep a ``Project`` above the read.
* ``test_read_batches_from``: drops the removed ``data_columns_rename_map`` argument.

Co-authored-by: Cursor <cursoragent@cursor.com>
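The split of responsibilities described above can be illustrated with a toy model. This is only a sketch under stated assumptions: `read_rows` and `apply_project` are hypothetical stand-ins written for this example, not Ray Data functions, and rows are plain dicts. It shows the scan receiving only a set of on-disk column names, a filter evaluated against those on-disk names, and the rename applied afterwards.

```python
def read_rows(table, columns):
    """Toy scan: return only the requested on-disk columns, in on-disk order."""
    return [{c: row[c] for c in row if c in columns} for row in table]


def apply_project(rows, renames):
    """Toy rename Project: map source names to new names, keeping column order."""
    return [{renames.get(k, k): v for k, v in row.items()} for row in rows]


table = [{"a": 1, "b": 2, "c": 3}, {"a": 4, "b": 5, "c": 6}]

# The read only prunes: it is asked for on-disk columns "a" and "b".
scanned = read_rows(table, {"a", "b"})

# A filter sitting below the rename refers to the on-disk name "a".
filtered = [row for row in scanned if row["a"] > 1]

# The rename a -> x happens above the read, after pruning and filtering.
result = apply_project(filtered, {"a": "x"})
print(result)
```

Because the rename only happens in the last step, nothing below it ever has to translate between `x` and `a`.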
Code Review
This pull request refactors the projection and predicate pushdown logic to align with DataFusion's architecture, ensuring that column renaming is handled by a Project operator rather than the read stage. This change simplifies the pushdown process by maintaining a single column namespace at the scanner level. Key changes include removing column renaming logic from ReadFiles, IcebergDatasource, and ParquetDatasource, and updating the logical optimizer rules to keep Project operators for renames. Feedback identifies a potential AttributeError when a StarExpr is present during column pruning and notes that the is_pure_prune logic may incorrectly discard Project operators that perform column duplication.
    is_pure_prune = not any(
        isinstance(e, AliasExpr) for e in current_project.exprs
    ) and all(_is_col_expr(e) for e in _filter_out_star(current_project.exprs))
The is_pure_prune logic doesn't account for duplicate column references (e.g., ds.select("a", "a")) or a StarExpr combined with other expressions. In these cases, the Project operator is not redundant because it performs duplication that the read stage's column pruning cannot replicate. Discarding the Project would result in an incorrect output schema.
Consider adding checks to ensure that the number of expressions matches the number of unique output column names and that a StarExpr is only considered a pure prune if it's the sole expression.
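One way the suggested checks could look is sketched below. It is a minimal, self-contained illustration: `Col`, `Alias`, and `Star` are hypothetical stand-ins for the real expression classes, and this `is_pure_prune` function is an assumption written for the example, not the code in the PR.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Col:
    name: str  # plain column reference


@dataclass(frozen=True)
class Alias:
    expr: object  # aliased expression (rename or computed)
    name: str


@dataclass(frozen=True)
class Star:
    pass  # stands in for a star expression ("all input columns")


def is_pure_prune(exprs):
    """True only if dropping the Project cannot change the output schema."""
    if any(isinstance(e, Alias) for e in exprs):
        return False  # renames or computed expressions must stay in a Project
    if any(isinstance(e, Star) for e in exprs):
        return len(exprs) == 1  # a star is a pure prune only when it stands alone
    if not all(isinstance(e, Col) for e in exprs):
        return False
    names = [e.name for e in exprs]
    return len(names) == len(set(names))  # duplicates need the Project


print(is_pure_prune([Col("a"), Col("b")]))
print(is_pure_prune([Col("a"), Col("a")]))
```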
Signed-off-by: Goutam <goutam@anyscale.com>
    # Physical operator fusion later merges the kept ``Project``
    # into the same ``MapOperator`` as the read, so the
    # runtime cost is the same either way.
    is_pure_prune = not any(
Nit: can we break this compound conditional up into two booleans? It's a little hard to parse what this means.
i.e.
    - is_pure_prune = not any(
    + is_pure_prune = no_renames_or_computed and only_col_exprs
Signed-off-by: Goutam <goutam@anyscale.com>
When a rename AliasExpr's source column was not in input_column_names, the alias was bucketed into rename_exprs_by_source but never added back to ordered_exprs or extra_exprs, so it was silently dropped instead of raising a 'column not found' error during evaluation.
Signed-off-by: Goutam <goutam@anyscale.com>
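The behavior this commit message asks for can be sketched with a toy star expansion. `expand_star` is a hypothetical helper written for this illustration; it assumes renames are given as a `source -> new name` dict and only models output column names, not the real `eval_projection`.

```python
def expand_star(input_columns, renames, extras=()):
    """Return output column names for a star plus rename aliases.

    Renamed columns stay in their source column's position. A rename whose
    source is missing raises instead of being silently dropped.
    """
    missing = [src for src in renames if src not in input_columns]
    if missing:
        raise KeyError(f"column not found: {missing}")
    ordered = [renames.get(col, col) for col in input_columns]
    return ordered + list(extras)


print(expand_star(["a", "b", "c"], {"b": "y"}))
```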
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Reviewed by Cursor Bugbot for commit b48a2b3.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Goutam <goutam@anyscale.com>
## Summary
This PR removes column renaming from the read stage. Reads now only
receive the columns they need to scan, while column renames stay as
`AliasExpr`s in a `Project` above the read.
That gives us one clear rule:
- The read stage handles physical column pruning.
- The `Project` stage handles renames and computed expressions.
- Pure pruning projections can still disappear once pushed into the
scan.
## Plan Shape
```mermaid
flowchart LR
subgraph Before
B1[Project / Filter]
B2[ReadFiles with column_renames]
B1 --> B2
end
subgraph After
A1[Project with AliasExpr renames]
A2[ReadFiles with pruned columns]
A1 --> A2
end
```
Keeping the `Project` above the read does not add a separate runtime
pass. Physical operator fusion still combines the read and projection
into one map operator, so renames run inline with the file read task.
## What Changed
- Removed `column_renames` plumbing from read operators and datasource
projection pushdown.
- Simplified projection pushdown so scans receive only a pure
column-pruning map.
- Simplified predicate pushdown by removing rebinding between renamed
and original column namespaces.
- Kept rename `AliasExpr`s in a `Project` above reads, preserving the
logical output schema.
- Fixed star expansion so only true rename aliases are placed back into
the source column position; non-rename aliases like `with_column("x",
col("a"))` are preserved.
## Tests
- Updated predicate/projection pushdown tests for the new plan shape.
- Updated Iceberg and Parquet coverage for rename, select, and filter
combinations.
- Added regression coverage for `with_column("x",
col("a")).rename_columns({"a": "b"})` so both `x` and `b` are preserved.
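The regression case above can be modeled in a few lines. This is a sketch under assumptions: the `Alias` dataclass and its `is_rename` flag are hypothetical names chosen for the example, and a row is a plain dict. It shows why a rename alias replaces its source column in place while a non-rename alias is appended as a new column.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Alias:
    source: str      # input column the alias reads
    name: str        # output column name
    is_rename: bool  # hypothetical flag: True means "replace the source column"


def eval_star_projection(row, aliases):
    """Evaluate a star plus aliases against one row (a dict)."""
    renames = {a.source: a.name for a in aliases if a.is_rename}
    out = {}
    # Star expansion: renamed columns keep their source column's position.
    for col, value in row.items():
        out[renames.get(col, col)] = value
    # Non-rename aliases add new columns and leave the source untouched.
    for a in aliases:
        if not a.is_rename:
            out[a.name] = row[a.source]
    return out


# Models with_column("x", col("a")) followed by rename_columns({"a": "b"}).
row = {"a": 1, "c": 3}
result = eval_star_projection(
    row,
    [Alias("a", "x", is_rename=False), Alias("a", "b", is_rename=True)],
)
print(result)
```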
---------
Signed-off-by: Goutam <goutam@anyscale.com>
Co-authored-by: Cursor <cursoragent@cursor.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: phattruong <23120318@student.hcmus.edu.vn>
…ation, node-size clamp, and column pruning (#63809)

## Description

The hash-shuffle/join/aggregate operators size each aggregator's `memory` reservation from the estimated input dataset size. That estimate was both **missing** in some plans (causing under-reservation → OOM) and **over-counted** in others (causing over-reservation → unschedulable). This PR makes the estimate available, accurate, and bounded.

### Triggering change

This was surfaced by **#63384 ([Data] Remove column renaming from the read stage)**. Before that PR the read stage performed column renames inline, so a TPCH plan looked like `ReadParquet → Join` with no intervening op. #63384 moved renames out of the read into a separate `Project` (of `AliasExpr`s) above the read, so the plan became `ReadParquet → Project → Join`. That exposed a latent gap: `Project.infer_metadata` returned all-`None`, so once a `Project` sat between a read and a join/aggregate the size estimate disappeared and the aggregator fell back to a fixed default reservation, which under-provisioned the TPCH Q3 autoscaling release test and OOM-killed it. The changes below fix that latent gap and the issues uncovered while fixing it.

### 1. Propagate metadata through one-to-one ops (fixes join OOM)

With a rename `Project` now between the read and the join, the join's input op was a `Project`, which inherited the base all-`None` `infer_metadata`. So `_try_estimate_output_bytes` saw `size_bytes=None` and the `HashShuffleAggregator` silently fell back to a fixed default (`DEFAULT_HASH_SHUFFLE_AGGREGATOR_MEMORY_ALLOCATION`, 1 GiB/aggregator) instead of a size-derived reservation (~5.3 GiB/aggregator for TPCH Q3 SF100). On an autoscaling cluster this under-provisions the aggregators and the worker is OOM-killed mid-join (`ActorDiedError` / `SYSTEM_ERROR: Worker connection closed unexpectedly`).
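A toy model of this gap, and of propagation gated on whether an operator can change the row count, is sketched here. All classes are simplified stand-ins written for this example: the real operators have different constructors, and `can_modify_num_rows` is modeled as a class attribute purely as an assumption.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Metadata:
    num_rows: Optional[int] = None
    size_bytes: Optional[int] = None


class Read:
    def __init__(self, num_rows: int, size_bytes: int):
        self._meta = Metadata(num_rows, size_bytes)

    def infer_metadata(self) -> Metadata:
        return self._meta


class OneToOne:
    can_modify_num_rows = False  # toy flag; subclasses override it

    def __init__(self, input_op):
        self.input_op = input_op

    def infer_metadata(self) -> Metadata:
        if self.can_modify_num_rows:
            return Metadata()  # unknown: the op may drop or add rows
        return self.input_op.infer_metadata()  # row-preserving: pass through


class Project(OneToOne):
    can_modify_num_rows = False


class Filter(OneToOne):
    can_modify_num_rows = True


read = Read(num_rows=1_000, size_bytes=64 * 1024**2)
project_meta = Project(read).infer_metadata()
filter_meta = Filter(read).infer_metadata()
print(project_meta.size_bytes, filter_meta.size_bytes)
```

With pass-through in place, a downstream consumer of `Project(read)` sees the read's size estimate rather than `None`.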
`AbstractOneToOne.infer_metadata` now propagates `num_rows`/`size_bytes`/`input_files` from its single input, gated on `can_modify_num_rows` (row-preserving ops like `Project` propagate; `Filter`/`FlatMap` fall back to `None`). `Download` overrides this to report `size_bytes=None`, since it appends blob columns and the input size would be a misleading under-estimate.

### 2. Prune unused columns before hash-shuffle aggregations

An `Aggregate` only reads its group keys and each aggregation's target column, but nothing pruned the rest before the shuffle, so wide unused columns (e.g. a string column carried through by `with_column`) were dragged through the aggregator, inflating both its memory reservation and the bytes shuffled. `ProjectionPushdown` now inserts a pruning `Project` below an `Aggregate` keeping only the consumed columns (keys + aggregation targets); the existing fuse/push steps carry the narrowed set into the read. It only fires when the input schema is known and has extra columns (keeping the fixed-point optimizer idempotent), and leaves generic `AggregateFn`s (unknown columns) untouched.

### 3. Clamp the aggregator reservation to the largest node (safety net)

With estimates now firing more often, a low-partition shuffle over a large dataset (e.g. a global aggregation with `num_partitions=1`) could reserve ~2× the whole dataset as a single aggregator's `memory` request, exceeding any node and making the actor unschedulable (`ActorUnschedulableError`). The per-aggregator reservation is now clamped to the largest single node's memory: aggregators hold shuffle data in the (spillable) object store, so reserving more heap `memory` than a node has only makes the actor unschedulable, not faster. The clamp only triggers when the estimate exceeds node capacity (well-sized reservations are unaffected) and logs a warning when it engages.
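Sections 2 and 3 above can each be sketched as a small helper. Both functions are hypothetical illustrations, not the PR's code: `columns_to_keep` assumes the schema is a list of column names (or `None` when unknown), and `clamp_reservation` assumes node capacities are passed in as a list of byte counts.

```python
import logging
from typing import List, Optional, Sequence

logger = logging.getLogger(__name__)
GiB = 1024**3


def columns_to_keep(
    schema: Optional[List[str]],
    group_keys: Sequence[str],
    agg_targets: Sequence[str],
) -> Optional[List[str]]:
    """Return the pruned column list, or None when no pruning should happen."""
    if schema is None:
        return None  # unknown schema: do not guess
    consumed = set(group_keys) | set(agg_targets)
    keep = [c for c in schema if c in consumed]
    if len(keep) == len(schema):
        return None  # nothing to prune, so repeated optimizer passes are no-ops
    return keep


def clamp_reservation(estimated_bytes: int, node_memory_bytes: Sequence[int]) -> int:
    """Cap a per-aggregator reservation at the largest single node's memory."""
    largest = max(node_memory_bytes)
    if estimated_bytes > largest:
        logger.warning(
            "Clamping reservation from %d to %d bytes", estimated_bytes, largest
        )
        return largest
    return estimated_bytes


print(columns_to_keep(["k", "v", "wide_text"], ["k"], ["v"]))
print(clamp_reservation(10 * GiB, [4 * GiB, 8 * GiB]) // GiB)
```

Returning `None` from `columns_to_keep` when there is nothing to drop mirrors the idempotence concern in section 2: a rule that keeps inserting a no-op `Project` would never reach a fixed point.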
## Verification

- Join estimate restored through a plan with a `Project` between the read and the join (was `None` → 1 GiB fallback).
- Q6-shaped global aggregation: read is pruned to the consumed columns; end-to-end result matches a pandas ground truth.
- Clamp engages only when the estimate exceeds node memory and the run completes via spilling.
- Correctness validated across combos (filters, group-by, multi-aggregation, post-shuffle join→aggregate, computed columns) by comparing pruned vs unpruned pipelines.
- New parameterized tests in `test_projection_fusion.py` (aggregate input pruning) and existing suites pass (`test_projection_fusion`, `test_predicate_pushdown`, `test_execution_optimizer_basic`, `test_infer_schema`, aggregation tests).

## Related issues

Surfaced by #63384 (column renames moved out of the read stage into a `Project`).

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Signed-off-by: Goutam <goutam@anyscale.com>
Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
