Python Guide
Welcome to the comprehensive Python guide for the FACT (Fast Augmented Context Tools) system. This guide covers everything you need to work with FACT in Python, from basic installation to advanced integrations.
- Quick Start
- Installation
- Basic Usage
- Core Modules
- Async Programming with FACT
- Configuration
- Integration with Claude API
- Error Handling
- Performance Optimization
- Complete Code Examples
- Best Practices
- Troubleshooting
import asyncio
from fact_system import FACTDriver
async def main():
# Initialize FACT driver
driver = FACTDriver()
await driver.initialize()
# Process a query
response = await driver.process_query("What is the current database schema?")
print(response)
# Clean shutdown
await driver.shutdown()
# Run the example
asyncio.run(main())
# Install the stable version
pip install fact-system
# Install with all optional dependencies
pip install fact-system[all]
# Install development version
pip install fact-system[dev]
# Install with specific features
pip install fact-system[security,monitoring]
# Clone the repository
git clone https://github.com/fact-team/FACT.git
cd FACT
# Install in development mode
pip install -e .
# Or install with dependencies
pip install -e .[all]
The FACT system includes several optional dependency groups:
- dev: Development tools (pytest, black, mypy, etc.)
- security: Security enhancements (cryptography, bcrypt, PyJWT)
- monitoring: Monitoring and metrics (prometheus-client, opentelemetry)
- all: All optional dependencies
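After installation, a quick sanity check can confirm that the package imports and that the extras you expect are actually present. This is a sketch: the __version__ attribute and the optional module names checked below are assumptions for illustration, not confirmed APIs.
# Post-install sanity check (assumptions: fact_system exposes __version__,
# and the security/monitoring extras pull in cryptography/prometheus_client).
import importlib

import fact_system

print("fact_system version:", getattr(fact_system, "__version__", "unknown"))

for optional_module in ("cryptography", "prometheus_client"):
    try:
        importlib.import_module(optional_module)
        print(f"optional dependency available: {optional_module}")
    except ImportError:
        print(f"optional dependency missing: {optional_module}")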
from fact_system import FACTDriver
import asyncio
async def simple_example():
"""Basic query processing example."""
# Create driver with default configuration
driver = FACTDriver()
try:
# Initialize the system
await driver.initialize()
# Process queries
queries = [
"List all tables in the database",
"Show me the schema for the users table",
"What are the performance metrics?"
]
for query in queries:
print(f"\nQuery: {query}")
response = await driver.process_query(query)
print(f"Response: {response[:200]}...")
finally:
# Always clean up
await driver.shutdown()
# Run the example
asyncio.run(simple_example())
from fact_system import get_driver
import asyncio
async def context_manager_example():
"""Using FACT with async context manager pattern."""
async def safe_driver_usage():
driver = await get_driver()
try:
response = await driver.process_query("Show database statistics")
return response
finally:
# Driver cleanup is handled by the global instance
pass
# Process multiple queries safely
for i in range(3):
response = await safe_driver_usage()
print(f"Query {i+1}: {response[:100]}...")
asyncio.run(context_manager_example())
The FACTDriver is the main orchestrator for the FACT system:
from fact_system.core.driver import FACTDriver
from fact_system.core.config import Config
# Custom configuration
config = Config(
anthropic_api_key="your-api-key",
claude_model="claude-3-5-sonnet-20241022",
database_path="./data/fact_system.db",
cache_config={
"prefix": "my_app",
"max_size": "50MB",
"ttl_seconds": 7200
}
)
# Initialize with custom config
driver = FACTDriver(config)
# System lifecycle
await driver.initialize() # Initialize all components
await driver.shutdown() # Clean shutdown
driver.get_metrics() # Get performance metrics
# Query processing
response = await driver.process_query("Your query here")
# Direct component access
cache_system = driver.cache_system
database_manager = driver.database_manager
tool_registry = driver.tool_registry
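Because initialize() and shutdown() must always be paired, it can be convenient to wrap the lifecycle in an async context manager. The sketch below builds only on the documented methods above; the context manager itself is not part of the FACT API.
# Sketch: wrap the FACTDriver lifecycle so initialize()/shutdown() are always
# paired, even when a query raises.
import asyncio
from contextlib import asynccontextmanager

from fact_system import FACTDriver

@asynccontextmanager
async def fact_driver(config=None):
    driver = FACTDriver(config) if config else FACTDriver()
    await driver.initialize()
    try:
        yield driver
    finally:
        await driver.shutdown()

async def demo():
    async with fact_driver() as driver:
        print(await driver.process_query("List all tables in the database"))

asyncio.run(demo())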
FACT provides intelligent caching for Claude API responses:
from fact_system.cache import (
CacheManager,
initialize_cache_system,
get_cache_system
)
# Initialize cache system
cache_config = {
"prefix": "my_cache",
"min_tokens": 500,
"max_size": "100MB",
"ttl_seconds": 3600,
"hit_target_ms": 50
}
cache_system = await initialize_cache_system(cache_config)
# Direct cache operations
cached_response = await cache_system.get_cached_response("test query")
success = await cache_system.store_response("query", "response")
# Cache management
health_report = await cache_system.get_health_report()
optimization_result = await cache_system.optimize_cache()
from fact_system.cache.metrics import get_metrics_collector
# Get detailed cache metrics
metrics_collector = get_metrics_collector()
cache_manager = get_cache_system().cache_manager
# Performance analysis
health_score = metrics_collector.get_cache_health_score(cache_manager)
latency_analysis = metrics_collector.get_latency_analysis()
cost_analysis = metrics_collector.get_cost_analysis(cache_manager)
print(f"Cache health score: {health_score.overall_health_score}")
print(f"Average hit latency: {latency_analysis.avg_hit_latency_ms}ms")
print(f"Token efficiency: {cost_analysis.tokens_per_kb}")FACT provides a powerful tool system for extending functionality:
from fact_system.tools import Tool, get_tool_registry
# Register a custom tool
@Tool(
name="calculate_fibonacci",
description="Calculate fibonacci number at position n",
parameters={
"type": "object",
"properties": {
"n": {"type": "integer", "minimum": 0, "maximum": 100}
},
"required": ["n"]
}
)
async def fibonacci_tool(n: int) -> dict:
"""Calculate fibonacci number."""
if n <= 1:
return {"result": n, "position": n}
a, b = 0, 1
for _ in range(2, n + 1):
a, b = b, a + b
return {"result": b, "position": n}
# Use the tool registry
registry = get_tool_registry()
print(f"Registered tools: {list(registry.tools.keys())}")
# Export tool schemas for Claude
schemas = registry.export_all_schemas()
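For a quick local check you can also await the decorated coroutine directly, assuming the @Tool decorator leaves the underlying function callable (an assumption; the real decorator may wrap it differently).
# Sketch: call the registered tool function defined above directly.
# Assumes @Tool returns the original coroutine unchanged.
import asyncio

async def test_fibonacci_tool():
    result = await fibonacci_tool(10)
    print(result)  # expected: {"result": 55, "position": 10}

asyncio.run(test_fibonacci_tool())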
Built-in security features for safe operations:
from fact_system.security import (
AuthorizationManager,
validate_cache_content_security,
TokenManager
)
# Authorization management
auth_manager = AuthorizationManager()
# Validate operations
is_authorized = await auth_manager.authorize_operation(
operation="database_query",
context={"user_id": "user123", "query": "SELECT * FROM users"}
)
# Token management for API keys
token_manager = TokenManager()
encrypted_token = token_manager.encrypt_token("sensitive-api-key")
decrypted_token = token_manager.decrypt_token(encrypted_token)
FACT is built with async/await patterns for optimal performance:
import asyncio
from fact_system import FACTDriver
async def concurrent_queries_example():
"""Process multiple queries concurrently."""
driver = FACTDriver()
await driver.initialize()
try:
# Define multiple queries
queries = [
"Show database schema",
"List all tables",
"Get performance metrics",
"Show cache statistics",
"List registered tools"
]
# Process all queries concurrently
tasks = [driver.process_query(query) for query in queries]
responses = await asyncio.gather(*tasks, return_exceptions=True)
# Process results
for i, (query, response) in enumerate(zip(queries, responses)):
if isinstance(response, Exception):
print(f"Query {i+1} failed: {response}")
else:
print(f"Query {i+1}: {query}")
print(f"Response: {response[:150]}...")
print("-" * 50)
finally:
await driver.shutdown()
asyncio.run(concurrent_queries_example())
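For large batches, unbounded asyncio.gather() can overwhelm the API or local resources. A small semaphore wrapper, sketched below on top of process_query, caps the number of in-flight queries; the limit of 3 is an arbitrary example value.
# Sketch: bound concurrency with a semaphore so large batches don't exhaust
# API rate limits or connections.
import asyncio
from fact_system import FACTDriver

async def bounded_queries(driver: FACTDriver, queries: list[str], limit: int = 3) -> list:
    semaphore = asyncio.Semaphore(limit)

    async def run_one(query: str):
        async with semaphore:
            return await driver.process_query(query)

    return await asyncio.gather(*(run_one(q) for q in queries), return_exceptions=True)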
import asyncio
from fact_system.cache import initialize_cache_system
async def background_tasks_example():
"""Example of managing background tasks."""
# Initialize cache with background tasks enabled
cache_config = {
"prefix": "background_demo",
"min_tokens": 100,
"max_size": "50MB",
"ttl_seconds": 1800
}
cache_system = await initialize_cache_system(
cache_config,
enable_background_tasks=True
)
# Background tasks are now running:
# - Cache metrics monitoring
# - Automatic optimization
# - Scheduled cache warming
try:
# Do your work while background tasks run
for i in range(10):
query = f"Background query {i}"
await cache_system.store_response(query, f"Response for query {i}")
await asyncio.sleep(1)
# Get health report (includes background task metrics)
health = await cache_system.get_health_report()
print(f"Cache health: {health['overall_health']}")
finally:
# Clean shutdown stops all background tasks
await cache_system.shutdown()
asyncio.run(background_tasks_example())
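Beyond the scheduled warming handled by the background tasks, you can warm the cache manually at startup by replaying queries you expect to be common. This sketch assumes that a cache miss inside process_query() stores the response for later hits.
# Sketch: warm the cache at startup by replaying frequently asked queries.
import asyncio
from fact_system import FACTDriver

COMMON_QUERIES = [
    "Show database schema",
    "List all tables",
    "Get performance metrics",
]

async def warm_cache(driver: FACTDriver) -> None:
    for query in COMMON_QUERIES:
        try:
            await driver.process_query(query)
        except Exception as exc:
            print(f"warming skipped for {query!r}: {exc}")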
FACT supports configuration via environment variables:
# Core configuration
export ANTHROPIC_API_KEY="your-api-key"
export CLAUDE_MODEL="claude-3-5-sonnet-20241022"
export DATABASE_PATH="./data/fact.db"
# Cache configuration
export CACHE_PREFIX="my_app"
export CACHE_MAX_SIZE="100MB"
export CACHE_TTL_SECONDS="3600"
export CACHE_MIN_TOKENS="500"
# Performance tuning
export REQUEST_TIMEOUT="30"
export MAX_RETRIES="3"
export ENABLE_METRICS="true"
from fact_system.core.config import Config, get_config
import os
# Load from environment
config = get_config()
# Or create custom configuration
config = Config(
# API Configuration
anthropic_api_key=os.getenv("ANTHROPIC_API_KEY"),
claude_model="claude-3-5-sonnet-20241022",
system_prompt="You are a helpful AI assistant with access to tools.",
# Database
database_path="./data/my_app.db",
# Cache settings
cache_config={
"prefix": "my_app_v1",
"min_tokens": 500,
"max_size": "100MB",
"ttl_seconds": 3600,
"hit_target_ms": 50,
"miss_target_ms": 150
},
# Performance
request_timeout=30,
max_retries=3,
enable_metrics=True,
# Security
enable_auth=True,
enable_input_validation=True
)
# Validate configuration
from fact_system.core.config import validate_configuration
validate_configuration(config)
from fact_system import FACTDriver
from fact_system.cache import initialize_cache_system
import asyncio
async def dynamic_config_example():
"""Update configuration at runtime."""
driver = FACTDriver()
await driver.initialize()
try:
# Update cache configuration
new_cache_config = {
"prefix": "updated_cache",
"max_size": "200MB",
"ttl_seconds": 7200
}
# Reinitialize cache with new config
await driver.cache_system.shutdown()
driver.cache_system = await initialize_cache_system(new_cache_config)
# Update other settings
driver.config.request_timeout = 60
driver.config.max_retries = 5
print("Configuration updated successfully")
finally:
await driver.shutdown()
asyncio.run(dynamic_config_example())
from fact_system.core.driver import FACTDriver
import asyncio
import anthropic
async def claude_integration_example():
"""Advanced Claude API integration."""
# Initialize FACT driver
driver = FACTDriver()
await driver.initialize()
try:
# Get tool schemas for Claude
tool_schemas = driver.tool_registry.export_all_schemas()
# Direct Anthropic client usage (FACT uses this internally)
client = anthropic.Anthropic(api_key=driver.config.anthropic_api_key)
# Make a call with FACT tools
response = client.messages.create(
model=driver.config.claude_model,
system=driver.config.system_prompt,
messages=[{"role": "user", "content": "Show me database statistics"}],
max_tokens=4096,
tools=tool_schemas,
tool_choice={"type": "any"}
)
# Process tool calls if present
if hasattr(response, 'content'):
for block in response.content:
if hasattr(block, 'type') and block.type == 'tool_use':
print(f"Tool called: {block.name}")
print(f"Arguments: {block.input}")
finally:
await driver.shutdown()
asyncio.run(claude_integration_example())
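The example above only prints the tool_use block; to let Claude finish the turn, the tool's output must be sent back in a follow-up request as a tool_result content block. The sketch below continues that example: it reuses client, driver, tool_schemas, response, and block from the function body, and tool_output is a placeholder for whatever your tool execution returned (how you execute the tool through FACT's registry is left open here).
# Sketch (continuation of the example above): return a tool result to Claude.
import json

follow_up = client.messages.create(
    model=driver.config.claude_model,
    system=driver.config.system_prompt,
    max_tokens=4096,
    tools=tool_schemas,
    messages=[
        {"role": "user", "content": "Show me database statistics"},
        {"role": "assistant", "content": response.content},
        {
            "role": "user",
            "content": [
                {
                    "type": "tool_result",
                    "tool_use_id": block.id,             # id of the tool_use block handled above
                    "content": json.dumps(tool_output),  # placeholder for your tool's output
                }
            ],
        },
    ],
)
print(follow_up.content)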
from fact_system.core.conversation import ConversationManager
from fact_system import FACTDriver
import asyncio
async def conversation_example():
"""Manage multi-turn conversations with Claude."""
driver = FACTDriver()
await driver.initialize()
try:
# Conversation history
messages = [
{"role": "user", "content": "What tables are in the database?"},
]
# Process with conversation context
response = await driver._call_llm_with_cache(
messages=messages,
tools=driver.tool_registry.export_all_schemas(),
cache_mode="read"
)
# Add assistant response to conversation
messages.append({
"role": "assistant",
"content": response.content
})
# Continue conversation
messages.append({
"role": "user",
"content": "Now show me the schema for the first table"
})
response2 = await driver._call_llm_with_cache(
messages=messages,
tools=driver.tool_registry.export_all_schemas(),
cache_mode="read"
)
print("Final response:", response2)
finally:
await driver.shutdown()
asyncio.run(conversation_example())
FACT provides comprehensive error handling with graceful degradation:
import asyncio
from fact_system import FACTDriver
from fact_system.core.errors import (
FACTError, ConfigurationError, ConnectionError,
ToolExecutionError, CacheError
)
async def error_handling_example():
"""Comprehensive error handling patterns."""
driver = FACTDriver()
try:
await driver.initialize()
# This might fail due to various reasons
response = await driver.process_query("Complex query that might fail")
print(f"Success: {response}")
except ConfigurationError as e:
print(f"Configuration issue: {e}")
print("Check your API keys and database path")
except ConnectionError as e:
print(f"Connection failed: {e}")
print("Check network connectivity and API status")
except ToolExecutionError as e:
print(f"Tool execution failed: {e}")
print("The query might be too complex or invalid")
except CacheError as e:
print(f"Cache operation failed: {e}")
print("Cache will be bypassed automatically")
except FACTError as e:
print(f"FACT system error: {e}")
except Exception as e:
print(f"Unexpected error: {e}")
finally:
try:
await driver.shutdown()
except Exception as e:
print(f"Shutdown error (non-critical): {e}")
asyncio.run(error_handling_example())
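For transient failures such as connectivity errors, a thin retry wrapper with exponential backoff around process_query is often enough. This is a sketch layered on top of FACT, not built-in behavior, and which exceptions count as transient is an assumption you should adjust for your deployment.
# Sketch: retry transient failures with exponential backoff before giving up.
import asyncio
from fact_system import FACTDriver
from fact_system.core.errors import ConnectionError as FACTConnectionError

async def query_with_retry(driver: FACTDriver, query: str,
                           max_retries: int = 3, base_delay: float = 1.0) -> str:
    for attempt in range(max_retries + 1):
        try:
            return await driver.process_query(query)
        except FACTConnectionError:
            if attempt == max_retries:
                raise
            await asyncio.sleep(base_delay * (2 ** attempt))  # 1s, 2s, 4s, ...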
import asyncio
from fact_system import FACTDriver
from fact_system.core.errors import classify_error, provide_graceful_degradation
async def graceful_degradation_example():
"""Handle errors with graceful degradation."""
driver = FACTDriver()
await driver.initialize()
try:
queries = [
"Show database schema",
"Invalid SQL query that will fail",
"Another valid query"
]
for query in queries:
try:
response = await driver.process_query(query)
print(f"Success: {query} -> {response[:100]}...")
except Exception as e:
# Classify the error
error_category = classify_error(e)
print(f"Error category: {error_category}")
# Provide graceful degradation
if error_category in ["connectivity", "tool_execution"]:
fallback_response = provide_graceful_degradation(error_category)
print(f"Fallback: {fallback_response}")
else:
print(f"Unrecoverable error: {e}")
finally:
await driver.shutdown()
asyncio.run(graceful_degradation_example())
import asyncio
from fact_system.cache.resilience import ResilientCacheWrapper, CacheCircuitBreaker, CircuitBreakerConfig
async def circuit_breaker_example():
"""Use circuit breaker for cache resilience."""
# Configure circuit breaker
circuit_config = CircuitBreakerConfig(
failure_threshold=3, # Open after 3 failures
success_threshold=2, # Close after 2 successes
timeout_seconds=30.0, # Wait 30s before retry
rolling_window_seconds=300.0, # 5-minute window
gradual_recovery=True,
recovery_factor=0.5 # 50% of requests during recovery
)
circuit_breaker = CacheCircuitBreaker(circuit_config)
# This would normally be done in the driver initialization
# Here we show how to use it directly
from fact_system.cache import get_cache_manager
cache_manager = get_cache_manager()
resilient_cache = ResilientCacheWrapper(cache_manager, circuit_breaker)
await resilient_cache.start_monitoring()
try:
# Test cache operations with circuit breaker
for i in range(10):
try:
# This will use circuit breaker protection
result = await resilient_cache.get(f"test_key_{i}")
if result:
print(f"Cache hit for key {i}")
else:
print(f"Cache miss for key {i}")
# Store something for next iteration
await resilient_cache.store(f"test_key_{i}", f"Test value {i}")
except Exception as e:
print(f"Cache operation failed: {e}")
finally:
await resilient_cache.stop_monitoring()
asyncio.run(circuit_breaker_example())
from fact_system import FACTDriver
import asyncio
import time
async def batch_operations_example():
"""Optimize performance with batch operations."""
driver = FACTDriver()
await driver.initialize()
try:
# Prepare multiple queries
queries = [
"Show table schemas",
"Get performance metrics",
"List recent queries",
"Show cache statistics",
"Get system health"
]
# Method 1: Sequential (slower)
print("Sequential processing:")
start_time = time.time()
for query in queries:
response = await driver.process_query(query)
print(f" {query}: {len(response)} chars")
sequential_time = time.time() - start_time
# Method 2: Concurrent (faster)
print("\nConcurrent processing:")
start_time = time.time()
tasks = [driver.process_query(query) for query in queries]
responses = await asyncio.gather(*tasks)
for query, response in zip(queries, responses):
print(f" {query}: {len(response)} chars")
concurrent_time = time.time() - start_time
print(f"\nPerformance improvement: {sequential_time/concurrent_time:.2f}x faster")
finally:
await driver.shutdown()
asyncio.run(batch_operations_example())
import asyncio
from fact_system.cache import get_cache_system
from fact_system.cache.strategy import optimize_cache_automatically
async def cache_optimization_example():
"""Optimize cache performance."""
cache_system = get_cache_system()
if cache_system:
# Get current metrics
initial_metrics = cache_system.cache_manager.get_metrics()
print(f"Initial hit rate: {initial_metrics.hit_rate:.2f}%")
# Run optimization
optimization_result = await cache_system.optimize_cache()
print(f"Optimization completed: {optimization_result}")
# Compare metrics
final_metrics = cache_system.cache_manager.get_metrics()
print(f"Final hit rate: {final_metrics.hit_rate:.2f}%")
# Start automatic optimization (runs in background)
optimization_task = asyncio.create_task(
optimize_cache_automatically(
cache_system.cache_manager,
interval_seconds=600 # Every 10 minutes
)
)
# Let it run for a while
await asyncio.sleep(5)
# Cancel background optimization
optimization_task.cancel()
asyncio.run(cache_optimization_example())
import asyncio
from fact_system import FACTDriver
import psutil
import gc
async def memory_management_example():
"""Monitor and optimize memory usage."""
def get_memory_usage():
process = psutil.Process()
return process.memory_info().rss / 1024 / 1024 # MB
print(f"Initial memory: {get_memory_usage():.1f} MB")
driver = FACTDriver()
await driver.initialize()
try:
print(f"After init: {get_memory_usage():.1f} MB")
# Process many queries
for i in range(100):
query = f"Test query {i} with different content"
response = await driver.process_query(query)
# Periodic cleanup
if i % 20 == 0:
gc.collect() # Force garbage collection
print(f"After {i} queries: {get_memory_usage():.1f} MB")
# Get system metrics
metrics = driver.get_metrics()
print(f"Cache entries: {metrics.get('cache_total_entries', 0)}")
print(f"Cache size: {metrics.get('cache_total_size', 0)} bytes")
finally:
await driver.shutdown()
gc.collect()
print(f"After shutdown: {get_memory_usage():.1f} MB")
asyncio.run(memory_management_example())
"""
Complete example of integrating FACT with Arcade.dev for tool execution.
This example shows real-world usage patterns and best practices.
"""
import asyncio
import os
from typing import Dict, Any, Optional
from fact_system import FACTDriver
from fact_system.tools import Tool, get_tool_registry
# Try to import arcade SDK
try:
from arcadepy import Arcade
ARCADE_AVAILABLE = True
except ImportError:
ARCADE_AVAILABLE = False
class ArcadeIntegration:
"""Integration between FACT and Arcade.dev platform."""
def __init__(self, api_key: str, user_id: str = "[email protected]"):
self.api_key = api_key
self.user_id = user_id
self.arcade_client = None
self.fact_driver = None
async def initialize(self):
"""Initialize both FACT and Arcade systems."""
# Initialize FACT
self.fact_driver = FACTDriver()
await self.fact_driver.initialize()
# Initialize Arcade client
if ARCADE_AVAILABLE and self.api_key:
self.arcade_client = Arcade(api_key=self.api_key)
# Register Arcade tools with FACT
await self._register_arcade_tools()
async def _register_arcade_tools(self):
"""Register Arcade.dev tools with FACT system."""
@Tool(
name="arcade_math_sqrt",
description="Calculate square root using Arcade.dev Math.Sqrt tool",
parameters={
"type": "object",
"properties": {
"number": {"type": "number", "minimum": 0}
},
"required": ["number"]
}
)
async def arcade_sqrt(number: float) -> Dict[str, Any]:
"""Calculate square root via Arcade."""
if not self.arcade_client:
# Fallback calculation
import math
result = math.sqrt(number)
return {
"result": result,
"input": number,
"source": "fallback",
"execution_time_ms": 1
}
try:
response = self.arcade_client.tools.execute(
tool_name="Math.Sqrt",
input={"a": number},
user_id=self.user_id
)
return {
"result": response.result.get("value") if response.result else None,
"input": number,
"source": "arcade",
"execution_time_ms": getattr(response, 'execution_time_ms', None),
"id": response.id
}
except Exception as e:
# Graceful fallback
import math
result = math.sqrt(number)
return {
"result": result,
"input": number,
"source": "fallback_after_error",
"error": str(e),
"execution_time_ms": 1
}
@Tool(
name="arcade_list_emails",
description="List emails using Arcade.dev Google.ListEmails tool",
parameters={
"type": "object",
"properties": {
"count": {"type": "integer", "minimum": 1, "maximum": 50}
},
"required": ["count"]
}
)
async def arcade_list_emails(count: int) -> Dict[str, Any]:
"""List emails via Arcade."""
if not self.arcade_client:
# Return mock data
emails = [
{"id": f"demo_{i}", "subject": f"Demo Email {i}", "from": f"sender{i}@example.com"}
for i in range(1, min(count + 1, 6))
]
return {
"emails": emails,
"count": len(emails),
"source": "fallback",
"execution_time_ms": 100
}
try:
response = self.arcade_client.tools.execute(
tool_name="Google.ListEmails",
input={"n_emails": count},
user_id=self.user_id
)
return {
"emails": response.result.get("emails", []) if response.result else [],
"count": len(response.result.get("emails", [])) if response.result else 0,
"source": "arcade",
"execution_time_ms": getattr(response, 'execution_time_ms', None),
"id": response.id
}
except Exception as e:
# Graceful fallback with mock data
emails = [
{"id": f"fallback_{i}", "subject": f"Fallback Email {i}", "from": f"fallback{i}@example.com"}
for i in range(1, min(count + 1, 4))
]
return {
"emails": emails,
"count": len(emails),
"source": "fallback_after_error",
"error": str(e),
"execution_time_ms": 100
}
async def process_query_with_arcade(self, query: str) -> str:
"""Process query using FACT with Arcade tool access."""
if not self.fact_driver:
raise RuntimeError("System not initialized")
return await self.fact_driver.process_query(query)
async def shutdown(self):
"""Clean shutdown of all systems."""
if self.fact_driver:
await self.fact_driver.shutdown()
async def arcade_integration_example():
"""Complete Arcade.dev integration example."""
# Load configuration
api_key = os.getenv("ARCADE_API_KEY", "")
user_id = os.getenv("ARCADE_USER_ID", "[email protected]")
# Check if we have real credentials
has_real_creds = bool(api_key.strip()) and len(api_key.strip()) > 10
print("🎮 FACT + Arcade.dev Integration Example")
print("=" * 50)
if has_real_creds:
print(f"🔑 Using real Arcade API key: {api_key[:10]}...")
else:
print("🎭 Demo mode: Using fallback implementations")
print("💡 Set ARCADE_API_KEY environment variable for real integration")
# Initialize integration
integration = ArcadeIntegration(api_key, user_id)
await integration.initialize()
try:
# Example queries that will use Arcade tools
queries = [
"Calculate the square root of 144 using arcade tools",
"List 3 emails using the arcade email tool",
"What tools are available in the system?",
"Show me the performance metrics"
]
for i, query in enumerate(queries, 1):
print(f"\n🔍 Query {i}: {query}")
try:
response = await integration.process_query_with_arcade(query)
print(f"✅ Response: {response[:200]}...")
except Exception as e:
print(f"❌ Error: {e}")
# Show system metrics
metrics = integration.fact_driver.get_metrics()
print(f"\n📊 System Metrics:")
print(f" Total queries: {metrics.get('total_queries', 0)}")
print(f" Tool executions: {metrics.get('tool_executions', 0)}")
print(f" Cache hit rate: {metrics.get('cache_hit_rate', 0):.1f}%")
finally:
await integration.shutdown()
print("\n✅ Integration example completed")
# Run the example
if __name__ == "__main__":
asyncio.run(arcade_integration_example())
"""
Complete database integration example showing FACT's database capabilities.
"""
import asyncio
import sqlite3
from pathlib import Path
from fact_system import FACTDriver
from fact_system.db import DatabaseManager
from fact_system.tools import Tool
async def database_integration_example():
"""Complete database integration example."""
print("🗄️ FACT Database Integration Example")
print("=" * 50)
# Setup test database
test_db_path = Path("./demo_database.db")
# Create test data
await setup_test_database(test_db_path)
# Initialize FACT with custom database
from fact_system.core.config import Config
config = Config(
database_path=str(test_db_path),
cache_config={
"prefix": "db_demo",
"min_tokens": 50, # Lower for demo
"max_size": "10MB",
"ttl_seconds": 1800
}
)
driver = FACTDriver(config)
await driver.initialize()
try:
# Register custom database tools
await register_custom_db_tools(driver.database_manager)
# Example queries
queries = [
"Show me all tables in the database",
"What's the schema of the users table?",
"How many users are in the database?",
"Show me the top 5 products by price",
"What are the recent orders?",
"Calculate total revenue from orders"
]
for i, query in enumerate(queries, 1):
print(f"\n🔍 Query {i}: {query}")
try:
response = await driver.process_query(query)
print(f"✅ Response: {response}")
except Exception as e:
print(f"❌ Error: {e}")
# Show database statistics
db_info = await driver.database_manager.get_database_info()
print(f"\n📊 Database Statistics: {db_info}")
finally:
await driver.shutdown()
# Cleanup
if test_db_path.exists():
test_db_path.unlink()
print("\n✅ Database integration example completed")
async def setup_test_database(db_path: Path):
"""Setup test database with sample data."""
# Remove existing database
if db_path.exists():
db_path.unlink()
# Create database and tables
conn = sqlite3.connect(str(db_path))
cursor = conn.cursor()
# Users table
cursor.execute("""
CREATE TABLE users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username VARCHAR(50) UNIQUE NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
is_active BOOLEAN DEFAULT 1
)
""")
# Products table
cursor.execute("""
CREATE TABLE products (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name VARCHAR(100) NOT NULL,
description TEXT,
price DECIMAL(10,2) NOT NULL,
category VARCHAR(50),
stock_quantity INTEGER DEFAULT 0,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
""")
# Orders table
cursor.execute("""
CREATE TABLE orders (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
product_id INTEGER NOT NULL,
quantity INTEGER NOT NULL,
total_price DECIMAL(10,2) NOT NULL,
order_date DATETIME DEFAULT CURRENT_TIMESTAMP,
status VARCHAR(20) DEFAULT 'pending',
FOREIGN KEY (user_id) REFERENCES users(id),
FOREIGN KEY (product_id) REFERENCES products(id)
)
""")
# Insert sample data
users_data = [
("alice", "[email protected]"),
("bob", "[email protected]"),
("charlie", "[email protected]"),
("diana", "[email protected]"),
("eve", "[email protected]")
]
cursor.executemany(
"INSERT INTO users (username, email) VALUES (?, ?)",
users_data
)
products_data = [
("Laptop", "High-performance laptop", 999.99, "Electronics", 10),
("Smartphone", "Latest smartphone", 699.99, "Electronics", 25),
("Book", "Programming guide", 49.99, "Books", 100),
("Coffee Mug", "Ceramic coffee mug", 12.99, "Home", 50),
("Headphones", "Noise-canceling headphones", 199.99, "Electronics", 15)
]
cursor.executemany(
"INSERT INTO products (name, description, price, category, stock_quantity) VALUES (?, ?, ?, ?, ?)",
products_data
)
orders_data = [
(1, 1, 1, 999.99, "completed"),
(2, 2, 2, 1399.98, "completed"),
(3, 3, 1, 49.99, "pending"),
(1, 4, 3, 38.97, "completed"),
(4, 5, 1, 199.99, "shipped")
]
cursor.executemany(
"INSERT INTO orders (user_id, product_id, quantity, total_price, status) VALUES (?, ?, ?, ?, ?)",
orders_data
)
conn.commit()
conn.close()
print(f"✅ Test database created at {db_path}")
async def register_custom_db_tools(db_manager: DatabaseManager):
"""Register custom database tools."""
@Tool(
name="get_user_count",
description="Get total number of users in the database",
parameters={"type": "object", "properties": {}}
)
async def get_user_count() -> dict:
"""Get user count from database."""
try:
result = await db_manager.execute_query("SELECT COUNT(*) as count FROM users")
return {
"user_count": result[0]["count"] if result else 0,
"status": "success"
}
except Exception as e:
return {"error": str(e), "status": "error"}
@Tool(
name="get_revenue_stats",
description="Calculate total revenue and order statistics",
parameters={"type": "object", "properties": {}}
)
async def get_revenue_stats() -> dict:
"""Get revenue statistics."""
try:
query = """
SELECT
COUNT(*) as total_orders,
SUM(total_price) as total_revenue,
AVG(total_price) as avg_order_value,
COUNT(DISTINCT user_id) as unique_customers
FROM orders
WHERE status = 'completed'
"""
result = await db_manager.execute_query(query)
if result:
stats = result[0]
return {
"total_orders": stats["total_orders"],
"total_revenue": float(stats["total_revenue"] or 0),
"average_order_value": float(stats["avg_order_value"] or 0),
"unique_customers": stats["unique_customers"],
"status": "success"
}
else:
return {"error": "No data found", "status": "error"}
except Exception as e:
return {"error": str(e), "status": "error"}
@Tool(
name="get_top_products",
description="Get top products by sales or price",
parameters={
"type": "object",
"properties": {
"limit": {"type": "integer", "minimum": 1, "maximum": 20, "default": 5},
"sort_by": {"type": "string", "enum": ["price", "sales"], "default": "price"}
}
}
)
async def get_top_products(limit: int = 5, sort_by: str = "price") -> dict:
"""Get top products by specified criteria."""
try:
if sort_by == "price":
query = "SELECT name, price, category FROM products ORDER BY price DESC LIMIT ?"
else: # sales
query = """
SELECT p.name, p.price, p.category, COALESCE(SUM(o.quantity), 0) as total_sold
FROM products p
LEFT JOIN orders o ON p.id = o.product_id AND o.status = 'completed'
GROUP BY p.id, p.name, p.price, p.category
ORDER BY total_sold DESC
LIMIT ?
"""
result = await db_manager.execute_query(query, (limit,))
products = []
for row in result:
product = {
"name": row["name"],
"price": float(row["price"]),
"category": row["category"]
}
if sort_by == "sales":
product["total_sold"] = row["total_sold"]
products.append(product)
return {
"products": products,
"count": len(products),
"sorted_by": sort_by,
"status": "success"
}
except Exception as e:
return {"error": str(e), "status": "error"}
# Run the example
if __name__ == "__main__":
asyncio.run(database_integration_example())
# Always use proper async context management
from fact_system import FACTDriver
async def good_practice():
driver = FACTDriver()
try:
await driver.initialize()
# Your code here
response = await driver.process_query("Your query")
return response
finally:
await driver.shutdown()
# Or use the global driver pattern
async def better_practice():
from fact_system import get_driver
driver = await get_driver() # Reuses global instance
return await driver.process_query("Your query")
# Handle specific exceptions
import logging
from fact_system import get_driver
from fact_system.core.errors import FACTError, CacheError
logger = logging.getLogger(__name__)
async def robust_query_processing():
driver = await get_driver()
try:
return await driver.process_query("Complex query")
except CacheError:
# Cache failed, but query still processed
logger.warning("Cache unavailable, performance may be slower")
return await driver.process_query("Complex query")
except FACTError as e:
# FACT-specific error
logger.error(f"FACT system error: {e}")
return "I apologize, but I encountered a system error."
except Exception as e:
# Unexpected error
logger.exception("Unexpected error")
return "I encountered an unexpected error."# Use concurrent processing for multiple queries
async def optimized_batch_processing(queries: list[str]):
driver = await get_driver()
# Process all queries concurrently
tasks = [driver.process_query(query) for query in queries]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Handle results
responses = []
for query, result in zip(queries, results):
if isinstance(result, Exception):
responses.append(f"Error processing '{query}': {result}")
else:
responses.append(result)
return responses
# Use environment-based configuration
import os
from fact_system.core.config import Config
def create_production_config():
return Config(
anthropic_api_key=os.getenv("ANTHROPIC_API_KEY"),
claude_model=os.getenv("CLAUDE_MODEL", "claude-3-5-sonnet-20241022"),
database_path=os.getenv("DATABASE_PATH", "./data/production.db"),
cache_config={
"prefix": os.getenv("CACHE_PREFIX", "prod"),
"max_size": os.getenv("CACHE_MAX_SIZE", "500MB"),
"ttl_seconds": int(os.getenv("CACHE_TTL", "7200"))
},
enable_metrics=os.getenv("ENABLE_METRICS", "true").lower() == "true"
)
# Create well-documented, secure tools
from fact_system.tools import Tool
from fact_system.security.input_sanitizer import sanitize_input
@Tool(
name="safe_calculation",
description="Perform safe mathematical calculations",
parameters={
"type": "object",
"properties": {
"expression": {
"type": "string",
"pattern": r"^[0-9+\-*/().\s]+$", # Only allow safe characters
"maxLength": 100
},
"precision": {
"type": "integer",
"minimum": 1,
"maximum": 10,
"default": 2
}
},
"required": ["expression"]
}
)
async def safe_calculation(expression: str, precision: int = 2) -> dict:
"""Safely evaluate mathematical expressions."""
try:
# Sanitize input
clean_expression = sanitize_input(expression, "math_expression")
# Use safe evaluation (implement safe_eval)
result = safe_eval(clean_expression)
return {
"expression": expression,
"result": round(result, precision),
"status": "success"
}
except Exception as e:
return {
"expression": expression,
"error": str(e),
"status": "error"
}
def safe_eval(expression: str) -> float:
"""Safely evaluate mathematical expressions."""
# Implement safe evaluation logic
# This is a simplified example - use a proper math expression evaluator
import ast
import operator
# Define allowed operations
ops = {
ast.Add: operator.add,
ast.Sub: operator.sub,
ast.Mult: operator.mul,
ast.Div: operator.truediv,
ast.USub: operator.neg,
}
def eval_node(node):
if isinstance(node, ast.Constant):
return node.value
elif isinstance(node, ast.BinOp):
return ops[type(node.op)](eval_node(node.left), eval_node(node.right))
elif isinstance(node, ast.UnaryOp):
return ops[type(node.op)](eval_node(node.operand))
else:
raise ValueError(f"Unsupported operation: {type(node)}")
tree = ast.parse(expression, mode='eval')
return eval_node(tree.body)
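A quick check of the evaluator above; safe_calculation's exact output depends on how sanitize_input transforms the expression, so only the safe_eval results are stated.
# Usage check for the helpers above. safe_eval is pure Python; safe_calculation
# also runs the input through sanitize_input, so its output is not asserted here.
import asyncio

print(safe_eval("2 * (3 + 4.5)"))  # 15.0
print(safe_eval("10 / 4"))         # 2.5

print(asyncio.run(safe_calculation("1 + 2 * 3")))  # status dict from the tool wrapper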
# Problem: Invalid or missing API key
# Solution: Verify API key configuration
import os
from fact_system.core.config import validate_configuration, Config
def debug_api_key():
api_key = os.getenv("ANTHROPIC_API_KEY")
if not api_key:
print("❌ ANTHROPIC_API_KEY not set")
print("Set it with: export ANTHROPIC_API_KEY='your-key'")
return False
if len(api_key) < 10:
print("❌ API key seems too short")
return False
if not api_key.startswith("sk-ant-"):
print("⚠️ API key format might be incorrect")
print("Anthropic keys typically start with 'sk-ant-'")
print("✅ API key format looks correct")
return True
# Test configuration
try:
config = Config(anthropic_api_key=os.getenv("ANTHROPIC_API_KEY"))
validate_configuration(config)
print("✅ Configuration is valid")
except Exception as e:
print(f"❌ Configuration error: {e}")# Problem: Database connection failures
# Solution: Check database path and permissions
import os
from pathlib import Path
import sqlite3
def debug_database_connection(db_path: str):
path = Path(db_path)
# Check if directory exists
if not path.parent.exists():
print(f"❌ Directory doesn't exist: {path.parent}")
print(f"Create it with: mkdir -p {path.parent}")
return False
# Check write permissions
if not os.access(path.parent, os.W_OK):
print(f"❌ No write permission to: {path.parent}")
return False
# Test database connection
try:
conn = sqlite3.connect(str(path))
cursor = conn.cursor()
cursor.execute("SELECT 1")
conn.close()
print("✅ Database connection successful")
return True
except Exception as e:
print(f"❌ Database connection failed: {e}")
return False
# Problem: Cache not working or causing errors
# Solution: Debug cache configuration and operation
from fact_system.cache import get_cache_system
async def debug_cache_system():
cache_system = get_cache_system()
if not cache_system:
print("❌ Cache system not initialized")
return False
try:
# Test cache operations
test_key = "debug_test"
test_value = "Debug test value with enough tokens to meet minimum requirements for caching"
# Test store
success = await cache_system.store_response(test_key, test_value)
if not success:
print("❌ Cache store operation failed")
return False
# Test retrieve
retrieved = await cache_system.get_cached_response(test_key)
if retrieved != test_value:
print("❌ Cache retrieve operation failed")
return False
# Get health report
health = await cache_system.get_health_report()
print(f"✅ Cache system healthy: {health['overall_health']}")
return True
except Exception as e:
print(f"❌ Cache system error: {e}")
return False
# Problem: Tools not executing or returning errors
# Solution: Debug tool registration and execution
from fact_system.tools import get_tool_registry
def debug_tool_system():
registry = get_tool_registry()
# List registered tools
tools = list(registry.tools.keys())
print(f"📋 Registered tools: {tools}")
if not tools:
print("❌ No tools registered")
return False
# Test each tool
for tool_name in tools:
try:
tool_def = registry.get_tool(tool_name)
print(f"✅ Tool '{tool_name}' is registered")
# Check tool schema
schema = tool_def.to_dict()
if not schema.get("parameters"):
print(f"⚠️ Tool '{tool_name}' has no parameters defined")
except Exception as e:
print(f"❌ Tool '{tool_name}' error: {e}")
return True
# Problem: Slow query processing
# Solution: Profile and optimize performance
import time
import asyncio
from fact_system import get_driver
async def debug_performance():
driver = await get_driver()
# Test query performance
test_queries = [
"Simple query",
"Show database schema",
"List all tables",
"Get system metrics"
]
performance_results = []
for query in test_queries:
start_time = time.time()
try:
response = await driver.process_query(query)
end_time = time.time()
latency = (end_time - start_time) * 1000 # ms
performance_results.append({
"query": query,
"latency_ms": latency,
"response_length": len(response),
"status": "success"
})
except Exception as e:
end_time = time.time()
latency = (end_time - start_time) * 1000
performance_results.append({
"query": query,
"latency_ms": latency,
"error": str(e),
"status": "error"
})
# Analyze results
successful_queries = [r for r in performance_results if r["status"] == "success"]
if successful_queries:
avg_latency = sum(r["latency_ms"] for r in successful_queries) / len(successful_queries)
print(f"📊 Average query latency: {avg_latency:.1f}ms")
if avg_latency > 5000: # 5 seconds
print("⚠️ High latency detected. Consider:")
print(" - Checking API key and network connectivity")
print(" - Optimizing cache configuration")
print(" - Reducing query complexity")
# Get system metrics
metrics = driver.get_metrics()
cache_hit_rate = metrics.get("cache_hit_rate", 0)
print(f"📈 Cache hit rate: {cache_hit_rate:.1f}%")
if cache_hit_rate < 20:
print("⚠️ Low cache hit rate. Consider:")
print(" - Increasing cache TTL")
print(" - Adjusting minimum token requirements")
print(" - Using more similar queries")
return performance_results
"""
Comprehensive debugging utilities for FACT system.
"""
import asyncio
import logging
import time
from fact_system import FACTDriver
from fact_system.core.config import get_config, validate_configuration
async def full_system_diagnostic():
"""Run complete system diagnostic."""
print("🔍 FACT System Diagnostic")
print("=" * 50)
# 1. Configuration check
print("\n1. Configuration Check:")
try:
config = get_config()
validate_configuration(config)
print("✅ Configuration is valid")
# Show key config values (safely)
print(f" Model: {config.claude_model}")
print(f" Database: {config.database_path}")
print(f" Cache prefix: {config.cache_config['prefix']}")
except Exception as e:
print(f"❌ Configuration error: {e}")
return False
# 2. System initialization
print("\n2. System Initialization:")
driver = FACTDriver(config)
try:
await driver.initialize()
print("✅ System initialized successfully")
except Exception as e:
print(f"❌ Initialization failed: {e}")
return False
try:
# 3. Component checks
print("\n3. Component Health Checks:")
# Database check
if driver.database_manager:
try:
db_info = await driver.database_manager.get_database_info()
print(f"✅ Database: {db_info}")
except Exception as e:
print(f"❌ Database error: {e}")
# Cache check
if driver.cache_system:
try:
health = await driver.cache_system.get_health_report()
print(f"✅ Cache health: {health['overall_health']}")
except Exception as e:
print(f"❌ Cache error: {e}")
# Tools check
tool_count = len(driver.tool_registry.tools)
print(f"✅ Tools registered: {tool_count}")
# 4. Connectivity test
print("\n4. Connectivity Test:")
try:
test_response = await driver.process_query("Hello, can you respond?")
if test_response:
print("✅ API connectivity working")
else:
print("⚠️ API returned empty response")
except Exception as e:
print(f"❌ API connectivity failed: {e}")
# 5. Performance test
print("\n5. Performance Test:")
start_time = time.time()
await driver.process_query("What is 2+2?")
latency = (time.time() - start_time) * 1000
print(f"✅ Query latency: {latency:.1f}ms")
if latency > 10000: # 10 seconds
print("⚠️ High latency - check network and API status")
# 6. System metrics
print("\n6. System Metrics:")
metrics = driver.get_metrics()
for key, value in metrics.items():
print(f" {key}: {value}")
print("\n✅ Diagnostic completed successfully")
return True
finally:
await driver.shutdown()
# Run diagnostic
if __name__ == "__main__":
asyncio.run(full_system_diagnostic())
This guide has covered working with the FACT system in Python, from installation and basic usage through advanced integrations, performance tuning, and troubleshooting, with practical examples and best practices throughout.
For additional help, refer to the API Reference and Architecture Overview documentation.