"""Human-in-the-loop mailing-list unsubscribe agent built on a Gmail API service.

The agent scans messages matching a Gmail query, groups them into mailing-list
candidates, and executes the unsubscribe methods (HTTP GET or ``mailto:``) of
the candidates a human selected. Approved lists and already-executed methods
are persisted in a JSON state file.
"""

from __future__ import annotations

import base64
from dataclasses import dataclass
from email.message import EmailMessage
from email.utils import parseaddr
import hashlib
import html
import json
import logging
from pathlib import Path
import re
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.parse import parse_qs, parse_qsl, unquote, urlencode, urlsplit, urlunsplit
from urllib.request import Request, urlopen

logger = logging.getLogger("personal-agent.unsubscribe.hil")

# Query parameters dropped while normalising HTTP unsubscribe URLs.
TRACKING_QUERY_KEYS = {
    "fbclid",
    "gclid",
    "mc_cid",
    "mc_eid",
    "_hsenc",
    "_hsmi",
    "utm_campaign",
    "utm_content",
    "utm_id",
    "utm_medium",
    "utm_name",
    "utm_source",
    "utm_term",
}

# Substrings that mark a URL found in a message body as an unsubscribe link.
UNSUBSCRIBE_HINTS = {
    "unsubscribe",
    "optout",
    "opt-out",
    "email-preferences",
    "manage-subscriptions",
}

URL_PATTERN = re.compile(r"https?://[^\s<>'\"()]+", re.IGNORECASE)

# Headers requested from Gmail when fetching message metadata.
METADATA_HEADERS = [
    "From",
    "Subject",
    "List-Id",
    "List-Unsubscribe",
]


@dataclass(frozen=True)
class UnsubscribeMethod:
    """One way to unsubscribe from a list."""

    # "<method_type>:<normalised value>"; used for de-duplication and state.
    method_id: str
    # Either "http" or "mailto".
    method_type: str
    # Normalised URL that gets executed.
    value: str


@dataclass(frozen=True)
class MailingListCandidate:
    """A group of scanned messages that share one list key."""

    # First 12 hex characters of the SHA-1 of ``list_key``.
    candidate_id: str
    # Cleaned List-Id, else the sender domain, else "unknown".
    list_key: str
    list_name: str
    sender_domain: str
    message_count: int
    sample_senders: list[str]
    sample_subjects: list[str]
    methods: list[UnsubscribeMethod]
    # True when ``list_key`` is in the persisted approved set.
    approved: bool


@dataclass(frozen=True)
class CandidateSnapshot:
    """Result of a discovery scan."""

    scanned_messages: int
    candidates: list[MailingListCandidate]


@dataclass(frozen=True)
class MethodExecutionResult:
    """Outcome of executing (or skipping) a single unsubscribe method."""

    candidate_id: str
    list_name: str
    method_id: str
    method_type: str
    value: str
    success: bool
    detail: str


@dataclass(frozen=True)
class UnsubscribeExecutionResult:
    """Aggregate counters and per-method results of an execution run."""

    scanned_messages: int
    candidates_considered: int
    selected_candidates: int
    executed_methods: int
    skipped_already_executed: int
    failed_methods: int
    updated_approved_count: int
    results: list[MethodExecutionResult]


@dataclass
class _UnsubscribeState:
    """Mutable state persisted between runs."""

    approved_list_keys: set[str]
    executed_methods: set[str]


