Initial commit

master
oabrivard 1 week ago
commit a14b02ad3c

@ -0,0 +1,6 @@
GOOGLE_CLIENT_SECRETS_FILE=credentials.json
GOOGLE_TOKEN_FILE=token.json
AGENT_API_KEY=change-me
GMAIL_SCAN_INTERVAL_MINUTES=5
GMAIL_QUERY=in:inbox -label:AgentProcessed newer_than:7d
LOG_LEVEL=INFO

6
.gitignore vendored

@ -0,0 +1,6 @@
.env
.venv/
__pycache__/
*.pyc
credentials.json
token.json

@ -0,0 +1,92 @@
# Personal Gmail + Calendar Agent
This project runs a small local API service that:
- scans new Gmail inbox messages
- moves LinkedIn emails to a `LinkedIn` label/folder
- moves advertising emails to an `Advertising` label/folder
- exposes a secure availability endpoint powered by Google Calendar free/busy
## 1) Prerequisites
- Python 3.11+
- A Google account
- A Google Cloud project with:
  - Gmail API enabled
  - Google Calendar API enabled
  - OAuth consent screen configured
  - OAuth Client ID of type **Desktop app**
## 2) Google OAuth setup
1. In Google Cloud Console, create a desktop OAuth client.
2. Download the client JSON file.
3. Save it in this project as `credentials.json`.
The first run opens a browser window for consent and creates `token.json`.
## 3) Install and configure
```bash
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
cp .env.example .env
```
Edit `.env` and set:
- `AGENT_API_KEY` to a strong secret for agent-to-agent calls (if it is left empty, the API key check is disabled and the protected endpoints accept any caller)
- optional scan frequency and Gmail query
## 4) Run
```bash
uvicorn app.main:app --reload
```
At startup, the scheduler runs a Gmail scan immediately and then repeats it every `GMAIL_SCAN_INTERVAL_MINUTES` minutes.
## 5) API usage
### Health check
```bash
curl http://127.0.0.1:8000/health
```
### Manual Gmail scan
```bash
curl -X POST "http://127.0.0.1:8000/scan?max_results=100" \
-H "X-API-Key: your-secret"
```
### Availability for other AI agents
```bash
curl -X POST "http://127.0.0.1:8000/availability" \
-H "Content-Type: application/json" \
-H "X-API-Key: your-secret" \
-d '{
"start": "2026-03-09T09:00:00+01:00",
"end": "2026-03-09T10:00:00+01:00",
"calendar_ids": ["primary"]
}'
```
If `available` is `true`, there are no busy slots in that range.
## Classification behavior
- LinkedIn detection: sender or subject contains `linkedin` (LinkedIn has priority).
- Advertising detection: Gmail promotion category, `List-Unsubscribe`, `Precedence: bulk/list/junk`, common promo keywords, and marketing sender hints.
- Every scanned message gets an `AgentProcessed` label to avoid reprocessing loops.
## Notes
- Gmail "folders" are labels. This agent creates:
  - `LinkedIn`
  - `Advertising`
  - `AgentProcessed`
- Messages classified as LinkedIn/Advertising are removed from `INBOX` (moved out of inbox).

@ -0,0 +1 @@
# Personal agent package

@ -0,0 +1,68 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
from typing import Any
@dataclass(frozen=True)
class AvailabilityResult:
    """Immutable outcome of one free/busy lookup.

    ``start`` and ``end`` are the ISO 8601 bounds of the checked window,
    ``available`` is true when no busy slot was found, ``busy_slots`` holds
    one ``calendar_id``/``start``/``end`` mapping per busy interval, and
    ``checked_calendars`` lists the calendar IDs that were queried.
    """

    start: str
    end: str
    available: bool
    busy_slots: list[dict[str, str]]
    checked_calendars: list[str]
class CalendarAvailabilityAgent:
    """Answer "is this time window free?" from Google Calendar free/busy data."""

    def __init__(self, calendar_service: Any) -> None:
        """Keep the Calendar API client used for free/busy queries.

        Args:
            calendar_service: Object exposing
                ``freebusy().query(body=...).execute()``.
        """
        self.calendar_service = calendar_service

    def get_availability(
        self, start: str, end: str, calendar_ids: list[str] | None = None
    ) -> AvailabilityResult:
        """Check whether the given calendars are free between *start* and *end*.

        Args:
            start: ISO 8601 timestamp with a timezone offset (window start).
            end: ISO 8601 timestamp with a timezone offset (window end).
            calendar_ids: Calendars to query; ``None`` or an empty list means
                ``["primary"]``.

        Returns:
            An ``AvailabilityResult`` whose ``available`` flag is true only
            when none of the queried calendars reports a busy slot.

        Raises:
            ValueError: If a timestamp cannot be parsed, lacks an offset, or
                ``end`` is not strictly after ``start``.
            RuntimeError: If the API reports an error for any calendar, since
                its availability is then unknown rather than free.
        """
        start_dt = _parse_iso_datetime(start)
        end_dt = _parse_iso_datetime(end)
        if end_dt <= start_dt:
            raise ValueError("end must be after start.")

        calendars = calendar_ids or ["primary"]
        query_body = {
            "timeMin": start_dt.isoformat(),
            "timeMax": end_dt.isoformat(),
            "items": [{"id": calendar_id} for calendar_id in calendars],
        }
        freebusy = self.calendar_service.freebusy().query(body=query_body).execute()
        calendars_payload = freebusy.get("calendars", {})

        # A calendar the API could not evaluate carries an "errors" list and no
        # usable busy data. Treating it as free would be a false positive, so
        # fail loudly instead of answering "available".
        unreadable = []
        for calendar_id, data in calendars_payload.items():
            errors = data.get("errors")
            if errors:
                reasons = ", ".join(
                    str(error.get("reason", "unknown"))
                    if isinstance(error, dict)
                    else str(error)
                    for error in errors
                )
                unreadable.append(f"{calendar_id} ({reasons})")
        if unreadable:
            raise RuntimeError(
                "Free/busy data unavailable for calendar(s): " + "; ".join(unreadable)
            )

        busy_slots: list[dict[str, str]] = [
            {
                "calendar_id": calendar_id,
                "start": busy_slot["start"],
                "end": busy_slot["end"],
            }
            for calendar_id, data in calendars_payload.items()
            for busy_slot in data.get("busy", [])
        ]

        return AvailabilityResult(
            start=start_dt.isoformat(),
            end=end_dt.isoformat(),
            available=not busy_slots,
            busy_slots=busy_slots,
            checked_calendars=calendars,
        )
def _parse_iso_datetime(value: str) -> datetime:
    """Parse an ISO 8601 timestamp and insist that it carries a UTC offset.

    Surrounding whitespace is ignored and a trailing ``Z`` is accepted as
    shorthand for ``+00:00``.

    Raises:
        ValueError: If the text is not a valid ISO timestamp or has no offset.
    """
    text = value.strip()
    # Spell a trailing "Z" as an explicit offset before parsing.
    candidate = text[:-1] + "+00:00" if text.endswith("Z") else text
    moment = datetime.fromisoformat(candidate)
    if moment.tzinfo is not None:
        return moment
    raise ValueError("datetime must include a timezone offset, for example +01:00.")

@ -0,0 +1,34 @@
import os
from dataclasses import dataclass
from dotenv import load_dotenv
# Read a local .env file (if any) before the os.getenv lookups further down.
load_dotenv()
# OAuth scopes requested for the Google credentials: Gmail "modify" access and
# read-only Calendar access.
GOOGLE_SCOPES = (
    "https://www.googleapis.com/auth/gmail.modify",
    "https://www.googleapis.com/auth/calendar.readonly",
)
@dataclass(frozen=True)
class Settings:
    """Immutable runtime configuration for the agent.

    Holds the OAuth client-secrets and token file paths, the Gmail scan
    interval (minutes) and search query, the API key expected from callers,
    and the logging level name.
    """

    google_client_secrets_file: str
    google_token_file: str
    gmail_scan_interval_minutes: int
    gmail_query: str
    agent_api_key: str
    log_level: str
