Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions python/sglang/srt/entrypoints/grpc_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import argparse
import asyncio
import dataclasses
import logging
import multiprocessing as mp
import os
Expand All @@ -15,8 +16,11 @@

import grpc
from google.protobuf.json_format import MessageToDict
from google.protobuf.struct_pb2 import Struct
from google.protobuf.timestamp_pb2 import Timestamp
from grpc_reflection.v1alpha import reflection

import sglang
from sglang.srt.disaggregation.utils import FAKE_BOOTSTRAP_HOST, DisaggregationMode
from sglang.srt.entrypoints.grpc_request_manager import GrpcRequestManager
from sglang.srt.grpc import sglang_scheduler_pb2, sglang_scheduler_pb2_grpc
Expand Down Expand Up @@ -173,11 +177,13 @@ def __init__(
request_manager: GrpcRequestManager,
server_args: ServerArgs,
model_info: Dict,
scheduler_info: Dict,
):
"""Initialize the standalone gRPC service."""
self.request_manager = request_manager
self.server_args = server_args
self.model_info = model_info
self.scheduler_info = scheduler_info
self.start_time = time.time()

# Start the request manager's event loop using auto_create_handle_loop
Expand Down Expand Up @@ -396,6 +402,89 @@ async def Abort(
message=str(e),
)

async def GetModelInfo(
self,
request: sglang_scheduler_pb2.GetModelInfoRequest,
context: grpc.aio.ServicerContext,
) -> sglang_scheduler_pb2.GetModelInfoResponse:
"""Get model information."""
logger.info("Model info request received")

is_generation = self.scheduler_info.get("is_generation")
if is_generation is None:
is_generation = not self.server_args.is_embedding

return sglang_scheduler_pb2.GetModelInfoResponse(
model_path=self.server_args.model_path,
tokenizer_path=self.server_args.tokenizer_path or "",
is_generation=is_generation,
preferred_sampling_params=(
self.server_args.preferred_sampling_params or ""
),
weight_version=self.server_args.weight_version or "",
served_model_name=self.server_args.served_model_name,
max_context_length=self.model_info["max_context_length"],
vocab_size=self.model_info["vocab_size"],
supports_vision=self.model_info["supports_vision"],
model_type=self.model_info["model_type"],
eos_token_ids=self.model_info["eos_token_ids"],
pad_token_id=self.model_info["pad_token_id"],
bos_token_id=self.model_info["bos_token_id"],
max_req_input_len=self.model_info["max_req_input_len"],
)

async def GetServerInfo(
self,
request: sglang_scheduler_pb2.GetServerInfoRequest,
context: grpc.aio.ServicerContext,
) -> sglang_scheduler_pb2.GetServerInfoResponse:
"""Get server information."""
logger.info("Server info request received")

server_args_dict = dataclasses.asdict(self.server_args)
server_args_struct = Struct()

def make_serializable(obj):
if obj is None:
return None
elif isinstance(obj, (str, int, float, bool)):
return obj
elif isinstance(obj, (list, tuple, set)):
return [make_serializable(item) for item in obj]
elif isinstance(obj, dict):
return {k: make_serializable(v) for k, v in obj.items()}
else:
return str(obj)

serializable_args = make_serializable(server_args_dict)
server_args_struct.update(serializable_args)

# Convert scheduler_info to Struct
scheduler_info_struct = Struct()
scheduler_info_struct.update(self.scheduler_info)

# Get runtime state from request manager
manager_state = self.request_manager.get_server_info()

# Calculate uptime
uptime = time.time() - self.start_time

# Create timestamp
start_timestamp = Timestamp()
start_timestamp.FromSeconds(int(self.start_time))

return sglang_scheduler_pb2.GetServerInfoResponse(
server_args=server_args_struct,
scheduler_info=scheduler_info_struct,
active_requests=manager_state["active_requests"],
is_paused=manager_state["paused"],
last_receive_timestamp=manager_state["last_receive_time"],
uptime_seconds=uptime,
sglang_version=sglang.__version__,
server_type="grpc",
start_time=start_timestamp,
)

# Helper methods for request/response conversion

