Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 83 additions & 29 deletions server/python_backend/ai_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def __init__(self, data: Dict[str, Any]):
self.suggested_labels = data.get('suggested_labels', [])
self.risk_flags = data.get('risk_flags', [])
self.category_id = data.get('category_id')
self.action_items = data.get('action_items', []) # Added action_items

def to_dict(self) -> Dict[str, Any]:
return {
Expand All @@ -43,7 +44,8 @@ def to_dict(self) -> Dict[str, Any]:
'reasoning': self.reasoning,
'suggested_labels': self.suggested_labels,
'risk_flags': self.risk_flags,
'category_id': self.category_id
'category_id': self.category_id,
'action_items': self.action_items # Added action_items
}

class AdvancedAIEngine:
Expand All @@ -66,7 +68,8 @@ async def initialize(self):
logger.error(f"AI Engine initialization failed: {e}")

async def analyze_email(self, subject: str, content: str) -> AIAnalysisResult:
"""Analyze email content with AI"""
"""Analyze email content with AI by calling the NLPEngine script."""
logger.info(f"Initiating AI analysis for email with subject: '{subject[:50]}...'")
try:
cmd = [
sys.executable,
Expand All @@ -77,17 +80,37 @@ async def analyze_email(self, subject: str, content: str) -> AIAnalysisResult:
'--output-format', 'json'
]

result = await _execute_async_command(cmd, cwd=self.python_nlp_path)
logger.debug(f"Executing NLPEngine script with command: {' '.join(cmd)}")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚨 suggestion (security): Sensitive content in debug logs

Redact or truncate user-provided parameters in logs to prevent exposure of sensitive data.

Suggested change
logger.debug(f"Executing NLPEngine script with command: {' '.join(cmd)}")
# Redact or truncate user-provided parameters in logs to prevent exposure of sensitive data
redacted_cmd = [
arg if i == 0 else ("<REDACTED>" if arg not in ('--output-format', 'json') else arg)
for i, arg in enumerate(cmd)
]
logger.debug(f"Executing NLPEngine script with command: {' '.join(redacted_cmd)}")

result_json_str = await _execute_async_command(cmd, cwd=self.python_nlp_path)

if 'error' in result:
# Fallback to basic analysis
return self._get_fallback_analysis(subject, content)
if not result_json_str:
logger.error("NLPEngine script returned empty output.")
return self._get_fallback_analysis(subject, content, "empty script output")

try:
result = json.loads(result_json_str)
except json.JSONDecodeError as je:
logger.error(f"Failed to parse JSON output from NLPEngine: {je}. Output: {result_json_str[:200]}")
return self._get_fallback_analysis(subject, content, "invalid JSON output")

if 'error' in result or result.get('status') == 'error': # Assuming nlp_engine might return a status field
error_message = result.get('error', 'Unknown error from NLPEngine script')
logger.error(f"NLPEngine script returned an error: {error_message}")
# Fallback to basic analysis, passing the error message for context
return self._get_fallback_analysis(subject, content, error_message)

logger.info(f"Successfully received analysis from NLPEngine. Method used: {result.get('validation', {}).get('method', 'unknown')}")
return AIAnalysisResult(result)

except FileNotFoundError:
logger.critical(f"NLPEngine script not found at {self.nlp_service_script}. Ensure the path is correct.")
return self._get_fallback_analysis(subject, content, "NLP script not found")
except asyncio.TimeoutError:
logger.error("NLPEngine script execution timed out.")
return self._get_fallback_analysis(subject, content, "script execution timeout")
except Exception as e:
logger.error(f"AI analysis failed: {e}")
return self._get_fallback_analysis(subject, content)
logger.error(f"An unexpected error occurred during AI analysis: {e}", exc_info=True)
return self._get_fallback_analysis(subject, content, str(e))

