You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

555 lines
18 KiB
Python

from __future__ import annotations
import asyncio
import logging
from contextlib import asynccontextmanager
from datetime import datetime
from typing import Annotated
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from fastapi import Depends, FastAPI, Header, HTTPException, Query, status
from pydantic import BaseModel, Field
from app.calendar_agent import CalendarAvailabilityAgent
from app.config import get_settings
from app.gmail_agent import GmailTriageAgent
from app.google_clients import build_calendar_service, build_gmail_service
from app.strands_classifier import StrandsEmailClassifier
from app.unsubscribe_agent import UnsubscribeDigestAgent
from app.unsubscribe_hil_agent import (
UnsubscribeHumanLoopAgent,
CandidateSnapshot,
UnsubscribeExecutionResult,
)
# Application settings, loaded once at import time via get_settings().
settings = get_settings()
# Configure logging from settings.log_level; a name that is not an attribute of the
# logging module falls back to logging.INFO.
logging.basicConfig(level=getattr(logging, settings.log_level.upper(), logging.INFO))
logger = logging.getLogger("personal-agent")
# Background scheduler instance; assigned in lifespan() during application startup.
scheduler: AsyncIOScheduler | None = None
# Locks start as None and are created on first use by _get_scan_lock() /
# _get_unsubscribe_lock().
scan_lock: asyncio.Lock | None = None
unsubscribe_lock: asyncio.Lock | None = None
# Set to True by _build_strands_classifier() after it has warned about a missing
# Strands API key, so the warning is emitted only once.
strands_key_warning_logged = False
class ScanResponse(BaseModel):
    """Counters for one Gmail triage scan; response body of POST /scan.

    Each field mirrors the same-named attribute of the result returned by
    GmailTriageAgent.scan_and_route_messages (see _run_scan_once).
    """

    scanned: int
    linkedin: int
    advertising: int
    veille_techno: int
    skipped: int
    failed: int
class AvailabilityRequest(BaseModel):
    """Request body of POST /availability.

    All three fields are forwarded unchanged to
    CalendarAvailabilityAgent.get_availability.
    """

    # NOTE(review): the expected string format of start/end is decided by the calendar
    # agent and is not visible in this module — confirm against app.calendar_agent.
    start: str
    end: str
    # Optional; None is passed through to the agent as-is.
    calendar_ids: list[str] | None = None
class BusySlot(BaseModel):
    """Element type of AvailabilityResponse.busy_slots: a calendar id plus start/end strings."""

    calendar_id: str
    start: str
    end: str
class AvailabilityResponse(BaseModel):
    """Response body of POST /availability.

    Populated field by field from the result of
    CalendarAvailabilityAgent.get_availability.
    """

    start: str
    end: str
    available: bool
    busy_slots: list[BusySlot]
    checked_calendars: list[str]
class UnsubscribeDigestResponse(BaseModel):
    """Summary of one unsubscribe-digest run; response body of POST /unsubscribe-digest.

    Each field mirrors the same-named attribute of the result returned by
    UnsubscribeDigestAgent.scan_and_send_digest (see _run_unsubscribe_digest_once).
    """

    scanned_messages: int
    extracted_unique_links: int
    new_links: int
    # May be None.
    sent_to: str | None
    email_sent: bool
class UnsubscribeMethodResponse(BaseModel):
    """One entry of MailingListCandidateResponse.methods.

    Built in _snapshot_to_response from an item of a candidate's ``methods``.
    """

    method_id: str
    method_type: str
    value: str
class MailingListCandidateResponse(BaseModel):
    """One entry of UnsubscribeCandidatesResponse.candidates.

    Built in _snapshot_to_response from an item of the snapshot's ``candidates``.
    """

    candidate_id: str
    list_name: str
    sender_domain: str
    message_count: int
    sample_senders: list[str]
    sample_subjects: list[str]
    methods: list[UnsubscribeMethodResponse]
    approved: bool
