Skip to content

Commit 3d628a7

Browse files
abrarsheikh authored and elliot-barn committed
[1/n] add application level autoscaling policy in schema (#57535)
Part 1 of #56149.

1. Move `_serialized_policy_def` from `AutoscalingConfig` into `AutoscalingPolicy`. We need this in order to reuse `AutoscalingPolicy` for application-level autoscaling.
2. Make `autoscaling_policy` a top-level config in `ServeApplicationSchema`.

---------

Signed-off-by: abrar <abrar@anyscale.com>
Signed-off-by: elliot-barn <elliot.barnwell@anyscale.com>
1 parent 60dc254 commit 3d628a7

File tree

9 files changed

+85
-60
lines changed

9 files changed

+85
-60
lines changed

python/ray/serve/_private/autoscaling_state.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -106,7 +106,7 @@ def register(self, info: DeploymentInfo, curr_target_num_replicas: int) -> int:
106106

107107
self._deployment_info = info
108108
self._config = config
109-
self._policy = self._config.get_policy()
109+
self._policy = self._config.policy.get_policy()
110110
self._target_capacity = info.target_capacity
111111
self._target_capacity_direction = info.target_capacity_direction
112112
self._policy_state = {}

python/ray/serve/config.py

Lines changed: 30 additions & 31 deletions
Original file line number | Diff line number | Diff line change
@@ -172,12 +172,39 @@ class AggregationFunction(str, Enum):
172172

173173
@PublicAPI(stability="alpha")
174174
class AutoscalingPolicy(BaseModel):
175-
name: Union[str, Callable] = Field(
175+
# Cloudpickled policy definition.
176+
_serialized_policy_def: bytes = PrivateAttr(default=b"")
177+
178+
policy_function: Union[str, Callable] = Field(
176179
default=DEFAULT_AUTOSCALING_POLICY_NAME,
177-
description="Name of the policy function or the import path of the policy. "
178-
"Will be the concatenation of the policy module and the policy name if user passed a callable.",
180+
description="Policy function can be a string import path or a function callable. "
181+
"If it's a string import path, it must be of the form `path.to.module:function_name`. ",
179182
)
180183

184+
def __init__(self, **kwargs):
185+
super().__init__(**kwargs)
186+
self.serialize_policy()
187+
188+
def serialize_policy(self) -> None:
189+
"""Serialize policy with cloudpickle.
190+
191+
Import the policy if it's passed in as a string import path. Then cloudpickle
192+
the policy and set `serialized_policy_def` if it's empty.
193+
"""
194+
policy_path = self.policy_function
195+
196+
if isinstance(policy_path, Callable):
197+
policy_path = f"{policy_path.__module__}.{policy_path.__name__}"
198+
199+
if not self._serialized_policy_def:
200+
self._serialized_policy_def = cloudpickle.dumps(import_attr(policy_path))
201+
202+
self.policy_function = policy_path
203+
204+
def get_policy(self) -> Callable:
205+
"""Deserialize policy from cloudpickled bytes."""
206+
return cloudpickle.loads(self._serialized_policy_def)
207+
181208

182209
@PublicAPI(stability="stable")
183210
class AutoscalingConfig(BaseModel):
@@ -247,9 +274,6 @@ class AutoscalingConfig(BaseModel):
247274
description="Function used to aggregate metrics across a time window.",
248275
)
249276

250-
# Cloudpickled policy definition.
251-
_serialized_policy_def: bytes = PrivateAttr(default=b"")
252-
253277
# Autoscaling policy. This policy is deployment scoped. Defaults to the request-based autoscaler.
254278
policy: AutoscalingPolicy = Field(
255279
default_factory=AutoscalingPolicy,
@@ -298,27 +322,6 @@ def aggregation_function_valid(cls, v: Union[str, AggregationFunction]):
298322
return v
299323
return AggregationFunction(str(v).lower())
300324

301-
def __init__(self, **kwargs):
302-
super().__init__(**kwargs)
303-
self.serialize_policy()
304-
305-
def serialize_policy(self) -> None:
306-
"""Serialize policy with cloudpickle.
307-
308-
Import the policy if it's passed in as a string import path. Then cloudpickle
309-
the policy and set `serialized_policy_def` if it's empty.
310-
"""
311-
policy = self.policy
312-
policy_name = policy.name
313-
314-
if isinstance(policy_name, Callable):
315-
policy_name = f"{policy_name.__module__}.{policy_name.__name__}"
316-
317-
if not self._serialized_policy_def:
318-
self._serialized_policy_def = cloudpickle.dumps(import_attr(policy_name))
319-
320-
self.policy = AutoscalingPolicy(name=policy_name)
321-
322325
@classmethod
323326
def default(cls):
324327
return cls(
@@ -327,10 +330,6 @@ def default(cls):
327330
max_replicas=100,
328331
)
329332

330-
def get_policy(self) -> Callable:
331-
"""Deserialize policy from cloudpickled bytes."""
332-
return cloudpickle.loads(self._serialized_policy_def)
333-
334333
def get_upscaling_factor(self) -> PositiveFloat:
335334
if self.upscaling_factor:
336335
return self.upscaling_factor

python/ray/serve/schema.py

Lines changed: 9 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -560,6 +560,15 @@ class ServeApplicationSchema(BaseModel):
560560
default=[],
561561
description="Deployment options that override options specified in the code.",
562562
)
563+
autoscaling_policy: Optional[dict] = Field(
564+
default=None,
565+
description=(
566+
"Application-level autoscaling policy. "
567+
"If null, serve fallbacks to autoscaling policy in each deployment. "
568+
"This option is under development and not yet supported."
569+
),
570+
)
571+
563572
args: Dict = Field(
564573
default={},
565574
description="Arguments that will be passed to the application builder.",

python/ray/serve/tests/test_autoscaling_policy.py

Lines changed: 5 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -1540,11 +1540,13 @@ def custom_autoscaling_policy(ctx: AutoscalingContext):
15401540
@pytest.mark.parametrize(
15411541
"policy",
15421542
[
1543-
{"name": "ray.serve.tests.test_autoscaling_policy.custom_autoscaling_policy"},
1543+
{
1544+
"policy_function": "ray.serve.tests.test_autoscaling_policy.custom_autoscaling_policy"
1545+
},
15441546
AutoscalingPolicy(
1545-
name="ray.serve.tests.test_autoscaling_policy.custom_autoscaling_policy"
1547+
policy_function="ray.serve.tests.test_autoscaling_policy.custom_autoscaling_policy"
15461548
),
1547-
AutoscalingPolicy(name=custom_autoscaling_policy),
1549+
AutoscalingPolicy(policy_function=custom_autoscaling_policy),
15481550
],
15491551
)
15501552
def test_e2e_scale_up_down_basic_with_custom_policy(serve_instance_with_signal, policy):

python/ray/serve/tests/test_controller.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -180,7 +180,7 @@ def autoscaling_app():
180180
"upscale_delay_s": 30.0,
181181
"aggregation_function": "mean",
182182
"policy": {
183-
"name": "ray.serve.autoscaling_policy:default_autoscaling_policy"
183+
"policy_function": "ray.serve.autoscaling_policy:default_autoscaling_policy"
184184
},
185185
},
186186
"graceful_shutdown_wait_loop_s": 2.0,

python/ray/serve/tests/test_deploy_2.py

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -333,7 +333,9 @@ async def __call__(self):
333333
"smoothing_factor": 1.0,
334334
"initial_replicas": None,
335335
"aggregation_function": "mean",
336-
"policy": {"name": "ray.serve.autoscaling_policy:default_autoscaling_policy"},
336+
"policy": {
337+
"policy_function": "ray.serve.autoscaling_policy:default_autoscaling_policy"
338+
},
337339
}
338340

