Added unsubscribe email recap
parent
14942a88cc
commit
3333a4e06d
@ -0,0 +1,282 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timezone
|
||||
from email.message import EmailMessage
|
||||
import html
|
||||
import json
|
||||
import logging
|
||||
from pathlib import Path
|
||||
import re
|
||||
from typing import Any
|
||||
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit
|
||||
|
||||
# Module-level logger for the unsubscribe digest agent.
logger = logging.getLogger("personal-agent.unsubscribe")


# Query-string keys that carry only analytics/tracking state.  They are
# stripped during URL normalization so two copies of the same unsubscribe
# link that differ only in tracking parameters deduplicate to one entry.
TRACKING_QUERY_KEYS = {
    "fbclid",
    "gclid",
    "mc_cid",
    "mc_eid",
    "_hsenc",
    "_hsmi",
    "utm_campaign",
    "utm_content",
    "utm_id",
    "utm_medium",
    "utm_name",
    "utm_source",
    "utm_term",
}

# Substrings whose presence (case-insensitive) marks a URL as a likely
# unsubscribe / mailing-preferences link.
UNSUBSCRIBE_HINTS = {
    "unsubscribe",
    "optout",
    "opt-out",
    "email-preferences",
    "manage-subscriptions",
}

# Matches http(s) URLs embedded in free text; the character class stops the
# match at whitespace and common HTML/punctuation delimiters.
URL_PATTERN = re.compile(r"https?://[^\s<>'\"()]+", re.IGNORECASE)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class UnsubscribeDigestResult:
    """Immutable summary of one digest run: scan counts and send outcome."""

    # Number of Gmail messages matched by the query and scanned.
    scanned_messages: int
    # Count of distinct normalized unsubscribe links extracted this run.
    extracted_unique_links: int
    # How many of those links were not in the persisted state (newly found).
    new_links: int
    # Address the digest was sent to, or None when no email was sent.
    sent_to: str | None
    # True when a digest email was actually sent during this run.
    email_sent: bool
