Skip to content

[Data] Remove column renaming from the read stage#63384

Merged
goutamvenkat-anyscale merged 7 commits into
ray-project:masterfrom
goutamvenkat-anyscale:goutam/no_read_stage_rename
May 20, 2026
Merged

[Data] Remove column renaming from the read stage#63384
goutamvenkat-anyscale merged 7 commits into
ray-project:masterfrom
goutamvenkat-anyscale:goutam/no_read_stage_rename

Conversation

@goutamvenkat-anyscale

@goutamvenkat-anyscale goutamvenkat-anyscale commented May 15, 2026

Copy link
Copy Markdown
Contributor

Summary

This PR removes column renaming from the read stage. Reads now only receive the columns they need to scan, while column renames stay as AliasExprs in a Project above the read.

That gives us one clear rule:

  • The read stage handles physical column pruning.
  • The Project stage handles renames and computed expressions.
  • Pure pruning projections can still disappear once pushed into the scan.

Plan Shape

flowchart LR
    subgraph Before
        B1[Project / Filter]
        B2[ReadFiles with column_renames]
        B1 --> B2
    end

    subgraph After
        A1[Project with AliasExpr renames]
        A2[ReadFiles with pruned columns]
        A1 --> A2
    end
Loading

Keeping the Project above the read does not add a separate runtime pass. Physical operator fusion still combines the read and projection into one map operator, so renames run inline with the file read task.

What Changed

  • Removed column_renames plumbing from read operators and datasource projection pushdown.
  • Simplified projection pushdown so scans receive only a pure column-pruning map.
  • Simplified predicate pushdown by removing rebinding between renamed and original column namespaces.
  • Kept rename AliasExprs in a Project above reads, preserving the logical output schema.
  • Fixed star expansion so only true rename aliases are placed back into the source column position; non-rename aliases like with_column("x", col("a")) are preserved.

Tests

  • Updated predicate/projection pushdown tests for the new plan shape.
  • Updated Iceberg and Parquet coverage for rename, select, and filter combinations.
  • Added regression coverage for with_column("x", col("a")).rename_columns({"a": "b"}) so both x and b are preserved.
The read stage no longer renames columns — column renaming is always
carried by an ``AliasExpr`` in a ``Project`` operator above the read.
This eliminates the predicate-rebinding dance between the renamed and
original column namespaces and makes projection pushdown a single
clean rule:

* ``ReadFiles`` drops its ``column_renames`` field, its
  ``get_column_renames`` method, and the rename application in its
  ``infer_schema``.
* ``Read`` drops its ``get_column_renames`` (was a passthrough to the
  datasource).
* ``_DatasourceProjectionPushdownMixin`` drops ``get_column_renames``,
  ``_apply_rename``, and ``_apply_rename_to_tables``. ``apply_projection``
  now normalizes any incoming rename entries to identity and
  ``_combine_projection_map`` collapses to a set intersection.
* ``LogicalOperatorSupportsPredicatePushdown.get_column_renames`` is
  removed from the interface.
* V1 Parquet/Iceberg datasources stop carrying the rename plumbing
  through their read functions.
* ``ProjectionPushdown._push_projection_into_read_op`` is simplified
  to always push a pure-prune ``projection_map`` and to keep the
  ``Project`` operator above the read whenever there are renames or
  computed expressions. Pure-prune projections (no renames, no
  computed exprs) are still discarded.
* ``PredicatePushdown._try_push_down_predicate`` drops the rebinding
  branch that translated predicates between renamed and original
  column namespaces — there's now only one namespace at the read.
* ``eval_projection``'s star-expansion places rename ``AliasExpr``s
  back into their source column's position so the output preserves
  the on-disk column order.

A kept ``Project`` on top of a read incurs no runtime overhead:
physical operator fusion merges them into a single ``MapOperator``
(e.g., ``TaskPoolMapOperator[ReadFilesParquetV2->Project]``), and the
rename runs inline within the same task that reads the file.

