Skip to content

Commit 22cf6ef

Browse files
bveeramani, owenowenisme, and gemini-code-assist[bot]
authored
[Data] Don't downscale actors if the operator hasn't received any inputs (#59883)
If you have a pipeline like `read --> [some cpu transformation] --> [gpu transformation init_concurrency =N] --> write`, the `gpu transformation` might downscale to 0 actors if the CPU transformation is slow. This basically nullifies `init_concurrency` and can cause cold-start delays. --------- Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu> Signed-off-by: You-Cheng Lin <106612301+owenowenisme@users.noreply.github.com> Co-authored-by: You-Cheng Lin <106612301+owenowenisme@users.noreply.github.com> Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
1 parent fc78704 commit 22cf6ef

File tree

2 files changed

+14
-2
lines changed

2 files changed

+14
-2
lines changed

python/ray/data/_internal/actor_autoscaler/default_actor_autoscaler.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,11 @@ def _derive_target_scaling_config(
122122
reason="pool exceeding max size",
123123
)
124124

125+
# To prevent unexpected downscaling from the initial size, short-circuit if
126+
# the operator hasn't received any inputs.
127+
if op.metrics.num_inputs_received == 0:
128+
return ActorPoolScalingRequest.no_op(reason="no inputs received")
129+
125130
# Determine whether to scale up based on the actor pool utilization.
126131
util = self._compute_utilization(actor_pool)
127132

python/ray/data/tests/test_autoscaler.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ def test_actor_pool_scaling():
6767
_inputs_complete=False,
6868
input_dependencies=[MagicMock()],
6969
internal_input_queue_num_blocks=MagicMock(return_value=1),
70-
metrics=MagicMock(average_num_inputs_per_task=1),
70+
metrics=MagicMock(average_num_inputs_per_task=1, num_inputs_received=1),
7171
)
7272
op_state = OpState(
7373
op, inqueues=[MagicMock(__len__=MagicMock(return_value=10), num_blocks=10)]
@@ -217,6 +217,13 @@ def assert_autoscaling_action(
217217
expected_reason="exceeded resource limits",
218218
)
219219

220+
# Should no-op because the op has not received any inputs.
221+
with patch(op.metrics, "num_inputs_received", 0, is_method=False):
222+
assert_autoscaling_action(
223+
delta=0,
224+
expected_reason="no inputs received",
225+
)
226+
220227

221228
@pytest.fixture
222229
def autoscaler_max_upscaling_delta_setup():
@@ -239,7 +246,7 @@ def autoscaler_max_upscaling_delta_setup():
239246
spec=InternalQueueOperatorMixin,
240247
has_completed=MagicMock(return_value=False),
241248
_inputs_complete=False,
242-
metrics=MagicMock(average_num_inputs_per_task=1),
249+
metrics=MagicMock(average_num_inputs_per_task=1, num_inputs_received=1),
243250
)
244251
op_state = MagicMock(
245252
spec=OpState,

0 commit comments

Comments (0)