diff --git a/.env.example b/.env.example index c541570..efaf71b 100644 --- a/.env.example +++ b/.env.example @@ -35,4 +35,5 @@ UNSUBSCRIBE_USER_AGENT=Mozilla/5.0 (compatible; PersonalAgentUnsubscribe/1.0) A2A_PUBLIC_BASE_URL= A2A_AGENT_NAME=Personal Agent A2A_AGENT_DESCRIPTION=Personal productivity agent for calendar availability and email operations. +MCP_ENABLE_MUTATION_TOOLS=false LOG_LEVEL=INFO diff --git a/README.md b/README.md index af10f63..6590ffd 100644 --- a/README.md +++ b/README.md @@ -143,6 +143,20 @@ MCP streamable HTTP endpoint: http://127.0.0.1:8001/mcp ``` +By default, MCP exposes only `check_availability`. +To expose internal mutation tools (`scan_mailbox`, `list_unsubscribe_candidates`, `execute_unsubscribe`), set: + +```bash +MCP_ENABLE_MUTATION_TOOLS=true +``` + +Scopes required per MCP tool: + +- `check_availability`: `availability:read` +- `scan_mailbox`: `mail:scan` +- `list_unsubscribe_candidates`: `unsubscribe:read` +- `execute_unsubscribe`: `unsubscribe:execute` + ### Manual unsubscribe digest ```bash diff --git a/app/config.py b/app/config.py index 5e29be9..f7bdf82 100644 --- a/app/config.py +++ b/app/config.py @@ -45,6 +45,7 @@ class Settings: a2a_public_base_url: str | None a2a_agent_name: str a2a_agent_description: str + mcp_enable_mutation_tools: bool log_level: str @@ -105,6 +106,7 @@ def get_settings() -> Settings: "A2A_AGENT_DESCRIPTION", "Personal productivity agent for calendar availability and email operations.", ), + mcp_enable_mutation_tools=_as_bool(os.getenv("MCP_ENABLE_MUTATION_TOOLS", "false")), log_level=os.getenv("LOG_LEVEL", "INFO"), ) diff --git a/app/mcp/server.py b/app/mcp/server.py index 1b44a73..cdae7b8 100644 --- a/app/mcp/server.py +++ b/app/mcp/server.py @@ -1,9 +1,16 @@ from __future__ import annotations -from mcp.server.fastmcp import FastMCP +from mcp.server.fastmcp import Context, FastMCP -from app.mcp.tools import check_availability as check_availability_impl +from app.config import get_settings +from app.mcp.tools import ( + check_availability as check_availability_impl, + execute_unsubscribe as execute_unsubscribe_impl, + list_unsubscribe_candidates as list_unsubscribe_candidates_impl, + scan_mailbox as scan_mailbox_impl, +) +settings = get_settings() mcp = FastMCP( "Personal Agent MCP", streamable_http_path="/", @@ -15,5 +22,45 @@ def check_availability( start: str, end: str, calendar_ids: list[str] | None = None, + ctx: Context | None = None, ) -> dict[str, object]: - return check_availability_impl(start=start, end=end, calendar_ids=calendar_ids) + return check_availability_impl( + start=start, + end=end, + calendar_ids=calendar_ids, + ctx=ctx, + ) + + +if settings.mcp_enable_mutation_tools: + + @mcp.tool( + description="Scan unread root-inbox Gmail messages and apply classification labels." + ) + def scan_mailbox(max_results: int = 100, ctx: Context | None = None) -> dict[str, object]: + return scan_mailbox_impl(max_results=max_results, ctx=ctx) + + @mcp.tool( + description="List unsubscribe candidates discovered from advertising emails." + ) + def list_unsubscribe_candidates( + max_results: int = 500, + ctx: Context | None = None, + ) -> dict[str, object]: + return list_unsubscribe_candidates_impl(max_results=max_results, ctx=ctx) + + @mcp.tool( + description="Execute unsubscribe actions for selected candidate IDs." + ) + def execute_unsubscribe( + selected_candidate_ids: list[str], + max_results: int = 500, + remember_selection: bool = True, + ctx: Context | None = None, + ) -> dict[str, object]: + return execute_unsubscribe_impl( + selected_candidate_ids=selected_candidate_ids, + max_results=max_results, + remember_selection=remember_selection, + ctx=ctx, + ) diff --git a/app/mcp/tools.py b/app/mcp/tools.py index 1594318..c701389 100644 --- a/app/mcp/tools.py +++ b/app/mcp/tools.py @@ -3,17 +3,26 @@ from __future__ import annotations import logging from typing import Any +from fastapi import HTTPException +from mcp.server.fastmcp import Context + from app.config import get_settings from app.core.service import CoreAgentService +from app.security import AuthBackend settings = get_settings() core_service = CoreAgentService(settings=settings, logger=logging.getLogger("personal-agent.mcp")) +auth_backend = AuthBackend(settings=settings) def check_availability( - start: str, end: str, calendar_ids: list[str] | None = None + start: str, + end: str, + calendar_ids: list[str] | None = None, + ctx: Context | None = None, ) -> dict[str, Any]: """Return free/busy availability for a time range on one or more calendars.""" + _require_scope(ctx, "availability:read") result = core_service.check_availability(start=start, end=end, calendar_ids=calendar_ids) return { "start": result.start, @@ -29,3 +38,110 @@ def check_availability( ], "checked_calendars": result.checked_calendars, } + + +def scan_mailbox(max_results: int = 100, ctx: Context | None = None) -> dict[str, Any]: + """Scan inbox emails and classify/move them according to current routing rules.""" + _require_scope(ctx, "mail:scan") + result = core_service.scan_mailbox(max_results=max_results) + return { + "scanned": result.scanned, + "linkedin": result.linkedin, + "advertising": result.advertising, + "veille_techno": result.veille_techno, + "skipped": result.skipped, + "failed": result.failed, + } + + +def list_unsubscribe_candidates( + max_results: int = 500, ctx: Context | None = None +) -> dict[str, Any]: + """List unsubscribe candidates discovered from advertising emails.""" + _require_scope(ctx, "unsubscribe:read") + result = core_service.list_unsubscribe_candidates(max_results=max_results) + return { + "scanned_messages": result.scanned_messages, + "candidates": [ + { + "candidate_id": candidate.candidate_id, + "list_name": candidate.list_name, + "sender_domain": candidate.sender_domain, + "message_count": candidate.message_count, + "sample_senders": candidate.sample_senders, + "sample_subjects": candidate.sample_subjects, + "approved": candidate.approved, + "methods": [ + { + "method_id": method.method_id, + "method_type": method.method_type, + "value": method.value, + } + for method in candidate.methods + ], + } + for candidate in result.candidates + ], + } + + +def execute_unsubscribe( + selected_candidate_ids: list[str], + max_results: int = 500, + remember_selection: bool = True, + ctx: Context | None = None, +) -> dict[str, Any]: + """Execute unsubscribe actions for selected mailing list candidate IDs.""" + _require_scope(ctx, "unsubscribe:execute") + result = core_service.execute_unsubscribe_selected( + selected_candidate_ids=selected_candidate_ids, + max_results=max_results, + remember_selection=remember_selection, + ) + return { + "scanned_messages": result.scanned_messages, + "candidates_considered": result.candidates_considered, + "selected_candidates": result.selected_candidates, + "executed_methods": result.executed_methods, + "skipped_already_executed": result.skipped_already_executed, + "failed_methods": result.failed_methods, + "updated_approved_count": result.updated_approved_count, + "results": [ + { + "candidate_id": item.candidate_id, + "list_name": item.list_name, + "method_id": item.method_id, + "method_type": item.method_type, + "value": item.value, + "success": item.success, + "detail": item.detail, + } + for item in result.results + ], + } + + +def _require_scope(ctx: Context | None, scope: str) -> None: + x_api_key, authorization = _extract_auth_headers(ctx) + try: + auth_backend.authenticate( + x_api_key=x_api_key, + authorization=authorization, + required_scopes={scope}, + ) + except HTTPException as exc: + raise PermissionError(f"Unauthorized for scope '{scope}': {exc.detail}") from exc + + +def _extract_auth_headers(ctx: Context | None) -> tuple[str | None, str | None]: + if ctx is None: + return None, None + + request = ctx.request_context.request + headers = getattr(request, "headers", None) + if headers is None: + return None, None + + x_api_key = headers.get("x-api-key") + authorization = headers.get("authorization") + return x_api_key, authorization