Tests updated:
* ``test_rename_with_partition_residual_filter`` — residual ``Filter``
  now references original on-disk column names (it sits between the
  rename ``Project`` and ``ReadFiles``).
* ``test_pushdown_with_rename_and_filter`` — plan now expects a
  ``Project[Project]`` above the read.
* ``test_rename_select_filter_combinations`` — pure-prune projections
  are still pushed; renames keep a ``Project`` above the read.
* ``test_read_batches_from`` — drops the removed
  ``data_columns_rename_map`` argument.

Co-authored-by: Cursor <cursoragent@cursor.com>
@goutamvenkat-anyscale goutamvenkat-anyscale requested a review from a team as a code owner May 15, 2026 22:56

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request refactors the projection and predicate pushdown logic to align with DataFusion's architecture, ensuring that column renaming is handled by a Project operator rather than the read stage. This change simplifies the pushdown process by maintaining a single column namespace at the scanner level. Key changes include removing column renaming logic from ReadFiles, IcebergDatasource, and ParquetDatasource, and updating the logical optimizer rules to keep Project operators for renames. Feedback identifies a potential AttributeError when a StarExpr is present during column pruning and notes that the is_pure_prune logic may incorrectly discard Project operators that perform column duplication.

Comment thread python/ray/data/_internal/logical/operators/read_operator.py
Comment on lines +357 to +359
is_pure_prune = not any(
isinstance(e, AliasExpr) for e in current_project.exprs
) and all(_is_col_expr(e) for e in _filter_out_star(current_project.exprs))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The is_pure_prune logic doesn't account for duplicate column references (e.g., ds.select("a", "a")) or a StarExpr combined with other expressions. In these cases, the Project operator is not redundant because it performs duplication that the read stage's column pruning cannot replicate. Discarding the Project would result in an incorrect output schema.

Consider adding checks to ensure that the number of expressions matches the number of unique output column names and that a StarExpr is only considered a pure prune if it's the sole expression.

