From e42f81ce7266ec045754e511b093f58887b5861b Mon Sep 17 00:00:00 2001 From: oabrivard Date: Sun, 8 Mar 2026 12:54:39 +0100 Subject: [PATCH] Added unsubscribe agent --- .env.example | 18 +- README.md | 50 +- app/config.py | 53 +- app/gmail_agent.py | 16 +- app/main.py | 265 +++++++- ...lm_classifier.py => strands_classifier.py} | 56 +- app/unsubscribe_hil_agent.py | 580 ++++++++++++++++++ pyproject.toml | 2 +- 8 files changed, 965 insertions(+), 75 deletions(-) rename app/{llm_classifier.py => strands_classifier.py} (66%) create mode 100644 app/unsubscribe_hil_agent.py diff --git a/.env.example b/.env.example index 046c986..40bf5f5 100644 --- a/.env.example +++ b/.env.example @@ -1,10 +1,17 @@ GOOGLE_CLIENT_SECRETS_FILE=credentials.json GOOGLE_TOKEN_FILE=token.json AGENT_API_KEY=change-me +# Preferred Strands settings +STRANDS_OPENAI_API_KEY= +STRANDS_MODEL_ID=gpt-4.1-mini +STRANDS_OPENAI_BASE_URL= +STRANDS_TIMEOUT_SECONDS=20 +STRANDS_TEMPERATURE=0 +# Backward-compatible fallback names LLM_API_KEY= -LLM_MODEL=gpt-4.1-mini +LLM_MODEL= LLM_BASE_URL= -LLM_TIMEOUT_SECONDS=20 +LLM_TIMEOUT_SECONDS= LLM_FALLBACK_TO_RULES=false GMAIL_SCAN_INTERVAL_MINUTES=5 GMAIL_QUERY=in:inbox -label:AgentProcessed newer_than:7d @@ -14,4 +21,11 @@ UNSUBSCRIBE_MAX_RESULTS=500 UNSUBSCRIBE_STATE_FILE=data/sent_unsubscribe_links.json UNSUBSCRIBE_DIGEST_RECIPIENT= UNSUBSCRIBE_SEND_EMPTY_DIGEST=false +UNSUBSCRIBE_HIL_QUERY=label:Advertising +UNSUBSCRIBE_HIL_MAX_RESULTS=500 +UNSUBSCRIBE_HIL_STATE_FILE=data/unsubscribed_methods.json +UNSUBSCRIBE_AUTO_ENABLED=true +UNSUBSCRIBE_AUTO_INTERVAL_MINUTES=720 +UNSUBSCRIBE_HTTP_TIMEOUT_SECONDS=12 +UNSUBSCRIBE_USER_AGENT=Mozilla/5.0 (compatible; PersonalAgentUnsubscribe/1.0) LOG_LEVEL=INFO diff --git a/README.md b/README.md index f3d83a0..07bd2db 100644 --- a/README.md +++ b/README.md @@ -3,10 +3,11 @@ This project runs a small local API service that: - scans new Gmail inbox messages -- classifies emails with an LLM as `LINKEDIN`, `ADVERTISING`, or `OTHER` +- classifies emails with **Strands** (`LINKEDIN`, `ADVERTISING`, `OTHER`) - moves LinkedIn emails to a `LinkedIn` label/folder - moves advertising emails to an `Advertising` label/folder - scans the `Advertising` label and emails you new unsubscribe links (deduplicated) +- discovers unsubscribe-ready mailing lists for human review, then auto-unsubscribes selected lists - exposes a secure availability endpoint powered by Google Calendar free/busy ## 1) Prerequisites @@ -14,7 +15,7 @@ This project runs a small local API service that: - Python 3.11+ - `uv` ([installation guide](https://docs.astral.sh/uv/getting-started/installation/)) - A Google account -- An OpenAI-compatible API key for the LLM classifier +- An OpenAI-compatible API key for Strands (`STRANDS_OPENAI_API_KEY`) - A Google Cloud project with: - Gmail API enabled - Google Calendar API enabled @@ -40,7 +41,7 @@ cp .env.example .env Edit `.env` and set: - `AGENT_API_KEY` to a strong secret for agent-to-agent calls -- `LLM_API_KEY` and optional `LLM_MODEL` / `LLM_BASE_URL` +- `STRANDS_OPENAI_API_KEY` and optional `STRANDS_MODEL_ID` / `STRANDS_OPENAI_BASE_URL` - optional unsubscribe digest settings (`UNSUBSCRIBE_*`) - optional scan frequency and Gmail query @@ -50,7 +51,11 @@ Edit `.env` and set: uv run uvicorn app.main:app --reload ``` -At startup, the scheduler runs every `GMAIL_SCAN_INTERVAL_MINUTES`. +At startup, the scheduler runs: + +- Gmail triage (`GMAIL_SCAN_INTERVAL_MINUTES`) +- unsubscribe digest (`UNSUBSCRIBE_DIGEST_INTERVAL_MINUTES`) +- auto-unsubscribe for approved lists (`UNSUBSCRIBE_AUTO_INTERVAL_MINUTES`) when `UNSUBSCRIBE_AUTO_ENABLED=true` ## 5) API usage @@ -89,9 +94,35 @@ curl -X POST "http://127.0.0.1:8000/unsubscribe-digest?max_results=500" \ -H "X-API-Key: your-secret" ``` +### Human-in-the-loop unsubscribe candidates + +```bash +curl -X POST "http://127.0.0.1:8000/unsubscribe/candidates?max_results=500" \ + -H "X-API-Key: your-secret" +``` + +### Execute unsubscribe for selected mailing lists + +```bash +curl -X POST "http://127.0.0.1:8000/unsubscribe/execute" \ + -H "Content-Type: application/json" \ + -H "X-API-Key: your-secret" \ + -d '{ + "selected_candidate_ids": ["abc123def456", "987zyx654wvu"], + "remember_selection": true + }' +``` + +### Trigger auto-unsubscribe run (approved lists only) + +```bash +curl -X POST "http://127.0.0.1:8000/unsubscribe/auto-run?max_results=500" \ + -H "X-API-Key: your-secret" +``` + ## Classification behavior -- LLM classification is used for each email (`LINKEDIN`, `ADVERTISING`, `OTHER`). +- Strands classification is used for each email (`LINKEDIN`, `ADVERTISING`, `OTHER`). - LinkedIn has priority over advertising inside the classifier prompt. - Set `LLM_FALLBACK_TO_RULES=true` only if you want rules-based backup when LLM calls fail. - Every scanned message gets an `AgentProcessed` label to avoid reprocessing loops. @@ -104,6 +135,15 @@ curl -X POST "http://127.0.0.1:8000/unsubscribe-digest?max_results=500" \ - Persists already sent links in `UNSUBSCRIBE_STATE_FILE`. - Sends only new links by email, unless `UNSUBSCRIBE_SEND_EMPTY_DIGEST=true`. +## Human-In-The-Loop Unsubscribe + +- Candidate discovery groups advertising messages into mailing lists using headers (`List-Id`, `From`) and unsubscribe methods. +- You pick the `candidate_id` values you want to unsubscribe from. +- Only selected lists are executed. +- Executed unsubscribe methods are persisted and never executed twice. +- Selected lists can be remembered (`remember_selection=true`) for scheduled auto-unsubscribe. +- State is saved in `UNSUBSCRIBE_HIL_STATE_FILE`. + ## Notes - Gmail "folders" are labels. This agent creates: diff --git a/app/config.py b/app/config.py index 2ce3e06..30e6a79 100644 --- a/app/config.py +++ b/app/config.py @@ -19,10 +19,11 @@ class Settings: gmail_scan_interval_minutes: int gmail_query: str agent_api_key: str - llm_api_key: str - llm_model: str - llm_base_url: str | None - llm_timeout_seconds: float + strands_api_key: str + strands_model_id: str + strands_base_url: str | None + strands_timeout_seconds: float + strands_temperature: float llm_fallback_to_rules: bool unsubscribe_digest_interval_minutes: int unsubscribe_query: str @@ -30,11 +31,18 @@ class Settings: unsubscribe_state_file: str unsubscribe_digest_recipient: str | None unsubscribe_send_empty_digest: bool + unsubscribe_hil_query: str + unsubscribe_hil_max_results: int + unsubscribe_hil_state_file: str + unsubscribe_auto_enabled: bool + unsubscribe_auto_interval_minutes: int + unsubscribe_http_timeout_seconds: float + unsubscribe_user_agent: str log_level: str def get_settings() -> Settings: - llm_base_url = os.getenv("LLM_BASE_URL", "").strip() + strands_base_url = _first_set_env("STRANDS_OPENAI_BASE_URL", "LLM_BASE_URL").strip() unsubscribe_digest_recipient = os.getenv("UNSUBSCRIBE_DIGEST_RECIPIENT", "").strip() return Settings( google_client_secrets_file=os.getenv("GOOGLE_CLIENT_SECRETS_FILE", "credentials.json"), @@ -44,10 +52,13 @@ def get_settings() -> Settings: "GMAIL_QUERY", "in:inbox -label:AgentProcessed newer_than:7d" ), agent_api_key=os.getenv("AGENT_API_KEY", ""), - llm_api_key=os.getenv("LLM_API_KEY", ""), - llm_model=os.getenv("LLM_MODEL", "gpt-4.1-mini"), - llm_base_url=llm_base_url or None, - llm_timeout_seconds=float(os.getenv("LLM_TIMEOUT_SECONDS", "20")), + strands_api_key=_first_set_env("STRANDS_OPENAI_API_KEY", "LLM_API_KEY"), + strands_model_id=_first_set_env("STRANDS_MODEL_ID", "LLM_MODEL") or "gpt-4.1-mini", + strands_base_url=strands_base_url or None, + strands_timeout_seconds=float( + _first_set_env("STRANDS_TIMEOUT_SECONDS", "LLM_TIMEOUT_SECONDS") or "20" + ), + strands_temperature=float(os.getenv("STRANDS_TEMPERATURE", "0")), llm_fallback_to_rules=_as_bool(os.getenv("LLM_FALLBACK_TO_RULES", "false")), unsubscribe_digest_interval_minutes=int( os.getenv("UNSUBSCRIBE_DIGEST_INTERVAL_MINUTES", "1440") @@ -61,9 +72,33 @@ def get_settings() -> Settings: unsubscribe_send_empty_digest=_as_bool( os.getenv("UNSUBSCRIBE_SEND_EMPTY_DIGEST", "false") ), + unsubscribe_hil_query=os.getenv("UNSUBSCRIBE_HIL_QUERY", "label:Advertising"), + unsubscribe_hil_max_results=int(os.getenv("UNSUBSCRIBE_HIL_MAX_RESULTS", "500")), + unsubscribe_hil_state_file=os.getenv( + "UNSUBSCRIBE_HIL_STATE_FILE", "data/unsubscribed_methods.json" + ), + unsubscribe_auto_enabled=_as_bool(os.getenv("UNSUBSCRIBE_AUTO_ENABLED", "true")), + unsubscribe_auto_interval_minutes=int( + os.getenv("UNSUBSCRIBE_AUTO_INTERVAL_MINUTES", "720") + ), + unsubscribe_http_timeout_seconds=float( + os.getenv("UNSUBSCRIBE_HTTP_TIMEOUT_SECONDS", "12") + ), + unsubscribe_user_agent=os.getenv( + "UNSUBSCRIBE_USER_AGENT", + "Mozilla/5.0 (compatible; PersonalAgentUnsubscribe/1.0; +https://example.local)", + ), log_level=os.getenv("LOG_LEVEL", "INFO"), ) def _as_bool(value: str) -> bool: return value.strip().lower() in {"1", "true", "yes", "on"} + + +def _first_set_env(*names: str) -> str: + for name in names: + value = os.getenv(name) + if value: + return value.strip() + return "" diff --git a/app/gmail_agent.py b/app/gmail_agent.py index 1989816..a8a1abf 100644 --- a/app/gmail_agent.py +++ b/app/gmail_agent.py @@ -5,7 +5,7 @@ from email.utils import parseaddr import logging from typing import Any -from app.llm_classifier import LLMEmailClassifier +from app.strands_classifier import StrandsEmailClassifier METADATA_HEADERS = [ "From", @@ -55,7 +55,7 @@ class GmailTriageAgent: gmail_service: Any, query: str, *, - classifier: LLMEmailClassifier | None = None, + classifier: StrandsEmailClassifier | None = None, fallback_to_rules: bool = True, ) -> None: self.gmail_service = gmail_service @@ -204,7 +204,7 @@ class GmailTriageAgent: ) -> str: if self.classifier: try: - llm_result = self.classifier.classify( + classifier_result = self.classifier.classify( sender=sender, subject=subject, snippet=snippet, @@ -213,14 +213,14 @@ class GmailTriageAgent: message_label_ids=message_label_ids, ) logger.info( - "Message %s classified by LLM as %s (confidence=%.2f)", + "Message %s classified by model as %s (confidence=%.2f)", message_id, - llm_result.label, - llm_result.confidence, + classifier_result.label, + classifier_result.confidence, ) - return llm_result.label + return classifier_result.label except Exception: - logger.exception("LLM classification failed for %s", message_id) + logger.exception("Model classification failed for %s", message_id) if not self.fallback_to_rules: return "OTHER" diff --git a/app/main.py b/app/main.py index 5563106..38f2f48 100644 --- a/app/main.py +++ b/app/main.py @@ -7,24 +7,29 @@ from typing import Annotated from apscheduler.schedulers.asyncio import AsyncIOScheduler from fastapi import Depends, FastAPI, Header, HTTPException, Query, status -from pydantic import BaseModel +from pydantic import BaseModel, Field from app.calendar_agent import CalendarAvailabilityAgent from app.config import get_settings from app.gmail_agent import GmailTriageAgent from app.google_clients import build_calendar_service, build_gmail_service -from app.llm_classifier import LLMEmailClassifier +from app.strands_classifier import StrandsEmailClassifier from app.unsubscribe_agent import UnsubscribeDigestAgent +from app.unsubscribe_hil_agent import ( + UnsubscribeHumanLoopAgent, + CandidateSnapshot, + UnsubscribeExecutionResult, +) settings = get_settings() logging.basicConfig(level=getattr(logging, settings.log_level.upper(), logging.INFO)) logger = logging.getLogger("personal-agent") -app = FastAPI(title="Personal Agent", version="0.1.0") +app = FastAPI(title="Personal Agent", version="0.3.0") scheduler: AsyncIOScheduler | None = None scan_lock: asyncio.Lock | None = None unsubscribe_lock: asyncio.Lock | None = None -llm_key_warning_logged = False +strands_key_warning_logged = False class ScanResponse(BaseModel): @@ -63,6 +68,55 @@ class UnsubscribeDigestResponse(BaseModel): email_sent: bool +class UnsubscribeMethodResponse(BaseModel): + method_id: str + method_type: str + value: str + + +class MailingListCandidateResponse(BaseModel): + candidate_id: str + list_name: str + sender_domain: str + message_count: int + sample_senders: list[str] + sample_subjects: list[str] + methods: list[UnsubscribeMethodResponse] + approved: bool + + +class UnsubscribeCandidatesResponse(BaseModel): + scanned_messages: int + candidates: list[MailingListCandidateResponse] + + +class ExecuteUnsubscribeRequest(BaseModel): + selected_candidate_ids: list[str] = Field(default_factory=list) + max_results: int | None = Field(default=None, ge=1, le=500) + remember_selection: bool = True + + +class MethodExecutionResponse(BaseModel): + candidate_id: str + list_name: str + method_id: str + method_type: str + value: str + success: bool + detail: str + + +class UnsubscribeExecutionResponse(BaseModel): + scanned_messages: int + candidates_considered: int + selected_candidates: int + executed_methods: int + skipped_already_executed: int + failed_methods: int + updated_approved_count: int + results: list[MethodExecutionResponse] + + def verify_api_key( x_api_key: Annotated[str | None, Header(alias="X-API-Key")] = None, authorization: Annotated[str | None, Header()] = None, @@ -88,7 +142,7 @@ def _run_scan_once(max_results: int) -> ScanResponse: gmail_agent = GmailTriageAgent( gmail_service=gmail_service, query=settings.gmail_query, - classifier=_build_llm_classifier(), + classifier=_build_strands_classifier(), fallback_to_rules=settings.llm_fallback_to_rules, ) result = gmail_agent.scan_and_route_messages(max_results=max_results) @@ -121,35 +175,123 @@ def _run_unsubscribe_digest_once(max_results: int) -> UnsubscribeDigestResponse: ) -def _build_llm_classifier() -> LLMEmailClassifier | None: - global llm_key_warning_logged +def _run_unsubscribe_candidates_once(max_results: int) -> UnsubscribeCandidatesResponse: + agent = _build_unsubscribe_hil_agent() + snapshot = agent.discover_candidates(max_results=max_results) + return _snapshot_to_response(snapshot) + + +def _run_unsubscribe_execute_selected( + selected_candidate_ids: list[str], + max_results: int, + remember_selection: bool, +) -> UnsubscribeExecutionResponse: + agent = _build_unsubscribe_hil_agent() + result = agent.execute_selected( + selected_candidate_ids=selected_candidate_ids, + max_results=max_results, + remember_selection=remember_selection, + ) + return _execution_to_response(result) + + +def _run_unsubscribe_auto_once(max_results: int) -> UnsubscribeExecutionResponse: + agent = _build_unsubscribe_hil_agent() + result = agent.execute_for_approved(max_results=max_results) + return _execution_to_response(result) + + +def _snapshot_to_response(snapshot: CandidateSnapshot) -> UnsubscribeCandidatesResponse: + return UnsubscribeCandidatesResponse( + scanned_messages=snapshot.scanned_messages, + candidates=[ + MailingListCandidateResponse( + candidate_id=candidate.candidate_id, + list_name=candidate.list_name, + sender_domain=candidate.sender_domain, + message_count=candidate.message_count, + sample_senders=candidate.sample_senders, + sample_subjects=candidate.sample_subjects, + methods=[ + UnsubscribeMethodResponse( + method_id=method.method_id, + method_type=method.method_type, + value=method.value, + ) + for method in candidate.methods + ], + approved=candidate.approved, + ) + for candidate in snapshot.candidates + ], + ) - if not settings.llm_api_key: + +def _execution_to_response(result: UnsubscribeExecutionResult) -> UnsubscribeExecutionResponse: + return UnsubscribeExecutionResponse( + scanned_messages=result.scanned_messages, + candidates_considered=result.candidates_considered, + selected_candidates=result.selected_candidates, + executed_methods=result.executed_methods, + skipped_already_executed=result.skipped_already_executed, + failed_methods=result.failed_methods, + updated_approved_count=result.updated_approved_count, + results=[ + MethodExecutionResponse( + candidate_id=item.candidate_id, + list_name=item.list_name, + method_id=item.method_id, + method_type=item.method_type, + value=item.value, + success=item.success, + detail=item.detail, + ) + for item in result.results + ], + ) + + +def _build_strands_classifier() -> StrandsEmailClassifier | None: + global strands_key_warning_logged + + if not settings.strands_api_key: if settings.llm_fallback_to_rules: - if not llm_key_warning_logged: + if not strands_key_warning_logged: logger.warning( - "LLM_API_KEY not set. Falling back to rules-based classification." + "Strands API key not set. Falling back to rules-based classification." ) - llm_key_warning_logged = True + strands_key_warning_logged = True return None raise RuntimeError( - "LLM_API_KEY is required when LLM_FALLBACK_TO_RULES is disabled." + "STRANDS_OPENAI_API_KEY (or LLM_API_KEY) is required when LLM_FALLBACK_TO_RULES is disabled." ) try: - return LLMEmailClassifier( - api_key=settings.llm_api_key, - model=settings.llm_model, - base_url=settings.llm_base_url, - timeout_seconds=settings.llm_timeout_seconds, + return StrandsEmailClassifier( + api_key=settings.strands_api_key, + model_id=settings.strands_model_id, + base_url=settings.strands_base_url, + timeout_seconds=settings.strands_timeout_seconds, + temperature=settings.strands_temperature, ) except Exception: if settings.llm_fallback_to_rules: - logger.exception("Could not initialize LLM classifier; using rules fallback.") + logger.exception("Could not initialize Strands classifier; using rules fallback.") return None raise +def _build_unsubscribe_hil_agent() -> UnsubscribeHumanLoopAgent: + gmail_service = build_gmail_service(settings) + return UnsubscribeHumanLoopAgent( + gmail_service=gmail_service, + query=settings.unsubscribe_hil_query, + state_file=settings.unsubscribe_hil_state_file, + http_timeout_seconds=settings.unsubscribe_http_timeout_seconds, + user_agent=settings.unsubscribe_user_agent, + ) + + def _get_scan_lock() -> asyncio.Lock: global scan_lock if scan_lock is None: @@ -194,6 +336,22 @@ async def _scheduled_unsubscribe_digest() -> None: logger.exception("Scheduled unsubscribe digest failed") +async def _scheduled_unsubscribe_auto() -> None: + lock = _get_unsubscribe_lock() + if lock.locked(): + logger.info("Previous unsubscribe auto run still running, skipping this tick.") + return + + async with lock: + try: + result = await asyncio.to_thread( + _run_unsubscribe_auto_once, settings.unsubscribe_hil_max_results + ) + logger.info("Scheduled unsubscribe auto run complete: %s", result.model_dump()) + except Exception: + logger.exception("Scheduled unsubscribe auto run failed") + + @app.on_event("startup") async def startup_event() -> None: global scheduler @@ -212,11 +370,20 @@ async def startup_event() -> None: minutes=settings.unsubscribe_digest_interval_minutes, next_run_time=datetime.now(), ) + if settings.unsubscribe_auto_enabled: + scheduler.add_job( + _scheduled_unsubscribe_auto, + "interval", + minutes=settings.unsubscribe_auto_interval_minutes, + next_run_time=datetime.now(), + ) scheduler.start() logger.info( - "Scheduler started (scan interval=%s min, unsubscribe interval=%s min)", + "Scheduler started (scan=%s min, digest=%s min, auto_unsub=%s/%s min)", settings.gmail_scan_interval_minutes, settings.unsubscribe_digest_interval_minutes, + settings.unsubscribe_auto_enabled, + settings.unsubscribe_auto_interval_minutes, ) @@ -232,6 +399,8 @@ def health() -> dict[str, object]: "status": "ok", "scan_interval_minutes": settings.gmail_scan_interval_minutes, "unsubscribe_digest_interval_minutes": settings.unsubscribe_digest_interval_minutes, + "unsubscribe_auto_enabled": settings.unsubscribe_auto_enabled, + "unsubscribe_auto_interval_minutes": settings.unsubscribe_auto_interval_minutes, } @@ -313,3 +482,61 @@ async def unsubscribe_digest_now( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Unsubscribe digest failed: {exc}", ) from exc + + +@app.post( + "/unsubscribe/candidates", + response_model=UnsubscribeCandidatesResponse, + dependencies=[Depends(verify_api_key)], +) +async def unsubscribe_candidates( + max_results: int = Query(default=settings.unsubscribe_hil_max_results, ge=1, le=500), +) -> UnsubscribeCandidatesResponse: + async with _get_unsubscribe_lock(): + try: + return await asyncio.to_thread(_run_unsubscribe_candidates_once, max_results) + except Exception as exc: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Candidate discovery failed: {exc}", + ) from exc + + +@app.post( + "/unsubscribe/execute", + response_model=UnsubscribeExecutionResponse, + dependencies=[Depends(verify_api_key)], +) +async def unsubscribe_execute(request: ExecuteUnsubscribeRequest) -> UnsubscribeExecutionResponse: + max_results = request.max_results or settings.unsubscribe_hil_max_results + async with _get_unsubscribe_lock(): + try: + return await asyncio.to_thread( + _run_unsubscribe_execute_selected, + request.selected_candidate_ids, + max_results, + request.remember_selection, + ) + except Exception as exc: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Unsubscribe execution failed: {exc}", + ) from exc + + +@app.post( + "/unsubscribe/auto-run", + response_model=UnsubscribeExecutionResponse, + dependencies=[Depends(verify_api_key)], +) +async def unsubscribe_auto_run( + max_results: int = Query(default=settings.unsubscribe_hil_max_results, ge=1, le=500), +) -> UnsubscribeExecutionResponse: + async with _get_unsubscribe_lock(): + try: + return await asyncio.to_thread(_run_unsubscribe_auto_once, max_results) + except Exception as exc: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Auto unsubscribe run failed: {exc}", + ) from exc diff --git a/app/llm_classifier.py b/app/strands_classifier.py similarity index 66% rename from app/llm_classifier.py rename to app/strands_classifier.py index 186e602..aaaf299 100644 --- a/app/llm_classifier.py +++ b/app/strands_classifier.py @@ -5,9 +5,10 @@ import json import logging import re -from openai import OpenAI +from strands import Agent +from strands.models.openai import OpenAIModel -logger = logging.getLogger("personal-agent.llm") +logger = logging.getLogger("personal-agent.strands") ALLOWED_LABELS = {"LINKEDIN", "ADVERTISING", "OTHER"} @@ -24,30 +25,35 @@ Rules: @dataclass(frozen=True) -class LLMClassification: +class EmailClassification: label: str confidence: float reason: str -class LLMEmailClassifier: +class StrandsEmailClassifier: def __init__( self, *, api_key: str, - model: str, + model_id: str, base_url: str | None = None, timeout_seconds: float = 20.0, + temperature: float = 0.0, ) -> None: if not api_key: - raise ValueError("LLM API key is required for LLM classification.") + raise ValueError("Strands/OpenAI API key is required for classification.") - self.model = model - self.client = OpenAI( - api_key=api_key, - base_url=base_url, - timeout=timeout_seconds, + client_args = {"api_key": api_key, "timeout": timeout_seconds} + if base_url: + client_args["base_url"] = base_url + + model = OpenAIModel( + client_args=client_args, + model_id=model_id, + params={"temperature": temperature}, ) + self.agent = Agent(model=model, system_prompt=SYSTEM_PROMPT) def classify( self, @@ -58,43 +64,32 @@ class LLMEmailClassifier: list_unsubscribe: str, precedence: str, message_label_ids: set[str], - ) -> LLMClassification: - email_payload = { + ) -> EmailClassification: + prompt_payload = { "sender": sender, "subject": subject, "snippet": snippet, "list_unsubscribe_present": bool(list_unsubscribe.strip()), "precedence": precedence, "gmail_label_ids": sorted(message_label_ids), + "output_json_only": True, } - completion = self.client.chat.completions.create( - model=self.model, - temperature=0, - response_format={"type": "json_object"}, - max_tokens=120, - messages=[ - {"role": "system", "content": SYSTEM_PROMPT}, - {"role": "user", "content": json.dumps(email_payload, ensure_ascii=True)}, - ], - ) - content = completion.choices[0].message.content or "{}" - - parsed = _parse_json(content) + response = self.agent(json.dumps(prompt_payload, ensure_ascii=True)) + parsed = _parse_json(str(response)) label = str(parsed.get("label", "OTHER")).upper().strip() if label not in ALLOWED_LABELS: - logger.warning("Unexpected LLM label '%s', falling back to OTHER.", label) + logger.warning("Unexpected Strands label '%s', falling back to OTHER.", label) label = "OTHER" - confidence = _to_confidence(parsed.get("confidence", 0.0)) + confidence = _to_confidence(parsed.get("confidence", 0)) reason = str(parsed.get("reason", "")).strip() - return LLMClassification(label=label, confidence=confidence, reason=reason) + return EmailClassification(label=label, confidence=confidence, reason=reason) def _parse_json(content: str) -> dict: if not content: return {} - try: return json.loads(content) except json.JSONDecodeError: @@ -112,7 +107,6 @@ def _to_confidence(raw_value: object) -> float: confidence = float(raw_value) except (TypeError, ValueError): return 0.0 - if confidence < 0: return 0.0 if confidence > 1: diff --git a/app/unsubscribe_hil_agent.py b/app/unsubscribe_hil_agent.py new file mode 100644 index 0000000..46f10ab --- /dev/null +++ b/app/unsubscribe_hil_agent.py @@ -0,0 +1,580 @@ +from __future__ import annotations + +import base64 +from dataclasses import dataclass +from email.message import EmailMessage +from email.utils import parseaddr +import hashlib +import html +import json +import logging +from pathlib import Path +import re +from typing import Any +from urllib.error import HTTPError, URLError +from urllib.parse import parse_qs, parse_qsl, urlencode, urlsplit, urlunsplit +from urllib.request import Request, urlopen + +logger = logging.getLogger("personal-agent.unsubscribe.hil") + +TRACKING_QUERY_KEYS = { + "fbclid", + "gclid", + "mc_cid", + "mc_eid", + "_hsenc", + "_hsmi", + "utm_campaign", + "utm_content", + "utm_id", + "utm_medium", + "utm_name", + "utm_source", + "utm_term", +} + +UNSUBSCRIBE_HINTS = { + "unsubscribe", + "optout", + "opt-out", + "email-preferences", + "manage-subscriptions", +} + +URL_PATTERN = re.compile(r"https?://[^\s<>'\"()]+", re.IGNORECASE) + +METADATA_HEADERS = [ + "From", + "Subject", + "List-Id", + "List-Unsubscribe", +] + + +@dataclass(frozen=True) +class UnsubscribeMethod: + method_id: str + method_type: str + value: str + + +@dataclass(frozen=True) +class MailingListCandidate: + candidate_id: str + list_key: str + list_name: str + sender_domain: str + message_count: int + sample_senders: list[str] + sample_subjects: list[str] + methods: list[UnsubscribeMethod] + approved: bool + + +@dataclass(frozen=True) +class CandidateSnapshot: + scanned_messages: int + candidates: list[MailingListCandidate] + + +@dataclass(frozen=True) +class MethodExecutionResult: + candidate_id: str + list_name: str + method_id: str + method_type: str + value: str + success: bool + detail: str + + +@dataclass(frozen=True) +class UnsubscribeExecutionResult: + scanned_messages: int + candidates_considered: int + selected_candidates: int + executed_methods: int + skipped_already_executed: int + failed_methods: int + updated_approved_count: int + results: list[MethodExecutionResult] + + +@dataclass +class _UnsubscribeState: + approved_list_keys: set[str] + executed_methods: set[str] + + +class UnsubscribeHumanLoopAgent: + def __init__( + self, + *, + gmail_service: Any, + query: str, + state_file: str, + http_timeout_seconds: float = 12.0, + user_agent: str = "Mozilla/5.0 (compatible; PersonalAgentUnsubscribe/1.0)", + ) -> None: + self.gmail_service = gmail_service + self.query = query + self.state_file = Path(state_file) + self.http_timeout_seconds = http_timeout_seconds + self.user_agent = user_agent + + def discover_candidates(self, max_results: int = 500) -> CandidateSnapshot: + state = self._load_state() + scanned_messages = self._list_message_refs(max_results=max_results) + candidates = self._build_candidates( + scanned_messages, approved_list_keys=state.approved_list_keys + ) + return CandidateSnapshot( + scanned_messages=len(scanned_messages), + candidates=candidates, + ) + + def execute_selected( + self, + *, + selected_candidate_ids: list[str], + max_results: int = 500, + remember_selection: bool = True, + ) -> UnsubscribeExecutionResult: + state = self._load_state() + message_refs = self._list_message_refs(max_results=max_results) + candidates = self._build_candidates(message_refs, approved_list_keys=state.approved_list_keys) + return self._execute_candidates( + selected_candidate_ids=selected_candidate_ids, + candidates=candidates, + state=state, + remember_selection=remember_selection, + scanned_messages=len(message_refs), + ) + + def execute_for_approved(self, max_results: int = 500) -> UnsubscribeExecutionResult: + state = self._load_state() + message_refs = self._list_message_refs(max_results=max_results) + candidates = self._build_candidates(message_refs, approved_list_keys=state.approved_list_keys) + approved_ids = [ + candidate.candidate_id for candidate in candidates if candidate.list_key in state.approved_list_keys + ] + return self._execute_candidates( + selected_candidate_ids=approved_ids, + candidates=candidates, + state=state, + remember_selection=False, + scanned_messages=len(message_refs), + ) + + def _execute_candidates( + self, + *, + selected_candidate_ids: list[str], + candidates: list[MailingListCandidate], + state: _UnsubscribeState, + remember_selection: bool, + scanned_messages: int, + ) -> UnsubscribeExecutionResult: + selected_ids = {candidate_id.strip() for candidate_id in selected_candidate_ids if candidate_id} + + selected = [candidate for candidate in candidates if candidate.candidate_id in selected_ids] + if remember_selection: + for candidate in selected: + state.approved_list_keys.add(candidate.list_key) + + results: list[MethodExecutionResult] = [] + executed = 0 + skipped = 0 + failed = 0 + + for candidate in selected: + for method in candidate.methods: + if method.method_id in state.executed_methods: + skipped += 1 + results.append( + MethodExecutionResult( + candidate_id=candidate.candidate_id, + list_name=candidate.list_name, + method_id=method.method_id, + method_type=method.method_type, + value=method.value, + success=True, + detail="Already executed previously, skipped.", + ) + ) + continue + + success, detail = self._execute_method(method) + if success: + state.executed_methods.add(method.method_id) + executed += 1 + else: + failed += 1 + results.append( + MethodExecutionResult( + candidate_id=candidate.candidate_id, + list_name=candidate.list_name, + method_id=method.method_id, + method_type=method.method_type, + value=method.value, + success=success, + detail=detail, + ) + ) + + self._save_state(state) + return UnsubscribeExecutionResult( + scanned_messages=scanned_messages, + candidates_considered=len(candidates), + selected_candidates=len(selected), + executed_methods=executed, + skipped_already_executed=skipped, + failed_methods=failed, + updated_approved_count=len(state.approved_list_keys), + results=results, + ) + + def _list_message_refs(self, max_results: int) -> list[dict[str, str]]: + bounded_max_results = max(1, min(max_results, 500)) + return ( + self.gmail_service.users() + .messages() + .list(userId="me", q=self.query, maxResults=bounded_max_results) + .execute() + .get("messages", []) + ) + + def _build_candidates( + self, + message_refs: list[dict[str, str]], + *, + approved_list_keys: set[str], + ) -> list[MailingListCandidate]: + groups: dict[str, dict[str, Any]] = {} + + for message_ref in message_refs: + message_id = message_ref["id"] + metadata = ( + self.gmail_service.users() + .messages() + .get( + userId="me", + id=message_id, + format="metadata", + metadataHeaders=METADATA_HEADERS, + ) + .execute() + ) + headers = { + header.get("name", "").lower(): header.get("value", "") + for header in metadata.get("payload", {}).get("headers", []) + } + sender = headers.get("from", "") + subject = headers.get("subject", "") + list_id = _clean_list_id(headers.get("list-id", "")) + + sender_email = parseaddr(sender)[1].lower() + sender_domain = sender_email.split("@")[-1] if "@" in sender_email else "unknown" + list_key = list_id or sender_domain or "unknown" + list_name = _derive_list_name(list_id=list_id, sender=sender, sender_domain=sender_domain) + + methods = self._extract_methods_from_message( + message_id=message_id, + list_unsubscribe_header=headers.get("list-unsubscribe", ""), + ) + if not methods: + continue + + group = groups.setdefault( + list_key, + { + "list_name": list_name, + "sender_domain": sender_domain, + "message_count": 0, + "sample_senders": [], + "sample_subjects": [], + "methods": {}, + }, + ) + + group["message_count"] += 1 + if sender and sender not in group["sample_senders"] and len(group["sample_senders"]) < 3: + group["sample_senders"].append(sender) + if subject and subject not in group["sample_subjects"] and len(group["sample_subjects"]) < 5: + group["sample_subjects"].append(subject) + + for method in methods: + group["methods"][method.method_id] = method + + candidates: list[MailingListCandidate] = [] + for list_key, group in groups.items(): + candidate_id = hashlib.sha1(list_key.encode("utf-8")).hexdigest()[:12] + methods = sorted(group["methods"].values(), key=lambda method: method.method_id) + candidates.append( + MailingListCandidate( + candidate_id=candidate_id, + list_key=list_key, + list_name=group["list_name"], + sender_domain=group["sender_domain"], + message_count=group["message_count"], + sample_senders=group["sample_senders"], + sample_subjects=group["sample_subjects"], + methods=methods, + approved=list_key in approved_list_keys, + ) + ) + + candidates.sort(key=lambda candidate: candidate.message_count, reverse=True) + return candidates + + def _extract_methods_from_message( + self, + *, + message_id: str, + list_unsubscribe_header: str, + ) -> list[UnsubscribeMethod]: + methods_by_id: dict[str, UnsubscribeMethod] = {} + + for raw_value in _extract_list_unsubscribe_values(list_unsubscribe_header): + method = _make_unsubscribe_method(raw_value) + if method: + methods_by_id[method.method_id] = method + + if methods_by_id: + return sorted(methods_by_id.values(), key=lambda method: method.method_id) + + full_message = ( + self.gmail_service.users() + .messages() + .get(userId="me", id=message_id, format="full") + .execute() + ) + payload = full_message.get("payload", {}) + for text_block in _extract_text_blocks(payload): + for url in URL_PATTERN.findall(html.unescape(text_block)): + if not _looks_like_unsubscribe(url): + continue + method = _make_unsubscribe_method(url) + if method: + methods_by_id[method.method_id] = method + + return sorted(methods_by_id.values(), key=lambda method: method.method_id) + + def _execute_method(self, method: UnsubscribeMethod) -> tuple[bool, str]: + if method.method_type == "http": + return self._execute_http_method(method.value) + if method.method_type == "mailto": + return self._execute_mailto_method(method.value) + return False, f"Unsupported unsubscribe method type: {method.method_type}" + + def _execute_http_method(self, url: str) -> tuple[bool, str]: + request = Request( + url=url, + headers={ + "User-Agent": self.user_agent, + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", + }, + method="GET", + ) + try: + with urlopen(request, timeout=self.http_timeout_seconds) as response: + status = response.getcode() + success = 200 <= status < 400 + return success, f"HTTP {status}" + except HTTPError as exc: + return False, f"HTTP {exc.code}" + except URLError as exc: + return False, f"Network error: {exc.reason}" + except Exception as exc: + return False, f"Unexpected error: {exc}" + + def _execute_mailto_method(self, mailto_url: str) -> tuple[bool, str]: + split = urlsplit(mailto_url) + recipient = split.path.strip() + if not recipient: + return False, "Invalid mailto URL: missing recipient." + + query = parse_qs(split.query, keep_blank_values=True) + subject = _first_query_value(query, "subject") or "Unsubscribe request" + body = _first_query_value(query, "body") or ( + "Please unsubscribe this email address from this mailing list." + ) + + message = EmailMessage() + message["To"] = recipient + message["Subject"] = subject + message.set_content(body) + raw = base64.urlsafe_b64encode(message.as_bytes()).decode("utf-8") + + try: + ( + self.gmail_service.users() + .messages() + .send(userId="me", body={"raw": raw}) + .execute() + ) + return True, "Sent mailto unsubscribe request via Gmail API." + except Exception as exc: + return False, f"Failed to send mailto unsubscribe request: {exc}" + + def _load_state(self) -> _UnsubscribeState: + if not self.state_file.exists(): + return _UnsubscribeState(approved_list_keys=set(), executed_methods=set()) + + try: + payload = json.loads(self.state_file.read_text(encoding="utf-8")) + except json.JSONDecodeError: + logger.warning("State file is invalid JSON: %s", self.state_file) + return _UnsubscribeState(approved_list_keys=set(), executed_methods=set()) + + approved = payload.get("approved_list_keys", []) + executed = payload.get("executed_methods", []) + return _UnsubscribeState( + approved_list_keys={str(item) for item in approved if str(item).strip()}, + executed_methods={str(item) for item in executed if str(item).strip()}, + ) + + def _save_state(self, state: _UnsubscribeState) -> None: + self.state_file.parent.mkdir(parents=True, exist_ok=True) + self.state_file.write_text( + json.dumps( + { + "approved_list_keys": sorted(state.approved_list_keys), + "executed_methods": sorted(state.executed_methods), + }, + indent=2, + ), + encoding="utf-8", + ) + + +def _extract_list_unsubscribe_values(header_value: str) -> list[str]: + if not header_value: + return [] + bracketed = [value.strip() for value in re.findall(r"<([^>]+)>", header_value)] + if bracketed: + return [value for value in bracketed if value] + + values: list[str] = [] + for token in header_value.split(","): + candidate = token.strip().strip("<>").strip() + if candidate: + values.append(candidate) + return values + + +def _make_unsubscribe_method(raw_value: str) -> UnsubscribeMethod | None: + value = raw_value.strip().strip(",") + lowered = value.lower() + if lowered.startswith(("http://", "https://")): + normalized = _normalize_http_url(value) + if not normalized: + return None + method_id = f"http:{normalized}" + return UnsubscribeMethod(method_id=method_id, method_type="http", value=normalized) + if lowered.startswith("mailto:"): + normalized = _normalize_mailto_url(value) + if not normalized: + return None + method_id = f"mailto:{normalized}" + return UnsubscribeMethod(method_id=method_id, method_type="mailto", value=normalized) + return None + + +def _extract_text_blocks(payload: dict[str, Any]) -> list[str]: + blocks: list[str] = [] + + def walk(part: dict[str, Any]) -> None: + mime_type = part.get("mimeType", "") + body_data = part.get("body", {}).get("data") + if body_data and mime_type in {"text/plain", "text/html"}: + decoded = _decode_base64(body_data) + if decoded: + blocks.append(decoded) + for child in part.get("parts", []): + walk(child) + + walk(payload) + return blocks + + +def _decode_base64(data: str) -> str: + padded = data + "=" * (-len(data) % 4) + try: + return base64.urlsafe_b64decode(padded.encode("utf-8")).decode( + "utf-8", errors="replace" + ) + except Exception: + return "" + + +def _normalize_http_url(url: str) -> str | None: + cleaned = url.strip().strip(".,;)") + split = urlsplit(cleaned) + if split.scheme.lower() not in {"http", "https"} or not split.netloc: + return None + + scheme = split.scheme.lower() + netloc = split.netloc.lower() + path = split.path or "/" + if path != "/": + path = path.rstrip("/") + + query_pairs = parse_qsl(split.query, keep_blank_values=True) + filtered_pairs = [ + (key, value) + for key, value in query_pairs + if key.lower() not in TRACKING_QUERY_KEYS + ] + query = urlencode(filtered_pairs, doseq=True) + return urlunsplit((scheme, netloc, path, query, "")) + + +def _normalize_mailto_url(url: str) -> str | None: + split = urlsplit(url.strip()) + if split.scheme.lower() != "mailto": + return None + recipient = split.path.strip().lower() + if not recipient: + return None + query_pairs = parse_qsl(split.query, keep_blank_values=True) + normalized_query = urlencode(sorted(query_pairs), doseq=True) + return urlunsplit(("mailto", "", recipient, normalized_query, "")) + + +def _clean_list_id(list_id: str) -> str: + cleaned = list_id.strip().lower() + if not cleaned: + return "" + if "<" in cleaned and ">" in cleaned: + match = re.search(r"<([^>]+)>", cleaned) + if match: + cleaned = match.group(1) + return cleaned + + +def _derive_list_name(list_id: str, sender: str, sender_domain: str) -> str: + if list_id: + list_name = list_id.split(".", 1)[0].replace("-", " ").replace("_", " ").strip() + if list_name: + return list_name.title() + return list_id + + display_name = parseaddr(sender)[0].strip() + if display_name and len(display_name) > 2: + return display_name + return sender_domain + + +def _looks_like_unsubscribe(url: str) -> bool: + lowered = url.lower() + return any(hint in lowered for hint in UNSUBSCRIBE_HINTS) + + +def _first_query_value(values: dict[str, list[str]], key: str) -> str: + for candidate_key, candidate_values in values.items(): + if candidate_key.lower() != key.lower(): + continue + if candidate_values: + return candidate_values[0] + return "" diff --git a/pyproject.toml b/pyproject.toml index f306a7c..63151e1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,8 +9,8 @@ dependencies = [ "google-api-python-client", "google-auth", "google-auth-oauthlib", - "openai", "python-dotenv", + "strands-agents[openai]", "uvicorn[standard]", ]