class UnsubscribeHumanLoopAgent:
    """Discover mailing lists in Gmail and execute approved unsubscribes."""

    def __init__(
        self,
        *,
        gmail_service: Any,
        query: str,
        state_file: str,
        http_timeout_seconds: float = 12.0,
        user_agent: str = "Mozilla/5.0 (compatible; PersonalAgentUnsubscribe/1.0)",
    ) -> None:
        """Store the collaborators; no I/O happens here.

        Args:
            gmail_service: Object exposing the ``users().messages()`` call
                chain used by this class (list / get / send, then execute).
            query: Gmail search query selecting the messages to scan.
            state_file: Path of the JSON file holding persisted state.
            http_timeout_seconds: Timeout passed to ``urlopen``.
            user_agent: ``User-Agent`` header sent with HTTP unsubscribes.
        """
        self.gmail_service = gmail_service
        self.query = query
        self.state_file = Path(state_file)
        self.http_timeout_seconds = http_timeout_seconds
        self.user_agent = user_agent

    def discover_candidates(self, max_results: int = 500) -> CandidateSnapshot:
        """Scan matching messages and return the unsubscribe candidates."""
        state = self._load_state()
        scanned_messages = self._list_message_refs(max_results=max_results)
        candidates = self._build_candidates(
            scanned_messages, approved_list_keys=state.approved_list_keys
        )
        return CandidateSnapshot(
            scanned_messages=len(scanned_messages),
            candidates=candidates,
        )

    def execute_selected(
        self,
        *,
        selected_candidate_ids: list[str],
        max_results: int = 500,
        remember_selection: bool = True,
    ) -> UnsubscribeExecutionResult:
        """Re-scan, then execute the methods of the selected candidates.

        When ``remember_selection`` is true the selected list keys are added
        to the persisted approved set.
        """
        state = self._load_state()
        message_refs = self._list_message_refs(max_results=max_results)
        candidates = self._build_candidates(
            message_refs, approved_list_keys=state.approved_list_keys
        )
        return self._execute_candidates(
            selected_candidate_ids=selected_candidate_ids,
            candidates=candidates,
            state=state,
            remember_selection=remember_selection,
            scanned_messages=len(message_refs),
        )

    def execute_for_approved(self, max_results: int = 500) -> UnsubscribeExecutionResult:
        """Re-scan, then execute every candidate whose list key is approved."""
        state = self._load_state()
        message_refs = self._list_message_refs(max_results=max_results)
        candidates = self._build_candidates(
            message_refs, approved_list_keys=state.approved_list_keys
        )
        approved_ids = [
            candidate.candidate_id
            for candidate in candidates
            if candidate.list_key in state.approved_list_keys
        ]
        return self._execute_candidates(
            selected_candidate_ids=approved_ids,
            candidates=candidates,
            state=state,
            remember_selection=False,
            scanned_messages=len(message_refs),
        )

    def _execute_candidates(
        self,
        *,
        selected_candidate_ids: list[str],
        candidates: list[MailingListCandidate],
        state: _UnsubscribeState,
        remember_selection: bool,
        scanned_messages: int,
    ) -> UnsubscribeExecutionResult:
        """Run every not-yet-executed method of the selected candidates.

        ``state`` is mutated in place and written to disk before returning.
        Methods whose id is already in ``state.executed_methods`` are reported
        as successful skips.
        """
        selected_ids = {
            candidate_id.strip() for candidate_id in selected_candidate_ids if candidate_id
        }
        selected = [
            candidate for candidate in candidates if candidate.candidate_id in selected_ids
        ]
        if remember_selection:
            for candidate in selected:
                state.approved_list_keys.add(candidate.list_key)

        results: list[MethodExecutionResult] = []
        executed = 0
        skipped = 0
        failed = 0
        try:
            for candidate in selected:
                for method in candidate.methods:
                    if method.method_id in state.executed_methods:
                        skipped += 1
                        results.append(
                            MethodExecutionResult(
                                candidate_id=candidate.candidate_id,
                                list_name=candidate.list_name,
                                method_id=method.method_id,
                                method_type=method.method_type,
                                value=method.value,
                                success=True,
                                detail="Already executed previously, skipped.",
                            )
                        )
                        continue
                    success, detail = self._execute_method(method)
                    if success:
                        state.executed_methods.add(method.method_id)
                        executed += 1
                    else:
                        failed += 1
                    results.append(
                        MethodExecutionResult(
                            candidate_id=candidate.candidate_id,
                            list_name=candidate.list_name,
                            method_id=method.method_id,
                            method_type=method.method_type,
                            value=method.value,
                            success=success,
                            detail=detail,
                        )
                    )
        finally:
            # Persist even when the loop is interrupted, so unsubscribes that
            # already went out are not repeated on the next run.
            self._save_state(state)
        return UnsubscribeExecutionResult(
            scanned_messages=scanned_messages,
            candidates_considered=len(candidates),
            selected_candidates=len(selected),
            executed_methods=executed,
            skipped_already_executed=skipped,
            failed_methods=failed,
            updated_approved_count=len(state.approved_list_keys),
            results=results,
        )

    def _list_message_refs(self, max_results: int) -> list[dict[str, str]]:
        """Return message references from a single ``messages().list`` call.

        ``max_results`` is clamped to the range 1..500; no further pages are
        requested.
        """
        bounded_max_results = max(1, min(max_results, 500))
        return (
            self.gmail_service.users()
            .messages()
            .list(userId="me", q=self.query, maxResults=bounded_max_results)
            .execute()
            .get("messages", [])
        )

    def _build_candidates(
        self,
        message_refs: list[dict[str, str]],
        *,
        approved_list_keys: set[str],
    ) -> list[MailingListCandidate]:
        """Group messages by list key and collect their unsubscribe methods.

        Messages with no usable unsubscribe method are ignored. The returned
        candidates are ordered by descending message count.
        """
        groups: dict[str, dict[str, Any]] = {}
        for message_ref in message_refs:
            message_id = message_ref["id"]
            metadata = (
                self.gmail_service.users()
                .messages()
                .get(
                    userId="me",
                    id=message_id,
                    format="metadata",
                    metadataHeaders=METADATA_HEADERS,
                )
                .execute()
            )
            headers = {
                header.get("name", "").lower(): header.get("value", "")
                for header in metadata.get("payload", {}).get("headers", [])
            }
            sender = headers.get("from", "")
            subject = headers.get("subject", "")
            list_id = _clean_list_id(headers.get("list-id", ""))
            sender_email = parseaddr(sender)[1].lower()
            sender_domain = sender_email.split("@")[-1] if "@" in sender_email else "unknown"
            list_key = list_id or sender_domain or "unknown"
            list_name = _derive_list_name(
                list_id=list_id, sender=sender, sender_domain=sender_domain
            )
            methods = self._extract_methods_from_message(
                message_id=message_id,
                list_unsubscribe_header=headers.get("list-unsubscribe", ""),
            )
            if not methods:
                continue

            # The first message seen for a key fixes its name and domain.
            group = groups.setdefault(
                list_key,
                {
                    "list_name": list_name,
                    "sender_domain": sender_domain,
                    "message_count": 0,
                    "sample_senders": [],
                    "sample_subjects": [],
                    "methods": {},
                },
            )
            group["message_count"] += 1
            if (
                sender
                and sender not in group["sample_senders"]
                and len(group["sample_senders"]) < 3
            ):
                group["sample_senders"].append(sender)
            if (
                subject
                and subject not in group["sample_subjects"]
                and len(group["sample_subjects"]) < 5
            ):
                group["sample_subjects"].append(subject)
            for method in methods:
                group["methods"][method.method_id] = method

        candidates: list[MailingListCandidate] = []
        for list_key, group in groups.items():
            candidate_id = hashlib.sha1(list_key.encode("utf-8")).hexdigest()[:12]
            methods = sorted(group["methods"].values(), key=lambda method: method.method_id)
            candidates.append(
                MailingListCandidate(
                    candidate_id=candidate_id,
                    list_key=list_key,
                    list_name=group["list_name"],
                    sender_domain=group["sender_domain"],
                    message_count=group["message_count"],
                    sample_senders=group["sample_senders"],
                    sample_subjects=group["sample_subjects"],
                    methods=methods,
                    approved=list_key in approved_list_keys,
                )
            )
        candidates.sort(key=lambda candidate: candidate.message_count, reverse=True)
        return candidates

    def _extract_methods_from_message(
        self,
        *,
        message_id: str,
        list_unsubscribe_header: str,
    ) -> list[UnsubscribeMethod]:
        """Return the unsubscribe methods of one message, sorted by id.

        The ``List-Unsubscribe`` header is preferred. Only when it yields
        nothing is the full message fetched and its text parts searched for
        URLs that look like unsubscribe links.
        """
        methods_by_id: dict[str, UnsubscribeMethod] = {}
        for raw_value in _extract_list_unsubscribe_values(list_unsubscribe_header):
            method = _make_unsubscribe_method(raw_value)
            if method:
                methods_by_id[method.method_id] = method
        if methods_by_id:
            return sorted(methods_by_id.values(), key=lambda method: method.method_id)

        full_message = (
            self.gmail_service.users()
            .messages()
            .get(userId="me", id=message_id, format="full")
            .execute()
        )
        payload = full_message.get("payload", {})
        for text_block in _extract_text_blocks(payload):
            for url in URL_PATTERN.findall(html.unescape(text_block)):
                if not _looks_like_unsubscribe(url):
                    continue
                method = _make_unsubscribe_method(url)
                if method:
                    methods_by_id[method.method_id] = method
        return sorted(methods_by_id.values(), key=lambda method: method.method_id)

    def _execute_method(self, method: UnsubscribeMethod) -> tuple[bool, str]:
        """Dispatch on the method type; return ``(success, detail)``."""
        if method.method_type == "http":
            return self._execute_http_method(method.value)
        if method.method_type == "mailto":
            return self._execute_mailto_method(method.value)
        return False, f"Unsupported unsubscribe method type: {method.method_type}"

    def _execute_http_method(self, url: str) -> tuple[bool, str]:
        """Issue a GET to ``url``; never raises, returns ``(success, detail)``."""
        try:
            # Built inside the try so a malformed URL is reported as a failed
            # method instead of aborting the whole run.
            request = Request(
                url=url,
                headers={
                    "User-Agent": self.user_agent,
                    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
                },
                method="GET",
            )
            with urlopen(request, timeout=self.http_timeout_seconds) as response:
                status = response.getcode()
                success = 200 <= status < 400
                return success, f"HTTP {status}"
        except HTTPError as exc:
            return False, f"HTTP {exc.code}"
        except URLError as exc:
            return False, f"Network error: {exc.reason}"
        except Exception as exc:
            return False, f"Unexpected error: {exc}"

    def _execute_mailto_method(self, mailto_url: str) -> tuple[bool, str]:
        """Send the e-mail described by a ``mailto:`` URL through Gmail.

        Never raises; returns ``(success, detail)``. ``subject`` and ``body``
        come from the URL query, with fixed defaults when absent.
        """
        split = urlsplit(mailto_url)
        # The address part of a mailto URL may be percent-encoded.
        recipient = unquote(split.path).strip()
        if not recipient:
            return False, "Invalid mailto URL: missing recipient."
        if len(recipient.splitlines()) > 1:
            return False, "Invalid mailto URL: recipient contains line breaks."

        query = parse_qs(split.query, keep_blank_values=True)
        # The URL is untrusted: flatten line breaks so the subject cannot
        # inject headers (EmailMessage rejects multi-line header values).
        subject = " ".join(_first_query_value(query, "subject").splitlines())
        subject = subject or "Unsubscribe request"
        body = _first_query_value(query, "body") or (
            "Please unsubscribe this email address from this mailing list."
        )

        try:
            message = EmailMessage()
            message["To"] = recipient
            message["Subject"] = subject
            message.set_content(body)
            raw = base64.urlsafe_b64encode(message.as_bytes()).decode("utf-8")
            (
                self.gmail_service.users()
                .messages()
                .send(userId="me", body={"raw": raw})
                .execute()
            )
        except Exception as exc:
            return False, f"Failed to send mailto unsubscribe request: {exc}"
        return True, "Sent mailto unsubscribe request via Gmail API."

    def _load_state(self) -> _UnsubscribeState:
        """Read persisted state, falling back to empty state when unusable.

        A missing file, undecodable bytes, invalid JSON, or a JSON document
        that is not an object all yield empty state. Fields that are not
        lists are treated as empty.
        """
        if not self.state_file.exists():
            return _UnsubscribeState(approved_list_keys=set(), executed_methods=set())
        try:
            payload = json.loads(self.state_file.read_text(encoding="utf-8"))
        except (json.JSONDecodeError, UnicodeDecodeError):
            logger.warning("State file is invalid JSON: %s", self.state_file)
            return _UnsubscribeState(approved_list_keys=set(), executed_methods=set())
        if not isinstance(payload, dict):
            logger.warning("State file is not a JSON object: %s", self.state_file)
            return _UnsubscribeState(approved_list_keys=set(), executed_methods=set())
        return _UnsubscribeState(
            approved_list_keys=_coerce_state_entries(payload.get("approved_list_keys", [])),
            executed_methods=_coerce_state_entries(payload.get("executed_methods", [])),
        )

    def _save_state(self, state: _UnsubscribeState) -> None:
        """Write ``state`` as JSON, creating parent directories as needed."""
        self.state_file.parent.mkdir(parents=True, exist_ok=True)
        serialized = json.dumps(
            {
                "approved_list_keys": sorted(state.approved_list_keys),
                "executed_methods": sorted(state.executed_methods),
            },
            indent=2,
        )
        # Write a sibling file and rename it over the target so an interrupted
        # write cannot leave a truncated state file behind.
        temp_file = self.state_file.with_name(f"{self.state_file.name}.tmp")
        temp_file.write_text(serialized, encoding="utf-8")
        temp_file.replace(self.state_file)


def _coerce_state_entries(raw: Any) -> set[str]:
    """Turn a JSON array from the state file into a set of non-blank strings."""
    if not isinstance(raw, (list, tuple, set, frozenset)):
        return set()
    return {str(item) for item in raw if str(item).strip()}


def _extract_list_unsubscribe_values(header_value: str) -> list[str]:
    """Split a ``List-Unsubscribe`` header into its individual URIs.

    Angle-bracketed values are used when present; otherwise the header is
    split on commas.
    """
    if not header_value:
        return []
    bracketed = [value.strip() for value in re.findall(r"<([^>]+)>", header_value)]
    if bracketed:
        return [value for value in bracketed if value]
    values: list[str] = []
    for token in header_value.split(","):
        candidate = token.strip().strip("<>").strip()
        if candidate:
            values.append(candidate)
    return values


def _make_unsubscribe_method(raw_value: str) -> UnsubscribeMethod | None:
    """Build a method from an http(s) or mailto URI; ``None`` for anything else."""
    value = raw_value.strip().strip(",")
    lowered = value.lower()
    if lowered.startswith(("http://", "https://")):
        normalized = _normalize_http_url(value)
        if not normalized:
            return None
        method_id = f"http:{normalized}"
        return UnsubscribeMethod(method_id=method_id, method_type="http", value=normalized)
    if lowered.startswith("mailto:"):
        normalized = _normalize_mailto_url(value)
        if not normalized:
            return None
        method_id = f"mailto:{normalized}"
        return UnsubscribeMethod(method_id=method_id, method_type="mailto", value=normalized)
    return None


def _extract_text_blocks(payload: dict[str, Any]) -> list[str]:
    """Collect the decoded text/plain and text/html bodies of a payload tree."""
    blocks: list[str] = []

    def walk(part: dict[str, Any]) -> None:
        mime_type = part.get("mimeType", "")
        body_data = part.get("body", {}).get("data")
        if body_data and mime_type in {"text/plain", "text/html"}:
            decoded = _decode_base64(body_data)
            if decoded:
                blocks.append(decoded)
        for child in part.get("parts", []):
            walk(child)

    walk(payload)
    return blocks


def _decode_base64(data: str) -> str:
    """Decode URL-safe base64 text (padding optional); ``""`` on failure."""
    padded = data + "=" * (-len(data) % 4)
    try:
        return base64.urlsafe_b64decode(padded.encode("utf-8")).decode(
            "utf-8", errors="replace"
        )
    except Exception:
        return ""


def _normalize_http_url(url: str) -> str | None:
    """Canonicalise an http(s) URL; ``None`` when it has no scheme or host.

    Lower-cases scheme and netloc, drops a trailing slash from non-root
    paths, removes tracking query parameters and the fragment.
    """
    cleaned = url.strip().strip(".,;)")
    split = urlsplit(cleaned)
    if split.scheme.lower() not in {"http", "https"} or not split.netloc:
        return None
    scheme = split.scheme.lower()
    netloc = split.netloc.lower()
    path = split.path or "/"
    if path != "/":
        path = path.rstrip("/")
    query_pairs = parse_qsl(split.query, keep_blank_values=True)
    filtered_pairs = [
        (key, value) for key, value in query_pairs if key.lower() not in TRACKING_QUERY_KEYS
    ]
    query = urlencode(filtered_pairs, doseq=True)
    return urlunsplit((scheme, netloc, path, query, ""))


