[Data] [1/n] - Iceberg Schema Evolution #59210
alexeykudinkin merged 18 commits into ray-project:master
Conversation
Code Review
This pull request introduces a significant improvement to the Iceberg datasink by adding support for schema evolution. The changes are well-structured, and the refactoring of IcebergDatasink makes it more robust and easier to understand. The addition of comprehensive tests for schema evolution is also a great contribution.
I have a couple of suggestions for minor optimizations in the on_write_complete method to improve efficiency by reducing redundant operations. Overall, this is an excellent pull request.
```python
"""
Update the table schema to accommodate incoming data using union-by-name semantics.

This is called from the driver after reconciling all schemas.
"""
```
Let's make it clear that this can only be called from the driver.
(Also think about how we can assert that it's only called from the driver.)
`is_driver = ray.get_runtime_context().worker.mode != WORKER_MODE` should work
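A minimal sketch of the suggested guard. Note the assumptions: `WORKER_MODE` and `get_runtime_context().worker` are Ray internals (private API, may change between Ray versions), and the `get_mode` hook is hypothetical, added only so the check can be exercised without a running Ray cluster:

```python
def assert_on_driver(get_mode=None) -> None:
    """Raise unless the caller is running in the driver process.

    `get_mode` is an injectable hook (hypothetical, for testability);
    by default it reads the current process's mode from Ray internals.
    """
    if get_mode is None:
        def get_mode():
            # Assumption: these are private Ray APIs, per the reviewer's
            # suggestion; they may move between Ray versions.
            import ray
            from ray._private.worker import WORKER_MODE

            return ray.get_runtime_context().worker.mode, WORKER_MODE

    mode, worker_mode = get_mode()
    if mode == worker_mode:
        raise RuntimeError(
            "_update_schema performs schema evolution and must only be "
            "called from the driver process"
        )
```

`_update_schema` could then call `assert_on_driver()` at entry; in unit tests, `get_mode` can be stubbed to simulate either process type.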
Signed-off-by: Goutam <goutam@anyscale.com>
bb2f196 to ac28c03
Signed-off-by: Goutam <goutam@anyscale.com>
Bug: Callback skipped for small datasets in all_inputs_done
The _on_first_input_callback is only invoked in _add_input_inner() but not in all_inputs_done(). When processing small datasets where all bundles don't meet the min_rows_per_bundle threshold during normal processing, the bundles are deferred to all_inputs_done(), which calls _add_bundled_input() directly without invoking the callback. For IcebergDatasink, this means on_write_start() (which handles schema evolution) is never called for small datasets, potentially causing write failures when incoming data has new columns.
ray/python/ray/data/_internal/execution/operators/map_operator.py, lines 566 to 576 in 318f40c
ray/python/ray/data/_internal/planner/plan_write_op.py, lines 136 to 145 in 318f40c
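A toy model of the reported control flow (class and method names below are simplified, hypothetical stand-ins for the real `MapOperator` code, not the Ray Data implementation): on the normal path the first-input callback fires only when a bundle is actually submitted, so a dataset smaller than `min_rows_per_bundle` reaches the end-of-input flush without ever triggering it. The sketch also shows the fix: fire the callback on the flush path too.

```python
class BundlingOperator:
    """Simplified stand-in for MapOperator's bundling logic (hypothetical)."""

    def __init__(self, min_rows_per_bundle, on_first_input):
        self._min_rows = min_rows_per_bundle
        self._on_first_input = on_first_input  # e.g. triggers on_write_start()
        self._fired = False
        self._pending = []
        self.submitted = []

    def _maybe_fire_callback(self):
        if not self._fired:
            self._fired = True
            self._on_first_input()

    def add_input(self, rows):
        # Normal path: the callback fires only when a bundle is submitted.
        self._pending.extend(rows)
        if len(self._pending) >= self._min_rows:
            self._maybe_fire_callback()
            self.submitted.append(self._pending)
            self._pending = []

    def all_inputs_done(self):
        # Deferred path: without the _maybe_fire_callback() call below,
        # a small dataset would skip on_write_start() entirely (the bug).
        if self._pending:
            self._maybe_fire_callback()
            self.submitted.append(self._pending)
            self._pending = []
```

With the call in `all_inputs_done()`, the callback is guaranteed to run before any bundle is submitted on either path.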
Signed-off-by: Goutam <goutam@anyscale.com>
alexeykudinkin left a comment
LGTM, minor comments
Signed-off-by: Goutam <goutam@anyscale.com>
/gemini summary
Summary of Changes

This pull request significantly enhances Ray Data's integration with Apache Iceberg by upgrading the underlying PyIceberg library and introducing robust schema evolution capabilities. Users can now append data to Iceberg tables with varying schemas: the system automatically adapts the table's schema to accommodate new columns and promotes types as needed, simplifying data ingestion workflows.

Highlights
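Union-by-name semantics can be illustrated with a plain-Python model. This is an illustrative sketch, not the PyIceberg implementation, and the promotion table covers only two of the widenings Iceberg permits: fields are matched by name, new fields are appended, and an existing field's type may be widened.

```python
# Illustrative model of union-by-name schema merging (not PyIceberg's code).
# Schemas are modeled as dicts of field name -> type name.
PROMOTIONS = {
    ("int32", "int64"): "int64",        # int can widen to long
    ("float32", "float64"): "float64",  # float can widen to double
}

def union_by_name(table_schema, incoming_schema):
    merged = dict(table_schema)
    for name, typ in incoming_schema.items():
        if name not in merged:
            merged[name] = typ  # new column: append it
        elif merged[name] != typ:
            promoted = PROMOTIONS.get((merged[name], typ))
            if promoted is None:
                raise TypeError(f"cannot promote {merged[name]} to {typ} for {name!r}")
            merged[name] = promoted  # existing column: widen the type
    return merged
```

For example, merging a table schema `{"id": "int32"}` with incoming data shaped `{"id": "int64", "name": "string"}` both promotes `id` and appends `name`.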
Signed-off-by: Goutam <goutam@anyscale.com>
```python
def _update_schema(self, incoming_schema: "pa.Schema") -> None:
    """
    Update the table schema to accommodate incoming data using union-by-name semantics.

    .. warning::
        This method must only be called from the driver process.
        It performs schema evolution which requires exclusive table access.

    Args:
        incoming_schema: The PyArrow schema to merge with the table schema
    """
    with self._table.update_schema() as update:
        update.union_by_name(incoming_schema)
    # Succeeded, reload to get latest table version and exit.
    self._reload_table()
```

Surrounding context from the same diff:

```python
property_as_bool = PropertyUtil.property_as_bool

catalog = self._get_catalog()
table = catalog.load_table(self.table_identifier)
self._txn = table.transaction()
self._io = self._txn._table.io
self._table_metadata = self._txn.table_metadata
self._uuid = uuid.uuid4()

if unsupported_partitions := [
    field
    for field in self._table_metadata.spec().fields
    if not field.transform.supports_pyarrow_transform
]:
    raise ValueError(
        f"Not all partition types are supported for writes. "
        f"Following partitions cannot be written using pyarrow: {unsupported_partitions}."
    )

self._manifest_merge_enabled = property_as_bool(
    self._table_metadata.properties,
    TableProperties.MANIFEST_MERGE_ENABLED,
    TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT,
)
```
This is used in one place; let's inline it.
```python
# Reload table to get latest metadata
self._reload_table()
```
Why do we need to reload?
Can remove this.
```python
    )
    assert rows_same(result_df, expected)


def test_multiple_schema_evolutions(self, clean_table):
```
Let's add a test promoting a type (as a separate test).
Added `test_schema_evolution_type_promotion`
Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: peterxcli <peterxcli@gmail.com>