339341

@@ -397,7 +399,9 @@ async def __call__(self):
397399
"smoothing_factor": 1.0,
398400
"initial_replicas": None,
399401
"aggregation_function": "mean",
400-
"policy": {"name": "ray.serve.autoscaling_policy:default_autoscaling_policy"},
402+
"policy": {
403+
"policy_function": "ray.serve.autoscaling_policy:default_autoscaling_policy"
404+
},
401405
}
402406

403407
for i in range(3):

python/ray/serve/tests/test_deploy_app_2.py

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -596,7 +596,9 @@ def test_num_replicas_auto_api(serve_instance):
596596
"smoothing_factor": 1.0,
597597
"initial_replicas": None,
598598
"aggregation_function": "mean",
599-
"policy": {"name": "ray.serve.autoscaling_policy:default_autoscaling_policy"},
599+
"policy": {
600+
"policy_function": "ray.serve.autoscaling_policy:default_autoscaling_policy"
601+
},
600602
}
601603

602604

@@ -651,7 +653,9 @@ def test_num_replicas_auto_basic(serve_instance):
651653
"smoothing_factor": 1.0,
652654
"initial_replicas": None,
653655
"aggregation_function": "mean",
654-
"policy": {"name": "ray.serve.autoscaling_policy:default_autoscaling_policy"},
656+
"policy": {
657+
"policy_function": "ray.serve.autoscaling_policy:default_autoscaling_policy"
658+
},
655659
}
656660

