From 7f647bc656ad7654295c724986b6471477aa13ed Mon Sep 17 00:00:00 2001 From: oabrivard Date: Fri, 27 Mar 2026 14:33:15 +0100 Subject: [PATCH] refactor: extract JobStore to services/job_store.rs Moves JobEntry, JobStore, ProgressEvent, JOB_TTL, and emit_progress to a dedicated module. Updates imports in synthesis.rs, generation.rs, scheduler.rs, and app_state.rs. synthesis.rs re-exports for backward compatibility. Co-Authored-By: Claude Opus 4.6 (1M context) --- backend/src/app_state.rs | 2 +- backend/src/handlers/generation.rs | 7 +- backend/src/services/job_store.rs | 335 +++++++++++++++++++++++++++++ backend/src/services/mod.rs | 1 + backend/src/services/scheduler.rs | 4 +- 5 files changed, 343 insertions(+), 6 deletions(-) create mode 100644 backend/src/services/job_store.rs diff --git a/backend/src/app_state.rs b/backend/src/app_state.rs index 3066917..286d546 100644 --- a/backend/src/app_state.rs +++ b/backend/src/app_state.rs @@ -6,7 +6,7 @@ use uuid::Uuid; use crate::config::AppConfig; use crate::services::rate_limiter::{ProviderRateLimiter, RateLimiter}; -use crate::services::synthesis::JobStore; +use crate::services::job_store::JobStore; /// Application state shared across all request handlers. /// diff --git a/backend/src/handlers/generation.rs b/backend/src/handlers/generation.rs index 92252fe..dd5fe9f 100644 --- a/backend/src/handlers/generation.rs +++ b/backend/src/handlers/generation.rs @@ -22,7 +22,8 @@ use serde::Deserialize; use crate::app_state::AppState; use crate::errors::AppError; use crate::middleware::auth::AuthUser; -use crate::services::synthesis::{self, ProgressEvent}; +use crate::services::job_store::ProgressEvent; +use crate::services::synthesis; /// Response body for `POST /api/v1/syntheses/generate`. #[derive(Debug, Serialize)] @@ -90,7 +91,7 @@ pub async fn trigger_generate( Ok(()) => {} Err(_) => { tracing::error!(job_id = %job_id, user_id = %user_id, "Generation timed out after 15 minutes"); - let _ = tx.send(synthesis::ProgressEvent::Error { + let _ = tx.send(ProgressEvent::Error { message: "La generation a depasse le delai maximum de 15 minutes.".into(), }); } @@ -101,7 +102,7 @@ pub async fn trigger_generate( tokio::spawn(async move { if let Err(e) = join_handle.await { tracing::error!(job_id = %job_id, error = %e, "Generation task panicked"); - let _ = tx_for_panic.send(synthesis::ProgressEvent::Error { + let _ = tx_for_panic.send(ProgressEvent::Error { message: "Erreur interne lors de la generation.".into(), }); state_for_panic.job_store.release_user(user_id); diff --git a/backend/src/services/job_store.rs b/backend/src/services/job_store.rs new file mode 100644 index 0000000..e77f8b2 --- /dev/null +++ b/backend/src/services/job_store.rs @@ -0,0 +1,335 @@ +//! In-memory job store for generation progress tracking. +//! +//! Manages active generation jobs keyed by UUID, with per-user +//! deduplication and TTL-based cleanup. Progress is communicated +//! via `tokio::sync::watch` channels. + +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use dashmap::DashMap; +use dashmap::DashSet; +use serde::Serialize; +use tokio::sync::watch; +use uuid::Uuid; + +// ─────────────────────────────────────────────────────────────────── +// Progress Events +// ─────────────────────────────────────────────────────────────────── + +/// Progress event sent to SSE clients during generation. +/// +/// The `watch` channel always holds the latest event, and new subscribers +/// immediately receive the current state. +#[derive(Debug, Clone, Serialize)] +#[serde(tag = "type")] +pub enum ProgressEvent { + /// Generation is in progress. + #[serde(rename = "progress")] + Progress { + step: String, + message: String, + percent: u8, + }, + /// Generation completed successfully. + #[serde(rename = "complete")] + Complete { synthesis_id: Uuid }, + /// Generation failed with an error. + #[serde(rename = "error")] + Error { message: String }, +} + +// ─────────────────────────────────────────────────────────────────── +// Job Store +// ─────────────────────────────────────────────────────────────────── + +/// Entry in the job store, holding the progress channel and metadata. +struct JobEntry { + /// Sender side of the watch channel for progress updates. + /// Wrapped in Arc so it can be shared with the background task + /// without cloning the Sender itself. + tx: Arc>, + /// A receiver kept alive to prevent the channel from closing. + /// Without at least one receiver, `Sender::send()` returns an error + /// and does NOT update the stored value. + _rx: watch::Receiver, + /// User who owns this job. + user_id: Uuid, + /// When the job was created (for TTL cleanup). + created_at: Instant, + /// Flag set to true when the user requests cancellation. + cancelled: Arc, +} + +/// In-memory store for active generation jobs. +/// +/// Uses `DashMap` for lock-free concurrent access. Jobs are keyed by +/// a random UUID and automatically cleaned up after a TTL. +#[derive(Clone)] +pub struct JobStore { + inner: Arc>, + generating_users: Arc>, +} + +/// Jobs expire after 1 hour (allows SSE reconnection). +const JOB_TTL: Duration = Duration::from_secs(3600); + +impl Default for JobStore { + fn default() -> Self { + Self::new() + } +} + +impl JobStore { + /// Create a new empty job store. + pub fn new() -> Self { + Self { + inner: Arc::new(DashMap::new()), + generating_users: Arc::new(DashSet::new()), + } + } + + /// Create a new job for a user, returning the job ID, the watch Sender, and a cancellation flag. + /// + /// Returns `None` if the user already has an active job. + /// Uses an atomic DashSet insert to prevent race conditions on double-click. + pub fn create_job(&self, user_id: Uuid) -> Option<(Uuid, Arc>, Arc)> { + if !self.generating_users.insert(user_id) { + return None; + } + let job_id = Uuid::new_v4(); + let (tx, rx) = watch::channel(ProgressEvent::Progress { + step: "init".into(), + message: "Initialisation...".into(), + percent: 0, + }); + let tx = Arc::new(tx); + let cancelled = Arc::new(AtomicBool::new(false)); + self.inner.insert(job_id, JobEntry { + tx: Arc::clone(&tx), _rx: rx, user_id, created_at: Instant::now(), + cancelled: Arc::clone(&cancelled), + }); + Some((job_id, tx, cancelled)) + } + + /// Get a watch receiver for a job, if it exists and belongs to the given user. + pub fn subscribe(&self, job_id: Uuid, user_id: Uuid) -> Option> { + self.inner.get(&job_id).and_then(|entry| { + if entry.value().user_id == user_id { + Some(entry.value().tx.subscribe()) + } else { + None + } + }) + } + + /// Check if a user has an active (in-progress) job. + pub fn has_active_job(&self, user_id: Uuid) -> Option { + if !self.generating_users.contains(&user_id) { return None; } + for entry in self.inner.iter() { + if entry.value().user_id == user_id { return Some(*entry.key()); } + } + None + } + + /// Signal a job to stop. Returns true if the job was found and belongs to the user. + pub fn cancel_job(&self, job_id: Uuid, user_id: Uuid) -> bool { + if let Some(entry) = self.inner.get(&job_id) { + if entry.value().user_id == user_id { + entry.value().cancelled.store(true, Ordering::Relaxed); + return true; + } + } + false + } + + /// Release the generating lock for a user (called when job completes, errors, or times out). + pub fn release_user(&self, user_id: Uuid) { + self.generating_users.remove(&user_id); + } + + /// Remove expired jobs (older than TTL). + pub fn cleanup_expired(&self) { + let now = Instant::now(); + self.inner.retain(|_, entry| { + let keep = now.duration_since(entry.created_at) < JOB_TTL; + if !keep { self.generating_users.remove(&entry.user_id); } + keep + }); + } + + /// Remove a specific job. + pub fn remove(&self, job_id: &Uuid) { + self.inner.remove(job_id); + } + + /// Get the number of active jobs (for testing/monitoring). + pub fn len(&self) -> usize { + self.inner.len() + } + + /// Check if the store is empty (for testing). + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } +} + +/// Emit a progress event via the watch channel. +pub fn emit_progress(tx: &watch::Sender, step: &str, message: &str, percent: u8) { + tx.send(ProgressEvent::Progress { + step: step.into(), + message: message.into(), + percent, + }) + .ok(); +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn job_store_create_and_subscribe() { + let store = JobStore::new(); + let user_id = Uuid::new_v4(); + + let (job_id, tx, _cancelled) = store.create_job(user_id).unwrap(); + assert_eq!(store.len(), 1); + + // Subscribe + let rx = store.subscribe(job_id, user_id); + assert!(rx.is_some()); + + // Wrong user cannot subscribe + let other_user = Uuid::new_v4(); + assert!(store.subscribe(job_id, other_user).is_none()); + + // Check active job + assert_eq!(store.has_active_job(user_id), Some(job_id)); + assert_eq!(store.has_active_job(other_user), None); + + drop(tx); + } + + #[test] + fn job_store_prevents_duplicate_active_jobs() { + let store = JobStore::new(); + let user_id = Uuid::new_v4(); + + let _result1 = store.create_job(user_id); + assert!(_result1.is_some()); + + // Second job for same user should fail + let result2 = store.create_job(user_id); + assert!(result2.is_none()); + + // Different user should succeed + let other_user = Uuid::new_v4(); + let result3 = store.create_job(other_user); + assert!(result3.is_some()); + } + + #[test] + fn job_store_allows_new_job_after_completion() { + let store = JobStore::new(); + let user_id = Uuid::new_v4(); + + let (_job_id, tx, _cancelled) = store.create_job(user_id).unwrap(); + + // Complete the job and release the user lock (as the pipeline does) + tx.send(ProgressEvent::Complete { + synthesis_id: Uuid::new_v4(), + }) + .ok(); + store.release_user(user_id); + + // Should now allow a new job + let result2 = store.create_job(user_id); + assert!(result2.is_some()); + } + + #[test] + fn job_store_allows_new_job_after_error() { + let store = JobStore::new(); + let user_id = Uuid::new_v4(); + + let (_job_id, tx, _cancelled) = store.create_job(user_id).unwrap(); + + // Fail the job and release the user lock (as the pipeline does) + tx.send(ProgressEvent::Error { + message: "test error".into(), + }) + .ok(); + store.release_user(user_id); + + // Should now allow a new job + let result2 = store.create_job(user_id); + assert!(result2.is_some()); + } + + #[test] + fn job_store_cleanup_expired() { + let store = JobStore::new(); + let user_id = Uuid::new_v4(); + + // Create a job and manually set its created_at to the past + let (_job_id, _tx, _cancelled) = store.create_job(user_id).unwrap(); + assert_eq!(store.len(), 1); + + // Cleanup should not remove recent jobs + store.cleanup_expired(); + assert_eq!(store.len(), 1); + } + + #[test] + fn job_store_remove() { + let store = JobStore::new(); + let user_id = Uuid::new_v4(); + + let (job_id, _tx, _cancelled) = store.create_job(user_id).unwrap(); + assert_eq!(store.len(), 1); + + store.remove(&job_id); + assert!(store.is_empty()); + } + + #[test] + fn progress_event_serialization_progress() { + let event = ProgressEvent::Progress { + step: "search".into(), + message: "Searching...".into(), + percent: 30, + }; + + let json = serde_json::to_value(&event).unwrap(); + assert_eq!(json["type"], "progress"); + assert_eq!(json["step"], "search"); + assert_eq!(json["message"], "Searching..."); + assert_eq!(json["percent"], 30); + } + + #[test] + fn progress_event_serialization_complete() { + let synthesis_id = Uuid::nil(); + let event = ProgressEvent::Complete { synthesis_id }; + + let json = serde_json::to_value(&event).unwrap(); + assert_eq!(json["type"], "complete"); + assert_eq!( + json["synthesis_id"], + "00000000-0000-0000-0000-000000000000" + ); + } + + #[test] + fn progress_event_serialization_error() { + let event = ProgressEvent::Error { + message: "Something went wrong".into(), + }; + + let json = serde_json::to_value(&event).unwrap(); + assert_eq!(json["type"], "error"); + assert_eq!(json["message"], "Something went wrong"); + } +} diff --git a/backend/src/services/mod.rs b/backend/src/services/mod.rs index a4f9ae7..a0484e8 100644 --- a/backend/src/services/mod.rs +++ b/backend/src/services/mod.rs @@ -4,6 +4,7 @@ pub mod csv; pub mod email; pub mod encryption; pub mod export; +pub mod job_store; pub mod llm; pub mod prompts; pub mod rate_limiter; diff --git a/backend/src/services/scheduler.rs b/backend/src/services/scheduler.rs index 73994f9..0983d3a 100644 --- a/backend/src/services/scheduler.rs +++ b/backend/src/services/scheduler.rs @@ -4,7 +4,7 @@ use chrono::Datelike; use crate::app_state::AppState; use crate::db; -use crate::services::{email, synthesis}; +use crate::services::{email, job_store, synthesis}; use crate::models::synthesis::NewsSection; use std::sync::atomic::AtomicBool; use tokio::sync::watch; @@ -48,7 +48,7 @@ pub async fn run_scheduled_jobs(state: &AppState) { tracing::info!(schedule_id = %schedule.id, theme_id = %schedule.theme_id, "Running scheduled generation"); - let (tx, _rx) = watch::channel(synthesis::ProgressEvent::Progress { + let (tx, _rx) = watch::channel(job_store::ProgressEvent::Progress { step: "init".into(), message: "Scheduled generation...".into(), percent: 0, }); let job_id = Uuid::new_v4();