[Data] Compute Expressions-list Operation#59346

Merged
alexeykudinkin merged 8 commits into ray-project:master from myandpr:compute-expressions-list
Feb 4, 2026
Conversation

@myandpr
Member

@myandpr myandpr commented Dec 10, 2025

Description

Completing the .list operations (sort, flatten).

Test

import ray
from ray.data.expressions import col
ray.init(include_dashboard=False)

ds = ray.data.from_items([
    {"values": [3, 1, 2], "nested": [[1, 2], [3]]},
    {"values": [2, None, 5], "nested": [[4], []]}
])

ds.show()

ds = ds.with_column("sorted_column", col("values").list.sort(order="descending"))
ds = ds.with_column("flattened_nested", col("nested").list.flatten())

ds.show()

Related issues

Related to #58674

Additional information

@myandpr myandpr requested a review from a team as a code owner December 10, 2025 12:26
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request adds sort and flatten operations to the list expression namespace, which is a great addition. The implementation of sort with a native PyArrow fallback is well done. However, the flatten implementation has significant issues with performance, complexity, and correctness. It uses an inefficient Python-based approach instead of the available native pyarrow.compute.list_flatten function, contains buggy type inference logic, and has non-standard behavior. I've provided a detailed comment with a suggested refactoring for the flatten method to address these critical issues.

@ray-gardener ray-gardener bot added the data (Ray Data-related issues) and community-contribution (Contributed by the community) labels Dec 10, 2025
@myandpr
Member Author

myandpr commented Dec 18, 2025

Hey @goutamvenkat-anyscale, could you please review the list-namespace changes from [Data] Compute Expressions-list Operation? Thanks a lot!

