Skip to content

Python Guide

rUv edited this page Jul 31, 2025 · 1 revision

FACT Python Guide

Python 3.9+ PyPI License: MIT

Welcome to the comprehensive Python guide for FACT (Fast Augmented Context Tools) system. This guide covers everything you need to know to work with FACT in Python, from basic installation to advanced integrations.

Table of Contents

  1. Quick Start
  2. Installation
  3. Basic Usage
  4. Core Modules
  5. Async Programming with FACT
  6. Configuration
  7. Integration with Claude API
  8. Error Handling
  9. Performance Optimization
  10. Complete Code Examples
  11. Best Practices
  12. Troubleshooting

Quick Start

import asyncio
from fact_system import FACTDriver

async def main():
    # Initialize FACT driver
    driver = FACTDriver()
    await driver.initialize()
    
    # Process a query
    response = await driver.process_query("What is the current database schema?")
    print(response)
    
    # Clean shutdown
    await driver.shutdown()

# Run the example
asyncio.run(main())

Installation

Via pip (Recommended)

# Install the stable version
pip install fact-system

# Install with all optional dependencies
pip install fact-system[all]

# Install development version
pip install fact-system[dev]

# Install with specific features
pip install fact-system[security,monitoring]

From Source

# Clone the repository
git clone https://github.com/fact-team/FACT.git
cd FACT

# Install in development mode
pip install -e .

# Or install with dependencies
pip install -e .[all]

Optional Dependencies

The FACT system includes several optional dependency groups:

  • dev: Development tools (pytest, black, mypy, etc.)
  • security: Security enhancements (cryptography, bcrypt, PyJWT)
  • monitoring: Monitoring and metrics (prometheus-client, opentelemetry)
  • all: All optional dependencies

Basic Usage

Simple Query Processing

from fact_system import FACTDriver
import asyncio

async def simple_example():
    """Basic query processing example."""
    # Create driver with default configuration
    driver = FACTDriver()
    
    try:
        # Initialize the system
        await driver.initialize()
        
        # Process queries
        queries = [
            "List all tables in the database",
            "Show me the schema for the users table",
            "What are the performance metrics?"
        ]
        
        for query in queries:
            print(f"\nQuery: {query}")
            response = await driver.process_query(query)
            print(f"Response: {response[:200]}...")
            
    finally:
        # Always clean up
        await driver.shutdown()

# Run the example
asyncio.run(simple_example())

Using Context Manager

from fact_system import get_driver
import asyncio

async def context_manager_example():
    """Using FACT with async context manager pattern."""
    
    async def safe_driver_usage():
        driver = await get_driver()
        try:
            response = await driver.process_query("Show database statistics")
            return response
        finally:
            # Driver cleanup is handled by the global instance
            pass
    
    # Process multiple queries safely
    for i in range(3):
        response = await safe_driver_usage()
        print(f"Query {i+1}: {response[:100]}...")

asyncio.run(context_manager_example())

Core Modules

Driver Module

The FACTDriver is the main orchestrator for the FACT system:

from fact_system.core.driver import FACTDriver
from fact_system.core.config import Config

# Custom configuration
config = Config(
    anthropic_api_key="your-api-key",
    claude_model="claude-3-5-sonnet-20241022",
    database_path="./data/fact_system.db",
    cache_config={
        "prefix": "my_app",
        "max_size": "50MB",
        "ttl_seconds": 7200
    }
)

# Initialize with custom config
driver = FACTDriver(config)

Key Driver Methods

# System lifecycle
await driver.initialize()           # Initialize all components
await driver.shutdown()            # Clean shutdown
driver.get_metrics()               # Get performance metrics

# Query processing
response = await driver.process_query("Your query here")

# Direct component access
cache_system = driver.cache_system
database_manager = driver.database_manager
tool_registry = driver.tool_registry

Cache Module

FACT provides intelligent caching for Claude API responses:

from fact_system.cache import (
    CacheManager, 
    initialize_cache_system,
    get_cache_system
)

# Initialize cache system
cache_config = {
    "prefix": "my_cache",
    "min_tokens": 500,
    "max_size": "100MB", 
    "ttl_seconds": 3600,
    "hit_target_ms": 50
}

cache_system = await initialize_cache_system(cache_config)

# Direct cache operations
cached_response = await cache_system.get_cached_response("test query")
success = await cache_system.store_response("query", "response")

# Cache management
health_report = await cache_system.get_health_report()
optimization_result = await cache_system.optimize_cache()

Cache Performance Monitoring

from fact_system.cache.metrics import get_metrics_collector

# Get detailed cache metrics
metrics_collector = get_metrics_collector()
cache_manager = get_cache_system().cache_manager

# Performance analysis
health_score = metrics_collector.get_cache_health_score(cache_manager)
latency_analysis = metrics_collector.get_latency_analysis()
cost_analysis = metrics_collector.get_cost_analysis(cache_manager)

print(f"Cache health score: {health_score.overall_health_score}")
print(f"Average hit latency: {latency_analysis.avg_hit_latency_ms}ms")
print(f"Token efficiency: {cost_analysis.tokens_per_kb}")

Tools Module

FACT provides a powerful tool system for extending functionality:

from fact_system.tools import Tool, get_tool_registry

# Register a custom tool
@Tool(
    name="calculate_fibonacci",
    description="Calculate fibonacci number at position n",
    parameters={
        "type": "object",
        "properties": {
            "n": {"type": "integer", "minimum": 0, "maximum": 100}
        },
        "required": ["n"]
    }
)
async def fibonacci_tool(n: int) -> dict:
    """Calculate fibonacci number."""
    if n <= 1:
        return {"result": n, "position": n}
    
    a, b = 0, 1
    for _ in range(2, n + 1):
        a, b = b, a + b
    
    return {"result": b, "position": n}

# Use the tool registry
registry = get_tool_registry()
print(f"Registered tools: {list(registry.tools.keys())}")

