You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
581 lines
19 KiB
Python
581 lines
19 KiB
Python
from __future__ import annotations
|
|
|
|
import base64
|
|
from dataclasses import dataclass
|
|
from email.message import EmailMessage
|
|
from email.utils import parseaddr
|
|
import hashlib
|
|
import html
|
|
import json
|
|
import logging
|
|
from pathlib import Path
|
|
import re
|
|
from typing import Any
|
|
from urllib.error import HTTPError, URLError
|
|
from urllib.parse import parse_qs, parse_qsl, urlencode, urlsplit, urlunsplit
|
|
from urllib.request import Request, urlopen
|
|
|
|
# Module-level logger shared by everything in this file.
logger = logging.getLogger("personal-agent.unsubscribe.hil")

# Query-string keys stripped from HTTP unsubscribe URLs during normalization.
# Keys are compared lower-cased, so every entry here is lower-case.
TRACKING_QUERY_KEYS = {
    # click identifiers
    "fbclid", "gclid",
    # Mailchimp-style identifiers
    "mc_cid", "mc_eid",
    # HubSpot-style identifiers
    "_hsenc", "_hsmi",
    # UTM campaign parameters
    "utm_campaign", "utm_content", "utm_id", "utm_medium",
    "utm_name", "utm_source", "utm_term",
}

# Substrings that mark a URL found in a message body as an unsubscribe link.
UNSUBSCRIBE_HINTS = {
    "unsubscribe", "optout", "opt-out",
    "email-preferences", "manage-subscriptions",
}

# Matches http(s) URLs up to the first whitespace, angle bracket, quote or
# parenthesis.
URL_PATTERN = re.compile(r"https?://[^\s<>'\"()]+", re.IGNORECASE)

# Headers requested from the Gmail API when fetching message metadata.
METADATA_HEADERS = ["From", "Subject", "List-Id", "List-Unsubscribe"]
|
|
|
|
|
|
@dataclass(frozen=True)
class UnsubscribeMethod:
    """One concrete way of unsubscribing from a mailing list (immutable)."""

    # Stable identity of the method: "<method_type>:<normalized value>".
    method_id: str
    # Transport kind; the executor understands "http" and "mailto".
    method_type: str
    # Normalized URL (http/https or mailto) that is acted upon.
    value: str
|
|
|
|
|
|
@dataclass(frozen=True)
class MailingListCandidate:
    """A group of scanned messages treated as one mailing list (immutable)."""

    # Short identifier the caller uses to select this candidate.
    candidate_id: str
    # Grouping key: the cleaned List-Id, else the sender domain, else "unknown".
    list_key: str
    # Human-readable label for the list.
    list_name: str
    # Domain part of the sender address ("unknown" when it has no "@").
    sender_domain: str
    # Number of scanned messages that fell into this group.
    message_count: int
    # A few distinct From header values seen in the group.
    sample_senders: list[str]
    # A few distinct Subject header values seen in the group.
    sample_subjects: list[str]
    # De-duplicated unsubscribe methods collected from the group's messages.
    methods: list[UnsubscribeMethod]
    # True when list_key was already in the persisted approved set.
    approved: bool
|
|
|
|
|
|
@dataclass(frozen=True)
class CandidateSnapshot:
    """Result of a discovery scan: how much was scanned and what was found."""

    # Number of message references returned by the mailbox query.
    scanned_messages: int
    # Mailing-list candidates built from those messages.
    candidates: list[MailingListCandidate]
|
|
|
|
|
|
@dataclass(frozen=True)
class MethodExecutionResult:
    """Outcome of handling a single unsubscribe method for one candidate."""

    # Candidate the method belongs to.
    candidate_id: str
    # Display name of that candidate's mailing list.
    list_name: str
    # Identity, kind and target of the method (copied from UnsubscribeMethod).
    method_id: str
    method_type: str
    value: str
    # True on success; also True for methods skipped as already executed.
    success: bool
    # Short human-readable explanation (status code, error text, skip note).
    detail: str