Comment on lines +182 to +204
return_dtype = DataType(object)
if self._expr.data_type.is_arrow_type():
    arrow_type = self._expr.data_type.to_arrow_dtype()
    if pyarrow.types.is_list(arrow_type) or pyarrow.types.is_large_list(
        arrow_type
    ):
        child_type = arrow_type.value_type
        list_factory = (
            pyarrow.large_list
            if pyarrow.types.is_large_list(arrow_type)
            else pyarrow.list_
        )
        if (
            pyarrow.types.is_list(child_type)
            or pyarrow.types.is_large_list(child_type)
            or pyarrow.types.is_fixed_size_list(child_type)
        ):
            flattened_type = list_factory(child_type.value_type)
            return_dtype = DataType.from_arrow(flattened_type)
    elif pyarrow.types.is_fixed_size_list(arrow_type):
        child_type = arrow_type.value_type
        if (
            pyarrow.types.is_list(child_type)
Contributor

import pyarrow
import pyarrow.compute as pc

from ray.data.datatype import DataType


def _infer_flattened_dtype(expr: "Expr") -> DataType:
    """Infer the return DataType after flattening one level of list nesting."""
    if not expr.data_type.is_arrow_type():
        return DataType(object)

    arrow_type = expr.data_type.to_arrow_dtype()
    outer_dtype = DataType.from_arrow(arrow_type)
    if not outer_dtype.is_list_type():
        return DataType(object)

    child_type = arrow_type.value_type
    child_dtype = DataType.from_arrow(child_type)
    if not child_dtype.is_list_type():
        return DataType(object)

    if pyarrow.types.is_large_list(arrow_type):
        return DataType.from_arrow(pyarrow.large_list(child_type.value_type))
    else:
        return DataType.from_arrow(pyarrow.list_(child_type.value_type))


def _validate_nested_list(arr_type: pyarrow.DataType) -> None:
    """Raise TypeError if arr_type is not a list of lists."""
    outer_dtype = DataType.from_arrow(arr_type)
    if not outer_dtype.is_list_type():
        raise TypeError(
            "list.flatten() requires a list column whose elements are also lists."
        )

    child_dtype = DataType.from_arrow(arr_type.value_type)
    if not child_dtype.is_list_type():
        raise TypeError(
            "list.flatten() requires a list column whose elements are also lists."
        )


def flatten(self) -> "UDFExpr":
    """Flatten one level of nesting for each list value.

    Transforms list<list<T>> -> list<T> by concatenating inner lists per row.
    Example: [[1, 2], [3, 4]] -> [1, 2, 3, 4]
    """
    return_dtype = _infer_flattened_dtype(self._expr)

    @pyarrow_udf(return_dtype=return_dtype)
    def _list_flatten(arr: pyarrow.Array) -> pyarrow.Array:
        if isinstance(arr, pyarrow.ChunkedArray):
            arr = arr.combine_chunks()

        _validate_nested_list(arr.type)

        # Flatten: list<list<T>> -> all inner lists -> all scalar values
        inner_lists: pyarrow.Array = pc.list_flatten(arr)
        all_scalars: pyarrow.Array = pc.list_flatten(inner_lists)

        n_rows: int = len(arr)
        if len(all_scalars) == 0:
            offsets: pyarrow.Array = pyarrow.repeat(0, n_rows + 1)
        else:
            # Reconstruct row boundaries after flattening.
            #
            # pc.list_flatten loses row structure, so we must track which scalars
            # belong to which original row and rebuild the offsets.
            #
            # Example:
            #   arr = [[[1, 2], [3]], [[4, 5]]]  (2 rows)
            #   inner_lists = pc.list_flatten(arr) = [[1, 2], [3], [4, 5]]
            #   all_scalars = pc.list_flatten(inner_lists) = [1, 2, 3, 4, 5]
            #
            #   list_parent_indices(arr) = [0, 0, 1]  (inner list -> row)
            #   list_parent_indices(inner_lists) = [0, 0, 1, 2, 2]  (scalar -> inner list)
            #   row_indices = take([0, 0, 1], [0, 0, 1, 2, 2]) = [0, 0, 0, 1, 1]  (scalar -> row)
            #
            #   value_counts([0, 0, 0, 1, 1]) = {values: [0, 1], counts: [3, 2]}
            #   cumsum([3, 2]) = [3, 5]
            #   offsets = [0, 3, 5]  -> row 0 has scalars 0:3, row 1 has scalars 3:5
            #
            # Result: [[1, 2, 3], [4, 5]]

            # Map each scalar to its original row
            row_indices: pyarrow.Array = pc.take(
                pc.list_parent_indices(arr),
                pc.list_parent_indices(inner_lists),
            )

            # Count scalars per row using value_counts
            # Note: value_counts omits rows with 0 scalars, so we use index_in to handle gaps
            vc: pyarrow.StructArray = pc.value_counts(row_indices)
            rows_with_scalars: pyarrow.Array = pc.struct_field(vc, "values")
            scalar_counts: pyarrow.Array = pc.struct_field(vc, "counts")

            # For each row 0..n_rows-1, look up its count (0 if not present)
            row_sequence: pyarrow.Array = pyarrow.array(
                range(n_rows), type=pyarrow.int64()
            )
            positions: pyarrow.Array = pc.index_in(row_sequence, value_set=rows_with_scalars)

            counts: pyarrow.Array = pc.if_else(
                pc.is_null(positions),
                0,
                pc.take(scalar_counts, pc.fill_null(positions, 0)),
            )

            # Build offsets: [0] + cumsum(counts)
            cumsum: pyarrow.Array = pc.cumulative_sum(counts)
            offsets = pyarrow.concat_arrays([
                pyarrow.array([0], type=cumsum.type),
                cumsum,
            ])

        # Construct result with appropriate list type
        is_large: bool = pyarrow.types.is_large_list(arr.type)
        if not is_large:
            offsets = pc.cast(offsets, pyarrow.int32())

        null_mask: pyarrow.Array | None = arr.is_null() if arr.null_count else None
        array_cls: type = pyarrow.LargeListArray if is_large else pyarrow.ListArray
        return array_cls.from_arrays(offsets, all_scalars, mask=null_mask)

    return _list_flatten(self._expr)

Couple of things.

  1. Let's assume that list_flatten is a valid pyarrow function
  2. Let's avoid the conversion to numpy from pyarrow
  3. Let's leverage the existing functions that are in place in datatype.py

Member Author

updated.

Contributor

@goutamvenkat-anyscale goutamvenkat-anyscale left a comment

Thanks for the contribution. Left a couple of comments

@goutamvenkat-anyscale goutamvenkat-anyscale added the @external-author-action-required label (alternate tag for PRs where the author doesn't have labeling permission) Dec 18, 2025
@myandpr myandpr force-pushed the compute-expressions-list branch from 152efd3 to f3da415 Compare December 22, 2025 17:26
@myandpr
Member Author

myandpr commented Dec 22, 2025

@goutamvenkat-anyscale PTAL, thanks!

@goutamvenkat-anyscale
Contributor

/gemini summary

@gemini-code-assist
Contributor

Summary of Changes

This pull request expands the functionality of Ray Data expressions by adding sort and flatten operations to the list namespace. These additions enable more direct and efficient manipulation of list-typed columns within datasets, allowing users to perform common list transformations without needing to resort to less optimized methods. The changes include core implementation, type inference, validation, and corresponding documentation and tests.

Highlights

  • New List Operations: Introduced list.sort() and list.flatten() operations to the ray.data.expressions.col().list namespace, enhancing capabilities for manipulating list-typed columns.
  • List Sorting: The list.sort() method allows sorting elements within each list, supporting ascending or descending order and configurable null_placement.
  • List Flattening: The list.flatten() method flattens one level of nesting for each list value, transforming list<list<T>> into list<T>.
  • Documentation and Testing: Added comprehensive documentation with examples for both new operations and included robust unit tests covering various scenarios, including LargeListArray types and type validation.

Changelog
  • doc/source/data/api/expressions.rst
    • Added new documentation section with examples for list.sort() and list.flatten().
  • python/ray/data/namespace_expressions/list_namespace.py
    • Implemented sort() method within _ListNamespace to sort elements in lists.
    • Implemented flatten() method within _ListNamespace to flatten one level of nested lists.
    • Added _infer_flattened_dtype helper function for type inference in flatten.
    • Added _validate_nested_list helper function to enforce nested list input for flatten.
    • Updated imports to include Literal for type hinting sort parameters.
  • python/ray/data/tests/test_namespace_expressions.py
    • Added test_list_sort to verify sorting functionality with different orders and null placements.
    • Added test_list_flatten to confirm correct flattening of nested lists.
    • Added test_list_flatten_requires_nested_lists to test error handling for non-nested list inputs.
    • Added test_list_flatten_large_list_type to ensure LargeList types are handled correctly during flattening.
    • Imported RayTaskError for testing expected exceptions.
Activity
  • myandpr requested a review from goutamvenkat-anyscale.
  • myandpr pinged goutamvenkat-anyscale for a review.
  • goutamvenkat-anyscale provided initial feedback on the flatten implementation, recommending the use of pyarrow.compute.list_flatten for performance and simplification.
  • cursor[bot] identified several bugs in the initial flatten implementation, including issues with fixed_size_list type inference, incorrect return_dtype for non-nested lists, missing fixed_size_list checks, and incorrect LargeListArray creation.
  • cursor[bot] also pointed out the use of non-existent pyarrow.arange and pyarrow.repeat functions.
  • cursor[bot] reported a crash in sort when handling FixedSizeListArray with null rows.
  • goutamvenkat-anyscale provided a revised flatten implementation suggestion and advised against Python-based sort, also suggesting the use of Literal[str] for sort arguments.
  • myandpr updated the pull request based on the feedback, addressing the identified issues and incorporating suggestions.

Comment on lines +264 to +266
offsets: pyarrow.Array = pyarrow.array(
    [0] * (n_rows + 1), type=pyarrow.int64()
)
Contributor

should be using pc.repeat here

Member Author

Hi @goutamvenkat-anyscale, thanks for the suggestion! I originally planned to follow your idea and use pyarrow.compute.repeat, but when I checked the official docs locally (pyarrow 20.0.0, https://arrow.apache.org/docs/python/api/compute.html), I couldn't find that kernel in the compute module; that's why I kept the current implementation. Since it appears to be missing in the versions we target, I'm not sure how to apply the change. Please let me know if I'm misunderstanding something!

Contributor

@goutamvenkat-anyscale goutamvenkat-anyscale Dec 27, 2025

We actually support pyarrow 21.0.0 in our CI pipelines. For the test pertaining to this case, you can add a @skipif decorator if the pyarrow version is < 21.0

Contributor

For a user appropriate error message, let's assert that the user has pyarrow >= 21.0.0. You can create a reusable function def _check_module_version(module: str, min_version: str) -> None: that does this.

Member Author

@goutamvenkat-anyscale , Correction on my previous reply — I misspoke about the version.
I’m actually testing against pyarrow 22.0.0, which is the latest released version, locally.

Contributor

ok in that case, let's construct the pyarrow array from numpy. We can use https://numpy.org/doc/1.26/reference/generated/numpy.repeat.html

Member Author

Thanks for the suggestion! I’ve switched to building the zeros with np.repeat first and then converting to an Arrow array, so this no longer depends on pyarrow.compute.repeat.

scalar_counts: pyarrow.Array = pc.struct_field(vc, "counts")

row_sequence: pyarrow.Array = pyarrow.array(
    list(range(n_rows)), type=pyarrow.int64()
Contributor

same here

Member Author

@myandpr myandpr Dec 24, 2025

Same situation as above: I couldn't find a pyarrow.compute.range or any related range-generating kernel documented at https://arrow.apache.org/docs/python/api/compute.html

Member Author

construct the pyarrow array from numpy. Related doc https://numpy.org/doc/1.26/reference/generated/numpy.arange.html

@myandpr
Member Author

myandpr commented Jan 5, 2026

Hi @goutamvenkat-anyscale, just a gentle ping on this PR.
When you have time, I’d really appreciate your review. Thanks!

@goutamvenkat-anyscale goutamvenkat-anyscale added the go label (add ONLY when ready to merge, run all tests) and removed the @external-author-action-required label (alternate tag for PRs where the author doesn't have labeling permission) Jan 7, 2026
Contributor

@goutamvenkat-anyscale goutamvenkat-anyscale left a comment

Thanks for the contribution!

@myandpr myandpr force-pushed the compute-expressions-list branch 2 times, most recently from 1231776 to 240bda9 Compare January 8, 2026 17:00
"Either column_values or both offsets and values must be provided."
)
else:
lens = [len(v) for v in column_values]
Contributor

Since you're touching this, can we use list_value_length here?

Member Author

I don’t think pc.list_value_length fits here, since we need each array’s row count to build the top‑level offsets. list_value_length gives per‑row list sizes, so it would change the meaning.

Contributor

But you're doing the same thing below:

lengths = pc.list_value_length(sort_arr)
lengths = pc.fill_null(lengths, 0)
is_large = pyarrow.types.is_large_list(arr_type)
offsets = _counts_to_offsets(lengths)

Member Author

Thanks for the note. I think I understand your point now, so let me try to explain how I’m seeing it.
Here column_values is List[pa.Array | pa.ChunkedArray] (multiple list columns). In _combine_as_list_array we need each column’s row count to build column‑level offsets, so len(v) is correct.
pc.list_value_length(v) returns per‑row list sizes (row‑level vector), which is a different level/meaning.

Example:

column_values = [
  pa.array([[1,2],[3]]),            # v1
  pa.array([[4], None, [5,6,7]]),   # v2
]
len(v1)=2, len(v2)=3  -> offsets [0,2,5]   # column boundaries
pc.list_value_length(v1)=[2,1], pc.list_value_length(v2)=[1,null,3]

In sort(), sort_arr is a single list column, and we’re rebuilding per‑row list boundaries, so pc.list_value_length(sort_arr) is exactly what we want there.

Contributor

Sorry, but I'm not sure I fully understand your point. Are you saying that it's going to return a list of lists in case the input is a ChunkedArray?

Comment on lines +32 to +35
arrow_type = expr.data_type.to_arrow_dtype()
outer_dtype = DataType.from_arrow(arrow_type)
if not outer_dtype.is_list_type():
    return DataType(object)
Contributor

Why not check expr.data_type directly? What is cycling through the Arrow type for?

Member Author

We need the Arrow type here to inspect nested value types and to preserve large_list vs list when building the return dtype. expr.data_type is a Ray DataType and may be non‑Arrow or lacks the Arrow value_type info we need, so we convert to Arrow first for the nested checks.

Contributor

Sorry, I don't understand your point

  • You take DataType
  • Convert it to Arrow
  • Convert it back to DataType
  • Then doing the checks.

Why do the DataType -> Arrow -> DataType conversion?

Member Author

Thanks for the clarification; the round-trip was only there to reuse DataType.is_list_type for consistency, but you're right, it's unnecessary. I've updated the code to check list/large_list/fixed_size_list directly on the Arrow type and removed the DT -> Arrow -> DT conversions. Should be clearer now.

Comment on lines +220 to +222
list_type = pyarrow.list_(child_type)
sort_arr = sort_arr.cast(list_type)
arr_type = sort_arr.type
Contributor

Why do we need to cast? Can you please add a comment for less obvious steps that you had to take

Member Author

Good point. The cast is to handle fixed_size_list: list_* kernels (list_flatten, list_parent_indices, etc.) operate on list/large_list, so we temporarily cast fixed_size_list to list, then cast back after sorting. I’ll add a short comment to make this explicit.

Contributor

Wait but how would it work correctly? FixedSizeList is fundamentally different from var-length List

Member Author

Good question. We only cast to run list_* kernels, but we preserve fixed-size semantics: null rows are filled with a fixed_size_list of nulls (so each row still has list_size elements), we rebuild using list_value_length (which stays list_size for every non‑null row), and then cast back to the original fixed_size_list type. So the output still respects the fixed-size constraint.

Comment on lines +208 to +235
if pyarrow.types.is_fixed_size_list(arr_type):
    # Example: FixedSizeList<2>[ [3,1], None, [2,4] ]
    # Fill null row -> [[3,1],[None,None],[2,4]], cast to list<child> for sort,
    # then cast back to fixed_size to preserve schema.
    child_type = arr_type.value_type
    list_size = arr_type.list_size
    if null_mask is not None:
        filler_values = pyarrow.nulls(len(arr) * list_size, type=child_type)
        filler = pyarrow.FixedSizeListArray.from_arrays(
            filler_values, list_size
        )
        sort_arr = pc.if_else(null_mask, filler, arr)
    list_type = pyarrow.list_(child_type)
    sort_arr = sort_arr.cast(list_type)
    arr_type = sort_arr.type
values = pc.list_flatten(sort_arr)
if len(values):
    row_indices = pc.list_parent_indices(sort_arr)
    struct = pyarrow.StructArray.from_arrays(
        [row_indices, values],
        ["row", "value"],
    )
    sorted_indices = pc.sort_indices(
        struct,
        sort_keys=[("row", "ascending"), ("value", order)],
        null_placement=null_placement,
    )
    values = pc.take(values, sorted_indices)
Contributor

Let's extract this to a standalone method (place it into the utils)

Member Author

I’d prefer to keep this inline for now since it’s tightly coupled to sort()’s flow and there’s no reuse yet. Extracting to a util would add indirection without much benefit. If you see another call site where this would be reused, I’m happy to revisit.

)
values = pc.take(values, sorted_indices)

lengths = pc.list_value_length(sort_arr)
Contributor

I see that you're now converting fixed-size lists into arbitrary-size lists; we should not do that (we'd keep fixed-size lists as such).

Member Author

We only cast fixed_size_list to list temporarily to use list_* kernels for sorting, then cast back to original_type at the end. The output stays fixed_size_list.

Contributor

No, you're creating a new List below with _combine_as_list_array, right?

Member Author

Yes, _combine_as_list_array builds a List as an intermediate, but we immediately cast it back to fixed_size_list (sorted_arr = sorted_arr.cast(original_type)), so the final output stays fixed_size_list. The List is just a temporary representation because list_* kernels don’t support fixed_size_list. Avoiding a temporary List would require re‑implementing the sort/rebuild logic, which is much more complex.

If you’d prefer to avoid the temporary List entirely, let me know and I can adjust.

Comment on lines +269 to +272
n_rows: int = len(arr)
if len(all_scalars) == 0:
    counts = pyarrow.array(np.repeat(0, n_rows), type=pyarrow.int64())
    offsets = _counts_to_offsets(counts)
Contributor

What's this branch for?

Member Author

This handles the all‑empty case. When all_scalars is empty (all rows are None or contain only empty/None sublists), value_counts can’t produce per‑row counts, so we explicitly build a zero count for each row to get correct offsets and preserve row count.

Comment on lines +283 to +297
vc: pyarrow.StructArray = pc.value_counts(row_indices)
rows_with_scalars: pyarrow.Array = pc.struct_field(vc, "values")
scalar_counts: pyarrow.Array = pc.struct_field(vc, "counts")

row_sequence: pyarrow.Array = pyarrow.array(
    np.arange(n_rows, dtype=np.int64), type=pyarrow.int64()
)
positions: pyarrow.Array = pc.index_in(
    row_sequence, value_set=rows_with_scalars
)

counts: pyarrow.Array = pc.if_else(
    pc.is_null(positions),
    0,
    pc.take(scalar_counts, pc.fill_null(positions, 0)),
Contributor

Please add comments, it's very hard to understand what you're trying to do right now

Member Author

Updated.


is_large: bool = pyarrow.types.is_large_list(arr.type)
null_mask: pyarrow.Array | None = arr.is_null() if arr.null_count else None
return _combine_as_list_array(
Contributor

Same comment as above that this will convert fixed-size lists into arbitrary sized one

Member Author

In flatten(), the output is intentionally a list/large_list. Flattening list<list> (or fixed_size_list<list>) produces variable-length lists, so preserving fixed_size_list wouldn’t be correct here. This is why _combine_as_list_array is used.

@cursor cursor bot left a comment

Cursor Bugbot has reviewed your changes and found 2 potential issues.

"Either column_values or both offsets and values must be provided."
)
else:
lens = [len(v) for v in column_values]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But you're doing the same thing below:

lengths = pc.list_value_length(sort_arr)
            lengths = pc.fill_null(lengths, 0)
            is_large = pyarrow.types.is_large_list(arr_type)
            offsets = _counts_to_offsets(lengths)

Comment on lines +219 to +224
if null_mask is not None:
    filler_values = pyarrow.nulls(len(arr) * list_size, type=child_type)
    filler = pyarrow.FixedSizeListArray.from_arrays(
        filler_values, list_size
    )
    sort_arr = pc.if_else(null_mask, filler, arr)
Contributor

Why do we need this filling? Please add comments

Member Author

We need this fill to keep fixed_size_list lengths intact when there are null rows. list_* kernels treat null rows as length 0, so if we don’t fill, the reconstructed offsets would drop those list_size slots and the values buffer length would no longer be n_rows * list_size. Filling with fixed‑size null lists keeps per‑row length = list_size, so rebuilding offsets and casting back to fixed_size_list stays correct.

added comments.

order: Literal["ascending", "descending"] = "ascending",
null_placement: Literal["at_start", "at_end"] = "at_end",
) -> "UDFExpr":
"""Sort the elements within each list.
Contributor

Suggested change
"""Sort the elements within each list.
"""Sorts the elements within each (nested) list.

Contributor

Also, add an example to explain what we're doing here

Member Author

updated.

@cursor cursor bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.

@myandpr
Member Author

myandpr commented Feb 1, 2026

The CI failure looks unrelated to the code change: the job fails during the docker build with “client version 1.52 is too new; maximum supported API version is 1.43.”

"Either column_values or both offsets and values must be provided."
)
else:
lens = [len(v) for v in column_values]
Contributor

Sorry, but I'm not sure I fully understand your point. Are you saying that it's going to return a list of lists when the input is a ChunkedArray?

@alexeykudinkin alexeykudinkin enabled auto-merge (squash) February 4, 2026 07:43
Signed-off-by: yaommen <myanstu@163.com>
@alexeykudinkin alexeykudinkin force-pushed the compute-expressions-list branch from 7e0bef8 to 6b68a08 Compare February 4, 2026 17:41
@github-actions github-actions bot disabled auto-merge February 4, 2026 17:41
@alexeykudinkin alexeykudinkin enabled auto-merge (squash) February 4, 2026 17:49
@alexeykudinkin alexeykudinkin merged commit 4ca1097 into ray-project:master Feb 4, 2026
7 checks passed
tiennguyentony pushed a commit to tiennguyentony/ray that referenced this pull request Feb 7, 2026
## Description
Completing the .list Operations (sort, flatten)

#### Test
```
import ray
from ray.data.expressions import col
ray.init(include_dashboard=False)

ds = ray.data.from_items([
    {"values": [3, 1, 2], "nested": [[1, 2], [3]]},
    {"values": [2, None, 5], "nested": [[4], []]}
])

ds.show()

ds = ds.with_column("sorted_column", col("values").list.sort(order="descending"))
ds = ds.with_column("flattened_nested", col("nested").list.flatten())

ds.show()
```

## Related issues
Related to ray-project#58674
## Additional information

---------

Signed-off-by: yaommen <myanstu@163.com>
Signed-off-by: tiennguyentony <46289799+tiennguyentony@users.noreply.github.com>
tiennguyentony pushed a commit to tiennguyentony/ray that referenced this pull request Feb 7, 2026
tiennguyentony pushed a commit to tiennguyentony/ray that referenced this pull request Feb 7, 2026
Sparks0219 pushed a commit to Sparks0219/ray that referenced this pull request Feb 9, 2026
elliot-barn pushed a commit that referenced this pull request Feb 9, 2026
elliot-barn pushed a commit that referenced this pull request Feb 9, 2026
ans9868 pushed a commit to ans9868/ray that referenced this pull request Feb 18, 2026
Aydin-ab pushed a commit to kunling-anyscale/ray that referenced this pull request Feb 20, 2026
peterxcli pushed a commit to peterxcli/ray that referenced this pull request Feb 25, 2026
peterxcli pushed a commit to peterxcli/ray that referenced this pull request Feb 25, 2026
Labels: community-contribution (Contributed by the community), data (Ray Data-related issues), go (add ONLY when ready to merge, run all tests)