def _convert_generate_request(
Expand Down Expand Up @@ -756,6 +845,7 @@ async def serve_grpc(
request_manager=request_manager,
server_args=server_args,
model_info=model_info,
scheduler_info=scheduler_info,
)
sglang_scheduler_pb2_grpc.add_SglangSchedulerServicer_to_server(servicer, server)

Expand Down
59 changes: 59 additions & 0 deletions python/sglang/srt/grpc/sglang_scheduler.proto
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ service SglangScheduler {
// Abort a running request
rpc Abort(AbortRequest) returns (AbortResponse);

// Get model information
rpc GetModelInfo(GetModelInfoRequest) returns (GetModelInfoResponse);

// Get server information
rpc GetServerInfo(GetServerInfoRequest) returns (GetServerInfoResponse);

}

// =====================
Expand Down Expand Up @@ -401,3 +407,56 @@ message SetInternalStateResponse {
bool success = 1;
string message = 2;
}

// =====================
// Model and Server Info
// =====================

// Get model information
message GetModelInfoRequest {}

message GetModelInfoResponse {
string model_path = 1;
string tokenizer_path = 2;
bool is_generation = 3;
string preferred_sampling_params = 4; // JSON string or empty
string weight_version = 5;
string served_model_name = 6;
int32 max_context_length = 7;
int32 vocab_size = 8;
bool supports_vision = 9;
string model_type = 10;
repeated int32 eos_token_ids = 11;
int32 pad_token_id = 12;
int32 bos_token_id = 13;
int32 max_req_input_len = 14;
}

// Get server information
message GetServerInfoRequest {}

message GetServerInfoResponse {
// Server configuration (as structured data)
google.protobuf.Struct server_args = 1;

// Scheduler metrics (from scheduler initialization)
google.protobuf.Struct scheduler_info = 2;

// Runtime state
int32 active_requests = 3;
bool is_paused = 4;
double last_receive_timestamp = 5;
double uptime_seconds = 6;

// Version info
string sglang_version = 7;

// Server metadata
string server_type = 8; // "grpc"
google.protobuf.Timestamp start_time = 9;

// Note: internal_states not provided in gRPC mode
// Scheduler-side metrics (memory usage, throughput) require
// bidirectional communicator infrastructure not available in gRPC.
// Use HTTP /get_server_info if scheduler internal state is needed.
}
14 changes: 11 additions & 3 deletions python/sglang/srt/grpc/sglang_scheduler_pb2.py

Large diffs are not rendered by default.

62 changes: 62 additions & 0 deletions python/sglang/srt/grpc/sglang_scheduler_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -428,3 +428,65 @@ class SetInternalStateResponse(_message.Message):
success: bool
message: str
def __init__(self, success: bool = ..., message: _Optional[str] = ...) -> None: ...

class GetModelInfoRequest(_message.Message):
__slots__ = ()
def __init__(self) -> None: ...

class GetModelInfoResponse(_message.Message):
__slots__ = ("model_path", "tokenizer_path", "is_generation", "preferred_sampling_params", "weight_version", "served_model_name", "max_context_length", "vocab_size", "supports_vision", "model_type", "eos_token_ids", "pad_token_id", "bos_token_id", "max_req_input_len")
MODEL_PATH_FIELD_NUMBER: _ClassVar[int]
TOKENIZER_PATH_FIELD_NUMBER: _ClassVar[int]
IS_GENERATION_FIELD_NUMBER: _ClassVar[int]
PREFERRED_SAMPLING_PARAMS_FIELD_NUMBER: _ClassVar[int]
WEIGHT_VERSION_FIELD_NUMBER: _ClassVar[int]
SERVED_MODEL_NAME_FIELD_NUMBER: _ClassVar[int]
MAX_CONTEXT_LENGTH_FIELD_NUMBER: _ClassVar[int]
VOCAB_SIZE_FIELD_NUMBER: _ClassVar[int]
SUPPORTS_VISION_FIELD_NUMBER: _ClassVar[int]
MODEL_TYPE_FIELD_NUMBER: _ClassVar[int]
EOS_TOKEN_IDS_FIELD_NUMBER: _ClassVar[int]
PAD_TOKEN_ID_FIELD_NUMBER: _ClassVar[int]
BOS_TOKEN_ID_FIELD_NUMBER: _ClassVar[int]
MAX_REQ_INPUT_LEN_FIELD_NUMBER: _ClassVar[int]
model_path: str
tokenizer_path: str
is_generation: bool
preferred_sampling_params: str
weight_version: str
served_model_name: str
max_context_length: int
vocab_size: int
supports_vision: bool
model_type: str
eos_token_ids: _containers.RepeatedScalarFieldContainer[int]
pad_token_id: int
bos_token_id: int
max_req_input_len: int
def __init__(self, model_path: _Optional[str] = ..., tokenizer_path: _Optional[str] = ..., is_generation: bool = ..., preferred_sampling_params: _Optional[str] = ..., weight_version: _Optional[str] = ..., served_model_name: _Optional[str] = ..., max_context_length: _Optional[int] = ..., vocab_size: _Optional[int] = ..., supports_vision: bool = ..., model_type: _Optional[str] = ..., eos_token_ids: _Optional[_Iterable[int]] = ..., pad_token_id: _Optional[int] = ..., bos_token_id: _Optional[int] = ..., max_req_input_len: _Optional[int] = ...) -> None: ...

class GetServerInfoRequest(_message.Message):
__slots__ = ()
def __init__(self) -> None: ...

class GetServerInfoResponse(_message.Message):
__slots__ = ("server_args", "scheduler_info", "active_requests", "is_paused", "last_receive_timestamp", "uptime_seconds", "sglang_version", "server_type", "start_time")
SERVER_ARGS_FIELD_NUMBER: _ClassVar[int]
SCHEDULER_INFO_FIELD_NUMBER: _ClassVar[int]
ACTIVE_REQUESTS_FIELD_NUMBER: _ClassVar[int]
IS_PAUSED_FIELD_NUMBER: _ClassVar[int]
LAST_RECEIVE_TIMESTAMP_FIELD_NUMBER: _ClassVar[int]
UPTIME_SECONDS_FIELD_NUMBER: _ClassVar[int]
SGLANG_VERSION_FIELD_NUMBER: _ClassVar[int]
SERVER_TYPE_FIELD_NUMBER: _ClassVar[int]
START_TIME_FIELD_NUMBER: _ClassVar[int]
server_args: _struct_pb2.Struct
scheduler_info: _struct_pb2.Struct
active_requests: int
is_paused: bool
last_receive_timestamp: float
uptime_seconds: float
sglang_version: str
server_type: str
start_time: _timestamp_pb2.Timestamp
def __init__(self, server_args: _Optional[_Union[_struct_pb2.Struct, _Mapping]] = ..., scheduler_info: _Optional[_Union[_struct_pb2.Struct, _Mapping]] = ..., active_requests: _Optional[int] = ..., is_paused: bool = ..., last_receive_timestamp: _Optional[float] = ..., uptime_seconds: _Optional[float] = ..., sglang_version: _Optional[str] = ..., server_type: _Optional[str] = ..., start_time: _Optional[_Union[datetime.datetime, _timestamp_pb2.Timestamp, _Mapping]] = ...) -> None: ...
88 changes: 88 additions & 0 deletions python/sglang/srt/grpc/sglang_scheduler_pb2_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@ def __init__(self, channel):
request_serializer=sglang__scheduler__pb2.AbortRequest.SerializeToString,
response_deserializer=sglang__scheduler__pb2.AbortResponse.FromString,
_registered_method=True)
self.GetModelInfo = channel.unary_unary(
'/sglang.grpc.scheduler.SglangScheduler/GetModelInfo',
request_serializer=sglang__scheduler__pb2.GetModelInfoRequest.SerializeToString,
response_deserializer=sglang__scheduler__pb2.GetModelInfoResponse.FromString,
_registered_method=True)
self.GetServerInfo = channel.unary_unary(
'/sglang.grpc.scheduler.SglangScheduler/GetServerInfo',
request_serializer=sglang__scheduler__pb2.GetServerInfoRequest.SerializeToString,
response_deserializer=sglang__scheduler__pb2.GetServerInfoResponse.FromString,
_registered_method=True)


