You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

283 lines
8.4 KiB
Python

from __future__ import annotations
import base64
from dataclasses import dataclass
from datetime import datetime, timezone
from email.message import EmailMessage
import html
import json
import logging
from pathlib import Path
import re
from typing import Any
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit
# Module-level logger; named after the owning agent rather than __name__ so
# log config can target "personal-agent.*" explicitly.
logger = logging.getLogger("personal-agent.unsubscribe")

# Query-string keys that only carry campaign/click tracking (UTM, Facebook,
# Google, Mailchimp, HubSpot). Stripped by _normalize_url so two copies of the
# same unsubscribe link with different tracking params dedupe to one entry.
TRACKING_QUERY_KEYS = {
    "fbclid",
    "gclid",
    "mc_cid",
    "mc_eid",
    "_hsenc",
    "_hsmi",
    "utm_campaign",
    "utm_content",
    "utm_id",
    "utm_medium",
    "utm_name",
    "utm_source",
    "utm_term",
}

# Substrings that mark a URL as a likely unsubscribe/preferences link when
# scanning message bodies (matched case-insensitively in _looks_like_unsubscribe).
UNSUBSCRIBE_HINTS = {
    "unsubscribe",
    "optout",
    "opt-out",
    "email-preferences",
    "manage-subscriptions",
}

# Greedy http(s) URL matcher for free text; stops at whitespace, angle
# brackets, quotes, and parentheses so surrounding markup is not captured.
URL_PATTERN = re.compile(r"https?://[^\s<>'\"()]+", re.IGNORECASE)
@dataclass(frozen=True)
class UnsubscribeDigestResult:
    """Immutable summary of one scan-and-send run of the digest agent."""

    # Number of Gmail messages matched by the configured search query.
    scanned_messages: int
    # Count of distinct normalized unsubscribe links found across all messages.
    extracted_unique_links: int
    # Count of links not already present in the persisted state file.
    new_links: int
    # Address the digest was sent to, or None when no email was sent.
    sent_to: str | None
    # True when a digest email was actually dispatched this run.
    email_sent: bool