def _positive_int_from_env(name: str, default: int) -> int:
    """Read environment variable *name* as an integer that is at least 1.

    An unset or blank variable yields *default*.

    Raises:
        ValueError: If the value is not an integer or is smaller than 1.
    """
    raw = os.getenv(name, "").strip()
    if not raw:
        # A line such as "NAME=" in .env means "not configured", not "invalid".
        return default
    try:
        value = int(raw)
    except ValueError as exc:
        raise ValueError(f"{name} must be an integer, got {raw!r}.") from exc
    if value < 1:
        # A zero or negative interval is not a meaningful schedule.
        raise ValueError(f"{name} must be at least 1, got {value}.")
    return value


def get_settings() -> Settings:
    """Build the application settings from environment variables.

    Every value falls back to a built-in default when its variable is unset.

    Returns:
        A populated ``Settings`` instance.

    Raises:
        ValueError: If ``GMAIL_SCAN_INTERVAL_MINUTES`` is set to something
            other than an integer greater than or equal to 1.
    """
    return Settings(
        google_client_secrets_file=os.getenv(
            "GOOGLE_CLIENT_SECRETS_FILE", "credentials.json"
        ),
        google_token_file=os.getenv("GOOGLE_TOKEN_FILE", "token.json"),
        gmail_scan_interval_minutes=_positive_int_from_env(
            "GMAIL_SCAN_INTERVAL_MINUTES", 5
        ),
        gmail_query=os.getenv(
            "GMAIL_QUERY", "in:inbox -label:AgentProcessed newer_than:7d"
        ),
        agent_api_key=os.getenv("AGENT_API_KEY", ""),
        log_level=os.getenv("LOG_LEVEL", "INFO"),
    )

@ -0,0 +1,214 @@
from __future__ import annotations
from dataclasses import dataclass
from email.utils import parseaddr
import logging
from typing import Any
# Headers requested from the Gmail API for each message; together with the
# message's label IDs they are the only inputs the classification rules read.
METADATA_HEADERS = [
    "From",
    "Subject",
    "List-Unsubscribe",
    "Precedence",
]
# Lower-case fragments: a subject containing any of them (substring match on
# the lower-cased subject) is treated as advertising.
AD_SUBJECT_KEYWORDS = {
    "discount",
    "offer",
    "sale",
    "promo",
    "newsletter",
    "deal",
    "save",
    "coupon",
    "special offer",
    "limited time",
}
# Lower-case fragments: a From header containing any of them (substring match
# on the lower-cased header) is treated as advertising.
AD_SENDER_HINTS = {
    "newsletter",
    "marketing",
    "offers",
    "promotions",
    "deals",
    "no-reply",
    "noreply",
}
# Logger used when routing a single message fails.
logger = logging.getLogger("personal-agent.gmail")
@dataclass(frozen=True)
class ScanResult:
    """Immutable tally of one mailbox scan.

    ``scanned`` is the number of messages returned by the query; the other
    fields count how many of them ended up in each routing outcome.
    """

    scanned: int
    linkedin: int
    advertising: int
    skipped: int
    failed: int