# Export tool schemas for Claude
schemas = registry.export_all_schemas()

Security Module

Built-in security features for safe operations:

from fact_system.security import (
    AuthorizationManager,
    validate_cache_content_security,
    TokenManager
)

# Authorization management
auth_manager = AuthorizationManager()

# Validate operations
is_authorized = await auth_manager.authorize_operation(
    operation="database_query",
    context={"user_id": "user123", "query": "SELECT * FROM users"}
)

# Token management for API keys
token_manager = TokenManager()
encrypted_token = token_manager.encrypt_token("sensitive-api-key")
decrypted_token = token_manager.decrypt_token(encrypted_token)

Async Programming with FACT

FACT is built with async/await patterns for optimal performance:

Concurrent Query Processing

import asyncio
from fact_system import FACTDriver

async def concurrent_queries_example():
    """Process multiple queries concurrently."""
    driver = FACTDriver()
    await driver.initialize()
    
    try:
        # Define multiple queries
        queries = [
            "Show database schema",
            "List all tables",
            "Get performance metrics",
            "Show cache statistics",
            "List registered tools"
        ]
        
        # Process all queries concurrently
        tasks = [driver.process_query(query) for query in queries]
        responses = await asyncio.gather(*tasks, return_exceptions=True)
        
        # Process results
        for i, (query, response) in enumerate(zip(queries, responses)):
            if isinstance(response, Exception):
                print(f"Query {i+1} failed: {response}")
            else:
                print(f"Query {i+1}: {query}")
                print(f"Response: {response[:150]}...")
                print("-" * 50)
                
    finally:
        await driver.shutdown()

asyncio.run(concurrent_queries_example())

Background Task Management

import asyncio
from fact_system.cache import initialize_cache_system

async def background_tasks_example():
    """Example of managing background tasks."""
    
    # Initialize cache with background tasks enabled
    cache_config = {
        "prefix": "background_demo",
        "min_tokens": 100,
        "max_size": "50MB",
        "ttl_seconds": 1800
    }
    
    cache_system = await initialize_cache_system(
        cache_config, 
        enable_background_tasks=True
    )
    
    # Background tasks are now running:
    # - Cache metrics monitoring
    # - Automatic optimization
    # - Scheduled cache warming
    
    try:
        # Do your work while background tasks run
        for i in range(10):
            query = f"Background query {i}"
            await cache_system.store_response(query, f"Response for query {i}")
            await asyncio.sleep(1)
        
        # Get health report (includes background task metrics)
        health = await cache_system.get_health_report()
        print(f"Cache health: {health['overall_health']}")
        
    finally:
        # Clean shutdown stops all background tasks
        await cache_system.shutdown()

asyncio.run(background_tasks_example())

Configuration

Environment Variables

FACT supports configuration via environment variables:

# Core configuration
export ANTHROPIC_API_KEY="your-api-key"
export CLAUDE_MODEL="claude-3-5-sonnet-20241022"
export DATABASE_PATH="./data/fact.db"

# Cache configuration
export CACHE_PREFIX="my_app"
export CACHE_MAX_SIZE="100MB"
export CACHE_TTL_SECONDS="3600"
export CACHE_MIN_TOKENS="500"

# Performance tuning
export REQUEST_TIMEOUT="30"
export MAX_RETRIES="3"
export ENABLE_METRICS="true"

Configuration File

from fact_system.core.config import Config, get_config
import os

# Load from environment
config = get_config()

# Or create custom configuration
config = Config(
    # API Configuration
    anthropic_api_key=os.getenv("ANTHROPIC_API_KEY"),
    claude_model="claude-3-5-sonnet-20241022",
    system_prompt="You are a helpful AI assistant with access to tools.",
    
    # Database
    database_path="./data/my_app.db",
    
    # Cache settings
    cache_config={
        "prefix": "my_app_v1",
        "min_tokens": 500,
        "max_size": "100MB",
        "ttl_seconds": 3600,
        "hit_target_ms": 50,
        "miss_target_ms": 150
    },
    
    # Performance
    request_timeout=30,
    max_retries=3,
    enable_metrics=True,
    
    # Security
    enable_auth=True,
    enable_input_validation=True
)

# Validate configuration
from fact_system.core.config import validate_configuration
validate_configuration(config)

Dynamic Configuration Updates

from fact_system import FACTDriver

async def dynamic_config_example():
    """Update configuration at runtime."""
    driver = FACTDriver()
    await driver.initialize()
    
    try:
        # Update cache configuration
        new_cache_config = {
            "prefix": "updated_cache",
            "max_size": "200MB",
            "ttl_seconds": 7200
        }
        
        # Reinitialize cache with new config
        await driver.cache_system.shutdown()
        driver.cache_system = await initialize_cache_system(new_cache_config)
        
        # Update other settings
        driver.config.request_timeout = 60
        driver.config.max_retries = 5
        
        print("Configuration updated successfully")
        
    finally:
        await driver.shutdown()

asyncio.run(dynamic_config_example())

Integration with Claude API

Direct API Integration

from fact_system.core.driver import FACTDriver
import anthropic

async def claude_integration_example():
    """Advanced Claude API integration."""
    
    # Initialize FACT driver
    driver = FACTDriver()
    await driver.initialize()
    
    try:
        # Get tool schemas for Claude
        tool_schemas = driver.tool_registry.export_all_schemas()
        
        # Direct Anthropic client usage (FACT uses this internally)
        client = anthropic.Anthropic(api_key=driver.config.anthropic_api_key)
        
        # Make a call with FACT tools
        response = client.messages.create(
            model=driver.config.claude_model,
            system=driver.config.system_prompt,
            messages=[{"role": "user", "content": "Show me database statistics"}],
            max_tokens=4096,
            tools=tool_schemas,
            tool_choice={"type": "any"}
        )
        
        # Process tool calls if present
        if hasattr(response, 'content'):
            for block in response.content:
                if hasattr(block, 'type') and block.type == 'tool_use':
                    print(f"Tool called: {block.name}")
                    print(f"Arguments: {block.input}")
                
    finally:
        await driver.shutdown()