class UnsubscribeDigestAgent:
    """Finds unsubscribe links in a Gmail mailbox and emails a digest of new ones.

    State (links already reported) is persisted as JSON on disk so each run
    only reports links that have not been sent in a previous digest.
    """

    def __init__(
        self,
        *,
        gmail_service: Any,
        query: str,
        state_file: str,
        recipient_email: str | None = None,
        send_empty_digest: bool = False,
    ) -> None:
        # gmail_service: an authorized Gmail API resource (users().messages()...).
        self.gmail_service = gmail_service
        self.query = query
        self.state_file = Path(state_file)
        self.recipient_email = recipient_email
        self.send_empty_digest = send_empty_digest

    def scan_and_send_digest(self, max_results: int = 500) -> UnsubscribeDigestResult:
        """Scan matching messages, email previously-unseen links, persist state.

        Returns a summary of what was scanned/found/sent. State is only saved
        after a successful send, so a failed send leaves links to be retried.
        """
        listing = (
            self.gmail_service.users()
            .messages()
            .list(userId="me", q=self.query, maxResults=max_results)
            .execute()
        )
        messages = listing.get("messages", [])

        found: set[str] = set()
        for msg in messages:
            found |= self._extract_links_from_message(msg["id"])

        already_sent = self._load_sent_links()
        fresh = sorted(found - already_sent)

        recipient: str | None = None
        delivered = False
        if fresh or self.send_empty_digest:
            recipient = self._resolve_recipient_email()
            self._send_digest_email(
                recipient_email=recipient,
                new_links=fresh,
                scanned_messages=len(messages),
            )
            delivered = True

        if fresh:
            already_sent |= set(fresh)
            self._save_sent_links(already_sent)

        return UnsubscribeDigestResult(
            scanned_messages=len(messages),
            extracted_unique_links=len(found),
            new_links=len(fresh),
            sent_to=recipient,
            email_sent=delivered,
        )

    def _extract_links_from_message(self, message_id: str) -> set[str]:
        """Return normalized unsubscribe links from one message's header and body."""
        full = (
            self.gmail_service.users()
            .messages()
            .get(userId="me", id=message_id, format="full")
            .execute()
        )
        payload = full.get("payload", {})

        collected: set[str] = set()
        # List-Unsubscribe header links are trusted unconditionally.
        for url in self._extract_list_unsubscribe_links(payload):
            normalized = _normalize_url(url)
            if normalized:
                collected.add(normalized)
        # Body links must also look like unsubscribe links to be included.
        for block in self._extract_text_blocks(payload):
            for url in URL_PATTERN.findall(html.unescape(block)):
                if _looks_like_unsubscribe(url):
                    normalized = _normalize_url(url)
                    if normalized:
                        collected.add(normalized)
        return collected

    def _extract_list_unsubscribe_links(self, payload: dict[str, Any]) -> set[str]:
        """Pull http(s) URLs from the List-Unsubscribe header, if present."""
        # Last matching header wins, mirroring a dict built over all headers.
        header_value = ""
        for header in payload.get("headers", []):
            if header.get("name", "").lower() == "list-unsubscribe":
                header_value = header.get("value", "")
        if not header_value:
            return set()

        # Preferred form: RFC 2369 angle-bracketed entries.
        bracketed = {
            chunk.strip()
            for chunk in re.findall(r"<([^>]+)>", header_value)
            if chunk.strip().lower().startswith(("http://", "https://"))
        }
        if bracketed:
            return bracketed
        # Fallback: comma-separated bare URLs (possibly with stray brackets).
        candidates = (token.strip().strip("<>").strip() for token in header_value.split(","))
        return {
            candidate
            for candidate in candidates
            if candidate.lower().startswith(("http://", "https://"))
        }

    def _extract_text_blocks(self, payload: dict[str, Any]) -> list[str]:
        """Collect decoded text/plain and text/html bodies from the MIME tree.

        Traversal is pre-order depth-first, matching a recursive walk.
        """
        collected: list[str] = []
        stack = [payload]
        while stack:
            part = stack.pop()
            body_data = part.get("body", {}).get("data")
            if body_data and part.get("mimeType", "") in {"text/plain", "text/html"}:
                text = _decode_base64(body_data)
                if text:
                    collected.append(text)
            # Reverse so children are visited in their original order.
            stack.extend(reversed(part.get("parts", [])))
        return collected

    def _resolve_recipient_email(self) -> str:
        """Explicit recipient if configured, else the authenticated Gmail address.

        Raises RuntimeError when neither source yields an address.
        """
        if self.recipient_email:
            return self.recipient_email
        profile = self.gmail_service.users().getProfile(userId="me").execute()
        address = profile.get("emailAddress", "").strip()
        if address:
            return address
        raise RuntimeError(
            "Could not resolve recipient email. Set UNSUBSCRIBE_DIGEST_RECIPIENT in .env."
        )

    def _send_digest_email(
        self,
        *,
        recipient_email: str,
        new_links: list[str],
        scanned_messages: int,
    ) -> None:
        """Build and send the plain-text digest via the Gmail send endpoint."""
        timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
        lines = [
            "Here is your unsubscribe digest.",
            "",
            f"Generated at: {timestamp}",
            f"Advertising messages scanned: {scanned_messages}",
            f"New unsubscribe links: {len(new_links)}",
            "",
        ]
        if not new_links:
            lines.append("No new unsubscribe links found.")
        else:
            lines.append("Links:")
            for link in new_links:
                lines.append(f"- {link}")

        email = EmailMessage()
        email["To"] = recipient_email
        email["Subject"] = f"Unsubscribe recap: {len(new_links)} new link(s)"
        email.set_content("\n".join(lines))
        # Gmail API expects the RFC 822 message as URL-safe base64.
        encoded = base64.urlsafe_b64encode(email.as_bytes()).decode("utf-8")
        self.gmail_service.users().messages().send(
            userId="me", body={"raw": encoded}
        ).execute()

    def _load_sent_links(self) -> set[str]:
        """Load the set of previously-reported links; empty set on any miss."""
        if not self.state_file.exists():
            return set()
        try:
            data = json.loads(self.state_file.read_text(encoding="utf-8"))
        except json.JSONDecodeError:
            logger.warning("State file is invalid JSON: %s", self.state_file)
            return set()
        stored = data.get("sent_links", [])
        if isinstance(stored, list):
            return {str(item) for item in stored if str(item).strip()}
        return set()

    def _save_sent_links(self, links: set[str]) -> None:
        """Persist the full set of reported links as sorted JSON."""
        self.state_file.parent.mkdir(parents=True, exist_ok=True)
        serialized = json.dumps({"sent_links": sorted(links)}, indent=2)
        self.state_file.write_text(serialized, encoding="utf-8")
def _decode_base64(data: str) -> str:
padded = data + "=" * (-len(data) % 4)
try:
return base64.urlsafe_b64decode(padded.encode("utf-8")).decode(
"utf-8", errors="replace"
)
except Exception:
return ""
def _looks_like_unsubscribe(url: str) -> bool:
    """Return True when the URL contains any known unsubscribe-style hint."""
    haystack = url.lower()
    for hint in UNSUBSCRIBE_HINTS:
        if hint in haystack:
            return True
    return False
def _normalize_url(url: str) -> str | None:
    """Canonicalize a URL so equivalent unsubscribe links deduplicate.

    Lowercases scheme and host, trims surrounding punctuation and trailing
    slashes, drops the fragment and known tracking query parameters.
    Returns None for anything that is not an absolute http(s) URL.
    """
    candidate = url.strip().strip(".,;)")
    parts = urlsplit(candidate)
    scheme = parts.scheme.lower()
    if scheme not in {"http", "https"} or not parts.netloc:
        return None

    host = parts.netloc.lower()
    path = parts.path if parts.path else "/"
    if path != "/":
        path = path.rstrip("/")

    # Keep query pairs (including blank values) except tracking-only keys.
    kept = []
    for key, value in parse_qsl(parts.query, keep_blank_values=True):
        if key.lower() in TRACKING_QUERY_KEYS:
            continue
        kept.append((key, value))

    return urlunsplit((scheme, host, path, urlencode(kept, doseq=True), ""))