[Data] - Iceberg support upsert tables + schema update + overwrite tables #58270
…bles Signed-off-by: Goutam <goutam@anyscale.com>
Code Review
This pull request introduces significant enhancements to the Iceberg datasink, adding support for UPSERT and OVERWRITE save modes, along with automatic schema evolution. The implementation is well-structured, cleanly separating the logic for different write modes and leveraging PyIceberg's high-level APIs where appropriate. The accompanying documentation updates and comprehensive test suite are excellent, clearly explaining the new features and ensuring their correctness. Overall, this is a high-quality contribution that greatly improves the functionality of the Iceberg integration. I've left a couple of minor suggestions for code refinement.
/gemini review
Code Review
This pull request adds significant new functionality to the Iceberg datasink, including support for UPSERT and OVERWRITE modes, as well as automatic schema evolution. The code is well-structured, with good separation of concerns into different helper methods. The addition of to_iceberg() on expressions to support filter pushdown for overwrites is a great feature. The tests are comprehensive and cover the new modes and schema evolution scenarios well.
I have a few comments, including a high-severity typing issue in IcebergDatasink that violates the Datasink generic contract, a point about reliance on a private pyiceberg API, a minor clarification needed in the write_iceberg docstring, and a misleading test name that should be corrected for clarity. Overall, this is a great contribution that significantly enhances Ray Data's Iceberg integration.
/gemini review
Code Review
This pull request introduces significant enhancements to the Iceberg datasink, adding support for UPSERT and OVERWRITE save modes, as well as automatic schema evolution. The implementation is well-structured, separating logic for different modes into distinct methods, which improves readability and maintainability. The accompanying tests are comprehensive and cover the new functionality well.
My review includes a few suggestions to improve documentation accuracy and code robustness by avoiding the use of private members from the pyiceberg library. These changes will make the code more maintainable and easier for users to understand.
    ERROR: Raise an error if data already exists.
    UPSERT: Update existing rows that match on key fields, or insert new rows.
        Requires identifier/key fields to be specified.
    """
Nit: give each enum member its own triple-quoted docstring instead of describing all members in the class docstring.
This works better with IDEs: hovering over a member shows its documentation.
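A minimal sketch of what this suggestion could look like. The enum name, the member values, and the wording for APPEND and OVERWRITE are assumptions made for illustration; only the ERROR and UPSERT descriptions come from the docstring under review.

```python
from enum import Enum


class SaveMode(Enum):
    """Hypothetical save-mode enum; member values are illustrative only."""

    APPEND = "append"
    """Add new rows to the existing table (assumed wording)."""

    OVERWRITE = "overwrite"
    """Replace existing data with the new data (assumed wording)."""

    ERROR = "error"
    """Raise an error if data already exists."""

    UPSERT = "upsert"
    """Update existing rows that match on key fields, or insert new rows.

    Requires identifier/key fields to be specified.
    """
```

The string literal placed directly under each assignment is a plain expression statement, so it does not create extra enum members. Many IDEs and documentation tools treat it as that member's docstring.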
    self._mode = mode
    self._overwrite_filter = overwrite_filter
    self._upsert_kwargs = (upsert_kwargs or {}).copy()
    self._overwrite_kwargs = (overwrite_kwargs or {}).copy()
Nit: validate that each of these kwargs is set only when the mode it applies to is selected.
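One way this check could look, as a sketch rather than the code that was merged. The helper name `validate_mode_kwargs`, the stand-in `SaveMode` enum, and the error messages are all hypothetical; the parameter names mirror the attributes in the snippet above.

```python
from enum import Enum
from typing import Any, Dict, Optional


class SaveMode(Enum):
    # Stand-in enum so the sketch is self-contained.
    APPEND = "append"
    OVERWRITE = "overwrite"
    UPSERT = "upsert"


def validate_mode_kwargs(
    mode: SaveMode,
    overwrite_filter: Optional[Any] = None,
    upsert_kwargs: Optional[Dict[str, Any]] = None,
    overwrite_kwargs: Optional[Dict[str, Any]] = None,
) -> None:
    """Raise ValueError if an option is passed for a mode that ignores it."""
    if upsert_kwargs and mode is not SaveMode.UPSERT:
        raise ValueError(
            f"upsert_kwargs is only valid with mode=UPSERT, got mode={mode.name}"
        )
    if mode is not SaveMode.OVERWRITE:
        if overwrite_kwargs:
            raise ValueError(
                f"overwrite_kwargs is only valid with mode=OVERWRITE, got mode={mode.name}"
            )
        if overwrite_filter is not None:
            raise ValueError(
                f"overwrite_filter is only valid with mode=OVERWRITE, got mode={mode.name}"
            )
```

Calling a helper like this at the top of `__init__`, before the kwargs are copied, would surface a misconfiguration when the datasink is constructed rather than silently ignoring the extra options at write time.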
…bles (ray-project#58270)

## Description
- Support upserting Iceberg tables for IcebergDatasink
- Update schema on APPEND and UPSERT
- Enable overwriting the entire table

Upgrades to PyIceberg 0.10.0 because it now supports upsert and overwrite functionality. Also, for append, the library now handles the transaction logic implicitly, so that burden can be lifted from Ray Data.

Signed-off-by: Goutam <goutam@anyscale.com>
…bles (ray-project#58270)

## Description
- Support upserting Iceberg tables for IcebergDatasink
- Update schema on APPEND and UPSERT
- Enable overwriting the entire table

Upgrades to PyIceberg 0.10.0 because it now supports upsert and overwrite functionality. Also, for append, the library now handles the transaction logic implicitly, so that burden can be lifted from Ray Data.

Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Future-Outlier <eric901201@gmail.com>
Description
Upgrades to PyIceberg 0.10.0 because it now supports upsert and overwrite functionality. Also, for append, the library now handles the transaction logic implicitly, so that burden can be lifted from Ray Data.
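To make the UPSERT and schema-update semantics concrete, here is a plain-Python sketch on lists of dicts. It uses neither PyIceberg nor Ray Data and is not how either library implements the operation. The function name `upsert_rows` and the parameter name `join_cols` are chosen for illustration, and merging matched rows column by column is a simplification.

```python
from typing import Any, Dict, List, Sequence, Tuple

Row = Dict[str, Any]


def upsert_rows(
    existing: List[Row], incoming: List[Row], join_cols: Sequence[str]
) -> List[Row]:
    """Update rows that match on join_cols, insert the rest, widen the schema."""

    def key(row: Row) -> Tuple[Any, ...]:
        return tuple(row[c] for c in join_cols)

    # Index existing rows by key (copied so the inputs are not mutated).
    merged: Dict[Tuple[Any, ...], Row] = {key(r): dict(r) for r in existing}

    for row in incoming:
        k = key(row)
        if k in merged:
            merged[k].update(row)  # key matches: update the existing row
        else:
            merged[k] = dict(row)  # no match: insert as a new row

    # Schema update: collect every column seen, in first-seen order,
    # and fill columns a row does not have with None.
    columns: List[str] = []
    for row in merged.values():
        for col in row:
            if col not in columns:
                columns.append(col)
    return [{col: row.get(col) for col in columns} for row in merged.values()]


table = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]
batch = [{"id": 2, "name": "B", "age": 30}, {"id": 3, "name": "c", "age": 40}]
result = upsert_rows(table, batch, join_cols=["id"])
```

In this example the row with `id` 2 is updated, the row with `id` 3 is inserted, and the new `age` column is added to every row, with `None` for the untouched row with `id` 1.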