diff --git a/tests/test_a2a_http_integration.py b/tests/test_a2a_http_integration.py new file mode 100644 index 0000000..649845c --- /dev/null +++ b/tests/test_a2a_http_integration.py @@ -0,0 +1,112 @@ +from __future__ import annotations + +from dataclasses import replace +from types import SimpleNamespace + +from fastapi import FastAPI +from fastapi.testclient import TestClient + +import app.a2a.router as a2a_module +from app.config import get_settings +from app.security.auth import AuthBackend + + +class _DummyCoreService: + def check_availability( + self, + start: str, + end: str, + calendar_ids: list[str] | None, + ) -> SimpleNamespace: + checked = calendar_ids or ["primary"] + return SimpleNamespace( + start=start, + end=end, + available=True, + busy_slots=[], + checked_calendars=checked, + ) + + +def _build_test_app() -> FastAPI: + app = FastAPI() + app.include_router(a2a_module.router) + return app + + +def test_a2a_agent_card_endpoint(monkeypatch) -> None: + monkeypatch.setattr(a2a_module, "core_service", _DummyCoreService()) + app = _build_test_app() + + with TestClient(app) as client: + response = client.get("/.well-known/agent-card.json") + + assert response.status_code == 200 + assert response.headers["A2A-Version"] == "1.0" + payload = response.json() + assert payload["protocolVersion"] == "1.0" + assert payload["url"].endswith("/a2a/rpc") + + +def test_a2a_send_message_requires_auth(monkeypatch) -> None: + auth_settings = replace( + get_settings(), + auth_mode="api_key", + agent_api_key="integration-key", + auth_jwt_secret="", + ) + monkeypatch.setattr(a2a_module, "auth_backend", AuthBackend(auth_settings)) + monkeypatch.setattr(a2a_module, "core_service", _DummyCoreService()) + app = _build_test_app() + + with TestClient(app) as client: + response = client.post( + "/a2a/rpc", + json={ + "jsonrpc": "2.0", + "id": "r1", + "method": "SendMessage", + "params": { + "start": "2026-03-10T09:00:00+01:00", + "end": "2026-03-10T10:00:00+01:00", + }, + }, + ) + + assert response.status_code == 200 + payload = response.json() + assert payload["error"]["code"] == -32001 + + +def test_a2a_send_message_with_api_key(monkeypatch) -> None: + auth_settings = replace( + get_settings(), + auth_mode="api_key", + agent_api_key="integration-key", + auth_jwt_secret="", + ) + monkeypatch.setattr(a2a_module, "auth_backend", AuthBackend(auth_settings)) + monkeypatch.setattr(a2a_module, "core_service", _DummyCoreService()) + app = _build_test_app() + + with TestClient(app) as client: + response = client.post( + "/a2a/rpc", + headers={"X-API-Key": "integration-key"}, + json={ + "jsonrpc": "2.0", + "id": "r2", + "method": "SendMessage", + "params": { + "start": "2026-03-10T09:00:00+01:00", + "end": "2026-03-10T10:00:00+01:00", + "calendar_ids": ["primary"], + }, + }, + ) + + assert response.status_code == 200 + payload = response.json() + assert payload["error"] is None + assert payload["result"]["availability"]["available"] is True + assert payload["result"]["availability"]["checked_calendars"] == ["primary"] diff --git a/tests/test_auth_backend.py b/tests/test_auth_backend.py new file mode 100644 index 0000000..e4c520e --- /dev/null +++ b/tests/test_auth_backend.py @@ -0,0 +1,162 @@ +from __future__ import annotations + +import base64 +from dataclasses import replace +import hashlib +import hmac +import json +import time + +import pytest +from fastapi import HTTPException + +from app.config import get_settings +from app.security.auth import AuthBackend + + +def _make_settings(**overrides: object): + return replace(get_settings(), **overrides) + + +def _build_jwt(secret: str, claims: dict[str, object]) -> str: + header = {"alg": "HS256", "typ": "JWT"} + header_b64 = _b64url_json(header) + payload_b64 = _b64url_json(claims) + signing_input = f"{header_b64}.{payload_b64}".encode("utf-8") + signature = hmac.new(secret.encode("utf-8"), signing_input, hashlib.sha256).digest() + signature_b64 = base64.urlsafe_b64encode(signature).decode("utf-8").rstrip("=") + return f"{header_b64}.{payload_b64}.{signature_b64}" + + +def _b64url_json(value: dict[str, object]) -> str: + raw = json.dumps(value, separators=(",", ":"), sort_keys=True).encode("utf-8") + return base64.urlsafe_b64encode(raw).decode("utf-8").rstrip("=") + + +def test_auth_backend_api_key_mode_accepts_x_api_key() -> None: + settings = _make_settings( + auth_mode="api_key", + agent_api_key="test-api-key", + auth_jwt_secret="", + ) + backend = AuthBackend(settings=settings) + + context = backend.authenticate( + x_api_key="test-api-key", + authorization=None, + required_scopes={"availability:read"}, + ) + + assert context.auth_type == "api_key" + assert context.subject == "api-key" + assert "*" in context.scopes + + +def test_auth_backend_api_key_mode_rejects_invalid_key() -> None: + settings = _make_settings( + auth_mode="api_key", + agent_api_key="expected", + auth_jwt_secret="", + ) + backend = AuthBackend(settings=settings) + + with pytest.raises(HTTPException) as exc_info: + backend.authenticate( + x_api_key="wrong", + authorization=None, + required_scopes={"availability:read"}, + ) + + assert exc_info.value.status_code == 401 + assert str(exc_info.value.detail) == "Invalid API key." + + +def test_auth_backend_jwt_mode_validates_scope_and_claims() -> None: + secret = "jwt-secret" + settings = _make_settings( + auth_mode="jwt", + auth_jwt_secret=secret, + auth_jwt_issuer="https://issuer.example", + auth_jwt_audience="personal-agent", + agent_api_key="", + ) + backend = AuthBackend(settings=settings) + token = _build_jwt( + secret=secret, + claims={ + "sub": "agent-123", + "iss": "https://issuer.example", + "aud": "personal-agent", + "scope": "availability:read unsubscribe:read", + "exp": int(time.time()) + 3600, + }, + ) + + context = backend.authenticate( + x_api_key=None, + authorization=f"Bearer {token}", + required_scopes={"availability:read"}, + ) + + assert context.auth_type == "jwt" + assert context.subject == "agent-123" + assert "availability:read" in context.scopes + + +def test_auth_backend_jwt_mode_rejects_missing_scope() -> None: + secret = "jwt-secret" + settings = _make_settings( + auth_mode="jwt", + auth_jwt_secret=secret, + auth_jwt_issuer=None, + auth_jwt_audience=None, + agent_api_key="", + ) + backend = AuthBackend(settings=settings) + token = _build_jwt( + secret=secret, + claims={ + "sub": "agent-123", + "scope": "unsubscribe:read", + "exp": int(time.time()) + 3600, + }, + ) + + with pytest.raises(HTTPException) as exc_info: + backend.authenticate( + x_api_key=None, + authorization=f"Bearer {token}", + required_scopes={"availability:read"}, + ) + + assert exc_info.value.status_code == 403 + assert str(exc_info.value.detail) == "Missing required scope." + + +def test_auth_backend_hybrid_mode_uses_jwt_when_api_key_missing() -> None: + secret = "jwt-secret" + settings = _make_settings( + auth_mode="hybrid", + agent_api_key="expected-api-key", + auth_jwt_secret=secret, + auth_jwt_issuer=None, + auth_jwt_audience=None, + ) + backend = AuthBackend(settings=settings) + token = _build_jwt( + secret=secret, + claims={ + "sub": "fallback-jwt", + "scope": "availability:read", + "exp": int(time.time()) + 3600, + }, + ) + + context = backend.authenticate( + x_api_key=None, + authorization=f"Bearer {token}", + required_scopes={"availability:read"}, + ) + + assert context.auth_type == "jwt" + assert context.subject == "fallback-jwt" diff --git a/tests/test_gmail_agent_unit.py b/tests/test_gmail_agent_unit.py new file mode 100644 index 0000000..30ed2f8 --- /dev/null +++ b/tests/test_gmail_agent_unit.py @@ -0,0 +1,66 @@ +from __future__ import annotations + +from app.gmail_agent import GmailTriageAgent + + +class _FailingClassifier: + def classify(self, **kwargs): # type: ignore[no-untyped-def] + raise RuntimeError("model unavailable") + + +def test_build_effective_query_enforces_inbox_and_unread() -> None: + agent = GmailTriageAgent(gmail_service=object(), query="-label:AgentProcessed") + assert ( + agent._build_effective_query() + == "-label:AgentProcessed in:inbox is:unread" + ) + + +def test_build_effective_query_keeps_existing_requirements() -> None: + agent = GmailTriageAgent( + gmail_service=object(), + query="IN:INBOX is:unread -label:AgentProcessed", + ) + assert agent._build_effective_query() == "IN:INBOX is:unread -label:AgentProcessed" + + +def test_classify_email_returns_other_when_model_fails_and_no_rules_fallback() -> None: + agent = GmailTriageAgent( + gmail_service=object(), + query="", + classifier=_FailingClassifier(), # type: ignore[arg-type] + fallback_to_rules=False, + ) + + label = agent._classify_email( + message_id="m1", + sender="newsletter@example.com", + subject="50% OFF today", + snippet="promo content", + list_unsubscribe="", + precedence="bulk", + message_label_ids={"CATEGORY_PROMOTIONS"}, + ) + + assert label == "OTHER" + + +def test_classify_email_prioritizes_linkedin_over_advertising_signals() -> None: + agent = GmailTriageAgent( + gmail_service=object(), + query="", + classifier=None, + fallback_to_rules=True, + ) + + label = agent._classify_email( + message_id="m2", + sender="jobs-noreply@linkedin.com", + subject="Limited time offer for your profile", + snippet="promotional snippet", + list_unsubscribe="", + precedence="bulk", + message_label_ids={"CATEGORY_PROMOTIONS"}, + ) + + assert label == "LINKEDIN" diff --git a/tests/test_main_http_integration.py b/tests/test_main_http_integration.py new file mode 100644 index 0000000..bf88e66 --- /dev/null +++ b/tests/test_main_http_integration.py @@ -0,0 +1,93 @@ +from __future__ import annotations + +from dataclasses import replace +from types import SimpleNamespace + +from fastapi.testclient import TestClient + +import app.main as main_module +from app.config import get_settings +from app.security.auth import AuthBackend + + +class _DummyCoreService: + def scan_mailbox(self, max_results: int) -> SimpleNamespace: + return SimpleNamespace( + scanned=max_results, + linkedin=1, + advertising=2, + veille_techno=0, + skipped=3, + failed=0, + ) + + def check_availability( + self, + start: str, + end: str, + calendar_ids: list[str] | None, + ) -> SimpleNamespace: + return SimpleNamespace( + start=start, + end=end, + available=False, + busy_slots=[ + { + "calendar_id": "primary", + "start": start, + "end": end, + } + ], + checked_calendars=calendar_ids or ["primary"], + ) + + +async def _noop_task() -> None: + return None + + +def _setup_main_test_context(monkeypatch) -> None: + auth_settings = replace( + get_settings(), + auth_mode="api_key", + agent_api_key="integration-key", + auth_jwt_secret="", + ) + monkeypatch.setattr(main_module, "auth_backend", AuthBackend(auth_settings)) + monkeypatch.setattr(main_module, "core_service", _DummyCoreService()) + # Prevent scheduler jobs from executing real background work during lifespan startup. + monkeypatch.setattr(main_module, "_scheduled_scan", _noop_task) + monkeypatch.setattr(main_module, "_scheduled_unsubscribe_digest", _noop_task) + monkeypatch.setattr(main_module, "_scheduled_unsubscribe_auto", _noop_task) + + +def test_main_scan_endpoint_with_api_key(monkeypatch) -> None: + _setup_main_test_context(monkeypatch) + + with TestClient(main_module.app) as client: + response = client.post( + "/scan?max_results=15", + headers={"X-API-Key": "integration-key"}, + ) + + assert response.status_code == 200 + payload = response.json() + assert payload["scanned"] == 15 + assert payload["linkedin"] == 1 + assert payload["advertising"] == 2 + + +def test_main_availability_endpoint_rejects_missing_key(monkeypatch) -> None: + _setup_main_test_context(monkeypatch) + + with TestClient(main_module.app) as client: + response = client.post( + "/availability", + json={ + "start": "2026-03-10T09:00:00+01:00", + "end": "2026-03-10T10:00:00+01:00", + "calendar_ids": ["primary"], + }, + ) + + assert response.status_code == 401 diff --git a/tests/test_mcp_tools_auth.py b/tests/test_mcp_tools_auth.py new file mode 100644 index 0000000..392d21e --- /dev/null +++ b/tests/test_mcp_tools_auth.py @@ -0,0 +1,100 @@ +from __future__ import annotations + +from dataclasses import replace +from types import SimpleNamespace + +import pytest + +import app.mcp.tools as mcp_tools_module +from app.config import get_settings +from app.security.auth import AuthBackend + + +class _DummyCoreService: + def check_availability( + self, + start: str, + end: str, + calendar_ids: list[str] | None, + ) -> SimpleNamespace: + return SimpleNamespace( + start=start, + end=end, + available=True, + busy_slots=[], + checked_calendars=calendar_ids or ["primary"], + ) + + def scan_mailbox(self, max_results: int) -> SimpleNamespace: + return SimpleNamespace( + scanned=max_results, + linkedin=0, + advertising=0, + veille_techno=0, + skipped=0, + failed=0, + ) + + +class _DummyCtx: + def __init__(self, headers: dict[str, str]) -> None: + self.request_context = SimpleNamespace( + request=SimpleNamespace(headers=headers) + ) + + +def test_mcp_check_availability_requires_auth(monkeypatch) -> None: + auth_settings = replace( + get_settings(), + auth_mode="api_key", + agent_api_key="mcp-key", + auth_jwt_secret="", + ) + monkeypatch.setattr(mcp_tools_module, "auth_backend", AuthBackend(auth_settings)) + monkeypatch.setattr(mcp_tools_module, "core_service", _DummyCoreService()) + + with pytest.raises(PermissionError): + mcp_tools_module.check_availability( + start="2026-03-10T09:00:00+01:00", + end="2026-03-10T10:00:00+01:00", + calendar_ids=["primary"], + ctx=_DummyCtx(headers={}), + ) + + +def test_mcp_check_availability_with_api_key(monkeypatch) -> None: + auth_settings = replace( + get_settings(), + auth_mode="api_key", + agent_api_key="mcp-key", + auth_jwt_secret="", + ) + monkeypatch.setattr(mcp_tools_module, "auth_backend", AuthBackend(auth_settings)) + monkeypatch.setattr(mcp_tools_module, "core_service", _DummyCoreService()) + + payload = mcp_tools_module.check_availability( + start="2026-03-10T09:00:00+01:00", + end="2026-03-10T10:00:00+01:00", + calendar_ids=["primary"], + ctx=_DummyCtx(headers={"x-api-key": "mcp-key"}), + ) + + assert payload["available"] is True + assert payload["checked_calendars"] == ["primary"] + + +def test_mcp_scan_mailbox_requires_mail_scan_scope(monkeypatch) -> None: + auth_settings = replace( + get_settings(), + auth_mode="api_key", + agent_api_key="mcp-key", + auth_jwt_secret="", + ) + monkeypatch.setattr(mcp_tools_module, "auth_backend", AuthBackend(auth_settings)) + monkeypatch.setattr(mcp_tools_module, "core_service", _DummyCoreService()) + + payload = mcp_tools_module.scan_mailbox( + max_results=10, + ctx=_DummyCtx(headers={"x-api-key": "mcp-key"}), + ) + assert payload["scanned"] == 10 diff --git a/tests/test_unsubscribe_digest_unit.py b/tests/test_unsubscribe_digest_unit.py new file mode 100644 index 0000000..123a88c --- /dev/null +++ b/tests/test_unsubscribe_digest_unit.py @@ -0,0 +1,107 @@ +from __future__ import annotations + +import base64 +import json +from pathlib import Path +from typing import Any + +from app.unsubscribe_agent import UnsubscribeDigestAgent + + +def _b64url_text(value: str) -> str: + return base64.urlsafe_b64encode(value.encode("utf-8")).decode("utf-8").rstrip("=") + + +class _Executable: + def __init__(self, callback): + self._callback = callback + + def execute(self): # type: ignore[no-untyped-def] + return self._callback() + + +class _FakeMessagesApi: + def __init__(self, message_payload_by_id: dict[str, dict[str, Any]]) -> None: + self._message_payload_by_id = message_payload_by_id + self.sent_messages: list[dict[str, Any]] = [] + + def list(self, userId: str, q: str, maxResults: int): # type: ignore[no-untyped-def] + message_ids = [{"id": key} for key in self._message_payload_by_id.keys()] + return _Executable(lambda: {"messages": message_ids[:maxResults]}) + + def get(self, userId: str, id: str, format: str): # type: ignore[no-untyped-def] + return _Executable(lambda: self._message_payload_by_id[id]) + + def send(self, userId: str, body: dict[str, Any]): # type: ignore[no-untyped-def] + self.sent_messages.append(body) + return _Executable(lambda: {"id": "sent-1"}) + + +class _FakeUsersApi: + def __init__(self, messages_api: _FakeMessagesApi) -> None: + self._messages_api = messages_api + + def messages(self) -> _FakeMessagesApi: + return self._messages_api + + def getProfile(self, userId: str): # type: ignore[no-untyped-def] + return _Executable(lambda: {"emailAddress": "owner@example.com"}) + + +class _FakeGmailService: + def __init__(self, payload_by_id: dict[str, dict[str, Any]]) -> None: + self.messages_api = _FakeMessagesApi(payload_by_id) + self.users_api = _FakeUsersApi(self.messages_api) + + def users(self) -> _FakeUsersApi: + return self.users_api + + +def test_unsubscribe_digest_deduplicates_and_persists_state(tmp_path: Path) -> None: + unsubscribe_url_1 = "https://example.com/unsubscribe?u=abc&utm_source=mail" + unsubscribe_url_2 = "https://example.com/unsubscribe?fbclid=tracking&u=abc" + + message_payloads = { + "m1": { + "payload": { + "headers": [ + {"name": "List-Unsubscribe", "value": f"<{unsubscribe_url_1}>"}, + ], + "mimeType": "text/plain", + "body": {"data": _b64url_text(f"Unsubscribe here: {unsubscribe_url_1}")}, + } + }, + "m2": { + "payload": { + "headers": [], + "mimeType": "text/plain", + "body": {"data": _b64url_text(f"Click to unsubscribe: {unsubscribe_url_2}")}, + } + }, + } + state_file = tmp_path / "sent_links.json" + service = _FakeGmailService(message_payloads) + agent = UnsubscribeDigestAgent( + gmail_service=service, + query="label:Advertising", + state_file=str(state_file), + recipient_email="owner@example.com", + send_empty_digest=False, + ) + + first = agent.scan_and_send_digest(max_results=50) + second = agent.scan_and_send_digest(max_results=50) + + assert first.scanned_messages == 2 + assert first.extracted_unique_links == 1 + assert first.new_links == 1 + assert first.email_sent is True + + assert second.scanned_messages == 2 + assert second.extracted_unique_links == 1 + assert second.new_links == 0 + assert second.email_sent is False + + assert len(service.messages_api.sent_messages) == 1 + persisted = json.loads(state_file.read_text(encoding="utf-8")) + assert persisted["sent_links"] == ["https://example.com/unsubscribe?u=abc"]