async def train_models(self, training_emails: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Train AI models with email data"""
Expand Down Expand Up @@ -176,26 +199,57 @@ async def cleanup(self):
except Exception as e:
logger.error(f"AI Engine cleanup failed: {e}")

def _get_fallback_analysis(self, subject: str, content: str) -> AIAnalysisResult:
"""Fallback analysis when AI service is unavailable, using NLPEngine's simple fallback."""
def _get_fallback_analysis(self, subject: str, content: str, error_context: Optional[str] = None) -> AIAnalysisResult:
"""
Provides a basic fallback analysis if the primary NLPEngine script fails or returns an error.
This uses the in-memory FallbackNLPEngine instance.
"""
reason = "Fallback analysis due to AI service error"
if error_context:
reason += f": {error_context}"

# Use the _get_simple_fallback_analysis from the NLPEngine instance
# This method already provides: topic, sentiment, intent (default), urgency,
# confidence (default), categories, keywords (empty), reasoning.
fallback_data = self.fallback_nlp_engine._get_simple_fallback_analysis(subject, content)
logger.warning(f"{reason}. Subject: {subject[:50]}...")

# Adapt the result to AIAnalysisResult structure.
# Most fields should align or have sensible defaults from _get_simple_fallback_analysis.
return AIAnalysisResult({
'topic': fallback_data.get('topic', 'general_communication'),
'sentiment': fallback_data.get('sentiment', 'neutral'),
'intent': fallback_data.get('intent', 'information_sharing'), # NLPEngine fallback gives 'informational'
'urgency': fallback_data.get('urgency', 'low'),
'confidence': fallback_data.get('confidence', 0.3), # NLPEngine fallback gives 0.6, we can override if needed
'categories': fallback_data.get('categories', ['general']),
'keywords': fallback_data.get('keywords', []), # NLPEngine fallback gives empty list
'reasoning': fallback_data.get('reasoning', 'Fallback analysis - AI service unavailable'),
'suggested_labels': fallback_data.get('suggested_labels', ['general']),
'risk_flags': fallback_data.get('risk_flags', []),
'category_id': None # Not provided by this simple fallback
})
try:
# Use the _get_simple_fallback_analysis from the FallbackNLPEngine instance
# This method provides: topic, sentiment, intent (default), urgency,
# confidence (default), categories, keywords (empty), reasoning.
fallback_data = self.fallback_nlp_engine._get_simple_fallback_analysis(subject, content)

# Override reasoning if a specific error context was provided
if error_context:
fallback_data['reasoning'] = reason

# Adapt the result to AIAnalysisResult structure.
# Most fields should align or have sensible defaults from _get_simple_fallback_analysis.
return AIAnalysisResult({
'topic': fallback_data.get('topic', 'general_communication'),
'sentiment': fallback_data.get('sentiment', 'neutral'),
'intent': fallback_data.get('intent', 'informational'),
'urgency': fallback_data.get('urgency', 'low'),
'confidence': fallback_data.get('confidence', 0.3),
'categories': fallback_data.get('categories', ['general']),
'keywords': fallback_data.get('keywords', []),
'reasoning': fallback_data.get('reasoning', 'Fallback analysis - AI service unavailable'),
'suggested_labels': fallback_data.get('suggested_labels', ['general']),
'risk_flags': fallback_data.get('risk_flags', ['ai_analysis_failed']),
'category_id': None,
'action_items': [] # Ensure action_items is in the fallback
})
except Exception as e:
logger.error(f"Error generating fallback analysis itself: {e}", exc_info=True)
# If even the fallback engine fails, return a very minimal structure
return AIAnalysisResult({
'topic': 'unknown',
'sentiment': 'neutral',
'intent': 'unknown',
'urgency': 'low',
'confidence': 0.1,
'categories': ['general'],
'keywords': [],
'reasoning': f'Critical failure in AI analysis and fallback: {e}',
'suggested_labels': ['general'],
'risk_flags': ['ai_analysis_critically_failed'],
'category_id': None,
'action_items': [] # Ensure action_items is in the critical fallback
})
54 changes: 54 additions & 0 deletions server/python_backend/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,18 @@
allow_headers=["*"],
)

# Pydantic models for Action Item Extraction
class ActionExtractionRequest(BaseModel):
    """Request payload for the action-item extraction endpoint.

    `content` is required; `subject` is optional and is treated as an empty
    string downstream when absent.
    """
    # Optional email subject line.
    subject: Optional[str] = None
    # Required body text to scan for action items.
    content: str

class ActionItem(BaseModel):
    """A single extracted action item returned by the extraction endpoint."""
    # Text of the sentence from the triggering keyword onwards.
    action_phrase: str
    # First verb found after the keyword (NLTK path only); None without NLTK.
    verb: Optional[str] = None
    # First noun/pronoun found after the verb; None when not identified.
    object: Optional[str] = None
    # Raw matched due-date text (e.g. "by Friday"), if any.
    raw_due_date_text: Optional[str] = None
    # The full sentence the action item was found in.
    context: str

# Set up metrics if in production or staging environment
if os.getenv("NODE_ENV") in ["production", "staging"]:
from .metrics import setup_metrics
Expand Down Expand Up @@ -674,6 +686,48 @@ async def prune_filters(request: Request):
)
raise HTTPException(status_code=500, detail="Failed to prune filters")

