[Data] Enable and Tune DownstreamCapacityBackpressurePolicy#59753
[Data] Enable and Tune DownstreamCapacityBackpressurePolicy#59753
Conversation
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
There was a problem hiding this comment.
Code Review
This pull request introduces and enables a new DownstreamCapacityBackpressurePolicy, which is a sophisticated mechanism to manage memory pressure by considering downstream processing capacity. The implementation is well-structured with clear helper methods and is accompanied by a comprehensive set of new tests. The changes also include good refactoring of existing backpressure logic and a new callback mechanism to track prefetched bytes from external consumers, which is a thoughtful addition for accurate resource management. I have one suggestion regarding a workaround that could be improved for better long-term maintainability.
Signed-off-by: Srinath Krishnamachari <68668616+srinathk10@users.noreply.github.com>
.../ray/data/_internal/execution/backpressure_policy/downstream_capacity_backpressure_policy.py
Outdated
Show resolved
Hide resolved
python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py
Show resolved
Hide resolved
|
|
||
| # Report prefetched bytes to the executor's resource manager. | ||
| if self._prefetch_bytes_callback is not None and self._stats is not None: | ||
| self._prefetch_bytes_callback(self._stats.iter_prefetched_bytes) |
There was a problem hiding this comment.
Prefetch bytes callback missing from finally block
The prefetch_bytes_callback invocation in yield_batch_context is placed after the yield statement but not wrapped in a finally block. In a @contextmanager, code after yield only runs on normal exit, not when exceptions occur or when the generator is closed. If the caller raises an exception during batch processing, the callback won't be invoked, leaving stale prefetch bytes values in the resource manager. This could cause incorrect backpressure decisions. The reviewer @raulchen correctly noted this should be in a finally block, similar to how get_next_batch_context handles its cleanup.
…ay-project#59753)" This reverts commit a0442b9.
…ect#59753) > Thank you for contributing to Ray! 🚀 > Please review the [Ray Contribution Guide](https://docs.ray.io/en/master/ray-contribute/getting-involved.html) before opening a pull request. >⚠️ Remove these instructions before submitting your PR. > 💡 Tip: Mark as draft if you want early feedback, or ready for review when it's complete. ## Description > Briefly describe what this PR accomplishes and why it's needed. ### [Data] Enable and Tune DownstreamCapacityBackpressurePolicy - To backpressure a given Op, use Queue size build up / Downstream capacity ratio. This ratio represents the upper limit of buffering in Object store between pipeline stages to optimize for throughput. - Wait until OBJECT_STORE_BUDGET_UTIL_THRESHOLD of the Op utilization before this backpressure policy can kick in, so steady state is reached. - Skip this backpressure policy, if current Os or downstream Op is materializing. ## Related issues > Link related issues: "Fixes ray-project#1234", "Closes ray-project#1234", or "Related to ray-project#1234". ## Additional information > Optional: Add implementation details, API changes, usage examples, screenshots, etc. --------- Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com> Signed-off-by: Srinath Krishnamachari <68668616+srinathk10@users.noreply.github.com> Signed-off-by: jasonwrwang <jasonwrwang@tencent.com>
…ect#59753) > Thank you for contributing to Ray! 🚀 > Please review the [Ray Contribution Guide](https://docs.ray.io/en/master/ray-contribute/getting-involved.html) before opening a pull request. >⚠️ Remove these instructions before submitting your PR. > 💡 Tip: Mark as draft if you want early feedback, or ready for review when it's complete. ## Description > Briefly describe what this PR accomplishes and why it's needed. ### [Data] Enable and Tune DownstreamCapacityBackpressurePolicy - To backpressure a given Op, use Queue size build up / Downstream capacity ratio. This ratio represents the upper limit of buffering in Object store between pipeline stages to optimize for throughput. - Wait until OBJECT_STORE_BUDGET_UTIL_THRESHOLD of the Op utilization before this backpressure policy can kick in, so steady state is reached. - Skip this backpressure policy, if current Os or downstream Op is materializing. ## Related issues > Link related issues: "Fixes ray-project#1234", "Closes ray-project#1234", or "Related to ray-project#1234". ## Additional information > Optional: Add implementation details, API changes, usage examples, screenshots, etc. --------- Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com> Signed-off-by: Srinath Krishnamachari <68668616+srinathk10@users.noreply.github.com>
…ect#59753) > Thank you for contributing to Ray! 🚀 > Please review the [Ray Contribution Guide](https://docs.ray.io/en/master/ray-contribute/getting-involved.html) before opening a pull request. >⚠️ Remove these instructions before submitting your PR. > 💡 Tip: Mark as draft if you want early feedback, or ready for review when it's complete. ## Description > Briefly describe what this PR accomplishes and why it's needed. ### [Data] Enable and Tune DownstreamCapacityBackpressurePolicy - To backpressure a given Op, use Queue size build up / Downstream capacity ratio. This ratio represents the upper limit of buffering in Object store between pipeline stages to optimize for throughput. - Wait until OBJECT_STORE_BUDGET_UTIL_THRESHOLD of the Op utilization before this backpressure policy can kick in, so steady state is reached. - Skip this backpressure policy, if current Os or downstream Op is materializing. ## Related issues > Link related issues: "Fixes ray-project#1234", "Closes ray-project#1234", or "Related to ray-project#1234". ## Additional information > Optional: Add implementation details, API changes, usage examples, screenshots, etc. --------- Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com> Signed-off-by: Srinath Krishnamachari <68668616+srinathk10@users.noreply.github.com> Signed-off-by: peterxcli <peterxcli@gmail.com>
…ect#59753) > Thank you for contributing to Ray! 🚀 > Please review the [Ray Contribution Guide](https://docs.ray.io/en/master/ray-contribute/getting-involved.html) before opening a pull request. >⚠️ Remove these instructions before submitting your PR. > 💡 Tip: Mark as draft if you want early feedback, or ready for review when it's complete. ## Description > Briefly describe what this PR accomplishes and why it's needed. ### [Data] Enable and Tune DownstreamCapacityBackpressurePolicy - To backpressure a given Op, use Queue size build up / Downstream capacity ratio. This ratio represents the upper limit of buffering in Object store between pipeline stages to optimize for throughput. - Wait until OBJECT_STORE_BUDGET_UTIL_THRESHOLD of the Op utilization before this backpressure policy can kick in, so steady state is reached. - Skip this backpressure policy, if current Os or downstream Op is materializing. ## Related issues > Link related issues: "Fixes ray-project#1234", "Closes ray-project#1234", or "Related to ray-project#1234". ## Additional information > Optional: Add implementation details, API changes, usage examples, screenshots, etc. --------- Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com> Signed-off-by: Srinath Krishnamachari <68668616+srinathk10@users.noreply.github.com> Signed-off-by: peterxcli <peterxcli@gmail.com>
Description
[Data] Enable and Tune DownstreamCapacityBackpressurePolicy
Related issues
Additional information