[data] Fix errors with concatenation with mixed pyarrow native and extension types #57566

Merged
aslonnie merged 1 commit into ray-project:releases/2.50.0 from omatthew98:mowen/cherry-pick-56811
Oct 8, 2025

Conversation

@omatthew98 (Contributor) commented Oct 8, 2025

Why are these changes needed?

Cherry-pick #56811

Original description:
If an execution needed to concatenate blocks containing both native pyarrow types and pyarrow extension types, it would fail with errors like the following:

⚠️  Dataset dataset_5_0 execution failed: : 0.00 row [00:00, ? row/s]
- Repartition 1: 0.00 row [00:00, ? row/s]
  *- Split Repartition: : 0.00 row [00:00, ? row/s]
2025-09-22 17:21:34,068 ERROR exceptions.py:73 -- Exception occurred in Ray Data or Ray Core internal code. If you continue to see this error, please open an issue on the Ray project GitHub page with the full stack trace below: https://github.com/ray-project/ray/issues/new/choose
2025-09-22 17:21:34,068	ERROR exceptions.py:81 -- Full stack trace:
Traceback (most recent call last):
  File "/Users/mowen/code/ray/python/ray/data/exceptions.py", line 49, in handle_trace
    return fn(*args, **kwargs)
  File "/Users/mowen/code/ray/python/ray/data/_internal/plan.py", line 533, in execute
    blocks = execute_to_legacy_block_list(
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/legacy_compat.py", line 127, in execute_to_legacy_block_list
    block_list = _bundles_to_block_list(bundles)
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/legacy_compat.py", line 175, in _bundles_to_block_list
    bundle_list = list(bundles)
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/interfaces/executor.py", line 34, in __next__
    return self.get_next()
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/streaming_executor.py", line 680, in get_next
    bundle = state.get_output_blocking(output_split_idx)
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 373, in get_output_blocking
    raise self._exception
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/streaming_executor.py", line 331, in run
    continue_sched = self._scheduling_loop_step(self._topology)
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/streaming_executor.py", line 475, in _scheduling_loop_step
    update_operator_states(topology)
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 586, in update_operator_states
    op.all_inputs_done()
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/operators/base_physical_operator.py", line 122, in all_inputs_done
    self._output_buffer, self._stats = self._bulk_fn(self._input_buffer, ctx)
  File "/Users/mowen/code/ray/python/ray/data/_internal/planner/repartition.py", line 84, in split_repartition_fn
    return scheduler.execute(refs, num_outputs, ctx)
  File "/Users/mowen/code/ray/python/ray/data/_internal/planner/exchange/split_repartition_task_scheduler.py", line 106, in execute
    ] = reduce_bar.fetch_until_complete(list(reduce_metadata_schema))
  File "/Users/mowen/code/ray/python/ray/data/_internal/progress_bar.py", line 166, in fetch_until_complete
    for ref, result in zip(done, ray.get(done)):
  File "/Users/mowen/code/ray/python/ray/_private/auto_init_hook.py", line 22, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/Users/mowen/code/ray/python/ray/_private/client_mode_hook.py", line 104, in wrapper
    return func(*args, **kwargs)
  File "/Users/mowen/code/ray/python/ray/_private/worker.py", line 2952, in get
    values, debugger_breakpoint = worker.get_objects(
  File "/Users/mowen/code/ray/python/ray/_private/worker.py", line 1025, in get_objects
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(RuntimeError): ray::reduce() (pid=7442, ip=127.0.0.1)
  File "/Users/mowen/code/ray/python/ray/data/_internal/planner/exchange/shuffle_task_spec.py", line 128, in reduce
    new_block = builder.build()
  File "/Users/mowen/code/ray/python/ray/data/_internal/delegating_block_builder.py", line 68, in build
    return self._builder.build()
  File "/Users/mowen/code/ray/python/ray/data/_internal/table_block.py", line 144, in build
    return self._concat_tables(tables)
  File "/Users/mowen/code/ray/python/ray/data/_internal/arrow_block.py", line 161, in _concat_tables
    return transform_pyarrow.concat(tables, promote_types=True)
  File "/Users/mowen/code/ray/python/ray/data/_internal/arrow_ops/transform_pyarrow.py", line 706, in concat
    col = _concatenate_chunked_arrays(col_chunked_arrays)
  File "/Users/mowen/code/ray/python/ray/data/_internal/arrow_ops/transform_pyarrow.py", line 397, in _concatenate_chunked_arrays
    raise RuntimeError(f"Types mismatch: {type_} != {arr.type}")
RuntimeError: Types mismatch: uint64 != double
2025-09-22 17:21:34,069	ERROR worker.py:429 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): ray::reduce() (pid=7442, ip=127.0.0.1)
  File "/Users/mowen/code/ray/python/ray/data/_internal/planner/exchange/shuffle_task_spec.py", line 128, in reduce
    new_block = builder.build()
  File "/Users/mowen/code/ray/python/ray/data/_internal/delegating_block_builder.py", line 68, in build
    return self._builder.build()
  File "/Users/mowen/code/ray/python/ray/data/_internal/table_block.py", line 144, in build
    return self._concat_tables(tables)
  File "/Users/mowen/code/ray/python/ray/data/_internal/arrow_block.py", line 161, in _concat_tables
    return transform_pyarrow.concat(tables, promote_types=True)
  File "/Users/mowen/code/ray/python/ray/data/_internal/arrow_ops/transform_pyarrow.py", line 706, in concat
    col = _concatenate_chunked_arrays(col_chunked_arrays)
  File "/Users/mowen/code/ray/python/ray/data/_internal/arrow_ops/transform_pyarrow.py", line 397, in _concatenate_chunked_arrays
    raise RuntimeError(f"Types mismatch: {type_} != {arr.type}")
