Skip to content

[data][optimizer][bug] fix operator fusion bug to preserve udf modifying row count#59513

Merged
alexeykudinkin merged 8 commits intoray-project:masterfrom
iamjustinhsu:jhsu/fix-operator-fusion-udf-modifying-row-count
Jan 5, 2026
Merged

[data][optimizer][bug] fix operator fusion bug to preserve udf modifying row count#59513
alexeykudinkin merged 8 commits intoray-project:masterfrom
iamjustinhsu:jhsu/fix-operator-fusion-udf-modifying-row-count

Conversation

@iamjustinhsu
Copy link
Contributor

@iamjustinhsu iamjustinhsu commented Dec 17, 2025

Description

def process_single_row(x):
    time.sleep(.3)
    x['id2'] = x['id']
    return x

class PrepareImageUdf:
    def __call__(self, x):
        time.sleep(5)
        return x

class ChatUdf:
    def __call__(self, x):
        time.sleep(5)
        return x

def preprocess(x):
    time.sleep(.3)
    return x

def task2(x):
    time.sleep(.3)
    return x

def task3(x):
    time.sleep(.3)
    return x

def filterfn(x):
    return True


ds = (
    ray.data.range(1024, override_num_blocks=1024)
    .map_batches(task3, compute=ray.data.TaskPoolStrategy(size=1))
    .drop_columns(cols="id")
    .map(process_single_row)
    .filter(filterfn)
    .map(preprocess)
    .map_batches(PrepareImageUdf, zero_copy_batch=True, batch_size=64, compute=ray.data.ActorPoolStrategy(min_size=1, max_size=12))
)
ds.explain()

Here's a fun question: what should this return as the optimized physical plan:
Ans:

-------- Physical Plan (Optimized) --------
ActorPoolMapOperator[MapBatches(drop_columns)->Map(process_single_row)->Filter(filterfn)->Map(preprocess)->MapBatches(PrepareImageUdf)]
+- TaskPoolMapOperator[MapBatches(task3)]
   +- TaskPoolMapOperator[ReadRange]
      +- InputDataBuffer[Input]

Cool. It looks like it fused mostly everything from drop_columns to PrepareImageUDF
Ok what if I added these lines: what happens now?

ds = (
    ds
    .map_batches(ChatUdf, zero_copy_batch=True, batch_size=64, compute=ray.data.ActorPoolStrategy(min_size=1, max_size=12))
)
ds.explain()

Ans:

-------- Physical Plan (Optimized) --------
ActorPoolMapOperator[MapBatches(ChatUdf)]
+- ActorPoolMapOperator[Map(preprocess)->MapBatches(PrepareImageUdf)]
   +- TaskPoolMapOperator[MapBatches(drop_columns)->Map(process_single_row)->Filter(filterfn)]
      +- TaskPoolMapOperator[MapBatches(task3)]
         +- TaskPoolMapOperator[ReadRange]
            +- InputDataBuffer[Input]

HuH?? Why did preprocess->PrepareImageUDF get defused??

The issue is that operator map fusion does not preserve whether or not the row counts can be modified. This PR addresses that.

Related issues

None

Additional information

None

… row count

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
@iamjustinhsu iamjustinhsu requested a review from a team as a code owner December 17, 2025 19:49
Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request addresses a bug in operator fusion where the ability of a UDF to modify row counts was not being preserved, leading to incorrect physical plans. The fix involves adding a udf_modifying_row_count parameter to AbstractUDFMap and its subclasses, and correctly propagating this property during fusion. The changes also include a nice refactoring that centralizes the can_modify_num_rows logic in the base class, improving code clarity and reducing redundancy. The implementation is solid and effectively resolves the issue. I have one minor suggestion to add a missing docstring.

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
@iamjustinhsu iamjustinhsu added the go add ONLY when ready to merge, run all tests label Dec 17, 2025
@iamjustinhsu iamjustinhsu changed the title [data][fusion][bug] fix operator fusion bug to preserve udf modifying row count [data][optimizer][bug] fix operator fusion bug to preserve udf modifying row count Dec 17, 2025
@gvspraveen
Copy link
Contributor

Nice catch 👏