class GmailTriageAgent:
    """Label and archive LinkedIn and advertising mail found by a Gmail query."""

    def __init__(self, gmail_service: Any, query: str) -> None:
        """Remember the Gmail API client and the search query used for scans."""
        self.gmail_service = gmail_service
        self.query = query

    def ensure_labels(self) -> dict[str, str]:
        """Return a name-to-ID map of the mailbox labels, creating missing ones.

        The ``LinkedIn``, ``Advertising`` and ``AgentProcessed`` labels are
        created when the mailbox does not have them yet.
        """
        listing = self.gmail_service.users().labels().list(userId="me").execute()
        ids_by_name: dict[str, str] = {}
        for entry in listing.get("labels", []):
            ids_by_name[entry["name"]] = entry["id"]

        wanted = ("LinkedIn", "Advertising", "AgentProcessed")
        for name in [candidate for candidate in wanted if candidate not in ids_by_name]:
            definition = {
                "name": name,
                "labelListVisibility": "labelShow",
                "messageListVisibility": "show",
            }
            new_label = (
                self.gmail_service.users()
                .labels()
                .create(userId="me", body=definition)
                .execute()
            )
            ids_by_name[name] = new_label["id"]
        return ids_by_name

    def scan_and_route_messages(self, max_results: int = 100) -> ScanResult:
        """Fetch messages matching the query and route each one.

        Args:
            max_results: Passed to the Gmail list call as ``maxResults``.

        Returns:
            A ``ScanResult`` with the number of messages seen and one counter
            per routing outcome.
        """
        label_ids = self.ensure_labels()
        list_request = (
            self.gmail_service.users()
            .messages()
            .list(userId="me", q=self.query, maxResults=max_results)
        )
        found = list_request.execute().get("messages", [])

        tally = {"linkedin": 0, "advertising": 0, "skipped": 0, "failed": 0}
        for item in found:
            outcome = self._route_message(item["id"], label_ids)
            # Anything other than the three known outcomes counts as a failure.
            known = outcome in ("linkedin", "advertising", "skipped")
            tally[outcome if known else "failed"] += 1

        return ScanResult(
            scanned=len(found),
            linkedin=tally["linkedin"],
            advertising=tally["advertising"],
            skipped=tally["skipped"],
            failed=tally["failed"],
        )

    def _route_message(self, message_id: str, label_by_name: dict[str, str]) -> str:
        """Classify one message, update its labels, and report the outcome.

        Returns ``"linkedin"``, ``"advertising"`` or ``"skipped"``; any
        exception is logged and reported as ``"failed"``.
        """
        try:
            fetched = (
                self.gmail_service.users()
                .messages()
                .get(
                    userId="me",
                    id=message_id,
                    format="metadata",
                    metadataHeaders=METADATA_HEADERS,
                )
                .execute()
            )
            header_map = {}
            for header in fetched.get("payload", {}).get("headers", []):
                header_map[header["name"].lower()] = header["value"]
            current_labels = set(fetched.get("labelIds", []))

            from_value = header_map.get("from", "")
            subject_value = header_map.get("subject", "")
            is_linkedin = self._is_linkedin_email(
                sender=from_value, subject=subject_value
            )
            is_advertising = self._is_advertising_email(
                sender=from_value,
                subject=subject_value,
                list_unsubscribe=header_map.get("list-unsubscribe", ""),
                precedence=header_map.get("precedence", ""),
                message_label_ids=current_labels,
            )

            processed_id = label_by_name["AgentProcessed"]
            # LinkedIn wins when a message matches both categories.
            if is_linkedin:
                outcome, target = "linkedin", "LinkedIn"
            elif is_advertising:
                outcome, target = "advertising", "Advertising"
            else:
                outcome, target = "skipped", None

            if target is None:
                to_add, to_remove = [processed_id], []
            else:
                # Dropping INBOX is what moves the message out of the inbox.
                to_add = [label_by_name[target], processed_id]
                to_remove = ["INBOX"]

            (
                self.gmail_service.users()
                .messages()
                .modify(
                    userId="me",
                    id=message_id,
                    body={"addLabelIds": to_add, "removeLabelIds": to_remove},
                )
                .execute()
            )
            return outcome
        except Exception:
            logger.exception("Failed to route message %s", message_id)
            return "failed"

    def _is_linkedin_email(self, sender: str, subject: str) -> bool:
        """Tell whether the sender or subject points at LinkedIn."""
        return (
            "linkedin" in sender.lower()
            or "linkedin" in subject.lower()
            or parseaddr(sender)[1].lower().endswith("@linkedin.com")
        )

    def _is_advertising_email(
        self,
        sender: str,
        subject: str,
        list_unsubscribe: str,
        precedence: str,
        message_label_ids: set[str],
    ) -> bool:
        """Tell whether any advertising signal is present on the message.

        Signals, checked in order: the ``CATEGORY_PROMOTIONS`` label, a
        non-blank ``List-Unsubscribe`` header, a bulk/list/junk
        ``Precedence`` value, a promotional keyword in the subject, and a
        marketing hint in the sender.
        """
        sender_text = sender.lower()
        subject_text = subject.lower()
        precedence_text = precedence.lower()
        return (
            "CATEGORY_PROMOTIONS" in message_label_ids
            or bool(list_unsubscribe.strip())
            or precedence_text in {"bulk", "list", "junk"}
            or any(word in subject_text for word in AD_SUBJECT_KEYWORDS)
            or any(hint in sender_text for hint in AD_SENDER_HINTS)
        )

