[Data] Fix Union operator to avoid blocking when preserve order #59922
Code Review
This pull request fixes a blocking issue in the Union operator when preserve_order is enabled. The change introduces a streaming approach, flushing data from input operators as they complete, which is a good improvement. My review identifies a critical potential for deadlock because the flushing logic isn't triggered when an input stream finishes without sending a final block. I've also included a couple of suggestions to improve code readability and maintainability by reducing duplication and simplifying expressions. Addressing the critical issue is necessary to prevent hangs in the data processing pipeline.
python/ray/data/_internal/execution/operators/union_operator.py
alexeykudinkin left a comment
@owenowenisme the problem is that we're still exhausting one input before we move on to the next one, which means the outputs of the remaining inputs accumulate before we produce any of them.
Instead, please take a look at how we achieve determinism in make_async_gen and apply the same technique here:
- Iterate over inputs in the same order
- Always dequeue 1 block and never skip an input!
- Once an op completes, you can start skipping it
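
For illustration, the three rules above could be sketched roughly as follows. This is a minimal standalone sketch, not the actual `UnionOperator` or `make_async_gen` code: the `RoundRobinMerger` class and its `add_block`, `mark_done`, and `drain` methods are hypothetical names, and blocks are plain Python values rather than Ray bundles.

```python
from collections import deque
from typing import Any, Deque, List


class RoundRobinMerger:
    """Hypothetical helper showing order-preserving interleaving.

    This is an assumption-laden sketch, not Ray's UnionOperator.
    """

    def __init__(self, num_inputs: int) -> None:
        self._buffers: List[Deque[Any]] = [deque() for _ in range(num_inputs)]
        self._done: List[bool] = [False] * num_inputs
        self._cursor = 0  # index of the input whose turn it is

    def add_block(self, input_index: int, block: Any) -> None:
        # Buffer a block that arrived from one of the inputs.
        self._buffers[input_index].append(block)

    def mark_done(self, input_index: int) -> None:
        # Record that an input will never produce another block.
        self._done[input_index] = True

    def drain(self) -> List[Any]:
        """Return every block that can be emitted without breaking the order."""
        out: List[Any] = []
        n = len(self._buffers)
        skipped = 0  # consecutive inputs that were finished and empty
        while skipped < n:
            buf = self._buffers[self._cursor]
            if buf:
                out.append(buf.popleft())  # take exactly one block per turn
                skipped = 0
            elif self._done[self._cursor]:
                skipped += 1  # completed input: safe to skip from now on
            else:
                break  # live input with nothing buffered: wait for it
            self._cursor = (self._cursor + 1) % n
        return out
```

Calling `drain()` after every `add_block` and every `mark_done` matters in this sketch: an input that finishes without delivering a final block still has to advance the cursor, otherwise blocks buffered for later inputs would never be released.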
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
…ot_block_when_preserve_order_true
e4e4007 to da8d1b5
…project#59922) ## Description Make the Union operator not blocking when `preserve_order` is enabled if `_add_input_inner` is called with the input in the front. --------- Signed-off-by: You-Cheng Lin <mses010108@gmail.com> Signed-off-by: jinbum-kim <jinbum9958@gmail.com>
…project#59922) ## Description Make the Union operator not blocking when `preserve_order` is enabled if `_add_input_inner` is called with the input in the front. --------- Signed-off-by: You-Cheng Lin <mses010108@gmail.com> Signed-off-by: 400Ping <jiekaichang@apache.org>
…project#59922) ## Description Make the Union operator not blocking when `preserve_order` is enabled if `_add_input_inner` is called with the input in the front. --------- Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
…project#59922) ## Description Make the Union operator not blocking when `preserve_order` is enabled if `_add_input_inner` is called with the input in the front. --------- Signed-off-by: You-Cheng Lin <mses010108@gmail.com> Signed-off-by: peterxcli <peterxcli@gmail.com>
Description
Make the Union operator not blocking when `preserve_order` is enabled if `_add_input_inner` is called with the input in the front.