asyncio.run(claude_integration_example())

Custom Message Handling

from fact_system.core.conversation import ConversationManager
from fact_system import FACTDriver

async def conversation_example():
    """Manage multi-turn conversations with Claude."""
    
    driver = FACTDriver()
    await driver.initialize()
    
    try:
        # Conversation history
        messages = [
            {"role": "user", "content": "What tables are in the database?"},
        ]
        
        # Process with conversation context
        response = await driver._call_llm_with_cache(
            messages=messages,
            tools=driver.tool_registry.export_all_schemas(),
            cache_mode="read"
        )
        
        # Add assistant response to conversation
        messages.append({
            "role": "assistant", 
            "content": response.content
        })
        
        # Continue conversation
        messages.append({
            "role": "user", 
            "content": "Now show me the schema for the first table"
        })
        
        response2 = await driver._call_llm_with_cache(
            messages=messages,
            tools=driver.tool_registry.export_all_schemas(),
            cache_mode="read"
        )
        
        print("Final response:", response2)
        
    finally:
        await driver.shutdown()

asyncio.run(conversation_example())

Error Handling

FACT provides comprehensive error handling with graceful degradation:

Basic Error Handling

from fact_system import FACTDriver
from fact_system.core.errors import (
    FACTError, ConfigurationError, ConnectionError, 
    ToolExecutionError, CacheError
)

async def error_handling_example():
    """Comprehensive error handling patterns."""
    
    driver = FACTDriver()
    
    try:
        await driver.initialize()
        
        # This might fail due to various reasons
        response = await driver.process_query("Complex query that might fail")
        print(f"Success: {response}")
        
    except ConfigurationError as e:
        print(f"Configuration issue: {e}")
        print("Check your API keys and database path")
        
    except ConnectionError as e:
        print(f"Connection failed: {e}")
        print("Check network connectivity and API status")
        
    except ToolExecutionError as e:
        print(f"Tool execution failed: {e}")
        print("The query might be too complex or invalid")
        
    except CacheError as e:
        print(f"Cache operation failed: {e}")
        print("Cache will be bypassed automatically")
        
    except FACTError as e:
        print(f"FACT system error: {e}")
        
    except Exception as e:
        print(f"Unexpected error: {e}")
        
    finally:
        try:
            await driver.shutdown()
        except Exception as e:
            print(f"Shutdown error (non-critical): {e}")

asyncio.run(error_handling_example())

Graceful Degradation

from fact_system import FACTDriver
from fact_system.core.errors import classify_error, provide_graceful_degradation

async def graceful_degradation_example():
    """Handle errors with graceful degradation."""
    
    driver = FACTDriver()
    await driver.initialize()
    
    try:
        queries = [
            "Show database schema",
            "Invalid SQL query that will fail",
            "Another valid query"
        ]
        
        for query in queries:
            try:
                response = await driver.process_query(query)
                print(f"Success: {query} -> {response[:100]}...")
                
            except Exception as e:
                # Classify the error
                error_category = classify_error(e)
                print(f"Error category: {error_category}")
                
                # Provide graceful degradation
                if error_category in ["connectivity", "tool_execution"]:
                    fallback_response = provide_graceful_degradation(error_category)
                    print(f"Fallback: {fallback_response}")
                else:
                    print(f"Unrecoverable error: {e}")
                    
    finally:
        await driver.shutdown()

asyncio.run(graceful_degradation_example())

Circuit Breaker Pattern

from fact_system.cache.resilience import ResilientCacheWrapper, CacheCircuitBreaker, CircuitBreakerConfig

async def circuit_breaker_example():
    """Use circuit breaker for cache resilience."""
    
    # Configure circuit breaker
    circuit_config = CircuitBreakerConfig(
        failure_threshold=3,    # Open after 3 failures
        success_threshold=2,    # Close after 2 successes  
        timeout_seconds=30.0,   # Wait 30s before retry
        rolling_window_seconds=300.0,  # 5-minute window
        gradual_recovery=True,
        recovery_factor=0.5     # 50% of requests during recovery
    )
    
    circuit_breaker = CacheCircuitBreaker(circuit_config)
    
    # This would normally be done in the driver initialization
    # Here we show how to use it directly
    from fact_system.cache import get_cache_manager
    cache_manager = get_cache_manager()
    
    resilient_cache = ResilientCacheWrapper(cache_manager, circuit_breaker)
    await resilient_cache.start_monitoring()
    
    try:
        # Test cache operations with circuit breaker
        for i in range(10):
            try:
                # This will use circuit breaker protection
                result = await resilient_cache.get(f"test_key_{i}")
                if result:
                    print(f"Cache hit for key {i}")
                else:
                    print(f"Cache miss for key {i}")
                    # Store something for next iteration
                    await resilient_cache.store(f"test_key_{i}", f"Test value {i}")
                    
            except Exception as e:
                print(f"Cache operation failed: {e}")
                
    finally:
        await resilient_cache.stop_monitoring()

asyncio.run(circuit_breaker_example())

Performance Optimization

Batch Operations

from fact_system import FACTDriver
import asyncio

async def batch_operations_example():
    """Optimize performance with batch operations."""
    
    driver = FACTDriver()
    await driver.initialize()
    
    try:
        # Prepare multiple queries
        queries = [
            "Show table schemas",
            "Get performance metrics", 
            "List recent queries",
            "Show cache statistics",
            "Get system health"
        ]
        
        # Method 1: Sequential (slower)
        print("Sequential processing:")
        start_time = time.time()
        for query in queries:
            response = await driver.process_query(query)
            print(f"  {query}: {len(response)} chars")
        sequential_time = time.time() - start_time
        
        # Method 2: Concurrent (faster)
        print("\nConcurrent processing:")
        start_time = time.time()
        tasks = [driver.process_query(query) for query in queries]
        responses = await asyncio.gather(*tasks)
        for query, response in zip(queries, responses):
            print(f"  {query}: {len(response)} chars")
        concurrent_time = time.time() - start_time
        
        print(f"\nPerformance improvement: {sequential_time/concurrent_time:.2f}x faster")
        
    finally:
        await driver.shutdown()

asyncio.run(batch_operations_example())

Cache Optimization

from fact_system.cache import get_cache_system
from fact_system.cache.strategy import optimize_cache_automatically

async def cache_optimization_example():
    """Optimize cache performance."""
    
    cache_system = get_cache_system()
    
    if cache_system:
        # Get current metrics
        initial_metrics = cache_system.cache_manager.get_metrics()
        print(f"Initial hit rate: {initial_metrics.hit_rate:.2f}%")
        
        # Run optimization
        optimization_result = await cache_system.optimize_cache()
        print(f"Optimization completed: {optimization_result}")
        
        # Compare metrics
        final_metrics = cache_system.cache_manager.get_metrics()
        print(f"Final hit rate: {final_metrics.hit_rate:.2f}%")
        
        # Start automatic optimization (runs in background)
        optimization_task = asyncio.create_task(
            optimize_cache_automatically(
                cache_system.cache_manager, 
                interval_seconds=600  # Every 10 minutes
            )
        )
        
        # Let it run for a while
        await asyncio.sleep(5)
        
        # Cancel background optimization
        optimization_task.cancel()

asyncio.run(cache_optimization_example())

Memory Management

from fact_system import FACTDriver
import psutil
import gc

async def memory_management_example():
    """Monitor and optimize memory usage."""
    
    def get_memory_usage():
        process = psutil.Process()
        return process.memory_info().rss / 1024 / 1024  # MB
    
    print(f"Initial memory: {get_memory_usage():.1f} MB")
    
    driver = FACTDriver()
    await driver.initialize()
    
    try:
        print(f"After init: {get_memory_usage():.1f} MB")
        
        # Process many queries
        for i in range(100):
            query = f"Test query {i} with different content"
            response = await driver.process_query(query)
            
            # Periodic cleanup
            if i % 20 == 0:
                gc.collect()  # Force garbage collection
                print(f"After {i} queries: {get_memory_usage():.1f} MB")
        
        # Get system metrics
        metrics = driver.get_metrics()
        print(f"Cache entries: {metrics.get('cache_total_entries', 0)}")
        print(f"Cache size: {metrics.get('cache_total_size', 0)} bytes")
        
    finally:
        await driver.shutdown()
        gc.collect()
        print(f"After shutdown: {get_memory_usage():.1f} MB")

asyncio.run(memory_management_example())

Complete Code Examples

Arcade.dev Integration

"""
Complete example of integrating FACT with Arcade.dev for tool execution.
This example shows real-world usage patterns and best practices.
"""

import asyncio
import os
from typing import Dict, Any, Optional
from fact_system import FACTDriver
from fact_system.tools import Tool, get_tool_registry

# Try to import arcade SDK
try:
    from arcadepy import Arcade
    ARCADE_AVAILABLE = True
except ImportError:
    ARCADE_AVAILABLE = False

class ArcadeIntegration:
    """Integration between FACT and Arcade.dev platform."""
    
    def __init__(self, api_key: str, user_id: str = "[email protected]"):
        self.api_key = api_key
        self.user_id = user_id
        self.arcade_client = None
        self.fact_driver = None
        
    async def initialize(self):
        """Initialize both FACT and Arcade systems."""
        # Initialize FACT
        self.fact_driver = FACTDriver()
        await self.fact_driver.initialize()
        
        # Initialize Arcade client
        if ARCADE_AVAILABLE and self.api_key:
            self.arcade_client = Arcade(api_key=self.api_key)
        
        # Register Arcade tools with FACT
        await self._register_arcade_tools()
        
    async def _register_arcade_tools(self):
        """Register Arcade.dev tools with FACT system."""
        
        @Tool(
            name="arcade_math_sqrt",
            description="Calculate square root using Arcade.dev Math.Sqrt tool",
            parameters={
                "type": "object",
                "properties": {
                    "number": {"type": "number", "minimum": 0}
                },
                "required": ["number"]
            }
        )
        async def arcade_sqrt(number: float) -> Dict[str, Any]:
            """Calculate square root via Arcade."""
            if not self.arcade_client:
                # Fallback calculation
                import math
                result = math.sqrt(number)
                return {
                    "result": result,
                    "input": number,
                    "source": "fallback",
                    "execution_time_ms": 1
                }
            
            try:
                response = self.arcade_client.tools.execute(
                    tool_name="Math.Sqrt",
                    input={"a": number},
                    user_id=self.user_id
                )
                
                return {
                    "result": response.result.get("value") if response.result else None,
                    "input": number,
                    "source": "arcade",
                    "execution_time_ms": getattr(response, 'execution_time_ms', None),
                    "id": response.id
                }
            except Exception as e:
                # Graceful fallback
                import math
                result = math.sqrt(number)
                return {
                    "result": result,
                    "input": number,
                    "source": "fallback_after_error",
                    "error": str(e),
                    "execution_time_ms": 1
                }
        
        @Tool(
            name="arcade_list_emails",
            description="List emails using Arcade.dev Google.ListEmails tool",
            parameters={
                "type": "object",
                "properties": {
                    "count": {"type": "integer", "minimum": 1, "maximum": 50}
                },
                "required": ["count"]
            }
        )
        async def arcade_list_emails(count: int) -> Dict[str, Any]:
            """List emails via Arcade."""
            if not self.arcade_client:
                # Return mock data
                emails = [
                    {"id": f"demo_{i}", "subject": f"Demo Email {i}", "from": f"sender{i}@example.com"}
                    for i in range(1, min(count + 1, 6))
                ]
                return {
                    "emails": emails,
                    "count": len(emails),
                    "source": "fallback",
                    "execution_time_ms": 100
                }
            
            try:
                response = self.arcade_client.tools.execute(
                    tool_name="Google.ListEmails",
                    input={"n_emails": count},
                    user_id=self.user_id
                )
                
                return {
                    "emails": response.result.get("emails", []) if response.result else [],
                    "count": len(response.result.get("emails", [])) if response.result else 0,
                    "source": "arcade",
                    "execution_time_ms": getattr(response, 'execution_time_ms', None),
                    "id": response.id
                }
            except Exception as e:
                # Graceful fallback with mock data
                emails = [
                    {"id": f"fallback_{i}", "subject": f"Fallback Email {i}", "from": f"fallback{i}@example.com"}
                    for i in range(1, min(count + 1, 4))
                ]
                return {
                    "emails": emails,
                    "count": len(emails),
                    "source": "fallback_after_error",
                    "error": str(e),
                    "execution_time_ms": 100
                }
    
    async def process_query_with_arcade(self, query: str) -> str:
        """Process query using FACT with Arcade tool access."""
        if not self.fact_driver:
            raise RuntimeError("System not initialized")
        
        return await self.fact_driver.process_query(query)
    
    async def shutdown(self):
        """Clean shutdown of all systems."""
        if self.fact_driver:
            await self.fact_driver.shutdown()

async def arcade_integration_example():
    """Complete Arcade.dev integration example."""
    
    # Load configuration
    api_key = os.getenv("ARCADE_API_KEY", "")
    user_id = os.getenv("ARCADE_USER_ID", "[email protected]")
    
    # Check if we have real credentials
    has_real_creds = bool(api_key.strip()) and len(api_key.strip()) > 10
    
    print("🎮 FACT + Arcade.dev Integration Example")
    print("=" * 50)
    
    if has_real_creds:
        print(f"🔑 Using real Arcade API key: {api_key[:10]}...")
    else:
        print("🎭 Demo mode: Using fallback implementations")
        print("💡 Set ARCADE_API_KEY environment variable for real integration")
    
    # Initialize integration
    integration = ArcadeIntegration(api_key, user_id)
    await integration.initialize()
    
    try:
        # Example queries that will use Arcade tools
        queries = [
            "Calculate the square root of 144 using arcade tools",
            "List 3 emails using the arcade email tool",
            "What tools are available in the system?",
            "Show me the performance metrics"
        ]
        
        for i, query in enumerate(queries, 1):
            print(f"\n🔍 Query {i}: {query}")
            try:
                response = await integration.process_query_with_arcade(query)
                print(f"✅ Response: {response[:200]}...")
            except Exception as e:
                print(f"❌ Error: {e}")
        
        # Show system metrics
        metrics = integration.fact_driver.get_metrics()
        print(f"\n📊 System Metrics:")
        print(f"   Total queries: {metrics.get('total_queries', 0)}")
        print(f"   Tool executions: {metrics.get('tool_executions', 0)}")
        print(f"   Cache hit rate: {metrics.get('cache_hit_rate', 0):.1f}%")
        
    finally:
        await integration.shutdown()
        print("\n✅ Integration example completed")

# Run the example
if __name__ == "__main__":
    asyncio.run(arcade_integration_example())

Database Integration

"""
Complete database integration example showing FACT's database capabilities.
"""

import asyncio
import sqlite3
from pathlib import Path
from fact_system import FACTDriver
from fact_system.db import DatabaseManager
from fact_system.tools import Tool

async def database_integration_example():
    """Complete database integration example."""
    
    print("🗄️  FACT Database Integration Example")
    print("=" * 50)
    
    # Setup test database
    test_db_path = Path("./demo_database.db")
    
    # Create test data
    await setup_test_database(test_db_path)
    
    # Initialize FACT with custom database
    from fact_system.core.config import Config
    config = Config(
        database_path=str(test_db_path),
        cache_config={
            "prefix": "db_demo",
            "min_tokens": 50,  # Lower for demo
            "max_size": "10MB",
            "ttl_seconds": 1800
        }
    )
    
    driver = FACTDriver(config)
    await driver.initialize()
    
    try:
        # Register custom database tools
        await register_custom_db_tools(driver.database_manager)
        
        # Example queries
        queries = [
            "Show me all tables in the database",
            "What's the schema of the users table?", 
            "How many users are in the database?",
            "Show me the top 5 products by price",
            "What are the recent orders?",
            "Calculate total revenue from orders"
        ]
        
        for i, query in enumerate(queries, 1):
            print(f"\n🔍 Query {i}: {query}")
            try:
                response = await driver.process_query(query)
                print(f"✅ Response: {response}")
            except Exception as e:
                print(f"❌ Error: {e}")
        
        # Show database statistics
        db_info = await driver.database_manager.get_database_info()
        print(f"\n📊 Database Statistics: {db_info}")
        
    finally:
        await driver.shutdown()
        # Cleanup
        if test_db_path.exists():
            test_db_path.unlink()
        print("\n✅ Database integration example completed")

async def setup_test_database(db_path: Path):
    """Setup test database with sample data."""
    
    # Remove existing database
    if db_path.exists():
        db_path.unlink()
    
    # Create database and tables
    conn = sqlite3.connect(str(db_path))
    cursor = conn.cursor()
    
    # Users table
    cursor.execute("""
        CREATE TABLE users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username VARCHAR(50) UNIQUE NOT NULL,
            email VARCHAR(100) UNIQUE NOT NULL,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            is_active BOOLEAN DEFAULT 1
        )
    """)
    
    # Products table
    cursor.execute("""
        CREATE TABLE products (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name VARCHAR(100) NOT NULL,
            description TEXT,
            price DECIMAL(10,2) NOT NULL,
            category VARCHAR(50),
            stock_quantity INTEGER DEFAULT 0,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP
        )
    """)
    
    # Orders table
    cursor.execute("""
        CREATE TABLE orders (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER NOT NULL,
            product_id INTEGER NOT NULL,
            quantity INTEGER NOT NULL,
            total_price DECIMAL(10,2) NOT NULL,
            order_date DATETIME DEFAULT CURRENT_TIMESTAMP,
            status VARCHAR(20) DEFAULT 'pending',
            FOREIGN KEY (user_id) REFERENCES users(id),
            FOREIGN KEY (product_id) REFERENCES products(id)
        )
    """)
    
    # Insert sample data
    users_data = [
        ("alice", "[email protected]"),
        ("bob", "[email protected]"),
        ("charlie", "[email protected]"),
        ("diana", "[email protected]"),
        ("eve", "[email protected]")
    ]
    
    cursor.executemany(
        "INSERT INTO users (username, email) VALUES (?, ?)",
        users_data
    )
    
    products_data = [
        ("Laptop", "High-performance laptop", 999.99, "Electronics", 10),
        ("Smartphone", "Latest smartphone", 699.99, "Electronics", 25),
        ("Book", "Programming guide", 49.99, "Books", 100),
        ("Coffee Mug", "Ceramic coffee mug", 12.99, "Home", 50),
        ("Headphones", "Noise-canceling headphones", 199.99, "Electronics", 15)
    ]
    
    cursor.executemany(
        "INSERT INTO products (name, description, price, category, stock_quantity) VALUES (?, ?, ?, ?, ?)",
        products_data
    )
    
    orders_data = [
        (1, 1, 1, 999.99, "completed"),
        (2, 2, 2, 1399.98, "completed"),
        (3, 3, 1, 49.99, "pending"),
        (1, 4, 3, 38.97, "completed"),
        (4, 5, 1, 199.99, "shipped")
    ]
    
    cursor.executemany(
        "INSERT INTO orders (user_id, product_id, quantity, total_price, status) VALUES (?, ?, ?, ?, ?)",
        orders_data
    )
    
    conn.commit()
    conn.close()
    
    print(f"✅ Test database created at {db_path}")

async def register_custom_db_tools(db_manager: DatabaseManager):
    """Register custom database tools."""
    
    @Tool(
        name="get_user_count",
        description="Get total number of users in the database",
        parameters={"type": "object", "properties": {}}
    )
    async def get_user_count() -> dict:
        """Get user count from database."""
        try:
            result = await db_manager.execute_query("SELECT COUNT(*) as count FROM users")
            return {
                "user_count": result[0]["count"] if result else 0,
                "status": "success"
            }
        except Exception as e:
            return {"error": str(e), "status": "error"}
    
    @Tool(
        name="get_revenue_stats",
        description="Calculate total revenue and order statistics",
        parameters={"type": "object", "properties": {}}
    )
    async def get_revenue_stats() -> dict:
        """Get revenue statistics."""
        try:
            query = """
            SELECT 
                COUNT(*) as total_orders,
                SUM(total_price) as total_revenue,
                AVG(total_price) as avg_order_value,
                COUNT(DISTINCT user_id) as unique_customers
            FROM orders 
            WHERE status = 'completed'
            """
            result = await db_manager.execute_query(query)
            
            if result:
                stats = result[0]
                return {
                    "total_orders": stats["total_orders"],
                    "total_revenue": float(stats["total_revenue"] or 0),
                    "average_order_value": float(stats["avg_order_value"] or 0),
                    "unique_customers": stats["unique_customers"],
                    "status": "success"
                }
            else:
                return {"error": "No data found", "status": "error"}
                
        except Exception as e:
            return {"error": str(e), "status": "error"}
    
    @Tool(
        name="get_top_products",
        description="Get top products by sales or price",
        parameters={
            "type": "object",
            "properties": {
                "limit": {"type": "integer", "minimum": 1, "maximum": 20, "default": 5},
                "sort_by": {"type": "string", "enum": ["price", "sales"], "default": "price"}
            }
        }
    )
    async def get_top_products(limit: int = 5, sort_by: str = "price") -> dict:
        """Get top products by specified criteria."""
        try:
            if sort_by == "price":
                query = "SELECT name, price, category FROM products ORDER BY price DESC LIMIT ?"
            else:  # sales
                query = """
                SELECT p.name, p.price, p.category, COALESCE(SUM(o.quantity), 0) as total_sold
                FROM products p
                LEFT JOIN orders o ON p.id = o.product_id AND o.status = 'completed'
                GROUP BY p.id, p.name, p.price, p.category
                ORDER BY total_sold DESC
                LIMIT ?
                """
            
            result = await db_manager.execute_query(query, (limit,))
            
            products = []
            for row in result:
                product = {
                    "name": row["name"],
                    "price": float(row["price"]),
                    "category": row["category"]
                }
                if sort_by == "sales":
                    product["total_sold"] = row["total_sold"]
                products.append(product)
            
            return {
                "products": products,
                "count": len(products),
                "sorted_by": sort_by,
                "status": "success"
            }
            
        except Exception as e:
            return {"error": str(e), "status": "error"}

# Run the example
if __name__ == "__main__":
    asyncio.run(database_integration_example())

Best Practices

1. Resource Management

# Always use proper async context management
async def good_practice():
    driver = FACTDriver()
    try:
        await driver.initialize()
        # Your code here
        response = await driver.process_query("Your query")
        return response
    finally:
        await driver.shutdown()

# Or use the global driver pattern
async def better_practice():
    from fact_system import get_driver
    driver = await get_driver()  # Reuses global instance
    return await driver.process_query("Your query")

2. Error Handling

# Handle specific exceptions
from fact_system.core.errors import FACTError, CacheError

async def robust_query_processing():
    driver = await get_driver()
    
    try:
        return await driver.process_query("Complex query")
    except CacheError:
        # Cache failed, but query still processed
        logger.warning("Cache unavailable, performance may be slower")
        return await driver.process_query("Complex query")
    except FACTError as e:
        # FACT-specific error
        logger.error(f"FACT system error: {e}")
        return "I apologize, but I encountered a system error."
    except Exception as e:
        # Unexpected error
        logger.exception("Unexpected error")
        return "I encountered an unexpected error."

