Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 25 additions & 19 deletions sgl-model-gateway/src/observability/events.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
//! request events for observability and monitoring
//! Request events for observability and monitoring.
//!
//! Events use conditional log levels:
//! - DEBUG when OTEL is disabled (keeps logs quiet)
//! - INFO when OTEL is enabled (passes through EnvFilter to OTEL layer)

use tracing::{debug, event, Level};

use crate::observability::otel_trace::is_otel_enabled;
use super::otel_trace::is_otel_enabled;

/// Returns the fully qualified path of the module this function is compiled in.
///
/// The OTEL filter in `otel_trace` adds this path to its allow-list so that
/// events declared in this module are eligible for OTEL export.
pub fn get_module_path() -> &'static str {
    // `module_path!()` expands to a string literal at compile time.
    const THIS_MODULE: &str = module_path!();
    THIS_MODULE
}

/// Trait for emitting observability events.
///
/// The implementations in this module log through `tracing`, choosing the
/// level from `is_otel_enabled()`: INFO when OTEL is enabled, DEBUG otherwise.
pub trait Event {
/// Emit this event. Takes `&self`, so the event value is not consumed.
fn emit(&self);
}

/// Event emitted when a prefill-decode request pair is sent.
#[derive(Debug)]
pub struct RequestPDSentEvent {
pub prefill_url: String,
Expand All @@ -20,50 +27,49 @@ pub struct RequestPDSentEvent {

impl Event for RequestPDSentEvent {
fn emit(&self) {
if !is_otel_enabled() {
debug!(
"Sending concurrent requests to prefill={} decode={}",
self.prefill_url, self.decode_url
);
} else {
if is_otel_enabled() {
event!(
Level::INFO,
prefill_url = %self.prefill_url,
decode_url = %self.decode_url,
"Sending concurrent requests"
);
} else {
debug!(
prefill_url = %self.prefill_url,
decode_url = %self.decode_url,
"Sending concurrent requests"
);
}
}
}

/// Event emitted when a request is sent to a worker.
///
/// Its `Event` implementation records the destination as the `url` field of
/// the log event.
#[derive(Debug)]
pub struct RequestSentEvent {
/// Destination URL of the request being sent.
pub url: String,
}

impl Event for RequestSentEvent {
fn emit(&self) {
if !is_otel_enabled() {
debug!("Sending request to {}", self.url);
if is_otel_enabled() {
event!(Level::INFO, url = %self.url, "Sending request");
} else {
event!(
Level::INFO,
url = %self.url,
"Sending requests"
);
debug!(url = %self.url, "Sending request");
}
}
}

/// Event emitted when concurrent requests are received.
#[derive(Debug)]
pub struct RequestReceivedEvent {}
pub struct RequestReceivedEvent;

impl Event for RequestReceivedEvent {
fn emit(&self) {
if !is_otel_enabled() {
debug!("Received concurrent requests");
} else {
if is_otel_enabled() {
event!(Level::INFO, "Received concurrent requests");
} else {
debug!("Received concurrent requests");
}
}
}
183 changes: 98 additions & 85 deletions sgl-model-gateway/src/observability/otel_trace.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::{
collections::HashSet,
sync::{
atomic::{AtomicBool, Ordering},
OnceLock,
Expand All @@ -25,25 +24,44 @@ use tracing_subscriber::{
Layer,
};

use super::events::get_module_path as http_router_get_module_path;
use super::events::get_module_path as events_module_path;

static ENABLED: AtomicBool = AtomicBool::new(false);

// global tracer
// Global tracer and provider
static TRACER: OnceLock<SdkTracer> = OnceLock::new();
static PROVIDER: OnceLock<TracerProvider> = OnceLock::new();

pub struct CustomOtelFilter {
allowed_targets: HashSet<String>,
/// Target prefixes that are allowed to be exported to OTEL.
///
/// Stored as a fixed-size array inside a `OnceLock`, so the list is built once
/// and shared; no per-filter collection has to be allocated.
static ALLOWED_TARGETS: OnceLock<[&'static str; 3]> = OnceLock::new();

/// Returns the allow-list, building it on the first call.
fn get_allowed_targets() -> &'static [&'static str; 3] {
    let build = || {
        // Custom target used for manual spans; it is not an actual module path.
        let manual_spans = "sgl_model_gateway::otel-trace";
        let otel_trace_module = "sgl_model_gateway::observability::otel_trace";
        // Path of the module that declares the request events.
        let events_module = events_module_path();
        [manual_spans, otel_trace_module, events_module]
    };
    ALLOWED_TARGETS.get_or_init(build)
}

/// Filter that only allows specific module targets to be exported to OTEL.
/// This reduces noise and cost by only exporting relevant spans.
#[derive(Clone)]
pub struct CustomOtelFilter;

impl CustomOtelFilter {
pub fn new() -> Self {
let mut allowed_targets = HashSet::new();
allowed_targets.insert("sgl_model_gateway::otel-trace".to_string());
allowed_targets.insert(http_router_get_module_path().to_string());
Self
}

Self { allowed_targets }
#[inline]
fn is_allowed(target: &str) -> bool {
get_allowed_targets()
.iter()
.any(|allowed| target.starts_with(allowed))
}
}

Expand All @@ -52,11 +70,11 @@ where
S: Subscriber,
{
fn enabled(&self, meta: &Metadata<'_>, _cx: &Context<'_, S>) -> bool {
self.allowed_targets.contains(meta.target())
Self::is_allowed(meta.target())
}

fn callsite_enabled(&self, meta: &'static Metadata<'static>) -> tracing::subscriber::Interest {
if self.allowed_targets.contains(meta.target()) {
if Self::is_allowed(meta.target()) {
tracing::subscriber::Interest::always()
} else {
tracing::subscriber::Interest::never()
Expand All @@ -70,7 +88,11 @@ impl Default for CustomOtelFilter {
}
}

/// init OpenTelemetry connection
/// Initialize OpenTelemetry tracing with OTLP exporter.
///
/// # Arguments
/// * `enable` - Whether to enable OTEL tracing
/// * `otlp_endpoint` - OTLP collector endpoint (defaults to "localhost:4317")
pub fn otel_tracing_init(enable: bool, otlp_endpoint: Option<&str>) -> Result<()> {
if !enable {
ENABLED.store(false, Ordering::Relaxed);
Expand All @@ -84,117 +106,105 @@ pub fn otel_tracing_init(enable: bool, otlp_endpoint: Option<&str>) -> Result<()
endpoint.to_string()
};

let result = std::panic::catch_unwind(|| -> Result<()> {
global::set_text_map_propagator(TraceContextPropagator::new());
global::set_text_map_propagator(TraceContextPropagator::new());

let exporter = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_endpoint(endpoint)
.with_protocol(opentelemetry_otlp::Protocol::Grpc)
.build()?;
let exporter = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_endpoint(&endpoint)
.with_protocol(opentelemetry_otlp::Protocol::Grpc)
.build()
.map_err(|e| {
eprintln!("[tracing] Failed to create OTLP exporter: {}", e);
anyhow::anyhow!("Failed to create OTLP exporter: {}", e)
})?;

let batch_config = BatchConfigBuilder::default()
.with_scheduled_delay(Duration::from_millis(500))
.with_max_export_batch_size(64)
.build();
let batch_config = BatchConfigBuilder::default()
.with_scheduled_delay(Duration::from_millis(500))
.with_max_export_batch_size(64)
.build();

let span_processor = BatchSpanProcessor::builder(exporter, runtime::Tokio)
.with_batch_config(batch_config)
.build();
let span_processor = BatchSpanProcessor::builder(exporter, runtime::Tokio)
.with_batch_config(batch_config)
.build();

let resource = Resource::default().merge(&Resource::new(vec![KeyValue::new(
"service.name",
"sgl-router",
)]));
let resource = Resource::default().merge(&Resource::new(vec![KeyValue::new(
"service.name",
"sgl-router",
)]));

let provider = TracerProvider::builder()
.with_span_processor(span_processor)
.with_resource(resource)
.build();
PROVIDER
.set(provider.clone())
.map_err(|_| anyhow::anyhow!("Provider already initialized"))?;
let provider = TracerProvider::builder()
.with_span_processor(span_processor)
.with_resource(resource)
.build();

let tracer = provider.tracer("sgl-router");
PROVIDER
.set(provider.clone())
.map_err(|_| anyhow::anyhow!("Provider already initialized"))?;

TRACER
.set(tracer)
.map_err(|_| anyhow::anyhow!("Tracer already initialized"))?;
let tracer = provider.tracer("sgl-router");

let _ = global::set_tracer_provider(provider);
TRACER
.set(tracer)
.map_err(|_| anyhow::anyhow!("Tracer already initialized"))?;

ENABLED.store(true, Ordering::Relaxed);
let _ = global::set_tracer_provider(provider);

Ok(())
});
ENABLED.store(true, Ordering::Relaxed);

match result {
Ok(Ok(())) => {
eprintln!(
"[tracing] OpenTelemetry initialized successfully, enabled: {}",
ENABLED.load(Ordering::Relaxed)
);
Ok(())
}
Ok(Err(e)) => {
eprintln!("[tracing] Failed to initialize OTLP tracer: {}", e);
ENABLED.store(false, Ordering::Relaxed);
Err(e)
}
Err(_) => {
eprintln!("[tracing] Panic during OpenTelemetry initialization");
ENABLED.store(false, Ordering::Relaxed);
Err(anyhow::anyhow!("Panic during initialization"))
}
}
eprintln!("[tracing] OpenTelemetry initialized successfully");
Ok(())
}

pub fn get_otel_layer<S>() -> Result<Box<dyn Layer<S> + Send + Sync + 'static>, &'static str>
/// Get the OpenTelemetry tracing layer to add to the subscriber.
///
/// Must be called after `otel_tracing_init` with `enable=true`.
pub fn get_otel_layer<S>() -> Result<Box<dyn Layer<S> + Send + Sync + 'static>>
where
S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a> + Send + Sync,
{
if !is_otel_enabled() {
return Err("OpenTelemetry is not enabled");
anyhow::bail!("OpenTelemetry is not enabled");
}

let tracer = TRACER
.get()
.ok_or("Tracer not initialized. Call otel_tracing_init first.")?
.ok_or_else(|| anyhow::anyhow!("Tracer not initialized. Call otel_tracing_init first."))?
.clone();

let custom_filter = CustomOtelFilter::new();

let layer = tracing_opentelemetry::layer()
.with_tracer(tracer)
.with_filter(custom_filter);
.with_filter(CustomOtelFilter::new());

Ok(Box::new(layer))
}

/// Returns whether OpenTelemetry tracing is enabled.
///
/// Reads the module-level `ENABLED` flag with relaxed ordering. The flag
/// starts as `false`; `otel_tracing_init` stores `true` once setup has
/// completed and stores `false` when called with `enable = false`.
#[inline]
pub fn is_otel_enabled() -> bool {
ENABLED.load(Ordering::Relaxed)
}

/// Flush all pending spans to the OTLP collector.
///
/// This is useful before shutdown or when you need to ensure spans are exported.
pub async fn flush_spans_async() -> Result<()> {
if !is_otel_enabled() {
return Ok(());
}

if let Some(provider) = PROVIDER.get() {
let provider = provider.clone();
let provider = PROVIDER
.get()
.ok_or_else(|| anyhow::anyhow!("Provider not initialized"))?
.clone();

spawn_blocking(move || provider.force_flush())
.await
.map_err(|e| {
anyhow::anyhow!("Failed to join blocking task for flushing spans: {}", e)
})?;
spawn_blocking(move || provider.force_flush())
.await
.map_err(|e| anyhow::anyhow!("Failed to flush spans: {}", e))?;

Ok(())
} else {
Err(anyhow::anyhow!("Provider not initialized"))
}
Ok(())
}

/// Shutdown OpenTelemetry tracing and flush remaining spans.
pub fn shutdown_otel() {
if ENABLED.load(Ordering::Relaxed) {
global::shutdown_tracer_provider();
Expand All @@ -203,16 +213,20 @@ pub fn shutdown_otel() {
}
}

pub fn inject_trace_context_http(headers: &mut HeaderMap) -> Result<()> {
/// Inject W3C trace context headers into an HTTP request.
///
/// This propagates the current span context to downstream services.
/// Does nothing if OTEL is not enabled.
pub fn inject_trace_context_http(headers: &mut HeaderMap) {
if !is_otel_enabled() {
return Err(anyhow::anyhow!("OTEL not enabled"));
return;
}

let context = tracing::Span::current().context();

struct HeaderInjector<'a>(&'a mut HeaderMap);

impl<'a> opentelemetry::propagation::Injector for HeaderInjector<'a> {
impl opentelemetry::propagation::Injector for HeaderInjector<'_> {
fn set(&mut self, key: &str, value: String) {
if let Ok(header_name) = HeaderName::from_bytes(key.as_bytes()) {
if let Ok(header_value) = HeaderValue::from_str(&value) {
Expand All @@ -225,5 +239,4 @@ pub fn inject_trace_context_http(headers: &mut HeaderMap) -> Result<()> {
global::get_text_map_propagator(|propagator| {
propagator.inject_context(&context, &mut HeaderInjector(headers));
});
Ok(())
}
6 changes: 2 additions & 4 deletions sgl-model-gateway/src/routers/http/pd_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,10 +396,8 @@ impl PDRouter {
};

let mut headers_with_trace = headers.cloned().unwrap_or_default();
let headers = match inject_trace_context_http(&mut headers_with_trace) {
Ok(()) => Some(&headers_with_trace),
Err(_) => headers,
};
inject_trace_context_http(&mut headers_with_trace);
let headers = Some(&headers_with_trace);

// Build both requests
let prefill_request = self.build_post_with_headers(
Expand Down
Loading
Loading