def _normalize_mailto_url(url: str) -> str | None:
    """Canonicalise a mailto URL; ``None`` when it has no recipient.

    Lower-cases the recipient and sorts the query parameters.
    """
    split = urlsplit(url.strip())
    if split.scheme.lower() != "mailto":
        return None
    # NOTE(review): this lower-cases the local part too, and the query is
    # round-tripped through parse_qsl/urlencode. Both could alter
    # case-sensitive or "+"-bearing tokens; confirm before changing, because
    # persisted method ids are derived from this value.
    recipient = split.path.strip().lower()
    if not recipient:
        return None
    query_pairs = parse_qsl(split.query, keep_blank_values=True)
    normalized_query = urlencode(sorted(query_pairs), doseq=True)
    return urlunsplit(("mailto", "", recipient, normalized_query, ""))


def _clean_list_id(list_id: str) -> str:
    """Lower-case a List-Id header and keep only the bracketed id if present."""
    cleaned = list_id.strip().lower()
    if not cleaned:
        return ""
    if "<" in cleaned and ">" in cleaned:
        match = re.search(r"<([^>]+)>", cleaned)
        if match:
            cleaned = match.group(1)
    return cleaned


def _derive_list_name(list_id: str, sender: str, sender_domain: str) -> str:
    """Pick a display name: list-id label, sender display name, or domain."""
    if list_id:
        list_name = list_id.split(".", 1)[0].replace("-", " ").replace("_", " ").strip()
        if list_name:
            return list_name.title()
        return list_id
    display_name = parseaddr(sender)[0].strip()
    if display_name and len(display_name) > 2:
        return display_name
    return sender_domain


def _looks_like_unsubscribe(url: str) -> bool:
    """Return True when the URL contains any of ``UNSUBSCRIBE_HINTS``."""
    lowered = url.lower()
    return any(hint in lowered for hint in UNSUBSCRIBE_HINTS)


def _first_query_value(values: dict[str, list[str]], key: str) -> str:
    """Return the first value stored under ``key`` (case-insensitive), or ``""``."""
    wanted = key.lower()
    for candidate_key, candidate_values in values.items():
        if candidate_key.lower() != wanted:
            continue
        if candidate_values:
            return candidate_values[0]
    return ""