Skip to content

[Data] Enable and Tune DownstreamCapacityBackpressurePolicy#59753

Merged
raulchen merged 9 commits intomasterfrom
srinathk10/downstream_capacity_backpressure
Dec 30, 2025
Merged

[Data] Enable and Tune DownstreamCapacityBackpressurePolicy#59753
raulchen merged 9 commits intomasterfrom
srinathk10/downstream_capacity_backpressure

Conversation

@srinathk10
Copy link
Contributor

Thank you for contributing to Ray! 🚀
Please review the Ray Contribution Guide before opening a pull request.

⚠️ Remove these instructions before submitting your PR.

💡 Tip: Mark as draft if you want early feedback, or ready for review when it's complete.

Description

Briefly describe what this PR accomplishes and why it's needed.

[Data] Enable and Tune DownstreamCapacityBackpressurePolicy

  • To backpressure a given Op, use Queue size build up / Downstream capacity ratio. This ratio represents the upper limit of buffering in Object store between pipeline stages to optimize for throughput.
  • Wait until OBJECT_STORE_BUDGET_UTIL_THRESHOLD of the Op utilization before this backpressure policy can kick in, so steady state is reached.
  • Skip this backpressure policy, if current Os or downstream Op is materializing.

Related issues

Link related issues: "Fixes #1234", "Closes #1234", or "Related to #1234".

Additional information

Optional: Add implementation details, API changes, usage examples, screenshots, etc.

Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
@srinathk10 srinathk10 requested a review from a team as a code owner December 30, 2025 01:03
Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces and enables a new DownstreamCapacityBackpressurePolicy, which is a sophisticated mechanism to manage memory pressure by considering downstream processing capacity. The implementation is well-structured with clear helper methods and is accompanied by a comprehensive set of new tests. The changes also include good refactoring of existing backpressure logic and a new callback mechanism to track prefetched bytes from external consumers, which is a thoughtful addition for accurate resource management. I have one suggestion regarding a workaround that could be improved for better long-term maintainability.

@srinathk10 srinathk10 marked this pull request as draft December 30, 2025 01:05
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
@srinathk10 srinathk10 marked this pull request as ready for review December 30, 2025 01:14
@ray-gardener ray-gardener bot added the data Ray Data-related issues label Dec 30, 2025
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
@srinathk10 srinathk10 added the go add ONLY when ready to merge, run all tests label Dec 30, 2025
srinathk10 and others added 2 commits December 30, 2025 03:01
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
Signed-off-by: Srinath Krishnamachari <68668616+srinathk10@users.noreply.github.com>
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>

# Report prefetched bytes to the executor's resource manager.
if self._prefetch_bytes_callback is not None and self._stats is not None:
self._prefetch_bytes_callback(self._stats.iter_prefetched_bytes)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prefetch bytes callback missing from finally block

The prefetch_bytes_callback invocation in yield_batch_context is placed after the yield statement but not wrapped in a finally block. In a @contextmanager, code after yield only runs on normal exit, not when exceptions occur or when the generator is closed. If the caller raises an exception during batch processing, the callback won't be invoked, leaving stale prefetch bytes values in the resource manager. This could cause incorrect backpressure decisions. The reviewer @raulchen correctly noted this should be in a finally block, similar to how get_next_batch_context handles its cleanup.

Fix in Cursor Fix in Web

@raulchen raulchen merged commit a0442b9 into master Dec 30, 2025
6 checks passed
@raulchen raulchen deleted the srinathk10/downstream_capacity_backpressure branch December 30, 2025 21:58
iamjustinhsu added a commit to iamjustinhsu/ray that referenced this pull request Jan 12, 2026
AYou0207 pushed a commit to AYou0207/ray that referenced this pull request Jan 13, 2026
…ect#59753)