657661
h = serve.get_app_handle(SERVE_DEFAULT_APP_NAME)

python/ray/serve/tests/unit/test_config.py

Lines changed: 16 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -784,7 +784,12 @@ def test_deployment_mode_to_proxy_location():
784784

785785

786786
@pytest.mark.parametrize(
787-
"policy", [None, fake_policy, "ray.serve.tests.unit.test_config:fake_policy"]
787+
"policy",
788+
[
789+
None,
790+
{"policy_function": "ray.serve.tests.unit.test_config:fake_policy"},
791+
{"policy_function": fake_policy},
792+
],
788793
)
789794
def test_autoscaling_policy_serializations(policy):
790795
"""Test that autoscaling policy can be serialized and deserialized.
@@ -794,16 +799,19 @@ def test_autoscaling_policy_serializations(policy):
794799
"""
795800
autoscaling_config = AutoscalingConfig()
796801
if policy:
797-
autoscaling_config = AutoscalingConfig(_policy=policy)
802+
autoscaling_config = AutoscalingConfig(policy=policy)
798803

799804
config = DeploymentConfig.from_default(autoscaling_config=autoscaling_config)
800805
deserialized_autoscaling_policy = DeploymentConfig.from_proto_bytes(
801806
config.to_proto_bytes()
802-
).autoscaling_config.get_policy()
807+
).autoscaling_config.policy.get_policy()
803808

804-
# Right now we don't allow modifying the autoscaling policy, so this will always
805-
# be the default autoscaling policy
806-
assert deserialized_autoscaling_policy == default_autoscaling_policy
809+
if policy is None:
810+
assert deserialized_autoscaling_policy == default_autoscaling_policy
811+
else:
812+
# Compare function behavior instead of function objects
813+
# since serialization/deserialization creates new function objects
814+
assert deserialized_autoscaling_policy() == fake_policy()
807815

808816

809817
def test_autoscaling_policy_import_fails_for_non_existing_policy():
@@ -814,7 +822,8 @@ def test_autoscaling_policy_import_fails_for_non_existing_policy():
814822
"""
815823
# Right now we don't allow modifying the autoscaling policy, so this will not fail
816824
policy = "i.dont.exist:fake_policy"
817-
AutoscalingConfig(_policy=policy)
825+
with pytest.raises(ModuleNotFoundError):
826+
AutoscalingConfig(policy={"policy_function": policy})
818827

819828

820829
def test_default_autoscaling_policy_import_path():

src/ray/protobuf/serve.proto

Lines changed: 11 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -24,10 +24,11 @@ option java_multiple_files = true;
2424

2525
// Configuration options for Serve's autoscaling policy
2626
message AutoscalingPolicy {
27-
// Name of the policy function or the import path of the policy if user passed a string.
28-
// Will be the concatenation of the policy module and the policy name if user passed a
29-
// callable.
30-
string name = 1;
27+
// Policy function needs to be a string import path.
28+
string policy_function = 1;
29+
30+
// The cloudpickled policy definition.
31+
bytes _serialized_policy_def = 2;
3132
}
3233

3334
// Configuration options for Serve's replica autoscaler.
@@ -66,28 +67,25 @@ message AutoscalingConfig {
6667
// [DEPRECATED] Use `downscaling_factor` instead.
6768
optional double downscale_smoothing_factor = 10;
6869

69-
// The cloudpickled policy definition.
70-
bytes _serialized_policy_def = 11;
71-
7270
// The autoscaling policy definition.
73-
AutoscalingPolicy policy = 12;
71+
AutoscalingPolicy policy = 11;
7472

7573
// Target number of in flight requests per replica. This is the primary configuration
7674
// knob for replica autoscaler. Lower the number, the more rapidly the replicas
7775
// scales up. Must be a non-negative integer.
78-
double target_ongoing_requests = 13;
76+
double target_ongoing_requests = 12;
7977

8078
// The multiplicative "gain" factor to limit upscale.
81-
optional double upscaling_factor = 14;
79+
optional double upscaling_factor = 13;
8280

8381
// The multiplicative "gain" factor to limit downscale.
84-
optional double downscaling_factor = 15;
82+
optional double downscaling_factor = 14;
8583

8684
// How long to wait before scaling down replicas from 1 to 0
87-
optional double downscale_to_zero_delay_s = 16;
85+
optional double downscale_to_zero_delay_s = 15;
8886

8987
// How metrics are aggregated for autoscaling. One of "mean", "max", "min".
90-
string aggregation_function = 17;
88+
string aggregation_function = 16;
9189
}
9290

9391
//[Begin] LOGGING CONFIG

0 commit comments

Comments
 (0)