|
|
|
|
|
|
@dataclass(frozen=True)
class UnsubscribeExecutionResult:
    """Summary of one execution run over the selected candidates."""

    # Number of message references returned by the mailbox query.
    scanned_messages: int
    # Number of candidates built from the scan.
    candidates_considered: int
    # Number of candidates whose id matched the selection.
    selected_candidates: int
    # Methods that were run and succeeded in this run.
    executed_methods: int
    # Methods skipped because their id was already recorded as executed.
    skipped_already_executed: int
    # Methods that were run and failed.
    failed_methods: int
    # Size of the approved-list set after this run.
    updated_approved_count: int
    # One entry per method handled, in processing order.
    results: list[MethodExecutionResult]
|
|
|
|
|
|
@dataclass
class _UnsubscribeState:
    """Mutable state persisted between runs in the JSON state file."""

    # list_key values the user has approved for unsubscribing.
    approved_list_keys: set[str]
    # method_id values that already ran successfully and must not be repeated.
    executed_methods: set[str]
|
|
|
|
|
|
class UnsubscribeHumanLoopAgent:
    """Discover mailing lists in a Gmail mailbox and unsubscribe from chosen ones.

    Messages matching ``query`` are grouped into mailing-list candidates. The
    caller selects candidates by id (or re-runs the previously approved lists)
    and the agent executes each candidate's unsubscribe methods. Approved list
    keys and successfully executed method ids are persisted as JSON in
    ``state_file`` so that a method is not executed twice.
    """

    def __init__(
        self,
        *,
        gmail_service: Any,
        query: str,
        state_file: str,
        http_timeout_seconds: float = 12.0,
        user_agent: str = "Mozilla/5.0 (compatible; PersonalAgentUnsubscribe/1.0)",
    ) -> None:
        """Store collaborators and settings; performs no I/O.

        Args:
            gmail_service: Gmail API client; used through
                ``users().messages().list/get/send(...).execute()``.
            query: Gmail search query selecting the messages to scan.
            state_file: Path of the JSON file holding the persisted state.
            http_timeout_seconds: Timeout for each HTTP unsubscribe request.
            user_agent: ``User-Agent`` header sent with HTTP unsubscribe requests.
        """
        self.gmail_service = gmail_service
        self.query = query
        self.state_file = Path(state_file)
        self.http_timeout_seconds = http_timeout_seconds
        self.user_agent = user_agent

    def discover_candidates(self, max_results: int = 500) -> CandidateSnapshot:
        """Scan the mailbox and return the candidates found, executing nothing.

        The state file is only read, so that already-approved lists can be
        flagged on the returned candidates.
        """
        state = self._load_state()
        message_refs = self._list_message_refs(max_results=max_results)
        candidates = self._build_candidates(
            message_refs, approved_list_keys=state.approved_list_keys
        )
        return CandidateSnapshot(
            scanned_messages=len(message_refs),
            candidates=candidates,
        )

    def execute_selected(
        self,
        *,
        selected_candidate_ids: list[str],
        max_results: int = 500,
        remember_selection: bool = True,
    ) -> UnsubscribeExecutionResult:
        """Re-scan the mailbox and run the methods of the selected candidates.

        Args:
            selected_candidate_ids: ``candidate_id`` values to act on.
            max_results: Upper bound on messages to scan.
            remember_selection: When true, the selected candidates' list keys
                are added to the persisted approved set.
        """
        state = self._load_state()
        message_refs = self._list_message_refs(max_results=max_results)
        candidates = self._build_candidates(
            message_refs, approved_list_keys=state.approved_list_keys
        )
        return self._execute_candidates(
            selected_candidate_ids=selected_candidate_ids,
            candidates=candidates,
            state=state,
            remember_selection=remember_selection,
            scanned_messages=len(message_refs),
        )

    def execute_for_approved(self, max_results: int = 500) -> UnsubscribeExecutionResult:
        """Re-scan the mailbox and run methods for every already-approved list."""
        state = self._load_state()
        message_refs = self._list_message_refs(max_results=max_results)
        candidates = self._build_candidates(
            message_refs, approved_list_keys=state.approved_list_keys
        )
        approved_ids = [
            candidate.candidate_id
            for candidate in candidates
            if candidate.list_key in state.approved_list_keys
        ]
        return self._execute_candidates(
            selected_candidate_ids=approved_ids,
            candidates=candidates,
            state=state,
            remember_selection=False,
            scanned_messages=len(message_refs),
        )

    def _execute_candidates(
        self,
        *,
        selected_candidate_ids: list[str],
        candidates: list[MailingListCandidate],
        state: _UnsubscribeState,
        remember_selection: bool,
        scanned_messages: int,
    ) -> UnsubscribeExecutionResult:
        """Run every not-yet-executed method of the selected candidates.

        ``state`` is mutated (approved keys, executed method ids) and written
        back to the state file once all methods have been handled.
        """
        selected_ids = {
            candidate_id.strip()
            for candidate_id in selected_candidate_ids
            if candidate_id
        }
        selected = [
            candidate for candidate in candidates if candidate.candidate_id in selected_ids
        ]
        if remember_selection:
            state.approved_list_keys.update(candidate.list_key for candidate in selected)

        results: list[MethodExecutionResult] = []
        executed = 0
        skipped = 0
        failed = 0

        for candidate in selected:
            for method in candidate.methods:
                if method.method_id in state.executed_methods:
                    # A previously successful method is reported as a success
                    # without being repeated.
                    skipped += 1
                    success, detail = True, "Already executed previously, skipped."
                else:
                    success, detail = self._execute_method(method)
                    if success:
                        state.executed_methods.add(method.method_id)
                        executed += 1
                    else:
                        failed += 1
                results.append(
                    MethodExecutionResult(
                        candidate_id=candidate.candidate_id,
                        list_name=candidate.list_name,
                        method_id=method.method_id,
                        method_type=method.method_type,
                        value=method.value,
                        success=success,
                        detail=detail,
                    )
                )

        self._save_state(state)
        return UnsubscribeExecutionResult(
            scanned_messages=scanned_messages,
            candidates_considered=len(candidates),
            selected_candidates=len(selected),
            executed_methods=executed,
            skipped_already_executed=skipped,
            failed_methods=failed,
            updated_approved_count=len(state.approved_list_keys),
            results=results,
        )

    def _list_message_refs(self, max_results: int) -> list[dict[str, str]]:
        """Return message references matching the query.

        ``max_results`` is clamped to the range 1..500 and a single list call
        is made (no pagination).
        """
        bounded_max_results = max(1, min(max_results, 500))
        return (
            self.gmail_service.users()
            .messages()
            .list(userId="me", q=self.query, maxResults=bounded_max_results)
            .execute()
            .get("messages", [])
        )

    def _build_candidates(
        self,
        message_refs: list[dict[str, str]],
        *,
        approved_list_keys: set[str],
    ) -> list[MailingListCandidate]:
        """Group messages into mailing-list candidates, busiest list first.

        Messages are grouped by cleaned ``List-Id``, falling back to the sender
        domain and finally ``"unknown"``. Messages without any unsubscribe
        method are ignored.
        """
        groups: dict[str, dict[str, Any]] = {}

        for message_ref in message_refs:
            message_id = message_ref["id"]
            metadata = (
                self.gmail_service.users()
                .messages()
                .get(
                    userId="me",
                    id=message_id,
                    format="metadata",
                    metadataHeaders=METADATA_HEADERS,
                )
                .execute()
            )
            headers = {
                header.get("name", "").lower(): header.get("value", "")
                for header in metadata.get("payload", {}).get("headers", [])
            }
            sender = headers.get("from", "")
            subject = headers.get("subject", "")
            list_id = _clean_list_id(headers.get("list-id", ""))

            sender_email = parseaddr(sender)[1].lower()
            sender_domain = sender_email.split("@")[-1] if "@" in sender_email else "unknown"
            list_key = list_id or sender_domain or "unknown"
            list_name = _derive_list_name(
                list_id=list_id, sender=sender, sender_domain=sender_domain
            )

            methods = self._extract_methods_from_message(
                message_id=message_id,
                list_unsubscribe_header=headers.get("list-unsubscribe", ""),
            )
            if not methods:
                continue

            # The first message of a group fixes its display name and domain.
            group = groups.setdefault(
                list_key,
                {
                    "list_name": list_name,
                    "sender_domain": sender_domain,
                    "message_count": 0,
                    "sample_senders": [],
                    "sample_subjects": [],
                    "methods": {},
                },
            )

            group["message_count"] += 1
            if sender and sender not in group["sample_senders"] and len(group["sample_senders"]) < 3:
                group["sample_senders"].append(sender)
            if subject and subject not in group["sample_subjects"] and len(group["sample_subjects"]) < 5:
                group["sample_subjects"].append(subject)

            # Keyed by method_id so identical methods collapse into one.
            for method in methods:
                group["methods"][method.method_id] = method

        candidates: list[MailingListCandidate] = []
        for list_key, group in groups.items():
            # Short, deterministic id derived from the grouping key.
            candidate_id = hashlib.sha1(list_key.encode("utf-8")).hexdigest()[:12]
            methods = sorted(group["methods"].values(), key=lambda method: method.method_id)
            candidates.append(
                MailingListCandidate(
                    candidate_id=candidate_id,
                    list_key=list_key,
                    list_name=group["list_name"],
                    sender_domain=group["sender_domain"],
                    message_count=group["message_count"],
                    sample_senders=group["sample_senders"],
                    sample_subjects=group["sample_subjects"],
                    methods=methods,
                    approved=list_key in approved_list_keys,
                )
            )

        candidates.sort(key=lambda candidate: candidate.message_count, reverse=True)
        return candidates

    def _extract_methods_from_message(
        self,
        *,
        message_id: str,
        list_unsubscribe_header: str,
    ) -> list[UnsubscribeMethod]:
        """Return the unsubscribe methods of one message, sorted by method id.

        The ``List-Unsubscribe`` header is preferred. Only when it yields
        nothing is the full message fetched and its text parts searched for
        URLs that look like unsubscribe links.
        """
        methods_by_id: dict[str, UnsubscribeMethod] = {}

        for raw_value in _extract_list_unsubscribe_values(list_unsubscribe_header):
            method = _make_unsubscribe_method(raw_value)
            if method:
                methods_by_id[method.method_id] = method

        if methods_by_id:
            return sorted(methods_by_id.values(), key=lambda method: method.method_id)

        full_message = (
            self.gmail_service.users()
            .messages()
            .get(userId="me", id=message_id, format="full")
            .execute()
        )
        payload = full_message.get("payload", {})
        for text_block in _extract_text_blocks(payload):
            # Unescape first so "&amp;" in HTML links becomes a real "&".
            for url in URL_PATTERN.findall(html.unescape(text_block)):
                if not _looks_like_unsubscribe(url):
                    continue
                method = _make_unsubscribe_method(url)
                if method:
                    methods_by_id[method.method_id] = method

        return sorted(methods_by_id.values(), key=lambda method: method.method_id)

    def _execute_method(self, method: UnsubscribeMethod) -> tuple[bool, str]:
        """Dispatch on the method type; return ``(success, detail)``."""
        if method.method_type == "http":
            return self._execute_http_method(method.value)
        if method.method_type == "mailto":
            return self._execute_mailto_method(method.value)
        return False, f"Unsupported unsubscribe method type: {method.method_type}"

    def _execute_http_method(self, url: str) -> tuple[bool, str]:
        """Issue a GET to ``url``; any status in 200..399 counts as success.

        Never raises: every failure is reported through the returned detail.
        NOTE: the URL originates from message headers or bodies, i.e. from
        untrusted input.
        """
        try:
            # Request() itself raises ValueError on a malformed URL, so it is
            # built inside the guarded region as well.
            request = Request(
                url=url,
                headers={
                    "User-Agent": self.user_agent,
                    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
                },
                method="GET",
            )
            with urlopen(request, timeout=self.http_timeout_seconds) as response:
                status = response.getcode()
        except HTTPError as exc:
            return False, f"HTTP {exc.code}"
        except URLError as exc:
            return False, f"Network error: {exc.reason}"
        except Exception as exc:
            return False, f"Unexpected error: {exc}"
        return 200 <= status < 400, f"HTTP {status}"

    def _execute_mailto_method(self, mailto_url: str) -> tuple[bool, str]:
        """Send the e-mail described by a ``mailto:`` URL through the Gmail API.

        ``subject`` and ``body`` are taken from the URL query when present,
        otherwise generic defaults are used. Never raises: every failure is
        reported through the returned detail.
        """
        # Local import: ``unquote`` is not among the module-level imports.
        from urllib.parse import unquote

        split = urlsplit(mailto_url)
        # The address part of a mailto URL is percent-encoded; decode it so
        # that e.g. "%2B" reaches the To header as "+".
        recipient = unquote(split.path).strip()
        if not recipient:
            return False, "Invalid mailto URL: missing recipient."
        if "\r" in recipient or "\n" in recipient:
            # Decoded line breaks would allow header injection.
            return False, "Invalid mailto URL: recipient contains line breaks."

        query = parse_qs(split.query, keep_blank_values=True)
        subject = _first_query_value(query, "subject") or "Unsubscribe request"
        # Header values must be single-line; EmailMessage rejects anything
        # else with ValueError. Single-line subjects pass through unchanged.
        subject = " ".join(subject.splitlines()) or "Unsubscribe request"
        body = _first_query_value(query, "body") or (
            "Please unsubscribe this email address from this mailing list."
        )

        try:
            message = EmailMessage()
            message["To"] = recipient
            message["Subject"] = subject
            message.set_content(body)
            raw = base64.urlsafe_b64encode(message.as_bytes()).decode("utf-8")
            (
                self.gmail_service.users()
                .messages()
                .send(userId="me", body={"raw": raw})
                .execute()
            )
        except Exception as exc:
            return False, f"Failed to send mailto unsubscribe request: {exc}"
        return True, "Sent mailto unsubscribe request via Gmail API."

    @staticmethod
    def _as_string_set(raw: Any) -> set[str]:
        """Return the non-blank entries of a JSON array as a set of strings.

        Anything that is not a list (missing key, ``null``, a string, an
        object) yields an empty set instead of raising or being iterated.
        """
        if not isinstance(raw, list):
            return set()
        return {str(item) for item in raw if str(item).strip()}

    def _load_state(self) -> _UnsubscribeState:
        """Read the persisted state; a missing or malformed file yields empty state."""
        if not self.state_file.exists():
            return _UnsubscribeState(approved_list_keys=set(), executed_methods=set())

        try:
            payload = json.loads(self.state_file.read_text(encoding="utf-8"))
        except (json.JSONDecodeError, UnicodeDecodeError):
            logger.warning("State file is invalid JSON: %s", self.state_file)
            return _UnsubscribeState(approved_list_keys=set(), executed_methods=set())

        if not isinstance(payload, dict):
            # Valid JSON, but not the object this class writes.
            logger.warning("State file has unexpected structure: %s", self.state_file)
            return _UnsubscribeState(approved_list_keys=set(), executed_methods=set())

        return _UnsubscribeState(
            approved_list_keys=self._as_string_set(payload.get("approved_list_keys")),
            executed_methods=self._as_string_set(payload.get("executed_methods")),
        )

    def _save_state(self, state: _UnsubscribeState) -> None:
        """Write the state as indented JSON, creating parent directories as needed."""
        self.state_file.parent.mkdir(parents=True, exist_ok=True)
        serialized = json.dumps(
            {
                # Sorted so the file content is deterministic.
                "approved_list_keys": sorted(state.approved_list_keys),
                "executed_methods": sorted(state.executed_methods),
            },
            indent=2,
        )
        self.state_file.write_text(serialized, encoding="utf-8")