self._zero_copy_batch = zero_copy_batch
self._udf_modifying_row_count = udf_modifying_row_count

def can_modify_num_rows(self) -> bool:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can remove this method now. Looks like you added this in parent class AbstractUDFMap

@gvspraveen
Copy link
Contributor

Can you add tests covering this use case?

input_op: The operator preceding this operator in the plan DAG. The outputs
of `input_op` will be the inputs to this operator.
fn: User-defined function to be called.
udf_modifying_row_count: Whether the UDF can change the row count. True if
Copy link
Contributor

@gvspraveen gvspraveen Dec 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isnt this supposed to be backwards?

udf_modifying_row_count should be True if # of input rows NOT EQUAL # of output rows

@alexeykudinkin
Copy link
Contributor

Please add tests

@ray-gardener ray-gardener bot added the data Ray Data-related issues label Dec 18, 2025
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Copy link

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Missing required `can_modify_num_rows` argument in AbstractMap constructor

When creating a fused AbstractMap logical operator in _get_fused_map_operator, the can_modify_num_rows argument is not passed. This parameter is now a required keyword-only argument in AbstractMap.__init__ (defined after * with no default value), so this code will raise a TypeError at runtime when the downstream operator is an AbstractMap rather than an AbstractUDFMap. The fix needs to add can_modify_num_rows=up_logical_op.can_modify_num_rows() or down_logical_op.can_modify_num_rows() similar to the AbstractUDFMap case above it.

python/ray/data/_internal/logical/rules/operator_fusion.py#L489-L496

# The downstream op is AbstractMap instead of AbstractUDFMap.
logical_op = AbstractMap(
name,
input_op,
min_rows_per_bundled_input=min_rows_per_bundled_input,
ray_remote_args_fn=ray_remote_args_fn,
ray_remote_args=ray_remote_args,
)

Fix in Cursor Fix in Web


Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
fn_constructor_kwargs=down_logical_op._fn_constructor_kwargs,
min_rows_per_bundled_input=min_rows_per_bundled_input,
compute=compute,
can_modify_num_rows=can_modify_num_rows,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand your intention here Justin, but the actual problem here is that the impl of the current rule isn't right -- instead of this hoopla with carrying logical op, we should split this rule into 2: one applied at logical level, another at physical.

Let's try to see if splitting it up is feasible immediately and if it's let's do that. If it's not, we can come back to this as temporary solution

@github-actions
Copy link

github-actions bot commented Jan 3, 2026

This pull request has been automatically marked as stale because it has not had
any activity for 14 days. It will be closed in another 14 days if no further activity occurs.
Thank you for your contributions.

You can always ask for help on our discussion forum or Ray's public slack channel.

If you'd like to keep this open, just leave any comment, and the stale label will be removed.

