"""FastMCP server wiring: tool entry points, server factory, module-level instance.

The public functions in this module are thin wrappers that forward to the
implementations in ``app.mcp.tools``; they are the callables registered as
tools on the FastMCP server. ``build_mcp_server`` assembles a server from
settings, and a ready-made instance is exposed as ``mcp`` at import time.
"""

from __future__ import annotations

from typing import Any

from mcp.server.auth.provider import TokenVerifier
from mcp.server.fastmcp import Context, FastMCP

from app.config import Settings, get_settings
from app.mcp.oauth import (
    build_mcp_oauth_auth_settings,
    build_mcp_oauth_token_verifier,
    resolve_mcp_auth_mode,
)
from app.mcp.tools import (
    check_availability as check_availability_impl,
    execute_unsubscribe as execute_unsubscribe_impl,
    list_unsubscribe_candidates as list_unsubscribe_candidates_impl,
    scan_mailbox as scan_mailbox_impl,
)


def check_availability(
    start: str,
    end: str,
    calendar_ids: list[str] | None = None,
    ctx: Context | None = None,
) -> dict[str, object]:
    """Forward to ``check_availability_impl`` with every argument passed by keyword.

    Registered on the server with the description
    "Check Google Calendar availability for a time range."
    """
    result = check_availability_impl(
        start=start,
        end=end,
        calendar_ids=calendar_ids,
        ctx=ctx,
    )
    return result


def scan_mailbox(max_results: int = 100, ctx: Context | None = None) -> dict[str, object]:
    """Forward to ``scan_mailbox_impl`` with every argument passed by keyword."""
    result = scan_mailbox_impl(max_results=max_results, ctx=ctx)
    return result


def list_unsubscribe_candidates(
    max_results: int = 500,
    ctx: Context | None = None,
) -> dict[str, object]:
    """Forward to ``list_unsubscribe_candidates_impl`` with keyword arguments."""
    result = list_unsubscribe_candidates_impl(max_results=max_results, ctx=ctx)
    return result


def execute_unsubscribe(
    selected_candidate_ids: list[str],
    max_results: int = 500,
    remember_selection: bool = True,
    ctx: Context | None = None,
) -> dict[str, object]:
    """Forward to ``execute_unsubscribe_impl`` with every argument passed by keyword."""
    result = execute_unsubscribe_impl(
        selected_candidate_ids=selected_candidate_ids,
        max_results=max_results,
        remember_selection=remember_selection,
        ctx=ctx,
    )
    return result


def build_mcp_server(
    settings: Settings | None = None,
    *,
    token_verifier: TokenVerifier | None = None,
) -> FastMCP:
    """Create a FastMCP server and register this module's tools on it.

    Args:
        settings: Settings to build from; when falsy, ``get_settings()`` is
            used instead.
        token_verifier: Verifier to install when the resolved auth mode is
            ``"oauth"``. When falsy, one is built from the settings. It is
            not consulted in any other auth mode.

    Returns:
        The configured server, served under the ``/mcp`` streamable HTTP path.
    """
    runtime_settings = settings or get_settings()
    server_options: dict[str, Any] = {"streamable_http_path": "/mcp"}

    oauth_enabled = resolve_mcp_auth_mode(runtime_settings) == "oauth"
    if oauth_enabled:
        # Auth settings are built before the verifier, matching the order in
        # which the two keys are populated.
        server_options["auth"] = build_mcp_oauth_auth_settings(runtime_settings)
        if token_verifier:
            chosen_verifier = token_verifier
        else:
            chosen_verifier = build_mcp_oauth_token_verifier(runtime_settings)
        server_options["token_verifier"] = chosen_verifier

    server = FastMCP("Personal Agent MCP", **server_options)
    _register_tools(server, runtime_settings)
    return server


def _register_tools(server: FastMCP, settings: Settings) -> None:
    """Register the module's tool functions on ``server``.

    ``check_availability`` is always registered. The remaining tools are
    registered only when ``settings.mcp_enable_mutation_tools`` is truthy.
    """
    always_on = (
        (
            check_availability,
            "Check Google Calendar availability for a time range.",
        ),
    )
    # NOTE(review): all three tools below are treated as gated by the flag;
    # confirm that list_unsubscribe_candidates is meant to be gated as well.
    gated = (
        (
            scan_mailbox,
            "Scan unread root-inbox Gmail messages and apply classification labels.",
        ),
        (
            list_unsubscribe_candidates,
            "List unsubscribe candidates discovered from advertising emails.",
        ),
        (
            execute_unsubscribe,
            "Execute unsubscribe actions for selected candidate IDs.",
        ),
    )

    for tool_fn, tool_description in always_on:
        server.add_tool(tool_fn, description=tool_description)

    if not settings.mcp_enable_mutation_tools:
        return

    for tool_fn, tool_description in gated:
        server.add_tool(tool_fn, description=tool_description)


# Module-level server instance, built once at import time from the
# application settings.
settings = get_settings()
mcp = build_mcp_server(settings=settings)