feat(telemetry): emit gen_ai.usage attrs on streaming spans (#272)
Move the telemetry wrap site from the upstream SSE iterator to the constructed `Stream`. `_TracedStream` delegates the public `Stream` surface (`__aiter__`/`__anext__`/`aclose`/`output`/`__aenter__`/`__aexit__`/`__iter__`/`__enter__`/`__exit__`/`__repr__`) to the inner stream and owns the span lifecycle, so `output_attributes` can read `Stream.output` after `_parse_output` runs on natural exhaustion — the same call `_predict` already makes for non-streaming. Streaming spans now carry `gen_ai.usage.input_tokens` and `gen_ai.usage.output_tokens`, matching the non-streaming path. Zero changes to `streaming.py`. Add `opentelemetry-sdk` as a dev dependency for `InMemorySpanExporter`-based unit tests.
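A minimal sketch of the wrapper shape described above, assuming simplified stand-ins: `_FakeSpan` and `_DemoInner` are illustrative substitutes for an OTel span and the real `Stream`, and the usage dict is a toy payload, not the repo's typed `Output`:

```python
import asyncio


class _FakeSpan:
    """Stand-in for an OTel span: set_attribute after end() is a silent no-op."""

    def __init__(self):
        self.attributes = {}
        self.ended = False

    def set_attribute(self, key, value):
        if not self.ended:
            self.attributes[key] = value

    def end(self):
        self.ended = True


class _DemoInner:
    """Stand-in for Stream: builds output (incl. usage) on natural exhaustion."""

    def __init__(self, chunks):
        self._it = iter(chunks)
        self.output = None

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            return next(self._it)
        except StopIteration:
            self.output = {"input_tokens": 3, "output_tokens": 7}
            raise StopAsyncIteration from None


class _TracedStream:
    """Delegate iteration to the inner stream; own the span lifecycle."""

    def __init__(self, inner, span):
        self._inner = inner
        self._span = span
        self._finalized = False

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            return await self._inner.__anext__()
        except StopAsyncIteration:
            self._finalize()  # inner output is populated by now
            raise

    def _finalize(self):
        if self._finalized:  # idempotent: aclose after exhaustion is a no-op
            return
        self._finalized = True
        usage = self._inner.output
        if usage is not None:
            self._span.set_attribute("gen_ai.usage.input_tokens", usage["input_tokens"])
            self._span.set_attribute("gen_ai.usage.output_tokens", usage["output_tokens"])
        self._span.end()

    async def aclose(self):
        self._finalize()  # early abandonment still ends the span

    @property
    def output(self):
        return self._inner.output


async def _drain(stream):
    return [chunk async for chunk in stream]


span = _FakeSpan()
traced = _TracedStream(_DemoInner(["a", "b"]), span)
chunks = asyncio.run(_drain(traced))
```

The point of the shape: finalize runs after the inner stream raises `StopAsyncIteration`, which is exactly why the wrapper can read usage that is only built at exhaustion.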
## Code review

Bug: sync iteration bypasses span lifecycle (`telemetry.py` lines 212-228). `_TracedStream.__iter__` returns `iter(self._inner)` directly, and `__exit__` delegates to `self._inner.__exit__`. This means the OTel span lifecycle is skipped entirely for synchronous iteration.

Root cause: `Stream.__iter__` (`streaming.py:303-322`) is its own generator driving `Stream.__anext__` via an anyio portal. It never calls `_TracedStream.__anext__`, so `_finalize()` is never reached. The inner `Stream.__exit__` is a no-op.

Consequences:

- Regression: pre-PR, `trace_stream` wrapped the SSE iterator passed into the `Stream` constructor, so `Stream.__anext__` (used by the sync portal) activated the span lifecycle on both paths. The new approach only wires tracing on async paths. Sync iteration is documented in the `Stream` docstring and exercised by 8+ integration tests (`tests/integration_tests/*/test_stream_*.py`).

Ref: `celeste-python/src/celeste/telemetry.py`, lines 211 to 229 in 49b18a3.

Suggested fix: `__iter__` should be a generator wrapping the inner sync iterator, recording TTFC and calling `_finalize()` on exhaustion/`GeneratorExit`/exceptions — mirroring `__anext__`. `__exit__` should also call `_finalize()` so the with-stream pattern ends the span.
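The suggested fix could be sketched roughly like this, where `finalize` is a hypothetical callable standing in for `_TracedStream._finalize()` and `inner` stands in for the portal-backed sync iterator — a sketch of the reviewer's suggestion, not the repo's actual code:

```python
import time


class _SyncTracedIter:
    """Generator-shaped __iter__ that records TTFC and always finalizes."""

    def __init__(self, inner, finalize):
        self._inner = inner        # any synchronous iterable of chunks
        self._finalize = finalize  # hypothetical span-ending callable
        self.ttfc = None

    def __iter__(self):
        start = time.monotonic()
        try:
            for chunk in self._inner:
                if self.ttfc is None:
                    self.ttfc = time.monotonic() - start  # time to first chunk
                yield chunk
        finally:
            # Runs on natural exhaustion, on an exception from the body,
            # and on GeneratorExit when the consumer breaks out early.
            self._finalize()


# Natural exhaustion finalizes exactly once.
calls = []
stream = _SyncTracedIter(iter(["a", "b"]), lambda: calls.append("finalized"))
chunks = list(stream)

# Early close (consumer break / GC) also finalizes.
early_calls = []
gen = iter(_SyncTracedIter(iter(["a", "b", "c"]), lambda: early_calls.append("finalized")))
first = next(gen)
gen.close()  # raises GeneratorExit inside the generator; finally still runs
```

Putting the finalize call in `finally` collapses the three paths the review names (exhaustion, exception, `GeneratorExit`) into one exit point.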
…ntent events (#273)

* feat(telemetry): widen usage attrs, add metrics histograms, opt-in content events

  V2 telemetry expansion bundling three improvements on top of #270/#272:

  1. Span attributes: `output_attributes()` now emits `gen_ai.usage.total_tokens`, `reasoning_tokens`, and `cached_input_tokens` when the typed `Usage` carries them. Off-spec modality fields fall through to `celeste.usage.<field>`. Adds the missing `cached_tokens` field to `TextUsage` so the data already produced by the anthropic / openai / cohere / deepseek / openresponses providers actually reaches the span (previously dropped silently). Extends the Google Gemini and chatcompletions provider mixins to surface cached-prompt tokens too.
  2. Metrics: registers two GenAI semconv histograms — `gen_ai.client.token.usage` (one record per token category via the `gen_ai.token.type` dimension) and `gen_ai.client.operation.duration` (with `error.type` on failures). Sampling-resilient token-rate and latency dashboards no longer require span scanning. Wired into `_predict` and `_TracedStream._finalize`.
  3. Content events: opt-in via the `OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT` env flag, default off for PII safety. When enabled, spans carry `gen_ai.input.messages` and `gen_ai.output.messages` events with the semconv `{role, parts: [...]}` shape. Text + reasoning + tool calls inline; multimodal artifacts (image/audio/video/document) by URL reference, never inline bytes.

  19 new tests across `unit_tests/test_telemetry_metrics.py` and `test_telemetry_content_events.py`, plus extended attribute coverage in `test_telemetry_streaming.py`. 630 unit tests pass. Closes #271.

* refactor(telemetry): UsageField enum keys, public event helpers, shared finalize

  Three small cleanups on top of the V2 telemetry expansion, no behavior change:

  1. `_GEN_AI_USAGE_FIELDS` and `_GEN_AI_TOKEN_TYPES` now key on `UsageField` enum members instead of bare strings. Catches typos at import; aligns with the vocabulary providers already populate.
  2. New public `add_input_event(span, inputs)` and `add_output_event(span, output)` helpers replace the cross-module calls to the private `_input_messages_event` / `_output_messages_event`. Underscore-prefixed names stay as the dict-builder primitives; the public helpers handle `if event is not None: span.add_event(...)`.
  3. New `record_output(span, output, attributes, duration, error=None)` extracts the four-step finalize sequence (set output_attributes + emit content event + record token usage + record duration) shared by `_predict` and `_TracedStream._finalize`.

  Also: drop the dead `hasattr(type(usage), "model_fields")` guards (`Usage` is always a Pydantic `BaseModel` by the `io.py` contract); skip `bool` in the numeric usage iteration (`bool` is a subclass of `int` — a paranoid guard against ever populating a `Usage` boolean by accident). 630 tests still pass.

* fix(telemetry): sync iteration drives the wrapper's `__anext__`, not the inner Stream's

  The previous `_TracedStream.__iter__` delegated to `iter(self._inner)`, which makes Python iterate the inner `Stream` directly via its blocking-portal `__iter__`. That bypasses the wrapper's `__anext__` — so for sync streaming consumers, the span never finalized, TTFC was never recorded, and metrics were never emitted.

  Fix: `_TracedStream.__iter__` now spins its own portal and drives `self.__anext__` (the wrapper's), mirroring `Stream.__iter__`. On exhaustion, exception, or generator close (consumer break / GC), the `finally` block calls `self.aclose()`, which finalizes the span via the existing async path.

  Caught by smoke-testing `celeste.text.sync.stream.generate(...)` against a real Groq llama-3.1-8b-instant call: previously the span was missing from finished spans entirely; now it appears alongside the async-streaming span with the same attribute set.
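The per-category histogram recording described in item 2 of the first commit above can be sketched like this. `_FakeHistogram` is a hypothetical stand-in for an OTel metrics `Histogram`, and the `record_token_usage` signature is illustrative, not the repo's actual API; only the one-record-per-`gen_ai.token.type` shape comes from the commit message:

```python
class _FakeHistogram:
    """Stand-in for an OTel metrics Histogram (records value + attributes)."""

    def __init__(self):
        self.records = []

    def record(self, value, attributes=None):
        self.records.append((value, dict(attributes or {})))


def record_token_usage(histogram, usage, base_attrs):
    """Emit one histogram record per token category via gen_ai.token.type."""
    for token_type, count in (
        ("input", usage.get("input_tokens")),
        ("output", usage.get("output_tokens")),
    ):
        if count is None:
            continue  # category absent from this provider's usage payload
        histogram.record(count, {**base_attrs, "gen_ai.token.type": token_type})


hist = _FakeHistogram()
record_token_usage(
    hist,
    {"input_tokens": 12, "output_tokens": 34},
    {"gen_ai.request.model": "demo-model"},
)
```

Keeping the token category as a dimension (rather than separate instruments) is what lets a dashboard slice one `gen_ai.client.token.usage` series by `gen_ai.token.type`.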
* refactor: dedupe telemetry test fixtures, fix Windows time.monotonic flake

  Three review agents flagged the +800 LOC PR for triplicated test stream classes, redundant fixtures, parametrizable repetition, and one Windows-fragile timing assertion. Applied:

  - New `tests/unit_tests/_telemetry_helpers.py` holds the canonical `TelemetryUsage` / `TelemetryOutput` / `TelemetryStream` / `async_iter` used by all three telemetry test files.
  - New `tests/unit_tests/conftest.py` exposes the shared `exporter` fixture + `start_test_span` helper.
  - `test_telemetry_metrics.py` parametrizes `record_token_usage` (3 cases) and `record_operation_duration` (success / failure). Drops the redundant `test_input_and_output_recorded_separately` (subsumed by the parametrized `all_token_types` case).
  - `test_telemetry_metrics.py:191` now asserts `>= 0` on the duration sum — Windows `time.monotonic()` resolution can return 0 for sub-millisecond in-memory streams; the previous `> 0` failed CI on Windows runners.
  - `telemetry.py`: drop `add_output_event` (only ever called by `record_output` — inlined). Trim the verbose comments above `_GEN_AI_USAGE_FIELDS`, `_GEN_AI_TOKEN_TYPES`, `_CAPTURE_CONTENT`, and the two messages-event helpers down to one line each.

  Net: -210 modified LOC, +72 new helper LOC. PR diff drops from +861 to ~+725. Coverage unchanged (630 tests pass).

* refactor(telemetry): extract `gen_ai_span` context manager — drop try/except + `import time` from client

  User pushed back on `_predict` wrapping its entire body in try/except just to record `gen_ai.client.operation.duration` on the error path. The right home for that is a context manager inside `telemetry.py`. `gen_ai_span(model=, provider=, protocol=, modality=)` opens the span via `tracer.start_as_current_span`, captures the start time on enter, and in `finally` records the operation duration with `error.type` populated when the body raised. It yields `(span, request_attrs)` so `_predict` can call `add_input_event` and `record_output` against them.

  Result:

  - `_predict` is straight-line code: no try/except, no manual time math.
  - `client.py` no longer imports `time`.
  - `record_output` no longer takes `duration_seconds` / `error` params — duration is the span's lifecycle concern, not the output recorder's.
  - Streaming side (`_TracedStream._finalize`) updates symmetrically: records duration directly, then calls the simplified `record_output`.

  Real-call validated against gemini-3.1-flash-lite-preview text non-streaming + async streaming: same span attributes, same metric histograms, same content events as before.
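The extracted context-manager pattern can be sketched as follows. `make_span` and `record_duration` are hypothetical injection points (the real `gen_ai_span` takes model/provider/protocol/modality and uses `tracer.start_as_current_span`); only the duration-in-`finally` + `error.type`-on-failure shape comes from the commit message:

```python
import time
from contextlib import contextmanager


@contextmanager
def gen_ai_span_sketch(make_span, record_duration):
    """Open a span, yield it, and record operation duration in `finally`,
    attaching error.type when the body raised, so callers stay straight-line."""
    span = make_span()
    start = time.monotonic()
    error_type = None
    try:
        yield span
    except BaseException as exc:
        error_type = type(exc).__qualname__
        raise
    finally:
        attrs = {} if error_type is None else {"error.type": error_type}
        record_duration(time.monotonic() - start, attrs)


durations = []

# Success path: no error.type on the duration record.
with gen_ai_span_sketch(lambda: "span", lambda d, a: durations.append((d, a))):
    pass

# Error path: error.type is populated, and the exception still propagates.
try:
    with gen_ai_span_sketch(lambda: "span", lambda d, a: durations.append((d, a))):
        raise ValueError("boom")
except ValueError:
    pass
```

Because the duration recording lives in the context manager's `finally`, the caller's body needs no try/except and no `time` arithmetic of its own.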
## Summary
Streaming spans now carry `gen_ai.usage.input_tokens` and `gen_ai.usage.output_tokens` — matching the non-streaming path. The telemetry wrap site moves from the upstream SSE iterator to the constructed `Stream` so that `output_attributes(stream.output)` can be called after `_parse_output` populates the typed `Output`.

## Why
`telemetry.trace_stream` previously wrapped the SSE iterator (below `Stream`). When upstream SSE exhausted, the wrapper exited and `with use_span(span, end_on_exit=True)` ended the span — before `Stream.__anext__` ran `_parse_output` to build `_output.usage`. Setting attributes on an already-ended span is a silent no-op, so streaming spans only carried `gen_ai.response.time_to_first_chunk`. `Stream._output.usage` exists synchronously after natural exhaustion. The fix is positional: wrap the `Stream` instead of the SSE iterator. The same `output_attributes(output)` function `_predict` already uses gets called, on the same typed `Output` shape.

## Changes
- `src/celeste/telemetry.py`: replace the iterator-shaped `trace_stream` async generator with a `_TracedStream` wrapper class that delegates the public `Stream` surface (`__aiter__`/`__anext__`/`aclose`/`output`/`__aenter__`/`__aexit__`/`__iter__`/`__enter__`/`__exit__`/`__repr__`) and owns the span lifecycle. Add `_NoOpStatus`/`_NoOpStatusCode` for the existing OTel-missing fallback path.
- `src/celeste/client.py`: in `_stream`, construct the `Stream` from the SSE iterator and wrap the `Stream` (not the SSE iterator) with `telemetry.trace_stream`.
- `tests/unit_tests/test_telemetry_streaming.py`: 7 new tests covering natural exhaustion (+ `async with`), early `aclose()`, the exception path, the `output` property pre/post exhaustion, and idempotent finalize.
- `pyproject.toml`: add `opentelemetry-sdk` to dev deps for `InMemorySpanExporter`-based unit tests.

Zero changes to `src/celeste/streaming.py`.

## Lifecycle
- First chunk: record `gen_ai.response.time_to_first_chunk`.
- Natural exhaustion: `output_attributes(stream.output)` → `set_attributes` → `span.end()`.
- Early `aclose()`: end span without ERROR; emit usage only if `_output` was built before abandonment.
- Exception: `record_exception` + ERROR status + end span + re-raise.
- `asyncio.CancelledError`: end span without ERROR; re-raise.
- `use_span(span, end_on_exit=False)` is held only across the inner `await __anext__()` so HTTP child spans (HTTPX auto-instrumentation) nest under the GenAI span.

## Behavioral note
`_stream` returns the wrapper, not a `Stream` subclass. `isinstance(x, Stream)` checks against the return value will fail. Verified no such checks exist in this repo. Consumers using `async for` / `.output` / `aclose()` / `async with` continue to work via delegation.

## Out of scope
Widening `output_attributes` to emit `total_tokens`, `reasoning_tokens`, modality-specific usage fields, cached-input tokens, or GenAI metrics — tracked separately in #271.

## Test plan
- `uv run ruff check src tests`
- `uv run ruff format --check src tests`
- `uv run mypy src/celeste` and `uv run mypy tests/`
- `uv run pytest tests/unit_tests/ -m "not integration" -q` — 611 passed