diff --git a/docs/advanced_features/router.md b/docs/advanced_features/router.md index 2d068a51f8cf..e01288fe2fbe 100644 --- a/docs/advanced_features/router.md +++ b/docs/advanced_features/router.md @@ -46,7 +46,7 @@ SGLang Model Gateway is a high-performance model-routing gateway for large-scale ### Control Plane - **Worker Manager** discovers capabilities (`/get_server_info`, `/get_model_info`), tracks load, and registers/removes workers in the shared registry. -- **Job Queue** serializes add/remove requests and exposes status (`/workers/{url}`) so clients can track onboarding progress. +- **Job Queue** serializes add/remove requests and exposes status (`/workers/{worker_id}`) so clients can track onboarding progress. - **Load Monitor** feeds cache-aware and power-of-two policies with live worker load statistics. - **Health Checker** continuously probes workers and updates readiness, circuit breaker state, and router metrics. @@ -171,11 +171,13 @@ curl -X POST http://localhost:30000/workers \ # Inspect registry curl http://localhost:30000/workers -# Remove a worker -curl -X DELETE http://localhost:30000/workers/grpc%3A%2F%2F0.0.0.0%3A31000 +# Remove a worker (RESTful: delete by UUID) +# Tip: POST /workers returns a JSON body containing worker_id and a Location header. +WORKER_ID="$(curl -s http://localhost:30000/workers | jq -r '.workers[0].id')" +curl -X DELETE "http://localhost:30000/workers/${WORKER_ID}" ``` -Legacy endpoints (`/add_worker`, `/remove_worker`, `/list_workers`) remain available but will be deprecated. `/workers/{url}` returns both registry data and queued job status. The worker url in the removal request should be escaped. +Legacy endpoints (`/add_worker`, `/remove_worker`, `/list_workers`) remain available but will be deprecated. `/workers/{worker_id}` returns both registry data and queued job status. --- @@ -337,7 +339,7 @@ Use CLI flags to select parsers: | `GET`/`DELETE` | `/v1/conversations/{id}/items/{item_id}` | Inspect/delete conversation item. | | `GET` | `/workers` | List registered workers with health/load. | | `POST` | `/workers` | Queue worker registration. | -| `DELETE` | `/workers/{url}` | Queue worker removal. | +| `GET`/`PUT`/`DELETE` | `/workers/{worker_id}` | Get/update/remove a worker by UUID. | | `POST` | `/flush_cache` | Flush worker caches (HTTP workers). | | `GET` | `/get_loads` | Retrieve worker load snapshot. | | `GET` | `/liveness` / `/readiness` / `/health` | Health probes. | diff --git a/sgl-model-gateway/README.md b/sgl-model-gateway/README.md index acb83ba600b5..1f562de7d907 100644 --- a/sgl-model-gateway/README.md +++ b/sgl-model-gateway/README.md @@ -14,7 +14,7 @@ High-performance model routing control and data plane for large-scale LLM deploy ### Architecture at a Glance **Control Plane** - Worker Manager validates workers, discovers capabilities, and keeps the registry in sync. -- Job Queue serializes background operations (add/remove) and exposes status via `/workers/{url}`. +- Job Queue serializes background operations (add/remove) and exposes status via `/workers/{worker_id}`. - Background health checker and load monitor keep circuit breakers and policies informed. - Optional Kubernetes service discovery keeps the registry aligned with pods. @@ -186,8 +186,8 @@ Sample response (http workers): ```json { "workers": [ - {"id":"http://0.0.0.0:31378","url":"http://0.0.0.0:31378","model_id":"mistral","priority":50,"cost":1.0,"worker_type":"regular","is_healthy":true,"load":0,"connection_mode":"Http"}, - {"id":"http://0.0.0.0:34881","url":"http://0.0.0.0:34881","model_id":"llama3","priority":50,"cost":1.0,"worker_type":"regular","is_healthy":true,"load":0,"connection_mode":"Http"} + {"id":"2f3a0c3e-3a7b-4c3f-8c70-1b7d4c3a6e1f","url":"http://0.0.0.0:31378","model_id":"mistral","priority":50,"cost":1.0,"worker_type":"regular","is_healthy":true,"load":0,"connection_mode":"Http"}, + {"id":"9b0f6c2a-1c4f-4c2a-9f4a-1f2a6c0b9d3e","url":"http://0.0.0.0:34881","model_id":"llama3","priority":50,"cost":1.0,"worker_type":"regular","is_healthy":true,"load":0,"connection_mode":"Http"} ], "total": 2, "stats": { @@ -197,7 +197,7 @@ Sample response (http workers): } } ``` -Add more workers with the same API; include optional `labels` (for per-model policies) or `tokenizer_path` / `reasoning_parser` / `tool_parser` fields as needed. `/workers/{url}` exposes queued job status while background jobs finalize registration. +Add more workers with the same API; include optional `labels` (for per-model policies) or `tokenizer_path` / `reasoning_parser` / `tool_parser` fields as needed. `/workers/{worker_id}` exposes queued job status while background jobs finalize registration. ### gRPC Routing - **Rust binary** @@ -406,8 +406,9 @@ Use upstream SGLang binaries to start dedicated worker processes. |----------|------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------| | `POST` | `/workers` | Queue worker registration (prefill/decode/regular). Body matches `WorkerConfigRequest`. Returns `202 Accepted` while the job queue processes the request. | | `GET` | `/workers` | List workers with health, load, policy metadata, and queued job status. | -| `GET` | `/workers/{url}` | Inspect a specific worker or job queue entry. | -| `DELETE` | `/workers/{url}` | Queue worker removal. | +| `GET` | `/workers/{worker_id}` | Inspect a specific worker or job queue entry (UUID). | +| `PUT` | `/workers/{worker_id}` | Queue worker update by UUID. | +| `DELETE` | `/workers/{worker_id}` | Queue worker removal by UUID. | | `POST` | `/flush_cache` | Trigger cache flush across HTTP workers with success/failure breakdown. | | `GET` | `/get_loads` | Sample current load reported by each worker. | diff --git a/sgl-model-gateway/py_test/e2e_http/test_regular_router.py b/sgl-model-gateway/py_test/e2e_http/test_regular_router.py index effb39ef4c52..f4707f61542f 100644 --- a/sgl-model-gateway/py_test/e2e_http/test_regular_router.py +++ b/sgl-model-gateway/py_test/e2e_http/test_regular_router.py @@ -29,6 +29,18 @@ def _wait_for_workers( ) +def _get_worker_id_by_url(base_url: str, worker_url: str, headers: dict = None) -> str: + r = requests.get(f"{base_url}/workers", headers=headers, timeout=10) + r.raise_for_status() + workers = r.json().get("workers", []) + worker_id = next((w.get("id") for w in workers if w.get("url") == worker_url), None) + if not worker_id: + raise RuntimeError( + f"Could not find worker_id for url={worker_url}. workers={workers}" + ) + return worker_id + + @pytest.mark.e2e def test_mmlu(e2e_router_only_rr, e2e_two_workers_dp2, e2e_model): # Attach two dp=2 workers (total 4 GPUs) to a fresh router-only instance @@ -111,10 +123,8 @@ def test_add_and_remove_worker_live(e2e_router_only_rr, e2e_primary_worker, e2e_ r.raise_for_status() # Remove the worker - from urllib.parse import quote - - encoded_url = quote(worker_url, safe="") - r = requests.delete(f"{base}/workers/{encoded_url}", timeout=60) + worker_id = _get_worker_id_by_url(base, worker_url) + r = requests.delete(f"{base}/workers/{worker_id}", timeout=60) assert r.status_code == 202, f"Expected 202 ACCEPTED, got {r.status_code}: {r.text}" diff --git a/sgl-model-gateway/py_test/fixtures/router_manager.py b/sgl-model-gateway/py_test/fixtures/router_manager.py index 576e00b2ef8b..10d0b631f5e4 100644 --- a/sgl-model-gateway/py_test/fixtures/router_manager.py +++ b/sgl-model-gateway/py_test/fixtures/router_manager.py @@ -148,15 +148,16 @@ def add_worker(self, base_url: str, worker_url: str, timeout: float = 30.0) -> N r.status_code == 202 ), f"add_worker failed: {r.status_code} {r.text}" # ACCEPTED status - # Poll until worker is actually added and healthy - from urllib.parse import quote + payload = r.json() + worker_id = payload.get("worker_id") + assert worker_id, f"add_worker did not return worker_id: {payload}" - encoded_url = quote(worker_url, safe="") + # Poll until worker is actually added and healthy start = time.time() with requests.Session() as s: while time.time() - start < timeout: try: - r = s.get(f"{base_url}/workers/{encoded_url}", timeout=2) + r = s.get(f"{base_url}/workers/{worker_id}", timeout=2) if r.status_code == 200: data = r.json() # Check if registration job failed @@ -179,11 +180,20 @@ def add_worker(self, base_url: str, worker_url: str, timeout: float = 30.0) -> N def remove_worker( self, base_url: str, worker_url: str, timeout: float = 30.0 ) -> None: - # URL encode the worker_url for path parameter - from urllib.parse import quote + # Resolve worker_id from the current registry snapshot + r_list = requests.get(f"{base_url}/workers") + assert ( + r_list.status_code == 200 + ), f"list_workers failed: {r_list.status_code} {r_list.text}" + workers = r_list.json().get("workers", []) + worker_id = next( + (w.get("id") for w in workers if w.get("url") == worker_url), None + ) + assert ( + worker_id + ), f"could not find worker_id for url={worker_url}. workers={workers}" - encoded_url = quote(worker_url, safe="") - r = requests.delete(f"{base_url}/workers/{encoded_url}") + r = requests.delete(f"{base_url}/workers/{worker_id}") assert ( r.status_code == 202 ), f"remove_worker failed: {r.status_code} {r.text}" # ACCEPTED status @@ -194,7 +204,7 @@ def remove_worker( with requests.Session() as s: while time.time() - start < timeout: try: - r = s.get(f"{base_url}/workers/{encoded_url}", timeout=2) + r = s.get(f"{base_url}/workers/{worker_id}", timeout=2) if r.status_code == 404: # Worker successfully removed return diff --git a/sgl-model-gateway/src/core/worker_registry.rs b/sgl-model-gateway/src/core/worker_registry.rs index 4ae766aa2000..2d16bbd14486 100644 --- a/sgl-model-gateway/src/core/worker_registry.rs +++ b/sgl-model-gateway/src/core/worker_registry.rs @@ -121,6 +121,26 @@ impl WorkerRegistry { worker_id } + /// Reserve (or retrieve) a stable UUID for a worker URL. + pub fn reserve_id_for_url(&self, url: &str) -> WorkerId { + if let Some(existing_id) = self.url_to_id.get(url) { + return existing_id.clone(); + } + let worker_id = WorkerId::new(); + self.url_to_id.insert(url.to_string(), worker_id.clone()); + worker_id + } + + /// Best-effort lookup of the URL for a given worker ID. + pub fn get_url_by_id(&self, worker_id: &WorkerId) -> Option { + if let Some(worker) = self.get(worker_id) { + return Some(worker.url().to_string()); + } + self.url_to_id + .iter() + .find_map(|entry| (entry.value() == worker_id).then(|| entry.key().clone())) + } + /// Remove a worker by ID pub fn remove(&self, worker_id: &WorkerId) -> Option> { if let Some((_, worker)) = self.workers.remove(worker_id) { diff --git a/sgl-model-gateway/src/server.rs b/sgl-model-gateway/src/server.rs index 3bc0d13b8df8..f64cd6a64c07 100644 --- a/sgl-model-gateway/src/server.rs +++ b/sgl-model-gateway/src/server.rs @@ -19,6 +19,7 @@ use serde::Deserialize; use serde_json::{json, Value}; use tokio::{signal, spawn}; use tracing::{debug, error, info, warn, Level}; +use uuid::Uuid; use crate::{ app_context::AppContext, @@ -469,6 +470,10 @@ async fn create_worker( // Submit job for async processing let worker_url = config.url.clone(); + let worker_id = state + .context + .worker_registry + .reserve_id_for_url(&worker_url); let job = Job::AddWorker { config: Box::new(config), }; @@ -480,12 +485,20 @@ async fn create_worker( .expect("JobQueue not initialized"); match job_queue.submit(job).await { Ok(_) => { + let location = format!("/workers/{}", worker_id.as_str()); let response = json!({ "status": "accepted", - "worker_id": worker_url, + "worker_id": worker_id.as_str(), + "url": worker_url, + "location": location, "message": "Worker addition queued for background processing" }); - (StatusCode::ACCEPTED, Json(response)).into_response() + ( + StatusCode::ACCEPTED, + [(http::header::LOCATION, location)], + Json(response), + ) + .into_response() } Err(error) => { let error_response = WorkerErrorResponse { @@ -498,8 +511,15 @@ async fn create_worker( } async fn list_workers_rest(State(state): State>) -> Response { - let workers = state.context.worker_registry.get_all(); - let worker_infos: Vec = workers.iter().map(worker_to_info).collect(); + let workers = state.context.worker_registry.get_all_with_ids(); + let worker_infos: Vec = workers + .iter() + .map(|(worker_id, worker)| { + let mut info = worker_to_info(worker); + info.id = worker_id.as_str().to_string(); + info + }) + .collect(); let response = json!({ "workers": worker_infos, @@ -513,57 +533,102 @@ async fn list_workers_rest(State(state): State>) -> Response { Json(response).into_response() } -async fn get_worker(State(state): State>, Path(url): Path) -> Response { +fn parse_worker_id(raw: &str) -> Result { + Uuid::parse_str(raw) + .map_err(|e| format!("Invalid worker_id '{raw}' (expected UUID). Error: {e}"))?; + Ok(crate::core::WorkerId::from_string(raw.to_string())) +} + +async fn get_worker( + State(state): State>, + Path(worker_id_raw): Path, +) -> Response { + let worker_id = match parse_worker_id(&worker_id_raw) { + Ok(id) => id, + Err(msg) => { + let error = WorkerErrorResponse { + error: msg, + code: "BAD_REQUEST".to_string(), + }; + return (StatusCode::BAD_REQUEST, Json(error)).into_response(); + } + }; + let job_queue = state .context .worker_job_queue .get() .expect("JobQueue not initialized"); - if let Some(worker) = state.context.worker_registry.get_by_url(&url) { + if let Some(worker) = state.context.worker_registry.get(&worker_id) { // Worker exists in registry, get its full info and attach job status if any + let worker_url = worker.url().to_string(); let mut worker_info = worker_to_info(&worker); - if let Some(status) = job_queue.get_status(&url) { + worker_info.id = worker_id.as_str().to_string(); + if let Some(status) = job_queue.get_status(&worker_url) { worker_info.job_status = Some(status); } return Json(worker_info).into_response(); } - // Worker not in registry, check job queue for its status - if let Some(status) = job_queue.get_status(&url) { - // Create a partial WorkerInfo to report the job status - let worker_info = WorkerInfo { - id: url.clone(), - url: url.clone(), - model_id: "unknown".to_string(), - priority: 0, - cost: 1.0, - worker_type: "unknown".to_string(), - is_healthy: false, - load: 0, - connection_mode: "unknown".to_string(), - runtime_type: None, - tokenizer_path: None, - reasoning_parser: None, - tool_parser: None, - chat_template: None, - bootstrap_port: None, - metadata: HashMap::new(), - job_status: Some(status), - }; - return Json(worker_info).into_response(); + // Worker not in registry yet. If we can map id -> url (reserved IDs), return job status. + if let Some(worker_url) = state.context.worker_registry.get_url_by_id(&worker_id) { + if let Some(status) = job_queue.get_status(&worker_url) { + // Create a partial WorkerInfo to report the job status + let worker_info = WorkerInfo { + id: worker_id.as_str().to_string(), + url: worker_url.clone(), + model_id: "unknown".to_string(), + priority: 0, + cost: 1.0, + worker_type: "unknown".to_string(), + is_healthy: false, + load: 0, + connection_mode: "unknown".to_string(), + runtime_type: None, + tokenizer_path: None, + reasoning_parser: None, + tool_parser: None, + chat_template: None, + bootstrap_port: None, + metadata: HashMap::new(), + job_status: Some(status), + }; + return Json(worker_info).into_response(); + } } // Worker not found in registry or job queue let error = WorkerErrorResponse { - error: format!("Worker {url} not found"), + error: format!("Worker {worker_id_raw} not found"), code: "WORKER_NOT_FOUND".to_string(), }; (StatusCode::NOT_FOUND, Json(error)).into_response() } -async fn delete_worker(State(state): State>, Path(url): Path) -> Response { - let worker_id = url.clone(); +async fn delete_worker( + State(state): State>, + Path(worker_id_raw): Path, +) -> Response { + let worker_id = match parse_worker_id(&worker_id_raw) { + Ok(id) => id, + Err(msg) => { + let error = WorkerErrorResponse { + error: msg, + code: "BAD_REQUEST".to_string(), + }; + return (StatusCode::BAD_REQUEST, Json(error)).into_response(); + } + }; + + let Some(url) = state.context.worker_registry.get_url_by_id(&worker_id) else { + let error = WorkerErrorResponse { + error: format!("Worker {worker_id_raw} not found"), + code: "WORKER_NOT_FOUND".to_string(), + }; + return (StatusCode::NOT_FOUND, Json(error)).into_response(); + }; + let job = Job::RemoveWorker { url }; let job_queue = state @@ -575,7 +640,7 @@ async fn delete_worker(State(state): State>, Path(url): Path { let response = json!({ "status": "accepted", - "worker_id": worker_id, + "worker_id": worker_id.as_str(), "message": "Worker removal queued for background processing" }); (StatusCode::ACCEPTED, Json(response)).into_response() @@ -592,10 +657,28 @@ async fn delete_worker(State(state): State>, Path(url): Path>, - Path(url): Path, + Path(worker_id_raw): Path, Json(update): Json, ) -> Response { - let worker_id = url.clone(); + let worker_id = match parse_worker_id(&worker_id_raw) { + Ok(id) => id, + Err(msg) => { + let error = WorkerErrorResponse { + error: msg, + code: "BAD_REQUEST".to_string(), + }; + return (StatusCode::BAD_REQUEST, Json(error)).into_response(); + } + }; + + let Some(url) = state.context.worker_registry.get_url_by_id(&worker_id) else { + let error = WorkerErrorResponse { + error: format!("Worker {worker_id_raw} not found"), + code: "WORKER_NOT_FOUND".to_string(), + }; + return (StatusCode::NOT_FOUND, Json(error)).into_response(); + }; + let job = Job::UpdateWorker { url, update: Box::new(update), @@ -610,7 +693,7 @@ async fn update_worker( Ok(_) => { let response = json!({ "status": "accepted", - "worker_id": worker_id, + "worker_id": worker_id.as_str(), "message": "Worker update queued for background processing" }); (StatusCode::ACCEPTED, Json(response)).into_response() @@ -717,10 +800,9 @@ pub fn build_app( )); let worker_routes = Router::new() - .route("/workers", post(create_worker)) - .route("/workers", get(list_workers_rest)) + .route("/workers", post(create_worker).get(list_workers_rest)) .route( - "/workers/{url}", + "/workers/{worker_id}", get(get_worker).put(update_worker).delete(delete_worker), ) .route_layer(axum::middleware::from_fn_with_state(