[Data] - [2/n] - Iceberg Upsert + Overwrite support #59335

Merged
alexeykudinkin merged 22 commits into ray-project:master from
goutamvenkat-anyscale:goutam/iceberg_overwrite_upsert_v2
Dec 10, 2025

Conversation

@goutamvenkat-anyscale

Description

Iceberg Upsert + Overwrite support

Signed-off-by: Goutam <goutam@anyscale.com>
goutamvenkat-anyscale marked this pull request as ready for review December 10, 2025 04:41
goutamvenkat-anyscale requested a review from a team as a code owner December 10, 2025 04:41
goutamvenkat-anyscale added the labels "data" (Ray Data-related issues) and "go" (add ONLY when ready to merge, run all tests) Dec 10, 2025
        self,
        txn: "Table.transaction",
        data_files: List["DataFile"],
        upsert_keys_dicts: List[Dict[str, List[Any]]],
(Follow-up) We should store the key values as PyArrow arrays to avoid any deserialization.

Comment on lines +240 to +262
    def _preserve_identifier_field_requirements(
        self, update: "UpdateSchema", table_schema: "Schema"
    ) -> None:
        """Ensure identifier fields remain required after schema union.

        When union_by_name is called with a schema that has nullable fields,
        PyIceberg may make identifier fields optional. Since identifier fields
        must be required, this helper ensures they remain required after union.

        Example:
            Table schema: id: int (required, identifier), val: string
            Input schema: id: int (optional), val: string

            `union_by_name` merges them to:
                id: int (optional), val: string

            This violates the identifier constraint. This function forces `id`
            back to required in the pending update.

        Args:
            update: The UpdateSchema object from update_schema() context manager
            table_schema: The current table schema to get identifier field IDs from
        """
Why would this ever be the case?
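
To make the scenario in the docstring concrete, here is a minimal sketch using plain dataclasses. `Field`, `toy_union_by_name`, and `restore_identifier_requirements` are hypothetical stand-ins written for illustration; they are not PyIceberg classes or the helper from this PR, and the toy union rule (a field stays required only if both sides require it) is an assumption that mirrors the behavior the docstring describes.

```python
from dataclasses import dataclass, replace
from typing import Dict, List, Set


@dataclass(frozen=True)
class Field:
    # Hypothetical stand-in for an Iceberg schema field (not a PyIceberg type).
    field_id: int
    name: str
    required: bool


def toy_union_by_name(table: List[Field], incoming: List[Field]) -> List[Field]:
    """Toy union (assumed rule): a field stays required only if both sides require it."""
    incoming_by_name: Dict[str, Field] = {f.name: f for f in incoming}
    merged = []
    for field in table:
        other = incoming_by_name.get(field.name)
        still_required = field.required and (other is None or other.required)
        merged.append(replace(field, required=still_required))
    return merged


def restore_identifier_requirements(
    merged: List[Field], identifier_ids: Set[int]
) -> List[Field]:
    """Force identifier fields back to required after the union."""
    return [
        replace(f, required=True) if f.field_id in identifier_ids else f
        for f in merged
    ]


# Table schema: id is a required identifier field; val is optional.
table_schema = [Field(1, "id", True), Field(2, "val", False)]
# Input schema: id arrives as optional, e.g. inferred from nullable input data.
input_schema = [Field(1, "id", False), Field(2, "val", False)]

merged = toy_union_by_name(table_schema, input_schema)
fixed = restore_identifier_requirements(merged, identifier_ids={1})
```

In this sketch the union leaves `id` optional, and the follow-up pass restores the requirement only for fields listed as identifiers, leaving `val` untouched.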

Comment on lines +398 to 423
        if self._overwrite_filter is not None:
            from ray.data._internal.datasource.iceberg_datasource import (
                _IcebergExpressionVisitor,
            )

            visitor = _IcebergExpressionVisitor()
            pyi_filter = visitor.visit(self._overwrite_filter)

            txn.delete(
                delete_filter=pyi_filter,
                snapshot_properties=self._snapshot_properties,
                **self._overwrite_kwargs,
            )
        else:
            # Full overwrite - delete all
            from pyiceberg.expressions import AlwaysTrue

            txn.delete(
                delete_filter=AlwaysTrue(),
                snapshot_properties=self._snapshot_properties,
                **self._overwrite_kwargs,
            )

        # Append new data files and commit
        self._append_and_commit(txn, data_files)

Extract the common `txn.delete(...)` call and remove the duplication.
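
One possible shape for this refactor is to resolve the delete filter first and then issue a single `delete` call. The sketch below is self-contained and runs without PyIceberg: `AlwaysTrue`, `FakeTxn`, `resolve_delete_filter`, and `overwrite` are hypothetical stand-ins, and `convert` takes the place of the expression visitor used in the diff. It is not the code that was merged.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple


class AlwaysTrue:
    """Hypothetical stand-in for pyiceberg.expressions.AlwaysTrue."""


class FakeTxn:
    """Hypothetical transaction that only records delete() calls."""

    def __init__(self) -> None:
        self.calls: List[Tuple[Any, Dict[str, str], Dict[str, Any]]] = []

    def delete(
        self, delete_filter: Any, snapshot_properties: Dict[str, str], **kwargs: Any
    ) -> None:
        self.calls.append((delete_filter, snapshot_properties, kwargs))


def resolve_delete_filter(
    overwrite_filter: Optional[Any], convert: Callable[[Any], Any]
) -> Any:
    """Return the converted filter, or AlwaysTrue() for a full overwrite."""
    if overwrite_filter is None:
        return AlwaysTrue()
    return convert(overwrite_filter)


def overwrite(
    txn: FakeTxn,
    overwrite_filter: Optional[Any],
    convert: Callable[[Any], Any],
    snapshot_properties: Dict[str, str],
    overwrite_kwargs: Dict[str, Any],
) -> None:
    # Only the filter differs between the two branches, so delete() is called once.
    txn.delete(
        delete_filter=resolve_delete_filter(overwrite_filter, convert),
        snapshot_properties=snapshot_properties,
        **overwrite_kwargs,
    )


txn = FakeTxn()
# Full overwrite: no filter supplied.
overwrite(txn, None, str.upper, {}, {})
# Partial overwrite: str.upper stands in for the real expression conversion.
overwrite(txn, "col_a > 1", str.upper, {"k": "v"}, {"case_sensitive": True})
```

Keeping the branch inside a small helper means the snapshot properties and extra keyword arguments are passed in exactly one place.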


        # Verify initial write
        result = _read_from_iceberg(sort_by="col_a")
        assert len(result) == 3
Let's assert on the whole table, not just its length.
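
A small sketch of why a whole-table assertion is stronger than a row-count check. The rows, column names, and the `table_matches` helper are made up for illustration and do not come from the PR's test file; plain lists of dicts stand in for whatever `_read_from_iceberg` returns.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def table_matches(result: List[Row], expected: List[Row], sort_by: str) -> bool:
    """Compare two tables row by row after sorting both on the same column."""

    def sort_key(row: Row) -> Any:
        return row[sort_by]

    return sorted(result, key=sort_key) == sorted(expected, key=sort_key)


expected = [
    {"col_a": 1, "col_b": "x"},
    {"col_a": 2, "col_b": "y"},
    {"col_a": 3, "col_b": "z"},
]

# A correct read that happens to come back in a different order.
good_result = [expected[2], expected[0], expected[1]]

# A wrong read that still has three rows: one value was silently changed.
bad_result = [
    {"col_a": 1, "col_b": "x"},
    {"col_a": 2, "col_b": "CHANGED"},
    {"col_a": 3, "col_b": "z"},
]

length_check_passes_on_bad = len(bad_result) == 3
whole_table_passes_on_good = table_matches(good_result, expected, sort_by="col_a")
whole_table_passes_on_bad = table_matches(bad_result, expected, sort_by="col_a")
```

The length check accepts the corrupted table, while the full comparison rejects it and still accepts the reordered but correct one.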

alexeykudinkin merged commit 298002b into ray-project:master Dec 10, 2025
6 checks passed
goutamvenkat-anyscale deleted the goutam/iceberg_overwrite_upsert_v2 branch December 16, 2025 21:46
peterxcli pushed a commit to peterxcli/ray that referenced this pull request Feb 25, 2026
Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: peterxcli <peterxcli@gmail.com>