from __future__ import annotations from dataclasses import replace import anyio from fastapi import HTTPException from mcp.server.auth.provider import AccessToken, TokenVerifier from mcp.server.auth.settings import AuthSettings from app.config import Settings from app.security import AuthBackend _MCP_ALWAYS_ENABLED_SCOPES = ["availability:read", "available_meeting_intervals:read"] def resolve_mcp_auth_mode(settings: Settings) -> str: if settings.mcp_auth_mode == "inherit": return settings.auth_mode return settings.mcp_auth_mode def mcp_supported_scopes(settings: Settings) -> list[str]: scopes = list(_MCP_ALWAYS_ENABLED_SCOPES) if settings.mcp_enable_mutation_tools: scopes.extend(["mail:scan", "unsubscribe:read", "unsubscribe:execute"]) return scopes def build_mcp_oauth_auth_settings(settings: Settings) -> AuthSettings: if not settings.mcp_oauth_issuer: raise ValueError("MCP_OAUTH_ISSUER is required when MCP_AUTH_MODE=oauth.") if not settings.mcp_resource_server_url: raise ValueError("MCP_RESOURCE_SERVER_URL is required when MCP_AUTH_MODE=oauth.") return AuthSettings( issuer_url=settings.mcp_oauth_issuer, resource_server_url=settings.mcp_resource_server_url, # Leave transport-level scopes unset and enforce scope checks per tool. required_scopes=None, ) def build_mcp_oauth_token_verifier(settings: Settings) -> TokenVerifier: oauth_settings = replace(settings, auth_mode="oauth") auth_backend = AuthBackend( settings=oauth_settings, oauth_introspection_url=settings.mcp_oauth_introspection_url, oauth_client_id=settings.mcp_oauth_client_id, oauth_client_secret=settings.mcp_oauth_client_secret or None, oauth_issuer=settings.mcp_oauth_issuer, oauth_audience=settings.mcp_oauth_audience, oauth_timeout_seconds=settings.mcp_oauth_timeout_seconds, ) return OAuthIntrospectionTokenVerifier(auth_backend) class OAuthIntrospectionTokenVerifier(TokenVerifier): """FastMCP TokenVerifier backed by MCP OAuth introspection settings.""" def __init__(self, auth_backend: AuthBackend) -> None: self._auth_backend = auth_backend async def verify_token(self, token: str) -> AccessToken | None: auth_context = await anyio.to_thread.run_sync(self._authenticate_token, token) if auth_context is None: return None scopes = sorted(scope for scope in auth_context.scopes if scope and scope != "*") return AccessToken( token=token, client_id=auth_context.subject, scopes=scopes, ) def _authenticate_token(self, token: str): try: return self._auth_backend.authenticate( x_api_key=None, authorization=f"Bearer {token}", required_scopes=set(), ) except HTTPException: return None