Skip to content

Commit 85fe019

Browse files
ryankert01 and 400Ping
authored and committed
[Data] Deprecate V1 cluster autoscaler in favor of V2 (ray-project#60474)
## Description This document describes how to verify the V1 cluster autoscaler deprecation. 1. Added `@Deprecated` annotation to `DefaultClusterAutoscaler` in `default_cluster_autoscaler.py` 2. Changed default from `V1` to `V2` in `__init__.py` ## Related issues Closes ray-project#60459 ## Additional information ### Testing **Test V1 via env var emits deprecation warning:** ```bash RAY_DATA_CLUSTER_AUTOSCALER=V1 python -c " import warnings warnings.simplefilter('always') from unittest.mock import MagicMock from ray.data._internal.cluster_autoscaler import create_cluster_autoscaler mock_data_context = MagicMock() mock_data_context.execution_options.resource_limits = None autoscaler = create_cluster_autoscaler(MagicMock(), MagicMock(), mock_data_context, execution_id='test') print(f'Created: {type(autoscaler).__name__}') " ``` ``` ...RayDeprecationWarning: This API is deprecated and may be removed in future Ray releases... DefaultClusterAutoscaler (V1) is deprecated. Use DefaultClusterAutoscalerV2 instead by setting RAY_DATA_CLUSTER_AUTOSCALER=V2 or using the default. Created: DefaultClusterAutoscaler ``` **Test V2 (default) has no deprecation warning:** ```bash python -c " import warnings warnings.filterwarnings('always', category=DeprecationWarning) from unittest.mock import MagicMock from ray.data._internal.cluster_autoscaler import create_cluster_autoscaler mock_data_context = MagicMock() mock_data_context.execution_options.resource_limits = None autoscaler = create_cluster_autoscaler(MagicMock(), MagicMock(), mock_data_context, execution_id='test') print(f'Created: {type(autoscaler).__name__}') " ``` ``` Created: DefaultClusterAutoscalerV2 ``` --------- Signed-off-by: Ryan Huang <ryankert01@gmail.com> Signed-off-by: 400Ping <jiekaichang@apache.org>
1 parent 776f9a1 commit 85fe019

File tree

3 files changed

+31
-24
lines changed

3 files changed

+31
-24
lines changed

python/ray/data/_internal/cluster_autoscaler/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020

2121
CLUSTER_AUTOSCALER_ENV_KEY = "RAY_DATA_CLUSTER_AUTOSCALER"
22-
CLUSTER_AUTOSCALER_ENV_DEFAULT_VALUE = "V1"
22+
CLUSTER_AUTOSCALER_ENV_DEFAULT_VALUE = "V2"
2323

2424

2525
class ClusterAutoscalerVersion(Enum):

python/ray/data/_internal/cluster_autoscaler/default_cluster_autoscaler.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import math
22
import time
3+
import warnings
34
from typing import TYPE_CHECKING, Dict
45

56
import ray
@@ -9,11 +10,17 @@
910
get_or_create_autoscaling_requester_actor,
1011
)
1112
from ray.data._internal.execution.interfaces import ExecutionResources
13+
from ray.util.annotations import Deprecated, RayDeprecationWarning
1214

1315
if TYPE_CHECKING:
1416
from ray.data._internal.execution.streaming_executor_state import Topology
1517

1618

19+
@Deprecated(
20+
message="DefaultClusterAutoscaler (V1) is deprecated and will be removed "
21+
"after June 2026. Use DefaultClusterAutoscalerV2 instead by setting "
22+
"RAY_DATA_CLUSTER_AUTOSCALER=V2 or using the default."
23+
)
1724
class DefaultClusterAutoscaler(ClusterAutoscaler):
1825
# Min number of seconds between two autoscaling requests.
1926
MIN_GAP_BETWEEN_AUTOSCALING_REQUESTS = 20
@@ -25,6 +32,13 @@ def __init__(
2532
*,
2633
execution_id: str,
2734
):
35+
warnings.warn(
36+
"DefaultClusterAutoscaler (V1) is deprecated and will be removed "
37+
"after June 2026. Use DefaultClusterAutoscalerV2 instead by setting "
38+
"RAY_DATA_CLUSTER_AUTOSCALER=V2 or using the default.",
39+
RayDeprecationWarning,
40+
stacklevel=2,
41+
)
2842
self._topology = topology
2943
self._resource_limits = resource_limits
3044
self._execution_id = execution_id

