Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pr-test-rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ jobs:
pytest-rust-2:
if: github.event_name != 'pull_request' || contains(github.event.pull_request.labels.*.name, 'run-ci')
runs-on: 4-gpu-a10
timeout-minutes: 16
timeout-minutes: 32
steps:
- name: Checkout code
uses: actions/checkout@v4
Expand Down
30 changes: 26 additions & 4 deletions sgl-router/src/core/workflow/steps/worker_registration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -820,7 +820,29 @@ impl StepExecutor for ActivateWorkerStep {
/// Note: Actual health check timeouts and retry attempts are configured per-worker
/// via WorkerConfigRequest (populated from router config). The timeouts and retry
/// policies here serve as workflow-level bounds to prevent infinite waiting.
pub fn create_worker_registration_workflow() -> WorkflowDefinition {
///
/// # Arguments
/// * `router_config` - Router configuration containing health check settings
pub fn create_worker_registration_workflow(
router_config: &crate::config::RouterConfig,
) -> WorkflowDefinition {
// Use health check timeout from config with 30 second buffer as workflow-level upper bound
let detect_timeout = Duration::from_secs(router_config.health_check.timeout_secs + 30);

// Calculate max_attempts to match the detect_timeout
// With Linear backoff (increment 1s, max 5s):
// - Attempts 1-5: 0s, 1s, 2s, 3s, 4s = 10s total
// - Attempts 6+: 5s each
// max_attempts = 5 + (timeout_seconds - 10) / 5
// Use 90% of timeout to leave buffer for actual connection attempts
let timeout_secs = detect_timeout.as_secs() as f64;
let effective_timeout = timeout_secs * 0.9;
let max_attempts = if effective_timeout > 10.0 {
(5 + ((effective_timeout - 10.0) / 5.0).ceil() as u32).max(3)
} else {
3
};

WorkflowDefinition::new("worker_registration", "Worker Registration")
.add_step(
StepDefinition::new(
Expand All @@ -829,14 +851,14 @@ pub fn create_worker_registration_workflow() -> WorkflowDefinition {
Arc::new(DetectConnectionModeStep),
)
.with_retry(RetryPolicy {
max_attempts: 100,
max_attempts,
backoff: BackoffStrategy::Linear {
increment: Duration::from_secs(1),
max: Duration::from_secs(5),
},
})
// Workflow-level timeout (upper bound); step uses config.health_check_timeout_secs
.with_timeout(Duration::from_secs(7200)) // 2 hours max
// Workflow-level timeout uses configured health check timeout + buffer
.with_timeout(detect_timeout)
.with_failure_action(FailureAction::FailWorkflow),
)
.add_step(
Expand Down
9 changes: 7 additions & 2 deletions sgl-router/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -737,14 +737,17 @@ pub async fn startup(config: ServerConfig) -> Result<(), Box<dyn std::error::Err
.subscribe(Arc::new(LoggingSubscriber))
.await;

engine.register_workflow(create_worker_registration_workflow());
engine.register_workflow(create_worker_registration_workflow(&config.router_config));
engine.register_workflow(create_worker_removal_workflow());
engine.register_workflow(create_mcp_registration_workflow());
app_context
.workflow_engine
.set(engine)
.expect("WorkflowEngine should only be initialized once");
info!("Workflow engine initialized with worker and MCP registration workflows");
info!(
"Workflow engine initialized with worker and MCP registration workflows (health check timeout: {}s)",
config.router_config.health_check.timeout_secs
);

info!(
"Initializing workers for routing mode: {:?}",
Expand All @@ -764,6 +767,8 @@ pub async fn startup(config: ServerConfig) -> Result<(), Box<dyn std::error::Err
.await
.map_err(|e| format!("Failed to submit worker initialization job: {}", e))?;

info!("Worker initialization job submitted (will complete in background)");

if let Some(mcp_config) = &config.router_config.mcp_config {
info!("Found {} MCP server(s) in config", mcp_config.servers.len());
let mcp_job = Job::InitializeMcpServers {
Expand Down
8 changes: 4 additions & 4 deletions sgl-router/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub async fn create_test_context(config: RouterConfig) -> Arc<AppContext> {

let app_context = Arc::new(
AppContext::builder()
.router_config(config)
.router_config(config.clone())
.client(client)
.rate_limiter(rate_limiter)
.tokenizer(None) // tokenizer
Expand Down Expand Up @@ -104,7 +104,7 @@ pub async fn create_test_context(config: RouterConfig) -> Arc<AppContext> {
create_worker_registration_workflow, create_worker_removal_workflow, WorkflowEngine,
};
let engine = Arc::new(WorkflowEngine::new());
engine.register_workflow(create_worker_registration_workflow());
engine.register_workflow(create_worker_registration_workflow(&config));
engine.register_workflow(create_worker_removal_workflow());
app_context
.workflow_engine
Expand Down Expand Up @@ -180,7 +180,7 @@ pub async fn create_test_context_with_mcp_config(

let app_context = Arc::new(
AppContext::builder()
.router_config(config)
.router_config(config.clone())
.client(client)
.rate_limiter(rate_limiter)
.tokenizer(None) // tokenizer
Expand Down Expand Up @@ -215,7 +215,7 @@ pub async fn create_test_context_with_mcp_config(
create_worker_registration_workflow, create_worker_removal_workflow, WorkflowEngine,
};
let engine = Arc::new(WorkflowEngine::new());
engine.register_workflow(create_worker_registration_workflow());
engine.register_workflow(create_worker_registration_workflow(&config));
engine.register_workflow(create_worker_removal_workflow());
app_context
.workflow_engine
Expand Down
Loading