"""MCP-facing wrappers around the personal agent's core service.

Every public function in this module follows the same three steps: check that
the caller is authorised for one scope, delegate to the shared
``CoreAgentService`` instance, and copy the attributes of the service result
into a plain dictionary.
"""

from __future__ import annotations

from dataclasses import replace
import logging
from typing import Any

from fastapi import HTTPException
from mcp.server.fastmcp import Context

from app.config import Settings, get_settings
from app.core.service import CoreAgentService
from app.security import AuthBackend

# Module-level singletons, created at import time and shared by every function
# below.
settings = get_settings()
core_service = CoreAgentService(
    settings=settings,
    logger=logging.getLogger("personal-agent.mcp"),
)

# Attribute names copied verbatim (same name as dictionary key) from service
# result objects into the returned payloads. The tuple order is the key order
# of the resulting dictionaries.
_BUSY_SLOT_FIELDS = ("calendar_id", "start", "end")
_SCAN_FIELDS = (
    "scanned",
    "linkedin",
    "advertising",
    "veille_techno",
    "skipped",
    "failed",
)
_CANDIDATE_FIELDS = (
    "candidate_id",
    "list_name",
    "sender_domain",
    "message_count",
    "sample_senders",
    "sample_subjects",
    "approved",
)
_METHOD_FIELDS = ("method_id", "method_type", "value")
_EXECUTION_SUMMARY_FIELDS = (
    "scanned_messages",
    "candidates_considered",
    "selected_candidates",
    "executed_methods",
    "skipped_already_executed",
    "failed_methods",
    "updated_approved_count",
)
_EXECUTION_ITEM_FIELDS = (
    "candidate_id",
    "list_name",
    "method_id",
    "method_type",
    "value",
    "success",
    "detail",
)


def _pick(source: Any, field_names: tuple[str, ...]) -> dict[str, Any]:
    """Copy the named attributes of *source* into a new dict, in order."""
    return {field: getattr(source, field) for field in field_names}


# NOTE(review): the docstrings of the four public functions below are kept
# verbatim. If these functions are registered as MCP tools elsewhere, the
# docstring may be surfaced as the tool description -- confirm before
# rewording them.


# Requires the "availability:read" scope. ``start``, ``end`` and
# ``calendar_ids`` are forwarded unchanged to the core service.
def check_availability(
    start: str,
    end: str,
    calendar_ids: list[str] | None = None,
    ctx: Context | None = None,
) -> dict[str, Any]:
    """Return free/busy availability for a time range on one or more calendars."""
    _require_scope(ctx, "availability:read")
    availability = core_service.check_availability(
        start=start,
        end=end,
        calendar_ids=calendar_ids,
    )
    return {
        "start": availability.start,
        "end": availability.end,
        "available": availability.available,
        "busy_slots": [
            _pick(busy, _BUSY_SLOT_FIELDS) for busy in availability.busy_slots
        ],
        "checked_calendars": availability.checked_calendars,
    }


# Requires the "mail:scan" scope. Returns the counters exposed by the service
# result, keyed by attribute name.
def scan_mailbox(max_results: int = 100, ctx: Context | None = None) -> dict[str, Any]:
    """Scan inbox emails and classify/move them according to current routing rules."""
    _require_scope(ctx, "mail:scan")
    report = core_service.scan_mailbox(max_results=max_results)
    return _pick(report, _SCAN_FIELDS)


# Requires the "unsubscribe:read" scope. Each candidate is flattened to a dict
# that also carries a list of its unsubscribe methods.
def list_unsubscribe_candidates(
    max_results: int = 500, ctx: Context | None = None
) -> dict[str, Any]:
    """List unsubscribe candidates discovered from advertising emails."""
    _require_scope(ctx, "unsubscribe:read")
    listing = core_service.list_unsubscribe_candidates(max_results=max_results)
    return {
        "scanned_messages": listing.scanned_messages,
        "candidates": [_candidate_payload(entry) for entry in listing.candidates],
    }


