Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion codex-rs/app-server-test-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::TurnStatus;
use codex_app_server_protocol::UserInput as V2UserInput;
use codex_core::config::Config;
use codex_otel::OtelProvider;
use codex_otel::current_span_w3c_trace_context;
use codex_otel::otel_provider::OtelProvider;
use codex_protocol::openai_models::ReasoningEffort;
use codex_protocol::protocol::W3cTraceContext;
use codex_utils_cli::CliConfigOverrides;
Expand Down
5 changes: 4 additions & 1 deletion codex-rs/app-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,6 @@ pub async fn run_main_with_transport(
.map(|layer| layer.with_filter(Targets::new().with_default(Level::TRACE)));
let otel_logger_layer = otel.as_ref().and_then(|o| o.logger_layer());
let otel_tracing_layer = otel.as_ref().and_then(|o| o.tracing_layer());

let _ = tracing_subscriber::registry()
.with(stderr_fmt)
.with(feedback_layer)
Expand Down Expand Up @@ -826,6 +825,10 @@ pub async fn run_main_with_transport(
let _ = handle.await;
}

if let Some(otel) = otel {
otel.shutdown();
}

Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion codex-rs/core/src/otel_init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ use crate::config::types::OtelExporterKind as Kind;
use crate::config::types::OtelHttpProtocol as Protocol;
use crate::default_client::originator;
use crate::features::Feature;
use codex_otel::OtelProvider;
use codex_otel::config::OtelExporter;
use codex_otel::config::OtelHttpProtocol;
use codex_otel::config::OtelSettings;
use codex_otel::config::OtelTlsConfig as OtelTlsSettings;
use codex_otel::otel_provider::OtelProvider;
use std::error::Error;

/// Build an OpenTelemetry provider from the app Config.
Expand Down
1 change: 1 addition & 0 deletions codex-rs/otel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ opentelemetry-otlp = { workspace = true, features = [
]}
opentelemetry-semantic-conventions = { workspace = true }
opentelemetry_sdk = { workspace = true, features = [
"experimental_trace_batch_span_processor_with_async_runtime",
"experimental_metrics_custom_reader",
"logs",
"metrics",
Expand Down
4 changes: 2 additions & 2 deletions codex-rs/otel/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

`codex-otel` is the OpenTelemetry integration crate for Codex. It provides:

- Provider wiring for log/trace/metric exporters (`codex_otel::OtelProvider`,
`codex_otel::provider`, and the compatibility shim `codex_otel::otel_provider`).
- Provider wiring for log/trace/metric exporters (`codex_otel::OtelProvider`
and `codex_otel::provider`).
- Session-scoped business event emission via `codex_otel::SessionTelemetry`.
- Low-level metrics APIs via `codex_otel::metrics`.
- Trace-context helpers via `codex_otel::trace_context` and crate-root re-exports.
Expand Down
2 changes: 1 addition & 1 deletion codex-rs/otel/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
pub mod config;
mod events;
pub mod metrics;
pub mod otel_provider;
pub mod provider;
pub mod trace_context;

Expand All @@ -24,6 +23,7 @@ pub use crate::trace_context::current_span_trace_id;
pub use crate::trace_context::current_span_w3c_trace_context;
pub use crate::trace_context::set_parent_from_context;
pub use crate::trace_context::set_parent_from_w3c_trace_context;
pub use crate::trace_context::span_w3c_trace_context;
pub use crate::trace_context::traceparent_context_from_env;
pub use codex_utils_string::sanitize_metric_tag_value;

Expand Down
4 changes: 0 additions & 4 deletions codex-rs/otel/src/otel_provider.rs

This file was deleted.

111 changes: 110 additions & 1 deletion codex-rs/otel/src/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,29 @@ pub(crate) fn build_http_client(
tls: &OtelTlsConfig,
timeout_var: &str,
) -> Result<reqwest::blocking::Client, Box<dyn Error>> {
if tokio::runtime::Handle::try_current().is_ok() {
if current_tokio_runtime_is_multi_thread() {
tokio::task::block_in_place(|| build_http_client_inner(tls, timeout_var))
} else if tokio::runtime::Handle::try_current().is_ok() {
let tls = tls.clone();
let timeout_var = timeout_var.to_string();
std::thread::spawn(move || {
build_http_client_inner(&tls, &timeout_var).map_err(|err| err.to_string())
})
.join()
.map_err(|_| config_error("failed to join OTLP blocking HTTP client builder thread"))?
.map_err(config_error)
} else {
build_http_client_inner(tls, timeout_var)
}
}

pub(crate) fn current_tokio_runtime_is_multi_thread() -> bool {
match tokio::runtime::Handle::try_current() {
Ok(handle) => handle.runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread,
Err(_) => false,
}
}

fn build_http_client_inner(
tls: &OtelTlsConfig,
timeout_var: &str,
Expand Down Expand Up @@ -129,6 +145,54 @@ fn build_http_client_inner(
.map_err(|error| Box::new(error) as Box<dyn Error>)
}

pub(crate) fn build_async_http_client(
tls: Option<&OtelTlsConfig>,
timeout_var: &str,
) -> Result<reqwest::Client, Box<dyn Error>> {
let mut builder = reqwest::Client::builder().timeout(resolve_otlp_timeout(timeout_var));

if let Some(tls) = tls {
if let Some(path) = tls.ca_certificate.as_ref() {
let (pem, location) = read_bytes(path)?;
let certificate = ReqwestCertificate::from_pem(pem.as_slice()).map_err(|error| {
config_error(format!(
"failed to parse certificate {}: {error}",
location.display()
))
})?;
builder = builder
.tls_built_in_root_certs(false)
.add_root_certificate(certificate);
}

match (&tls.client_certificate, &tls.client_private_key) {
(Some(cert_path), Some(key_path)) => {
let (mut cert_pem, cert_location) = read_bytes(cert_path)?;
let (key_pem, key_location) = read_bytes(key_path)?;
cert_pem.extend_from_slice(key_pem.as_slice());
let identity = ReqwestIdentity::from_pem(cert_pem.as_slice()).map_err(|error| {
config_error(format!(
"failed to parse client identity using {} and {}: {error}",
cert_location.display(),
key_location.display()
))
})?;
builder = builder.identity(identity).https_only(true);
}
(Some(_), None) | (None, Some(_)) => {
return Err(config_error(
"client_certificate and client_private_key must both be provided for mTLS",
));
}
(None, None) => {}
}
}

builder
.build()
.map_err(|error| Box::new(error) as Box<dyn Error>)
}

pub(crate) fn resolve_otlp_timeout(signal_var: &str) -> Duration {
if let Some(timeout) = read_timeout_env(signal_var) {
return timeout;
Expand Down Expand Up @@ -161,3 +225,48 @@ fn read_bytes(path: &AbsolutePathBuf) -> Result<(Vec<u8>, PathBuf), Box<dyn Erro
fn config_error(message: impl Into<String>) -> Box<dyn Error> {
Box::new(io::Error::new(ErrorKind::InvalidData, message.into()))
}

#[cfg(test)]
mod tests {
use super::*;
use pretty_assertions::assert_eq;
use tokio::runtime::Builder;

#[test]
fn current_tokio_runtime_is_multi_thread_detects_runtime_flavor() {
assert!(!current_tokio_runtime_is_multi_thread());

let current_thread_runtime = Builder::new_current_thread()
.enable_all()
.build()
.expect("current-thread runtime");
assert_eq!(
current_thread_runtime.block_on(async { current_tokio_runtime_is_multi_thread() }),
false
);

let multi_thread_runtime = Builder::new_multi_thread()
.worker_threads(2)
.enable_all()
.build()
.expect("multi-thread runtime");
assert_eq!(
multi_thread_runtime.block_on(async { current_tokio_runtime_is_multi_thread() }),
true
);
}

#[test]
fn build_http_client_works_in_current_thread_runtime() {
let runtime = Builder::new_current_thread()
.enable_all()
.build()
.expect("current-thread runtime");

let client = runtime.block_on(async {
build_http_client(&OtelTlsConfig::default(), OTEL_EXPORTER_OTLP_TIMEOUT)
});

assert!(client.is_ok());
}
}
44 changes: 38 additions & 6 deletions codex-rs/otel/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ use opentelemetry_otlp::tonic_types::transport::ClientTlsConfig;
use opentelemetry_sdk::Resource;
use opentelemetry_sdk::logs::SdkLoggerProvider;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use opentelemetry_sdk::runtime;
use opentelemetry_sdk::trace::BatchSpanProcessor;
use opentelemetry_sdk::trace::SdkTracerProvider;
use opentelemetry_sdk::trace::Tracer;
use opentelemetry_sdk::trace::span_processor_with_async_runtime::BatchSpanProcessor as TokioBatchSpanProcessor;
use opentelemetry_semantic_conventions as semconv;
use std::error::Error;
use tracing::debug;
Expand All @@ -50,15 +52,16 @@ pub struct OtelProvider {

impl OtelProvider {
pub fn shutdown(&self) {
if let Some(logger) = &self.logger {
let _ = logger.shutdown();
}
if let Some(tracer_provider) = &self.tracer_provider {
let _ = tracer_provider.force_flush();
let _ = tracer_provider.shutdown();
}
if let Some(metrics) = &self.metrics {
let _ = metrics.shutdown();
}
if let Some(logger) = &self.logger {
let _ = logger.shutdown();
}
}

pub fn from(settings: &OtelSettings) -> Result<Option<Self>, Box<dyn Error>> {
Expand Down Expand Up @@ -159,15 +162,16 @@ impl OtelProvider {

impl Drop for OtelProvider {
fn drop(&mut self) {
if let Some(logger) = &self.logger {
let _ = logger.shutdown();
}
if let Some(tracer_provider) = &self.tracer_provider {
let _ = tracer_provider.force_flush();
let _ = tracer_provider.shutdown();
}
if let Some(metrics) = &self.metrics {
let _ = metrics.shutdown();
}
if let Some(logger) = &self.logger {
let _ = logger.shutdown();
}
}
}

Expand Down Expand Up @@ -321,6 +325,34 @@ fn build_tracer_provider(
} => {
debug!("Using OTLP Http exporter for traces: {endpoint}");

if crate::otlp::current_tokio_runtime_is_multi_thread() {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when is it not true?

Copy link
Copy Markdown
Collaborator Author

@owenlin0 owenlin0 Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

our main binaries are multi-threaded, but app-server-test-client and unit tests marked with just #[tokio::test] are single-threaded

let protocol = match protocol {
OtelHttpProtocol::Binary => Protocol::HttpBinary,
OtelHttpProtocol::Json => Protocol::HttpJson,
};

let mut exporter_builder = SpanExporter::builder()
.with_http()
.with_endpoint(endpoint)
.with_protocol(protocol)
.with_headers(headers);

let client = crate::otlp::build_async_http_client(
tls.as_ref(),
OTEL_EXPORTER_OTLP_TRACES_TIMEOUT,
)?;
exporter_builder = exporter_builder.with_http_client(client);

let processor =
TokioBatchSpanProcessor::builder(exporter_builder.build()?, runtime::Tokio)
.build();

return Ok(SdkTracerProvider::builder()
.with_resource(resource.clone())
.with_span_processor(processor)
.build());
}

let protocol = match protocol {
OtelHttpProtocol::Binary => Protocol::HttpBinary,
OtelHttpProtocol::Json => Protocol::HttpJson,
Expand Down
6 changes: 5 additions & 1 deletion codex-rs/otel/src/trace_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ const TRACESTATE_ENV_VAR: &str = "TRACESTATE";
static TRACEPARENT_CONTEXT: OnceLock<Option<Context>> = OnceLock::new();

pub fn current_span_w3c_trace_context() -> Option<W3cTraceContext> {
let context = Span::current().context();
span_w3c_trace_context(&Span::current())
}

pub fn span_w3c_trace_context(span: &Span) -> Option<W3cTraceContext> {
let context = span.context();
if !context.span().span_context().is_valid() {
return None;
}
Expand Down
2 changes: 1 addition & 1 deletion codex-rs/otel/tests/suite/otel_export_routing_policy.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use codex_otel::OtelProvider;
use codex_otel::SessionTelemetry;
use codex_otel::TelemetryAuthMode;
use codex_otel::otel_provider::OtelProvider;
use opentelemetry::KeyValue;
use opentelemetry::logs::AnyValue;
use opentelemetry::trace::TracerProvider as _;
Expand Down
Loading
Loading