//! Synthesis generation pipeline and job management. //! //! Orchestrates the two-pass LLM pipeline: //! 1. Search pass: LLM generates initial articles via web search //! 2. Scrape: Validate and fetch content from source URLs //! 3. Rewrite pass: LLM rewrites titles/summaries using scraped content //! //! Progress is reported via `tokio::sync::watch` channels per job, //! consumed by SSE endpoints for real-time client updates. use std::collections::HashMap; use std::sync::Arc; use std::time::{Duration, Instant}; use chrono::Utc; use dashmap::DashMap; use serde::Serialize; use tokio::sync::watch; use uuid::Uuid; use url::Url; use crate::app_state::AppState; use crate::db; use crate::errors::AppError; use crate::models::settings::UserSettings; use crate::models::synthesis::{ get_iso_week_string, NewsItem, NewsSection, ScrapedNewsItem, }; use crate::services::encryption; use crate::services::llm::factory::create_provider; use crate::services::llm::schema::build_category_schema; use crate::services::prompts; use crate::services::scraper; // ─────────────────────────────────────────────────────────────────── // Progress Events // ─────────────────────────────────────────────────────────────────── /// Progress event sent to SSE clients during generation. /// /// The `watch` channel always holds the latest event, and new subscribers /// immediately receive the current state. #[derive(Debug, Clone, Serialize)] #[serde(tag = "type")] pub enum ProgressEvent { /// Generation is in progress. #[serde(rename = "progress")] Progress { step: String, message: String, percent: u8, }, /// Generation completed successfully. #[serde(rename = "complete")] Complete { synthesis_id: Uuid }, /// Generation failed with an error. 
#[serde(rename = "error")] Error { message: String }, } // ─────────────────────────────────────────────────────────────────── // Job Store // ─────────────────────────────────────────────────────────────────── /// Entry in the job store, holding the progress channel and metadata. struct JobEntry { /// Sender side of the watch channel for progress updates. /// Wrapped in Arc so it can be shared with the background task /// without cloning the Sender itself. tx: Arc>, /// A receiver kept alive to prevent the channel from closing. /// Without at least one receiver, `Sender::send()` returns an error /// and does NOT update the stored value. _rx: watch::Receiver, /// User who owns this job. user_id: Uuid, /// When the job was created (for TTL cleanup). created_at: Instant, } /// In-memory store for active generation jobs. /// /// Uses `DashMap` for lock-free concurrent access. Jobs are keyed by /// a random UUID and automatically cleaned up after a TTL. #[derive(Clone)] pub struct JobStore { inner: Arc>, } /// Jobs expire after 1 hour (allows SSE reconnection). const JOB_TTL: Duration = Duration::from_secs(3600); impl JobStore { /// Create a new empty job store. pub fn new() -> Self { Self { inner: Arc::new(DashMap::new()), } } /// Create a new job for a user, returning the job ID and the watch Sender. /// /// Returns `None` if the user already has an active job. pub fn create_job(&self, user_id: Uuid) -> Option<(Uuid, Arc>)> { // Check if user already has an active job for entry in self.inner.iter() { if entry.value().user_id == user_id { // Check if the job is still active (not completed/failed) let current = entry.value().tx.borrow().clone(); match current { ProgressEvent::Complete { .. } | ProgressEvent::Error { .. } => { // Job finished, allow creating a new one continue; } ProgressEvent::Progress { .. 
} => { return None; // Active job exists } } } } let job_id = Uuid::new_v4(); let (tx, rx) = watch::channel(ProgressEvent::Progress { step: "init".into(), message: "Initialisation...".into(), percent: 0, }); let tx = Arc::new(tx); self.inner.insert( job_id, JobEntry { tx: Arc::clone(&tx), _rx: rx, user_id, created_at: Instant::now(), }, ); Some((job_id, tx)) } /// Get a watch receiver for a job, if it exists and belongs to the given user. pub fn subscribe(&self, job_id: Uuid, user_id: Uuid) -> Option> { self.inner.get(&job_id).and_then(|entry| { if entry.value().user_id == user_id { Some(entry.value().tx.subscribe()) } else { None } }) } /// Check if a user has an active (in-progress) job. pub fn has_active_job(&self, user_id: Uuid) -> Option { for entry in self.inner.iter() { if entry.value().user_id == user_id { let current = entry.value().tx.borrow().clone(); if matches!(current, ProgressEvent::Progress { .. }) { return Some(*entry.key()); } } } None } /// Remove expired jobs (older than TTL). pub fn cleanup_expired(&self) { let now = Instant::now(); self.inner.retain(|_, entry| { now.duration_since(entry.created_at) < JOB_TTL }); } /// Remove a specific job. pub fn remove(&self, job_id: &Uuid) { self.inner.remove(job_id); } /// Get the number of active jobs (for testing/monitoring). pub fn len(&self) -> usize { self.inner.len() } /// Check if the store is empty (for testing). pub fn is_empty(&self) -> bool { self.inner.is_empty() } } // ─────────────────────────────────────────────────────────────────── // Generation Pipeline // ─────────────────────────────────────────────────────────────────── /// Run the full two-pass generation pipeline for a user. /// /// This is the core orchestration function. It is spawned as a background /// tokio task and communicates progress via the `watch` channel. /// /// # Steps /// 1. Load user settings /// 2. Load user sources /// 3. Resolve provider + decrypt API key /// 4. Build schema from categories /// 5. 
Rate limit check (pass 1) /// 6. LLM search pass /// 7. Parse structured output /// 8. Validate/scrape URLs (parallel, bounded concurrency) /// 9. Rate limit check (pass 2) /// 10. LLM rewrite pass /// 11. Parse final output /// 12. Save synthesis to DB /// 13. Complete pub async fn run_generation( job_id: Uuid, state: AppState, user_id: Uuid, tx: Arc>, ) { let result = run_generation_inner(job_id, &state, user_id, &tx).await; match result { Ok(synthesis_id) => { tx.send(ProgressEvent::Complete { synthesis_id }).ok(); tracing::info!(job_id = %job_id, synthesis_id = %synthesis_id, "Generation completed"); } Err(e) => { tracing::error!(job_id = %job_id, error = %e, "Generation failed"); // Sanitize error message — never expose API keys or internal details let safe_message = sanitize_error_message(&e.to_string()); tx.send(ProgressEvent::Error { message: safe_message, }) .ok(); } } // Keep the job in the store for 5 minutes after completion // to allow SSE reconnection let store = state.job_store.clone(); let jid = job_id; tokio::spawn(async move { tokio::time::sleep(Duration::from_secs(300)).await; store.remove(&jid); }); } /// Inner implementation of the generation pipeline, returning a Result. async fn run_generation_inner( _job_id: Uuid, state: &AppState, user_id: Uuid, tx: &watch::Sender, ) -> Result { // Step 1: Load user settings emit_progress(tx, "settings", "Chargement des parametres...", 5); let settings = db::settings::get_or_create_default(&state.pool, user_id).await?; if settings.categories.is_empty() { return Err(AppError::BadRequest( "Aucune categorie configuree. 
Veuillez configurer vos parametres.".into(), )); } // Step 2: Load user sources emit_progress(tx, "sources", "Chargement des sources...", 10); let sources = db::sources::list_for_user(&state.pool, user_id).await?; // Step 3: Resolve provider + decrypt API key emit_progress(tx, "provider", "Configuration du fournisseur IA...", 15); let (provider_name, api_key) = resolve_provider_and_key(state, user_id, &settings).await?; let provider = create_provider(&provider_name, api_key)?; // Step 4: Build schema from categories let schema = build_category_schema(&settings.categories, settings.max_items_per_category); // Step 4b: Resolve models — user overrides take priority over admin config let model_research = if !settings.ai_model.is_empty() { settings.ai_model.clone() } else { resolve_model(state, &provider_name).await? }; let model_writing = if !settings.ai_model_writing.is_empty() { settings.ai_model_writing.clone() } else { model_research.clone() }; // Look up or create per-user rate limiter from AppState so limits persist across jobs. 
let user_rate_limiter = get_user_rate_limiter(state, &settings, user_id); // Step 5: Rate limit check (pass 1) check_rate_limit(state, &user_rate_limiter, &provider_name)?; // Step 6: LLM search pass emit_progress(tx, "search", "Recherche d'actualites en cours...", 30); let current_date = Utc::now() .format("%A %d %B %Y") .to_string(); let (system_prompt, user_prompt) = prompts::build_search_prompt(&settings, &sources, ¤t_date); let raw_results = provider .generate_search_pass(&model_research, &system_prompt, &user_prompt, &schema) .await?; // Step 7: Parse structured output into (category_key, Vec) emit_progress(tx, "parsing", "Analyse des resultats...", 40); let parsed = parse_llm_output(&raw_results, &settings.categories)?; // Step 7b: Filter out homepage URLs (path == "/" or empty) let parsed = filter_homepage_urls(parsed); // Step 8: Adaptive pipeline — decide whether to scrape+rewrite or use search results directly // // If the provider supports native web search and the search pass produced high-quality // results (>70% valid URLs starting with http), we can skip the expensive scrape+rewrite // pass and use the search results directly. let final_sections = if provider.supports_web_search() && url_quality_sufficient(&parsed) { tracing::info!( provider = provider.provider_id(), "Search pass URL quality sufficient, skipping scrape+rewrite pass" ); emit_progress( tx, "finalizing", "Resultats de recherche de bonne qualite, finalisation directe...", 85, ); build_final_sections(&raw_results, &settings.categories)? 
} else { // Full pipeline: scrape + rewrite emit_progress(tx, "scraping", "Verification des sources...", 45); let scraped = scrape_articles(state, &parsed, settings.max_age_days as i64, tx).await; // Rate limit check (pass 2) check_rate_limit(state, &user_rate_limiter, &provider_name)?; // LLM rewrite pass emit_progress(tx, "rewrite", "Redaction des resumes...", 80); let (rewrite_system, rewrite_user) = prompts::build_rewrite_prompt(&scraped); let final_results = provider .generate_rewrite_pass(&model_writing, &rewrite_system, &rewrite_user, &schema) .await?; emit_progress(tx, "finalizing", "Finalisation...", 90); build_final_sections(&final_results, &settings.categories)? }; // Step 12: Save synthesis to DB emit_progress(tx, "saving", "Sauvegarde de la synthese...", 95); let week = get_iso_week_string(Utc::now().date_naive()); let sections_json = serde_json::to_value(&final_sections).map_err(|e| { AppError::Internal(anyhow::anyhow!("Failed to serialize sections: {}", e)) })?; // Strip \u0000 null bytes — LLM output occasionally contains them and // PostgreSQL rejects them in JSONB columns. let sections_json = sanitize_json_null_bytes(sections_json); let synthesis = db::syntheses::create(&state.pool, user_id, &week, §ions_json).await?; Ok(synthesis.id) } // ─────────────────────────────────────────────────────────────────── // Helper Functions // ─────────────────────────────────────────────────────────────────── /// Recursively strip `\u0000` null bytes from JSON values. /// /// PostgreSQL rejects null bytes in JSONB text. LLM output occasionally /// contains them (e.g., `Meta AI a annonc\u0000...`). 
fn sanitize_json_null_bytes(value: serde_json::Value) -> serde_json::Value {
    use serde_json::Value;

    match value {
        // Only rebuild strings that actually contain a NUL; clean strings are
        // passed through by the catch-all arm without reallocating.
        Value::String(s) if s.contains('\0') => Value::String(s.replace('\0', "")),
        Value::Array(arr) => {
            Value::Array(arr.into_iter().map(sanitize_json_null_bytes).collect())
        }
        // Object keys are left untouched; only the values are sanitized.
        Value::Object(map) => Value::Object(
            map.into_iter()
                .map(|(k, v)| (k, sanitize_json_null_bytes(v)))
                .collect(),
        ),
        other => other,
    }
}

/// Emit a progress event via the watch channel.
///
/// A failed send is ignored on purpose: progress reporting is best-effort and
/// must not abort the pipeline.
fn emit_progress(tx: &watch::Sender<ProgressEvent>, step: &str, message: &str, percent: u8) {
    tx.send(ProgressEvent::Progress {
        step: step.into(),
        message: message.into(),
        percent,
    })
    .ok();
}

/// Look up or create a per-user rate limiter stored in AppState.
///
/// Returns `None` if the user has no rate limit overrides, in which case the
/// global provider rate limiter should be used instead. In that case any
/// limiter previously stored for the user is removed.
///
/// Uses DashMap's entry API for atomic check-and-insert, preventing concurrent
/// generation jobs from creating independent limiters for the same user.
//
// NOTE(review): the generic argument of the return type was lost when this
// file was extracted. `Arc<crate::services::rate_limiter::RateLimiter>` is an
// assumption — it must be whatever type `UserRateLimitEntry::limiter` has.
// Confirm before merging.
fn get_user_rate_limiter(
    state: &AppState,
    settings: &UserSettings,
    user_id: Uuid,
) -> Option<Arc<crate::services::rate_limiter::RateLimiter>> {
    use crate::app_state::UserRateLimitEntry;

    match (
        settings.rate_limit_max_requests,
        settings.rate_limit_time_window_seconds,
    ) {
        (Some(max_req), Some(window_sec)) => {
            let mut entry = state
                .user_rate_limiters
                .entry(user_id)
                .or_insert_with(|| UserRateLimitEntry::new(max_req, window_sec));

            // Replace if user's settings changed since the limiter was created
            if entry.settings_changed(max_req, window_sec) {
                *entry = UserRateLimitEntry::new(max_req, window_sec);
            }

            Some(entry.limiter.clone())
        }
        _ => {
            // No (complete) override configured: drop any stale limiter.
            state.user_rate_limiters.remove(&user_id);
            None
        }
    }
}

/// Check rate limits using the user's limiter if provided, otherwise the global limiter.
//
// NOTE(review): the generic argument of `user_limiter` was lost when this file
// was extracted. `Arc<crate::services::rate_limiter::RateLimiter>` is an
// assumption — it must match the return type of `get_user_rate_limiter`.
// Confirm before merging.
fn check_rate_limit(
    state: &AppState,
    user_limiter: &Option<Arc<crate::services::rate_limiter::RateLimiter>>,
    provider_name: &str,
) -> Result<(), AppError> {
    let allowed = match user_limiter {
        // Per-user limiters are keyed separately from the global provider key.
        Some(limiter) => limiter.check(&format!("user_gen_{}", provider_name)),
        None => state.provider_rate_limiter.check(provider_name),
    };

    if allowed {
        Ok(())
    } else {
        Err(AppError::RateLimited(
            "Limite de requetes atteinte. Veuillez reessayer dans quelques instants.".into(),
        ))
    }
}

/// Return `true` when `url` parses and its path is `/` or empty.
///
/// Unparseable URLs are not considered homepages.
fn is_homepage_url(url: &str) -> bool {
    match Url::parse(url) {
        Ok(parsed_url) => matches!(parsed_url.path(), "/" | ""),
        Err(_) => false,
    }
}

/// Filter out articles whose URL is a homepage (path is "/" or empty).
///
/// Homepage URLs are typically not useful as article sources and indicate
/// the LLM returned a domain root rather than a specific article.
/// Items whose URL cannot be parsed are kept (handled elsewhere). Categories
/// are preserved, in order, even when all of their items are removed.
fn filter_homepage_urls(
    parsed: Vec<(String, Vec<NewsItem>)>,
) -> Vec<(String, Vec<NewsItem>)> {
    let mut total_filtered = 0usize;

    let result: Vec<(String, Vec<NewsItem>)> = parsed
        .into_iter()
        .map(|(cat_key, mut items)| {
            let before = items.len();
            items.retain(|item| !is_homepage_url(&item.url));
            total_filtered += before - items.len();
            (cat_key, items)
        })
        .collect();

    if total_filtered > 0 {
        tracing::warn!(
            count = total_filtered,
            "Filtered out homepage URLs from search results"
        );
    }

    result
}

/// Resolve the LLM provider and decrypt the user's API key.
///
/// If the user has a preferred provider in settings, looks for a key matching
/// that provider specifically. Otherwise falls back to the first available key.
async fn resolve_provider_and_key( state: &AppState, user_id: Uuid, settings: &UserSettings, ) -> Result<(String, String), AppError> { let master_key = encryption::MasterKey::from_hex(&state.config.master_encryption_key)?; // If the user has a preferred provider, look for that specific key if !settings.ai_provider.is_empty() { let key_record = db::api_keys::get_for_user_and_provider( &state.pool, user_id, &settings.ai_provider, ) .await?; match key_record { Some(record) => { let api_key = encryption::decrypt(&master_key, &record.encrypted_key, &record.nonce)?; return Ok((record.provider_name.clone(), api_key)); } None => { return Err(AppError::BadRequest(format!( "Aucune cle API configuree pour le fournisseur '{}'. \ Veuillez ajouter une cle API pour ce fournisseur dans vos parametres.", settings.ai_provider ))); } } } // Fall back to first available key let keys = db::api_keys::list_for_user(&state.pool, user_id).await?; if keys.is_empty() { return Err(AppError::BadRequest( "Aucune cle API configuree. Veuillez ajouter une cle API dans vos parametres.".into(), )); } let key_record = &keys[0]; let api_key = encryption::decrypt( &master_key, &key_record.encrypted_key, &key_record.nonce, )?; Ok((key_record.provider_name.clone(), api_key)) } /// Resolve the model to use for a given provider. /// /// Looks up the first enabled model for the provider from the admin config. /// Falls back to sensible defaults if no admin-configured models exist. 
async fn resolve_model(state: &AppState, provider_name: &str) -> Result { // Try to get the default model from the admin_providers JSONB models array let model = sqlx::query_scalar::<_, String>( r#" SELECT m->>'model_id' FROM admin_providers, jsonb_array_elements(models) AS m WHERE provider_name = $1 AND is_enabled = true AND (m->>'is_default')::boolean = true LIMIT 1 "#, ) .bind(provider_name) .fetch_optional(&state.pool) .await?; match model { Some(m) => Ok(m), None => { // Fall back to sensible defaults match provider_name { "gemini" => Ok("gemini-2.5-pro".into()), "openai" => Ok("gpt-4o".into()), "anthropic" => Ok("claude-sonnet-4-20250514".into()), _ => Err(AppError::BadRequest(format!( "Aucun modele configure pour le fournisseur '{}'", provider_name ))), } } } } /// Parse the LLM's structured JSON output into category-keyed news items. /// /// Expects the output to have keys like `category_0`, `category_1`, etc. /// Each key maps to an array of `{title, url, summary}` objects. fn parse_llm_output( raw: &serde_json::Value, categories: &[String], ) -> Result)>, AppError> { let mut result = Vec::new(); for (i, _cat) in categories.iter().enumerate() { let key = format!("category_{}", i); let items_val = raw.get(&key).cloned().unwrap_or(serde_json::json!([])); let items: Vec = serde_json::from_value(items_val).unwrap_or_default(); result.push((key, items)); } Ok(result) } /// Scrape articles in parallel with bounded concurrency. /// /// For each category, scrapes all article URLs. Failed scrapes are /// handled gracefully — the article is kept with empty scraped content /// rather than being discarded. 
async fn scrape_articles( state: &AppState, parsed: &[(String, Vec)], max_age_days: i64, tx: &watch::Sender, ) -> HashMap> { let mut result: HashMap> = HashMap::new(); // Collect all (category_key, item) pairs for parallel processing let mut tasks = Vec::new(); for (cat_key, items) in parsed { for item in items { tasks.push((cat_key.clone(), item.clone())); } } let total = tasks.len(); if total == 0 { return result; } // Use JoinSet for bounded concurrency (max 10 concurrent scrapes) let mut join_set = tokio::task::JoinSet::new(); let mut pending = tasks.into_iter().peekable(); let mut completed = 0usize; let spawn_task = |join_set: &mut tokio::task::JoinSet<_>, cat_key: String, item: NewsItem| { let client = state.http_client.clone(); let url = item.url.clone(); let mad = max_age_days; join_set.spawn(async move { let scraped = scrape_single_article(&client, &url, mad).await; (cat_key, item, scraped) }); }; // Seed the JoinSet with up to 10 initial tasks let max_concurrent = 10; for _ in 0..max_concurrent { if let Some((cat_key, item)) = pending.next() { spawn_task(&mut join_set, cat_key, item); } } // Process results and spawn new tasks as slots open while let Some(join_result) = join_set.join_next().await { completed += 1; // Update progress (45% to 75% range for scraping) let pct = 45 + ((completed as u32 * 30) / total as u32).min(30); emit_progress( tx, "scraping", &format!("Verification des sources ({}/{})...", completed, total), pct as u8, ); if let Ok((cat_key, item, (scraped_content, page_title))) = join_result { let scraped_item = ScrapedNewsItem { title: item.title, url: item.url, summary: item.summary, original_title: page_title, scraped_content, }; result .entry(cat_key) .or_default() .push(scraped_item); } // Spawn next task if available if let Some((cat_key, item)) = pending.next() { spawn_task(&mut join_set, cat_key, item); } } result } /// Scrape a single article URL, returning (body_text, page_title) or empty strings on failure. 
///
/// Every failure mode yields a pair of empty strings and a warning in the log;
/// this function never drops an article itself:
/// - Network / scrape error → empty content
/// - Soft 404 or error page → empty content
/// - Article too old → empty content
async fn scrape_single_article(
    http_client: &reqwest::Client,
    url: &str,
    max_age_days: i64,
) -> (String, String) {
    let content = match scraper::scrape_url(http_client, url).await {
        Ok(content) => content,
        Err(e) => {
            tracing::warn!(url = url, error = %e, "Failed to scrape URL, keeping article with empty content");
            return (String::new(), String::new());
        }
    };

    if !content.ok || content.is_soft_404 {
        tracing::warn!(url = url, "Soft 404 or error page detected, skipping content");
        return (String::new(), String::new());
    }

    if scraper::is_article_too_old(content.published_date, max_age_days) {
        tracing::warn!(url = url, "Article too old, skipping content");
        return (String::new(), String::new());
    }

    let title = content.title.unwrap_or_default();
    (content.body_text, title)
}

/// Build the final sections array from the LLM's rewrite output.
///
/// Maps `category_N` keys back to the user's category names. One section is
/// produced per category, in order. A missing key yields an empty section. A
/// value that does not deserialize also yields an empty section, and the
/// failure is logged instead of being dropped silently. The function
/// currently never returns `Err`.
fn build_final_sections(
    raw: &serde_json::Value,
    categories: &[String],
) -> Result<Vec<NewsSection>, AppError> {
    let sections = categories
        .iter()
        .enumerate()
        .map(|(i, cat_name)| {
            let key = format!("category_{}", i);
            let items = match raw.get(&key) {
                None => Vec::new(),
                Some(value) => serde_json::from_value(value.clone()).unwrap_or_else(|e| {
                    tracing::warn!(
                        category = %key,
                        error = %e,
                        "Failed to parse final LLM output for category, treating it as empty"
                    );
                    Vec::new()
                }),
            };
            NewsSection {
                title: cat_name.clone(),
                items,
            }
        })
        .collect();

    Ok(sections)
}

/// Minimum ratio of valid URLs (starting with `http`) required to skip the
/// scrape+rewrite pass and use the search pass results directly.
const URL_QUALITY_THRESHOLD: f64 = 0.70;

/// Check whether the search pass produced sufficiently high-quality URLs.
///
/// Returns `true` if at least 70% of the URLs across all categories start
/// with `http` (indicating they are real web URLs rather than hallucinated
/// or malformed references).
/// /// If there are no articles at all, returns `false` to fall through to the /// full pipeline. fn url_quality_sufficient(parsed: &[(String, Vec)]) -> bool { let mut total = 0usize; let mut valid = 0usize; for (_cat_key, items) in parsed { for item in items { total += 1; if item.url.starts_with("http") { valid += 1; } } } if total == 0 { return false; } let ratio = valid as f64 / total as f64; tracing::debug!( total_urls = total, valid_urls = valid, ratio = ratio, threshold = URL_QUALITY_THRESHOLD, "URL quality check" ); ratio >= URL_QUALITY_THRESHOLD } /// Sanitize error messages to prevent leaking sensitive information. /// /// Removes potential API keys, internal paths, and other sensitive data. fn sanitize_error_message(msg: &str) -> String { // If the message contains common API key patterns, replace with generic message if msg.contains("API key") || msg.contains("api_key") || msg.contains("AIza") || msg.contains("sk-") || msg.contains("sk-ant-") || msg.contains("PERMISSION_DENIED") { return "Erreur d'authentification avec le fournisseur IA. Verifiez votre cle API.".into(); } if msg.contains("rate limit") || msg.contains("quota") || msg.contains("429") { return "Limite de requetes du fournisseur IA atteinte. Reessayez plus tard.".into(); } if msg.contains("Database") || msg.contains("sqlx") || msg.contains("postgres") { return "Erreur interne du serveur. 
Veuillez reessayer.".into(); } // For other errors, truncate and sanitize if msg.len() > 200 { format!("{}...", &msg[..200]) } else { msg.to_string() } } #[cfg(test)] mod tests { use super::*; // ── JobStore tests ─────────────────────────────────────────── #[test] fn job_store_create_and_subscribe() { let store = JobStore::new(); let user_id = Uuid::new_v4(); let (job_id, tx) = store.create_job(user_id).unwrap(); assert_eq!(store.len(), 1); // Subscribe let rx = store.subscribe(job_id, user_id); assert!(rx.is_some()); // Wrong user cannot subscribe let other_user = Uuid::new_v4(); assert!(store.subscribe(job_id, other_user).is_none()); // Check active job assert_eq!(store.has_active_job(user_id), Some(job_id)); assert_eq!(store.has_active_job(other_user), None); drop(tx); } #[test] fn job_store_prevents_duplicate_active_jobs() { let store = JobStore::new(); let user_id = Uuid::new_v4(); let result1 = store.create_job(user_id); assert!(result1.is_some()); // Second job for same user should fail let result2 = store.create_job(user_id); assert!(result2.is_none()); // Different user should succeed let other_user = Uuid::new_v4(); let result3 = store.create_job(other_user); assert!(result3.is_some()); } #[test] fn job_store_allows_new_job_after_completion() { let store = JobStore::new(); let user_id = Uuid::new_v4(); let (_job_id, tx) = store.create_job(user_id).unwrap(); // Complete the job tx.send(ProgressEvent::Complete { synthesis_id: Uuid::new_v4(), }) .ok(); // Should now allow a new job let result2 = store.create_job(user_id); assert!(result2.is_some()); } #[test] fn job_store_allows_new_job_after_error() { let store = JobStore::new(); let user_id = Uuid::new_v4(); let (_job_id, tx) = store.create_job(user_id).unwrap(); // Fail the job tx.send(ProgressEvent::Error { message: "test error".into(), }) .ok(); // Should now allow a new job let result2 = store.create_job(user_id); assert!(result2.is_some()); } #[test] fn job_store_cleanup_expired() { let store = 
JobStore::new(); let user_id = Uuid::new_v4(); // Create a job and manually set its created_at to the past let (_job_id, _tx) = store.create_job(user_id).unwrap(); assert_eq!(store.len(), 1); // Cleanup should not remove recent jobs store.cleanup_expired(); assert_eq!(store.len(), 1); } #[test] fn job_store_remove() { let store = JobStore::new(); let user_id = Uuid::new_v4(); let (job_id, _tx) = store.create_job(user_id).unwrap(); assert_eq!(store.len(), 1); store.remove(&job_id); assert!(store.is_empty()); } // ── ProgressEvent serialization tests ──────────────────────── #[test] fn progress_event_serialization_progress() { let event = ProgressEvent::Progress { step: "search".into(), message: "Searching...".into(), percent: 30, }; let json = serde_json::to_value(&event).unwrap(); assert_eq!(json["type"], "progress"); assert_eq!(json["step"], "search"); assert_eq!(json["message"], "Searching..."); assert_eq!(json["percent"], 30); } #[test] fn progress_event_serialization_complete() { let synthesis_id = Uuid::nil(); let event = ProgressEvent::Complete { synthesis_id }; let json = serde_json::to_value(&event).unwrap(); assert_eq!(json["type"], "complete"); assert_eq!( json["synthesis_id"], "00000000-0000-0000-0000-000000000000" ); } #[test] fn progress_event_serialization_error() { let event = ProgressEvent::Error { message: "Something went wrong".into(), }; let json = serde_json::to_value(&event).unwrap(); assert_eq!(json["type"], "error"); assert_eq!(json["message"], "Something went wrong"); } // ── parse_llm_output tests ─────────────────────────────────── #[test] fn parse_llm_output_valid() { let raw = serde_json::json!({ "category_0": [ {"title": "Art 1", "url": "https://a.com", "summary": "Sum 1"}, {"title": "Art 2", "url": "https://b.com", "summary": "Sum 2"} ], "category_1": [ {"title": "Art 3", "url": "https://c.com", "summary": "Sum 3"} ] }); let categories = vec!["AI News".into(), "Research".into()]; let result = parse_llm_output(&raw, 
&categories).unwrap(); assert_eq!(result.len(), 2); assert_eq!(result[0].0, "category_0"); assert_eq!(result[0].1.len(), 2); assert_eq!(result[1].0, "category_1"); assert_eq!(result[1].1.len(), 1); } #[test] fn parse_llm_output_missing_category() { let raw = serde_json::json!({ "category_0": [ {"title": "Art 1", "url": "https://a.com", "summary": "Sum 1"} ] // category_1 is missing }); let categories = vec!["AI News".into(), "Research".into()]; let result = parse_llm_output(&raw, &categories).unwrap(); assert_eq!(result.len(), 2); assert_eq!(result[0].1.len(), 1); assert_eq!(result[1].1.len(), 0); // Missing category → empty } // ── build_final_sections tests ─────────────────────────────── #[test] fn build_final_sections_maps_names() { let raw = serde_json::json!({ "category_0": [ {"title": "Art", "url": "https://a.com", "summary": "Sum"} ], "category_1": [] }); let categories = vec!["Annonces majeures".into(), "Recherche".into()]; let sections = build_final_sections(&raw, &categories).unwrap(); assert_eq!(sections.len(), 2); assert_eq!(sections[0].title, "Annonces majeures"); assert_eq!(sections[0].items.len(), 1); assert_eq!(sections[1].title, "Recherche"); assert_eq!(sections[1].items.len(), 0); } // ── sanitize_error_message tests ───────────────────────────── #[test] fn sanitize_hides_api_key_references() { let msg = "Invalid API key: AIzaSyB-test-key"; let sanitized = sanitize_error_message(msg); assert!(sanitized.contains("cle API")); assert!(!sanitized.contains("AIza")); } #[test] fn sanitize_hides_rate_limit_details() { let msg = "Resource exhausted: rate limit exceeded for project 12345"; let sanitized = sanitize_error_message(msg); assert!(sanitized.contains("Limite")); assert!(!sanitized.contains("12345")); } #[test] fn sanitize_hides_database_details() { let msg = "Database connection to postgres://user:pass@localhost failed"; let sanitized = sanitize_error_message(msg); assert!(sanitized.contains("Erreur interne")); 
assert!(!sanitized.contains("postgres")); } #[test] fn sanitize_truncates_long_messages() { let msg = "x".repeat(300); let sanitized = sanitize_error_message(&msg); assert!(sanitized.len() < 210); assert!(sanitized.ends_with("...")); } #[test] fn sanitize_passes_normal_messages() { let msg = "Generation failed due to network timeout"; let sanitized = sanitize_error_message(msg); assert_eq!(sanitized, msg); } // ── url_quality_sufficient tests ──────────────────────────── #[test] fn url_quality_all_valid_urls() { let parsed = vec![ ( "category_0".into(), vec![ NewsItem { title: "A".into(), url: "https://example.com/a".into(), summary: "Sum A".into(), }, NewsItem { title: "B".into(), url: "https://example.com/b".into(), summary: "Sum B".into(), }, ], ), ( "category_1".into(), vec![NewsItem { title: "C".into(), url: "http://example.org/c".into(), summary: "Sum C".into(), }], ), ]; // 3/3 = 100% valid -> true assert!(url_quality_sufficient(&parsed)); } #[test] fn url_quality_above_threshold() { // 8 valid out of 10 = 80% > 70% let mut items = Vec::new(); for i in 0..8 { items.push(NewsItem { title: format!("Art {}", i), url: format!("https://example.com/{}", i), summary: "Sum".into(), }); } for i in 8..10 { items.push(NewsItem { title: format!("Art {}", i), url: format!("bad-url-{}", i), summary: "Sum".into(), }); } let parsed = vec![("category_0".into(), items)]; assert!(url_quality_sufficient(&parsed)); } #[test] fn url_quality_exactly_at_threshold() { // 7 valid out of 10 = 70% >= 70% let mut items = Vec::new(); for i in 0..7 { items.push(NewsItem { title: format!("Art {}", i), url: format!("https://example.com/{}", i), summary: "Sum".into(), }); } for i in 7..10 { items.push(NewsItem { title: format!("Art {}", i), url: format!("bad-url-{}", i), summary: "Sum".into(), }); } let parsed = vec![("category_0".into(), items)]; assert!(url_quality_sufficient(&parsed)); } #[test] fn url_quality_below_threshold() { // 6 valid out of 10 = 60% < 70% let mut items = 
Vec::new(); for i in 0..6 { items.push(NewsItem { title: format!("Art {}", i), url: format!("https://example.com/{}", i), summary: "Sum".into(), }); } for i in 6..10 { items.push(NewsItem { title: format!("Art {}", i), url: format!("no-protocol-{}", i), summary: "Sum".into(), }); } let parsed = vec![("category_0".into(), items)]; assert!(!url_quality_sufficient(&parsed)); } #[test] fn url_quality_all_invalid_urls() { let parsed = vec![( "category_0".into(), vec![ NewsItem { title: "A".into(), url: "not-a-url".into(), summary: "Sum".into(), }, NewsItem { title: "B".into(), url: "also-not-a-url".into(), summary: "Sum".into(), }, ], )]; // 0/2 = 0% -> false assert!(!url_quality_sufficient(&parsed)); } #[test] fn url_quality_empty_articles() { let parsed: Vec<(String, Vec)> = vec![ ("category_0".into(), vec![]), ("category_1".into(), vec![]), ]; // No articles -> false (fall through to full pipeline) assert!(!url_quality_sufficient(&parsed)); } #[test] fn url_quality_empty_categories() { let parsed: Vec<(String, Vec)> = vec![]; assert!(!url_quality_sufficient(&parsed)); } // ── filter_homepage_urls tests ────────────────────────────── #[test] fn test_homepage_url_filtered() { let parsed = vec![( "category_0".into(), vec![ NewsItem { title: "Homepage".into(), url: "https://example.com/".into(), summary: "Sum".into(), }, NewsItem { title: "Homepage no slash".into(), url: "https://example.com".into(), summary: "Sum".into(), }, NewsItem { title: "Real article".into(), url: "https://example.com/article/123".into(), summary: "Sum".into(), }, ], )]; let result = filter_homepage_urls(parsed); assert_eq!(result[0].1.len(), 1); assert_eq!(result[0].1[0].title, "Real article"); } #[test] fn test_article_url_not_filtered() { let parsed = vec![( "category_0".into(), vec![ NewsItem { title: "Article 1".into(), url: "https://example.com/news/article-1".into(), summary: "Sum 1".into(), }, NewsItem { title: "Article 2".into(), url: "https://blog.example.org/2026/03/post".into(), 
summary: "Sum 2".into(), }, ], )]; let result = filter_homepage_urls(parsed); assert_eq!(result[0].1.len(), 2); } #[test] fn test_homepage_filter_keeps_unparseable_urls() { let parsed = vec![( "category_0".into(), vec![NewsItem { title: "Bad URL".into(), url: "not-a-url".into(), summary: "Sum".into(), }], )]; let result = filter_homepage_urls(parsed); assert_eq!(result[0].1.len(), 1); } #[test] fn sanitize_null_bytes_in_json_strings() { let json = serde_json::json!({ "title": "Hello\u{0000}World", "items": [{"summary": "Text\u{0000}with\u{0000}nulls"}] }); let sanitized = sanitize_json_null_bytes(json); assert_eq!(sanitized["title"], "HelloWorld"); assert_eq!(sanitized["items"][0]["summary"], "Textwithnulls"); } #[test] fn sanitize_preserves_clean_json() { let json = serde_json::json!({ "title": "Clean text", "count": 42, "active": true, "items": [{"url": "https://example.com"}] }); let sanitized = sanitize_json_null_bytes(json.clone()); assert_eq!(sanitized, json); } }