# Requires the "unsubscribe:execute" scope. All three options are forwarded
# unchanged to the core service; the summary counters come first in the
# returned dict, followed by one entry per result item.
def execute_unsubscribe(
    selected_candidate_ids: list[str],
    max_results: int = 500,
    remember_selection: bool = True,
    ctx: Context | None = None,
) -> dict[str, Any]:
    """Execute unsubscribe actions for selected mailing list candidate IDs."""
    _require_scope(ctx, "unsubscribe:execute")
    outcome = core_service.execute_unsubscribe_selected(
        selected_candidate_ids=selected_candidate_ids,
        max_results=max_results,
        remember_selection=remember_selection,
    )
    payload = _pick(outcome, _EXECUTION_SUMMARY_FIELDS)
    payload["results"] = [
        _pick(entry, _EXECUTION_ITEM_FIELDS) for entry in outcome.results
    ]
    return payload


def _candidate_payload(candidate: Any) -> dict[str, Any]:
    """Flatten one unsubscribe candidate, including its methods, into a dict."""
    payload = _pick(candidate, _CANDIDATE_FIELDS)
    payload["methods"] = [
        _pick(method, _METHOD_FIELDS) for method in candidate.methods
    ]
    return payload


def _require_scope(ctx: Context | None, scope: str) -> None:
    """Authenticate the caller behind *ctx* for a single required scope.

    The credentials are read from the request headers (when there are any) and
    handed to the module-level ``auth_backend``.

    Raises:
        PermissionError: if the backend rejects the credentials with an
            ``HTTPException``; the original exception is chained as the cause.
    """
    api_key, bearer = _extract_auth_headers(ctx)
    try:
        auth_backend.authenticate(
            x_api_key=api_key,
            authorization=bearer,
            required_scopes={scope},
        )
    except HTTPException as exc:
        # Translate the HTTP-flavoured error into a plain PermissionError.
        raise PermissionError(f"Unauthorized for scope '{scope}': {exc.detail}") from exc


def _extract_auth_headers(ctx: Context | None) -> tuple[str | None, str | None]:
    """Return the ``(x-api-key, authorization)`` header values for *ctx*.

    Yields ``(None, None)`` when there is no context, or when the request
    attached to the context exposes no ``headers`` attribute.
    """
    if ctx is None:
        return None, None

    incoming = ctx.request_context.request
    header_map = getattr(incoming, "headers", None)
    if header_map is None:
        return None, None

    return header_map.get("x-api-key"), header_map.get("authorization")


def _build_mcp_auth_backend(base_settings: Settings) -> AuthBackend:
    """Create the ``AuthBackend`` used by the functions in this module.

    The backend gets *base_settings* with ``auth_mode`` swapped for the
    resolved MCP auth mode when the two differ, plus the ``mcp_oauth_*``
    values from *base_settings*.
    """
    mode = _resolve_mcp_auth_mode(base_settings)
    if mode == base_settings.auth_mode:
        effective = base_settings
    else:
        # Only copy the settings object when the mode actually changes.
        effective = replace(base_settings, auth_mode=mode)

    return AuthBackend(
        settings=effective,
        oauth_introspection_url=base_settings.mcp_oauth_introspection_url,
        oauth_client_id=base_settings.mcp_oauth_client_id,
        # A falsy secret (e.g. empty string) is passed on as None.
        oauth_client_secret=base_settings.mcp_oauth_client_secret or None,
        oauth_issuer=base_settings.mcp_oauth_issuer,
        oauth_audience=base_settings.mcp_oauth_audience,
        oauth_timeout_seconds=base_settings.mcp_oauth_timeout_seconds,
    )


def _resolve_mcp_auth_mode(base_settings: Settings) -> str:
    """Return the MCP auth mode; ``"inherit"`` falls back to ``auth_mode``."""
    if base_settings.mcp_auth_mode != "inherit":
        return base_settings.mcp_auth_mode
    return base_settings.auth_mode


# Built last because it needs _build_mcp_auth_backend to be defined;
# _require_scope looks this name up at call time.
auth_backend = _build_mcp_auth_backend(settings)