class UnsubscribeCandidatesResponse(BaseModel):
    """Response body of POST /unsubscribe/candidates, converted from a CandidateSnapshot."""

    scanned_messages: int
    candidates: list[MailingListCandidateResponse]
class ExecuteUnsubscribeRequest(BaseModel):
    """Request body of POST /unsubscribe/execute."""

    # Defaults to an empty list when omitted.
    selected_candidate_ids: list[str] = Field(default_factory=list)
    # Optional, validated to 1..500; when omitted the endpoint falls back to
    # settings.unsubscribe_hil_max_results.
    max_results: int | None = Field(default=None, ge=1, le=500)
    # Forwarded unchanged to UnsubscribeHumanLoopAgent.execute_selected.
    remember_selection: bool = True
class MethodExecutionResponse(BaseModel):
    """One entry of UnsubscribeExecutionResponse.results.

    Built in _execution_to_response from an item of the execution result's ``results``.
    """

    candidate_id: str
    list_name: str
    method_id: str
    method_type: str
    value: str
    success: bool
    detail: str
class UnsubscribeExecutionResponse(BaseModel):
    """Response body of POST /unsubscribe/execute and POST /unsubscribe/auto-run.

    Converted from an UnsubscribeExecutionResult by _execution_to_response; each
    counter mirrors the same-named attribute of that result.
    """

    scanned_messages: int
    candidates_considered: int
    selected_candidates: int
    executed_methods: int
    skipped_already_executed: int
    failed_methods: int
    updated_approved_count: int
    results: list[MethodExecutionResponse]
def _is_api_auth_enabled() -> bool:
    """Tell whether settings.agent_api_key holds anything other than whitespace."""
    configured_key = settings.agent_api_key
    return len(configured_key.strip()) > 0
def verify_api_key(
    x_api_key: Annotated[str | None, Header(alias="X-API-Key")] = None,
    authorization: Annotated[str | None, Header()] = None,
) -> None:
    """FastAPI dependency that rejects requests lacking the configured API key.

    The key may be supplied either in the ``X-API-Key`` header or as an
    ``Authorization: Bearer <key>`` token; ``X-API-Key`` wins when both are present.
    When no key is configured (empty or whitespace-only), every request is allowed,
    which matches what _is_api_auth_enabled() reports.

    Args:
        x_api_key: Value of the ``X-API-Key`` header, if any.
        authorization: Value of the ``Authorization`` header, if any.

    Raises:
        HTTPException: 401 when a key is configured and the supplied one does not match.
    """
    # Function-scope import keeps this block self-contained.
    import hmac

    # Strip like _is_api_auth_enabled() does, so the logged auth state and the
    # enforced auth state can never disagree.
    expected = (settings.agent_api_key or "").strip()
    if not expected:
        return

    provided = x_api_key
    if not provided and authorization:
        scheme, _, token = authorization.partition(" ")
        if scheme.lower() == "bearer":
            # Tolerate extra whitespace between the scheme and the token.
            provided = token.strip()

    # Compare as bytes in constant time: avoids leaking key contents through timing,
    # and compare_digest() on str would raise TypeError for non-ASCII input.
    supplied = (provided or "").encode("utf-8")
    if not hmac.compare_digest(supplied, expected.encode("utf-8")):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid API key."
        )
def _run_scan_once(max_results: int) -> ScanResponse:
    """Run a single Gmail triage pass and report its counters.

    Builds a fresh Gmail service and GmailTriageAgent on every call, asks the agent to
    scan and route up to ``max_results`` messages, and copies the resulting counters
    into a ScanResponse.
    """
    triage_agent = GmailTriageAgent(
        gmail_service=build_gmail_service(settings),
        query=settings.gmail_query,
        classifier=_build_strands_classifier(),
        fallback_to_rules=settings.llm_fallback_to_rules,
    )
    outcome = triage_agent.scan_and_route_messages(max_results=max_results)

    counter_names = (
        "scanned",
        "linkedin",
        "advertising",
        "veille_techno",
        "skipped",
        "failed",
    )
    return ScanResponse(**{name: getattr(outcome, name) for name in counter_names})