|
||||
|
||||
|
||||
class UnsubscribeDigestAgent:
    """Scans Gmail for unsubscribe links and emails a recap of new ones.

    Previously-reported links are persisted to a JSON state file so repeated
    runs only surface links that have not appeared in an earlier digest.
    """

    def __init__(
        self,
        *,
        gmail_service: Any,
        query: str,
        state_file: str,
        recipient_email: str | None = None,
        send_empty_digest: bool = False,
    ) -> None:
        # Gmail API client; the users().messages() call chain below assumes a
        # googleapiclient-style resource object — TODO confirm at call site.
        self.gmail_service = gmail_service
        # Gmail search query selecting the candidate (advertising) messages.
        self.query = query
        # JSON file holding the set of links already sent in past digests.
        self.state_file = Path(state_file)
        # Explicit digest recipient; None falls back to the account's own
        # address via getProfile (see _resolve_recipient_email).
        self.recipient_email = recipient_email
        # When True, a digest is sent even if no new links were found.
        self.send_empty_digest = send_empty_digest

    def scan_and_send_digest(self, max_results: int = 500) -> UnsubscribeDigestResult:
        """Scan matching messages, email any new unsubscribe links, persist state.

        Only links absent from the state file go into the digest.  The state
        file is written *after* the email is sent, so a send failure leaves
        the links eligible to be retried on the next run.  Returns a summary
        of what was scanned, found, and sent.
        """
        # NOTE(review): only the first page of results is fetched — messages
        # beyond one list() page are not scanned.
        messages = (
            self.gmail_service.users()
            .messages()
            .list(userId="me", q=self.query, maxResults=max_results)
            .execute()
            .get("messages", [])
        )

        extracted_links: set[str] = set()
        for message in messages:
            extracted_links.update(self._extract_links_from_message(message["id"]))

        sent_links = self._load_sent_links()
        # Sorted for a stable, reproducible ordering in the digest email.
        new_links = sorted(link for link in extracted_links if link not in sent_links)

        should_send = bool(new_links) or self.send_empty_digest
        sent_to: str | None = None
        email_sent = False

        if should_send:
            sent_to = self._resolve_recipient_email()
            self._send_digest_email(
                recipient_email=sent_to,
                new_links=new_links,
                scanned_messages=len(messages),
            )
            email_sent = True

        # Persist only after a successful send (any send error raises above).
        if new_links:
            sent_links.update(new_links)
            self._save_sent_links(sent_links)

        return UnsubscribeDigestResult(
            scanned_messages=len(messages),
            extracted_unique_links=len(extracted_links),
            new_links=len(new_links),
            sent_to=sent_to,
            email_sent=email_sent,
        )

    def _extract_links_from_message(self, message_id: str) -> set[str]:
        """Return the normalized unsubscribe links found in one message.

        Combines URLs from the List-Unsubscribe header with hint-matching
        URLs found in the message's text/plain and text/html body parts.
        """
        message = (
            self.gmail_service.users()
            .messages()
            .get(userId="me", id=message_id, format="full")
            .execute()
        )
        payload = message.get("payload", {})
        links: set[str] = set()

        # Header links are authoritative — no keyword filtering needed.
        for url in self._extract_list_unsubscribe_links(payload):
            normalized = _normalize_url(url)
            if normalized:
                links.add(normalized)

        # Body links must look like unsubscribe URLs before being kept.
        # html.unescape so entity-encoded URLs (&amp; etc.) match cleanly.
        for text_block in self._extract_text_blocks(payload):
            for url in URL_PATTERN.findall(html.unescape(text_block)):
                if not _looks_like_unsubscribe(url):
                    continue
                normalized = _normalize_url(url)
                if normalized:
                    links.add(normalized)

        return links

    def _extract_list_unsubscribe_links(self, payload: dict[str, Any]) -> set[str]:
        """Return http(s) URLs from the message's List-Unsubscribe header.

        mailto: entries are intentionally skipped.  Angle-bracket-wrapped
        values (the RFC 2369 form) are preferred; bare comma-separated
        values are parsed only as a fallback.
        """
        headers = {
            header.get("name", "").lower(): header.get("value", "")
            for header in payload.get("headers", [])
        }
        header_value = headers.get("list-unsubscribe", "")
        if not header_value:
            return set()

        results: set[str] = set()
        for candidate in re.findall(r"<([^>]+)>", header_value):
            candidate = candidate.strip()
            if candidate.lower().startswith(("http://", "https://")):
                results.add(candidate)

        # Fallback: some senders omit the angle brackets entirely.
        if not results:
            for token in header_value.split(","):
                candidate = token.strip().strip("<>").strip()
                if candidate.lower().startswith(("http://", "https://")):
                    results.add(candidate)

        return results

    def _extract_text_blocks(self, payload: dict[str, Any]) -> list[str]:
        """Collect decoded text/plain and text/html bodies from all MIME parts.

        Recursively walks nested multipart structures; parts whose body data
        is absent or fails to decode are skipped.
        """
        blocks: list[str] = []

        def walk(part: dict[str, Any]) -> None:
            mime_type = part.get("mimeType", "")
            body_data = part.get("body", {}).get("data")
            if body_data and mime_type in {"text/plain", "text/html"}:
                decoded = _decode_base64(body_data)
                if decoded:
                    blocks.append(decoded)

            for child in part.get("parts", []):
                walk(child)

        walk(payload)
        return blocks

    def _resolve_recipient_email(self) -> str:
        """Return the digest recipient: the configured address, else the
        authenticated account's own address from the Gmail profile.

        Raises RuntimeError when neither source yields an address.
        """
        if self.recipient_email:
            return self.recipient_email

        profile = self.gmail_service.users().getProfile(userId="me").execute()
        email = profile.get("emailAddress", "").strip()
        if not email:
            raise RuntimeError(
                "Could not resolve recipient email. Set UNSUBSCRIBE_DIGEST_RECIPIENT in .env."
            )
        return email

    def _send_digest_email(
        self,
        *,
        recipient_email: str,
        new_links: list[str],
        scanned_messages: int,
    ) -> None:
        """Compose and send the plain-text digest via the Gmail API.

        No From header is set; Gmail fills it in for the authenticated
        user — TODO confirm this is the intended sender identity.
        """
        now_utc = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
        subject = f"Unsubscribe recap: {len(new_links)} new link(s)"

        body_lines = [
            "Here is your unsubscribe digest.",
            "",
            f"Generated at: {now_utc}",
            f"Advertising messages scanned: {scanned_messages}",
            f"New unsubscribe links: {len(new_links)}",
            "",
        ]
        if new_links:
            body_lines.append("Links:")
            body_lines.extend([f"- {link}" for link in new_links])
        else:
            body_lines.append("No new unsubscribe links found.")

        message = EmailMessage()
        message["To"] = recipient_email
        message["Subject"] = subject
        message.set_content("\n".join(body_lines))

        # Gmail's send API expects the raw RFC 822 message, URL-safe base64.
        raw = base64.urlsafe_b64encode(message.as_bytes()).decode("utf-8")
        (
            self.gmail_service.users()
            .messages()
            .send(userId="me", body={"raw": raw})
            .execute()
        )

    def _load_sent_links(self) -> set[str]:
        """Load previously-digested links from the JSON state file.

        Returns an empty set when the file is missing, contains invalid
        JSON (logged), or has an unexpected shape — a fresh start rather
        than a crash.
        """
        if not self.state_file.exists():
            return set()

        try:
            payload = json.loads(self.state_file.read_text(encoding="utf-8"))
        except json.JSONDecodeError:
            logger.warning("State file is invalid JSON: %s", self.state_file)
            return set()

        links = payload.get("sent_links", [])
        if not isinstance(links, list):
            return set()
        # Coerce entries to str and drop blanks to tolerate hand edits.
        return {str(link) for link in links if str(link).strip()}

    def _save_sent_links(self, links: set[str]) -> None:
        """Persist the full set of digested links, creating parent dirs.

        Links are sorted so the file diffs cleanly between runs.
        """
        self.state_file.parent.mkdir(parents=True, exist_ok=True)
        self.state_file.write_text(
            json.dumps({"sent_links": sorted(links)}, indent=2),
            encoding="utf-8",
        )