Signed-off-by: Goutam <goutam@anyscale.com>
@goutamvenkat-anyscale goutamvenkat-anyscale added data Ray Data-related issues go add ONLY when ready to merge, run all tests labels May 15, 2026
Signed-off-by: Goutam <goutam@anyscale.com>
# Physical operator fusion later merges the kept ``Project``
# into the same ``MapOperator`` as the read, so the
# runtime cost is the same either way.
is_pure_prune = not any(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: can we break this compound conditional up into two booleans? It's a little hard to parse what this means.

i.e.

Suggested change
is_pure_prune = not any(
is_pure_prune = no_renames_or_computed and only_col_exprs
Comment thread python/ray/data/_internal/logical/operators/read_operator.py
Signed-off-by: Goutam <goutam@anyscale.com>
When a rename AliasExpr's source column is not in input_column_names, it was bucketed into rename_exprs_by_source but never added back to ordered_exprs or extra_exprs — silently dropped instead of raising a 'column not found' error during evaluation.

Signed-off-by: Goutam <goutam@anyscale.com>

@ayushk7102 ayushk7102 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@cursor cursor Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Fix All in Cursor

Reviewed by Cursor Bugbot for commit b48a2b3. Configure here.

Comment thread python/ray/data/_internal/logical/rules/projection_pushdown.py Outdated
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Goutam <goutam@anyscale.com>
@goutamvenkat-anyscale goutamvenkat-anyscale enabled auto-merge (squash) May 20, 2026 05:12
@goutamvenkat-anyscale goutamvenkat-anyscale merged commit 82e7208 into ray-project:master May 20, 2026
7 checks passed
@goutamvenkat-anyscale goutamvenkat-anyscale deleted the goutam/no_read_stage_rename branch May 20, 2026 05:48
TruongQuangPhat pushed a commit to cyhapun/ray-fix-issue that referenced this pull request May 27, 2026
## Summary

This PR removes column renaming from the read stage. Reads now only
receive the columns they need to scan, while column renames stay as
`AliasExpr`s in a `Project` above the read.

That gives us one clear rule:

- The read stage handles physical column pruning.
- The `Project` stage handles renames and computed expressions.
- Pure pruning projections can still disappear once pushed into the
scan.

## Plan Shape

```mermaid
flowchart LR
    subgraph Before
        B1[Project / Filter]
        B2[ReadFiles with column_renames]
        B1 --> B2
    end

    subgraph After
        A1[Project with AliasExpr renames]
        A2[ReadFiles with pruned columns]
        A1 --> A2
    end
```

Keeping the `Project` above the read does not add a separate runtime
pass. Physical operator fusion still combines the read and projection
into one map operator, so renames run inline with the file read task.

## What Changed

- Removed `column_renames` plumbing from read operators and datasource
projection pushdown.
- Simplified projection pushdown so scans receive only a pure
column-pruning map.
- Simplified predicate pushdown by removing rebinding between renamed
and original column namespaces.
- Kept rename `AliasExpr`s in a `Project` above reads, preserving the
logical output schema.
- Fixed star expansion so only true rename aliases are placed back into
the source column position; non-rename aliases like `with_column("x",
col("a"))` are preserved.

## Tests

- Updated predicate/projection pushdown tests for the new plan shape.
- Updated Iceberg and Parquet coverage for rename, select, and filter
combinations.
- Added regression coverage for `with_column("x",
col("a")).rename_columns({"a": "b"})` so both `x` and `b` are preserved.

---------

Signed-off-by: Goutam <goutam@anyscale.com>
Co-authored-by: Cursor <cursoragent@cursor.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: phattruong <23120318@student.hcmus.edu.vn>
goutamvenkat-anyscale added a commit that referenced this pull request Jun 3, 2026
…ation, node-size clamp, and column pruning (#63809)

## Description

The hash-shuffle/join/aggregate operators size each aggregator's
`memory` reservation from the estimated input dataset size. That
estimate was both **missing** in some plans (causing under-reservation →
OOM) and **over-counted** in others (causing over-reservation →
unschedulable). This PR makes the estimate available, accurate, and
bounded.

### Triggering change

This was surfaced by **#63384 ([Data] Remove column renaming from the
read stage)**. Before that PR the read stage performed column renames
inline, so a TPCH plan looked like `ReadParquet → Join` with no
intervening op. #63384 moved renames out of the read into a separate
`Project` (of `AliasExpr`s) above the read, so the plan became
`ReadParquet → Project → Join`. That exposed a latent gap:
`Project.infer_metadata` returned all-`None`, so once a `Project` sat
between a read and a join/aggregate the size estimate disappeared and
the aggregator fell back to a fixed default reservation — which
under-provisioned the TPCH Q3 autoscaling release test and OOM-killed
it. The changes below fix that latent gap and the issues uncovered while
fixing it.

### 1. Propagate metadata through one-to-one ops (fixes join OOM)

With a rename `Project` now between the read and the join, the join's
input op was a `Project`, which inherited the base all-`None`
`infer_metadata`. So `_try_estimate_output_bytes` saw `size_bytes=None`
and the `HashShuffleAggregator` silently fell back to a fixed default
(`DEFAULT_HASH_SHUFFLE_AGGREGATOR_MEMORY_ALLOCATION`, 1 GiB/aggregator)
instead of a size-derived reservation (~5.3 GiB/aggregator for TPCH Q3
SF100). On an autoscaling cluster this under-provisions the aggregators
and the worker is OOM-killed mid-join (`ActorDiedError` / `SYSTEM_ERROR:
Worker connection closed unexpectedly`).

`AbstractOneToOne.infer_metadata` now propagates
`num_rows`/`size_bytes`/`input_files` from its single input, gated on
`can_modify_num_rows` (row-preserving ops like `Project` propagate;
`Filter`/`FlatMap` fall back to `None`). `Download` overrides this to
report `size_bytes=None`, since it appends blob columns and the input
size would be a misleading under-estimate.

### 2. Prune unused columns before hash-shuffle aggregations

An `Aggregate` only reads its group keys and each aggregation's target
column, but nothing pruned the rest before the shuffle — so wide unused
columns (e.g. a string column carried through by `with_column`) were
dragged through the aggregator, inflating both its memory reservation
and the bytes shuffled. `ProjectionPushdown` now inserts a pruning
`Project` below an `Aggregate` keeping only the consumed columns (keys +
aggregation targets); the existing fuse/push steps carry the narrowed
set into the read. It only fires when the input schema is known and has
extra columns (keeping the fixed-point optimizer idempotent), and leaves
generic `AggregateFn`s (unknown columns) untouched.

### 3. Clamp the aggregator reservation to the largest node (safety net)

With estimates now firing more often, a low-partition shuffle over a
large dataset (e.g. a global aggregation with `num_partitions=1`) could
reserve ~2× the whole dataset as a single aggregator's `memory` request
— exceeding any node and making the actor unschedulable
(`ActorUnschedulableError`). The per-aggregator reservation is now
clamped to the largest single node's memory: aggregators hold shuffle
data in the (spillable) object store, so reserving more heap `memory`
than a node has only makes the actor unschedulable, not faster. The
clamp only triggers when the estimate exceeds node capacity (well-sized
reservations are unaffected) and logs a warning when it engages.

## Verification

- Join estimate restored through a plan with a `Project` between the
read and the join (was `None` → 1 GiB fallback).
- Q6-shaped global aggregation: read is pruned to the consumed columns;
end-to-end result matches a pandas ground truth.
- Clamp engages only when the estimate exceeds node memory and the run
completes via spilling.
- Correctness validated across combos (filters, group-by,
multi-aggregation, post-shuffle join→aggregate, computed columns) by
comparing pruned vs unpruned pipelines.
- New parameterized tests in `test_projection_fusion.py` (aggregate
input pruning) and existing suites pass (`test_projection_fusion`,
`test_predicate_pushdown`, `test_execution_optimizer_basic`,
`test_infer_schema`, aggregation tests).

## Related issues

Surfaced by #63384 (column renames moved out of the read stage into a
`Project`).

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Signed-off-by: Goutam <goutam@anyscale.com>
Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
rueian pushed a commit to rueian/ray that referenced this pull request Jun 4, 2026
…ation, node-size clamp, and column pruning (ray-project#63809)

## Description

The hash-shuffle/join/aggregate operators size each aggregator's
`memory` reservation from the estimated input dataset size. That
estimate was both **missing** in some plans (causing under-reservation →
OOM) and **over-counted** in others (causing over-reservation →
unschedulable). This PR makes the estimate available, accurate, and
bounded.

### Triggering change

This was surfaced by **ray-project#63384 ([Data] Remove column renaming from the
read stage)**. Before that PR the read stage performed column renames
inline, so a TPCH plan looked like `ReadParquet → Join` with no
intervening op. ray-project#63384 moved renames out of the read into a separate
`Project` (of `AliasExpr`s) above the read, so the plan became
`ReadParquet → Project → Join`. That exposed a latent gap:
`Project.infer_metadata` returned all-`None`, so once a `Project` sat
between a read and a join/aggregate the size estimate disappeared and
the aggregator fell back to a fixed default reservation — which
under-provisioned the TPCH Q3 autoscaling release test and OOM-killed
it. The changes below fix that latent gap and the issues uncovered while
fixing it.

### 1. Propagate metadata through one-to-one ops (fixes join OOM)

With a rename `Project` now between the read and the join, the join's
input op was a `Project`, which inherited the base all-`None`
`infer_metadata`. So `_try_estimate_output_bytes` saw `size_bytes=None`
and the `HashShuffleAggregator` silently fell back to a fixed default
(`DEFAULT_HASH_SHUFFLE_AGGREGATOR_MEMORY_ALLOCATION`, 1 GiB/aggregator)
instead of a size-derived reservation (~5.3 GiB/aggregator for TPCH Q3
SF100). On an autoscaling cluster this under-provisions the aggregators
and the worker is OOM-killed mid-join (`ActorDiedError` / `SYSTEM_ERROR:
Worker connection closed unexpectedly`).

`AbstractOneToOne.infer_metadata` now propagates
`num_rows`/`size_bytes`/`input_files` from its single input, gated on
`can_modify_num_rows` (row-preserving ops like `Project` propagate;
`Filter`/`FlatMap` fall back to `None`). `Download` overrides this to
report `size_bytes=None`, since it appends blob columns and the input
size would be a misleading under-estimate.

### 2. Prune unused columns before hash-shuffle aggregations

An `Aggregate` only reads its group keys and each aggregation's target
column, but nothing pruned the rest before the shuffle — so wide unused
columns (e.g. a string column carried through by `with_column`) were
dragged through the aggregator, inflating both its memory reservation
and the bytes shuffled. `ProjectionPushdown` now inserts a pruning
`Project` below an `Aggregate` keeping only the consumed columns (keys +
aggregation targets); the existing fuse/push steps carry the narrowed
set into the read. It only fires when the input schema is known and has
extra columns (keeping the fixed-point optimizer idempotent), and leaves
generic `AggregateFn`s (unknown columns) untouched.

### 3. Clamp the aggregator reservation to the largest node (safety net)

With estimates now firing more often, a low-partition shuffle over a
large dataset (e.g. a global aggregation with `num_partitions=1`) could
reserve ~2× the whole dataset as a single aggregator's `memory` request
— exceeding any node and making the actor unschedulable
(`ActorUnschedulableError`). The per-aggregator reservation is now
clamped to the largest single node's memory: aggregators hold shuffle
data in the (spillable) object store, so reserving more heap `memory`
than a node has only makes the actor unschedulable, not faster. The
clamp only triggers when the estimate exceeds node capacity (well-sized
reservations are unaffected) and logs a warning when it engages.

## Verification

- Join estimate restored through a plan with a `Project` between the
read and the join (was `None` → 1 GiB fallback).
- Q6-shaped global aggregation: read is pruned to the consumed columns;
end-to-end result matches a pandas ground truth.
- Clamp engages only when the estimate exceeds node memory and the run
completes via spilling.
- Correctness validated across combos (filters, group-by,
multi-aggregation, post-shuffle join→aggregate, computed columns) by
comparing pruned vs unpruned pipelines.
- New parameterized tests in `test_projection_fusion.py` (aggregate
input pruning) and existing suites pass (`test_projection_fusion`,
`test_predicate_pushdown`, `test_execution_optimizer_basic`,
`test_infer_schema`, aggregation tests).

## Related issues

Surfaced by ray-project#63384 (column renames moved out of the read stage into a
`Project`).

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Signed-off-by: Goutam <goutam@anyscale.com>
Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
limarkdcunha pushed a commit to limarkdcunha/ray that referenced this pull request Jun 30, 2026
…ation, node-size clamp, and column pruning (ray-project#63809)

## Description

The hash-shuffle/join/aggregate operators size each aggregator's
`memory` reservation from the estimated input dataset size. That
estimate was both **missing** in some plans (causing under-reservation →
OOM) and **over-counted** in others (causing over-reservation →
unschedulable). This PR makes the estimate available, accurate, and
bounded.

### Triggering change

This was surfaced by **ray-project#63384 ([Data] Remove column renaming from the
read stage)**. Before that PR the read stage performed column renames
inline, so a TPCH plan looked like `ReadParquet → Join` with no
intervening op. ray-project#63384 moved renames out of the read into a separate
`Project` (of `AliasExpr`s) above the read, so the plan became
`ReadParquet → Project → Join`. That exposed a latent gap:
`Project.infer_metadata` returned all-`None`, so once a `Project` sat
between a read and a join/aggregate the size estimate disappeared and
the aggregator fell back to a fixed default reservation — which
under-provisioned the TPCH Q3 autoscaling release test and OOM-killed
it. The changes below fix that latent gap and the issues uncovered while
fixing it.

