Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 136 additions & 88 deletions python/ray/data/_internal/arrow_ops/transform_pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,106 @@ def shuffle(block: "pyarrow.Table", seed: Optional[int] = None) -> "pyarrow.Tabl
return take_table(block, indices)


def _concat_cols_with_null_list(
    col_chunked_arrays: List["pyarrow.ChunkedArray"],
) -> "pyarrow.ChunkedArray":
    """Concatenate chunked arrays where some inputs are opaque (null-typed) lists.

    A ``list<null>`` column carries no usable value type, so before
    concatenating we scan the inputs for the first concrete type and use it
    to override the opaque columns (casting list inputs, null-filling scalar
    inputs).
    """
    import pyarrow as pa

    def _is_opaque_list(arrow_type) -> bool:
        # True for list<null>: a list whose element type is null.
        return pa.types.is_list(arrow_type) and pa.types.is_null(
            arrow_type.value_type
        )

    # Find the first input whose type is NOT an opaque list; its type will
    # override the opaque columns below. None if every input is opaque.
    concrete_type = next(
        (arr.type for arr in col_chunked_arrays if not _is_opaque_list(arr.type)),
        None,
    )

    if concrete_type is not None:
        for idx, chunked in enumerate(col_chunked_arrays):
            if not _is_opaque_list(chunked.type):
                continue
            if pa.types.is_list(concrete_type):
                # List input: cast the opaque list to the concrete list type.
                col_chunked_arrays[idx] = chunked.cast(concrete_type)
            else:
                # Scalar input: replace with an all-null array of the
                # concrete type, preserving the length.
                col_chunked_arrays[idx] = pa.chunked_array(
                    [pa.nulls(chunked.length(), type=concrete_type)]
                )

    return _concatenate_chunked_arrays(col_chunked_arrays)


def _concat_cols_with_extension_tensor_types(
    col_chunked_arrays: List["pyarrow.ChunkedArray"],
) -> "pyarrow.ChunkedArray":
    """Concatenate chunked arrays of Ray tensor extension type.

    Manually gathers the chunks of every input so that blocks whose tensor
    elements have different shapes can still be combined: if element shapes
    differ across blocks, ``unify_tensor_arrays`` returns a variable-shaped
    tensor array.
    """
    import pyarrow as pa

    # Flatten every chunk of every input into a single list, preserving
    # input order.
    all_chunks = []
    for chunked in col_chunked_arrays:
        all_chunks.extend(chunked.iterchunks())

    return pa.chunked_array(unify_tensor_arrays(all_chunks))


def _concat_cols_with_extension_object_types(
    col_chunked_arrays: List["pyarrow.ChunkedArray"],
) -> "pyarrow.ChunkedArray":
    """Concatenate chunked arrays where at least one input is an object column.

    Chunks whose parent array is already ``ArrowPythonObjectType`` are kept
    as-is; all other chunks are round-tripped through Python objects so every
    chunk in the result carries the object extension type.
    """
    import pyarrow as pa

    from ray.data.extensions import ArrowPythonObjectArray, ArrowPythonObjectType

    chunks_to_concat = []
    # Cast everything to objects if concatenated with an object column.
    for ca in col_chunked_arrays:
        # Hoisted out of the chunk loop: the type is a property of the
        # chunked array, not of each individual chunk.
        is_object = isinstance(ca.type, ArrowPythonObjectType)
        for chunk in ca.chunks:
            if is_object:
                chunks_to_concat.append(chunk)
            else:
                chunks_to_concat.append(
                    ArrowPythonObjectArray.from_objects(chunk.to_pylist())
                )
    return pa.chunked_array(chunks_to_concat)


def _concat_cols_with_native_pyarrow_types(
    col_names: List[str], blocks: List["pyarrow.Table"], promote_types: bool = False
) -> Dict[str, "pyarrow.ChunkedArray"]:
    """Concatenate the given native-pyarrow-typed columns across ``blocks``.

    Args:
        col_names: Names of the columns (all with native pyarrow types) to
            concatenate.
        blocks: The tables to concatenate columns from. Each block is first
            narrowed to the subset of ``col_names`` it actually has; the
            promoting ``pyarrow.concat_tables`` call unifies the resulting
            schemas (null-filling missing columns).
        promote_types: If True, use Arrow's "permissive" type-promotion mode
            (only available on Arrow >= 14.0).

    Returns:
        A dict mapping each concatenated column name to its ChunkedArray.
    """
    if not col_names:
        return {}

    # For columns with native pyarrow types, use the built-in
    # pyarrow.concat_tables.
    import pyarrow as pa

    # When concatenating tables we allow type promotions to occur, since
    # no schema enforcement is currently performed, therefore allowing schemas
    # to vary between blocks.
    #
    # NOTE: Type promotions aren't available in Arrow < 14.0
    subset_blocks = []
    for block in blocks:
        # Use a set for O(1) membership checks instead of scanning the
        # schema's name list once per column.
        block_col_names = set(block.schema.names)
        cols_to_select = [
            col_name for col_name in col_names if col_name in block_col_names
        ]
        subset_blocks.append(block.select(cols_to_select))
    if get_pyarrow_version() < parse_version("14.0.0"):
        table = pa.concat_tables(subset_blocks, promote=True)
    else:
        arrow_promote_types_mode = "permissive" if promote_types else "default"
        table = pa.concat_tables(
            subset_blocks, promote_options=arrow_promote_types_mode
        )
    return {col_name: table.column(col_name) for col_name in table.schema.names}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Schema Mismatch in PyArrow Table Concatenation

