[Data] - Iceberg support predicate & projection pushdown #58286

Merged
bveeramani merged 28 commits into ray-project:master from goutamvenkat-anyscale:goutam/iceberg_expr on Nov 11, 2025
@goutamvenkat-anyscale goutamvenkat-anyscale commented Oct 29, 2025

Contributor

Description

Predicate pushdown (#58150), in conjunction with this PR, should speed up reads from Iceberg.

Once the above change lands, we can add the pushdown interface support for IcebergDatasource.

Signed-off-by: Goutam <goutam@anyscale.com>
@goutamvenkat-anyscale goutamvenkat-anyscale requested a review from a team as a code owner October 29, 2025 20:21
@goutamvenkat-anyscale goutamvenkat-anyscale added the data (Ray Data-related issues) and go (add ONLY when ready to merge, run all tests) labels Oct 29, 2025

@gemini-code-assist gemini-code-assist Bot left a comment

Contributor

Code Review

This pull request introduces a converter from Ray Data expressions to PyIceberg expressions, enabling predicate pushdown for Iceberg data sources. The implementation is well-structured and includes comprehensive tests. My feedback focuses on improving code maintainability by reducing duplication, adding missing type hints for better code clarity and safety, and strengthening test assertions to ensure structural equality of the converted expressions.

Comment thread python/ray/data/_internal/planner/plan_expression/expression_visitors.py Outdated
Comment thread python/ray/data/_internal/planner/plan_expression/expression_visitors.py Outdated
Comment thread python/ray/data/tests/test_expressions.py Outdated
Signed-off-by: Goutam <goutam@anyscale.com>
@goutamvenkat-anyscale

Contributor Author

/gemini review

@gemini-code-assist gemini-code-assist Bot left a comment

Contributor

Code Review

This pull request introduces a converter from Ray Data expressions to PyIceberg expressions, enabling predicate pushdown for Iceberg data sources. The implementation is clean and well-tested. I've added a few suggestions to improve performance and maintainability in the new _IcebergExpressionVisitor by making operation maps class-level constants and dynamically generating error messages.
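
To illustrate the two suggestions above (class-level operation maps and dynamically generated error messages), here is a minimal sketch. It is not the PR's _IcebergExpressionVisitor: the node classes Col, Lit, and BinaryOp and the class ToyIcebergVisitor are hypothetical, and the output is a nested tuple whose strings only mirror PyIceberg expression class names, so the example runs without PyIceberg installed.

```python
from dataclasses import dataclass
from typing import Any, Tuple, Union


# Hypothetical toy nodes standing in for Ray Data expression classes.
@dataclass(frozen=True)
class Col:
    name: str


@dataclass(frozen=True)
class Lit:
    value: Any


@dataclass(frozen=True)
class BinaryOp:
    op: str
    left: "Expr"
    right: "Expr"


Expr = Union[Col, Lit, BinaryOp]


class ToyIcebergVisitor:
    """Converts toy expressions into nested tuples (stand-ins for PyIceberg nodes)."""

    # Class-level constants: built once and shared by every instance.
    _COMPARISONS = {
        "eq": "EqualTo",
        "ne": "NotEqualTo",
        "gt": "GreaterThan",
        "ge": "GreaterThanOrEqual",
        "lt": "LessThan",
        "le": "LessThanOrEqual",
    }
    _LOGICAL = {"and": "And", "or": "Or"}

    def visit(self, expr: Expr) -> Tuple:
        if not isinstance(expr, BinaryOp):
            raise ValueError(f"Cannot push down bare expression: {expr!r}")
        if expr.op in self._LOGICAL:
            # Recurse into both sides of a logical connective.
            left, right = self.visit(expr.left), self.visit(expr.right)
            return (self._LOGICAL[expr.op], left, right)
        if expr.op in self._COMPARISONS:
            if not (isinstance(expr.left, Col) and isinstance(expr.right, Lit)):
                raise ValueError("Comparison must have the form column <op> literal")
            return (self._COMPARISONS[expr.op], expr.left.name, expr.right.value)
        # The message is derived from the maps, so it cannot drift out of date.
        supported = sorted({**self._COMPARISONS, **self._LOGICAL})
        raise ValueError(f"Unsupported operation {expr.op!r}; supported: {supported}")


predicate = BinaryOp(
    "and",
    BinaryOp("gt", Col("age"), Lit(30)),
    BinaryOp("eq", Col("country"), Lit("US")),
)
converted = ToyIcebergVisitor().visit(predicate)
```

Keeping the maps on the class avoids rebuilding them on every visit, and building the error text from the same maps means a newly supported operation shows up in the message automatically.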

Comment thread python/ray/data/_internal/planner/plan_expression/expression_visitors.py Outdated
Comment thread python/ray/data/_internal/planner/plan_expression/expression_visitors.py Outdated
Comment thread python/ray/data/_internal/planner/plan_expression/expression_visitors.py Outdated
Comment thread python/ray/data/_internal/planner/plan_expression/expression_visitors.py Outdated
Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
cursor[bot]

This comment was marked as outdated.

Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
cursor[bot]

This comment was marked as outdated.

Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
@goutamvenkat-anyscale goutamvenkat-anyscale changed the title [Data] - Ray Data Expr to Iceberg Expr converter Oct 31, 2025
Signed-off-by: Goutam <goutam@anyscale.com>
@goutamvenkat-anyscale

Contributor Author

/gemini review

@gemini-code-assist gemini-code-assist Bot left a comment

Contributor

Code Review

This pull request introduces predicate and projection pushdown support for the Iceberg datasource, which is a significant enhancement for read performance. The implementation is well-structured, introducing an _IcebergExpressionVisitor for converting Ray Data expressions and updating IcebergDatasource to support the necessary pushdown interfaces. The accompanying tests are exceptionally thorough, covering a wide range of scenarios and combinations of filters, projections, and column renames. I've identified a minor potential bug in the projection logic and have a couple of small suggestions for code simplification.

Comment thread python/ray/data/_internal/datasource/iceberg_datasource.py Outdated
Comment thread python/ray/data/expressions.py Outdated
Comment thread python/ray/data/_internal/datasource/iceberg_datasource.py Outdated
Comment thread python/ray/data/_internal/datasource/iceberg_datasource.py Outdated
Signed-off-by: Goutam <goutam@anyscale.com>
cursor[bot]

This comment was marked as outdated.

Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
@goutamvenkat-anyscale

Contributor Author

/gemini review

cursor[bot]

This comment was marked as outdated.

Signed-off-by: Goutam <goutam@anyscale.com>
Comment thread python/ray/data/_internal/datasource/csv_datasource.py Outdated
Comment thread python/ray/data/_internal/datasource/iceberg_datasource.py Outdated
    Returns:
        List of column names to project, or None if all columns are selected.
    """
    return self._data_columns

Contributor

+1

Comment thread python/ray/data/datasource/datasource.py Outdated
Comment thread python/ray/data/datasource/datasource.py Outdated
Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
@goutamvenkat-anyscale

Contributor Author

/gemini review

@gemini-code-assist gemini-code-assist Bot left a comment

Contributor

Code Review

This pull request introduces significant improvements by adding predicate and projection pushdown support for Iceberg datasources. The implementation is well-structured, leveraging a new _DatasourceProjectionPushdownMixin to provide a generic framework for projection pushdown, which is also adopted by the Parquet datasource for better code consistency and reuse. A new _IcebergExpressionVisitor is introduced to translate Ray Data expressions into PyIceberg expressions, enabling efficient filtering at the source. The test coverage for these new features is comprehensive and robust.

I've found two issues: a critical bug in ParquetDatasource where a super().__init__() call is missing, which would lead to an AttributeError during predicate pushdown, and a high-severity logic error in the projection combination logic that could cause all columns to be read instead of none. Addressing these will make the implementation solid.
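
The first issue can be reproduced in isolation. The sketch below is a simplified illustration, not the ParquetDatasource code: PredicatePushdownMixin, BrokenSource, FixedSource, and try_pushdown are hypothetical names, and predicates are plain strings. It shows why skipping super().__init__() surfaces as an AttributeError only later, when a predicate is first applied.

```python
class PredicatePushdownMixin:
    """Hypothetical mixin that keeps the pushed-down predicate in instance state."""

    def __init__(self) -> None:
        self._predicate = None

    def apply_predicate(self, predicate: str) -> None:
        # Reads self._predicate before writing it, so the attribute must exist.
        if self._predicate is None:
            self._predicate = predicate
        else:
            self._predicate = f"({self._predicate}) AND ({predicate})"


class BrokenSource(PredicatePushdownMixin):
    def __init__(self, path: str) -> None:
        # Bug: the mixin's __init__ never runs, so _predicate is never set.
        self.path = path


class FixedSource(PredicatePushdownMixin):
    def __init__(self, path: str) -> None:
        super().__init__()  # Initializes the mixin state first.
        self.path = path


def try_pushdown(source: PredicatePushdownMixin) -> str:
    """Apply two predicates and report either the result or the failure."""
    try:
        source.apply_predicate("x > 1")
        source.apply_predicate("y < 5")
        return source._predicate
    except AttributeError as exc:
        return f"AttributeError: {exc}"


broken_result = try_pushdown(BrokenSource("s3://bucket/data"))
fixed_result = try_pushdown(FixedSource("s3://bucket/data"))
```

Construction of BrokenSource succeeds, which is what makes this class of bug easy to miss: the failure appears only on the pushdown path.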

Comment thread python/ray/data/_internal/datasource/parquet_datasource.py
Comment thread python/ray/data/datasource/datasource.py Outdated
Signed-off-by: Goutam <goutam@anyscale.com>
@goutamvenkat-anyscale

Contributor Author

/gemini review

@gemini-code-assist gemini-code-assist Bot left a comment

Contributor

Code Review

This pull request introduces significant improvements by adding predicate and projection pushdown support for Iceberg datasources. The refactoring to create a _DatasourceProjectionPushdownMixin is a great step towards centralizing projection logic and promoting code reuse, as demonstrated by its adoption in the Parquet datasource as well. The implementation of an _IcebergExpressionVisitor for translating Ray Data expressions is a solid approach for enabling predicate pushdown. Furthermore, the API enhancements in read_iceberg, deprecating older parameters in favor of a more fluent API, improve consistency across the library. The test coverage for these new features is comprehensive and well-executed. I've included a couple of minor suggestions to improve documentation clarity and type hinting.

Comment thread python/ray/data/datasource/datasource.py
Comment thread python/ray/data/_internal/planner/plan_expression/expression_visitors.py Outdated
Signed-off-by: Goutam <goutam@anyscale.com>
Comment on lines -40 to -42
def supports_predicate_pushdown(self) -> bool:
    return True

Member

Drive-by?

@goutamvenkat-anyscale goutamvenkat-anyscale Nov 8, 2025

Contributor Author

Yes. In the prior implementation, the CSV datasource incorrectly applied predicate and projection pushdown, even though that makes no sense: CSV files carry no accompanying metadata.

Comment thread python/ray/data/datasource/datasource.py Outdated
Comment thread python/ray/data/datasource/datasource.py Outdated
Comment thread python/ray/data/read_api.py Outdated
Comment thread python/ray/data/datasource/datasource.py Outdated
Comment on lines +275 to +280
# Store as projection_map (identity mapping if columns specified, None otherwise)
# Note: Empty list [] means no columns, None means all columns
if data_columns is None:
    self._projection_map = None
else:
    self._projection_map = {col: col for col in data_columns}

Member

Also out-of-scope for this PR, but I think it's implicit that you need to set this specific _projection_map attribute that's defined in an ancestor class, and it's not part of the _DatasourceProjectionPushdownMixin interface.

Contributor Author

Actually, this will be deprecated in a few releases, because passing the columns in as part of the read is an anti-pattern. But I see what you mean.
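
The distinction in the snippet above (an empty list means no columns, None means all columns) is easy to lose when two projections are combined. As a hedged sketch with hypothetical helper names (combine_buggy and combine_fixed are not functions from this PR), the difference between a truthiness check and an explicit None check looks like this:

```python
from typing import List, Optional

Projection = Optional[List[str]]  # None = all columns, [] = no columns


def combine_buggy(current: Projection, requested: Projection) -> Projection:
    # Bug: `not requested` is True for both None and [], so an explicit
    # "no columns" request falls back to the current projection.
    if not requested:
        return current
    if not current:
        return requested
    return [c for c in requested if c in current]


def combine_fixed(current: Projection, requested: Projection) -> Projection:
    # Only None means "no constraint"; an empty list is a real projection.
    if requested is None:
        return current
    if current is None:
        return requested
    # Keep the requested order, restricted to columns still available.
    return [c for c in requested if c in current]


# Requesting zero columns on an unprojected source:
buggy = combine_buggy(None, [])   # falls back to None, i.e. all columns
fixed = combine_fixed(None, [])   # stays [], i.e. no columns
narrowed = combine_fixed(["a", "b"], ["b", "c"])
```

The buggy variant turns a request for zero columns into a read of every column, which is the kind of failure the review describes as reading all columns instead of none.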

Comment on lines 324 to 325
current_project.exprs
)

Member

Is there ever a case where a logical operator subclasses LogicalOperatorSupportsProjectionPushdown but supports_projection_pushdown() is false?

Contributor Author

Yes. The Read operator supports pushdown but not all readers support projection pushdown (CSV is an example.)
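
A rough sketch of that situation, using hypothetical stand-in classes rather than Ray's actual operator and datasource types: the operator type always exposes the pushdown hook, but it answers by delegating to whichever datasource it wraps, so an optimizer rule has to check both the type and the flag.

```python
from typing import List


class Datasource:
    """Hypothetical base class: pushdown is opt-in."""

    def supports_projection_pushdown(self) -> bool:
        return False


class ColumnarSource(Datasource):
    """Stand-in for a format with column metadata (e.g. Parquet or Iceberg)."""

    def supports_projection_pushdown(self) -> bool:
        return True


class RowTextSource(Datasource):
    """Stand-in for a format without such metadata (e.g. CSV)."""


class Read:
    """Stand-in for the Read logical operator."""

    def __init__(self, datasource: Datasource) -> None:
        self._datasource = datasource

    def supports_projection_pushdown(self) -> bool:
        # The operator itself has the hook; the datasource decides the answer.
        return self._datasource.supports_projection_pushdown()


def plan_projection(op: Read, columns: List[str]) -> str:
    # Checking the operator type alone is not enough; the flag must also be true.
    if isinstance(op, Read) and op.supports_projection_pushdown():
        return f"pushed {columns} into read"
    return f"kept Project({columns}) above read"


columnar_plan = plan_projection(Read(ColumnarSource()), ["id", "name"])
csv_plan = plan_projection(Read(RowTextSource()), ["id", "name"])
```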

Comment thread python/ray/data/tests/test_iceberg.py Outdated
Comment thread python/ray/data/tests/test_iceberg.py Outdated
Comment thread python/ray/data/tests/test_iceberg.py Outdated
@bveeramani bveeramani merged commit 10983e8 into ray-project:master Nov 11, 2025
6 checks passed
@goutamvenkat-anyscale goutamvenkat-anyscale deleted the goutam/iceberg_expr branch November 11, 2025 01:38
landscapepainter pushed a commit to landscapepainter/ray that referenced this pull request Nov 17, 2025
SheldonTsen pushed a commit to SheldonTsen/ray that referenced this pull request Dec 1, 2025
Future-Outlier pushed a commit to Future-Outlier/ray that referenced this pull request Dec 7, 2025
Signed-off-by: Future-Outlier <eric901201@gmail.com>
Comment on lines +127 to +129
# Initialize parent class to set up predicate pushdown mixin
super().__init__()

Contributor

Just make init an abstract method in the mixin

return input_op.apply_projection(
    required_columns, output_column_rename_map
)
# Determine columns to project

Contributor

@goutamvenkat-anyscale please explain changes in this rule

Labels

data (Ray Data-related issues), go (add ONLY when ready to merge, run all tests)

3 participants