@github-actions github-actions bot added the stale The issue is stale. It will be closed within 7 days unless there are further conversation label Jan 3, 2026
@iamjustinhsu iamjustinhsu removed the stale The issue is stale. It will be closed within 7 days unless there are further conversation label Jan 5, 2026
@alexeykudinkin alexeykudinkin merged commit 0b2cfa0 into ray-project:master Jan 5, 2026
6 checks passed
AYou0207 pushed a commit to AYou0207/ray that referenced this pull request Jan 13, 2026
…ing row count (ray-project#59513)

## Description
```python
def process_single_row(x):
    time.sleep(.3)
    x['id2'] = x['id']
    return x

class PrepareImageUdf:
    def __call__(self, x):
        time.sleep(5)
        return x

class ChatUdf:
    def __call__(self, x):
        time.sleep(5)
        return x

def preprocess(x):
    time.sleep(.3)
    return x

def task2(x):
    time.sleep(.3)
    return x

def task3(x):
    time.sleep(.3)
    return x

def filterfn(x):
    return True

ds = (
    ray.data.range(1024, override_num_blocks=1024)
    .map_batches(task3, compute=ray.data.TaskPoolStrategy(size=1))
    .drop_columns(cols="id")
    .map(process_single_row)
    .filter(filterfn)
    .map(preprocess)
    .map_batches(PrepareImageUdf, zero_copy_batch=True, batch_size=64, compute=ray.data.ActorPoolStrategy(min_size=1, max_size=12))
)
ds.explain()
```
Here's a fun question: what should this return as the optimized physical
plan:
Ans:
```python
-------- Physical Plan (Optimized) --------
ActorPoolMapOperator[MapBatches(drop_columns)->Map(process_single_row)->Filter(filterfn)->Map(preprocess)->MapBatches(PrepareImageUdf)]
+- TaskPoolMapOperator[MapBatches(task3)]
   +- TaskPoolMapOperator[ReadRange]
      +- InputDataBuffer[Input]
```

Cool. It looks like it fused mostly everything from `drop_columns` to
`PrepareImageUDF`
Ok what if I added these lines: what happens now?
```python
ds = (
    ds
    .map_batches(ChatUdf, zero_copy_batch=True, batch_size=64, compute=ray.data.ActorPoolStrategy(min_size=1, max_size=12))
)
ds.explain()
```
Ans:
```python
-------- Physical Plan (Optimized) --------
ActorPoolMapOperator[MapBatches(ChatUdf)]
+- ActorPoolMapOperator[Map(preprocess)->MapBatches(PrepareImageUdf)]
   +- TaskPoolMapOperator[MapBatches(drop_columns)->Map(process_single_row)->Filter(filterfn)]
      +- TaskPoolMapOperator[MapBatches(task3)]
         +- TaskPoolMapOperator[ReadRange]
            +- InputDataBuffer[Input]
```
HuH?? Why did `preprocess->PrepareImageUDF` get defused??

The issue is that operator map fusion does not preserve whether or not
the row counts can be modified. This PR addresses that.

## Related issues
None

## Additional information
None

---------

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: jasonwrwang <jasonwrwang@tencent.com>
lee1258561 pushed a commit to pinterest/ray that referenced this pull request Feb 3, 2026
…ing row count (ray-project#59513)

## Description
```python
def process_single_row(x):
    time.sleep(.3)
    x['id2'] = x['id']
    return x

class PrepareImageUdf:
    def __call__(self, x):
        time.sleep(5)
        return x

class ChatUdf:
    def __call__(self, x):
        time.sleep(5)
        return x

def preprocess(x):
    time.sleep(.3)
    return x

def task2(x):
    time.sleep(.3)
    return x

def task3(x):
    time.sleep(.3)
    return x

def filterfn(x):
    return True


ds = (
    ray.data.range(1024, override_num_blocks=1024)
    .map_batches(task3, compute=ray.data.TaskPoolStrategy(size=1))
    .drop_columns(cols="id")
    .map(process_single_row)
    .filter(filterfn)
    .map(preprocess)
    .map_batches(PrepareImageUdf, zero_copy_batch=True, batch_size=64, compute=ray.data.ActorPoolStrategy(min_size=1, max_size=12))
)
ds.explain()
```
Here's a fun question: what should this return as the optimized physical
plan:
Ans:
```python
-------- Physical Plan (Optimized) --------
ActorPoolMapOperator[MapBatches(drop_columns)->Map(process_single_row)->Filter(filterfn)->Map(preprocess)->MapBatches(PrepareImageUdf)]
+- TaskPoolMapOperator[MapBatches(task3)]
   +- TaskPoolMapOperator[ReadRange]
      +- InputDataBuffer[Input]
```

Cool. It looks like it fused mostly everything from `drop_columns` to
`PrepareImageUDF`
Ok what if I added these lines: what happens now?
```python
ds = (
    ds
    .map_batches(ChatUdf, zero_copy_batch=True, batch_size=64, compute=ray.data.ActorPoolStrategy(min_size=1, max_size=12))
)
ds.explain()
```
Ans:
```python
-------- Physical Plan (Optimized) --------
ActorPoolMapOperator[MapBatches(ChatUdf)]
+- ActorPoolMapOperator[Map(preprocess)->MapBatches(PrepareImageUdf)]
   +- TaskPoolMapOperator[MapBatches(drop_columns)->Map(process_single_row)->Filter(filterfn)]
      +- TaskPoolMapOperator[MapBatches(task3)]
         +- TaskPoolMapOperator[ReadRange]
            +- InputDataBuffer[Input]
```
HuH?? Why did `preprocess->PrepareImageUDF` get defused??

The issue is that operator map fusion does not preserve whether or not
the row counts can be modified. This PR addresses that.

## Related issues
None

## Additional information
None

---------

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
ryanaoleary pushed a commit to ryanaoleary/ray that referenced this pull request Feb 3, 2026
…ing row count (ray-project#59513)

## Description
```python
def process_single_row(x):
    time.sleep(.3)
    x['id2'] = x['id']
    return x

class PrepareImageUdf:
    def __call__(self, x):
        time.sleep(5)
        return x

class ChatUdf:
    def __call__(self, x):
        time.sleep(5)
        return x

def preprocess(x):
    time.sleep(.3)
    return x

def task2(x):
    time.sleep(.3)
    return x

def task3(x):
    time.sleep(.3)
    return x

def filterfn(x):
    return True


ds = (
    ray.data.range(1024, override_num_blocks=1024)
    .map_batches(task3, compute=ray.data.TaskPoolStrategy(size=1))
    .drop_columns(cols="id")
    .map(process_single_row)
    .filter(filterfn)
    .map(preprocess)
    .map_batches(PrepareImageUdf, zero_copy_batch=True, batch_size=64, compute=ray.data.ActorPoolStrategy(min_size=1, max_size=12))
)
ds.explain()
```
Here's a fun question: what should this return as the optimized physical
plan:
Ans:
```python
-------- Physical Plan (Optimized) --------
ActorPoolMapOperator[MapBatches(drop_columns)->Map(process_single_row)->Filter(filterfn)->Map(preprocess)->MapBatches(PrepareImageUdf)]
+- TaskPoolMapOperator[MapBatches(task3)]
   +- TaskPoolMapOperator[ReadRange]
      +- InputDataBuffer[Input]
```

Cool. It looks like it fused mostly everything from `drop_columns` to
`PrepareImageUDF`
Ok what if I added these lines: what happens now?
```python
ds = (
    ds
    .map_batches(ChatUdf, zero_copy_batch=True, batch_size=64, compute=ray.data.ActorPoolStrategy(min_size=1, max_size=12))
)
ds.explain()
```
Ans:
```python
-------- Physical Plan (Optimized) --------
ActorPoolMapOperator[MapBatches(ChatUdf)]
+- ActorPoolMapOperator[Map(preprocess)->MapBatches(PrepareImageUdf)]
   +- TaskPoolMapOperator[MapBatches(drop_columns)->Map(process_single_row)->Filter(filterfn)]
      +- TaskPoolMapOperator[MapBatches(task3)]
         +- TaskPoolMapOperator[ReadRange]
            +- InputDataBuffer[Input]
```
HuH?? Why did `preprocess->PrepareImageUDF` get defused??

The issue is that operator map fusion does not preserve whether or not
the row counts can be modified. This PR addresses that.

## Related issues
None

## Additional information
None

---------

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
peterxcli pushed a commit to peterxcli/ray that referenced this pull request Feb 25, 2026
…ing row count (ray-project#59513)

## Description
```python
def process_single_row(x):
    time.sleep(.3)
    x['id2'] = x['id']
    return x

class PrepareImageUdf:
    def __call__(self, x):
        time.sleep(5)
        return x

class ChatUdf:
    def __call__(self, x):
        time.sleep(5)
        return x

def preprocess(x):
    time.sleep(.3)
    return x

def task2(x):
    time.sleep(.3)
    return x

def task3(x):
    time.sleep(.3)
    return x

def filterfn(x):
    return True

ds = (
    ray.data.range(1024, override_num_blocks=1024)
    .map_batches(task3, compute=ray.data.TaskPoolStrategy(size=1))
    .drop_columns(cols="id")
    .map(process_single_row)
    .filter(filterfn)
    .map(preprocess)
    .map_batches(PrepareImageUdf, zero_copy_batch=True, batch_size=64, compute=ray.data.ActorPoolStrategy(min_size=1, max_size=12))
)
ds.explain()
```
Here's a fun question: what should this return as the optimized physical
plan:
Ans:
```python
-------- Physical Plan (Optimized) --------
ActorPoolMapOperator[MapBatches(drop_columns)->Map(process_single_row)->Filter(filterfn)->Map(preprocess)->MapBatches(PrepareImageUdf)]
+- TaskPoolMapOperator[MapBatches(task3)]
   +- TaskPoolMapOperator[ReadRange]
      +- InputDataBuffer[Input]
```

Cool. It looks like it fused mostly everything from `drop_columns` to
`PrepareImageUDF`
Ok what if I added these lines: what happens now?
```python
ds = (
    ds
    .map_batches(ChatUdf, zero_copy_batch=True, batch_size=64, compute=ray.data.ActorPoolStrategy(min_size=1, max_size=12))
)
ds.explain()
```
Ans:
```python
-------- Physical Plan (Optimized) --------
ActorPoolMapOperator[MapBatches(ChatUdf)]
+- ActorPoolMapOperator[Map(preprocess)->MapBatches(PrepareImageUdf)]
   +- TaskPoolMapOperator[MapBatches(drop_columns)->Map(process_single_row)->Filter(filterfn)]
      +- TaskPoolMapOperator[MapBatches(task3)]
         +- TaskPoolMapOperator[ReadRange]
            +- InputDataBuffer[Input]
```
HuH?? Why did `preprocess->PrepareImageUDF` get defused??

The issue is that operator map fusion does not preserve whether or not
the row counts can be modified. This PR addresses that.

## Related issues
None

## Additional information
None

---------

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: peterxcli <peterxcli@gmail.com>
peterxcli pushed a commit to peterxcli/ray that referenced this pull request Feb 25, 2026
…ing row count (ray-project#59513)

## Description
```python
def process_single_row(x):
    time.sleep(.3)
    x['id2'] = x['id']
    return x

class PrepareImageUdf:
    def __call__(self, x):
        time.sleep(5)
        return x

class ChatUdf:
    def __call__(self, x):
        time.sleep(5)
        return x

def preprocess(x):
    time.sleep(.3)
    return x

def task2(x):
    time.sleep(.3)
    return x

def task3(x):
    time.sleep(.3)
    return x

def filterfn(x):
    return True

ds = (
    ray.data.range(1024, override_num_blocks=1024)
    .map_batches(task3, compute=ray.data.TaskPoolStrategy(size=1))
    .drop_columns(cols="id")
    .map(process_single_row)
    .filter(filterfn)
    .map(preprocess)
    .map_batches(PrepareImageUdf, zero_copy_batch=True, batch_size=64, compute=ray.data.ActorPoolStrategy(min_size=1, max_size=12))
)
ds.explain()
```
Here's a fun question: what should this return as the optimized physical
plan:
Ans:
```python
-------- Physical Plan (Optimized) --------
ActorPoolMapOperator[MapBatches(drop_columns)->Map(process_single_row)->Filter(filterfn)->Map(preprocess)->MapBatches(PrepareImageUdf)]
+- TaskPoolMapOperator[MapBatches(task3)]
   +- TaskPoolMapOperator[ReadRange]
      +- InputDataBuffer[Input]
```

Cool. It looks like it fused mostly everything from `drop_columns` to
`PrepareImageUDF`
Ok what if I added these lines: what happens now?
```python
ds = (
    ds
    .map_batches(ChatUdf, zero_copy_batch=True, batch_size=64, compute=ray.data.ActorPoolStrategy(min_size=1, max_size=12))
)
ds.explain()
```
Ans:
```python
-------- Physical Plan (Optimized) --------
ActorPoolMapOperator[MapBatches(ChatUdf)]
+- ActorPoolMapOperator[Map(preprocess)->MapBatches(PrepareImageUdf)]
   +- TaskPoolMapOperator[MapBatches(drop_columns)->Map(process_single_row)->Filter(filterfn)]
      +- TaskPoolMapOperator[MapBatches(task3)]
         +- TaskPoolMapOperator[ReadRange]
            +- InputDataBuffer[Input]
```
HuH?? Why did `preprocess->PrepareImageUDF` get defused??

The issue is that operator map fusion does not preserve whether or not
the row counts can be modified. This PR addresses that.

## Related issues
None

## Additional information
None

---------

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: peterxcli <peterxcli@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

data Ray Data-related issues go add ONLY when ready to merge, run all tests

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants