Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 97 additions & 123 deletions server/python_backend/ai_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
import asyncio
import json
import logging
import sys
# import sys # No longer needed for subprocess
import os
from typing import Dict, List, Any, Optional
from datetime import datetime
# from .utils.async_utils import _execute_async_command # Commented out
from server.python_nlp.nlp_engine import NLPEngine as FallbackNLPEngine # Renamed for clarity
from server.python_nlp.nlp_engine import NLPEngine # Changed import alias

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -52,164 +52,138 @@ class AdvancedAIEngine:
"""Advanced AI engine with async support"""

def __init__(self):
self.python_nlp_path = os.path.join(os.path.dirname(__file__), '..', 'python_nlp')
self.ai_training_script = os.path.join(self.python_nlp_path, 'ai_training.py')
self.nlp_service_script = os.path.join(self.python_nlp_path, 'nlp_engine.py')
# Removed: self.python_nlp_path, self.ai_training_script, self.nlp_service_script
# Instantiate the NLP engine for fallback analysis
self.fallback_nlp_engine = FallbackNLPEngine()
self.nlp_engine = NLPEngine() # Renamed from self.fallback_nlp_engine

async def initialize(self):
def initialize(self): # Changed to synchronous
"""Initialize AI engine"""
try:
# Test connection to NLP services
await self.health_check()
self.health_check() # Changed to synchronous call
logger.info("AI Engine initialized successfully")
except Exception as e:
logger.error(f"AI Engine initialization failed: {e}")

async def analyze_email(self, subject: str, content: str) -> AIAnalysisResult:
"""Analyze email content with AI by calling the NLPEngine script."""
logger.info(f"Initiating AI analysis for email with subject: '{subject[:50]}...'")
def analyze_email(self, subject: str, content: str) -> AIAnalysisResult: # Changed to synchronous
"""Analyze email content with AI by calling the NLPEngine directly."""
logger.info(f"Initiating AI analysis for email with subject: '{subject[:50]}...' using direct NLPEngine call.")
try:
cmd = [
sys.executable,
self.nlp_service_script,
'--analyze-email',
'--subject', subject,
'--content', content,
'--output-format', 'json'
]
# Directly call the nlp_engine's analyze_email method
analysis_data = self.nlp_engine.analyze_email(subject, content) # This is a synchronous call

logger.debug(f"Executing NLPEngine script with command: {' '.join(cmd)}")
# result_json_str = await _execute_async_command(cmd, cwd=self.python_nlp_path) # Commented out
logger.warning("_execute_async_command is commented out. Using fallback for analyze_email.")
return self._get_fallback_analysis(subject, content, "_execute_async_command not available")

# This part below will be skipped due to the direct return above
if not result_json_str: # type: ignore
logger.error("NLPEngine script returned empty output.")
return self._get_fallback_analysis(subject, content, "empty script output")

try:
result = json.loads(result_json_str)
except json.JSONDecodeError as je:
logger.error(f"Failed to parse JSON output from NLPEngine: {je}. Output: {result_json_str[:200]}")
return self._get_fallback_analysis(subject, content, "invalid JSON output")
# Ensure 'action_items' is part of the analysis_data or add it if missing,
# as AIAnalysisResult expects it. NLPEngine.analyze_email already returns it.
if 'action_items' not in analysis_data:
analysis_data['action_items'] = []

if 'error' in result or result.get('status') == 'error': # Assuming nlp_engine might return a status field
error_message = result.get('error', 'Unknown error from NLPEngine script')
logger.error(f"NLPEngine script returned an error: {error_message}")
# Fallback to basic analysis, passing the error message for context
return self._get_fallback_analysis(subject, content, error_message)

logger.info(f"Successfully received analysis from NLPEngine. Method used: {result.get('validation', {}).get('method', 'unknown')}")
return AIAnalysisResult(result)

except FileNotFoundError:
logger.critical(f"NLPEngine script not found at {self.nlp_service_script}. Ensure the path is correct.")
return self._get_fallback_analysis(subject, content, "NLP script not found")
except asyncio.TimeoutError:
logger.error("NLPEngine script execution timed out.")
return self._get_fallback_analysis(subject, content, "script execution timeout")
logger.info(f"Successfully received analysis from NLPEngine. Method used: {analysis_data.get('validation', {}).get('method', 'unknown')}")
return AIAnalysisResult(analysis_data)
except Exception as e:
logger.error(f"An unexpected error occurred during AI analysis: {e}", exc_info=True)
return self._get_fallback_analysis(subject, content, str(e))
logger.error(f"An unexpected error occurred during direct AI analysis: {e}", exc_info=True)
# Use the existing _get_fallback_analysis, but pass the error from the direct call
return self._get_fallback_analysis(subject, content, f"direct NLPEngine error: {str(e)}")