### 1. Propagate metadata through one-to-one ops (fixes join OOM)

With a rename `Project` now between the read and the join, the join's
input op was a `Project`, which inherited the base all-`None`
`infer_metadata`. So `_try_estimate_output_bytes` saw `size_bytes=None`
and the `HashShuffleAggregator` silently fell back to a fixed default
(`DEFAULT_HASH_SHUFFLE_AGGREGATOR_MEMORY_ALLOCATION`, 1 GiB/aggregator)
instead of a size-derived reservation (~5.3 GiB/aggregator for TPCH Q3
SF100). On an autoscaling cluster this under-provisions the aggregators
and the worker is OOM-killed mid-join (`ActorDiedError` / `SYSTEM_ERROR:
Worker connection closed unexpectedly`).

`AbstractOneToOne.infer_metadata` now propagates
`num_rows`/`size_bytes`/`input_files` from its single input, gated on
`can_modify_num_rows` (row-preserving ops like `Project` propagate;
`Filter`/`FlatMap` fall back to `None`). `Download` overrides this to
report `size_bytes=None`, since it appends blob columns and the input
size would be a misleading under-estimate.

### 2. Prune unused columns before hash-shuffle aggregations

An `Aggregate` only reads its group keys and each aggregation's target
column, but nothing pruned the rest before the shuffle — so wide unused
columns (e.g. a string column carried through by `with_column`) were
dragged through the aggregator, inflating both its memory reservation
and the bytes shuffled. `ProjectionPushdown` now inserts a pruning
`Project` below an `Aggregate` keeping only the consumed columns (keys +
aggregation targets); the existing fuse/push steps carry the narrowed
set into the read. It only fires when the input schema is known and has
extra columns (keeping the fixed-point optimizer idempotent), and leaves
generic `AggregateFn`s (unknown columns) untouched.