def _run_unsubscribe_digest_once(max_results: int) -> UnsubscribeDigestResponse:
    """Run a single unsubscribe-digest pass and report its summary.

    ``max_results`` is clamped to the range 1..500 before being handed to
    UnsubscribeDigestAgent.scan_and_send_digest. A fresh Gmail service and agent are
    built on every call.
    """
    upper_capped = min(max_results, 500)
    effective_limit = max(1, upper_capped)

    digest_agent = UnsubscribeDigestAgent(
        gmail_service=build_gmail_service(settings),
        query=settings.unsubscribe_query,
        state_file=settings.unsubscribe_state_file,
        recipient_email=settings.unsubscribe_digest_recipient,
        send_empty_digest=settings.unsubscribe_send_empty_digest,
    )
    outcome = digest_agent.scan_and_send_digest(max_results=effective_limit)

    summary_fields = (
        "scanned_messages",
        "extracted_unique_links",
        "new_links",
        "sent_to",
        "email_sent",
    )
    return UnsubscribeDigestResponse(
        **{name: getattr(outcome, name) for name in summary_fields}
    )
def _run_unsubscribe_candidates_once(max_results: int) -> UnsubscribeCandidatesResponse:
    """Discover unsubscribe candidates with a fresh agent and convert them for the API."""
    hil_agent = _build_unsubscribe_hil_agent()
    return _snapshot_to_response(hil_agent.discover_candidates(max_results=max_results))
def _run_unsubscribe_execute_selected(
    selected_candidate_ids: list[str],
    max_results: int,
    remember_selection: bool,
) -> UnsubscribeExecutionResponse:
    """Execute unsubscribe methods for the chosen candidates and convert the outcome.

    All three arguments are passed by keyword, unchanged, to
    UnsubscribeHumanLoopAgent.execute_selected on a freshly built agent.
    """
    hil_agent = _build_unsubscribe_hil_agent()
    call_arguments = {
        "selected_candidate_ids": selected_candidate_ids,
        "max_results": max_results,
        "remember_selection": remember_selection,
    }
    return _execution_to_response(hil_agent.execute_selected(**call_arguments))
def _run_unsubscribe_auto_once(max_results: int) -> UnsubscribeExecutionResponse:
    """Run the agent's ``execute_for_approved`` pass once and convert the outcome."""
    hil_agent = _build_unsubscribe_hil_agent()
    return _execution_to_response(hil_agent.execute_for_approved(max_results=max_results))
def _snapshot_to_response(snapshot: CandidateSnapshot) -> UnsubscribeCandidatesResponse:
    """Convert an agent CandidateSnapshot into its API response model.

    Every candidate, and every method on each candidate, is copied attribute by
    attribute into the corresponding pydantic model; order is preserved.
    """
    scanned_total = snapshot.scanned_messages

    candidate_models: list[MailingListCandidateResponse] = []
    for entry in snapshot.candidates:
        method_models: list[UnsubscribeMethodResponse] = []
        for option in entry.methods:
            method_models.append(
                UnsubscribeMethodResponse(
                    method_id=option.method_id,
                    method_type=option.method_type,
                    value=option.value,
                )
            )
        candidate_models.append(
            MailingListCandidateResponse(
                candidate_id=entry.candidate_id,
                list_name=entry.list_name,
                sender_domain=entry.sender_domain,
                message_count=entry.message_count,
                sample_senders=entry.sample_senders,
                sample_subjects=entry.sample_subjects,
                methods=method_models,
                approved=entry.approved,
            )
        )

    return UnsubscribeCandidatesResponse(
        scanned_messages=scanned_total,
        candidates=candidate_models,
    )
