feat(telemetry): emit gen_ai.usage attrs on streaming spans #272

Merged
Kamilbenkirane merged 1 commit into main from feat/trace-stream-output-attributes
May 5, 2026
Conversation

@Kamilbenkirane
Member

Summary

Streaming spans now carry gen_ai.usage.input_tokens and gen_ai.usage.output_tokens — matching the non-streaming path. The telemetry wrap site moves from the upstream SSE iterator to the constructed Stream so that output_attributes(stream.output) can be called after _parse_output populates the typed Output.

Why

telemetry.trace_stream previously wrapped the SSE iterator (below Stream). When upstream SSE exhausted, the wrapper exited and with use_span(span, end_on_exit=True) ended the span — before Stream.__anext__ ran _parse_output to build _output.usage. Setting attributes on an already-ended span is a silent no-op, so streaming spans only carried gen_ai.response.time_to_first_chunk.

Stream._output.usage exists synchronously after natural exhaustion. The fix is positional: wrap the Stream instead of the SSE iterator. Same output_attributes(output) function _predict already uses gets called, on the same typed Output shape.
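
The ordering problem can be reproduced with a small sketch. Everything here is a hypothetical stand-in (FakeSpan, FakeStream, sse_events, traced_below), not the library's code; FakeSpan only mimics the "writes after end() are dropped" behavior described above.

```python
import asyncio


class FakeSpan:
    """Hypothetical span stand-in: writes after end() are silently dropped."""

    def __init__(self):
        self.attributes = {}
        self.ended = False

    def set_attribute(self, key, value):
        if not self.ended:  # mimics the silent no-op on an ended span
            self.attributes[key] = value

    def end(self):
        self.ended = True


async def sse_events():
    """Hypothetical upstream SSE iterator yielding two chunks."""
    yield "chunk-1"
    yield "chunk-2"


class FakeStream:
    """Hypothetical Stream: builds its output only after upstream exhausts."""

    def __init__(self, source):
        self._source = source
        self.output = None

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            return await self._source.__anext__()
        except StopAsyncIteration:
            # Stand-in for _parse_output: usage exists only from here on.
            self.output = {"input_tokens": 3, "output_tokens": 2}
            raise


async def traced_below(span, source):
    """Old position: wraps the SSE iterator, so the span ends first."""
    try:
        async for event in source:
            yield event
    finally:
        span.end()


async def run_old():
    span = FakeSpan()
    stream = FakeStream(traced_below(span, sse_events()))
    async for _ in stream:
        pass
    # Too late: the span ended when the SSE iterator was exhausted.
    span.set_attribute("gen_ai.usage.output_tokens", stream.output["output_tokens"])
    return span.attributes


async def run_new():
    span = FakeSpan()
    stream = FakeStream(sse_events())
    async for _ in stream:
        pass
    # Wrapping at the Stream level: output is ready before the span ends.
    span.set_attribute("gen_ai.usage.output_tokens", stream.output["output_tokens"])
    span.end()
    return span.attributes
```

In this sketch run_old() returns an empty attribute dict, while run_new() carries the output token count.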

Changes

  • src/celeste/telemetry.py: replace the iterator-shaped trace_stream async generator with a _TracedStream wrapper class that delegates the public Stream surface (__aiter__/__anext__/aclose/output/__aenter__/__aexit__/__iter__/__enter__/__exit__/__repr__) and owns span lifecycle. Add _NoOpStatus / _NoOpStatusCode for the existing OTel-missing fallback path.
  • src/celeste/client.py: in _stream, construct the Stream from the SSE iterator and wrap the Stream (not the SSE iterator) with telemetry.trace_stream.
  • tests/unit_tests/test_telemetry_streaming.py: 7 new tests covering natural exhaustion (+ async with), early aclose(), exception path, output property pre/post exhaustion, idempotent finalize.
  • pyproject.toml: add opentelemetry-sdk to dev deps for InMemorySpanExporter-based unit tests.

Zero changes to src/celeste/streaming.py.

Lifecycle

  • First chunk: emit gen_ai.response.time_to_first_chunk.
  • Natural exhaustion: output_attributes(stream.output) → set_attributes → span.end().
  • Early aclose(): end span without ERROR; emit usage only if _output was built before abandonment.
  • Exception during iteration: record_exception + ERROR status + end span + re-raise.
  • asyncio.CancelledError: end span without ERROR; re-raise.

use_span(span, end_on_exit=False) is held only across the inner await __anext__() so HTTP child spans (HTTPX auto-instrumentation) nest under the GenAI span.
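
The lifecycle rules above can be sketched as a delegating wrapper. This is a simplified illustration, not the PR's _TracedStream: RecordingSpan, InnerStream, TracedStreamSketch and drain are hypothetical names, the span is a plain recorder rather than an OTel span, and the use_span context handling is left out.

```python
import asyncio
import time


class RecordingSpan:
    """Hypothetical span stand-in that records what was done to it."""

    def __init__(self):
        self.attributes = {}
        self.status = "UNSET"
        self.end_calls = 0

    def set_attribute(self, key, value):
        self.attributes[key] = value

    def end(self):
        self.end_calls += 1


class InnerStream:
    """Hypothetical Stream: `output` is built only on natural exhaustion."""

    def __init__(self, chunks, fail=False):
        self._chunks = list(chunks)
        self._fail = fail
        self.output = None

    async def __anext__(self):
        if self._chunks:
            return self._chunks.pop(0)
        if self._fail:
            raise RuntimeError("upstream broke")
        self.output = {"input_tokens": 3, "output_tokens": 2}
        raise StopAsyncIteration

    async def aclose(self):
        self._chunks.clear()


class TracedStreamSketch:
    """Delegating wrapper that owns the span lifecycle."""

    def __init__(self, inner, span):
        self._inner, self._span = inner, span
        self._start = time.monotonic()
        self._seen_first = False
        self._finalized = False

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            chunk = await self._inner.__anext__()
        except (StopAsyncIteration, asyncio.CancelledError):
            self._finalize()  # natural end or cancellation: no ERROR status
            raise
        except Exception as exc:
            self._finalize(error=exc)  # real failure: mark ERROR, then re-raise
            raise
        if not self._seen_first:
            self._seen_first = True
            self._span.set_attribute(
                "gen_ai.response.time_to_first_chunk", time.monotonic() - self._start
            )
        return chunk

    async def aclose(self):
        await self._inner.aclose()
        self._finalize()  # early abandonment: end the span without ERROR

    def _finalize(self, error=None):
        if self._finalized:  # idempotent: the span is ended exactly once
            return
        self._finalized = True
        if error is not None:
            self._span.status = "ERROR"
        output = self._inner.output
        if output is not None:  # usage exists only if the output was built
            self._span.set_attribute("gen_ai.usage.input_tokens", output["input_tokens"])
            self._span.set_attribute("gen_ai.usage.output_tokens", output["output_tokens"])
        self._span.end()


async def drain(stream):
    """Consume a stream to exhaustion and return the chunks seen."""
    return [chunk async for chunk in stream]
```

Calling aclose() after a full drain leaves end_calls at 1 in this sketch, which is the idempotent-finalize property the new unit tests are described as covering.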

Behavioral note

_stream returns the wrapper, not a Stream subclass. isinstance(x, Stream) checks against the return value will fail. Verified no such checks exist in this repo. Consumers using async for / .output / aclose() / async with continue to work via delegation.
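
A minimal illustration of that trade-off, using hypothetical Stream and Wrapper classes rather than the library's own: delegation keeps attribute access working but does not make the wrapper an instance of the wrapped type.

```python
class Stream:
    """Hypothetical stand-in for the library's Stream class."""

    def __init__(self):
        self.output = "done"


class Wrapper:
    """Hypothetical delegating wrapper; deliberately not a Stream subclass."""

    def __init__(self, inner):
        self._inner = inner

    @property
    def output(self):
        # Delegation: reads go through to the wrapped object.
        return self._inner.output


wrapped = Wrapper(Stream())
is_stream = isinstance(wrapped, Stream)  # False: type checks see the wrapper
delegated_output = wrapped.output        # "done": duck-typed access still works
```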

Out of scope

Widening output_attributes to emit total_tokens, reasoning_tokens, modality-specific usage fields, cached-input tokens, or GenAI metrics — tracked separately in #271.

Test plan

  • uv run ruff check src tests
  • uv run ruff format --check src tests
  • uv run mypy src/celeste and uv run mypy tests/
  • uv run pytest tests/unit_tests/ -m "not integration" -q — 611 passed

Move telemetry wrap site from the upstream SSE iterator to the
constructed Stream. _TracedStream delegates the public Stream surface
(__aiter__/__anext__/aclose/output/__aenter__/__aexit__/__iter__/
__enter__/__exit__/__repr__) to the inner stream and owns span
lifecycle so output_attributes can read Stream.output after
_parse_output runs on natural exhaustion — same call _predict already
makes for non-streaming.

Streaming spans now carry gen_ai.usage.input_tokens and
gen_ai.usage.output_tokens, matching the non-streaming path. Zero
changes to streaming.py. Add opentelemetry-sdk as a dev dep for
InMemorySpanExporter-based unit tests.
@Kamilbenkirane Kamilbenkirane merged commit 466db80 into main May 5, 2026
11 checks passed
@claude

claude Bot commented May 5, 2026

Code review

Bug: sync iteration bypasses span lifecycle (telemetry.py lines 212-228)

_TracedStream.__iter__ returns iter(self._inner) directly, and __exit__ delegates to self._inner.__exit__. This means OTel span lifecycle is skipped entirely for synchronous iteration.

Root cause: Stream.__iter__ (streaming.py:303-322) is its own generator driving Stream.__anext__ via an anyio portal. It never calls _TracedStream.__anext__, so _finalize() is never reached. The inner Stream.__exit__ is a no-op.

Consequences:

  • Span leak: the OTel span from client._stream is never ended and never exported.
  • No gen_ai.usage.input_tokens / gen_ai.usage.output_tokens / TTFC emitted for sync streaming — defeating the purpose of this PR for sync callers.
  • record_exception / ERROR status never set on sync-path failures.

Regression: Pre-PR, trace_stream wrapped the SSE iterator passed into the Stream constructor, so Stream.__anext__ (used by the sync portal) activated the span lifecycle on both paths. The new approach only wires tracing on async paths.

Sync iteration is documented in the Stream docstring and exercised by 8+ integration tests (tests/integration_tests//test_stream_.py).

Ref:

    def __iter__(self) -> Iterator[Any]:
        """Delegate sync iteration to the inner Stream."""
        return iter(self._inner)

    def __enter__(self) -> "_TracedStream":
        """Enter sync context — delegate to inner Stream."""
        self._inner.__enter__()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Exit sync context — delegate to inner Stream."""
        self._inner.__exit__(exc_type, exc_val, exc_tb)

Suggested fix: __iter__ should be a generator wrapping the inner sync iterator, recording TTFC, calling _finalize() on exhaustion/GeneratorExit/exceptions, mirroring __anext__. __exit__ should also call _finalize() so the with-stream pattern ends the span.
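
One possible shape for that suggestion, as a rough sketch only. SyncTraceSketch is a hypothetical class and its `events` list stands in for the real span calls (TTFC attribute, ERROR status, span.end()); it is not the code that was eventually merged.

```python
class SyncTraceSketch:
    """Hypothetical wrapper whose sync iteration also finalizes the span."""

    def __init__(self, inner):
        self._inner = inner
        self.events = []  # stand-in for span calls
        self._finalized = False

    def _finalize(self, error=None):
        if self._finalized:  # end the span at most once
            return
        self._finalized = True
        self.events.append("error" if error is not None else "end")

    def __iter__(self):
        first = True
        try:
            for chunk in self._inner:
                if first:
                    first = False
                    self.events.append("first_chunk")  # where TTFC would be set
                yield chunk
        except GeneratorExit:
            self._finalize()  # consumer stopped early: end without ERROR
            raise
        except Exception as exc:
            self._finalize(error=exc)  # failure while iterating: mark ERROR
            raise
        else:
            self._finalize()  # natural exhaustion

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._finalize()  # the with-pattern also ends the span
```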

Kamilbenkirane added a commit that referenced this pull request May 5, 2026
…ntent events (#273)

* feat(telemetry): widen usage attrs, add metrics histograms, opt-in content events

V2 telemetry expansion bundling three improvements on top of #270/#272:

1. Span attributes: output_attributes() now emits gen_ai.usage.total_tokens,
   reasoning_tokens, and cached_input_tokens when the typed Usage carries them.
   Off-spec modality fields fall through to celeste.usage.<field>. Adds the
   missing cached_tokens field to TextUsage so the data already produced by
   anthropic / openai / cohere / deepseek / openresponses providers actually
   reaches the span (previously dropped silently). Extends Google Gemini and
   chatcompletions provider mixins to surface cached-prompt tokens too.

2. Metrics: registers two GenAI semconv histograms — gen_ai.client.token.usage
   (one record per token category via gen_ai.token.type dimension) and
   gen_ai.client.operation.duration (with error.type on failures). Sampling-
   resilient token-rate and latency dashboards no longer require span scanning.
   Wired into _predict and _TracedStream._finalize.

3. Content events: opt-in via OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT
   env flag, default off for PII safety. When enabled, spans carry
   gen_ai.input.messages and gen_ai.output.messages events with the semconv
   {role, parts: [...]} shape. Text + reasoning + tool calls inline; multimodal
   artifacts (image/audio/video/document) by URL reference, never inline bytes.
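
A rough sketch of the opt-in gate in item 3. The env var name and the {role, parts: [...]} shape come from the text above; the accepted flag value ("true"), the part keys ("type", "content", "uri") and both function names are assumptions made for illustration.

```python
import os
from typing import Optional

CAPTURE_ENV = "OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT"


def capture_enabled(environ=None) -> bool:
    """Content capture is off unless the flag is explicitly set (assumed: "true")."""
    env = os.environ if environ is None else environ
    return env.get(CAPTURE_ENV, "").strip().lower() == "true"


def input_messages_event(prompt: str, image_url: Optional[str] = None, environ=None):
    """Build a gen_ai.input.messages payload, or None when capture is off."""
    if not capture_enabled(environ):
        return None  # default: nothing about the prompt reaches the span
    parts = [{"type": "text", "content": prompt}]  # part keys are assumed
    if image_url is not None:
        # Multimodal artifacts go by URL reference, never as inline bytes.
        parts.append({"type": "uri", "uri": image_url})
    return {
        "name": "gen_ai.input.messages",
        "messages": [{"role": "user", "parts": parts}],
    }
```

Passing `environ` explicitly keeps the sketch testable without mutating the process environment.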

19 new tests across unit_tests/test_telemetry_metrics.py and
test_telemetry_content_events.py, plus extended attribute coverage in
test_telemetry_streaming.py. 630 unit tests pass.

Closes #271.

* refactor(telemetry): UsageField enum keys, public event helpers, shared finalize

Three small cleanups on top of the V2 telemetry expansion, no behavior change:

1. `_GEN_AI_USAGE_FIELDS` and `_GEN_AI_TOKEN_TYPES` now key on `UsageField` enum
   members instead of bare strings. Catches typos at import; aligns with the
   vocabulary providers already populate.

2. New public `add_input_event(span, inputs)` and `add_output_event(span, output)`
   helpers replace the cross-module calls to private `_input_messages_event` /
   `_output_messages_event`. Underscore-prefixed names stay as the dict-builder
   primitives; the public helpers handle `if event is not None: span.add_event(...)`.

3. New `record_output(span, output, attributes, duration, error=None)` extracts
   the four-step finalize sequence (set output_attributes + emit content event +
   record token usage + record duration) shared by `_predict` and
   `_TracedStream._finalize`.

Also: drop dead `hasattr(type(usage), "model_fields")` guards (Usage is always
Pydantic BaseModel by io.py contract); skip `bool` in the numeric usage iter
(bool is a subclass of int — paranoid guard against ever populating a Usage
boolean by accident).
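
To illustrate the enum-keyed mapping and the bool guard, here is a small sketch. The enum members, GEN_AI_USAGE_FIELDS contents and numeric_usage_attributes are hypothetical, and a plain dict stands in for the Pydantic Usage model.

```python
from enum import Enum


class UsageField(str, Enum):
    """Hypothetical subset of the usage vocabulary."""

    INPUT_TOKENS = "input_tokens"
    OUTPUT_TOKENS = "output_tokens"
    TOTAL_TOKENS = "total_tokens"


# Keying on enum members means a typo such as UsageField.INPT_TOKENS raises
# AttributeError as soon as the module is imported, instead of silently
# creating a mapping entry that never matches.
GEN_AI_USAGE_FIELDS = {
    UsageField.INPUT_TOKENS: "gen_ai.usage.input_tokens",
    UsageField.OUTPUT_TOKENS: "gen_ai.usage.output_tokens",
    UsageField.TOTAL_TOKENS: "gen_ai.usage.total_tokens",
}


def numeric_usage_attributes(usage: dict) -> dict:
    """Map numeric usage fields to span attribute names, skipping bools."""
    attrs = {}
    for field, attr_name in GEN_AI_USAGE_FIELDS.items():
        value = usage.get(field.value)
        # bool is a subclass of int, so it has to be excluded explicitly.
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            continue
        attrs[attr_name] = value
    return attrs
```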

630 tests still pass.

* fix(telemetry): sync iteration drives the wrapper's __anext__, not the inner Stream's

The previous `_TracedStream.__iter__` delegated to `iter(self._inner)`, which
makes Python iterate the inner Stream directly via its blocking-portal
`__iter__`. That bypasses the wrapper's `__anext__` — so for sync streaming
consumers, the span never finalized, TTFC was never recorded, and metrics
were never emitted.

Fix: `_TracedStream.__iter__` now spins its own portal and drives
`self.__anext__` (the wrapper's), mirroring `Stream.__iter__`. On
exhaustion, exception, or generator-close (consumer break / GC), the
finally block calls `self.aclose()` which finalizes the span via the
existing async path.

Caught by smoke-testing `celeste.text.sync.stream.generate(...)` against
a real Groq llama-3.1-8b-instant call: previously the span was missing
from finished spans entirely; now it appears alongside the async-streaming
span with the same attribute set.
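
The idea behind this fix, sketched with the standard library only. The commit describes a portal mirroring Stream.__iter__; this sketch substitutes a private asyncio event loop for that portal, and SyncDrivenSketch is a hypothetical class. It is only meant for callers that are not already inside a running event loop.

```python
import asyncio


class SyncDrivenSketch:
    """Hypothetical wrapper whose sync iteration drives its own __anext__."""

    def __init__(self, chunks):
        self._chunks = list(chunks)
        self.anext_calls = 0
        self.finalized = False

    def __aiter__(self):
        return self

    async def __anext__(self):
        self.anext_calls += 1  # proves the wrapper's method is the one driven
        if not self._chunks:
            self.finalized = True
            raise StopAsyncIteration
        return self._chunks.pop(0)

    async def aclose(self):
        self.finalized = True  # stand-in for ending the span

    def __iter__(self):
        loop = asyncio.new_event_loop()  # stand-in for the blocking portal
        try:
            while True:
                try:
                    chunk = loop.run_until_complete(self.__anext__())
                except StopAsyncIteration:
                    break
                yield chunk
        finally:
            # Runs on exhaustion, on exceptions, and on generator close.
            loop.run_until_complete(self.aclose())
            loop.close()
```

Because the cleanup sits in `finally`, a consumer that stops after the first chunk and closes the generator still reaches aclose().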

* refactor: dedupe telemetry test fixtures, fix Windows time.monotonic flake

Three review agents flagged the +800 LOC PR for: triplicated test stream
classes, redundant fixtures, parametrizable repetition, and one Windows-fragile
timing assertion. Applied:

- New `tests/unit_tests/_telemetry_helpers.py` holds the canonical
  `TelemetryUsage` / `TelemetryOutput` / `TelemetryStream` / `async_iter`
  used by all three telemetry test files.
- New `tests/unit_tests/conftest.py` exposes the shared `exporter` fixture
  + `start_test_span` helper.
- `test_telemetry_metrics.py` parametrizes `record_token_usage` (3 cases)
  and `record_operation_duration` (success / failure). Drops the redundant
  `test_input_and_output_recorded_separately` (subsumed by the parametrized
  `all_token_types` case).
- `test_telemetry_metrics.py:191` now asserts `>= 0` on duration sum —
  Windows `time.monotonic()` resolution can return 0 for sub-millisecond
  in-memory streams; the previous `> 0` failed CI on Windows runners.
- `telemetry.py`: drop `add_output_event` (only ever called by
  `record_output` — inlined). Trim verbose comments above
  `_GEN_AI_USAGE_FIELDS`, `_GEN_AI_TOKEN_TYPES`, `_CAPTURE_CONTENT`,
  and the two messages-event helpers down to one line each.

Net: -210 modified LOC, +72 new helper LOC.
PR diff drops from +861 to ~+725. Coverage unchanged (630 tests pass).

* refactor(telemetry): extract gen_ai_span context manager — drop try/except + import time from client

User pushed back on `_predict` wrapping its entire body in try/except just to
record `gen_ai.client.operation.duration` on the error path. The right home
for that is a context manager inside `telemetry.py`.

`gen_ai_span(model=, provider=, protocol=, modality=)` opens the span via
`tracer.start_as_current_span`, captures the start time on enter, and in
`finally` records the operation duration with `error.type` populated when
the body raised. It yields `(span, request_attrs)` so `_predict` can call
`add_input_event` and `record_output` against them.

Result:
- `_predict` is straight-line code: no try/except, no manual time math.
- `client.py` no longer imports `time`.
- `record_output` no longer takes `duration_seconds` / `error` params —
  duration is the span's lifecycle concern, not the output recorder's.
- Streaming side (`_TracedStream._finalize`) updates symmetrically:
  records duration directly, then calls the simplified `record_output`.

Real-call validated against gemini-3.1-flash-lite-preview text non-streaming
+ async streaming: same span attributes, same metric histograms, same
content events as before.
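
A simplified sketch of the context-manager shape described in this commit. gen_ai_span_sketch and DURATIONS are hypothetical: a dict stands in for the span, a list stands in for the gen_ai.client.operation.duration histogram, and the request attribute names are assumed. The real gen_ai_span opens the span via tracer.start_as_current_span and takes more parameters.

```python
import time
from contextlib import contextmanager

DURATIONS = []  # hypothetical stand-in for the duration histogram


@contextmanager
def gen_ai_span_sketch(model, provider):
    """Yield (span, request_attrs); always record duration on exit."""
    # Attribute names below are assumed for illustration.
    request_attrs = {"gen_ai.request.model": model, "gen_ai.provider.name": provider}
    span = {"attributes": dict(request_attrs)}
    start = time.monotonic()
    error_type = None
    try:
        yield span, request_attrs
    except BaseException as exc:
        error_type = type(exc).__qualname__  # populate error.type on failure
        raise
    finally:
        attrs = dict(request_attrs)
        if error_type is not None:
            attrs["error.type"] = error_type
        DURATIONS.append((time.monotonic() - start, attrs))


def predict_sketch(fail=False):
    """Caller stays straight-line: no try/except, no manual time math."""
    with gen_ai_span_sketch("some-model", "some-provider") as (span, _attrs):
        if fail:
            raise ValueError("provider error")
        span["attributes"]["gen_ai.usage.output_tokens"] = 2
        return span
```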