|
|
|
|
|
|
def _extract_list_unsubscribe_values(header_value: str) -> list[str]:
    """Split a ``List-Unsubscribe`` header into its individual targets.

    Angle-bracketed entries (``<...>``) win when any are present; otherwise the
    header is treated as a plain comma-separated list. Blank entries are
    dropped in both cases.
    """
    if not header_value:
        return []

    inside_brackets = re.findall(r"<([^>]+)>", header_value)
    if inside_brackets:
        stripped = (entry.strip() for entry in inside_brackets)
        return [entry for entry in stripped if entry]

    # No brackets at all: fall back to comma splitting and tolerate stray
    # "<" / ">" characters around each piece.
    pieces = (piece.strip().strip("<>").strip() for piece in header_value.split(","))
    return [piece for piece in pieces if piece]
|
|
|
|
|
|
def _make_unsubscribe_method(raw_value: str) -> UnsubscribeMethod | None:
    """Turn a raw URL into an ``UnsubscribeMethod``, or ``None`` if unusable.

    Only ``http(s)://`` and ``mailto:`` values are recognised. The value is
    normalized and the method id is ``"<type>:<normalized value>"``.
    """
    candidate = raw_value.strip().strip(",")
    scheme_probe = candidate.lower()

    if scheme_probe.startswith(("http://", "https://")):
        method_type = "http"
        normalized = _normalize_http_url(candidate)
    elif scheme_probe.startswith("mailto:"):
        method_type = "mailto"
        normalized = _normalize_mailto_url(candidate)
    else:
        return None

    if not normalized:
        return None
    return UnsubscribeMethod(
        method_id=f"{method_type}:{normalized}",
        method_type=method_type,
        value=normalized,
    )
