12 changes: 7 additions & 5 deletions docs/advanced_features/router.md
@@ -46,7 +46,7 @@ SGLang Model Gateway is a high-performance model-routing gateway for large-scale

### Control Plane
- **Worker Manager** discovers capabilities (`/get_server_info`, `/get_model_info`), tracks load, and registers/removes workers in the shared registry.
- **Job Queue** serializes add/remove requests and exposes status (`/workers/{url}`) so clients can track onboarding progress.
- **Job Queue** serializes add/remove requests and exposes status (`/workers/{worker_id}`) so clients can track onboarding progress.
- **Load Monitor** feeds cache-aware and power-of-two policies with live worker load statistics.
- **Health Checker** continuously probes workers and updates readiness, circuit breaker state, and router metrics.

@@ -171,11 +171,13 @@ curl -X POST http://localhost:30000/workers \
# Inspect registry
curl http://localhost:30000/workers

# Remove a worker
curl -X DELETE http://localhost:30000/workers/grpc%3A%2F%2F0.0.0.0%3A31000
# Remove a worker (RESTful: delete by UUID)
# Tip: POST /workers returns a JSON body containing worker_id and a Location header.
WORKER_ID="$(curl -s http://localhost:30000/workers | jq -r '.workers[0].id')"
curl -X DELETE "http://localhost:30000/workers/${WORKER_ID}"
```

Legacy endpoints (`/add_worker`, `/remove_worker`, `/list_workers`) remain available but will be deprecated. `/workers/{url}` returns both registry data and queued job status. The worker url in the removal request should be escaped.
Legacy endpoints (`/add_worker`, `/remove_worker`, `/list_workers`) remain available but will be deprecated. `/workers/{worker_id}` returns both registry data and queued job status.
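
The same lifecycle can also be driven from a script. The snippet below is a minimal Python sketch, not part of the gateway: it assumes the gateway address from the curl examples, a `url` field in the registration body (the body must match `WorkerConfigRequest`), and an `is_healthy` field in the per-worker status payload mirroring the `/workers` listing.

```python
import time
import requests

BASE = "http://localhost:30000"  # gateway address from the curl examples above

# Queue registration; the gateway answers 202 Accepted and returns the assigned
# worker_id in the response body. The `url` field name here is an assumption --
# the body must match WorkerConfigRequest.
resp = requests.post(f"{BASE}/workers", json={"url": "http://0.0.0.0:31000"}, timeout=10)
resp.raise_for_status()
worker_id = resp.json()["worker_id"]

# Poll the per-worker endpoint until onboarding finishes (exact status fields
# may differ; checking is_healthy mirrors the /workers listing).
deadline = time.time() + 60
while time.time() < deadline:
    status = requests.get(f"{BASE}/workers/{worker_id}", timeout=5)
    if status.status_code == 200 and status.json().get("is_healthy"):
        break
    time.sleep(1)

# Removal is also queued and acknowledged with 202; the worker disappears from
# GET /workers once the background job completes.
requests.delete(f"{BASE}/workers/{worker_id}", timeout=10)
```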

---

@@ -337,7 +339,7 @@ Use CLI flags to select parsers:
| `GET`/`DELETE` | `/v1/conversations/{id}/items/{item_id}` | Inspect/delete conversation item. |
| `GET` | `/workers` | List registered workers with health/load. |
| `POST` | `/workers` | Queue worker registration. |
| `DELETE` | `/workers/{url}` | Queue worker removal. |
| `GET`/`PUT`/`DELETE` | `/workers/{worker_id}` | Get/update/remove a worker by UUID. |
| `POST` | `/flush_cache` | Flush worker caches (HTTP workers). |
| `GET` | `/get_loads` | Retrieve worker load snapshot. |
| `GET` | `/liveness` / `/readiness` / `/health` | Health probes. |
13 changes: 7 additions & 6 deletions sgl-model-gateway/README.md
@@ -14,7 +14,7 @@ High-performance model routing control and data plane for large-scale LLM deploy
### Architecture at a Glance
**Control Plane**
- Worker Manager validates workers, discovers capabilities, and keeps the registry in sync.
- Job Queue serializes background operations (add/remove) and exposes status via `/workers/{url}`.
- Job Queue serializes background operations (add/remove) and exposes status via `/workers/{worker_id}`.
- Background health checker and load monitor keep circuit breakers and policies informed.
- Optional Kubernetes service discovery keeps the registry aligned with pods.

@@ -186,8 +186,8 @@ Sample response (http workers):
```json
{
"workers": [
{"id":"http://0.0.0.0:31378","url":"http://0.0.0.0:31378","model_id":"mistral","priority":50,"cost":1.0,"worker_type":"regular","is_healthy":true,"load":0,"connection_mode":"Http"},
{"id":"http://0.0.0.0:34881","url":"http://0.0.0.0:34881","model_id":"llama3","priority":50,"cost":1.0,"worker_type":"regular","is_healthy":true,"load":0,"connection_mode":"Http"}
{"id":"2f3a0c3e-3a7b-4c3f-8c70-1b7d4c3a6e1f","url":"http://0.0.0.0:31378","model_id":"mistral","priority":50,"cost":1.0,"worker_type":"regular","is_healthy":true,"load":0,"connection_mode":"Http"},
{"id":"9b0f6c2a-1c4f-4c2a-9f4a-1f2a6c0b9d3e","url":"http://0.0.0.0:34881","model_id":"llama3","priority":50,"cost":1.0,"worker_type":"regular","is_healthy":true,"load":0,"connection_mode":"Http"}
],
"total": 2,
"stats": {
@@ -197,7 +197,7 @@
}
}
```
Add more workers with the same API; include optional `labels` (for per-model policies) or `tokenizer_path` / `reasoning_parser` / `tool_parser` fields as needed. `/workers/{url}` exposes queued job status while background jobs finalize registration.
Add more workers with the same API; include optional `labels` (for per-model policies) or `tokenizer_path` / `reasoning_parser` / `tool_parser` fields as needed. `/workers/{worker_id}` exposes queued job status while background jobs finalize registration.
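
For scripting, the listing above maps directly to a URL → UUID lookup. The following is an illustrative sketch only; the gateway address is assumed, and it relies solely on the `id` and `url` fields shown in the sample response.

```python
import requests

BASE = "http://localhost:30000"  # assumed gateway address

def workers_by_url(base: str) -> dict[str, str]:
    """Build a worker-URL -> UUID map from the /workers listing."""
    resp = requests.get(f"{base}/workers", timeout=10)
    resp.raise_for_status()
    return {w["url"]: w["id"] for w in resp.json().get("workers", [])}

# Resolve the UUID for a known worker URL, then fetch its registry/job status.
worker_id = workers_by_url(BASE).get("http://0.0.0.0:31378")
if worker_id is not None:
    print(requests.get(f"{BASE}/workers/{worker_id}", timeout=10).json())
```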

### gRPC Routing
- **Rust binary**
@@ -406,8 +406,9 @@ Use upstream SGLang binaries to start dedicated worker processes.
|----------|------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------|
| `POST` | `/workers` | Queue worker registration (prefill/decode/regular). Body matches `WorkerConfigRequest`. Returns `202 Accepted` while the job queue processes the request. |
| `GET` | `/workers` | List workers with health, load, policy metadata, and queued job status. |
| `GET` | `/workers/{url}` | Inspect a specific worker or job queue entry. |
| `DELETE` | `/workers/{url}` | Queue worker removal. |
| `GET` | `/workers/{worker_id}` | Inspect a specific worker or job queue entry (UUID). |
| `PUT` | `/workers/{worker_id}` | Queue worker update by UUID. |
| `DELETE` | `/workers/{worker_id}` | Queue worker removal by UUID. |
| `POST` | `/flush_cache` | Trigger cache flush across HTTP workers with success/failure breakdown. |
| `GET` | `/get_loads` | Sample current load reported by each worker. |

18 changes: 14 additions & 4 deletions sgl-model-gateway/py_test/e2e_http/test_regular_router.py
@@ -29,6 +29,18 @@ def _wait_for_workers(
)


def _get_worker_id_by_url(base_url: str, worker_url: str, headers: dict = None) -> str:
r = requests.get(f"{base_url}/workers", headers=headers, timeout=10)
r.raise_for_status()
workers = r.json().get("workers", [])
worker_id = next((w.get("id") for w in workers if w.get("url") == worker_url), None)
if not worker_id:
raise RuntimeError(
f"Could not find worker_id for url={worker_url}. workers={workers}"
)
return worker_id


@pytest.mark.e2e
def test_mmlu(e2e_router_only_rr, e2e_two_workers_dp2, e2e_model):
# Attach two dp=2 workers (total 4 GPUs) to a fresh router-only instance
@@ -111,10 +123,8 @@ def test_add_and_remove_worker_live(e2e_router_only_rr, e2e_primary_worker, e2e_
r.raise_for_status()

# Remove the worker
from urllib.parse import quote

encoded_url = quote(worker_url, safe="")
r = requests.delete(f"{base}/workers/{encoded_url}", timeout=60)
worker_id = _get_worker_id_by_url(base, worker_url)
r = requests.delete(f"{base}/workers/{worker_id}", timeout=60)
assert r.status_code == 202, f"Expected 202 ACCEPTED, got {r.status_code}: {r.text}"


28 changes: 19 additions & 9 deletions sgl-model-gateway/py_test/fixtures/router_manager.py
@@ -148,15 +148,16 @@ def add_worker(self, base_url: str, worker_url: str, timeout: float = 30.0) -> N
r.status_code == 202
), f"add_worker failed: {r.status_code} {r.text}" # ACCEPTED status

# Poll until worker is actually added and healthy
from urllib.parse import quote
payload = r.json()
worker_id = payload.get("worker_id")
assert worker_id, f"add_worker did not return worker_id: {payload}"

encoded_url = quote(worker_url, safe="")
# Poll until worker is actually added and healthy
start = time.time()
with requests.Session() as s:
while time.time() - start < timeout:
try:
r = s.get(f"{base_url}/workers/{encoded_url}", timeout=2)
r = s.get(f"{base_url}/workers/{worker_id}", timeout=2)
if r.status_code == 200:
data = r.json()
# Check if registration job failed
@@ -179,11 +180,20 @@ def add_worker(self, base_url: str, worker_url: str, timeout: float = 30.0) -> N
def remove_worker(
self, base_url: str, worker_url: str, timeout: float = 30.0
) -> None:
# URL encode the worker_url for path parameter
from urllib.parse import quote
# Resolve worker_id from the current registry snapshot
r_list = requests.get(f"{base_url}/workers")
assert (
r_list.status_code == 200
), f"list_workers failed: {r_list.status_code} {r_list.text}"
workers = r_list.json().get("workers", [])
worker_id = next(
(w.get("id") for w in workers if w.get("url") == worker_url), None
)
assert (
worker_id
), f"could not find worker_id for url={worker_url}. workers={workers}"

encoded_url = quote(worker_url, safe="")
r = requests.delete(f"{base_url}/workers/{encoded_url}")
r = requests.delete(f"{base_url}/workers/{worker_id}")
assert (
r.status_code == 202
), f"remove_worker failed: {r.status_code} {r.text}" # ACCEPTED status
@@ -194,7 +204,7 @@
with requests.Session() as s:
while time.time() - start < timeout:
try:
r = s.get(f"{base_url}/workers/{encoded_url}", timeout=2)
r = s.get(f"{base_url}/workers/{worker_id}", timeout=2)
if r.status_code == 404:
# Worker successfully removed
return
20 changes: 20 additions & 0 deletions sgl-model-gateway/src/core/worker_registry.rs
@@ -121,6 +121,26 @@ impl WorkerRegistry {
worker_id
}

/// Reserve (or retrieve) a stable UUID for a worker URL.
pub fn reserve_id_for_url(&self, url: &str) -> WorkerId {
if let Some(existing_id) = self.url_to_id.get(url) {
return existing_id.clone();
}
let worker_id = WorkerId::new();
self.url_to_id.insert(url.to_string(), worker_id.clone());
worker_id
Comment on lines +126 to +131 (Contributor, severity: high)

There's a race condition in this function. If two threads call `reserve_id_for_url` for the same new URL concurrently, both can pass the `if let Some(...)` check, generate a new `WorkerId`, and insert it into the map. The last one to insert will win, but both threads will return a `WorkerId`, one of which will be for a mapping that was immediately overwritten. This can lead to inconsistent state.

To fix this and make the operation atomic, you can use the `entry` API of `DashMap`.

Suggested change:
-    if let Some(existing_id) = self.url_to_id.get(url) {
-        return existing_id.clone();
-    }
-    let worker_id = WorkerId::new();
-    self.url_to_id.insert(url.to_string(), worker_id.clone());
-    worker_id
+    self.url_to_id
+        .entry(url.to_string())
+        .or_insert_with(WorkerId::new)
+        .clone()

}

/// Best-effort lookup of the URL for a given worker ID.
pub fn get_url_by_id(&self, worker_id: &WorkerId) -> Option<String> {
if let Some(worker) = self.get(worker_id) {
return Some(worker.url().to_string());
}
self.url_to_id
.iter()
.find_map(|entry| (entry.value() == worker_id).then(|| entry.key().clone()))
Comment on lines +139 to +141 (Contributor, severity: medium)

The fallback logic in this function iterates over the `url_to_id` map to find a URL by its worker ID. This is an O(N) operation and can become inefficient if the number of workers is large.

For better performance, consider adding a reverse mapping, e.g., `id_to_url: Arc<DashMap<WorkerId, String>>`. This would make the lookup an O(1) operation. You would need to update this new map in the `reserve_id_for_url` and `remove` functions accordingly.

}
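
Taken together, the two review comments suggest making the URL → ID reservation atomic and keeping a reverse ID → URL map for O(1) lookups. The sketch below is a hypothetical Python analogue of that combined suggestion, not the gateway's Rust code; names like `WorkerIdIndex`, `reserve_id`, and `url_of` are illustrative, and a single lock stands in for DashMap's lock-free `entry` API.

```python
import threading
import uuid

class WorkerIdIndex:
    """Toy analogue of the registry's URL/ID bookkeeping: atomic get-or-insert
    plus a reverse map so ID -> URL lookups do not scan the whole table."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._url_to_id: dict[str, str] = {}
        self._id_to_url: dict[str, str] = {}

    def reserve_id(self, url: str) -> str:
        # The whole check-and-insert runs under one lock, so two concurrent
        # callers for the same new URL always receive the same ID (the hazard
        # described in the first review comment cannot occur).
        with self._lock:
            worker_id = self._url_to_id.get(url)
            if worker_id is None:
                worker_id = str(uuid.uuid4())
                self._url_to_id[url] = worker_id
                self._id_to_url[worker_id] = url  # keep the reverse map in sync
            return worker_id

    def url_of(self, worker_id: str) -> str | None:
        # O(1) reverse lookup instead of scanning url_to_id, as the second
        # review comment recommends.
        with self._lock:
            return self._id_to_url.get(worker_id)
```

In the Rust code, the same shape would follow from `entry(url.to_string()).or_insert_with(WorkerId::new)` plus a second `DashMap<WorkerId, String>` kept in sync, as the comments above suggest.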

/// Remove a worker by ID
pub fn remove(&self, worker_id: &WorkerId) -> Option<Arc<dyn Worker>> {
if let Some((_, worker)) = self.workers.remove(worker_id) {