I've made some enhancements to `smart_retrieval.py` (#34) to improve its standalone execution and testing capabilities.
Here's a summary of the changes:
1. **Command-Line Interface**: I've added a command-line interface with the following commands:
* `list-strategies`: This will show you the available retrieval strategies.
* `execute-strategies`: This allows you to run specific (or all) strategies with budget controls.
* `get-retrieval-analytics`: You can use this to fetch and display retrieval analytics.
All output from the command-line interface is in JSON format for both successful operations and any errors.
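As a rough sketch of how such a JSON-emitting CLI could be wired up with argparse (the subcommand names come from the summary above; the handler bodies and option names are placeholders, not the actual code in this PR):

```python
import argparse
import json
import sys


def main_cli() -> None:
    # Hypothetical sketch: subcommands mirroring the ones described above.
    parser = argparse.ArgumentParser(prog="smart_retrieval")
    sub = parser.add_subparsers(dest="command", required=True)
    sub.add_parser("list-strategies")

    execute = sub.add_parser("execute-strategies")
    execute.add_argument("--strategies", nargs="*", default=None)
    execute.add_argument("--max-api-calls", type=int, default=100)

    analytics = sub.add_parser("get-retrieval-analytics")
    analytics.add_argument("--days", type=int, default=30)

    args = parser.parse_args()
    try:
        if args.command == "list-strategies":
            result = {"strategies": ["inbox_recent", "important_flagged"]}  # placeholder data
        elif args.command == "execute-strategies":
            result = {"executed": args.strategies or "all", "max_api_calls": args.max_api_calls}
        else:
            result = {"analytics_days": args.days}
        print(json.dumps(result))
    except Exception as exc:  # errors are also reported as JSON, then exit with status 1
        print(json.dumps({"error": str(exc)}))
        sys.exit(1)
```

Keeping both success and error paths on stdout as JSON makes the tool easy to consume from other services.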
2. **Gmail Authentication**: I've integrated OAuth 2.0 authentication, which enables the script to make authenticated calls to the Gmail API. It uses `token.json` and `credentials.json` (via the `GMAIL_CREDENTIALS_JSON` environment variable).
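The standard google-auth flow for this setup looks roughly like the sketch below (the scope, paths, and environment-variable name are assumptions based on the summary, not the PR's exact code; the deferred imports keep the module importable without the Google libraries installed):

```python
import os

# Assumed values: the script's actual scope and paths may differ.
SCOPES = ["https://www.googleapis.com/auth/gmail.readonly"]
TOKEN_JSON_PATH = "token.json"
CREDENTIALS_JSON_PATH = os.environ.get("GMAIL_CREDENTIALS_JSON", "credentials.json")


def authenticate():
    """Sketch of the OAuth 2.0 flow described above."""
    from google.auth.transport.requests import Request
    from google.oauth2.credentials import Credentials
    from google_auth_oauthlib.flow import InstalledAppFlow

    creds = None
    if os.path.exists(TOKEN_JSON_PATH):
        # Reuse a previously stored token if one exists.
        creds = Credentials.from_authorized_user_file(TOKEN_JSON_PATH, SCOPES)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            # Fall back to the interactive installed-app flow.
            flow = InstalledAppFlow.from_client_secrets_file(CREDENTIALS_JSON_PATH, SCOPES)
            creds = flow.run_local_server(port=0)
        with open(TOKEN_JSON_PATH, "w") as token_file:
            token_file.write(creds.to_json())
    return creds
```

The returned credentials can then be passed to `googleapiclient.discovery.build("gmail", "v1", credentials=creds)` to obtain the service object.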
3. **Real Gmail API Calls**: I've updated the email fetching logic in `_fetch_email_batch` to use actual Gmail API calls (`messages.list` and `messages.get`) instead of simulated ones. The simulation feature is still available as a fallback if authentication doesn't succeed.
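The list-then-get pattern with a simulation fallback can be sketched like this (the simulation body and the exact return shape are assumptions; the `users().messages().list`/`get` calls are the real Gmail API surface):

```python
from typing import Any, Dict, Optional


def fetch_email_batch(service: Optional[Any], query: str, batch_size: int,
                      page_token: Optional[str] = None) -> Dict[str, Any]:
    """Sketch: one messages.list call, then one messages.get per result."""
    if service is None:  # authentication failed: fall back to simulated data
        return simulate_gmail_response(query, batch_size, page_token)

    list_resp = service.users().messages().list(
        userId="me", q=query, maxResults=batch_size, pageToken=page_token
    ).execute()
    messages = []
    for ref in list_resp.get("messages", []):
        detail = service.users().messages().get(
            userId="me", id=ref["id"], format="metadata"
        ).execute()
        messages.append(detail)
    return {
        "messages": messages,
        "nextPageToken": list_resp.get("nextPageToken"),
        "historyId": list_resp.get("historyId"),
    }


def simulate_gmail_response(query: str, batch_size: int,
                            page_token: Optional[str]) -> Dict[str, Any]:
    # Hypothetical stand-in for the script's simulation logic.
    return {
        "messages": [{"id": f"sim-{i}", "snippet": f"simulated result for {query!r}"}
                     for i in range(min(batch_size, 3))],
        "nextPageToken": None,
        "historyId": "0",
    }
```

Note the cost implication: each batch is 1 `list` call plus N `get` calls, which is exactly why accurate API-call counting comes up in the review comments below.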
4. **Refinements for Standalone Execution**:
* I've improved and standardized the logging throughout the script, and you can now configure the log levels.
* I've ensured consistent use of `checkpoint_db_path`.
* I've standardized the JSON error responses from the command-line interface, including exiting with a status of 1 when an error occurs.
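A tiny helper in that spirit might look like the following (hypothetical; the real script's error payload shape may differ):

```python
import json
import sys


def emit_error_and_exit(message: str, code: int = 1) -> None:
    """Print a standardized JSON error object, then exit non-zero."""
    print(json.dumps({"status": "error", "error": message}))
    sys.exit(code)
```

Routing every failure through one helper keeps the CLI's error contract consistent for callers that parse its output.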
5. **Unit Tests**:
* I've created `tests/test_smart_retrieval.py` with a structure for command-line interface and logic tests.
* I've made significant progress in implementing these tests, including mocking Gmail API interactions and addressing issues related to asynchronous testing and SQLite usage in tests.
* I've identified the specific next steps needed to fully stabilize the checkpointing and asynchronous command-line interface tests.
These updates make `smart_retrieval.py` a more functional and robust tool, which can be called by other services like `GmailAIService`.
**Reviewer's Guide**

This PR transforms `smart_retrieval.py` into a standalone, testable CLI tool by integrating an argparse-based JSON CLI, OAuth2 Gmail authentication, real Gmail API calls (with simulation fallback), enhanced logging and error handling, refined retrieval/budget logic, and a new unit test scaffold.

**Sequence Diagram: Email Fetching Logic in `_fetch_email_batch`**

```mermaid
sequenceDiagram
    participant SGR as SmartGmailRetriever
    participant GmailSvc as Gmail API Service
    participant Simulator as Simulation Logic
    SGR->>SGR: _fetch_email_batch(query, batch_size, page_token)
    alt self.gmail_service is available
        SGR->>GmailSvc: users().messages().list(q=query, maxResults=batch_size, pageToken=page_token)
        GmailSvc-->>SGR: ListResponse (message_refs, nextPageToken, historyId)
        loop for each message_ref in ListResponse
            SGR->>GmailSvc: users().messages().get(id=message_ref.id, format='metadata')
            GmailSvc-->>SGR: MessageDetail
            SGR->>SGR: Transform MessageDetail
        end
        SGR-->>SGR: Returns {messages, nextPageToken, historyId}
    else self.gmail_service is NOT available
        SGR->>SGR: _simulate_gmail_response(query, batch_size, page_token)
        SGR-->>SGR: Returns simulated {messages, nextPageToken, historyId}
    end
```
**Sequence Diagram: CLI `execute-strategies` Command Flow**

```mermaid
sequenceDiagram
    actor User
    participant CLI as main_cli
    participant SGR as SmartGmailRetriever
    User->>CLI: python smart_retrieval.py execute-strategies ...
    CLI->>CLI: Parse arguments
    CLI->>SGR: __init__(checkpoint_db_path)
    alt SGR.gmail_service is None
        CLI-->>User: JSON error (Gmail auth failed)
    else SGR.gmail_service is available
        CLI->>SGR: execute_smart_retrieval(strategies, max_api_calls, time_budget)
        loop For each strategy (respecting budget)
            SGR->>SGR: _load_checkpoint(strategy.name)
            SGR->>SGR: get_incremental_query(strategy, checkpoint)
            SGR->>SGR: _execute_strategy_retrieval(strategy, query, checkpoint, remaining_calls)
            SGR->>SGR: _save_checkpoint(new_checkpoint)
        end
        SGR->>SGR: _store_daily_stats(results)
        SGR-->>CLI: Returns execution results
        CLI-->>User: JSON output (results)
    end
```
**Class Diagram: Changes to SmartGmailRetriever and Related Types**

```mermaid
classDiagram
    class SmartGmailRetriever {
        +checkpoint_db_path: str
        +logger: logging.Logger
        +gmail_service: Optional~Resource~
        +api_limits: Dict
        +sync_stats: Dict
        +error_stats: Dict
        +__init__(checkpoint_db_path: str)
        #_init_checkpoint_db()
        #_load_credentials() : Optional~Credentials~
        #_store_credentials(creds: Credentials)
        #_authenticate() : Optional~Credentials~
        +get_optimized_retrieval_strategies() : List~RetrievalStrategy~
        +get_incremental_query(strategy: RetrievalStrategy, checkpoint: Optional~SyncCheckpoint~) : str
        +execute_smart_retrieval(strategies: Optional~List~, max_api_calls: int, time_budget_minutes: int) : Dict
        #_execute_strategy_retrieval(strategy: RetrievalStrategy, query: str, checkpoint: Optional~SyncCheckpoint~, remaining_api_calls: int) : Dict
        #_fetch_email_batch(query: str, batch_size: int, page_token: Optional~str~) : Dict
        #_simulate_gmail_response(query: str, batch_size: int, page_token: Optional~str~) : Dict
        #_load_checkpoint(strategy_name: str) : Optional~SyncCheckpoint~
        #_save_checkpoint(checkpoint: SyncCheckpoint)
        #_store_daily_stats(results: Dict)
        +get_retrieval_analytics(days: int) : Dict
        +optimize_strategies_based_on_performance() : List~RetrievalStrategy~
    }
    class Credentials {
        <<google.oauth2.credentials>>
        +valid: bool
        +expired: bool
        +refresh_token: str
        +refresh(request: Request)
        +to_json() : str
        +from_authorized_user_file(filename: str, scopes: List~str~) : Credentials
    }
    class RetrievalStrategy {
        <<Dataclass>>
        +name: str
        +query_filter: str
        +priority: int
        +frequency: str
        +max_emails_per_run: int
        +batch_size: int
        +include_folders: List~str~
        +exclude_folders: List~str~
    }
    class SyncCheckpoint {
        <<Dataclass>>
        +strategy_name: str
        +last_sync_date: datetime
        +last_history_id: str
        +processed_count: int
        +next_page_token: Optional~str~
        +errors_count: int
    }
    SmartGmailRetriever --> Credentials : uses
    SmartGmailRetriever --> RetrievalStrategy : uses
    SmartGmailRetriever --> SyncCheckpoint : uses
```
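In Python, the two dataclasses in the diagram above would look roughly like this (field names and types are taken from the diagram; the defaults are assumptions):

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional


@dataclass
class RetrievalStrategy:
    name: str
    query_filter: str
    priority: int
    frequency: str
    max_emails_per_run: int
    batch_size: int
    include_folders: List[str] = field(default_factory=list)
    exclude_folders: List[str] = field(default_factory=list)


@dataclass
class SyncCheckpoint:
    strategy_name: str
    last_sync_date: datetime
    last_history_id: str
    processed_count: int = 0
    next_page_token: Optional[str] = None
    errors_count: int = 0
```

`next_page_token` being optional is what lets a checkpoint distinguish "resume mid-pagination" from "start a fresh sync".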
**Caution:** Review failed. The pull request is closed.

**Walkthrough**

The changes introduce OAuth2-based Gmail API authentication and real email data fetching in the `SmartGmailRetriever` class, with the existing simulation logic retained as a fallback.
**Sequence Diagram(s)**

```mermaid
sequenceDiagram
    participant User
    participant CLI
    participant SmartGmailRetriever
    participant GmailAPI
    User->>CLI: Run command (e.g., execute retrieval)
    CLI->>SmartGmailRetriever: Initialize and authenticate
    SmartGmailRetriever->>GmailAPI: Authenticate via OAuth2
    GmailAPI-->>SmartGmailRetriever: Return credentials/service
    SmartGmailRetriever->>GmailAPI: Fetch email batch (with query)
    GmailAPI-->>SmartGmailRetriever: Return email data
    SmartGmailRetriever->>CLI: Return results/logs
    CLI->>User: Display output (JSON, logs, errors)
```
Hey @MasumRab - I've reviewed your changes and they look great!
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `server/python_nlp/smart_retrieval.py:26` </location>
<code_context>
+from googleapiclient.discovery import build
+from googleapiclient.errors import HttpError
+
+load_dotenv()
+
+# Define constants for authentication
</code_context>
<issue_to_address>
Avoid loading .env at import time
Move load_dotenv() to the CLI entrypoint or main function to prevent side effects and improve testability.
Suggested implementation:
```python
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
# Define constants for authentication
```
You must move the `load_dotenv()` call to the CLI entrypoint or the main function of your application (e.g., under `if __name__ == "__main__":` or in your main CLI handler). This will ensure environment variables are loaded only when the script is run directly, not on import.
</issue_to_address>
### Comment 2
<location> `server/python_nlp/smart_retrieval.py:421` </location>
<code_context>
exclude_filters = " AND ".join([f"-in:{folder.lower()}" for folder in strategy.exclude_folders])
base_query = f"{base_query} {exclude_filters}"
+ self.logger.info(f"Generated incremental query for strategy '{strategy.name}': {base_query}")
return base_query
</code_context>
<issue_to_address>
Use DEBUG level for verbose query logs
Consider changing this log statement to DEBUG to prevent cluttering INFO logs and reduce the risk of exposing sensitive information.
</issue_to_address>
<suggested_fix>
<<<<<<< SEARCH
self.logger.info(f"Generated incremental query for strategy '{strategy.name}': {base_query}")
=======
self.logger.debug(f"Generated incremental query for strategy '{strategy.name}': {base_query}")
>>>>>>> REPLACE
</suggested_fix>
### Comment 3
<location> `server/python_nlp/smart_retrieval.py:701` </location>
<code_context>
+ sys.exit(1)
+
if __name__ == "__main__":
- asyncio.run(main())
\ No newline at end of file
+ # Configure logging. Default to INFO, but allow override via environment variable for debugging.
</code_context>
<issue_to_address>
Add CLI invocation in __main__
Please add a call to execute the CLI, such as `asyncio.run(main_cli())`, so the script performs the intended retrieval logic when run.
</issue_to_address>
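One way to address this comment, combining the CLI invocation with the environment-driven log-level override mentioned in the diff (the `LOG_LEVEL` variable name and the `main_cli` body here are assumptions):

```python
import asyncio
import logging
import os


async def main_cli() -> None:
    # Placeholder for the real async CLI entry point.
    logging.getLogger(__name__).info("CLI started")


if __name__ == "__main__":
    # Default to INFO, but allow override (e.g. LOG_LEVEL=DEBUG) for debugging.
    level_name = os.environ.get("LOG_LEVEL", "INFO").upper()
    logging.basicConfig(level=getattr(logging, level_name, logging.INFO))
    asyncio.run(main_cli())
```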
### Comment 4
<location> `server/python_nlp/smart_retrieval.py:595` </location>
<code_context>
+ # For now, assume 1 "conceptual" API call per _fetch_email_batch call for loop control.
+ # The actual API calls will be summed up later.
+ # This means api_calls_for_strategy here is more like "batches_processed_for_strategy".
+ # TODO: Refine API call counting here based on _fetch_email_batch's true cost.
+ # For now, let's count it as 1 + number of messages fetched (1 for list, 1 for each get).
+ # This is a placeholder, actual counting is complex and depends on _fetch_email_batch internals.
</code_context>
<issue_to_address>
Implement accurate API call counting
Use the actual number of list and get calls from `_fetch_email_batch` to ensure accurate API quota tracking.
Suggested implementation:
```python
# _fetch_email_batch makes 1 call for list, and N calls for get.
# We now use the actual number of API calls made in the batch, as reported by _fetch_email_batch.
# _fetch_email_batch must return an 'api_calls_in_batch' field: 1 (list) + N (get) calls.
api_calls_for_strategy += batch_result.get('api_calls_in_batch', 1) # Default to 1 if not present
if batch_result.get('error'):
```
You must ensure that `_fetch_email_batch` returns an `api_calls_in_batch` field in its result dictionary, accurately reflecting the number of API calls made (1 for list + N for get). If it does not, update `_fetch_email_batch` accordingly.
</issue_to_address>
### Comment 5
<location> `server/python_nlp/smart_retrieval.py:942` </location>
<code_context>
+ strategy.batch_size = max(10, strategy.batch_size // 2)
+ self.logger.info(f"Strategy '{strategy.name}': High error rate ({error_rate:.2f}%). Reducing batch size from {original_batch_size} to {strategy.batch_size}.")
+ elif error_rate < 2 and perf.get('sync_count', 0) > 5 : # Low error rate and sufficient data
+ strategy.batch_size = min(self.api_limits.get('batch_size_limit', 100), int(strategy.batch_size * 1.2)) # Ensure not over API limit
+ if strategy.batch_size != original_batch_size:
+ self.logger.info(f"Strategy '{strategy.name}': Low error rate ({error_rate:.2f}%). Increasing batch size from {original_batch_size} to {strategy.batch_size}.")
</code_context>
<issue_to_address>
Define `batch_size_limit` in `api_limits`
`api_limits` should explicitly define `batch_size_limit` or derive it from the Gmail API to avoid relying on a hardcoded default value.
Suggested implementation:
```python
elif error_rate < 2 and perf.get('sync_count', 0) > 5 : # Low error rate and sufficient data
strategy.batch_size = min(self.api_limits['batch_size_limit'], int(strategy.batch_size * 1.2)) # Ensure not over API limit
if strategy.batch_size != original_batch_size:
self.logger.info(f"Strategy '{strategy.name}': Low error rate ({error_rate:.2f}%). Increasing batch size from {original_batch_size} to {strategy.batch_size}.")
```
You must ensure that `self.api_limits['batch_size_limit']` is always set before this code runs.
- If `self.api_limits` is set in the class `__init__` or a setup method, add something like:
```python
self.api_limits['batch_size_limit'] = <value_from_gmail_api_or_config>
```
- If you can derive it from the Gmail API, do so and assign it to `self.api_limits['batch_size_limit']` during initialization.
- Remove any fallback to a hardcoded default in the `.get()` call as shown above.
</issue_to_address>
### Comment 6
<location> `server/python_nlp/smart_retrieval.py:158` </location>
<code_context>
+ def _store_credentials(self, creds: Credentials):
+ """Stores credentials to TOKEN_JSON_PATH."""
+ try:
+ with open(TOKEN_JSON_PATH, 'w') as token_file:
+ token_file.write(creds.to_json())
+ self.logger.info(f"Stored credentials to {TOKEN_JSON_PATH}")
</code_context>
<issue_to_address>
Restrict credentials file permissions
Set file permissions to 0o600 after writing the token file to prevent unauthorized access.
</issue_to_address>
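One way to apply this restriction (a sketch; `token_path` stands in for the script's `TOKEN_JSON_PATH` constant):

```python
import os


def store_credentials_securely(token_json: str, token_path: str = "token.json") -> None:
    # Write the token, then restrict it to owner read/write only (0o600).
    with open(token_path, "w") as token_file:
        token_file.write(token_json)
    os.chmod(token_path, 0o600)
```

To avoid even a brief window with default permissions, the file could instead be created via `os.open(token_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)` before writing.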
**issue (code-quality):** Low code quality found in `SmartGmailRetriever.execute_smart_retrieval` - 20% (low-code-quality)

Location: `async def execute_smart_retrieval(...)`.

Explanation: The quality score for this function is below the quality threshold of 25%. This score is a combination of the method length, cognitive complexity and working memory.

How can you solve this? It might be worth refactoring this function to make it shorter and more readable.
- Reduce the function length by extracting pieces of functionality out into their own functions. This is the most important thing you can do - ideally a function should be less than 10 lines.
- Reduce nesting, perhaps by introducing guard clauses to return early.
- Ensure that variables are tightly scoped, so that code using related concepts sits together within the function rather than being scattered.
**issue (code-quality):** Convert for loop into list comprehension [×2] (list-comprehension)

Location: `def get_retrieval_analytics(self, days: int = 30) -> Dict[str, Any]` ("Get retrieval analytics for the past N days").
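For illustration, the kind of rewrite this lint suggests (the row shape here is hypothetical, not the actual analytics query):

```python
# Hypothetical rows as returned by a sqlite3 query in the analytics code.
rows = [("inbox", 120, 2), ("updates", 45, 0)]

# Loop-and-append version flagged by the linter:
daily_stats = []
for name, processed, errors in rows:
    daily_stats.append({"strategy": name, "processed": processed, "errors": errors})

# Equivalent list comprehension:
daily_stats = [
    {"strategy": name, "processed": processed, "errors": errors}
    for name, processed, errors in rows
]
```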
**issue (code-quality):** We've found these issues in `optimize_strategies_based_on_performance`:
- Use named expression to simplify assignment and conditional (use-named-expression)
- Low code quality found in `SmartGmailRetriever.optimize_strategies_based_on_performance` - 21% (low-code-quality)

Explanation: The quality score for this function is below the quality threshold of 25%. This score is a combination of the method length, cognitive complexity and working memory.

How can you solve this? It might be worth refactoring this function to make it shorter and more readable.
- Reduce the function length by extracting pieces of functionality out into their own functions. This is the most important thing you can do - ideally a function should be less than 10 lines.
- Reduce nesting, perhaps by introducing guard clauses to return early.
- Ensure that variables are tightly scoped, so that code using related concepts sits together within the function rather than being scattered.
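The named-expression suggestion refers to Python's walrus operator (3.8+); a generic before/after sketch (the lookup and data here are hypothetical):

```python
def lookup_performance(stats: dict, name: str):
    # Hypothetical lookup that may return None for unknown strategies.
    return stats.get(name)


stats = {"inbox": {"error_rate": 1.5}}

# Before: separate assignment and conditional.
perf = lookup_performance(stats, "inbox")
if perf:
    rate = perf["error_rate"]

# After: a named expression combines the two.
if (perf := lookup_performance(stats, "inbox")):
    rate = perf["error_rate"]
```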
**Summary by Sourcery**
Enable standalone execution of smart_retrieval by adding a command-line interface, integrating Gmail OAuth2 authentication with real API calls (and a simulation fallback), enhancing logging and checkpoint handling, and introducing unit tests for CLI and retrieval logic.