@ -0,0 +1,46 @@
import os
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from app.config import GOOGLE_SCOPES, Settings
def get_google_credentials(settings: Settings) -> Credentials:
    """Return usable Google credentials, refreshing or re-authorising as needed.

    A stored token is reused while it is valid. An expired token that has a
    refresh token is refreshed; otherwise the installed-app OAuth flow is run
    from the client-secrets file. Whenever new or refreshed credentials are
    obtained they are written back to the token file.

    Raises:
        FileNotFoundError: If the OAuth flow is required but the
            client-secrets file does not exist.
    """
    token_path = settings.google_token_file
    secrets_path = settings.google_client_secrets_file

    creds = (
        Credentials.from_authorized_user_file(token_path, GOOGLE_SCOPES)
        if os.path.exists(token_path)
        else None
    )
    if creds and creds.valid:
        return creds

    if creds and creds.expired and creds.refresh_token:
        creds.refresh(Request())
    elif not os.path.exists(secrets_path):
        raise FileNotFoundError(
            f"Missing OAuth client file at {secrets_path}. "
            "Create Google OAuth desktop credentials and save the JSON at this path."
        )
    else:
        consent_flow = InstalledAppFlow.from_client_secrets_file(
            secrets_path, GOOGLE_SCOPES
        )
        creds = consent_flow.run_local_server(port=0)

    # Persist the token so the next call can skip the refresh/consent step.
    with open(token_path, "w", encoding="utf-8") as handle:
        handle.write(creds.to_json())
    return creds
def build_gmail_service(settings: Settings):
    """Return a Gmail API v1 client authorised with the user's credentials."""
    return build(
        "gmail",
        "v1",
        credentials=get_google_credentials(settings),
        cache_discovery=False,
    )
def build_calendar_service(settings: Settings):
    """Return a Calendar API v3 client authorised with the user's credentials."""
    return build(
        "calendar",
        "v3",
        credentials=get_google_credentials(settings),
        cache_discovery=False,
    )

@ -0,0 +1,193 @@
from __future__ import annotations

import asyncio
import hmac
import logging
from datetime import datetime
from typing import Annotated

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from fastapi import Depends, FastAPI, Header, HTTPException, Query, status
from pydantic import BaseModel

