
[Data] Enable Concurrency cap backpressure with tuning#59392

Merged
alexeykudinkin merged 11 commits into master from srinathk10/concurrency_cap_tuning_latest
Dec 17, 2025
Conversation

@srinathk10
Contributor

@srinathk10 srinathk10 commented Dec 11, 2025


Description


[Data] Concurrency cap backpressure tuning

EWMA_ALPHA
Update EWMA_ALPHA from 0.2 -> 0.1. This biases the adjusted level toward limiting concurrency by making it more sensitive to downstream queuing.

K_DEV
Update K_DEV from 2.0 -> 1.0. This tightens the stddev band, again biasing the policy toward limiting concurrency by making it more sensitive to downstream queuing.
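As a rough sketch of why both changes push toward earlier backpressure, consider a generic EWMA level-plus-deviation threshold. This is illustrative only; the variable names and update formulas below are assumptions, not the exact Ray Data implementation:

```python
# Illustrative EWMA threshold sketch (assumed formulas, not Ray's exact code).
EWMA_ALPHA = 0.1  # was 0.2: the smoothed level adapts more slowly, so a
                  # queueing spike stands out against the baseline sooner
K_DEV = 1.0       # was 2.0: a tighter tolerance band around the level


def update(level: float, var: float, sample: float) -> tuple:
    """One EWMA step: update the smoothed level and variance estimate."""
    diff = sample - level
    level += EWMA_ALPHA * diff
    var = (1 - EWMA_ALPHA) * (var + EWMA_ALPHA * diff * diff)
    return level, var


def should_backpressure(queued: float, level: float, var: float) -> bool:
    """Apply backpressure when the queue exceeds level + K_DEV * stddev."""
    return queued > level + K_DEV * (var ** 0.5)
```

With a smaller alpha and a smaller k, the threshold `level + K_DEV * stddev` both tracks the queue more sluggishly and sits closer to the level, so backpressure engages earlier when downstream queuing grows.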



@srinathk10 srinathk10 requested a review from a team as a code owner December 11, 2025 19:24
@srinathk10 srinathk10 force-pushed the srinathk10/concurrency_cap_tuning_latest branch from 9dce1c0 to 7c224fe on December 11, 2025 19:25
Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request tunes the concurrency cap backpressure policy by adjusting EWMA parameters and, most notably, adding a hard guardrail based on the initial queue size to make the system more responsive to downstream queueing. The changes are generally well-implemented and include relevant test updates. My review includes a high-severity comment about a potential bug in detecting downstream materializing operators, which could lead to incorrect backpressure behavior. I've also included a couple of medium-severity suggestions to improve code clarity and maintainability by refactoring a magic number and removing unrelated changes from this PR.


python/ray/data/_internal/execution/resource_manager.py (409-414)

high

The current implementation of has_materializing_downstream_op only checks for immediate downstream operators. This can lead to incorrect behavior if there are non-materializing operators (like limit or filter) between the current operator and a materializing one (e.g., map -> limit -> all_to_all). In such a scenario, backpressure might be incorrectly applied to the map operator, potentially starving the all_to_all operator. The check should traverse the full downstream DAG to be correct.

A breadth-first search would be a robust way to implement this traversal.

    def has_materializing_downstream_op(self, op: PhysicalOperator) -> bool:
        """Check if the operator has a downstream materializing operator."""
        q = list(op.output_dependencies)
        visited = set(op.output_dependencies)
        while q:
            curr_op = q.pop(0)
            if isinstance(curr_op, MATERIALIZING_OPERATORS):
                return True
            for next_op in curr_op.output_dependencies:
                if next_op not in visited:
                    visited.add(next_op)
                    q.append(next_op)
        return False

python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py (231)

medium

The formula for max_queue_bytes uses a magic number 2 for division. To improve readability and make the tuning parameter more explicit, consider defining this factor as a named constant at the class level, for example: HARD_GUARDRAIL_BUDGET_FACTOR = 0.5. This makes the logic clearer and easier to adjust in the future.

            max_queue_bytes = initial_q * (1.0 + self.OBJECT_STORE_BUDGET_RATIO * 0.5)
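The named-constant refactor the reviewer suggests could look like the sketch below. The class and attribute names are assumed for illustration (only `OBJECT_STORE_BUDGET_RATIO` appears in the review comment; its value here is hypothetical):

```python
# Sketch of the suggested refactor: replace the magic `* 0.5` (i.e. `/ 2`)
# with a named tuning constant. Names and values are illustrative.
class ConcurrencyCapBackpressurePolicy:
    OBJECT_STORE_BUDGET_RATIO = 0.5     # assumed value, for illustration
    HARD_GUARDRAIL_BUDGET_FACTOR = 0.5  # replaces the magic number 2 divisor

    def max_queue_bytes(self, initial_q: float) -> float:
        # Hard guardrail: allow the queue to grow only a bounded fraction
        # beyond its initial size.
        return initial_q * (
            1.0 + self.OBJECT_STORE_BUDGET_RATIO * self.HARD_GUARDRAIL_BUDGET_FACTOR
        )
```

Making the factor a class-level constant keeps the tuning knob visible and adjustable without hunting through the formula.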

python/ray/data/context.py (254-256)

medium

This new context setting DEFAULT_DOWNSTREAM_CAPACITY_OUTPUTS_RATIO is added but appears to be unused within this pull request. To keep PRs focused and easier to review, it's generally better to introduce new configurations in the same PR where they are first used. Consider moving this to a future PR where it's implemented. The same applies to other unrelated changes in this file, such as the polars rename and _epoch_idx addition.

@srinathk10 srinathk10 force-pushed the srinathk10/concurrency_cap_tuning_latest branch from 7c224fe to 56c502c on December 11, 2025 19:40
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
@srinathk10 srinathk10 force-pushed the srinathk10/concurrency_cap_tuning_latest branch from 56c502c to a9a4e10 on December 11, 2025 19:48
@srinathk10 srinathk10 added the go add ONLY when ready to merge, run all tests label Dec 11, 2025
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
@ray-gardener ray-gardener bot added the data Ray Data-related issues label Dec 12, 2025
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
* actor_pool.max_tasks_in_flight_per_actor()
for actor_pool in op.get_autoscaling_actor_pools()
)
)

Bug: ActorPoolMapOperator cap uses different formula than TaskPoolMapOperator

The concurrency cap calculation for ActorPoolMapOperator uses max_tasks_in_flight_per_actor() (lines 85-86), but for TaskPoolMapOperator the code uses op.get_max_concurrency_limit() (line 91). The existing ActorPoolMapOperator.get_max_concurrency_limit() method uses max_actor_concurrency() instead of max_tasks_in_flight_per_actor(). Since max_tasks_in_flight_per_actor defaults to 2x max_actor_concurrency, this results in an inconsistent cap that is twice as high for actor pools compared to what get_max_concurrency_limit() would return.
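The reported mismatch can be illustrated with assumed numbers (purely hypothetical values, not taken from the code):

```python
# Illustration of the 2x cap discrepancy described above (values assumed).
max_actor_concurrency = 4
# max_tasks_in_flight_per_actor defaults to 2x max_actor_concurrency:
max_tasks_in_flight_per_actor = 2 * max_actor_concurrency

num_actors = 3
# Cap as computed in the new code path for ActorPoolMapOperator:
cap_from_tasks_in_flight = num_actors * max_tasks_in_flight_per_actor
# Cap that get_max_concurrency_limit() would imply:
cap_from_concurrency_limit = num_actors * max_actor_concurrency

# The actor-pool cap ends up twice the limit the other code path reports.
assert cap_from_tasks_in_flight == 2 * cap_from_concurrency_limit
```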


Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
# Initialize concurrency caps from operators (infinite if unset)
for op in self._topology:
concurrency_cap: float = float("inf")
if isinstance(op, ActorPoolMapOperator):
Contributor


Let's avoid piling up changes.

This cap is not necessary -- we won't schedule more tasks than max_tasks_in_flight.

We don't need TPMA cap either, but let's clean it up in a separate change.

Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>

  DEFAULT_ENABLE_DYNAMIC_OUTPUT_QUEUE_SIZE_BACKPRESSURE: bool = env_bool(
-     "RAY_DATA_ENABLE_DYNAMIC_OUTPUT_QUEUE_SIZE_BACKPRESSURE", False
+     "RAY_DATA_ENABLE_DYNAMIC_OUTPUT_QUEUE_SIZE_BACKPRESSURE", True
  )
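For context, an `env_bool`-style helper typically behaves like the sketch below (assumed semantics; Ray's actual helper may differ in which strings it accepts), so flipping the default to `True` turns the feature on unless the environment variable explicitly disables it:

```python
import os


def env_bool(name: str, default: bool) -> bool:
    """Sketch of an env_bool helper (assumed semantics, not Ray's exact code):
    return the default when the variable is unset, otherwise parse it as a
    boolean flag."""
    val = os.environ.get(name)
    if val is None:
        return default
    return val.strip().lower() in ("1", "true")


# With the new default, the feature is on unless explicitly disabled:
DEFAULT_ENABLE_DYNAMIC_OUTPUT_QUEUE_SIZE_BACKPRESSURE = env_bool(
    "RAY_DATA_ENABLE_DYNAMIC_OUTPUT_QUEUE_SIZE_BACKPRESSURE", True
)
```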
Contributor Author


Enabled by default, but will be disabled in 2.53.

Contributor

@alexeykudinkin alexeykudinkin left a comment


@srinathk10 please re-run release tests on the latest state to confirm results are looking good still

@srinathk10
Contributor Author

Release test runs comparison with baseline.

@alexeykudinkin alexeykudinkin merged commit fdc9e1d into master Dec 17, 2025
6 checks passed
@alexeykudinkin alexeykudinkin deleted the srinathk10/concurrency_cap_tuning_latest branch December 17, 2025 18:52
aslonnie pushed a commit that referenced this pull request Dec 17, 2025
cherry-pick of #59392
zzchun pushed a commit to zzchun/ray that referenced this pull request Dec 18, 2025
Yicheng-Lu-llll pushed a commit to Yicheng-Lu-llll/ray that referenced this pull request Dec 22, 2025
weiquanlee pushed a commit to antgroup/ant-ray that referenced this pull request Jan 5, 2026
peterxcli pushed a commit to peterxcli/ray that referenced this pull request Feb 25, 2026

Labels

data Ray Data-related issues go add ONLY when ready to merge, run all tests


Development

Successfully merging this pull request may close these issues.

Ray fails to serialize self-reference objects

2 participants