diff --git a/sgl-model-gateway/src/observability/events.rs b/sgl-model-gateway/src/observability/events.rs index fc537c3a99da..0001cf80b884 100644 --- a/sgl-model-gateway/src/observability/events.rs +++ b/sgl-model-gateway/src/observability/events.rs @@ -1,17 +1,24 @@ -//! request events for observability and monitoring +//! Request events for observability and monitoring. +//! +//! Events use conditional log levels: +//! - DEBUG when OTEL is disabled (keeps logs quiet) +//! - INFO when OTEL is enabled (passes through EnvFilter to OTEL layer) use tracing::{debug, event, Level}; -use crate::observability::otel_trace::is_otel_enabled; +use super::otel_trace::is_otel_enabled; +/// Module path used by CustomOtelFilter to identify events for OTEL export. pub fn get_module_path() -> &'static str { module_path!() } +/// Trait for emitting observability events. pub trait Event { fn emit(&self); } +/// Event emitted when a prefill-decode request pair is sent. #[derive(Debug)] pub struct RequestPDSentEvent { pub prefill_url: String, @@ -20,22 +27,24 @@ pub struct RequestPDSentEvent { impl Event for RequestPDSentEvent { fn emit(&self) { - if !is_otel_enabled() { - debug!( - "Sending concurrent requests to prefill={} decode={}", - self.prefill_url, self.decode_url - ); - } else { + if is_otel_enabled() { event!( Level::INFO, prefill_url = %self.prefill_url, decode_url = %self.decode_url, "Sending concurrent requests" ); + } else { + debug!( + prefill_url = %self.prefill_url, + decode_url = %self.decode_url, + "Sending concurrent requests" + ); } } } +/// Event emitted when a request is sent to a worker. #[derive(Debug)] pub struct RequestSentEvent { pub url: String, @@ -43,27 +52,24 @@ pub struct RequestSentEvent { impl Event for RequestSentEvent { fn emit(&self) { - if !is_otel_enabled() { - debug!("Sending request to {}", self.url); + if is_otel_enabled() { + event!(Level::INFO, url = %self.url, "Sending request"); } else { - event!( - Level::INFO, - url = %self.url, - "Sending requests" - ); + debug!(url = %self.url, "Sending request"); } } } +/// Event emitted when concurrent requests are received. #[derive(Debug)] -pub struct RequestReceivedEvent {} +pub struct RequestReceivedEvent; impl Event for RequestReceivedEvent { fn emit(&self) { - if !is_otel_enabled() { - debug!("Received concurrent requests"); - } else { + if is_otel_enabled() { event!(Level::INFO, "Received concurrent requests"); + } else { + debug!("Received concurrent requests"); } } } diff --git a/sgl-model-gateway/src/observability/otel_trace.rs b/sgl-model-gateway/src/observability/otel_trace.rs index ea91e267449f..1782decdfec0 100644 --- a/sgl-model-gateway/src/observability/otel_trace.rs +++ b/sgl-model-gateway/src/observability/otel_trace.rs @@ -1,5 +1,4 @@ use std::{ - collections::HashSet, sync::{ atomic::{AtomicBool, Ordering}, OnceLock, @@ -25,25 +24,44 @@ use tracing_subscriber::{ Layer, }; -use super::events::get_module_path as http_router_get_module_path; +use super::events::get_module_path as events_module_path; static ENABLED: AtomicBool = AtomicBool::new(false); -// global tracer +// Global tracer and provider static TRACER: OnceLock = OnceLock::new(); static PROVIDER: OnceLock = OnceLock::new(); -pub struct CustomOtelFilter { - allowed_targets: HashSet, +/// Targets allowed for OTEL export. Using a static slice avoids allocations. +/// Note: "sgl_model_gateway::otel-trace" is a custom target used for manual spans, +/// not the actual module path. +static ALLOWED_TARGETS: OnceLock<[&'static str; 3]> = OnceLock::new(); + +fn get_allowed_targets() -> &'static [&'static str; 3] { + ALLOWED_TARGETS.get_or_init(|| { + [ + "sgl_model_gateway::otel-trace", // Custom target for manual spans + "sgl_model_gateway::observability::otel_trace", + events_module_path(), + ] + }) } +/// Filter that only allows specific module targets to be exported to OTEL. +/// This reduces noise and cost by only exporting relevant spans. +#[derive(Clone)] +pub struct CustomOtelFilter; + impl CustomOtelFilter { pub fn new() -> Self { - let mut allowed_targets = HashSet::new(); - allowed_targets.insert("sgl_model_gateway::otel-trace".to_string()); - allowed_targets.insert(http_router_get_module_path().to_string()); + Self + } - Self { allowed_targets } + #[inline] + fn is_allowed(target: &str) -> bool { + get_allowed_targets() + .iter() + .any(|allowed| target.starts_with(allowed)) } } @@ -52,11 +70,11 @@ where S: Subscriber, { fn enabled(&self, meta: &Metadata<'_>, _cx: &Context<'_, S>) -> bool { - self.allowed_targets.contains(meta.target()) + Self::is_allowed(meta.target()) } fn callsite_enabled(&self, meta: &'static Metadata<'static>) -> tracing::subscriber::Interest { - if self.allowed_targets.contains(meta.target()) { + if Self::is_allowed(meta.target()) { tracing::subscriber::Interest::always() } else { tracing::subscriber::Interest::never() @@ -70,7 +88,11 @@ impl Default for CustomOtelFilter { } } -/// init OpenTelemetry connection +/// Initialize OpenTelemetry tracing with OTLP exporter. +/// +/// # Arguments +/// * `enable` - Whether to enable OTEL tracing +/// * `otlp_endpoint` - OTLP collector endpoint (defaults to "localhost:4317") pub fn otel_tracing_init(enable: bool, otlp_endpoint: Option<&str>) -> Result<()> { if !enable { ENABLED.store(false, Ordering::Relaxed); @@ -84,117 +106,105 @@ pub fn otel_tracing_init(enable: bool, otlp_endpoint: Option<&str>) -> Result<() endpoint.to_string() }; - let result = std::panic::catch_unwind(|| -> Result<()> { - global::set_text_map_propagator(TraceContextPropagator::new()); + global::set_text_map_propagator(TraceContextPropagator::new()); - let exporter = opentelemetry_otlp::SpanExporter::builder() - .with_tonic() - .with_endpoint(endpoint) - .with_protocol(opentelemetry_otlp::Protocol::Grpc) - .build()?; + let exporter = opentelemetry_otlp::SpanExporter::builder() + .with_tonic() + .with_endpoint(&endpoint) + .with_protocol(opentelemetry_otlp::Protocol::Grpc) + .build() + .map_err(|e| { + eprintln!("[tracing] Failed to create OTLP exporter: {}", e); + anyhow::anyhow!("Failed to create OTLP exporter: {}", e) + })?; - let batch_config = BatchConfigBuilder::default() - .with_scheduled_delay(Duration::from_millis(500)) - .with_max_export_batch_size(64) - .build(); + let batch_config = BatchConfigBuilder::default() + .with_scheduled_delay(Duration::from_millis(500)) + .with_max_export_batch_size(64) + .build(); - let span_processor = BatchSpanProcessor::builder(exporter, runtime::Tokio) - .with_batch_config(batch_config) - .build(); + let span_processor = BatchSpanProcessor::builder(exporter, runtime::Tokio) + .with_batch_config(batch_config) + .build(); - let resource = Resource::default().merge(&Resource::new(vec![KeyValue::new( - "service.name", - "sgl-router", - )])); + let resource = Resource::default().merge(&Resource::new(vec![KeyValue::new( + "service.name", + "sgl-router", + )])); - let provider = TracerProvider::builder() - .with_span_processor(span_processor) - .with_resource(resource) - .build(); - PROVIDER - .set(provider.clone()) - .map_err(|_| anyhow::anyhow!("Provider already initialized"))?; + let provider = TracerProvider::builder() + .with_span_processor(span_processor) + .with_resource(resource) + .build(); - let tracer = provider.tracer("sgl-router"); + PROVIDER + .set(provider.clone()) + .map_err(|_| anyhow::anyhow!("Provider already initialized"))?; - TRACER - .set(tracer) - .map_err(|_| anyhow::anyhow!("Tracer already initialized"))?; + let tracer = provider.tracer("sgl-router"); - let _ = global::set_tracer_provider(provider); + TRACER + .set(tracer) + .map_err(|_| anyhow::anyhow!("Tracer already initialized"))?; - ENABLED.store(true, Ordering::Relaxed); + let _ = global::set_tracer_provider(provider); - Ok(()) - }); + ENABLED.store(true, Ordering::Relaxed); - match result { - Ok(Ok(())) => { - eprintln!( - "[tracing] OpenTelemetry initialized successfully, enabled: {}", - ENABLED.load(Ordering::Relaxed) - ); - Ok(()) - } - Ok(Err(e)) => { - eprintln!("[tracing] Failed to initialize OTLP tracer: {}", e); - ENABLED.store(false, Ordering::Relaxed); - Err(e) - } - Err(_) => { - eprintln!("[tracing] Panic during OpenTelemetry initialization"); - ENABLED.store(false, Ordering::Relaxed); - Err(anyhow::anyhow!("Panic during initialization")) - } - } + eprintln!("[tracing] OpenTelemetry initialized successfully"); + Ok(()) } -pub fn get_otel_layer() -> Result + Send + Sync + 'static>, &'static str> +/// Get the OpenTelemetry tracing layer to add to the subscriber. +/// +/// Must be called after `otel_tracing_init` with `enable=true`. +pub fn get_otel_layer() -> Result + Send + Sync + 'static>> where S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a> + Send + Sync, { if !is_otel_enabled() { - return Err("OpenTelemetry is not enabled"); + anyhow::bail!("OpenTelemetry is not enabled"); } let tracer = TRACER .get() - .ok_or("Tracer not initialized. Call otel_tracing_init first.")? + .ok_or_else(|| anyhow::anyhow!("Tracer not initialized. Call otel_tracing_init first."))? .clone(); - let custom_filter = CustomOtelFilter::new(); - let layer = tracing_opentelemetry::layer() .with_tracer(tracer) - .with_filter(custom_filter); + .with_filter(CustomOtelFilter::new()); Ok(Box::new(layer)) } +/// Returns whether OpenTelemetry tracing is enabled. +#[inline] pub fn is_otel_enabled() -> bool { ENABLED.load(Ordering::Relaxed) } +/// Flush all pending spans to the OTLP collector. +/// +/// This is useful before shutdown or when you need to ensure spans are exported. pub async fn flush_spans_async() -> Result<()> { if !is_otel_enabled() { return Ok(()); } - if let Some(provider) = PROVIDER.get() { - let provider = provider.clone(); + let provider = PROVIDER + .get() + .ok_or_else(|| anyhow::anyhow!("Provider not initialized"))? + .clone(); - spawn_blocking(move || provider.force_flush()) - .await - .map_err(|e| { - anyhow::anyhow!("Failed to join blocking task for flushing spans: {}", e) - })?; + spawn_blocking(move || provider.force_flush()) + .await + .map_err(|e| anyhow::anyhow!("Failed to flush spans: {}", e))?; - Ok(()) - } else { - Err(anyhow::anyhow!("Provider not initialized")) - } + Ok(()) } +/// Shutdown OpenTelemetry tracing and flush remaining spans. pub fn shutdown_otel() { if ENABLED.load(Ordering::Relaxed) { global::shutdown_tracer_provider(); @@ -203,16 +213,20 @@ pub fn shutdown_otel() { } } -pub fn inject_trace_context_http(headers: &mut HeaderMap) -> Result<()> { +/// Inject W3C trace context headers into an HTTP request. +/// +/// This propagates the current span context to downstream services. +/// Does nothing if OTEL is not enabled. +pub fn inject_trace_context_http(headers: &mut HeaderMap) { if !is_otel_enabled() { - return Err(anyhow::anyhow!("OTEL not enabled")); + return; } let context = tracing::Span::current().context(); struct HeaderInjector<'a>(&'a mut HeaderMap); - impl<'a> opentelemetry::propagation::Injector for HeaderInjector<'a> { + impl opentelemetry::propagation::Injector for HeaderInjector<'_> { fn set(&mut self, key: &str, value: String) { if let Ok(header_name) = HeaderName::from_bytes(key.as_bytes()) { if let Ok(header_value) = HeaderValue::from_str(&value) { @@ -225,5 +239,4 @@ pub fn inject_trace_context_http(headers: &mut HeaderMap) -> Result<()> { global::get_text_map_propagator(|propagator| { propagator.inject_context(&context, &mut HeaderInjector(headers)); }); - Ok(()) } diff --git a/sgl-model-gateway/src/routers/http/pd_router.rs b/sgl-model-gateway/src/routers/http/pd_router.rs index 94b620babdb9..68926cf7afbf 100644 --- a/sgl-model-gateway/src/routers/http/pd_router.rs +++ b/sgl-model-gateway/src/routers/http/pd_router.rs @@ -396,10 +396,8 @@ impl PDRouter { }; let mut headers_with_trace = headers.cloned().unwrap_or_default(); - let headers = match inject_trace_context_http(&mut headers_with_trace) { - Ok(()) => Some(&headers_with_trace), - Err(_) => headers, - }; + inject_trace_context_http(&mut headers_with_trace); + let headers = Some(&headers_with_trace); // Build both requests let prefill_request = self.build_post_with_headers( diff --git a/sgl-model-gateway/src/routers/http/router.rs b/sgl-model-gateway/src/routers/http/router.rs index a5614fa5acc2..f98905220128 100644 --- a/sgl-model-gateway/src/routers/http/router.rs +++ b/sgl-model-gateway/src/routers/http/router.rs @@ -221,10 +221,8 @@ impl Router { } .emit(); let mut headers_with_trace = headers.cloned().unwrap_or_default(); - let headers = match inject_trace_context_http(&mut headers_with_trace) { - Ok(()) => Some(&headers_with_trace), - Err(_) => headers, - }; + inject_trace_context_http(&mut headers_with_trace); + let headers = Some(&headers_with_trace); let response = self .send_typed_request(