python/ray/train/v2/tests/test_data_resource_cleanup.py

Lines changed: 16 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
import pytest
66

77
import ray
8-
from ray.data._internal.execution.autoscaling_requester import (
9-
get_or_create_autoscaling_requester_actor,
8+
from ray.data._internal.cluster_autoscaler.default_autoscaling_coordinator import (
9+
get_or_create_autoscaling_coordinator,
1010
)
1111
from ray.data._internal.iterator.stream_split_iterator import (
1212
SplitCoordinator,
@@ -101,16 +101,15 @@ def test_after_worker_group_shutdown():
101101
def test_split_coordinator_shutdown_executor(ray_start_4_cpus):
102102
"""Tests that the SplitCoordinator properly requests resources for the data executor and cleans up after it is shutdown"""
103103

104-
def get_resources_when_updated(requester, prev_requests=None, timeout=3.0):
105-
"""Retrieve resource requests within the specified timeout. Returns after a new request is made or when the time expires."""
106-
prev_requests = prev_requests or {}
104+
def get_ongoing_requests(coordinator, timeout=3.0):
105+
"""Retrieve ongoing requests from the AutoscalingCoordinator."""
107106
deadline = time.time() + timeout
108107
requests = {}
109108
while time.time() < deadline:
110109
requests = ray.get(
111-
requester.__ray_call__.remote(lambda r: r._resource_requests)
110+
coordinator.__ray_call__.remote(lambda c: dict(c._ongoing_reqs))
112111
)
113-
if requests != prev_requests:
112+
if requests:
114113
break
115114
time.sleep(0.05)
116115
return requests
@@ -123,34 +122,28 @@ def get_resources_when_updated(requester, prev_requests=None, timeout=3.0):
123122
)
124123
ray.get(coord.start_epoch.remote(0))
125124

126-
# Explicity trigger autoscaling
125+
# Explicitly trigger autoscaling
127126
ray.get(
128127
coord.__ray_call__.remote(
129128
lambda coord: coord._executor._cluster_autoscaler.try_trigger_scaling()
130129
)
131130
)
132131

133-
# Collect requests
134-
requester = get_or_create_autoscaling_requester_actor()
135-
requests = get_resources_when_updated(requester)
132+
# Collect requests from the AutoscalingCoordinator
133+
coordinator = get_or_create_autoscaling_coordinator()
134+
requests = get_ongoing_requests(coordinator)
136135

137-
# One request made, with non-empty resource bundle
136+
# One request made (V2 registers with the coordinator)
138137
assert len(requests) == 1
139-
resource_bundles = list(requests.values())[0][0]
140-
assert isinstance(resource_bundles, list)
141-
bundle = resource_bundles[0]
142-
assert bundle != {}
138+
requester_id = list(requests.keys())[0]
139+
assert requester_id.startswith("data-")
143140

144141
# Shutdown data executor
145142
ray.get(coord.shutdown_executor.remote())
146143

147-
requests = get_resources_when_updated(requester, prev_requests=requests)
148-
149-
# Old resource request overwritten by new cleanup request
150-
assert len(requests) == 1
151-
resource_bundles = list(requests.values())[0][0]
152-
assert isinstance(resource_bundles, dict)
153-
assert resource_bundles == {}
144+
# Verify that the request is cancelled (removed from ongoing requests)
145+
requests = ray.get(coordinator.__ray_call__.remote(lambda c: dict(c._ongoing_reqs)))
146+
assert len(requests) == 0, "Resource request was not cancelled"
154147

155148

156149
if __name__ == "__main__":

0 commit comments

Comments (0)