|
|
|
|
|
|
def _extract_text_blocks(payload: dict[str, Any]) -> list[str]:
    """Collect the decoded ``text/plain`` and ``text/html`` bodies of a message.

    The MIME tree is visited parent-first, children left to right; parts whose
    body cannot be decoded (or decodes to an empty string) are skipped.
    """
    collected: list[str] = []
    # Explicit stack instead of recursion; children are pushed reversed so
    # they are popped in their original order.
    pending: list[dict[str, Any]] = [payload]
    while pending:
        part = pending.pop()
        mime_type = part.get("mimeType", "")
        body_data = part.get("body", {}).get("data")
        if body_data and mime_type in {"text/plain", "text/html"}:
            text = _decode_base64(body_data)
            if text:
                collected.append(text)
        pending.extend(reversed(part.get("parts", [])))
    return collected
|
|
|
|
|
|
def _decode_base64(data: str) -> str:
    """Decode URL-safe base64 text to a string; return ``""`` on failure.

    Missing ``=`` padding is restored first, and bytes that are not valid
    UTF-8 are replaced rather than raising.
    """
    missing_padding = -len(data) % 4
    try:
        raw = base64.urlsafe_b64decode((data + "=" * missing_padding).encode("utf-8"))
    except Exception:
        return ""
    return raw.decode("utf-8", errors="replace")
|
|
|
|
|
|
def _normalize_http_url(url: str) -> str | None:
    """Return a canonical form of an http(s) URL, or ``None`` if it is unusable.

    Normalization trims surrounding whitespace and trailing ``.,;)``
    punctuation, lower-cases scheme and host, drops a trailing slash from
    non-root paths, removes tracking query parameters and discards the
    fragment. URLs that cannot be parsed, use another scheme or have no host
    yield ``None``.
    """
    cleaned = url.strip().strip(".,;)")
    try:
        split = urlsplit(cleaned)
    except ValueError:
        # urlsplit rejects e.g. unbalanced "[" / "]" in the host part. Such
        # text can appear in message bodies and must not abort the scan.
        return None
    if split.scheme.lower() not in {"http", "https"} or not split.netloc:
        return None

    scheme = split.scheme.lower()
    netloc = split.netloc.lower()
    path = split.path or "/"
    if path != "/":
        path = path.rstrip("/")

    query_pairs = parse_qsl(split.query, keep_blank_values=True)
    filtered_pairs = [
        (key, value)
        for key, value in query_pairs
        if key.lower() not in TRACKING_QUERY_KEYS
    ]
    query = urlencode(filtered_pairs, doseq=True)
    return urlunsplit((scheme, netloc, path, query, ""))