# Action Item Extraction Endpoint
@app.post("/api/actions/extract-from-text", response_model=List[ActionItem])
async def extract_actions_from_text(
    fastapi_req: Request,  # named to avoid clashing with the Pydantic request model
    request_model: ActionExtractionRequest
):
    """Extract action items from provided text (subject and content).

    Delegates to the AI engine's full email analysis and returns only the
    ``action_items`` portion, re-validated against the ActionItem schema.

    Raises:
        HTTPException: 500 when analysis or response validation fails.
    """
    try:
        # Truncate the user-provided subject in logs to limit data exposure.
        logger.info(
            "Received action extraction request for subject: '%s'",
            request_model.subject[:50] if request_model.subject else "N/A",
        )

        ai_analysis_result = await ai_engine.analyze_email(
            subject=request_model.subject or "",  # engine expects a str, never None
            content=request_model.content,
        )

        # AIAnalysisResult exposes extracted items as plain dicts; convert to
        # ActionItem models so the response conforms to the declared schema.
        response_action_items = [
            ActionItem(**item) for item in ai_analysis_result.action_items
        ]

        logger.info("Extracted %d action items.", len(response_action_items))
        return response_action_items

    except Exception as e:
        logger.error(
            json.dumps({
                "message": "Unhandled error in extract_actions_from_text",
                "endpoint": str(fastapi_req.url),
                "error_type": type(e).__name__,
                "error_detail": str(e),
            })
        )
        # Chain the original exception so tracebacks keep the root cause.
        raise HTTPException(
            status_code=500, detail=f"Failed to extract action items: {str(e)}"
        ) from e
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (code-quality): Explicitly raise from a previous error (raise-from-previous-error)

Suggested change
raise HTTPException(status_code=500, detail=f"Failed to extract action items: {str(e)}")
raise HTTPException(
status_code=500, detail=f"Failed to extract action items: {str(e)}"
) from e


@app.get("/health")
async def health_check(request: Request):
"""System health check"""
Expand Down
171 changes: 171 additions & 0 deletions server/python_nlp/action_item_extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
import logging
import re
from typing import Any, Dict, List, Optional, Tuple

Comment on lines +1 to +4
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Missing Tuple import => runtime NameError

_extract_verb_object_with_nltk declares Tuple[...] in the return annotation but Tuple is not imported. The file will raise at import time.

-from typing import List, Dict, Any, Optional
+from typing import List, Dict, Any, Optional, Tuple
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
import re
import logging
from typing import List, Dict, Any, Optional
import re
import logging
-from typing import List, Dict, Any, Optional
+from typing import List, Dict, Any, Optional, Tuple
🤖 Prompt for AI Agents
In server/python_nlp/action_item_extractor.py at the top import section (lines 1
to 4), the Tuple type is used in the return annotation of the function
_extract_verb_object_with_nltk but Tuple is not imported from typing. Add Tuple
to the import statement from typing alongside List, Dict, Any, and Optional to
prevent a runtime NameError.

# Attempt to import NLTK for POS tagging.
try:
    import nltk

    # Ensure necessary NLTK data is available; download it if missing.
    # This is mainly for local setups; in a container it should be pre-installed.
    # NOTE(review): downloading at import time can hang or fail in offline
    # environments — consider moving this into an explicit setup step.
    # nltk.data.find raises LookupError (not nltk.downloader.DownloadError)
    # when a resource is absent, so that is the exception to catch here.
    try:
        nltk.data.find('taggers/averaged_perceptron_tagger')
    except LookupError:
        nltk.download('averaged_perceptron_tagger', quiet=True)
    try:
        nltk.data.find('tokenizers/punkt')
    except LookupError:
        nltk.download('punkt', quiet=True)
    HAS_NLTK = True
except ImportError:
    HAS_NLTK = False

logger = logging.getLogger(__name__)

