diff --git a/backend/src/app_state.rs b/backend/src/app_state.rs index 493f178..4e050f0 100644 --- a/backend/src/app_state.rs +++ b/backend/src/app_state.rs @@ -23,13 +23,20 @@ pub struct AppState { pub provider_rate_limiter: ProviderRateLimiter, /// Per-user rate limiters for generation jobs. /// Keyed by user_id. Created on first generation, replaced if settings change. - /// Tuple: (max_requests, window_seconds, limiter). - pub user_rate_limiters: DashMap, + pub user_rate_limiters: DashMap, /// In-memory store for active generation jobs. /// Maps job_id -> progress watch channel. pub job_store: JobStore, } +/// Cached per-user rate limiter with the settings it was created from. +#[derive(Clone)] +pub struct UserRateLimitEntry { + pub max_requests: i32, + pub window_seconds: i32, + pub limiter: RateLimiter, +} + impl AppState { /// Create a new `AppState` instance. pub fn new(config: AppConfig, pool: PgPool, http_client: reqwest::Client) -> Self { diff --git a/backend/src/handlers/auth.rs b/backend/src/handlers/auth.rs index c94167e..8d395de 100644 --- a/backend/src/handlers/auth.rs +++ b/backend/src/handlers/auth.rs @@ -21,7 +21,7 @@ use crate::db; use crate::errors::AppError; use crate::middleware::auth::AuthUser; use crate::models::user::{MeResponse, UserRole}; -use crate::services::{auth, email, turnstile}; +use crate::services::{auth, turnstile}; use crate::util::token; /// Generic auth response used for register and login. @@ -96,25 +96,16 @@ pub async fn register( tracing::info!(email = %email_lower, "New user registered"); } - // Generate and send magic link — roll back token if email send fails - if let Some(raw_token) = auth::create_magic_link(&state.pool, &email_lower).await? { - if let Err(e) = email::send_magic_link( - &state.http_client, - &state.config.resend_api_key, - &state.config.email_from, - &email_lower, - &state.config.app_url, - &raw_token, - ) - .await - { - let token_hash = crate::util::token::hash_token(&raw_token); - db::magic_links::delete_by_hash(&state.pool, &token_hash) - .await - .ok(); - return Err(e); - } - } + // Generate and send magic link (rolls back token if email delivery fails) + auth::create_and_send_magic_link( + &state.pool, + &state.http_client, + &state.config.resend_api_key, + &state.config.email_from, + &state.config.app_url, + &email_lower, + ) + .await?; Ok(( StatusCode::OK, @@ -157,24 +148,15 @@ pub async fn login( let existing = db::users::find_by_email(&state.pool, &email_lower).await?; if existing.is_some() { - if let Some(raw_token) = auth::create_magic_link(&state.pool, &email_lower).await? { - if let Err(e) = email::send_magic_link( - &state.http_client, - &state.config.resend_api_key, - &state.config.email_from, - &email_lower, - &state.config.app_url, - &raw_token, - ) - .await - { - let token_hash = crate::util::token::hash_token(&raw_token); - db::magic_links::delete_by_hash(&state.pool, &token_hash) - .await - .ok(); - return Err(e); - } - } + auth::create_and_send_magic_link( + &state.pool, + &state.http_client, + &state.config.resend_api_key, + &state.config.email_from, + &state.config.app_url, + &email_lower, + ) + .await?; } else { // Add a small random delay to prevent timing attacks let delay_ms = rand::random::() % 150 + 50; // 50-200ms diff --git a/backend/src/services/auth.rs b/backend/src/services/auth.rs index a4088a3..4c13be4 100644 --- a/backend/src/services/auth.rs +++ b/backend/src/services/auth.rs @@ -10,6 +10,7 @@ use uuid::Uuid; use crate::db; use crate::errors::AppError; use crate::models::user::{User, UserRole}; +use crate::services::email; use crate::util::token; /// Magic link token expiry: 15 minutes. @@ -51,6 +52,32 @@ pub async fn create_magic_link(pool: &PgPool, email: &str) -> Result Result<(), AppError> { + let Some(raw_token) = create_magic_link(pool, to_email).await? else { + return Ok(()); + }; + + if let Err(e) = + email::send_magic_link(http_client, resend_api_key, email_from, to_email, app_url, &raw_token).await + { + let token_hash = token::hash_token(&raw_token); + db::magic_links::delete_by_hash(pool, &token_hash).await.ok(); + return Err(e); + } + Ok(()) +} + /// Verify a magic link token and return the associated email. /// /// The token is consumed atomically (marked as used) to enforce single-use. diff --git a/backend/src/services/scraper.rs b/backend/src/services/scraper.rs index 814408f..bb0b721 100644 --- a/backend/src/services/scraper.rs +++ b/backend/src/services/scraper.rs @@ -146,16 +146,20 @@ pub async fn scrape_url( let final_url = response.url().clone(); // Read body with streaming size limit — bail early to avoid OOM on huge responses. - // Check Content-Length first (fast path), then enforce while streaming chunks. - if let Some(content_length) = response.content_length() { - if content_length as usize > MAX_BODY_SIZE { + // Check Content-Length first (fast reject + pre-allocation), then enforce per chunk. + let content_length = response.content_length(); + if let Some(len) = content_length { + if len as usize > MAX_BODY_SIZE { return Err(AppError::BadRequest( "Response body exceeds 5 MB limit".into(), )); } } - let mut bytes = Vec::new(); + let mut bytes = match content_length { + Some(len) => Vec::with_capacity(len as usize), + None => Vec::new(), + }; while let Some(chunk) = response .chunk() .await diff --git a/backend/src/services/synthesis.rs b/backend/src/services/synthesis.rs index 6a809d1..cd9ed1f 100644 --- a/backend/src/services/synthesis.rs +++ b/backend/src/services/synthesis.rs @@ -383,38 +383,44 @@ fn emit_progress(tx: &watch::Sender, step: &str, message: &str, p /// Returns `None` if the user has no rate limit overrides, in which case the /// global provider rate limiter should be used instead. /// -/// The limiter is stored in `state.user_rate_limiters` keyed by `user_id` so that -/// rate limit history persists across generation jobs. If the user's settings have -/// changed since the limiter was created, a fresh limiter replaces the old one. +/// Uses DashMap's entry API for atomic check-and-insert, preventing concurrent +/// generation jobs from creating independent limiters for the same user. fn get_user_rate_limiter( state: &AppState, settings: &UserSettings, user_id: Uuid, ) -> Option { + use crate::app_state::UserRateLimitEntry; + match ( settings.rate_limit_max_requests, settings.rate_limit_time_window_seconds, ) { (Some(max_req), Some(window_sec)) => { - // Reuse existing limiter if settings haven't changed - if let Some(entry) = state.user_rate_limiters.get(&user_id) { - let (stored_max, stored_window, ref limiter) = *entry; - if stored_max == max_req && stored_window == window_sec { - return Some(limiter.clone()); + let mut entry = state.user_rate_limiters.entry(user_id).or_insert_with(|| { + UserRateLimitEntry { + max_requests: max_req, + window_seconds: window_sec, + limiter: crate::services::rate_limiter::RateLimiter::new( + max_req as usize, + Duration::from_secs(window_sec as u64), + ), } + }); + // Replace if user's settings changed since the limiter was created + if entry.max_requests != max_req || entry.window_seconds != window_sec { + *entry = UserRateLimitEntry { + max_requests: max_req, + window_seconds: window_sec, + limiter: crate::services::rate_limiter::RateLimiter::new( + max_req as usize, + Duration::from_secs(window_sec as u64), + ), + }; } - // Settings changed or first generation — create and store - let limiter = crate::services::rate_limiter::RateLimiter::new( - max_req as usize, - Duration::from_secs(window_sec as u64), - ); - state - .user_rate_limiters - .insert(user_id, (max_req, window_sec, limiter.clone())); - Some(limiter) + Some(entry.limiter.clone()) } _ => { - // User cleared their overrides — remove stale limiter state.user_rate_limiters.remove(&user_id); None }