[Data] - Iceberg write datafiles in write() then commit #58601

Merged
raulchen merged 19 commits into ray-project:master from goutamvenkat-anyscale:goutam/iceberg_streaming_write
Nov 15, 2025

Conversation


@goutamvenkat-anyscale goutamvenkat-anyscale commented Nov 13, 2025

Description

On `write()`, the raw data is now written to the underlying Parquet files and the metadata, namely the `DataFile` objects, is returned.

On `on_write_complete()`, we commit the transaction. For upsert, the data has to be read back into memory, and we do that in a separate Ray task.
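The two-phase flow described above can be sketched with stdlib-only stand-ins (the class and `DataFile` here are illustrative placeholders for the real Ray Data datasink and pyiceberg metadata objects, not the actual API):

```python
import uuid
from dataclasses import dataclass, field


@dataclass
class DataFile:
    # Stand-in for pyiceberg's DataFile metadata (path, stats, ...).
    path: str
    record_count: int


@dataclass
class IcebergDatasinkSketch:
    committed: list = field(default_factory=list)

    def write(self, rows):
        # Phase 1, runs on each worker: write a Parquet file and return only
        # the metadata -- the raw data never travels back to the driver.
        path = f"data/{uuid.uuid4()}.parquet"
        return [DataFile(path=path, record_count=len(rows))]

    def on_write_complete(self, write_results):
        # Phase 2, runs once on the driver: a single atomic metadata commit.
        for data_files in write_results:
            self.committed.extend(data_files)


sink = IcebergDatasinkSketch()
results = [sink.write([1, 2]), sink.write([3])]
sink.on_write_complete(results)
print(sum(f.record_count for f in sink.committed))  # 3
```

The key property is that only small metadata objects cross task boundaries; the commit itself is a metadata-only operation on the driver.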

Related issues

Link related issues: "Fixes #1234", "Closes #1234", or "Related to #1234".

Additional information

Optional: Add implementation details, API changes, usage examples, screenshots, etc.

Signed-off-by: Goutam <goutam@anyscale.com>
@goutamvenkat-anyscale goutamvenkat-anyscale requested a review from a team as a code owner November 13, 2025 19:06
@goutamvenkat-anyscale goutamvenkat-anyscale added the data (Ray Data-related issues) and go (add ONLY when ready to merge, run all tests) labels Nov 13, 2025

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request refactors the Iceberg datasink to support streaming writes by writing data files in parallel worker tasks and committing the transaction on the driver. This is a solid architectural improvement for performance and scalability. My review identifies a critical race condition in the new schema evolution logic, which could lead to data corruption. I've also included suggestions to improve performance by optimizing schema gathering, enhance code clarity by removing a magic number, and refactor a redundant operation.

# - Partitioned vs unpartitioned tables
# - Proper DataFile formatting
# Use task_idx as counter base to ensure unique file names across tasks
counter = itertools.count(ctx.task_idx * 10000)

medium

The magic number 10000 is used as a multiplier to create a unique range for the file counter in each task. This assumes a single task will not write more than 10,000 files. While this is a high number, it would be safer and more readable to add a comment explaining this assumption and its purpose.

        # A large enough number to ensure unique file names across tasks.
        # Assumes a single task will not write more than this many files.
        counter = itertools.count(ctx.task_idx * 10000)
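The disjoint-range property the suggestion relies on is easy to demonstrate (the constant name matches the `_FILE_NAME_COUNTER_BASE` later adopted in the PR; the helper function is illustrative):

```python
import itertools

# A large enough number to ensure unique file names across tasks.
# Assumes a single task will not write more than this many files.
_FILE_NAME_COUNTER_BASE = 10_000


def file_counter(task_idx: int):
    # Each task draws from a disjoint range, so file names derived from the
    # counter never collide across concurrently running write tasks.
    return itertools.count(task_idx * _FILE_NAME_COUNTER_BASE)


c0, c1 = file_counter(0), file_counter(1)
print(next(c0), next(c0), next(c1))  # 0 1 10000
```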

)

# Always reload the table to get the latest schema before any post-write operations
self._reload_table()

medium

_reload_table() is called here at the beginning of on_write_complete, and then it's called again within _update_schema() (on line 142), which is invoked later in this method if data_files is not empty. This initial call is redundant. The table state only needs to be refreshed after the schema is evolved to ensure subsequent commit operations have the correct table version.

Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
@goutamvenkat-anyscale
Contributor Author

/gemini

@gemini-code-assist
Contributor

Hello @goutamvenkat-anyscale! It looks like you've invoked me. Please specify a command like /gemini review to perform a code review or /gemini summary to get a pull request summary. You can find more available commands and their usage by typing /gemini help.

@goutamvenkat-anyscale
Contributor Author

/gemini summary

@goutamvenkat-anyscale
Contributor Author

/gemini review

@gemini-code-assist
Contributor

Summary of Changes

This pull request significantly refactors the Iceberg datasink to improve performance and robustness for distributed writes. It introduces a clear separation between the writing of data files by individual tasks and the final transaction commit, which now occurs centrally after all tasks complete. This change enables optimized APPEND and OVERWRITE operations by directly manipulating DataFile objects and leveraging PyIceberg's transaction API, thereby avoiding unnecessary data re-reads. The implementation also enhances schema evolution to handle concurrent updates gracefully across distributed workers and ensures a unified schema is applied during the final commit.

Highlights

  • Decoupled Data Writing and Commit: The write() method now focuses on writing raw data to Parquet files and returning DataFile metadata, while the transaction commit is centralized in on_write_complete().
  • Optimized Overwrite and Append Operations: OVERWRITE mode now uses PyIceberg's transaction API with delete_by_predicate and fast_append for metadata-only operations, avoiding data re-reading. APPEND mode also leverages fast_append for efficiency.
  • Robust Distributed Schema Evolution: Schema updates are handled robustly across distributed workers with optimistic concurrency (retry on CommitFailedException) and then unified and finalized on the driver in on_write_complete().
  • Streaming Write Semantics: The datasink now fully supports streaming writes, with incremental file writing during write() and a single snapshot commit in on_write_complete().
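The last highlight's driver-side schema unification can be illustrated with a stdlib-only sketch (the real implementation operates on Arrow/Iceberg schemas; the `{column: type}` dict representation here is purely illustrative):

```python
def unify_schemas(task_schemas):
    # Each write task reports the schema of the data it wrote as an ordered
    # {column: type} mapping. New columns are allowed to appear (schema
    # evolution); conflicting types for a shared column are an error.
    unified = {}
    for schema in task_schemas:
        for col, typ in schema.items():
            if unified.setdefault(col, typ) != typ:
                raise ValueError(f"type conflict on column {col!r}")
    return unified


task_schemas = [
    {"id": "long", "name": "string"},
    {"id": "long", "name": "string", "age": "int"},
]
print(list(unify_schemas(task_schemas)))  # ['id', 'name', 'age']
```

Unifying once on the driver means the schema evolution commit happens a single time, rather than racing across workers.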
Changelog
  • python/ray/data/_internal/datasource/iceberg_datasink.py
    • Refactored IcebergDatasink to return DataFile objects and schema from the write() method, enabling streaming semantics.
    • Implemented _commit_transaction_with_append for more granular control over Iceberg transactions, supporting delete-by-predicate and fast appends.
    • Optimized OVERWRITE mode to use delete_by_predicate and fast_append for metadata-only operations, avoiding data re-reading.
    • Updated APPEND mode to use fast_append for improved efficiency.
    • Introduced _read_data_files_as_table to explicitly handle data re-reading required for UPSERT operations, with clear documentation on its necessity.
    • Added logic to handle concurrent schema evolution attempts in _update_schema using try-except-finally with CommitFailedException.
    • Introduced _write_uuid for unique file naming and _FILE_NAME_COUNTER_BASE for task-specific file counters.
    • Moved schema unification and final schema evolution to on_write_complete() to ensure a consistent schema across all written data.
Activity
  • The author, goutamvenkat-anyscale, initiated a /gemini command.
  • The bot, gemini-code-assist[bot], responded by asking for a specific command.
  • The author then requested a /gemini summary.
  • Immediately after, the author requested a /gemini review.
  • The bot, gemini-code-assist[bot], provided critical feedback regarding a race condition in _update_schema() being called within the write method by parallel workers.
  • The bot also provided high-priority feedback, suggesting that write() should return DataFile objects and schema to avoid inefficient re-reading of metadata for schema evolution in on_write_complete().
  • Medium-priority feedback was given by the bot regarding the magic number 10000 used for file naming, suggesting a clarifying comment.
  • Another medium-priority feedback from the bot pointed out a redundant _reload_table() call in on_write_complete().
  • The cursor[bot] also reported a bug, reiterating the critical issue of centralizing schema evolution for distributed writes, specifically highlighting the _update_schema() call in the write() method.


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a significant and well-executed refactoring of the IcebergDatasink. The core change is the move to a two-phase write process, where write() tasks now create data files and return DataFile metadata, and on_write_complete() commits these files in a single transaction. This aligns with streaming best practices and provides substantial performance improvements for APPEND and OVERWRITE modes by avoiding unnecessary data reloading. The implementation is robust, with thoughtful handling of concurrent schema updates and clear, detailed documentation explaining the design choices, particularly the necessary data reload for UPSERT mode. The code is cleaner and more modular. This is an excellent contribution that enhances the performance and reliability of the Iceberg connector.

Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
@goutamvenkat-anyscale
Contributor Author

/gemini review

@goutamvenkat-anyscale
Contributor Author

/gemini summary


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a significant and well-designed refactoring of the IcebergDatasink. By moving to a two-phase write approach where workers write data files concurrently and the driver performs a single atomic commit, it addresses major scalability concerns. The implementation correctly handles concurrent schema updates and offloads memory-intensive UPSERT logic to a separate Ray task, which are excellent improvements. My review includes a few suggestions to enhance maintainability by addressing the use of private APIs from the pyiceberg library and to restore a helpful validation check that was present in the previous implementation.

Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
)
else:
logger.debug(
"Schema update conflict - another worker modified schema, reloading"
Contributor


Is it safe to ignore this error?
Why would multiple workers update the same table?

Contributor


I see, we have multiple write tasks writing to the table.
but the schemas should be the same for all of them, right?
It'd be nice to validate that.

Contributor Author


Added a check in the write task

)


@ray.remote(num_cpus=1, memory=DEFAULT_ICEBERG_UPSERT_COMMIT_TASK_MEMORY)
Contributor


what needs to be loaded back in this task?
The entire table? is the default 4GB memory good for most cases?
If we expect users to specify the memory, we should make it easy to configure. e.g., expose it in write_iceberg.

Contributor Author

@goutamvenkat-anyscale goutamvenkat-anyscale Nov 14, 2025


I modified the upsert to follow copy-on-write semantics. Now, for upsert, it will commit the parquet files and accumulate the join keys and associated data on the worker.

Good idea to expose memory args on write_iceberg

Signed-off-by: Goutam <goutam@anyscale.com>
"Schema update conflict - another worker modified schema, reloading"
)
finally:
self._reload_table()


Bug: Inefficient Retries Mask Critical Update Failures

The schema update retry logic doesn't break after success and always executes all 3 attempts. When update_schema() succeeds without raising CommitFailedException, the loop continues instead of breaking. Additionally, the exception is silently swallowed after max retries instead of being re-raised, causing failures to go unnoticed. The finally block also unnecessarily reloads the table even on successful updates.
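A retry shape that avoids all three problems might look like the following (hypothetical helper names; the exception class is a stand-in for pyiceberg's `CommitFailedException`):

```python
MAX_SCHEMA_UPDATE_RETRIES = 3


class CommitFailedException(Exception):
    # Stand-in for pyiceberg.exceptions.CommitFailedException.
    pass


def update_schema_with_retry(apply_update, reload_table):
    for attempt in range(MAX_SCHEMA_UPDATE_RETRIES):
        try:
            apply_update()
            return  # success: stop retrying, no reload needed
        except CommitFailedException:
            if attempt == MAX_SCHEMA_UPDATE_RETRIES - 1:
                raise  # surface the failure instead of swallowing it
            reload_table()  # another worker won the race; refresh and retry
```

Returning on success and re-raising on the final attempt makes both the happy path and the failure path explicit.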


@ray-project ray-project deleted a comment from gemini-code-assist bot Nov 14, 2025
@goutamvenkat-anyscale
Contributor Author

/gemini summary

@goutamvenkat-anyscale
Contributor Author

/gemini review

@gemini-code-assist
Contributor

Summary of Changes

This pull request significantly refactors the Iceberg datasink to optimize distributed write operations by separating the writing of data files from the transaction commit. This change allows workers to write data in parallel, with the driver coordinating a single, atomic metadata commit. The new implementation enhances the robustness and efficiency of UPSERT and OVERWRITE modes through a copy-on-write strategy and PyIceberg's transaction API, while also improving schema evolution to handle concurrent updates gracefully.

Highlights

  • Two-Phase Commit Strategy: The Iceberg datasink now employs a two-phase commit strategy where individual workers write raw Parquet data files and return their metadata (DataFile objects), while the driver collects this metadata and performs a single, atomic transaction commit in on_write_complete().
  • Enhanced UPSERT Implementation: The UPSERT mode has been refactored to use a copy-on-write strategy. This involves deleting existing rows that match join keys using txn.delete() and then appending all new data files (including updates and inserts) within a single transaction, improving parallelism and efficiency.
  • Improved OVERWRITE Operations: The OVERWRITE mode now leverages PyIceberg's transaction API to first delete data based on a specified filter (or all data for full overwrite) using txn.delete() and then append the new data files, ensuring atomic replacement.
  • Robust Distributed Schema Evolution: Schema updates are now more resilient in distributed environments. The _update_schema method includes retry logic (up to 3 attempts) to handle CommitFailedException that can occur during concurrent schema modifications by multiple workers.
  • Comprehensive Testing for Write Modes: New test cases have been added to validate the correctness of UPSERT and OVERWRITE operations, covering scenarios such as full table overwrite with missing columns, comparison with native PyIceberg upsert, schema evolution during upsert, and handling of NULL values in join keys.
Changelog
  • python/ray/data/_internal/datasource/iceberg_datasink.py
    • Refactored IcebergDatasink to return DataFile objects (and join keys for UPSERT) from write(), separating data writing from metadata commit.
    • Implemented a two-phase commit strategy: write() writes raw Parquet files, on_write_complete() collects DataFile metadata and performs a single atomic transaction.
    • Enhanced _update_schema with retry logic (up to 3 attempts) to handle concurrent schema evolution conflicts using CommitFailedException.
    • Introduced _reload_table helper for refreshing table metadata.
    • Modified on_write_start to initialize a unique _write_uuid for file naming.
    • Implemented _commit_append, _commit_overwrite, and _commit_upsert methods to manage transaction commits based on the save mode.
    • _commit_overwrite now uses txn.delete with Ray Data expressions for filtering before appending new data files.
    • _commit_upsert implements a copy-on-write strategy: it deletes existing rows based on join keys and then appends new data, handling NULL values in join keys.
    • Added validation in __init__ to warn users about incorrect overwrite_filter and delete_filter parameters in overwrite_kwargs.
  • python/ray/data/dataset.py
    • Updated write_iceberg docstring to clarify that Ray Data's UPSERT mode uses a copy-on-write strategy.
  • python/ray/data/tests/test_iceberg.py
    • Modified _create_typed_dataframe to use nullable Int32 for pandas columns to correctly handle NaN values.
    • Added test_overwrite_full_table_missing_columns to verify overwrite behavior with missing columns.
    • Added test_upsert_matches_pyiceberg_native to ensure Ray Data's upsert aligns with PyIceberg's native upsert.
    • Added test_upsert_schema_evolution to test schema changes during upsert.
    • Added test_upsert_null_in_join_keys to validate upsert behavior with NULLs in join keys.
    • Added test_upsert_empty_table to test upserting into an empty table.
Activity
  • The author, goutamvenkat-anyscale, initiated the summary and review process using /gemini commands.
  • Initial bot feedback highlighted a critical race condition in _update_schema when called by parallel workers and suggested that write() should return DataFile objects and schema for efficiency.
  • Further bot comments pointed out a misleading docstring for write() in UPSERT mode, redundant table reloads, and the use of a magic number for file naming.
  • Cursor bot identified a bug where TYPE_CHECKING blocks runtime imports and reiterated the need to centralize schema evolution.
  • Reviewers (Richardliaw and Cursor bot) emphasized the importance of avoiding unnecessary logging in production.
  • A high-priority bug regarding txn.upsert() being called on a Transaction object was reported, which was subsequently addressed by implementing a copy-on-write strategy using txn.delete and append.
  • Srinathk10 suggested annotating memory arguments for tasks, leading to an increase to 4GB, and also proposed retrying schema updates, which was implemented with 3 retries.
  • Raulchen raised questions about the safety of ignoring CommitFailedException and the implications of concurrent schema updates, and suggested making memory configuration easier for users.
  • Cursor bot identified that _update_schema silently swallows CommitFailedException after retries and that UPSERT validation for join_cols occurs too late, potentially leading to orphaned files.
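The copy-on-write upsert semantics described above can be stated row-wise in a stdlib sketch (the real `_commit_upsert` expresses the delete as an Iceberg predicate over the accumulated join keys rather than filtering rows in Python):

```python
def upsert(table_rows, new_rows, join_cols):
    # Copy-on-write upsert: drop existing rows whose join keys appear in the
    # incoming batch, then append every incoming row (updates + inserts).
    incoming_keys = {tuple(r[c] for c in join_cols) for r in new_rows}
    kept = [
        r for r in table_rows
        if tuple(r[c] for c in join_cols) not in incoming_keys
    ]
    return kept + new_rows


table = [{"id": 1, "v": "a"}, {"id": 2, "v": "b"}]
new = [{"id": 2, "v": "B"}, {"id": 3, "v": "c"}]
merged = upsert(table, new, ["id"])
print(len(merged))  # 3: id 2 replaced, id 3 inserted, id 1 kept
```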


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request refactors the Iceberg datasink to support concurrent writes from multiple workers, which is a significant performance and scalability improvement. The approach of having workers write data files and the driver perform a single commit is sound. The implementation of copy-on-write for upserts and the handling of concurrent schema updates are well thought out. I've identified a critical issue in the schema update retry logic that could lead to incorrect behavior and have also provided a suggestion to improve code conciseness. Overall, this is a great enhancement.

Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
if extract_join_keys:
join_cols = self._get_join_cols()
for col in join_cols:
join_keys_dict[col].extend(pa_table[col].to_pylist())


Bug: UPSERT: Missing Data Validation

In UPSERT mode, the code extracts join key values by directly accessing pa_table[col] without validating that the join columns exist in the incoming data. If the incoming data is missing any join column, this raises a KeyError. The validation in on_write_start only checks if join_cols are specified but doesn't ensure they're present in the actual data being written.
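A minimal version of the suggested up-front check (hypothetical helper; a real implementation would consult `pa_table.schema.names` on the pyarrow table before extraction):

```python
def extract_join_keys(pa_table_columns, join_cols):
    # pa_table_columns stands in for a pyarrow.Table's {name: values} view.
    # Validate before indexing so a missing join column raises a clear
    # ValueError instead of a bare KeyError deep in the write path.
    missing = [c for c in join_cols if c not in pa_table_columns]
    if missing:
        raise ValueError(
            f"UPSERT requires join columns {missing} to be present in the data"
        )
    return {c: list(pa_table_columns[c]) for c in join_cols}


keys = extract_join_keys({"id": [1, 2], "v": ["a", "b"]}, ["id"])
print(keys)  # {'id': [1, 2]}
```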


@raulchen raulchen enabled auto-merge (squash) November 15, 2025 01:15
@raulchen raulchen merged commit aecff3c into ray-project:master Nov 15, 2025
7 checks passed
ArturNiederfahrenhorst pushed a commit to ArturNiederfahrenhorst/ray that referenced this pull request Nov 16, 2025
@goutamvenkat-anyscale goutamvenkat-anyscale deleted the goutam/iceberg_streaming_write branch November 17, 2025 18:53
Aydin-ab pushed a commit to Aydin-ab/ray-aydin that referenced this pull request Nov 19, 2025
ykdojo pushed a commit to ykdojo/ray that referenced this pull request Nov 27, 2025
SheldonTsen pushed a commit to SheldonTsen/ray that referenced this pull request Dec 1, 2025
goutamvenkat-anyscale added a commit that referenced this pull request Dec 4, 2025
Future-Outlier pushed a commit to Future-Outlier/ray that referenced this pull request Dec 7, 2025
peterxcli pushed a commit to peterxcli/ray that referenced this pull request Feb 25, 2026
peterxcli pushed a commit to peterxcli/ray that referenced this pull request Feb 25, 2026

Labels

data (Ray Data-related issues), go (add ONLY when ready to merge, run all tests)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Ray fails to serialize self-reference objects

4 participants