from __future__ import annotations

from dataclasses import dataclass
from email.utils import parseaddr
import logging
from typing import Any

from app.llm_classifier import LLMEmailClassifier

# Headers requested from Gmail when fetching a message in "metadata" format.
METADATA_HEADERS = [
    "From",
    "Subject",
    "List-Unsubscribe",
    "Precedence",
]

# Substrings that mark a subject line as promotional (matched lowercase).
AD_SUBJECT_KEYWORDS = {
    "discount",
    "offer",
    "sale",
    "promo",
    "newsletter",
    "deal",
    "save",
    "coupon",
    "special offer",
    "limited time",
}

# Substrings that mark a sender as a bulk/marketing mailer (matched lowercase).
AD_SENDER_HINTS = {
    "newsletter",
    "marketing",
    "offers",
    "promotions",
    "deals",
    "no-reply",
    "noreply",
}

# Labels the agent needs; any that are missing are created by ensure_labels().
_REQUIRED_LABELS = ("LinkedIn", "Advertising", "AgentProcessed")

# Largest page size requested from messages.list in a single call.
_GMAIL_MAX_PAGE_SIZE = 500

logger = logging.getLogger("personal-agent.gmail")


@dataclass(frozen=True)
class ScanResult:
    """Per-outcome counters for one scan_and_route_messages() run."""

    scanned: int  # number of messages returned by the search query
    linkedin: int  # routed to the LinkedIn label
    advertising: int  # routed to the Advertising label
    skipped: int  # classified as neither; only marked AgentProcessed
    failed: int  # routing raised an exception (logged)


