Backend Implementation Plan: AI Weekly Synth
Date: 2026-03-21
Role: Backend Implementation Planner
Target: Rust (Axum) + PostgreSQL + Multi-LLM
Table of Contents
- Project Structure
- Database Schema
- API Endpoints
- Authentication & Authorization
- LLM Provider Abstraction
- URL Scraping (Server-Side)
- Async Generation & SSE
- Email (Resend Integration)
- Encryption
- Error Handling
- Testing Strategy
1. Project Structure
1.1 Single Crate with Module Hierarchy
A single crate is sufficient for this application. A Cargo workspace adds overhead (inter-crate dependency management, slower incremental compilation across crate boundaries) without meaningful benefit for an app of this size. If the LLM abstraction layer grows significantly, it can be extracted into a workspace member later.
ai-synth-backend/
├── Cargo.toml
├── Cargo.lock
├── .env.example
├── migrations/
│ ├── 20260321000001_create_users.sql
│ ├── 20260321000002_create_sessions.sql
│ ├── 20260321000003_create_magic_link_tokens.sql
│ ├── 20260321000004_create_user_settings.sql
│ ├── 20260321000005_create_sources.sql
│ ├── 20260321000006_create_syntheses.sql
│ ├── 20260321000007_create_user_provider_keys.sql
│ ├── 20260321000008_create_admin_provider_models.sql
│ ├── 20260321000009_create_admin_rate_limits.sql
│ └── 20260321000010_create_generation_jobs.sql
├── src/
│ ├── main.rs # Entry point: CLI (create-admin, serve), tracing init, DB pool, run server
│ ├── cli.rs # CLI argument parsing (clap): serve, create-admin
│ ├── config.rs # AppConfig struct, loaded from env vars via dotenvy
│ ├── app_state.rs # AppState: PgPool, reqwest::Client, AppConfig, RateLimiter, JobStore
│ ├── error.rs # AppError enum, IntoResponse impl, From impls
│ ├── router.rs # All route definitions, middleware wiring, layer composition
│ │
│ ├── middleware/
│ │ ├── mod.rs
│ │ ├── auth.rs # AuthUser extractor (session cookie -> user lookup)
│ │ ├── admin.rs # AdminUser extractor (AuthUser + role == admin check)
│ │ └── csrf.rs # X-Requested-With header check on mutating methods
│ │
│ ├── models/
│ │ ├── mod.rs
│ │ ├── user.rs # User, UserRole, CreateUser
│ │ ├── session.rs # Session
│ │ ├── magic_link.rs # MagicLinkToken
│ │ ├── settings.rs # UserSettings, UpdateSettings
│ │ ├── source.rs # Source, CreateSource
│ │ ├── synthesis.rs # Synthesis, NewsSection, NewsItem
│ │ ├── provider_key.rs # UserProviderKey, CreateProviderKey
│ │ ├── admin_model.rs # AdminProviderModel
│ │ ├── rate_limit.rs # AdminRateLimit
│ │ └── generation_job.rs # GenerationJob, JobStatus, JobProgress
│ │
│ ├── handlers/
│ │ ├── mod.rs
│ │ ├── auth.rs # register, request_magic_link, verify_magic_link, logout, me
│ │ ├── syntheses.rs # list, get, delete, trigger_generate
│ │ ├── sources.rs # list, create, delete
│ │ ├── settings.rs # get, update
│ │ ├── provider_keys.rs # list, add, delete (user's own API keys)
│ │ ├── generation.rs # SSE progress endpoint, job status
│ │ ├── admin.rs # provider models CRUD, rate limits, user list
│ │ └── health.rs # GET /health
│ │
│ ├── services/
│ │ ├── mod.rs
│ │ ├── llm/
│ │ │ ├── mod.rs # LlmProvider trait, ProviderCapabilities, create_provider factory
│ │ │ ├── types.rs # SearchRequest, SearchResponse, RewriteRequest, shared types
│ │ │ ├── gemini.rs # GeminiProvider implementation
│ │ │ ├── openai.rs # OpenAiProvider implementation
│ │ │ └── anthropic.rs # AnthropicProvider implementation
│ │ ├── generation.rs # GenerationPipeline: orchestrates search -> scrape -> rewrite
│ │ ├── scraper.rs # URL fetching, HTML parsing, date extraction, SSRF checks
│ │ ├── email.rs # Resend HTTP API client (magic links + synthesis delivery)
│ │ ├── captcha.rs # Cloudflare Turnstile verification
│ │ ├── encryption.rs # AES-256-GCM encrypt/decrypt for user API keys
│ │ └── rate_limiter.rs # Token-bucket rate limiter (per-provider, in-memory)
│ │
│ ├── db/
│ │ ├── mod.rs
│ │ ├── users.rs # find_by_email, find_by_id, create, update_role, list_all
│ │ ├── sessions.rs # create, find_by_hash, delete, delete_expired, delete_all_for_user
│ │ ├── magic_links.rs # create, consume (atomic find + mark used), delete_expired
│ │ ├── settings.rs # get_or_default, upsert
│ │ ├── sources.rs # list_by_user, create, delete
│ │ ├── syntheses.rs # list_by_user, get_by_id, create, delete
│ │ ├── provider_keys.rs # list_by_user, create, delete, get_decrypted
│ │ ├── admin_models.rs # list_all, list_enabled, create, update, delete
│ │ └── admin_rate_limits.rs # get_by_provider, upsert, list_all
│ │
│ └── util/
│   ├── mod.rs
│   ├── token.rs # Secure random token generation, SHA-256 hashing
│   └── validation.rs # Shared validation helpers (URL scheme check, etc.)
│
└── tests/
  ├── common/
  │ └── mod.rs # Test helpers: setup DB, create test user, create test session
  ├── api/
  │ ├── auth_test.rs
  │ ├── syntheses_test.rs
  │ ├── sources_test.rs
  │ ├── settings_test.rs
  │ └── admin_test.rs
  └── services/
    ├── encryption_test.rs
    ├── scraper_test.rs
    └── rate_limiter_test.rs
1.2 Key Dependencies
[package]
name = "ai-synth-backend"
version = "0.1.0"
edition = "2021"
[dependencies]
# Web framework
axum = { version = "0.8", features = ["macros"] }
axum-extra = { version = "0.10", features = ["cookie", "typed-header"] }
tower = { version = "0.5", features = ["util", "timeout"] }
tower-http = { version = "0.6", features = ["fs", "cors", "trace", "set-header"] }
tokio = { version = "1", features = ["full"] }
# Database
sqlx = { version = "0.8", features = ["runtime-tokio", "tls-rustls", "postgres", "uuid", "chrono", "json"] }
# Serialization
serde = { version = "1", features = ["derive"] }
serde_json = "1"
# HTTP client (LLM APIs + scraping)
reqwest = { version = "0.12", features = ["json", "stream"] }
# HTML parsing
scraper = "0.22"
# Date/time
chrono = { version = "0.4", features = ["serde"] }
# Cryptography
sha2 = "0.10" # SHA-256 for token hashing
rand = "0.8" # OsRng for secure random generation
aes-gcm = "0.10" # AES-256-GCM for API key encryption
base64 = "0.22" # Encoding tokens and encrypted data
# Secrets management
secrecy = { version = "0.10", features = ["serde"] }
zeroize = "1"
# Validation
validator = { version = "0.19", features = ["derive"] }
# Logging
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
# Config
dotenvy = "0.15"
clap = { version = "4", features = ["derive"] }
# Concurrency
dashmap = "6"
tokio-stream = "0.1" # For SSE streaming
# Error handling
anyhow = "1"
thiserror = "1"
# UUID
uuid = { version = "1", features = ["v4", "serde"] }
# URL parsing
url = "2"
# Async trait (still needed for trait objects)
async-trait = "0.1"
# Futures utilities
futures = "0.3"
[dev-dependencies]
sqlx = { version = "0.8", features = ["runtime-tokio", "tls-rustls", "postgres"] }
tower = { version = "0.5", features = ["util"] }
http-body-util = "0.1"
hyper = "1"
Justification for key choices:
| Crate | Why |
|---|---|
| axum 0.8 | Tower-based, composes naturally with middleware layers, best ergonomics for extractors |
| sqlx 0.8 with postgres | Compile-time query checking, native async, direct Postgres support (decision: no SQLite) |
| reqwest 0.12 | De facto standard async HTTP client, reused for LLM APIs and scraping |
| scraper 0.22 | Built on html5ever (the Servo project's HTML5 parser), reliable HTML parsing |
| aes-gcm 0.10 | Pure Rust authenticated encryption, no OpenSSL dependency |
| secrecy + zeroize | Prevents accidental logging of secrets, zeroes memory on drop |
| dashmap 6 | Sharded concurrent HashMap (per-shard locks) for rate limiter and job store |
| clap 4 | CLI argument parsing for serve and create-admin subcommands |
| tokio-stream | Stream adapters for the SSE responses built with axum::response::Sse |
| chrono over time | Better serde integration, more format support, widely used |
2. Database Schema
All migrations use PostgreSQL-native types. UUIDs are UUID type (not TEXT). Timestamps are TIMESTAMPTZ. JSONB is used for flexible structured fields.
Migration 001: Users
-- 20260321000001_create_users.sql
CREATE TYPE user_role AS ENUM ('user', 'admin');
CREATE TABLE users (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
email TEXT NOT NULL UNIQUE,
display_name TEXT,
role user_role NOT NULL DEFAULT 'user',
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- No separate index on email: the UNIQUE constraint above already creates one.
Migration 002: Sessions
-- 20260321000002_create_sessions.sql
CREATE TABLE sessions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
token_hash TEXT NOT NULL UNIQUE, -- SHA-256 of the raw session token
expires_at TIMESTAMPTZ NOT NULL,
ip_address INET,
user_agent TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX idx_sessions_user_id ON sessions(user_id);
-- No separate index on token_hash: the UNIQUE constraint above already creates one.
CREATE INDEX idx_sessions_expires_at ON sessions(expires_at);
Migration 003: Magic Link Tokens
-- 20260321000003_create_magic_link_tokens.sql
CREATE TABLE magic_link_tokens (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
email TEXT NOT NULL,
token_hash TEXT NOT NULL UNIQUE, -- SHA-256 of the raw token
expires_at TIMESTAMPTZ NOT NULL,
used BOOLEAN NOT NULL DEFAULT false,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX idx_magic_link_tokens_email ON magic_link_tokens(email);
-- No separate index on token_hash: the UNIQUE constraint above already creates one.
CREATE INDEX idx_magic_link_tokens_expires_at ON magic_link_tokens(expires_at);
Migration 004: User Settings
-- 20260321000004_create_user_settings.sql
CREATE TABLE user_settings (
user_id UUID PRIMARY KEY REFERENCES users(id) ON DELETE CASCADE,
theme TEXT NOT NULL DEFAULT 'Intelligence Artificielle',
max_age_days INTEGER NOT NULL DEFAULT 7
CHECK (max_age_days > 0 AND max_age_days <= 365),
categories JSONB NOT NULL DEFAULT '["Annonces majeures / importantes", "Entreprises des secteurs financiers", "Grandes entreprises des autres secteurs", "Secteurs publics", "Grand public / Particuliers"]'::jsonb,
max_items_per_category INTEGER NOT NULL DEFAULT 4
CHECK (max_items_per_category > 0 AND max_items_per_category <= 50),
search_agent_behavior TEXT NOT NULL DEFAULT '',
preferred_provider TEXT, -- 'gemini', 'openai', 'anthropic' (NULL = use first available)
preferred_model TEXT, -- model identifier (NULL = use provider default)
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
Migration 005: Sources
-- 20260321000005_create_sources.sql
CREATE TABLE sources (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
title TEXT NOT NULL CHECK (char_length(title) BETWEEN 1 AND 200),
url TEXT NOT NULL CHECK (char_length(url) <= 2000),
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX idx_sources_user_id ON sources(user_id);
CREATE INDEX idx_sources_user_id_created_at ON sources(user_id, created_at DESC);
Migration 006: Syntheses
-- 20260321000006_create_syntheses.sql
CREATE TYPE synthesis_status AS ENUM ('generating', 'completed', 'failed');
CREATE TABLE syntheses (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
title TEXT NOT NULL DEFAULT '', -- auto-generated from theme + date
sections JSONB NOT NULL DEFAULT '[]'::jsonb, -- [{title: string, items: [{title, url, summary}]}]
status synthesis_status NOT NULL DEFAULT 'generating',
error_message TEXT, -- populated on failure
provider TEXT, -- which provider was used
model TEXT, -- which model was used
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX idx_syntheses_user_id ON syntheses(user_id);
CREATE INDEX idx_syntheses_user_id_created_at ON syntheses(user_id, created_at DESC);
Migration 007: User Provider Keys
-- 20260321000007_create_user_provider_keys.sql
CREATE TABLE user_provider_keys (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
provider TEXT NOT NULL, -- 'gemini', 'openai', 'anthropic'
encrypted_key BYTEA NOT NULL, -- AES-256-GCM ciphertext
nonce BYTEA NOT NULL, -- 12-byte GCM nonce
key_prefix TEXT NOT NULL DEFAULT '', -- first 8 chars for display (e.g., "sk-proj-...")
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE(user_id, provider) -- one key per provider per user
);
CREATE INDEX idx_user_provider_keys_user_id ON user_provider_keys(user_id);
Migration 008: Admin Provider Models
-- 20260321000008_create_admin_provider_models.sql
CREATE TABLE admin_provider_models (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
provider TEXT NOT NULL, -- 'gemini', 'openai', 'anthropic'
model_id TEXT NOT NULL, -- e.g., 'gemini-2.5-pro', 'gpt-4o', 'claude-sonnet-4-20250514'
display_name TEXT NOT NULL, -- e.g., 'Gemini 2.5 Pro', 'GPT-4o', 'Claude Sonnet 4'
enabled BOOLEAN NOT NULL DEFAULT true,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE(provider, model_id)
);
CREATE INDEX idx_admin_provider_models_enabled ON admin_provider_models(enabled) WHERE enabled = true;
Migration 009: Admin Rate Limits
-- 20260321000009_create_admin_rate_limits.sql
CREATE TABLE admin_rate_limits (
provider TEXT PRIMARY KEY, -- 'gemini', 'openai', 'anthropic'
max_requests INTEGER NOT NULL DEFAULT 29,
time_window_seconds INTEGER NOT NULL DEFAULT 60,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- Seed default rate limits
INSERT INTO admin_rate_limits (provider, max_requests, time_window_seconds)
VALUES
('gemini', 29, 60),
('openai', 50, 60),
('anthropic', 40, 60);
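The seeded rows map naturally onto the token-bucket limiter planned for services/rate_limiter.rs. The following is a minimal sketch of that idea, not the plan's actual implementation: the `TokenBucket` type and its method names are hypothetical, and time is passed in as plain seconds so the behaviour is deterministic (a real limiter would more likely read a monotonic clock and keep one bucket per provider).

```rust
/// Hypothetical token bucket sized from one `admin_rate_limits` row.
/// `now` is "seconds since an arbitrary start" (an assumption made for
/// testability, not something the plan specifies).
#[derive(Debug)]
pub struct TokenBucket {
    capacity: f64,    // max_requests
    window_secs: f64, // time_window_seconds (assumed to be >= 1)
    tokens: f64,      // tokens currently available
    last_refill: f64, // time of the last refill, in seconds
}

impl TokenBucket {
    /// Create a full bucket for the given limit.
    pub fn new(max_requests: u32, time_window_seconds: u32, now: f64) -> Self {
        let capacity = f64::from(max_requests);
        Self {
            capacity,
            window_secs: f64::from(time_window_seconds),
            tokens: capacity,
            last_refill: now,
        }
    }

    /// Refill in proportion to the elapsed time, then try to take one token.
    /// Returns `true` if the request may proceed.
    pub fn try_acquire(&mut self, now: f64) -> bool {
        let elapsed = (now - self.last_refill).max(0.0);
        // Multiply before dividing so whole-token refills stay exact.
        let refill = elapsed * self.capacity / self.window_secs;
        self.tokens = (self.tokens + refill).min(self.capacity);
        self.last_refill = self.last_refill.max(now);
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }
}
```

With the seeded Gemini row (29 requests per 60 seconds), a bucket built as `TokenBucket::new(29, 60, now)` would allow a burst of 29 calls and then roughly one more call every two seconds.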
Migration 010: Generation Jobs
-- 20260321000010_create_generation_jobs.sql
CREATE TYPE job_status AS ENUM ('pending', 'searching', 'scraping', 'rewriting', 'completed', 'failed');
CREATE TABLE generation_jobs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
synthesis_id UUID REFERENCES syntheses(id) ON DELETE SET NULL,
status job_status NOT NULL DEFAULT 'pending',
progress_pct SMALLINT NOT NULL DEFAULT 0 CHECK (progress_pct BETWEEN 0 AND 100),
progress_message TEXT,
error_message TEXT,
provider TEXT,
model TEXT,
started_at TIMESTAMPTZ NOT NULL DEFAULT now(),
completed_at TIMESTAMPTZ
);
CREATE INDEX idx_generation_jobs_user_id ON generation_jobs(user_id);
CREATE INDEX idx_generation_jobs_status ON generation_jobs(status) WHERE status NOT IN ('completed', 'failed');
3. API Endpoints
All endpoints are prefixed with /api/v1. Request and response bodies are JSON. Cookie-based session authentication is used for all authenticated endpoints.
3.1 Authentication
| Method | Path | Auth | Description | Phase |
|---|---|---|---|---|
| POST | /api/v1/auth/register | Public + Turnstile | Create account, send magic link | 1 |
| POST | /api/v1/auth/magic-link | Public + Turnstile | Request magic link for existing user | 1 |
| GET | /api/v1/auth/verify | Public | Verify magic link token, create session, redirect | 1 |
| POST | /api/v1/auth/logout | Authenticated | Invalidate current session | 1 |
| GET | /api/v1/auth/me | Authenticated | Get current user info | 1 |
Request/Response Types
// POST /api/v1/auth/register
#[derive(Deserialize, Validate)]
pub struct RegisterRequest {
#[validate(email)]
pub email: String,
#[validate(length(min = 1, max = 100))]
pub display_name: Option<String>,
pub turnstile_token: String,
}
// Response: 200 (always the same, whether email exists or not)
#[derive(Serialize)]
pub struct AuthMessageResponse {
pub message: String, // "If this email is valid, a login link has been sent."
}
// POST /api/v1/auth/magic-link
#[derive(Deserialize, Validate)]
pub struct MagicLinkRequest {
#[validate(email)]
pub email: String,
pub turnstile_token: String,
}
// GET /api/v1/auth/verify?token=<base64url>
// -> 302 redirect to / with Set-Cookie on success
// -> 302 redirect to /login?error=invalid_token on failure
// GET /api/v1/auth/me
#[derive(Serialize)]
pub struct MeResponse {
pub id: Uuid,
pub email: String,
pub display_name: Option<String>,
pub role: String, // "user" | "admin"
pub created_at: DateTime<Utc>,
}
3.2 Syntheses
| Method | Path | Auth | Description | Phase |
|---|---|---|---|---|
| GET | /api/v1/syntheses | Authenticated | List user's syntheses (paginated) | 2 |
| GET | /api/v1/syntheses/:id | Authenticated | Get synthesis detail | 2 |
| POST | /api/v1/syntheses/generate | Authenticated | Trigger async generation | 2 |
| DELETE | /api/v1/syntheses/:id | Authenticated | Delete a synthesis | 2 |
| POST | /api/v1/syntheses/:id/email | Authenticated | Send synthesis by email | 3 |
// GET /api/v1/syntheses?page=1&per_page=20
#[derive(Deserialize)]
pub struct ListSynthesesQuery {
pub page: Option<u32>, // default 1
pub per_page: Option<u32>, // default 20, max 100
}
#[derive(Serialize)]
pub struct SynthesisListItem {
pub id: Uuid,
pub title: String,
pub status: String,
pub provider: Option<String>,
pub model: Option<String>,
pub created_at: DateTime<Utc>,
pub section_count: i32,
pub article_count: i32,
}
#[derive(Serialize)]
pub struct PaginatedResponse<T> {
pub items: Vec<T>,
pub total: i64,
pub page: u32,
pub per_page: u32,
}
// GET /api/v1/syntheses/:id
#[derive(Serialize)]
pub struct SynthesisDetail {
pub id: Uuid,
pub title: String,
pub sections: Vec<NewsSection>,
pub status: String,
pub provider: Option<String>,
pub model: Option<String>,
pub created_at: DateTime<Utc>,
}
#[derive(Serialize, Deserialize, Clone)]
pub struct NewsSection {
pub title: String,
pub items: Vec<NewsItem>,
}
#[derive(Serialize, Deserialize, Clone)]
pub struct NewsItem {
pub title: String,
pub url: String,
pub summary: String,
}
// POST /api/v1/syntheses/generate
// Request body is empty -- uses the user's saved settings + sources + API key
// Response: 202
#[derive(Serialize)]
pub struct GenerateResponse {
pub job_id: Uuid,
pub status: String,
}
// POST /api/v1/syntheses/:id/email
#[derive(Deserialize, Validate)]
pub struct SendEmailRequest {
#[validate(email)]
pub recipient: String,
}
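The defaults noted on ListSynthesesQuery (page 1, 20 items per page, at most 100) have to be turned into SQL LIMIT/OFFSET values somewhere in the handler. A minimal sketch of that normalization follows; `Pagination` and `resolve_pagination` are hypothetical names, and clamping out-of-range values (rather than rejecting them with a 400) is an assumption.

```rust
/// Hypothetical normalized pagination values, ready for `LIMIT $1 OFFSET $2`.
#[derive(Debug, PartialEq, Eq)]
pub struct Pagination {
    pub page: u32,
    pub per_page: u32,
    pub limit: i64,
    pub offset: i64,
}

/// Apply the documented defaults (page 1, per_page 20) and the cap of 100.
/// Out-of-range inputs are clamped instead of rejected (an assumption).
pub fn resolve_pagination(page: Option<u32>, per_page: Option<u32>) -> Pagination {
    let page = page.unwrap_or(1).max(1);
    let per_page = per_page.unwrap_or(20).clamp(1, 100);
    Pagination {
        page,
        per_page,
        limit: i64::from(per_page),
        // Widen to i64 before multiplying so large page numbers cannot overflow u32.
        offset: i64::from(page - 1) * i64::from(per_page),
    }
}
```

The resolved `page` and `per_page` can be echoed back in PaginatedResponse so the client sees the values that were actually applied.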
3.3 Sources
| Method | Path | Auth | Description | Phase |
|---|---|---|---|---|
| GET | /api/v1/sources | Authenticated | List user's sources | 1 |
| POST | /api/v1/sources | Authenticated | Add a source | 1 |
| DELETE | /api/v1/sources/:id | Authenticated | Delete a source | 1 |
// GET /api/v1/sources
#[derive(Serialize)]
pub struct SourceResponse {
pub id: Uuid,
pub title: String,
pub url: String,
pub created_at: DateTime<Utc>,
}
// POST /api/v1/sources
#[derive(Deserialize, Validate)]
pub struct CreateSourceRequest {
#[validate(length(min = 1, max = 200))]
pub title: String,
#[validate(url, length(max = 2000))]
pub url: String,
}
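The project tree lists a shared "URL scheme check" in util/validation.rs, which would complement the `#[validate(url)]` attribute on CreateSourceRequest. The sketch below shows one possible shape using only the standard library; the function name and the http/https allow-list are assumptions, and a real implementation would more likely parse the input with the `url` crate already in the dependency list.

```rust
/// Hypothetical helper: accept only absolute http(s) URLs.
/// The allowed-scheme list is an assumption, not taken from the plan.
pub fn has_allowed_scheme(url: &str) -> bool {
    match url.split_once("://") {
        Some((scheme, rest)) => {
            // URL schemes are case-insensitive.
            let scheme = scheme.to_ascii_lowercase();
            (scheme == "http" || scheme == "https") && !rest.is_empty()
        }
        // No "://" separator: relative paths, `javascript:` URLs, and so on.
        None => false,
    }
}
```

Rejecting other schemes at the API boundary keeps values such as `file:` or `javascript:` URLs out of the sources table before the scraper ever sees them.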
3.4 Settings
| Method | Path | Auth | Description | Phase |
|---|---|---|---|---|
| GET | /api/v1/settings | Authenticated | Get user's settings | 1 |
| PUT | /api/v1/settings | Authenticated | Update settings | 1 |
// GET /api/v1/settings
#[derive(Serialize)]
pub struct SettingsResponse {
pub theme: String,
pub max_age_days: i32,
pub categories: Vec<String>,
pub max_items_per_category: i32,
pub search_agent_behavior: String,
pub preferred_provider: Option<String>,
pub preferred_model: Option<String>,
}
// PUT /api/v1/settings
#[derive(Deserialize, Validate)]
pub struct UpdateSettingsRequest {
#[validate(length(min = 1, max = 200))]
pub theme: String,
#[validate(range(min = 1, max = 365))]
pub max_age_days: i32,
#[validate(length(min = 1, max = 20))]
pub categories: Vec<String>, // each item validated separately in handler
#[validate(range(min = 1, max = 50))]
pub max_items_per_category: i32,
#[validate(length(max = 2000))]
pub search_agent_behavior: String,
pub preferred_provider: Option<String>,
pub preferred_model: Option<String>,
}
3.5 User Provider Keys
| Method | Path | Auth | Description | Phase |
|---|---|---|---|---|
| GET | /api/v1/provider-keys | Authenticated | List user's provider keys (prefix only, never full key) | 2 |
| POST | /api/v1/provider-keys | Authenticated | Add or replace a provider key | 2 |
| DELETE | /api/v1/provider-keys/:provider | Authenticated | Delete a provider key | 2 |
// GET /api/v1/provider-keys
#[derive(Serialize)]
pub struct ProviderKeyListItem {
pub provider: String,
pub key_prefix: String, // e.g., "sk-proj-..." (first 8 chars)
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
// POST /api/v1/provider-keys
#[derive(Deserialize, Validate)]
pub struct CreateProviderKeyRequest {
#[validate(custom(function = "validate_provider"))]
pub provider: String, // "gemini" | "openai" | "anthropic"
#[validate(length(min = 10, max = 500))]
pub api_key: String, // the raw API key -- encrypted before storage
}
fn validate_provider(provider: &str) -> Result<(), validator::ValidationError> {
match provider {
"gemini" | "openai" | "anthropic" => Ok(()),
_ => Err(validator::ValidationError::new("invalid_provider")),
}
}
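The key_prefix column stores only the first 8 characters of a key for display. A small sketch of how that value might be derived before the key is encrypted is shown here; the `key_prefix` function name is hypothetical, and appending a literal "..." to the stored value (rather than adding it in the frontend) is an assumption based on the "sk-proj-..." example.

```rust
/// Hypothetical helper: derive the display-only prefix of an API key.
/// Takes the first 8 characters (not bytes) so it can never split a
/// multi-byte character, then appends "..." (an assumption).
pub fn key_prefix(api_key: &str) -> String {
    let prefix: String = api_key.chars().take(8).collect();
    format!("{prefix}...")
}
```

For example, `key_prefix("sk-proj-abcdef1234567890")` yields `"sk-proj-..."`, which is all that ProviderKeyListItem ever exposes.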
3.6 Generation (SSE)
| Method | Path | Auth | Description | Phase |
|---|---|---|---|---|
| GET | /api/v1/generation/:job_id/progress | Authenticated | SSE stream of generation progress | 2 |
| GET | /api/v1/generation/:job_id/status | Authenticated | One-shot status poll (for reconnection) | 2 |
// SSE event types sent on /api/v1/generation/:job_id/progress
#[derive(Serialize)]
#[serde(tag = "type")]
pub enum GenerationEvent {
#[serde(rename = "progress")]
Progress {
status: String, // "searching", "scraping", "rewriting"
progress_pct: u8,
message: String,
},
#[serde(rename = "completed")]
Completed {
synthesis_id: Uuid,
},
#[serde(rename = "failed")]
Failed {
error: String,
},
}
// GET /api/v1/generation/:job_id/status (one-shot poll)
#[derive(Serialize)]
pub struct JobStatusResponse {
pub job_id: Uuid,
pub status: String,
pub progress_pct: u8,
pub progress_message: Option<String>,
pub synthesis_id: Option<Uuid>,
pub error_message: Option<String>,
}
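To make the wire format concrete, the sketch below reproduces by hand what a client would receive for each GenerationEvent. With `#[serde(tag = "type")]`, serde emits the tag first and then the variant's fields in declaration order. This is only an illustration using the standard library: the real handler would serialize with serde_json and send through axum's Sse type, the `synthesis_id` is a plain String here instead of a Uuid, and using a bare `data:` line without an `event:` field is an assumption.

```rust
/// Simplified mirror of the plan's `GenerationEvent` (String instead of Uuid).
pub enum GenerationEvent {
    Progress { status: String, progress_pct: u8, message: String },
    Completed { synthesis_id: String },
    Failed { error: String },
}

/// Minimal JSON string escaping for the illustration.
fn json_escape(input: &str) -> String {
    let mut out = String::with_capacity(input.len());
    for c in input.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}

impl GenerationEvent {
    /// JSON in the internally tagged shape produced by `#[serde(tag = "type")]`.
    pub fn to_json(&self) -> String {
        match self {
            GenerationEvent::Progress { status, progress_pct, message } => format!(
                r#"{{"type":"progress","status":"{}","progress_pct":{},"message":"{}"}}"#,
                json_escape(status),
                progress_pct,
                json_escape(message)
            ),
            GenerationEvent::Completed { synthesis_id } => format!(
                r#"{{"type":"completed","synthesis_id":"{}"}}"#,
                json_escape(synthesis_id)
            ),
            GenerationEvent::Failed { error } => {
                format!(r#"{{"type":"failed","error":"{}"}}"#, json_escape(error))
            }
        }
    }

    /// One SSE frame: a `data:` line terminated by a blank line.
    pub fn to_sse_frame(&self) -> String {
        format!("data: {}\n\n", self.to_json())
    }
}
```

Because the JSON is escaped, the payload never contains a raw newline, so a single `data:` line per event is sufficient.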
3.7 Admin
| Method | Path | Auth | Description | Phase |
|---|---|---|---|---|
| GET | /api/v1/admin/provider-models | Admin | List all configured provider models | 2 |
| POST | /api/v1/admin/provider-models | Admin | Add a provider model | 2 |
| PUT | /api/v1/admin/provider-models/:id | Admin | Update a provider model | 2 |
| DELETE | /api/v1/admin/provider-models/:id | Admin | Remove a provider model | 2 |
| GET | /api/v1/admin/rate-limits | Admin | Get rate limit configs | 2 |
| PUT | /api/v1/admin/rate-limits/:provider | Admin | Update rate limit config | 2 |
| GET | /api/v1/admin/users | Admin | List all users | 2 |
| PUT | /api/v1/admin/users/:id/role | Admin | Change user role | 2 |
// POST /api/v1/admin/provider-models
#[derive(Deserialize, Validate)]
pub struct CreateProviderModelRequest {
#[validate(custom(function = "validate_provider"))]
pub provider: String,
#[validate(length(min = 1, max = 100))]
pub model_id: String,
#[validate(length(min = 1, max = 200))]
pub display_name: String,
pub enabled: bool,
}
// PUT /api/v1/admin/rate-limits/:provider
#[derive(Deserialize, Validate)]
pub struct UpdateRateLimitRequest {
#[validate(range(min = 1, max = 1000))]
pub max_requests: i32,
#[validate(range(min = 1, max = 3600))]
pub time_window_seconds: i32,
}
// PUT /api/v1/admin/users/:id/role
#[derive(Deserialize)]
pub struct UpdateRoleRequest {
pub role: String, // "user" | "admin"
}
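UpdateRoleRequest carries the role as a free-form String, so the handler has to map it onto the two values of the user_role enum and reject anything else. One way to do that, sketched with a hypothetical `FromStr` implementation (the error type and message are assumptions):

```rust
use std::str::FromStr;

/// Mirrors the `user_role` Postgres enum ('user', 'admin').
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UserRole {
    User,
    Admin,
}

impl FromStr for UserRole {
    type Err = String;

    /// Accept exactly the two enum labels; everything else is an error
    /// that the handler could map to a 400 response.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "user" => Ok(UserRole::User),
            "admin" => Ok(UserRole::Admin),
            other => Err(format!("invalid role: {other}")),
        }
    }
}

impl UserRole {
    /// The label used in API responses such as `MeResponse.role`.
    pub fn as_str(self) -> &'static str {
        match self {
            UserRole::User => "user",
            UserRole::Admin => "admin",
        }
    }
}
```

A handler could then write `let role: UserRole = body.role.parse()?` once the error type is convertible into AppError.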
3.8 Public Config
| Method | Path | Auth | Description | Phase |
|---|---|---|---|---|
| GET | /api/v1/config/providers | Authenticated | List enabled providers and models (no keys) | 2 |
// GET /api/v1/config/providers
#[derive(Serialize)]
pub struct ProviderInfo {
pub provider: String,
pub models: Vec<ModelInfo>,
}
#[derive(Serialize)]
pub struct ModelInfo {
pub model_id: String,
pub display_name: String,
}
3.9 SSE for Real-Time List Updates
| Method | Path | Auth | Description | Phase |
|---|---|---|---|---|
| GET | /api/v1/events/syntheses | Authenticated | SSE stream for syntheses list changes | 3 |
| GET | /api/v1/events/sources | Authenticated | SSE stream for sources list changes | 3 |
These SSE endpoints replace Firestore onSnapshot. When a synthesis is created/deleted or a source is added/removed, connected clients receive a lightweight event (just the entity type + action + id) prompting the frontend to refetch. Implementation uses a tokio::sync::broadcast channel per user in AppState.
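The per-user fan-out can be pictured with a small registry. The sketch below deliberately swaps tokio::sync::broadcast for std::sync::mpsc so that it runs without any dependencies; `ListEventHub`, `ListEvent`, and the `u64` ids (standing in for Uuid) are all hypothetical names chosen for the illustration.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

/// Lightweight change notification: entity type + action + id.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ListEvent {
    pub entity: &'static str, // "synthesis" or "source"
    pub action: &'static str, // "created" or "deleted"
    pub id: u64,              // Uuid in the real schema
}

/// Hypothetical registry of subscribers, keyed by user id.
#[derive(Default)]
pub struct ListEventHub {
    subscribers: Mutex<HashMap<u64, Vec<Sender<ListEvent>>>>,
}

impl ListEventHub {
    /// Register a new listener (one per open SSE connection).
    pub fn subscribe(&self, user_id: u64) -> Receiver<ListEvent> {
        let (tx, rx) = channel();
        self.subscribers
            .lock()
            .unwrap()
            .entry(user_id)
            .or_default()
            .push(tx);
        rx
    }

    /// Deliver an event to every live listener of one user and drop
    /// senders whose receiver has gone away. Returns the number reached.
    pub fn publish(&self, user_id: u64, event: ListEvent) -> usize {
        let mut map = self.subscribers.lock().unwrap();
        let Some(senders) = map.get_mut(&user_id) else {
            return 0;
        };
        senders.retain(|tx| tx.send(event.clone()).is_ok());
        senders.len()
    }
}
```

Events published for one user never reach another user's connections, which is the property the per-user channel design is meant to guarantee.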
4. Authentication & Authorization
4.1 Magic Link Flow
Token Generation
// src/util/token.rs
use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine};
use rand::RngCore;
use sha2::{Digest, Sha256};
/// Generate a 32-byte cryptographically secure random token, base64url-encoded.
pub fn generate_token() -> String {
    let mut bytes = [0u8; 32];
    rand::rngs::OsRng.fill_bytes(&mut bytes);
    URL_SAFE_NO_PAD.encode(bytes)
}
/// Compute the SHA-256 hash of a token as 64 lowercase hex characters.
pub fn hash_token(token: &str) -> String {
    let digest = Sha256::digest(token.as_bytes());
    // Hex-encode by hand: the dependency list in section 1.2 does not include the `hex` crate.
    digest.iter().map(|byte| format!("{byte:02x}")).collect()
}
Registration Flow
- Receive RegisterRequest (email, optional display_name, turnstile_token).
- Verify the Turnstile token via services::captcha::verify_turnstile().
- Check whether a user with this email already exists:
  - If so: send a magic link email (same as login) and return the generic success message.
  - If not: create the user with role = 'user', create the default settings row, and send a magic link email.
- Always return 200 { "message": "If this email is valid, a login link has been sent." }.
- Add a random delay (50-200ms) when no email is sent to prevent timing attacks.
Magic Link Request Flow
- Receive MagicLinkRequest (email, turnstile_token).
- Verify the Turnstile token.
- Look up the user by email.
- If not found: return the same success message and do nothing else (prevents email enumeration).
- Generate a token, compute its SHA-256 hash, and store the hash in magic_link_tokens with a 15-minute expiry.
- Send an email containing {BASE_URL}/api/v1/auth/verify?token={raw_token}.
- Return the generic success message.
Verification Flow
- Receive GET /api/v1/auth/verify?token={raw_token}.
- Compute hash = SHA-256(raw_token).
- Atomic query: UPDATE magic_link_tokens SET used = true WHERE token_hash = $1 AND used = false AND expires_at > now() RETURNING email
- If the query returns no row (the token is unknown, already used, or expired): redirect to /login?error=invalid_token.
- Look up or create the user by email (handles the case where registration + verification happen in a single flow).
- Generate a session token (32 bytes, base64url).
- Store SHA-256(session_token) in the sessions table with a 30-day expiry.
- Set the session cookie: Set-Cookie: ai_synth_session={session_token}; HttpOnly; Secure; SameSite=Lax; Path=/; Max-Age=2592000
- Redirect to / (302).
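The single-use guarantee in the verification flow comes from doing the check and the update in one statement. The in-memory model below mimics the semantics of that UPDATE ... RETURNING query so the three rejection cases are easy to see. It is a stand-in for illustration only: `MagicLinkStore` is a hypothetical type, timestamps are plain Unix seconds, and the real code would run the SQL through sqlx.

```rust
use std::collections::HashMap;

/// One row of the `magic_link_tokens` table (simplified).
pub struct MagicLink {
    pub email: String,
    pub expires_at: u64, // Unix seconds; TIMESTAMPTZ in the real table
    pub used: bool,
}

/// Hypothetical in-memory stand-in for the table, keyed by token hash.
#[derive(Default)]
pub struct MagicLinkStore {
    rows: HashMap<String, MagicLink>,
}

impl MagicLinkStore {
    /// Store a token hash with the 15-minute expiry from the plan.
    pub fn insert(&mut self, token_hash: &str, email: &str, now: u64) {
        self.rows.insert(
            token_hash.to_string(),
            MagicLink {
                email: email.to_string(),
                expires_at: now + 15 * 60,
                used: false,
            },
        );
    }

    /// Same semantics as:
    /// UPDATE ... SET used = true
    /// WHERE token_hash = $1 AND used = false AND expires_at > now()
    /// RETURNING email
    pub fn consume(&mut self, token_hash: &str, now: u64) -> Option<String> {
        let row = self.rows.get_mut(token_hash)?;
        if row.used || row.expires_at <= now {
            return None;
        }
        row.used = true;
        Some(row.email.clone())
    }
}
```

A second call with the same hash returns `None`, which corresponds to the redirect to /login?error=invalid_token. In PostgreSQL the same property holds under concurrency because the row update is atomic.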
4.2 Session Middleware (AuthUser Extractor)
// src/middleware/auth.rs
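use axum::extract::{FromRef, FromRequestParts};
use axum::http::{header, request::Parts};
use chrono::Utc;
use uuid::Uuid;
use crate::{app_state::AppState, db, error::AppError, models::user::UserRole};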
#[derive(Clone, Debug)]
pub struct AuthUser {
pub id: Uuid,
pub email: String,
pub display_name: Option<String>,
pub role: UserRole,
}
// axum 0.8 declares FromRequestParts with a native async fn, so no #[async_trait] attribute is used here.
impl<S> FromRequestParts<S> for AuthUser
where
S: Send + Sync,
AppState: FromRef<S>,
{
type Rejection = AppError;
async fn from_request_parts(parts: &mut Parts, state: &S) -> Result<Self, Self::Rejection> {
let app_state = AppState::from_ref(state);
// 1. Extract cookie
let cookies = parts
.headers
.get_all(header::COOKIE)
.iter()
.filter_map(|v| v.to_str().ok())
.flat_map(|s| s.split(';'))
.map(|s| s.trim())
.find(|s| s.starts_with("ai_synth_session="))
.and_then(|s| s.strip_prefix("ai_synth_session="))
.ok_or(AppError::Unauthorized("No session cookie".into()))?;
let session_token = cookies;
// 2. Hash and lookup
let token_hash = crate::util::token::hash_token(session_token);
let session = db::sessions::find_by_hash(&app_state.pool, &token_hash)
.await?
.ok_or(AppError::Unauthorized("Invalid session".into()))?;
// 3. Check expiration
if session.expires_at < Utc::now() {
db::sessions::delete(&app_state.pool, session.id).await?;
return Err(AppError::Unauthorized("Session expired".into()));
}
// 4. Load user
let user = db::users::find_by_id(&app_state.pool, session.user_id)
.await?
.ok_or(AppError::Unauthorized("User not found".into()))?;
Ok(AuthUser {
id: user.id,
email: user.email,
display_name: user.display_name,
role: user.role,
})
}
}
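The cookie handling on both sides of the session can be checked in isolation. The sketch below builds the Set-Cookie value from the verification flow and extracts the token from a Cookie request header, using only the standard library. The function and constant names are hypothetical; the attributes and the 30-day Max-Age come from section 4.1.

```rust
/// Cookie name used throughout the plan.
pub const SESSION_COOKIE: &str = "ai_synth_session";
/// 30 days, matching the session expiry stored in the database.
pub const SESSION_MAX_AGE_SECS: u64 = 30 * 24 * 60 * 60;

/// Build the `Set-Cookie` header value issued after a successful verification.
pub fn build_session_cookie(token: &str) -> String {
    format!(
        "{}={}; HttpOnly; Secure; SameSite=Lax; Path=/; Max-Age={}",
        SESSION_COOKIE, token, SESSION_MAX_AGE_SECS
    )
}

/// Extract the session token from a `Cookie` request header value.
/// Returns `None` when the cookie is absent or empty.
pub fn extract_session_token(cookie_header: &str) -> Option<&str> {
    cookie_header
        .split(';')
        .map(str::trim)
        .find_map(|pair| {
            // Require the exact name followed by '=' so that a cookie
            // named `ai_synth_session_old` is not matched by accident.
            pair.strip_prefix(SESSION_COOKIE)
                .and_then(|rest| rest.strip_prefix('='))
        })
        .filter(|value| !value.is_empty())
}
```

The extracted value is what the extractor hashes with hash_token before looking up the session row.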
4.3 Admin Guard (AdminUser Extractor)
// src/middleware/admin.rs
#[derive(Clone, Debug)]
pub struct AdminUser(pub AuthUser);
// As with AuthUser, no #[async_trait] attribute: axum 0.8 uses a native async fn in FromRequestParts.
impl<S> FromRequestParts<S> for AdminUser
where
S: Send + Sync,
AppState: FromRef<S>,
{
type Rejection = AppError;
async fn from_request_parts(parts: &mut Parts, state: &S) -> Result<Self, Self::Rejection> {
let auth_user = AuthUser::from_request_parts(parts, state).await?;
if auth_user.role != UserRole::Admin {
return Err(AppError::Forbidden("Admin access required".into()));
}
Ok(AdminUser(auth_user))
}
}
Handlers that require admin simply take AdminUser as a parameter:
async fn list_users(
    AdminUser(admin): AdminUser,
    State(state): State<AppState>,
) -> Result<Json<Vec<UserResponse>>, AppError> {
    // `admin` is guaranteed to be an admin here.
    todo!() // placeholder so the signature compiles; the real body lists users
}
4.4 Turnstile Verification
// src/services/captcha.rs
use serde::Deserialize;
use crate::error::AppError;
#[derive(Deserialize)]
struct TurnstileResponse {
    success: bool,
    // Default to an empty list so a response without this field still deserializes.
    #[serde(rename = "error-codes", default)]
    error_codes: Vec<String>,
}
/// Verify a Turnstile token against Cloudflare's siteverify endpoint.
pub async fn verify_turnstile(
    client: &reqwest::Client,
    secret: &str,
    token: &str,
) -> Result<(), AppError> {
    let resp = client
        .post("https://challenges.cloudflare.com/turnstile/v0/siteverify")
        .form(&[("secret", secret), ("response", token)])
        .send()
        .await
        .map_err(|e| AppError::Internal(e.into()))?;
    let result: TurnstileResponse = resp
        .json()
        .await
        .map_err(|e| AppError::Internal(e.into()))?;
    if result.success {
        Ok(())
    } else {
        Err(AppError::BadRequest(format!(
            "Captcha verification failed: {:?}",
            result.error_codes
        )))
    }
}
4.5 CSRF Protection
The CSRF middleware requires the X-Requested-With header on all state-changing requests (POST, PUT, PATCH, DELETE). Combined with SameSite=Lax cookies, this prevents cross-site request forgery: HTML forms cannot set custom headers, and a cross-origin script can only send one after a CORS preflight that the server approves.
// src/middleware/csrf.rs
use axum::extract::Request;
use axum::middleware::Next;
use axum::response::Response;
use crate::error::AppError;
/// Reject mutating requests that lack `X-Requested-With: XMLHttpRequest`.
/// In axum 0.8, `Request` defaults to `Request<Body>` and `Next` is not generic.
pub async fn csrf_check(req: Request, next: Next) -> Result<Response, AppError> {
    let mutating_methods = ["POST", "PUT", "DELETE", "PATCH"];
    if mutating_methods.contains(&req.method().as_str()) {
        let has_header = req
            .headers()
            .get("X-Requested-With")
            .is_some_and(|v| v == "XMLHttpRequest");
        if !has_header {
            return Err(AppError::Forbidden("Missing X-Requested-With header".into()));
        }
    }
    Ok(next.run(req).await)
}
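The decision the middleware makes is a pure function of the method and one header, which makes it easy to unit test outside of axum. A minimal sketch of that rule follows; `csrf_allows` is a hypothetical helper name, not part of the plan.

```rust
/// Hypothetical pure form of the CSRF rule: safe methods always pass,
/// mutating methods pass only with `X-Requested-With: XMLHttpRequest`.
pub fn csrf_allows(method: &str, x_requested_with: Option<&str>) -> bool {
    let mutating = matches!(method, "POST" | "PUT" | "PATCH" | "DELETE");
    !mutating || x_requested_with == Some("XMLHttpRequest")
}
```

On the frontend this means every fetch call that mutates state has to set the header explicitly, since browsers do not add it on their own.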
4.6 CLI Admin Creation
// src/cli.rs
use clap::{Parser, Subcommand};
#[derive(Parser)]
#[command(name = "ai-synth", about = "AI Weekly Synth backend")]
pub struct Cli {
#[command(subcommand)]
pub command: Commands,
}
#[derive(Subcommand)]
pub enum Commands {
/// Start the web server
Serve,
/// Create an admin user
CreateAdmin {
/// Email address for the admin user
#[arg(long)]
email: String,
},
}
In main.rs:
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let cli = Cli::parse();
match cli.command {
Commands::Serve => { /* start server */ },
Commands::CreateAdmin { email } => {
let pool = create_pool().await?;
let user = db::users::find_by_email(&pool, &email).await?;
match user {
Some(u) => {
db::users::update_role(&pool, u.id, UserRole::Admin).await?;
println!("User {} promoted to admin.", email);
}
None => {
db::users::create(&pool, &email, None, UserRole::Admin).await?;
println!("Admin user {} created.", email);
}
}
}
}
Ok(())
}
5. LLM Provider Abstraction
5.1 The Unified Trait
// src/services/llm/mod.rs
use async_trait::async_trait;
use crate::error::AppError;
/// Describes what a provider can do natively.
#[derive(Debug, Clone)]
pub struct ProviderCapabilities {
/// Provider supports native web search grounding (e.g., Gemini googleSearch, OpenAI web_search).
pub native_web_search: bool,
/// Provider supports structured JSON output via schema enforcement.
pub structured_output: bool,
}
/// Progress callback type for reporting generation steps.
pub type ProgressFn = Box<dyn Fn(u8, &str) + Send + Sync>;
#[async_trait]
pub trait LlmProvider: Send + Sync {
/// Returns the provider identifier: "gemini", "openai", or "anthropic".
fn provider_id(&self) -> &str;
/// Returns the capabilities of this provider.
fn capabilities(&self) -> ProviderCapabilities;
/// Pass 1: Search the web and generate structured news items.
/// Uses native web search grounding if available.
/// Returns parsed JSON matching the category schema (keys: category_0, category_1, ...).
async fn search_and_generate(
&self,
model: &str,
system_prompt: &str,
user_prompt: &str,
response_schema: &serde_json::Value,
) -> Result<serde_json::Value, AppError>;
/// Pass 2: Rewrite titles and summaries based on scraped content.
/// No web search needed.
async fn rewrite(
&self,
model: &str,
system_prompt: &str,
user_prompt: &str,
response_schema: &serde_json::Value,
) -> Result<serde_json::Value, AppError>;
}
5.2 Per-Provider Implementation Strategy
Gemini
// src/services/llm/gemini.rs
pub struct GeminiProvider {
client: reqwest::Client,
api_key: secrecy::SecretString,
}
impl GeminiProvider {
pub fn new(client: reqwest::Client, api_key: secrecy::SecretString) -> Self {
Self { client, api_key }
}
}
#[async_trait]
impl LlmProvider for GeminiProvider {
fn provider_id(&self) -> &str { "gemini" }
fn capabilities(&self) -> ProviderCapabilities {
ProviderCapabilities {
native_web_search: true, // googleSearch tool
structured_output: true, // responseSchema + responseMimeType
}
}
async fn search_and_generate(
&self,
model: &str,
system_prompt: &str,
user_prompt: &str,
response_schema: &serde_json::Value,
) -> Result<serde_json::Value, AppError> {
// POST https://generativelanguage.googleapis.com/v1beta/models/{model}:generateContent
// Header: x-goog-api-key: <api key>
// Body:
// {
// "system_instruction": { "parts": [{ "text": system_prompt }] },
// "contents": [{ "parts": [{ "text": user_prompt }] }],
// "tools": [{ "google_search": {} }],
// "generation_config": {
// "response_mime_type": "application/json",
// "response_schema": response_schema
// }
// }
// Parse response.candidates[0].content.parts[0].text as JSON.
// Caveat (to verify per model): some Gemini models reject google_search combined with
// response_schema; for those, embed the schema in the prompt as the Anthropic provider does.
todo!()
}
async fn rewrite(
&self,
model: &str,
system_prompt: &str,
user_prompt: &str,
response_schema: &serde_json::Value,
) -> Result<serde_json::Value, AppError> {
// Same as above but WITHOUT the tools field (no web search in pass 2)
todo!()
}
}
OpenAI
// src/services/llm/openai.rs
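pub struct OpenAiProvider {
client: reqwest::Client,
api_key: secrecy::SecretString,
}
impl OpenAiProvider {
// Constructor called by create_provider (section 5.3)
pub fn new(client: reqwest::Client, api_key: secrecy::SecretString) -> Self {
Self { client, api_key }
}
}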
#[async_trait]
impl LlmProvider for OpenAiProvider {
fn provider_id(&self) -> &str { "openai" }
fn capabilities(&self) -> ProviderCapabilities {
ProviderCapabilities {
native_web_search: true, // web_search tool (Responses API)
structured_output: true, // text.format: { type: "json_schema", name, schema } (Responses API)
}
}
async fn search_and_generate(
&self,
model: &str,
system_prompt: &str,
user_prompt: &str,
response_schema: &serde_json::Value,
) -> Result<serde_json::Value, AppError> {
// POST https://api.openai.com/v1/responses
// Body:
// {
// "model": model,
// "instructions": system_prompt,
// "input": user_prompt,
// "tools": [{ "type": "web_search_preview" }],
// "text": {
// "format": {
// "type": "json_schema",
// "name": "news_synthesis",
// "schema": response_schema
// }
// }
// }
todo!()
}
async fn rewrite(
&self,
model: &str,
system_prompt: &str,
user_prompt: &str,
response_schema: &serde_json::Value,
) -> Result<serde_json::Value, AppError> {
// POST https://api.openai.com/v1/responses (no tools)
todo!()
}
}
Anthropic
// src/services/llm/anthropic.rs
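pub struct AnthropicProvider {
client: reqwest::Client,
api_key: secrecy::SecretString,
}
impl AnthropicProvider {
// Constructor called by create_provider (section 5.3)
pub fn new(client: reqwest::Client, api_key: secrecy::SecretString) -> Self {
Self { client, api_key }
}
}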
#[async_trait]
impl LlmProvider for AnthropicProvider {
fn provider_id(&self) -> &str { "anthropic" }
fn capabilities(&self) -> ProviderCapabilities {
ProviderCapabilities {
native_web_search: true, // web_search tool
structured_output: false, // No native JSON schema enforcement; use prompt + parse
}
}
async fn search_and_generate(
&self,
model: &str,
system_prompt: &str,
user_prompt: &str,
response_schema: &serde_json::Value,
) -> Result<serde_json::Value, AppError> {
// POST https://api.anthropic.com/v1/messages
// Headers: x-api-key, anthropic-version: 2023-06-01
// Body:
// {
// "model": model,
// "max_tokens": 8192,
// "system": system_prompt,
// "tools": [{ "type": "web_search_20250305", "name": "web_search" }],
// "messages": [{ "role": "user", "content": user_prompt + "\n\nRespond ONLY with valid JSON matching this schema:\n" + schema }]
// }
// Extract text from response, parse as JSON, validate against schema
todo!()
}
async fn rewrite(
&self,
model: &str,
system_prompt: &str,
user_prompt: &str,
response_schema: &serde_json::Value,
) -> Result<serde_json::Value, AppError> {
// Same without tools, JSON schema in prompt
todo!()
}
}
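Because the Anthropic provider is flagged structured_output: false, the model's text may arrive wrapped in prose around the JSON. The "extract text, parse as JSON" step could start with something like the sketch below. It is an assumption, not part of the plan: extract_json_object is a hypothetical helper that isolates the first balanced {...} span, which would then be handed to serde_json and validated against the schema.

```rust
/// Hypothetical helper: returns the first balanced top-level `{...}` span in `text`.
/// Braces inside JSON string literals are ignored, and backslash escapes are honoured.
pub fn extract_json_object(text: &str) -> Option<&str> {
    let start = text.find('{')?;
    let mut depth = 0usize;
    let mut in_string = false;
    let mut escaped = false;
    for (offset, ch) in text[start..].char_indices() {
        if in_string {
            if escaped {
                escaped = false;
            } else if ch == '\\' {
                escaped = true;
            } else if ch == '"' {
                in_string = false;
            }
            continue;
        }
        match ch {
            '"' => in_string = true,
            '{' => depth += 1,
            '}' => {
                depth -= 1;
                if depth == 0 {
                    let end = start + offset + ch.len_utf8();
                    return Some(&text[start..end]);
                }
            }
            _ => {}
        }
    }
    None // unbalanced braces: treat the response as malformed
}
```

Returning None rather than a best-effort slice lets the caller surface an AppError::LlmError instead of persisting a half-parsed synthesis.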
5.3 Provider Factory
// src/services/llm/mod.rs
pub fn create_provider(
provider_id: &str,
api_key: secrecy::SecretString,
client: reqwest::Client,
) -> Result<Box<dyn LlmProvider>, AppError> {
match provider_id {
"gemini" => Ok(Box::new(GeminiProvider::new(client, api_key))),
"openai" => Ok(Box::new(OpenAiProvider::new(client, api_key))),
"anthropic" => Ok(Box::new(AnthropicProvider::new(client, api_key))),
other => Err(AppError::BadRequest(format!("Unknown provider: {other}"))),
}
}
5.4 The Adaptive Generation Pipeline
// src/services/generation.rs
pub struct GenerationPipeline;
impl GenerationPipeline {
/// Run the full generation pipeline.
/// Adapts the pipeline based on provider capabilities.
pub async fn run(
state: &AppState,
user_id: Uuid,
job_id: Uuid,
progress_tx: tokio::sync::watch::Sender<GenerationEvent>,
) -> Result<Uuid, AppError> {
// 1. Load user settings
let settings = db::settings::get_or_default(&state.pool, user_id).await?;
// 2. Load user sources
let sources = db::sources::list_by_user(&state.pool, user_id).await?;
// 3. Resolve provider + model
// Look up user's preferred_provider -> find user's API key for that provider
// Fall back to first provider the user has a key for
let (provider_id, model_id) = resolve_provider_and_model(
&state.pool, user_id, &settings
).await?;
// 4. Decrypt user's API key for the resolved provider
let api_key = db::provider_keys::get_decrypted(
&state.pool, user_id, &provider_id, &state.config.master_key
).await?;
// 5. Create the LLM provider
let provider = create_provider(&provider_id, api_key, state.http_client.clone())?;
let caps = provider.capabilities();
// 6. Build dynamic JSON schema from categories
let schema = build_category_schema(&settings.categories);
// 7. Build prompts
let (system_prompt, user_prompt) = build_search_prompts(&settings, &sources);
// === PASS 1: Search ===
progress_tx.send(GenerationEvent::Progress {
status: "searching".into(),
progress_pct: 10,
message: "Searching the web for recent news...".into(),
}).ok();
// Rate limit check
state.rate_limiter.acquire(&provider_id, user_id).await?;
let raw_results = provider
.search_and_generate(&model_id, &system_prompt, &user_prompt, &schema)
.await?;
// 8. Parse raw results into NewsItems per category
let parsed = parse_category_results(&raw_results, &settings.categories)?;
// === DECISION POINT: scrape or skip ===
// If the provider has native web search with grounding (Gemini, OpenAI),
// the results are already high-quality. We still scrape to:
// (a) validate URLs exist (filter 404s)
// (b) check publication dates
// (c) get content for the rewrite pass
// But if scraping fails for some items, we keep the originals.
progress_tx.send(GenerationEvent::Progress {
status: "scraping".into(),
progress_pct: 40,
message: "Validating and fetching article content...".into(),
}).ok();
let scraped = services::scraper::validate_and_scrape(
&state.http_client,
parsed,
settings.max_age_days as i64,
).await;
// === PASS 2: Rewrite ===
progress_tx.send(GenerationEvent::Progress {
status: "rewriting".into(),
progress_pct: 70,
message: "Rewriting summaries based on article content...".into(),
}).ok();
let (rewrite_system, rewrite_user) = build_rewrite_prompts(&scraped);
state.rate_limiter.acquire(&provider_id, user_id).await?;
let final_results = provider
.rewrite(&model_id, &rewrite_system, &rewrite_user, &schema)
.await?;
// 9. Build sections
let sections = build_sections(&final_results, &settings.categories)?;
// 10. Persist synthesis
let title = format!("{} - {}", settings.theme, Utc::now().format("%d/%m/%Y"));
let synthesis_id = db::syntheses::create(
&state.pool, user_id, &title, &sections, &provider_id, &model_id
).await?;
// 11. Update job record
db::generation_jobs::complete(&state.pool, job_id, synthesis_id).await?;
progress_tx.send(GenerationEvent::Completed { synthesis_id }).ok();
Ok(synthesis_id)
}
}
/// Resolve which provider and model to use for a user.
async fn resolve_provider_and_model(
pool: &PgPool,
user_id: Uuid,
settings: &UserSettings,
) -> Result<(String, String), AppError> {
// 1. If user has a preferred provider + model, and has a key for it, use that
if let (Some(prov), Some(model)) = (&settings.preferred_provider, &settings.preferred_model) {
let has_key = db::provider_keys::exists(pool, user_id, prov).await?;
if has_key {
// Verify model is in admin's enabled list
let model_exists = db::admin_models::is_enabled(pool, prov, model).await?;
if model_exists {
return Ok((prov.clone(), model.clone()));
}
}
}
// 2. Fall back to first provider the user has a key for
let keys = db::provider_keys::list_by_user(pool, user_id).await?;
for key in &keys {
if let Some(model) = db::admin_models::first_enabled_for_provider(pool, &key.provider).await? {
return Ok((key.provider.clone(), model.model_id));
}
}
Err(AppError::BadRequest(
"No API key configured. Please add an API key in your settings.".into()
))
}
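The resolution order is easier to review as a pure function. The sketch below replaces the database lookups with in-memory slices; EnabledModel and resolve are hypothetical names, and the model IDs in the usage note are placeholders. It shows one consequence of the logic above: if the preferred model is not enabled, the user silently falls back to the first provider they hold a key for.

```rust
/// Hypothetical stand-in for a row of the admin-enabled model list.
pub struct EnabledModel<'a> {
    pub provider: &'a str,
    pub model_id: &'a str,
}

/// Mirrors the two-step order of `resolve_provider_and_model` without touching the DB.
pub fn resolve<'a>(
    preferred: Option<(&'a str, &'a str)>,
    user_key_providers: &[&'a str],
    enabled: &[EnabledModel<'a>],
) -> Option<(&'a str, &'a str)> {
    // 1. The preferred pair wins if the user holds a key for it and the model is enabled.
    if let Some((prov, model)) = preferred {
        let has_key = user_key_providers.contains(&prov);
        let is_enabled = enabled
            .iter()
            .any(|m| m.provider == prov && m.model_id == model);
        if has_key && is_enabled {
            return Some((prov, model));
        }
    }
    // 2. Otherwise take the first provider with a key that has any enabled model.
    user_key_providers.iter().find_map(|prov| {
        enabled
            .iter()
            .find(|m| m.provider == *prov)
            .map(|m| (*prov, m.model_id))
    })
}
```

For example, with keys for gemini and openai and a preferred OpenAI model that an admin has disabled, resolve returns the first enabled Gemini model rather than an error.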
5.5 Rate Limiter
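Sliding-window rate limiter, one bucket per provider. The struct is named TokenBucket, but it keeps a log of request timestamps and admits a request only while fewer than max_requests fall inside time_window. Admin-configured limits apply globally; users can override them for their own keys (for example, when their API quota is higher).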
// src/services/rate_limiter.rs
use dashmap::DashMap;
use sqlx::PgPool;
use std::collections::VecDeque;
use std::time::{Duration, Instant};
use uuid::Uuid;
pub struct RateLimiter {
/// Per-provider global buckets (admin-configured defaults)
global_buckets: DashMap<String, TokenBucket>,
/// Per-user-per-provider override buckets
user_buckets: DashMap<(Uuid, String), TokenBucket>,
}
struct TokenBucket {
timestamps: VecDeque<Instant>,
max_requests: u32,
time_window: Duration,
}
impl RateLimiter {
pub fn new() -> Self {
Self {
global_buckets: DashMap::new(),
user_buckets: DashMap::new(),
}
}
/// Load admin-configured rate limits from DB.
pub async fn load_from_db(&self, pool: &PgPool) -> Result<(), AppError> {
let limits = db::admin_rate_limits::list_all(pool).await?;
for limit in limits {
self.global_buckets.insert(
limit.provider.clone(),
TokenBucket {
timestamps: VecDeque::new(),
max_requests: limit.max_requests as u32,
time_window: Duration::from_secs(limit.time_window_seconds as u64),
},
);
}
Ok(())
}
/// Acquire a slot. Uses the user-specific override if set, otherwise the global bucket.
/// Waits until a slot frees up; fails fast if the wait would exceed 120 seconds.
pub async fn acquire(&self, provider: &str, user_id: Uuid) -> Result<(), AppError> {
    loop {
        // Map guards live only inside try_acquire, so none is held across the sleep.
        let Some(wait) = self.try_acquire(provider, user_id) else {
            return Ok(());
        };
        if wait > Duration::from_secs(120) {
            return Err(AppError::TooManyRequests {
                retry_after_secs: wait.as_secs(),
            });
        }
        tokio::time::sleep(wait).await;
        // Loop again: another task may have taken the freed slot in the meantime.
    }
}
/// Records the request and returns None if a slot is free;
/// otherwise returns how long until the oldest request leaves the window.
fn try_acquire(&self, provider: &str, user_id: Uuid) -> Option<Duration> {
    // Check user override first
    let user_key = (user_id, provider.to_string());
    if let Some(mut bucket) = self.user_buckets.get_mut(&user_key) {
        return Self::take_slot(&mut bucket);
    }
    // Fall back to global
    let mut bucket = self.global_buckets
        .entry(provider.to_string())
        .or_insert_with(|| TokenBucket {
            timestamps: VecDeque::new(),
            max_requests: 29,
            time_window: Duration::from_secs(60),
        });
    Self::take_slot(&mut bucket)
}
fn take_slot(bucket: &mut TokenBucket) -> Option<Duration> {
    let now = Instant::now();
    let window = bucket.time_window;
    bucket.timestamps.retain(|t| now.duration_since(*t) < window);
    if bucket.timestamps.len() < bucket.max_requests as usize {
        bucket.timestamps.push_back(now);
        return None;
    }
    // Bucket is full: wait until the oldest timestamp ages out.
    let wait = match bucket.timestamps.front() {
        Some(oldest) => window.saturating_sub(now.duration_since(*oldest)),
        None => window, // only reachable when max_requests == 0
    };
    Some(wait.max(Duration::from_millis(100)))
}
/// Hot-reload a provider's limits (called after admin update).
pub fn update_provider_limit(&self, provider: &str, max_requests: u32, time_window_secs: u64) {
self.global_buckets.insert(
provider.to_string(),
TokenBucket {
timestamps: VecDeque::new(),
max_requests,
time_window: Duration::from_secs(time_window_secs),
},
);
}
}
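The window arithmetic is easier to check when the clock is passed in rather than read from Instant::now(). A minimal sketch of that idea, independent of DashMap and tokio; SlidingWindow and its try_acquire are illustrative names, not types from the plan:

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Sliding-window log: at most `max_requests` timestamps within `window`.
pub struct SlidingWindow {
    timestamps: VecDeque<Instant>,
    max_requests: usize,
    window: Duration,
}

impl SlidingWindow {
    pub fn new(max_requests: usize, window: Duration) -> Self {
        Self { timestamps: VecDeque::new(), max_requests, window }
    }

    /// Records a request at `now` if a slot is free; otherwise returns how long
    /// to wait until the oldest recorded request leaves the window.
    pub fn try_acquire(&mut self, now: Instant) -> Result<(), Duration> {
        // Evict timestamps that have aged out of the window.
        while let Some(&oldest) = self.timestamps.front() {
            if now.duration_since(oldest) >= self.window {
                self.timestamps.pop_front();
            } else {
                break;
            }
        }
        if self.timestamps.len() < self.max_requests {
            self.timestamps.push_back(now);
            return Ok(());
        }
        match self.timestamps.front() {
            // Not evicted above, so its age is strictly less than the window.
            Some(&oldest) => Err(self.window - now.duration_since(oldest)),
            None => Err(self.window), // only reachable when max_requests == 0
        }
    }
}
```

With a limit of 2 requests per 60 seconds, requests at t, t+10s and t+20s yield Ok, Ok, and a 40-second wait; a request at t+60s succeeds because the first timestamp has aged out.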
6. URL Scraping (Server-Side)
6.1 HTTP Client Configuration
// Created once in AppState, shared across all requests
let http_client = reqwest::Client::builder()
.user_agent("AI-Weekly-Synth/1.0 (+https://github.com/your-repo)")
.timeout(Duration::from_secs(15))
.connect_timeout(Duration::from_secs(5))
.redirect(reqwest::redirect::Policy::limited(3))
.danger_accept_invalid_certs(false)
.pool_max_idle_per_host(10)
.build()?;
6.2 SSRF Prevention
// src/services/scraper.rs
use std::net::IpAddr;
use url::Url;
async fn is_safe_url(url: &Url) -> Result<(), AppError> {
// 1. Scheme check
match url.scheme() {
"http" | "https" => {}
scheme => return Err(AppError::ScrapingError(
format!("Blocked scheme: {scheme}")
)),
}
// 2. Host check -- reject if no host
let host = url.host_str()
.ok_or_else(|| AppError::ScrapingError("No host in URL".into()))?;
// 3. Resolve DNS asynchronously and check every returned IP.
// Note: the HTTP client resolves the name again when it connects and may follow
// redirects, so this check alone does not stop DNS rebinding or a redirect to an internal host.
let port = url.port().unwrap_or(if url.scheme() == "https" { 443 } else { 80 });
let addrs: Vec<_> = tokio::net::lookup_host(format!("{host}:{port}"))
.await
.map_err(|e| AppError::ScrapingError(format!("DNS resolution failed: {e}")))?
.collect();
for addr in &addrs {
if is_private_ip(addr.ip()) {
return Err(AppError::ScrapingError(
"URL resolves to private/internal IP".into()
));
}
}
Ok(())
}
fn is_private_ip(ip: IpAddr) -> bool {
match ip {
IpAddr::V4(v4) => {
v4.is_loopback() // 127.0.0.0/8
|| v4.is_private() // 10/8, 172.16/12, 192.168/16
|| v4.is_link_local() // 169.254/16
|| v4.is_unspecified() // 0.0.0.0
|| v4.octets() == [169, 254, 169, 254] // Cloud metadata
}
IpAddr::V6(v6) => {
v6.is_loopback() // ::1
|| v6.is_unspecified() // ::
// fe80::/10 (link-local)
|| (v6.segments()[0] & 0xffc0) == 0xfe80
}
}
}
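The check above leaves a few internal ranges reachable, notably IPv4-mapped IPv6 addresses (::ffff:10.0.0.1), IPv6 unique-local addresses (fc00::/7), and the carrier-grade NAT block (100.64.0.0/10). A stricter variant might look like the sketch below; is_blocked_ip and is_blocked_v4 are hypothetical names, and the exact set of ranges to block is a policy choice rather than something the plan specifies.

```rust
use std::net::{IpAddr, Ipv4Addr};

/// Hypothetical stricter IPv4 check.
fn is_blocked_v4(v4: Ipv4Addr) -> bool {
    let o = v4.octets();
    v4.is_loopback()                             // 127.0.0.0/8
        || v4.is_private()                       // 10/8, 172.16/12, 192.168/16
        || v4.is_link_local()                    // 169.254/16, includes 169.254.169.254
        || v4.is_unspecified()                   // 0.0.0.0
        || v4.is_broadcast()                     // 255.255.255.255
        || (o[0] == 100 && (o[1] & 0xc0) == 64)  // 100.64.0.0/10 (carrier-grade NAT)
}

/// Hypothetical stricter replacement for `is_private_ip`.
pub fn is_blocked_ip(ip: IpAddr) -> bool {
    match ip {
        IpAddr::V4(v4) => is_blocked_v4(v4),
        IpAddr::V6(v6) => {
            // ::ffff:a.b.c.d reaches the same host as a.b.c.d, so apply the IPv4 rules.
            if let Some(mapped) = v6.to_ipv4_mapped() {
                return is_blocked_v4(mapped);
            }
            let first = v6.segments()[0];
            v6.is_loopback()                      // ::1
                || v6.is_unspecified()            // ::
                || (first & 0xffc0) == 0xfe80     // fe80::/10 link-local
                || (first & 0xfe00) == 0xfc00     // fc00::/7 unique local
        }
    }
}
```

Since is_link_local already covers 169.254.169.254, the separate cloud-metadata comparison in the original function is redundant and is omitted here.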
6.3 HTML Parsing and Content Extraction
// src/services/scraper.rs
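use chrono::{DateTime, NaiveDate, Utc};
use futures::StreamExt; // provides buffer_unordered and collect on streams
use scraper::{Html, Selector};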
pub struct ScrapedItem {
pub title: String,
pub url: String,
pub summary: String,
pub scraped_content: String,
}
pub async fn validate_and_scrape(
client: &reqwest::Client,
items: Vec<(String, Vec<NewsItem>)>, // (category_key, items)
max_age_days: i64,
) -> Vec<(String, Vec<ScrapedItem>)> {
let mut results = Vec::new();
for (category_key, category_items) in items {
let futures = category_items.into_iter().map(|item| {
let client = client.clone();
async move {
scrape_single(&client, item, max_age_days).await
}
});
// Bounded concurrency: max 10 concurrent scrapes
let scraped: Vec<Option<ScrapedItem>> = futures::stream::iter(futures)
.buffer_unordered(10)
.collect()
.await;
let valid: Vec<ScrapedItem> = scraped.into_iter().flatten().collect();
results.push((category_key, valid));
}
results
}
async fn scrape_single(
client: &reqwest::Client,
item: NewsItem,
max_age_days: i64,
) -> Option<ScrapedItem> {
// 1. Parse and validate URL
let url = Url::parse(&item.url).ok()?;
is_safe_url(&url).await.ok()?;
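// 2. Fetch, reading at most 5 MB
let mut resp = client.get(url.as_str()).send().await.ok()?;
if !resp.status().is_success() { return None; }
// Stream the body chunk by chunk so an oversized response is abandoned early
// instead of being buffered in full before the size check
let mut bytes: Vec<u8> = Vec::new();
while let Some(chunk) = resp.chunk().await.ok()? {
    if bytes.len() + chunk.len() > 5_000_000 { return None; }
    bytes.extend_from_slice(&chunk);
}
let html_text = String::from_utf8_lossy(&bytes);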
// 3. Parse HTML
let document = Html::parse_document(&html_text);
// 4. Soft-404 detection
let title_sel = Selector::parse("title").unwrap();
let h1_sel = Selector::parse("h1").unwrap();
let page_title = document.select(&title_sel).next()
.map(|el| el.text().collect::<String>().to_lowercase())
.unwrap_or_default();
let h1_text = document.select(&h1_sel).next()
.map(|el| el.text().collect::<String>().to_lowercase())
.unwrap_or_default();
let error_keywords = [
"page not found", "404", "403", "access denied",
"forbidden", "not found", "introuvable", "page introuvable",
];
if error_keywords.iter().any(|kw| page_title.contains(kw) || h1_text.contains(kw)) {
return None;
}
// 5. Date extraction
if let Some(pub_date) = extract_publication_date(&document) {
let age = Utc::now().signed_duration_since(pub_date);
if age.num_days() > max_age_days {
tracing::debug!(url = %item.url, days = age.num_days(), "Article too old, skipping");
return None;
}
}
// 6. Content extraction (strip non-content elements)
let content = extract_body_text(&document, 4000);
Some(ScrapedItem {
title: item.title,
url: item.url,
summary: item.summary,
scraped_content: content,
})
}
/// Extract publication date from meta tags, JSON-LD, and <time> elements.
fn extract_publication_date(doc: &Html) -> Option<DateTime<Utc>> {
// Priority order:
// 1. JSON-LD datePublished
// 2. meta[property="article:published_time"]
// 3. meta[itemprop="datePublished"]
// 4. meta[name="date"], meta[name="pubdate"], meta[name="dc.date"]
// 5. <time datetime="...">
let jsonld_sel = Selector::parse(r#"script[type="application/ld+json"]"#).unwrap();
for el in doc.select(&jsonld_sel) {
let text = el.text().collect::<String>();
if let Ok(json) = serde_json::from_str::<serde_json::Value>(&text) {
if let Some(date_str) = json.get("datePublished").and_then(|v| v.as_str()) {
if let Ok(dt) = datestr_to_datetime(date_str) {
return Some(dt);
}
}
}
}
let meta_selectors = [
r#"meta[property="article:published_time"]"#,
r#"meta[property="og:article:published_time"]"#,
r#"meta[itemprop="datePublished"]"#,
r#"meta[name="date"]"#,
r#"meta[name="pubdate"]"#,
r#"meta[name="dc.date"]"#,
];
for sel_str in &meta_selectors {
if let Ok(sel) = Selector::parse(sel_str) {
if let Some(el) = doc.select(&sel).next() {
if let Some(content) = el.value().attr("content") {
if let Ok(dt) = datestr_to_datetime(content) {
return Some(dt);
}
}
}
}
}
let time_sel = Selector::parse("time[datetime]").unwrap();
if let Some(el) = doc.select(&time_sel).next() {
if let Some(dt_str) = el.value().attr("datetime") {
if let Ok(dt) = datestr_to_datetime(dt_str) {
return Some(dt);
}
}
}
None
}
/// Try multiple date formats to parse a date string.
fn datestr_to_datetime(s: &str) -> Result<DateTime<Utc>, chrono::ParseError> {
    let s = s.trim();
    // Try ISO 8601 / RFC 3339 (e.g. 2026-03-21T08:30:00+01:00)
    if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
        return Ok(dt.with_timezone(&Utc));
    }
    // Fall back to a bare date (e.g. 2026-03-21), interpreted as midnight UTC.
    // chrono::ParseError has no public constructor, so the error of this last attempt is propagated.
    let date = NaiveDate::parse_from_str(s, "%Y-%m-%d")?;
    Ok(date.and_hms_opt(0, 0, 0).expect("midnight is a valid time").and_utc())
}
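/// Extract visible body text, stripping scripts, styles, nav, footer, etc.
fn extract_body_text(doc: &Html, max_chars: usize) -> String {
    let body_sel = Selector::parse("body").unwrap();
    let exclude_tags = ["script", "style", "noscript", "iframe", "nav", "footer", "header", "aside"];
    let body = match doc.select(&body_sel).next() {
        Some(b) => b,
        None => return String::new(),
    };
    // Walk every text node under <body>, skipping those nested in an excluded element
    let mut raw = String::new();
    for node in body.descendants() {
        let Some(text) = node.value().as_text() else { continue };
        let excluded = node.ancestors().any(|ancestor| {
            ancestor.value().as_element()
                .is_some_and(|el| exclude_tags.contains(&el.name()))
        });
        if !excluded {
            raw.push_str(text);
            raw.push(' ');
        }
    }
    // Collapse whitespace runs, then truncate by characters: slicing by byte index
    // panics when the cut lands inside a multi-byte character
    raw.split_whitespace()
        .collect::<Vec<_>>()
        .join(" ")
        .chars()
        .take(max_chars)
        .collect()
}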
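Truncating scraped text deserves care because Rust strings are UTF-8: slicing at an arbitrary byte index panics if the index falls inside a multi-byte character, which is common in French or other accented content. If the limit is meant as a byte budget rather than a character count, one allocation-free option is to back off to the nearest character boundary. This is a sketch with a hypothetical helper name:

```rust
/// Hypothetical helper: longest prefix of `s` that is at most `max_bytes` long
/// and ends on a UTF-8 character boundary.
pub fn truncate_at_char_boundary(s: &str, max_bytes: usize) -> &str {
    if s.len() <= max_bytes {
        return s;
    }
    let mut end = max_bytes;
    // Index 0 is always a boundary, so this loop terminates.
    while !s.is_char_boundary(end) {
        end -= 1;
    }
    &s[..end]
}
```

For "héllo", where é occupies two bytes, a 2-byte budget yields "h" and a 3-byte budget yields "hé".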
7. Async Generation & SSE
7.1 Background Job Spawning
When a user triggers generation, the handler creates a job record in the database, spawns a background task with tokio::spawn, and immediately returns the job ID with 202 Accepted.
// src/handlers/syntheses.rs
pub async fn trigger_generate(
auth_user: AuthUser,
State(state): State<AppState>,
) -> Result<(StatusCode, Json<GenerateResponse>), AppError> {
// 1. Check if user has an active job (prevent double-generation)
let active = db::generation_jobs::find_active(&state.pool, auth_user.id).await?;
if active.is_some() {
return Err(AppError::Conflict(
"A generation is already in progress.".into()
));
}
// 2. Create job record
let job_id = db::generation_jobs::create(&state.pool, auth_user.id).await?;
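// 3. Create a watch channel for progress.
// Keep the initial receiver: watch::Sender::send returns an error and discards the
// update while no receiver exists, so progress sent before the first SSE client
// subscribes would otherwise be lost.
let (progress_tx, progress_rx) = tokio::sync::watch::channel(GenerationEvent::Progress {
status: "pending".into(),
progress_pct: 0,
message: "Starting...".into(),
});
// 4. Store the progress sender in the job store (for SSE subscribers)
state.job_store.insert(job_id, progress_tx.clone());
// 5. Spawn background task
let state_clone = state.clone();
let user_id = auth_user.id;
tokio::spawn(async move {
// Held until the task ends so that every send() updates the stored value
let _progress_rx = progress_rx;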
let result = GenerationPipeline::run(
&state_clone, user_id, job_id, progress_tx.clone()
).await;
if let Err(e) = &result {
tracing::error!(job_id = %job_id, error = %e, "Generation failed");
progress_tx.send(GenerationEvent::Failed {
error: e.to_string(),
}).ok();
db::generation_jobs::fail(&state_clone.pool, job_id, &e.to_string())
.await
.ok();
}
// Clean up job store after a delay (allow SSE reconnect)
tokio::time::sleep(Duration::from_secs(300)).await;
state_clone.job_store.remove(&job_id);
});
Ok((
StatusCode::ACCEPTED,
Json(GenerateResponse {
job_id,
status: "pending".into(),
}),
))
}
7.2 Job Store in AppState
// src/app_state.rs
use dashmap::DashMap;
use tokio::sync::watch;
pub type JobStore = DashMap<Uuid, watch::Sender<GenerationEvent>>;
#[derive(Clone)]
pub struct AppState {
pub pool: PgPool,
pub http_client: reqwest::Client,
pub config: AppConfig,
pub rate_limiter: Arc<RateLimiter>,
pub job_store: Arc<JobStore>,
}
7.3 SSE Endpoint
// src/handlers/generation.rs
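use std::{convert::Infallible, time::Duration};
use axum::extract::{Path, State};
use axum::response::sse::{Event, Sse};
use axum::Json;
use tokio_stream::wrappers::WatchStream;
use tokio_stream::{Stream, StreamExt};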
pub async fn progress_stream(
auth_user: AuthUser,
State(state): State<AppState>,
Path(job_id): Path<Uuid>,
) -> Result<Sse<impl Stream<Item = Result<Event, Infallible>>>, AppError> {
// Verify the job belongs to this user
let job = db::generation_jobs::get(&state.pool, job_id).await?
.ok_or(AppError::NotFound("Job not found".into()))?;
if job.user_id != auth_user.id {
return Err(AppError::NotFound("Job not found".into()));
}
// Get the watch receiver from the job store
let rx = state.job_store
.get(&job_id)
.map(|entry| entry.value().subscribe())
.ok_or(AppError::NotFound("Job not found or already completed".into()))?;
let stream = WatchStream::new(rx).map(|event| {
let data = serde_json::to_string(&event).unwrap_or_default();
Ok(Event::default().data(data))
});
Ok(Sse::new(stream).keep_alive(
axum::response::sse::KeepAlive::new()
.interval(Duration::from_secs(15))
.text("ping"),
))
}
/// One-shot status poll endpoint (for reconnection after navigation away)
pub async fn job_status(
auth_user: AuthUser,
State(state): State<AppState>,
Path(job_id): Path<Uuid>,
) -> Result<Json<JobStatusResponse>, AppError> {
let job = db::generation_jobs::get(&state.pool, job_id).await?
.ok_or(AppError::NotFound("Job not found".into()))?;
if job.user_id != auth_user.id {
return Err(AppError::NotFound("Job not found".into()));
}
Ok(Json(JobStatusResponse {
job_id: job.id,
status: job.status.to_string(),
progress_pct: job.progress_pct as u8,
progress_message: job.progress_message,
synthesis_id: job.synthesis_id,
error_message: job.error_message,
}))
}
7.4 Reconnection Strategy
When a client navigates away and comes back:
- The frontend calls GET /api/v1/generation/:job_id/status first to get the current state.
- If status is completed or failed, display the result immediately.
- If status is in-progress, reconnect to the SSE endpoint GET /api/v1/generation/:job_id/progress.
- The watch channel immediately delivers the latest state on subscribe, so the client catches up instantly.
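The decision made on reconnect can be written as a small pure function. The sketch is illustrative only: ReconnectAction and action_for_status are hypothetical names, and the strings completed and failed are assumed to match what job.status.to_string() produces on the server.

```rust
/// Hypothetical description of what the client should do after polling the status endpoint.
#[derive(Debug, PartialEq)]
pub enum ReconnectAction {
    /// Job finished: fetch and display the synthesis.
    ShowResult,
    /// Job failed: display the stored error message.
    ShowError,
    /// Job still running: open the SSE progress stream.
    SubscribeToProgress,
}

pub fn action_for_status(status: &str) -> ReconnectAction {
    match status {
        "completed" => ReconnectAction::ShowResult,
        "failed" => ReconnectAction::ShowError,
        // pending, searching, scraping, rewriting, or any future in-progress state
        _ => ReconnectAction::SubscribeToProgress,
    }
}
```

Treating every unknown status as in-progress keeps older clients working if new pipeline stages are added later.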
8. Email (Resend Integration)
Resend provides a REST API for transactional email. No SMTP client library is needed; reqwest suffices.
8.1 Email Service
// src/services/email.rs
use secrecy::{ExposeSecret, SecretString};
pub struct EmailService {
client: reqwest::Client,
api_key: SecretString,
from_address: String, // e.g., "AI Weekly Synth <noreply@yourdomain.com>"
base_url: String, // App base URL for magic link construction
}
impl EmailService {
pub fn new(
client: reqwest::Client,
api_key: SecretString,
from_address: String,
base_url: String,
) -> Self {
Self { client, api_key, from_address, base_url }
}
/// Send a magic link email.
pub async fn send_magic_link(&self, to_email: &str, token: &str) -> Result<(), AppError> {
let verify_url = format!("{}/api/v1/auth/verify?token={}", self.base_url, token);
let body = serde_json::json!({
"from": self.from_address,
"to": [to_email],
"subject": "Votre lien de connexion - AI Weekly Synth",
"html": format!(
r#"<h2>Connexion a AI Weekly Synth</h2>
<p>Cliquez sur le lien ci-dessous pour vous connecter :</p>
<p><a href="{url}" style="display:inline-block;padding:12px 24px;
background-color:#4F46E5;color:white;text-decoration:none;
border-radius:6px;">Se connecter</a></p>
<p>Ou copiez ce lien : <code>{url}</code></p>
<p><em>Ce lien expire dans 15 minutes.</em></p>"#,
url = verify_url,
),
});
let resp = self.client
.post("https://api.resend.com/emails")
.header("Authorization", format!("Bearer {}", self.api_key.expose_secret()))
.json(&body)
.send()
.await
.map_err(|e| AppError::SmtpError(format!("Failed to send email: {e}")))?;
if !resp.status().is_success() {
let error_body = resp.text().await.unwrap_or_default();
return Err(AppError::SmtpError(
format!("Resend API error: {error_body}")
));
}
Ok(())
}
/// Send a synthesis by email.
pub async fn send_synthesis(
&self,
to_email: &str,
synthesis_title: &str,
sections_html: &str,
) -> Result<(), AppError> {
let body = serde_json::json!({
"from": self.from_address,
"to": [to_email],
"subject": format!("Synthese : {}", synthesis_title),
"html": format!(
r#"<h1>{title}</h1>
{content}
<hr/>
<p><em>Genere par AI Weekly Synth</em></p>"#,
title = synthesis_title,
content = sections_html,
),
});
let resp = self.client
.post("https://api.resend.com/emails")
.header("Authorization", format!("Bearer {}", self.api_key.expose_secret()))
.json(&body)
.send()
.await
.map_err(|e| AppError::SmtpError(format!("Failed to send email: {e}")))?;
if !resp.status().is_success() {
let error_body = resp.text().await.unwrap_or_default();
return Err(AppError::SmtpError(
format!("Resend API error: {error_body}")
));
}
Ok(())
}
}
8.2 HTML Rendering for Synthesis Emails
/// Convert synthesis sections to HTML for email delivery.
pub fn sections_to_html(sections: &[NewsSection]) -> String {
let mut html = String::new();
for section in sections {
html.push_str(&format!("<h2>{}</h2>\n<ul>\n", html_escape(&section.title)));
for item in &section.items {
html.push_str(&format!(
r#"<li><strong><a href="{url}">{title}</a></strong><br/>{summary}</li>"#,
url = html_escape(&item.url),
title = html_escape(&item.title),
summary = html_escape(&item.summary),
));
}
html.push_str("</ul>\n");
}
html
}
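fn html_escape(s: &str) -> String {
    // '&' must be replaced first so the entities added below are not escaped again
    s.replace('&', "&amp;")
        .replace('<', "&lt;")
        .replace('>', "&gt;")
        .replace('"', "&quot;")
        .replace('\'', "&#39;")
}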
9. Encryption
9.1 AES-256-GCM Implementation
// src/services/encryption.rs
use aes_gcm::{
aead::{Aead, KeyInit, OsRng},
Aes256Gcm, Nonce,
};
use rand::RngCore;
use secrecy::{ExposeSecret, SecretString};
/// Master key holder -- wraps a 32-byte key.
#[derive(Clone)]
pub struct MasterKey {
key: [u8; 32],
}
impl MasterKey {
/// Parse a 64-character hex string into a 32-byte key.
pub fn from_hex(hex_str: &str) -> Result<Self, AppError> {
let bytes = hex::decode(hex_str)
.map_err(|_| AppError::Internal(
anyhow::anyhow!("MASTER_ENCRYPTION_KEY must be a 64-character hex string")
))?;
if bytes.len() != 32 {
return Err(AppError::Internal(
anyhow::anyhow!("MASTER_ENCRYPTION_KEY must be exactly 32 bytes (64 hex chars)")
));
}
let mut key = [0u8; 32];
key.copy_from_slice(&bytes);
Ok(Self { key })
}
/// Encrypt a plaintext string. Returns (ciphertext, nonce) both as byte vectors.
pub fn encrypt(&self, plaintext: &str) -> Result<(Vec<u8>, Vec<u8>), AppError> {
let cipher = Aes256Gcm::new_from_slice(&self.key)
.map_err(|e| AppError::Internal(anyhow::anyhow!("Cipher init failed: {e}")))?;
let mut nonce_bytes = [0u8; 12];
OsRng.fill_bytes(&mut nonce_bytes);
let nonce = Nonce::from_slice(&nonce_bytes);
let ciphertext = cipher
.encrypt(nonce, plaintext.as_bytes())
.map_err(|e| AppError::Internal(anyhow::anyhow!("Encryption failed: {e}")))?;
Ok((ciphertext, nonce_bytes.to_vec()))
}
/// Decrypt ciphertext using the provided nonce. Returns the plaintext as a SecretString.
pub fn decrypt(
&self,
ciphertext: &[u8],
nonce_bytes: &[u8],
) -> Result<SecretString, AppError> {
let cipher = Aes256Gcm::new_from_slice(&self.key)
.map_err(|e| AppError::Internal(anyhow::anyhow!("Cipher init failed: {e}")))?;
let nonce = Nonce::from_slice(nonce_bytes);
let plaintext_bytes = cipher
.decrypt(nonce, ciphertext)
.map_err(|_| AppError::Internal(
anyhow::anyhow!("Decryption failed -- master key mismatch or data corruption")
))?;
let plaintext = String::from_utf8(plaintext_bytes)
.map_err(|e| AppError::Internal(anyhow::anyhow!("Decrypted data is not valid UTF-8: {e}")))?;
Ok(SecretString::new(plaintext))
}
}
impl Drop for MasterKey {
fn drop(&mut self) {
// Zeroize the key on drop
zeroize::Zeroize::zeroize(&mut self.key);
}
}
9.2 Master Key Loading
// In config.rs
pub struct AppConfig {
pub database_url: String,
pub master_key: MasterKey,
pub base_url: String,
pub port: u16,
pub resend_api_key: SecretString,
pub resend_from: String,
pub turnstile_secret: String,
pub turnstile_site_key: String,
// ...
}
impl AppConfig {
pub fn from_env() -> Result<Self, AppError> {
dotenvy::dotenv().ok();
let master_key_hex = std::env::var("MASTER_ENCRYPTION_KEY")
.map_err(|_| AppError::Internal(
anyhow::anyhow!("MASTER_ENCRYPTION_KEY environment variable is required")
))?;
Ok(Self {
master_key: MasterKey::from_hex(&master_key_hex)?,
// ... other fields
})
}
}
9.3 Key Rotation Strategy
Key rotation is performed via a CLI command that re-encrypts all user API keys under a new master key:
// CLI subcommand: rotate-key
Commands::RotateKey {
old_key_hex,
new_key_hex,
} => {
let old_key = MasterKey::from_hex(&old_key_hex)?;
let new_key = MasterKey::from_hex(&new_key_hex)?;
let pool = create_pool().await?;
let all_keys = sqlx::query_as!(
RawProviderKey,
"SELECT id, encrypted_key, nonce FROM user_provider_keys"
)
.fetch_all(&pool)
.await?;
let mut tx = pool.begin().await?;
for key_row in &all_keys {
// Decrypt with old key
let plaintext = old_key.decrypt(&key_row.encrypted_key, &key_row.nonce)?;
// Re-encrypt with new key
let (new_ciphertext, new_nonce) = new_key.encrypt(plaintext.expose_secret())?;
// Update in transaction
sqlx::query!(
"UPDATE user_provider_keys SET encrypted_key = $1, nonce = $2 WHERE id = $3",
new_ciphertext,
new_nonce,
key_row.id
)
.execute(&mut *tx)
.await?;
}
tx.commit().await?;
println!("Rotated {} keys successfully.", all_keys.len());
}
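After the command succeeds, set the MASTER_ENCRYPTION_KEY environment variable to the new key and restart the application. Until the restart, a running instance still holds the old key and cannot decrypt the re-encrypted rows.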
10. Error Handling
10.1 Error Type Hierarchy
// src/error.rs
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use axum::Json;
use serde_json::json;
#[derive(Debug, thiserror::Error)]
pub enum AppError {
// === Client errors ===
#[error("Bad request: {0}")]
BadRequest(String),
#[error("Unauthorized: {0}")]
Unauthorized(String),
#[error("Forbidden: {0}")]
Forbidden(String),
#[error("Not found: {0}")]
NotFound(String),
#[error("Conflict: {0}")]
Conflict(String),
#[error("Too many requests")]
TooManyRequests { retry_after_secs: u64 },
#[error("Validation error")]
Validation(Vec<FieldError>),
// === Server errors ===
#[error("Internal error: {0}")]
Internal(#[from] anyhow::Error),
#[error("Database error: {0}")]
Database(#[from] sqlx::Error),
#[error("LLM provider error: {0}")]
LlmError(String),
#[error("Email error: {0}")]
SmtpError(String),
#[error("Scraping error: {0}")]
ScrapingError(String),
}
#[derive(Debug, serde::Serialize)]
pub struct FieldError {
pub field: String,
pub message: String,
}
impl IntoResponse for AppError {
fn into_response(self) -> Response {
let (status, body) = match &self {
AppError::BadRequest(msg) => (
StatusCode::BAD_REQUEST,
json!({ "error": msg }),
),
AppError::Unauthorized(_) => (
StatusCode::UNAUTHORIZED,
json!({ "error": "Unauthorized" }),
),
AppError::Forbidden(_) => (
StatusCode::FORBIDDEN,
json!({ "error": "Forbidden" }),
),
AppError::NotFound(msg) => (
StatusCode::NOT_FOUND,
json!({ "error": msg }),
),
AppError::Conflict(msg) => (
StatusCode::CONFLICT,
json!({ "error": msg }),
),
AppError::TooManyRequests { retry_after_secs } => {
let mut resp = (
StatusCode::TOO_MANY_REQUESTS,
Json(json!({ "error": "Too many requests", "retry_after": retry_after_secs })),
).into_response();
resp.headers_mut().insert(
axum::http::header::RETRY_AFTER,
// HeaderValue implements From<u64>, so no fallible parse is needed.
axum::http::HeaderValue::from(*retry_after_secs),
);
return resp;
}
AppError::Validation(errors) => (
StatusCode::UNPROCESSABLE_ENTITY,
json!({ "error": "Validation failed", "details": errors }),
),
AppError::Database(e) => {
tracing::error!(error = %e, "Database error");
(
StatusCode::INTERNAL_SERVER_ERROR,
json!({ "error": "Internal server error" }),
)
}
AppError::Internal(e) => {
tracing::error!(error = ?e, "Internal error");
(
StatusCode::INTERNAL_SERVER_ERROR,
json!({ "error": "Internal server error" }),
)
}
AppError::LlmError(msg) => {
tracing::error!(error = %msg, "LLM provider error");
(
StatusCode::BAD_GATEWAY,
json!({ "error": "AI provider error. Please try again." }),
)
}
AppError::SmtpError(msg) => {
tracing::error!(error = %msg, "Email error");
(
StatusCode::BAD_GATEWAY,
json!({ "error": "Failed to send email. Please try again." }),
)
}
AppError::ScrapingError(msg) => {
tracing::warn!(error = %msg, "Scraping error");
(
StatusCode::BAD_GATEWAY,
json!({ "error": "Failed to fetch article content." }),
)
}
};
(status, Json(body)).into_response()
}
}
Key design decisions:
- Never expose internal details to the client (database errors, stack traces).
- Log server errors at `error!` level with full context for debugging.
- Client errors return the user-facing message directly.
- LLM and email errors return 502 Bad Gateway (upstream service failure).
- The `From<sqlx::Error>` impl converts specific constraint violations (unique, foreign key) into `Conflict` or `BadRequest` where appropriate. This requires a hand-written impl in place of the `#[from]` attribute on the `Database` variant, because the derived impl maps every `sqlx::Error` to a 500.
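The constraint mapping described in the last bullet can be sketched without any database dependency. This is a minimal illustration, not the plan's actual impl: `MappedError` and `classify_sqlstate` are hypothetical names, and the only facts relied on are the PostgreSQL SQLSTATE codes `23505` (unique_violation) and `23503` (foreign_key_violation). In a real `From<sqlx::Error>` impl the code would presumably be read from the database error inside `sqlx::Error::Database`.

```rust
/// Simplified stand-in for the client-facing outcomes of `AppError`
/// (hypothetical type, used only for this sketch).
#[derive(Debug, PartialEq)]
pub enum MappedError {
    Conflict(String),
    BadRequest(String),
    /// Anything else falls through to the generic 500 path.
    Database,
}

/// Classify a PostgreSQL SQLSTATE code (hypothetical helper).
/// `None` models a database error that carries no code.
pub fn classify_sqlstate(code: Option<&str>) -> MappedError {
    match code {
        // 23505 = unique_violation: the row already exists.
        Some("23505") => MappedError::Conflict("Resource already exists".to_string()),
        // 23503 = foreign_key_violation: the referenced row is missing.
        Some("23503") => {
            MappedError::BadRequest("Referenced resource does not exist".to_string())
        }
        // Every other code stays an internal error and is never shown to the client.
        _ => MappedError::Database,
    }
}
```

Keeping the classification in a pure function like this makes it testable without a live Postgres instance; the messages are deliberately generic so no constraint or column names leak to the client.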
10.2 Logging Strategy
// In main.rs
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
fn init_tracing() {
tracing_subscriber::registry()
.with(EnvFilter::try_from_default_env().unwrap_or_else(|_| {
"ai_synth_backend=debug,tower_http=info,sqlx=warn".into()
}))
.with(tracing_subscriber::fmt::layer()
.with_target(true)
.with_thread_ids(false)
.with_file(true)
.with_line_number(true))
.init();
}
Logging conventions:
- Never log decrypted API keys, session tokens, magic link tokens, or the master key.
- Use `secrecy::SecretString` for all secret values. Its `Debug` impl prints a redacted placeholder and it has no `Display` impl, so a secret cannot be formatted into a log line by accident.
- Structured fields: use `tracing::info!(user_id = %id, action = "generate", ...)`, not string interpolation.
- Request tracing: use `tower_http::trace::TraceLayer` for automatic request/response logging.
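The redaction convention can be illustrated with a std-only wrapper. This is a sketch of the idea only, not the `secrecy` crate's implementation: `Redacted` and `expose` are hypothetical names, and the exact placeholder text is an assumption.

```rust
use std::fmt;

/// Hypothetical wrapper that keeps a secret out of `{:?}` output.
/// It deliberately has no `Display` impl, so `{}` does not compile.
pub struct Redacted(String);

impl Redacted {
    pub fn new(value: impl Into<String>) -> Self {
        Redacted(value.into())
    }

    /// Explicit accessor: every read of the secret is visible in code review.
    pub fn expose(&self) -> &str {
        &self.0
    }
}

impl fmt::Debug for Redacted {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Never write the inner value, only a fixed placeholder.
        f.write_str("[REDACTED]")
    }
}
```

Because the wrapper implements `Debug`, a struct that contains it can still derive `Debug` and be logged with `?field` in `tracing` macros without exposing the value.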
11. Testing Strategy
11.1 Test Database Setup
Tests use a real PostgreSQL instance (via Docker in CI, or a local dev instance). Each test gets its own schema or uses transactions that are rolled back.
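// tests/common/mod.rs
// Assumes the crate also exposes a library target named `ai_synth_backend`
// so that integration tests can reach its modules. `RateLimiter` must be
// imported as well, from wherever that type is defined.
use std::sync::Arc;
use ai_synth_backend::{app_state::AppState, config::AppConfig};
use dashmap::DashMap;
use sqlx::PgPool;
use uuid::Uuid;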
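/// Create a test database pool and apply migrations.
/// This helper does not create a per-test schema; the tests below stay
/// isolated from each other by creating fresh users.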
pub async fn setup_test_db() -> PgPool {
dotenvy::dotenv().ok();
let base_url = std::env::var("TEST_DATABASE_URL")
.unwrap_or_else(|_| "postgres://postgres:postgres@localhost:5432/ai_synth_test".into());
let pool = PgPool::connect(&base_url).await.unwrap();
// Run migrations
sqlx::migrate!("./migrations")
.run(&pool)
.await
.unwrap();
pool
}
/// Create a test user and return (user_id, session_token).
pub async fn create_test_user(pool: &PgPool) -> (Uuid, String) {
let user_id = Uuid::new_v4();
let email = format!("test-{}@example.com", Uuid::new_v4());
sqlx::query!(
"INSERT INTO users (id, email, role) VALUES ($1, $2, 'user')",
user_id,
email,
)
.execute(pool)
.await
.unwrap();
// Create session
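// Integration tests are separate crates, so the app is reached by crate name, not `crate::`.
let token = ai_synth_backend::util::token::generate_token();
let hash = ai_synth_backend::util::token::hash_token(&token);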
let expires = chrono::Utc::now() + chrono::Duration::days(30);
sqlx::query!(
"INSERT INTO sessions (user_id, token_hash, expires_at) VALUES ($1, $2, $3)",
user_id,
hash,
expires,
)
.execute(pool)
.await
.unwrap();
(user_id, token)
}
/// Create an admin user and return (user_id, session_token).
pub async fn create_test_admin(pool: &PgPool) -> (Uuid, String) {
let user_id = Uuid::new_v4();
let email = format!("admin-{}@example.com", Uuid::new_v4());
sqlx::query!(
"INSERT INTO users (id, email, role) VALUES ($1, $2, 'admin')",
user_id,
email,
)
.execute(pool)
.await
.unwrap();
let token = ai_synth_backend::util::token::generate_token();
let hash = ai_synth_backend::util::token::hash_token(&token);
let expires = chrono::Utc::now() + chrono::Duration::days(30);
sqlx::query!(
"INSERT INTO sessions (user_id, token_hash, expires_at) VALUES ($1, $2, $3)",
user_id,
hash,
expires,
)
.execute(pool)
.await
.unwrap();
(user_id, token)
}
/// Build a test app (Axum router) with real DB and mock external services.
pub async fn build_test_app(pool: PgPool) -> axum::Router {
let config = AppConfig::test_defaults();
let state = AppState {
pool,
http_client: reqwest::Client::new(),
config,
rate_limiter: Arc::new(RateLimiter::new()),
job_store: Arc::new(DashMap::new()),
};
ai_synth_backend::router::create_router(state)
}
11.2 Unit Test Examples
Encryption
// tests/services/encryption_test.rs
#[test]
fn test_encrypt_decrypt_roundtrip() {
let master_key = MasterKey::from_hex(
"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
).unwrap();
let plaintext = "sk-proj-abc123def456";
let (ciphertext, nonce) = master_key.encrypt(plaintext).unwrap();
// Ciphertext should not equal plaintext
assert_ne!(ciphertext, plaintext.as_bytes());
// Decrypt should return original
let decrypted = master_key.decrypt(&ciphertext, &nonce).unwrap();
assert_eq!(decrypted.expose_secret(), plaintext);
}
#[test]
fn test_decrypt_wrong_key_fails() {
let key1 = MasterKey::from_hex(
"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
).unwrap();
let key2 = MasterKey::from_hex(
"fedcba9876543210fedcba9876543210fedcba9876543210fedcba9876543210"
).unwrap();
let (ciphertext, nonce) = key1.encrypt("secret").unwrap();
assert!(key2.decrypt(&ciphertext, &nonce).is_err());
}
#[test]
fn test_each_encryption_produces_unique_nonce() {
let master_key = MasterKey::from_hex(
"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
).unwrap();
let (_, nonce1) = master_key.encrypt("same text").unwrap();
let (_, nonce2) = master_key.encrypt("same text").unwrap();
assert_ne!(nonce1, nonce2);
}
Rate Limiter
// tests/services/rate_limiter_test.rs
#[tokio::test]
async fn test_rate_limiter_allows_within_limit() {
let limiter = RateLimiter::new();
limiter.update_provider_limit("test", 5, 60);
let user_id = Uuid::new_v4();
for _ in 0..5 {
limiter.acquire("test", user_id).await.unwrap();
}
}
#[tokio::test]
async fn test_rate_limiter_blocks_when_exceeded() {
let limiter = RateLimiter::new();
limiter.update_provider_limit("test", 2, 60);
let user_id = Uuid::new_v4();
limiter.acquire("test", user_id).await.unwrap();
limiter.acquire("test", user_id).await.unwrap();
// Third should block or return TooManyRequests
// (implementation detail: may sleep or error)
}
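The second test above leaves open whether the third call blocks or errors. One way to make that behavior assertable is a limiter that takes the current time as a parameter and returns the remaining wait. The sketch below is a std-only sliding-window variant under that assumption; `WindowLimiter` and `try_acquire` are hypothetical names and this is not the plan's `RateLimiter`.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Hypothetical sliding-window limiter: at most `max` calls per `window`.
pub struct WindowLimiter {
    max: usize,
    window: Duration,
    hits: VecDeque<Instant>,
}

impl WindowLimiter {
    pub fn new(max: usize, window: Duration) -> Self {
        Self { max, window, hits: VecDeque::new() }
    }

    /// Returns `Ok(())` if a call is allowed at `now`, otherwise
    /// `Err(wait)` with the time until the oldest hit leaves the window.
    pub fn try_acquire(&mut self, now: Instant) -> Result<(), Duration> {
        // Drop hits that are no longer inside the window.
        while let Some(&oldest) = self.hits.front() {
            if now.duration_since(oldest) >= self.window {
                self.hits.pop_front();
            } else {
                break;
            }
        }
        if self.hits.len() < self.max {
            self.hits.push_back(now);
            return Ok(());
        }
        match self.hits.front() {
            // Elapsed time is below `window` here, so the subtraction cannot underflow.
            Some(&oldest) => Err(self.window - now.duration_since(oldest)),
            // Only reachable when `max` is 0: nothing is ever allowed.
            None => Err(self.window),
        }
    }
}
```

Passing `now` explicitly lets a test advance time without sleeping, and the `Err` duration maps naturally onto the `retry_after_secs` field of `AppError::TooManyRequests`.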
Token Utilities
// Unit tests in src/util/token.rs
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_generate_token_length() {
let token = generate_token();
// 32 bytes base64url-encoded = 43 characters
assert_eq!(token.len(), 43);
}
#[test]
fn test_generate_token_uniqueness() {
let t1 = generate_token();
let t2 = generate_token();
assert_ne!(t1, t2);
}
#[test]
fn test_hash_token_deterministic() {
let token = "test-token-value";
let h1 = hash_token(token);
let h2 = hash_token(token);
assert_eq!(h1, h2);
}
#[test]
fn test_hash_token_different_inputs() {
assert_ne!(hash_token("a"), hash_token("b"));
}
}
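The 43-character expectation in `test_generate_token_length` follows from base64 arithmetic: every 3 input bytes become 4 output characters, and without padding the final partial group is not rounded up to 4. A small worked check, assuming the token is encoded without `=` padding (the helper names are hypothetical):

```rust
/// Length of unpadded base64 output for `n` input bytes: ceil(4n / 3).
pub const fn base64_unpadded_len(n: usize) -> usize {
    (4 * n + 2) / 3
}

/// Length of padded base64 output for `n` input bytes: 4 * ceil(n / 3).
pub const fn base64_padded_len(n: usize) -> usize {
    4 * ((n + 2) / 3)
}
```

For 32 bytes this gives (128 + 2) / 3 = 43 characters unpadded and 44 with padding, so the assertion would need to change if the encoder were switched to a padded alphabet.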
11.3 Integration Test (API)
// tests/api/sources_test.rs
use axum::http::{Request, StatusCode};
use tower::ServiceExt;
use http_body_util::BodyExt;
#[tokio::test]
async fn test_create_and_list_sources() {
let pool = common::setup_test_db().await;
let (_user_id, session_token) = common::create_test_user(&pool).await;
let app = common::build_test_app(pool).await;
// Create a source
let resp = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri("/api/v1/sources")
.header("Content-Type", "application/json")
.header("Cookie", format!("ai_synth_session={session_token}"))
.header("X-Requested-With", "XMLHttpRequest")
.body(serde_json::to_string(&serde_json::json!({
"title": "Test Source",
"url": "https://example.com"
})).unwrap().into())
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::CREATED);
// List sources
let resp = app
.clone()
.oneshot(
Request::builder()
.method("GET")
.uri("/api/v1/sources")
.header("Cookie", format!("ai_synth_session={session_token}"))
.body(axum::body::Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let body = resp.into_body().collect().await.unwrap().to_bytes();
let sources: Vec<serde_json::Value> = serde_json::from_slice(&body).unwrap();
assert_eq!(sources.len(), 1);
assert_eq!(sources[0]["title"], "Test Source");
}
#[tokio::test]
async fn test_cannot_access_other_users_sources() {
let pool = common::setup_test_db().await;
let (_user1_id, token1) = common::create_test_user(&pool).await;
let (_user2_id, token2) = common::create_test_user(&pool).await;
let app = common::build_test_app(pool.clone()).await;
// User 1 creates a source
let resp = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri("/api/v1/sources")
.header("Content-Type", "application/json")
.header("Cookie", format!("ai_synth_session={token1}"))
.header("X-Requested-With", "XMLHttpRequest")
.body(serde_json::to_string(&serde_json::json!({
"title": "User 1 Source",
"url": "https://example.com"
})).unwrap().into())
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::CREATED);
// User 2 lists sources -- should see empty list
let resp = app
.clone()
.oneshot(
Request::builder()
.method("GET")
.uri("/api/v1/sources")
.header("Cookie", format!("ai_synth_session={token2}"))
.body(axum::body::Body::empty())
.unwrap(),
)
.await
.unwrap();
let body = resp.into_body().collect().await.unwrap().to_bytes();
let sources: Vec<serde_json::Value> = serde_json::from_slice(&body).unwrap();
assert_eq!(sources.len(), 0);
}
#[tokio::test]
async fn test_unauthenticated_request_returns_401() {
let pool = common::setup_test_db().await;
let app = common::build_test_app(pool).await;
let resp = app
.oneshot(
Request::builder()
.method("GET")
.uri("/api/v1/sources")
.body(axum::body::Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
}
11.4 Test Conventions
| Layer | Tool | What to test |
|---|---|---|
| `util/` | `#[test]` unit tests | Token generation, hashing, encryption roundtrips |
| `services/` | `#[tokio::test]` | Rate limiter behavior, scraper parsing (with HTML fixtures), email rendering |
| `db/` | `#[tokio::test]` with real Postgres | Query correctness, constraint enforcement, data isolation |
| `handlers/` | `#[tokio::test]` with `tower::ServiceExt::oneshot` | Full HTTP request/response cycle, status codes, auth enforcement, CSRF checks |
| LLM providers | Mock HTTP server (`wiremock` crate) | Response parsing, error handling, schema construction |
Appendix A: Router Wiring
// src/router.rs
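use std::time::Duration;
use axum::{
http::{header, HeaderName, HeaderValue, Method},
middleware,
routing::{get, post, put, delete},
Router,
};
use tower_http::cors::CorsLayer;
use tower_http::trace::TraceLayer;
use tower_http::services::ServeDir;
use crate::{app_state::AppState, config::AppConfig, handlers};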
pub fn create_router(state: AppState) -> Router {
let api_routes = Router::new()
// === Public auth routes (no session required) ===
.route("/auth/register", post(handlers::auth::register))
.route("/auth/magic-link", post(handlers::auth::request_magic_link))
.route("/auth/verify", get(handlers::auth::verify_magic_link))
// === Authenticated routes ===
.route("/auth/logout", post(handlers::auth::logout))
.route("/auth/me", get(handlers::auth::me))
.route("/syntheses", get(handlers::syntheses::list))
.route("/syntheses/:id", get(handlers::syntheses::get))
.route("/syntheses/:id", delete(handlers::syntheses::delete))
.route("/syntheses/generate", post(handlers::syntheses::trigger_generate))
.route("/syntheses/:id/email", post(handlers::syntheses::send_email))
.route("/sources", get(handlers::sources::list))
.route("/sources", post(handlers::sources::create))
.route("/sources/:id", delete(handlers::sources::delete))
.route("/settings", get(handlers::settings::get))
.route("/settings", put(handlers::settings::update))
.route("/provider-keys", get(handlers::provider_keys::list))
.route("/provider-keys", post(handlers::provider_keys::create))
.route("/provider-keys/:provider", delete(handlers::provider_keys::delete))
.route("/generation/:job_id/progress", get(handlers::generation::progress_stream))
.route("/generation/:job_id/status", get(handlers::generation::job_status))
.route("/config/providers", get(handlers::admin::list_enabled_providers))
// === Admin routes ===
.route("/admin/provider-models", get(handlers::admin::list_provider_models))
.route("/admin/provider-models", post(handlers::admin::create_provider_model))
.route("/admin/provider-models/:id", put(handlers::admin::update_provider_model))
.route("/admin/provider-models/:id", delete(handlers::admin::delete_provider_model))
.route("/admin/rate-limits", get(handlers::admin::list_rate_limits))
.route("/admin/rate-limits/:provider", put(handlers::admin::update_rate_limit))
.route("/admin/users", get(handlers::admin::list_users))
.route("/admin/users/:id/role", put(handlers::admin::update_user_role))
// === Health ===
.route("/health", get(handlers::health::health_check));
let app = Router::new()
.nest("/api/v1", api_routes)
// CSRF middleware on API routes (checks X-Requested-With on POST/PUT/DELETE)
// `middleware` here is axum's module; the app's own module needs the `crate::` prefix.
.layer(middleware::from_fn(crate::middleware::csrf::csrf_check))
// CORS
.layer(build_cors_layer(&state.config))
// Request tracing
.layer(TraceLayer::new_for_http())
// Static files fallback (SPA)
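.fallback_service(
// Unknown paths fall back to index.html so client-side routes resolve.
ServeDir::new(&state.config.static_dir).fallback(
tower_http::services::ServeFile::new(
std::path::Path::new(&state.config.static_dir).join("index.html"),
),
),
)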
.with_state(state);
app
}
fn build_cors_layer(config: &AppConfig) -> CorsLayer {
CorsLayer::new()
.allow_origin(config.base_url.parse::<HeaderValue>().unwrap())
.allow_methods([Method::GET, Method::POST, Method::PUT, Method::DELETE])
.allow_headers([
header::CONTENT_TYPE,
header::COOKIE,
HeaderName::from_static("x-requested-with"),
])
.allow_credentials(true)
.max_age(Duration::from_secs(3600))
}
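The CSRF layer wired above checks `X-Requested-With` on POST, PUT, and DELETE. Its decision logic can be sketched as a pure function, shown below under two assumptions that the plan does not state: any non-empty header value is accepted, and safe methods always pass. `csrf_check_passes` is a hypothetical name, not the middleware's real signature.

```rust
/// Hypothetical core of the CSRF check: `method` is the uppercase HTTP
/// method and `x_requested_with` is the header value, if present.
pub fn csrf_check_passes(method: &str, x_requested_with: Option<&str>) -> bool {
    // Only state-changing methods need the custom header.
    let mutating = matches!(method, "POST" | "PUT" | "DELETE");
    if !mutating {
        return true;
    }
    // Assumption: any non-empty value is accepted, not a specific string.
    matches!(x_requested_with, Some(value) if !value.trim().is_empty())
}
```

The check works together with the CORS layer: a cross-origin page cannot attach a custom header without triggering a preflight request, and the preflight only succeeds for the single origin configured in `build_cors_layer`.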
Appendix B: AppConfig Full Definition
// src/config.rs
#[derive(Clone)]
pub struct AppConfig {
// Server
pub port: u16,
pub base_url: String, // e.g., "https://synth.example.com"
pub static_dir: String, // path to SolidJS build output
// Database
pub database_url: String, // postgres://...
// Encryption
pub master_key: MasterKey, // from MASTER_ENCRYPTION_KEY env var
// Email (Resend)
pub resend_api_key: SecretString, // from RESEND_API_KEY env var
pub resend_from: String, // e.g., "AI Weekly Synth <noreply@example.com>"
// Captcha (Cloudflare Turnstile)
pub turnstile_secret: String,
pub turnstile_site_key: String,
// Logging
pub rust_log: String,
}
impl AppConfig {
pub fn from_env() -> Result<Self, AppError> {
dotenvy::dotenv().ok();
Ok(Self {
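// AppError has no From<ParseIntError>, so map the parse failure explicitly.
port: env_var_or("PORT", "8080")
.parse::<u16>()
.map_err(|e| AppError::Internal(anyhow::anyhow!("Invalid PORT: {e}")))?,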
base_url: env_var("BASE_URL")?,
static_dir: env_var_or("STATIC_DIR", "./static"),
database_url: env_var("DATABASE_URL")?,
master_key: MasterKey::from_hex(&env_var("MASTER_ENCRYPTION_KEY")?)?,
resend_api_key: SecretString::new(env_var("RESEND_API_KEY")?),
resend_from: env_var_or("RESEND_FROM", "AI Weekly Synth <noreply@example.com>"),
turnstile_secret: env_var("TURNSTILE_SECRET")?,
turnstile_site_key: env_var("TURNSTILE_SITE_KEY")?,
rust_log: env_var_or("RUST_LOG", "ai_synth_backend=info"),
})
}
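// Note: `#[cfg(test)]` only compiles this for the crate's own unit tests.
// Integration tests under tests/ link the non-test build, so they can call
// this only if the gate is removed or replaced with a Cargo feature.
#[cfg(test)]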
pub fn test_defaults() -> Self {
Self {
port: 0,
base_url: "http://localhost:3000".into(),
static_dir: "./static".into(),
database_url: "postgres://postgres:postgres@localhost:5432/ai_synth_test".into(),
master_key: MasterKey::from_hex(
"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
).unwrap(),
resend_api_key: SecretString::new("test-resend-key".into()),
resend_from: "test@example.com".into(),
turnstile_secret: "test-secret".into(),
turnstile_site_key: "test-site-key".into(),
rust_log: "debug".into(),
}
}
}
fn env_var(name: &str) -> Result<String, AppError> {
std::env::var(name).map_err(|_| {
AppError::Internal(anyhow::anyhow!("Missing required env var: {name}"))
})
}
fn env_var_or(name: &str, default: &str) -> String {
std::env::var(name).unwrap_or_else(|_| default.to_string())
}
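`MasterKey::from_hex` is defined elsewhere in the plan. As an illustration of the validation that a 64-character hex key implies, here is a std-only sketch that decodes such a string into 32 bytes. `parse_master_key_hex` is a hypothetical helper and is not claimed to match the real `from_hex`.

```rust
/// Decode a 64-character hex string into a 32-byte key (hypothetical helper).
/// Error messages never include the input, so a malformed key is not logged.
pub fn parse_master_key_hex(s: &str) -> Result<[u8; 32], String> {
    if s.len() != 64 {
        return Err(format!("expected 64 hex characters, got {}", s.len()));
    }
    // Reject anything that is not 0-9, a-f, or A-F up front. This also rules
    // out a leading '+', which `u8::from_str_radix` would otherwise accept.
    if !s.bytes().all(|b| b.is_ascii_hexdigit()) {
        return Err("key contains non-hex characters".to_string());
    }
    let mut key = [0u8; 32];
    for (i, pair) in s.as_bytes().chunks(2).enumerate() {
        // The input is ASCII at this point, so each 2-byte chunk is valid UTF-8.
        let text = std::str::from_utf8(pair).map_err(|e| e.to_string())?;
        key[i] = u8::from_str_radix(text, 16).map_err(|e| e.to_string())?;
    }
    Ok(key)
}
```

Failing fast on a malformed key at startup is preferable to discovering it on the first encrypt call, which is why `from_env` performs this step while building the config.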
Appendix C: Prompt Construction
The prompts mirror the current Gemini service but are provider-agnostic.
// src/services/generation.rs (prompt helpers)
fn build_search_prompts(
settings: &UserSettings,
sources: &[Source],
) -> (String, String) {
let current_date = Utc::now().format("%A %d %B %Y").to_string();
let sources_text = if sources.is_empty() {
String::new()
} else {
let list = sources.iter()
.map(|s| format!("- {} ({})", s.title, s.url))
.collect::<Vec<_>>()
.join("\n");
format!(
"\nYou MUST also consult these custom sources:\n{}\n",
list
)
};
let categories_text = settings.categories.iter()
.enumerate()
.map(|(i, cat)| format!("{}. {}", i + 1, cat))
.collect::<Vec<_>>()
.join("\n");
let system_prompt = format!(
"You are a precise AI assistant. You MUST provide complete and exact URLs. \
Focus ONLY on news from the last {} days.",
settings.max_age_days
);
let user_prompt = format!(
"Today is {date}.\n\
You are an expert analyst on the topic: \"{theme}\".\n\
Search for news STRICTLY from the last {days} days.\n\
Do NOT return any news older than {days} days.\n\
{sources}\
{behavior}\n\
The synthesis must be divided into {count} sections:\n\
{categories}\n\n\
For each category, provide at most {max_items} articles.\n\
For each article, provide: a provisional title, the exact source URL, \
and a provisional summary.\n\
Return the result as JSON using keys category_0, category_1, etc. \
matching the section order above.",
date = current_date,
theme = settings.theme,
days = settings.max_age_days,
sources = sources_text,
behavior = settings.search_agent_behavior,
count = settings.categories.len(),
categories = categories_text,
max_items = settings.max_items_per_category,
);
(system_prompt, user_prompt)
}
fn build_rewrite_prompts(
scraped: &[(String, Vec<ScrapedItem>)],
) -> (String, String) {
let system_prompt = "You are a precise AI assistant. Generate titles and summaries \
that faithfully reflect the provided content.".to_string();
let data = serde_json::to_string_pretty(scraped).unwrap_or_default();
let user_prompt = format!(
"You are an expert news analyst.\n\
Below is a list of articles organized by category, each with raw text content \
extracted from the source websites ('scraped_content').\n\
Your task is to rewrite the 'title' and 'summary' (4-5 lines) for each article \
so they EXACTLY and FAITHFULLY reflect the provided text content.\n\
If 'scraped_content' is empty or insufficient, use the original title and summary.\n\
KEEP the exact same URLs. Do NOT remove any article.\n\n\
Article data:\n{data}",
data = data,
);
(system_prompt, user_prompt)
}
/// Build the JSON schema for structured output, based on user categories.
fn build_category_schema(categories: &[String]) -> serde_json::Value {
let mut properties = serde_json::Map::new();
let mut required = Vec::new();
for (i, _cat) in categories.iter().enumerate() {
let key = format!("category_{}", i);
properties.insert(key.clone(), serde_json::json!({
"type": "array",
"items": {
"type": "object",
"properties": {
"title": { "type": "string" },
"url": { "type": "string" },
"summary": { "type": "string" }
},
"required": ["title", "url", "summary"]
}
}));
required.push(serde_json::Value::String(key));
}
serde_json::json!({
"type": "object",
"properties": properties,
"required": required
})
}
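One detail of the prompt and schema above is easy to miss: the numbered list shown to the model is 1-based, while the JSON keys are 0-based (`category_0` corresponds to item "1."). The sketch below reproduces both numberings with std only so the mapping can be checked; `numbered_list` and `category_keys` are hypothetical helper names, not functions from the plan.

```rust
/// Render categories the way `build_search_prompts` lists them (1-based).
pub fn numbered_list(categories: &[String]) -> String {
    categories
        .iter()
        .enumerate()
        .map(|(i, cat)| format!("{}. {}", i + 1, cat))
        .collect::<Vec<_>>()
        .join("\n")
}

/// Pair each category with the 0-based `category_{i}` key that
/// `build_category_schema` requires in the structured output.
pub fn category_keys(categories: &[String]) -> Vec<(String, &str)> {
    categories
        .iter()
        .enumerate()
        .map(|(i, cat)| (format!("category_{i}"), cat.as_str()))
        .collect()
}
```

When parsing the model's response, a lookup built from `category_keys` can translate each key back to the user's category name, which avoids relying on the model to echo the names verbatim.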
Appendix D: Background Tasks
Besides generation jobs, the application needs periodic cleanup tasks.
// In main.rs, after starting the server
// Periodic cleanup: expired sessions and magic link tokens
let cleanup_pool = pool.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(3600)); // every hour
loop {
interval.tick().await;
if let Err(e) = db::sessions::delete_expired(&cleanup_pool).await {
tracing::error!(error = %e, "Failed to clean up expired sessions");
}
if let Err(e) = db::magic_links::delete_expired(&cleanup_pool).await {
tracing::error!(error = %e, "Failed to clean up expired magic link tokens");
}
}
});
Appendix E: .env.example
# === Required ===
DATABASE_URL=postgres://ai_synth:password@localhost:5432/ai_synth
BASE_URL=https://synth.example.com
MASTER_ENCRYPTION_KEY=<64-character-hex-string>
# === Email (Resend) ===
RESEND_API_KEY=re_xxxxxxxxxxxx
# Quote values that contain spaces so the .env parser reads the whole line.
RESEND_FROM="AI Weekly Synth <noreply@yourdomain.com>"
# === Captcha (Cloudflare Turnstile) ===
TURNSTILE_SECRET=0x4AAAAAAA...
TURNSTILE_SITE_KEY=0x4BBBBBB...
# === Optional ===
PORT=8080
STATIC_DIR=./static
RUST_LOG=ai_synth_backend=info,tower_http=info