class SglangSchedulerServicer(object):
Expand Down Expand Up @@ -94,6 +104,20 @@ def Abort(self, request, context):
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def GetModelInfo(self, request, context):
"""Get model information
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def GetServerInfo(self, request, context):
"""Get server information
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')


def add_SglangSchedulerServicer_to_server(servicer, server):
rpc_method_handlers = {
Expand All @@ -117,6 +141,16 @@ def add_SglangSchedulerServicer_to_server(servicer, server):
request_deserializer=sglang__scheduler__pb2.AbortRequest.FromString,
response_serializer=sglang__scheduler__pb2.AbortResponse.SerializeToString,
),
'GetModelInfo': grpc.unary_unary_rpc_method_handler(
servicer.GetModelInfo,
request_deserializer=sglang__scheduler__pb2.GetModelInfoRequest.FromString,
response_serializer=sglang__scheduler__pb2.GetModelInfoResponse.SerializeToString,
),
'GetServerInfo': grpc.unary_unary_rpc_method_handler(
servicer.GetServerInfo,
request_deserializer=sglang__scheduler__pb2.GetServerInfoRequest.FromString,
response_serializer=sglang__scheduler__pb2.GetServerInfoResponse.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'sglang.grpc.scheduler.SglangScheduler', rpc_method_handlers)
Expand Down Expand Up @@ -237,3 +271,57 @@ def Abort(request,
timeout,
metadata,
_registered_method=True)

@staticmethod
def GetModelInfo(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(
request,
target,
'/sglang.grpc.scheduler.SglangScheduler/GetModelInfo',
sglang__scheduler__pb2.GetModelInfoRequest.SerializeToString,
sglang__scheduler__pb2.GetModelInfoResponse.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)

@staticmethod
def GetServerInfo(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(
request,
target,
'/sglang.grpc.scheduler.SglangScheduler/GetServerInfo',
sglang__scheduler__pb2.GetServerInfoRequest.SerializeToString,
sglang__scheduler__pb2.GetServerInfoResponse.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)
Loading
Loading