3. Performance Optimization

# Use concurrent processing for multiple queries
async def optimized_batch_processing(queries: list[str]):
    driver = await get_driver()
    
    # Process all queries concurrently
    tasks = [driver.process_query(query) for query in queries]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    # Handle results
    responses = []
    for query, result in zip(queries, results):
        if isinstance(result, Exception):
            responses.append(f"Error processing '{query}': {result}")
        else:
            responses.append(result)
    
    return responses

4. Configuration Management

# Use environment-based configuration
import os
from fact_system.core.config import Config

def create_production_config():
    return Config(
        anthropic_api_key=os.getenv("ANTHROPIC_API_KEY"),
        claude_model=os.getenv("CLAUDE_MODEL", "claude-3-5-sonnet-20241022"),
        database_path=os.getenv("DATABASE_PATH", "./data/production.db"),
        cache_config={
            "prefix": os.getenv("CACHE_PREFIX", "prod"),
            "max_size": os.getenv("CACHE_MAX_SIZE", "500MB"),
            "ttl_seconds": int(os.getenv("CACHE_TTL", "7200"))
        },
        enable_metrics=os.getenv("ENABLE_METRICS", "true").lower() == "true"
    )

5. Tool Development

# Create well-documented, secure tools
from fact_system.tools import Tool
from fact_system.security.input_sanitizer import sanitize_input

@Tool(
    name="safe_calculation",
    description="Perform safe mathematical calculations",
    parameters={
        "type": "object",
        "properties": {
            "expression": {
                "type": "string",
                "pattern": r"^[0-9+\-*/().\s]+$",  # Only allow safe characters
                "maxLength": 100
            },
            "precision": {
                "type": "integer",
                "minimum": 1,
                "maximum": 10,
                "default": 2
            }
        },
        "required": ["expression"]
    }
)
async def safe_calculation(expression: str, precision: int = 2) -> dict:
    """Safely evaluate mathematical expressions."""
    try:
        # Sanitize input
        clean_expression = sanitize_input(expression, "math_expression")
        
        # Use safe evaluation (implement safe_eval)
        result = safe_eval(clean_expression)
        
        return {
            "expression": expression,
            "result": round(result, precision),
            "status": "success"
        }
        
    except Exception as e:
        return {
            "expression": expression,
            "error": str(e),
            "status": "error"
        }

def safe_eval(expression: str) -> float:
    """Safely evaluate mathematical expressions."""
    # Implement safe evaluation logic
    # This is a simplified example - use a proper math expression evaluator
    import ast
    import operator
    
    # Define allowed operations
    ops = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.USub: operator.neg,
    }
    
    def eval_node(node):
        if isinstance(node, ast.Constant):
            return node.value
        elif isinstance(node, ast.BinOp):
            return ops[type(node.op)](eval_node(node.left), eval_node(node.right))
        elif isinstance(node, ast.UnaryOp):
            return ops[type(node.op)](eval_node(node.operand))
        else:
            raise ValueError(f"Unsupported operation: {type(node)}")
    
    tree = ast.parse(expression, mode='eval')
    return eval_node(tree.body)

Troubleshooting

Common Issues and Solutions

1. API Key Issues

# Problem: Invalid or missing API key
# Solution: Verify API key configuration

import os
from fact_system.core.config import validate_configuration, Config

def debug_api_key():
    api_key = os.getenv("ANTHROPIC_API_KEY")
    
    if not api_key:
        print("❌ ANTHROPIC_API_KEY not set")
        print("Set it with: export ANTHROPIC_API_KEY='your-key'")
        return False
    
    if len(api_key) < 10:
        print("❌ API key seems too short")
        return False
    
    if not api_key.startswith("sk-ant-"):
        print("⚠️  API key format might be incorrect")
        print("Anthropic keys typically start with 'sk-ant-'")
    
    print("✅ API key format looks correct")
    return True

# Test configuration
try:
    config = Config(anthropic_api_key=os.getenv("ANTHROPIC_API_KEY"))
    validate_configuration(config)
    print("✅ Configuration is valid")
except Exception as e:
    print(f"❌ Configuration error: {e}")

2. Database Connection Issues

# Problem: Database connection failures
# Solution: Check database path and permissions

from pathlib import Path
import sqlite3

def debug_database_connection(db_path: str):
    path = Path(db_path)
    
    # Check if directory exists
    if not path.parent.exists():
        print(f"❌ Directory doesn't exist: {path.parent}")
        print(f"Create it with: mkdir -p {path.parent}")
        return False
    
    # Check write permissions
    if not os.access(path.parent, os.W_OK):
        print(f"❌ No write permission to: {path.parent}")
        return False
    
    # Test database connection
    try:
        conn = sqlite3.connect(str(path))
        cursor = conn.cursor()
        cursor.execute("SELECT 1")
        conn.close()
        print("✅ Database connection successful")
        return True
    except Exception as e:
        print(f"❌ Database connection failed: {e}")
        return False

3. Cache Issues

# Problem: Cache not working or causing errors
# Solution: Debug cache configuration and operation

from fact_system.cache import get_cache_system

async def debug_cache_system():
    cache_system = get_cache_system()
    
    if not cache_system:
        print("❌ Cache system not initialized")
        return False
    
    try:
        # Test cache operations
        test_key = "debug_test"
        test_value = "Debug test value with enough tokens to meet minimum requirements for caching"
        
        # Test store
        success = await cache_system.store_response(test_key, test_value)
        if not success:
            print("❌ Cache store operation failed")
            return False
        
        # Test retrieve
        retrieved = await cache_system.get_cached_response(test_key)
        if retrieved != test_value:
            print("❌ Cache retrieve operation failed")
            return False
        
        # Get health report
        health = await cache_system.get_health_report()
        print(f"✅ Cache system healthy: {health['overall_health']}")
        
        return True
        
    except Exception as e:
        print(f"❌ Cache system error: {e}")
        return False

