fix: architect assessment remediation — 6 issues across backend, frontend, and infra

- Wire hardened scraper client into runtime (SSRF redirect validation was defined but unused)
- Stream scraper body with per-chunk size limit instead of post-download check (DoS/OOM)
- Persist user rate-limit overrides across generation jobs via AppState DashMap
- Roll back magic-link token on email send failure to prevent quota exhaustion
- Fix API error UX: prefer human message over machine error code in frontend
- Unwrap GET /syntheses { items } wrapper in frontend API layer (contract mismatch)
- Bind Postgres to localhost in docker-compose (was exposed on all interfaces)
- Fix CLAUDE.md: runtime queries not compile-time, 10 migrations not 9

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
master
oabrivard 3 months ago
parent b9dcf6e749
commit 54d54f2a06

@ -6,7 +6,7 @@ AI Weekly Synth is a self-hosted web application that generates AI-powered weekl
## Architecture ## Architecture
- **Backend**: Rust (Axum) — `backend/` - **Backend**: Rust (Axum) — `backend/`
- **Frontend**: SolidJS + Tailwind CSS v4 — `frontend/` - **Frontend**: SolidJS + Tailwind CSS v4 — `frontend/`
- **Database**: PostgreSQL (via sqlx with compile-time checked queries) - **Database**: PostgreSQL (via sqlx with runtime-checked queries)
- **Deployment**: Docker only (`docker-compose.yml`) - **Deployment**: Docker only (`docker-compose.yml`)
## Project Structure ## Project Structure
@ -117,7 +117,7 @@ cd frontend && npx tsc --noEmit
- `GET /api/v1/admin/users` — user list - `GET /api/v1/admin/users` — user list
- `PUT /api/v1/admin/users/:id/role` — role management - `PUT /api/v1/admin/users/:id/role` — role management
## Database (9 migrations) ## Database (10 migrations)
Tables: `users`, `sessions`, `magic_link_tokens`, `user_settings`, `sources`, `syntheses`, `admin_providers`, `admin_rate_limits`, `user_api_keys`, `audit_log` Tables: `users`, `sessions`, `magic_link_tokens`, `user_settings`, `sources`, `syntheses`, `admin_providers`, `admin_rate_limits`, `user_api_keys`, `audit_log`
## Environment Variables ## Environment Variables

@ -1,6 +1,8 @@
//! Shared application state passed to all handlers via Axum's `State` extractor. //! Shared application state passed to all handlers via Axum's `State` extractor.
use dashmap::DashMap;
use sqlx::PgPool; use sqlx::PgPool;
use uuid::Uuid;
use crate::config::AppConfig; use crate::config::AppConfig;
use crate::services::rate_limiter::{ProviderRateLimiter, RateLimiter}; use crate::services::rate_limiter::{ProviderRateLimiter, RateLimiter};
@ -19,6 +21,10 @@ pub struct AppState {
/// Per-provider rate limiter for LLM API calls. /// Per-provider rate limiter for LLM API calls.
/// Loaded from DB at startup, hot-reloaded when admin updates config. /// Loaded from DB at startup, hot-reloaded when admin updates config.
pub provider_rate_limiter: ProviderRateLimiter, pub provider_rate_limiter: ProviderRateLimiter,
/// Per-user rate limiters for generation jobs.
/// Keyed by user_id. Created on first generation, replaced if settings change.
/// Tuple: (max_requests, window_seconds, limiter).
pub user_rate_limiters: DashMap<Uuid, (i32, i32, RateLimiter)>,
/// In-memory store for active generation jobs. /// In-memory store for active generation jobs.
/// Maps job_id -> progress watch channel. /// Maps job_id -> progress watch channel.
pub job_store: JobStore, pub job_store: JobStore,
@ -42,6 +48,7 @@ impl AppState {
http_client, http_client,
auth_rate_limiter, auth_rate_limiter,
provider_rate_limiter, provider_rate_limiter,
user_rate_limiters: DashMap::new(),
job_store, job_store,
} }
} }

@ -86,6 +86,18 @@ pub async fn delete_expired(pool: &PgPool) -> Result<u64, AppError> {
Ok(result.rows_affected()) Ok(result.rows_affected())
} }
/// Delete a magic link token by its hash.
///
/// Used to roll back a token when email delivery fails, so it doesn't
/// consume a quota slot.
pub async fn delete_by_hash(pool: &PgPool, token_hash: &str) -> Result<(), AppError> {
sqlx::query("DELETE FROM magic_tokens WHERE token_hash = $1")
.bind(token_hash)
.execute(pool)
.await?;
Ok(())
}
/// Count unused, non-expired tokens for a given email. /// Count unused, non-expired tokens for a given email.
/// ///
/// Used to prevent spamming magic link requests. /// Used to prevent spamming magic link requests.

