From 3333a4e06dffdde7d8de6434b3d7e31cd050c18d Mon Sep 17 00:00:00 2001 From: oabrivard Date: Sun, 8 Mar 2026 12:15:32 +0100 Subject: [PATCH] Added unsubscribe email recap --- .env.example | 6 + .gitignore | 1 + README.md | 18 +++ app/config.py | 20 +++ app/google_clients.py | 2 + app/main.py | 88 +++++++++++- app/unsubscribe_agent.py | 282 +++++++++++++++++++++++++++++++++++++++ 7 files changed, 416 insertions(+), 1 deletion(-) create mode 100644 app/unsubscribe_agent.py diff --git a/.env.example b/.env.example index d4716ac..046c986 100644 --- a/.env.example +++ b/.env.example @@ -8,4 +8,10 @@ LLM_TIMEOUT_SECONDS=20 LLM_FALLBACK_TO_RULES=false GMAIL_SCAN_INTERVAL_MINUTES=5 GMAIL_QUERY=in:inbox -label:AgentProcessed newer_than:7d +UNSUBSCRIBE_DIGEST_INTERVAL_MINUTES=1440 +UNSUBSCRIBE_QUERY=label:Advertising +UNSUBSCRIBE_MAX_RESULTS=500 +UNSUBSCRIBE_STATE_FILE=data/sent_unsubscribe_links.json +UNSUBSCRIBE_DIGEST_RECIPIENT= +UNSUBSCRIBE_SEND_EMPTY_DIGEST=false LOG_LEVEL=INFO diff --git a/.gitignore b/.gitignore index acd9139..c83fee7 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ __pycache__/ *.pyc credentials.json token.json +data/ diff --git a/README.md b/README.md index 2ce9252..f3d83a0 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,7 @@ This project runs a small local API service that: - classifies emails with an LLM as `LINKEDIN`, `ADVERTISING`, or `OTHER` - moves LinkedIn emails to a `LinkedIn` label/folder - moves advertising emails to an `Advertising` label/folder +- scans the `Advertising` label and emails you new unsubscribe links (deduplicated) - exposes a secure availability endpoint powered by Google Calendar free/busy ## 1) Prerequisites @@ -27,6 +28,7 @@ This project runs a small local API service that: 3. Save it in this project as `credentials.json`. The first run opens a browser window for consent and creates `token.json`. +If your existing token was created before `gmail.send` was added, you may be prompted again. ## 3) Install and configure @@ -39,6 +41,7 @@ Edit `.env` and set: - `AGENT_API_KEY` to a strong secret for agent-to-agent calls - `LLM_API_KEY` and optional `LLM_MODEL` / `LLM_BASE_URL` +- optional unsubscribe digest settings (`UNSUBSCRIBE_*`) - optional scan frequency and Gmail query ## 4) Run @@ -79,6 +82,13 @@ curl -X POST "http://127.0.0.1:8000/availability" \ If `available` is `true`, there are no busy slots in that range. +### Manual unsubscribe digest + +```bash +curl -X POST "http://127.0.0.1:8000/unsubscribe-digest?max_results=500" \ + -H "X-API-Key: your-secret" +``` + ## Classification behavior - LLM classification is used for each email (`LINKEDIN`, `ADVERTISING`, `OTHER`). @@ -86,6 +96,14 @@ If `available` is `true`, there are no busy slots in that range. - Set `LLM_FALLBACK_TO_RULES=true` only if you want rules-based backup when LLM calls fail. - Every scanned message gets an `AgentProcessed` label to avoid reprocessing loops. +## Unsubscribe digest behavior + +- Reads emails from `UNSUBSCRIBE_QUERY` (default `label:Advertising`). +- Extracts unsubscribe URLs from `List-Unsubscribe` headers and message content. +- Removes duplicates within the run and across runs. +- Persists already sent links in `UNSUBSCRIBE_STATE_FILE`. +- Sends only new links by email, unless `UNSUBSCRIBE_SEND_EMPTY_DIGEST=true`. + ## Notes - Gmail "folders" are labels. This agent creates: diff --git a/app/config.py b/app/config.py index fbe0aa7..2ce3e06 100644 --- a/app/config.py +++ b/app/config.py @@ -7,6 +7,7 @@ load_dotenv() GOOGLE_SCOPES = ( "https://www.googleapis.com/auth/gmail.modify", + "https://www.googleapis.com/auth/gmail.send", "https://www.googleapis.com/auth/calendar.readonly", ) @@ -23,11 +24,18 @@ class Settings: llm_base_url: str | None llm_timeout_seconds: float llm_fallback_to_rules: bool + unsubscribe_digest_interval_minutes: int + unsubscribe_query: str + unsubscribe_max_results: int + unsubscribe_state_file: str + unsubscribe_digest_recipient: str | None + unsubscribe_send_empty_digest: bool log_level: str def get_settings() -> Settings: llm_base_url = os.getenv("LLM_BASE_URL", "").strip() + unsubscribe_digest_recipient = os.getenv("UNSUBSCRIBE_DIGEST_RECIPIENT", "").strip() return Settings( google_client_secrets_file=os.getenv("GOOGLE_CLIENT_SECRETS_FILE", "credentials.json"), google_token_file=os.getenv("GOOGLE_TOKEN_FILE", "token.json"), @@ -41,6 +49,18 @@ def get_settings() -> Settings: llm_base_url=llm_base_url or None, llm_timeout_seconds=float(os.getenv("LLM_TIMEOUT_SECONDS", "20")), llm_fallback_to_rules=_as_bool(os.getenv("LLM_FALLBACK_TO_RULES", "false")), + unsubscribe_digest_interval_minutes=int( + os.getenv("UNSUBSCRIBE_DIGEST_INTERVAL_MINUTES", "1440") + ), + unsubscribe_query=os.getenv("UNSUBSCRIBE_QUERY", "label:Advertising"), + unsubscribe_max_results=int(os.getenv("UNSUBSCRIBE_MAX_RESULTS", "500")), + unsubscribe_state_file=os.getenv( + "UNSUBSCRIBE_STATE_FILE", "data/sent_unsubscribe_links.json" + ), + unsubscribe_digest_recipient=unsubscribe_digest_recipient or None, + unsubscribe_send_empty_digest=_as_bool( + os.getenv("UNSUBSCRIBE_SEND_EMPTY_DIGEST", "false") + ), log_level=os.getenv("LOG_LEVEL", "INFO"), ) diff --git a/app/google_clients.py b/app/google_clients.py index 8e7477c..559976f 100644 --- a/app/google_clients.py +++ b/app/google_clients.py @@ -15,6 +15,8 @@ def get_google_credentials(settings: Settings) -> Credentials: creds = Credentials.from_authorized_user_file( settings.google_token_file, GOOGLE_SCOPES ) + if not creds.has_scopes(GOOGLE_SCOPES): + creds = None if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: diff --git a/app/main.py b/app/main.py index 1a83df6..5563106 100644 --- a/app/main.py +++ b/app/main.py @@ -14,6 +14,7 @@ from app.config import get_settings from app.gmail_agent import GmailTriageAgent from app.google_clients import build_calendar_service, build_gmail_service from app.llm_classifier import LLMEmailClassifier +from app.unsubscribe_agent import UnsubscribeDigestAgent settings = get_settings() logging.basicConfig(level=getattr(logging, settings.log_level.upper(), logging.INFO)) @@ -22,6 +23,7 @@ logger = logging.getLogger("personal-agent") app = FastAPI(title="Personal Agent", version="0.1.0") scheduler: AsyncIOScheduler | None = None scan_lock: asyncio.Lock | None = None +unsubscribe_lock: asyncio.Lock | None = None llm_key_warning_logged = False @@ -53,6 +55,14 @@ class AvailabilityResponse(BaseModel): checked_calendars: list[str] +class UnsubscribeDigestResponse(BaseModel): + scanned_messages: int + extracted_unique_links: int + new_links: int + sent_to: str | None + email_sent: bool + + def verify_api_key( x_api_key: Annotated[str | None, Header(alias="X-API-Key")] = None, authorization: Annotated[str | None, Header()] = None, @@ -91,6 +101,26 @@ def _run_scan_once(max_results: int) -> ScanResponse: ) +def _run_unsubscribe_digest_once(max_results: int) -> UnsubscribeDigestResponse: + bounded_max_results = max(1, min(max_results, 500)) + gmail_service = build_gmail_service(settings) + unsubscribe_agent = UnsubscribeDigestAgent( + gmail_service=gmail_service, + query=settings.unsubscribe_query, + state_file=settings.unsubscribe_state_file, + recipient_email=settings.unsubscribe_digest_recipient, + send_empty_digest=settings.unsubscribe_send_empty_digest, + ) + result = unsubscribe_agent.scan_and_send_digest(max_results=bounded_max_results) + return UnsubscribeDigestResponse( + scanned_messages=result.scanned_messages, + extracted_unique_links=result.extracted_unique_links, + new_links=result.new_links, + sent_to=result.sent_to, + email_sent=result.email_sent, + ) + + def _build_llm_classifier() -> LLMEmailClassifier | None: global llm_key_warning_logged @@ -127,6 +157,13 @@ def _get_scan_lock() -> asyncio.Lock: return scan_lock +def _get_unsubscribe_lock() -> asyncio.Lock: + global unsubscribe_lock + if unsubscribe_lock is None: + unsubscribe_lock = asyncio.Lock() + return unsubscribe_lock + + async def _scheduled_scan() -> None: lock = _get_scan_lock() if lock.locked(): @@ -141,10 +178,27 @@ async def _scheduled_scan() -> None: logger.exception("Scheduled scan failed") +async def _scheduled_unsubscribe_digest() -> None: + lock = _get_unsubscribe_lock() + if lock.locked(): + logger.info("Previous unsubscribe digest still running, skipping this tick.") + return + + async with lock: + try: + result = await asyncio.to_thread( + _run_unsubscribe_digest_once, settings.unsubscribe_max_results + ) + logger.info("Scheduled unsubscribe digest complete: %s", result.model_dump()) + except Exception: + logger.exception("Scheduled unsubscribe digest failed") + + @app.on_event("startup") async def startup_event() -> None: global scheduler _get_scan_lock() + _get_unsubscribe_lock() scheduler = AsyncIOScheduler() scheduler.add_job( _scheduled_scan, @@ -152,9 +206,17 @@ async def startup_event() -> None: minutes=settings.gmail_scan_interval_minutes, next_run_time=datetime.now(), ) + scheduler.add_job( + _scheduled_unsubscribe_digest, + "interval", + minutes=settings.unsubscribe_digest_interval_minutes, + next_run_time=datetime.now(), + ) scheduler.start() logger.info( - "Scheduler started (interval=%s min)", settings.gmail_scan_interval_minutes + "Scheduler started (scan interval=%s min, unsubscribe interval=%s min)", + settings.gmail_scan_interval_minutes, + settings.unsubscribe_digest_interval_minutes, ) @@ -169,6 +231,7 @@ def health() -> dict[str, object]: return { "status": "ok", "scan_interval_minutes": settings.gmail_scan_interval_minutes, + "unsubscribe_digest_interval_minutes": settings.unsubscribe_digest_interval_minutes, } @@ -227,3 +290,26 @@ async def availability(request: AvailabilityRequest) -> AvailabilityResponse: status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Availability lookup failed: {exc}", ) from exc + + +@app.post( + "/unsubscribe-digest", + response_model=UnsubscribeDigestResponse, + dependencies=[Depends(verify_api_key)], +) +async def unsubscribe_digest_now( + max_results: int = Query(default=settings.unsubscribe_max_results, ge=1, le=500), +) -> UnsubscribeDigestResponse: + async with _get_unsubscribe_lock(): + try: + return await asyncio.to_thread(_run_unsubscribe_digest_once, max_results) + except FileNotFoundError as exc: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=str(exc), + ) from exc + except Exception as exc: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Unsubscribe digest failed: {exc}", + ) from exc diff --git a/app/unsubscribe_agent.py b/app/unsubscribe_agent.py new file mode 100644 index 0000000..7e62e28 --- /dev/null +++ b/app/unsubscribe_agent.py @@ -0,0 +1,282 @@ +from __future__ import annotations + +import base64 +from dataclasses import dataclass +from datetime import datetime, timezone +from email.message import EmailMessage +import html +import json +import logging +from pathlib import Path +import re +from typing import Any +from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit + +logger = logging.getLogger("personal-agent.unsubscribe") + +TRACKING_QUERY_KEYS = { + "fbclid", + "gclid", + "mc_cid", + "mc_eid", + "_hsenc", + "_hsmi", + "utm_campaign", + "utm_content", + "utm_id", + "utm_medium", + "utm_name", + "utm_source", + "utm_term", +} + +UNSUBSCRIBE_HINTS = { + "unsubscribe", + "optout", + "opt-out", + "email-preferences", + "manage-subscriptions", +} + +URL_PATTERN = re.compile(r"https?://[^\s<>'\"()]+", re.IGNORECASE) + + +@dataclass(frozen=True) +class UnsubscribeDigestResult: + scanned_messages: int + extracted_unique_links: int + new_links: int + sent_to: str | None + email_sent: bool + + +class UnsubscribeDigestAgent: + def __init__( + self, + *, + gmail_service: Any, + query: str, + state_file: str, + recipient_email: str | None = None, + send_empty_digest: bool = False, + ) -> None: + self.gmail_service = gmail_service + self.query = query + self.state_file = Path(state_file) + self.recipient_email = recipient_email + self.send_empty_digest = send_empty_digest + + def scan_and_send_digest(self, max_results: int = 500) -> UnsubscribeDigestResult: + messages = ( + self.gmail_service.users() + .messages() + .list(userId="me", q=self.query, maxResults=max_results) + .execute() + .get("messages", []) + ) + + extracted_links: set[str] = set() + for message in messages: + extracted_links.update(self._extract_links_from_message(message["id"])) + + sent_links = self._load_sent_links() + new_links = sorted(link for link in extracted_links if link not in sent_links) + + should_send = bool(new_links) or self.send_empty_digest + sent_to: str | None = None + email_sent = False + + if should_send: + sent_to = self._resolve_recipient_email() + self._send_digest_email( + recipient_email=sent_to, + new_links=new_links, + scanned_messages=len(messages), + ) + email_sent = True + + if new_links: + sent_links.update(new_links) + self._save_sent_links(sent_links) + + return UnsubscribeDigestResult( + scanned_messages=len(messages), + extracted_unique_links=len(extracted_links), + new_links=len(new_links), + sent_to=sent_to, + email_sent=email_sent, + ) + + def _extract_links_from_message(self, message_id: str) -> set[str]: + message = ( + self.gmail_service.users() + .messages() + .get(userId="me", id=message_id, format="full") + .execute() + ) + payload = message.get("payload", {}) + links: set[str] = set() + + for url in self._extract_list_unsubscribe_links(payload): + normalized = _normalize_url(url) + if normalized: + links.add(normalized) + + for text_block in self._extract_text_blocks(payload): + for url in URL_PATTERN.findall(html.unescape(text_block)): + if not _looks_like_unsubscribe(url): + continue + normalized = _normalize_url(url) + if normalized: + links.add(normalized) + + return links + + def _extract_list_unsubscribe_links(self, payload: dict[str, Any]) -> set[str]: + headers = { + header.get("name", "").lower(): header.get("value", "") + for header in payload.get("headers", []) + } + header_value = headers.get("list-unsubscribe", "") + if not header_value: + return set() + + results: set[str] = set() + for candidate in re.findall(r"<([^>]+)>", header_value): + candidate = candidate.strip() + if candidate.lower().startswith(("http://", "https://")): + results.add(candidate) + + if not results: + for token in header_value.split(","): + candidate = token.strip().strip("<>").strip() + if candidate.lower().startswith(("http://", "https://")): + results.add(candidate) + + return results + + def _extract_text_blocks(self, payload: dict[str, Any]) -> list[str]: + blocks: list[str] = [] + + def walk(part: dict[str, Any]) -> None: + mime_type = part.get("mimeType", "") + body_data = part.get("body", {}).get("data") + if body_data and mime_type in {"text/plain", "text/html"}: + decoded = _decode_base64(body_data) + if decoded: + blocks.append(decoded) + + for child in part.get("parts", []): + walk(child) + + walk(payload) + return blocks + + def _resolve_recipient_email(self) -> str: + if self.recipient_email: + return self.recipient_email + + profile = self.gmail_service.users().getProfile(userId="me").execute() + email = profile.get("emailAddress", "").strip() + if not email: + raise RuntimeError( + "Could not resolve recipient email. Set UNSUBSCRIBE_DIGEST_RECIPIENT in .env." + ) + return email + + def _send_digest_email( + self, + *, + recipient_email: str, + new_links: list[str], + scanned_messages: int, + ) -> None: + now_utc = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC") + subject = f"Unsubscribe recap: {len(new_links)} new link(s)" + + body_lines = [ + "Here is your unsubscribe digest.", + "", + f"Generated at: {now_utc}", + f"Advertising messages scanned: {scanned_messages}", + f"New unsubscribe links: {len(new_links)}", + "", + ] + if new_links: + body_lines.append("Links:") + body_lines.extend([f"- {link}" for link in new_links]) + else: + body_lines.append("No new unsubscribe links found.") + + message = EmailMessage() + message["To"] = recipient_email + message["Subject"] = subject + message.set_content("\n".join(body_lines)) + + raw = base64.urlsafe_b64encode(message.as_bytes()).decode("utf-8") + ( + self.gmail_service.users() + .messages() + .send(userId="me", body={"raw": raw}) + .execute() + ) + + def _load_sent_links(self) -> set[str]: + if not self.state_file.exists(): + return set() + + try: + payload = json.loads(self.state_file.read_text(encoding="utf-8")) + except json.JSONDecodeError: + logger.warning("State file is invalid JSON: %s", self.state_file) + return set() + + links = payload.get("sent_links", []) + if not isinstance(links, list): + return set() + return {str(link) for link in links if str(link).strip()} + + def _save_sent_links(self, links: set[str]) -> None: + self.state_file.parent.mkdir(parents=True, exist_ok=True) + self.state_file.write_text( + json.dumps({"sent_links": sorted(links)}, indent=2), + encoding="utf-8", + ) + + +def _decode_base64(data: str) -> str: + padded = data + "=" * (-len(data) % 4) + try: + return base64.urlsafe_b64decode(padded.encode("utf-8")).decode( + "utf-8", errors="replace" + ) + except Exception: + return "" + + +def _looks_like_unsubscribe(url: str) -> bool: + lowered = url.lower() + return any(hint in lowered for hint in UNSUBSCRIBE_HINTS) + + +def _normalize_url(url: str) -> str | None: + cleaned = url.strip().strip(".,;)") + split = urlsplit(cleaned) + if split.scheme.lower() not in {"http", "https"} or not split.netloc: + return None + + scheme = split.scheme.lower() + netloc = split.netloc.lower() + path = split.path or "/" + if path != "/": + path = path.rstrip("/") + + query_pairs = parse_qsl(split.query, keep_blank_values=True) + filtered_pairs = [ + (key, value) + for key, value in query_pairs + if key.lower() not in TRACKING_QUERY_KEYS + ] + query = urlencode(filtered_pairs, doseq=True) + + return urlunsplit((scheme, netloc, path, query, ""))