feat(mcp): add mutation tools with scope gating

master
oabrivard 6 days ago
parent 1b23493167
commit 54da3efdc9

@ -35,4 +35,5 @@ UNSUBSCRIBE_USER_AGENT=Mozilla/5.0 (compatible; PersonalAgentUnsubscribe/1.0)
A2A_PUBLIC_BASE_URL=
A2A_AGENT_NAME=Personal Agent
A2A_AGENT_DESCRIPTION=Personal productivity agent for calendar availability and email operations.
MCP_ENABLE_MUTATION_TOOLS=false
LOG_LEVEL=INFO

@ -143,6 +143,20 @@ MCP streamable HTTP endpoint:
http://127.0.0.1:8001/mcp
```
By default, MCP exposes only `check_availability`.
To expose internal mutation tools (`scan_mailbox`, `list_unsubscribe_candidates`, `execute_unsubscribe`), set:
```bash
MCP_ENABLE_MUTATION_TOOLS=true
```
Scopes required per MCP tool:
- `check_availability`: `availability:read`
- `scan_mailbox`: `mail:scan`
- `list_unsubscribe_candidates`: `unsubscribe:read`
- `execute_unsubscribe`: `unsubscribe:execute`
### Manual unsubscribe digest
```bash

@ -45,6 +45,7 @@ class Settings:
a2a_public_base_url: str | None
a2a_agent_name: str
a2a_agent_description: str
mcp_enable_mutation_tools: bool
log_level: str
@ -105,6 +106,7 @@ def get_settings() -> Settings:
"A2A_AGENT_DESCRIPTION",
"Personal productivity agent for calendar availability and email operations.",
),
mcp_enable_mutation_tools=_as_bool(os.getenv("MCP_ENABLE_MUTATION_TOOLS", "false")),
log_level=os.getenv("LOG_LEVEL", "INFO"),
)

