[Data] Fix streaming executor to drain upstream output queue(s) #56941
alexeykudinkin merged 1 commit into master
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
Code Review
This pull request effectively addresses a potential memory leak in the streaming executor by ensuring upstream output queues are drained when a downstream operator finishes execution, such as when a limit is reached. The fix in update_operator_states is correct and is accompanied by two new tests: a focused unit test and a valuable integration test that validates the fix in an end-to-end scenario. My review found a minor issue in the new integration test's logic for identifying an operator, which could make the test fragile. Overall, this is a solid contribution that improves resource management in Ray Data pipelines.
    topology = executor._topology
    read_parquet_op_state = None
    for op, op_state in topology.items():
        if "ReadParquet" in op.name:
The condition `"ReadParquet" in op.name` is likely incorrect and fragile. The name of the logical `Read` operator for a Parquet datasource is constructed as `f"Read({datasource.get_name()})"`, which results in `Read(Parquet)`. The substring `"ReadParquet"` is not present in this name. A more robust approach would be to match the exact name.
    -    if "ReadParquet" in op.name:
    +    if "Read(Parquet)" in op.name:
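One way to make the lookup less fragile is to compare the full operator name and fail loudly when nothing matches. The sketch below is only an illustration: `FakeOp` and `find_op_state` are hypothetical names, the topology is a plain dict, and the expected name is passed in as a parameter because the actual operator name should be checked against the Ray version under test.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass(frozen=True)
class FakeOp:
    """Hypothetical stand-in for an operator; only the name matters here."""
    name: str


def find_op_state(topology: Dict[FakeOp, str], expected_name: str) -> Optional[str]:
    """Return the state of the operator whose name equals expected_name exactly."""
    for op, op_state in topology.items():
        if op.name == expected_name:
            return op_state
    return None


# Toy topology; the names are assumptions taken from the review comment.
topology = {FakeOp("Read(Parquet)"): "read-state", FakeOp("limit=10"): "limit-state"}

state = find_op_state(topology, "Read(Parquet)")
# Failing here is clearer than a later AttributeError on None.
assert state is not None, "operator not found; check the operator name"
```

An exact comparison also avoids accidentally matching a fused operator whose name merely contains the expected substring.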
## Why are these changes needed?

**Issue**

- Consider a pipeline `ReadFiles->Limit->Map->...`
- When the limit is reached, we call `mark_execution_finished` on the current LimitOperator, so it no longer dequeues from the upstream op's output queue.
- When this happens, object store memory can leak for the life of the pipeline, because unneeded blocks stay pending in the upstream op's output queue.
- In the progress bar below, ReadFiles has 9.7GB locked up.

```
(pid=18548) ✔️ Dataset train_14_0 execution finished in 130.77 seconds: 100%|██████████| 887k/887k [02:10<00:00, 6.78k row/s]
(pid=18548) - ListFiles: Tasks: 0; Actors: 0; Queued blocks: 0; Resources: 0.0 CPU, 0.0B object store (in=0.0B,out=0.0B): 100%|██████████| 513/513 [02:10<00:00, 3.92 row/s]
(pid=18548) - ReadFiles: Tasks: 0; Actors: 0; Queued blocks: 0; Resources: 0.0 CPU, 9.7GB object store (in=0.0B,out=9.7GB): 100%|██████████| 1.28M/1.28M [02:10<00:00, 9.79k row/s]
(pid=18548) - limit=1000000: Tasks: 0; Actors: 0; Queued blocks: 107; Resources: 0.0 CPU, 0.0B object store (in=0.0B,out=0.0B): 100%|██████████| 1.00M/1.00M [02:10<00:00, 7.64k row/s]
(pid=18548) - Map(map_fn): Tasks: 0; Actors: 0; Queued blocks: 0; Resources: 0.0 CPU, 0.0B object store (in=0.0B,out=0.0B): 100%|██████████| 1.00M/1.00M [02:10<00:00, 7.64k row/s]
(pid=18548) - split(16, equal=True): Tasks: 0; Actors: 0; Queued blocks: 0; Resources: 0.0 CPU, 63.7GB object store (in=0.0B,out=63.7GB); [locality disabled]: 100%|██████████| 1.00M/1.00M [02:10<00:00, 7.64k row/s]
```

**Fix**

- When the current op is done, proactively drain the upstream ops' output queues and mark input done for that upstream op.
- With the fix, the ReadFiles op no longer has object store resources locked up.

<img width="1293" height="131" alt="Screenshot 2025-09-19 at 9 48 04 AM" src="https://github.com/user-attachments/assets/407d291d-44a5-4357-9cf3-d86ccf40e83b" />

## Related issue number

## Checks

- [ ] I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- [ ] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
  - [ ] Unit tests
  - [ ] Release tests
  - [ ] This PR is not tested :(

Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
Signed-off-by: elliot-barn <elliot.barnwell@anyscale.com>
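The idea behind the fix can be sketched with a toy model. This is not the streaming executor's code: `ToyOpState`, `drain_upstream_of_finished_ops`, and the `inputs_done` field are hypothetical names, and recording "input done" as a set on the finished operator is only one reading of the description above.

```python
from collections import deque
from dataclasses import dataclass, field
from typing import Deque, List, Set


@dataclass
class ToyOpState:
    """Hypothetical stand-in for an operator's state; not Ray's OpState."""
    name: str
    upstream: List["ToyOpState"] = field(default_factory=list)
    output_queue: Deque[bytes] = field(default_factory=deque)
    execution_finished: bool = False
    inputs_done: Set[str] = field(default_factory=set)

    def queued_bytes(self) -> int:
        """Total size of blocks still held in this operator's output queue."""
        return sum(len(block) for block in self.output_queue)


def drain_upstream_of_finished_ops(states: List[ToyOpState]) -> None:
    """For every finished operator, drop blocks that its upstreams still hold."""
    for state in states:
        if not state.execution_finished:
            continue
        for up in state.upstream:
            # Nobody will consume these blocks, so release them.
            up.output_queue.clear()
            # Assumption: model "mark input done" as a flag per upstream.
            state.inputs_done.add(up.name)


# ReadFiles produced two blocks, but the limit operator already finished.
read = ToyOpState("ReadFiles")
read.output_queue.extend([b"x" * 10, b"y" * 20])
limit = ToyOpState("limit", upstream=[read], execution_finished=True)

drain_upstream_of_finished_ops([read, limit])
print(read.queued_bytes())  # prints 0
```

In the toy model the drain runs on every update pass, so blocks that an upstream operator emits after the downstream operator has finished are released on the next pass as well.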