[Data] Enable Concurrency cap backpressure with tuning #59392
alexeykudinkin merged 11 commits into master
Conversation
Code Review
This pull request tunes the concurrency cap backpressure policy by adjusting EWMA parameters and, most notably, adding a hard guardrail based on the initial queue size to make the system more responsive to downstream queueing. The changes are generally well-implemented and include relevant test updates. My review includes a high-severity comment about a potential bug in detecting downstream materializing operators, which could lead to incorrect backpressure behavior. I've also included a couple of medium-severity suggestions to improve code clarity and maintainability by refactoring a magic number and removing unrelated changes from this PR.
python/ray/data/_internal/execution/resource_manager.py (409-414)
The current implementation of has_materializing_downstream_op only checks for immediate downstream operators. This can lead to incorrect behavior if there are non-materializing operators (like limit or filter) between the current operator and a materializing one (e.g., map -> limit -> all_to_all). In such a scenario, backpressure might be incorrectly applied to the map operator, potentially starving the all_to_all operator. The check should traverse the full downstream DAG to be correct.
A breadth-first search would be a robust way to implement this traversal.
def has_materializing_downstream_op(self, op: PhysicalOperator) -> bool:
    """Check if the operator has a downstream materializing operator."""
    q = list(op.output_dependencies)
    visited = set(op.output_dependencies)
    while q:
        curr_op = q.pop(0)
        if isinstance(curr_op, MATERIALIZING_OPERATORS):
            return True
        for next_op in curr_op.output_dependencies:
            if next_op not in visited:
                visited.add(next_op)
                q.append(next_op)
    return False
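A self-contained demo of the suggested traversal, using stub operator classes since `PhysicalOperator` and `MATERIALIZING_OPERATORS` live in Ray Data internals (the names `Op` and `MaterializingOp` below are illustrative only):

```python
from collections import deque

class Op:
    """Illustrative stand-in for a physical operator with downstream edges."""
    def __init__(self, name, deps=None):
        self.name = name
        self.output_dependencies = deps or []

class MaterializingOp(Op):
    """Stand-in for a materializing operator (e.g. an all-to-all shuffle)."""

def has_materializing_downstream_op(op) -> bool:
    """BFS over the downstream DAG looking for a materializing operator."""
    q = deque(op.output_dependencies)
    visited = set(op.output_dependencies)
    while q:
        curr = q.popleft()
        if isinstance(curr, MaterializingOp):
            return True
        for nxt in curr.output_dependencies:
            if nxt not in visited:
                visited.add(nxt)
                q.append(nxt)
    return False

# map -> limit -> all_to_all: the materializing op is two hops away,
# so an immediate-children check would miss it, but BFS finds it.
all_to_all = MaterializingOp("all_to_all")
limit = Op("limit", [all_to_all])
map_op = Op("map", [limit])
assert has_materializing_downstream_op(map_op)
assert not has_materializing_downstream_op(all_to_all)
```

Using `collections.deque` here makes each dequeue O(1); the `visited` set keeps the traversal linear even if the DAG has shared downstream operators.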
python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py (231)
The formula for max_queue_bytes uses a magic number 2 for division. To improve readability and make the tuning parameter more explicit, consider defining this factor as a named constant at the class level, for example: HARD_GUARDRAIL_BUDGET_FACTOR = 0.5. This makes the logic clearer and easier to adjust in the future.
max_queue_bytes = initial_q * (1.0 + self.OBJECT_STORE_BUDGET_RATIO * self.HARD_GUARDRAIL_BUDGET_FACTOR)
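A minimal sketch of that refactor, showing where the named constant would live. The class name, `OBJECT_STORE_BUDGET_RATIO` value, and the `_max_queue_bytes` helper are assumptions drawn from the review context, not the actual source:

```python
class ConcurrencyCapBackpressurePolicy:
    # Assumed existing tuning knob; the real value lives in the policy source.
    OBJECT_STORE_BUDGET_RATIO = 0.5
    # Named factor replacing the magic `* 0.5` (i.e. `/ 2`) in the guardrail,
    # per the review suggestion above.
    HARD_GUARDRAIL_BUDGET_FACTOR = 0.5

    def _max_queue_bytes(self, initial_q: int) -> float:
        # Hard guardrail: let the queue grow past its initial size by only
        # a fraction of the object-store budget.
        return initial_q * (
            1.0 + self.OBJECT_STORE_BUDGET_RATIO * self.HARD_GUARDRAIL_BUDGET_FACTOR
        )
```

With these example values, an initial queue of 100 bytes yields a cap of 125 bytes, and retuning means editing one named constant instead of hunting for a literal.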
python/ray/data/context.py (254-256)
This new context setting DEFAULT_DOWNSTREAM_CAPACITY_OUTPUTS_RATIO is added but appears to be unused within this pull request. To keep PRs focused and easier to review, it's generally better to introduce new configurations in the same PR where they are first used. Consider moving this to a future PR where it's implemented. The same applies to other unrelated changes in this file, such as the polars rename and _epoch_idx addition.
python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py (outdated, resolved)
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
        * actor_pool.max_tasks_in_flight_per_actor()
        for actor_pool in op.get_autoscaling_actor_pools()
    )
)
Bug: ActorPoolMapOperator cap uses different formula than TaskPoolMapOperator
The concurrency cap calculation for ActorPoolMapOperator uses max_tasks_in_flight_per_actor() (lines 85-86), but for TaskPoolMapOperator the code uses op.get_max_concurrency_limit() (line 91). The existing ActorPoolMapOperator.get_max_concurrency_limit() method uses max_actor_concurrency() instead of max_tasks_in_flight_per_actor(). Since max_tasks_in_flight_per_actor defaults to 2x max_actor_concurrency, this results in an inconsistent cap that is twice as high for actor pools compared to what get_max_concurrency_limit() would return.
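An illustrative stub reproducing the mismatch described above. The real operators live in Ray Data internals with different signatures; the 2x default is taken from the review comment, and the concrete numbers are only for demonstration:

```python
class ActorPoolStub:
    """Stand-in for an autoscaling actor pool, per the review comment."""
    def max_actor_concurrency(self) -> int:
        return 4  # illustrative value

    def max_tasks_in_flight_per_actor(self) -> int:
        # Defaults to 2x max_actor_concurrency, per the review comment.
        return 2 * self.max_actor_concurrency()

pool = ActorPoolStub()
# Cap as the new code path computes it (per-actor in-flight tasks):
cap_from_in_flight = pool.max_tasks_in_flight_per_actor()
# Cap as get_max_concurrency_limit() would compute it (actor concurrency):
cap_from_concurrency = pool.max_actor_concurrency()
# The two formulas disagree by the 2x default, which is the inconsistency
# flagged above.
assert cap_from_in_flight == 2 * cap_from_concurrency
```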
# Initialize concurrency caps from operators (infinite if unset)
for op in self._topology:
    concurrency_cap: float = float("inf")
    if isinstance(op, ActorPoolMapOperator):
Let's avoid piling up changes.
This cap is not necessary -- we won't schedule more tasks than max_tasks_inflight.
We don't need the TPMA cap either, but let's clean that up in a separate change.
  DEFAULT_ENABLE_DYNAMIC_OUTPUT_QUEUE_SIZE_BACKPRESSURE: bool = env_bool(
-     "RAY_DATA_ENABLE_DYNAMIC_OUTPUT_QUEUE_SIZE_BACKPRESSURE", False
+     "RAY_DATA_ENABLE_DYNAMIC_OUTPUT_QUEUE_SIZE_BACKPRESSURE", True
Enabled by default, but will disable in 2.53.
@srinathk10 please re-run release tests on the latest state to confirm results are looking good still
Release test runs comparison with baseline.
[Data] Concurrency cap backpressure tuning (cherry-pick of #59392):
EWMA_ALPHA: updated from 0.2 to 0.1, making the adjusted level more sensitive to downstream queuing and thus more likely to limit concurrency.
K_DEV: updated from 2.0 to 1.0, tightening the standard-deviation band so the policy limits concurrency more readily under downstream queuing.
Description
[Data] Concurrency cap backpressure tuning
EWMA_ALPHA
Updated EWMA_ALPHA from 0.2 to 0.1. The adjusted level now reacts more slowly to queue growth, so a downstream queuing spike crosses the threshold sooner, biasing the policy toward limiting concurrency.
K_DEV
Updated K_DEV from 2.0 to 1.0. The standard-deviation band is now tighter, likewise biasing the policy toward limiting concurrency when downstream queues grow.
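The policy's exact trigger condition is internal, but the effect of both changes can be sketched with a generic EWMA-plus-deviation band. The assumption here is a trigger of the form `queue_size > level + K_DEV * stddev`; the sample values are illustrative only:

```python
import statistics

def ewma(samples, alpha):
    """Exponentially weighted moving average; alpha weights the newest sample."""
    level = samples[0]
    for x in samples[1:]:
        level = alpha * x + (1.0 - alpha) * level
    return level

# Downstream queue size is flat, then spikes.
samples = [10.0, 10.0, 10.0, 10.0, 50.0]
std = statistics.pstdev(samples)  # 16.0 for these samples

# Lower alpha keeps the tracked level closer to the pre-spike baseline,
# and lower K_DEV narrows the tolerance band, so the spike crosses the
# (assumed) threshold sooner.
old_threshold = ewma(samples, alpha=0.2) + 2.0 * std  # wide band, slow to trip
new_threshold = ewma(samples, alpha=0.1) + 1.0 * std  # tight band, trips on the spike
assert new_threshold < old_threshold
assert samples[-1] > new_threshold  # the spike exceeds the tuned threshold
```

With these numbers the tuned settings flag the 50-unit spike while the old settings sit right at the edge of tolerating it, which matches the stated intent of being more sensitive to downstream queuing.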