35 commits
8748aa1
[monitor] add more metrics
Dec 24, 2025
0148d75
[bugfix] change multiprocess_mode from `sum` to `mostrecent`
Dec 25, 2025
b887199
[bugfix] remove redundant metric `cache_hit`
Dec 25, 2025
5d1bc8b
[bugfix] remove redundant metrics `cache_all`
Dec 25, 2025
c12a5c8
[bugfix] remove unused metrics `prefill_run_batch_time`
Dec 25, 2025
99cb07e
sync latest code
Dec 25, 2025
d3a3923
fix missing import statement
Dec 25, 2025
d7944ab
fix undefined variable `server_args`
Dec 25, 2025
7659c6e
format code by pre-commit
Dec 25, 2025
a9ea8ce
Merge branch 'main' into pr-monitor
Baidu-AIAK Dec 25, 2025
17924a5
Merge branch 'main' into pr-monitor
Baidu-AIAK Dec 25, 2025
3ce41bc
Merge branch 'main' into pr-monitor
Baidu-AIAK Dec 25, 2025
838af83
Merge branch 'main' into pr-monitor
Baidu-AIAK Dec 25, 2025
2ca1768
Merge branch 'main' into pr-monitor
Baidu-AIAK Dec 26, 2025
1c83aa4
Merge branch 'main' into pr-monitor
Baidu-AIAK Dec 26, 2025
bfe286a
Merge branch 'main' into pr-monitor
Baidu-AIAK Dec 26, 2025
2f47ef5
Merge branch 'main' into pr-monitor
Baidu-AIAK Dec 26, 2025
e7e1da2
Merge branch 'main' into pr-monitor
Baidu-AIAK Dec 29, 2025
375035e
Merge branch 'main' into pr-monitor
Baidu-AIAK Dec 29, 2025
c0ef44c
Merge branch 'main' into pr-monitor
Baidu-AIAK Dec 30, 2025
cc63e11
Merge branch 'main' into pr-monitor
Baidu-AIAK Dec 31, 2025
667f97b
Merge branch 'main' into pr-monitor
Baidu-AIAK Jan 4, 2026
0ec9123
Merge branch 'main' into pr-monitor
Baidu-AIAK Jan 4, 2026
9d583e3
Merge branch 'main' into pr-monitor
Baidu-AIAK Jan 5, 2026
36da24e
[Bugfix] `init_metrics` deps on attribute defined in `init_model_worker`
Jan 7, 2026
b1754bd
Merge branch 'pr-monitor' into pr-monitor-remote
Jan 7, 2026
b9c84c2
[Bugfix] `init_ipc_channels` deps on attribute defined in `init_metrics`
Jan 7, 2026
956c16f
[bugfix] `current_scheduler_metrics_enabled` is a bool object
Jan 7, 2026
29e37c7
[bugfix] fix only DP0 log metrics `run_batch_time` and `equest_first_to…
Jan 9, 2026
898d763
[bugfix] recovery comment and use `perf_counter` instead of `time`
Jan 12, 2026
0754208
Merge remote-tracking branch 'community/main' into pr-monitor
Jan 12, 2026
397c4f2
Merge branch 'main' into pr-monitor
Baidu-AIAK Jan 12, 2026
b510bff
split huge `log_decode_stats` into sub `log_decode_*`
Jan 12, 2026
6a02ade
Merge branch 'pr-monitor' of https://github.com/Baidu-AIAK/sglang int…
Jan 13, 2026
a882c92
Merge branch 'main' into pr-monitor
Baidu-AIAK Jan 13, 2026
6 changes: 6 additions & 0 deletions python/sglang/srt/disaggregation/decode.py
@@ -836,6 +836,7 @@ def event_loop_normal_disagg_decode(self: Scheduler):

while True:
# Receive requests
self.iter_start_time = time.perf_counter()
recv_reqs = self.recv_requests()
self.process_input_requests(recv_reqs)
# polling and allocating kv cache
@@ -863,6 +864,7 @@ def event_loop_overlap_disagg_decode(self: Scheduler):

while True:
# Receive requests
self.iter_start_time = time.perf_counter()
recv_reqs = self.recv_requests()
self.process_input_requests(recv_reqs)
# polling and allocating kv cache
@@ -980,6 +982,10 @@ def get_new_prebuilt_batch(self: Scheduler) -> Optional[ScheduleBatch]:

for req in can_run_list:
req.time_stats.forward_entry_time = time.perf_counter()
if self.enable_metrics:
self.metrics_collector.observe_request_waiting_time(
req.time_stats.get_request_waiting_time(),
)

# construct a schedule batch with those requests and mark as decode
new_batch = ScheduleBatch.init_new(
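For readers unfamiliar with the metrics side, the observe_request_waiting_time call above feeds a latency histogram. Below is a minimal sketch of such an observer using prometheus_client; the metric name, help text, and buckets are assumptions, not the actual collector shipped in SGLang.

import time
from prometheus_client import Histogram

# Hypothetical histogram; the real SchedulerMetricsCollector may use
# different names, labels, and bucket boundaries.
REQUEST_WAITING_TIME = Histogram(
    "sglang_request_waiting_seconds",
    "Time a request waits between reaching the scheduler and entering a batch.",
    buckets=(0.001, 0.01, 0.1, 0.5, 1.0, 5.0, 30.0),
)

def observe_request_waiting_time(waiting_time: float) -> None:
    """Record one request's waiting time into the histogram."""
    REQUEST_WAITING_TIME.observe(waiting_time)

# Example: a request that waited roughly 0.25 s before entering a batch.
arrive_time = time.perf_counter() - 0.25
observe_request_waiting_time(time.perf_counter() - arrive_time)
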
16 changes: 16 additions & 0 deletions python/sglang/srt/disaggregation/prefill.py
@@ -439,6 +439,15 @@ def process_batch_result_disagg_prefill(
logits_output.input_token_logprobs.tolist()
)

if self.enable_metrics:
self.iter_forward_finish_time = time.time()
run_batch_time = (
self.iter_forward_finish_time - self.iter_forward_start_time
)
self.stats.run_batch_time = run_batch_time
self.metrics_collector.log_stats(self.stats)

hidden_state_offset = 0
for i, (req, next_token_id) in enumerate(
zip(batch.reqs, next_token_ids, strict=True)
):
@@ -519,6 +528,9 @@ def process_batch_result_disagg_prefill(
RequestStage.PREFILL_CHUNKED_FORWARD, req.rid, auto_next_anon=True
)

# Log DP-level prefill load-balancing metrics
if self.current_scheduler_metrics_enabled:
self.log_prefill_dp_balance_stats(batch)
self.maybe_send_health_check_signal()

def process_disagg_prefill_inflight_queue(
@@ -577,6 +589,10 @@ def process_disagg_prefill_inflight_queue(

for req in done_reqs:
req.time_stats.completion_time = time.perf_counter()
if self.enable_metrics:
self.metrics_collector.observe_request_first_token_forward_time(
req.time_stats.get_request_first_token_forward_time()
)

# Stream requests which have finished transfer
self.stream_output(
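The run_batch_time above is derived from time.time(); one of the commits in this PR notes a preference for time.perf_counter() when measuring durations, since it is monotonic and unaffected by wall-clock adjustments. A small illustrative comparison, not code from this PR:

import time

# time.time() is wall-clock time: it can jump (NTP sync, manual changes),
# so intervals computed from it are not guaranteed to be accurate.
wall_start = time.time()

# time.perf_counter() is a monotonic, high-resolution clock meant for
# measuring durations; its absolute value has no calendar meaning.
perf_start = time.perf_counter()

_ = sum(i * i for i in range(100_000))  # stand-in for a forward pass

run_batch_time_wall = time.time() - wall_start
run_batch_time_perf = time.perf_counter() - perf_start
print(f"wall-clock interval: {run_batch_time_wall:.6f}s, "
      f"perf_counter interval: {run_batch_time_perf:.6f}s")
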
8 changes: 6 additions & 2 deletions python/sglang/srt/entrypoints/http_server.py
@@ -1206,8 +1206,10 @@ async def continue_generation(obj: ContinueGenerationReqInput, request: Request)
@app.post("/v1/completions", dependencies=[Depends(validate_json_request)])
async def openai_v1_completions(request: CompletionRequest, raw_request: Request):
"""OpenAI-compatible text completion endpoint."""
# Timestamp when the HTTP request is received and handed off to the tokenizer
tokenizer_rev_request_time = time.time()
Contributor: Somewhat redundant with #13432

Contributor: Yes, I think this variable is essentially no different from received_time, and it can also be used for embedding requests placed within the serving_base.

Contributor (Author): Yes, it's redundant. I will reuse received_time.

return await raw_request.app.state.openai_serving_completion.handle_request(
request, raw_request
request, raw_request, tokenizer_rev_request_time
)


@@ -1216,8 +1218,10 @@ async def openai_v1_chat_completions(
request: ChatCompletionRequest, raw_request: Request
):
"""OpenAI-compatible chat completion endpoint."""
# Timestamp when the HTTP request is received and handed off to the tokenizer
tokenizer_rev_request_time = time.time()
return await raw_request.app.state.openai_serving_chat.handle_request(
request, raw_request
request, raw_request, tokenizer_rev_request_time
)


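Following the review thread above about reusing received_time instead of introducing tokenizer_rev_request_time, one possible shape of that refactor is sketched below. The RequestTimestamps container and handle_request_sketch function are hypothetical illustrations, not code from this PR or from SGLang.

import time
from dataclasses import dataclass, field
from typing import Any, Optional

@dataclass
class RequestTimestamps:
    # Hypothetical container; SGLang's real request structures differ.
    received_time: float = field(default_factory=time.time)
    dispatch_to_scheduler_time: Optional[float] = None

async def handle_request_sketch(request: Any, raw_request: Any) -> RequestTimestamps:
    # Capture the receive timestamp once in the shared entry point instead of
    # threading a separate tokenizer_rev_request_time argument down from every
    # endpoint (/v1/completions, /v1/chat/completions, embeddings).
    ts = RequestTimestamps()
    # ... validation, adaptation, and tokenization would happen here ...
    ts.dispatch_to_scheduler_time = time.time()
    return ts
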
15 changes: 12 additions & 3 deletions python/sglang/srt/entrypoints/openai/serving_base.py
@@ -84,7 +84,10 @@ def _validate_lora_enabled(self, adapter_name: str) -> None:
)

async def handle_request(
self, request: OpenAIServingRequest, raw_request: Request
self,
request: OpenAIServingRequest,
raw_request: Request,
tokenizer_rev_request_time: Optional[float] = None,
) -> Union[Any, StreamingResponse, ErrorResponse]:
"""Handle the specific request type with common pattern
If you want to override this method, you should be careful to record the validation time.
@@ -114,11 +117,17 @@ async def handle_request(
# Note(Xinyuan): raw_request below is only used for detecting the connection of the client
if hasattr(request, "stream") and request.stream:
return await self._handle_streaming_request(
adapted_request, processed_request, raw_request
adapted_request,
processed_request,
raw_request,
tokenizer_rev_request_time,
)
else:
return await self._handle_non_streaming_request(
adapted_request, processed_request, raw_request
adapted_request,
processed_request,
raw_request,
tokenizer_rev_request_time,
)
except HTTPException as e:
return self.create_error_response(
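Because tokenizer_rev_request_time defaults to None, existing callers of handle_request keep working while new callers can pass the receive timestamp. A hedged usage sketch with a stub class, not the real OpenAIServingBase:

import asyncio
import time
from typing import Any, Optional

class ServingStub:
    # Placeholder mirroring the extended signature shown in the diff.
    async def handle_request(
        self,
        request: Any,
        raw_request: Any,
        tokenizer_rev_request_time: Optional[float] = None,
    ) -> dict:
        # Fall back to "now" when no timestamp is supplied so downstream
        # metrics still receive a defined value.
        received = tokenizer_rev_request_time or time.time()
        return {"received": received}

stub = ServingStub()
# Old call sites: no timestamp argument.
print(asyncio.run(stub.handle_request(request=None, raw_request=None)))
# New call sites: pass the moment the HTTP request arrived.
print(asyncio.run(stub.handle_request(None, None, tokenizer_rev_request_time=time.time())))
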
11 changes: 8 additions & 3 deletions python/sglang/srt/entrypoints/openai/serving_chat.py
@@ -517,10 +517,13 @@ async def _handle_streaming_request(
adapted_request: GenerateReqInput,
request: ChatCompletionRequest,
raw_request: Request,
tokenizer_rev_request_time: Optional[float] = None,
) -> StreamingResponse:
"""Handle streaming chat completion request"""
return StreamingResponse(
self._generate_chat_stream(adapted_request, request, raw_request),
self._generate_chat_stream(
adapted_request, request, raw_request, tokenizer_rev_request_time
),
media_type="text/event-stream",
background=self.tokenizer_manager.create_abort_task(adapted_request),
)
@@ -530,6 +533,7 @@ async def _generate_chat_stream(
adapted_request: GenerateReqInput,
request: ChatCompletionRequest,
raw_request: Request,
tokenizer_rev_request_time: Optional[float] = None,
) -> AsyncGenerator[str, None]:
"""Generate streaming chat completion response"""
# Parsers for tool calls and reasoning
@@ -551,7 +555,7 @@

try:
async for content in self.tokenizer_manager.generate_request(
adapted_request, raw_request
adapted_request, raw_request, tokenizer_rev_request_time
):
index = content.get("index", 0)

@@ -769,11 +773,12 @@ async def _handle_non_streaming_request(
adapted_request: GenerateReqInput,
request: ChatCompletionRequest,
raw_request: Request,
tokenizer_rev_request_time: Optional[float] = None,
) -> Union[ChatCompletionResponse, ErrorResponse, ORJSONResponse]:
"""Handle non-streaming chat completion request"""
try:
ret = await self.tokenizer_manager.generate_request(
adapted_request, raw_request
adapted_request, raw_request, tokenizer_rev_request_time
).__anext__()
except ValueError as e:
return self.create_error_response(str(e))
11 changes: 8 additions & 3 deletions python/sglang/srt/entrypoints/openai/serving_completions.py
@@ -177,10 +177,13 @@ async def _handle_streaming_request(
adapted_request: GenerateReqInput,
request: CompletionRequest,
raw_request: Request,
tokenizer_rev_request_time: Optional[float] = None,
) -> StreamingResponse:
"""Handle streaming completion request"""
return StreamingResponse(
self._generate_completion_stream(adapted_request, request, raw_request),
self._generate_completion_stream(
adapted_request, request, raw_request, tokenizer_rev_request_time
),
media_type="text/event-stream",
background=self.tokenizer_manager.create_abort_task(adapted_request),
)
@@ -190,6 +193,7 @@ async def _generate_completion_stream(
adapted_request: GenerateReqInput,
request: CompletionRequest,
raw_request: Request,
tokenizer_rev_request_time: Optional[float] = None,
) -> AsyncGenerator[str, None]:
"""Generate streaming completion response"""
created = int(time.time())
@@ -206,7 +210,7 @@

try:
async for content in self.tokenizer_manager.generate_request(
adapted_request, raw_request
adapted_request, raw_request, tokenizer_rev_request_time
):
index = content.get("index", 0)

@@ -341,11 +345,12 @@ async def _handle_non_streaming_request(
adapted_request: GenerateReqInput,
request: CompletionRequest,
raw_request: Request,
tokenizer_rev_request_time: Optional[float] = None,
) -> Union[CompletionResponse, ErrorResponse, ORJSONResponse]:
"""Handle non-streaming completion request"""
try:
generator = self.tokenizer_manager.generate_request(
adapted_request, raw_request
adapted_request, raw_request, tokenizer_rev_request_time
)
ret = await generator.__anext__()
except ValueError as e:
5 changes: 5 additions & 0 deletions python/sglang/srt/managers/io_struct.py
@@ -716,6 +716,9 @@ class TokenizedGenerateReqInput(BaseReq):
# Session info for continual prompting
session_params: Optional[SessionParams] = None

# Timestamp when tokenizer dispatches the request to the scheduler
dispatch_to_scheduler_time: Optional[float] = None

# LoRA related
lora_id: Optional[str] = None # None means just use the base model

@@ -924,6 +927,8 @@ class TokenizedEmbeddingReqInput(BaseReq):
priority: Optional[int] = None
# The number of dimensions the resulting output embeddings should have. It is applicable for Matryoshka Embeddings.
dimensions: Optional[int] = None
# Timestamp when tokenizer dispatches the request to the scheduler
dispatch_to_scheduler_time: Optional[float] = None


@dataclass
13 changes: 13 additions & 0 deletions python/sglang/srt/managers/schedule_batch.py
@@ -512,6 +512,7 @@ def __init__(
return_hidden_states: bool = False,
return_routed_experts: bool = False,
eos_token_ids: Optional[Set[int]] = None,
dispatch_to_scheduler_time: Optional[float] = None,
bootstrap_host: Optional[str] = None,
bootstrap_port: Optional[int] = None,
bootstrap_room: Optional[int] = None,
@@ -745,6 +746,12 @@ def __init__(
self.has_log_time_stats: bool = False
self.last_tic = time.monotonic()

# Timestamp when tokenizer dispatches the request to the scheduler
self.dispatch_to_scheduler_time = dispatch_to_scheduler_time
# TODO (suhang): Move the dispatch_to_scheduler_time synchronization into Req's own initializer:
# once dispatch_to_scheduler_time is passed into Req, TimeStats can synchronize it
# automatically, so the scheduler no longer needs that extra getattr check.

# For disaggregation
self.bootstrap_host: str = bootstrap_host
self.bootstrap_port: Optional[int] = bootstrap_port
@@ -1214,11 +1221,15 @@ class ScheduleBatch(ScheduleBatchDisaggregationDecodeMixin):
inner_idle_batch: Optional[ScheduleBatch] = None
global_num_tokens: Optional[List[int]] = None
global_num_tokens_for_logprob: Optional[List[int]] = None
dp_global_num_tokens_for_metric: Optional[List[int]] = None
is_extend_in_batch: bool = False
can_run_dp_cuda_graph: bool = False
tbo_split_seq_index: Optional[int] = None
global_forward_mode: Optional[ForwardMode] = None

# DP all_gather latency for this batch
all_gather_latency: float = 0.0

# For processing logprobs
return_logprob: bool = False
top_logprobs_nums: Optional[List[int]] = None
@@ -2195,6 +2206,8 @@ def copy(self):
spec_algorithm=self.spec_algorithm,
global_num_tokens=self.global_num_tokens,
global_num_tokens_for_logprob=self.global_num_tokens_for_logprob,
dp_global_num_tokens_for_metric=self.dp_global_num_tokens_for_metric,
all_gather_latency=self.all_gather_latency,
can_run_dp_cuda_graph=self.can_run_dp_cuda_graph,
is_extend_in_batch=self.is_extend_in_batch,
is_prefill_only=self.is_prefill_only,
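The diffs above and in scheduler.py reference getters such as get_request_zmq_time and get_request_waiting_time without showing them. Below is a minimal sketch of how a TimeStats-style helper could derive those values from the recorded timestamps; the field semantics and the assumption that all timestamps share one clock are mine, not taken from the real class.

import time
from dataclasses import dataclass

@dataclass
class TimeStatsSketch:
    # Assumed semantics: all fields hold timestamps from the same clock,
    # with 0.0 meaning "not recorded yet".
    dispatch_to_scheduler_time: float = 0.0   # tokenizer handed the request to the scheduler
    arrive_scheduler_time: float = 0.0        # scheduler received it over ZMQ
    forward_entry_time: float = 0.0           # request entered a forward batch
    completion_time: float = 0.0              # request finished

    def get_request_zmq_time(self) -> float:
        # Tokenizer -> scheduler hop; 0.0 when the dispatch timestamp is missing.
        if self.dispatch_to_scheduler_time == 0.0:
            return 0.0
        return max(0.0, self.arrive_scheduler_time - self.dispatch_to_scheduler_time)

    def get_request_waiting_time(self) -> float:
        # Time spent queued in the scheduler before entering a batch.
        return max(0.0, self.forward_entry_time - self.arrive_scheduler_time)

stats = TimeStatsSketch(dispatch_to_scheduler_time=time.perf_counter())
stats.arrive_scheduler_time = time.perf_counter()
stats.forward_entry_time = time.perf_counter()
print(stats.get_request_zmq_time(), stats.get_request_waiting_time())
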
29 changes: 23 additions & 6 deletions python/sglang/srt/managers/scheduler.py
@@ -319,12 +319,6 @@ def __init__(
# Init model configs
self.init_model_config()

# Init metrics stats
self.init_metrics(tp_rank, pp_rank, dp_rank)

# Init inter-process communication
self.init_ipc_channels(port_args)

# Init PD-multiplexing context
if self.enable_pdmux:
self.init_pdmux()
@@ -338,6 +332,12 @@
# Launch a model worker and draft model worker if using speculative decoding
self.init_model_worker()

# Init metrics stats
self.init_metrics(tp_rank, pp_rank, dp_rank)

# Init inter-process communication
self.init_ipc_channels(port_args)

if (t := envs.SGLANG_TEST_STUCK_SCHEDULER_INIT.get()) > 0:
time.sleep(t)

@@ -1056,6 +1056,7 @@ def event_loop_normal(self):
"""A normal scheduler loop."""
while True:
# Receive requests
self.iter_start_time = time.perf_counter()
recv_reqs = self.recv_requests()
self.process_input_requests(recv_reqs)
if self._engine_paused:
@@ -1092,6 +1093,7 @@ def pop_and_process():

while True:
# Receive requests
self.iter_start_time = time.perf_counter()
recv_reqs = self.recv_requests()
self.process_input_requests(recv_reqs)
if self._engine_paused:
@@ -1429,6 +1431,7 @@ def handle_generate_request(
return_hidden_states=recv_req.return_hidden_states,
return_routed_experts=recv_req.return_routed_experts,
eos_token_ids=self.model_config.hf_eos_token_id,
dispatch_to_scheduler_time=recv_req.dispatch_to_scheduler_time,
bootstrap_host=recv_req.bootstrap_host,
bootstrap_port=recv_req.bootstrap_port,
bootstrap_room=recv_req.bootstrap_room,
@@ -1444,6 +1447,12 @@
dllm_config=self.dllm_config,
)
req.tokenizer = self.tokenizer
if getattr(recv_req, "dispatch_to_scheduler_time", 0.0):
# Keep the dispatch timestamp only when present; clamp to zero to avoid negative values.
req.time_stats.dispatch_to_scheduler_time = max(
0.0, recv_req.dispatch_to_scheduler_time
)
req.time_stats.arrive_scheduler_time = time.perf_counter()

if self.disaggregation_mode != DisaggregationMode.NULL:
# Invalid request for disaggregated mode
@@ -1674,6 +1683,7 @@ def handle_embedding_request(
recv_req.input_text,
recv_req.input_ids,
recv_req.sampling_params,
dispatch_to_scheduler_time=recv_req.dispatch_to_scheduler_time,
token_type_ids=recv_req.token_type_ids,
priority=recv_req.priority,
dimensions=recv_req.dimensions,
@@ -2001,6 +2011,15 @@ def _get_new_batch_prefill_raw(
if req.time_stats.forward_entry_time == 0:
req.time_stats.forward_entry_time = time.perf_counter()
if self.enable_metrics:
self.metrics_collector.observe_request_zmq_time(
req.time_stats.get_request_zmq_time(),
)
self.metrics_collector.observe_queue_time(
req.time_stats.get_queueing_time(),
)
self.metrics_collector.observe_request_waiting_time(
req.time_stats.get_request_waiting_time(),
)

# Create a new batch
new_batch = ScheduleBatch.init_new(
@@ -2141,6 +2157,7 @@ def run_batch(
) -> Union[GenerationBatchResult, EmbeddingBatchResult]:
"""Run a batch."""
self.forward_ct += 1
self.iter_forward_start_time = time.time()

# Whether to run the profiler
self._profile_batch_predicate(batch)
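The scheduler diff records iter_start_time at the top of each event loop and iter_forward_start_time inside run_batch. Below is a toy sketch of how those two timestamps can be combined into per-iteration stats; the stat names and the split into schedule versus forward time are illustrative assumptions.

import time
from typing import Any, Dict

class SchedulerTimingSketch:
    """Toy event loop mirroring the timing points added in this PR."""

    def __init__(self) -> None:
        self.iter_start_time = 0.0
        self.iter_forward_start_time = 0.0

    def run_batch(self, batch: Any) -> None:
        self.iter_forward_start_time = time.perf_counter()
        _ = sum(i * i for i in range(50_000))  # stand-in for the forward pass

    def event_loop_iteration(self, batch: Any) -> Dict[str, float]:
        self.iter_start_time = time.perf_counter()
        # ... recv_requests(), process_input_requests(), batch scheduling ...
        self.run_batch(batch)
        now = time.perf_counter()
        return {
            # Time from receiving requests to launching the forward pass.
            "schedule_time": self.iter_forward_start_time - self.iter_start_time,
            # Duration of the forward pass itself (run_batch_time in the PR).
            "run_batch_time": now - self.iter_forward_start_time,
        }

sketch = SchedulerTimingSketch()
print(sketch.event_loop_iteration(batch=None))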