Jules was unable to complete the task in time. Please review the work…#1

Merged
MasumRab merged 1 commit into main from
jules_wip_11755313394803907600
Jun 12, 2025
Conversation

Owner

@MasumRab MasumRab commented Jun 12, 2025

… done so far and provide feedback for Jules to continue.

Summary by Sourcery

Migrate backend from SQLite to PostgreSQL, refactor async database operations and JSON handling, enhance the NLP engine with ML model support and fallback mechanisms, convert performance monitoring to in-memory mode, and implement robust Gmail OAuth2 integration, updating project dependencies accordingly.

New Features:

  • Add environment-driven OAuth2 authentication and credential storage for Gmail integration with token refresh logic
  • Integrate scikit-learn model loading in NLPEngine for sentiment, topic, intent, and urgency classification
  • Switch PerformanceMonitor to in-memory buffering for metrics, alerts, and system health instead of SQLite persistence

Enhancements:

  • Migrate DatabaseManager from SQLite to PostgreSQL using async psycopg2 helpers, Drizzle ORM-based migrations, and JSON field parsing
  • Refactor NLP analysis pipeline to include fallback chains, method_used flags, improved logging, and improved preprocessing
  • Revamp GmailDataCollector to parse API payloads, handle HttpError retries, and support simulated fallbacks in absence of service

Chores:

  • Remove SQLite initialization and storage logic across backend modules
  • Update requirements.txt with new dependencies for PostgreSQL, machine learning, Gmail API, and dotenv

Contributor

sourcery-ai bot commented Jun 12, 2025

Reviewer's Guide

This PR migrates the Python backend from SQLite to PostgreSQL with an async psycopg2 helper, enhances the NLP engine to load optional ML models with fallbacks and richer logging, refactors the performance monitor to use in-memory buffers instead of SQLite, and modernizes Gmail integration by loading OAuth credentials from the environment and making real API calls with error handling.

Sequence Diagram for Gmail API Integration and Authentication

sequenceDiagram
    participant App as GmailDataCollector
    participant GoogleAuth as Google OAuth
    participant GmailAPI as Gmail API

    App->>App: __init__()
    App->>App: _load_credentials() from token.json
    alt Stored token is invalid or missing
        App->>App: _authenticate()
        Note over App: Reads credentials from GMAIL_CREDENTIALS_JSON env var
        App->>+GoogleAuth: Initiate OAuth Flow
        GoogleAuth-->>-App: Return new credentials/token
        App->>App: _store_credentials() to token.json
    end
    App->>App: Gmail service is initialized

    App->>+GmailAPI: list(messages, q="...")
    GmailAPI-->>-App: Message list response
    loop For each message ID in response
        App->>+GmailAPI: get(message, id=...)
        GmailAPI-->>-App: Full message data
        App->>App: _parse_message_payload()
    end

Entity Relationship Diagram for the Updated PostgreSQL Schema

erDiagram
    categories {
        int id PK
        string name
        string description
        string color
        int count
    }

    emails {
        int id PK
        string messageId UK "From Gmail"
        string threadId "From Gmail"
        string subject
        text content
        boolean isUnread
        int categoryId FK
        json analysisMetadata
        timestamp createdAt
        timestamp updatedAt
    }

    activities {
        int id PK
        string type
        string description
        json details
        timestamp timestamp
    }

    categories ||--o{ emails : "categorizes"

Class Diagram for the Refactored DatabaseManager

classDiagram
    class DatabaseManager {
        -database_url: str
        +__init__(db_url: Optional[str])
        +async _execute_query(query, params, fetch_one, fetch_all, commit)
        +init_database()
        +async create_email(email_data: Dict) Optional[Dict]
        +async get_email_by_id(email_id: int) Optional[Dict]
        +async get_emails(limit, offset, category_id, is_unread) List[Dict]
        +async update_email_by_message_id(message_id: str, update_data: Dict) Optional[Dict]
        +async create_activity(activity_data: Dict) Optional[Dict]
        +async get_dashboard_stats() Dict
    }

Class Diagram for the Enhanced NLPEngine

classDiagram
    class NLPEngine {
        -sentiment_model: Any
        -topic_model: Any
        -intent_model: Any
        -urgency_model: Any
        +__init__()
        -_load_model(model_path: str) Any
        -_analyze_sentiment(text: str) Dict
        -_analyze_topic(text: str) Dict
        -_analyze_intent(text: str) Dict
        -_analyze_urgency(text: str) Dict
        +analyze_email(subject: str, content: str) Dict
    }

File-Level Changes

Change Details Files
Migrate database interactions from SQLite to PostgreSQL with async psycopg2 wrapper and Drizzle ORM seeding
  • Replaced SQLite init and table creation with Drizzle-based seeding of default categories
  • Introduced _execute_query helper using asyncio.to_thread and RealDictCursor
  • Converted init_database, create/get/update methods to use PostgreSQL queries and schema.ts camelCase fields
  • Added JSON field parsing and legacy key mapping in DatabaseManager
  • Updated async context manager get_connection for psycopg2 connection lifecycle
server/python_backend/database.py
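The JSON field parsing mentioned above can be sketched as a small helper. This is a hypothetical standalone version of what a `_parse_json_fields`-style method might do (the real method name and signature come from the PR; the behavior on malformed input here is an assumption):

```python
import json
from typing import Any, Dict, List

def parse_json_fields(row: Dict[str, Any], fields: List[str]) -> Dict[str, Any]:
    """Decode stringified JSON columns (e.g. analysisMetadata) into Python objects."""
    for name in fields:
        value = row.get(name)
        if isinstance(value, str):
            try:
                row[name] = json.loads(value)
            except json.JSONDecodeError:
                # Assumption: leave the raw string in place rather than fail the row
                pass
    return row
```

Values that are already dicts (e.g. when the driver decodes `json`/`jsonb` itself) pass through untouched, so the helper is safe to call unconditionally on every fetched row.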
Enhance NLP engine to support model-based analysis with fallback and improved logging
  • Loaded scikit-learn models via joblib if available, configurable by NLP_MODEL_DIR env var
  • Updated sentiment, topic, intent, urgency analysis to use models or fallback to TextBlob/regex/keywords
  • Added method_used to analysis outputs and reasoning generation indicating fallback or AI model
  • Improved logging format and exception handling in analyze_email and model loading
server/python_nlp/nlp_engine.py
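The optional model loading described above might look roughly like this. A minimal sketch, assuming joblib-pickled scikit-learn pipelines and the `NLP_MODEL_DIR` env var from the PR; the filename convention and default directory are illustrative, and returning `None` is what lets callers fall back to TextBlob/regex/keyword analysis:

```python
import logging
import os
from typing import Any, Optional

logger = logging.getLogger(__name__)

def load_model(filename: str) -> Optional[Any]:
    """Load a pickled scikit-learn model via joblib, or return None so callers fall back."""
    model_dir = os.getenv("NLP_MODEL_DIR", "models")  # default dir is an assumption
    path = os.path.join(model_dir, filename)
    try:
        import joblib  # optional dependency: absence triggers the fallback chain
    except ImportError:
        logger.warning("joblib not installed; using fallback analysis")
        return None
    if not os.path.exists(path):
        logger.info("Model file %s not found; using fallback analysis", path)
        return None
    try:
        return joblib.load(path)
    except Exception as exc:
        logger.error("Failed to load model %s: %s", path, exc)
        return None
```

Because every failure path returns `None` instead of raising, the engine can be constructed on machines without trained models and still produce the `method_used` fallback annotations.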
Refactor performance monitor to in-memory buffers, removing SQLite persistence
  • Removed SQLite DB initialization and storage methods (_store_metric, _store_system_health, init_database)
  • Switched to deque-based in-memory metrics, alerts, and system health buffers
  • Updated record_metric and _check_alerts to log and buffer alerts in-memory
  • Adapted summary and service performance methods to aggregate from in-memory data
server/python_backend/performance_monitor.py
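The deque-based buffering described above can be illustrated with a stripped-down monitor. The class and field names here are hypothetical stand-ins for the PR's `PerformanceMonitor`; the bounded `deque` is the key idea, since it evicts old entries automatically instead of persisting them to SQLite:

```python
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, Optional

@dataclass
class Metric:
    name: str
    value: float
    timestamp: datetime
    tags: Dict[str, str] = field(default_factory=dict)

class InMemoryMonitor:
    """Bounded in-memory metrics buffer; oldest entries are evicted, not stored."""

    def __init__(self, maxlen: int = 10000):
        self.metrics_buffer = deque(maxlen=maxlen)

    def record_metric(self, name: str, value: float,
                      tags: Optional[Dict[str, str]] = None) -> None:
        self.metrics_buffer.append(Metric(name, value, datetime.utcnow(), tags or {}))

    def summary(self, name: str, window_minutes: int = 60) -> Dict[str, float]:
        """Aggregate one metric over a recent time window from the buffer."""
        since = datetime.utcnow() - timedelta(minutes=window_minutes)
        values = [m.value for m in self.metrics_buffer
                  if m.name == name and m.timestamp > since]
        if not values:
            return {"count": 0}
        return {"count": len(values), "avg": sum(values) / len(values),
                "min": min(values), "max": max(values)}
```

Returning aggregates rather than the raw `values` list also addresses the large-payload concern raised in Comment 10 of the review below.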
Improve Gmail integration with dotenv config, OAuth credentials loading, and real API calls
  • Loaded environment variables for credentials via dotenv and GMAIL_CREDENTIALS_JSON
  • Implemented token refresh and storage to TOKEN_JSON_PATH, with fallback file support
  • Defined real Gmail API calls in _get_message_list and _get_message_content with error handling
  • Parsed Gmail message payloads into email_data and improved CLI guidance in main()
server/python_nlp/gmail_integration.py


@MasumRab MasumRab merged commit 3b556e0 into main Jun 12, 2025

@sourcery-ai sourcery-ai bot left a comment


Hey @MasumRab - I've reviewed your changes - here's some feedback:

  • Refactor seeding and schema initialization out of __init__ into an async setup method to avoid blocking the event loop during object instantiation.
  • Consolidate the repeated snake_case-to-camelCase column mapping logic into a shared utility or mapping configuration to reduce duplication in the create/update methods.
  • Avoid using run_local_server in the constructor for OAuth in production; consider a non-blocking or service-account based authentication flow for headless environments.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- Refactor seeding and schema initialization out of `__init__` into an async setup method to avoid blocking the event loop during object instantiation.
- Consolidate the repeated snake_case-to-camelCase column mapping logic into a shared utility or mapping configuration to reduce duplication in the `create`/`update` methods.
- Avoid using `run_local_server` in the constructor for OAuth in production; consider a non-blocking or service-account based authentication flow for headless environments.

## Individual Comments

### Comment 1
<location> `server/python_backend/database.py:34` </location>
<code_context>
+        conn = await asyncio.to_thread(psycopg2.connect, self.database_url)
+        try:
+            # Use RealDictCursor for dictionary-like row access
+            async with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
+                await asyncio.to_thread(cur.execute, query, params)
+
</code_context>

<issue_to_address>
Sync psycopg2 cursor does not support async context manager

Since psycopg2 cursors are synchronous, using 'async with' is invalid. Use 'asyncio.to_thread' for all cursor operations or consider an async database driver like 'asyncpg' or 'aiopg'.
</issue_to_address>

### Comment 2
<location> `server/python_backend/database.py:31` </location>
<code_context>
+
+    async def _execute_query(self, query: str, params: Optional[Tuple] = None, fetch_one: bool = False, fetch_all: bool = False, commit: bool = False):
+        """Helper to execute queries using asyncio.to_thread for sync psycopg2."""
+        conn = await asyncio.to_thread(psycopg2.connect, self.database_url)
+        try:
+            # Use RealDictCursor for dictionary-like row access
</code_context>

<issue_to_address>
Opening a new connection per query is expensive

Using a connection pool or an async-native client will help reduce overhead and improve performance by reusing database connections.

Suggested implementation:

```python
    def __init__(self, db_url: Optional[str] = None):
        import asyncpg
        self.asyncpg = asyncpg
        self.database_url = db_url or os.getenv("DATABASE_URL")
        if not self.database_url:
            logger.error("DATABASE_URL environment variable not set.")
            raise ValueError("DATABASE_URL environment variable not set.")
        self.pool = None  # Will be initialized asynchronously

    async def init_pool(self):
        """Initialize the asyncpg connection pool."""
        if self.pool is None:
            self.pool = await self.asyncpg.create_pool(self.database_url)

    async def _execute_query(self, query: str, params: Optional[Tuple] = None, fetch_one: bool = False, fetch_all: bool = False, commit: bool = False):
        """Helper to execute queries using the asyncpg pool.
        Note: asyncpg uses $1-style positional placeholders, not psycopg2's %s,
        so existing query strings must be converted."""
        if self.pool is None:
            await self.init_pool()
        async with self.pool.acquire() as conn:
            if commit:
                await conn.execute(query, *(params or ()))
                return None
            if fetch_one:
                row = await conn.fetchrow(query, *(params or ()))
                return dict(row) if row else None
            if fetch_all:
                rows = await conn.fetch(query, *(params or ()))
                return [dict(row) for row in rows]
            return None

```

- You will need to ensure that `asyncpg` is installed in your environment (`pip install asyncpg`).
- Any code that instantiates this class should call `await instance.init_pool()` before making queries.
- If you have other methods that use the old psycopg2 connection, refactor them to use the asyncpg pool as well.
- Remove any remaining imports or usages of `psycopg2` and `psycopg2.extras` in this file.
</issue_to_address>

### Comment 3
<location> `server/python_backend/database.py:143` </location>
<code_context>
+        pass
+
+
+    def _parse_json_fields(self, row: Dict[str, Any], fields: List[str]) -> Dict[str, Any]:
+        """Helper to parse stringified JSON fields in a row."""
+        if not row:
</code_context>

<issue_to_address>
Parsing JSON only for `analysisMetadata` may miss other JSON/text-array columns

Consider updating this helper to also parse array fields like `labels`, `labelIds`, and `toAddresses`, or any other JSON columns, to ensure all relevant fields are converted to proper Python types.
</issue_to_address>

### Comment 4
<location> `server/python_backend/database.py:241` </location>
<code_context>
-                # Email already exists, update it
-                return await self.update_email_by_message_id(email_data['message_id'], email_data)

     async def get_email_by_id(self, email_id: int) -> Optional[Dict[str, Any]]:
         """Get email by ID"""
-        async with self.get_connection() as conn:
</code_context>

<issue_to_address>
Missing JSON parsing for array columns in result

Fields like `labels` (of type `text[]`) may remain as raw DB types when using RealDictCursor. Ensure these are properly cast or parsed if not handled by the driver.
</issue_to_address>

### Comment 5
<location> `server/python_backend/database.py:276` </location>
<code_context>
+        categories = await self._execute_query(query, fetch_all=True)
+        return categories if categories else []
+
+    async def create_category(self, category_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
+        """Create a new category.
+           The schema.ts for categories has id, name, description, color, count.
</code_context>

<issue_to_address>
Create category allows duplicates on `name`

If category names must be unique, enforce this with a uniqueness constraint in the schema or handle conflicts in the INSERT statement using `ON CONFLICT (name) DO NOTHING` or `DO UPDATE`.

Suggested implementation:

```python
        query = """
            INSERT INTO categories (name, description, color, count)
            VALUES (%s, %s, %s, %s)
            ON CONFLICT (name) DO NOTHING
            RETURNING id, name, description, color, count;
        """
        params = (
            category_data['name'],
            category_data.get('description'),

```

```python
        result = await self._execute_query(query, params=params, fetch_one=True)
        return result if result else None

```
</issue_to_address>

### Comment 6
<location> `server/python_backend/database.py:348` </location>
<code_context>
+            return [self._parse_json_fields(email, ['analysisMetadata']) for email in emails]
+        return []

     async def update_email_by_message_id(self, message_id: str, 
-                                       update_data: Dict[str, Any]) -> Dict[str, Any]:
-        """Update email by message ID"""
</code_context>

<issue_to_address>
Mapping of update_data keys may skip new schema fields

Some new schema fields (e.g., `preview`, `historyId`, `internalDate`) are not included in your mapping. Using a centralized mapping or auto-quoting camelCase fields could help prevent missing fields.

Suggested implementation:

```python
    def _camel_to_snake(self, name: str) -> str:
        import re
        s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
        return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()

    def _map_update_data_keys(self, update_data: dict) -> dict:
        # Map camelCase keys to quoted snake_case DB fields
        mapped = {}
        for k, v in update_data.items():
            db_key = f'"{self._camel_to_snake(k)}"'
            mapped[db_key] = v
        return mapped

    async def update_email_by_message_id(self, message_id: str, 

```

```python
                                       update_data: Dict[str, Any]) -> Dict[str, Any]:
        """Update email by message ID"""
        # Map update_data keys to DB fields
        mapped_update_data = self._map_update_data_keys(update_data)
        set_clause = ", ".join([f"{k} = %s" for k in mapped_update_data.keys()])
        values = list(mapped_update_data.values())
        query = f'UPDATE emails SET {set_clause} WHERE "message_id" = %s RETURNING *'
        values.append(message_id)
        result = await self._execute_query(query, tuple(values), fetch_one=True)
        if result:
            return self._parse_json_fields(result, ['analysisMetadata'])
        return {}

```
</issue_to_address>

### Comment 7
<location> `server/python_nlp/nlp_engine.py:14` </location>
<code_context>
+    async def _execute_query(self, query: str, params: Optional[Tuple] = None, fetch_one: bool = False, fetch_all: bool = False, commit: bool = False):
+        """Helper to execute queries using asyncio.to_thread for sync psycopg2."""
+        conn = await asyncio.to_thread(psycopg2.connect, self.database_url)
+        try:
+            # Use RealDictCursor for dictionary-like row access
+            async with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
</code_context>

<issue_to_address>
Single except covers multiple imports, loses granularity

Splitting the imports into separate try blocks will allow you to handle missing dependencies more precisely and enable partial NLP functionality when possible.
</issue_to_address>

### Comment 8
<location> `server/python_nlp/nlp_engine.py:514` </location>
<code_context>
+            urgency_analysis = self._analyze_urgency(cleaned_text) # Already updated
+
+            # This method is regex-based, no model to load for it currently per its implementation
+            risk_analysis_flags = self._detect_risk_factors(cleaned_text)
+

</code_context>

<issue_to_address>
Undefined method `_detect_risk_factors`; original was `_analyze_risk_factors`

Please update the method call to match the correct name to prevent runtime errors.
</issue_to_address>

### Comment 9
<location> `server/python_nlp/nlp_engine.py:99` </location>
<code_context>
-            category_id = cursor.lastrowid
-            conn.commit()
-
-            return {
-                'id': category_id,
-                'name': category_data['name'],
</code_context>

<issue_to_address>
Including full analysis details may expose internal model outputs

The `details` object includes raw model predictions and confidences, which could reveal sensitive internal logic. Consider sanitizing or restricting this data in production APIs.
</issue_to_address>

### Comment 10
<location> `server/python_backend/performance_monitor.py:309` </location>
<code_context>
+        for metric in relevant_metrics:
+            metrics_summary_temp[metric.metric_name].append(metric.value)
+
+        final_metrics_summary = {}
+        for name, values in metrics_summary_temp.items():
+            count = len(values)
</code_context>

<issue_to_address>
Returning raw `values` list may cause large payloads

Consider summarizing the values with aggregates or implementing pagination to avoid large responses.
</issue_to_address>

### Comment 11
<location> `server/python_backend/performance_monitor.py:349` </location>
<code_context>
-
-        conn.close()
+        service_data_points = []
+        for metric in self.metrics_buffer:
+            if metric.timestamp > since_time and metric.tags.get("service") == service_name:
+                 service_data_points.append(metric) # Store the whole PerformanceMetric object
</code_context>

<issue_to_address>
Service filter checks `tags.get("service")` without validating structure

Please verify that all `record_metric` calls provide `tags` as a dict containing a `service` key, or add checks/logging to handle missing or malformed tags.
</issue_to_address>
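The defensive check this comment asks for could be sketched as a small guard. The helper names are hypothetical; the point is to validate that `tags` is a dict with a string `service` value before filtering on it:

```python
from typing import Any, Optional

def safe_service_tag(tags: Any) -> Optional[str]:
    """Return the 'service' tag only when tags is a well-formed dict, else None."""
    if not isinstance(tags, dict):
        return None
    service = tags.get("service")
    return service if isinstance(service, str) else None

def matches_service(metric_tags: Any, service_name: str) -> bool:
    """Safe replacement for bare tags.get('service') == service_name checks."""
    return safe_service_tag(metric_tags) == service_name
```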

### Comment 12
<location> `server/python_nlp/gmail_integration.py:292` </location>
<code_context>
+        except OSError as e:
+            self.logger.error(f"Error storing credentials to {token_path}: {e}")
+
+    def _authenticate(self):
+        """Authenticates the user and obtains credentials using GMAIL_CREDENTIALS_JSON env var."""
+        creds = None
</code_context>

<issue_to_address>
OAuth flow blocks the event loop and is interactive

`flow.run_local_server()` blocks the event loop. For async server-side use, prefer a service-account flow or run the interactive step in a separate thread or process.
</issue_to_address>

### Comment 13
<location> `server/python_nlp/gmail_integration.py:297` </location>
<code_context>
+        creds = None
+        credentials_json_str = os.getenv(GMAIL_CREDENTIALS_ENV_VAR)
+
+        if not credentials_json_str:
+            self.logger.error(
+                f"Environment variable {GMAIL_CREDENTIALS_ENV_VAR} is not set. "
</code_context>

<issue_to_address>
Fallback to credentials file but mixing env/file methods is confusing

Consider using a single credential source or clearly documenting which source takes precedence to prevent confusion.
</issue_to_address>

### Comment 14
<location> `server/python_backend/database.py:29` </location>
<code_context>
+        # self.init_database() # Table creation handled by Drizzle ORM based on schema.ts
+        # Seeding default data can be done here if needed, or via a separate script.
+
+    async def _execute_query(self, query: str, params: Optional[Tuple] = None, fetch_one: bool = False, fetch_all: bool = False, commit: bool = False):
+        """Helper to execute queries using asyncio.to_thread for sync psycopg2."""
+        conn = await asyncio.to_thread(psycopg2.connect, self.database_url)
</code_context>

<issue_to_address>
Consider refactoring the large, flag-driven _execute_query method into separate helpers for execution and fetching, using a cursor context manager to centralize commit/rollback and reduce code duplication.

Here’s one way to pull apart that big `_execute_query` and cut down on flags, nesting and JSON‐parsing spread all over:

1. Extract a simple cursor context manager.  
2. Have one “execute only” helper and two small fetch helpers instead of flags.  
3. Keep JSON-in-text parsing in one place.

```python
from contextlib import asynccontextmanager

    @asynccontextmanager
    async def _get_cursor(self):
        conn = await asyncio.to_thread(psycopg2.connect, self.database_url)
        cur  = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        try:
            yield cur
            await asyncio.to_thread(conn.commit)
        except Exception:
            await asyncio.to_thread(conn.rollback)
            raise
        finally:
            await asyncio.to_thread(cur.close)
            await asyncio.to_thread(conn.close)

    async def _execute(self, query: str, params: Tuple = None):
        async with self._get_cursor() as cur:
            await asyncio.to_thread(cur.execute, query, params)

    async def _fetch_one(self, query: str, params: Tuple = None) -> Optional[Dict]:
        async with self._get_cursor() as cur:
            await asyncio.to_thread(cur.execute, query, params)
            row = await asyncio.to_thread(cur.fetchone)
            return dict(row) if row else None

    async def _fetch_all(self, query: str, params: Tuple = None) -> List[Dict]:
        async with self._get_cursor() as cur:
            await asyncio.to_thread(cur.execute, query, params)
            rows = await asyncio.to_thread(cur.fetchall)
            return [dict(r) for r in rows]
```

Then you can rewrite e.g. `get_email_by_id` as

```python
    async def get_email_by_id(self, email_id: int) -> Optional[Dict]:
        q = """
          SELECT e.*, c.name AS "categoryName", c.color AS "categoryColor"
            FROM emails e
            LEFT JOIN categories c ON e."categoryId" = c.id
           WHERE e.id = %s
        """
        row = await self._fetch_one(q, (email_id,))
        return self._parse_json_fields(row, ["analysisMetadata"]) if row else None
```

and similar for inserts/updates:

```python
    async def create_category(self, data):
        q = """INSERT INTO categories(name,description,color,count)
               VALUES(%s,%s,%s,%s)
               RETURNING id,name,description,color,count"""
        return await self._fetch_one(q, (
            data["name"],
            data.get("description"),
            data.get("color", "#6366f1"),
            data.get("count", 0),
        ))
```

This:

- Removes the single giant `_execute_query` with flags  
- Gives each step one responsibility (execute, fetch_one, fetch_all)  
- Keeps commit/rollback in one place  
- Leaves you free to call `self._parse_json_fields` exactly where and only for the columns you need.
</issue_to_address>

### Comment 15
<location> `server/python_nlp/nlp_engine.py:75` </location>
<code_context>
         return text

     def _analyze_sentiment(self, text: str) -> Dict[str, Any]:
-        """Enhanced sentiment analysis with confidence scoring"""
-        if not HAS_NLTK:
</code_context>

<issue_to_address>
Consider refactoring the repeated model-then-fallback logic in each _analyze_* method into a single reusable helper to reduce duplication and nesting.

Here’s a way to collapse all of those “try model → catch → fallback → keyword” blocks into one reusable helper, and then slim each `_analyze_*` down to a single call.  This preserves all existing functionality, but removes 90% of the nesting and duplication.

1) Add a generic helper in your class:

```python
    def _analyze_with_model_fallback(
        self,
        model,                       # e.g. self.sentiment_model
        text: str,
        predict_fn: Callable,        # e.g. lambda m, t: (label, proba_array)
        fallback_fn: Callable,       # e.g. self._simple_sentiment
        method_tag: str,             # e.g. 'sentiment'
        default: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Try the sklearn pipeline in `model`, else fallback, 
        always returning a dict with 'confidence', 'method_used', etc.
        """
        if model:
            try:
                label, probas = predict_fn(model, text)
                confidence = float(max(probas))
                return {
                    method_tag: str(label),
                    'confidence': confidence,
                    'method_used': f'model_{method_tag}'
                }
            except Exception as e:
                logger.error(f"{method_tag} model error, fallback: {e}")
        # fallback
        result = fallback_fn(text)
        result['method_used'] = (
            result.get('method_used') or f'fallback_{method_tag}'
        )
        return result or default
```

2) Extract each simple fallback into its own tiny function:

```python
    def _simple_sentiment(self, text: str) -> Dict[str, Any]:
        text_lower = text.lower()
        # ... your existing positive/negative count logic ...
        return {
            'sentiment': sentiment_label,
            'polarity': pol,
            'subjectivity': 0.5,
            'confidence': conf
        }
```

3) Then rewrite your big `_analyze_sentiment` as:

```python
    def _analyze_sentiment(self, text: str) -> Dict[str, Any]:
        default = {'sentiment': 'neutral', 'polarity': 0.0, 
                   'subjectivity': 0.5, 'confidence': 0.5}
        return self._analyze_with_model_fallback(
            self.sentiment_model,
            text,
            predict_fn=lambda m, t: (m.predict([t])[0], m.predict_proba([t])[0]),
            fallback_fn=self._simple_sentiment,
            method_tag='sentiment',
            default=default
        )
```

4) Do the same for topic, intent, urgency.

— You’ll end up with four 5–10 line methods instead of dozens, and one central helper that does all of the try/catch, method tagging, and defaulting.  This keeps every feature but slashes cognitive complexity.
</issue_to_address>

### Comment 16
<location> `server/python_nlp/gmail_integration.py:239` </location>
<code_context>

-        # In production, this would be initialized with actual Gmail API credentials
-        self.gmail_service = None  # Placeholder for Gmail API service
+        self.gmail_service = None
+        self._load_credentials()
+        if not self.gmail_service:
</code_context>

<issue_to_address>
Consider extracting OAuth and credentials logic into a dedicated helper class to improve separation of concerns.

Here’s one way to peel off the OAuth-and-credentials code into its own single-responsibility helper.  You can apply the same pattern to pull out parsing or simulation logic.

```python
# oauth_manager.py
import os, json, logging
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build

class OAuthManager:
    def __init__(self,
                 scopes: list,
                 token_path: str,
                 credentials_env_var: str,
                 fallback_credentials_file: str):
        self.scopes = scopes
        self.token_path = token_path
        self.env_var = credentials_env_var
        self.fallback = fallback_credentials_file
        self.logger = logging.getLogger(self.__class__.__name__)

    def get_service(self, api_name='gmail', api_version='v1'):
        creds = self._load_token()
        if not creds:
            creds = self._authenticate()
        service = build(api_name, api_version, credentials=creds)
        self._save_token(creds)
        return service

    def _load_token(self):
        if os.path.exists(self.token_path):
            try:
                creds = Credentials.from_authorized_user_file(self.token_path, self.scopes)
            except Exception as e:
                self.logger.error(f"token load failed: {e}")
                return None
            if creds.valid:
                return creds
            if creds.expired and creds.refresh_token:
                try:
                    creds.refresh(Request())
                    return creds
                except Exception:
                    os.remove(self.token_path)
        return None

    def _authenticate(self):
        data = os.getenv(self.env_var)
        if data:
            config = json.loads(data)
            flow = InstalledAppFlow.from_client_config(config, self.scopes)
        elif os.path.exists(self.fallback):
            flow = InstalledAppFlow.from_client_secrets_file(self.fallback, self.scopes)
        else:
            raise RuntimeError(f"Missing credentials—set {self.env_var} or place {self.fallback}")
        return flow.run_local_server(port=0)

    def _save_token(self, creds):
        try:
            with open(self.token_path, 'w') as f:
                f.write(creds.to_json())
        except Exception as e:
            self.logger.error(f"token save failed: {e}")
```

Then in your collector:

```python
from oauth_manager import OAuthManager

class GmailDataCollector:
    def __init__(self, auth_manager=None, config: RateLimitConfig=None):
        self.config = config or RateLimitConfig()
        self.rate_limiter = RateLimiter(self.config)
        self.cache = EmailCache()
        self.logger = logging.getLogger(__name__)

        self.auth = auth_manager or OAuthManager(
            scopes=SCOPES,
            token_path=TOKEN_JSON_PATH,
            credentials_env_var=GMAIL_CREDENTIALS_ENV_VAR,
            fallback_credentials_file=CREDENTIALS_PATH
        )
        self.gmail_service = self.auth.get_service()
```

By doing this you:

- Move all token-loading/refreshing/fallback logic into `OAuthManager`
- Keep `GmailDataCollector` focused on fetching, parsing and rate-limiting
- Make both classes smaller and easier to test in isolation

You can apply the same pattern to bundle your parsing (`_parse_message_payload`, `_extract_email_address`) into an `EmailParser` class, or to group simulated vs live calls into a `GmailApiClient` wrapper.
</issue_to_address>


conn = await asyncio.to_thread(psycopg2.connect, self.database_url)
try:
    # Use RealDictCursor for dictionary-like row access
    async with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:

issue (bug_risk): Sync psycopg2 cursor does not support async context manager

Since psycopg2 cursors are synchronous, using 'async with' is invalid. Use 'asyncio.to_thread' for all cursor operations or consider an async database driver like 'asyncpg' or 'aiopg'.
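
To illustrate the fix, here is a minimal sketch of the `asyncio.to_thread` pattern. The blocking function is a stand-in (its name and data are hypothetical, not from the repository); in real code it would be the synchronous psycopg2 connect/cursor/fetch sequence.

```python
import asyncio


def _fetch_rows_sync(rows):
    # Stand-in for the blocking psycopg2 work: in real code this function
    # would connect, open a RealDictCursor, execute, and fetchall --
    # all synchronously, with plain "with", never "async with".
    return [dict(r) for r in rows]


async def fetch_rows(rows):
    # Offload the entire blocking sequence to a worker thread in one call,
    # instead of awaiting individual cursor operations.
    return await asyncio.to_thread(_fetch_rows_sync, rows)


result = asyncio.run(fetch_rows([{"id": 1, "subject": "hello"}]))
```

The key point is that the whole connect/execute/fetch sequence runs inside one worker thread, so no psycopg2 object is ever awaited directly.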


async def _execute_query(self, query: str, params: Optional[Tuple] = None, fetch_one: bool = False, fetch_all: bool = False, commit: bool = False):
    """Helper to execute queries using asyncio.to_thread for sync psycopg2."""
    conn = await asyncio.to_thread(psycopg2.connect, self.database_url)

suggestion (performance): Opening a new connection per query is expensive

Using a connection pool or an async-native client will help reduce overhead and improve performance by reusing database connections.

Suggested implementation:

    def __init__(self, db_url: Optional[str] = None):
        import asyncpg
        self.asyncpg = asyncpg
        self.database_url = db_url or os.getenv("DATABASE_URL")
        if not self.database_url:
            logger.error("DATABASE_URL environment variable not set.")
            raise ValueError("DATABASE_URL environment variable not set.")
        self.pool = None  # Will be initialized asynchronously

    async def init_pool(self):
        """Initialize the asyncpg connection pool."""
        if self.pool is None:
            self.pool = await self.asyncpg.create_pool(self.database_url)

    async def _execute_query(self, query: str, params: Optional[Tuple] = None, fetch_one: bool = False, fetch_all: bool = False, commit: bool = False):
        """Helper to execute queries using an asyncpg connection pool."""
        if self.pool is None:
            await self.init_pool()
        async with self.pool.acquire() as conn:
            result = None
            if commit:
                await conn.execute(query, *(params or ()))
            elif fetch_one:
                result_row = await conn.fetchrow(query, *(params or ()))
                result = dict(result_row) if result_row else None
            elif fetch_all:
                result_rows = await conn.fetch(query, *(params or ()))
                result = [dict(row) for row in result_rows]
            return result

- You will need to ensure that asyncpg is installed in your environment (`pip install asyncpg`).
- Any code that instantiates this class should call `await instance.init_pool()` before making queries.
- asyncpg uses numbered placeholders (`$1`, `$2`, …), so queries written with psycopg2-style `%s` placeholders must be rewritten.
- If you have other methods that use the old psycopg2 connection, refactor them to use the asyncpg pool as well.
- Remove any remaining imports or usages of psycopg2 and psycopg2.extras in this file.

pass


def _parse_json_fields(self, row: Dict[str, Any], fields: List[str]) -> Dict[str, Any]:

suggestion: Parsing JSON only for analysisMetadata may miss other JSON/text-array columns

Consider updating this helper to also parse array fields like labels, labelIds, and toAddresses, or any other JSON columns, to ensure all relevant fields are converted to proper Python types.
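
As a sketch of what such a widened helper could look like (the field names and the split between JSON columns and array columns are assumptions for illustration, not the repository's actual schema):

```python
import json


def parse_json_fields(row, json_fields, array_fields=()):
    # Hypothetical helper: decode JSON-encoded string columns, and normalize
    # array columns (e.g. text[] like labels/toAddresses) so callers always
    # see a list. Values the driver already returned as native Python types
    # (dicts, lists) are left untouched.
    out = dict(row)
    for field in json_fields:
        value = out.get(field)
        if isinstance(value, str):
            try:
                out[field] = json.loads(value)
            except json.JSONDecodeError:
                out[field] = None
    for field in array_fields:
        if out.get(field) is None:
            out[field] = []
    return out


row = {"analysisMetadata": '{"score": 0.9}', "labels": None, "toAddresses": ["a@b.c"]}
parsed = parse_json_fields(row, ["analysisMetadata"], ["labels", "toAddresses"])
```

Here `parsed["analysisMetadata"]` becomes a dict, `labels` is normalized to an empty list, and the already-native `toAddresses` list passes through unchanged.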

# Email already exists, update it
return await self.update_email_by_message_id(email_data['message_id'], email_data)

async def get_email_by_id(self, email_id: int) -> Optional[Dict[str, Any]]:

issue (bug_risk): Missing JSON parsing for array columns in result

Fields like labels (of type text[]) may remain as raw DB types when using RealDictCursor. Ensure these are properly cast or parsed if not handled by the driver.

categories = await self._execute_query(query, fetch_all=True)
return categories if categories else []

async def create_category(self, category_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:

suggestion (bug_risk): Create category allows duplicates on name

If category names must be unique, enforce this with a uniqueness constraint in the schema or handle conflicts in the INSERT statement using ON CONFLICT (name) DO NOTHING or DO UPDATE.

Suggested implementation:

        query = """
            INSERT INTO categories (name, description, color, count)
            VALUES (%s, %s, %s, %s)
            ON CONFLICT (name) DO NOTHING
            RETURNING id, name, description, color, count;
        """
        params = (
            category_data['name'],
            category_data.get('description'),
            category_data.get('color'),
            category_data.get('count', 0),
        )
        result = await self._execute_query(query, params=params, fetch_one=True)
        return result if result else None

Comment on lines +329 to 336
for severity, count in severities.items():
    final_alerts_summary.append({
        "type": alert_type,
        "severity": severity,
        "count": count
    })

conn.close()


suggestion (code-quality): Replace a for append loop with list extend (for-append-to-extend)

Suggested change
for severity, count in severities.items():
    final_alerts_summary.append({
        "type": alert_type,
        "severity": severity,
        "count": count
    })
conn.close()
final_alerts_summary.extend(
    {"type": alert_type, "severity": severity, "count": count}
    for severity, count in severities.items()
)

Comment on lines +348 to 352
service_data_points = []
for metric in self.metrics_buffer:
    if metric.timestamp > since_time and metric.tags.get("service") == service_name:
        service_data_points.append(metric)  # Store the whole PerformanceMetric object


suggestion (code-quality): Convert for loop into list comprehension (list-comprehension)

Suggested change
service_data_points = []
for metric in self.metrics_buffer:
    if metric.timestamp > since_time and metric.tags.get("service") == service_name:
        service_data_points.append(metric)  # Store the whole PerformanceMetric object
service_data_points = [
    metric
    for metric in self.metrics_buffer
    if metric.timestamp > since_time
    and metric.tags.get("service") == service_name
]

def _authenticate(self):
    """Authenticates the user and obtains credentials using GMAIL_CREDENTIALS_JSON env var."""
    creds = None
    credentials_json_str = os.getenv(GMAIL_CREDENTIALS_ENV_VAR)

issue (code-quality): We've found these issues:

Comment on lines +620 to +621
match = re.search(r'<([^>]+)>', sender_header)
if match:

suggestion (code-quality): Use named expression to simplify assignment and conditional (use-named-expression)

Suggested change
match = re.search(r'<([^>]+)>', sender_header)
if match:
if match := re.search(r'<([^>]+)>', sender_header):

print(f"1. {GMAIL_CREDENTIALS_ENV_VAR}: Should contain the JSON string of your Google Cloud credentials.")
print(f" Example: export {GMAIL_CREDENTIALS_ENV_VAR}='{{ \"installed\": {{ ... }} }}'")
print(f" (Alternatively, as a fallback, place a 'credentials.json' file in the script's directory: {os.getcwd()})")
print(f"2. GMAIL_TOKEN_PATH (Optional): Specify a custom path for 'token.json'.")

issue (code-quality): We've found these issues:

@MasumRab MasumRab deleted the jules_wip_11755313394803907600 branch June 13, 2025 00:37
MasumRab added a commit that referenced this pull request Oct 29, 2025
Jules was unable to complete the task in time. Please review the work…
MasumRab added a commit that referenced this pull request Mar 24, 2026
**Improvement #1: Path-Based CI Triggers**
- Skip CI for docs-only changes (*.md, docs/**, *.mdx)
- Add commit message options: [skip ci], [ci:tests-only], [ci:lint-only]
- Add PR labels: ci:skip-tests, ci:skip-lint, automerge
- Make lint non-blocking (continue-on-error: true)

**Improvement #2: Dependabot for GitHub Actions**
- Auto-update GitHub Actions weekly
- Auto-update Python dependencies weekly
- Auto-update npm dependencies weekly
- Group minor/patch updates to reduce PR noise
- Label all Dependabot PRs with 'automerge'

**Improvement #3: Enhanced Mergify Auto-Merge**
- Auto-merge Dependabot PRs on test OR lint success
- Auto-merge PRs with 'automerge' label
- Support flexible CI success criteria

**Documentation:**
- Add CI/CD Guide section to README.md
- Document skip options and PR labels
- Create CI_CD_IMPROVEMENTS.md with full analysis

**Expected Impact:**
- 40% reduction: Path-based triggers (docs-only PRs)
- 20% reduction: Non-blocking lint
- 15% reduction: Label-based control
- Total: ~75% fewer unnecessary CI blockages

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>