From a14b02ad3c2903f6c7e3940703bfdcebe5c92a7d Mon Sep 17 00:00:00 2001 From: oabrivard Date: Sun, 8 Mar 2026 11:51:53 +0100 Subject: [PATCH] Initial commit --- .env.example | 6 ++ .gitignore | 6 ++ README.md | 92 ++++++++++++++++++ app/__init__.py | 1 + app/calendar_agent.py | 68 ++++++++++++++ app/config.py | 34 +++++++ app/gmail_agent.py | 214 ++++++++++++++++++++++++++++++++++++++++++ app/google_clients.py | 46 +++++++++ app/main.py | 193 +++++++++++++++++++++++++++++++++++++ requirements.txt | 7 ++ 10 files changed, 667 insertions(+) create mode 100644 .env.example create mode 100644 .gitignore create mode 100644 README.md create mode 100644 app/__init__.py create mode 100644 app/calendar_agent.py create mode 100644 app/config.py create mode 100644 app/gmail_agent.py create mode 100644 app/google_clients.py create mode 100644 app/main.py create mode 100644 requirements.txt diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..50260b0 --- /dev/null +++ b/.env.example @@ -0,0 +1,6 @@ +GOOGLE_CLIENT_SECRETS_FILE=credentials.json +GOOGLE_TOKEN_FILE=token.json +AGENT_API_KEY=change-me +GMAIL_SCAN_INTERVAL_MINUTES=5 +GMAIL_QUERY=in:inbox -label:AgentProcessed newer_than:7d +LOG_LEVEL=INFO diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..985554c --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +.env +.venv/ +__pycache__/ +*.pyc +credentials.json +token.json diff --git a/README.md b/README.md new file mode 100644 index 0000000..3aeafdb --- /dev/null +++ b/README.md @@ -0,0 +1,92 @@ +# Personal Gmail + Calendar Agent + +This project runs a small local API service that: + +- scans new Gmail inbox messages +- moves LinkedIn emails to a `LinkedIn` label/folder +- moves advertising emails to an `Advertising` label/folder +- exposes a secure availability endpoint powered by Google Calendar free/busy + +## 1) Prerequisites + +- Python 3.11+ +- A Google account +- A Google Cloud project with: + - Gmail API enabled + - Google 
Calendar API enabled + - OAuth consent configured + - OAuth Client ID of type **Desktop app** + +## 2) Google OAuth setup + +1. In Google Cloud Console, create a desktop OAuth client. +2. Download the client JSON file. +3. Save it in this project as `credentials.json`. + +The first run opens a browser window for consent and creates `token.json`. + +## 3) Install and configure + +```bash +python3 -m venv .venv +source .venv/bin/activate +pip install -r requirements.txt +cp .env.example .env +``` + +Edit `.env` and set: + +- `AGENT_API_KEY` to a strong secret for agent-to-agent calls +- optional scan frequency and Gmail query + +## 4) Run + +```bash +uvicorn app.main:app --reload +``` + +At startup, the scheduler runs every `GMAIL_SCAN_INTERVAL_MINUTES`. + +## 5) API usage + +### Health check + +```bash +curl http://127.0.0.1:8000/health +``` + +### Manual Gmail scan + +```bash +curl -X POST "http://127.0.0.1:8000/scan?max_results=100" \ + -H "X-API-Key: your-secret" +``` + +### Availability for other AI agents + +```bash +curl -X POST "http://127.0.0.1:8000/availability" \ + -H "Content-Type: application/json" \ + -H "X-API-Key: your-secret" \ + -d '{ + "start": "2026-03-09T09:00:00+01:00", + "end": "2026-03-09T10:00:00+01:00", + "calendar_ids": ["primary"] + }' +``` + +If `available` is `true`, there are no busy slots in that range. + +## Classification behavior + +- LinkedIn detection: sender or subject contains `linkedin` (LinkedIn has priority). +- Advertising detection: Gmail promotion category, `List-Unsubscribe`, `Precedence: bulk/list/junk`, common promo keywords, and marketing sender hints. +- Every scanned message gets an `AgentProcessed` label to avoid reprocessing loops. + +## Notes + +- Gmail "folders" are labels. This agent creates: + - `LinkedIn` + - `Advertising` + - `AgentProcessed` +- Messages classified as LinkedIn/Advertising are removed from `INBOX` (moved out of inbox). 
# Free/busy error reasons caused by the request itself (as opposed to a
# failure on Google's side). They are reported as ValueError so callers that
# already map ValueError to "bad request" keep doing the right thing.
_REQUEST_ERROR_REASONS = frozenset(
    {"notFound", "groupTooBig", "tooManyCalendarsRequested"}
)


