diff --git a/.github/workflows/pr-test-rust.yml b/.github/workflows/pr-test-rust.yml index 7b8c00b62b91..0ff03b49c62e 100644 --- a/.github/workflows/pr-test-rust.yml +++ b/.github/workflows/pr-test-rust.yml @@ -154,7 +154,7 @@ jobs: pytest-rust-2: if: github.event_name != 'pull_request' || contains(github.event.pull_request.labels.*.name, 'run-ci') runs-on: 4-gpu-a10 - timeout-minutes: 16 + timeout-minutes: 32 steps: - name: Checkout code uses: actions/checkout@v4 diff --git a/sgl-router/src/core/workflow/steps/worker_registration.rs b/sgl-router/src/core/workflow/steps/worker_registration.rs index 5afd27429cc1..aad49f3956e7 100644 --- a/sgl-router/src/core/workflow/steps/worker_registration.rs +++ b/sgl-router/src/core/workflow/steps/worker_registration.rs @@ -820,7 +820,29 @@ impl StepExecutor for ActivateWorkerStep { /// Note: Actual health check timeouts and retry attempts are configured per-worker /// via WorkerConfigRequest (populated from router config). The timeouts and retry /// policies here serve as workflow-level bounds to prevent infinite waiting. -pub fn create_worker_registration_workflow() -> WorkflowDefinition { +/// +/// # Arguments +/// * `router_config` - Router configuration containing health check settings +pub fn create_worker_registration_workflow( + router_config: &crate::config::RouterConfig, +) -> WorkflowDefinition { + // Use health check timeout from config with 30 second buffer as workflow-level upper bound + let detect_timeout = Duration::from_secs(router_config.health_check.timeout_secs + 30); + + // Calculate max_attempts to match the detect_timeout + // With Linear backoff (increment 1s, max 5s): + // - Attempts 1-5: 0s, 1s, 2s, 3s, 4s = 10s total + // - Attempts 6+: 5s each + // max_attempts = 5 + (timeout_seconds - 10) / 5 + // Use 90% of timeout to leave buffer for actual connection attempts + let timeout_secs = detect_timeout.as_secs() as f64; + let effective_timeout = timeout_secs * 0.9; + let max_attempts = if effective_timeout > 10.0 { + (5 + ((effective_timeout - 10.0) / 5.0).ceil() as u32).max(3) + } else { + 3 + }; + WorkflowDefinition::new("worker_registration", "Worker Registration") .add_step( StepDefinition::new( @@ -829,14 +851,14 @@ pub fn create_worker_registration_workflow() -> WorkflowDefinition { Arc::new(DetectConnectionModeStep), ) .with_retry(RetryPolicy { - max_attempts: 100, + max_attempts, backoff: BackoffStrategy::Linear { increment: Duration::from_secs(1), max: Duration::from_secs(5), }, }) - // Workflow-level timeout (upper bound); step uses config.health_check_timeout_secs - .with_timeout(Duration::from_secs(7200)) // 2 hours max + // Workflow-level timeout uses configured health check timeout + buffer + .with_timeout(detect_timeout) .with_failure_action(FailureAction::FailWorkflow), ) .add_step( diff --git a/sgl-router/src/server.rs b/sgl-router/src/server.rs index 951957b9fb20..ebda889acda4 100644 --- a/sgl-router/src/server.rs +++ b/sgl-router/src/server.rs @@ -737,14 +737,17 @@ pub async fn startup(config: ServerConfig) -> Result<(), Box Result<(), Box Arc { let app_context = Arc::new( AppContext::builder() - .router_config(config) + .router_config(config.clone()) .client(client) .rate_limiter(rate_limiter) .tokenizer(None) // tokenizer @@ -104,7 +104,7 @@ pub async fn create_test_context(config: RouterConfig) -> Arc { create_worker_registration_workflow, create_worker_removal_workflow, WorkflowEngine, }; let engine = Arc::new(WorkflowEngine::new()); - engine.register_workflow(create_worker_registration_workflow()); + engine.register_workflow(create_worker_registration_workflow(&config)); engine.register_workflow(create_worker_removal_workflow()); app_context .workflow_engine @@ -180,7 +180,7 @@ pub async fn create_test_context_with_mcp_config( let app_context = Arc::new( AppContext::builder() - .router_config(config) + .router_config(config.clone()) .client(client) .rate_limiter(rate_limiter) .tokenizer(None) // tokenizer @@ -215,7 +215,7 @@ pub async fn create_test_context_with_mcp_config( create_worker_registration_workflow, create_worker_removal_workflow, WorkflowEngine, }; let engine = Arc::new(WorkflowEngine::new()); - engine.register_workflow(create_worker_registration_workflow()); + engine.register_workflow(create_worker_registration_workflow(&config)); engine.register_workflow(create_worker_removal_workflow()); app_context .workflow_engine