[Data] - Iceberg write datafiles in write() then commit #58601
raulchen merged 19 commits into ray-project:master
Conversation
Signed-off-by: Goutam <goutam@anyscale.com>
Code Review
This pull request refactors the Iceberg datasink to support streaming writes by writing data files in parallel worker tasks and committing the transaction on the driver. This is a solid architectural improvement for performance and scalability. My review identifies a critical race condition in the new schema evolution logic, which could lead to data corruption. I've also included suggestions to improve performance by optimizing schema gathering, enhance code clarity by removing a magic number, and refactor a redundant operation.
# - Partitioned vs unpartitioned tables
# - Proper DataFile formatting
# Use task_idx as counter base to ensure unique file names across tasks
counter = itertools.count(ctx.task_idx * 10000)
The magic number 10000 is used as a multiplier to create a unique range for the file counter in each task. This assumes a single task will not write more than 10,000 files. While this is a high number, it would be safer and more readable to add a comment explaining this assumption and its purpose.
# A large enough number to ensure unique file names across tasks.
# Assumes a single task will not write more than this many files.
counter = itertools.count(ctx.task_idx * 10000)
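As a sketch of the suggestion, the base offset could also be pulled into a named constant so the assumption is stated once. The constant and helper names below are hypothetical, not from the diff:

```python
import itertools

# Hypothetical constant replacing the magic number; assumes a single
# write task never produces more than this many data files.
MAX_FILES_PER_TASK = 10_000


def make_file_counter(task_idx: int):
    # Each task draws from a disjoint range of counter values, so the
    # generated file names stay unique across concurrent write tasks.
    return itertools.count(task_idx * MAX_FILES_PER_TASK)
```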
# Always reload the table to get the latest schema before any post-write operations
self._reload_table()
_reload_table() is called here at the beginning of on_write_complete, and then it's called again within _update_schema() (on line 142), which is invoked later in this method if data_files is not empty. This initial call is redundant. The table state only needs to be refreshed after the schema is evolved to ensure subsequent commit operations have the correct table version.
/gemini summary

/gemini review
Summary of Changes

This pull request significantly refactors the Iceberg datasink to improve performance and robustness for distributed writes. It introduces a clear separation between the writing of data files by individual tasks and the final transaction commit, which now occurs centrally after all tasks complete.
Code Review
This pull request introduces a significant and well-executed refactoring of the IcebergDatasink. The core change is the move to a two-phase write process, where write() tasks now create data files and return DataFile metadata, and on_write_complete() commits these files in a single transaction. This aligns with streaming best practices and provides substantial performance improvements for APPEND and OVERWRITE modes by avoiding unnecessary data reloading. The implementation is robust, with thoughtful handling of concurrent schema updates and clear, detailed documentation explaining the design choices, particularly the necessary data reload for UPSERT mode. The code is cleaner and more modular. This is an excellent contribution that enhances the performance and reliability of the Iceberg connector.
/gemini review

/gemini summary
Code Review
This pull request introduces a significant and well-designed refactoring of the IcebergDatasink. By moving to a two-phase write approach where workers write data files concurrently and the driver performs a single atomic commit, it addresses major scalability concerns. The implementation correctly handles concurrent schema updates and offloads memory-intensive UPSERT logic to a separate Ray task, which are excellent improvements. My review includes a few suggestions to enhance maintainability by addressing the use of private APIs from the pyiceberg library and to restore a helpful validation check that was present in the previous implementation.
)
else:
    logger.debug(
        "Schema update conflict - another worker modified schema, reloading"
    )
Is it safe to ignore this error? Why would multiple workers update the same table?
I see, we have multiple write tasks writing to the table. But the schemas should be the same for all of them, right? It'd be nice to validate that.
Added a check in the write task
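A minimal sketch of such a check (the function name and schema representation are hypothetical; the actual check lives in the write task):

```python
def validate_block_schema(block_schema, table_schema):
    # Hypothetical fail-fast check: every write task's block must match the
    # table schema, since the driver commits all data files in one
    # transaction and divergent schemas would otherwise race on evolution.
    if list(block_schema) != list(table_schema):
        raise ValueError(
            f"Block schema {block_schema} does not match "
            f"table schema {table_schema}"
        )
```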
)

@ray.remote(num_cpus=1, memory=DEFAULT_ICEBERG_UPSERT_COMMIT_TASK_MEMORY)
What needs to be loaded back in this task? The entire table? Is the default 4 GB memory good for most cases?
If we expect users to specify the memory, we should make it easy to configure, e.g. expose it in write_iceberg.
I modified the upsert to follow copy-on-write semantics. Now upsert commits the parquet files and accumulates the join keys and associated data on the worker.
Good idea to expose memory args on write_iceberg.
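One way to make the memory request configurable, sketched with assumed names: only `DEFAULT_ICEBERG_UPSERT_COMMIT_TASK_MEMORY` appears in the diff; the 4 GiB value and the option plumbing below are assumptions, not the merged implementation:

```python
# 4 GiB default, per the review thread; the exact value is an assumption.
DEFAULT_ICEBERG_UPSERT_COMMIT_TASK_MEMORY = 4 * 1024**3


def upsert_commit_task_options(
    memory: int = DEFAULT_ICEBERG_UPSERT_COMMIT_TASK_MEMORY,
) -> dict:
    # Options that would be forwarded to ray.remote()/.options() for the
    # upsert commit task, letting write_iceberg callers override the default.
    return {"num_cpus": 1, "memory": memory}
```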
| "Schema update conflict - another worker modified schema, reloading" | ||
| ) | ||
| finally: | ||
| self._reload_table() |
Bug: Inefficient Retries Mask Critical Update Failures
The schema update retry logic doesn't break after success and always executes all 3 attempts. When update_schema() succeeds without raising CommitFailedException, the loop continues instead of breaking. Additionally, the exception is silently swallowed after max retries instead of being re-raised, causing failures to go unnoticed. The finally block also unnecessarily reloads the table even on successful updates.
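A sketch of the corrected loop under those three points: break on success, reload only after a conflict, and re-raise once retries are exhausted. The helper and constant names are hypothetical, and `CommitFailedException` here is a stand-in for pyiceberg's exception:

```python
import logging

logger = logging.getLogger(__name__)


class CommitFailedException(Exception):
    """Stand-in for pyiceberg.exceptions.CommitFailedException."""


MAX_SCHEMA_UPDATE_RETRIES = 3


def update_schema_with_retry(update_schema, reload_table):
    # Corrected retry logic: stop on success, reload only after a conflict,
    # and re-raise once retries are exhausted so failures surface.
    for attempt in range(MAX_SCHEMA_UPDATE_RETRIES):
        try:
            update_schema()
            return  # success: no further attempts, no extra reload
        except CommitFailedException:
            logger.debug(
                "Schema update conflict - another worker modified schema, reloading"
            )
            if attempt == MAX_SCHEMA_UPDATE_RETRIES - 1:
                raise  # exhausted retries: surface the failure
            reload_table()
```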
/gemini summary

/gemini review
Summary of Changes

This pull request significantly refactors the Iceberg datasink to optimize distributed write operations by separating the writing of data files from the transaction commit. This change allows workers to write data in parallel, with the driver coordinating a single, atomic metadata commit.
Code Review
This pull request refactors the Iceberg datasink to support concurrent writes from multiple workers, which is a significant performance and scalability improvement. The approach of having workers write data files and the driver perform a single commit is sound. The implementation of copy-on-write for upserts and the handling of concurrent schema updates are well thought out. I've identified a critical issue in the schema update retry logic that could lead to incorrect behavior and have also provided a suggestion to improve code conciseness. Overall, this is a great enhancement.
if extract_join_keys:
    join_cols = self._get_join_cols()
    for col in join_cols:
        join_keys_dict[col].extend(pa_table[col].to_pylist())
Bug: UPSERT: Missing Data Validation
In UPSERT mode, the code extracts join key values by directly accessing pa_table[col] without validating that the join columns exist in the incoming data. If the incoming data is missing any join column, this raises a KeyError. The validation in on_write_start only checks if join_cols are specified but doesn't ensure they're present in the actual data being written.
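A sketch of the missing guard (names hypothetical; a plain dict of columns stands in for the `pa.Table`):

```python
def extract_join_key_values(block: dict, join_cols: list) -> dict:
    # Hypothetical validation addressing the bug: verify the join columns
    # exist in the written block before indexing into it, so UPSERT fails
    # with a clear error instead of a bare KeyError.
    missing = [c for c in join_cols if c not in block]
    if missing:
        raise ValueError(
            f"UPSERT join columns missing from written data: {missing}"
        )
    return {c: list(block[c]) for c in join_cols}
```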
…)" This reverts commit aecff3c.
Reverts ray-project#58601 (ray-project#59181). Signed-off-by: peterxcli <peterxcli@gmail.com>
Description

Now on `write()` the raw data is written to the underlying parquet files and the metadata is returned, namely `DataFiles`. On `on_write_complete()` we commit the transaction. For upsert, the data has to be read back into memory, and we do that in a separate Ray task.