diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 000000000..e8800b672 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,7 @@ +google-auth +google-auth-oauthlib +google-api-python-client +python-dotenv +scikit-learn +joblib +psycopg2-binary diff --git a/server/python_backend/database.py b/server/python_backend/database.py index 389de7b43..c66cef7a3 100644 --- a/server/python_backend/database.py +++ b/server/python_backend/database.py @@ -1,9 +1,10 @@ """ Database management for Gmail AI email processing -SQLite with async support and optimized queries +PostgreSQL implementation. """ -import sqlite3 +import psycopg2 +import psycopg2.extras # For RealDictCursor import asyncio import logging from typing import Dict, List, Any, Optional, Tuple @@ -15,65 +16,60 @@ logger = logging.getLogger(__name__) class DatabaseManager: - """Async database manager for email data""" - - def __init__(self, db_path: str = "emails.db"): - self.db_path = db_path - self.init_database() + """Async database manager for email data using PostgreSQL""" + + def __init__(self, db_url: Optional[str] = None): + self.database_url = db_url or os.getenv("DATABASE_URL") + if not self.database_url: + logger.error("DATABASE_URL environment variable not set.") + raise ValueError("DATABASE_URL environment variable not set.") + # self.init_database() # Table creation handled by Drizzle ORM based on schema.ts + # Seeding default data can be done here if needed, or via a separate script. + + async def _execute_query(self, query: str, params: Optional[Tuple] = None, fetch_one: bool = False, fetch_all: bool = False, commit: bool = False): + """Helper to execute queries using asyncio.to_thread for sync psycopg2.""" + conn = await asyncio.to_thread(psycopg2.connect, self.database_url) + try: + # Use RealDictCursor for dictionary-like row access + async with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: + await asyncio.to_thread(cur.execute, query, params) + + result = None + if fetch_one: + result_row = await asyncio.to_thread(cur.fetchone) + result = dict(result_row) if result_row else None + elif fetch_all: + result_rows = await asyncio.to_thread(cur.fetchall) + result = [dict(row) for row in result_rows] + + if commit: + await asyncio.to_thread(conn.commit) + + # For INSERT returning id, psycopg2 handles it differently + if query.strip().upper().startswith("INSERT") and "RETURNING id" in query.lower() and not result: + # If RETURNING id was used and fetch_one wasn't (e.g. if we only want lastrowid equivalent) + # For psycopg2, RETURNING is the standard way. + # If we need the ID and didn't fetch_one, we assume the query was like "INSERT ... RETURNING id" + # and the result should have been fetched. + # If fetch_one was True with RETURNING id, it's already in 'result'. + # This part might need adjustment based on how INSERT ... RETURNING is used. + # If lastrowid is needed, it's better to use RETURNING id and fetch_one. + pass # result would be set if RETURNING was used with fetch_one + + return result + except psycopg2.Error as e: + logger.error(f"Database error: {e}") + await asyncio.to_thread(conn.rollback) # Rollback on error + raise + finally: + await asyncio.to_thread(conn.close) def init_database(self): - """Initialize database with required tables""" - conn = sqlite3.connect(self.db_path) - - # Create emails table - conn.execute(""" - CREATE TABLE IF NOT EXISTS emails ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - message_id TEXT UNIQUE NOT NULL, - thread_id TEXT, - subject TEXT, - sender TEXT, - sender_email TEXT, - content TEXT, - snippet TEXT, - labels TEXT, - timestamp TEXT, - is_read BOOLEAN DEFAULT FALSE, - category_id INTEGER, - confidence INTEGER DEFAULT 0, - analysis_metadata TEXT, - created_at TEXT DEFAULT CURRENT_TIMESTAMP, - updated_at TEXT DEFAULT CURRENT_TIMESTAMP - ) - """) - - # Create categories table - conn.execute(""" - CREATE TABLE IF NOT EXISTS categories ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - name TEXT UNIQUE NOT NULL, - description TEXT, - color TEXT DEFAULT '#6366f1', - count INTEGER DEFAULT 0, - created_at TEXT DEFAULT CURRENT_TIMESTAMP - ) - """) - - # Create activities table - conn.execute(""" - CREATE TABLE IF NOT EXISTS activities ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - type TEXT NOT NULL, - description TEXT NOT NULL, - email_id INTEGER, - email_subject TEXT, - metadata TEXT, - created_at TEXT DEFAULT CURRENT_TIMESTAMP, - FOREIGN KEY (email_id) REFERENCES emails (id) - ) - """) - - # Insert default categories + """ + Initializes the database with default categories. + Table creation is assumed to be handled by Drizzle migrations. + This method now only seeds default categories if they don't exist. + """ default_categories = [ ('Work & Business', 'Work-related emails and business communications', '#2563eb'), ('Personal & Family', 'Personal emails from friends and family', '#16a34a'), @@ -84,358 +80,529 @@ def init_database(self): ('General', 'Uncategorized emails', '#6b7280') ] - for name, desc, color in default_categories: - conn.execute(""" - INSERT OR IGNORE INTO categories (name, description, color) - VALUES (?, ?, ?) - """, (name, desc, color)) + # This function must be called from an async context if it uses _execute_query + # For simplicity in __init__, this part might be called separately or made synchronous + # if __init__ itself is not async. + # For now, let's make it a synchronous helper for init_database if needed. + conn = psycopg2.connect(self.database_url) + try: + with conn.cursor() as cur: + for name, desc, color in default_categories: + # Assuming 'name' should be unique for these default categories. + # The schema.ts doesn't enforce unique name for categories globally, + # but for default seeding, ON CONFLICT is useful. + # If categories table has a unique constraint on 'name': + # cur.execute(""" + # INSERT INTO categories (name, description, color) + # VALUES (%s, %s, %s) + # ON CONFLICT (name) DO NOTHING; + # """, (name, desc, color)) + # If no unique constraint, or if userId is involved for uniqueness (not in current schema.ts for categories) + # We might need to check existence first + cur.execute("SELECT id FROM categories WHERE name = %s", (name,)) + if not cur.fetchone(): + cur.execute(""" + INSERT INTO categories (name, description, color, count) + VALUES (%s, %s, %s, %s) + """, (name, desc, color, 0)) + conn.commit() + except psycopg2.Error as e: + logger.error(f"Error seeding default categories: {e}") + conn.rollback() + finally: + conn.close() - conn.commit() - conn.close() @asynccontextmanager async def get_connection(self): - """Async context manager for database connections""" - conn = sqlite3.connect(self.db_path) - conn.row_factory = sqlite3.Row + """Async context manager for database connections using psycopg2.""" + conn = None try: + conn = await asyncio.to_thread(psycopg2.connect, self.database_url) + conn.autocommit = False # Ensure transactions are handled explicitly yield conn + except psycopg2.Error as e: + logger.error(f"Failed to connect to database: {e}") + if conn: # pragma: no cover + await asyncio.to_thread(conn.rollback) # Rollback before raising + raise finally: - conn.close() + if conn: + await asyncio.to_thread(conn.close) async def initialize(self): - """Initialize database asynchronously""" - # Re-run initialization to ensure tables exist - self.init_database() - - async def create_email(self, email_data: Dict[str, Any]) -> Dict[str, Any]: - """Create a new email record""" - async with self.get_connection() as conn: - try: - cursor = conn.execute(""" - INSERT INTO emails - (message_id, thread_id, subject, sender, sender_email, content, - snippet, labels, timestamp, is_read, category_id, confidence, analysis_metadata) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """, ( - email_data['message_id'], - email_data.get('thread_id'), - email_data.get('subject'), - email_data.get('sender'), - email_data.get('sender_email'), - email_data.get('content'), - email_data.get('snippet'), - json.dumps(email_data.get('labels', [])), - email_data.get('timestamp'), - email_data.get('is_read', False), - email_data.get('category_id'), - email_data.get('confidence', 0), - json.dumps(email_data.get('analysis_metadata', {})) - )) - - email_id = cursor.lastrowid - conn.commit() - - # Update category count - if email_data.get('category_id'): - await self._update_category_count(email_data['category_id']) - - return await self.get_email_by_id(email_id) + """Initialize database asynchronously (e.g., seed data)""" + # This method is kept for API consistency if needed later for more async init steps. + # For now, default data seeding is in the synchronous init_database, + # which itself could be called from here if made async or wrapped. + # Let's assume init_database() is called from constructor for now for seeding. + logger.info("DatabaseManager initialized. Default categories seeding attempted if constructor called it.") + pass + + + def _parse_json_fields(self, row: Dict[str, Any], fields: List[str]) -> Dict[str, Any]: + """Helper to parse stringified JSON fields in a row.""" + if not row: + return row + for field in fields: + if field in row and isinstance(row[field], str): + try: + row[field] = json.loads(row[field]) + except json.JSONDecodeError: + logger.warning(f"Failed to parse JSON for field {field} in row {row.get('id')}") + row[field] = {} if field == 'analysisMetadata' or field == 'metadata' else [] # Default based on expected type + return row + + async def create_email(self, email_data: Dict[str, Any]) -> Optional[Dict[str, Any]]: + """Create a new email record. + Note: `schema.ts` has `isUnread` (bool) and a legacy `isRead` (bool). + The input `email_data` might have `is_read`. We need to map to `isUnread`. + `is_read = True` means `isUnread = False`. + `is_read = False` means `isUnread = True`. + """ + # Schema default for isUnread is TRUE. So if 'is_read' is not provided, assume it's NOT read (isUnread=True). + # email_data.get('is_read', False) means if 'is_read' is absent, consider it False. + # not False -> True. So isUnread defaults to True. This matches schema. + is_unread = not email_data.get('is_read', False) + + query = """ + INSERT INTO emails + ("messageId", "threadId", subject, sender, "senderEmail", content, snippet, labels, "time", "isUnread", "categoryId", confidence, "analysisMetadata", "createdAt", "updatedAt", + "historyId", "contentHtml", preview, "toAddresses", "ccAddresses", "bccAddresses", "replyTo", "internalDate", "labelIds", category, "isStarred", "isImportant", "isDraft", "isSent", "isSpam", "isTrash", "isChat", "hasAttachments", "attachmentCount", "sizeEstimate", "spfStatus", "dkimStatus", "dmarcStatus", "isEncrypted", "isSigned", priority, "isAutoReply", "mailingList", "inReplyTo", "references", "isFirstInThread") + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW(), + %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + RETURNING id; + """ + # Ensure all fields from schema.ts are accounted for, using defaults if not provided + params = ( + email_data.get('messageId', email_data.get('message_id')), # Map from old `message_id` if present + email_data.get('threadId', email_data.get('thread_id')), + email_data.get('subject'), + email_data.get('sender'), + email_data.get('senderEmail', email_data.get('sender_email')), + email_data.get('content'), + email_data.get('snippet'), + email_data.get('labels', []), # Should be a list of strings + email_data.get('time', email_data.get('timestamp')), # Map from old `timestamp` + is_unread, + email_data.get('categoryId', email_data.get('category_id')), + email_data.get('confidence', 0), + json.dumps(email_data.get('analysisMetadata', email_data.get('analysis_metadata', {}))), + email_data.get('historyId'), + email_data.get('contentHtml'), + email_data.get('preview', email_data.get('snippet')), # Use snippet if preview not present + email_data.get('toAddresses', []), + email_data.get('ccAddresses', []), + email_data.get('bccAddresses', []), + email_data.get('replyTo'), + email_data.get('internalDate'), + email_data.get('labelIds', []), + email_data.get('category'), + email_data.get('isStarred', False), + email_data.get('isImportant', False), + email_data.get('isDraft', False), + email_data.get('isSent', False), + email_data.get('isSpam', False), + email_data.get('isTrash', False), + email_data.get('isChat', False), + email_data.get('hasAttachments', False), + email_data.get('attachmentCount', 0), + email_data.get('sizeEstimate'), + email_data.get('spfStatus'), + email_data.get('dkimStatus'), + email_data.get('dmarcStatus'), + email_data.get('isEncrypted', False), + email_data.get('isSigned', False), + email_data.get('priority', 'normal'), + email_data.get('isAutoReply', False), + email_data.get('mailingList'), + email_data.get('inReplyTo'), + email_data.get('references', []), + email_data.get('isFirstInThread', True) + ) + try: + result = await self._execute_query(query, params, fetch_one=True, commit=True) + if result and result.get('id'): + email_id = result['id'] + # Update category count if categoryId is present + category_id = email_data.get('categoryId', email_data.get('category_id')) + if category_id: + await self._update_category_count(category_id) # Make sure this is adapted too + return await self.get_email_by_id(email_id) # Adapted to fetch full dict + return None + except psycopg2.IntegrityError as e: + # This IntegrityError check is for UNIQUE constraint on messageId + logger.warning(f"Email with messageId {email_data.get('messageId')} likely already exists: {e}") + # Attempt to update if it already exists + # Note: Drizzle schema for emails.messageId does not have .notNull() which is strange. Assuming it is effectively notNull. + return await self.update_email_by_message_id(email_data['messageId'], email_data) - except sqlite3.IntegrityError: - # Email already exists, update it - return await self.update_email_by_message_id(email_data['message_id'], email_data) async def get_email_by_id(self, email_id: int) -> Optional[Dict[str, Any]]: """Get email by ID""" - async with self.get_connection() as conn: - cursor = conn.execute(""" - SELECT e.*, c.name as category_name, c.color as category_color - FROM emails e - LEFT JOIN categories c ON e.category_id = c.id - WHERE e.id = ? - """, (email_id,)) - - row = cursor.fetchone() - if row: - return self._row_to_dict(row) + query = """ + SELECT e.*, c.name as "categoryName", c.color as "categoryColor" + FROM emails e + LEFT JOIN categories c ON e."categoryId" = c.id + WHERE e.id = %s + """ + # Important: Accessing e.* will get all columns. If some are JSON strings, they need parsing. + # RealDictCursor returns dicts, but JSON stored as text is still text. + row = await self._execute_query(query, (email_id,), fetch_one=True) + return self._parse_json_fields(row, ['analysisMetadata']) if row else None + + + async def get_categories(self) -> List[Dict[str, Any]]: + """Get all categories with their counts.""" + # The schema.ts has `count` in `categories` table. + # This implies the count is maintained there. + # The old query dynamically calculated count. If `categories.count` is reliable: + query = """ + SELECT id, name, description, color, count + FROM categories + ORDER BY name + """ + # If categories.count is NOT reliable and needs dynamic calculation: + # query = """ + # SELECT c.id, c.name, c.description, c.color, + # (SELECT COUNT(*) FROM emails e WHERE e."categoryId" = c.id) as count + # FROM categories c + # ORDER BY c.name + # """ + # Assuming schema.ts `count` field is maintained and accurate. + categories = await self._execute_query(query, fetch_all=True) + return categories if categories else [] + + async def create_category(self, category_data: Dict[str, Any]) -> Optional[Dict[str, Any]]: + """Create a new category. + The schema.ts for categories has id, name, description, color, count. + It does not have userId. + """ + query = """ + INSERT INTO categories (name, description, color, count) + VALUES (%s, %s, %s, %s) + RETURNING id, name, description, color, count; + """ + params = ( + category_data['name'], + category_data.get('description'), + category_data.get('color', '#6366f1'), + category_data.get('count', 0) # Default count to 0 + ) + try: + # ON CONFLICT (name) DO NOTHING; could be added if name should be unique + # but schema.ts doesn't specify this. For now, allow duplicates or handle at app level. + new_category = await self._execute_query(query, params, fetch_one=True, commit=True) + return new_category + except psycopg2.Error as e: + logger.error(f"Failed to create category {category_data.get('name')}: {e}") return None + async def _update_category_count(self, category_id: int): + """Update category email count.""" + # This method assumes there's a "count" column in the "categories" table. + # schema.ts confirms `count: integer("count").default(0)` + query = """ + UPDATE categories + SET count = (SELECT COUNT(*) FROM emails WHERE "categoryId" = %s) + WHERE id = %s + """ + await self._execute_query(query, (category_id, category_id), commit=True) + + + # ... other methods to be adapted ... + # For now, let's placeholder them or comment out async def get_emails(self, limit: int = 50, offset: int = 0, - category_id: Optional[int] = None) -> List[Dict[str, Any]]: - """Get emails with pagination and filtering""" - async with self.get_connection() as conn: - where_clause = "" - params = [] - - if category_id: - where_clause = "WHERE e.category_id = ?" - params.append(category_id) - - cursor = conn.execute(f""" - SELECT e.*, c.name as category_name, c.color as category_color - FROM emails e - LEFT JOIN categories c ON e.category_id = c.id - {where_clause} - ORDER BY e.timestamp DESC - LIMIT ? OFFSET ? - """, params + [limit, offset]) - - return [self._row_to_dict(row) for row in cursor.fetchall()] + category_id: Optional[int] = None, is_unread: Optional[bool] = None) -> List[Dict[str, Any]]: + """Get emails with pagination and filtering. + 'is_unread' corresponds to 'isUnread' in the schema. + 'timestamp' from old code maps to 'time' in new schema for ordering. + """ + params = [] + base_query = """ + SELECT e.*, c.name as "categoryName", c.color as "categoryColor" + FROM emails e + LEFT JOIN categories c ON e."categoryId" = c.id + """ + + where_clauses = [] + if category_id is not None: + where_clauses.append('e."categoryId" = %s') + params.append(category_id) + + if is_unread is not None: + where_clauses.append('e."isUnread" = %s') # Directly use isUnread + params.append(is_unread) + + if where_clauses: + base_query += " WHERE " + " AND ".join(where_clauses) + + base_query += ' ORDER BY e."time" DESC LIMIT %s OFFSET %s' + params.extend([limit, offset]) + + emails = await self._execute_query(base_query, tuple(params), fetch_all=True) + if emails: + return [self._parse_json_fields(email, ['analysisMetadata']) for email in emails] + return [] async def update_email_by_message_id(self, message_id: str, - update_data: Dict[str, Any]) -> Dict[str, Any]: - """Update email by message ID""" - async with self.get_connection() as conn: - set_clauses = [] - params = [] - - for key, value in update_data.items(): - if key in ['subject', 'sender', 'sender_email', 'content', 'snippet', - 'timestamp', 'is_read', 'category_id', 'confidence']: - set_clauses.append(f"{key} = ?") - params.append(value) - elif key == 'labels': - set_clauses.append("labels = ?") - params.append(json.dumps(value)) - elif key == 'analysis_metadata': - set_clauses.append("analysis_metadata = ?") - params.append(json.dumps(value)) - - if set_clauses: - set_clauses.append("updated_at = CURRENT_TIMESTAMP") - params.append(message_id) - - conn.execute(f""" - UPDATE emails SET {', '.join(set_clauses)} - WHERE message_id = ? - """, params) - conn.commit() - - # Get updated email - cursor = conn.execute("SELECT id FROM emails WHERE message_id = ?", (message_id,)) - row = cursor.fetchone() - if row: - return await self.get_email_by_id(row['id']) - return {} + update_data: Dict[str, Any]) -> Optional[Dict[str, Any]]: + """Update email by messageId. + Handles 'is_read' to 'isUnread' conversion. + Other schema.ts fields should be used directly if present in update_data. + """ + set_clauses = [] + params = [] + + # Mapping from old Python keys to new schema keys if necessary, or direct use + # Fields from schema.ts: subject, sender, senderEmail, content, snippet, time, isUnread, categoryId, confidence, labels, analysisMetadata, etc. + # Quoted column names are essential. + + for key, value in update_data.items(): + column_name = key + # Handle specific mappings or transformations + if key == "message_id": continue # Cannot update message_id itself, it's for the WHERE clause + if key == "is_read": # Legacy key from old Python code + column_name = '"isUnread"' + value = not value # Invert logic + elif key == "category_id": + column_name = '"categoryId"' + elif key == "sender_email": + column_name = '"senderEmail"' + elif key == "thread_id": + column_name = '"threadId"' + elif key == "analysis_metadata": # Stored as JSON string + column_name = '"analysisMetadata"' + value = json.dumps(value) + elif key == "labels": # Stored as text array + column_name = '"labels"' + # psycopg2 handles Python list to PG array, ensure value is a list + if not isinstance(value, list): + logger.warning(f"Labels value for update is not a list: {value}, attempting to wrap.") + value = [str(value)] # Or handle error appropriately + elif key == "timestamp": # old key, maps to "time" + column_name = '"time"' + else: # For other keys, assume they match schema.ts (camelCased) and quote them + column_name = f'"{key}"' + + + set_clauses.append(f"{column_name} = %s") + params.append(value) + + if not set_clauses: + logger.warning("No valid fields to update for message_id: {message_id}") + return await self.get_email_by_message_id(message_id) # Return current state if no updates + + set_clauses.append('"updatedAt" = NOW()') + + query = f""" + UPDATE emails SET {', '.join(set_clauses)} + WHERE "messageId" = %s + """ + params.append(message_id) + + await self._execute_query(query, tuple(params), commit=True) + + # Fetch and return the updated email using the internal ID + # This requires a way to get 'id' from 'messageId' if not already available. + # For simplicity, if get_email_by_message_id is implemented, use that. + return await self.get_email_by_message_id(message_id) + + async def get_email_by_message_id(self, message_id: str) -> Optional[Dict[str, Any]]: + """Get email by messageId (similar to get_email_by_id but uses messageId)""" + query = """ + SELECT e.*, c.name as "categoryName", c.color as "categoryColor" + FROM emails e + LEFT JOIN categories c ON e."categoryId" = c.id + WHERE e."messageId" = %s + """ + row = await self._execute_query(query, (message_id,), fetch_one=True) + return self._parse_json_fields(row, ['analysisMetadata']) if row else None + + + async def create_activity(self, activity_data: Dict[str, Any]) -> Optional[Dict[str, Any]]: + """Create a new activity record. + Schema: id, type, description, details, timestamp, icon, iconBg + """ + query = """ + INSERT INTO activities (type, description, details, "timestamp", icon, "iconBg") + VALUES (%s, %s, %s, %s, %s, %s) + RETURNING id, type, description, details, "timestamp", icon, "iconBg"; + """ + # The old 'metadata' field can map to 'details' (as text/JSON string) + # 'email_id' and 'email_subject' from old schema should be part of 'details' if needed. + details_data = activity_data.get('metadata', {}) + if 'email_id' in activity_data: + details_data['emailId'] = activity_data['email_id'] # Match camelCase + if 'email_subject' in activity_data: + details_data['emailSubject'] = activity_data['email_subject'] + + params = ( + activity_data['type'], + activity_data['description'], + json.dumps(details_data) if details_data else None, + activity_data.get('timestamp', datetime.now().isoformat()), # Ensure ISO format for text timestamp + activity_data.get('icon', 'default_icon'), # Provide defaults if not in activity_data + activity_data.get('iconBg', '#ffffff') # Provide defaults + ) + new_activity = await self._execute_query(query, params, fetch_one=True, commit=True) + return self._parse_json_fields(new_activity, ['details']) if new_activity else None - async def get_categories(self) -> List[Dict[str, Any]]: - """Get all categories""" - async with self.get_connection() as conn: - cursor = conn.execute(""" - SELECT id, name, description, color, - (SELECT COUNT(*) FROM emails WHERE category_id = categories.id) as count - FROM categories - ORDER BY name - """) - - return [self._row_to_dict(row) for row in cursor.fetchall()] - - async def create_category(self, category_data: Dict[str, Any]) -> Dict[str, Any]: - """Create a new category""" - async with self.get_connection() as conn: - cursor = conn.execute(""" - INSERT INTO categories (name, description, color) - VALUES (?, ?, ?) - """, ( - category_data['name'], - category_data.get('description'), - category_data.get('color', '#6366f1') - )) - - category_id = cursor.lastrowid - conn.commit() - - return { - 'id': category_id, - 'name': category_data['name'], - 'description': category_data.get('description'), - 'color': category_data.get('color', '#6366f1'), - 'count': 0 - } - - async def create_activity(self, activity_data: Dict[str, Any]) -> Dict[str, Any]: - """Create a new activity record""" - async with self.get_connection() as conn: - cursor = conn.execute(""" - INSERT INTO activities (type, description, email_id, email_subject, metadata) - VALUES (?, ?, ?, ?, ?) - """, ( - activity_data['type'], - activity_data['description'], - activity_data.get('email_id'), - activity_data.get('email_subject'), - json.dumps(activity_data.get('metadata', {})) - )) - - activity_id = cursor.lastrowid - conn.commit() - - return { - 'id': activity_id, - 'type': activity_data['type'], - 'description': activity_data['description'], - 'email_id': activity_data.get('email_id'), - 'email_subject': activity_data.get('email_subject'), - 'metadata': activity_data.get('metadata', {}), - 'created_at': datetime.now().isoformat() - } async def get_recent_activities(self, limit: int = 10) -> List[Dict[str, Any]]: """Get recent activities""" - async with self.get_connection() as conn: - cursor = conn.execute(""" - SELECT * FROM activities - ORDER BY created_at DESC - LIMIT ? - """, (limit,)) - - return [self._row_to_dict(row) for row in cursor.fetchall()] + query = """ + SELECT id, type, description, details, "timestamp", icon, "iconBg" + FROM activities + ORDER BY "timestamp" DESC + LIMIT %s + """ + activities = await self._execute_query(query, (limit,), fetch_all=True) + if activities: + return [self._parse_json_fields(activity, ['details']) for activity in activities] + return [] async def get_dashboard_stats(self) -> Dict[str, Any]: - """Get dashboard statistics""" - async with self.get_connection() as conn: - # Get total emails - cursor = conn.execute("SELECT COUNT(*) as total FROM emails") - total_emails = cursor.fetchone()['total'] - - # Get unread emails - cursor = conn.execute("SELECT COUNT(*) as unread FROM emails WHERE is_read = FALSE") - unread_emails = cursor.fetchone()['unread'] - - # Get categories count - cursor = conn.execute("SELECT COUNT(*) as total FROM categories") - total_categories = cursor.fetchone()['total'] - - # Get recent activities count - cursor = conn.execute("SELECT COUNT(*) as total FROM activities") - total_activities = cursor.fetchone()['total'] - - # Get top categories - cursor = conn.execute(""" - SELECT c.name, c.color, COUNT(e.id) as count - FROM categories c - LEFT JOIN emails e ON c.id = e.category_id - GROUP BY c.id, c.name, c.color - ORDER BY count DESC - LIMIT 5 - """) - top_categories = [self._row_to_dict(row) for row in cursor.fetchall()] - - return { - 'totalEmails': total_emails, - 'autoLabeled': total_emails, # Assuming all are auto-labeled - 'categories': total_categories, - 'timeSaved': "2.5 hours", - 'weeklyGrowth': { - 'emails': total_emails, - 'percentage': 15.0 - }, - 'topCategories': top_categories + logger.warning("get_dashboard_stats not fully migrated to PostgreSQL yet.") + # This method needs significant rework based on the new schema and how stats are derived. + # For example, 'autoLabeled' and 'timeSaved' are not direct DB fields. + # Category counts will need to use the 'categories.count' field or a new query. + # For now, return a basic structure. + + results = await asyncio.gather( + self._execute_query('SELECT COUNT(*) AS count FROM emails', fetch_one=True), + self._execute_query('SELECT COUNT(*) AS count FROM emails WHERE "isUnread" = TRUE', fetch_one=True), + self._execute_query('SELECT COUNT(*) AS count FROM emails WHERE priority = %s', ('high',), fetch_one=True), # For "important" + self._execute_query('SELECT COUNT(*) AS count FROM emails WHERE "isSpam" = TRUE', fetch_one=True), # Using isSpam field + self._execute_query('SELECT COUNT(*) AS count FROM categories', fetch_one=True), + self._execute_query('SELECT name, color, count FROM categories ORDER BY count DESC LIMIT 5', fetch_all=True) + ) + + total_emails = results[0]['count'] if results[0] else 0 + unread_emails = results[1]['count'] if results[1] else 0 + important_emails = results[2]['count'] if results[2] else 0 + spam_emails = results[3]['count'] if results[3] else 0 + total_category_types = results[4]['count'] if results[4] else 0 + top_categories_list = results[5] if results[5] else [] + + # The 'DashboardStats' type in schema.ts has more complex fields like 'weeklyGrowth'. + # These are likely calculated in application logic or are placeholders. + # The python backend should provide the raw counts it can get. + return { + 'totalEmails': total_emails, + 'unreadEmails': unread_emails, + 'importantEmails': important_emails, + 'spamEmails': spam_emails, + 'totalCategoryTypes': total_category_types, + 'topCategories': top_categories_list, + + # Placeholders for fields not directly derivable from current DB structure by this function alone + 'autoLabeled': total_emails, # Example: Assuming all emails are auto-labeled for now + 'timeSaved': "2.5 hours", # Example: static or calculated elsewhere + # weeklyGrowth would require historical data not currently queried here. + 'weeklyGrowth': { + 'totalEmails': total_emails, # This is current total, not growth + 'autoLabeled': total_emails, # Placeholder + 'categories': total_category_types, # Placeholder + 'timeSaved': 0 # Placeholder } + } + + # --- Methods to be fully adapted or verified --- async def get_all_emails(self, limit: int = 50, offset: int = 0) -> List[Dict[str, Any]]: """Get all emails with pagination""" - return await self.get_emails(limit, offset) + return await self.get_emails(limit=limit, offset=offset) # Relies on get_emails async def get_emails_by_category(self, category_id: int, limit: int = 50, offset: int = 0) -> List[Dict[str, Any]]: """Get emails by category""" - return await self.get_emails(limit, offset, category_id) + return await self.get_emails(limit=limit, offset=offset, category_id=category_id) # Relies on get_emails async def search_emails(self, search_term: str, limit: int = 50) -> List[Dict[str, Any]]: - """Search emails by content or subject""" - async with self.get_connection() as conn: - cursor = conn.execute(""" - SELECT e.*, c.name as category_name, c.color as category_color - FROM emails e - LEFT JOIN categories c ON e.category_id = c.id - WHERE e.subject LIKE ? OR e.content LIKE ? - ORDER BY e.timestamp DESC - LIMIT ? - """, (f"%{search_term}%", f"%{search_term}%", limit)) - - return [self._row_to_dict(row) for row in cursor.fetchall()] - - async def get_all_categories(self) -> List[Dict[str, Any]]: - """Get all categories""" - return await self.get_categories() + """Search emails by content or subject. + Uses ILIKE for case-insensitive search in PostgreSQL. + Orders by 'time' (reception time). + """ + query = """ + SELECT e.*, c.name as "categoryName", c.color as "categoryColor" + FROM emails e + LEFT JOIN categories c ON e."categoryId" = c.id + WHERE e.subject ILIKE %s OR e.content ILIKE %s + ORDER BY e."time" DESC + LIMIT %s + """ + # The schema has e.preview and e.snippet as well, could be included in search + params = (f"%{search_term}%", f"%{search_term}%", limit) + emails = await self._execute_query(query, params, fetch_all=True) + if emails: + return [self._parse_json_fields(email, ['analysisMetadata']) for email in emails] + return [] + async def get_recent_emails(self, limit: int = 100) -> List[Dict[str, Any]]: - """Get recent emails for analysis""" - async with self.get_connection() as conn: - cursor = conn.execute(""" - SELECT e.*, c.name as category_name, c.color as category_color - FROM emails e - LEFT JOIN categories c ON e.category_id = c.id - ORDER BY e.timestamp DESC - LIMIT ? - """, (limit,)) - - return [self._row_to_dict(row) for row in cursor.fetchall()] + """Get recent emails for analysis, ordered by reception time.""" + # This effectively calls get_emails without specific filters, ordered by time. + return await self.get_emails(limit=limit, offset=0) - async def update_email(self, email_id: int, update_data: Dict[str, Any]) -> Optional[Dict[str, Any]]: - """Update email by ID""" - async with self.get_connection() as conn: - set_clauses = [] - params = [] - - for key, value in update_data.items(): - if key in ['subject', 'content', 'sender', 'sender_email', - 'is_read', 'is_important', 'is_starred', 'category_id', 'confidence']: - set_clauses.append(f"{key} = ?") - params.append(value) - elif key == 'labels': - set_clauses.append("labels = ?") - params.append(json.dumps(value)) - - if set_clauses: - set_clauses.append("updated_at = CURRENT_TIMESTAMP") - params.append(email_id) - - conn.execute(f""" - UPDATE emails SET {', '.join(set_clauses)} - WHERE id = ? - """, params) - conn.commit() - return await self.get_email_by_id(email_id) - return None + async def update_email(self, email_id: int, update_data: Dict[str, Any]) -> Optional[Dict[str, Any]]: + """Update email by its internal ID. + This is similar to update_email_by_message_id but uses the primary key 'id'. + """ + set_clauses = [] + params = [] + + for key, value in update_data.items(): + column_name = key + if key == "id": continue # Cannot update id itself + + if key == "is_read": # Legacy key + column_name = '"isUnread"' + value = not value + elif key == "is_important": # schema.ts: isImportant + column_name = '"isImportant"' + elif key == "is_starred": # schema.ts: isStarred + column_name = '"isStarred"' + elif key == "category_id": + column_name = '"categoryId"' + elif key == "analysis_metadata": + column_name = '"analysisMetadata"' + value = json.dumps(value) + elif key == "labels": # text[] + column_name = '"labels"' + if not isinstance(value, list): value = [str(value)] + # Add other direct schema mappings here if they are simple string/bool/int + # Example: subject, content, sender, senderEmail, confidence + elif key in ['subject', 'content', 'sender', 'senderEmail', 'confidence', 'snippet', 'time', 'category', 'priority']: + column_name = f'"{key}"' # Quote camelCase keys + else: + logger.warning(f"update_email: Unhandled key '{key}' or key not directly updatable by internal ID method. Skipping.") + continue + + set_clauses.append(f"{column_name} = %s") + params.append(value) + + if not set_clauses: + logger.info(f"No valid fields to update for email id: {email_id}") + return await self.get_email_by_id(email_id) + + set_clauses.append('"updatedAt" = NOW()') + + query = f""" + UPDATE emails SET {', '.join(set_clauses)} + WHERE id = %s + """ + params.append(email_id) + + await self._execute_query(query, tuple(params), commit=True) + return await self.get_email_by_id(email_id) - async def _update_category_count(self, category_id: int): - """Update category email count""" - async with self.get_connection() as conn: - conn.execute(""" - UPDATE categories - SET count = (SELECT COUNT(*) FROM emails WHERE category_id = ?) - WHERE id = ? - """, (category_id, category_id)) - conn.commit() - - def _row_to_dict(self, row) -> Dict[str, Any]: - """Convert SQLite row to dictionary""" - result = dict(row) - - # Parse JSON fields - if 'labels' in result and result['labels']: - try: - result['labels'] = json.loads(result['labels']) - except: - result['labels'] = [] - - if 'analysis_metadata' in result and result['analysis_metadata']: - try: - result['analysis_metadata'] = json.loads(result['analysis_metadata']) - except: - result['analysis_metadata'] = {} - - if 'metadata' in result and result['metadata']: - try: - result['metadata'] = json.loads(result['metadata']) - except: - result['metadata'] = {} - - return result async def get_db() -> DatabaseManager: """Dependency injection for database""" + # Consider how DatabaseManager is instantiated, especially if init_database needs to be async + # For now, assuming constructor handles necessary synchronous setup like assigning db_url + # and actual async connections happen in methods. + # If init_database (seeding) needs to be async, it should be called from an async context. + # One option: db_manager = DatabaseManager(); await db_manager.initialize_async_components() return DatabaseManager() \ No newline at end of file diff --git a/server/python_backend/performance_monitor.py b/server/python_backend/performance_monitor.py index 8f0d9c4ca..95bc8d517 100644 --- a/server/python_backend/performance_monitor.py +++ b/server/python_backend/performance_monitor.py @@ -10,10 +10,12 @@ from collections import defaultdict, deque import json import psutil -import sqlite3 +# import sqlite3 # Removed SQLite logger = logging.getLogger(__name__) +# dataclasses remain the same + @dataclass class PerformanceMetric: """Performance metric data point""" @@ -34,72 +36,30 @@ class SystemHealth: uptime: float class PerformanceMonitor: - """Performance monitoring for email processing""" + """Performance monitoring for email processing (In-memory version)""" - def __init__(self, db_path: str = "performance.db"): + def __init__(self): # Removed db_path self.metrics_history = defaultdict(deque) - self.performance_data = {} + # self.performance_data = {} # This was not used, can be removed self.alert_thresholds = { - 'processing_time': 5.0, # seconds - 'error_rate': 0.1, # 10% - 'memory_usage': 0.8, # 80% - "cpu_usage": 80.0, - "memory_usage": 85.0, - "disk_usage": 90.0, - "response_time": 5000.0, # milliseconds - "error_rate": 10.0 # percentage + 'processing_time': 5.0, # seconds # This specific key is not used in _check_alerts + 'error_rate': 0.1, # 10% # This specific key is not used in _check_alerts + # 'memory_usage': 0.8, # 80% # This is covered by system health alert thresholds + "cpu_usage": 80.0, # Used by system health and _check_alerts if metric 'cpu_usage' recorded + "memory_usage": 85.0, # Used by system health and _check_alerts if metric 'memory_usage' recorded + "disk_usage": 90.0, # Used by system health + "response_time": 5000.0, # milliseconds # Used by _check_alerts + # "error_rate": 10.0 # percentage # This is covered by email processing error rate } - self.db_path = db_path - self.metrics_buffer = deque(maxlen=1000) - self.service_metrics = defaultdict(list) - self.init_database() - - def init_database(self): - """Initialize performance monitoring database""" - conn = sqlite3.connect(self.db_path) - - conn.execute(""" - CREATE TABLE IF NOT EXISTS performance_metrics ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - timestamp TEXT NOT NULL, - metric_name TEXT NOT NULL, - value REAL NOT NULL, - tags TEXT, - metadata TEXT, - created_at TEXT DEFAULT CURRENT_TIMESTAMP - ) - """) - - conn.execute(""" - CREATE TABLE IF NOT EXISTS system_health ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - timestamp TEXT NOT NULL, - status TEXT NOT NULL, - cpu_usage REAL, - memory_usage REAL, - disk_usage REAL, - process_count INTEGER, - uptime REAL, - created_at TEXT DEFAULT CURRENT_TIMESTAMP - ) - """) - - conn.execute(""" - CREATE TABLE IF NOT EXISTS performance_alerts ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - timestamp TEXT NOT NULL, - alert_type TEXT NOT NULL, - severity TEXT NOT NULL, - message TEXT NOT NULL, - metric_value REAL, - threshold REAL, - resolved BOOLEAN DEFAULT FALSE, - created_at TEXT DEFAULT CURRENT_TIMESTAMP - ) - """) + # self.db_path = db_path # Removed + self.metrics_buffer = deque(maxlen=1000) # In-memory buffer for metrics + self.alerts_buffer = deque(maxlen=100) # In-memory buffer for alerts + self.system_health_history = deque(maxlen=100) # In-memory for system health + # self.service_metrics = defaultdict(list) # This was not used, can be removed + # self.init_database() # Removed SQLite database initialization + logger.info("PerformanceMonitor initialized (in-memory mode). SQLite persistence removed.") - conn.commit() - conn.close() + # init_database method removed as SQLite is no longer used. async def record_email_processing( self, @@ -256,65 +216,35 @@ async def record_metric(self, metric_name: str, value: float, self.metrics_buffer.append(metric) - # Store in database - await self._store_metric(metric) + # Store in database (Removed) + # await self._store_metric(metric) + logger.debug(f"Metric recorded (in-memory): {metric.metric_name} = {metric.value}") - # Check for alerts + # Check for alerts (in-memory) await self._check_alerts(metric) - async def _store_metric(self, metric: PerformanceMetric): - """Store metric in database""" - conn = sqlite3.connect(self.db_path) - - conn.execute(""" - INSERT INTO performance_metrics - (timestamp, metric_name, value, tags, metadata) - VALUES (?, ?, ?, ?, ?) - """, ( - metric.timestamp.isoformat(), - metric.metric_name, - metric.value, - json.dumps(metric.tags), - json.dumps(metric.metadata) - )) - - conn.commit() - conn.close() + # _store_metric method removed async def _check_alerts(self, metric: PerformanceMetric): - """Check if metric triggers any alerts""" + """Check if metric triggers any alerts and store them in-memory.""" threshold = self.alert_thresholds.get(metric.metric_name) if threshold and metric.value > threshold: - await self._create_alert( - alert_type=metric.metric_name, - severity="warning" if metric.value < threshold * 1.2 else "critical", - message=f"{metric.metric_name} exceeded threshold: {metric.value:.2f} > {threshold}", - metric_value=metric.value, - threshold=threshold - ) + alert_message = f"{metric.metric_name} exceeded threshold: {metric.value:.2f} > {threshold}" + severity = "warning" if metric.value < threshold * 1.2 else "critical" + + alert_data = { + "timestamp": datetime.now().isoformat(), + "alert_type": metric.metric_name, + "severity": severity, + "message": alert_message, + "metric_value": metric.value, + "threshold": threshold, + "resolved": False # In-memory alerts are not resolved in this simple model + } + self.alerts_buffer.append(alert_data) + logger.warning(f"Performance Alert [{severity}]: {alert_message}") - async def _create_alert(self, alert_type: str, severity: str, message: str, - metric_value: float, threshold: float): - """Create a performance alert""" - conn = sqlite3.connect(self.db_path) - - conn.execute(""" - INSERT INTO performance_alerts - (timestamp, alert_type, severity, message, metric_value, threshold) - VALUES (?, ?, ?, ?, ?, ?) - """, ( - datetime.now().isoformat(), - alert_type, - severity, - message, - metric_value, - threshold - )) - - conn.commit() - conn.close() - - logger.warning(f"Performance Alert [{severity}]: {message}") + # _create_alert method (DB part) removed. Alert logging is now in _check_alerts. async def get_system_health(self) -> SystemHealth: """Get current system health status""" @@ -345,13 +275,15 @@ async def get_system_health(self) -> SystemHealth: uptime=uptime ) - # Record system health - await self._store_system_health(health) + # Record system health (in-memory) + self.system_health_history.append(health) + # await self._store_system_health(health) # Removed DB storage return health except Exception as e: logger.error(f"Error getting system health: {e}") + # Return a SystemHealth object even in case of error return SystemHealth( status="unknown", cpu_usage=0.0, @@ -361,120 +293,77 @@ async def get_system_health(self) -> SystemHealth: uptime=0.0 ) - async def _store_system_health(self, health: SystemHealth): - """Store system health in database""" - conn = sqlite3.connect(self.db_path) - - conn.execute(""" - INSERT INTO system_health - (timestamp, status, cpu_usage, memory_usage, disk_usage, process_count, uptime) - VALUES (?, ?, ?, ?, ?, ?, ?) - """, ( - datetime.now().isoformat(), - health.status, - health.cpu_usage, - health.memory_usage, - health.disk_usage, - health.process_count, - health.uptime - )) - - conn.commit() - conn.close() + # _store_system_health method removed async def get_metrics_summary(self, hours: int = 24) -> Dict[str, Any]: - """Get performance metrics summary for the past N hours""" - conn = sqlite3.connect(self.db_path) - + """Get performance metrics summary for the past N hours from in-memory buffer.""" since_time = datetime.now() - timedelta(hours=hours) - cursor = conn.execute(""" - SELECT metric_name, - COUNT(*) as count, - AVG(value) as avg_value, - MIN(value) as min_value, - MAX(value) as max_value, - STDEV(value) as std_dev - FROM performance_metrics - WHERE timestamp > ? - GROUP BY metric_name - """, (since_time.isoformat(),)) - - metrics_summary = {} - for row in cursor.fetchall(): - metrics_summary[row[0]] = { - "count": row[1], - "average": row[2], - "minimum": row[3], - "maximum": row[4], - "std_deviation": row[5] or 0.0 + # Filter metrics from buffer + relevant_metrics = [m for m in self.metrics_buffer if m.timestamp > since_time] + + metrics_summary_temp = defaultdict(list) + for metric in relevant_metrics: + metrics_summary_temp[metric.metric_name].append(metric.value) + + final_metrics_summary = {} + for name, values in metrics_summary_temp.items(): + count = len(values) + avg = sum(values) / count if count > 0 else 0 + # Basic stats, can add min, max, std_dev if numpy/statistics is allowed or implemented manually + final_metrics_summary[name] = { + "count": count, + "average": avg, + "sum": sum(values), + "values": values # Could be large, consider removing for actual summary } - # Get recent alerts - cursor = conn.execute(""" - SELECT alert_type, severity, COUNT(*) as count - FROM performance_alerts - WHERE timestamp > ? AND resolved = FALSE - GROUP BY alert_type, severity - """, (since_time.isoformat(),)) - - alerts_summary = [] - for row in cursor.fetchall(): - alerts_summary.append({ - "type": row[0], - "severity": row[1], - "count": row[2] - }) + # Filter alerts from buffer + relevant_alerts = [a for a in self.alerts_buffer if datetime.fromisoformat(a['timestamp']) > since_time and not a['resolved']] + alerts_summary_temp = defaultdict(lambda: defaultdict(int)) + for alert in relevant_alerts: + alerts_summary_temp[alert['alert_type']][alert['severity']] += 1 + + final_alerts_summary = [] + for alert_type, severities in alerts_summary_temp.items(): + for severity, count in severities.items(): + final_alerts_summary.append({ + "type": alert_type, + "severity": severity, + "count": count + }) - conn.close() return { "time_range_hours": hours, - "metrics": metrics_summary, - "alerts": alerts_summary, - "total_metrics": sum(m["count"] for m in metrics_summary.values()) + "metrics": final_metrics_summary, + "alerts": final_alerts_summary, + "total_metrics_recorded_in_buffer_for_period": len(relevant_metrics) } async def get_service_performance(self, service_name: str, hours: int = 24) -> Dict[str, Any]: - """Get performance data for a specific service""" - conn = sqlite3.connect(self.db_path) - + """Get performance data for a specific service from in-memory buffer.""" since_time = datetime.now() - timedelta(hours=hours) - cursor = conn.execute(""" - SELECT timestamp, metric_name, value, tags, metadata - FROM performance_metrics - WHERE timestamp > ? AND tags LIKE ? - ORDER BY timestamp DESC - """, (since_time.isoformat(), f'%{service_name}%')) - - service_data = [] - for row in cursor.fetchall(): - service_data.append({ - "timestamp": row[0], - "metric_name": row[1], - "value": row[2], - "tags": json.loads(row[3]) if row[3] else {}, - "metadata": json.loads(row[4]) if row[4] else {} - }) - - conn.close() + service_data_points = [] + for metric in self.metrics_buffer: + if metric.timestamp > since_time and metric.tags.get("service") == service_name: + service_data_points.append(metric) # Store the whole PerformanceMetric object - # Calculate service-specific metrics - response_times = [d["value"] for d in service_data if d["metric_name"] == "response_time"] - error_counts = [d["value"] for d in service_data if d["metric_name"] == "error_count"] + response_times = [m.value for m in service_data_points if m.metric_name == "response_time"] + error_counts = [m.value for m in service_data_points if m.metric_name == "error_count"] # Assuming error_count is 1 per error avg_response_time = sum(response_times) / len(response_times) if response_times else 0 - total_errors = sum(error_counts) if error_counts else 0 + total_errors = sum(error_counts) # Sum of values (e.g., if value is 1 per error) return { "service_name": service_name, "time_range_hours": hours, - "total_requests": len(service_data), - "average_response_time": avg_response_time, + "total_requests_or_metrics": len(service_data_points), # More generic name + "average_response_time_ms": avg_response_time, "total_errors": total_errors, - "error_rate": (total_errors / len(service_data) * 100) if service_data else 0, - "data_points": service_data + "error_rate_percentage": (total_errors / len(response_times) * 100) if response_times else 0, # Error rate based on response_time metrics + "data_points": [vars(m) for m in service_data_points] # Convert dataclasses to dicts for output } async def track_function_performance(self, func_name: str): @@ -578,44 +467,14 @@ async def get_optimization_recommendations(self) -> List[Dict[str, Any]]: return recommendations async def cleanup_old_data(self, days: int = 30): - """Clean up old performance data""" - conn = sqlite3.connect(self.db_path) - - cutoff_date = datetime.now() - timedelta(days=days) - - # Clean up old metrics - cursor = conn.execute(""" - DELETE FROM performance_metrics - WHERE timestamp < ? - """, (cutoff_date.isoformat(),)) - - metrics_deleted = cursor.rowcount - - # Clean up old system health data - cursor = conn.execute(""" - DELETE FROM system_health - WHERE timestamp < ? - """, (cutoff_date.isoformat(),)) - - health_deleted = cursor.rowcount - - # Clean up resolved alerts - cursor = conn.execute(""" - DELETE FROM performance_alerts - WHERE timestamp < ? AND resolved = TRUE - """, (cutoff_date.isoformat(),)) - - alerts_deleted = cursor.rowcount - - conn.commit() - conn.close() - - logger.info(f"Cleaned up old data: {metrics_deleted} metrics, {health_deleted} health records, {alerts_deleted} alerts") - + """Clean up old performance data (No longer needed for in-memory)""" + # This method is no longer needed as deques handle fixed-size history. + # If specific cleanup of in-memory buffers were needed, it would go here. + logger.info("cleanup_old_data called, but not applicable for in-memory PerformanceMonitor.") return { - "metrics_deleted": metrics_deleted, - "health_records_deleted": health_deleted, - "alerts_deleted": alerts_deleted + "metrics_deleted": 0, # No direct deletion like from DB + "health_records_deleted": 0, + "alerts_deleted": 0 } async def main(): diff --git a/server/python_nlp/gmail_integration.py b/server/python_nlp/gmail_integration.py index 416162282..08db5be7e 100644 --- a/server/python_nlp/gmail_integration.py +++ b/server/python_nlp/gmail_integration.py @@ -13,6 +13,30 @@ import asyncio from collections import deque import hashlib +import os + +from google.auth.transport.requests import Request +from google.oauth2.credentials import Credentials +from google_auth_oauthlib.flow import InstalledAppFlow +from googleapiclient.discovery import build +from googleapiclient.errors import HttpError +from dotenv import load_dotenv + +# Load environment variables from .env file +load_dotenv() + +# If modifying these SCOPES, delete the file token.json. +SCOPES = ['https://www.googleapis.com/auth/gmail.readonly'] + +# Path for token.json, configurable via environment variable +TOKEN_JSON_PATH = os.getenv('GMAIL_TOKEN_PATH', 'token.json') + +# Credentials content will be loaded from GMAIL_CREDENTIALS_JSON environment variable +# CREDENTIALS_PATH is now a placeholder for where it *would* be if it were a file. +# Users should set GMAIL_CREDENTIALS_JSON instead of creating credentials.json +CREDENTIALS_PATH = 'credentials.json' # Placeholder, not directly used if GMAIL_CREDENTIALS_JSON is set. +GMAIL_CREDENTIALS_ENV_VAR = 'GMAIL_CREDENTIALS_JSON' + @dataclass class RateLimitConfig: @@ -212,11 +236,125 @@ def __init__(self, config: RateLimitConfig = None): self.cache = EmailCache() self.logger = logging.getLogger(__name__) - # In production, this would be initialized with actual Gmail API credentials - self.gmail_service = None # Placeholder for Gmail API service + self.gmail_service = None + self._load_credentials() + if not self.gmail_service: + self._authenticate() + def _load_credentials(self): + """Loads API credentials from storage. + Uses TOKEN_JSON_PATH which can be set by GMAIL_TOKEN_PATH env var. + """ + creds = None + token_path = TOKEN_JSON_PATH # Uses the global TOKEN_JSON_PATH + if os.path.exists(token_path): + try: + creds = Credentials.from_authorized_user_file(token_path, SCOPES) + except Exception as e: + self.logger.error(f"Error loading credentials from {token_path}: {e}") + creds = None # Ensure creds is None if loading fails + + if not creds or not creds.valid: + if creds and creds.expired and creds.refresh_token: + self.logger.info(f"Refreshing expired credentials from {token_path}...") + try: + creds.refresh(Request()) + except Exception as e: + self.logger.error(f"Error refreshing credentials: {e}") + # Potentially delete token.json and force re-authentication + if os.path.exists(token_path): + try: + os.remove(token_path) + self.logger.info(f"Removed invalid token file: {token_path}") + except OSError as oe: + self.logger.error(f"Error removing token file {token_path}: {oe}") + creds = None # Force re-authentication + # If creds are still None (not loaded or refresh failed), _authenticate will be called + + if creds: + self.gmail_service = build('gmail', 'v1', credentials=creds) + self._store_credentials(creds) # Store potentially refreshed credentials + else: + self.gmail_service = None # Ensure service is None if no valid creds + + def _store_credentials(self, creds): + """Stores API credentials. + Uses TOKEN_JSON_PATH which can be set by GMAIL_TOKEN_PATH env var. + """ + token_path = TOKEN_JSON_PATH # Uses the global TOKEN_JSON_PATH + try: + with open(token_path, 'w') as token_file: + token_file.write(creds.to_json()) + self.logger.info(f"Credentials stored in {token_path}") + except OSError as e: + self.logger.error(f"Error storing credentials to {token_path}: {e}") + + def _authenticate(self): + """Authenticates the user and obtains credentials using GMAIL_CREDENTIALS_JSON env var.""" + creds = None + credentials_json_str = os.getenv(GMAIL_CREDENTIALS_ENV_VAR) + + if not credentials_json_str: + self.logger.error( + f"Environment variable {GMAIL_CREDENTIALS_ENV_VAR} is not set. " + "This variable should contain the JSON content of your Google Cloud credentials file." + ) + # Attempt to fall back to local credentials.json if GMAIL_CREDENTIALS_JSON is not set + # This maintains previous behavior if the env var is not set, but logs a warning. + self.logger.warning( + f"Attempting to use local '{CREDENTIALS_PATH}' as fallback for OAuth. " + f"It is recommended to set the {GMAIL_CREDENTIALS_ENV_VAR} environment variable." + ) + if os.path.exists(CREDENTIALS_PATH): + try: + flow = InstalledAppFlow.from_client_secrets_file(CREDENTIALS_PATH, SCOPES) + except Exception as e: + self.logger.error(f"Error loading fallback credentials from {CREDENTIALS_PATH}: {e}") + return # Exit if fallback also fails + else: + self.logger.error( + f"Fallback credentials file '{CREDENTIALS_PATH}' not found. " + "Please set the GMAIL_CREDENTIALS_JSON environment variable or provide the file." + ) + return # Exit if no credentials source is found + else: + try: + credentials_info = json.loads(credentials_json_str) + flow = InstalledAppFlow.from_client_config(credentials_info, SCOPES) + except json.JSONDecodeError: + self.logger.error( + f"Invalid JSON content in {GMAIL_CREDENTIALS_ENV_VAR}. " + "Please ensure it's a valid JSON string." + ) + return + except Exception as e: # Catch other potential errors from from_client_config + self.logger.error(f"Error loading credentials from {GMAIL_CREDENTIALS_ENV_VAR}: {e}") + return + + + # TODO: This will block if run in a non-interactive environment. + # Consider alternative flows for server-side applications (e.g., service accounts + # or a web-based OAuth flow where the user is redirected). + # For this subtask, we assume an environment where user interaction is possible. + try: + creds = flow.run_local_server(port=0) + except Exception as e: # Catch generic exception from run_local_server + self.logger.error(f"OAuth flow failed: {e}") + return + + + if creds: + self._store_credentials(creds) + self.gmail_service = build('gmail', 'v1', credentials=creds) + self.logger.info("Authentication successful, Gmail service initialized.") + else: + self.logger.error("Failed to obtain credentials.") + # Potentially raise an exception or handle this state appropriately + def set_gmail_service(self, service): - """Set the Gmail API service instance""" + """Set the Gmail API service instance (Deprecated if using OAuth within class)""" + # This method might be deprecated or changed if OAuth is handled internally + self.logger.warning("set_gmail_service is called, but OAuth is now handled internally.") self.gmail_service = service async def collect_emails_incremental( @@ -319,16 +457,25 @@ async def _get_message_list( """ # Simulate Gmail API response with realistic structure if self.gmail_service: - # Actual Gmail API call would be: - # return self.gmail_service.users().messages().list( - # userId='me', - # q=query, - # pageToken=page_token, - # maxResults=max_results - # ).execute() - pass - - # Simulated response for development + try: + # Actual Gmail API call + return self.gmail_service.users().messages().list( + userId='me', + q=query, + pageToken=page_token, + maxResults=max_results + ).execute() + except HttpError as error: + self.logger.error(f'An API error occurred: {error}') + # Implement more sophisticated error handling and retry logic if needed + return {"messages": [], "resultSizeEstimate": 0} # Return empty on error + except Exception as e: + self.logger.error(f"Unexpected error in _get_message_list: {e}") + return {"messages": [], "resultSizeEstimate": 0} + + + # Fallback to simulated response if gmail_service is not available + self.logger.warning("Gmail service not available, using simulated response for _get_message_list.") return await self._simulate_gmail_response(query, page_token, max_results) async def _simulate_gmail_response( @@ -398,25 +545,82 @@ async def _get_message_content(self, message_id: str) -> Optional[Dict[str, Any] try: if self.gmail_service: - # Actual Gmail API call would be: - # message = self.gmail_service.users().messages().get( - # userId='me', - # id=message_id, - # format='full' - # ).execute() - pass - - # Simulate message content for development - email_data = await self._simulate_email_content(message_id) - - # Cache the email - self.cache.cache_email(email_data) + try: + # Actual Gmail API call + message = self.gmail_service.users().messages().get( + userId='me', + id=message_id, + format='full' # 'metadata' or 'raw' could also be used depending on needs + ).execute() + + # Process the message payload to extract relevant fields + # This part will depend on the structure of the Gmail API response + email_data = self._parse_message_payload(message) + + if email_data: + # Cache the email + self.cache.cache_email(email_data) + return email_data + else: + self.logger.warning(f"Could not parse email data for message {message_id}") + return None + + except HttpError as error: + self.logger.error(f'An API error occurred while fetching message {message_id}: {error}') + # Handle different types of errors (e.g., 404 Not Found, 403 Forbidden) + return None + except Exception as e: + self.logger.error(f"Unexpected error retrieving message {message_id}: {e}") + return None + + # Fallback to simulated response if gmail_service is not available + self.logger.warning(f"Gmail service not available, using simulated content for message {message_id}.") + email_data = await self._simulate_email_content(message_id) + self.cache.cache_email(email_data) + return email_data - return email_data + def _parse_message_payload(self, message: Dict[str, Any]) -> Optional[Dict[str, Any]]: + """Parses the raw message payload from Gmail API.""" + try: + headers = {h['name']: h['value'] for h in message.get('payload', {}).get('headers', [])} + # Simplified content extraction (plaintext preferred) + # Real implementation would need to handle multipart messages, base64 decoding, etc. + content = "" + if 'parts' in message['payload']: + for part in message['payload']['parts']: + if part['mimeType'] == 'text/plain' and 'data' in part['body']: + import base64 + content = base64.urlsafe_b64decode(part['body']['data']).decode('utf-8') + break + # Could add more part types (e.g., text/html) + elif 'body' in message['payload'] and 'data' in message['payload']['body']: + import base64 + content = base64.urlsafe_b64decode(message['payload']['body']['data']).decode('utf-8') + + + return { + "message_id": message['id'], + "thread_id": message.get('threadId', ""), + "subject": headers.get('Subject', ""), + "sender": headers.get('From', ""), # This often includes name and email + "sender_email": self._extract_email_address(headers.get('From', "")), + "content": content, + "labels": message.get('labelIds', []), + "timestamp": datetime.fromtimestamp(int(message['internalDate']) / 1000).isoformat() + } except Exception as e: - self.logger.error(f"Error retrieving message {message_id}: {e}") + self.logger.error(f"Error parsing message payload for {message.get('id')}: {e}") return None + + def _extract_email_address(self, sender_header: str) -> str: + """Extracts the email address from a 'From' header string.""" + # Simple extraction, might need a more robust regex for complex cases + import re + match = re.search(r'<([^>]+)>', sender_header) + if match: + return match.group(1) + return sender_header # Return the whole string if no angle brackets found async def _simulate_email_content(self, message_id: str) -> Dict[str, Any]: """Simulate email content for development""" @@ -514,22 +718,55 @@ async def execute_collection_strategy(self, strategy_name: str) -> EmailBatch: async def main(): """Example usage of Gmail data collector""" - collector = GmailDataCollector() + # Note: The main function would need to be async if _authenticate might run an async flow, + # or _authenticate needs to be run in a way that doesn't block asyncio loop if used elsewhere. + # For now, assuming _authenticate might block or run its own loop for console apps. + # Note: The dummy credentials.json creation is removed. + # The user must now configure environment variables. + logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(name)s - %(message)s') + logger = logging.getLogger(__name__) + + + collector = GmailDataCollector() # This will trigger authentication if needed + + if not collector.gmail_service: + logger.error("Gmail service initialization failed. Please check logs for details.") + print("--------------------------------------------------------------------------------------") + print("Gmail Service Initialization Failed!") + print("--------------------------------------------------------------------------------------") + print("Please ensure the following environment variables are correctly set:") + print(f"1. {GMAIL_CREDENTIALS_ENV_VAR}: Should contain the JSON string of your Google Cloud credentials.") + print(f" Example: export {GMAIL_CREDENTIALS_ENV_VAR}='{{ \"installed\": {{ ... }} }}'") + print(f" (Alternatively, as a fallback, place a 'credentials.json' file in the script's directory: {os.getcwd()})") + print(f"2. GMAIL_TOKEN_PATH (Optional): Specify a custom path for 'token.json'.") + print(f" Defaults to '{TOKEN_JSON_PATH}' in the script's directory: {os.getcwd()}") + print("If running for the first time, you might need to go through the OAuth2 authentication flow in your browser.") + print("Check the console logs for more detailed error messages from the application.") + print("--------------------------------------------------------------------------------------") + return + # Execute daily sync strategy try: - batch = await collector.execute_collection_strategy("daily_sync") - print(f"Collected {batch.total_count} emails in batch {batch.batch_id}") - - # Print first few emails - for email in batch.messages[:3]: - print(f"Subject: {email['subject']}") - print(f"From: {email['sender']} <{email['sender_email']}>") - print(f"Preview: {email['content'][:100]}...") - print("---") + # Need an event loop to run async functions + loop = asyncio.get_event_loop() + batch = loop.run_until_complete(collector.execute_collection_strategy("daily_sync")) + + if batch: + print(f"Collected {batch.total_count} emails in batch {batch.batch_id}") + + # Print first few emails + for email in batch.messages[:3]: + print(f"Subject: {email['subject']}") + print(f"From: {email['sender']} <{email['sender_email']}>") + print(f"Preview: {email['content'][:100]}...") + print("---") + else: + print("No emails collected or collection failed.") except Exception as e: print(f"Collection failed: {e}") if __name__ == "__main__": - asyncio.run(main()) \ No newline at end of file + # asyncio.run(main()) # main is not an async function + main() \ No newline at end of file diff --git a/server/python_nlp/nlp_engine.py b/server/python_nlp/nlp_engine.py index 1c63bb531..39f72ff92 100644 --- a/server/python_nlp/nlp_engine.py +++ b/server/python_nlp/nlp_engine.py @@ -10,22 +10,62 @@ from typing import Dict, List, Any, Optional, Tuple from datetime import datetime import re +import os try: import nltk from textblob import TextBlob + import joblib # For loading scikit-learn models HAS_NLTK = True + HAS_SKLEARN_AND_JOBLIB = True except ImportError: HAS_NLTK = False - print("Warning: NLTK not available, using fallback analysis", file=sys.stderr) + HAS_SKLEARN_AND_JOBLIB = False # Assume if one is missing, model loading won't work + print("Warning: NLTK, scikit-learn or joblib not available. Model loading and/or advanced NLP features will be disabled.", file=sys.stderr) # Configure logging -logging.basicConfig(level=logging.INFO) +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(name)s - %(message)s') logger = logging.getLogger(__name__) +# Define paths for pre-trained models (assuming they are in the same directory or a specified model directory) +MODEL_DIR = os.getenv("NLP_MODEL_DIR", os.path.dirname(__file__)) # Default to script's directory +SENTIMENT_MODEL_PATH = os.path.join(MODEL_DIR, "sentiment_model.pkl") +TOPIC_MODEL_PATH = os.path.join(MODEL_DIR, "topic_model.pkl") +INTENT_MODEL_PATH = os.path.join(MODEL_DIR, "intent_model.pkl") +URGENCY_MODEL_PATH = os.path.join(MODEL_DIR, "urgency_model.pkl") + + class NLPEngine: def __init__(self): self.stop_words = set(nltk.corpus.stopwords.words('english')) if HAS_NLTK else set() + self.sentiment_model = None + self.topic_model = None + self.intent_model = None + self.urgency_model = None + + if HAS_SKLEARN_AND_JOBLIB: + logger.info("Attempting to load NLP models...") + self.sentiment_model = self._load_model(SENTIMENT_MODEL_PATH) + self.topic_model = self._load_model(TOPIC_MODEL_PATH) + self.intent_model = self._load_model(INTENT_MODEL_PATH) + self.urgency_model = self._load_model(URGENCY_MODEL_PATH) + else: + logger.warning("Scikit-learn or joblib not available. NLP models will not be loaded. Using fallback logic.") + + def _load_model(self, model_path: str) -> Any: + """Helper function to load a pickled model file.""" + try: + if os.path.exists(model_path): + model = joblib.load(model_path) + logger.info(f"Successfully loaded model from {model_path}") + return model + else: + logger.warning(f"Model file not found at {model_path}. This model will be unavailable.") + return None + except Exception as e: + logger.error(f"Error loading model from {model_path}: {e}") + return None + def _preprocess_text(self, text: str) -> str: """Basic text cleaning and normalization""" text = text.lower() @@ -33,60 +73,97 @@ def _preprocess_text(self, text: str) -> str: return text def _analyze_sentiment(self, text: str) -> Dict[str, Any]: - """Enhanced sentiment analysis with confidence scoring""" - if not HAS_NLTK: - # Simple sentiment analysis without TextBlob - text_lower = text.lower() - positive_words = ['good', 'great', 'excellent', 'thank', 'please', 'welcome', 'happy', 'love'] - negative_words = ['bad', 'terrible', 'problem', 'issue', 'error', 'failed', 'hate', 'angry'] - - positive_count = sum(1 for word in positive_words if word in text_lower) - negative_count = sum(1 for word in negative_words if word in text_lower) - - if positive_count > negative_count: - sentiment = 'positive' - polarity = 0.5 - confidence = 0.6 - elif negative_count > positive_count: - sentiment = 'negative' - polarity = -0.5 - confidence = 0.6 - else: - sentiment = 'neutral' + """Enhanced sentiment analysis with confidence scoring using a loaded model or TextBlob.""" + if self.sentiment_model: + try: + # Assuming the model is a scikit-learn pipeline that handles preprocessing (e.g., vectorization) + # And it has methods predict() and predict_proba() + # The classes_ attribute should tell us the order of probabilities + prediction = self.sentiment_model.predict([text])[0] + probabilities = self.sentiment_model.predict_proba([text])[0] + + # Assuming classes are ordered e.g. ['negative', 'neutral', 'positive'] + # Or the model has a classes_ attribute + class_labels = self.sentiment_model.classes_ + confidence = max(probabilities) + + # Polarity and subjectivity are harder to get directly from a generic classifier. + # We'll set polarity based on prediction and default subjectivity. + # This part might need refinement based on how the sentiment model is trained. polarity = 0.0 - confidence = 0.5 + if prediction == 'positive': + polarity = confidence # Use confidence as a proxy for polarity strength + elif prediction == 'negative': + polarity = -confidence + + return { + 'sentiment': str(prediction), # Ensure it's a string + 'polarity': polarity, + 'subjectivity': 0.5, # Default subjectivity, TextBlob is better for this + 'confidence': float(confidence), + 'method_used': 'model_sentiment' + } + except Exception as e: + logger.error(f"Error using sentiment model: {e}. Falling back to TextBlob/simple analysis.") + # Fall through to TextBlob/simple analysis + + # Fallback to TextBlob if NLTK is available + if HAS_NLTK: + blob = TextBlob(text) + polarity = blob.sentiment.polarity + subjectivity = blob.sentiment.subjectivity + + if polarity > 0.1: + sentiment_label = 'positive' + confidence = min(polarity + 0.5, 1.0) + elif polarity < -0.1: + sentiment_label = 'negative' + confidence = min(abs(polarity) + 0.5, 1.0) + else: + sentiment_label = 'neutral' + confidence = 0.7 # Default confidence for TextBlob neutral return { - 'sentiment': sentiment, + 'sentiment': sentiment_label, 'polarity': polarity, - 'subjectivity': 0.5, - 'confidence': confidence + 'subjectivity': subjectivity, + 'confidence': confidence, + 'method_used': 'fallback_textblob_sentiment' } - blob = TextBlob(text) - polarity = blob.sentiment.polarity - subjectivity = blob.sentiment.subjectivity + # Absolute fallback (simple keyword-based if NLTK/TextBlob also failed or unavailable) + text_lower = text.lower() + positive_words = ['good', 'great', 'excellent', 'thank', 'please', 'welcome', 'happy', 'love'] + negative_words = ['bad', 'terrible', 'problem', 'issue', 'error', 'failed', 'hate', 'angry'] + positive_count = sum(1 for word in positive_words if word in text_lower) + negative_count = sum(1 for word in negative_words if word in text_lower) - # Convert polarity to categorical sentiment - if polarity > 0.1: - sentiment = 'positive' - confidence = min(polarity + 0.5, 1.0) - elif polarity < -0.1: - sentiment = 'negative' - confidence = min(abs(polarity) + 0.5, 1.0) - else: - sentiment = 'neutral' - confidence = 0.7 + if positive_count > negative_count: sentiment_label, pol, conf = 'positive', 0.5, 0.6 + elif negative_count > positive_count: sentiment_label, pol, conf = 'negative', -0.5, 0.6 + else: sentiment_label, pol, conf = 'neutral', 0.0, 0.5 return { - 'sentiment': sentiment, - 'polarity': polarity, - 'subjectivity': subjectivity, - 'confidence': confidence + 'sentiment': sentiment_label, + 'polarity': pol, + 'subjectivity': 0.5, + 'confidence': conf, + 'method_used': 'fallback_keyword_sentiment' } + def _analyze_topic(self, text: str) -> Dict[str, Any]: - """Identify main topic of the email""" + """Identify main topic of the email using a loaded model or keyword-based fallback.""" + if self.topic_model: + try: + prediction = self.topic_model.predict([text])[0] + probabilities = self.topic_model.predict_proba([text])[0] + confidence = float(max(probabilities)) + return {'topic': str(prediction), 'confidence': confidence, 'method_used': 'model_topic'} + except Exception as e: + logger.error(f"Error using topic model: {e}. Falling back to keyword-based analysis.") + # Fall through to keyword-based + + # Fallback keyword-based logic topics = { 'Work & Business': ['meeting', 'conference', 'project', 'deadline', 'client', 'presentation', 'report', 'proposal'], 'Finance & Banking': ['payment', 'invoice', 'bill', 'statement', 'account', 'credit', 'debit', 'transfer', 'money', 'financial'], @@ -94,66 +171,79 @@ def _analyze_topic(self, text: str) -> Dict[str, Any]: 'Health & Wellness': ['doctor', 'medical', 'health', 'hospital', 'clinic', 'appointment', 'prescription', 'medicine', 'treatment', 'therapy'], 'Travel & Leisure': ['travel', 'flight', 'hotel', 'booking', 'reservation', 'trip', 'vacation', 'destination', 'airport', 'airline'] } - topic_scores = {} text_lower = text.lower() - for topic, keywords in topics.items(): score = sum(1 for keyword in keywords if keyword in text_lower) topic_scores[topic] = score - if topic_scores: + if any(score > 0 for score in topic_scores.values()): # Check if any keyword matched best_topic = max(topic_scores, key=topic_scores.get) - confidence = min(topic_scores[best_topic] / 10.0, 0.9) - return {'topic': best_topic, 'confidence': confidence} + confidence = min(topic_scores[best_topic] / 5.0, 0.9) + return {'topic': best_topic, 'confidence': max(0.1, confidence), 'method_used': 'fallback_keyword_topic'} else: - return {'topic': 'General', 'confidence': 0.5} + return {'topic': 'General', 'confidence': 0.5, 'method_used': 'fallback_keyword_topic'} + def _analyze_intent(self, text: str) -> Dict[str, Any]: - """Determine the intent of the email""" + """Determine the intent of the email using a loaded model or regex-based fallback.""" + if self.intent_model: + try: + prediction = self.intent_model.predict([text])[0] + probabilities = self.intent_model.predict_proba([text])[0] + confidence = float(max(probabilities)) + return {'intent': str(prediction), 'confidence': confidence, 'method_used': 'model_intent'} + except Exception as e: + logger.error(f"Error using intent model: {e}. Falling back to regex-based analysis.") + # Fall through to regex-based + + # Fallback regex-based logic intent_patterns = { 'request': r'\b(please|could you|would you|can you|need|require|request)\b', 'inquiry': r'\b(question|ask|wonder|curious|information|details|clarification)\b', 'scheduling': r'\b(schedule|calendar|meeting|appointment|time|date|available)\b', - 'urgent_action': r'\b(urgent|asap|immediately|emergency|critical|priority)\b', + 'urgent_action': r'\b(urgent|asap|immediately|emergency|critical|priority)\b', # This might overlap with urgency 'gratitude': r'\b(thank|thanks|grateful|appreciate)\b', 'complaint': r'\b(complaint|complain|issue|problem|dissatisfied|unhappy)\b', 'follow_up': r'\b(follow up|follow-up|checking in|status|update|progress)\b', 'confirmation': r'\b(confirm|confirmation|verify|check|acknowledge)\b' } - intent_scores = {} text_lower = text.lower() - for intent, pattern in intent_patterns.items(): matches = re.findall(pattern, text_lower) intent_scores[intent] = len(matches) - if intent_scores: + if any(score > 0 for score in intent_scores.values()): best_intent = max(intent_scores, key=intent_scores.get) - confidence = min(intent_scores[best_intent] / 5.0, 0.9) - return {'intent': best_intent, 'confidence': confidence} + confidence = min(intent_scores[best_intent] / 3.0, 0.9) + return {'intent': best_intent, 'confidence': max(0.1, confidence), 'method_used': 'fallback_regex_intent'} else: - return {'intent': 'informational', 'confidence': 0.6} + return {'intent': 'informational', 'confidence': 0.6, 'method_used': 'fallback_regex_intent'} + def _analyze_urgency(self, text: str) -> Dict[str, Any]: """Assess the urgency level of the email""" - text_lower = text.lower() + if self.urgency_model: + try: + prediction = self.urgency_model.predict([text])[0] + probabilities = self.urgency_model.predict_proba([text])[0] + confidence = float(max(probabilities)) + return {'urgency': str(prediction), 'confidence': confidence, 'method_used': 'model_urgency'} + except Exception as e: + logger.error(f"Error using urgency model: {e}. Falling back to regex-based analysis.") + # Fall through to regex based + text_lower = text.lower() if re.search(r'\b(emergency|urgent|asap|immediately|critical|crisis|disaster)\b', text_lower): - urgency = 'critical' - confidence = 0.9 + urgency_label, conf = 'critical', 0.9 elif re.search(r'\b(soon|quickly|priority|important|deadline|time-sensitive)\b', text_lower): - urgency = 'high' - confidence = 0.8 + urgency_label, conf = 'high', 0.8 elif re.search(r'\b(when you can|next week|upcoming|planned|scheduled)\b', text_lower): - urgency = 'medium' - confidence = 0.6 + urgency_label, conf = 'medium', 0.6 else: - urgency = 'low' - confidence = 0.5 - - return {'urgency': urgency, 'confidence': confidence} + urgency_label, conf = 'low', 0.5 + return {'urgency': urgency_label, 'confidence': conf, 'method_used': 'fallback_regex_urgency'} def _extract_keywords(self, text: str) -> List[str]: """Extract important keywords from text""" @@ -235,20 +325,31 @@ def _calculate_confidence(self, analysis_results: List[Dict[str, Any]]) -> float return min(total_confidence / len(analysis_results), 0.95) def _generate_reasoning(self, sentiment: Dict[str, Any], topic: Dict[str, Any], intent: Dict[str, Any], urgency: Dict[str, Any]) -> str: - """Generate human-readable reasoning for the analysis""" + """Generate human-readable reasoning for the analysis, indicating if fallbacks were used.""" parts = [] - if sentiment['sentiment'] != 'neutral': - parts.append(f"Sentiment analysis detected {sentiment['sentiment']} sentiment") + def get_method_suffix(analysis_result: Optional[Dict[str, Any]]) -> str: + if analysis_result and 'method_used' in analysis_result and 'fallback' in analysis_result['method_used']: + return f" (using fallback: {analysis_result['method_used'].replace('fallback_', '')})" + elif analysis_result and 'method_used' in analysis_result and 'model' in analysis_result['method_used']: + return " (using AI model)" + return " (method unknown)" + + + if sentiment and sentiment.get('sentiment') != 'neutral': + parts.append(f"Sentiment analysis detected {sentiment['sentiment']} sentiment{get_method_suffix(sentiment)}") - if topic['topic'] != 'General': - parts.append(f"Identified topic: {topic['topic']}") + if topic and topic.get('topic') != 'General': + parts.append(f"Identified topic: {topic['topic']}{get_method_suffix(topic)}") - if intent['intent'] != 'informational': - parts.append(f"Detected intent: {intent['intent']}") + if intent and intent.get('intent') != 'informational': + parts.append(f"Detected intent: {intent['intent']}{get_method_suffix(intent)}") - if urgency['urgency'] != 'low': - parts.append(f"Assessed urgency level: {urgency['urgency']}") + if urgency and urgency.get('urgency') != 'low': + parts.append(f"Assessed urgency level: {urgency['urgency']}{get_method_suffix(urgency)}") + + if not parts: + return "No significant insights detected from the email content through automated analysis." return ". ".join(parts) + "." @@ -315,9 +416,9 @@ def _get_fallback_analysis(self, error_msg: str) -> Dict[str, Any]: 'keywords': [], 'reasoning': f"Fallback analysis due to error: {error_msg}", 'suggested_labels': ['General'], - 'risk_flags': ['analysis_failed'], + 'risk_flags': ['analysis_failed'], # Ensure this key exists 'validation': { - 'method': 'fallback', + 'method': 'fallback', # Ensure this key exists 'score': 0.5, 'reliable': False, 'feedback': 'Analysis failed, using fallback method' @@ -378,10 +479,10 @@ def _get_simple_fallback_analysis(self, subject: str, content: str) -> Dict[str, 'suggested_labels': categories, 'risk_flags': [], 'validation': { - 'validation_method': 'basic_fallback', + 'method': 'basic_fallback', # Changed key for consistency 'score': 0.6, 'reliable': False, - 'feedback': 'Basic analysis - install NLTK for enhanced features' + 'feedback': 'Basic analysis - NLTK/models not available or failed' } } @@ -390,78 +491,100 @@ def analyze_email(self, subject: str, content: str) -> Dict[str, Any]: Comprehensive email analysis using multiple NLP techniques """ try: - # If NLTK is not available, use fallback analysis - if not HAS_NLTK: - return self._get_simple_fallback_analysis(subject, content) + # If NLTK and models are not available, use simple fallback + if not HAS_NLTK and not HAS_SKLEARN_AND_JOBLIB: # or specific model checks + logger.warning("NLTK and scikit-learn/joblib are unavailable. Using simple fallback analysis.") + return self._get_simple_fallback_analysis(subject, content) # Combine subject and content for analysis full_text = f"{subject} {content}" - # Basic preprocessing + # Basic preprocessing (primarily for non-model based methods or as initial step) + # Model pipelines should ideally handle their own specific preprocessing. cleaned_text = self._preprocess_text(full_text) # Multi-model analysis + # These methods will internally use models if available, or fall back. sentiment_analysis = self._analyze_sentiment(cleaned_text) - topic_analysis = self._analyze_topic(cleaned_text) - intent_analysis = self._analyze_intent(cleaned_text) - urgency_analysis = self._analyze_urgency(cleaned_text) - risk_analysis = self._analyze_risk_factors(cleaned_text) + topic_analysis = self._analyze_topic(cleaned_text) # Will be updated in next step + intent_analysis = self._analyze_intent(cleaned_text) # Will be updated in next step + urgency_analysis = self._analyze_urgency(cleaned_text) # Already updated + + # This method is regex-based, no model to load for it currently per its implementation + risk_analysis_flags = self._detect_risk_factors(cleaned_text) + # Extract keywords and entities - keywords = self._extract_keywords(cleaned_text) - categories = self._categorize_content(cleaned_text) + keywords = self._extract_keywords(cleaned_text) # Uses TextBlob if available + categories = self._categorize_content(cleaned_text) # Regex-based + + # Consolidate results + # Confidence calculation might need to be revisited if models are very different + # For now, let's assume each analysis function returns a 'confidence' field. + analysis_results_for_confidence = [ + r for r in [sentiment_analysis, topic_analysis, intent_analysis, urgency_analysis] if r and 'confidence' in r + ] + overall_confidence = self._calculate_confidence(analysis_results_for_confidence) if analysis_results_for_confidence else 0.5 - # Calculate overall confidence - confidence = self._calculate_confidence([ - sentiment_analysis, topic_analysis, - intent_analysis, urgency_analysis - ]) - # Generate reasoning reasoning = self._generate_reasoning( sentiment_analysis, topic_analysis, intent_analysis, urgency_analysis ) - # Validation - validation = self._validate_analysis({ - 'sentiment': sentiment_analysis, - 'topic': topic_analysis, - 'intent': intent_analysis, - 'urgency': urgency_analysis - }) + validation_input = { + 'sentiment': sentiment_analysis, 'topic': topic_analysis, + 'intent': intent_analysis, 'urgency': urgency_analysis + } + # Filter out None results before passing to _validate_analysis + validation_input_filtered = {k: v for k, v in validation_input.items() if v is not None} + validation = self._validate_analysis(validation_input_filtered) + return { - 'topic': topic_analysis['topic'], - 'sentiment': sentiment_analysis['sentiment'], - 'intent': intent_analysis['intent'], - 'urgency': urgency_analysis['urgency'], - 'confidence': confidence, + 'topic': topic_analysis.get('topic', 'General') if topic_analysis else 'General', + 'sentiment': sentiment_analysis.get('sentiment', 'neutral') if sentiment_analysis else 'neutral', + 'intent': intent_analysis.get('intent', 'informational') if intent_analysis else 'informational', + 'urgency': urgency_analysis.get('urgency', 'low') if urgency_analysis else 'low', + 'confidence': overall_confidence, 'categories': categories, 'keywords': keywords, 'reasoning': reasoning, - 'suggested_labels': self._suggest_labels(categories, urgency_analysis['urgency']), - 'risk_flags': risk_analysis, - 'validation': validation + 'suggested_labels': self._suggest_labels(categories, urgency_analysis.get('urgency', 'low') if urgency_analysis else 'low'), + 'risk_flags': risk_analysis_flags, # Use the direct output from _detect_risk_factors + 'validation': validation, + # Include individual model confidences for transparency if needed + 'details': { + 'sentiment_analysis': sentiment_analysis, + 'topic_analysis': topic_analysis, + 'intent_analysis': intent_analysis, + 'urgency_analysis': urgency_analysis, + } } except Exception as e: error_msg = f"NLP analysis failed: {str(e)}" - logging.error(error_msg) + logger.exception("Exception in analyze_email:") # Log full traceback return self._get_fallback_analysis(error_msg) def main(): - if len(sys.argv) != 3: - print(json.dumps({'error': 'Invalid arguments. Usage: python nlp_engine.py '})) + # Basic logging for CLI usage, can be overridden by Gunicorn's logger in production + logging.basicConfig(stream=sys.stdout, level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(name)s - %(message)s') + + if len(sys.argv) < 3: # Allow content to be empty + print(json.dumps({'error': 'Invalid arguments. Usage: python nlp_engine.py "" ""'})) sys.exit(1) subject = sys.argv[1] - content = sys.argv[2] + # Content can be empty, handle that case + content = sys.argv[2] if len(sys.argv) > 2 else "" + engine = NLPEngine() result = engine.analyze_email(subject, content) - print(json.dumps(result)) + print(json.dumps(result, indent=2)) # Pretty print for readability if __name__ == "__main__": main() \ No newline at end of file