Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions lib/crewai/src/crewai/crew.py
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,8 @@ def run_crew() -> None:
for after_callback in self.after_kickoff_callbacks:
result = after_callback(result)

result = self._post_kickoff(result)

self.usage_metrics = self.calculate_usage_metrics()

return result
Expand All @@ -764,6 +766,9 @@ def run_crew() -> None:
clear_files(self.id)
detach(token)

def _post_kickoff(self, result: CrewOutput) -> CrewOutput:
return result

def kickoff_for_each(
self,
inputs: list[dict[str, Any]],
Expand Down Expand Up @@ -936,6 +941,8 @@ async def run_crew() -> None:
for after_callback in self.after_kickoff_callbacks:
result = after_callback(result)

result = self._post_kickoff(result)

self.usage_metrics = self.calculate_usage_metrics()

return result
Expand Down Expand Up @@ -1181,6 +1188,9 @@ def _create_manager_agent(self) -> None:
self.manager_agent = manager
manager.crew = self

def _get_execution_start_index(self, tasks: list[Task]) -> int | None:
return None

def _execute_tasks(
self,
tasks: list[Task],
Expand All @@ -1197,6 +1207,9 @@ def _execute_tasks(
Returns:
CrewOutput: Final output of the crew
"""
custom_start = self._get_execution_start_index(tasks)
if custom_start is not None:
start_index = custom_start

task_outputs: list[TaskOutput] = []
futures: list[tuple[Task, Future[TaskOutput], int]] = []
Expand Down Expand Up @@ -1305,8 +1318,10 @@ def _prepare_tools(
if files:
supported_types: list[str] = []
if agent and agent.llm and agent.llm.supports_multimodal():
provider = getattr(agent.llm, "provider", None) or getattr(
agent.llm, "model", "openai"
provider = (
getattr(agent.llm, "provider", None)
or getattr(agent.llm, "model", None)
or "openai"
)
api = getattr(agent.llm, "api", None)
supported_types = get_supported_content_types(provider, api)
Expand Down
18 changes: 17 additions & 1 deletion lib/crewai/src/crewai/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@
"ToolUsageFinishedEvent",
"ToolUsageStartedEvent",
"ToolValidateInputErrorEvent",
"_extension_exports",
"crewai_event_bus",
]

Expand All @@ -210,14 +211,29 @@
"LiteAgentExecutionStartedEvent": "crewai.events.types.agent_events",
}

_extension_exports: dict[str, Any] = {}


def __getattr__(name: str) -> Any:
"""Lazy import for agent events to avoid circular imports."""
"""Lazy import for agent events and registered extensions."""
if name in _AGENT_EVENT_MAPPING:
import importlib

module_path = _AGENT_EVENT_MAPPING[name]
module = importlib.import_module(module_path)
return getattr(module, name)

if name in _extension_exports:
import importlib

value = _extension_exports[name]
if isinstance(value, str):
module_path, _, attr_name = value.rpartition(".")
if module_path:
module = importlib.import_module(module_path)
return getattr(module, attr_name)
return importlib.import_module(value)
return value

msg = f"module {__name__!r} has no attribute {name!r}"
raise AttributeError(msg)
33 changes: 33 additions & 0 deletions lib/crewai/src/crewai/events/event_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,39 @@ def decorator(handler: Callable[P, R]) -> Callable[P, R]:

return decorator

def off(
self,
event_type: type[BaseEvent],
handler: Callable[..., Any],
) -> None:
"""Unregister an event handler for a specific event type.

Args:
event_type: The event class to stop listening for
handler: The handler function to unregister
"""
with self._rwlock.w_locked():
if event_type in self._sync_handlers:
existing_sync = self._sync_handlers[event_type]
if handler in existing_sync:
self._sync_handlers[event_type] = existing_sync - {handler}
if not self._sync_handlers[event_type]:
del self._sync_handlers[event_type]

if event_type in self._async_handlers:
existing_async = self._async_handlers[event_type]
if handler in existing_async:
self._async_handlers[event_type] = existing_async - {handler}
if not self._async_handlers[event_type]:
del self._async_handlers[event_type]

if event_type in self._handler_dependencies:
self._handler_dependencies[event_type].pop(handler, None)
if not self._handler_dependencies[event_type]:
del self._handler_dependencies[event_type]

self._execution_plan_cache.pop(event_type, None)

def _call_handlers(
self,
source: Any,
Expand Down
7 changes: 6 additions & 1 deletion lib/crewai/src/crewai/events/listeners/tracing/utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from collections.abc import Callable
from contextvars import ContextVar, Token
from datetime import datetime
import getpass
Expand Down Expand Up @@ -26,6 +27,8 @@

_tracing_enabled: ContextVar[bool | None] = ContextVar("_tracing_enabled", default=None)

_first_time_trace_hook: Callable[[], bool] | None = None


def should_enable_tracing(*, override: bool | None = None) -> bool:
"""Determine if tracing should be enabled.
Expand Down Expand Up @@ -407,10 +410,12 @@ def truncate_messages(
def should_auto_collect_first_time_traces() -> bool:
"""True if we should auto-collect traces for first-time user.


Returns:
True if first-time user AND telemetry not disabled AND tracing not explicitly enabled, False otherwise.
"""
if _first_time_trace_hook is not None:
return _first_time_trace_hook()

if _is_test_environment():
return False

Expand Down
8 changes: 4 additions & 4 deletions lib/crewai/src/crewai/events/types/tool_usage_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class ToolUsageEvent(BaseEvent):
tool_name: str
tool_args: dict[str, Any] | str
tool_class: str | None = None
run_attempts: int | None = None
run_attempts: int = 0
delegations: int | None = None
agent: Any | None = None
task_name: str | None = None
Expand All @@ -26,7 +26,7 @@ class ToolUsageEvent(BaseEvent):

model_config = ConfigDict(arbitrary_types_allowed=True)

def __init__(self, **data):
def __init__(self, **data: Any) -> None:
if data.get("from_task"):
task = data["from_task"]
data["task_id"] = str(task.id)
Expand Down Expand Up @@ -96,10 +96,10 @@ class ToolExecutionErrorEvent(BaseEvent):
type: str = "tool_execution_error"
tool_name: str
tool_args: dict[str, Any]
tool_class: Callable
tool_class: Callable[..., Any]
agent: Any | None = None

def __init__(self, **data):
def __init__(self, **data: Any) -> None:
super().__init__(**data)
# Set fingerprint data from the agent
if self.agent and hasattr(self.agent, "fingerprint") and self.agent.fingerprint:
Expand Down
3 changes: 3 additions & 0 deletions lib/crewai/src/crewai/events/utils/console_formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ def _show_version_update_message_if_needed(self) -> None:
if os.getenv("CI", "").lower() in ("true", "1"):
return

if os.getenv("CREWAI_DISABLE_VERSION_CHECK", "").lower() in ("true", "1"):
return

try:
is_newer, current, latest = is_newer_version_available()
if is_newer and latest:
Expand Down
13 changes: 13 additions & 0 deletions lib/crewai/src/crewai/flow/async_feedback/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ def review(self):
```
"""

from typing import Any

from crewai.flow.async_feedback.providers import ConsoleProvider
from crewai.flow.async_feedback.types import (
HumanFeedbackPending,
Expand All @@ -41,4 +43,15 @@ def review(self):
"HumanFeedbackPending",
"HumanFeedbackProvider",
"PendingFeedbackContext",
"_extension_exports",
]

_extension_exports: dict[str, Any] = {}


def __getattr__(name: str) -> Any:
"""Support extensions via dynamic attribute lookup."""
if name in _extension_exports:
return _extension_exports[name]
msg = f"module {__name__!r} has no attribute {name!r}"
raise AttributeError(msg)
1 change: 1 addition & 0 deletions lib/crewai/src/crewai/knowledge/source/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Knowledge source utilities."""
70 changes: 70 additions & 0 deletions lib/crewai/src/crewai/knowledge/source/utils/source_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""Helper utilities for knowledge sources."""

from typing import Any, ClassVar

from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.knowledge.source.csv_knowledge_source import CSVKnowledgeSource
from crewai.knowledge.source.excel_knowledge_source import ExcelKnowledgeSource
from crewai.knowledge.source.json_knowledge_source import JSONKnowledgeSource
from crewai.knowledge.source.pdf_knowledge_source import PDFKnowledgeSource
from crewai.knowledge.source.text_file_knowledge_source import TextFileKnowledgeSource


class SourceHelper:
"""Helper class for creating and managing knowledge sources."""

SUPPORTED_FILE_TYPES: ClassVar[list[str]] = [
".csv",
".pdf",
".json",
".txt",
".xlsx",
".xls",
]

_FILE_TYPE_MAP: ClassVar[dict[str, type[BaseKnowledgeSource]]] = {
".csv": CSVKnowledgeSource,
".pdf": PDFKnowledgeSource,
".json": JSONKnowledgeSource,
".txt": TextFileKnowledgeSource,
".xlsx": ExcelKnowledgeSource,
".xls": ExcelKnowledgeSource,
}

@classmethod
def is_supported_file(cls, file_path: str) -> bool:
"""Check if a file type is supported.

Args:
file_path: Path to the file.

Returns:
True if the file type is supported.
"""
return file_path.lower().endswith(tuple(cls.SUPPORTED_FILE_TYPES))

@classmethod
def get_source(
cls, file_path: str, metadata: dict[str, Any] | None = None
) -> BaseKnowledgeSource:
"""Create appropriate KnowledgeSource based on file extension.

Args:
file_path: Path to the file.
metadata: Optional metadata to attach to the source.

Returns:
The appropriate KnowledgeSource instance.

Raises:
ValueError: If the file type is not supported.
"""
if not cls.is_supported_file(file_path):
raise ValueError(f"Unsupported file type: {file_path}")

lower_path = file_path.lower()
for ext, source_cls in cls._FILE_TYPE_MAP.items():
if lower_path.endswith(ext):
return source_cls(file_path=[file_path], metadata=metadata)

raise ValueError(f"Unsupported file type: {file_path}")
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

from collections import defaultdict
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any

from pydantic import BaseModel, Field, InstanceOf
from rich.box import HEAVY_EDGE
Expand Down Expand Up @@ -36,7 +36,13 @@ class CrewEvaluator:
iteration: The current iteration of the evaluation.
"""

def __init__(self, crew: Crew, eval_llm: InstanceOf[BaseLLM]) -> None:
def __init__(
self,
crew: Crew,
eval_llm: InstanceOf[BaseLLM] | str | None = None,
openai_model_name: str | None = None,
llm: InstanceOf[BaseLLM] | str | None = None,
) -> None:
self.crew = crew
self.llm = eval_llm
self.tasks_scores: defaultdict[int, list[float]] = defaultdict(list)
Expand Down Expand Up @@ -86,7 +92,9 @@ def set_iteration(self, iteration: int) -> None:
"""
self.iteration = iteration

def print_crew_evaluation_result(self) -> None:
def print_crew_evaluation_result(
self, token_usage: list[dict[str, Any]] | None = None
) -> None:
"""
Prints the evaluation result of the crew in a table.
A Crew with 2 tasks using the command crewai test -n 3
Expand Down Expand Up @@ -204,7 +212,7 @@ def evaluate(self, task_output: TaskOutput) -> None:
CrewTestResultEvent(
quality=quality_score,
execution_duration=current_task.execution_duration,
model=self.llm.model,
model=getattr(self.llm, "model", str(self.llm)),
crew_name=self.crew.name,
crew=self.crew,
),
Expand Down
Loading