Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/crewai/src/crewai/events/types/llm_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ class LLMEventBase(BaseEvent):
from_task: Any | None = None
from_agent: Any | None = None
model: str | None = None
call_id: str

def __init__(self, **data: Any) -> None:
if data.get("from_task"):
Expand Down
386 changes: 208 additions & 178 deletions lib/crewai/src/crewai/llm.py

Large diffs are not rendered by default.

34 changes: 34 additions & 0 deletions lib/crewai/src/crewai/llms/base_llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from collections.abc import Generator
from contextlib import contextmanager
import contextvars
from datetime import datetime
import json
import logging
import re
from typing import TYPE_CHECKING, Any, Final
import uuid

from pydantic import BaseModel

Expand Down Expand Up @@ -50,6 +54,32 @@
DEFAULT_SUPPORTS_STOP_WORDS: Final[bool] = True
_JSON_EXTRACTION_PATTERN: Final[re.Pattern[str]] = re.compile(r"\{.*}", re.DOTALL)

_current_call_id: contextvars.ContextVar[str | None] = contextvars.ContextVar(
"_current_call_id", default=None
)


@contextmanager
def llm_call_context() -> Generator[str, None, None]:
"""Context manager that establishes an LLM call scope with a unique call_id."""
call_id = str(uuid.uuid4())
token = _current_call_id.set(call_id)
try:
yield call_id
finally:
_current_call_id.reset(token)


def get_current_call_id() -> str:
"""Get current call_id from context"""
call_id = _current_call_id.get()
if call_id is None:
logging.warning(
"LLM event emitted outside call context - generating fallback call_id"
)
return str(uuid.uuid4())
return call_id


class BaseLLM(ABC):
"""Abstract base class for LLM implementations.
Expand Down Expand Up @@ -351,6 +381,7 @@ def _emit_call_started_event(
from_task=from_task,
from_agent=from_agent,
model=self.model,
call_id=get_current_call_id(),
),
)

Expand All @@ -374,6 +405,7 @@ def _emit_call_completed_event(
from_task=from_task,
from_agent=from_agent,
model=self.model,
call_id=get_current_call_id(),
),
)

Expand All @@ -394,6 +426,7 @@ def _emit_call_failed_event(
from_task=from_task,
from_agent=from_agent,
model=self.model,
call_id=get_current_call_id(),
),
)

Expand Down Expand Up @@ -428,6 +461,7 @@ def _emit_stream_chunk_event(
from_agent=from_agent,
call_type=call_type,
response_id=response_id,
call_id=get_current_call_id(),
),
)

Expand Down
150 changes: 77 additions & 73 deletions lib/crewai/src/crewai/llms/providers/anthropic/completion.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from pydantic import BaseModel

from crewai.events.types.llm_events import LLMCallType
from crewai.llms.base_llm import BaseLLM
from crewai.llms.base_llm import BaseLLM, llm_call_context
from crewai.llms.hooks.transport import AsyncHTTPTransport, HTTPTransport
from crewai.utilities.agent_utils import is_context_length_exceeded
from crewai.utilities.exceptions.context_window_exceeding_exception import (
Expand Down Expand Up @@ -266,57 +266,60 @@ def call(
Returns:
Chat completion response or tool call result
"""
try:
# Emit call started event
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
with llm_call_context():
try:
# Emit call started event
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)

# Format messages for Anthropic
formatted_messages, system_message = self._format_messages_for_anthropic(
messages
)
# Format messages for Anthropic
formatted_messages, system_message = (
self._format_messages_for_anthropic(messages)
)

if not self._invoke_before_llm_call_hooks(formatted_messages, from_agent):
raise ValueError("LLM call blocked by before_llm_call hook")
if not self._invoke_before_llm_call_hooks(
formatted_messages, from_agent
):
raise ValueError("LLM call blocked by before_llm_call hook")

# Prepare completion parameters
completion_params = self._prepare_completion_params(
formatted_messages, system_message, tools
)
# Prepare completion parameters
completion_params = self._prepare_completion_params(
formatted_messages, system_message, tools
)

effective_response_model = response_model or self.response_format
effective_response_model = response_model or self.response_format

# Handle streaming vs non-streaming
if self.stream:
return self._handle_streaming_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)

# Handle streaming vs non-streaming
if self.stream:
return self._handle_streaming_completion(
return self._handle_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)

return self._handle_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)

except Exception as e:
error_msg = f"Anthropic API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
except Exception as e:
error_msg = f"Anthropic API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise

async def acall(
self,
Expand All @@ -342,50 +345,51 @@ async def acall(
Returns:
Chat completion response or tool call result
"""
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
with llm_call_context():
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)

formatted_messages, system_message = self._format_messages_for_anthropic(
messages
)
formatted_messages, system_message = (
self._format_messages_for_anthropic(messages)
)

completion_params = self._prepare_completion_params(
formatted_messages, system_message, tools
)
completion_params = self._prepare_completion_params(
formatted_messages, system_message, tools
)

effective_response_model = response_model or self.response_format
effective_response_model = response_model or self.response_format

if self.stream:
return await self._ahandle_streaming_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)

if self.stream:
return await self._ahandle_streaming_completion(
return await self._ahandle_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)

return await self._ahandle_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)

except Exception as e:
error_msg = f"Anthropic API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
except Exception as e:
error_msg = f"Anthropic API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise

def _prepare_completion_params(
self,
Expand Down
Loading
Loading