Revert "[Data] - Iceberg support upsert tables + schema update + overwrite tables" #59185
…write ta…" This reverts commit 2f55d07.
Code Review
This pull request reverts the recent changes that added upsert, schema update, and overwrite support for Iceberg tables. The revert appears to be clean and complete, correctly rolling back the code changes, tests, and dependency versions.
I've identified a couple of minor issues in the reverted code that could be addressed to improve code quality. One is a potential side effect due to modifying a user-provided dictionary in-place, and the other is a suggestion to use keyword arguments for better readability and robustness. Please see my detailed comments.
| f"overwrite_kwargs can only be specified when mode is SaveMode.OVERWRITE, " | ||
| f"but mode is {self._mode}" | ||
| ) | ||
| self._catalog_kwargs = catalog_kwargs if catalog_kwargs is not None else {} |
The catalog_kwargs dictionary is assigned directly and later modified in-place with .pop() on line 60. This can lead to unexpected side effects for the caller if they reuse the dictionary. It's safer to work with a copy of the dictionary to avoid modifying the original object passed by the user.
| self._catalog_kwargs = catalog_kwargs if catalog_kwargs is not None else {} | |
| self._catalog_kwargs = (catalog_kwargs or {}).copy() |
| datasink = IcebergDatasink( | ||
| table_identifier=table_identifier, | ||
| catalog_kwargs=catalog_kwargs, | ||
| snapshot_properties=snapshot_properties, | ||
| mode=mode, | ||
| overwrite_filter=overwrite_filter, | ||
| upsert_kwargs=upsert_kwargs, | ||
| overwrite_kwargs=overwrite_kwargs, | ||
| table_identifier, catalog_kwargs, snapshot_properties | ||
| ) |
For better readability and robustness against future changes in the IcebergDatasink constructor's signature, it's recommended to use keyword arguments when instantiating the IcebergDatasink.
datasink = IcebergDatasink(
    table_identifier=table_identifier,
    catalog_kwargs=catalog_kwargs,
    snapshot_properties=snapshot_properties,
)
| self._catalog_kwargs = catalog_kwargs if catalog_kwargs is not None else {} | ||
| self._snapshot_properties = ( | ||
| snapshot_properties if snapshot_properties is not None else {} | ||
| ) |
Bug: Missing dictionary copy causes caller's dict mutation
The reverted code removes the .copy() call when assigning catalog_kwargs. The pre-revert code used (catalog_kwargs or {}).copy() but the reverted code directly assigns the reference with catalog_kwargs if catalog_kwargs is not None else {}. Since line 60 calls self._catalog_kwargs.pop("name"), this mutates the caller's original dictionary, causing unexpected side effects where the user's catalog_kwargs dictionary loses its name key after calling write_iceberg().
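A minimal sketch of the aliasing problem described above. The two classes are hypothetical stand-ins, not the real IcebergDatasink; they reproduce only the assignment and the .pop("name") call mentioned in this comment, and the example catalog values are made up.

```python
from typing import Any, Dict, Optional


class AliasingSink:
    """Hypothetical stand-in: stores a reference to the caller's dict."""

    def __init__(self, catalog_kwargs: Optional[Dict[str, Any]] = None):
        # Same pattern as the reverted code: no copy is made.
        self._catalog_kwargs = catalog_kwargs if catalog_kwargs is not None else {}

    def catalog_name(self) -> str:
        # pop() removes the key from whatever dict self._catalog_kwargs refers to.
        return self._catalog_kwargs.pop("name")


class CopyingSink(AliasingSink):
    """Hypothetical stand-in: works on a shallow copy instead."""

    def __init__(self, catalog_kwargs: Optional[Dict[str, Any]] = None):
        self._catalog_kwargs = (catalog_kwargs or {}).copy()


# Without the copy, the caller's dict loses its "name" key.
user_kwargs = {"name": "prod", "type": "rest"}
AliasingSink(user_kwargs).catalog_name()
after_aliasing = dict(user_kwargs)

# With the copy, the caller's dict is left untouched.
user_kwargs = {"name": "prod", "type": "rest"}
CopyingSink(user_kwargs).catalog_name()
after_copying = dict(user_kwargs)
```

Note that .copy() is shallow: it protects the top-level keys from pop(), which is all this case needs, but nested values would still be shared with the caller.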
| from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE | ||
| from pyiceberg.utils.config import Config | ||
| data_files_list: WriteResult[List["DataFile"]] = [] |
Bug: Incorrect return type annotation in write method
The write method's return type annotation -> WriteResult[List["DataFile"]] is incorrect. According to the Datasink base class, the write method should return WriteReturnType (which is List["DataFile"] for this class), not WriteResult[WriteReturnType]. The WriteResult wrapper is created by the framework and passed to on_write_complete. Additionally, line 130 declares data_files_list with type WriteResult[List["DataFile"]] but assigns it an empty list []. The runtime behavior is correct (returning a list), but the type annotations are misleading.
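A rough illustration of the annotation split described above, assuming the contract stated in this comment: each write call returns a plain list, and the framework wraps those per-task results before calling on_write_complete. All names here (DataFileSketch, WriteResultSketch, per_task_returns, SinkSketch) are hypothetical and do not reflect Ray's or PyIceberg's actual class definitions.

```python
from dataclasses import dataclass
from typing import Generic, Iterable, List, TypeVar

T = TypeVar("T")


@dataclass
class DataFileSketch:
    """Hypothetical placeholder for a written data file."""
    path: str


@dataclass
class WriteResultSketch(Generic[T]):
    """Hypothetical wrapper built by the framework, not by write()."""
    per_task_returns: List[T]


class SinkSketch:
    def write(self, blocks: Iterable[str]) -> List[DataFileSketch]:
        # The annotation matches what is actually returned: a plain list.
        data_files_list: List[DataFileSketch] = []
        for block in blocks:
            data_files_list.append(DataFileSketch(path=f"{block}.parquet"))
        return data_files_list

    def on_write_complete(
        self, write_result: WriteResultSketch[List[DataFileSketch]]
    ) -> int:
        # Only here does the wrapper type appear; count files across tasks.
        return sum(len(files) for files in write_result.per_task_returns)


sink = SinkSketch()
# Simulate the framework: run two write tasks, then wrap their returns.
task_returns = [sink.write(["a", "b"]), sink.write(["c"])]
total_files = sink.on_write_complete(
    WriteResultSketch(per_task_returns=task_returns)
)
```

Keeping the wrapper type out of write keeps the annotations consistent with the empty-list initialization, so a type checker would not flag the assignment.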
Reverts #58270