> Thank you for contributing to Ray! 🚀
> Please review the [Ray Contribution
Guide](https://docs.ray.io/en/master/ray-contribute/getting-involved.html)
before opening a pull request.

> ⚠️ Remove these instructions before submitting your PR.

> 💡 Tip: Mark as draft if you want early feedback, or ready for review
when it's complete.

## Description
> Briefly describe what this PR accomplishes and why it's needed.

### [Data] Enable and Tune DownstreamCapacityBackpressurePolicy

- To backpressure a given Op, use Queue size build up / Downstream
capacity ratio. This ratio represents the upper limit of buffering in
Object store between pipeline stages to optimize for throughput.
- Wait until OBJECT_STORE_BUDGET_UTIL_THRESHOLD of the Op utilization
before this backpressure policy can kick in, so steady state is reached.
- Skip this backpressure policy, if current Os or downstream Op is
materializing.

## Related issues
> Link related issues: "Fixes ray-project#1234", "Closes ray-project#1234", or "Related to
ray-project#1234".

## Additional information
> Optional: Add implementation details, API changes, usage examples,
screenshots, etc.

---------

Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
Signed-off-by: Srinath Krishnamachari <68668616+srinathk10@users.noreply.github.com>
Signed-off-by: jasonwrwang <jasonwrwang@tencent.com>
lee1258561 pushed a commit to pinterest/ray that referenced this pull request Feb 3, 2026
…ect#59753)

> Thank you for contributing to Ray! 🚀
> Please review the [Ray Contribution
Guide](https://docs.ray.io/en/master/ray-contribute/getting-involved.html)
before opening a pull request.

> ⚠️ Remove these instructions before submitting your PR.

> 💡 Tip: Mark as draft if you want early feedback, or ready for review
when it's complete.

## Description
> Briefly describe what this PR accomplishes and why it's needed.

### [Data] Enable and Tune DownstreamCapacityBackpressurePolicy

- To backpressure a given Op, use Queue size build up / Downstream
capacity ratio. This ratio represents the upper limit of buffering in
Object store between pipeline stages to optimize for throughput.
- Wait until OBJECT_STORE_BUDGET_UTIL_THRESHOLD of the Op utilization
before this backpressure policy can kick in, so steady state is reached.
- Skip this backpressure policy, if current Os or downstream Op is
materializing.


## Related issues
> Link related issues: "Fixes ray-project#1234", "Closes ray-project#1234", or "Related to
ray-project#1234".

## Additional information
> Optional: Add implementation details, API changes, usage examples,
screenshots, etc.

---------

Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
Signed-off-by: Srinath Krishnamachari <68668616+srinathk10@users.noreply.github.com>
peterxcli pushed a commit to peterxcli/ray that referenced this pull request Feb 25, 2026
…ect#59753)

> Thank you for contributing to Ray! 🚀
> Please review the [Ray Contribution
Guide](https://docs.ray.io/en/master/ray-contribute/getting-involved.html)
before opening a pull request.

> ⚠️ Remove these instructions before submitting your PR.

> 💡 Tip: Mark as draft if you want early feedback, or ready for review
when it's complete.

## Description
> Briefly describe what this PR accomplishes and why it's needed.

### [Data] Enable and Tune DownstreamCapacityBackpressurePolicy

- To backpressure a given Op, use Queue size build up / Downstream
capacity ratio. This ratio represents the upper limit of buffering in
Object store between pipeline stages to optimize for throughput.
- Wait until OBJECT_STORE_BUDGET_UTIL_THRESHOLD of the Op utilization
before this backpressure policy can kick in, so steady state is reached.
- Skip this backpressure policy, if current Os or downstream Op is
materializing.

## Related issues
> Link related issues: "Fixes ray-project#1234", "Closes ray-project#1234", or "Related to
ray-project#1234".

## Additional information
> Optional: Add implementation details, API changes, usage examples,
screenshots, etc.

---------

Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
Signed-off-by: Srinath Krishnamachari <68668616+srinathk10@users.noreply.github.com>
Signed-off-by: peterxcli <peterxcli@gmail.com>
peterxcli pushed a commit to peterxcli/ray that referenced this pull request Feb 25, 2026
…ect#59753)

> Thank you for contributing to Ray! 🚀
> Please review the [Ray Contribution
Guide](https://docs.ray.io/en/master/ray-contribute/getting-involved.html)
before opening a pull request.

> ⚠️ Remove these instructions before submitting your PR.

> 💡 Tip: Mark as draft if you want early feedback, or ready for review
when it's complete.

## Description
> Briefly describe what this PR accomplishes and why it's needed.

### [Data] Enable and Tune DownstreamCapacityBackpressurePolicy

- To backpressure a given Op, use Queue size build up / Downstream
capacity ratio. This ratio represents the upper limit of buffering in
Object store between pipeline stages to optimize for throughput.
- Wait until OBJECT_STORE_BUDGET_UTIL_THRESHOLD of the Op utilization
before this backpressure policy can kick in, so steady state is reached.
- Skip this backpressure policy, if current Os or downstream Op is
materializing.

## Related issues
> Link related issues: "Fixes ray-project#1234", "Closes ray-project#1234", or "Related to
ray-project#1234".

## Additional information
> Optional: Add implementation details, API changes, usage examples,
screenshots, etc.

---------

Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
Signed-off-by: Srinath Krishnamachari <68668616+srinathk10@users.noreply.github.com>
Signed-off-by: peterxcli <peterxcli@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

data Ray Data-related issues go add ONLY when ready to merge, run all tests

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Ray fails to serialize self-reference objects

2 participants