fix: atomic job creation, 15min timeout, and panic handling

- Replace iterating DashMap check with atomic DashSet insert in create_job to
  eliminate the race condition where double-click could create two concurrent
  jobs for the same user
- Add release_user method called at end of generation task (normal, timeout,
  and panic paths) so the generating slot is always freed
- Wrap run_generation in tokio::time::timeout(900s) to prevent hung LLM calls
  from blocking the generation slot forever
- Spawn a second task to await the JoinHandle and call release_user + send
  error event if the generation task panics, preventing SSE clients from
  hanging indefinitely
- Update cleanup_expired to also remove users from generating_users set
- Update tests to call release_user after completion/error to match new contract

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
master
oabrivard 3 months ago
parent 59932589cc
commit 347558a278

@ -4,6 +4,7 @@
//! - `GET /api/v1/syntheses/generate/:job_id/progress` — SSE progress stream
use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;
use axum::extract::{Path, State};
@ -70,8 +71,31 @@ pub async fn trigger_generate(
// Spawn the generation pipeline as a background task
let state_clone = state.clone();
let user_id = auth_user.id;
let tx_for_panic = Arc::clone(&tx);
let state_for_panic = state.clone();
let join_handle = tokio::spawn(async move {
let timeout_duration = std::time::Duration::from_secs(900);
match tokio::time::timeout(timeout_duration, synthesis::run_generation(job_id, state_clone.clone(), user_id, tx.clone())).await {
Ok(()) => {}
Err(_) => {
tracing::error!(job_id = %job_id, user_id = %user_id, "Generation timed out after 15 minutes");
let _ = tx.send(synthesis::ProgressEvent::Error {
message: "La generation a depasse le delai maximum de 15 minutes.".into(),
});
}
}
state_clone.job_store.release_user(user_id);
});
tokio::spawn(async move {
synthesis::run_generation(job_id, state_clone, user_id, tx).await;
if let Err(e) = join_handle.await {
tracing::error!(job_id = %job_id, error = %e, "Generation task panicked");
let _ = tx_for_panic.send(synthesis::ProgressEvent::Error {
message: "Erreur interne lors de la generation.".into(),
});
state_for_panic.job_store.release_user(user_id);
}
});
Ok((

@ -13,6 +13,7 @@ use std::time::{Duration, Instant};
use chrono::Utc;
use dashmap::DashMap;
use dashmap::DashSet;
use serde::Serialize;
use tokio::sync::watch;
use uuid::Uuid;
@ -82,6 +83,7 @@ struct JobEntry {
// Cloning a `JobStore` is cheap: both fields are `Arc`s, so every clone
// shares the same underlying job map and user set.
#[derive(Clone)]
pub struct JobStore {
/// Registered jobs keyed by job ID; entries are dropped by `cleanup_expired`.
inner: Arc<DashMap<Uuid, JobEntry>>,
/// IDs of users that currently hold the generation slot. Inserted by
/// `create_job`, removed by `release_user` and `cleanup_expired`.
generating_users: Arc<DashSet<Uuid>>,
}
/// Jobs expire after 1 hour (allows SSE reconnection).
@ -98,49 +100,28 @@ impl JobStore {
/// Build an empty store: no registered jobs and no user holding the generation slot.
pub fn new() -> Self {
    let inner = Arc::new(DashMap::new());
    let generating_users = Arc::new(DashSet::new());
    Self {
        inner,
        generating_users,
    }
}
/// Create a new job for a user, returning the job ID and the watch Sender.
///
/// Returns `None` if the user already has an active job.
/// Uses an atomic DashSet insert to prevent race conditions on double-click.
pub fn create_job(&self, user_id: Uuid) -> Option<(Uuid, Arc<watch::Sender<ProgressEvent>>)> {
// Check if user already has an active job
for entry in self.inner.iter() {
if entry.value().user_id == user_id {
// Check if the job is still active (not completed/failed)
let current = entry.value().tx.borrow().clone();
match current {
ProgressEvent::Complete { .. } | ProgressEvent::Error { .. } => {
// Job finished, allow creating a new one
continue;
}
ProgressEvent::Progress { .. } => {
return None; // Active job exists
}
}
}
if !self.generating_users.insert(user_id) {
return None;
}
let job_id = Uuid::new_v4();
let (tx, rx) = watch::channel(ProgressEvent::Progress {
step: "init".into(),
message: "Initialisation...".into(),
percent: 0,
});
let tx = Arc::new(tx);
self.inner.insert(
job_id,
JobEntry {
tx: Arc::clone(&tx),
_rx: rx,
user_id,
created_at: Instant::now(),
},
);
self.inner.insert(job_id, JobEntry {
tx: Arc::clone(&tx), _rx: rx, user_id, created_at: Instant::now(),
});
Some((job_id, tx))
}
@ -157,22 +138,25 @@ impl JobStore {
/// Check if a user has an active (in-progress) job.
///
/// Returns the ID of the user's job whose latest event is still
/// `ProgressEvent::Progress`, or `None` when the user does not hold the
/// generation slot or has no such job.
///
/// Finished jobs stay in the map until `cleanup_expired` removes them, so
/// matching on `user_id` alone could hand back the ID of an already
/// completed or failed job; the state check avoids that.
pub fn has_active_job(&self, user_id: Uuid) -> Option<Uuid> {
    // Fast path: without the generation slot there is nothing to look for.
    if !self.generating_users.contains(&user_id) {
        return None;
    }

    self.inner
        .iter()
        .find(|entry| {
            let job = entry.value();
            // Inspect the current event in place; no need to clone it.
            job.user_id == user_id && matches!(*job.tx.borrow(), ProgressEvent::Progress { .. })
        })
        .map(|entry| *entry.key())
}
/// Free the generation slot held by `user_id` so that a later `create_job`
/// for the same user can succeed. Intended to be called once a job has
/// completed, failed, or timed out.
pub fn release_user(&self, user_id: Uuid) {
    // Whether the user was actually present is of no interest to the caller.
    let _ = self.generating_users.remove(&user_id);
}
/// Remove expired jobs (older than TTL).
pub fn cleanup_expired(&self) {
let now = Instant::now();
self.inner.retain(|_, entry| {
now.duration_since(entry.created_at) < JOB_TTL
let keep = now.duration_since(entry.created_at) < JOB_TTL;
if !keep { self.generating_users.remove(&entry.user_id); }
keep
});
}
@ -1371,11 +1355,12 @@ mod tests {
let (_job_id, tx) = store.create_job(user_id).unwrap();
// Complete the job
// Complete the job and release the user lock (as the pipeline does)
tx.send(ProgressEvent::Complete {
synthesis_id: Uuid::new_v4(),
})
.ok();
store.release_user(user_id);
// Should now allow a new job
let result2 = store.create_job(user_id);
@ -1389,11 +1374,12 @@ mod tests {
let (_job_id, tx) = store.create_job(user_id).unwrap();
// Fail the job
// Fail the job and release the user lock (as the pipeline does)
tx.send(ProgressEvent::Error {
message: "test error".into(),
})
.ok();
store.release_user(user_id);
// Should now allow a new job
let result2 = store.create_job(user_id);

Loading…
Cancel
Save