Jules was unable to complete the task in time. Please review the work done so far and provide feedback for Jules to continue.
Reviewer's Guide

This PR migrates the Python backend from SQLite to PostgreSQL with an async psycopg2 helper, enhances the NLP engine to load optional ML models with fallbacks and richer logging, refactors the performance monitor to use in-memory buffers instead of SQLite, and modernizes Gmail integration by loading OAuth credentials from the environment and making real API calls with error handling.

Sequence Diagram for Gmail API Integration and Authentication

```mermaid
sequenceDiagram
    participant App as GmailDataCollector
    participant GoogleAuth as Google OAuth
    participant GmailAPI as Gmail API
    App->>App: __init__()
    App->>App: _load_credentials() from token.json
    alt Stored token is invalid or missing
        App->>App: _authenticate()
        Note over App: Reads credentials from GMAIL_CREDENTIALS_JSON env var
        App->>+GoogleAuth: Initiate OAuth Flow
        GoogleAuth-->>-App: Return new credentials/token
        App->>App: _store_credentials() to token.json
    end
    App->>App: Gmail service is initialized
    App->>+GmailAPI: list(messages, q="...")
    GmailAPI-->>-App: Message list response
    loop For each message ID in response
        App->>+GmailAPI: get(message, id=...)
        GmailAPI-->>-App: Full message data
        App->>App: _parse_message_payload()
    end
```
Entity Relationship Diagram for the Updated PostgreSQL Schema

```mermaid
erDiagram
    categories {
        int id PK
        string name
        string description
        string color
        int count
    }
    emails {
        int id PK
        string messageId UK "From Gmail"
        string threadId "From Gmail"
        string subject
        text content
        boolean isUnread
        int categoryId FK
        json analysisMetadata
        timestamp createdAt
        timestamp updatedAt
    }
    activities {
        int id PK
        string type
        string description
        json details
        timestamp timestamp
    }
    categories ||--o{ emails : "categorizes"
```
Class Diagram for the Refactored DatabaseManager

```mermaid
classDiagram
    class DatabaseManager {
        -database_url: str
        +__init__(db_url: Optional[str])
        +async _execute_query(query, params, fetch_one, fetch_all, commit)
        +init_database()
        +async create_email(email_data: Dict) Optional[Dict]
        +async get_email_by_id(email_id: int) Optional[Dict]
        +async get_emails(limit, offset, category_id, is_unread) List[Dict]
        +async update_email_by_message_id(message_id: str, update_data: Dict) Optional[Dict]
        +async create_activity(activity_data: Dict) Optional[Dict]
        +async get_dashboard_stats() Dict
    }
```
Class Diagram for the Enhanced NLPEngine

```mermaid
classDiagram
    class NLPEngine {
        -sentiment_model: Any
        -topic_model: Any
        -intent_model: Any
        -urgency_model: Any
        +__init__()
        -_load_model(model_path: str) Any
        -_analyze_sentiment(text: str) Dict
        -_analyze_topic(text: str) Dict
        -_analyze_intent(text: str) Dict
        -_analyze_urgency(text: str) Dict
        +analyze_email(subject: str, content: str) Dict
    }
```
Hey @MasumRab - I've reviewed your changes - here's some feedback:

- Refactor seeding and schema initialization out of `__init__` into an async setup method to avoid blocking the event loop during object instantiation.
- Consolidate the repeated snake_case-to-camelCase column mapping logic into a shared utility or mapping configuration to reduce duplication in the `create`/`update` methods.
- Avoid using `run_local_server` in the constructor for OAuth in production; consider a non-blocking or service-account based authentication flow for headless environments.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- Refactor seeding and schema initialization out of `__init__` into an async setup method to avoid blocking the event loop during object instantiation.
- Consolidate the repeated snake_case-to-camelCase column mapping logic into a shared utility or mapping configuration to reduce duplication in the `create`/`update` methods.
- Avoid using `run_local_server` in the constructor for OAuth in production; consider a non-blocking or service-account based authentication flow for headless environments.
## Individual Comments
### Comment 1
<location> `server/python_backend/database.py:34` </location>
<code_context>
+ conn = await asyncio.to_thread(psycopg2.connect, self.database_url)
+ try:
+ # Use RealDictCursor for dictionary-like row access
+ async with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
+ await asyncio.to_thread(cur.execute, query, params)
+
</code_context>
<issue_to_address>
Sync psycopg2 cursor does not support async context manager
Since psycopg2 cursors are synchronous, using 'async with' is invalid. Use 'asyncio.to_thread' for all cursor operations or consider an async database driver like 'asyncpg' or 'aiopg'.
</issue_to_address>
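One way to apply this advice is to keep all the blocking psycopg2 work inside a single synchronous function and push that whole function to a worker thread. The sketch below uses a stub in place of the real psycopg2 calls (the function names here are illustrative, not the PR's actual helper):

```python
import asyncio

def _run_query_sync(database_url, query, params):
    # Illustrative stand-in for the blocking psycopg2 sequence:
    # connect, open a RealDictCursor, execute, fetch, close -- all
    # inside one synchronous function.
    return [{"query": query, "params": params}]

async def execute_query(database_url, query, params=None):
    # The entire blocking sequence runs in a thread via one call,
    # so a sync cursor is never used with `async with`.
    return await asyncio.to_thread(_run_query_sync, database_url, query, params)

rows = asyncio.run(execute_query("postgres://...", "SELECT 1", ()))
print(rows)
```

The same pattern applies per-call (wrapping `cur.execute`, `cur.fetchall`, etc. individually), but batching the whole query into one `to_thread` call avoids bouncing between the loop and the thread pool.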
### Comment 2
<location> `server/python_backend/database.py:31` </location>
<code_context>
+
+ async def _execute_query(self, query: str, params: Optional[Tuple] = None, fetch_one: bool = False, fetch_all: bool = False, commit: bool = False):
+ """Helper to execute queries using asyncio.to_thread for sync psycopg2."""
+ conn = await asyncio.to_thread(psycopg2.connect, self.database_url)
+ try:
+ # Use RealDictCursor for dictionary-like row access
</code_context>
<issue_to_address>
Opening a new connection per query is expensive
Using a connection pool or an async-native client will help reduce overhead and improve performance by reusing database connections.
Suggested implementation:
```python
def __init__(self, db_url: Optional[str] = None):
import asyncpg
self.asyncpg = asyncpg
self.database_url = db_url or os.getenv("DATABASE_URL")
if not self.database_url:
logger.error("DATABASE_URL environment variable not set.")
raise ValueError("DATABASE_URL environment variable not set.")
self.pool = None # Will be initialized asynchronously
async def init_pool(self):
"""Initialize the asyncpg connection pool."""
if self.pool is None:
self.pool = await self.asyncpg.create_pool(self.database_url)
async def _execute_query(self, query: str, params: Optional[Tuple] = None, fetch_one: bool = False, fetch_all: bool = False, commit: bool = False):
"""Helper to execute queries using asyncpg connection pool."""
if self.pool is None:
await self.init_pool()
async with self.pool.acquire() as conn:
result = None
stmt = await conn.prepare(query)
if commit:
await conn.execute(query, *(params or ()))
elif fetch_one:
result_row = await stmt.fetchrow(*(params or ()))
result = dict(result_row) if result_row else None
elif fetch_all:
result_rows = await stmt.fetch(*(params or ()))
result = [dict(row) for row in result_rows]
```
- You will need to ensure that `asyncpg` is installed in your environment (`pip install asyncpg`).
- Any code that instantiates this class should call `await instance.init_pool()` before making queries.
- If you have other methods that use the old psycopg2 connection, refactor them to use the asyncpg pool as well.
- Remove any remaining imports or usages of `psycopg2` and `psycopg2.extras` in this file.
</issue_to_address>
### Comment 3
<location> `server/python_backend/database.py:143` </location>
<code_context>
+ pass
+
+
+ def _parse_json_fields(self, row: Dict[str, Any], fields: List[str]) -> Dict[str, Any]:
+ """Helper to parse stringified JSON fields in a row."""
+ if not row:
</code_context>
<issue_to_address>
Parsing JSON only for `analysisMetadata` may miss other JSON/text-array columns
Consider updating this helper to also parse array fields like `labels`, `labelIds`, and `toAddresses`, or any other JSON columns, to ensure all relevant fields are converted to proper Python types.
</issue_to_address>
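A generalized form of the helper could decode whatever fields the caller lists, leaving already-decoded values alone. This is a sketch; field names like `labels` and `toAddresses` come from the review comment, not from a verified schema:

```python
import json

def parse_json_fields(row, fields):
    """Decode stringified JSON columns in a result row, in place.

    Values the driver already decoded (dict/list) are left untouched,
    as are strings that fail to parse.
    """
    if not row:
        return row
    for field in fields:
        value = row.get(field)
        if isinstance(value, str):
            try:
                row[field] = json.loads(value)
            except ValueError:
                pass  # leave unparseable values as-is
    return row

row = {"analysisMetadata": '{"topic": "billing"}', "labels": ["INBOX"]}
print(parse_json_fields(row, ["analysisMetadata", "labels", "toAddresses"]))
```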
### Comment 4
<location> `server/python_backend/database.py:241` </location>
<code_context>
- # Email already exists, update it
- return await self.update_email_by_message_id(email_data['message_id'], email_data)
async def get_email_by_id(self, email_id: int) -> Optional[Dict[str, Any]]:
"""Get email by ID"""
- async with self.get_connection() as conn:
</code_context>
<issue_to_address>
Missing JSON parsing for array columns in result
Fields like `labels` (of type `text[]`) may remain as raw DB types when using RealDictCursor. Ensure these are properly cast or parsed if not handled by the driver.
</issue_to_address>
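psycopg2 normally adapts `text[]` columns to Python lists, but a defensive cast covers the case the review warns about, where a raw `'{...}'` literal leaks through. A minimal sketch (note: naive comma split, so quoted or escaped elements are not handled):

```python
def parse_pg_text_array(value):
    # Pass through values the driver already converted.
    if isinstance(value, list):
        return value
    # Convert a raw Postgres array literal like '{a,b,c}' to a list.
    if isinstance(value, str) and value.startswith("{") and value.endswith("}"):
        inner = value[1:-1]
        return inner.split(",") if inner else []
    return value

print(parse_pg_text_array("{INBOX,UNREAD}"))
```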
### Comment 5
<location> `server/python_backend/database.py:276` </location>
<code_context>
+ categories = await self._execute_query(query, fetch_all=True)
+ return categories if categories else []
+
+ async def create_category(self, category_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
+ """Create a new category.
+ The schema.ts for categories has id, name, description, color, count.
</code_context>
<issue_to_address>
Create category allows duplicates on `name`
If category names must be unique, enforce this with a uniqueness constraint in the schema or handle conflicts in the INSERT statement using `ON CONFLICT (name) DO NOTHING` or `DO UPDATE`.
Suggested implementation:
```python
query = """
INSERT INTO categories (name, description, color, count)
VALUES (%s, %s, %s, %s)
ON CONFLICT (name) DO NOTHING
RETURNING id, name, description, color, count;
"""
params = (
category_data['name'],
category_data.get('description'),
```
```python
result = await self._execute_query(query, params=params, fetch_one=True)
return result if result else None
```
</issue_to_address>
### Comment 6
<location> `server/python_backend/database.py:348` </location>
<code_context>
+ return [self._parse_json_fields(email, ['analysisMetadata']) for email in emails]
+ return []
async def update_email_by_message_id(self, message_id: str,
- update_data: Dict[str, Any]) -> Dict[str, Any]:
- """Update email by message ID"""
</code_context>
<issue_to_address>
Mapping of update_data keys may skip new schema fields
Some new schema fields (e.g., `preview`, `historyId`, `internalDate`) are not included in your mapping. Using a centralized mapping or auto-quoting camelCase fields could help prevent missing fields.
Suggested implementation:
```python
def _camel_to_snake(self, name: str) -> str:
import re
s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()
def _map_update_data_keys(self, update_data: dict) -> dict:
# Map camelCase keys to quoted snake_case DB fields
mapped = {}
for k, v in update_data.items():
db_key = f'"{self._camel_to_snake(k)}"'
mapped[db_key] = v
return mapped
async def update_email_by_message_id(self, message_id: str,
```
```python
update_data: Dict[str, Any]) -> Dict[str, Any]:
"""Update email by message ID"""
# Map update_data keys to DB fields
mapped_update_data = self._map_update_data_keys(update_data)
set_clause = ", ".join([f"{k} = %s" for k in mapped_update_data.keys()])
values = list(mapped_update_data.values())
query = f'UPDATE emails SET {set_clause} WHERE "message_id" = %s RETURNING *'
values.append(message_id)
result = await self._execute_query(query, tuple(values), fetch_one=True)
if result:
return self._parse_json_fields(result, ['analysisMetadata'])
return {}
```
</issue_to_address>
### Comment 7
<location> `server/python_nlp/nlp_engine.py:14` </location>
<code_context>
+ async def _execute_query(self, query: str, params: Optional[Tuple] = None, fetch_one: bool = False, fetch_all: bool = False, commit: bool = False):
+ """Helper to execute queries using asyncio.to_thread for sync psycopg2."""
+ conn = await asyncio.to_thread(psycopg2.connect, self.database_url)
+ try:
+ # Use RealDictCursor for dictionary-like row access
+ async with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
</code_context>
<issue_to_address>
Single except covers multiple imports, loses granularity
Splitting the imports into separate try blocks will allow you to handle missing dependencies more precisely and enable partial NLP functionality when possible.
</issue_to_address>
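One possible shape for the split guard, with a capability flag per dependency (the module names follow the NLP context of the comment; the flag names are illustrative):

```python
import logging

logger = logging.getLogger(__name__)

HAS_NLTK = False
HAS_SKLEARN = False

try:
    import nltk  # noqa: F401
    HAS_NLTK = True
except ImportError:
    logger.warning("nltk unavailable; sentiment falls back to keyword rules")

try:
    import sklearn  # noqa: F401
    HAS_SKLEARN = True
except ImportError:
    logger.warning("scikit-learn unavailable; model loading disabled")

print(HAS_NLTK, HAS_SKLEARN)
```

Each `_analyze_*` method can then check only the flag it actually needs, so a missing sklearn no longer disables NLTK-based features.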
### Comment 8
<location> `server/python_nlp/nlp_engine.py:514` </location>
<code_context>
+ urgency_analysis = self._analyze_urgency(cleaned_text) # Already updated
+
+ # This method is regex-based, no model to load for it currently per its implementation
+ risk_analysis_flags = self._detect_risk_factors(cleaned_text)
+
</code_context>
<issue_to_address>
Undefined method `_detect_risk_factors`; original was `_analyze_risk_factors`
Please update the method call to match the correct name to prevent runtime errors.
</issue_to_address>
### Comment 9
<location> `server/python_nlp/nlp_engine.py:99` </location>
<code_context>
- category_id = cursor.lastrowid
- conn.commit()
-
- return {
- 'id': category_id,
- 'name': category_data['name'],
</code_context>
<issue_to_address>
Including full analysis details may expose internal model outputs
The `details` object includes raw model predictions and confidences, which could reveal sensitive internal logic. Consider sanitizing or restricting this data in production APIs.
</issue_to_address>
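A simple way to act on this is a whitelist applied just before the response is serialized. The key set below is purely illustrative; the real safe fields depend on the API contract:

```python
SAFE_DETAIL_KEYS = {"sentiment", "topic", "intent", "urgency"}  # illustrative whitelist

def sanitize_details(details):
    # Keep only whitelisted fields so raw model probabilities and
    # internal method names never reach the public API response.
    return {k: v for k, v in details.items() if k in SAFE_DETAIL_KEYS}

full = {"sentiment": "negative", "confidence": 0.93, "method_used": "model_sentiment"}
print(sanitize_details(full))
```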
### Comment 10
<location> `server/python_backend/performance_monitor.py:309` </location>
<code_context>
+ for metric in relevant_metrics:
+ metrics_summary_temp[metric.metric_name].append(metric.value)
+
+ final_metrics_summary = {}
+ for name, values in metrics_summary_temp.items():
+ count = len(values)
</code_context>
<issue_to_address>
Returning raw `values` list may cause large payloads
Consider summarizing the values with aggregates or implementing pagination to avoid large responses.
</issue_to_address>
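A minimal aggregation sketch, assuming the caller only needs summary statistics rather than the raw series (the exact aggregate set is an assumption):

```python
def summarize(values):
    # Return aggregates instead of the raw metric series
    # to keep response payloads bounded.
    if not values:
        return {"count": 0}
    return {
        "count": len(values),
        "min": min(values),
        "max": max(values),
        "avg": sum(values) / len(values),
    }

print(summarize([120.0, 80.0, 100.0]))
```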
### Comment 11
<location> `server/python_backend/performance_monitor.py:349` </location>
<code_context>
-
- conn.close()
+ service_data_points = []
+ for metric in self.metrics_buffer:
+ if metric.timestamp > since_time and metric.tags.get("service") == service_name:
+ service_data_points.append(metric) # Store the whole PerformanceMetric object
</code_context>
<issue_to_address>
Service filter checks `tags.get("service")` without validating structure
Please verify that all `record_metric` calls provide `tags` as a dict containing a `service` key, or add checks/logging to handle missing or malformed tags.
</issue_to_address>
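A small defensive accessor illustrates the check being suggested; the function name is hypothetical:

```python
def metric_service(tags):
    # Tolerate None or non-dict tags instead of raising AttributeError
    # when a metric was recorded without structured tags.
    if not isinstance(tags, dict):
        return None
    return tags.get("service")

print(metric_service({"service": "nlp_engine"}))
print(metric_service(None))
```

The filter loop can then use `metric_service(metric.tags) == service_name` and optionally log when tags are malformed.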
### Comment 12
<location> `server/python_nlp/gmail_integration.py:292` </location>
<code_context>
+ except OSError as e:
+ self.logger.error(f"Error storing credentials to {token_path}: {e}")
+
+ def _authenticate(self):
+ """Authenticates the user and obtains credentials using GMAIL_CREDENTIALS_JSON env var."""
+ creds = None
</code_context>
<issue_to_address>
OAuth flow blocks the event loop and is interactive
`flow.run_local_server()` blocks the event loop. For async server-side use, prefer a service-account flow or run the interactive step in a separate thread or process.
</issue_to_address>
### Comment 13
<location> `server/python_nlp/gmail_integration.py:297` </location>
<code_context>
+ creds = None
+ credentials_json_str = os.getenv(GMAIL_CREDENTIALS_ENV_VAR)
+
+ if not credentials_json_str:
+ self.logger.error(
+ f"Environment variable {GMAIL_CREDENTIALS_ENV_VAR} is not set. "
</code_context>
<issue_to_address>
Fallback to credentials file but mixing env/file methods is confusing
Consider using a single credential source or clearly documenting which source takes precedence to prevent confusion.
</issue_to_address>
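One way to make the precedence explicit is a single resolver that documents the order (env var wins, file is the fallback) and reports which source was used. A sketch, using the `GMAIL_CREDENTIALS_JSON` variable named in the review:

```python
import json
import os

def load_client_config(env_var="GMAIL_CREDENTIALS_JSON", path="credentials.json"):
    """Resolve the OAuth client config with documented precedence:
    1. the env var, if set; 2. the local file; otherwise fail loudly."""
    raw = os.getenv(env_var)
    if raw:
        return json.loads(raw), "env"
    if os.path.exists(path):
        with open(path) as fh:
            return json.load(fh), "file"
    raise RuntimeError(f"Set {env_var} or provide {path}")
```

Returning the source alongside the config also makes it easy to log which credential path was taken, which removes the ambiguity the comment describes.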
### Comment 14
<location> `server/python_backend/database.py:29` </location>
<code_context>
+ # self.init_database() # Table creation handled by Drizzle ORM based on schema.ts
+ # Seeding default data can be done here if needed, or via a separate script.
+
+ async def _execute_query(self, query: str, params: Optional[Tuple] = None, fetch_one: bool = False, fetch_all: bool = False, commit: bool = False):
+ """Helper to execute queries using asyncio.to_thread for sync psycopg2."""
+ conn = await asyncio.to_thread(psycopg2.connect, self.database_url)
</code_context>
<issue_to_address>
Consider refactoring the large, flag-driven _execute_query method into separate helpers for execution and fetching, using a cursor context manager to centralize commit/rollback and reduce code duplication.
Here’s one way to pull apart that big `_execute_query` and cut down on flags, nesting and JSON‐parsing spread all over:
1. Extract a simple cursor context manager.
2. Have one “execute only” helper and two small fetch helpers instead of flags.
3. Keep JSON–in–text parsing in one place.
```python
from contextlib import asynccontextmanager
@asynccontextmanager
async def _get_cursor(self):
conn = await asyncio.to_thread(psycopg2.connect, self.database_url)
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
try:
yield cur
await asyncio.to_thread(conn.commit)
except Exception:
await asyncio.to_thread(conn.rollback)
raise
finally:
await asyncio.to_thread(cur.close)
await asyncio.to_thread(conn.close)
async def _execute(self, query: str, params: Tuple = None):
async with self._get_cursor() as cur:
await asyncio.to_thread(cur.execute, query, params)
async def _fetch_one(self, query: str, params: Tuple = None) -> Optional[Dict]:
async with self._get_cursor() as cur:
await asyncio.to_thread(cur.execute, query, params)
row = await asyncio.to_thread(cur.fetchone)
return dict(row) if row else None
async def _fetch_all(self, query: str, params: Tuple = None) -> List[Dict]:
async with self._get_cursor() as cur:
await asyncio.to_thread(cur.execute, query, params)
rows = await asyncio.to_thread(cur.fetchall)
return [dict(r) for r in rows]
```
Then you can rewrite e.g. `get_email_by_id` as
```python
async def get_email_by_id(self, email_id: int) -> Optional[Dict]:
q = """
SELECT e.*, c.name AS "categoryName", c.color AS "categoryColor"
FROM emails e
LEFT JOIN categories c ON e."categoryId" = c.id
WHERE e.id = %s
"""
row = await self._fetch_one(q, (email_id,))
return self._parse_json_fields(row, ["analysisMetadata"]) if row else None
```
and similar for inserts/updates:
```python
async def create_category(self, data):
q = """INSERT INTO categories(name,description,color,count)
VALUES(%s,%s,%s,%s)
RETURNING id,name,description,color,count"""
return await self._fetch_one(q, (
data["name"],
data.get("description"),
data.get("color", "#6366f1"),
data.get("count", 0),
))
```
This:
- Removes the single giant `_execute_query` with flags
- Gives each step one responsibility (execute, fetch_one, fetch_all)
- Keeps commit/rollback in one place
- Leaves you free to call `self._parse_json_fields` exactly where and only for the columns you need.
</issue_to_address>
### Comment 15
<location> `server/python_nlp/nlp_engine.py:75` </location>
<code_context>
return text
def _analyze_sentiment(self, text: str) -> Dict[str, Any]:
- """Enhanced sentiment analysis with confidence scoring"""
- if not HAS_NLTK:
</code_context>
<issue_to_address>
Consider refactoring the repeated model-then-fallback logic in each _analyze_* method into a single reusable helper to reduce duplication and nesting.
Here’s a way to collapse all of those “try model → catch → fallback → keyword” blocks into one reusable helper, and then slim each `_analyze_*` down to a single call. This preserves all existing functionality, but removes 90% of the nesting and duplication.
1) Add a generic helper in your class:
```python
def _analyze_with_model_fallback(
self,
model, # e.g. self.sentiment_model
text: str,
predict_fn: Callable, # e.g. lambda m, t: (label, proba_array)
fallback_fn: Callable, # e.g. self._simple_sentiment
method_tag: str, # e.g. 'sentiment'
default: Dict[str, Any]
) -> Dict[str, Any]:
"""
Try the sklearn pipeline in `model`, else fallback,
always returning a dict with 'confidence', 'method_used', etc.
"""
if model:
try:
label, probas = predict_fn(model, text)
confidence = float(max(probas))
return {
method_tag: str(label),
'confidence': confidence,
'method_used': f'model_{method_tag}'
}
except Exception as e:
logger.error(f"{method_tag} model error, fallback: {e}")
# fallback
result = fallback_fn(text)
result['method_used'] = (
result.get('method_used') or f'fallback_{method_tag}'
)
return result or default
```
2) Extract each simple fallback into its own tiny function:
```python
def _simple_sentiment(self, text: str) -> Dict[str, Any]:
text_lower = text.lower()
# ... your existing positive/negative count logic ...
return {
'sentiment': sentiment_label,
'polarity': pol,
'subjectivity': 0.5,
'confidence': conf
}
```
3) Then rewrite your big `_analyze_sentiment` as:
```python
def _analyze_sentiment(self, text: str) -> Dict[str, Any]:
default = {'sentiment': 'neutral', 'polarity': 0.0,
'subjectivity': 0.5, 'confidence': 0.5}
return self._analyze_with_model_fallback(
self.sentiment_model,
text,
predict_fn=lambda m, t: (m.predict([t])[0], m.predict_proba([t])[0]),
fallback_fn=self._simple_sentiment,
method_tag='sentiment',
default=default
)
```
4) Do the same for topic, intent, urgency.
— You’ll end up with four 5–10 line methods instead of dozens, and one central helper that does all of the try/catch, method tagging, and defaulting. This keeps every feature but slashes cognitive complexity.
</issue_to_address>
### Comment 16
<location> `server/python_nlp/gmail_integration.py:239` </location>
<code_context>
- # In production, this would be initialized with actual Gmail API credentials
- self.gmail_service = None # Placeholder for Gmail API service
+ self.gmail_service = None
+ self._load_credentials()
+ if not self.gmail_service:
</code_context>
<issue_to_address>
Consider extracting OAuth and credentials logic into a dedicated helper class to improve separation of concerns.
Here’s one way to peel off the OAuth-and-credentials code into its own single-responsibility helper. You can apply the same pattern to pull out parsing or simulation logic.
```python
# oauth_manager.py
import os, json, logging
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
class OAuthManager:
def __init__(self,
scopes: list,
token_path: str,
credentials_env_var: str,
fallback_credentials_file: str):
self.scopes = scopes
self.token_path = token_path
self.env_var = credentials_env_var
self.fallback = fallback_credentials_file
self.logger = logging.getLogger(self.__class__.__name__)
def get_service(self, api_name='gmail', api_version='v1'):
creds = self._load_token()
if not creds:
creds = self._authenticate()
service = build(api_name, api_version, credentials=creds)
self._save_token(creds)
return service
def _load_token(self):
if os.path.exists(self.token_path):
try:
creds = Credentials.from_authorized_user_file(self.token_path, self.scopes)
except Exception as e:
self.logger.error(f"token load failed: {e}")
return None
if creds.valid:
return creds
if creds.expired and creds.refresh_token:
try:
creds.refresh(Request())
return creds
except Exception:
os.remove(self.token_path)
return None
def _authenticate(self):
data = os.getenv(self.env_var)
if data:
config = json.loads(data)
flow = InstalledAppFlow.from_client_config(config, self.scopes)
elif os.path.exists(self.fallback):
flow = InstalledAppFlow.from_client_secrets_file(self.fallback, self.scopes)
else:
raise RuntimeError(f"Missing credentials—set {self.env_var} or place {self.fallback}")
return flow.run_local_server(port=0)
def _save_token(self, creds):
try:
with open(self.token_path, 'w') as f:
f.write(creds.to_json())
except Exception as e:
self.logger.error(f"token save failed: {e}")
```
Then in your collector:
```python
from oauth_manager import OAuthManager
class GmailDataCollector:
def __init__(self, auth_manager=None, config: RateLimitConfig=None):
self.config = config or RateLimitConfig()
self.rate_limiter = RateLimiter(self.config)
self.cache = EmailCache()
self.logger = logging.getLogger(__name__)
self.auth = auth_manager or OAuthManager(
scopes=SCOPES,
token_path=TOKEN_JSON_PATH,
credentials_env_var=GMAIL_CREDENTIALS_ENV_VAR,
fallback_credentials_file=CREDENTIALS_PATH
)
self.gmail_service = self.auth.get_service()
```
By doing this you:
- Move all token-loading/refreshing/fallback logic into `OAuthManager`
- Keep `GmailDataCollector` focused on fetching, parsing and rate-limiting
- Make both classes smaller and easier to test in isolation
You can apply the same pattern to bundle your parsing (`_parse_message_payload`, `_extract_email_address`) into an `EmailParser` class, or to group simulated vs live calls into a `GmailApiClient` wrapper.
</issue_to_address>
### Additional comment: `server/python_backend/performance_monitor.py`

suggestion (code-quality): Replace a for-append loop with list extend (for-append-to-extend)

Original:

```python
for severity, count in severities.items():
    final_alerts_summary.append({
        "type": alert_type,
        "severity": severity,
        "count": count
    })
```

Suggested:

```python
final_alerts_summary.extend(
    {"type": alert_type, "severity": severity, "count": count}
    for severity, count in severities.items()
)
```
### Additional comment: `server/python_backend/performance_monitor.py`

suggestion (code-quality): Convert for loop into list comprehension (list-comprehension)

Original:

```python
service_data_points = []
for metric in self.metrics_buffer:
    if metric.timestamp > since_time and metric.tags.get("service") == service_name:
        service_data_points.append(metric)  # Store the whole PerformanceMetric object
```

Suggested:

```python
service_data_points = [
    metric
    for metric in self.metrics_buffer
    if metric.timestamp > since_time
    and metric.tags.get("service") == service_name
]
```
### Additional comment: `server/python_nlp/gmail_integration.py`

issue (code-quality): We've found these issues:
- Use named expression to simplify assignment and conditional (use-named-expression)
- Swap if/else branches (swap-if-else-branches)

Context:

```python
def _authenticate(self):
    """Authenticates the user and obtains credentials using GMAIL_CREDENTIALS_JSON env var."""
    creds = None
    credentials_json_str = os.getenv(GMAIL_CREDENTIALS_ENV_VAR)
```
### Additional comment: `server/python_nlp/gmail_integration.py`

suggestion (code-quality): Use named expression to simplify assignment and conditional (use-named-expression)

Original:

```python
match = re.search(r'<([^>]+)>', sender_header)
if match:
```

Suggested:

```python
if match := re.search(r'<([^>]+)>', sender_header):
```
### Additional comment: `server/python_nlp/gmail_integration.py`

issue (code-quality): We've found these issues:
- Replace f-string with no interpolated values with string (remove-redundant-fstring)
- Use named expression to simplify assignment and conditional (use-named-expression)

Context:

```python
print(f"1. {GMAIL_CREDENTIALS_ENV_VAR}: Should contain the JSON string of your Google Cloud credentials.")
print(f"   Example: export {GMAIL_CREDENTIALS_ENV_VAR}='{{ \"installed\": {{ ... }} }}'")
print(f"   (Alternatively, as a fallback, place a 'credentials.json' file in the script's directory: {os.getcwd()})")
print(f"2. GMAIL_TOKEN_PATH (Optional): Specify a custom path for 'token.json'.")
```
**Improvement #1: Path-Based CI Triggers**
- Skip CI for docs-only changes (*.md, docs/**, *.mdx)
- Add commit message options: [skip ci], [ci:tests-only], [ci:lint-only]
- Add PR labels: ci:skip-tests, ci:skip-lint, automerge
- Make lint non-blocking (continue-on-error: true)

**Improvement #2: Dependabot for GitHub Actions**
- Auto-update GitHub Actions weekly
- Auto-update Python dependencies weekly
- Auto-update npm dependencies weekly
- Group minor/patch updates to reduce PR noise
- Label all Dependabot PRs with 'automerge'

**Improvement #3: Enhanced Mergify Auto-Merge**
- Auto-merge Dependabot PRs on test OR lint success
- Auto-merge PRs with 'automerge' label
- Support flexible CI success criteria

**Documentation:**
- Add CI/CD Guide section to README.md
- Document skip options and PR labels
- Create CI_CD_IMPROVEMENTS.md with full analysis

**Expected Impact:**
- 40% reduction: Path-based triggers (docs-only PRs)
- 20% reduction: Non-blocking lint
- 15% reduction: Label-based control
- Total: ~75% fewer unnecessary CI blockages

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
Summary by Sourcery
Migrate backend from SQLite to PostgreSQL, refactor async database operations and JSON handling, enhance the NLP engine with ML model support and fallback mechanisms, convert performance monitoring to in-memory mode, and implement robust Gmail OAuth2 integration, updating project dependencies accordingly.