From 347558a278e65531f219e0278a2fea1c987da7c9 Mon Sep 17 00:00:00 2001 From: oabrivard Date: Thu, 26 Mar 2026 00:42:23 +0100 Subject: [PATCH] fix: atomic job creation, 15min timeout, and panic handling - Replace iterating DashMap check with atomic DashSet insert in create_job to eliminate the race condition where double-click could create two concurrent jobs for the same user - Add release_user method called at end of generation task (normal, timeout, and panic paths) so the generating slot is always freed - Wrap run_generation in tokio::time::timeout(900s) to prevent hung LLM calls from blocking the generation slot forever - Spawn a second task to await the JoinHandle and call release_user + send error event if the generation task panics, preventing SSE clients from hanging indefinitely - Update cleanup_expired to also remove users from generating_users set - Update tests to call release_user after completion/error to match new contract Co-Authored-By: Claude Sonnet 4.6 --- backend/src/handlers/generation.rs | 26 ++++++++++++- backend/src/services/synthesis.rs | 60 ++++++++++++------------------ 2 files changed, 48 insertions(+), 38 deletions(-) diff --git a/backend/src/handlers/generation.rs b/backend/src/handlers/generation.rs index da24105..9ba178a 100644 --- a/backend/src/handlers/generation.rs +++ b/backend/src/handlers/generation.rs @@ -4,6 +4,7 @@ //! 
- `GET /api/v1/syntheses/generate/:job_id/progress` — SSE progress stream use std::convert::Infallible; +use std::sync::Arc; use std::time::Duration; use axum::extract::{Path, State}; @@ -70,8 +71,31 @@ pub async fn trigger_generate( // Spawn the generation pipeline as a background task let state_clone = state.clone(); let user_id = auth_user.id; + let tx_for_panic = Arc::clone(&tx); + let state_for_panic = state.clone(); + + let join_handle = tokio::spawn(async move { + let timeout_duration = std::time::Duration::from_secs(900); + match tokio::time::timeout(timeout_duration, synthesis::run_generation(job_id, state_clone.clone(), user_id, tx.clone())).await { + Ok(()) => {} + Err(_) => { + tracing::error!(job_id = %job_id, user_id = %user_id, "Generation timed out after 15 minutes"); + let _ = tx.send(synthesis::ProgressEvent::Error { + message: "La generation a depasse le delai maximum de 15 minutes.".into(), + }); + } + } + state_clone.job_store.release_user(user_id); + }); + tokio::spawn(async move { - synthesis::run_generation(job_id, state_clone, user_id, tx).await; + if let Err(e) = join_handle.await { + tracing::error!(job_id = %job_id, error = %e, "Generation task panicked"); + let _ = tx_for_panic.send(synthesis::ProgressEvent::Error { + message: "Erreur interne lors de la generation.".into(), + }); + state_for_panic.job_store.release_user(user_id); + } }); Ok(( diff --git a/backend/src/services/synthesis.rs b/backend/src/services/synthesis.rs index 59cd3fc..5d500ec 100644 --- a/backend/src/services/synthesis.rs +++ b/backend/src/services/synthesis.rs @@ -13,6 +13,7 @@ use std::time::{Duration, Instant}; use chrono::Utc; use dashmap::DashMap; +use dashmap::DashSet; use serde::Serialize; use tokio::sync::watch; use uuid::Uuid; @@ -82,6 +83,7 @@ struct JobEntry { #[derive(Clone)] pub struct JobStore { inner: Arc<DashMap<Uuid, JobEntry>>, + generating_users: Arc<DashSet<Uuid>>, } /// Jobs expire after 1 hour (allows SSE reconnection).
@@ -98,49 +100,28 @@ impl JobStore { pub fn new() -> Self { Self { inner: Arc::new(DashMap::new()), + generating_users: Arc::new(DashSet::new()), } } /// Create a new job for a user, returning the job ID and the watch Sender. /// /// Returns `None` if the user already has an active job. + /// Uses an atomic DashSet insert to prevent race conditions on double-click. pub fn create_job(&self, user_id: Uuid) -> Option<(Uuid, Arc<watch::Sender<ProgressEvent>>)> { - // Check if user already has an active job - for entry in self.inner.iter() { - if entry.value().user_id == user_id { - // Check if the job is still active (not completed/failed) - let current = entry.value().tx.borrow().clone(); - match current { - ProgressEvent::Complete { .. } | ProgressEvent::Error { .. } => { - // Job finished, allow creating a new one - continue; - } - ProgressEvent::Progress { .. } => { - return None; // Active job exists - } - } - } + if !self.generating_users.insert(user_id) { + return None; } - let job_id = Uuid::new_v4(); let (tx, rx) = watch::channel(ProgressEvent::Progress { step: "init".into(), message: "Initialisation...".into(), percent: 0, }); - let tx = Arc::new(tx); - - self.inner.insert( - job_id, - JobEntry { - tx: Arc::clone(&tx), - _rx: rx, - user_id, - created_at: Instant::now(), - }, - ); - + self.inner.insert(job_id, JobEntry { + tx: Arc::clone(&tx), _rx: rx, user_id, created_at: Instant::now(), + }); Some((job_id, tx)) } @@ -157,22 +138,25 @@ impl JobStore { /// Check if a user has an active (in-progress) job. pub fn has_active_job(&self, user_id: Uuid) -> Option<Uuid> { + if !self.generating_users.contains(&user_id) { return None; } for entry in self.inner.iter() { - if entry.value().user_id == user_id { - let current = entry.value().tx.borrow().clone(); - if matches!(current, ProgressEvent::Progress { .. 
}) { - return Some(*entry.key()); - } - } + if entry.value().user_id == user_id { return Some(*entry.key()); } } None } + /// Release the generating lock for a user (called when job completes, errors, or times out). + pub fn release_user(&self, user_id: Uuid) { + self.generating_users.remove(&user_id); + } + /// Remove expired jobs (older than TTL). pub fn cleanup_expired(&self) { let now = Instant::now(); self.inner.retain(|_, entry| { - now.duration_since(entry.created_at) < JOB_TTL + let keep = now.duration_since(entry.created_at) < JOB_TTL; + if !keep { self.generating_users.remove(&entry.user_id); } + keep }); } @@ -1371,11 +1355,12 @@ mod tests { let (_job_id, tx) = store.create_job(user_id).unwrap(); - // Complete the job + // Complete the job and release the user lock (as the pipeline does) tx.send(ProgressEvent::Complete { synthesis_id: Uuid::new_v4(), }) .ok(); + store.release_user(user_id); // Should now allow a new job let result2 = store.create_job(user_id); @@ -1389,11 +1374,12 @@ mod tests { let (_job_id, tx) = store.create_job(user_id).unwrap(); - // Fail the job + // Fail the job and release the user lock (as the pipeline does) tx.send(ProgressEvent::Error { message: "test error".into(), }) .ok(); + store.release_user(user_id); // Should now allow a new job let result2 = store.create_job(user_id);