def _execution_to_response(result: UnsubscribeExecutionResult) -> UnsubscribeExecutionResponse:
    """Convert an agent UnsubscribeExecutionResult into its API response model.

    The scalar counters are copied by name and each item of ``result.results`` becomes
    a MethodExecutionResponse, in the same order.
    """
    counter_names = (
        "scanned_messages",
        "candidates_considered",
        "selected_candidates",
        "executed_methods",
        "skipped_already_executed",
        "failed_methods",
        "updated_approved_count",
    )
    counters = {name: getattr(result, name) for name in counter_names}

    per_method: list[MethodExecutionResponse] = []
    for outcome in result.results:
        per_method.append(
            MethodExecutionResponse(
                candidate_id=outcome.candidate_id,
                list_name=outcome.list_name,
                method_id=outcome.method_id,
                method_type=outcome.method_type,
                value=outcome.value,
                success=outcome.success,
                detail=outcome.detail,
            )
        )

    return UnsubscribeExecutionResponse(**counters, results=per_method)
def _build_strands_classifier() -> StrandsEmailClassifier | None:
    """Create the Strands email classifier, or return None to use rules only.

    Returns:
        A StrandsEmailClassifier configured from settings, or None when the API key is
        missing or construction fails and settings.llm_fallback_to_rules is enabled.

    Raises:
        RuntimeError: when the API key is missing and rules fallback is disabled.
        Exception: whatever construction raised, when rules fallback is disabled.
    """
    global strands_key_warning_logged

    api_key = settings.strands_api_key
    if not api_key:
        if not settings.llm_fallback_to_rules:
            raise RuntimeError(
                "STRANDS_OPENAI_API_KEY (or LLM_API_KEY) is required when LLM_FALLBACK_TO_RULES is disabled."
            )
        # Warn a single time per process; this function runs on every scan.
        if not strands_key_warning_logged:
            logger.warning(
                "Strands API key not set. Falling back to rules-based classification."
            )
            strands_key_warning_logged = True
        return None

    try:
        classifier = StrandsEmailClassifier(
            api_key=api_key,
            model_id=settings.strands_model_id,
            base_url=settings.strands_base_url,
            timeout_seconds=settings.strands_timeout_seconds,
            temperature=settings.strands_temperature,
        )
    except Exception:
        if not settings.llm_fallback_to_rules:
            raise
        logger.exception("Could not initialize Strands classifier; using rules fallback.")
        return None
    return classifier
def _build_unsubscribe_hil_agent() -> UnsubscribeHumanLoopAgent:
    """Construct an UnsubscribeHumanLoopAgent from settings with a fresh Gmail service."""
    agent_options = {
        "gmail_service": build_gmail_service(settings),
        "query": settings.unsubscribe_hil_query,
        "state_file": settings.unsubscribe_hil_state_file,
        "http_timeout_seconds": settings.unsubscribe_http_timeout_seconds,
        "user_agent": settings.unsubscribe_user_agent,
    }
    return UnsubscribeHumanLoopAgent(**agent_options)
def _get_scan_lock() -> asyncio.Lock:
    """Return the module-wide scan lock, creating it on the first call."""
    global scan_lock
    existing = scan_lock
    if existing is not None:
        return existing
    scan_lock = asyncio.Lock()
    return scan_lock
def _get_unsubscribe_lock() -> asyncio.Lock:
    """Return the module-wide unsubscribe lock, creating it on the first call."""
    global unsubscribe_lock
    existing = unsubscribe_lock
    if existing is not None:
        return existing
    unsubscribe_lock = asyncio.Lock()
    return unsubscribe_lock
async def _scheduled_scan() -> None:
    """Scheduler job: run one Gmail scan of up to 100 messages unless one is in progress.

    The scan runs in a worker thread while the scan lock is held. Any exception is
    logged and swallowed so it never propagates out of the job.
    """
    scan_guard = _get_scan_lock()
    if scan_guard.locked():
        logger.info("Previous scan still running, skipping this tick.")
    else:
        async with scan_guard:
            try:
                summary = await asyncio.to_thread(_run_scan_once, 100)
                logger.info("Scheduled scan complete: %s", summary.model_dump())
            except Exception:
                logger.exception("Scheduled scan failed")
