You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

225 lines
9.1 KiB
Python

from __future__ import annotations
import logging
from app.calendar_agent import CalendarAvailabilityAgent
from app.config import Settings
from app.gmail_agent import GmailTriageAgent
from app.google_clients import build_calendar_service, build_gmail_service
from app.strands_classifier import StrandsEmailClassifier
from app.unsubscribe_agent import UnsubscribeDigestAgent
from app.unsubscribe_hil_agent import (
CandidateSnapshot,
UnsubscribeExecutionResult,
UnsubscribeHumanLoopAgent,
)
from app.core.models import (
CoreAvailabilityResult,
CoreBusySlot,
CoreMeetingInterval,
CoreMeetingIntervalsResult,
CoreMailingListCandidate,
CoreMethodExecution,
CoreScanResult,
CoreUnsubscribeCandidatesResult,
CoreUnsubscribeDigestResult,
CoreUnsubscribeExecutionResult,
CoreUnsubscribeMethod,
)
class CoreAgentService:
    """Facade that wires settings into the individual agents.

    Every public method builds the Google service client(s) it needs from
    ``self.settings``, delegates to the matching agent, and converts the
    agent's result object into the corresponding ``Core*`` model.
    """

    def __init__(self, settings: Settings, *, logger: logging.Logger | None = None) -> None:
        """Store the settings and pick a logger.

        Args:
            settings: Configuration object read by every method.
            logger: Optional logger; when falsy, the logger named
                ``"personal-agent.core"`` is used.
        """
        self.settings = settings
        self.logger = logger or logging.getLogger("personal-agent.core")
        # Ensures the "API key not set" warning is emitted once per instance.
        self._strands_key_warning_logged = False

    def scan_mailbox(self, max_results: int) -> CoreScanResult:
        """Run the Gmail triage agent and return its counters.

        Args:
            max_results: Forwarded unchanged to
                ``GmailTriageAgent.scan_and_route_messages``.

        Raises:
            RuntimeError: Propagated from ``_build_strands_classifier`` when
                no API key is configured and rule fallback is disabled.
        """
        gmail_service = build_gmail_service(self.settings)
        gmail_agent = GmailTriageAgent(
            gmail_service=gmail_service,
            query=self.settings.gmail_query,
            classifier=self._build_strands_classifier(),
            fallback_to_rules=self.settings.llm_fallback_to_rules,
        )
        result = gmail_agent.scan_and_route_messages(max_results=max_results)
        return CoreScanResult(
            scanned=result.scanned,
            linkedin=result.linkedin,
            advertising=result.advertising,
            veille_techno=result.veille_techno,
            skipped=result.skipped,
            failed=result.failed,
        )

    def check_availability(
        self, start: str, end: str, calendar_ids: list[str] | None
    ) -> CoreAvailabilityResult:
        """Query the calendar agent for availability between ``start`` and ``end``.

        All three arguments are passed through positionally to
        ``CalendarAvailabilityAgent.get_availability``. Each busy slot in the
        agent result is read by the keys ``"calendar_id"``, ``"start"`` and
        ``"end"`` and wrapped in a ``CoreBusySlot``.
        """
        calendar_service = build_calendar_service(self.settings)
        availability_agent = CalendarAvailabilityAgent(calendar_service=calendar_service)
        result = availability_agent.get_availability(start, end, calendar_ids)
        return CoreAvailabilityResult(
            start=result.start,
            end=result.end,
            available=result.available,
            busy_slots=[
                CoreBusySlot(
                    calendar_id=slot["calendar_id"],
                    start=slot["start"],
                    end=slot["end"],
                )
                for slot in result.busy_slots
            ],
            checked_calendars=result.checked_calendars,
        )

    def available_meeting_intervals(
        self, start: str, end: str, calendar_ids: list[str] | None
    ) -> CoreMeetingIntervalsResult:
        """Return the meeting intervals reported by the calendar agent.

        All three arguments are passed through positionally to
        ``CalendarAvailabilityAgent.get_available_meeting_intervals``. Each
        interval in the agent result is read by the keys ``"start"`` and
        ``"end"`` and wrapped in a ``CoreMeetingInterval``.
        """
        calendar_service = build_calendar_service(self.settings)
        availability_agent = CalendarAvailabilityAgent(calendar_service=calendar_service)
        result = availability_agent.get_available_meeting_intervals(start, end, calendar_ids)
        return CoreMeetingIntervalsResult(
            start=result.start,
            end=result.end,
            timezone=result.timezone,
            meeting_intervals=[
                CoreMeetingInterval(
                    start=interval["start"],
                    end=interval["end"],
                )
                for interval in result.meeting_intervals
            ],
            checked_calendars=result.checked_calendars,
        )

    def scan_unsubscribe_digest(self, max_results: int) -> CoreUnsubscribeDigestResult:
        """Run the unsubscribe digest agent and return its summary.

        Args:
            max_results: Clamped to the inclusive range 1..500 before being
                passed to ``UnsubscribeDigestAgent.scan_and_send_digest``.
        """
        bounded_max_results = max(1, min(max_results, 500))
        gmail_service = build_gmail_service(self.settings)
        unsubscribe_agent = UnsubscribeDigestAgent(
            gmail_service=gmail_service,
            query=self.settings.unsubscribe_query,
            state_file=self.settings.unsubscribe_state_file,
            recipient_email=self.settings.unsubscribe_digest_recipient,
            send_empty_digest=self.settings.unsubscribe_send_empty_digest,
        )
        result = unsubscribe_agent.scan_and_send_digest(max_results=bounded_max_results)
        return CoreUnsubscribeDigestResult(
            scanned_messages=result.scanned_messages,
            extracted_unique_links=result.extracted_unique_links,
            new_links=result.new_links,
            sent_to=result.sent_to,
            email_sent=result.email_sent,
        )

    def list_unsubscribe_candidates(self, max_results: int) -> CoreUnsubscribeCandidatesResult:
        """Discover unsubscribe candidates via the human-in-the-loop agent.

        ``max_results`` is forwarded unchanged to ``discover_candidates``.
        """
        snapshot = self._build_unsubscribe_hil_agent().discover_candidates(max_results=max_results)
        return self._snapshot_to_core(snapshot)

    def execute_unsubscribe_selected(
        self,
        selected_candidate_ids: list[str],
        max_results: int,
        remember_selection: bool,
    ) -> CoreUnsubscribeExecutionResult:
        """Execute unsubscribe methods for the given candidate ids.

        All arguments are forwarded unchanged, by keyword, to
        ``UnsubscribeHumanLoopAgent.execute_selected``.
        """
        result = self._build_unsubscribe_hil_agent().execute_selected(
            selected_candidate_ids=selected_candidate_ids,
            max_results=max_results,
            remember_selection=remember_selection,
        )
        return self._execution_to_core(result)

    def run_unsubscribe_auto(self, max_results: int) -> CoreUnsubscribeExecutionResult:
        """Run ``execute_for_approved`` on the human-in-the-loop agent.

        ``max_results`` is forwarded unchanged.
        """
        result = self._build_unsubscribe_hil_agent().execute_for_approved(max_results=max_results)
        return self._execution_to_core(result)

    def _build_strands_classifier(self) -> StrandsEmailClassifier | None:
        """Create the Strands classifier, or return ``None`` to use rules.

        Returns:
            A ``StrandsEmailClassifier`` built from the ``strands_*`` settings,
            or ``None`` when rule fallback is enabled and either no API key is
            configured or constructing the classifier raised.

        Raises:
            RuntimeError: No API key is configured and rule fallback is
                disabled.
            Exception: Whatever the classifier constructor raised, re-raised
                unchanged when rule fallback is disabled.
        """
        if not self.settings.strands_api_key:
            if self.settings.llm_fallback_to_rules:
                if not self._strands_key_warning_logged:
                    self.logger.warning(
                        "Strands API key not set. Falling back to rules-based classification."
                    )
                    self._strands_key_warning_logged = True
                return None
            raise RuntimeError(
                "STRANDS_OPENAI_API_KEY (or LLM_API_KEY) is required when LLM_FALLBACK_TO_RULES is disabled."
            )
        try:
            return StrandsEmailClassifier(
                api_key=self.settings.strands_api_key,
                model_id=self.settings.strands_model_id,
                base_url=self.settings.strands_base_url,
                timeout_seconds=self.settings.strands_timeout_seconds,
                temperature=self.settings.strands_temperature,
            )
        except Exception:
            # Deliberate best-effort: with fallback enabled, any construction
            # failure is logged with its traceback and rules are used instead.
            if self.settings.llm_fallback_to_rules:
                self.logger.exception(
                    "Could not initialize Strands classifier; using rules fallback."
                )
                return None
            raise

    def _build_unsubscribe_hil_agent(self) -> UnsubscribeHumanLoopAgent:
        """Build a new ``UnsubscribeHumanLoopAgent`` from the current settings."""
        gmail_service = build_gmail_service(self.settings)
        return UnsubscribeHumanLoopAgent(
            gmail_service=gmail_service,
            query=self.settings.unsubscribe_hil_query,
            state_file=self.settings.unsubscribe_hil_state_file,
            http_timeout_seconds=self.settings.unsubscribe_http_timeout_seconds,
            user_agent=self.settings.unsubscribe_user_agent,
        )

    def _snapshot_to_core(self, snapshot: CandidateSnapshot) -> CoreUnsubscribeCandidatesResult:
        """Convert a ``CandidateSnapshot`` into its ``Core*`` representation.

        Each candidate and each of its methods is copied field by field;
        ``sample_senders`` and ``sample_subjects`` are passed through as the
        same objects, not copied.
        """
        return CoreUnsubscribeCandidatesResult(
            scanned_messages=snapshot.scanned_messages,
            candidates=[
                CoreMailingListCandidate(
                    candidate_id=candidate.candidate_id,
                    list_name=candidate.list_name,
                    sender_domain=candidate.sender_domain,
                    message_count=candidate.message_count,
                    sample_senders=candidate.sample_senders,
                    sample_subjects=candidate.sample_subjects,
                    methods=[
                        CoreUnsubscribeMethod(
                            method_id=method.method_id,
                            method_type=method.method_type,
                            value=method.value,
                        )
                        for method in candidate.methods
                    ],
                    approved=candidate.approved,
                )
                for candidate in snapshot.candidates
            ],
        )

    def _execution_to_core(
        self, result: UnsubscribeExecutionResult
    ) -> CoreUnsubscribeExecutionResult:
        """Convert an ``UnsubscribeExecutionResult`` into its ``Core*`` representation.

        Counter fields are copied as-is; every entry of ``result.results`` is
        rewrapped as a ``CoreMethodExecution``.
        """
        return CoreUnsubscribeExecutionResult(
            scanned_messages=result.scanned_messages,
            candidates_considered=result.candidates_considered,
            selected_candidates=result.selected_candidates,
            executed_methods=result.executed_methods,
            skipped_already_executed=result.skipped_already_executed,
            failed_methods=result.failed_methods,
            updated_approved_count=result.updated_approved_count,
            results=[
                CoreMethodExecution(
                    candidate_id=item.candidate_id,
                    list_name=item.list_name,
                    method_id=item.method_id,
                    method_type=item.method_type,
                    value=item.value,
                    success=item.success,
                    detail=item.detail,
                )
                for item in result.results
            ],
        )