You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
108 lines
3.6 KiB
Python
108 lines
3.6 KiB
Python
from __future__ import annotations
|
|
|
|
import base64
|
|
import json
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from app.unsubscribe_agent import UnsubscribeDigestAgent
|
|
|
|
|
|
def _b64url_text(value: str) -> str:
|
|
return base64.urlsafe_b64encode(value.encode("utf-8")).decode("utf-8").rstrip("=")
|
|
|
|
|
|
class _Executable:
|
|
def __init__(self, callback):
|
|
self._callback = callback
|
|
|
|
def execute(self): # type: ignore[no-untyped-def]
|
|
return self._callback()
|
|
|
|
|
|
class _FakeMessagesApi:
|
|
def __init__(self, message_payload_by_id: dict[str, dict[str, Any]]) -> None:
|
|
self._message_payload_by_id = message_payload_by_id
|
|
self.sent_messages: list[dict[str, Any]] = []
|
|
|
|
def list(self, userId: str, q: str, maxResults: int): # type: ignore[no-untyped-def]
|
|
message_ids = [{"id": key} for key in self._message_payload_by_id.keys()]
|
|
return _Executable(lambda: {"messages": message_ids[:maxResults]})
|
|
|
|
def get(self, userId: str, id: str, format: str): # type: ignore[no-untyped-def]
|
|
return _Executable(lambda: self._message_payload_by_id[id])
|
|
|
|
def send(self, userId: str, body: dict[str, Any]): # type: ignore[no-untyped-def]
|
|
self.sent_messages.append(body)
|
|
return _Executable(lambda: {"id": "sent-1"})
|
|
|
|
|
|
class _FakeUsersApi:
|
|
def __init__(self, messages_api: _FakeMessagesApi) -> None:
|
|
self._messages_api = messages_api
|
|
|
|
def messages(self) -> _FakeMessagesApi:
|
|
return self._messages_api
|
|
|
|
def getProfile(self, userId: str): # type: ignore[no-untyped-def]
|
|
return _Executable(lambda: {"emailAddress": "owner@example.com"})
|
|
|
|
|
|
class _FakeGmailService:
|
|
def __init__(self, payload_by_id: dict[str, dict[str, Any]]) -> None:
|
|
self.messages_api = _FakeMessagesApi(payload_by_id)
|
|
self.users_api = _FakeUsersApi(self.messages_api)
|
|
|
|
def users(self) -> _FakeUsersApi:
|
|
return self.users_api
|
|
|
|
|
|
def test_unsubscribe_digest_deduplicates_and_persists_state(tmp_path: Path) -> None:
|
|
unsubscribe_url_1 = "https://example.com/unsubscribe?u=abc&utm_source=mail"
|
|
unsubscribe_url_2 = "https://example.com/unsubscribe?fbclid=tracking&u=abc"
|
|
|
|
message_payloads = {
|
|
"m1": {
|
|
"payload": {
|
|
"headers": [
|
|
{"name": "List-Unsubscribe", "value": f"<{unsubscribe_url_1}>"},
|
|
],
|
|
"mimeType": "text/plain",
|
|
"body": {"data": _b64url_text(f"Unsubscribe here: {unsubscribe_url_1}")},
|
|
}
|
|
},
|
|
"m2": {
|
|
"payload": {
|
|
"headers": [],
|
|
"mimeType": "text/plain",
|
|
"body": {"data": _b64url_text(f"Click to unsubscribe: {unsubscribe_url_2}")},
|
|
}
|
|
},
|
|
}
|
|
state_file = tmp_path / "sent_links.json"
|
|
service = _FakeGmailService(message_payloads)
|
|
agent = UnsubscribeDigestAgent(
|
|
gmail_service=service,
|
|
query="label:Advertising",
|
|
state_file=str(state_file),
|
|
recipient_email="owner@example.com",
|
|
send_empty_digest=False,
|
|
)
|
|
|
|
first = agent.scan_and_send_digest(max_results=50)
|
|
second = agent.scan_and_send_digest(max_results=50)
|
|
|
|
assert first.scanned_messages == 2
|
|
assert first.extracted_unique_links == 1
|
|
assert first.new_links == 1
|
|
assert first.email_sent is True
|
|
|
|
assert second.scanned_messages == 2
|
|
assert second.extracted_unique_links == 1
|
|
assert second.new_links == 0
|
|
assert second.email_sent is False
|
|
|
|
assert len(service.messages_api.sent_messages) == 1
|
|
persisted = json.loads(state_file.read_text(encoding="utf-8"))
|
|
assert persisted["sent_links"] == ["https://example.com/unsubscribe?u=abc"]
|