async def _scheduled_unsubscribe_digest() -> None:
    """Scheduler job: run one unsubscribe digest unless the unsubscribe lock is held.

    The digest runs in a worker thread with settings.unsubscribe_max_results as its
    limit. Any exception is logged and swallowed so it never propagates out of the job.
    """
    unsub_guard = _get_unsubscribe_lock()
    if unsub_guard.locked():
        logger.info("Previous unsubscribe digest still running, skipping this tick.")
    else:
        async with unsub_guard:
            try:
                limit = settings.unsubscribe_max_results
                summary = await asyncio.to_thread(_run_unsubscribe_digest_once, limit)
                logger.info("Scheduled unsubscribe digest complete: %s", summary.model_dump())
            except Exception:
                logger.exception("Scheduled unsubscribe digest failed")
async def _scheduled_unsubscribe_auto() -> None:
    """Scheduler job: run one automatic unsubscribe pass unless the unsubscribe lock is held.

    The pass runs in a worker thread with settings.unsubscribe_hil_max_results as its
    limit. Any exception is logged and swallowed so it never propagates out of the job.
    """
    unsub_guard = _get_unsubscribe_lock()
    if unsub_guard.locked():
        logger.info("Previous unsubscribe auto run still running, skipping this tick.")
    else:
        async with unsub_guard:
            try:
                limit = settings.unsubscribe_hil_max_results
                summary = await asyncio.to_thread(_run_unsubscribe_auto_once, limit)
                logger.info("Scheduled unsubscribe auto run complete: %s", summary.model_dump())
            except Exception:
                logger.exception("Scheduled unsubscribe auto run failed")
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: start the background scheduler, stop it on exit.

    Startup creates both locks, logs whether API authentication is enabled, and
    registers the interval jobs (Gmail scan, unsubscribe digest, and — only when
    settings.unsubscribe_auto_enabled is set — the automatic unsubscribe run). Each job
    is given ``next_run_time=datetime.now()`` so its first run is due as soon as the
    scheduler starts rather than one full interval later.

    Shutdown runs in a ``finally`` clause so the scheduler is stopped even when an
    exception is thrown into the generator at the ``yield``.
    """
    # Startup
    global scheduler
    _get_scan_lock()
    _get_unsubscribe_lock()
    logger.info(
        "API authentication enabled=%s (header: X-API-Key or Bearer token)",
        _is_api_auth_enabled(),
    )
    scheduler = AsyncIOScheduler()
    scheduler.add_job(  # type: ignore
        _scheduled_scan,
        "interval",
        minutes=settings.gmail_scan_interval_minutes,
        next_run_time=datetime.now(),
    )
    scheduler.add_job(  # type: ignore
        _scheduled_unsubscribe_digest,
        "interval",
        minutes=settings.unsubscribe_digest_interval_minutes,
        next_run_time=datetime.now(),
    )
    if settings.unsubscribe_auto_enabled:
        scheduler.add_job(  # type: ignore
            _scheduled_unsubscribe_auto,
            "interval",
            minutes=settings.unsubscribe_auto_interval_minutes,
            next_run_time=datetime.now(),
        )
    scheduler.start()  # type: ignore
    logger.info(
        "Scheduler started (scan=%s min, digest=%s min, auto_unsub=%s/%s min)",
        settings.gmail_scan_interval_minutes,
        settings.unsubscribe_digest_interval_minutes,
        settings.unsubscribe_auto_enabled,
        settings.unsubscribe_auto_interval_minutes,
    )
    try:
        yield
    finally:
        # Shutdown: do not wait for running jobs to finish.
        if scheduler:
            scheduler.shutdown(wait=False)  # type: ignore
# The ASGI application; lifespan() starts and stops the background scheduler around it.
app = FastAPI(title="Personal Agent", version="0.3.0", lifespan=lifespan) # type: ignore
@app.get("/health")
def health() -> dict[str, object]:
    """Unauthenticated health check that also echoes the scheduler-related settings."""
    payload: dict[str, object] = {"status": "ok"}
    payload["scan_interval_minutes"] = settings.gmail_scan_interval_minutes
    payload["unsubscribe_digest_interval_minutes"] = (
        settings.unsubscribe_digest_interval_minutes
    )
    payload["unsubscribe_auto_enabled"] = settings.unsubscribe_auto_enabled
    payload["unsubscribe_auto_interval_minutes"] = (
        settings.unsubscribe_auto_interval_minutes
    )
    return payload
@app.post(
    "/scan",
    response_model=ScanResponse,
    dependencies=[Depends(verify_api_key)],
)
async def scan_now(max_results: int = Query(100, ge=1, le=500)) -> ScanResponse:
    """Run a Gmail triage scan on demand and return its counters.

    The scan runs in a worker thread while the scan lock is held, so the request waits
    for any scan already in progress (manual or scheduled) instead of overlapping it.

    Args:
        max_results: Maximum number of messages to scan (query parameter, 1..500).

    Raises:
        HTTPException: 500 on any failure. The failure is also logged with its
            traceback, since the HTTP response alone leaves no server-side record.
    """
    async with _get_scan_lock():
        try:
            return await asyncio.to_thread(_run_scan_once, max_results)
        except FileNotFoundError as exc:
            logger.exception("Manual Gmail scan failed: required file not found")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=str(exc),
            ) from exc
        except Exception as exc:
            logger.exception("Manual Gmail scan failed")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Gmail scan failed: {exc}",
            ) from exc
@app.post(
    "/availability",
    response_model=AvailabilityResponse,
    dependencies=[Depends(verify_api_key)],
)
async def availability(request: AvailabilityRequest) -> AvailabilityResponse:
    """Check calendar availability for the requested window.

    Args:
        request: Window bounds and optional calendar ids, forwarded unchanged to
            CalendarAvailabilityAgent.get_availability.

    Raises:
        HTTPException: 400 when the lookup raises ValueError; 500 on any other failure.
            Server-side (500) failures are also logged with their traceback.
    """

    def _lookup():
        # build_calendar_service is a synchronous call; running it here, inside the
        # worker thread, keeps it off the event loop together with the lookup itself.
        calendar_service = build_calendar_service(settings)
        availability_agent = CalendarAvailabilityAgent(calendar_service=calendar_service)
        return availability_agent.get_availability(
            request.start,
            request.end,
            request.calendar_ids,
        )

    try:
        result = await asyncio.to_thread(_lookup)
        return AvailabilityResponse(
            start=result.start,
            end=result.end,
            available=result.available,
            busy_slots=result.busy_slots,  # type: ignore
            checked_calendars=result.checked_calendars,
        )
    except ValueError as exc:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
    except FileNotFoundError as exc:
        logger.exception("Availability lookup failed: required file not found")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(exc),
        ) from exc
    except Exception as exc:
        logger.exception("Availability lookup failed")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Availability lookup failed: {exc}",
        ) from exc
@app.post(
    "/unsubscribe-digest",
    response_model=UnsubscribeDigestResponse,
    dependencies=[Depends(verify_api_key)],
)
async def unsubscribe_digest_now(
    max_results: int = Query(default=settings.unsubscribe_max_results, ge=1, le=500),
) -> UnsubscribeDigestResponse:
    """Run the unsubscribe digest on demand and return its summary.

    The digest runs in a worker thread while the unsubscribe lock is held, so the
    request waits for any other unsubscribe operation already in progress.

    Args:
        max_results: Message limit (query parameter, 1..500); defaults to
            settings.unsubscribe_max_results.

    Raises:
        HTTPException: 500 on any failure. The failure is also logged with its
            traceback, since the HTTP response alone leaves no server-side record.
    """
    async with _get_unsubscribe_lock():
        try:
            return await asyncio.to_thread(_run_unsubscribe_digest_once, max_results)
        except FileNotFoundError as exc:
            logger.exception("Manual unsubscribe digest failed: required file not found")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=str(exc),
            ) from exc
        except Exception as exc:
            logger.exception("Manual unsubscribe digest failed")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Unsubscribe digest failed: {exc}",
            ) from exc
@app.post(
    "/unsubscribe/candidates",
    response_model=UnsubscribeCandidatesResponse,
    dependencies=[Depends(verify_api_key)],
)
async def unsubscribe_candidates(
    max_results: int = Query(default=settings.unsubscribe_hil_max_results, ge=1, le=500),
) -> UnsubscribeCandidatesResponse:
    """Discover mailing-list unsubscribe candidates and return them.

    Discovery runs in a worker thread while the unsubscribe lock is held, so the
    request waits for any other unsubscribe operation already in progress.

    Args:
        max_results: Message limit (query parameter, 1..500); defaults to
            settings.unsubscribe_hil_max_results.

    Raises:
        HTTPException: 500 on any failure. The failure is also logged with its
            traceback, since the HTTP response alone leaves no server-side record.
    """
    async with _get_unsubscribe_lock():
        try:
            return await asyncio.to_thread(_run_unsubscribe_candidates_once, max_results)
        except Exception as exc:
            logger.exception("Unsubscribe candidate discovery failed")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Candidate discovery failed: {exc}",
            ) from exc
@app.post(
    "/unsubscribe/execute",
    response_model=UnsubscribeExecutionResponse,
    dependencies=[Depends(verify_api_key)],
)
async def unsubscribe_execute(request: ExecuteUnsubscribeRequest) -> UnsubscribeExecutionResponse:
    """Execute unsubscribe methods for the candidates selected in the request body.

    Execution runs in a worker thread while the unsubscribe lock is held, so the
    request waits for any other unsubscribe operation already in progress.

    Args:
        request: Selected candidate ids, an optional message limit, and the
            remember_selection flag.

    Raises:
        HTTPException: 500 on any failure. The failure is also logged with its
            traceback, since the HTTP response alone leaves no server-side record.
    """
    # request.max_results is validated to be >= 1 when present, so ``or`` only falls
    # back to the configured default when the field was omitted.
    max_results = request.max_results or settings.unsubscribe_hil_max_results
    async with _get_unsubscribe_lock():
        try:
            return await asyncio.to_thread(
                _run_unsubscribe_execute_selected,
                request.selected_candidate_ids,
                max_results,
                request.remember_selection,
            )
        except Exception as exc:
            logger.exception("Unsubscribe execution failed")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Unsubscribe execution failed: {exc}",
            ) from exc
@app.post(
    "/unsubscribe/auto-run",
    response_model=UnsubscribeExecutionResponse,
    dependencies=[Depends(verify_api_key)],
)
async def unsubscribe_auto_run(
    max_results: int = Query(default=settings.unsubscribe_hil_max_results, ge=1, le=500),
) -> UnsubscribeExecutionResponse:
    """Trigger the automatic unsubscribe pass on demand and return its outcome.

    The pass runs in a worker thread while the unsubscribe lock is held, so the
    request waits for any other unsubscribe operation already in progress.

    Args:
        max_results: Message limit (query parameter, 1..500); defaults to
            settings.unsubscribe_hil_max_results.

    Raises:
        HTTPException: 500 on any failure. The failure is also logged with its
            traceback, since the HTTP response alone leaves no server-side record.
    """
    async with _get_unsubscribe_lock():
        try:
            return await asyncio.to_thread(_run_unsubscribe_auto_once, max_results)
        except Exception as exc:
            logger.exception("Manual auto unsubscribe run failed")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Auto unsubscribe run failed: {exc}",
            ) from exc