RuntimeError: Types mismatch: uint64 != double

This PR adds a test that replicates the failure and fixes the underlying issue by concatenating extension-type and native-type columns separately before rejoining them.

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run pre-commit jobs to lint the changes in this PR. (pre-commit setup)
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@omatthew98 omatthew98 requested a review from a team as a code owner October 8, 2025 20:03
@omatthew98 omatthew98 requested a review from aslonnie October 8, 2025 20:03
table = pa.concat_tables(
    subset_blocks, promote_options=arrow_promote_types_mode
)
return {col_name: table.column(col_name) for col_name in table.schema.names}

Bug: Schema Mismatch in PyArrow Table Concatenation

The _concat_cols_with_native_pyarrow_types helper selects only existing columns from each block, failing to null-fill missing columns. This creates inconsistent schemas across blocks for native PyArrow types, causing pyarrow.concat_tables to fail with schema mismatch errors.



@gemini-code-assist (bot) left a comment


Code Review

This pull request refactors the concat function in transform_pyarrow.py by breaking it down into smaller, more manageable helper functions for different column types. This is a great improvement for code readability and maintainability. I've found a couple of opportunities to improve performance in the new implementation.

subset_blocks = []
for block in blocks:
    cols_to_select = [
        col_name for col_name in col_names if col_name in block.schema.names

Severity: medium

For better performance, consider converting block.schema.names to a set for the membership check. Checking for an item's presence in a list has a time complexity of O(n), whereas for a set it's O(1) on average. Since this check is inside a loop, this optimization can be beneficial, especially with schemas containing many columns.

Suggested change:
- col_name for col_name in col_names if col_name in block.schema.names
+ col_name for col_name in col_names if col_name in set(block.schema.names)

col = pa.chunked_array(chunks_to_concat)
col_chunked_arrays = []
for block in blocks:
    if col_name in block.schema.names:

Severity: medium

To improve efficiency, you can check for column existence directly on the schema object (block.schema) instead of its names attribute. col_name in block.schema is an O(1) operation on average, as pyarrow.Schema uses a more efficient lookup mechanism than a list search (in block.schema.names), which is O(n). This is particularly important here, as it's inside nested loops.

Suggested change:
- if col_name in block.schema.names:
+ if col_name in block.schema:

@omatthew98 changed the title from "[data] cherry pick 56811" to "[data] Fix errors with concatenation with mixed pyarrow native and extension types" Oct 8, 2025
@aslonnie added the "go" label (add ONLY when ready to merge, run all tests) Oct 8, 2025
@aslonnie aslonnie merged commit 1228933 into ray-project:releases/2.50.0 Oct 8, 2025
3 of 6 checks passed