[data][optimizer][bug] fix operator fusion bug to preserve udf modifying row count#59513
Conversation
… row count

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Code Review
This pull request addresses a bug in operator fusion where the ability of a UDF to modify row counts was not being preserved, leading to incorrect physical plans. The fix involves adding a `udf_modifying_row_count` parameter to `AbstractUDFMap` and its subclasses, and correctly propagating this property during fusion. The changes also include a nice refactoring that centralizes the `can_modify_num_rows` logic in the base class, improving code clarity and reducing redundancy. The implementation is solid and effectively resolves the issue. I have one minor suggestion to add a missing docstring.
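The centralization described above can be pictured with a small stand-in sketch. The class and method names below are modeled on, but not identical to, Ray's internals; this is illustrative only, not Ray's actual implementation.

```python
# Sketch of the refactor: the row-count property is stored once on the
# base class, so subclasses no longer need to override can_modify_num_rows.

class AbstractUDFMap:
    def __init__(self, fn, udf_modifying_row_count=False):
        self._fn = fn
        self._udf_modifying_row_count = udf_modifying_row_count

    def can_modify_num_rows(self) -> bool:
        # Centralized logic: True only when the UDF may change row counts.
        return self._udf_modifying_row_count


class MapRows(AbstractUDFMap):
    # One row in, one row out: the row count never changes.
    def __init__(self, fn):
        super().__init__(fn, udf_modifying_row_count=False)


class Filter(AbstractUDFMap):
    # May drop rows, so the row count can change.
    def __init__(self, fn):
        super().__init__(fn, udf_modifying_row_count=True)


print(MapRows(lambda r: r).can_modify_num_rows())    # False
print(Filter(lambda r: True).can_modify_num_rows())  # True
```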
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Nice catch 👏
```python
self._zero_copy_batch = zero_copy_batch
self._udf_modifying_row_count = udf_modifying_row_count
```
```python
def can_modify_num_rows(self) -> bool:
```
You can remove this method now; it looks like you added it in the parent class `AbstractUDFMap`.
Can you add tests covering this use case?
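A hedged sketch of the kind of test being requested, written against stand-in operator objects rather than Ray's real logical operators (names here are hypothetical). The invariant under test: a fused operator can modify the row count iff either of its inputs can.

```python
# Stand-in for a logical operator carrying the row-count flag.
class FakeOp:
    def __init__(self, can_modify):
        self._can_modify = can_modify

    def can_modify_num_rows(self):
        return self._can_modify


def fuse(up, down):
    # Mirrors the property propagation the PR adds to the fusion rule.
    return FakeOp(up.can_modify_num_rows() or down.can_modify_num_rows())


def test_fusion_preserves_row_count_flag():
    assert fuse(FakeOp(False), FakeOp(False)).can_modify_num_rows() is False
    assert fuse(FakeOp(True), FakeOp(False)).can_modify_num_rows() is True
    assert fuse(FakeOp(False), FakeOp(True)).can_modify_num_rows() is True


test_fusion_preserves_row_count_flag()
```

A real test would additionally build a small Dataset pipeline and assert on the optimized plan string, as in the repro in the PR description.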
```python
input_op: The operator preceding this operator in the plan DAG. The outputs
    of `input_op` will be the inputs to this operator.
fn: User-defined function to be called.
udf_modifying_row_count: Whether the UDF can change the row count. True if
```
Isn't this supposed to be backwards? `udf_modifying_row_count` should be True if the number of input rows is NOT EQUAL to the number of output rows.
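To illustrate the intended semantics with plain functions (names here are illustrative, not from the PR): a one-row-in, one-row-out transform warrants a False flag, while a predicate that may drop rows warrants True.

```python
def rename_column(row):
    # 1 row in, 1 row out -> udf_modifying_row_count would be False.
    row['id2'] = row['id']
    return row


def keep_even(row):
    # May drop rows -> udf_modifying_row_count would be True.
    return row['id'] % 2 == 0


rows = [{'id': i} for i in range(4)]
mapped = [rename_column(dict(r)) for r in rows]      # row count unchanged
filtered = [r for r in rows if keep_even(r)]         # row count may shrink
print(len(mapped), len(filtered))  # 4 2
```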
Please add tests
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Bug: Missing required `can_modify_num_rows` argument in AbstractMap constructor
When creating a fused `AbstractMap` logical operator in `_get_fused_map_operator`, the `can_modify_num_rows` argument is not passed. This parameter is now a required keyword-only argument in `AbstractMap.__init__` (defined after `*` with no default value), so this code will raise a `TypeError` at runtime when the downstream operator is an `AbstractMap` rather than an `AbstractUDFMap`. The fix needs to add `can_modify_num_rows=up_logical_op.can_modify_num_rows() or down_logical_op.can_modify_num_rows()`, similar to the `AbstractUDFMap` case above it.
`python/ray/data/_internal/logical/rules/operator_fusion.py`, lines 489 to 496 in 3e4630a
```python
fn_constructor_kwargs=down_logical_op._fn_constructor_kwargs,
min_rows_per_bundled_input=min_rows_per_bundled_input,
compute=compute,
can_modify_num_rows=can_modify_num_rows,
```
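The suggested fix can be sketched with stand-in classes (illustrative only, not Ray's actual code): the fused operator forwards the OR of its inputs' flags as the required keyword argument.

```python
# Minimal stand-in for the logical operator with a required keyword-only flag.
class AbstractMap:
    def __init__(self, name, *, can_modify_num_rows):
        self._name = name
        self._can_modify_num_rows = can_modify_num_rows

    def can_modify_num_rows(self):
        return self._can_modify_num_rows


def get_fused_map_operator(up_logical_op, down_logical_op):
    # The fix: forward the OR of the two inputs' flags, matching the
    # AbstractUDFMap case described above.
    return AbstractMap(
        f"{up_logical_op._name}->{down_logical_op._name}",
        can_modify_num_rows=(
            up_logical_op.can_modify_num_rows()
            or down_logical_op.can_modify_num_rows()
        ),
    )


fused = get_fused_map_operator(
    AbstractMap("Filter", can_modify_num_rows=True),
    AbstractMap("Map", can_modify_num_rows=False),
)
print(fused._name, fused.can_modify_num_rows())  # Filter->Map True
```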
I understand your intention here, Justin, but the actual problem is that the implementation of the current rule isn't right: instead of this hoopla with carrying the logical op, we should split this rule into two, one applied at the logical level and another at the physical level.
Let's see if splitting it up is feasible immediately, and if it is, let's do that. If it's not, we can come back to this as a temporary solution.
This pull request has been automatically marked as stale because it has not had recent activity. You can always ask for help on our discussion forum or Ray's public Slack channel. If you'd like to keep this open, just leave any comment, and the stale label will be removed.
…ing row count (ray-project#59513)

## Description

```python
import time

import ray


def process_single_row(x):
    time.sleep(.3)
    x['id2'] = x['id']
    return x


class PrepareImageUdf:
    def __call__(self, x):
        time.sleep(5)
        return x


class ChatUdf:
    def __call__(self, x):
        time.sleep(5)
        return x


def preprocess(x):
    time.sleep(.3)
    return x


def task2(x):
    time.sleep(.3)
    return x


def task3(x):
    time.sleep(.3)
    return x


def filterfn(x):
    return True


ds = (
    ray.data.range(1024, override_num_blocks=1024)
    .map_batches(task3, compute=ray.data.TaskPoolStrategy(size=1))
    .drop_columns(cols="id")
    .map(process_single_row)
    .filter(filterfn)
    .map(preprocess)
    .map_batches(
        PrepareImageUdf,
        zero_copy_batch=True,
        batch_size=64,
        compute=ray.data.ActorPoolStrategy(min_size=1, max_size=12),
    )
)
ds.explain()
```

Here's a fun question: what should this return as the optimized physical plan?

Ans:

```
-------- Physical Plan (Optimized) --------
ActorPoolMapOperator[MapBatches(drop_columns)->Map(process_single_row)->Filter(filterfn)->Map(preprocess)->MapBatches(PrepareImageUdf)]
+- TaskPoolMapOperator[MapBatches(task3)]
   +- TaskPoolMapOperator[ReadRange]
      +- InputDataBuffer[Input]
```

Cool. It looks like it fused mostly everything from `drop_columns` to `PrepareImageUdf`.

Ok, what if I added these lines: what happens now?

```python
ds = (
    ds
    .map_batches(
        ChatUdf,
        zero_copy_batch=True,
        batch_size=64,
        compute=ray.data.ActorPoolStrategy(min_size=1, max_size=12),
    )
)
ds.explain()
```

Ans:

```
-------- Physical Plan (Optimized) --------
ActorPoolMapOperator[MapBatches(ChatUdf)]
+- ActorPoolMapOperator[Map(preprocess)->MapBatches(PrepareImageUdf)]
   +- TaskPoolMapOperator[MapBatches(drop_columns)->Map(process_single_row)->Filter(filterfn)]
      +- TaskPoolMapOperator[MapBatches(task3)]
         +- TaskPoolMapOperator[ReadRange]
            +- InputDataBuffer[Input]
```

Huh?? Why did `Map(preprocess)->MapBatches(PrepareImageUdf)` get defused?? The issue is that operator map fusion does not preserve whether or not the UDF can modify row counts. This PR addresses that.

## Related issues

None

## Additional information

None

---------

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: jasonwrwang <jasonwrwang@tencent.com>
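A toy model of the failure mode (illustrative only, not Ray's actual fusion rule): if fusing two operators drops the "UDF may modify row count" flag, a later fusion decision that consults that flag can flip, which is how a group like `Map(preprocess)->MapBatches(PrepareImageUdf)` can end up defused.

```python
# Stand-in operator carrying only a name and the row-count flag.
class Op:
    def __init__(self, name, modifies_rows):
        self.name = name
        self.modifies_rows = modifies_rows


def fuse(a, b, preserve_flag):
    # preserve_flag=False models the bug: the fused op's flag is lost
    # (reset to False) instead of being propagated from its inputs.
    flag = (a.modifies_rows or b.modifies_rows) if preserve_flag else False
    return Op(f"{a.name}->{b.name}", flag)


flt = Op("Filter(filterfn)", True)
prep = Op("Map(preprocess)", False)

print(fuse(flt, prep, preserve_flag=False).modifies_rows)  # False: info lost
print(fuse(flt, prep, preserve_flag=True).modifies_rows)   # True: preserved
```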