from app.calendar_agent import CalendarAvailabilityAgent
from app.config import get_settings
from app.gmail_agent import GmailTriageAgent
from app.google_clients import build_calendar_service, build_gmail_service
# Settings are read once at import time and shared by every handler below.
settings = get_settings()
# Configure root logging; an unrecognised LOG_LEVEL name falls back to INFO.
logging.basicConfig(level=getattr(logging, settings.log_level.upper(), logging.INFO))
logger = logging.getLogger("personal-agent")
app = FastAPI(title="Personal Agent", version="0.1.0")
# Background scheduler; created in the startup hook and stopped on shutdown.
scheduler: AsyncIOScheduler | None = None
# Lock shared by scheduled and manual scans; created lazily by _get_scan_lock().
scan_lock: asyncio.Lock | None = None
# Response body of POST /scan: the counters produced by one Gmail scan run.
class ScanResponse(BaseModel):
    scanned: int  # messages returned by the Gmail query
    linkedin: int  # messages routed to the LinkedIn label
    advertising: int  # messages routed to the Advertising label
    skipped: int  # messages only marked as processed
    failed: int  # messages whose routing did not complete
# Request body of POST /availability.
class AvailabilityRequest(BaseModel):
    start: str  # ISO 8601 timestamp with a timezone offset
    end: str  # ISO 8601 timestamp with a timezone offset, after start
    calendar_ids: list[str] | None = None  # omitted -> the "primary" calendar
# One busy interval reported for a calendar inside the requested window.
class BusySlot(BaseModel):
    calendar_id: str
    start: str
    end: str
# Response body of POST /availability.
class AvailabilityResponse(BaseModel):
    start: str  # checked window start, ISO 8601
    end: str  # checked window end, ISO 8601
    available: bool  # true when busy_slots is empty
    busy_slots: list[BusySlot]
    checked_calendars: list[str]  # calendar IDs that were queried
def verify_api_key(
    x_api_key: Annotated[str | None, Header(alias="X-API-Key")] = None,
    authorization: Annotated[str | None, Header()] = None,
) -> None:
    """Reject the request unless it carries the configured API key.

    The key is taken from the ``X-API-Key`` header, or, when that header is
    missing or empty, from an ``Authorization: Bearer <key>`` header.

    Args:
        x_api_key: Value of the ``X-API-Key`` header, if present.
        authorization: Value of the ``Authorization`` header, if present.

    Raises:
        HTTPException: 401 when a key is configured and the supplied key is
            missing or does not match.
    """
    expected = settings.agent_api_key
    if not expected:
        # NOTE(review): an empty AGENT_API_KEY disables authentication for
        # every protected endpoint. Kept for backward compatibility; consider
        # failing closed instead.
        return

    provided = x_api_key
    if not provided and authorization:
        parts = authorization.split(" ", 1)
        if len(parts) == 2 and parts[0].lower() == "bearer":
            provided = parts[1]

    # Compare in constant time so response timing does not reveal how much of
    # the key was guessed. Encoding to bytes lets compare_digest accept keys
    # that contain non-ASCII characters.
    matches = bool(provided) and hmac.compare_digest(
        provided.encode("utf-8"), expected.encode("utf-8")
    )
    if not matches:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid API key."
        )
def _run_scan_once(max_results: int) -> ScanResponse:
    """Run one blocking Gmail scan and convert its tally to the API model.

    A Gmail client is built for each call, so this is meant to be executed
    off the event loop.
    """
    agent = GmailTriageAgent(
        gmail_service=build_gmail_service(settings),
        query=settings.gmail_query,
    )
    tally = agent.scan_and_route_messages(max_results=max_results)
    counters = ("scanned", "linkedin", "advertising", "skipped", "failed")
    return ScanResponse(**{name: getattr(tally, name) for name in counters})
def _get_scan_lock() -> asyncio.Lock:
    """Return the module-wide scan lock, creating it on first use."""
    global scan_lock
    if scan_lock is not None:
        return scan_lock
    scan_lock = asyncio.Lock()
    return scan_lock
