Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sdks/python/src/opik/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from .decorator.context_manager.trace_context_manager import start_as_current_trace
from .simulation import SimulatedUser, run_simulation
from .api_objects.local_recording import record_traces_locally
from .context_storage import project_context
from .opik_context import update_current_trace, update_current_span


Expand Down Expand Up @@ -86,6 +87,7 @@
"agent_config_context",
"update_current_trace",
"update_current_span",
"project_context",
]

sagemaker_auth.setup_aws_sagemaker_session_hook()
Expand Down
113 changes: 62 additions & 51 deletions sdks/python/src/opik/api_objects/opik_client.py

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions sdks/python/src/opik/api_objects/threads/threads_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def search_threads(
entity_type="threads",
)

project_name = project_name or self._opik_client.project_name
project_name = self._opik_client._resolve_project_name(project_name)

threads = rest_stream_parser.read_and_parse_full_stream(
read_source=lambda current_batch_size,
Expand Down Expand Up @@ -133,7 +133,7 @@ def log_threads_feedback_scores(
scores. If not provided, the project name configured in the Opik client will be used.
This parameter is used as a fallback if `project_name` is not specified in the score dictionary.
"""
project_name = project_name or self._opik_client.project_name
project_name = self._opik_client._resolve_project_name(project_name)

score_messages = helpers.parse_feedback_score_messages(
scores=scores,
Expand Down
120 changes: 120 additions & 0 deletions sdks/python/src/opik/context_storage.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import contextvars
import contextlib
import logging

from typing import List, Optional, Generator, Tuple
from opik.api_objects import span, trace

LOGGER = logging.getLogger(__name__)


class OpikContextStorage:
"""
Expand Down Expand Up @@ -43,6 +46,12 @@ def __init__(self) -> None:
self._spans_data_stack_context: contextvars.ContextVar[
Tuple[span.SpanData, ...]
] = contextvars.ContextVar("spans_data_stack", default=default_span_stack)
self._current_project_name_context: contextvars.ContextVar[Optional[str]] = (
contextvars.ContextVar("current_project_name", default=None)
)
self._current_project_name_owner_context: contextvars.ContextVar[
Optional[str]
] = contextvars.ContextVar("current_project_name_owner", default=None)

def _has_span_id(self, span_id: str) -> bool:
return any(span.id == span_id for span in self._spans_data_stack_context.get())
Expand Down Expand Up @@ -150,11 +159,60 @@ def pop_trace_data(
def set_trace_data(self, trace: Optional[trace.TraceData]) -> None:
self._current_trace_data_context.set(trace)

def get_context_project_name(self) -> Optional[str]:
return self._current_project_name_context.get()

def try_acquire_context_project_name(
self, project_name: str, owner_id: str
) -> bool:
"""Try to set the project name for the current context.

The first caller becomes the owner. Subsequent calls with a
different ``owner_id`` are ignored (with a warning if the
requested project name differs).

Returns ``True`` if this call became the owner, ``False`` otherwise.
"""
current_owner = self._current_project_name_owner_context.get()
if current_owner is not None:
current_project = self._current_project_name_context.get()
if current_project != project_name:
LOGGER.warning(
"Attempted to set project name to %r, but it is already "
"set to %r by the enclosing trace/span. The outer "
"project name will be used.",
project_name,
current_project,
)
return False

self._current_project_name_context.set(project_name)
self._current_project_name_owner_context.set(owner_id)
return True

def release_context_project_name_if_owner(self, owner_id: str) -> None:
"""Release project name ownership if ``owner_id`` matches."""
if self._current_project_name_owner_context.get() == owner_id:
self._current_project_name_context.set(None)
self._current_project_name_owner_context.set(None)

def _raw_set_context_project_name(
self, project_name: Optional[str]
) -> contextvars.Token:
"""Low-level set used by ``temporary_context`` for save/restore."""
return self._current_project_name_context.set(project_name)

def _raw_reset_context_project_name(self, token: contextvars.Token) -> None:
"""Low-level reset used by ``temporary_context`` for save/restore."""
self._current_project_name_context.reset(token)

def clear_spans(self) -> None:
self._spans_data_stack_context.set(tuple())

def clear_all(self) -> None:
self._current_trace_data_context.set(None)
self._current_project_name_context.set(None)
self._current_project_name_owner_context.set(None)
self.clear_spans()


Expand All @@ -167,17 +225,71 @@ def clear_all(self) -> None:
get_trace_data = _context_storage.get_trace_data
pop_trace_data = _context_storage.pop_trace_data
set_trace_data = _context_storage.set_trace_data
get_context_project_name = _context_storage.get_context_project_name
try_acquire_context_project_name = _context_storage.try_acquire_context_project_name
release_context_project_name_if_owner = (
_context_storage.release_context_project_name_if_owner
)
clear_all = _context_storage.clear_all
span_data_stack_size = _context_storage.span_data_stack_size
trim_span_data_stack_to_certain_span = (
_context_storage.trim_span_data_stack_to_certain_span
)


def resolve_project_name(
default: Optional[str],
caller: str,
) -> Optional[str]:
"""Resolve project name for an integration or callback.

If an active project context exists (set by ``@track`` or
``opik.project_context``), it takes precedence over *default*.
A warning is logged when *default* is overridden.
"""
context_project = get_context_project_name()
if context_project is not None:
Comment thread
alexkuzmik marked this conversation as resolved.
if default is not None and default != context_project:
LOGGER.warning(
'%s was initialized with project "%s", but the active '
'context uses "%s". The context project will be used.',
caller,
default,
context_project,
)
return context_project
return default


def get_current_context_instance() -> OpikContextStorage:
return _context_storage


@contextlib.contextmanager
def project_context(project_name: str) -> Generator[None, None, None]:
"""
Context manager that sets the project name for all Opik operations
(traces, spans, agent configs, etc.) within the block.

The first context to set a project name becomes the owner. Nested
``project_context`` or ``@track(project_name=...)`` calls with a
different name are ignored (a warning is logged) and the outer
project name is preserved.

Usage::

with project_context("customer-support"):
customer_support_agent(query)
"""
owner_id = f"project_context_{id(project_name)}_{id(object())}"
Comment thread
alexkuzmik marked this conversation as resolved.
acquired = try_acquire_context_project_name(project_name, owner_id)
try:
yield
finally:
if acquired:
release_context_project_name_if_owner(owner_id)


@contextlib.contextmanager
def temporary_context(
span_data: span.SpanData, trace_data: Optional[trace.TraceData]
Expand All @@ -192,9 +304,17 @@ def temporary_context(
if trace_data is not None:
set_trace_data(trace=trace_data)

project_token = None
if span_data.project_name is not None:
project_token = _context_storage._raw_set_context_project_name(
span_data.project_name
)

add_span_data(span_data)

Comment thread
alexkuzmik marked this conversation as resolved.
yield
finally:
set_trace_data(original_trace)
if project_token is not None:
_context_storage._raw_reset_context_project_name(project_token)
pop_span_data()
Comment thread
alexkuzmik marked this conversation as resolved.
28 changes: 27 additions & 1 deletion sdks/python/src/opik/decorator/base_track_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,8 @@ def pop_end_candidates() -> Tuple[span.SpanData, Optional[trace.TraceData]]:
"When pop_end_candidates is called, top span data must not be None. Otherwise something is wrong."
)

context_storage.release_context_project_name_if_owner(span_data_to_end.id)

trace_data_to_end = pop_end_candidate_trace_data()
return span_data_to_end, trace_data_to_end

Expand All @@ -680,13 +682,35 @@ def pop_end_candidate_trace_data() -> Optional[trace.TraceData]:
and possible_trace_data_to_end is not None
and possible_trace_data_to_end.id in TRACES_CREATED_BY_DECORATOR
):
trace_data_to_end = context_storage.pop_trace_data()
trace_data_to_end = context_storage.pop_trace_data(
ensure_id=possible_trace_data_to_end.id
)
TRACES_CREATED_BY_DECORATOR.discard(possible_trace_data_to_end.id)
context_storage.release_context_project_name_if_owner(
possible_trace_data_to_end.id
)
return trace_data_to_end

return None


def _try_acquire_project_name(
span_creation_result: span_creation_handler.SpanCreationResult,
) -> None:
if span_creation_result.should_process_span_data:
span_data = span_creation_result.span_data
if span_data.project_name is not None:
context_storage.try_acquire_context_project_name(
span_data.project_name, span_data.id
)
elif span_creation_result.trace_data is not None:
trace_data = span_creation_result.trace_data
if trace_data.project_name is not None:
context_storage.try_acquire_context_project_name(
trace_data.project_name, trace_data.id
)


def add_start_candidates(
start_span_parameters: arguments_helpers.StartSpanParameters,
opik_distributed_trace_headers: Optional[DistributedTraceHeadersDict],
Expand Down Expand Up @@ -750,6 +774,8 @@ def add_start_candidates(
tracing_active=tracing_active,
)

_try_acquire_project_name(span_creation_result)

return span_creation_result


Expand Down
17 changes: 17 additions & 0 deletions sdks/python/src/opik/decorator/span_creation_handler.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
from typing import (
Optional,
NamedTuple,
Expand All @@ -10,6 +11,8 @@

from . import arguments_helpers

LOGGER = logging.getLogger(__name__)


class SpanCreationResult(NamedTuple):
"""
Expand Down Expand Up @@ -47,6 +50,20 @@ def create_span_respecting_context(
if opik_context_storage is None:
opik_context_storage = context_storage.get_current_context_instance()

context_project = context_storage.get_context_project_name()
if context_project is not None:
if (
start_span_arguments.project_name is not None
and start_span_arguments.project_name != context_project
):
LOGGER.warning(
'Nested @track requested project "%s", but the enclosing '
'trace already uses "%s". The outer project name will be used.',
start_span_arguments.project_name,
context_project,
)
start_span_arguments.project_name = context_project

if distributed_trace_headers:
span_data = arguments_helpers.create_span_data(
start_span_arguments=start_span_arguments,
Expand Down
12 changes: 8 additions & 4 deletions sdks/python/src/opik/integrations/dspy/callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ def _attach_span_to_existing_span(
) -> None:
project_name = helpers.resolve_child_span_project_name(
parent_project_name=current_span_data.project_name,
child_project_name=self._project_name,
child_project_name=context_storage.resolve_project_name(
self._project_name, "OpikCallback"
),
)
span_type = get_span_type(instance)

Expand All @@ -129,7 +131,7 @@ def _attach_span_to_existing_trace(
) -> None:
project_name = helpers.resolve_child_span_project_name(
current_trace_data.project_name,
self._project_name,
context_storage.resolve_project_name(self._project_name, "OpikCallback"),
)
span_type = get_span_type(instance)

Expand Down Expand Up @@ -164,7 +166,9 @@ def _start_trace(
name=instance.__class__.__name__,
input=inputs,
metadata=self._get_opik_metadata(instance),
project_name=self._project_name,
project_name=context_storage.resolve_project_name(
self._project_name, "OpikCallback"
),
)
self._map_call_id_to_trace_data[call_id] = trace_data
self._set_current_context_data(trace_data)
Expand Down Expand Up @@ -263,7 +267,7 @@ def _collect_common_span_data(

project_name = helpers.resolve_child_span_project_name(
current_callback_context_data.project_name,
self._project_name,
context_storage.resolve_project_name(self._project_name, "OpikCallback"),
)

if isinstance(current_callback_context_data, span.SpanData):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from typing_extensions import override

from opik import context_storage
from opik.api_objects import span
from opik.decorator import arguments_helpers, base_track_decorator
from opik.decorator import inspect_helpers
Expand Down Expand Up @@ -73,7 +74,9 @@ def _end_span_inputs_preprocessor(
if output is not None and output.done and output.response:
video_save_decorator.patch_videos_save(
output,
project_name=self._project_name,
project_name=context_storage.resolve_project_name(
self._project_name, "OperationsGetTrackDecorator"
),
tags=current_span_data.tags,
metadata=current_span_data.metadata,
upload_video=self._upload_videos,
Expand Down
6 changes: 4 additions & 2 deletions sdks/python/src/opik/integrations/haystack/opik_tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from haystack import tracing

from opik import tracing_runtime_config, url_helpers
from opik import context_storage, tracing_runtime_config, url_helpers
from opik.decorator import arguments_helpers, span_creation_handler
from opik.api_objects import opik_client
from opik.api_objects import span as opik_span
Expand Down Expand Up @@ -88,7 +88,9 @@ def _create_span_or_trace(
name=final_name,
type="general",
metadata=metadata,
project_name=self._project_name,
project_name=context_storage.resolve_project_name(
self._project_name, "OpikTracer"
),
)

result = span_creation_handler.create_span_respecting_context(
Expand Down
Loading
Loading