|
||||
|
||||
|
||||
def _decode_base64(data: str) -> str:
|
||||
padded = data + "=" * (-len(data) % 4)
|
||||
try:
|
||||
return base64.urlsafe_b64decode(padded.encode("utf-8")).decode(
|
||||
"utf-8", errors="replace"
|
||||
)
|
||||
except Exception:
|
||||
return ""
|
||||
|
||||
|
||||
def _looks_like_unsubscribe(url: str) -> bool:
    """Return True when the URL contains any known unsubscribe keyword."""
    folded = url.lower()
    for hint in UNSUBSCRIBE_HINTS:
        if hint in folded:
            return True
    return False
|
||||
|
||||
|
||||
def _normalize_url(url: str) -> str | None:
    """Canonicalize an unsubscribe URL for deduplication.

    Lowercases scheme and host, drops a trailing slash (except for the bare
    root path), removes known tracking query parameters, and discards the
    fragment.  Returns None when the value is not an http(s) URL with a host.
    """
    # Trim surrounding whitespace plus trailing punctuation picked up when
    # the URL was extracted from free text.
    candidate = url.strip().strip(".,;)")
    parts = urlsplit(candidate)
    scheme = parts.scheme.lower()
    if scheme not in {"http", "https"} or not parts.netloc:
        return None

    host = parts.netloc.lower()
    path = parts.path or "/"
    if path != "/":
        path = path.rstrip("/")

    # keep_blank_values preserves flag-style params like "?confirm=".
    kept_pairs = [
        pair
        for pair in parse_qsl(parts.query, keep_blank_values=True)
        if pair[0].lower() not in TRACKING_QUERY_KEYS
    ]

    return urlunsplit((scheme, host, path, urlencode(kept_pairs, doseq=True), ""))
|
||||
Loading…
Reference in New Issue