async def train_models(self, training_emails: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Train AI models with email data"""
try:
# Save training data to temporary file
training_file = os.path.join(self.python_nlp_path, 'temp_training_data.json')
with open(training_file, 'w') as f:
json.dump(training_emails, f)

cmd = [
sys.executable,
self.ai_training_script,
'--train-models',
'--training-data', training_file,
'--output-format', 'json'
]

# result = await _execute_async_command(cmd, cwd=self.python_nlp_path) # Commented out
logger.warning("_execute_async_command is commented out. Returning error for train_models.")
result = {"error": "_execute_async_command not available"} # Mock result

# Cleanup temporary file
def train_models(self, training_emails: Optional[List[Dict[str, Any]]] = None) -> Dict[str, Any]: # Changed to synchronous
"""Train AI models with email data - Currently not functional with direct NLPEngine integration."""
logger.warning("train_models is not currently functional with direct NLPEngine integration. The ai_training.py script logic needs to be integrated into NLPEngine.")

# Path might need adjustment if os.path.dirname(__file__) is not appropriate in all contexts
# Assuming this file remains in server/python_backend/
training_file_path = os.path.join(os.path.dirname(__file__), '..', 'python_nlp', 'temp_training_data.json')

# Create dummy training_data.json if training_emails provided, to simulate old behavior for cleanup test
if training_emails:
try:
with open(training_file_path, 'w') as f:
json.dump(training_emails, f)
except IOError as e:
logger.error(f"Error creating temporary training file {training_file_path}: {e}")


# Cleanup temporary file if created (or if it exists from a previous run)
if os.path.exists(training_file_path):
try:
os.remove(training_file)
except:
pass

return {
"success": True,
"modelsTrained": result.get('models_trained', []),
"trainingAccuracy": result.get('training_accuracy', {}),
"validationAccuracy": result.get('validation_accuracy', {}),
"trainingTime": result.get('training_time', 0),
"emailsProcessed": len(training_emails),
"error": result.get('error')
}

except Exception as e:
logger.error(f"Model training failed: {e}")
return {
"success": False,
"modelsTrained": [],
"trainingAccuracy": {},
"validationAccuracy": {},
"trainingTime": 0,
"emailsProcessed": 0,
"error": str(e)
}
os.remove(training_file_path)
logger.info(f"Removed temporary training file: {training_file_path}")
except OSError as e:
logger.error(f"Error removing temporary training file {training_file_path}: {e}")

return {
"success": False,
"error": "Model training via direct NLPEngine call is not implemented. Requires ai_training.py logic integration.",
"modelsTrained": [],
"trainingAccuracy": {},
"validationAccuracy": {},
"trainingTime": 0,
"emailsProcessed": len(training_emails) if training_emails else 0,
}

async def health_check(self) -> Dict[str, Any]:
"""Check AI engine health"""
def health_check(self) -> Dict[str, Any]: # Changed to synchronous
"""Check AI engine health by inspecting the NLPEngine instance."""
try:
cmd = [
sys.executable,
self.nlp_service_script,
'--health-check',
'--output-format', 'json'
]

# result = await _execute_async_command(cmd, cwd=self.python_nlp_path) # Commented out
logger.warning("_execute_async_command is commented out. Returning unhealthy for health_check.")
result = {"status": "error", "error": "_execute_async_command not available"} # Mock result
models_available = []
if self.nlp_engine.sentiment_model: models_available.append("sentiment")
if self.nlp_engine.topic_model: models_available.append("topic")
if self.nlp_engine.intent_model: models_available.append("intent")
if self.nlp_engine.urgency_model: models_available.append("urgency")

all_models_loaded = all(model is not None for model in [
self.nlp_engine.sentiment_model, self.nlp_engine.topic_model,
self.nlp_engine.intent_model, self.nlp_engine.urgency_model
])

# Accessing HAS_NLTK and HAS_SKLEARN_AND_JOBLIB from nlp_engine instance
# These are class attributes in NLPEngine, so they are accessible via instance.
nltk_available = self.nlp_engine.HAS_NLTK
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): HAS_NLTK not defined on NLPEngine instance

Reference HAS_NLTK directly from the module or assign it to self in NLPEngine.init to prevent AttributeError.

sklearn_available = self.nlp_engine.HAS_SKLEARN_AND_JOBLIB

status = "ok"
if not all_models_loaded:
status = "degraded"
if not nltk_available or not sklearn_available:
status = "degraded" # Or "unhealthy" depending on severity

return {
"status": "unhealthy", # Changed to unhealthy due to missing command
"models_available": result.get('models_available', []),
"performance": result.get('performance', {}),
"status": status,
"models_available": models_available,
"nltk_available": nltk_available,
"sklearn_available": sklearn_available,
"timestamp": datetime.now().isoformat()
}

except Exception as e:
logger.error(f"AI health check failed: {e}")
logger.error(f"AI health check failed during direct inspection: {e}", exc_info=True)
return {
"status": "unhealthy",
"error": str(e),
"timestamp": datetime.now().isoformat()
}

async def cleanup(self):
def cleanup(self): # Changed to synchronous (was async but did sync operations)
"""Cleanup AI engine resources"""
try:
# Cleanup any temporary files or resources
temp_files = [
os.path.join(self.python_nlp_path, 'temp_training_data.json')
]
for temp_file in temp_files:
# Path might need adjustment if os.path.dirname(__file__) is not appropriate in all contexts
training_file_path = os.path.join(os.path.dirname(__file__), '..', 'python_nlp', 'temp_training_data.json')

temp_files_to_check = [training_file_path] # Add other temp files if any

for temp_file in temp_files_to_check:
if os.path.exists(temp_file):
os.remove(temp_file)
try:
os.remove(temp_file)
logger.info(f"Removed temporary file during cleanup: {temp_file}")
except OSError as e:
logger.error(f"Error removing temporary file {temp_file} during cleanup: {e}")

logger.info("AI Engine cleanup completed")
except Exception as e:
logger.error(f"AI Engine cleanup failed: {e}")

def _get_fallback_analysis(self, subject: str, content: str, error_context: Optional[str] = None) -> AIAnalysisResult:
"""
Provides a basic fallback analysis if the primary NLPEngine script fails or returns an error.
This uses the in-memory FallbackNLPEngine instance.
This uses the in-memory NLPEngine instance.
"""
reason = "Fallback analysis due to AI service error"
if error_context:
Expand All @@ -218,10 +192,10 @@ def _get_fallback_analysis(self, subject: str, content: str, error_context: Opti
logger.warning(f"{reason}. Subject: {subject[:50]}...")

try:
# Use the _get_simple_fallback_analysis from the FallbackNLPEngine instance
# Use the _get_simple_fallback_analysis from the NLPEngine instance
# This method provides: topic, sentiment, intent (default), urgency,
# confidence (default), categories, keywords (empty), reasoning.
fallback_data = self.fallback_nlp_engine._get_simple_fallback_analysis(subject, content)
fallback_data = self.nlp_engine._get_simple_fallback_analysis(subject, content) # Use self.nlp_engine

# Override reasoning if a specific error context was provided
if error_context:
Expand Down
28 changes: 12 additions & 16 deletions server/python_backend/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from .models import EmailCreate, EmailUpdate, CategoryCreate, ActivityCreate
# Updated import to use NLP GmailAIService directly
from server.python_nlp.gmail_service import GmailAIService
# Removed: from .smart_filters import EmailFilter (as per instruction)
from server.python_nlp.smart_filters import SmartFilterManager
from .ai_engine import AdvancedAIEngine
from .performance_monitor import PerformanceMonitor
Expand Down Expand Up @@ -250,7 +251,7 @@ async def create_email(
"""Create new email with AI analysis"""
try:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (code-quality): Explicitly raise from a previous error [×2] (raise-from-previous-error)

# Perform AI analysis
ai_analysis = await ai_engine.analyze_email(email.subject, email.content)
ai_analysis = ai_engine.analyze_email(email.subject, email.content) # Changed to synchronous

# Apply smart filters
filter_results = await filter_manager.apply_filters_to_email(email.dict())
Expand Down Expand Up @@ -610,22 +611,17 @@ async def get_filters(request: Request):
async def create_filter(request: Request, filter_request_model: FilterRequest): # Renamed model
"""Create new email filter"""
try:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (code-quality): We've found these issues:

# Assuming filter_manager.create_filter (or add_custom_filter) takes a dict or an EmailFilter object
# The smart_filters.py in context had add_custom_filter(EmailFilter(...))
# and no create_filter. This might need adjustment in SmartFilterManager or here.
# For now, assuming a compatible create_filter or add_custom_filter exists.
# Let's assume add_custom_filter is the intended method from previous context for adding new filters.
from .smart_filters import EmailFilter # Ensure EmailFilter is available for instantiation
new_filter_config = EmailFilter(
description = filter_request_model.criteria.get("description", "")

new_filter_object = filter_manager.add_custom_filter(
name=filter_request_model.name,
description=filter_request_model.criteria.get("description", ""), # Assuming description might be in criteria
description=description,
criteria=filter_request_model.criteria,
action=filter_request_model.actions.get("type", ""), # Assuming actions dict has a 'type' for the action name
priority=filter_request_model.priority,
enabled=True
actions=filter_request_model.actions,
priority=filter_request_model.priority
)
filter_manager.add_custom_filter(new_filter_config) # Using add_custom_filter
return new_filter_config.__dict__ # Return the created filter as dict
# FastAPI will handle dataclass serialization to JSON
return new_filter_object
except Exception as e:
logger.error(
json.dumps({
Expand Down Expand Up @@ -717,10 +713,10 @@ async def extract_actions_from_text(

logger.info(f"Received action extraction request for subject: '{request_model.subject[:50] if request_model.subject else 'N/A'}'")

ai_analysis_result = await ai_engine.analyze_email(
ai_analysis_result = ai_engine.analyze_email(
subject=request_model.subject or "", # Pass empty string if subject is None
content=request_model.content
)
) # Changed to synchronous

# The AIAnalysisResult object should have an 'action_items' attribute
action_items_data = ai_analysis_result.action_items
Expand Down
Loading