class GmailTriageAgent:
    """Label and archive LinkedIn and advertising mail found by a Gmail query.

    Classification is delegated to an optional LLM classifier; a rule-based
    heuristic is used when no classifier is configured or when it fails and
    ``fallback_to_rules`` is enabled.
    """

    def __init__(
        self,
        gmail_service: Any,
        query: str,
        *,
        classifier: LLMEmailClassifier | None = None,
        fallback_to_rules: bool = True,
    ) -> None:
        """Store the collaborators; performs no API calls.

        Args:
            gmail_service: Gmail API client exposing ``users().labels()`` and
                ``users().messages()``.
            query: Gmail search query selecting the messages to triage.
            classifier: Optional LLM classifier consulted before the rules.
            fallback_to_rules: Whether the keyword/header rules are applied
                when the classifier is absent or raises.
        """
        self.gmail_service = gmail_service
        self.query = query
        self.classifier = classifier
        self.fallback_to_rules = fallback_to_rules

    def ensure_labels(self) -> dict[str, str]:
        """Return a name -> id map of the mailbox labels, creating missing ones.

        The returned mapping always contains the "LinkedIn", "Advertising" and
        "AgentProcessed" keys.
        """
        labels_response = (
            self.gmail_service.users().labels().list(userId="me").execute()
        )
        label_by_name = {
            label["name"]: label["id"]
            for label in labels_response.get("labels", [])
        }
        # Reuse a label that differs only by case instead of attempting to
        # create it: Gmail rejects such a create call as a name conflict,
        # which would abort the whole scan.
        ids_by_folded_name = {
            name.casefold(): label_id
            for name, label_id in label_by_name.items()
        }

        for required_name in _REQUIRED_LABELS:
            if required_name in label_by_name:
                continue

            reusable_id = ids_by_folded_name.get(required_name.casefold())
            if reusable_id is not None:
                label_by_name[required_name] = reusable_id
                continue

            created = (
                self.gmail_service.users()
                .labels()
                .create(
                    userId="me",
                    body={
                        "name": required_name,
                        "labelListVisibility": "labelShow",
                        "messageListVisibility": "show",
                    },
                )
                .execute()
            )
            label_by_name[required_name] = created["id"]

        return label_by_name

    def scan_and_route_messages(self, max_results: int = 100) -> ScanResult:
        """Classify up to ``max_results`` matching messages and label them.

        Args:
            max_results: Upper bound on the number of messages to process.

        Returns:
            A ScanResult with the number of messages seen and a count per
            routing outcome.
        """
        label_by_name = self.ensure_labels()
        inbox_messages = self._list_messages(max_results)

        tally = {"linkedin": 0, "advertising": 0, "skipped": 0, "failed": 0}
        for message in inbox_messages:
            outcome = self._route_message(message["id"], label_by_name)
            # Any outcome other than the three success values counts as failed.
            tally[outcome if outcome in tally else "failed"] += 1

        return ScanResult(scanned=len(inbox_messages), **tally)

    def _list_messages(self, max_results: int) -> list[dict[str, Any]]:
        """Collect message stubs for ``self.query``, following result pages.

        A single messages.list call returns at most one page, so requests
        larger than the page cap are satisfied by following nextPageToken
        until ``max_results`` stubs are collected or the results run out.
        """
        collected: list[dict[str, Any]] = []
        page_token = None

        while True:
            remaining = max_results - len(collected)
            request_kwargs: dict[str, Any] = {
                "userId": "me",
                "q": self.query,
                "maxResults": min(remaining, _GMAIL_MAX_PAGE_SIZE),
            }
            if page_token:
                request_kwargs["pageToken"] = page_token

            response = (
                self.gmail_service.users()
                .messages()
                .list(**request_kwargs)
                .execute()
            )

            count_before = len(collected)
            collected.extend(response.get("messages", []))
            page_token = response.get("nextPageToken")

            # Stop when satisfied, when there is no further page, or when a
            # page added nothing (guards against looping on empty pages).
            made_progress = len(collected) > count_before
            if (
                len(collected) >= max_results
                or not page_token
                or not made_progress
            ):
                return collected

    def _route_message(self, message_id: str, label_by_name: dict[str, str]) -> str:
        """Fetch, classify and relabel one message.

        Returns:
            "linkedin" or "advertising" when the message was labelled and
            removed from the inbox, "skipped" when it was only marked as
            processed, or "failed" when any step raised (the error is logged).
        """
        # Deliberately broad: one bad message must not abort the whole scan.
        try:
            message = (
                self.gmail_service.users()
                .messages()
                .get(
                    userId="me",
                    id=message_id,
                    format="metadata",
                    metadataHeaders=METADATA_HEADERS,
                )
                .execute()
            )
            # Header names are lowercased so lookups are case-insensitive.
            headers = {
                header["name"].lower(): header["value"]
                for header in message.get("payload", {}).get("headers", [])
            }

            label = self._classify_email(
                message_id=message_id,
                sender=headers.get("from", ""),
                subject=headers.get("subject", ""),
                snippet=message.get("snippet", ""),
                list_unsubscribe=headers.get("list-unsubscribe", ""),
                precedence=headers.get("precedence", ""),
                message_label_ids=set(message.get("labelIds", [])),
            )

            # Every handled message is tagged AgentProcessed; routed ones also
            # get their category label first and are taken out of the inbox.
            add_labels = [label_by_name["AgentProcessed"]]
            remove_labels: list[str] = []
            if label == "LINKEDIN":
                add_labels.insert(0, label_by_name["LinkedIn"])
                remove_labels.append("INBOX")
                outcome = "linkedin"
            elif label == "ADVERTISING":
                add_labels.insert(0, label_by_name["Advertising"])
                remove_labels.append("INBOX")
                outcome = "advertising"
            else:
                outcome = "skipped"

            (
                self.gmail_service.users()
                .messages()
                .modify(
                    userId="me",
                    id=message_id,
                    body={
                        "addLabelIds": add_labels,
                        "removeLabelIds": remove_labels,
                    },
                )
                .execute()
            )
            return outcome
        except Exception:
            logger.exception("Failed to route message %s", message_id)
            return "failed"

    def _classify_email(
        self,
        *,
        message_id: str,
        sender: str,
        subject: str,
        snippet: str,
        list_unsubscribe: str,
        precedence: str,
        message_label_ids: set[str],
    ) -> str:
        """Return the category label for a message.

        The LLM classifier's label is returned as-is when it succeeds. If no
        classifier is configured, or it raises, the rule-based checks decide
        between "LINKEDIN", "ADVERTISING" and "OTHER" provided
        ``fallback_to_rules`` is set; otherwise the result is "OTHER".
        """
        if self.classifier:
            try:
                llm_result = self.classifier.classify(
                    sender=sender,
                    subject=subject,
                    snippet=snippet,
                    list_unsubscribe=list_unsubscribe,
                    precedence=precedence,
                    message_label_ids=message_label_ids,
                )
                logger.info(
                    "Message %s classified by LLM as %s (confidence=%.2f)",
                    message_id,
                    llm_result.label,
                    llm_result.confidence,
                )
                return llm_result.label
            except Exception:
                # Fall through to the rules (if enabled) after logging.
                logger.exception("LLM classification failed for %s", message_id)

        if not self.fallback_to_rules:
            return "OTHER"

        # LinkedIn is checked first so its mail is never filed as advertising.
        if self._is_linkedin_email(sender=sender, subject=subject):
            return "LINKEDIN"
        if self._is_advertising_email(
            sender=sender,
            subject=subject,
            list_unsubscribe=list_unsubscribe,
            precedence=precedence,
            message_label_ids=message_label_ids,
        ):
            return "ADVERTISING"
        return "OTHER"

    def _is_linkedin_email(self, sender: str, subject: str) -> bool:
        """Return True when the sender or subject mentions LinkedIn."""
        if "linkedin" in sender.lower() or "linkedin" in subject.lower():
            return True
        # NOTE(review): an address ending in "@linkedin.com" normally already
        # matches the substring test above, so this check looks redundant;
        # confirm whether the substring match was meant to be this strict.
        parsed_address = parseaddr(sender)[1].lower()
        return parsed_address.endswith("@linkedin.com")

    def _is_advertising_email(
        self,
        sender: str,
        subject: str,
        list_unsubscribe: str,
        precedence: str,
        message_label_ids: set[str],
    ) -> bool:
        """Return True when any promotional signal is present.

        Signals, in order: the CATEGORY_PROMOTIONS label, a non-blank
        List-Unsubscribe header, a bulk/list/junk Precedence header, an
        advertising keyword in the subject, or a marketing hint in the sender.
        """
        if "CATEGORY_PROMOTIONS" in message_label_ids:
            return True
        if list_unsubscribe.strip():
            return True
        # Strip before comparing so surrounding whitespace in the header value
        # does not hide an otherwise exact match.
        if precedence.strip().lower() in {"bulk", "list", "junk"}:
            return True

        subject_lower = subject.lower()
        if any(keyword in subject_lower for keyword in AD_SUBJECT_KEYWORDS):
            return True

        sender_lower = sender.lower()
        return any(hint in sender_lower for hint in AD_SENDER_HINTS)