[Data] - [2/n] - Iceberg Upsert + Overwrite support#59335
Merged
alexeykudinkin merged 22 commits intoray-project:masterfrom Dec 10, 2025
Merged
Conversation
Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
Contributor
|
Warning You have reached your daily quota limit. Please wait up to 24 hours and I will start processing your requests again! |
Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
alexeykudinkin
approved these changes
Dec 10, 2025
| self, | ||
| txn: "Table.transaction", | ||
| data_files: List["DataFile"], | ||
| upsert_keys_dicts: List[Dict[str, List[Any]]], |
Contributor
There was a problem hiding this comment.
(Follow-up) We'd store values as PA array to avoid any deserialization
Comment on lines
+240
to
+262
| def _preserve_identifier_field_requirements( | ||
| self, update: "UpdateSchema", table_schema: "Schema" | ||
| ) -> None: | ||
| """Ensure identifier fields remain required after schema union. | ||
|
|
||
| When union_by_name is called with a schema that has nullable fields, | ||
| PyIceberg may make identifier fields optional. Since identifier fields | ||
| must be required, this helper ensures they remain required after union. | ||
|
|
||
| Example: | ||
| Table schema: id: int (required, identifier), val: string | ||
| Input schema: id: int (optional), val: string | ||
|
|
||
| `union_by_name` merges them to: | ||
| id: int (optional), val: string | ||
|
|
||
| This violates the identifier constraint. This function forces `id` | ||
| back to required in the pending update. | ||
|
|
||
| Args: | ||
| update: The UpdateSchema object from update_schema() context manager | ||
| table_schema: The current table schema to get identifier field IDs from | ||
| """ |
Contributor
There was a problem hiding this comment.
Why would this ever be the case?
Comment on lines
+398
to
423
| if self._overwrite_filter is not None: | ||
| from ray.data._internal.datasource.iceberg_datasource import ( | ||
| _IcebergExpressionVisitor, | ||
| ) | ||
|
|
||
| visitor = _IcebergExpressionVisitor() | ||
| pyi_filter = visitor.visit(self._overwrite_filter) | ||
|
|
||
| txn.delete( | ||
| delete_filter=pyi_filter, | ||
| snapshot_properties=self._snapshot_properties, | ||
| **self._overwrite_kwargs, | ||
| ) | ||
| else: | ||
| # Full overwrite - delete all | ||
| from pyiceberg.expressions import AlwaysTrue | ||
|
|
||
| txn.delete( | ||
| delete_filter=AlwaysTrue(), | ||
| snapshot_properties=self._snapshot_properties, | ||
| **self._overwrite_kwargs, | ||
| ) | ||
|
|
||
| # Append new data files and commit | ||
| self._append_and_commit(txn, data_files) | ||
|
|
Contributor
There was a problem hiding this comment.
Extract common, remove duplication
|
|
||
| # Verify initial write | ||
| result = _read_from_iceberg(sort_by="col_a") | ||
| assert len(result) == 3 |
Contributor
There was a problem hiding this comment.
Let's assert whole table
peterxcli
pushed a commit
to peterxcli/ray
that referenced
this pull request
Feb 25, 2026
## Description Iceberg Upsert + Overwrite support ## Related issues > Link related issues: "Fixes ray-project#1234", "Closes ray-project#1234", or "Related to ray-project#1234". ## Additional information > Optional: Add implementation details, API changes, usage examples, screenshots, etc. --------- Signed-off-by: Goutam <goutam@anyscale.com> Signed-off-by: peterxcli <peterxcli@gmail.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Description
Iceberg Upsert + Overwrite support
Related issues
Additional information