[Data] - Iceberg support predicate & projection pushdown #58286
Signed-off-by: Goutam <goutam@anyscale.com>
Code Review
This pull request introduces a converter from Ray Data expressions to PyIceberg expressions, enabling predicate pushdown for Iceberg data sources. The implementation is well-structured and includes comprehensive tests. My feedback focuses on improving code maintainability by reducing duplication, adding missing type hints for better code clarity and safety, and strengthening test assertions to ensure structural equality of the converted expressions.
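As a rough illustration of the kind of converter the review describes, here is a minimal sketch that translates a small predicate tree into a second tree and then checks the result by structural equality. Every class and function name below (Col, Lit, BinOp, GreaterThan, EqualTo, And, convert) is a hypothetical stand-in defined in the snippet itself; none of them are the actual Ray Data or PyIceberg classes, and the PR's converter may be organized quite differently.

```python
from dataclasses import dataclass
from typing import Any


# Hypothetical source-side nodes, standing in for Ray Data expressions.
@dataclass(frozen=True)
class Col:
    name: str


@dataclass(frozen=True)
class Lit:
    value: Any


@dataclass(frozen=True)
class BinOp:
    op: str  # one of "gt", "eq", "and"
    left: Any
    right: Any


# Hypothetical target-side nodes, standing in for PyIceberg expressions.
@dataclass(frozen=True)
class GreaterThan:
    term: str
    value: Any


@dataclass(frozen=True)
class EqualTo:
    term: str
    value: Any


@dataclass(frozen=True)
class And:
    left: Any
    right: Any


_COMPARISONS = {"gt": GreaterThan, "eq": EqualTo}


def convert(expr: Any) -> Any:
    """Recursively translate a source predicate into the target tree."""
    if not isinstance(expr, BinOp):
        raise TypeError(f"Cannot convert {type(expr).__name__} to a predicate")
    if expr.op == "and":
        return And(convert(expr.left), convert(expr.right))
    if expr.op in _COMPARISONS:
        if not (isinstance(expr.left, Col) and isinstance(expr.right, Lit)):
            raise ValueError("Comparisons must have the form <column> <op> <literal>")
        return _COMPARISONS[expr.op](expr.left.name, expr.right.value)
    raise ValueError(f"Unsupported operation: {expr.op!r}")


predicate = BinOp(
    "and",
    BinOp("gt", Col("age"), Lit(30)),
    BinOp("eq", Col("country"), Lit("US")),
)
converted = convert(predicate)

# Dataclass equality compares field by field, so this checks the whole tree
# rather than just its type or string form.
assert converted == And(GreaterThan("age", 30), EqualTo("country", "US"))
```

Comparing whole trees this way is one reading of the "structural equality" feedback: a test that only checks the type of the result would miss a swapped operand or a wrong literal.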
/gemini review
Code Review
This pull request introduces a converter from Ray Data expressions to PyIceberg expressions, enabling predicate pushdown for Iceberg data sources. The implementation is clean and well-tested. I've added a few suggestions to improve performance and maintainability in the new _IcebergExpressionVisitor by making operation maps class-level constants and dynamically generating error messages.
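The two suggestions in this review (operation maps as class-level constants, and error messages generated from those maps) can be sketched as follows. This is an illustrative pattern only: ComparisonVisitor and _COMPARISON_OPS are hypothetical names, and the map here points at functions from Python's operator module rather than at PyIceberg expression classes.

```python
import operator
from typing import Any, Callable, ClassVar, Dict


class ComparisonVisitor:
    """Hypothetical visitor that dispatches on an operation name."""

    # Built once when the class is defined and shared by every instance,
    # instead of being rebuilt on each visit call.
    _COMPARISON_OPS: ClassVar[Dict[str, Callable[[Any, Any], bool]]] = {
        "eq": operator.eq,
        "ne": operator.ne,
        "gt": operator.gt,
        "ge": operator.ge,
        "lt": operator.lt,
        "le": operator.le,
    }

    def visit(self, op: str, left: Any, right: Any) -> bool:
        try:
            fn = self._COMPARISON_OPS[op]
        except KeyError:
            # The list of supported operations is derived from the map, so
            # the message cannot drift out of date when an entry is added.
            supported = ", ".join(sorted(self._COMPARISON_OPS))
            raise ValueError(
                f"Unsupported operation {op!r}; supported operations: {supported}"
            ) from None
        return fn(left, right)


visitor = ComparisonVisitor()
assert visitor.visit("gt", 3, 2)
```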
/gemini review
Code Review
This pull request introduces predicate and projection pushdown support for the Iceberg datasource, which is a significant enhancement for read performance. The implementation is well-structured, introducing an _IcebergExpressionVisitor for converting Ray Data expressions and updating IcebergDatasource to support the necessary pushdown interfaces. The accompanying tests are exceptionally thorough, covering a wide range of scenarios and combinations of filters, projections, and column renames. I've identified a minor potential bug in the projection logic and have a couple of small suggestions for code simplification.
/gemini review
    Returns:
        List of column names to project, or None if all columns are selected.
    """
    return self._data_columns
/gemini review
Code Review
This pull request introduces significant improvements by adding predicate and projection pushdown support for Iceberg datasources. The implementation is well-structured, leveraging a new _DatasourceProjectionPushdownMixin to provide a generic framework for projection pushdown, which is also adopted by the Parquet datasource for better code consistency and reuse. A new _IcebergExpressionVisitor is introduced to translate Ray Data expressions into PyIceberg expressions, enabling efficient filtering at the source. The test coverage for these new features is comprehensive and robust.
I've found two issues: a critical bug in ParquetDatasource where a super().__init__() call is missing, which would lead to an AttributeError during predicate pushdown, and a high-severity logic error in the projection combination logic that could cause all columns to be read instead of none. Addressing these will make the implementation solid.
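The first issue is a general Python pitfall that is easy to reproduce outside Ray. In the sketch below, PredicateMixin, BrokenSource, and FixedSource are hypothetical stand-ins, not the classes from the PR; the point is only that state created in a mixin's __init__ does not exist unless the subclass calls super().__init__().

```python
class PredicateMixin:
    """Hypothetical mixin that keeps its state in an instance attribute."""

    def __init__(self):
        self._predicate = None

    def get_predicate(self):
        return self._predicate


class BrokenSource(PredicateMixin):
    def __init__(self, path):
        # super().__init__() is never called, so _predicate is never set.
        self.path = path


class FixedSource(PredicateMixin):
    def __init__(self, path):
        super().__init__()  # sets up the mixin state first
        self.path = path


assert FixedSource("table").get_predicate() is None

try:
    BrokenSource("table").get_predicate()
except AttributeError as exc:
    error_name = type(exc).__name__
else:
    error_name = None

# The failure only appears when the attribute is first read, which can be
# far from the constructor that caused it.
assert error_name == "AttributeError"
```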
/gemini review
Code Review
This pull request introduces significant improvements by adding predicate and projection pushdown support for Iceberg datasources. The refactoring to create a _DatasourceProjectionPushdownMixin is a great step towards centralizing projection logic and promoting code reuse, as demonstrated by its adoption in the Parquet datasource as well. The implementation of an _IcebergExpressionVisitor for translating Ray Data expressions is a solid approach for enabling predicate pushdown. Furthermore, the API enhancements in read_iceberg, deprecating older parameters in favor of a more fluent API, improve consistency across the library. The test coverage for these new features is comprehensive and well-executed. I've included a couple of minor suggestions to improve documentation clarity and type hinting.
def supports_predicate_pushdown(self) -> bool:
    return True
Yes. In the prior implementation, the CSV datasource incorrectly applied predicate and projection pushdown, which makes no sense because CSV files carry no accompanying metadata.
# Store as projection_map (identity mapping if columns specified, None otherwise)
# Note: Empty list [] means no columns, None means all columns
if data_columns is None:
    self._projection_map = None
else:
    self._projection_map = {col: col for col in data_columns}
Also out-of-scope for this PR, but I think it's implicit that you need to set this specific _projection_map attribute that's defined in an ancestor class, and it's not part of the _DatasourceProjectionPushdownMixin interface.
Actually, this will be deprecated in a few releases, because passing the columns in as part of the read is an anti-pattern. But I see what you mean.
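The distinction drawn in the code comment (an empty list means no columns, None means all columns) is easy to lose when a check relies on truthiness. The sketch below shows the identity-mapping construction from the snippet as a standalone function, next to a hypothetical combine_projection helper; the helper and its intersection semantics are assumptions made for illustration and are not taken from the PR.

```python
from typing import Dict, List, Optional


def build_projection_map(
    data_columns: Optional[List[str]],
) -> Optional[Dict[str, str]]:
    """None means "all columns"; a list (even an empty one) is explicit."""
    if data_columns is None:
        return None
    return {col: col for col in data_columns}


def combine_projection(
    current: Optional[List[str]], requested: Optional[List[str]]
) -> Optional[List[str]]:
    """Hypothetical helper: narrow an existing projection by a new one."""
    # Compare against None explicitly. Writing `if not requested` here would
    # treat [] like None and silently widen "no columns" to "all columns".
    if requested is None:
        return current
    if current is None:
        return list(requested)
    return [col for col in requested if col in current]


assert build_projection_map(None) is None
assert build_projection_map([]) == {}
assert build_projection_map(["a", "b"]) == {"a": "a", "b": "b"}
assert combine_projection(["a", "b"], []) == []
assert combine_projection(None, ["a"]) == ["a"]
```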
current_project.exprs
)
Is there ever a case where a logical operator subclasses LogicalOperatorSupportsProjectionPushdown but supports_projection_pushdown() is false?
Yes. The Read operator supports pushdown but not all readers support projection pushdown (CSV is an example.)
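A minimal sketch of why the runtime check is still needed even when the operator type advertises the capability: the operator can delegate the question to whichever reader it wraps. All names here (Datasource, ColumnarSource, CsvLikeSource, ReadOp, try_push_projection) are hypothetical and simplified; they are not the Ray Data classes discussed in this thread.

```python
from typing import List, Optional, Tuple


class Datasource:
    """Hypothetical base reader: no pushdown unless a subclass opts in."""

    def supports_projection_pushdown(self) -> bool:
        return False


class ColumnarSource(Datasource):
    def supports_projection_pushdown(self) -> bool:
        return True


class CsvLikeSource(Datasource):
    """Inherits the default and therefore does not support pushdown."""


class ReadOp:
    """Hypothetical read operator: the answer depends on the wrapped reader."""

    def __init__(self, datasource: Datasource, columns: Optional[List[str]] = None):
        self.datasource = datasource
        self.columns = columns

    def supports_projection_pushdown(self) -> bool:
        return self.datasource.supports_projection_pushdown()

    def apply_projection(self, columns: List[str]) -> "ReadOp":
        return ReadOp(self.datasource, list(columns))


def try_push_projection(read_op: ReadOp, columns: List[str]) -> Tuple[ReadOp, bool]:
    """Push the projection into the read if possible, else leave it alone."""
    if not read_op.supports_projection_pushdown():
        return read_op, False  # a separate projection step is still needed
    return read_op.apply_projection(columns), True


pushed_op, pushed = try_push_projection(ReadOp(ColumnarSource()), ["id"])
kept_op, kept_pushed = try_push_projection(ReadOp(CsvLikeSource()), ["id"])
assert pushed and pushed_op.columns == ["id"]
assert not kept_pushed and kept_op.columns is None
```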
…#58286) ## Description Predicate pushdown (ray-project#58150) in conjunction with this PR should speed up reads from Iceberg. Once the above change lands, we can add the pushdown interface support for IcebergDatasource --------- Signed-off-by: Goutam <goutam@anyscale.com>
…#58286) ## Description Predicate pushdown (ray-project#58150) in conjunction with this PR should speed up reads from Iceberg. Once the above change lands, we can add the pushdown interface support for IcebergDatasource --------- Signed-off-by: Goutam <goutam@anyscale.com> Signed-off-by: Future-Outlier <eric901201@gmail.com>
# Initialize parent class to set up predicate pushdown mixin
super().__init__()
Just make __init__ an abstract method in the mixin.
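One possible reading of this suggestion, sketched with Python's abc module. PushdownMixin, Source, and Forgetful are hypothetical names. Note the limits of the approach: an abstract __init__ forces subclasses to define their own __init__, but it does not by itself force them to call super().__init__().

```python
from abc import ABC, abstractmethod


class PushdownMixin(ABC):
    """Hypothetical mixin whose __init__ must be overridden by subclasses."""

    @abstractmethod
    def __init__(self):
        # Abstract methods may still carry an implementation that
        # subclasses reach through super().__init__().
        self._predicate = None


class Source(PushdownMixin):
    def __init__(self, path):
        super().__init__()
        self.path = path


class Forgetful(PushdownMixin):
    """Defines no __init__, so the class stays abstract."""


source = Source("table")
assert source._predicate is None

try:
    Forgetful()
except TypeError:
    instantiation_failed = True
else:
    instantiation_failed = False

# Instantiating a class with an unimplemented abstract method raises TypeError.
assert instantiation_failed
```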
return input_op.apply_projection(
    required_columns, output_column_rename_map
)
# Determine columns to project
@goutamvenkat-anyscale please explain changes in this rule
Description
Predicate pushdown (#58150) in conjunction with this PR should speed up reads from Iceberg.
Once the above change lands, we can add the pushdown interface support for IcebergDatasource.