|
|
|
|
|
|
def _normalize_mailto_url(url: str) -> str | None:
    """Return a canonical form of a ``mailto:`` URL, or ``None`` if it is unusable.

    The recipient is lower-cased and the query parameters are sorted so that
    equivalent URLs normalize to the same string. URLs that cannot be parsed,
    use another scheme or carry no recipient yield ``None``.
    """
    try:
        split = urlsplit(url.strip())
    except ValueError:
        # urlsplit rejects e.g. "mailto://[abc" (unbalanced bracket in the
        # host part); treat such header values as unusable.
        return None
    if split.scheme.lower() != "mailto":
        return None
    recipient = split.path.strip().lower()
    if not recipient:
        return None
    query_pairs = parse_qsl(split.query, keep_blank_values=True)
    normalized_query = urlencode(sorted(query_pairs), doseq=True)
    return urlunsplit(("mailto", "", recipient, normalized_query, ""))
|
|
|
|
|
|
def _clean_list_id(list_id: str) -> str:
    """Normalize a ``List-Id`` header value.

    The value is trimmed and lower-cased; when it contains an
    angle-bracketed section, only the text inside the first ``<...>`` is kept.
    """
    normalized = list_id.strip().lower()
    if "<" not in normalized or ">" not in normalized:
        return normalized
    bracketed = re.search(r"<([^>]+)>", normalized)
    return bracketed.group(1) if bracketed else normalized
