diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000..7b9a669 Binary files /dev/null and b/.DS_Store differ diff --git a/.env.example b/.env.example index 40bf5f5..2210900 100644 --- a/.env.example +++ b/.env.example @@ -14,7 +14,7 @@ LLM_BASE_URL= LLM_TIMEOUT_SECONDS= LLM_FALLBACK_TO_RULES=false GMAIL_SCAN_INTERVAL_MINUTES=5 -GMAIL_QUERY=in:inbox -label:AgentProcessed newer_than:7d +GMAIL_QUERY=in:inbox is:unread -label:AgentProcessed UNSUBSCRIBE_DIGEST_INTERVAL_MINUTES=1440 UNSUBSCRIBE_QUERY=label:Advertising UNSUBSCRIBE_MAX_RESULTS=500 diff --git a/README.md b/README.md index 07bd2db..57ceb33 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ This project runs a small local API service that: -- scans new Gmail inbox messages +- scans unread emails in the root Gmail inbox - classifies emails with **Strands** (`LINKEDIN`, `ADVERTISING`, `OTHER`) - moves LinkedIn emails to a `LinkedIn` label/folder - moves advertising emails to an `Advertising` label/folder @@ -43,7 +43,7 @@ Edit `.env` and set: - `AGENT_API_KEY` to a strong secret for agent-to-agent calls - `STRANDS_OPENAI_API_KEY` and optional `STRANDS_MODEL_ID` / `STRANDS_OPENAI_BASE_URL` - optional unsubscribe digest settings (`UNSUBSCRIBE_*`) -- optional scan frequency and Gmail query +- optional scan frequency and additional Gmail filters (`GMAIL_QUERY`) ## 4) Run @@ -122,6 +122,8 @@ curl -X POST "http://127.0.0.1:8000/unsubscribe/auto-run?max_results=500" \ ## Classification behavior +- Scan scope is always forced to `in:inbox is:unread` (root inbox + unread). +- `GMAIL_QUERY` is treated as additional filters (for example `-label:AgentProcessed`). - Strands classification is used for each email (`LINKEDIN`, `ADVERTISING`, `OTHER`). - LinkedIn has priority over advertising inside the classifier prompt. - Set `LLM_FALLBACK_TO_RULES=true` only if you want rules-based backup when LLM calls fail. diff --git a/app/config.py b/app/config.py index 30e6a79..8edd274 100644 --- a/app/config.py +++ b/app/config.py @@ -49,7 +49,7 @@ def get_settings() -> Settings: google_token_file=os.getenv("GOOGLE_TOKEN_FILE", "token.json"), gmail_scan_interval_minutes=int(os.getenv("GMAIL_SCAN_INTERVAL_MINUTES", "5")), gmail_query=os.getenv( - "GMAIL_QUERY", "in:inbox -label:AgentProcessed newer_than:7d" + "GMAIL_QUERY", "in:inbox is:unread -label:AgentProcessed" ), agent_api_key=os.getenv("AGENT_API_KEY", ""), strands_api_key=_first_set_env("STRANDS_OPENAI_API_KEY", "LLM_API_KEY"), diff --git a/app/gmail_agent.py b/app/gmail_agent.py index a8a1abf..35887ed 100644 --- a/app/gmail_agent.py +++ b/app/gmail_agent.py @@ -91,21 +91,35 @@ class GmailTriageAgent: def scan_and_route_messages(self, max_results: int = 100) -> ScanResult: label_by_name = self.ensure_labels() - inbox_messages = ( + effective_query = self._build_effective_query() + inbox_response = ( self.gmail_service.users() .messages() - .list(userId="me", q=self.query, maxResults=max_results) + .list(userId="me", q=effective_query, maxResults=max_results) .execute() - .get("messages", []) ) + inbox_messages = inbox_response.get("messages", []) linkedin = 0 advertising = 0 skipped = 0 failed = 0 + logger.info( + "Gmail scan base_query='%s' effective_query='%s' matched=%s " + "(resultSizeEstimate=%s, max_results=%s)", + self.query, + effective_query, + len(inbox_messages), + inbox_response.get("resultSizeEstimate", 0), + max_results, + ) + if not inbox_messages: + self._log_query_diagnostics(effective_query) + for message in inbox_messages: outcome = self._route_message(message["id"], label_by_name) + logger.info("Message %s routed with outcome: %s", message["id"], outcome) if outcome == "linkedin": linkedin += 1 elif outcome == "advertising": @@ -123,6 +137,50 @@ class GmailTriageAgent: failed=failed, ) + def _build_effective_query(self) -> str: + # Hard requirement: scan only unread messages in root inbox. + base_query = (self.query or "").strip() + query_lower = base_query.lower() + required_terms = [] + if "in:inbox" not in query_lower: + required_terms.append("in:inbox") + if "is:unread" not in query_lower: + required_terms.append("is:unread") + + if not base_query: + return " ".join(required_terms) + if not required_terms: + return base_query + return f"{base_query} {' '.join(required_terms)}" + + def _log_query_diagnostics(self, effective_query: str) -> None: + diagnostic_queries = [ + "in:inbox", + "in:inbox is:unread", + "in:inbox newer_than:30d", + "in:inbox is:unread -label:AgentProcessed", + ] + logger.info("No messages matched current query. Running diagnostics...") + for diagnostic_query in diagnostic_queries: + if diagnostic_query == effective_query: + continue + try: + response = ( + self.gmail_service.users() + .messages() + .list(userId="me", q=diagnostic_query, maxResults=1) + .execute() + ) + logger.info( + "Diagnostic query='%s' resultSizeEstimate=%s", + diagnostic_query, + response.get("resultSizeEstimate", 0), + ) + except Exception: + logger.exception( + "Failed to run diagnostic query='%s'", diagnostic_query + ) + def _route_message(self, message_id: str, label_by_name: dict[str, str]) -> str: try: message = ( diff --git a/app/main.py b/app/main.py index 38f2f48..a60de5f 100644 --- a/app/main.py +++ b/app/main.py @@ -117,6 +117,10 @@ class UnsubscribeExecutionResponse(BaseModel): results: list[MethodExecutionResponse] +def _is_api_auth_enabled() -> bool: + return bool(settings.agent_api_key.strip()) + + def verify_api_key( x_api_key: Annotated[str | None, Header(alias="X-API-Key")] = None, authorization: Annotated[str | None, Header()] = None, @@ -357,6 +361,10 @@ async def startup_event() -> None: global scheduler _get_scan_lock() _get_unsubscribe_lock() + logger.info( + "API authentication enabled=%s (header: X-API-Key or Bearer token)", + _is_api_auth_enabled(), + ) scheduler = AsyncIOScheduler() scheduler.add_job( _scheduled_scan, diff --git a/app/strands_classifier.py b/app/strands_classifier.py index aaaf299..0c085c8 100644 --- a/app/strands_classifier.py +++ b/app/strands_classifier.py @@ -44,16 +44,13 @@ class StrandsEmailClassifier: if not api_key: raise ValueError("Strands/OpenAI API key is required for classification.") - client_args = {"api_key": api_key, "timeout": timeout_seconds} - if base_url: - client_args["base_url"] = base_url - - model = OpenAIModel( - client_args=client_args, - model_id=model_id, - params={"temperature": temperature}, - ) - self.agent = Agent(model=model, system_prompt=SYSTEM_PROMPT) + self._api_key = api_key + self._model_id = model_id + self._base_url = base_url + self._timeout_seconds = timeout_seconds + self._temperature = temperature + self._temperature_enabled = True + self.agent = self._build_agent(include_temperature=True) def classify( self, @@ -75,7 +72,7 @@ class StrandsEmailClassifier: "output_json_only": True, } - response = self.agent(json.dumps(prompt_payload, ensure_ascii=True)) + response = self._invoke_agent_with_temperature_fallback(prompt_payload) parsed = _parse_json(str(response)) label = str(parsed.get("label", "OTHER")).upper().strip() if label not in ALLOWED_LABELS: @@ -86,6 +83,41 @@ class StrandsEmailClassifier: reason = str(parsed.get("reason", "")).strip() return EmailClassification(label=label, confidence=confidence, reason=reason) + def _invoke_agent_with_temperature_fallback(self, prompt_payload: dict) -> object: + prompt = json.dumps(prompt_payload, ensure_ascii=True) + try: + return self.agent(prompt) + except Exception as exc: + if self._temperature_enabled and _is_temperature_unsupported(exc): + logger.warning( + "Model '%s' rejected temperature=%s; retrying without temperature.", + self._model_id, + self._temperature, + ) + self._temperature_enabled = False + self.agent = self._build_agent(include_temperature=False) + return self.agent(prompt) + raise + + def _build_agent(self, *, include_temperature: bool) -> Agent: + client_args = {"api_key": self._api_key, "timeout": self._timeout_seconds} + if self._base_url: + client_args["base_url"] = self._base_url + + params: dict[str, float] | None = None + if include_temperature: + params = {"temperature": self._temperature} + + model_kwargs = { + "client_args": client_args, + "model_id": self._model_id, + } + if params is not None: + model_kwargs["params"] = params + + model = OpenAIModel(**model_kwargs) + return Agent(model=model, system_prompt=SYSTEM_PROMPT) + def _parse_json(content: str) -> dict: if not content: @@ -112,3 +144,8 @@ def _to_confidence(raw_value: object) -> float: if confidence > 1: return 1.0 return confidence + + +def _is_temperature_unsupported(exc: Exception) -> bool: + message = str(exc).lower() + return "temperature" in message and "unsupported" in message