Commits (27, all by Mustafa-Esoofally)
- bc3cc01: Update (Dec 3, 2025)
- 517f2d7: Update (Dec 3, 2025)
- 22ab15d: Update (Dec 3, 2025)
- 7011c06: Update (Dec 4, 2025)
- 8a43877: Merge branch 'main' into feat/improve-token-counting (Dec 4, 2025)
- 4c73db2: update (Dec 4, 2025)
- 6f43ed5: Merge branch 'main' into feat/improve-token-counting (Dec 4, 2025)
- ff1e84a: update (Dec 4, 2025)
- f6e7200: update (Dec 4, 2025)
- 259b5a7: Merge branch 'main' into feat/improve-token-counting (Dec 4, 2025)
- 8728502: Merge branch 'main' into feat/improve-token-counting (Dec 5, 2025)
- 3e51b13: Update (Dec 5, 2025)
- 2fec0e9: Update (Dec 5, 2025)
- 4269391: Merge branch 'main' into feat/improve-token-counting (Dec 5, 2025)
- 131f190: Update tests (Dec 5, 2025)
- 5d1ed33: Merge branch 'main' into feat/improve-token-counting (Dec 5, 2025)
- 5f09d4b: Merge branch 'main' into feat/improve-token-counting (Dec 8, 2025)
- 6458a30: Merge branch 'main' into feat/improve-token-counting (Dec 8, 2025)
- be4e3c1: Update (Dec 8, 2025)
- 0f17f6d: Merge branch 'main' into feat/improve-token-counting (Dec 8, 2025)
- d568ff0: Update (Dec 8, 2025)
- 4dc5a2b: Update tests and resolve comments (Dec 8, 2025)
- 5e7dbeb: Merge branch 'main' into feat/improve-token-counting (Dec 9, 2025)
- bb73ed7: Update libs/agno/agno/utils/tokens.py (Dec 9, 2025)
- 7f4498e: Merge branch 'main' into feat/improve-token-counting (Dec 9, 2025)
- c4a74aa: update ouput_schema tokens (Dec 9, 2025)
- b38b84b: Merge branch 'main' into feat/improve-token-counting (Dec 9, 2025)
New file: cookbook/agents/context_compression/token_based_tool_call_compression.py
@@ -0,0 +1,72 @@
"""
This example shows how to set a context token based limit for tool call compression.
Run: `python cookbook/agents/context_compression/token_based_tool_call_compression.py`
"""

from agno.agent import Agent
from agno.compression.manager import CompressionManager
from agno.db.sqlite import SqliteDb
from agno.models.openai import OpenAIChat
from agno.tools.duckduckgo import DuckDuckGoTools

compression_prompt = """
You are a compression expert. Your goal is to compress web search results for a competitive intelligence analyst.
YOUR GOAL: Extract only actionable competitive insights while being extremely concise.
MUST PRESERVE:
- Competitor names and specific actions (product launches, partnerships, acquisitions, pricing changes)
- Exact numbers (revenue, market share, growth rates, pricing, headcount)
- Precise dates (announcement dates, launch dates, deal dates)
- Direct quotes from executives or official statements
- Funding rounds and valuations
MUST REMOVE:
- Company history and background information
- General industry trends (unless competitor-specific)
- Analyst opinions and speculation (keep only facts)
- Detailed product descriptions (keep only key differentiators and pricing)
- Marketing fluff and promotional language
OUTPUT FORMAT:
Return a bullet-point list where each line follows this format:
"[Company Name] - [Date]: [Action/Event] ([Key Numbers/Details])"
Keep it under 200 words total. Be ruthlessly concise. Facts only.
Example:
- Acme Corp - Mar 15, 2024: Launched AcmeGPT at $99/user/month, targeting enterprise market
- TechCo - Feb 10, 2024: Acquired DataStart for $150M, gaining 500 enterprise customers
"""

compression_manager = CompressionManager(
    model=OpenAIChat(id="gpt-5-mini"),
    compress_tool_results_token_limit=5000,
    compress_tool_call_instructions=compression_prompt,
)

agent = Agent(
    model=OpenAIChat(id="gpt-4o-mini"),
    tools=[DuckDuckGoTools()],
    description="Specialized in tracking competitor activities",
    instructions="Use the search tools and always use the latest information and data.",
    db=SqliteDb(db_file="tmp/dbs/token_based_tool_call_compression.db"),
    compression_manager=compression_manager,
    add_history_to_context=True,  # Add history to context
    num_history_runs=3,
    session_id="token_based_tool_call_compression",
)

agent.print_response(
    """
    Use the search tools and always use the latest information and data.
    Research recent activities (last 3 months) for these AI companies:
    1. OpenAI - product launches, partnerships, pricing
    2. Anthropic - new features, enterprise deals, funding
    3. Google DeepMind - research breakthroughs, product releases
    4. Meta AI - open source releases, research papers
    For each, find specific actions with dates and numbers.""",
    stream=True,
)
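After the run completes, the effect of compression can be checked on the manager's stats dict. A brief sketch: the key names come from the manager.py diff below, while the printed values are illustrative only.

```python
# Inspect compression statistics accumulated during the run.
# Keys come from CompressionManager.stats; the values shown are made up.
print(compression_manager.stats)
# {'tool_results_compressed': 4, 'original_size': 48210, 'compressed_size': 1892}
```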
(Second file in the diff; the filename was not captured.)
@@ -44,7 +44,7 @@

 compression_manager = CompressionManager(
     model=OpenAIChat(id="gpt-4o-mini"),
-    compress_tool_results_limit=1,
+    compress_tool_results_limit=4,
     compress_tool_call_instructions=compression_prompt,
 )

48 changes: 34 additions & 14 deletions libs/agno/agno/compression/manager.py
@@ -48,27 +48,41 @@
 class CompressionManager:
     model: Optional[Model] = None
     compress_tool_results: bool = True
-    compress_tool_results_limit: int = 3
+    compress_tool_results_limit: Optional[int] = None
+    compress_tool_results_token_limit: Optional[int] = None
     compress_tool_call_instructions: Optional[str] = None
 
     stats: Dict[str, Any] = field(default_factory=dict)
 
     def _is_tool_result_message(self, msg: Message) -> bool:
         return msg.role == "tool"
 
-    def should_compress(self, messages: List[Message]) -> bool:
+    def should_compress(
+        self,
+        messages: List[Message],
+        tools: Optional[List] = None,
+        main_model: Optional[Model] = None,
Review comment (Contributor): target_model?
Reply (Author): better to use both as model
+    ) -> bool:
         if not self.compress_tool_results:
             return False
 
-        uncompressed_tools_count = len(
-            [m for m in messages if self._is_tool_result_message(m) and m.compressed_content is None]
-        )
-        should_compress = uncompressed_tools_count >= self.compress_tool_results_limit
-
-        if should_compress:
-            log_info(f"Tool call compression threshold hit. Compressing {uncompressed_tools_count} tool results")
+        # Token-based threshold check
+        if self.compress_tool_results_token_limit is not None and main_model is not None:
+            tokens = main_model.count_tokens(messages, tools)
+            if tokens >= self.compress_tool_results_token_limit:
+                log_info(f"Token limit hit: {tokens} >= {self.compress_tool_results_token_limit}")
+                return True
 
+        # Count-based threshold check
+        if self.compress_tool_results_limit is not None:
+            uncompressed_tools_count = len(
+                [m for m in messages if self._is_tool_result_message(m) and m.compressed_content is None]
+            )
+            if uncompressed_tools_count >= self.compress_tool_results_limit:
+                log_info(f"Tool count limit hit: {uncompressed_tools_count} >= {self.compress_tool_results_limit}")
+                return True
 
-        return should_compress
+        return False

     def _compress_tool_result(self, tool_result: Message) -> Optional[str]:
         if not tool_result:
@@ -112,8 +126,11 @@ def compress(self, messages: List[Message]) -> None:
                 compressed = self._compress_tool_result(tool_msg)
                 if compressed:
                     tool_msg.compressed_content = compressed
-                    # Track stats
-                    self.stats["messages_compressed"] = self.stats.get("messages_compressed", 0) + 1
+                    # Count actual tool results (Gemini combines multiple in one message)
+                    tool_results_count = len(tool_msg.tool_calls) if tool_msg.tool_calls else 1
+                    self.stats["tool_results_compressed"] = (
+                        self.stats.get("tool_results_compressed", 0) + tool_results_count
+                    )
                     self.stats["original_size"] = self.stats.get("original_size", 0) + original_len
                     self.stats["compressed_size"] = self.stats.get("compressed_size", 0) + len(compressed)
                 else:
@@ -168,8 +185,11 @@ async def acompress(self, messages: List[Message]) -> None:
             for msg, compressed, original_len in zip(uncompressed_tools, results, original_sizes):
                 if compressed:
                     msg.compressed_content = compressed
-                    # Track stats
-                    self.stats["messages_compressed"] = self.stats.get("messages_compressed", 0) + 1
+                    # Count actual tool results (Gemini combines multiple in one message)
+                    tool_results_count = len(msg.tool_calls) if msg.tool_calls else 1
+                    self.stats["tool_results_compressed"] = (
+                        self.stats.get("tool_results_compressed", 0) + tool_results_count
+                    )
                     self.stats["original_size"] = self.stats.get("original_size", 0) + original_len
                     self.stats["compressed_size"] = self.stats.get("compressed_size", 0) + len(compressed)
                 else:
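A minimal sketch of how the two thresholds combine, inferred from the `should_compress` diff above: the token check runs first when both a token limit and a main model are available, then the count check. The model id is a placeholder.

```python
from agno.compression.manager import CompressionManager
from agno.models.openai import OpenAIChat

manager = CompressionManager(
    model=OpenAIChat(id="gpt-4o-mini"),      # model that writes the compressed summaries
    compress_tool_results_token_limit=5000,  # checked first: tokens across messages + tools
    compress_tool_results_limit=4,           # checked second: uncompressed tool-result count
)

# The model loop (see base.py below) consults the manager before each API call:
# if manager.should_compress(messages, tools, main_model=self):
#     manager.compress(messages)
```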
7 changes: 3 additions & 4 deletions libs/agno/agno/memory/strategies/base.py
@@ -3,7 +3,7 @@

 from agno.db.schemas import UserMemory
 from agno.models.base import Model
-from agno.utils.tokens import count_tokens as count_text_tokens
+from agno.utils.tokens import count_text_tokens
 
 
 class MemoryOptimizationStrategy(ABC):
@@ -60,8 +60,7 @@ def count_tokens(self, memories: List[UserMemory]) -> int:
 
         Args:
             memories: List of UserMemory objects
-
-        Returns:
-            Total token count using tiktoken (or fallback estimation)
+        Returns:
+            Total token count
         """
-        return sum(count_text_tokens(mem.memory or "") for mem in memories)
+        return sum(count_text_tokens(m.memory or "") for m in memories)
27 changes: 27 additions & 0 deletions libs/agno/agno/models/anthropic/claude.py
@@ -13,6 +13,7 @@
 from agno.models.metrics import Metrics
 from agno.models.response import ModelResponse
 from agno.run.agent import RunOutput
+from agno.tools.function import Function
 from agno.utils.http import get_default_async_client, get_default_sync_client
 from agno.utils.log import log_debug, log_error, log_warning
 from agno.utils.models.claude import MCPServerConfiguration, format_messages, format_tools_for_model
@@ -399,6 +400,26 @@ def get_async_client(self) -> AsyncAnthropicClient:
         self.async_client = AsyncAnthropicClient(**_client_params)
         return self.async_client
 
+    def count_tokens(
+        self,
+        messages: List[Message],
+        tools: Optional[List[Union[Function, Dict[str, Any]]]] = None,
+    ) -> int:
+        anthropic_messages, system_prompt = format_messages(messages, compress_tool_results=True)
+        anthropic_tools = None
+        if tools:
+            formatted_tools = self._format_tools(tools)
+            anthropic_tools = format_tools_for_model(formatted_tools)
+
+        kwargs: Dict[str, Any] = {"messages": anthropic_messages, "model": self.id}
+        if system_prompt:
+            kwargs["system"] = system_prompt
+        if anthropic_tools:
+            kwargs["tools"] = anthropic_tools
+
+        response = self.get_client().messages.count_tokens(**kwargs)
+        return response.input_tokens
+
     def get_request_params(
         self,
         response_format: Optional[Union[Dict, Type[BaseModel]]] = None,
@@ -1056,4 +1077,10 @@ def _get_metrics(self, response_usage: Union[Usage, MessageDeltaUsage, BetaUsage
         metrics.provider_metrics = metrics.provider_metrics or {}
         metrics.provider_metrics["service_tier"] = response_usage.service_tier
 
+        log_debug(
+            f"Anthropic response metrics: input_tokens={metrics.input_tokens}, "
+            f"output_tokens={metrics.output_tokens}, total_tokens={metrics.total_tokens}, "
+            f"cache_read={metrics.cache_read_tokens}, cache_write={metrics.cache_write_tokens}"
+        )
+
         return metrics
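A hedged usage sketch for the new method. The import paths and model id are assumptions; the call itself mirrors the diff above, which delegates to Anthropic's `messages.count_tokens` endpoint.

```python
from agno.models.anthropic import Claude  # import path assumed
from agno.models.message import Message   # import path assumed

model = Claude(id="claude-sonnet-4-5")  # placeholder model id
tokens = model.count_tokens(
    messages=[Message(role="user", content="Compress these search results ...")],
)
print(tokens)  # input_tokens reported by the Anthropic count_tokens endpoint
```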
31 changes: 31 additions & 0 deletions libs/agno/agno/models/aws/bedrock.py
@@ -357,6 +357,32 @@ def _format_messages(
         # TODO: Add caching: https://docs.aws.amazon.com/bedrock/latest/userguide/conversation-inference-call.html
         return formatted_messages, system_message
 
+    def count_tokens(
+        self,
+        messages: List[Message],
+        tools: Optional[List[Dict[str, Any]]] = None,
+    ) -> int:
+        try:
+            formatted_messages, system_message = self._format_messages(messages, compress_tool_results=True)
+            converse_input: Dict[str, Any] = {"messages": formatted_messages}
+            if system_message:
+                converse_input["system"] = system_message
+
+            response = self.get_client().count_tokens(modelId=self.id, input={"converse": converse_input})
+            tokens = response.get("inputTokens", 0)
+
+            # Count tool tokens
+            if tools:
+                from agno.utils.tokens import _count_tool_tokens
+
+                includes_system = any(m.role == "system" for m in messages)
+                tokens += _count_tool_tokens(tools, self.id, includes_system)
+
+            return tokens
+        except Exception as e:
+            log_warning(f"Failed to count tokens via Bedrock API: {e}")
+            return super().count_tokens(messages, tools)
Review comment (Contributor): Can't we just use this? Probably the same counting mechanism, as it should just depend on the model encoding?
Reply (Author): The token counting logic won't work for our Claude models.
Review comment (Contributor): Why can't we use the count_tokens fn of the base Claude class (libs/agno/agno/models/anthropic/claude.py)?
Reply (Author, Dec 9, 2025): Bedrock supports non-Anthropic models as well, so the count_tokens fn of the base Claude class won't work. Also, I am not sure token counting is the same here, because Claude has intelligent caching.

     def invoke(
         self,
         messages: List[Message],
@@ -719,4 +745,9 @@ def _get_metrics(self, response_usage: Dict[str, Any]) -> Metrics:
         metrics.output_tokens = response_usage.get("outputTokens", 0) or 0
         metrics.total_tokens = metrics.input_tokens + metrics.output_tokens
 
+        log_debug(
+            f"Bedrock response metrics: input_tokens={metrics.input_tokens}, "
+            f"output_tokens={metrics.output_tokens}, total_tokens={metrics.total_tokens}"
+        )
+
         return metrics
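As the thread above concludes, Bedrock serves non-Anthropic models too, so this override calls Bedrock's own token-counting API and only falls back to the generic estimate on error. A caller-side sketch; the import paths and model id are assumptions.

```python
from agno.models.aws import AwsBedrock   # import path assumed
from agno.models.message import Message  # import path assumed

model = AwsBedrock(id="anthropic.claude-3-5-sonnet-20240620-v1:0")  # placeholder id
messages = [Message(role="user", content="What changed for OpenAI this quarter?")]

# Tries the Bedrock count_tokens API first; on any exception it logs a warning
# and falls back to the base Model.count_tokens estimate.
tokens = model.count_tokens(messages)
```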
46 changes: 26 additions & 20 deletions libs/agno/agno/models/base.py
@@ -15,6 +15,7 @@
     List,
     Literal,
     Optional,
+    Sequence,
     Tuple,
     Type,
     Union,
@@ -427,6 +428,15 @@ def _format_tools(self, tools: Optional[List[Union[Function, dict]]]) -> List[Di
             _tool_dicts.append(tool)
         return _tool_dicts
 
+    def count_tokens(
+        self,
+        messages: List[Message],
+        tools: Optional[Sequence[Union[Function, Dict[str, Any]]]] = None,
+    ) -> int:
+        from agno.utils.tokens import count_tokens
+
+        return count_tokens(messages, tools=list(tools) if tools else None, model_id=self.id)
+
     def response(
         self,
         messages: List[Message],
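The base implementation delegates to `agno.utils.tokens.count_tokens`, which is not shown in this diff. A rough sketch of what a tiktoken-based estimator of that shape might look like; this is an assumption, not the actual utility.

```python
import tiktoken

def estimate_tokens(texts: list, model_id: str = "gpt-4o-mini") -> int:
    """Approximate token count, assuming ~4 tokens of per-message overhead."""
    try:
        encoding = tiktoken.encoding_for_model(model_id)
    except KeyError:
        # Model ids tiktoken does not recognize fall back to a generic encoding
        encoding = tiktoken.get_encoding("cl100k_base")
    return sum(len(encoding.encode(text)) + 4 for text in texts)
```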
@@ -476,6 +486,10 @@ def response(
         _compress_tool_results = compression_manager is not None and compression_manager.compress_tool_results
 
         while True:
+            # Compress tool results
+            if compression_manager and compression_manager.should_compress(messages, tools, main_model=self):
+                compression_manager.compress(messages)
+
             # Get response from model
             assistant_message = Message(role=self.assistant_message_role)
             self._process_model_response(
@@ -574,11 +588,6 @@ def response(
                 # Add a function call for each successful execution
                 function_call_count += len(function_call_results)
 
-                all_messages = messages + function_call_results
Review comment (Contributor): Why are we changing this? I think you are probably right, but there was a reason we did it here.
Reply (Author): Before, we were limited by tool call count; now we can estimate the token count for messages before API calls, so the check moved up (before the first API call).

-                # Compress tool results
-                if compression_manager and compression_manager.should_compress(all_messages):
-                    compression_manager.compress(all_messages)
-
                 # Format and add results to messages
                 self.format_function_call_results(
                     messages=messages,
@@ -678,6 +687,10 @@ async def aresponse(
         function_call_count = 0
 
         while True:
+            # Compress existing tool results BEFORE making API call to avoid context overflow
+            if compression_manager and compression_manager.should_compress(messages, tools, main_model=self):
+                await compression_manager.acompress(messages)
+
             # Get response from model
             assistant_message = Message(role=self.assistant_message_role)
             await self._aprocess_model_response(
@@ -775,11 +788,6 @@ async def aresponse(
                 # Add a function call for each successful execution
                 function_call_count += len(function_call_results)
 
-                all_messages = messages + function_call_results
-                # Compress tool results
-                if compression_manager and compression_manager.should_compress(all_messages):
-                    await compression_manager.acompress(all_messages)
-
                 # Format and add results to messages
                 self.format_function_call_results(
                     messages=messages,
@@ -1105,6 +1113,10 @@ def response_stream(
         function_call_count = 0
 
         while True:
+            # Compress existing tool results BEFORE invoke
+            if compression_manager and compression_manager.should_compress(messages, tools, main_model=self):
+                compression_manager.compress(messages)
+
             assistant_message = Message(role=self.assistant_message_role)
             # Create assistant message and stream data
             stream_data = MessageData()
@@ -1166,11 +1178,6 @@ def response_stream(
                 # Add a function call for each successful execution
                 function_call_count += len(function_call_results)
 
-                all_messages = messages + function_call_results
-                # Compress tool results
-                if compression_manager and compression_manager.should_compress(all_messages):
-                    compression_manager.compress(all_messages)
-
                 # Format and add results to messages
                 if stream_data and stream_data.extra is not None:
                     self.format_function_call_results(
@@ -1323,6 +1330,10 @@ async def aresponse_stream(
         function_call_count = 0
 
         while True:
+            # Compress existing tool results BEFORE making API call to avoid context overflow
+            if compression_manager and compression_manager.should_compress(messages, tools, main_model=self):
+                await compression_manager.acompress(messages)
+
             # Create assistant message and stream data
             assistant_message = Message(role=self.assistant_message_role)
             stream_data = MessageData()
@@ -1384,11 +1395,6 @@ async def aresponse_stream(
                 # Add a function call for each successful execution
                 function_call_count += len(function_call_results)
 
-                all_messages = messages + function_call_results
-                # Compress tool results
-                if compression_manager and compression_manager.should_compress(all_messages):
-                    await compression_manager.acompress(all_messages)
-
                 # Format and add results to messages
                 if stream_data and stream_data.extra is not None:
                     self.format_function_call_results(
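To make the reordering concrete: compression used to run only after tool results came back, keyed on a count; it now runs at the top of every loop iteration, so a token-based limit can shrink the context before the next API call, including the first one after a long restored history. A simplified sketch of the new control flow; the helper names are hypothetical.

```python
while True:
    # Check before every model call, so an oversized context is compressed
    # before it is ever sent to the provider
    if compression_manager and compression_manager.should_compress(
        messages, tools, main_model=self
    ):
        compression_manager.compress(messages)

    assistant_message = call_model(messages, tools)  # hypothetical helper
    if not assistant_message.tool_calls:
        break
    messages.extend(run_tool_calls(assistant_message))  # hypothetical helper
```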