diff --git a/server/python_nlp/smart_retrieval.py b/server/python_nlp/smart_retrieval.py
index f287e5c96..0b19f2ed2 100644
--- a/server/python_nlp/smart_retrieval.py
+++ b/server/python_nlp/smart_retrieval.py
@@ -3,16 +3,34 @@
 Implements intelligent filtering, date-based incremental sync, and optimized batch processing
 """
 
+import argparse
 import json
+import os
 import sqlite3
+import sys
 from typing import Dict, List, Optional, Tuple, Any, Set
-from dataclasses import dataclass
+from dataclasses import dataclass, asdict
 from datetime import datetime, timedelta
 import logging
 import asyncio
 from collections import defaultdict
 import hashlib
 
+from dotenv import load_dotenv
+from google.auth.transport.requests import Request
+from google.oauth2.credentials import Credentials
+from google_auth_oauthlib.flow import InstalledAppFlow
+from googleapiclient.discovery import build
+from googleapiclient.errors import HttpError
+
+load_dotenv()
+
+# Constants for Gmail OAuth authentication
+SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']
+TOKEN_JSON_PATH = os.getenv('GMAIL_TOKEN_PATH', 'token.json')
+CREDENTIALS_PATH = 'credentials.json'  # Fallback when GMAIL_CREDENTIALS_JSON is not set
+GMAIL_CREDENTIALS_ENV_VAR = 'GMAIL_CREDENTIALS_JSON'
+
 @dataclass
 class RetrievalStrategy:
     """Configuration for smart retrieval strategy"""
@@ -40,10 +58,36 @@ class SmartGmailRetriever:
     """Advanced Gmail retrieval with intelligent filtering and batching"""
 
     def __init__(self, checkpoint_db_path: str = "sync_checkpoints.db"):
-        self.checkpoint_db_path = checkpoint_db_path
         self.logger = logging.getLogger(__name__)
+        self.checkpoint_db_path = checkpoint_db_path
+        self.logger.info(f"Using checkpoint database: {self.checkpoint_db_path}")
+        self.gmail_service = None
         self._init_checkpoint_db()
-
+
+        creds = self._load_credentials()
+        if not creds or not creds.valid:
+            if creds and creds.expired and creds.refresh_token:
+                try:
+                    creds.refresh(Request())
+                    self._store_credentials(creds)
+                except Exception as e:
+                    self.logger.error(f"Failed to refresh token: {e}")
+                    creds = self._authenticate()  # Fall back to a full re-authentication
+            else:
+                creds = self._authenticate()
+
+        if creds and creds.valid:
+            try:
+                self.gmail_service = build('gmail', 'v1', credentials=creds)
+                self._store_credentials(creds)  # Persist refreshed or newly issued credentials
+                self.logger.info("Gmail service initialized successfully.")
+            except Exception as e:
+                self.logger.error(f"Failed to build Gmail service: {e}")
+                self.gmail_service = None  # Ensure the service is None if the build fails
+        else:
+            self.logger.error("Failed to obtain valid credentials. Gmail service not initialized.")
 
         # Gmail API quotas and limits
         self.api_limits = {
             'daily_quota': 1000000000,  # 1 billion units per day
@@ -96,6 +140,68 @@ def _init_checkpoint_db(self):
         conn.commit()
         conn.close()
+
+    def _load_credentials(self) -> Optional[Credentials]:
+        """Loads credentials from TOKEN_JSON_PATH if it exists."""
+        creds = None
+        if os.path.exists(TOKEN_JSON_PATH):
+            try:
+                creds = Credentials.from_authorized_user_file(TOKEN_JSON_PATH, SCOPES)
+                self.logger.info(f"Loaded credentials from {TOKEN_JSON_PATH}")
+            except Exception as e:
+                self.logger.error(f"Error loading credentials from {TOKEN_JSON_PATH}: {e}")
+        return creds
+
+    def _store_credentials(self, creds: Credentials):
+        """Stores credentials to TOKEN_JSON_PATH."""
+        try:
+            with open(TOKEN_JSON_PATH, 'w') as token_file:
+                token_file.write(creds.to_json())
+            self.logger.info(f"Stored credentials to {TOKEN_JSON_PATH}")
+        except Exception as e:
+            self.logger.error(f"Error storing credentials to {TOKEN_JSON_PATH}: {e}")
+
+    def _authenticate(self) -> Optional[Credentials]:
+        """Authenticates the user via the OAuth flow and returns credentials."""
+        creds = None
+        credentials_json_str = os.getenv(GMAIL_CREDENTIALS_ENV_VAR)
+
+        flow = None
+        if credentials_json_str:
+            try:
+                creds_info = json.loads(credentials_json_str)
+                flow = InstalledAppFlow.from_client_config(creds_info, SCOPES)
+                self.logger.info("Loaded client config from GMAIL_CREDENTIALS_JSON environment variable.")
+            except json.JSONDecodeError as e:
+                self.logger.error(f"Error decoding GMAIL_CREDENTIALS_JSON: {e}. Trying {CREDENTIALS_PATH} next.")
+            except Exception as e:
+                self.logger.error(f"Error loading credentials from GMAIL_CREDENTIALS_JSON: {e}. Trying {CREDENTIALS_PATH} next.")
+
+        if not flow and os.path.exists(CREDENTIALS_PATH):
+            try:
+                flow = InstalledAppFlow.from_client_secrets_file(CREDENTIALS_PATH, SCOPES)
+                self.logger.info(f"Loaded client secrets from {CREDENTIALS_PATH}.")
+            except Exception as e:
+                self.logger.error(f"Error loading client secrets from {CREDENTIALS_PATH}: {e}")
+                return None  # Cannot proceed without client secrets
+
+        if not flow:
+            self.logger.error(
+                f"Authentication failed: no credentials found in {GMAIL_CREDENTIALS_ENV_VAR} or at {CREDENTIALS_PATH}"
+            )
+            return None
+
+        try:
+            creds = flow.run_local_server(port=0)
+            self.logger.info("Authentication successful via local server flow.")
+        except Exception as e:
+            self.logger.error(f"Authentication flow failed: {e}")
+            return None
+
+        if creds:
+            self._store_credentials(creds)
+        return creds
 
     def get_optimized_retrieval_strategies(self) -> List[RetrievalStrategy]:
         """Generate optimized retrieval strategies based on folder types and priorities"""
@@ -274,8 +380,10 @@ def get_optimized_retrieval_strategies(self) -> List[RetrievalStrategy]:
     def get_incremental_query(self, strategy: RetrievalStrategy, checkpoint: Optional[SyncCheckpoint] = None) -> str:
         """Build incremental query based on checkpoint and strategy"""
         base_query = strategy.query_filter
+        self.logger.debug(f"Base query for strategy '{strategy.name}': {base_query}")
 
         if checkpoint and checkpoint.last_sync_date:
+            self.logger.info(f"Found checkpoint for strategy '{strategy.name}' with last_sync_date: {checkpoint.last_sync_date}")
             # Use checkpoint date for incremental sync
             last_sync = checkpoint.last_sync_date
             if isinstance(last_sync, str):
@@ -310,6 +418,7 @@ def get_incremental_query(self, strategy: RetrievalStrategy, checkpoint: Optiona
             exclude_filters = " AND ".join([f"-in:{folder.lower()}" for folder in strategy.exclude_folders])
             base_query = f"{base_query} {exclude_filters}"
 
+        self.logger.info(f"Generated incremental query for strategy '{strategy.name}': {base_query}")
         return base_query
 
     async def execute_smart_retrieval(
@@ -319,10 +428,14 @@
         self,
         strategies: Optional[List[RetrievalStrategy]] = None,
         max_api_calls: int = 100,
         time_budget_minutes: int = 30
     ) -> Dict[str, Any]:
         """Execute smart retrieval with multiple strategies and rate limiting"""
+        self.logger.info(f"Starting smart retrieval. Max API calls: {max_api_calls}, Time budget: {time_budget_minutes} mins.")
         if strategies is None:
             strategies = self.get_optimized_retrieval_strategies()
-
+            self.logger.info(f"No specific strategies provided, using {len(strategies)} optimized strategies.")
+        else:
+            self.logger.info(f"Executing {len(strategies)} provided strategies.")
+
         results = {
             'strategies_executed': [],
             'total_emails_retrieved': 0,
@@ -336,13 +449,29 @@
         api_calls_used = 0
 
         # Execute strategies in priority order
-        for strategy in strategies:
+        for i, strategy in enumerate(strategies):
+            self.logger.info(f"Executing strategy {i+1}/{len(strategies)}: '{strategy.name}' (Priority: {strategy.priority})")
             # Check time and API budget
-            elapsed_time = (datetime.now() - start_time).total_seconds() / 60
-            if elapsed_time >= time_budget_minutes or api_calls_used >= max_api_calls:
-                self.logger.info(f"Stopping retrieval: time={elapsed_time:.1f}min, api_calls={api_calls_used}")
+            elapsed_time_seconds = (datetime.now() - start_time).total_seconds()
+            elapsed_time_minutes = elapsed_time_seconds / 60
+
+            if elapsed_time_minutes >= time_budget_minutes:
+                self.logger.warning(
+                    f"Time budget of {time_budget_minutes} minutes reached. "
+                    f"Elapsed: {elapsed_time_minutes:.2f} mins. Stopping retrieval."
+                )
+                break
+            if api_calls_used >= max_api_calls:
+                self.logger.warning(
+                    f"API call limit of {max_api_calls} reached. Used: {api_calls_used}. Stopping retrieval."
+                )
                 break
 
+            self.logger.info(
+                f"Current budget status - Time elapsed: {elapsed_time_minutes:.2f}/{time_budget_minutes} mins. "
+                f"API calls used: {api_calls_used}/{max_api_calls}."
+            )
+
             try:
                 # Load checkpoint for this strategy
                 checkpoint = self._load_checkpoint(strategy.name)
@@ -351,12 +480,14 @@
                 query = self.get_incremental_query(strategy, checkpoint)
 
                 # Execute retrieval for this strategy
+                self.logger.debug(f"Calling _execute_strategy_retrieval for '{strategy.name}'")
                 strategy_result = await self._execute_strategy_retrieval(
                     strategy,
                     query,
                     checkpoint,
                     remaining_api_calls=max_api_calls - api_calls_used
                 )
+                self.logger.debug(f"Strategy '{strategy.name}' result: {strategy_result}")
 
                 # Update results
                 results['strategies_executed'].append({
@@ -397,19 +528,20 @@
             })
 
         # Calculate performance metrics
-        total_time = (datetime.now() - start_time).total_seconds()
+        total_execution_time = (datetime.now() - start_time).total_seconds()
         results['performance_metrics'] = {
-            'total_time_seconds': total_time,
-            'emails_per_second': results['total_emails_retrieved'] / total_time if total_time > 0 else 0,
+            'total_time_seconds': total_execution_time,
+            'emails_per_second': results['total_emails_retrieved'] / total_execution_time if total_execution_time > 0 else 0,
             'api_efficiency': results['total_emails_retrieved'] / api_calls_used if api_calls_used > 0 else 0
         }
 
         results['api_calls_used'] = api_calls_used
         results['quota_status'] = {
-            'daily_quota_used_percent': (api_calls_used / self.api_limits['daily_quota']) * 100,
-            'remaining_calls': max_api_calls - api_calls_used
+            'daily_quota_used_percent': (api_calls_used / self.api_limits['daily_quota']) * 100 if self.api_limits['daily_quota'] > 0 else "N/A",
+            'remaining_calls_in_budget': max_api_calls - api_calls_used
         }
+        self.logger.info(f"Smart retrieval finished. Total emails retrieved: {results['total_emails_retrieved']}. Total API calls: {api_calls_used}.")
 
         # Store daily statistics
         self._store_daily_stats(results)
@@ -423,51 +555,122 @@
         remaining_api_calls: int
     ) -> Dict[str, Any]:
         """Execute retrieval for a specific strategy"""
+        self.logger.info(f"Executing strategy '{strategy.name}'. Query: '{query}'. Max emails for this run: {strategy.max_emails_per_run}.")
+        self.logger.debug(f"Checkpoint for '{strategy.name}': {checkpoint}")
         try:
-            emails_retrieved = []
-            api_calls = 0
-            page_token = checkpoint.next_page_token if checkpoint else None
-
-            # Calculate max emails for this strategy
-            max_emails = min(strategy.max_emails_per_run, remaining_api_calls * strategy.batch_size)
+            emails_retrieved_for_strategy = []
+            api_calls_for_strategy = 0
+            current_page_token = checkpoint.next_page_token if checkpoint else None
 
-            while len(emails_retrieved) < max_emails and api_calls < remaining_api_calls:
-                # Simulate Gmail API call (in production, replace with actual API call)
+            # Cap emails for this run by the strategy's own limit and the remaining global API budget.
+            # Each batch costs one list() call plus one metadata get() per message; batch_size is
+            # treated as the dominant factor in this estimate.
+            effective_max_emails = min(strategy.max_emails_per_run, remaining_api_calls * strategy.batch_size if strategy.batch_size > 0 else remaining_api_calls)
+            self.logger.debug(f"Effective max emails for '{strategy.name}': {effective_max_emails} (strategy.max_emails_per_run: {strategy.max_emails_per_run}, remaining_api_calls: {remaining_api_calls})")
+
+            while len(emails_retrieved_for_strategy) < effective_max_emails and api_calls_for_strategy < remaining_api_calls:
+                batch_size_for_call = min(strategy.batch_size, effective_max_emails - len(emails_retrieved_for_strategy))
+                if batch_size_for_call <= 0:  # Should not happen if effective_max_emails is calculated correctly
+                    break
+
+                self.logger.debug(f"Fetching batch for '{strategy.name}'. Batch size: {batch_size_for_call}, Page token: {current_page_token}")
                 batch_result = await self._fetch_email_batch(
                     query=query,
-                    batch_size=min(strategy.batch_size, max_emails - len(emails_retrieved)),
-                    page_token=page_token
+                    batch_size=batch_size_for_call,
+                    page_token=current_page_token
                 )
 
-                api_calls += 1
-
-                if not batch_result['messages']:
+                # NOTE: _fetch_email_batch issues one list() call plus one metadata get() per message,
+                # but it does not report its own call count, so api_calls_for_strategy only counts
+                # batches (list() calls) and is used for loop control.
+                # TODO: Have _fetch_email_batch return its actual API call count (1 + N per batch)
+                # and sum that here so quota accounting reflects real usage.
+                api_calls_for_strategy += 1  # Counts batches/list() calls, primarily for loop control
+
+                if batch_result.get('error'):
+                    self.logger.error(f"Error in _fetch_email_batch for strategy '{strategy.name}': {batch_result['error']}")
+                    # Treat a batch-level error as fatal for this strategy and surface it to the caller
+                    return {
+                        'success': False,
+                        'emails_count': len(emails_retrieved_for_strategy),
+                        'api_calls': api_calls_for_strategy,  # Batches only; see TODO above
+                        'error': batch_result['error'],
+                        'last_history_id': checkpoint.last_history_id if checkpoint else None,
+                        'next_page_token': current_page_token
+                    }
+
+                messages_in_batch = batch_result.get('messages', [])
+                if not messages_in_batch:
+                    self.logger.info(f"No more messages found for strategy '{strategy.name}' in this batch.")
                     break
 
-                emails_retrieved.extend(batch_result['messages'])
-                page_token = batch_result.get('nextPageToken')
+                emails_retrieved_for_strategy.extend(messages_in_batch)
+                current_page_token = batch_result.get('nextPageToken')
 
-                if not page_token:
+                self.logger.info(f"Retrieved {len(messages_in_batch)} messages in this batch for '{strategy.name}'. Total for strategy: {len(emails_retrieved_for_strategy)}.")
+
+                if not current_page_token:
+                    self.logger.info(f"No next page token for strategy '{strategy.name}'. End of results for this query.")
                     break
 
                 # Rate limiting between batches
                 await asyncio.sleep(0.5)
 
+            self.logger.info(f"Strategy '{strategy.name}' finished. Retrieved {len(emails_retrieved_for_strategy)} emails in {api_calls_for_strategy} batch fetches.")
 
             return {
                 'success': True,
-                'emails_count': len(emails_retrieved),
-                'api_calls': api_calls,
-                'emails': emails_retrieved,
-                'next_page_token': page_token,
-                'last_history_id': batch_result.get('historyId', '')
+                'emails_count': len(emails_retrieved_for_strategy),
+                'api_calls': api_calls_for_strategy,  # Batches only; see TODO above
+                'emails': emails_retrieved_for_strategy,
+                'next_page_token': current_page_token,
+                'last_history_id': batch_result.get('historyId', checkpoint.last_history_id if checkpoint else None)
             }
 
         except Exception as e:
+            self.logger.exception(f"Exception in _execute_strategy_retrieval for '{strategy.name}': {e}")
             return {
                 'success': False,
-                'emails_count': 0,
-                'api_calls': api_calls,
+                'emails_count': 0,  # Partial results are discarded on this error path
+                'api_calls': api_calls_for_strategy,  # Batches only; see TODO above
                 'error': str(e)
             }
@@ -477,31 +680,93 @@
         batch_size: int,
         page_token: Optional[str] = None
     ) -> Dict[str, Any]:
-        """Fetch a batch of emails (simulated for development)"""
-
-        # In production, this would make actual Gmail API calls:
-        # service = build('gmail', 'v1', credentials=creds)
-        # result = service.users().messages().list(
-        #     userId='me',
-        #     q=query,
-        #     maxResults=batch_size,
-        #     pageToken=page_token
-        # ).execute()
-
-        # Simulated response for development
-        await asyncio.sleep(0.1)  # Simulate API latency
+        """Fetch a batch of emails via the Gmail API, falling back to simulation when the service is unavailable."""
+        if not self.gmail_service:
+            self.logger.warning("Gmail service not available. Using simulated email fetching for query: %s", query)
+            return await self._simulate_gmail_response(query, batch_size, page_token)
+
+        fetched_messages = []
+        next_page_token_from_list = None
+        history_id_from_list = None
+        list_response_messages = 0  # For logging
+
+        try:
+            self.logger.debug(f"Fetching email list with query: '{query}', batch_size: {batch_size}, page_token: {page_token}")
+            list_response = self.gmail_service.users().messages().list(
+                userId='me',
+                q=query,
+                maxResults=batch_size,
+                pageToken=page_token
+            ).execute()
+            list_response_messages = len(list_response.get('messages', []))
+            self.logger.info(f"Gmail API list call successful for query '{query}'. Messages in list: {list_response_messages}. Batch size: {batch_size}")
+
+            next_page_token_from_list = list_response.get('nextPageToken')
+            history_id_from_list = list_response.get('historyId')
+
+            if 'messages' in list_response:
+                for i, message_ref in enumerate(list_response['messages']):
+                    try:
+                        self.logger.debug(f"Fetching message detail {i+1}/{list_response_messages} for ID {message_ref['id']} (query: '{query}')")
+                        msg_detail = self.gmail_service.users().messages().get(
+                            userId='me',
+                            id=message_ref['id'],
+                            format='metadata'
+                        ).execute()
+
+                        transformed_message = {
+                            'id': msg_detail['id'],
+                            'threadId': msg_detail['threadId'],
+                            'snippet': msg_detail.get('snippet', ''),
+                            'payload': {
+                                'headers': msg_detail.get('payload', {}).get('headers', [])
+                            }
+                        }
+                        fetched_messages.append(transformed_message)
+                        if msg_detail.get('historyId'):
+                            # Compare numerically: history IDs are numeric strings, and a lexicographic
+                            # comparison would mis-order IDs of different lengths
+                            current_msg_hist_id = int(msg_detail['historyId'])
+                            if history_id_from_list is None or current_msg_hist_id > int(history_id_from_list):
+                                history_id_from_list = str(current_msg_hist_id)
+                                self.logger.debug(f"Updated historyId to {history_id_from_list} from message {msg_detail['id']}")
+
+                    except HttpError as e_get:
+                        self.logger.error(f"HttpError fetching message detail for ID {message_ref['id']} (query: '{query}'): {e_get}. Skipping.")
+                        continue
+                    except Exception as e_generic_get:
+                        self.logger.error(f"Generic error fetching message detail for ID {message_ref['id']} (query: '{query}'): {e_generic_get}. Skipping.")
+                        continue
+            self.logger.debug(f"Finished processing batch for query: '{query}'. Fetched details for {len(fetched_messages)} messages.")
+
+        except HttpError as e_list:
+            self.logger.error(f"HttpError fetching email list from Gmail API for query '{query}': {e_list}")
+            return {'messages': [], 'nextPageToken': None, 'error': str(e_list), 'historyId': None}
+        except Exception as e_generic_list:
+            self.logger.error(f"Generic error during email list fetching for query '{query}': {e_generic_list}")
+            return {'messages': [], 'nextPageToken': None, 'error': str(e_generic_list), 'historyId': None}
+
+        return {
+            'messages': fetched_messages,
+            'nextPageToken': next_page_token_from_list,
+            'resultSizeEstimate': list_response.get('resultSizeEstimate', len(fetched_messages)),
+            'historyId': history_id_from_list
+        }
+
+    async def _simulate_gmail_response(self, query: str, batch_size: int, page_token: Optional[str] = None) -> Dict[str, Any]:
+        """Simulates a Gmail API response for development when the service is unavailable."""
+        self.logger.info(f"Simulating Gmail response for query: '{query}', batch_size: {batch_size}, page_token: {page_token}")
         messages = []
         for i in range(min(batch_size, 10)):  # Limit simulation
             message_id = f"msg_{datetime.now().timestamp()}_{i:03d}"
             messages.append({
                 'id': message_id,
                 'threadId': f"thread_{message_id.split('_')[1]}_{i//3:03d}",
-                'snippet': f"Sample email content for {query}...",
+                'snippet': f"Sample email content for query '{query}'...",
                 'payload': {
                     'headers': [
-                        {'name': 'Subject', 'value': f'Sample Email {i+1}'},
-                        {'name': 'From', 'value': f'sender{i}@example.com'},
+                        {'name': 'Subject', 'value': f'Simulated Email {i+1} for {query}'},
+                        {'name': 'From', 'value': f'sender.simulated{i}@example.com'},
                         {'name': 'Date', 'value': datetime.now().strftime('%a, %d %b %Y %H:%M:%S %z')}
                     ]
                 }
@@ -510,17 +775,18 @@
         response = {
             'messages': messages,
             'resultSizeEstimate': len(messages),
-            'historyId': f"history_{datetime.now().timestamp()}"
+            'historyId': f"history_simulated_{datetime.now().timestamp()}"
         }
 
-        # Add next page token if there would be more results
-        if batch_size >= 10 and not page_token:
-            response['nextPageToken'] = f"token_{datetime.now().timestamp()}"
+        if batch_size >= 10 and not page_token:  # Simplified simulation of a next page token
+            response['nextPageToken'] = f"token_simulated_{datetime.now().timestamp()}"
 
+        self.logger.debug(f"Simulated response: {response}")
         return response
 
     def _load_checkpoint(self, strategy_name: str) -> Optional[SyncCheckpoint]:
         """Load sync checkpoint for strategy"""
+        self.logger.debug(f"Loading checkpoint for strategy: {strategy_name}")
         conn = sqlite3.connect(self.checkpoint_db_path)
         cursor = conn.execute(
             "SELECT * FROM sync_checkpoints WHERE strategy_name = ?",
@@ -530,18 +796,22 @@ def _load_checkpoint(self, strategy_name: str) -> Optional[SyncCheckpoint]:
         conn.close()
 
         if row:
-            return SyncCheckpoint(
+            loaded_checkpoint = SyncCheckpoint(
                 strategy_name=row[0],
-                last_sync_date=datetime.fromisoformat(row[1]) if row[1] else datetime.now(),
+                last_sync_date=datetime.fromisoformat(row[1]) if row[1] else datetime.now(),  # Falls back to now() when the stored date is None
                 last_history_id=row[2] or '',
                 processed_count=row[3] or 0,
                 next_page_token=row[4],
                 errors_count=row[5] or 0
            )
+            self.logger.info(f"Checkpoint loaded for '{strategy_name}': Last sync {loaded_checkpoint.last_sync_date}, Processed {loaded_checkpoint.processed_count}")
+            return loaded_checkpoint
+        self.logger.info(f"No checkpoint found for strategy: {strategy_name}")
         return None
 
     def _save_checkpoint(self, checkpoint: SyncCheckpoint):
         """Save sync checkpoint"""
+        self.logger.info(f"Saving checkpoint for strategy '{checkpoint.strategy_name}': Last sync {checkpoint.last_sync_date}, Processed {checkpoint.processed_count}, HistoryID {checkpoint.last_history_id}")
         conn = sqlite3.connect(self.checkpoint_db_path)
         conn.execute("""
             INSERT OR REPLACE INTO sync_checkpoints
@@ -563,6 +833,7 @@ def _save_checkpoint(self, checkpoint: SyncCheckpoint):
     def _store_daily_stats(self, results: Dict[str, Any]):
         """Store daily retrieval statistics"""
         today = datetime.now().date().isoformat()
+        self.logger.info(f"Storing daily stats for {today}. Total retrieved: {results['total_emails_retrieved']}, API calls: {results['api_calls_used']}")
 
         conn = sqlite3.connect(self.checkpoint_db_path)
         conn.execute("""
@@ -581,6 +852,7 @@
     def get_retrieval_analytics(self, days: int = 30) -> Dict[str, Any]:
         """Get retrieval analytics for the past N days"""
+        self.logger.info(f"Fetching retrieval analytics for the last {days} days.")
         conn = sqlite3.connect(self.checkpoint_db_path)
 
         # Get daily stats
@@ -643,37 +915,49 @@
     def optimize_strategies_based_on_performance(self) -> List[RetrievalStrategy]:
         """Optimize retrieval strategies based on historical performance"""
+        self.logger.info("Attempting to optimize strategies based on performance (last 7 days).")
         analytics = self.get_retrieval_analytics(days=7)
 
         # Get current strategies
         strategies = self.get_optimized_retrieval_strategies()
 
         # Adjust strategies based on performance
-        strategy_performance = {s['strategy_name']: s for s in analytics['strategy_performance']}
+        strategy_performance_map = {s['strategy_name']: s for s in analytics.get('strategy_performance', [])}
 
         optimized_strategies = []
         for strategy in strategies:
-            perf = strategy_performance.get(strategy.name, {})
-
-            # Adjust batch size based on error rate
-            error_rate = perf.get('error_rate', 0)
-            if error_rate > 10:  # High error rate
-                strategy.batch_size = max(10, strategy.batch_size // 2)
-            elif error_rate < 2:  # Low error rate
-                strategy.batch_size = min(200, int(strategy.batch_size * 1.2))
-
-            # Adjust frequency based on retrieval volume
-            avg_per_sync = perf.get('avg_per_sync', 0)
-            if avg_per_sync < 10 and strategy.frequency == 'hourly':
-                strategy.frequency = 'daily'
-            elif avg_per_sync > 100 and strategy.frequency == 'daily':
-                strategy.frequency = 'hourly'
+            original_batch_size = strategy.batch_size
+            original_frequency = strategy.frequency
+            perf = strategy_performance_map.get(strategy.name)
 
+            if perf:
+                self.logger.debug(f"Performance data for strategy '{strategy.name}': {perf}")
+                error_rate = perf.get('error_rate', 0)
+                avg_per_sync = perf.get('avg_per_sync', 0)
+
+                if error_rate > 10:  # High error rate
+                    strategy.batch_size = max(10, strategy.batch_size // 2)
+                    self.logger.info(f"Strategy '{strategy.name}': High error rate ({error_rate:.2f}%). Reducing batch size from {original_batch_size} to {strategy.batch_size}.")
+                elif error_rate < 2 and perf.get('sync_count', 0) > 5:  # Low error rate and sufficient data
+                    strategy.batch_size = min(self.api_limits.get('batch_size_limit', 100), int(strategy.batch_size * 1.2))  # Stay within the API batch size limit
+                    if strategy.batch_size != original_batch_size:
+                        self.logger.info(f"Strategy '{strategy.name}': Low error rate ({error_rate:.2f}%). Increasing batch size from {original_batch_size} to {strategy.batch_size}.")
+
+                if avg_per_sync < 10 and strategy.frequency == 'hourly' and perf.get('sync_count', 0) > 10:  # Low volume and sufficient data
+                    strategy.frequency = 'daily'
+                    self.logger.info(f"Strategy '{strategy.name}': Low avg emails per sync ({avg_per_sync:.2f}). Changing frequency from {original_frequency} to {strategy.frequency}.")
+                elif avg_per_sync > (strategy.max_emails_per_run * 0.8) and strategy.frequency == 'daily' and perf.get('sync_count', 0) > 5:  # High volume, nearing max_emails_per_run
+                    strategy.frequency = 'hourly'
+                    self.logger.info(f"Strategy '{strategy.name}': High avg emails per sync ({avg_per_sync:.2f}). Changing frequency from {original_frequency} to {strategy.frequency}.")
+            else:
+                self.logger.debug(f"No performance data found for strategy '{strategy.name}'. Using default parameters.")
+
             optimized_strategies.append(strategy)
 
+        self.logger.info(f"Strategy optimization complete. {len(optimized_strategies)} strategies processed.")
         return optimized_strategies
 
-async def main():
+async def run_example_usage():
     """Example usage of smart Gmail retriever"""
     retriever = SmartGmailRetriever()
@@ -697,5 +981,89 @@
     analytics = retriever.get_retrieval_analytics(days=7)
     print(f"Analytics summary: {analytics['summary']}")
 
+async def main_cli():
+    """Command-line interface for the Smart Gmail Retriever"""
+    parser = argparse.ArgumentParser(description="Smart Gmail Retriever CLI")
+    subparsers = parser.add_subparsers(dest="command", required=True, help="Available commands")
+
+    # list-strategies subparser
+    list_parser = subparsers.add_parser("list-strategies", help="List available retrieval strategies")
+    list_parser.add_argument(
+        "--checkpoint-db-path", type=str, default="sync_checkpoints.db", help="Path to the checkpoint database"
+    )
+
+    # execute-strategies subparser
+    execute_parser = subparsers.add_parser("execute-strategies", help="Execute retrieval strategies")
+    execute_parser.add_argument(
+        "--strategy-names", nargs="+", help="Names of specific strategies to execute (optional)"
+    )
+    execute_parser.add_argument(
+        "--max-api-calls", type=int, default=100, help="Maximum API calls allowed"
+    )
+    execute_parser.add_argument(
+        "--time-budget-minutes", type=int, default=30, help="Time budget for retrieval in minutes"
+    )
+    execute_parser.add_argument(
+        "--checkpoint-db-path", type=str, default="sync_checkpoints.db", help="Path to the checkpoint database"
+    )
+
+    # get-retrieval-analytics subparser
+    analytics_parser = subparsers.add_parser("get-retrieval-analytics", help="Get retrieval analytics")
+    analytics_parser.add_argument(
+        "--days", type=int, default=30, help="Number of past days to include in analytics"
+    )
+    analytics_parser.add_argument(
+        "--checkpoint-db-path", type=str, default="sync_checkpoints.db", help="Path to the checkpoint database"
+    )
+
+    args = parser.parse_args()
+
+    retriever = SmartGmailRetriever(checkpoint_db_path=args.checkpoint_db_path)
+
+    try:
+        if args.command == "list-strategies":
+            strategies = retriever.get_optimized_retrieval_strategies()
+            print(json.dumps({"success": True, "strategies": [asdict(s) for s in strategies]}, indent=2))
+
+        elif args.command == "execute-strategies":
+            if not retriever.gmail_service:
+                print(json.dumps({"success": False, "error": "Gmail authentication failed. Cannot execute strategies."}, indent=2))
+                sys.exit(1)
+
+            selected_strategies = None
+            if args.strategy_names:
+                all_strategies = retriever.get_optimized_retrieval_strategies()
+                selected_strategies = [s for s in all_strategies if s.name in args.strategy_names]
+                if not selected_strategies:
+                    print(json.dumps({"success": False, "error": f"Specified strategy names not found: {args.strategy_names}"}, indent=2))
+                    sys.exit(1)
+
+            results = await retriever.execute_smart_retrieval(
+                strategies=selected_strategies,
+                max_api_calls=args.max_api_calls,
+                time_budget_minutes=args.time_budget_minutes
+            )
+            print(json.dumps({"success": True, "results": results}, indent=2))
+
+        elif args.command == "get-retrieval-analytics":
+            analytics = retriever.get_retrieval_analytics(days=args.days)
+            print(json.dumps({"success": True, "analytics": analytics}, indent=2))
+
+    except Exception as e:
+        logging.exception("A critical error occurred during CLI command execution:")
+        print(json.dumps({"success": False, "error": f"An unexpected error occurred: {str(e)}"}, indent=2))
+        sys.exit(1)
+
 if __name__ == "__main__":
-    asyncio.run(main())
\ No newline at end of file
+    # Configure logging. Default to INFO, but allow override via the LOG_LEVEL environment variable.
+    log_level_str = os.getenv('LOG_LEVEL', 'INFO').upper()
+    log_level = getattr(logging, log_level_str, logging.INFO)
+    logging.basicConfig(level=log_level, format='%(asctime)s - %(levelname)s - %(name)s - %(module)s.%(funcName)s:%(lineno)d - %(message)s')
+
+    # Reduce verbosity of the Google API client libraries unless LOG_LEVEL is DEBUG
+    if log_level > logging.DEBUG:
+        logging.getLogger('googleapiclient.discovery').setLevel(logging.WARNING)
+        logging.getLogger('google.auth.transport.requests').setLevel(logging.WARNING)
+        logging.getLogger('oauth2client.client').setLevel(logging.WARNING)  # Only relevant if the legacy oauth2client is present
+
+    asyncio.run(main_cli())
\ No newline at end of file
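
A usage sketch for the CLI added in this diff: the subcommands, flags, and the LOG_LEVEL variable below are the ones defined in main_cli() and the __main__ block above, while the strategy names passed to --strategy-names are hypothetical, since the bodies of get_optimized_retrieval_strategies() are not shown here.

    # List available strategies and their configuration
    python server/python_nlp/smart_retrieval.py list-strategies

    # Run two (hypothetical) strategies under a tight budget, with verbose logging
    LOG_LEVEL=DEBUG python server/python_nlp/smart_retrieval.py execute-strategies \
        --strategy-names inbox_recent sent_recent --max-api-calls 50 --time-budget-minutes 10

    # Summarize the last 7 days of sync activity
    python server/python_nlp/smart_retrieval.py get-retrieval-analytics --days 7

Authentication expects either the GMAIL_CREDENTIALS_JSON environment variable or a credentials.json file, with the issued token cached at token.json (or GMAIL_TOKEN_PATH); without valid credentials, execute-strategies exits with an error while _fetch_email_batch falls back to simulated responses.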