Skip to content

Commit cd69734

Browse files
srinathk10 and peterxcli
authored and committed
[Data] Enable and Tune DownstreamCapacityBackpressurePolicy (ray-project#59753)
> Thank you for contributing to Ray! 🚀 > Please review the [Ray Contribution Guide](https://docs.ray.io/en/master/ray-contribute/getting-involved.html) before opening a pull request. > ⚠️ Remove these instructions before submitting your PR. > 💡 Tip: Mark as draft if you want early feedback, or ready for review when it's complete. ## Description > Briefly describe what this PR accomplishes and why it's needed. ### [Data] Enable and Tune DownstreamCapacityBackpressurePolicy - To backpressure a given Op, use the queue-size build-up / downstream-capacity ratio. This ratio represents the upper limit of buffering in the object store between pipeline stages to optimize for throughput. - Wait until OBJECT_STORE_BUDGET_UTIL_THRESHOLD of the Op's utilization is reached before this backpressure policy can kick in, so a steady state is reached. - Skip this backpressure policy if the current Op or a downstream Op is materializing. ## Related issues > Link related issues: "Fixes ray-project#1234", "Closes ray-project#1234", or "Related to ray-project#1234". ## Additional information > Optional: Add implementation details, API changes, usage examples, screenshots, etc. --------- Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com> Signed-off-by: Srinath Krishnamachari <68668616+srinathk10@users.noreply.github.com> Signed-off-by: peterxcli <peterxcli@gmail.com>
1 parent cf85995 commit cd69734

File tree

14 files changed

+978
-210
lines changed

14 files changed

+978
-210
lines changed

python/ray/data/_internal/block_batching/iter_batches.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,8 @@ class BatchIterator:
9696
the specified amount of formatted batches from blocks. This improves
9797
performance for non-CPU bound UDFs, allowing batch fetching compute and
9898
formatting to be overlapped with the UDF. Defaults to 1.
99+
prefetch_bytes_callback: A callback to report prefetched bytes to the executor's
100+
resource manager.
99101
"""
100102

101103
UPDATE_METRICS_INTERVAL_S: float = 5.0
@@ -116,6 +118,7 @@ def __init__(
116118
shuffle_seed: Optional[int] = None,
117119
ensure_copy: bool = False,
118120
prefetch_batches: int = 1,
121+
prefetch_bytes_callback: Optional[Callable[[int], None]] = None,
119122
):
120123
self._ref_bundles = ref_bundles
121124
self._stats = stats
@@ -129,6 +132,7 @@ def __init__(
129132
self._shuffle_seed = shuffle_seed
130133
self._ensure_copy = ensure_copy
131134
self._prefetch_batches = prefetch_batches
135+
self._prefetch_bytes_callback = prefetch_bytes_callback
132136
# TODO: pass the dataset's context down instead of fetching the global context here.
133137
self._ctx = DataContext.get_current()
134138
self._eager_free = clear_block_after_read and self._ctx.eager_free
@@ -271,6 +275,10 @@ def before_epoch_start(self):
271275
self._yielded_first_batch = False
272276

273277
def after_epoch_end(self):
278+
# Report 0 prefetched bytes at the end of iteration.
279+
if self._prefetch_bytes_callback is not None:
280+
self._prefetch_bytes_callback(0)
281+
274282
if self._stats is None:
275283
return
276284

@@ -300,6 +308,10 @@ def yield_batch_context(self, batch: Batch):
300308
with self._stats.iter_user_s.timer() if self._stats else nullcontext():
301309
yield
302310

311+
# Report prefetched bytes to the executor's resource manager.
312+
if self._prefetch_bytes_callback is not None and self._stats is not None:
313+
self._prefetch_bytes_callback(self._stats.iter_prefetched_bytes)
314+
303315
if self._stats is None:
304316
return
305317
now = time.time()

python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -159,19 +159,18 @@ def can_add_input(self, op: "PhysicalOperator") -> bool:
159159
return num_tasks_running < self._concurrency_caps[op]
160160

161161
# For this Op, if the objectstore budget (available) to total
162-
# ratio is below threshold (10%), skip dynamic output queue size backpressure.
163-
op_usage = self._resource_manager.get_op_usage(op)
164-
op_budget = self._resource_manager.get_budget(op)
165-
if op_usage is not None and op_budget is not None:
166-
total_mem = op_usage.object_store_memory + op_budget.object_store_memory
167-
if total_mem == 0 or (
168-
op_budget.object_store_memory / total_mem
169-
> self.AVAILABLE_OBJECT_STORE_BUDGET_THRESHOLD
170-
):
171-
# If the objectstore budget (available) to total
172-
# ratio is above threshold (10%), skip dynamic output queue size
173-
# backpressure, but still enforce the configured cap.
174-
return num_tasks_running < self._concurrency_caps[op]
162+
# ratio is above threshold, skip dynamic output queue size backpressure.
163+
available_budget_fraction = (
164+
self._resource_manager.get_available_object_store_budget_fraction(op)
165+
)
166+
if (
167+
available_budget_fraction is not None
168+
and available_budget_fraction > self.AVAILABLE_OBJECT_STORE_BUDGET_THRESHOLD
169+
):
170+
# If the objectstore budget (available) to total
171+
# ratio is above threshold, skip dynamic output queue size
172+
# backpressure, but still enforce the configured cap.
173+
return num_tasks_running < self._concurrency_caps[op]
175174

176175
# Current total queued bytes (this op + downstream)
177176
current_queue_size_bytes = self._resource_manager.get_mem_op_internal(
@@ -180,7 +179,7 @@ def can_add_input(self, op: "PhysicalOperator") -> bool:
180179

181180
# Update EWMA state (level & dev) and compute effective cap. Note that
182181
# we don't update the EWMA state if the objectstore budget (available) vs total
183-
# ratio is above threshold (10%), because the level and dev adjusts quickly.
182+
# ratio is above threshold, because the level and dev adjusts quickly.
184183
self._update_level_and_dev(op, current_queue_size_bytes)
185184
effective_cap = self._effective_cap(
186185
op, num_tasks_running, current_queue_size_bytes
Lines changed: 112 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
import logging
2-
from typing import TYPE_CHECKING
2+
from typing import TYPE_CHECKING, Optional
33

44
from .backpressure_policy import BackpressurePolicy
5-
from ray.data._internal.execution.operators.actor_pool_map_operator import (
6-
ActorPoolMapOperator,
7-
)
5+
from ray._private.ray_constants import env_float
86
from ray.data.context import DataContext
97

108
if TYPE_CHECKING:
@@ -20,22 +18,17 @@
2018
class DownstreamCapacityBackpressurePolicy(BackpressurePolicy):
2119
"""Backpressure policy based on downstream processing capacity.
2220
23-
This policy triggers backpressure when the output bundles size exceeds both:
24-
1. A ratio threshold multiplied by the number of running tasks in downstream operators
25-
2. An absolute threshold for the output bundles size
26-
27-
The policy monitors actual downstream processing capacity by tracking the number
28-
of currently running tasks rather than configured parallelism. This approach
29-
ensures effective backpressure even when cluster resources are insufficient or
30-
scaling is slow, preventing memory pressure and maintaining pipeline stability.
31-
32-
Key benefits:
33-
- Prevents memory bloat from unprocessed output objects
34-
- Adapts to actual cluster conditions and resource availability
35-
- Maintains balanced throughput across pipeline operators
36-
- Reduces object spilling and unnecessary rebuilds
21+
To backpressure a given operator, use queue size build up / downstream capacity ratio.
22+
This ratio represents the upper limit of buffering in object store between pipeline stages
23+
to optimize for throughput.
3724
"""
3825

26+
# Threshold for per-Op object store budget utilization vs total
27+
# (utilization / total) ratio to enable downstream capacity backpressure.
28+
OBJECT_STORE_BUDGET_UTIL_THRESHOLD = env_float(
29+
"RAY_DATA_DOWNSTREAM_CAPACITY_OBJECT_STORE_BUDGET_UTIL_THRESHOLD", 0.9
30+
)
31+
3932
@property
4033
def name(self) -> str:
4134
return "DownstreamCapacity"
@@ -47,50 +40,113 @@ def __init__(
4740
resource_manager: "ResourceManager",
4841
):
4942
super().__init__(data_context, topology, resource_manager)
50-
self._backpressure_concurrency_ratio = (
43+
self._backpressure_capacity_ratio = (
5144
self._data_context.downstream_capacity_backpressure_ratio
5245
)
53-
self._backpressure_max_queued_blocks = (
54-
self._data_context.downstream_capacity_backpressure_max_queued_bundles
55-
)
56-
self._backpressure_disabled = (
57-
self._backpressure_concurrency_ratio is None
58-
or self._backpressure_max_queued_blocks is None
46+
if self._backpressure_capacity_ratio is not None:
47+
logger.debug(
48+
f"DownstreamCapacityBackpressurePolicy enabled with backpressure capacity ratio: {self._backpressure_capacity_ratio}"
49+
)
50+
51+
def _get_queue_size_bytes(self, op: "PhysicalOperator") -> int:
52+
"""Get the output current queue size
53+
(this operator + ineligible downstream operators) in bytes for the given operator.
54+
"""
55+
op_outputs_usage = self._topology[op].output_queue_bytes()
56+
# Also account the downstream ineligible operators' memory usage.
57+
op_outputs_usage += sum(
58+
self._resource_manager.get_op_usage(next_op).object_store_memory
59+
for next_op in self._resource_manager._get_downstream_ineligible_ops(op)
5960
)
61+
return op_outputs_usage
6062

61-
def _max_concurrent_tasks(self, op: "PhysicalOperator") -> int:
62-
if isinstance(op, ActorPoolMapOperator):
63-
return sum(
64-
[
65-
actor_pool.max_concurrent_tasks()
66-
for actor_pool in op.get_autoscaling_actor_pools()
67-
]
68-
)
69-
return op.num_active_tasks()
63+
def _get_downstream_capacity_size_bytes(self, op: "PhysicalOperator") -> int:
64+
"""Get the downstream capacity size for the given operator.
7065
71-
def can_add_input(self, op: "PhysicalOperator") -> bool:
72-
"""Determine if we can add input to the operator based on downstream capacity."""
73-
if self._backpressure_disabled:
74-
return True
66+
Downstream capacity size is the sum of the pending task inputs of the
67+
downstream eligible operators.
68+
69+
If an output dependency is ineligible, skip it and recurse down to find
70+
eligible output dependencies. If there are no output dependencies,
71+
return external consumer bytes.
72+
"""
73+
if not op.output_dependencies:
74+
# No output dependencies, return external consumer bytes.
75+
return self._resource_manager.get_external_consumer_bytes()
76+
77+
total_capacity_size_bytes = 0
7578
for output_dependency in op.output_dependencies:
76-
total_enqueued_blocks = self._topology[
77-
output_dependency
78-
].total_enqueued_input_blocks()
79+
if self._resource_manager.is_op_eligible(output_dependency):
80+
# Output dependency is eligible, add its pending task inputs.
81+
total_capacity_size_bytes += (
82+
output_dependency.metrics.obj_store_mem_pending_task_inputs or 0
83+
)
84+
else:
85+
# Output dependency is ineligible, recurse down to find eligible ops.
86+
total_capacity_size_bytes += self._get_downstream_capacity_size_bytes(
87+
output_dependency
88+
)
89+
return total_capacity_size_bytes
7990

80-
avg_inputs_per_task = (
81-
output_dependency.metrics.num_task_inputs_processed
82-
/ max(output_dependency.metrics.num_tasks_finished, 1)
83-
)
84-
outstanding_tasks = total_enqueued_blocks / max(avg_inputs_per_task, 1)
85-
max_allowed_outstanding = (
86-
self._max_concurrent_tasks(output_dependency)
87-
* self._backpressure_concurrency_ratio
88-
)
91+
def _should_skip_backpressure(self, op: "PhysicalOperator") -> bool:
92+
"""Check if backpressure should be skipped for the operator.
93+
TODO(srinathk10): Extract this to common logic to skip invoking BackpressurePolicy.
94+
"""
95+
if self._backpressure_capacity_ratio is None:
96+
# Downstream capacity backpressure is disabled.
97+
return True
98+
if not self._resource_manager.is_op_eligible(op):
99+
# Operator is not eligible for backpressure.
100+
return True
101+
if self._resource_manager.is_materializing_op(op):
102+
# Operator is materializing, so no need to perform backpressure.
103+
return True
104+
if self._resource_manager.has_materializing_downstream_op(op):
105+
# Downstream operator is materializing, so can't perform backpressure
106+
# based on downstream capacity which requires full materialization.
107+
return True
108+
return False
109+
110+
def _get_queue_ratio(self, op: "PhysicalOperator") -> float:
111+
"""Get queue/capacity ratio for the operator."""
112+
queue_size_bytes = self._get_queue_size_bytes(op)
113+
downstream_capacity_size_bytes = self._get_downstream_capacity_size_bytes(op)
114+
if downstream_capacity_size_bytes == 0:
115+
# No downstream capacity to backpressure against, so no backpressure.
116+
return 0
117+
return queue_size_bytes / downstream_capacity_size_bytes
89118

90-
if (
91-
total_enqueued_blocks > self._backpressure_max_queued_blocks
92-
and outstanding_tasks > max_allowed_outstanding
93-
):
94-
return False
119+
def _should_apply_backpressure(self, op: "PhysicalOperator") -> bool:
120+
"""Check if backpressure should be applied for the operator.
121+
122+
Returns True if backpressure should be applied, False otherwise.
123+
"""
124+
if self._should_skip_backpressure(op):
125+
return False
126+
127+
utilized_budget_fraction = (
128+
self._resource_manager.get_utilized_object_store_budget_fraction(op)
129+
)
130+
if (
131+
utilized_budget_fraction is not None
132+
and utilized_budget_fraction <= self.OBJECT_STORE_BUDGET_UTIL_THRESHOLD
133+
):
134+
# Utilized budget fraction is below threshold, so should skip backpressure.
135+
return False
136+
137+
queue_ratio = self._get_queue_ratio(op)
138+
# Apply backpressure if queue ratio exceeds the threshold.
139+
return queue_ratio > self._backpressure_capacity_ratio
140+
141+
def can_add_input(self, op: "PhysicalOperator") -> bool:
142+
"""Determine if we can add input to the operator based on
143+
downstream capacity.
144+
"""
145+
return not self._should_apply_backpressure(op)
95146

96-
return True
147+
def max_task_output_bytes_to_read(self, op: "PhysicalOperator") -> Optional[int]:
148+
"""Return the maximum bytes of pending task outputs can be read for
149+
the given operator. None means no limit."""
150+
if self._should_apply_backpressure(op):
151+
return 0
152+
return None

python/ray/data/_internal/execution/resource_manager.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,12 @@ def __init__(
8989
# input buffers of the downstream operators.
9090
self._mem_op_outputs: Dict[PhysicalOperator, int] = defaultdict(int)
9191

92+
# Bytes buffered by external consumers (iterators) consuming Batches
93+
# (including the prefetched blocks). For example,
94+
# - ds.iter_batches -> one iterator
95+
# - streaming_split -> multiple iterators
96+
self._external_consumer_bytes: int = 0
97+
9298
self._op_resource_allocator: Optional[
9399
"OpResourceAllocator"
94100
] = create_resource_allocator(self, data_context)
@@ -134,6 +140,14 @@ def _warn_about_object_store_memory_if_needed(self):
134140
f"ray.init() or by setting the RAY_DEFAULT_OBJECT_STORE_MEMORY_PROPORTION environment variable."
135141
)
136142

143+
def set_external_consumer_bytes(self, num_bytes: int) -> None:
144+
"""Set the bytes buffered by external consumers."""
145+
self._external_consumer_bytes = num_bytes
146+
147+
def get_external_consumer_bytes(self) -> int:
148+
"""Get the bytes buffered by external consumers."""
149+
return self._external_consumer_bytes
150+
137151
def _estimate_object_store_memory_usage(
138152
self, op: "PhysicalOperator", state: "OpState"
139153
) -> int:
@@ -429,13 +443,39 @@ def get_op_outputs_object_store_usage_with_downstream(
429443
)
430444
return op_outputs_usage
431445

446+
def is_materializing_op(self, op: PhysicalOperator) -> bool:
447+
"""Check if the operator is a materializing operator."""
448+
return isinstance(op, MATERIALIZING_OPERATORS)
449+
432450
def has_materializing_downstream_op(self, op: PhysicalOperator) -> bool:
433451
"""Check if the operator has a downstream materializing operator."""
434452
return any(
435453
isinstance(next_op, MATERIALIZING_OPERATORS)
436454
for next_op in op.output_dependencies
437455
)
438456

457+
def get_available_object_store_budget_fraction(
458+
self, op: PhysicalOperator
459+
) -> Optional[float]:
460+
"""Get available object store memory budget fraction for the operator. Returns None if not available."""
461+
op_usage = self.get_op_usage(op)
462+
op_budget = self.get_budget(op)
463+
if op_usage is None or op_budget is None:
464+
return None
465+
total_mem = op_usage.object_store_memory + op_budget.object_store_memory
466+
if total_mem == 0:
467+
return None
468+
return op_budget.object_store_memory / total_mem
469+
470+
def get_utilized_object_store_budget_fraction(
471+
self, op: PhysicalOperator
472+
) -> Optional[float]:
473+
"""Get utilized object store memory budget fraction for the operator. Returns None if not available."""
474+
available_fraction = self.get_available_object_store_budget_fraction(op)
475+
if available_fraction is None:
476+
return None
477+
return 1 - available_fraction
478+
439479

440480
def _get_first_pending_shuffle_op(topology: "Topology") -> int:
441481
for idx, op in enumerate(topology):

python/ray/data/_internal/execution/streaming_executor.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -426,6 +426,11 @@ def get_stats(self):
426426
else:
427427
return self._generate_stats()
428428

429+
def set_external_consumer_bytes(self, num_bytes: int) -> None:
430+
"""Set the bytes buffered by external consumers."""
431+
if self._resource_manager is not None:
432+
self._resource_manager.set_external_consumer_bytes(num_bytes)
433+
429434
def _generate_stats(self) -> DatasetStats:
430435
"""Create a new stats object reflecting execution status so far."""
431436
stats = self._initial_stats or DatasetStats(metadata={}, parent=None)

python/ray/data/_internal/iterator/iterator_impl.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
if TYPE_CHECKING:
99
import pyarrow
1010

11+
from ray.data._internal.execution.streaming_executor import StreamingExecutor
1112
from ray.data.dataset import Dataset
1213

1314

@@ -23,9 +24,18 @@ def __repr__(self) -> str:
2324

2425
def _to_ref_bundle_iterator(
2526
self,
26-
) -> Tuple[Iterator[RefBundle], Optional[DatasetStats], bool]:
27-
ref_bundles_iterator, stats = self._base_dataset._execute_to_iterator()
28-
return ref_bundles_iterator, stats, False
27+
) -> Tuple[
28+
Iterator[RefBundle],
29+
Optional[DatasetStats],
30+
bool,
31+
Optional["StreamingExecutor"],
32+
]:
33+
(
34+
ref_bundles_iterator,
35+
stats,
36+
executor,
37+
) = self._base_dataset._execute_to_iterator()
38+
return ref_bundles_iterator, stats, False, executor
2939

3040
def stats(self) -> str:
3141
return self._base_dataset.stats()

0 commit comments

Comments (0)