@dataclass(frozen=True)
class AvailabilityResult:
    """Outcome of one free/busy lookup over a time window."""

    # Window bounds, echoed back as ISO-8601 strings with their UTC offset.
    start: str
    end: str
    # True only when no queried calendar reported a busy slot.
    available: bool
    # One entry per busy interval: {"calendar_id", "start", "end"}.
    busy_slots: list[dict[str, str]]
    # Calendar IDs that were sent to the free/busy query.
    checked_calendars: list[str]


class CalendarAvailabilityAgent:
    """Answer "is this window free?" using a Google Calendar service object."""

    def __init__(self, calendar_service: Any) -> None:
        self.calendar_service = calendar_service

    def get_availability(
        self, start: str, end: str, calendar_ids: list[str] | None = None
    ) -> AvailabilityResult:
        """Return the busy slots between *start* and *end*.

        Args:
            start: ISO-8601 datetime with a timezone offset (or a ``Z`` suffix).
            end: ISO-8601 datetime with a timezone offset; must be after *start*.
            calendar_ids: Calendars to check; defaults to ``["primary"]``.

        Raises:
            ValueError: if a datetime is malformed or lacks an offset, if *end*
                is not after *start*, or if the API rejects a requested
                calendar (for example an unknown calendar ID).
            RuntimeError: if the API reports a failure that is not attributable
                to the request.
        """
        start_dt = _parse_iso_datetime(start)
        end_dt = _parse_iso_datetime(end)

        if end_dt <= start_dt:
            raise ValueError("end must be after start.")

        # Copy so the frozen result never aliases the caller's list.
        calendars = list(calendar_ids) if calendar_ids else ["primary"]
        query_body = {
            "timeMin": start_dt.isoformat(),
            "timeMax": end_dt.isoformat(),
            "items": [{"id": calendar_id} for calendar_id in calendars],
        }

        freebusy = self.calendar_service.freebusy().query(body=query_body).execute()
        calendars_payload = freebusy.get("calendars", {})

        # A calendar that could not be evaluated has no "busy" entries; without
        # this check it would silently count as free.
        _raise_for_calendar_errors(calendars_payload)

        busy_slots: list[dict[str, str]] = [
            {
                "calendar_id": calendar_id,
                "start": busy_slot["start"],
                "end": busy_slot["end"],
            }
            for calendar_id, data in calendars_payload.items()
            for busy_slot in data.get("busy", [])
        ]

        return AvailabilityResult(
            start=start_dt.isoformat(),
            end=end_dt.isoformat(),
            available=not busy_slots,
            busy_slots=busy_slots,
            checked_calendars=calendars,
        )


def _raise_for_calendar_errors(calendars_payload: dict[str, Any]) -> None:
    """Raise if any calendar entry in a free/busy response carries errors."""
    failures = {
        calendar_id: [error.get("reason", "unknown") for error in data["errors"]]
        for calendar_id, data in calendars_payload.items()
        if data.get("errors")
    }
    if not failures:
        return

    detail = "; ".join(
        f"{calendar_id}: {', '.join(reasons)}"
        for calendar_id, reasons in failures.items()
    )
    message = f"free/busy lookup failed for calendar(s): {detail}"

    all_reasons = {reason for reasons in failures.values() for reason in reasons}
    if all_reasons <= _REQUEST_ERROR_REASONS:
        raise ValueError(message)
    raise RuntimeError(message)


def _parse_iso_datetime(value: str) -> datetime:
    """Parse an ISO-8601 string into a timezone-aware ``datetime``.

    A trailing ``Z``/``z`` is rewritten to ``+00:00`` so the value parses on
    interpreters whose ``fromisoformat`` does not accept the suffix.

    Raises:
        ValueError: if the string is not valid ISO-8601 or has no UTC offset.
    """
    normalized = value.strip()
    if normalized.endswith(("Z", "z")):
        normalized = normalized[:-1] + "+00:00"

    parsed = datetime.fromisoformat(normalized)
    if parsed.tzinfo is None:
        raise ValueError("datetime must include a timezone offset, for example +01:00.")
    return parsed