async def _scheduled_scan() -> None:
    """Scheduler job: run one scan unless another scan is still in progress.

    Failures are logged and never propagate to the scheduler.
    """
    guard = _get_scan_lock()
    if not guard.locked():
        async with guard:
            try:
                # The scan does blocking API calls, so run it in a worker thread.
                summary = await asyncio.to_thread(_run_scan_once, 100)
                logger.info("Scheduled scan complete: %s", summary.model_dump())
            except Exception:
                logger.exception("Scheduled scan failed")
        return
    logger.info("Previous scan still running, skipping this tick.")
@app.on_event("startup")
async def startup_event() -> None:
    """Create the scan lock and start the periodic Gmail scan job.

    The first run is scheduled for "now", so a scan starts right away and
    then repeats at the configured interval.
    """
    global scheduler
    _get_scan_lock()
    every_minutes = settings.gmail_scan_interval_minutes
    scheduler = AsyncIOScheduler()
    scheduler.add_job(
        _scheduled_scan,
        trigger="interval",
        minutes=every_minutes,
        next_run_time=datetime.now(),
    )
    scheduler.start()
    logger.info("Scheduler started (interval=%s min)", every_minutes)
@app.on_event("shutdown")
async def shutdown_event() -> None:
    """Stop the background scheduler, if one was started, without waiting."""
    if not scheduler:
        return
    scheduler.shutdown(wait=False)
# Liveness probe: reports "ok" together with the configured scan interval.
@app.get("/health")
def health() -> dict[str, object]:
    payload: dict[str, object] = {"status": "ok"}
    payload["scan_interval_minutes"] = settings.gmail_scan_interval_minutes
    return payload
# Manual scan endpoint. It takes the same lock as the scheduled job, so a
# manual scan waits for a running scan instead of overlapping with it. Any
# failure is reported as HTTP 500.
@app.post(
    "/scan",
    response_model=ScanResponse,
    dependencies=[Depends(verify_api_key)],
)
async def scan_now(max_results: int = Query(100, ge=1, le=500)) -> ScanResponse:
    guard = _get_scan_lock()
    async with guard:
        try:
            return await asyncio.to_thread(_run_scan_once, max_results)
        except Exception as exc:
            # A missing credentials file already carries a complete message;
            # every other error gets a generic prefix.
            if isinstance(exc, FileNotFoundError):
                detail = str(exc)
            else:
                detail = f"Gmail scan failed: {exc}"
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=detail,
            ) from exc
@app.post(
    "/availability",
    response_model=AvailabilityResponse,
    dependencies=[Depends(verify_api_key)],
)
async def availability(request: AvailabilityRequest) -> AvailabilityResponse:
    """Report whether the requested calendars are free in the given window.

    Args:
        request: Window bounds (ISO 8601 with offset) and optional calendar IDs.

    Returns:
        The checked window, the busy slots found, and an ``available`` flag.

    Raises:
        HTTPException: 400 for invalid timestamps or an empty/negative window;
            500 when credentials are missing or the lookup fails.
    """

    def _lookup():
        # Building the client reads the token file and may refresh credentials,
        # so it runs in the worker thread together with the API call rather
        # than on the event loop.
        calendar_service = build_calendar_service(settings)
        availability_agent = CalendarAvailabilityAgent(
            calendar_service=calendar_service
        )
        return availability_agent.get_availability(
            request.start,
            request.end,
            request.calendar_ids,
        )

    try:
        result = await asyncio.to_thread(_lookup)
        return AvailabilityResponse(
            start=result.start,
            end=result.end,
            available=result.available,
            busy_slots=result.busy_slots,
            checked_calendars=result.checked_calendars,
        )
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)
        ) from exc
    except FileNotFoundError as exc:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(exc),
        ) from exc
    except Exception as exc:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Availability lookup failed: {exc}",
        ) from exc

@ -0,0 +1,7 @@
apscheduler
fastapi
google-api-python-client
google-auth
google-auth-oauthlib
python-dotenv
uvicorn[standard]
Loading…
Cancel
Save