Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions python/ray/data/_internal/execution/operators/join.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
import math
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple, Type

from ray._private.arrow_utils import get_pyarrow_version
from ray.air.util.transform_pyarrow import _is_pa_extension_type
Expand Down Expand Up @@ -345,7 +345,16 @@ def __init__(
right_columns_suffix: Optional[str] = None,
partition_size_hint: Optional[int] = None,
aggregator_ray_remote_args_override: Optional[Dict[str, Any]] = None,
shuffle_aggregation_type: Optional[Type[StatefulShuffleAggregation]] = None,
):
if shuffle_aggregation_type is not None:
if not issubclass(shuffle_aggregation_type, StatefulShuffleAggregation):
raise TypeError(
f"shuffle_aggregation_type must be a subclass of StatefulShuffleAggregation, "
f"got {shuffle_aggregation_type}"
)
Comment on lines +350 to +355
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The built-in issubclass() raises its own TypeError ("issubclass() arg 1 must be a class") when shuffle_aggregation_type is not a class (e.g., when an instance of a class is passed), so the custom message below is never reached in that case. While the default error message is acceptable, handling this case explicitly gives a more user-friendly and consistent error message, which improves error handling and makes debugging easier for users of this operator.

        if shuffle_aggregation_type is not None:
            try:
                is_sub = issubclass(shuffle_aggregation_type, StatefulShuffleAggregation)
            except TypeError:
                is_sub = False
            if not is_sub:
                raise TypeError(
                    f"shuffle_aggregation_type must be a class and a subclass of StatefulShuffleAggregation, got {shuffle_aggregation_type}"
                )


aggregation_class = shuffle_aggregation_type or JoiningShuffleAggregation
super().__init__(
name_factory=(
lambda num_partitions: f"Join(num_partitions={num_partitions})"
Expand All @@ -356,7 +365,7 @@ def __init__(
num_partitions=num_partitions,
partition_size_hint=partition_size_hint,
partition_aggregation_factory=(
lambda aggregator_id, target_partition_ids: JoiningShuffleAggregation(
lambda aggregator_id, target_partition_ids: aggregation_class(
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Shuffle Aggregation Type Handling Errors

The shuffle_aggregation_type parameter has two issues. First, the issubclass check itself raises a TypeError if the input isn't a class, so the intended, more descriptive error is never shown. Second, although any StatefulShuffleAggregation subclass is accepted, the instantiation logic passes JoiningShuffleAggregation's constructor arguments, so a subclass with an incompatible constructor signature fails at runtime.

Fix in Cursor Fix in Web

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Join Operator Fails with Custom Aggregation

The Join operator's partition_aggregation_factory lambda instantiates the aggregation class with parameters specific to JoiningShuffleAggregation. If a custom StatefulShuffleAggregation subclass is provided, this can lead to a TypeError at runtime if its constructor signature differs.

Fix in Cursor Fix in Web

aggregator_id=aggregator_id,
join_type=join_type,
left_key_col_names=left_key_columns,
Expand Down
7 changes: 4 additions & 3 deletions python/ray/data/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2757,7 +2757,7 @@ def unique(self, column: str) -> List[Any]:

>>> import ray
>>> ds = ray.data.from_items([1, 2, 3, 2, 3])
>>> ds.unique("item")
>>> sorted(ds.unique("item"))
[1, 2, 3]

This function is very useful for computing labels
Expand Down Expand Up @@ -3032,11 +3032,12 @@ def std(
>>> import ray
>>> round(ray.data.range(100).std("id", ddof=0), 5)
28.86607
>>> ray.data.from_items([
>>> result = ray.data.from_items([
... {"A": i, "B": i**2}
... for i in range(100)
... ]).std(["A", "B"])
{'std(A)': 29.011491975882016, 'std(B)': 2968.1748039269296}
>>> [(key, round(value, 10)) for key, value in result.items()]
[('std(A)', 29.0114919759), ('std(B)', 2968.1748039269)]

Args:
on: a column name or a list of column names to aggregate.
Expand Down