### 3. Clamp the aggregator reservation to the largest node (safety net)

With estimates now firing more often, a low-partition shuffle over a
large dataset (e.g. a global aggregation with `num_partitions=1`) could
reserve ~2× the whole dataset as a single aggregator's `memory` request
— exceeding any node and making the actor unschedulable
(`ActorUnschedulableError`). The per-aggregator reservation is now
clamped to the largest single node's memory: aggregators hold shuffle
data in the (spillable) object store, so reserving more heap `memory`
than a node has only makes the actor unschedulable, not faster. The
clamp only triggers when the estimate exceeds node capacity (well-sized
reservations are unaffected) and logs a warning when it engages.

## Verification

- Join estimate restored through a plan with a `Project` between the
read and the join (was `None` → 1 GiB fallback).
- Q6-shaped global aggregation: read is pruned to the consumed columns;
end-to-end result matches a pandas ground truth.
- Clamp engages only when the estimate exceeds node memory and the run
completes via spilling.
- Correctness validated across combos (filters, group-by,
multi-aggregation, post-shuffle join→aggregate, computed columns) by
comparing pruned vs unpruned pipelines.
- New parameterized tests in `test_projection_fusion.py` (aggregate
input pruning) and existing suites pass (`test_projection_fusion`,
`test_predicate_pushdown`, `test_execution_optimizer_basic`,
`test_infer_schema`, aggregation tests).

## Related issues

Surfaced by ray-project#63384 (column renames moved out of the read stage into a
`Project`).

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Signed-off-by: Goutam <goutam@anyscale.com>
Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

data Ray Data-related issues go add ONLY when ready to merge, run all tests

2 participants