class ActionItemExtractor:
    """
    Extracts potential action items from text using rule-based keyword
    spotting, simple due-date regexes, and optional NLTK POS tagging.
    """

    def __init__(self):
        # Keywords indicating an action item. Colons after "task"/"action" are
        # optional, and "action required:" is matched explicitly (listed before
        # 'action:?' because regex alternation is leftmost-first). The previous
        # pattern missed "Action required:" and colon-less "task"/"action".
        self.action_keywords_regex = re.compile(
            r'\b(please|task:?|action required:|action:?|need to|required to|'
            r'must|should|can you|could you|will you)\b',
            re.IGNORECASE
        )
        # Regex for simple due-date patterns.
        # This is a basic version and can be expanded significantly.
        self.due_date_regex = re.compile(
            r'\b(by (next )?(monday|tuesday|wednesday|thursday|friday|saturday|sunday|tomorrow|end of day|eod)|'
            r'on \d{1,2}(st|nd|rd|th)? (jan|feb|mar|apr|may|jun|jul|aug|sep|oct|nov|dec)(\w*)?(\s\d{4})?|'
            r'in \d+ (days?|weeks?|months?)|'
            r'next (week|month|year))\b',
            re.IGNORECASE
        )
        # Naive sentence splitter; NLTK's sent_tokenize would be more robust.
        self.sentence_splitter_regex = re.compile(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?|\!)\s')

        if HAS_NLTK:
            logger.info("NLTK found. POS tagging will be available for action item extraction.")
        else:
            logger.warning("NLTK not found. Action item extraction will rely solely on regex and keyword spotting.")

    def _extract_verb_object_with_nltk(self, text: str) -> Tuple[Optional[str], Optional[str]]:
        """
        Extract the first verb and the first noun/pronoun following it.

        Returns a (verb, object) pair; either element may be None. This is a
        simplified heuristic, not a full dependency parse.
        """
        if not HAS_NLTK:
            return None, None
        try:
            tokens = nltk.word_tokenize(text)
            tagged_tokens = nltk.pos_tag(tokens)

            # Find the first verb, remembering its POSITION. The previous
            # implementation used tokens.index(verb), which can locate an
            # earlier non-verb occurrence of the same word and shift the
            # object search to the wrong place.
            verb = None
            verb_index = -1
            for i, (token, tag) in enumerate(tagged_tokens):
                if tag.startswith('VB'):  # VB, VBP, VBZ, VBG, VBD, VBN
                    verb = token
                    verb_index = i
                    break

            # First noun or pronoun after the verb serves as a simple object.
            obj = None
            if verb is not None:
                for token, tag in tagged_tokens[verb_index + 1:]:
                    if tag.startswith('NN') or tag.startswith('PRP'):
                        obj = token
                        break
            return verb, obj
        except Exception as e:
            logger.error(f"Error during NLTK POS tagging or verb/object extraction: {e}")
            return None, None

    def extract_actions(self, text: str) -> List[Dict[str, Any]]:
        """
        Extract action items from the given text.

        Returns a list of dicts with keys: action_phrase, verb, object,
        raw_due_date_text, context. Non-string or empty input yields [].
        """
        action_items: List[Dict[str, Any]] = []
        if not text or not isinstance(text, str):
            return action_items

        # Split text into sentences so each hit carries its sentence as context.
        sentences = self.sentence_splitter_regex.split(text)

        for sentence in sentences:
            sentence = sentence.strip()
            if not sentence:
                continue

            match = self.action_keywords_regex.search(sentence)
            if not match:
                continue

            # Capture from the keyword onwards as a starting point; more
            # advanced parsing could trim this to the end of the clause.
            action_phrase = sentence[match.start():]

            verb, obj = None, None
            if HAS_NLTK:
                # POS-tag only the text following the keyword.
                potential_action_segment = sentence[match.end():].strip()
                verb, obj = self._extract_verb_object_with_nltk(potential_action_segment)

            due_date_match = self.due_date_regex.search(action_phrase)
            raw_due_date_text = due_date_match.group(0).strip() if due_date_match else None

            action_item: Dict[str, Any] = {
                'action_phrase': action_phrase.strip(),
                'verb': verb,
                'object': obj,
                'raw_due_date_text': raw_due_date_text,
                'context': sentence  # the full sentence as context
            }
            action_items.append(action_item)
            logger.debug(f"Extracted action item: {action_item}")

        logger.info(f"Extracted {len(action_items)} potential action items.")
        return action_items

if __name__ == '__main__':
    # Example usage / manual smoke test.
    logging.basicConfig(level=logging.DEBUG)
    extractor = ActionItemExtractor()

    sample_texts = [
        "Please submit the report by Friday. We also need to review the budget. Can you schedule a meeting?",
        "Action: John to complete the slides. Task: Maria to send out invites by tomorrow. Required to update the JIRA ticket.",
        "No actions here, just a general update.",
        "Could you please finalize the presentation by next Monday? Also, will you call the vendor?",
    ]

    # Run the extractor over each sample and print every hit.
    for idx, sample in enumerate(sample_texts, start=1):
        print(f"\n--- Test Text {idx} ---")
        for extracted in extractor.extract_actions(sample):
            print(extracted)

    print("\nNLTK was used." if HAS_NLTK else "\nNLTK was NOT used.")
Loading