|
|
|
|
|
|
def _derive_list_name(list_id: str, sender: str, sender_domain: str) -> str:
    """Pick a human-readable name for a mailing list.

    With a list id, its first dot-separated label is used, with ``-`` and
    ``_`` turned into spaces and title-cased (the raw id if that label is
    blank). Without one, the sender's display name is used when longer than
    two characters, else the sender domain.
    """
    if list_id:
        first_label = list_id.split(".", 1)[0]
        pretty = first_label.replace("-", " ").replace("_", " ").strip()
        return pretty.title() if pretty else list_id

    display_name = parseaddr(sender)[0].strip()
    return display_name if len(display_name) > 2 else sender_domain
|
|
|
|
|
|
def _looks_like_unsubscribe(url: str) -> bool:
    """Tell whether ``url`` contains any unsubscribe hint, ignoring case."""
    haystack = url.lower()
    for hint in UNSUBSCRIBE_HINTS:
        if hint in haystack:
            return True
    return False
|
|
|
|
|
|
def _first_query_value(values: dict[str, list[str]], key: str) -> str:
    """Return the first value stored under ``key``, matching the key case-insensitively.

    Matching keys with an empty value list are skipped; ``""`` is returned
    when nothing is found.
    """
    wanted = key.lower()
    non_empty_matches = (
        entries for name, entries in values.items() if name.lower() == wanted and entries
    )
    first_match = next(non_empty_matches, None)
    return first_match[0] if first_match else ""
|