Skip to content

Commit e7eafff

Browse files
committed
[Data] fix cap_resource_request_limits to handle heterogeneous nodes and refactor
Signed-off-by: Marwan Sarieddine <sarieddine.marwan@gmail.com>
1 parent 5d6a2f5 commit e7eafff

File tree

4 files changed

+127
-88
lines changed

4 files changed

+127
-88
lines changed

python/ray/data/_internal/cluster_autoscaler/default_cluster_autoscaler.py

Lines changed: 5 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
import logging
21
import math
32
import time
4-
from typing import TYPE_CHECKING, Dict, List
3+
from typing import TYPE_CHECKING, Dict
54

65
import ray
76
from .base_cluster_autoscaler import ClusterAutoscaler
7+
from .util import cap_resource_request_to_limits
88
from ray.data._internal.execution.autoscaling_requester import (
99
get_or_create_autoscaling_requester_actor,
1010
)
@@ -14,9 +14,6 @@
1414
from ray.data._internal.execution.streaming_executor_state import Topology
1515

1616

17-
logger = logging.getLogger(__name__)
18-
19-
2017
class DefaultClusterAutoscaler(ClusterAutoscaler):
2118
# Min number of seconds between two autoscaling requests.
2219
MIN_GAP_BETWEEN_AUTOSCALING_REQUESTS = 20
@@ -35,46 +32,6 @@ def __init__(
3532
# Last time when a request was sent to Ray's autoscaler.
3633
self._last_request_time = 0
3734

38-
def _cap_resource_request_to_limits(
39-
self, resource_request: List[Dict]
40-
) -> List[Dict]:
41-
"""Cap the resource request to not exceed user-configured resource limits.
42-
43-
If the user has set explicit (non-infinite) resource limits, this method
44-
filters the resource request to ensure the total requested resources do not
45-
exceed those limits.
46-
47-
Args:
48-
resource_request: List of resource bundles to request.
49-
50-
Returns:
51-
A filtered list of resource bundles that respects user limits.
52-
"""
53-
limits = self._resource_limits
54-
55-
# If no explicit limits are set (all infinite), return the original request
56-
if limits == ExecutionResources.inf():
57-
return resource_request
58-
59-
capped_request = []
60-
total = ExecutionResources.zero()
61-
62-
for bundle in resource_request:
63-
new_total = total.add(ExecutionResources.from_resource_dict(bundle))
64-
65-
if not new_total.satisfies_limit(limits):
66-
logger.debug(
67-
f"Capped autoscaling resource request from {len(resource_request)} "
68-
f"bundles to {len(capped_request)} bundles to respect "
69-
f"user-configured resource limits: {limits}."
70-
)
71-
break
72-
73-
capped_request.append(bundle)
74-
total = new_total
75-
76-
return capped_request
77-
7835
def try_trigger_scaling(self):
7936
"""Try to scale up the cluster to accommodate the provided in-progress workload.
8037
@@ -132,7 +89,9 @@ def to_bundle(resource: ExecutionResources) -> Dict:
13289
resource_request.append(task_bundle)
13390

13491
# Cap the resource request to respect user-configured limits
135-
resource_request = self._cap_resource_request_to_limits(resource_request)
92+
resource_request = cap_resource_request_to_limits(
93+
resource_request, self._resource_limits
94+
)
13695

13796
self._send_resource_request(resource_request)
13897

python/ray/data/_internal/cluster_autoscaler/default_cluster_autoscaler_v2.py

Lines changed: 5 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from collections import defaultdict
55
from dataclasses import dataclass
66
from logging import getLogger
7-
from typing import TYPE_CHECKING, Callable, Dict, List, Optional
7+
from typing import TYPE_CHECKING, Callable, Dict, Optional
88

99
import ray
1010
from .base_autoscaling_coordinator import AutoscalingCoordinator
@@ -15,6 +15,7 @@
1515
ResourceUtilizationGauge,
1616
RollingLogicalUtilizationGauge,
1717
)
18+
from .util import cap_resource_request_to_limits
1819
from ray.data._internal.cluster_autoscaler import ClusterAutoscaler
1920
from ray.data._internal.execution.interfaces.execution_options import ExecutionResources
2021

@@ -163,46 +164,6 @@ def __init__(
163164
# so the first `get_total_resources` call can get the allocated resources.
164165
self._send_resource_request([])
165166

166-
def _cap_resource_request_to_limits(
167-
self, resource_request: List[Dict]
168-
) -> List[Dict]:
169-
"""Cap the resource request to not exceed user-configured resource limits.
170-
171-
If the user has set explicit (non-infinite) resource limits, this method
172-
filters the resource request to ensure the total requested resources do not
173-
exceed those limits.
174-
175-
Args:
176-
resource_request: List of resource bundles (node specs) to request.
177-
178-
Returns:
179-
A filtered list of resource bundles that respects user limits.
180-
"""
181-
limits = self._resource_limits
182-
183-
# If no explicit limits are set (all infinite), return the original request
184-
if limits == ExecutionResources.inf():
185-
return resource_request
186-
187-
capped_request = []
188-
total = ExecutionResources.zero()
189-
190-
for bundle in resource_request:
191-
new_total = total.add(ExecutionResources.from_resource_dict(bundle))
192-
193-
if not new_total.satisfies_limit(limits):
194-
logger.debug(
195-
f"Capped autoscaling resource request from {len(resource_request)} "
196-
f"bundles to {len(capped_request)} bundles to respect "
197-
f"user-configured resource limits: {limits}."
198-
)
199-
break
200-
201-
capped_request.append(bundle)
202-
total = new_total
203-
204-
return capped_request
205-
206167
def try_trigger_scaling(self):
207168
# Note, should call this method before checking `_last_request_time`,
208169
# in order to update the average cluster utilization.
@@ -256,7 +217,9 @@ def try_trigger_scaling(self):
256217
logger.debug(debug_msg)
257218

258219
# Cap the resource request to respect user-configured limits
259-
resource_request = self._cap_resource_request_to_limits(resource_request)
220+
resource_request = cap_resource_request_to_limits(
221+
resource_request, self._resource_limits
222+
)
260223

261224
self._send_resource_request(resource_request)
262225

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import logging
2+
from typing import Dict, List
3+
4+
from ray.data._internal.execution.interfaces import ExecutionResources
5+
6+
logger = logging.getLogger(__name__)
7+
8+
9+
def cap_resource_request_to_limits(
10+
resource_request: List[Dict],
11+
resource_limits: ExecutionResources,
12+
) -> List[Dict]:
13+
"""Cap the resource request to not exceed user-configured resource limits.
14+
15+
If the user has set explicit (non-infinite) resource limits, this function
16+
filters the resource request to ensure the total requested resources do not
17+
exceed those limits.
18+
19+
Bundles are sorted by size (smallest first) to maximize the number of bundles
20+
that can fit within the limits. This ensures that smaller bundles are not
21+
excluded just because a larger bundle appeared earlier in iteration order.
22+
23+
Args:
24+
resource_request: List of resource bundles to request.
25+
resource_limits: The user-configured resource limits.
26+
27+
Returns:
28+
A filtered list of resource bundles that respects user limits.
29+
"""
30+
# If no explicit limits are set (all infinite), return the original request
31+
if resource_limits == ExecutionResources.inf():
32+
return resource_request
33+
34+
# Sort bundles by size (smallest first) to maximize packing within limits.
35+
# This ensures smaller bundles aren't excluded due to larger bundles
36+
# appearing earlier in arbitrary iteration order.
37+
def bundle_sort_key(bundle: Dict) -> tuple:
38+
return (
39+
bundle.get("CPU", 0),
40+
bundle.get("GPU", 0),
41+
bundle.get("memory", 0),
42+
)
43+
44+
sorted_bundles = sorted(resource_request, key=bundle_sort_key)
45+
46+
capped_request = []
47+
total = ExecutionResources.zero()
48+
49+
for bundle in sorted_bundles:
50+
new_total = total.add(ExecutionResources.from_resource_dict(bundle))
51+
52+
# Skip bundles that don't fit, continue checking smaller ones
53+
if not new_total.satisfies_limit(resource_limits):
54+
continue
55+
56+
capped_request.append(bundle)
57+
total = new_total
58+
59+
if len(capped_request) < len(resource_request):
60+
logger.debug(
61+
f"Capped autoscaling resource request from {len(resource_request)} "
62+
f"bundles to {len(capped_request)} bundles to respect "
63+
f"user-configured resource limits: {resource_limits}."
64+
)
65+
66+
return capped_request

python/ray/data/tests/test_default_cluster_autoscaler_v2.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -402,6 +402,57 @@ def test_try_scale_up_respects_resource_limits(
402402
assert resources_allocated.gpu == node_spec.gpu * expected_nodes
403403
assert resources_allocated.memory == node_spec.mem * expected_nodes
404404

405+
def test_try_scale_up_respects_resource_limits_heterogeneous_nodes(self):
406+
"""Test that smaller bundles are included even when larger bundles exceed limits.
407+
408+
This tests the fix for an issue where heterogeneous node types could result
409+
in empty or suboptimal resource requests if a large bundle appeared first
410+
in iteration order and exceeded limits, causing smaller valid bundles to
411+
be skipped.
412+
"""
413+
# Set a CPU limit that:
414+
# - Is smaller than a single large node (12 CPUs)
415+
# - But can fit multiple small nodes (4 CPUs each)
416+
resource_limits = ExecutionResources(cpu=10)
417+
418+
large_node_spec = _NodeResourceSpec.of(cpu=12, gpu=1, mem=8000)
419+
small_node_spec = _NodeResourceSpec.of(cpu=4, gpu=0, mem=2000)
420+
421+
scale_up_threshold = 0.75
422+
utilization = ExecutionResources(cpu=0.9, gpu=0.9, object_store_memory=0.9)
423+
fake_coordinator = FakeAutoscalingCoordinator()
424+
425+
# Return heterogeneous node types - the order here shouldn't matter
426+
# because the implementation should sort bundles by size
427+
def get_heterogeneous_nodes():
428+
return {
429+
large_node_spec: 1, # 1 existing large node, wants 2 bundles
430+
small_node_spec: 1, # 1 existing small node, wants 2 bundles
431+
}
432+
433+
autoscaler = DefaultClusterAutoscalerV2(
434+
resource_manager=MagicMock(),
435+
resource_limits=resource_limits,
436+
execution_id="test_execution_id",
437+
cluster_scaling_up_delta=1,
438+
resource_utilization_calculator=StubUtilizationGauge(utilization),
439+
cluster_scaling_up_util_threshold=scale_up_threshold,
440+
min_gap_between_autoscaling_requests_s=0,
441+
autoscaling_coordinator=fake_coordinator,
442+
get_node_counts=get_heterogeneous_nodes,
443+
)
444+
445+
autoscaler.try_trigger_scaling()
446+
447+
resources_allocated = autoscaler.get_total_resources()
448+
# Should get 2 small nodes (8 CPUs) since large nodes (12 CPUs) exceed limit
449+
assert resources_allocated.cpu == 8, (
450+
f"Expected 8 CPUs (2 small nodes), got {resources_allocated.cpu}. "
451+
"Smaller bundles should be included even when larger ones exceed limits."
452+
)
453+
assert resources_allocated.gpu == 0
454+
assert resources_allocated.memory == 4000
455+
405456

406457
if __name__ == "__main__":
407458
import sys

0 commit comments

Comments
 (0)