4. Tool Execution Issues

# Problem: Tools not executing or returning errors
# Solution: Debug tool registration and execution

from fact_system.tools import get_tool_registry

def debug_tool_system():
    registry = get_tool_registry()
    
    # List registered tools
    tools = list(registry.tools.keys())
    print(f"📋 Registered tools: {tools}")
    
    if not tools:
        print("❌ No tools registered")
        return False
    
    # Test each tool
    for tool_name in tools:
        try:
            tool_def = registry.get_tool(tool_name)
            print(f"✅ Tool '{tool_name}' is registered")
            
            # Check tool schema
            schema = tool_def.to_dict()
            if not schema.get("parameters"):
                print(f"⚠️  Tool '{tool_name}' has no parameters defined")
            
        except Exception as e:
            print(f"❌ Tool '{tool_name}' error: {e}")
    
    return True

5. Performance Issues

# Problem: Slow query processing
# Solution: Profile and optimize performance

import time
import asyncio
from fact_system import get_driver

async def debug_performance():
    driver = await get_driver()
    
    # Test query performance
    test_queries = [
        "Simple query",
        "Show database schema",
        "List all tables",
        "Get system metrics"
    ]
    
    performance_results = []
    
    for query in test_queries:
        start_time = time.time()
        try:
            response = await driver.process_query(query)
            end_time = time.time()
            latency = (end_time - start_time) * 1000  # ms
            
            performance_results.append({
                "query": query,
                "latency_ms": latency,
                "response_length": len(response),
                "status": "success"
            })
            
        except Exception as e:
            end_time = time.time()
            latency = (end_time - start_time) * 1000
            
            performance_results.append({
                "query": query,
                "latency_ms": latency,
                "error": str(e),
                "status": "error"
            })
    
    # Analyze results
    successful_queries = [r for r in performance_results if r["status"] == "success"]
    
    if successful_queries:
        avg_latency = sum(r["latency_ms"] for r in successful_queries) / len(successful_queries)
        print(f"📊 Average query latency: {avg_latency:.1f}ms")
        
        if avg_latency > 5000:  # 5 seconds
            print("⚠️  High latency detected. Consider:")
            print("   - Checking API key and network connectivity")
            print("   - Optimizing cache configuration")
            print("   - Reducing query complexity")
    
    # Get system metrics
    metrics = driver.get_metrics()
    cache_hit_rate = metrics.get("cache_hit_rate", 0)
    
    print(f"📈 Cache hit rate: {cache_hit_rate:.1f}%")
    
    if cache_hit_rate < 20:
        print("⚠️  Low cache hit rate. Consider:")
        print("   - Increasing cache TTL")
        print("   - Adjusting minimum token requirements")
        print("   - Using more similar queries")
    
    return performance_results

Debugging Utilities

"""
Comprehensive debugging utilities for FACT system.
"""

import asyncio
import logging
from fact_system import FACTDriver
from fact_system.core.config import get_config, validate_configuration

async def full_system_diagnostic():
    """Run complete system diagnostic."""
    
    print("🔍 FACT System Diagnostic")
    print("=" * 50)
    
    # 1. Configuration check
    print("\n1. Configuration Check:")
    try:
        config = get_config()
        validate_configuration(config)
        print("✅ Configuration is valid")
        
        # Show key config values (safely)
        print(f"   Model: {config.claude_model}")
        print(f"   Database: {config.database_path}")
        print(f"   Cache prefix: {config.cache_config['prefix']}")
        
    except Exception as e:
        print(f"❌ Configuration error: {e}")
        return False
    
    # 2. System initialization
    print("\n2. System Initialization:")
    driver = FACTDriver(config)
    
    try:
        await driver.initialize()
        print("✅ System initialized successfully")
    except Exception as e:
        print(f"❌ Initialization failed: {e}")
        return False
    
    try:
        # 3. Component checks
        print("\n3. Component Health Checks:")
        
        # Database check
        if driver.database_manager:
            try:
                db_info = await driver.database_manager.get_database_info()
                print(f"✅ Database: {db_info}")
            except Exception as e:
                print(f"❌ Database error: {e}")
        
        # Cache check
        if driver.cache_system:
            try:
                health = await driver.cache_system.get_health_report()
                print(f"✅ Cache health: {health['overall_health']}")
            except Exception as e:
                print(f"❌ Cache error: {e}")
        
        # Tools check
        tool_count = len(driver.tool_registry.tools)
        print(f"✅ Tools registered: {tool_count}")
        
        # 4. Connectivity test
        print("\n4. Connectivity Test:")
        try:
            test_response = await driver.process_query("Hello, can you respond?")
            if test_response:
                print("✅ API connectivity working")
            else:
                print("⚠️  API returned empty response")
        except Exception as e:
            print(f"❌ API connectivity failed: {e}")
        
        # 5. Performance test
        print("\n5. Performance Test:")
        start_time = time.time()
        await driver.process_query("What is 2+2?")
        latency = (time.time() - start_time) * 1000
        print(f"✅ Query latency: {latency:.1f}ms")
        
        if latency > 10000:  # 10 seconds
            print("⚠️  High latency - check network and API status")
        
        # 6. System metrics
        print("\n6. System Metrics:")
        metrics = driver.get_metrics()
        for key, value in metrics.items():
            print(f"   {key}: {value}")
        
        print("\n✅ Diagnostic completed successfully")
        return True
        
    finally:
        await driver.shutdown()

# Run diagnostic
if __name__ == "__main__":
    import time
    asyncio.run(full_system_diagnostic())

This comprehensive Python guide covers all aspects of working with the FACT system in Python. From basic installation and usage to advanced integrations and troubleshooting, this guide provides practical examples and best practices for getting the most out of FACT in your Python applications.

For additional help, refer to the API Reference and Architecture Overview documentation.

Clone this wiki locally