class GmailTriageAgent:
    """Label and archive LinkedIn and advertising mail through a Gmail service.

    Every message the agent inspects receives the ``AgentProcessed`` label;
    messages classified as LinkedIn or advertising additionally receive the
    matching label and lose ``INBOX``.
    """

    def __init__(self, gmail_service: Any, query: str) -> None:
        self.gmail_service = gmail_service
        self.query = query

    def ensure_labels(self) -> dict[str, str]:
        """Return a label-name -> label-id map, creating missing agent labels.

        The map contains every label returned by the API plus the three names
        the agent relies on (``LinkedIn``, ``Advertising``, ``AgentProcessed``).
        """
        labels_response = (
            self.gmail_service.users().labels().list(userId="me").execute()
        )
        label_by_name = {
            label["name"]: label["id"] for label in labels_response.get("labels", [])
        }
        # NOTE(review): Gmail appears to reject creating a label whose name
        # differs from an existing one only by case (name conflict), so an
        # existing "linkedin" label is reused instead of failing on create.
        id_by_folded_name = {
            name.casefold(): label_id for name, label_id in label_by_name.items()
        }

        for required_name in ("LinkedIn", "Advertising", "AgentProcessed"):
            if required_name in label_by_name:
                continue

            existing_id = id_by_folded_name.get(required_name.casefold())
            if existing_id is not None:
                label_by_name[required_name] = existing_id
                continue

            created = (
                self.gmail_service.users()
                .labels()
                .create(
                    userId="me",
                    body={
                        "name": required_name,
                        "labelListVisibility": "labelShow",
                        "messageListVisibility": "show",
                    },
                )
                .execute()
            )
            label_by_name[required_name] = created["id"]

        return label_by_name

    def scan_and_route_messages(self, max_results: int = 100) -> ScanResult:
        """Classify up to *max_results* messages matching ``self.query``.

        Returns:
            Counts of scanned messages and of each routing outcome.
        """
        label_by_name = self.ensure_labels()
        inbox_messages = (
            self.gmail_service.users()
            .messages()
            .list(userId="me", q=self.query, maxResults=max_results)
            .execute()
            .get("messages", [])
        )

        tally = {"linkedin": 0, "advertising": 0, "skipped": 0, "failed": 0}
        for message in inbox_messages:
            outcome = self._route_message(message["id"], label_by_name)
            # Any unexpected outcome is counted as a failure, never a success.
            tally[outcome if outcome in tally else "failed"] += 1

        return ScanResult(scanned=len(inbox_messages), **tally)

    def _route_message(self, message_id: str, label_by_name: dict[str, str]) -> str:
        """Fetch, classify and relabel one message.

        Returns:
            ``"linkedin"``, ``"advertising"``, ``"skipped"`` or ``"failed"``.
        """
        try:
            message = (
                self.gmail_service.users()
                .messages()
                .get(
                    userId="me",
                    id=message_id,
                    format="metadata",
                    metadataHeaders=METADATA_HEADERS,
                )
                .execute()
            )
            headers = {
                h["name"].lower(): h["value"]
                for h in message.get("payload", {}).get("headers", [])
            }
            label_ids = set(message.get("labelIds", []))

            sender = headers.get("from", "")
            subject = headers.get("subject", "")

            add_labels = [label_by_name["AgentProcessed"]]
            remove_labels: list[str] = []

            # LinkedIn takes priority over the advertising heuristics.
            if self._is_linkedin_email(sender=sender, subject=subject):
                add_labels.insert(0, label_by_name["LinkedIn"])
                remove_labels.append("INBOX")
                outcome = "linkedin"
            elif self._is_advertising_email(
                sender=sender,
                subject=subject,
                list_unsubscribe=headers.get("list-unsubscribe", ""),
                precedence=headers.get("precedence", ""),
                message_label_ids=label_ids,
            ):
                add_labels.insert(0, label_by_name["Advertising"])
                remove_labels.append("INBOX")
                outcome = "advertising"
            else:
                outcome = "skipped"

            (
                self.gmail_service.users()
                .messages()
                .modify(
                    userId="me",
                    id=message_id,
                    body={
                        "addLabelIds": add_labels,
                        "removeLabelIds": remove_labels,
                    },
                )
                .execute()
            )

            return outcome
        except Exception:
            # Deliberate per-message boundary: one bad message must not abort
            # the scan. The traceback is logged and the failure is counted.
            logger.exception("Failed to route message %s", message_id)
            return "failed"

    def _is_linkedin_email(self, sender: str, subject: str) -> bool:
        """Return True when the sender or the subject mentions LinkedIn.

        The substring test on the raw ``From`` value already covers any
        ``@linkedin.com`` address, so no separate address parse is needed.
        """
        return "linkedin" in sender.lower() or "linkedin" in subject.lower()

    def _is_advertising_email(
        self,
        sender: str,
        subject: str,
        list_unsubscribe: str,
        precedence: str,
        message_label_ids: set[str],
    ) -> bool:
        """Return True when any advertising signal is present.

        Signals, in order: Gmail's promotions category, a non-empty
        ``List-Unsubscribe`` header, a bulk-style ``Precedence`` value, a promo
        keyword in the subject, or a marketing hint in the sender.
        """
        if "CATEGORY_PROMOTIONS" in message_label_ids:
            return True

        if list_unsubscribe.strip():
            return True

        # Trim before comparing so surrounding whitespace cannot hide a match.
        if precedence.strip().lower() in {"bulk", "list", "junk"}:
            return True

        subject_lower = subject.lower()
        if any(keyword in subject_lower for keyword in AD_SUBJECT_KEYWORDS):
            return True

        sender_lower = sender.lower()
        return any(hint in sender_lower for hint in AD_SENDER_HINTS)