The _concat_cols_with_native_pyarrow_types helper selects only existing columns from each block, failing to null-fill missing columns. This creates inconsistent schemas across blocks for native PyArrow types, causing pyarrow.concat_tables to fail with schema mismatch errors.

Fix in Cursor Fix in Web



def concat(
blocks: List["pyarrow.Table"], *, promote_types: bool = False
) -> "pyarrow.Table":
Expand All @@ -594,7 +694,6 @@ def concat(

from ray.air.util.tensor_extensions.arrow import ArrowConversionError
from ray.data.extensions import (
ArrowPythonObjectArray,
ArrowPythonObjectType,
get_arrow_extension_tensor_types,
)
Expand Down Expand Up @@ -624,104 +723,53 @@ def concat(
# Handle alignment of struct type columns.
blocks = _align_struct_fields(blocks, schema)

# Rollup columns with opaque (null-typed) lists, to process in following for-loop.
# Identify columns with null lists
cols_with_null_list = set()
for b in blocks:
for col_name in b.schema.names:
col_type = b.schema.field(col_name).type
if pa.types.is_list(col_type) and pa.types.is_null(col_type.value_type):
cols_with_null_list.add(col_name)

if (
any(isinstance(type_, pa.ExtensionType) for type_ in schema.types)
or cols_with_null_list
):
# Custom handling for extension array columns.
cols = []
for col_name in schema.names:
col_chunked_arrays = []
for block in blocks:
col_chunked_arrays.append(block.column(col_name))

if isinstance(schema.field(col_name).type, tensor_types):
# For our tensor extension types, manually construct a chunked array
# containing chunks from all blocks. This is to handle
# homogeneous-shaped block columns having different shapes across
# blocks: if tensor element shapes differ across blocks, a
# variable-shaped tensor array will be returned.
combined_chunks = list(
itertools.chain(
*[chunked.iterchunks() for chunked in col_chunked_arrays]
)
)
# Concatenate the columns according to their type
concatenated_cols = {}
native_pyarrow_cols = []
for col_name in schema.names:
col_type = schema.field(col_name).type

col = pa.chunked_array(unify_tensor_arrays(combined_chunks))
elif isinstance(schema.field(col_name).type, ArrowPythonObjectType):
chunks_to_concat = []
# Cast everything to objects if concatenated with an object column
for ca in col_chunked_arrays:
for chunk in ca.chunks:
if isinstance(ca.type, ArrowPythonObjectType):
chunks_to_concat.append(chunk)
else:
chunks_to_concat.append(
ArrowPythonObjectArray.from_objects(chunk.to_pylist())
)
col = pa.chunked_array(chunks_to_concat)
col_chunked_arrays = []
for block in blocks:
if col_name in block.schema.names:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

To improve efficiency, you can check for column existence directly on the schema object (block.schema) instead of its names attribute. col_name in block.schema is an O(1) operation on average, as pyarrow.Schema uses a more efficient lookup mechanism than a list search (in block.schema.names), which is O(n). This is particularly important here, as it's inside nested loops.

Suggested change
if col_name in block.schema.names:
if col_name in block.schema:

col_chunked_arrays.append(block.column(col_name))
else:
if col_name in cols_with_null_list:
# For each opaque list column, iterate through all schemas until
# we find a valid value_type that can be used to override the
# column types in the following for-loop.
scalar_type = None
for arr in col_chunked_arrays:
if not pa.types.is_list(arr.type) or not pa.types.is_null(
arr.type.value_type
):
scalar_type = arr.type
break

if scalar_type is not None:
for c_idx in range(len(col_chunked_arrays)):
c = col_chunked_arrays[c_idx]
if pa.types.is_list(c.type) and pa.types.is_null(
c.type.value_type
):
if pa.types.is_list(scalar_type):
# If we are dealing with a list input,
# cast the array to the scalar_type found above.
col_chunked_arrays[c_idx] = c.cast(scalar_type)
else:
# If we are dealing with a single value, construct
# a new array with null values filled.
col_chunked_arrays[c_idx] = pa.chunked_array(
[pa.nulls(c.length(), type=scalar_type)]
)

col = _concatenate_chunked_arrays(col_chunked_arrays)
cols.append(col)

# Build the concatenated table.
table = pyarrow.Table.from_arrays(cols, schema=schema)
# Validate table schema (this is a cheap check by default).
table.validate()
else:
# No extension array columns, so use built-in pyarrow.concat_tables.

# When concatenating tables we allow type promotions to occur, since
# no schema enforcement is currently performed, therefore allowing schemas
# to vary b/w blocks
#
# NOTE: Type promotions aren't available in Arrow < 14.0
if get_pyarrow_version() < parse_version("14.0.0"):
table = pyarrow.concat_tables(blocks, promote=True)
else:
arrow_promote_types_mode = "permissive" if promote_types else "default"
table = pyarrow.concat_tables(
blocks, promote_options=arrow_promote_types_mode
col_chunked_arrays.append(pa.nulls(block.num_rows, type=col_type))

if col_name in cols_with_null_list:
concatenated_cols[col_name] = _concat_cols_with_null_list(
col_chunked_arrays
)
elif isinstance(col_type, tensor_types):
concatenated_cols[col_name] = _concat_cols_with_extension_tensor_types(
col_chunked_arrays
)
elif isinstance(col_type, ArrowPythonObjectType):
concatenated_cols[col_name] = _concat_cols_with_extension_object_types(
col_chunked_arrays
)
else:
# Add to the list of native pyarrow columns, these will be concatenated after the loop using pyarrow.concat_tables
native_pyarrow_cols.append(col_name)

return table
concatenated_cols.update(
_concat_cols_with_native_pyarrow_types(
native_pyarrow_cols, blocks, promote_types
)
)

# Ensure that the columns are in the same order as the schema, reconstruct the table.
return pyarrow.Table.from_arrays(
[concatenated_cols[col_name] for col_name in schema.names], schema=schema
)


def concat_and_sort(
Expand Down
56 changes: 56 additions & 0 deletions python/ray/data/tests/test_transform_pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2847,6 +2847,62 @@ def unify_schemas_nested_struct_tensors_schemas():
return {"with_tensor": schema1, "without_tensor": schema2, "expected": expected}


@pytest.mark.parametrize("use_arrow_tensor_v2", [True, False])
@pytest.mark.skipif(
    get_pyarrow_version() < MIN_PYARROW_VERSION_TYPE_PROMOTION,
    reason="Requires Arrow version of at least 14.0.0",
)
def test_concat_with_mixed_tensor_types_and_native_pyarrow_types(
    use_arrow_tensor_v2, restore_data_context
):
    """Concatenating blocks that mix a promotable native column with a tensor
    extension column should succeed and promote the native column's type."""
    DataContext.get_current().use_arrow_tensor_v2 = use_arrow_tensor_v2

    total_rows = 1024
    half = total_rows // 2

    # First block: "int" is uint64; "tensor" is the Ray tensor extension type.
    block_uint = pa.table(
        {
            "int": pa.array(np.zeros(half, dtype=np.uint64), type=pa.uint64()),
            "tensor": ArrowTensorArray.from_numpy(
                np.zeros((half, 3, 3), dtype=np.float32)
            ),
        }
    )

    # Second block: "int" is float64 with NaNs; "tensor" has the same
    # extension type as the first block.
    float_vals = np.ones(half, dtype=np.float64)
    float_vals[::8] = np.nan
    block_float = pa.table(
        {
            "int": pa.array(float_vals, type=pa.float64()),
            "tensor": ArrowTensorArray.from_numpy(
                np.zeros((half, 3, 3), dtype=np.float32)
            ),
        }
    )

    # Two input blocks with different Arrow dtypes for "int"; repartition(1)
    # forces a concat across blocks.
    ds = ray.data.from_arrow([block_uint, block_float]).repartition(1)

    # This should not raise: RuntimeError: Types mismatch: double != uint64
    ds.materialize()

    # The expected tensor type depends on the current DataContext setting.
    tensor_cls = ArrowTensorTypeV2 if use_arrow_tensor_v2 else ArrowTensorType
    expected_schema = pa.schema(
        [("int", pa.float64()), ("tensor", tensor_cls((3, 3), pa.float32()))]
    )

    assert ds.schema().base_schema == expected_schema
    assert ds.count() == total_rows


@pytest.fixture
def object_with_tensor_fails_blocks():
"""Blocks that should fail when concatenating objects with tensors."""
Expand Down