[data] Fix errors with concatenation with mixed pyarrow native and extension types #57566
Changes from all commits
@@ -584,6 +584,106 @@ def shuffle(block: "pyarrow.Table", seed: Optional[int] = None) -> "pyarrow.Table":
    return take_table(block, indices)

def _concat_cols_with_null_list(
    col_chunked_arrays: List["pyarrow.ChunkedArray"],
) -> "pyarrow.ChunkedArray":
    import pyarrow as pa

    # For each opaque list column, iterate through all schemas until
    # we find a valid value_type that can be used to override the
    # column types in the following for-loop.
    scalar_type = None
    for arr in col_chunked_arrays:
        if not pa.types.is_list(arr.type) or not pa.types.is_null(arr.type.value_type):
            scalar_type = arr.type
            break

    if scalar_type is not None:
        for c_idx in range(len(col_chunked_arrays)):
            c = col_chunked_arrays[c_idx]
            if pa.types.is_list(c.type) and pa.types.is_null(c.type.value_type):
                if pa.types.is_list(scalar_type):
                    # If we are dealing with a list input,
                    # cast the array to the scalar_type found above.
                    col_chunked_arrays[c_idx] = c.cast(scalar_type)
                else:
                    # If we are dealing with a single value, construct
                    # a new array with null values filled.
                    col_chunked_arrays[c_idx] = pa.chunked_array(
                        [pa.nulls(c.length(), type=scalar_type)]
                    )

    return _concatenate_chunked_arrays(col_chunked_arrays)
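To illustrate the case this helper targets, here is a minimal sketch in plain pyarrow (the data is invented; this is not Ray's internal code path): a block whose list column only ever held empty or missing lists is inferred as list<null>, while another block's same column is list<int64>, and casting the null-typed chunks to the concrete type makes the chunks concatenable.

import pyarrow as pa

# Block A only saw empty/missing lists, so Arrow inferred list<null> for the column.
a = pa.chunked_array([pa.array([[], None], type=pa.list_(pa.null()))])
# Block B holds real values, so the same column is typed list<int64>.
b = pa.chunked_array([pa.array([[1, 2], [3]], type=pa.list_(pa.int64()))])

# Casting the opaque null-typed chunks to the concrete list type lets
# every chunk share one type before concatenation.
a_cast = a.cast(pa.list_(pa.int64()))
combined = pa.chunked_array(a_cast.chunks + b.chunks)
print(combined.type)  # list<item: int64>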
def _concat_cols_with_extension_tensor_types(
    col_chunked_arrays: List["pyarrow.ChunkedArray"],
) -> "pyarrow.ChunkedArray":
    import pyarrow as pa

    # For our tensor extension types, manually construct a chunked array
    # containing chunks from all blocks. This is to handle
    # homogeneous-shaped block columns having different shapes across
    # blocks: if tensor element shapes differ across blocks, a
    # variable-shaped tensor array will be returned.
    combined_chunks = list(
        itertools.chain(*[chunked.iterchunks() for chunked in col_chunked_arrays])
    )

    return pa.chunked_array(unify_tensor_arrays(combined_chunks))
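The comment above describes when a variable-shaped tensor column is produced. As a rough, user-level sketch (assuming Ray Data's tensor extension support; the dataset is invented for illustration), mixing element shapes in one tensor column is expected to yield the variable-shaped extension type:

import numpy as np
import ray

# Two rows whose "image" tensors have different shapes; Ray Data is expected to
# store this column with its variable-shaped tensor extension type.
ds = ray.data.from_items([{"image": np.zeros((2, 2))}, {"image": np.zeros((3, 3))}])
print(ds.schema())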
def _concat_cols_with_extension_object_types(
    col_chunked_arrays: List["pyarrow.ChunkedArray"],
) -> "pyarrow.ChunkedArray":
    import pyarrow as pa

    from ray.data.extensions import ArrowPythonObjectArray, ArrowPythonObjectType

    chunks_to_concat = []
    # Cast everything to objects if concatenated with an object column
    for ca in col_chunked_arrays:
        for chunk in ca.chunks:
            if isinstance(ca.type, ArrowPythonObjectType):
                chunks_to_concat.append(chunk)
            else:
                chunks_to_concat.append(
                    ArrowPythonObjectArray.from_objects(chunk.to_pylist())
                )
    return pa.chunked_array(chunks_to_concat)
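A hedged sketch of the casting rule above, using the extension classes the diff imports (the column contents are invented): chunks that are not already object-typed are round-tripped through Python lists and re-encoded with ArrowPythonObjectArray.from_objects so every chunk shares the object extension type.

import pyarrow as pa
from ray.data.extensions import ArrowPythonObjectArray

# One block stored the column as plain int64, another as pickled Python objects.
int_chunk = pa.array([1, 2, 3])
obj_chunk = ArrowPythonObjectArray.from_objects([{"a": 1}, None, "text"])

# Re-encode the native chunk as objects so both chunks share one extension type.
combined = pa.chunked_array(
    [ArrowPythonObjectArray.from_objects(int_chunk.to_pylist()), obj_chunk]
)
print(combined.type)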
def _concat_cols_with_native_pyarrow_types(
    col_names: List[str], blocks: List["pyarrow.Table"], promote_types: bool = False
) -> Dict[str, "pyarrow.ChunkedArray"]:
    if not col_names:
        return {}

    # For columns with native Pyarrow types, we should use built-in pyarrow.concat_tables.
    import pyarrow as pa

    # When concatenating tables we allow type promotions to occur, since
    # no schema enforcement is currently performed, therefore allowing schemas
    # to vary b/w blocks

    # NOTE: Type promotions aren't available in Arrow < 14.0
    subset_blocks = []
    for block in blocks:
        cols_to_select = [
            col_name for col_name in col_names if col_name in block.schema.names
        ]
        subset_blocks.append(block.select(cols_to_select))
    if get_pyarrow_version() < parse_version("14.0.0"):
        table = pa.concat_tables(subset_blocks, promote=True)
    else:
        arrow_promote_types_mode = "permissive" if promote_types else "default"
        table = pa.concat_tables(
            subset_blocks, promote_options=arrow_promote_types_mode
        )
    return {col_name: table.column(col_name) for col_name in table.schema.names}
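As a hedged, standalone sketch of the promotion modes referenced in the NOTE above (plain pyarrow tables standing in for Ray blocks; requires pyarrow >= 14.0): permissive promotion lets blocks disagree on integer width for the same column.

import pyarrow as pa

# Two blocks whose shared column disagrees on integer width.
t1 = pa.table({"x": pa.array([1, 2], type=pa.int32())})
t2 = pa.table({"x": pa.array([3], type=pa.int64())})

# "permissive" promotes the column to a common type; stricter modes would
# reject the int32/int64 mismatch.
merged = pa.concat_tables([t1, t2], promote_options="permissive")
print(merged.schema)  # x: int64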
Review comment: "Bug: Schema Mismatch in PyArrow Table Concatenation"
def concat(
    blocks: List["pyarrow.Table"], *, promote_types: bool = False
) -> "pyarrow.Table":

@@ -594,7 +694,6 @@ def concat(

    from ray.air.util.tensor_extensions.arrow import ArrowConversionError
    from ray.data.extensions import (
        ArrowPythonObjectArray,
        ArrowPythonObjectType,
        get_arrow_extension_tensor_types,
    )

@@ -624,104 +723,53 @@ def concat(
     # Handle alignment of struct type columns.
     blocks = _align_struct_fields(blocks, schema)

-    # Rollup columns with opaque (null-typed) lists, to process in following for-loop.
+    # Identify columns with null lists
     cols_with_null_list = set()
     for b in blocks:
         for col_name in b.schema.names:
             col_type = b.schema.field(col_name).type
             if pa.types.is_list(col_type) and pa.types.is_null(col_type.value_type):
                 cols_with_null_list.add(col_name)

-    if (
-        any(isinstance(type_, pa.ExtensionType) for type_ in schema.types)
-        or cols_with_null_list
-    ):
-        # Custom handling for extension array columns.
-        cols = []
-        for col_name in schema.names:
-            col_chunked_arrays = []
-            for block in blocks:
-                col_chunked_arrays.append(block.column(col_name))
-
-            if isinstance(schema.field(col_name).type, tensor_types):
-                # For our tensor extension types, manually construct a chunked array
-                # containing chunks from all blocks. This is to handle
-                # homogeneous-shaped block columns having different shapes across
-                # blocks: if tensor element shapes differ across blocks, a
-                # variable-shaped tensor array will be returned.
-                combined_chunks = list(
-                    itertools.chain(
-                        *[chunked.iterchunks() for chunked in col_chunked_arrays]
-                    )
-                )
-                col = pa.chunked_array(unify_tensor_arrays(combined_chunks))
-            elif isinstance(schema.field(col_name).type, ArrowPythonObjectType):
-                chunks_to_concat = []
-                # Cast everything to objects if concatenated with an object column
-                for ca in col_chunked_arrays:
-                    for chunk in ca.chunks:
-                        if isinstance(ca.type, ArrowPythonObjectType):
-                            chunks_to_concat.append(chunk)
-                        else:
-                            chunks_to_concat.append(
-                                ArrowPythonObjectArray.from_objects(chunk.to_pylist())
-                            )
-                col = pa.chunked_array(chunks_to_concat)
-            else:
-                if col_name in cols_with_null_list:
-                    # For each opaque list column, iterate through all schemas until
-                    # we find a valid value_type that can be used to override the
-                    # column types in the following for-loop.
-                    scalar_type = None
-                    for arr in col_chunked_arrays:
-                        if not pa.types.is_list(arr.type) or not pa.types.is_null(
-                            arr.type.value_type
-                        ):
-                            scalar_type = arr.type
-                            break
-
-                    if scalar_type is not None:
-                        for c_idx in range(len(col_chunked_arrays)):
-                            c = col_chunked_arrays[c_idx]
-                            if pa.types.is_list(c.type) and pa.types.is_null(
-                                c.type.value_type
-                            ):
-                                if pa.types.is_list(scalar_type):
-                                    # If we are dealing with a list input,
-                                    # cast the array to the scalar_type found above.
-                                    col_chunked_arrays[c_idx] = c.cast(scalar_type)
-                                else:
-                                    # If we are dealing with a single value, construct
-                                    # a new array with null values filled.
-                                    col_chunked_arrays[c_idx] = pa.chunked_array(
-                                        [pa.nulls(c.length(), type=scalar_type)]
-                                    )
-
-                col = _concatenate_chunked_arrays(col_chunked_arrays)
-            cols.append(col)
-
-        # Build the concatenated table.
-        table = pyarrow.Table.from_arrays(cols, schema=schema)
-        # Validate table schema (this is a cheap check by default).
-        table.validate()
-    else:
-        # No extension array columns, so use built-in pyarrow.concat_tables.
-
-        # When concatenating tables we allow type promotions to occur, since
-        # no schema enforcement is currently performed, therefore allowing schemas
-        # to vary b/w blocks
-        #
-        # NOTE: Type promotions aren't available in Arrow < 14.0
-        if get_pyarrow_version() < parse_version("14.0.0"):
-            table = pyarrow.concat_tables(blocks, promote=True)
-        else:
-            arrow_promote_types_mode = "permissive" if promote_types else "default"
-            table = pyarrow.concat_tables(
-                blocks, promote_options=arrow_promote_types_mode
-            )
-
-    return table
+    # Concatenate the columns according to their type
+    concatenated_cols = {}
+    native_pyarrow_cols = []
+    for col_name in schema.names:
+        col_type = schema.field(col_name).type
+
+        col_chunked_arrays = []
+        for block in blocks:
+            if col_name in block.schema.names:
+                col_chunked_arrays.append(block.column(col_name))
+            else:
+                col_chunked_arrays.append(pa.nulls(block.num_rows, type=col_type))
+
+        if col_name in cols_with_null_list:
+            concatenated_cols[col_name] = _concat_cols_with_null_list(
+                col_chunked_arrays
+            )
+        elif isinstance(col_type, tensor_types):
+            concatenated_cols[col_name] = _concat_cols_with_extension_tensor_types(
+                col_chunked_arrays
+            )
+        elif isinstance(col_type, ArrowPythonObjectType):
+            concatenated_cols[col_name] = _concat_cols_with_extension_object_types(
+                col_chunked_arrays
+            )
+        else:
+            # Add to the list of native pyarrow columns, these will be concatenated after the loop using pyarrow.concat_tables
+            native_pyarrow_cols.append(col_name)
+
+    concatenated_cols.update(
+        _concat_cols_with_native_pyarrow_types(
+            native_pyarrow_cols, blocks, promote_types
+        )
+    )
+
+    # Ensure that the columns are in the same order as the schema, reconstruct the table.
+    return pyarrow.Table.from_arrays(
+        [concatenated_cols[col_name] for col_name in schema.names], schema=schema
+    )


 def concat_and_sort(

Contributor comment on `if col_name in block.schema.names:`: To improve efficiency, you can check for column existence directly on the schema object.
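To make the new per-column control flow concrete outside Ray's internals, here is a stripped-down sketch (table contents and names are invented, and the type-specific helpers are omitted): each column is gathered from every block, filled with nulls where a block lacks it, and the table is rebuilt in schema order.

import pyarrow as pa

blocks = [pa.table({"a": [1, 2], "b": ["x", "y"]}), pa.table({"a": [3]})]
schema = pa.schema([("a", pa.int64()), ("b", pa.string())])

concatenated = {}
for name in schema.names:
    chunks = []
    for block in blocks:
        if name in block.schema.names:
            chunks.extend(block.column(name).chunks)
        else:
            # Missing column: fill with nulls of the schema's type for this block.
            chunks.append(pa.nulls(block.num_rows, type=schema.field(name).type))
    concatenated[name] = pa.chunked_array(chunks)

# Rebuild the table with columns in schema order.
table = pa.Table.from_arrays([concatenated[n] for n in schema.names], schema=schema)
print(table)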
Review comment: For better performance, consider converting block.schema.names to a set for the membership check. Checking for an item's presence in a list has a time complexity of O(n), whereas for a set it's O(1) on average. Since this check is inside a loop, this optimization can be beneficial, especially with schemas containing many columns.
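A small sketch of that suggestion (hypothetical variable names, not code from the PR): precompute each block's column-name set once so the repeated membership check inside the per-column loop is O(1) on average.

import pyarrow as pa

blocks = [pa.table({"a": [1]}), pa.table({"a": [2], "b": ["x"]})]
schema = pa.schema([("a", pa.int64()), ("b", pa.string())])

# One set per block, built once, instead of scanning a list for every
# (column, block) pair inside the loop.
block_name_sets = [set(b.schema.names) for b in blocks]
for col_name in schema.names:
    for b, names in zip(blocks, block_name_sets):
        print(col_name, col_name in names)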