def _write_token_file(path: str, payload: str) -> None:
    """Persist the OAuth token JSON so that only the owner can read it."""
    # The payload contains a long-lived refresh token; create the file with
    # owner-only permissions instead of relying on the process umask.
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as token_file:
        token_file.write(payload)

    try:
        # The mode passed to os.open only applies to newly created files;
        # tighten a token file left behind by an earlier run as well.
        os.chmod(path, 0o600)
    except OSError:
        # Deliberate best effort: some filesystems cannot change permissions,
        # and that must not prevent the token from being saved.
        pass


def get_google_credentials(settings: Settings) -> Credentials:
    """Load, refresh or interactively obtain Google OAuth credentials.

    Order of attempts: reuse the saved token file, refresh it when it has
    expired and carries a refresh token, otherwise run the installed-app
    consent flow. Whenever new credentials are obtained they are written back
    to ``settings.google_token_file``.

    Raises:
        FileNotFoundError: if the consent flow is needed but the OAuth client
            secrets file does not exist.
    """
    creds = None

    if os.path.exists(settings.google_token_file):
        creds = Credentials.from_authorized_user_file(
            settings.google_token_file, GOOGLE_SCOPES
        )

    if creds and creds.valid:
        return creds

    if creds and creds.expired and creds.refresh_token:
        creds.refresh(Request())
    else:
        if not os.path.exists(settings.google_client_secrets_file):
            raise FileNotFoundError(
                f"Missing OAuth client file at {settings.google_client_secrets_file}. "
                "Create Google OAuth desktop credentials and save the JSON at this path."
            )
        flow = InstalledAppFlow.from_client_secrets_file(
            settings.google_client_secrets_file, GOOGLE_SCOPES
        )
        # port=0 lets the OS pick a free port for the local redirect listener.
        creds = flow.run_local_server(port=0)

    _write_token_file(settings.google_token_file, creds.to_json())
    return creds


def build_gmail_service(settings: Settings):
    """Return a Gmail v1 API client authorised with the stored credentials."""
    creds = get_google_credentials(settings)
    return build("gmail", "v1", credentials=creds, cache_discovery=False)


def build_calendar_service(settings: Settings):
    """Return a Calendar v3 API client authorised with the stored credentials."""
    creds = get_google_credentials(settings)
    return build("calendar", "v3", credentials=creds, cache_discovery=False)
def verify_api_key(
    x_api_key: Annotated[str | None, Header(alias="X-API-Key")] = None,
    authorization: Annotated[str | None, Header()] = None,
) -> None:
    """Reject the request unless it carries the configured agent API key.

    The key is read from the ``X-API-Key`` header, or, when that header is
    absent or empty, from an ``Authorization: Bearer <key>`` header.

    Raises:
        HTTPException: 401 when a key is configured and the request does not
            present a matching one.
    """
    # Local import keeps this change self-contained within the function.
    import hmac

    expected = settings.agent_api_key
    if not expected:
        # No key configured: authentication is disabled (existing behavior).
        return

    provided = x_api_key
    if not provided and authorization:
        scheme, separator, token = authorization.partition(" ")
        if separator and scheme.lower() == "bearer":
            provided = token

    # Constant-time comparison so response timing does not reveal how much of
    # the key matched. Bytes are compared because compare_digest rejects
    # non-ASCII str arguments.
    candidate = (provided or "").encode("utf-8")
    if not hmac.compare_digest(candidate, expected.encode("utf-8")):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid API key."
        )
@app.post(
    "/availability",
    response_model=AvailabilityResponse,
    dependencies=[Depends(verify_api_key)],
)
async def availability(request: AvailabilityRequest) -> AvailabilityResponse:
    """Report whether the requested window is free on the given calendars.

    Raises:
        HTTPException: 400 when the lookup raises ``ValueError`` (for example a
            malformed or reversed time window); 500 when the OAuth client file
            is missing or the lookup fails for any other reason.
    """

    def _lookup():
        # Building the service reads the token file and may refresh it over
        # the network, so it belongs in the worker thread with the query
        # rather than on the event loop.
        calendar_service = build_calendar_service(settings)
        availability_agent = CalendarAvailabilityAgent(
            calendar_service=calendar_service
        )
        return availability_agent.get_availability(
            request.start,
            request.end,
            request.calendar_ids,
        )

    try:
        result = await asyncio.to_thread(_lookup)
        return AvailabilityResponse(
            start=result.start,
            end=result.end,
            available=result.available,
            busy_slots=result.busy_slots,
            checked_calendars=result.checked_calendars,
        )
    except ValueError as exc:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
    except FileNotFoundError as exc:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(exc),
        ) from exc
    except Exception as exc:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Availability lookup failed: {exc}",
        ) from exc