diff --git a/app/core/__init__.py b/app/core/__init__.py new file mode 100644 index 0000000..da76f69 --- /dev/null +++ b/app/core/__init__.py @@ -0,0 +1,25 @@ +from app.core.models import ( + CoreAvailabilityResult, + CoreBusySlot, + CoreMailingListCandidate, + CoreMethodExecution, + CoreScanResult, + CoreUnsubscribeCandidatesResult, + CoreUnsubscribeDigestResult, + CoreUnsubscribeExecutionResult, + CoreUnsubscribeMethod, +) +from app.core.service import CoreAgentService + +__all__ = [ + "CoreAgentService", + "CoreScanResult", + "CoreAvailabilityResult", + "CoreBusySlot", + "CoreUnsubscribeDigestResult", + "CoreUnsubscribeMethod", + "CoreMailingListCandidate", + "CoreUnsubscribeCandidatesResult", + "CoreMethodExecution", + "CoreUnsubscribeExecutionResult", +] diff --git a/app/core/models.py b/app/core/models.py new file mode 100644 index 0000000..f3b73a8 --- /dev/null +++ b/app/core/models.py @@ -0,0 +1,86 @@ +from __future__ import annotations + +from dataclasses import dataclass + + +@dataclass(frozen=True) +class CoreScanResult: + scanned: int + linkedin: int + advertising: int + veille_techno: int + skipped: int + failed: int + + +@dataclass(frozen=True) +class CoreBusySlot: + calendar_id: str + start: str + end: str + + +@dataclass(frozen=True) +class CoreAvailabilityResult: + start: str + end: str + available: bool + busy_slots: list[CoreBusySlot] + checked_calendars: list[str] + + +@dataclass(frozen=True) +class CoreUnsubscribeDigestResult: + scanned_messages: int + extracted_unique_links: int + new_links: int + sent_to: str | None + email_sent: bool + + +@dataclass(frozen=True) +class CoreUnsubscribeMethod: + method_id: str + method_type: str + value: str + + +@dataclass(frozen=True) +class CoreMailingListCandidate: + candidate_id: str + list_name: str + sender_domain: str + message_count: int + sample_senders: list[str] + sample_subjects: list[str] + methods: list[CoreUnsubscribeMethod] + approved: bool + + +@dataclass(frozen=True) +class CoreUnsubscribeCandidatesResult: + scanned_messages: int + candidates: list[CoreMailingListCandidate] + + +@dataclass(frozen=True) +class CoreMethodExecution: + candidate_id: str + list_name: str + method_id: str + method_type: str + value: str + success: bool + detail: str + + +@dataclass(frozen=True) +class CoreUnsubscribeExecutionResult: + scanned_messages: int + candidates_considered: int + selected_candidates: int + executed_methods: int + skipped_already_executed: int + failed_methods: int + updated_approved_count: int + results: list[CoreMethodExecution] diff --git a/app/core/service.py b/app/core/service.py new file mode 100644 index 0000000..eae4215 --- /dev/null +++ b/app/core/service.py @@ -0,0 +1,202 @@ +from __future__ import annotations + +import logging + +from app.calendar_agent import CalendarAvailabilityAgent +from app.config import Settings +from app.gmail_agent import GmailTriageAgent +from app.google_clients import build_calendar_service, build_gmail_service +from app.strands_classifier import StrandsEmailClassifier +from app.unsubscribe_agent import UnsubscribeDigestAgent +from app.unsubscribe_hil_agent import ( + CandidateSnapshot, + UnsubscribeExecutionResult, + UnsubscribeHumanLoopAgent, +) + +from app.core.models import ( + CoreAvailabilityResult, + CoreBusySlot, + CoreMailingListCandidate, + CoreMethodExecution, + CoreScanResult, + CoreUnsubscribeCandidatesResult, + CoreUnsubscribeDigestResult, + CoreUnsubscribeExecutionResult, + CoreUnsubscribeMethod, +) + + +class CoreAgentService: + def __init__(self, settings: Settings, *, logger: logging.Logger | None = None) -> None: + self.settings = settings + self.logger = logger or logging.getLogger("personal-agent.core") + self._strands_key_warning_logged = False + + def scan_mailbox(self, max_results: int) -> CoreScanResult: + gmail_service = build_gmail_service(self.settings) + gmail_agent = GmailTriageAgent( + gmail_service=gmail_service, + query=self.settings.gmail_query, + classifier=self._build_strands_classifier(), + fallback_to_rules=self.settings.llm_fallback_to_rules, + ) + result = gmail_agent.scan_and_route_messages(max_results=max_results) + return CoreScanResult( + scanned=result.scanned, + linkedin=result.linkedin, + advertising=result.advertising, + veille_techno=result.veille_techno, + skipped=result.skipped, + failed=result.failed, + ) + + def check_availability( + self, start: str, end: str, calendar_ids: list[str] | None + ) -> CoreAvailabilityResult: + calendar_service = build_calendar_service(self.settings) + availability_agent = CalendarAvailabilityAgent(calendar_service=calendar_service) + result = availability_agent.get_availability(start, end, calendar_ids) + return CoreAvailabilityResult( + start=result.start, + end=result.end, + available=result.available, + busy_slots=[ + CoreBusySlot( + calendar_id=slot["calendar_id"], + start=slot["start"], + end=slot["end"], + ) + for slot in result.busy_slots + ], + checked_calendars=result.checked_calendars, + ) + + def scan_unsubscribe_digest(self, max_results: int) -> CoreUnsubscribeDigestResult: + bounded_max_results = max(1, min(max_results, 500)) + gmail_service = build_gmail_service(self.settings) + unsubscribe_agent = UnsubscribeDigestAgent( + gmail_service=gmail_service, + query=self.settings.unsubscribe_query, + state_file=self.settings.unsubscribe_state_file, + recipient_email=self.settings.unsubscribe_digest_recipient, + send_empty_digest=self.settings.unsubscribe_send_empty_digest, + ) + result = unsubscribe_agent.scan_and_send_digest(max_results=bounded_max_results) + return CoreUnsubscribeDigestResult( + scanned_messages=result.scanned_messages, + extracted_unique_links=result.extracted_unique_links, + new_links=result.new_links, + sent_to=result.sent_to, + email_sent=result.email_sent, + ) + + def list_unsubscribe_candidates(self, max_results: int) -> CoreUnsubscribeCandidatesResult: + snapshot = self._build_unsubscribe_hil_agent().discover_candidates(max_results=max_results) + return self._snapshot_to_core(snapshot) + + def execute_unsubscribe_selected( + self, + selected_candidate_ids: list[str], + max_results: int, + remember_selection: bool, + ) -> CoreUnsubscribeExecutionResult: + result = self._build_unsubscribe_hil_agent().execute_selected( + selected_candidate_ids=selected_candidate_ids, + max_results=max_results, + remember_selection=remember_selection, + ) + return self._execution_to_core(result) + + def run_unsubscribe_auto(self, max_results: int) -> CoreUnsubscribeExecutionResult: + result = self._build_unsubscribe_hil_agent().execute_for_approved(max_results=max_results) + return self._execution_to_core(result) + + def _build_strands_classifier(self) -> StrandsEmailClassifier | None: + if not self.settings.strands_api_key: + if self.settings.llm_fallback_to_rules: + if not self._strands_key_warning_logged: + self.logger.warning( + "Strands API key not set. Falling back to rules-based classification." + ) + self._strands_key_warning_logged = True + return None + raise RuntimeError( + "STRANDS_OPENAI_API_KEY (or LLM_API_KEY) is required when LLM_FALLBACK_TO_RULES is disabled." + ) + + try: + return StrandsEmailClassifier( + api_key=self.settings.strands_api_key, + model_id=self.settings.strands_model_id, + base_url=self.settings.strands_base_url, + timeout_seconds=self.settings.strands_timeout_seconds, + temperature=self.settings.strands_temperature, + ) + except Exception: + if self.settings.llm_fallback_to_rules: + self.logger.exception( + "Could not initialize Strands classifier; using rules fallback." + ) + return None + raise + + def _build_unsubscribe_hil_agent(self) -> UnsubscribeHumanLoopAgent: + gmail_service = build_gmail_service(self.settings) + return UnsubscribeHumanLoopAgent( + gmail_service=gmail_service, + query=self.settings.unsubscribe_hil_query, + state_file=self.settings.unsubscribe_hil_state_file, + http_timeout_seconds=self.settings.unsubscribe_http_timeout_seconds, + user_agent=self.settings.unsubscribe_user_agent, + ) + + def _snapshot_to_core(self, snapshot: CandidateSnapshot) -> CoreUnsubscribeCandidatesResult: + return CoreUnsubscribeCandidatesResult( + scanned_messages=snapshot.scanned_messages, + candidates=[ + CoreMailingListCandidate( + candidate_id=candidate.candidate_id, + list_name=candidate.list_name, + sender_domain=candidate.sender_domain, + message_count=candidate.message_count, + sample_senders=candidate.sample_senders, + sample_subjects=candidate.sample_subjects, + methods=[ + CoreUnsubscribeMethod( + method_id=method.method_id, + method_type=method.method_type, + value=method.value, + ) + for method in candidate.methods + ], + approved=candidate.approved, + ) + for candidate in snapshot.candidates + ], + ) + + def _execution_to_core( + self, result: UnsubscribeExecutionResult + ) -> CoreUnsubscribeExecutionResult: + return CoreUnsubscribeExecutionResult( + scanned_messages=result.scanned_messages, + candidates_considered=result.candidates_considered, + selected_candidates=result.selected_candidates, + executed_methods=result.executed_methods, + skipped_already_executed=result.skipped_already_executed, + failed_methods=result.failed_methods, + updated_approved_count=result.updated_approved_count, + results=[ + CoreMethodExecution( + candidate_id=item.candidate_id, + list_name=item.list_name, + method_id=item.method_id, + method_type=item.method_type, + value=item.value, + success=item.success, + detail=item.detail, + ) + for item in result.results + ], + ) diff --git a/app/main.py b/app/main.py index 767201b..603a2ac 100644 --- a/app/main.py +++ b/app/main.py @@ -10,26 +10,17 @@ from apscheduler.schedulers.asyncio import AsyncIOScheduler from fastapi import Depends, FastAPI, Header, HTTPException, Query, status from pydantic import BaseModel, Field -from app.calendar_agent import CalendarAvailabilityAgent from app.config import get_settings -from app.gmail_agent import GmailTriageAgent -from app.google_clients import build_calendar_service, build_gmail_service -from app.strands_classifier import StrandsEmailClassifier -from app.unsubscribe_agent import UnsubscribeDigestAgent -from app.unsubscribe_hil_agent import ( - UnsubscribeHumanLoopAgent, - CandidateSnapshot, - UnsubscribeExecutionResult, -) +from app.core.service import CoreAgentService settings = get_settings() logging.basicConfig(level=getattr(logging, settings.log_level.upper(), logging.INFO)) logger = logging.getLogger("personal-agent") +core_service = CoreAgentService(settings=settings, logger=logger) scheduler: AsyncIOScheduler | None = None scan_lock: asyncio.Lock | None = None unsubscribe_lock: asyncio.Lock | None = None -strands_key_warning_logged = False class ScanResponse(BaseModel): @@ -143,14 +134,7 @@ def verify_api_key( def _run_scan_once(max_results: int) -> ScanResponse: - gmail_service = build_gmail_service(settings) - gmail_agent = GmailTriageAgent( - gmail_service=gmail_service, - query=settings.gmail_query, - classifier=_build_strands_classifier(), - fallback_to_rules=settings.llm_fallback_to_rules, - ) - result = gmail_agent.scan_and_route_messages(max_results=max_results) + result = core_service.scan_mailbox(max_results=max_results) return ScanResponse( scanned=result.scanned, linkedin=result.linkedin, @@ -162,16 +146,7 @@ def _run_scan_once(max_results: int) -> ScanResponse: def _run_unsubscribe_digest_once(max_results: int) -> UnsubscribeDigestResponse: - bounded_max_results = max(1, min(max_results, 500)) - gmail_service = build_gmail_service(settings) - unsubscribe_agent = UnsubscribeDigestAgent( - gmail_service=gmail_service, - query=settings.unsubscribe_query, - state_file=settings.unsubscribe_state_file, - recipient_email=settings.unsubscribe_digest_recipient, - send_empty_digest=settings.unsubscribe_send_empty_digest, - ) - result = unsubscribe_agent.scan_and_send_digest(max_results=bounded_max_results) + result = core_service.scan_unsubscribe_digest(max_results=max_results) return UnsubscribeDigestResponse( scanned_messages=result.scanned_messages, extracted_unique_links=result.extracted_unique_links, @@ -182,34 +157,9 @@ def _run_unsubscribe_digest_once(max_results: int) -> UnsubscribeDigestResponse: def _run_unsubscribe_candidates_once(max_results: int) -> UnsubscribeCandidatesResponse: - agent = _build_unsubscribe_hil_agent() - snapshot = agent.discover_candidates(max_results=max_results) - return _snapshot_to_response(snapshot) - - -def _run_unsubscribe_execute_selected( - selected_candidate_ids: list[str], - max_results: int, - remember_selection: bool, -) -> UnsubscribeExecutionResponse: - agent = _build_unsubscribe_hil_agent() - result = agent.execute_selected( - selected_candidate_ids=selected_candidate_ids, - max_results=max_results, - remember_selection=remember_selection, - ) - return _execution_to_response(result) - - -def _run_unsubscribe_auto_once(max_results: int) -> UnsubscribeExecutionResponse: - agent = _build_unsubscribe_hil_agent() - result = agent.execute_for_approved(max_results=max_results) - return _execution_to_response(result) - - -def _snapshot_to_response(snapshot: CandidateSnapshot) -> UnsubscribeCandidatesResponse: + result = core_service.list_unsubscribe_candidates(max_results=max_results) return UnsubscribeCandidatesResponse( - scanned_messages=snapshot.scanned_messages, + scanned_messages=result.scanned_messages, candidates=[ MailingListCandidateResponse( candidate_id=candidate.candidate_id, @@ -228,12 +178,21 @@ def _snapshot_to_response(snapshot: CandidateSnapshot) -> UnsubscribeCandidatesR ], approved=candidate.approved, ) - for candidate in snapshot.candidates + for candidate in result.candidates ], ) -def _execution_to_response(result: UnsubscribeExecutionResult) -> UnsubscribeExecutionResponse: +def _run_unsubscribe_execute_selected( + selected_candidate_ids: list[str], + max_results: int, + remember_selection: bool, +) -> UnsubscribeExecutionResponse: + result = core_service.execute_unsubscribe_selected( + selected_candidate_ids=selected_candidate_ids, + max_results=max_results, + remember_selection=remember_selection, + ) return UnsubscribeExecutionResponse( scanned_messages=result.scanned_messages, candidates_considered=result.candidates_considered, @@ -257,44 +216,28 @@ def _execution_to_response(result: UnsubscribeExecutionResult) -> UnsubscribeExe ) -def _build_strands_classifier() -> StrandsEmailClassifier | None: - global strands_key_warning_logged - - if not settings.strands_api_key: - if settings.llm_fallback_to_rules: - if not strands_key_warning_logged: - logger.warning( - "Strands API key not set. Falling back to rules-based classification." - ) - strands_key_warning_logged = True - return None - raise RuntimeError( - "STRANDS_OPENAI_API_KEY (or LLM_API_KEY) is required when LLM_FALLBACK_TO_RULES is disabled." - ) - - try: - return StrandsEmailClassifier( - api_key=settings.strands_api_key, - model_id=settings.strands_model_id, - base_url=settings.strands_base_url, - timeout_seconds=settings.strands_timeout_seconds, - temperature=settings.strands_temperature, - ) - except Exception: - if settings.llm_fallback_to_rules: - logger.exception("Could not initialize Strands classifier; using rules fallback.") - return None - raise - - -def _build_unsubscribe_hil_agent() -> UnsubscribeHumanLoopAgent: - gmail_service = build_gmail_service(settings) - return UnsubscribeHumanLoopAgent( - gmail_service=gmail_service, - query=settings.unsubscribe_hil_query, - state_file=settings.unsubscribe_hil_state_file, - http_timeout_seconds=settings.unsubscribe_http_timeout_seconds, - user_agent=settings.unsubscribe_user_agent, +def _run_unsubscribe_auto_once(max_results: int) -> UnsubscribeExecutionResponse: + result = core_service.run_unsubscribe_auto(max_results=max_results) + return UnsubscribeExecutionResponse( + scanned_messages=result.scanned_messages, + candidates_considered=result.candidates_considered, + selected_candidates=result.selected_candidates, + executed_methods=result.executed_methods, + skipped_already_executed=result.skipped_already_executed, + failed_methods=result.failed_methods, + updated_approved_count=result.updated_approved_count, + results=[ + MethodExecutionResponse( + candidate_id=item.candidate_id, + list_name=item.list_name, + method_id=item.method_id, + method_type=item.method_type, + value=item.value, + success=item.success, + detail=item.detail, + ) + for item in result.results + ], ) @@ -444,10 +387,8 @@ async def scan_now(max_results: int = Query(100, ge=1, le=500)) -> ScanResponse: ) async def availability(request: AvailabilityRequest) -> AvailabilityResponse: try: - calendar_service = build_calendar_service(settings) - availability_agent = CalendarAvailabilityAgent(calendar_service=calendar_service) result = await asyncio.to_thread( - availability_agent.get_availability, + core_service.check_availability, request.start, request.end, request.calendar_ids,