@ -1,9 +1,16 @@
from __future__ import annotations
from mcp.server.fastmcp import FastMCP
from mcp.server.fastmcp import Context, FastMCP
from app.mcp.tools import check_availability as check_availability_impl
from app.config import get_settings
from app.mcp.tools import (
check_availability as check_availability_impl,
execute_unsubscribe as execute_unsubscribe_impl,
list_unsubscribe_candidates as list_unsubscribe_candidates_impl,
scan_mailbox as scan_mailbox_impl,
)
settings = get_settings()
mcp = FastMCP(
"Personal Agent MCP",
streamable_http_path="/",
@ -15,5 +22,45 @@ def check_availability(
start: str,
end: str,
calendar_ids: list[str] | None = None,
ctx: Context | None = None,
) -> dict[str, object]:
return check_availability_impl(start=start, end=end, calendar_ids=calendar_ids)
return check_availability_impl(
start=start,
end=end,
calendar_ids=calendar_ids,
ctx=ctx,
)
if settings.mcp_enable_mutation_tools:
@mcp.tool(
description="Scan unread root-inbox Gmail messages and apply classification labels."
)
def scan_mailbox(max_results: int = 100, ctx: Context | None = None) -> dict[str, object]:
return scan_mailbox_impl(max_results=max_results, ctx=ctx)
@mcp.tool(
description="List unsubscribe candidates discovered from advertising emails."
)
def list_unsubscribe_candidates(
max_results: int = 500,
ctx: Context | None = None,
) -> dict[str, object]:
return list_unsubscribe_candidates_impl(max_results=max_results, ctx=ctx)
@mcp.tool(
description="Execute unsubscribe actions for selected candidate IDs."
)
def execute_unsubscribe(
selected_candidate_ids: list[str],
max_results: int = 500,
remember_selection: bool = True,
ctx: Context | None = None,
) -> dict[str, object]:
return execute_unsubscribe_impl(
selected_candidate_ids=selected_candidate_ids,
max_results=max_results,
remember_selection=remember_selection,
ctx=ctx,
)

@ -3,17 +3,26 @@ from __future__ import annotations
import logging
from typing import Any
from fastapi import HTTPException
from mcp.server.fastmcp import Context
from app.config import get_settings
from app.core.service import CoreAgentService
from app.security import AuthBackend
settings = get_settings()
core_service = CoreAgentService(settings=settings, logger=logging.getLogger("personal-agent.mcp"))
auth_backend = AuthBackend(settings=settings)
def check_availability(
start: str, end: str, calendar_ids: list[str] | None = None
start: str,
end: str,
calendar_ids: list[str] | None = None,
ctx: Context | None = None,
) -> dict[str, Any]:
"""Return free/busy availability for a time range on one or more calendars."""
_require_scope(ctx, "availability:read")
result = core_service.check_availability(start=start, end=end, calendar_ids=calendar_ids)
return {
"start": result.start,
@ -29,3 +38,110 @@ def check_availability(
],
"checked_calendars": result.checked_calendars,
}
def scan_mailbox(max_results: int = 100, ctx: Context | None = None) -> dict[str, Any]:
"""Scan inbox emails and classify/move them according to current routing rules."""
_require_scope(ctx, "mail:scan")
result = core_service.scan_mailbox(max_results=max_results)
return {
"scanned": result.scanned,
"linkedin": result.linkedin,
"advertising": result.advertising,
"veille_techno": result.veille_techno,
"skipped": result.skipped,
"failed": result.failed,
}
def list_unsubscribe_candidates(
max_results: int = 500, ctx: Context | None = None
) -> dict[str, Any]:
"""List unsubscribe candidates discovered from advertising emails."""
_require_scope(ctx, "unsubscribe:read")
result = core_service.list_unsubscribe_candidates(max_results=max_results)
return {
"scanned_messages": result.scanned_messages,
"candidates": [
{
"candidate_id": candidate.candidate_id,
"list_name": candidate.list_name,
"sender_domain": candidate.sender_domain,
"message_count": candidate.message_count,
"sample_senders": candidate.sample_senders,
"sample_subjects": candidate.sample_subjects,
"approved": candidate.approved,
"methods": [
{
"method_id": method.method_id,
"method_type": method.method_type,
"value": method.value,
}
for method in candidate.methods
],
}
for candidate in result.candidates
],
}
def execute_unsubscribe(
selected_candidate_ids: list[str],
max_results: int = 500,
remember_selection: bool = True,
ctx: Context | None = None,
) -> dict[str, Any]:
"""Execute unsubscribe actions for selected mailing list candidate IDs."""
_require_scope(ctx, "unsubscribe:execute")
result = core_service.execute_unsubscribe_selected(
selected_candidate_ids=selected_candidate_ids,
max_results=max_results,
remember_selection=remember_selection,
)
return {
"scanned_messages": result.scanned_messages,
"candidates_considered": result.candidates_considered,
"selected_candidates": result.selected_candidates,
"executed_methods": result.executed_methods,
"skipped_already_executed": result.skipped_already_executed,
"failed_methods": result.failed_methods,
"updated_approved_count": result.updated_approved_count,
"results": [
{
"candidate_id": item.candidate_id,
"list_name": item.list_name,
"method_id": item.method_id,
"method_type": item.method_type,
"value": item.value,
"success": item.success,
"detail": item.detail,
}
for item in result.results
],
}
def _require_scope(ctx: Context | None, scope: str) -> None:
x_api_key, authorization = _extract_auth_headers(ctx)
try:
auth_backend.authenticate(
x_api_key=x_api_key,
authorization=authorization,
required_scopes={scope},
)
except HTTPException as exc:
raise PermissionError(f"Unauthorized for scope '{scope}': {exc.detail}") from exc
def _extract_auth_headers(ctx: Context | None) -> tuple[str | None, str | None]:
if ctx is None:
return None, None
request = ctx.request_context.request
headers = getattr(request, "headers", None)
if headers is None:
return None, None
x_api_key = headers.get("x-api-key")
authorization = headers.get("authorization")
return x_api_key, authorization

Loading…
Cancel
Save