diff --git a/ci/lint/pydoclint-baseline.txt b/ci/lint/pydoclint-baseline.txt index 92d50a665f33..cbc80271a9ff 100644 --- a/ci/lint/pydoclint-baseline.txt +++ b/ci/lint/pydoclint-baseline.txt @@ -1625,11 +1625,6 @@ python/ray/serve/api.py DOC103: Function `get_deployment_handle`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [_check_exists: bool, _record_telemetry: bool]. DOC201: Function `get_deployment_handle` does not have a return section in docstring -------------------- -python/ray/serve/autoscaling_policy.py - DOC101: Function `_calculate_desired_num_replicas`: Docstring contains fewer arguments than in function signature. - DOC111: Function `_calculate_desired_num_replicas`: The option `--arg-type-hints-in-docstring` is `False` but there are type hints in the docstring arg list - DOC103: Function `_calculate_desired_num_replicas`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [num_running_replicas: int, total_num_requests: int]. Arguments in the docstring but not in the function signature: [current_num_ongoing_requests: List[float]]. --------------------- python/ray/serve/batching.py DOC111: Method `_BatchQueue.__init__`: The option `--arg-type-hints-in-docstring` is `False` but there are type hints in the docstring arg list DOC101: Function `batch`: Docstring contains fewer arguments than in function signature. 
diff --git a/doc/source/serve/advanced-guides/advanced-autoscaling.md b/doc/source/serve/advanced-guides/advanced-autoscaling.md index 235e8e83141c..c89d270eeb8c 100644 --- a/doc/source/serve/advanced-guides/advanced-autoscaling.md +++ b/doc/source/serve/advanced-guides/advanced-autoscaling.md @@ -637,11 +637,41 @@ Policies are defined **per deployment**. If you don’t provide one, Ray Serve f The policy function is invoked by the Ray Serve controller every `RAY_SERVE_CONTROL_LOOP_INTERVAL_S` seconds (default **0.1s**), so your logic runs against near-real-time state. +Your policy can return an `int` or a `float` for `target_replicas`. If it returns a float, Ray Serve converts it to an integer replica count by rounding up to the next greatest integer. + :::{warning} Keep policy functions **fast and lightweight**. Slow logic can block the Serve controller and degrade cluster responsiveness. ::: +### Applying standard autoscaling parameters to custom policies + +Ray Serve automatically applies the following standard autoscaling parameters from your [`AutoscalingConfig`](../api/doc/ray.serve.config.AutoscalingConfig.rst) to custom policies: +- `upscale_delay_s`, `downscale_delay_s`, `downscale_to_zero_delay_s` +- `upscaling_factor`, `downscaling_factor` +- `min_replicas`, `max_replicas` + +The following example shows a custom autoscaling policy with standard autoscaling parameters applied. + +```{literalinclude} ../doc_code/autoscaling_policy.py +:language: python +:start-after: __begin_apply_autoscaling_config_example__ +:end-before: __end_apply_autoscaling_config_example__ +``` + +```{literalinclude} ../doc_code/autoscaling_policy.py +:language: python +:start-after: __begin_apply_autoscaling_config_usage__ +:end-before: __end_apply_autoscaling_config_usage__ +``` + +::::{note} +Your policy function should return the "raw" desired number of replicas. Ray Serve applies the `autoscaling_config` settings (delays, factors, and bounds) on top of your decision. 
+ +Your policy can return the "raw" desired replica count as an `int` or a `float`. Ray Serve converts it to an integer decision. +:::: + + ### Custom metrics You can make richer decisions by emitting your own metrics from the deployment. Implement `record_autoscaling_stats()` to return a `dict[str, float]`. Ray Serve will surface these values in the [`AutoscalingContext`](../api/doc/ray.serve.config.AutoscalingContext.rst). @@ -681,9 +711,10 @@ By default, each deployment in Ray Serve autoscales independently. When you have An application-level autoscaling policy is a function that takes a `dict[DeploymentID, AutoscalingContext]` objects (one per deployment) and returns a tuple of `(decisions, policy_state)`. Each context contains metrics and bounds for one deployment, and the policy returns target replica counts for all deployments. -The `policy_state` returned from an application-level policy must be a `dict[DeploymentID, dict]`— a dictionary mapping each deployment ID to its own state dictionary. Serve stores this per-deployment state and on the next control-loop iteration, injects each deployment's state back into that deployment's `AutoscalingContext.policy_state`. +The `policy_state` returned from an application-level policy must be a `Dict[DeploymentID, Dict]` — a dictionary mapping each deployment ID to its own state dictionary. Serve stores this per-deployment state and on the next control-loop iteration, injects each deployment's state back into that deployment's `AutoscalingContext.policy_state`. +The per-deployment number of replicas returned from the policy can be an `int` or a `float`. If the policy returns a float, Ray Serve converts it to an integer replica count by rounding up to the next greatest integer. -Serve itself does not interpret the contents of `policy_state`. All the keys in each deployment's state dictionary are user-controlled. +Serve itself does not interpret the contents of `policy_state`.
All the keys in each deployment's state dictionary are user-controlled except for internal keys that are used when default parameters are applied to custom autoscaling policies. The following example shows a policy that scales deployments based on their relative load, ensuring that downstream deployments have enough capacity for upstream traffic: `autoscaling_policy.py` file: @@ -728,6 +759,19 @@ Programmatic configuration of application-level autoscaling policies through `se When you specify both a deployment-level policy and an application-level policy, the application-level policy takes precedence. Ray Serve logs a warning if you configure both. ::: + +#### Applying standard autoscaling parameters to application-level policies +Ray Serve automatically applies standard autoscaling parameters (delays, factors, and min/max bounds) to application-level policies on a per-deployment basis. +These parameters include: +- `upscale_delay_s`, `downscale_delay_s`, `downscale_to_zero_delay_s` +- `upscaling_factor`, `downscaling_factor` +- `min_replicas`, `max_replicas` + +The following YAML configuration file shows the default parameters applied to the application-level policy. +```{literalinclude} ../doc_code/application_level_autoscaling_with_defaults.yaml +:language: yaml +``` +Your application-level policy can return per-deployment desired replicas as `int` or `float` values. Ray Serve applies the autoscaling config parameters per deployment and returns integer decisions.
:::{warning} ### Gotchas and limitations diff --git a/doc/source/serve/api/index.md b/doc/source/serve/api/index.md index eb664f4b279e..59ba85cc3397 100644 --- a/doc/source/serve/api/index.md +++ b/doc/source/serve/api/index.md @@ -86,6 +86,7 @@ See the [model composition guide](serve-model-composition) for how to update cod serve.config.AutoscalingConfig serve.config.AutoscalingPolicy serve.config.AutoscalingContext + serve.autoscaling_policy.replica_queue_length_autoscaling_policy serve.config.AggregationFunction serve.config.RequestRouterConfig ``` diff --git a/doc/source/serve/doc_code/application_level_autoscaling_with_defaults.yaml b/doc/source/serve/doc_code/application_level_autoscaling_with_defaults.yaml new file mode 100644 index 000000000000..8a7cdbe77686 --- /dev/null +++ b/doc/source/serve/doc_code/application_level_autoscaling_with_defaults.yaml @@ -0,0 +1,20 @@ +applications: + - name: MyApp + import_path: application_level_autoscaling:app + autoscaling_policy: + policy_function: autoscaling_policy:coordinated_scaling_policy_with_defaults + deployments: + - name: Preprocessor + autoscaling_config: + min_replicas: 1 + max_replicas: 10 + target_ongoing_requests: 1 + upscale_delay_s: 2 + downscale_delay_s: 5 + - name: Model + autoscaling_config: + min_replicas: 2 + max_replicas: 20 + target_ongoing_requests: 1 + upscale_delay_s: 3 + downscale_delay_s: 5 \ No newline at end of file diff --git a/doc/source/serve/doc_code/autoscaling_policy.py b/doc/source/serve/doc_code/autoscaling_policy.py index 6249d7196120..04ca5f26f884 100644 --- a/doc/source/serve/doc_code/autoscaling_policy.py +++ b/doc/source/serve/doc_code/autoscaling_policy.py @@ -131,3 +131,48 @@ def stateful_application_level_policy( # __end_stateful_application_level_policy__ +# __begin_apply_autoscaling_config_example__ +from typing import Any, Dict +from ray.serve.config import AutoscalingContext + + +def queue_length_based_autoscaling_policy( + ctx: AutoscalingContext, +) -> tuple[int, 
Dict[str, Any]]: + # This policy calculates the "raw" desired replicas based on queue length. + # Ray Serve automatically applies scaling factors, delays, and bounds from + # the deployment's autoscaling_config on top of this decision. + + queue_length = ctx.total_num_requests + + if queue_length > 50: + return 10, {} + elif queue_length > 10: + return 5, {} + else: + return 0, {} +# __end_apply_autoscaling_config_example__ + +# __begin_apply_autoscaling_config_usage__ +from ray import serve +from ray.serve.config import AutoscalingConfig, AutoscalingPolicy + +@serve.deployment( + autoscaling_config=AutoscalingConfig( + min_replicas=1, + max_replicas=10, + metrics_interval_s=0.1, + upscale_delay_s=1.0, + downscale_delay_s=1.0, + policy=AutoscalingPolicy( + policy_function=queue_length_based_autoscaling_policy + ) + ), + max_ongoing_requests=5, +) +class MyDeployment: + def __call__(self) -> str: + return "Hello, world!" + +app = MyDeployment.bind() +# __end_apply_autoscaling_config_usage__ \ No newline at end of file diff --git a/python/ray/serve/_private/autoscaling_state.py b/python/ray/serve/_private/autoscaling_state.py index 187ebdbb72cc..519b592976d1 100644 --- a/python/ray/serve/_private/autoscaling_state.py +++ b/python/ray/serve/_private/autoscaling_state.py @@ -1,7 +1,8 @@ import logging +import math import time from collections import defaultdict -from typing import Any, Callable, Dict, List, Optional, Set, Tuple +from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union from ray.serve._private.common import ( RUNNING_REQUESTS_KEY, @@ -28,6 +29,10 @@ ) from ray.serve._private.usage import ServeUsageTag from ray.serve._private.utils import get_capacity_adjusted_num_replicas +from ray.serve.autoscaling_policy import ( + _apply_app_level_autoscaling_config, + _apply_autoscaling_config, +) from ray.serve.config import AutoscalingContext, AutoscalingPolicy from ray.util import metrics @@ -52,7 +57,9 @@ def __init__(self, deployment_id: 
DeploymentID): self._deployment_info = None self._config = None self._policy: Optional[ - Callable[[AutoscalingContext], Tuple[int, Optional[Dict[str, Any]]]] + Callable[ + [AutoscalingContext], Tuple[Union[int, float], Optional[Dict[str, Any]]] + ] ] = None # user defined policy returns a dictionary of state that is persisted between autoscaling decisions # content of the dictionary is determined by the user defined policy @@ -113,7 +120,8 @@ def register(self, info: DeploymentInfo, curr_target_num_replicas: int) -> int: self._deployment_info = info self._config = config - self._policy = self._config.policy.get_policy() + # Apply default autoscaling config to the policy + self._policy = _apply_autoscaling_config(self._config.policy.get_policy()) self._target_capacity = info.target_capacity self._target_capacity_direction = info.target_capacity_direction self._policy_state = {} @@ -305,6 +313,9 @@ def get_decision_num_replicas( # Time the policy execution start_time = time.time() decision_num_replicas, self._policy_state = self._policy(autoscaling_context) + # The policy can return a float value. + if isinstance(decision_num_replicas, float): + decision_num_replicas = math.ceil(decision_num_replicas) policy_execution_time_ms = (time.time() - start_time) * 1000 self.record_autoscaling_metrics( @@ -815,7 +826,10 @@ def __init__( self._policy: Optional[ Callable[ [Dict[DeploymentID, AutoscalingContext]], - Tuple[Dict[DeploymentID, int], Optional[Dict[DeploymentID, Dict]]], + Tuple[ + Dict[DeploymentID, Union[int, float]], + Optional[Dict[DeploymentID, Dict]], + ], ] ] = None # user defined policy returns a dictionary of state that is persisted between autoscaling decisions @@ -837,7 +851,10 @@ def register( Args: autoscaling_policy: The autoscaling policy to register. 
""" - self._policy = autoscaling_policy.get_policy() + # Apply default autoscaling config to the policy + self._policy = _apply_app_level_autoscaling_config( + autoscaling_policy.get_policy() + ) self._policy_state = {} # Log when custom autoscaling policy is used for application @@ -974,10 +991,10 @@ def get_decision_num_replicas( ) results[deployment_id] = ( self._deployment_autoscaling_states[deployment_id].apply_bounds( - num_replicas + math.ceil(num_replicas) ) if not _skip_bound_check - else num_replicas + else math.ceil(num_replicas) ) return results else: diff --git a/python/ray/serve/autoscaling_policy.py b/python/ray/serve/autoscaling_policy.py index 35083aa68df0..78e642b5284d 100644 --- a/python/ray/serve/autoscaling_policy.py +++ b/python/ray/serve/autoscaling_policy.py @@ -1,7 +1,9 @@ +import functools import logging import math -from typing import Any, Dict, Optional, Tuple +from typing import Any, Callable, Dict, Optional, Tuple, Union +from ray.serve._private.common import DeploymentID from ray.serve._private.constants import ( CONTROL_LOOP_INTERVAL_S, SERVE_AUTOSCALING_DECISION_COUNTERS_KEY, @@ -13,121 +15,44 @@ logger = logging.getLogger(SERVE_LOGGER_NAME) -def _calculate_desired_num_replicas( +def _apply_scaling_factors( + desired_num_replicas: Union[int, float], + current_num_replicas: int, autoscaling_config: AutoscalingConfig, - total_num_requests: int, - num_running_replicas: int, - override_min_replicas: Optional[float] = None, - override_max_replicas: Optional[float] = None, ) -> int: - """Returns the number of replicas to scale to based on the given metrics. - - Args: - autoscaling_config: The autoscaling parameters to use for this - calculation. - current_num_ongoing_requests (List[float]): A list of the number of - ongoing requests for each replica. Assumes each entry has already - been time-averaged over the desired lookback window. 
- override_min_replicas: Overrides min_replicas from the config - when calculating the final number of replicas. - override_max_replicas: Overrides max_replicas from the config - when calculating the final number of replicas. - - Returns: - desired_num_replicas: The desired number of replicas to scale to, based - on the input metrics and the current number of replicas. + """Apply scaling factors to the desired number of replicas. + Returns the scaled number of replicas depending on the scaling factor. + The computation uses the difference between desired and current to scale. """ - if num_running_replicas == 0: - raise ValueError("Number of replicas cannot be zero") - - # Example: if error_ratio == 2.0, we have two times too many ongoing - # requests per replica, so we desire twice as many replicas. - target_num_requests = ( - autoscaling_config.get_target_ongoing_requests() * num_running_replicas + replicas_delta = desired_num_replicas - current_num_replicas + scaling_factor = ( + autoscaling_config.get_upscaling_factor() + if replicas_delta > 0 + else autoscaling_config.get_downscaling_factor() ) - error_ratio: float = total_num_requests / target_num_requests - - # If error ratio >= 1, then the number of ongoing requests per - # replica exceeds the target and we will make an upscale decision, - # so we apply the upscale smoothing factor. Otherwise, the number of - # ongoing requests per replica is lower than the target and we will - # make a downscale decision, so we apply the downscale smoothing - # factor. - if error_ratio >= 1: - scaling_factor = autoscaling_config.get_upscaling_factor() - else: - scaling_factor = autoscaling_config.get_downscaling_factor() - - # Multiply the distance to 1 by the smoothing ("gain") factor (default=1). 
- smoothed_error_ratio = 1 + ((error_ratio - 1) * scaling_factor) - desired_num_replicas = math.ceil(num_running_replicas * smoothed_error_ratio) - - # If desired num replicas is "stuck" because of the smoothing factor - # (meaning the traffic is low enough for the replicas to downscale - # without the smoothing factor), decrease desired_num_replicas by 1. + scaled_num_replicas = math.ceil( + current_num_replicas + scaling_factor * replicas_delta + ) + # If the scaled_replicas are stuck during downscaling because of scaling factor, decrement by 1. if ( - math.ceil(num_running_replicas * error_ratio) < num_running_replicas - and desired_num_replicas == num_running_replicas + math.ceil(float(desired_num_replicas)) < current_num_replicas + and scaled_num_replicas == current_num_replicas ): - desired_num_replicas -= 1 - - min_replicas = autoscaling_config.min_replicas - max_replicas = autoscaling_config.max_replicas - if override_min_replicas is not None: - min_replicas = override_min_replicas - if override_max_replicas is not None: - max_replicas = override_max_replicas - - # Ensure scaled_min_replicas <= desired_num_replicas <= scaled_max_replicas. - desired_num_replicas = max(min_replicas, min(max_replicas, desired_num_replicas)) + scaled_num_replicas -= 1 + return scaled_num_replicas - return desired_num_replicas - -@PublicAPI(stability="alpha") -def replica_queue_length_autoscaling_policy( - ctx: AutoscalingContext, +def _apply_delay_logic( + desired_num_replicas: int, + curr_target_num_replicas: int, + config: AutoscalingConfig, + policy_state: Dict[str, Any], ) -> Tuple[int, Dict[str, Any]]: - """The default autoscaling policy based on basic thresholds for scaling. - There is a minimum threshold for the average queue length in the cluster - to scale up and a maximum threshold to scale down. Each period, a 'scale - up' or 'scale down' decision is made. 
This decision must be made for a - specified number of periods in a row before the number of replicas is - actually scaled. See config options for more details. Assumes - `get_decision_num_replicas` is called once every CONTROL_LOOP_PERIOD_S - seconds. - """ - - curr_target_num_replicas: int = ctx.target_num_replicas - total_num_requests: int = ctx.total_num_requests - num_running_replicas: int = ctx.current_num_replicas - config: Optional[AutoscalingConfig] = ctx.config - capacity_adjusted_min_replicas: int = ctx.capacity_adjusted_min_replicas - capacity_adjusted_max_replicas: int = ctx.capacity_adjusted_max_replicas - policy_state: Dict[str, Any] = ctx.policy_state - decision_counter = policy_state.get(SERVE_AUTOSCALING_DECISION_COUNTERS_KEY, 0) - if num_running_replicas == 0: - # When 0 replicas and queries are queued, scale up the replicas - if total_num_requests > 0: - return ( - max( - math.ceil(1 * config.get_upscaling_factor()), - curr_target_num_replicas, - ), - policy_state, - ) - return curr_target_num_replicas, policy_state + """Apply delay logic to the desired number of replicas.""" decision_num_replicas = curr_target_num_replicas - - desired_num_replicas = _calculate_desired_num_replicas( - config, - total_num_requests, - num_running_replicas=num_running_replicas, - override_min_replicas=capacity_adjusted_min_replicas, - override_max_replicas=capacity_adjusted_max_replicas, - ) + decision_counter = policy_state.get(SERVE_AUTOSCALING_DECISION_COUNTERS_KEY, 0) # Scale up. 
if desired_num_replicas > curr_target_num_replicas: # If the previous decision was to scale down (the counter was @@ -177,4 +102,227 @@ return decision_num_replicas, policy_state +def _apply_bounds( + num_replicas: int, + capacity_adjusted_min_replicas: int, + capacity_adjusted_max_replicas: int, +) -> int: + """Clip replica count to be within capacity-adjusted min/max bounds.""" + return max( + capacity_adjusted_min_replicas, + min(capacity_adjusted_max_replicas, num_replicas), + ) + + +def _apply_default_params( + desired_num_replicas: Union[int, float], + ctx: AutoscalingContext, + policy_state: Dict[str, Any], +) -> Tuple[int, Dict[str, Any]]: + """Apply the default parameters to the desired number of replicas.""" + + desired_num_replicas = _apply_scaling_factors( + desired_num_replicas, ctx.current_num_replicas, ctx.config + ) + # Apply bounds + bounded_num_replicas = _apply_bounds( + desired_num_replicas, + ctx.capacity_adjusted_min_replicas, + ctx.capacity_adjusted_max_replicas, + ) + # Apply delay logic + # Only send the internal state here to avoid overwriting the custom policy state. + final_num_replicas, updated_state = _apply_delay_logic( + bounded_num_replicas, ctx.target_num_replicas, ctx.config, policy_state + ) + + return final_num_replicas, updated_state + + +def _apply_default_params_and_merge_state( + policy_state: Dict[str, Any], + user_policy_state: Dict[str, Any], + desired_num_replicas: Union[int, float], + ctx: AutoscalingContext, +) -> Tuple[int, Dict[str, Any]]: + + # Extract internal policy state from policy_state + internal_policy_state = { + SERVE_AUTOSCALING_DECISION_COUNTERS_KEY: policy_state.get( + SERVE_AUTOSCALING_DECISION_COUNTERS_KEY, 0 + ) + } + # Only pass the internal state used for delay counters so we don't + # overwrite any custom user state.
+ final_num_replicas, updated_state = _apply_default_params( + desired_num_replicas, ctx, internal_policy_state + ) + # Merge internal updated_state with the user's custom policy state. + if updated_state: + user_policy_state.update(updated_state) + return final_num_replicas, user_policy_state + + +def _merge_user_state_with_internal_state( + policy_state: Dict[str, Any], + user_policy_state: Dict[str, Any], +) -> Dict[str, Any]: + """Merge user state with previous policy state, preserving internal keys. + + This mutates and returns `user_policy_state`. + """ + # Extract internal policy state from policy_state + internal_policy_state = { + SERVE_AUTOSCALING_DECISION_COUNTERS_KEY: policy_state.get( + SERVE_AUTOSCALING_DECISION_COUNTERS_KEY, 0 + ) + } + user_policy_state.update(internal_policy_state) + return user_policy_state + + +def _get_cold_start_scale_up_replicas(ctx: AutoscalingContext) -> Optional[int]: + """ + Returns the desired number of replicas if the cold start fast path applies, otherwise returns None.
+ """ + if ctx.current_num_replicas == 0: + if ctx.total_num_requests > 0: + return max( + math.ceil(1 * ctx.config.get_upscaling_factor()), + ctx.target_num_replicas, + ) + return ctx.target_num_replicas + return None + + +def _apply_autoscaling_config( + policy_func: Callable[ + [AutoscalingContext], Tuple[Union[int, float], Dict[str, Any]] + ] +) -> Callable[[AutoscalingContext], Tuple[int, Dict[str, Any]]]: + """ + Wraps a custom policy function to automatically apply: + - upscaling_factor / downscaling_factor + - min_replicas / max_replicas bounds + - upscale_delay_s / downscale_delay_s / downscale_to_zero_delay_s + """ + + @functools.wraps(policy_func) + def wrapped_policy(ctx: AutoscalingContext) -> Tuple[int, Dict[str, Any]]: + + # Cold start fast path: 0 replicas bypasses delay logic for immediate scale-up + cold_start_replicas = _get_cold_start_scale_up_replicas(ctx) + if cold_start_replicas is not None: + return cold_start_replicas, ctx.policy_state + policy_state = ctx.policy_state.copy() + desired_num_replicas, updated_custom_policy_state = policy_func(ctx) + final_num_replicas, final_state = _apply_default_params_and_merge_state( + policy_state, updated_custom_policy_state, desired_num_replicas, ctx + ) + + return final_num_replicas, final_state + + return wrapped_policy + + +def _apply_app_level_autoscaling_config( + policy_func: Callable[ + [Dict[DeploymentID, AutoscalingContext]], + Tuple[ + Dict[DeploymentID, Union[int, float]], + Optional[Dict[DeploymentID, Dict]], + ], + ] +) -> Callable[ + [Dict[DeploymentID, AutoscalingContext]], + Tuple[Dict[DeploymentID, int], Dict[DeploymentID, Dict]], +]: + """ + Wraps an application-level custom policy function to automatically apply per-deployment: + - upscaling_factor / downscaling_factor + - min_replicas / max_replicas bounds + - upscale_delay_s / downscale_delay_s / downscale_to_zero_delay_s + """ + + @functools.wraps(policy_func) + def wrapped_policy( + contexts: Dict[DeploymentID, 
AutoscalingContext] + ) -> Tuple[Dict[DeploymentID, int], Dict[DeploymentID, Dict]]: + + # Store the policy state per deployment + state_per_deployment = {} + for dep_id, ctx in contexts.items(): + state_per_deployment[dep_id] = ctx.policy_state.copy() + + # Send to the actual policy + desired_num_replicas_dict, updated_custom_policy_state = policy_func(contexts) + updated_custom_policy_state = updated_custom_policy_state or {} + + # Build per-deployment replicas count and state dictionary. + final_decisions: Dict[DeploymentID, int] = {} + final_state: Dict[DeploymentID, Dict] = {} + for dep_id, ctx in contexts.items(): + if dep_id not in desired_num_replicas_dict: + final_state[dep_id] = state_per_deployment[dep_id] + continue + + custom_policy_state_per_deployment = updated_custom_policy_state.get( + dep_id, {} + ) + # Cold start fast path: 0 replicas bypasses delay logic for immediate scale-up + cold_start_replicas = _get_cold_start_scale_up_replicas(ctx) + if cold_start_replicas is not None: + final_decisions[dep_id] = cold_start_replicas + # Merge user policy state with internal policy state + final_state[dep_id] = _merge_user_state_with_internal_state( + state_per_deployment[dep_id], + custom_policy_state_per_deployment, + ) + continue + final_num_replicas, final_dep_state = _apply_default_params_and_merge_state( + state_per_deployment[dep_id], + custom_policy_state_per_deployment, + desired_num_replicas_dict[dep_id], + ctx, + ) + final_decisions[dep_id] = final_num_replicas + final_state[dep_id] = final_dep_state + return final_decisions, final_state + + return wrapped_policy + + +def _core_replica_queue_length_policy( + ctx: AutoscalingContext, +) -> Tuple[float, Dict[str, Any]]: + num_running_replicas = ctx.current_num_replicas + config = ctx.config + if num_running_replicas == 0: + raise ValueError("Number of replicas cannot be zero") + target_num_requests = config.get_target_ongoing_requests() * num_running_replicas + error_ratio = ctx.total_num_requests 
/ target_num_requests + desired_num_replicas = num_running_replicas * error_ratio + return desired_num_replicas, {} + + +@PublicAPI(stability="alpha") +def replica_queue_length_autoscaling_policy( + ctx: AutoscalingContext, +) -> Tuple[Union[int, float], Dict[str, Any]]: + """The default autoscaling policy based on basic thresholds for scaling. + There is a minimum threshold for the average queue length in the cluster + to scale up and a maximum threshold to scale down. Each period, a 'scale + up' or 'scale down' decision is made. This decision must be made for a + specified number of periods in a row before the number of replicas is + actually scaled. See config options for more details. Assumes + `get_decision_num_replicas` is called once every CONTROL_LOOP_PERIOD_S + seconds. + """ + # Adding this guard makes the public policy safe to call directly. + cold_start_replicas = _get_cold_start_scale_up_replicas(ctx) + if cold_start_replicas is not None: + return cold_start_replicas, ctx.policy_state + return _core_replica_queue_length_policy(ctx) + + default_autoscaling_policy = replica_queue_length_autoscaling_policy diff --git a/python/ray/serve/tests/test_autoscaling_policy.py b/python/ray/serve/tests/test_autoscaling_policy.py index 2e7f978bb954..1d36f0b640df 100644 --- a/python/ray/serve/tests/test_autoscaling_policy.py +++ b/python/ray/serve/tests/test_autoscaling_policy.py @@ -1531,6 +1531,7 @@ def check_expected_statuses( print("Statuses are as expected.") +# Serve applies autoscaling config to custom policies at registration time. 
def custom_autoscaling_policy(ctx: AutoscalingContext): if ctx.total_num_requests > 50: return 3, {} @@ -1542,7 +1543,7 @@ def custom_autoscaling_policy(ctx: AutoscalingContext): "policy", [ { - "policy_function": "ray.serve.tests.test_autoscaling_policy.custom_autoscaling_policy" + "policy_function": "ray.serve.tests.test_autoscaling_policy.custom_autoscaling_policy", }, AutoscalingPolicy( policy_function="ray.serve.tests.test_autoscaling_policy.custom_autoscaling_policy" diff --git a/python/ray/serve/tests/unit/test_application_state.py b/python/ray/serve/tests/unit/test_application_state.py index 67f8c57ab852..881670fdc590 100644 --- a/python/ray/serve/tests/unit/test_application_state.py +++ b/python/ray/serve/tests/unit/test_application_state.py @@ -28,7 +28,10 @@ TimeStampedValue, ) from ray.serve._private.config import DeploymentConfig, ReplicaConfig -from ray.serve._private.constants import RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE +from ray.serve._private.constants import ( + CONTROL_LOOP_INTERVAL_S, + RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE, +) from ray.serve._private.deploy_utils import deploy_args_to_deployment_info from ray.serve._private.deployment_info import DeploymentInfo from ray.serve._private.test_utils import MockKVStore @@ -2596,6 +2599,29 @@ def stateful_app_level_policy(contexts): return decisions, new_state +def app_level_policy_with_decorator(contexts): + """App-level policy used to verify that the decorator applies delay logic.""" + decisions = {} + for dep_id, ctx in contexts.items(): + curr = ctx.target_num_replicas + if curr < 5: + decisions[dep_id] = 5 + elif curr > 1: + decisions[dep_id] = 1 + else: + decisions[dep_id] = curr + return decisions, {} + + +def partial_app_level_policy(contexts): + """Policy that returns decisions for only a subset of deployments.""" + decisions = {} + for deployment_id in contexts.keys(): + if deployment_id.name == "d1": + decisions[deployment_id] = 4 + return decisions, {} + + class 
TestApplicationLevelAutoscaling: """Test application-level autoscaling policy registration, execution, and lifecycle.""" @@ -2616,6 +2642,13 @@ def _create_app_config( ) ] + # Overriding the default delay values for deterministic behavior for unit tests + for d in deployments: + if d.autoscaling_config is None: + continue + d.autoscaling_config.setdefault("upscale_delay_s", 0.0) + d.autoscaling_config.setdefault("downscale_delay_s", 0.0) + return ServeApplicationSchema( name=app_name, import_path="fake.import.path", @@ -2650,12 +2683,7 @@ def _deploy_app_with_mocks(self, app_state_manager, app_config): deployment_infos[deployment.name] = deployment_info( deployment.name, "/hi" if deployment.name == "d1" else None, - autoscaling_config={ - "target_ongoing_requests": 1, - "min_replicas": 1, - "max_replicas": 5, - "initial_replicas": 1, - }, + autoscaling_config=deployment.autoscaling_config, ) mock_reconcile.return_value = ( @@ -2670,18 +2698,14 @@ def _deploy_app_with_mocks(self, app_state_manager, app_config): def _register_deployments(self, app_state_manager, app_config): """Helper to register deployments with autoscaling manager.""" + # Pick autoscaling config from the app config asm = app_state_manager._autoscaling_state_manager for deployment in app_config.deployments: deployment_id = DeploymentID(name=deployment.name, app_name=app_config.name) deployment_info_obj = deployment_info( deployment.name, "/hi" if deployment.name == "d1" else None, - autoscaling_config={ - "target_ongoing_requests": 1, - "min_replicas": 1, - "max_replicas": 5, - "initial_replicas": 1, - }, + autoscaling_config=deployment.autoscaling_config, ) asm.register_deployment(deployment_id, deployment_info_obj, 1) return asm @@ -2768,6 +2792,77 @@ def test_app_level_autoscaling_policy_registration_and_execution( # Verify policy was executed (scales to 3 replicas) assert deployment_state_manager._scaling_decisions[d1_id] == 3 + @pytest.mark.parametrize( + "policy_import_path", + [ + 
"ray.serve.tests.unit.test_application_state:partial_app_level_policy", + ], + ) + def test_app_level_autoscaling_policy_can_return_partial_decisions( + self, mocked_application_state_manager, policy_import_path + ): + """Omitted deployments from decisions should not be autoscaled.""" + ( + app_state_manager, + deployment_state_manager, + _, + ) = mocked_application_state_manager + + # Create app config with two deployments and override to use the partial policy. + deployments = [ + DeploymentSchema( + name="d1", + autoscaling_config={ + "target_ongoing_requests": 1, + "min_replicas": 1, + "max_replicas": 5, + "initial_replicas": 1, + "upscale_delay_s": 0.0, + "downscale_delay_s": 0.0, + "metrics_interval_s": 0.1, + }, + ), + DeploymentSchema( + name="d2", + autoscaling_config={ + "target_ongoing_requests": 1, + "min_replicas": 1, + "max_replicas": 5, + "initial_replicas": 1, + "upscale_delay_s": 0.0, + "downscale_delay_s": 0.0, + "metrics_interval_s": 0.1, + }, + ), + ] + app_config = self._create_app_config(deployments=deployments) + app_config.autoscaling_policy = {"policy_function": policy_import_path} + + _ = self._deploy_app_with_mocks(app_state_manager, app_config) + asm = self._register_deployments(app_state_manager, app_config) + + d1_id = DeploymentID(name="d1", app_name="test_app") + d2_id = DeploymentID(name="d2", app_name="test_app") + + # Create replicas so autoscaling runs. 
+ d1_replicas = [ + ReplicaID(unique_id=f"d1_replica_{i}", deployment_id=d1_id) for i in [1, 2] + ] + d2_replicas = [ + ReplicaID(unique_id=f"d2_replica_{i}", deployment_id=d2_id) for i in [1, 2] + ] + asm.update_running_replica_ids(d1_id, d1_replicas) + asm.update_running_replica_ids(d2_id, d2_replicas) + + # Add a previous decision for both deployments + deployment_state_manager._scaling_decisions[d1_id] = 2 + deployment_state_manager._scaling_decisions[d2_id] = 99 + + app_state_manager.update() + + assert deployment_state_manager._scaling_decisions[d1_id] == 4 + assert deployment_state_manager._scaling_decisions[d2_id] == 99 + def test_app_level_autoscaling_policy_recovery( self, mocked_application_state_manager ): @@ -3289,6 +3384,82 @@ def test_validate_policy_state(self, mocked_application_state_manager): with pytest.raises(AssertionError, match="must be a dictionary"): app_autoscaling_state._validate_policy_state(invalid_value_state) + def test_app_level_autoscaling_with_decorator_applies_delays( + self, mocked_application_state_manager + ): + """Test that apply_app_level_autoscaling_config applies delay logic for an app-level policy.""" + + ( + app_state_manager, + deployment_state_manager, + _, + ) = mocked_application_state_manager + # Create deployments for the policy + deployments = [ + DeploymentSchema( + name="d1", + autoscaling_config={ + "target_ongoing_requests": 1, + "min_replicas": 1, + "max_replicas": 5, + "initial_replicas": 1, + "upscale_delay_s": 0.4, + "downscale_delay_s": 0.6, + "metrics_interval_s": 0.1, + }, + ) + ] + + # Create app config but override to use the decorated app-level policy. + app_config = self._create_app_config(deployments=deployments) + app_config.autoscaling_policy = { + "policy_function": "ray.serve.tests.unit.test_application_state:app_level_policy_with_decorator" + } + + # Deploy app and register deployments with autoscaling manager.
+ _ = self._deploy_app_with_mocks(app_state_manager, app_config) + asm = self._register_deployments(app_state_manager, app_config) + + d1_id = DeploymentID(name="d1", app_name="test_app") + + # Get the delay values from the deployment config + upscale_delay_s = deployments[0].autoscaling_config["upscale_delay_s"] + wait_periods_upscale = int(upscale_delay_s / CONTROL_LOOP_INTERVAL_S) + downscale_delay_s = deployments[0].autoscaling_config["downscale_delay_s"] + wait_periods_downscale = int(downscale_delay_s / CONTROL_LOOP_INTERVAL_S) + + # Create replicas so autoscaling runs. + d1_replicas = [ + ReplicaID(unique_id=f"d1_replica_{i}", deployment_id=d1_id) for i in [1, 2] + ] + asm.update_running_replica_ids(d1_id, d1_replicas) + + app_state = app_state_manager._application_states["test_app"] + + for _ in range(wait_periods_upscale): + app_state.autoscale() + current_replicas = deployment_state_manager._scaling_decisions[d1_id] + assert current_replicas == 1 + + app_state.autoscale() + # Check that the number of replicas is 5 now + assert d1_id in deployment_state_manager._scaling_decisions + assert deployment_state_manager._scaling_decisions[d1_id] == 5 + + # Set the number of replicas to 5 so the policy can scale down to 1 + deployment_state_manager.deployment_infos[ + d1_id + ].deployment_config.num_replicas = 5 + + # Scale down to 1 + for _ in range(wait_periods_downscale): + app_state.autoscale() + current_replicas = deployment_state_manager._scaling_decisions[d1_id] + assert current_replicas == 5 + app_state.autoscale() + assert d1_id in deployment_state_manager._scaling_decisions + assert deployment_state_manager._scaling_decisions[d1_id] == 1 + def test_get_external_scaler_enabled(mocked_application_state_manager): """Test get_external_scaler_enabled returns correct value based on app config.
diff --git a/python/ray/serve/tests/unit/test_autoscaling_policy.py b/python/ray/serve/tests/unit/test_autoscaling_policy.py index 337507b52584..f7daa5edd0e3 100644 --- a/python/ray/serve/tests/unit/test_autoscaling_policy.py +++ b/python/ray/serve/tests/unit/test_autoscaling_policy.py @@ -5,11 +5,19 @@ from ray.serve._private.common import DeploymentID, ReplicaID, TimeStampedValue from ray.serve._private.constants import CONTROL_LOOP_INTERVAL_S from ray.serve.autoscaling_policy import ( - _calculate_desired_num_replicas, + _apply_app_level_autoscaling_config, + _apply_autoscaling_config, + _apply_bounds, + _apply_delay_logic, + _apply_scaling_factors, replica_queue_length_autoscaling_policy, ) from ray.serve.config import AutoscalingConfig, AutoscalingContext +wrapped_replica_queue_length_autoscaling_policy = _apply_autoscaling_config( + replica_queue_length_autoscaling_policy +) + def create_context_with_overrides( base_ctx: AutoscalingContext, **kwargs @@ -50,192 +58,242 @@ def create_context_with_overrides( return AutoscalingContext(**params) -class TestCalculateDesiredNumReplicas: - def test_bounds_checking(self): - num_replicas = 10 - max_replicas = 11 - min_replicas = 9 - config = AutoscalingConfig( - max_replicas=max_replicas, - min_replicas=min_replicas, - target_ongoing_requests=100, - ) +def _run_upscale_downscale_flow( + policy, + config: AutoscalingConfig, + ctx: AutoscalingContext, + overload_requests: int, + start_replicas: int, + upscale_target: int, +): + """ + This runs the upscale and downscale flow to test the delays during upscale and + downscale. + This can be used by both the default autoscaling policy and custom autoscaling policy + with default parameters to verify scaling is properly enabled. 
+ The downscale flow is from upscale_target upto zero + """ - desired_num_replicas = _calculate_desired_num_replicas( - autoscaling_config=config, - total_num_requests=150 * num_replicas, - num_running_replicas=num_replicas, + upscale_wait_periods = int(config.upscale_delay_s / CONTROL_LOOP_INTERVAL_S) + downscale_wait_periods = int(config.downscale_delay_s / CONTROL_LOOP_INTERVAL_S) + # Check if downscale_to_zero_delay_s is set + if config.downscale_to_zero_delay_s: + downscale_to_zero_wait_periods = int( + config.downscale_to_zero_delay_s / CONTROL_LOOP_INTERVAL_S ) - assert desired_num_replicas == max_replicas - - desired_num_replicas = _calculate_desired_num_replicas( - autoscaling_config=config, - total_num_requests=50 * num_replicas, - num_running_replicas=num_replicas, + else: + downscale_to_zero_wait_periods = int( + config.downscale_delay_s / CONTROL_LOOP_INTERVAL_S ) - assert desired_num_replicas == min_replicas - - for i in range(50, 150): - desired_num_replicas = _calculate_desired_num_replicas( - autoscaling_config=config, - total_num_requests=i * num_replicas, - num_running_replicas=num_replicas, - ) - assert min_replicas <= desired_num_replicas <= max_replicas - @pytest.mark.parametrize("target_requests", [0.5, 1.0, 1.5]) - def test_scale_up(self, target_requests): - config = AutoscalingConfig( - min_replicas=0, max_replicas=100, target_ongoing_requests=target_requests - ) - num_replicas = 10 - num_ongoing_requests = 2 * target_requests * num_replicas - desired_num_replicas = _calculate_desired_num_replicas( - autoscaling_config=config, - total_num_requests=num_ongoing_requests, - num_running_replicas=num_replicas, - ) - assert 19 <= desired_num_replicas <= 21 # 10 * 2 = 20 + # Initialize local policy_state from the base context, so both default and + # decorated policies can persist their internal state + policy_state = ctx.policy_state or {} - @pytest.mark.parametrize("target_requests", [0.5, 1.0, 1.5]) - def test_scale_down(self, target_requests): 
- config = AutoscalingConfig( - min_replicas=0, max_replicas=100, target_ongoing_requests=target_requests - ) - num_replicas = 10 - num_ongoing_requests = 0.5 * target_requests * num_replicas - desired_num_replicas = _calculate_desired_num_replicas( - autoscaling_config=config, - total_num_requests=num_ongoing_requests, - num_running_replicas=num_replicas, + # We should scale up only after enough consecutive scale-up decisions. + for i in range(upscale_wait_periods): + ctx = create_context_with_overrides( + ctx, + total_num_requests=overload_requests, + current_num_replicas=start_replicas, + target_num_replicas=start_replicas, + policy_state=policy_state, ) - assert 4 <= desired_num_replicas <= 6 # 10 * 0.5 = 5 - - @pytest.mark.parametrize("use_deprecated_smoothing_factor", [True, False]) - def test_scaling_factor(self, use_deprecated_smoothing_factor): - config = {"min_replicas": 0, "max_replicas": 100, "target_ongoing_requests": 2} - - if use_deprecated_smoothing_factor: - config["smoothing_factor"] = 0.5 - else: - config["upscaling_factor"] = 0.5 - config["downscaling_factor"] = 0.5 + new_num_replicas, policy_state = policy(ctx=ctx) + assert new_num_replicas == start_replicas, i + + ctx = create_context_with_overrides( + ctx, + total_num_requests=overload_requests, + current_num_replicas=start_replicas, + target_num_replicas=start_replicas, + policy_state=policy_state, + ) + new_num_replicas, policy_state = policy(ctx=ctx) + assert new_num_replicas == upscale_target - config = AutoscalingConfig(**config) - num_replicas = 10 + no_requests = 0 - num_ongoing_requests = 8.0 * num_replicas - desired_num_replicas = _calculate_desired_num_replicas( - autoscaling_config=config, - total_num_requests=num_ongoing_requests, - num_running_replicas=num_replicas, + # We should scale down only after enough consecutive scale-down decisions. 
+ for i in range(downscale_wait_periods): + ctx = create_context_with_overrides( + ctx, + total_num_requests=no_requests, + current_num_replicas=upscale_target, + target_num_replicas=upscale_target, + policy_state=policy_state, ) - assert 24 <= desired_num_replicas <= 26 # 10 + 0.5 * (40 - 10) = 25 + new_num_replicas, policy_state = policy(ctx=ctx) + assert new_num_replicas == upscale_target, i + + ctx = create_context_with_overrides( + ctx, + total_num_requests=no_requests, + current_num_replicas=upscale_target, + target_num_replicas=upscale_target, + policy_state=policy_state, + ) + new_num_replicas, policy_state = policy(ctx=ctx) + assert new_num_replicas == 1 - num_ongoing_requests = 0.25 * num_replicas - desired_num_replicas = _calculate_desired_num_replicas( - autoscaling_config=config, - total_num_requests=num_ongoing_requests, - num_running_replicas=num_replicas, + # We should scale down to zero only after enough consecutive downscale-to-zero decisions. + for i in range(downscale_to_zero_wait_periods): + ctx = create_context_with_overrides( + ctx, + total_num_requests=no_requests, + current_num_replicas=1, + target_num_replicas=1, + policy_state=policy_state, ) - assert 5 <= desired_num_replicas <= 8 # 10 + 0.5 * (2.5 - 10) = 6.25 - - @pytest.mark.parametrize("use_deprecated_smoothing_factor", [True, False]) - def test_upscaling_factor(self, use_deprecated_smoothing_factor): - config = {"min_replicas": 0, "max_replicas": 100, "target_ongoing_requests": 2} + new_num_replicas, policy_state = policy(ctx=ctx) + assert new_num_replicas == 1, i + + ctx = create_context_with_overrides( + ctx, + total_num_requests=no_requests, + current_num_replicas=1, + target_num_replicas=1, + policy_state=policy_state, + ) + new_num_replicas, policy_state = policy(ctx=ctx) + assert new_num_replicas == 0 - if use_deprecated_smoothing_factor: - config["upscale_smoothing_factor"] = 0.5 - else: - config["upscaling_factor"] = 0.5 + # Get some scale-up decisions, but not enough to 
trigger a scale up. + for i in range(int(upscale_wait_periods / 2)): + ctx = create_context_with_overrides( + ctx, + total_num_requests=overload_requests, + current_num_replicas=1, + target_num_replicas=1, + policy_state=policy_state, + ) + new_num_replicas, policy_state = policy(ctx=ctx) + assert new_num_replicas == 1, i - config = AutoscalingConfig(**config) - num_replicas = 10 + # Interrupt with a scale-down decision. + ctx = create_context_with_overrides( + ctx, + total_num_requests=0, + current_num_replicas=1, + target_num_replicas=1, + policy_state=policy_state, + ) + _, policy_state = policy(ctx=ctx) - # Should use upscale smoothing factor of 0.5 - num_ongoing_requests = 8.0 * num_replicas - desired_num_replicas = _calculate_desired_num_replicas( - autoscaling_config=config, - total_num_requests=num_ongoing_requests, - num_running_replicas=num_replicas, + # The counter should be reset, so it should require `upscale_wait_periods` + # more periods before we actually scale up. + for i in range(upscale_wait_periods): + ctx = create_context_with_overrides( + ctx, + total_num_requests=overload_requests, + current_num_replicas=1, + target_num_replicas=1, + policy_state=policy_state, ) - assert 24 <= desired_num_replicas <= 26 # 10 + 0.5 * (40 - 10) = 25 + new_num_replicas, policy_state = policy(ctx=ctx) + assert new_num_replicas == 1, i + + ctx = create_context_with_overrides( + ctx, + total_num_requests=overload_requests, + current_num_replicas=1, + target_num_replicas=1, + policy_state=policy_state, + ) + new_num_replicas, policy_state = policy(ctx=ctx) + assert new_num_replicas == upscale_target - # Should use downscale smoothing factor of 1 (default) - num_ongoing_requests = 0.25 * num_replicas - desired_num_replicas = _calculate_desired_num_replicas( - autoscaling_config=config, - total_num_requests=num_ongoing_requests, - num_running_replicas=num_replicas, + # Get some scale-down decisions, but not enough to trigger a scale down. 
+ for i in range(int(downscale_wait_periods / 2)): + ctx = create_context_with_overrides( + ctx, + total_num_requests=no_requests, + current_num_replicas=upscale_target, + target_num_replicas=upscale_target, + policy_state=policy_state, ) - assert 1 <= desired_num_replicas <= 4 # 10 + (2.5 - 10) = 2.5 - - @pytest.mark.parametrize("use_deprecated_smoothing_factor", [True, False]) - def test_downscaling_factor(self, use_deprecated_smoothing_factor): - config = {"min_replicas": 0, "max_replicas": 100, "target_ongoing_requests": 2} - - if use_deprecated_smoothing_factor: - config["downscale_smoothing_factor"] = 0.5 - else: - config["downscaling_factor"] = 0.5 - - config = AutoscalingConfig(**config) - num_replicas = 10 + new_num_replicas, policy_state = policy(ctx=ctx) + assert new_num_replicas == upscale_target, i + + # Interrupt with a scale-up decision. + ctx = create_context_with_overrides( + ctx, + total_num_requests=overload_requests, + current_num_replicas=upscale_target, + target_num_replicas=upscale_target, + policy_state=policy_state, + ) + _, policy_state = policy(ctx=ctx) - # Should use upscale smoothing factor of 1 (default) - num_ongoing_requests = 8.0 * num_replicas - desired_num_replicas = _calculate_desired_num_replicas( - autoscaling_config=config, - total_num_requests=num_ongoing_requests, - num_running_replicas=num_replicas, + # The counter should be reset so it should require `downscale_wait_periods` + # more periods before we actually scale down. + # We should scale down only after enough consecutive scale-down decisions. 
+ for i in range(downscale_wait_periods): + ctx = create_context_with_overrides( + ctx, + total_num_requests=no_requests, + current_num_replicas=upscale_target, + target_num_replicas=upscale_target, + policy_state=policy_state, ) - assert 39 <= desired_num_replicas <= 41 # 10 + (40 - 10) = 40 + new_num_replicas, policy_state = policy(ctx=ctx) + assert new_num_replicas == upscale_target, i + + # First scale down to 1 replica + ctx = create_context_with_overrides( + ctx, + total_num_requests=no_requests, + current_num_replicas=upscale_target, + target_num_replicas=upscale_target, + policy_state=policy_state, + ) + new_num_replicas, policy_state = policy(ctx=ctx) + assert new_num_replicas == 1 - # Should use downscale smoothing factor of 0.5 - num_ongoing_requests = 0.25 * num_replicas - desired_num_replicas = _calculate_desired_num_replicas( - autoscaling_config=config, - total_num_requests=num_ongoing_requests, - num_running_replicas=num_replicas, + # Scale down to 0, but not enough to trigger a complete scale down to zero. + for i in range(int(downscale_to_zero_wait_periods / 2)): + ctx = create_context_with_overrides( + ctx, + total_num_requests=no_requests, + current_num_replicas=1, + target_num_replicas=1, + policy_state=policy_state, ) - assert 5 <= desired_num_replicas <= 8 # 10 + 0.5 * (2.5 - 10) = 6.25 - - @pytest.mark.parametrize( - "num_replicas,ratio,scaling_factor", - [ - # All of the parametrized scenarios should downscale by 1 - # replica. Compare the first theoretical calculation that's - # with smoothing factor, and the second calculation without - # smoothing factor. In these cases, downscaling should not - # be blocked by fractional smoothing factor. 
- (2, 0.3, 0.5), # 2 - 0.5 (2 * 0.7) = 1.3 | 2 - (2 * 0.7) = 0.6 - (5, 0.4, 0.2), # 5 - 0.2 (5 * 0.6) = 4.4 | 5 - (5 * 0.6) = 2 - (10, 0.4, 0.1), # 10 - 0.1 (10 * 0.6) = 9.4 | 10 - (10 * 0.6) = 4 - ], + new_num_replicas, policy_state = policy(ctx=ctx) + assert new_num_replicas == 1, i + + # Interrupt with a scale-up decision. + ctx = create_context_with_overrides( + ctx, + total_num_requests=overload_requests, + current_num_replicas=1, + target_num_replicas=1, + policy_state=policy_state, ) - @pytest.mark.parametrize("use_deprecated_smoothing_factor", [True, False]) - def test_downscaling_with_fractional_scaling_factor( - self, - num_replicas: int, - ratio: float, - scaling_factor: float, - use_deprecated_smoothing_factor, - ): - config = {"min_replicas": 0, "max_replicas": 100, "target_ongoing_requests": 1} - - if use_deprecated_smoothing_factor: - config["downscale_smoothing_factor"] = scaling_factor - else: - config["downscaling_factor"] = scaling_factor + _, policy_state = policy(ctx=ctx) - config = AutoscalingConfig(**config) - total_num_requests = ratio * num_replicas - desired_num_replicas = _calculate_desired_num_replicas( - autoscaling_config=config, - total_num_requests=total_num_requests, - num_running_replicas=num_replicas, + # The counter should be reset so it should require `downscale_to_zero_wait_periods` + # more periods before we actually scale down. 
+ for i in range(downscale_to_zero_wait_periods): + ctx = create_context_with_overrides( + ctx, + total_num_requests=no_requests, + current_num_replicas=1, + target_num_replicas=1, + policy_state=policy_state, ) - assert desired_num_replicas == num_replicas - 1 + new_num_replicas, policy_state = policy(ctx=ctx) + assert new_num_replicas == 1, i + + ctx = create_context_with_overrides( + ctx, + total_num_requests=no_requests, + current_num_replicas=1, + target_num_replicas=1, + policy_state=policy_state, + ) + new_num_replicas, policy_state = policy(ctx=ctx) + assert new_num_replicas == 0 class TestReplicaQueueLengthPolicy: @@ -277,7 +335,7 @@ def test_scaling_factor_scale_up_from_0_replicas( last_scale_up_time=None, last_scale_down_time=None, ) - new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx) + new_num_replicas, _ = wrapped_replica_queue_length_autoscaling_policy(ctx=ctx) # 1 * 10 assert new_num_replicas == 10 @@ -287,7 +345,7 @@ def test_scaling_factor_scale_up_from_0_replicas( if use_upscaling_factor: config.upscaling_factor = 0.5 - new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx) + new_num_replicas, _ = wrapped_replica_queue_length_autoscaling_policy(ctx=ctx) # math.ceil(1 * 0.5) assert new_num_replicas == 1 @@ -333,13 +391,13 @@ def test_scaling_factor_scale_down_to_0_replicas( last_scale_up_time=None, last_scale_down_time=None, ) - new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx) + new_num_replicas, _ = wrapped_replica_queue_length_autoscaling_policy(ctx=ctx) # Downscaling to 0 first stops at 1 assert new_num_replicas == 1 # Need to trigger this the second time to go to zero ctx.target_num_replicas = 1 ctx.current_num_replicas = 1 - new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx) + new_num_replicas, _ = wrapped_replica_queue_length_autoscaling_policy(ctx=ctx) assert new_num_replicas == 0 # With smoothing factor < 1, the desired number of replicas shouldn't @@ -359,17 
+417,13 @@ def test_scaling_factor_scale_down_to_0_replicas( current_num_replicas=num_replicas, target_num_replicas=num_replicas, ) - num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx) + num_replicas, _ = wrapped_replica_queue_length_autoscaling_policy(ctx=ctx) assert num_replicas == 0 @pytest.mark.parametrize("downscale_to_zero_delay_s", [None, 300]) def test_upscale_downscale_delay(self, downscale_to_zero_delay_s): """Unit test for upscale_delay_s, downscale_delay_s and downscale_to_zero_delay_s""" - - upscale_delay_s = 30.0 - downscale_delay_s = 600.0 - min_replicas = 0 max_replicas = 2 policy_state = {} @@ -382,18 +436,6 @@ def test_upscale_downscale_delay(self, downscale_to_zero_delay_s): downscale_to_zero_delay_s=downscale_to_zero_delay_s, ) - upscale_wait_periods = int(upscale_delay_s / CONTROL_LOOP_INTERVAL_S) - downscale_wait_periods = int(downscale_delay_s / CONTROL_LOOP_INTERVAL_S) - # Check if downscale_to_zero_delay_s is set - if downscale_to_zero_delay_s: - downscale_to_zero_wait_periods = int( - downscale_to_zero_delay_s / CONTROL_LOOP_INTERVAL_S - ) - else: - downscale_to_zero_wait_periods = int( - downscale_delay_s / CONTROL_LOOP_INTERVAL_S - ) - overload_requests = 100 ctx = AutoscalingContext( @@ -417,197 +459,17 @@ def test_upscale_downscale_delay(self, downscale_to_zero_delay_s): ) # Scale up when there are 0 replicas and current_handle_queued_queries > 0 - new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx) + new_num_replicas, _ = wrapped_replica_queue_length_autoscaling_policy(ctx=ctx) assert new_num_replicas == 1 - - # We should scale up only after enough consecutive scale-up decisions. 
- for i in range(upscale_wait_periods): - ctx = create_context_with_overrides( - ctx, - total_num_requests=overload_requests, - current_num_replicas=1, - target_num_replicas=1, - ) - new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx) - assert new_num_replicas == 1, i - - ctx = create_context_with_overrides( + # Run the basic upscale/downscale flow + _run_upscale_downscale_flow( + wrapped_replica_queue_length_autoscaling_policy, + config, ctx, - total_num_requests=overload_requests, - current_num_replicas=1, - target_num_replicas=1, + overload_requests=overload_requests, + start_replicas=1, + upscale_target=2, ) - new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx) - assert new_num_replicas == 2 - - no_requests = 0 - - # We should scale down only after enough consecutive scale-down decisions. - # Downscaling to zero follows current_num_replicas->1->0 - for i in range(downscale_wait_periods): - ctx = create_context_with_overrides( - ctx, - total_num_requests=no_requests, - current_num_replicas=2, - target_num_replicas=2, - ) - new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx) - assert new_num_replicas == 2, i - - ctx = create_context_with_overrides( - ctx, - total_num_requests=no_requests, - current_num_replicas=2, - target_num_replicas=2, - ) - new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx) - assert new_num_replicas == 1 - - # We should scale down to zero only after enough consecutive downscale-to-zero decisions. 
- for i in range(downscale_to_zero_wait_periods): - ctx = create_context_with_overrides( - ctx, - total_num_requests=no_requests, - current_num_replicas=1, - target_num_replicas=1, - ) - new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx) - assert new_num_replicas == 1, i - - ctx = create_context_with_overrides( - ctx, - total_num_requests=no_requests, - current_num_replicas=1, - target_num_replicas=1, - ) - new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx) - assert new_num_replicas == 0 - - # Get some scale-up decisions, but not enough to trigger a scale up. - for i in range(int(upscale_wait_periods / 2)): - ctx = create_context_with_overrides( - ctx, - total_num_requests=overload_requests, - current_num_replicas=1, - target_num_replicas=1, - ) - new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx) - assert new_num_replicas == 1, i - - # Interrupt with a scale-down decision. - ctx = create_context_with_overrides( - ctx, - total_num_requests=0, - current_num_replicas=1, - target_num_replicas=1, - ) - replica_queue_length_autoscaling_policy(ctx=ctx) - - # The counter should be reset, so it should require `upscale_wait_periods` - # more periods before we actually scale up. - for i in range(upscale_wait_periods): - ctx = create_context_with_overrides( - ctx, - total_num_requests=overload_requests, - current_num_replicas=1, - target_num_replicas=1, - ) - new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx) - assert new_num_replicas == 1, i - - ctx = create_context_with_overrides( - ctx, - total_num_requests=overload_requests, - current_num_replicas=1, - target_num_replicas=1, - ) - new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx) - assert new_num_replicas == 2 - - # Get some scale-down decisions, but not enough to trigger a scale down. 
- for i in range(int(downscale_wait_periods / 2)): - ctx = create_context_with_overrides( - ctx, - total_num_requests=no_requests, - current_num_replicas=2, - target_num_replicas=2, - ) - new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx) - assert new_num_replicas == 2, i - - # Interrupt with a scale-up decision. - ctx = create_context_with_overrides( - ctx, - total_num_requests=200, - current_num_replicas=2, - target_num_replicas=2, - ) - replica_queue_length_autoscaling_policy(ctx=ctx) - - # The counter should be reset so it should require `downscale_wait_periods` - # more periods before we actually scale down. - # We should scale down only after enough consecutive scale-down decisions. - for i in range(downscale_wait_periods): - ctx = create_context_with_overrides( - ctx, - total_num_requests=no_requests, - current_num_replicas=2, - target_num_replicas=2, - ) - new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx) - assert new_num_replicas == 2, i - - # First scale down to 1 replica - ctx = create_context_with_overrides( - ctx, - total_num_requests=no_requests, - current_num_replicas=2, - target_num_replicas=2, - ) - new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx) - assert new_num_replicas == 1 - - # Scale down to 0, but not enough to trigger a complete scale down to zero. - for i in range(int(downscale_to_zero_wait_periods / 2)): - ctx = create_context_with_overrides( - ctx, - total_num_requests=no_requests, - current_num_replicas=1, - target_num_replicas=1, - ) - new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx) - assert new_num_replicas == 1, i - - # Interrupt with a scale-up decision. 
- ctx = create_context_with_overrides( - ctx, - total_num_requests=100, - current_num_replicas=1, - target_num_replicas=1, - ) - replica_queue_length_autoscaling_policy(ctx=ctx) - - # The counter should be reset so it should require `downscale_to_zero_wait_periods` - # more periods before we actually scale down. - for i in range(downscale_to_zero_wait_periods): - ctx = create_context_with_overrides( - ctx, - total_num_requests=no_requests, - current_num_replicas=1, - target_num_replicas=1, - ) - new_num_replicas, v = replica_queue_length_autoscaling_policy(ctx=ctx) - # print(new_num_replicas, v) - assert new_num_replicas == 1, i - - ctx = create_context_with_overrides( - ctx, - total_num_requests=no_requests, - current_num_replicas=1, - target_num_replicas=1, - ) - new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx) - assert new_num_replicas == 0 def test_replicas_delayed_startup(self): """Unit test simulating replicas taking time to start up.""" @@ -644,7 +506,7 @@ def test_replicas_delayed_startup(self): ) # new_num_replicas = policy_manager.get_decision_num_replicas(1, 100, 1) - new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx) + new_num_replicas, _ = wrapped_replica_queue_length_autoscaling_policy(ctx=ctx) assert new_num_replicas == 100 # New target is 100, but no new replicas finished spinning up during this @@ -655,7 +517,7 @@ def test_replicas_delayed_startup(self): current_num_replicas=1, target_num_replicas=100, ) - new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx) + new_num_replicas, _ = wrapped_replica_queue_length_autoscaling_policy(ctx=ctx) assert new_num_replicas == 100 # Two new replicas spun up during this timestep. 
@@ -665,7 +527,7 @@ def test_replicas_delayed_startup(self): current_num_replicas=3, target_num_replicas=100, ) - new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx) + new_num_replicas, _ = wrapped_replica_queue_length_autoscaling_policy(ctx=ctx) assert new_num_replicas == 123 # A lot of queries got drained and a lot of replicas started up, but @@ -676,7 +538,7 @@ def test_replicas_delayed_startup(self): current_num_replicas=4, target_num_replicas=123, ) - new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx) + new_num_replicas, _ = wrapped_replica_queue_length_autoscaling_policy(ctx=ctx) assert new_num_replicas == 123 @pytest.mark.parametrize("delay_s", [30.0, 0.0]) @@ -734,7 +596,9 @@ def test_fluctuating_ongoing_requests(self, delay_s): current_num_replicas=1, target_num_replicas=1, ) - new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx) + new_num_replicas, _ = wrapped_replica_queue_length_autoscaling_policy( + ctx=ctx + ) if delay_s > 0: assert new_num_replicas == 1, trial else: @@ -746,7 +610,9 @@ def test_fluctuating_ongoing_requests(self, delay_s): current_num_replicas=2, target_num_replicas=2, ) - new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx) + new_num_replicas, _ = wrapped_replica_queue_length_autoscaling_policy( + ctx=ctx + ) if delay_s > 0: assert new_num_replicas == 2, trial else: @@ -787,7 +653,7 @@ def test_single_replica_receives_all_requests(self, ongoing_requests): last_scale_down_time=None, ) - new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx) + new_num_replicas, _ = wrapped_replica_queue_length_autoscaling_policy(ctx=ctx) assert new_num_replicas == ongoing_requests / target_requests def test_callable_and_direct_values(self): @@ -874,5 +740,357 @@ def test_callable_and_direct_values(self): assert ctx2.raw_metrics["m2"][replica_id][0].value == 25.0 +class TestAutoscalingConfigParameters: + def test_apply_scaling_factors_upscale(self): + config = 
AutoscalingConfig(
+            min_replicas=1, max_replicas=30, upscaling_factor=0.5
+        )
+        desired_num_replicas = 20
+        current_num_replicas = 10
+        result = _apply_scaling_factors(
+            desired_num_replicas, current_num_replicas, config
+        )
+        # Expected: 10 + 0.5 * (20 - 10) = 15
+        assert result == 15
+
+    def test_apply_scaling_factors_downscale(self):
+        config = AutoscalingConfig(
+            min_replicas=1, max_replicas=30, downscaling_factor=0.5
+        )
+        desired_num_replicas = 5
+        current_num_replicas = 20
+        result = _apply_scaling_factors(
+            desired_num_replicas, current_num_replicas, config
+        )
+        # Expected: ceil(20 - 0.5 * (20 - 5)) = ceil(12.5) = 13
+        assert result == 13
+
+    def test_apply_scaling_factors_stuck_downscale(self):
+        config = AutoscalingConfig(
+            min_replicas=1, max_replicas=30, downscaling_factor=0.5
+        )
+        desired_num_replicas = 9
+        current_num_replicas = 10
+        result = _apply_scaling_factors(
+            desired_num_replicas, current_num_replicas, config
+        )
+        # Expected: ceil(10 - 0.5 * (10 - 9)) = ceil(9.5) = 10; the stuck-downscale logic then adjusts it to 9.
+ assert result == 9 + + def test_apply_bounds(self): + num_replicas = 5 + capacity_adjusted_min_replicas = 1 + capacity_adjusted_max_replicas = 10 + result = _apply_bounds( + num_replicas, capacity_adjusted_min_replicas, capacity_adjusted_max_replicas + ) + # Expected: max(1, min(10, 5)) = 5 + assert result == 5 + + def test_apply_delay_logic_upscale(self): + """Test upscale delay requires consecutive periods.""" + config = AutoscalingConfig( + min_replicas=1, + max_replicas=10, + upscale_delay_s=0.3, + ) + + ctx = AutoscalingContext( + target_num_replicas=1, + current_num_replicas=1, + config=config, + capacity_adjusted_min_replicas=1, + capacity_adjusted_max_replicas=10, + policy_state={}, + deployment_id=None, + deployment_name=None, + app_name=None, + running_replicas=None, + current_time=None, + total_num_requests=None, + total_queued_requests=None, + aggregated_metrics=None, + raw_metrics=None, + last_scale_up_time=None, + last_scale_down_time=None, + ) + upscale_wait_period = int(ctx.config.upscale_delay_s / CONTROL_LOOP_INTERVAL_S) + for i in range(upscale_wait_period): + decision, ctx.policy_state = _apply_delay_logic( + desired_num_replicas=5, + curr_target_num_replicas=ctx.target_num_replicas, + config=ctx.config, + policy_state=ctx.policy_state, + ) + assert decision == 1, f"Should not scale up on iteration {i}" + + decision, _ = _apply_delay_logic( + desired_num_replicas=5, + curr_target_num_replicas=ctx.target_num_replicas, + config=ctx.config, + policy_state=ctx.policy_state, + ) + assert decision == 5 + + def test_apply_delay_logic_downscale(self): + config = AutoscalingConfig( + min_replicas=0, + max_replicas=10, + downscale_to_zero_delay_s=0.4, + downscale_delay_s=0.3, + ) + ctx = AutoscalingContext( + target_num_replicas=4, + current_num_replicas=4, + config=config, + capacity_adjusted_min_replicas=0, + capacity_adjusted_max_replicas=10, + policy_state={}, + deployment_id=None, + deployment_name=None, + app_name=None, + running_replicas=None, + 
current_time=None,
+            total_num_requests=None,
+            total_queued_requests=None,
+            aggregated_metrics=None,
+            raw_metrics=None,
+            last_scale_up_time=None,
+            last_scale_down_time=None,
+        )
+        downscale_to_zero_wait_period = int(
+            ctx.config.downscale_to_zero_delay_s / CONTROL_LOOP_INTERVAL_S
+        )
+        downscale_wait_period = int(
+            ctx.config.downscale_delay_s / CONTROL_LOOP_INTERVAL_S
+        )
+        # Downscale from 4 -> 1
+        for i in range(downscale_wait_period):
+            decision_num_replicas, ctx.policy_state = _apply_delay_logic(
+                desired_num_replicas=0,
+                curr_target_num_replicas=ctx.target_num_replicas,
+                config=ctx.config,
+                policy_state=ctx.policy_state,
+            )
+            assert (
+                decision_num_replicas == 4
+            ), f"Should not scale down to 0 on iteration {i}"
+
+        decision_num_replicas, _ = _apply_delay_logic(
+            desired_num_replicas=0,
+            curr_target_num_replicas=ctx.target_num_replicas,
+            config=ctx.config,
+            policy_state=ctx.policy_state,
+        )
+        assert decision_num_replicas == 1
+        ctx.target_num_replicas = decision_num_replicas
+        # Downscale from 1 -> 0
+        for i in range(downscale_to_zero_wait_period):
+            decision_num_replicas, ctx.policy_state = _apply_delay_logic(
+                desired_num_replicas=0,
+                curr_target_num_replicas=ctx.target_num_replicas,
+                config=ctx.config,
+                policy_state=ctx.policy_state,
+            )
+            assert (
+                decision_num_replicas == 1
+            ), f"Should not scale down from 1 to 0 on tick {i}"
+        decision_num_replicas, _ = _apply_delay_logic(
+            desired_num_replicas=0,
+            curr_target_num_replicas=ctx.target_num_replicas,
+            config=ctx.config,
+            policy_state=ctx.policy_state,
+        )
+        assert decision_num_replicas == 0
+
+
+@_apply_autoscaling_config
+def simple_custom_policy(ctx: AutoscalingContext):
+    """
+    Custom policy used to check that standard autoscaling parameters are applied.
+    """
+    if ctx.total_num_requests > 0:
+        desired_num_replicas = 3
+    else:
+        desired_num_replicas = 0
+    return desired_num_replicas, {}
+
+
+@_apply_app_level_autoscaling_config
+def simple_app_level_policy(ctxs):
+    """App-level policy that always 
requests scaling up to 5 replicas.""" + return {deployment_id: 5 for deployment_id in ctxs.keys()}, {} + + +class TestCustomPolicyWithDefaultParameters: + @pytest.mark.parametrize("downscale_to_zero_delay_s", [None, 300]) + def test_upscale_downscale_delay(self, downscale_to_zero_delay_s): + """Unit test for upscale_delay_s, downscale_delay_s and downscale_to_zero_delay_s""" + + min_replicas = 0 + max_replicas = 4 + policy_state = {} + config = AutoscalingConfig( + min_replicas=min_replicas, + max_replicas=max_replicas, + target_ongoing_requests=1, + upscale_delay_s=20.0, + downscale_delay_s=400.0, + downscale_to_zero_delay_s=downscale_to_zero_delay_s, + ) + + ctx = AutoscalingContext( + config=config, + total_num_requests=1, + current_num_replicas=0, + target_num_replicas=0, + capacity_adjusted_min_replicas=min_replicas, + capacity_adjusted_max_replicas=max_replicas, + policy_state=policy_state, + deployment_id=None, + deployment_name=None, + app_name=None, + running_replicas=None, + current_time=None, + total_queued_requests=None, + aggregated_metrics=None, + raw_metrics=None, + last_scale_up_time=None, + last_scale_down_time=None, + ) + + # Scale up when there are 0 replicas and current_handle_queued_queries > 0 + new_num_replicas, _ = simple_custom_policy(ctx=ctx) + assert new_num_replicas == 1 + _run_upscale_downscale_flow( + simple_custom_policy, + config, + ctx, + overload_requests=70, + start_replicas=1, + upscale_target=3, + ) + + @pytest.mark.parametrize( + "current_replicas, total_requests, upscale_factor, downscale_factor, expected_replicas", + [ + # Upscale cases + # current=1, desired_raw=3 + # factor=0.5 → ceil(1 + 0.5*(3-1)) = 2 + (1, 100, 0.5, None, 2), + # factor=1.0 → ceil(1 + 1.0*(3-1)) = 3 + (1, 100, 1.0, None, 3), + # Downscale cases + # current=3, desired_raw=0 + # factor=0.5 → ceil(3 + 0.5*(0-3)) = ceil(1.5) = 2 + (3, 0, None, 0.5, 2), + # factor=1.0 → max(ceil(3 + 1.0*(0-3)),1) = 1 + (3, 0, None, 1.0, 1), + ], + ) + def 
test_apply_scaling_factors( + self, + current_replicas, + total_requests, + upscale_factor, + downscale_factor, + expected_replicas, + ): + """ + The test checks if the scaling factors are applied + """ + config = AutoscalingConfig( + min_replicas=0, + max_replicas=10, + upscaling_factor=upscale_factor, + downscaling_factor=downscale_factor, + upscale_delay_s=0.0, + downscale_delay_s=0.0, + downscale_to_zero_delay_s=0.0, + ) + ctx = AutoscalingContext( + config=config, + deployment_id=None, + deployment_name="test", + app_name=None, + current_num_replicas=current_replicas, + target_num_replicas=current_replicas, + running_replicas=None, + total_num_requests=0, + total_queued_requests=None, + aggregated_metrics=None, + raw_metrics=None, + capacity_adjusted_min_replicas=config.min_replicas, + capacity_adjusted_max_replicas=config.max_replicas, + policy_state={}, + last_scale_up_time=None, + last_scale_down_time=None, + current_time=None, + ) + ctx = create_context_with_overrides(ctx, total_num_requests=total_requests) + num_replicas, _ = simple_custom_policy(ctx) + assert num_replicas == expected_replicas + + +class TestAppLevelPolicyWithDefaultParameters: + def test_cold_start_fast_path(self): + """App-level decorator should cold-start immediately (0 -> 1) even with delays.""" + config = AutoscalingConfig( + min_replicas=0, + max_replicas=10, + target_ongoing_requests=10, + upscale_delay_s=20.0, + downscale_delay_s=200.0, + ) + + d1 = DeploymentID(name="d1", app_name="app") + d2 = DeploymentID(name="d2", app_name="app") + + contexts = { + d1: AutoscalingContext( + config=config, + deployment_id=d1, + deployment_name="d1", + app_name="app", + current_num_replicas=0, + target_num_replicas=0, + running_replicas=[], + total_num_requests=1, + total_queued_requests=None, + aggregated_metrics=None, + raw_metrics=None, + capacity_adjusted_min_replicas=0, + capacity_adjusted_max_replicas=10, + policy_state={}, + last_scale_up_time=None, + last_scale_down_time=None, + 
current_time=None, + ), + d2: AutoscalingContext( + config=config, + deployment_id=d2, + deployment_name="d2", + app_name="app", + current_num_replicas=0, + target_num_replicas=0, + running_replicas=[], + total_num_requests=1, + total_queued_requests=None, + aggregated_metrics=None, + raw_metrics=None, + capacity_adjusted_min_replicas=0, + capacity_adjusted_max_replicas=10, + policy_state={}, + last_scale_up_time=None, + last_scale_down_time=None, + current_time=None, + ), + } + + decisions, _ = simple_app_level_policy(contexts) + assert decisions[d1] == 1 + assert decisions[d2] == 1 + + if __name__ == "__main__": sys.exit(pytest.main(["-v", "-s", __file__]))