@@ -399,6 +399,80 @@ async def call(self):
399399 check_num_requests_eq , client = client , id = dep_id , expected = 0 , timeout = 20
400400 )
401401
@pytest.mark.skipif(
    not RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE,
    reason="Needs metric collection at handle.",
)
def test_downstream_does_not_overscale_waiting_for_upstream_args(
    self, serve_instance_with_signal
):
    """Downstream must not scale up while its argument is still unresolved.

    A ``Router`` passes the not-yet-resolved response of ``SlowUpstream``
    directly as the argument of a ``FastDownstream`` call.  While the
    upstream requests are blocked on the signal, the downstream deployment
    has nothing to execute, so it is expected to stay at one replica and
    the controller is expected to report zero requests for it.
    """
    client, signal = serve_instance_with_signal

    @serve.deployment(max_ongoing_requests=100)
    class SlowUpstream:
        async def __call__(self):
            # Block until the test releases the signal.
            await signal.wait.remote()
            return "result"

    @serve.deployment(
        max_ongoing_requests=5,
        autoscaling_config={
            "target_ongoing_requests": 1,
            "metrics_interval_s": 0.1,
            "min_replicas": 1,
            "max_replicas": 10,
            "upscale_delay_s": 0.2,
            "downscale_delay_s": 0.5,
            "look_back_period_s": 0.5,
        },
    )
    class FastDownstream:
        async def __call__(self, data: str):
            # Instant processing - just return.  The result must match the
            # "processed: result" value asserted at the end of the test.
            return f"processed: {data}"

    @serve.deployment(max_ongoing_requests=100)
    class Router:
        def __init__(self, up: DeploymentHandle, down: DeploymentHandle):
            self._up, self._down = up, down

        async def __call__(self):
            # Pass upstream response directly to downstream as an argument
            return await self._down.remote(self._up.remote())

    handle = serve.run(Router.bind(SlowUpstream.bind(), FastDownstream.bind()))
    wait_for_condition(check_num_replicas_eq, name="FastDownstream", target=1)
    wait_for_condition(check_num_replicas_eq, name="SlowUpstream", target=1)

    # Send 5 requests - they will be blocked at SlowUpstream
    responses = [handle.remote() for _ in range(5)]

    # Wait for all 5 requests to be blocked at SlowUpstream (waiting on signal)
    wait_for_condition(lambda: ray.get(signal.cur_num_waiters.remote()) == 5)

    # Key assertion: FastDownstream should NOT scale up while waiting
    # for upstream arguments. It should stay at 1 replica because
    # num_queued_requests should only be incremented AFTER arguments
    # are resolved.
    # NOTE(review): this samples the replica count once, right after the
    # requests become blocked; presumably an erroneous upscale could still
    # happen later than this check - confirm whether a sustained check is
    # needed.
    num_downstream_replicas = get_num_alive_replicas("FastDownstream")
    assert num_downstream_replicas == 1, (
        f"FastDownstream over-provisioned to {num_downstream_replicas} replicas "
        "while waiting for upstream arguments. Expected 1 replica."
    )

    # Also verify the controller doesn't see inflated request count for downstream
    downstream_dep_id = DeploymentID(name="FastDownstream")
    downstream_requests = get_num_requests(client, downstream_dep_id)
    assert downstream_requests == 0, (
        f"Controller sees {downstream_requests} requests for FastDownstream "
        "while they're still blocked at SlowUpstream. Expected 0."
    )

    # Release the signal to complete requests
    ray.get(signal.send.remote())
    for r in responses:
        assert r.result() == "processed: result"
475+
402476
403477@pytest .mark .parametrize ("min_replicas" , [1 , 2 ])
404478@pytest .mark .parametrize ("aggregation_function" , ["mean" , "max" , "min" ])
0 commit comments