@ -96,9 +96,9 @@ pub async fn register(
tracing::info!(email = %email_lower, "New user registered"); tracing::info!(email = %email_lower, "New user registered");
} }
// Generate and send magic link // Generate and send magic link — roll back token if email send fails
if let Some(raw_token) = auth::create_magic_link(&state.pool, &email_lower).await? { if let Some(raw_token) = auth::create_magic_link(&state.pool, &email_lower).await? {
email::send_magic_link( if let Err(e) = email::send_magic_link(
&state.http_client, &state.http_client,
&state.config.resend_api_key, &state.config.resend_api_key,
&state.config.email_from, &state.config.email_from,
@ -106,7 +106,14 @@ pub async fn register(
&state.config.app_url, &state.config.app_url,
&raw_token, &raw_token,
) )
.await?; .await
{
let token_hash = crate::util::token::hash_token(&raw_token);
db::magic_links::delete_by_hash(&state.pool, &token_hash)
.await
.ok();
return Err(e);
}
} }
Ok(( Ok((
@ -151,7 +158,7 @@ pub async fn login(
if existing.is_some() { if existing.is_some() {
if let Some(raw_token) = auth::create_magic_link(&state.pool, &email_lower).await? { if let Some(raw_token) = auth::create_magic_link(&state.pool, &email_lower).await? {
email::send_magic_link( if let Err(e) = email::send_magic_link(
&state.http_client, &state.http_client,
&state.config.resend_api_key, &state.config.resend_api_key,
&state.config.email_from, &state.config.email_from,
@ -159,7 +166,14 @@ pub async fn login(
&state.config.app_url, &state.config.app_url,
&raw_token, &raw_token,
) )
.await?; .await
{
let token_hash = crate::util::token::hash_token(&raw_token);
db::magic_links::delete_by_hash(&state.pool, &token_hash)
.await
.ok();
return Err(e);
}
} }
} else { } else {
// Add a small random delay to prevent timing attacks // Add a small random delay to prevent timing attacks

@ -15,6 +15,7 @@ use ai_synth_backend::config::AppConfig;
use ai_synth_backend::db; use ai_synth_backend::db;
use ai_synth_backend::models::user::UserRole; use ai_synth_backend::models::user::UserRole;
use ai_synth_backend::router; use ai_synth_backend::router;
use ai_synth_backend::services::scraper;
use crate::cli::{Cli, Commands}; use crate::cli::{Cli, Commands};
@ -50,7 +51,7 @@ async fn main() -> anyhow::Result<()> {
match cli.command.unwrap_or(Commands::Serve) { match cli.command.unwrap_or(Commands::Serve) {
Commands::Serve => { Commands::Serve => {
let http_client = reqwest::Client::new(); let http_client = scraper::build_scraper_client()?;
let state = app_state::AppState::new(config.clone(), pool, http_client); let state = app_state::AppState::new(config.clone(), pool, http_client);
// Load provider rate limits from DB into in-memory limiter // Load provider rate limits from DB into in-memory limiter

@ -122,7 +122,7 @@ pub async fn scrape_url(
check_ssrf(&parsed_url).await?; check_ssrf(&parsed_url).await?;
// Fetch the page // Fetch the page
let response = http_client let mut response = http_client
.get(url) .get(url)
.send() .send()
.await .await
@ -145,17 +145,29 @@ pub async fn scrape_url(
// Capture final URL BEFORE consuming the response body (follows redirects) // Capture final URL BEFORE consuming the response body (follows redirects)
let final_url = response.url().clone(); let final_url = response.url().clone();
// Read body with size limit // Read body with streaming size limit — bail early to avoid OOM on huge responses.
let bytes = response // Check Content-Length first (fast path), then enforce while streaming chunks.
.bytes() if let Some(content_length) = response.content_length() {
.await if content_length as usize > MAX_BODY_SIZE {
.map_err(|e| AppError::Internal(anyhow::anyhow!("Failed to read response body: {}", e)))?; return Err(AppError::BadRequest(
"Response body exceeds 5 MB limit".into(),
));
}
}
if bytes.len() > MAX_BODY_SIZE { let mut bytes = Vec::new();
while let Some(chunk) = response
.chunk()
.await
.map_err(|e| AppError::Internal(anyhow::anyhow!("Failed to read response body: {}", e)))?
{
if bytes.len() + chunk.len() > MAX_BODY_SIZE {
return Err(AppError::BadRequest( return Err(AppError::BadRequest(
"Response body exceeds 5 MB limit".into(), "Response body exceeds 5 MB limit".into(),
)); ));
} }
bytes.extend_from_slice(&chunk);
}
let html_text = String::from_utf8_lossy(&bytes); let html_text = String::from_utf8_lossy(&bytes);
let document = Html::parse_document(&html_text); let document = Html::parse_document(&html_text);

@ -289,9 +289,8 @@ async fn run_generation_inner(
model_research.clone() model_research.clone()
}; };
// Build per-user rate limiter if the user has overrides configured. // Look up or create per-user rate limiter from AppState so limits persist across jobs.
// Created once and reused across both passes so the limit is actually enforced. let user_rate_limiter = get_user_rate_limiter(state, &settings, user_id);
let user_rate_limiter = build_user_rate_limiter(&settings);
// Step 5: Rate limit check (pass 1) // Step 5: Rate limit check (pass 1)
check_rate_limit(state, &user_rate_limiter, &provider_name)?; check_rate_limit(state, &user_rate_limiter, &provider_name)?;
@ -379,24 +378,46 @@ fn emit_progress(tx: &watch::Sender<ProgressEvent>, step: &str, message: &str, p
.ok(); .ok();
} }
/// Build a per-user rate limiter from settings, if both override fields are configured. /// Look up or create a per-user rate limiter stored in AppState.
/// ///
/// Returns `None` if the user has no rate limit overrides, in which case the /// Returns `None` if the user has no rate limit overrides, in which case the
/// global provider rate limiter should be used instead. /// global provider rate limiter should be used instead.
fn build_user_rate_limiter( ///
/// The limiter is stored in `state.user_rate_limiters` keyed by `user_id` so that
/// rate limit history persists across generation jobs. If the user's settings have
/// changed since the limiter was created, a fresh limiter replaces the old one.
fn get_user_rate_limiter(
state: &AppState,
settings: &UserSettings, settings: &UserSettings,
user_id: Uuid,
) -> Option<crate::services::rate_limiter::RateLimiter> { ) -> Option<crate::services::rate_limiter::RateLimiter> {
match ( match (
settings.rate_limit_max_requests, settings.rate_limit_max_requests,
settings.rate_limit_time_window_seconds, settings.rate_limit_time_window_seconds,
) { ) {
(Some(max_req), Some(window_sec)) => { (Some(max_req), Some(window_sec)) => {
Some(crate::services::rate_limiter::RateLimiter::new( // Reuse existing limiter if settings haven't changed
if let Some(entry) = state.user_rate_limiters.get(&user_id) {
let (stored_max, stored_window, ref limiter) = *entry;
if stored_max == max_req && stored_window == window_sec {
return Some(limiter.clone());
}
}
// Settings changed or first generation — create and store
let limiter = crate::services::rate_limiter::RateLimiter::new(
max_req as usize, max_req as usize,
Duration::from_secs(window_sec as u64), Duration::from_secs(window_sec as u64),
)) );
state
.user_rate_limiters
.insert(user_id, (max_req, window_sec, limiter.clone()));
Some(limiter)
}
_ => {
// User cleared their overrides — remove stale limiter
state.user_rate_limiters.remove(&user_id);
None
} }
_ => None,
} }
} }

@ -35,7 +35,7 @@ services:
networks: networks:
- internal - internal
ports: ports:
- "5432:5432" - "127.0.0.1:5432:5432"
healthcheck: healthcheck:
test: ["CMD-SHELL", "pg_isready -U ai_synth -d ai_synth"] test: ["CMD-SHELL", "pg_isready -U ai_synth -d ai_synth"]
interval: 10s interval: 10s

@ -65,7 +65,7 @@ class ApiClient {
.catch(() => ({ error: 'Unknown error' })); .catch(() => ({ error: 'Unknown error' }));
const apiError: ApiError = { const apiError: ApiError = {
status: response.status, status: response.status,
message: errorBody.error || errorBody.message || `HTTP ${response.status}`, message: errorBody.message || errorBody.error || `HTTP ${response.status}`,
field_errors: errorBody.field_errors, field_errors: errorBody.field_errors,
}; };
throw apiError; throw apiError;

@ -60,8 +60,10 @@ export async function fetchFile(path: string): Promise<Response> {
/** Synthesis API endpoints (CRUD, generation, export, email). */ /** Synthesis API endpoints (CRUD, generation, export, email). */
export const synthesesApi = { export const synthesesApi = {
/** GET /syntheses -- paginated list of the user's syntheses. */ /** GET /syntheses -- paginated list of the user's syntheses. */
list: (limit = 50, offset = 0): Promise<SynthesisListItem[]> => list: async (limit = 50, offset = 0): Promise<SynthesisListItem[]> => {
api.get<SynthesisListItem[]>(`/syntheses?limit=${limit}&offset=${offset}`), const response = await api.get<{ items: SynthesisListItem[] }>(`/syntheses?limit=${limit}&offset=${offset}`);
return response.items;
},
/** GET /syntheses/:id -- fetch a single synthesis with full content. */ /** GET /syntheses/:id -- fetch a single synthesis with full content. */
get: (id: string): Promise<Synthesis> => get: (id: string): Promise<Synthesis> =>

Loading…
Cancel
Save