From 1ab9c817e45b98feaf7eaa8754c69ae355b1a1e1 Mon Sep 17 00:00:00 2001 From: oabrivard Date: Fri, 27 Mar 2026 14:33:06 +0100 Subject: [PATCH] refactor: extract scrape_and_classify_batch from synthesis pipeline Replaces ~200 duplicated lines in Phase 1 (personalized sources) and Phase 2 (Brave Search) with a shared scrape_and_classify_batch function. Uses ScrapeClassifyCtx to bundle shared parameters. Also prepares synthesis.rs for JobStore extraction by re-exporting from job_store. Co-Authored-By: Claude Opus 4.6 (1M context) --- backend/src/services/synthesis.rs | 848 ++++++++---------------------- 1 file changed, 227 insertions(+), 621 deletions(-) diff --git a/backend/src/services/synthesis.rs b/backend/src/services/synthesis.rs index f771d4a..91bf664 100644 --- a/backend/src/services/synthesis.rs +++ b/backend/src/services/synthesis.rs @@ -1,4 +1,4 @@ -//! Synthesis generation pipeline and job management. +//! Synthesis generation pipeline. //! //! Orchestrates the two-phase pipeline: //! 1. Personalized sources: scrape user sources, classify+summarize per article @@ -10,12 +10,9 @@ use std::collections::HashMap; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Duration; use chrono::Utc; -use dashmap::DashMap; -use dashmap::DashSet; -use serde::Serialize; use tokio::sync::watch; use uuid::Uuid; @@ -31,166 +28,8 @@ use crate::services::llm::factory::create_provider; use crate::services::scraper; use crate::services::source_scraper; -// ─────────────────────────────────────────────────────────────────── -// Progress Events -// ─────────────────────────────────────────────────────────────────── - -/// Progress event sent to SSE clients during generation. -/// -/// The `watch` channel always holds the latest event, and new subscribers -/// immediately receive the current state. -#[derive(Debug, Clone, Serialize)] -#[serde(tag = "type")] -pub enum ProgressEvent { - /// Generation is in progress. - #[serde(rename = "progress")] - Progress { - step: String, - message: String, - percent: u8, - }, - /// Generation completed successfully. - #[serde(rename = "complete")] - Complete { synthesis_id: Uuid }, - /// Generation failed with an error. - #[serde(rename = "error")] - Error { message: String }, -} - -// ─────────────────────────────────────────────────────────────────── -// Job Store -// ─────────────────────────────────────────────────────────────────── - -/// Entry in the job store, holding the progress channel and metadata. -struct JobEntry { - /// Sender side of the watch channel for progress updates. - /// Wrapped in Arc so it can be shared with the background task - /// without cloning the Sender itself. - tx: Arc>, - /// A receiver kept alive to prevent the channel from closing. - /// Without at least one receiver, `Sender::send()` returns an error - /// and does NOT update the stored value. - _rx: watch::Receiver, - /// User who owns this job. - user_id: Uuid, - /// When the job was created (for TTL cleanup). - created_at: Instant, - /// Flag set to true when the user requests cancellation. - cancelled: Arc, -} - -/// In-memory store for active generation jobs. -/// -/// Uses `DashMap` for lock-free concurrent access. Jobs are keyed by -/// a random UUID and automatically cleaned up after a TTL. -#[derive(Clone)] -pub struct JobStore { - inner: Arc>, - generating_users: Arc>, -} - -/// Jobs expire after 1 hour (allows SSE reconnection). -const JOB_TTL: Duration = Duration::from_secs(3600); - -impl Default for JobStore { - fn default() -> Self { - Self::new() - } -} - -impl JobStore { - /// Create a new empty job store. - pub fn new() -> Self { - Self { - inner: Arc::new(DashMap::new()), - generating_users: Arc::new(DashSet::new()), - } - } - - /// Create a new job for a user, returning the job ID, the watch Sender, and a cancellation flag. - /// - /// Returns `None` if the user already has an active job. - /// Uses an atomic DashSet insert to prevent race conditions on double-click. - pub fn create_job(&self, user_id: Uuid) -> Option<(Uuid, Arc>, Arc)> { - if !self.generating_users.insert(user_id) { - return None; - } - let job_id = Uuid::new_v4(); - let (tx, rx) = watch::channel(ProgressEvent::Progress { - step: "init".into(), - message: "Initialisation...".into(), - percent: 0, - }); - let tx = Arc::new(tx); - let cancelled = Arc::new(AtomicBool::new(false)); - self.inner.insert(job_id, JobEntry { - tx: Arc::clone(&tx), _rx: rx, user_id, created_at: Instant::now(), - cancelled: Arc::clone(&cancelled), - }); - Some((job_id, tx, cancelled)) - } - - /// Get a watch receiver for a job, if it exists and belongs to the given user. - pub fn subscribe(&self, job_id: Uuid, user_id: Uuid) -> Option> { - self.inner.get(&job_id).and_then(|entry| { - if entry.value().user_id == user_id { - Some(entry.value().tx.subscribe()) - } else { - None - } - }) - } - - /// Check if a user has an active (in-progress) job. - pub fn has_active_job(&self, user_id: Uuid) -> Option { - if !self.generating_users.contains(&user_id) { return None; } - for entry in self.inner.iter() { - if entry.value().user_id == user_id { return Some(*entry.key()); } - } - None - } - - /// Signal a job to stop. Returns true if the job was found and belongs to the user. - pub fn cancel_job(&self, job_id: Uuid, user_id: Uuid) -> bool { - if let Some(entry) = self.inner.get(&job_id) { - if entry.value().user_id == user_id { - entry.value().cancelled.store(true, Ordering::Relaxed); - return true; - } - } - false - } - - /// Release the generating lock for a user (called when job completes, errors, or times out). - pub fn release_user(&self, user_id: Uuid) { - self.generating_users.remove(&user_id); - } - - /// Remove expired jobs (older than TTL). - pub fn cleanup_expired(&self) { - let now = Instant::now(); - self.inner.retain(|_, entry| { - let keep = now.duration_since(entry.created_at) < JOB_TTL; - if !keep { self.generating_users.remove(&entry.user_id); } - keep - }); - } - - /// Remove a specific job. - pub fn remove(&self, job_id: &Uuid) { - self.inner.remove(job_id); - } - - /// Get the number of active jobs (for testing/monitoring). - pub fn len(&self) -> usize { - self.inner.len() - } - - /// Check if the store is empty (for testing). - pub fn is_empty(&self) -> bool { - self.inner.is_empty() - } -} +// Re-export for downstream consumers that previously imported from synthesis. +pub use crate::services::job_store::{emit_progress, JobStore, ProgressEvent}; // ─────────────────────────────────────────────────────────────────── // Generation Pipeline @@ -439,9 +278,19 @@ pub async fn run_generation_inner( let mut candidates_iter = wave_urls.into_iter(); let mut done = false; + let ctx = ScrapeClassifyCtx { + state, user_id, job_id, provider: &provider, + model_research: &model_research, classify_schema: &classify_schema, + classification_categories: &classification_categories, + user_categories: &user_categories, snippet_size, summary_length: theme.summary_length, + max_age_days: theme.max_age_days as i64, + max_items_per_category: theme.max_items_per_category as usize, + source_type: "personalized_source", + }; + while !done { // Take next batch of candidates, filtering source limits - let mut batch: Vec<(String, String)> = Vec::new(); + let mut batch: Vec<(String, Option)> = Vec::new(); while batch.len() < batch_size { let Some((url, source_url)) = candidates_iter.next() else { break; @@ -457,7 +306,7 @@ pub async fn run_generation_inner( })); continue; } - batch.push((url, source_url)); + batch.push((url, Some(source_url))); } if batch.is_empty() { @@ -468,152 +317,10 @@ pub async fn run_generation_inner( let pct = 5 + ((articles_so_far as u32 * 60) / max_total.max(1) as u32).min(60); emit_progress(tx, "sources", &format!("Vague {}/{} : articles {}/{}...", wave_idx + 1, total_waves, processed + 1, total_candidates), pct as u8); - // Phase A: Scrape batch in parallel - let mut scrape_set = tokio::task::JoinSet::new(); - for (url, source_url) in &batch { - let client = state.http_client.clone(); - let u = url.clone(); - let su = source_url.clone(); - let mad = theme.max_age_days as i64; - scrape_set.spawn(async move { - let result = scrape_single_article(&client, &u, mad).await; - (u, su, result) - }); - } - - let mut scraped_articles: Vec<(String, String, String, String)> = Vec::new(); // (url, source_url, body_text, page_title) - while let Some(join_result) = scrape_set.join_next().await { - if let Ok((_url, source_url, (body_text, page_title, final_url, drop_reason))) = join_result { - if let Some(reason) = drop_reason { - pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace { - url: &final_url, title: &page_title, source_type: "personalized_source", - source_url: Some(&source_url), category: None, synthesis_id: None, - status: reason, scraped_ok: false, - published_date: None, - })); - } else { - scraped_articles.push((final_url, source_url, body_text, page_title)); - } - } - } - - if scraped_articles.is_empty() { - processed += batch.len(); - continue; - } - - // Phase B: Classify/summarize batch in parallel - check_rate_limit(state, &user_rate_limiter, &provider_name).await?; - - let mut classify_set = tokio::task::JoinSet::new(); - for (final_url, source_url, body_text, page_title) in &scraped_articles { - let provider_clone = std::sync::Arc::clone(&provider); - let model = Arc::clone(&model_research); - let schema = Arc::clone(&classify_schema); - let cats = Arc::clone(&classification_categories); - let body_snippet: String = body_text.chars().take(snippet_size).collect(); - let title = page_title.clone(); - let url = final_url.clone(); - let su = source_url.clone(); - let pool = state.pool.clone(); - let uid = user_id; - let jid = job_id; - - let (sys, usr) = crate::services::prompts::build_article_classify_prompt(&title, &body_snippet, &cats, theme.summary_length); - - classify_set.spawn(async move { - let llm_start = std::time::Instant::now(); - let result = provider_clone.call_llm(&model, &sys, &usr, &schema).await; - let duration = llm_start.elapsed().as_millis() as u64; - - // Log the LLM call - if let Ok(ref resp) = result { - let resp_str = serde_json::to_string_pretty(resp).unwrap_or_default(); - crate::db::llm_call_log::insert(&pool, uid, jid, "classify_summarize", &model, &sys, &usr, &resp_str, duration as i32, Some(&url)).await.ok(); - } - - (url, su, title, result) - }); - } - - while let Some(join_result) = classify_set.join_next().await { - if let Ok((final_url, source_url, page_title, llm_result)) = join_result { - let class_response = match llm_result { - Ok(resp) => resp, - Err(e) => { - tracing::warn!(url = %final_url, error = %e, "LLM classify failed, skipping article"); - continue; - } - }; - - // Check if LLM considers this a real article - let is_article = class_response.get("is_article").and_then(|v| v.as_bool()).unwrap_or(true); - if !is_article { - tracing::info!(url = %final_url, "Article filtered by LLM: not a real article"); - pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace { - url: &final_url, title: &page_title, source_type: "personalized_source", - source_url: Some(&source_url), category: None, synthesis_id: None, - status: "filtered_not_article", scraped_ok: true, - published_date: None, - })); - continue; - } - - // Check LLM-extracted date as fallback for articles without a scraper date - if let Some(date_str) = class_response.get("date").and_then(|d| d.as_str()) { - if !date_str.is_empty() { - if let Some(parsed) = scraper::parse_date_string(date_str) { - if scraper::is_article_too_old(Some(parsed), theme.max_age_days as i64) { - tracing::info!(url = %final_url, date = date_str, "Article filtered by LLM-extracted date (too old)"); - pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace { - url: &final_url, title: &page_title, source_type: "personalized_source", - source_url: Some(&source_url), category: None, synthesis_id: None, - status: "filtered_too_old", scraped_ok: true, - published_date: Some(date_str), - })); - continue; - } - } - } - } - - let llm_date = class_response.get("date").and_then(|d| d.as_str()).filter(|s| !s.is_empty()).map(|s| s.to_string()); - - // Articles without any date go to "Articles sans date" category - if llm_date.is_none() { - let llm_title = class_response.get("title").and_then(|t| t.as_str()).unwrap_or(&page_title).to_string(); - let llm_summary = class_response.get("summary").and_then(|s| s.as_str()).unwrap_or("").to_string(); - article_scraped.entry("category_no_date".to_string()).or_default().push(NewsItem { - title: llm_title, - url: final_url.clone(), - summary: llm_summary, - date: None, - }); - - let source_domain = extract_domain(&source_url).unwrap_or_default(); - *source_counts.entry(source_domain).or_insert(0) += 1; - continue; - } - - let Some((final_cat_key, final_cat_name, llm_title, llm_summary)) = assign_category( - &class_response, &page_title, &user_categories, &classification_categories, - &filled_counts, theme.max_items_per_category as usize, - ) else { - continue; - }; - - article_scraped.entry(final_cat_key).or_default().push(NewsItem { - title: llm_title, - url: final_url.clone(), - summary: llm_summary, - date: llm_date, - }); - *filled_counts.entry(final_cat_name).or_insert(0) += 1; - - let source_domain = extract_domain(&source_url).unwrap_or_default(); - *source_counts.entry(source_domain).or_insert(0) += 1; - } - } + scrape_and_classify_batch( + &ctx, &batch, &mut article_scraped, &mut filled_counts, + &mut source_counts, &mut pending_traces, &user_rate_limiter, &provider_name, + ).await?; processed += batch.len(); @@ -705,15 +412,26 @@ pub async fn run_generation_inner( emit_progress(tx, "websearch", "Traitement des articles Brave...", 75); let total_candidates = brave_urls.len(); let batch_size = settings.batch_size.max(1) as usize; + let snippet_size = match theme.summary_length { 1 => 500, 2 => 2000, _ => 4000 }; let mut processed = 0usize; let mut candidates_iter = brave_urls.into_iter(); let mut done = false; + let ctx = ScrapeClassifyCtx { + state, user_id, job_id, provider: &provider, + model_research: &model_research, classify_schema: &classify_schema, + classification_categories: &classification_categories, + user_categories: &user_categories, snippet_size, summary_length: theme.summary_length, + max_age_days: theme.max_age_days as i64, + max_items_per_category: theme.max_items_per_category as usize, + source_type: "brave_search", + }; + while !done { - let mut batch: Vec = Vec::new(); + let mut batch: Vec<(String, Option)> = Vec::new(); while batch.len() < batch_size { let Some(url) = candidates_iter.next() else { break }; - batch.push(url); + batch.push((url, None)); } if batch.is_empty() { break; } @@ -721,156 +439,10 @@ pub async fn run_generation_inner( let pct = 75 + ((processed as u32 * 10) / total_candidates.max(1) as u32).min(10); emit_progress(tx, "websearch", &format!("Verification des sources {}/{}...", processed + 1, total_candidates), pct as u8); - // Scrape batch in parallel - let mut scrape_set = tokio::task::JoinSet::new(); - for url in &batch { - let client = state.http_client.clone(); - let u = url.clone(); - let mad = theme.max_age_days as i64; - scrape_set.spawn(async move { - let result = scrape_single_article(&client, &u, mad).await; - (u, result) - }); - } - - let mut scraped_articles: Vec<(String, String, String)> = Vec::new(); // (url, body_text, page_title) - while let Some(join_result) = scrape_set.join_next().await { - if let Ok((_url, (body_text, page_title, final_url, drop_reason))) = join_result { - if let Some(reason) = drop_reason { - pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace { - url: &final_url, title: &page_title, source_type: "brave_search", - source_url: None, category: None, synthesis_id: None, - status: reason, scraped_ok: false, - published_date: None, - })); - } else { - scraped_articles.push((final_url, body_text, page_title)); - } - } - } - - if scraped_articles.is_empty() { - processed += batch.len(); - continue; - } - - // Classify/summarize in parallel - check_rate_limit(state, &user_rate_limiter, &provider_name).await?; - - let mut classify_set = tokio::task::JoinSet::new(); - for (final_url, body_text, page_title) in &scraped_articles { - let provider_clone = std::sync::Arc::clone(&provider); - let model = Arc::clone(&model_research); - let schema = Arc::clone(&classify_schema); - let cats = Arc::clone(&classification_categories); - let snippet_size = match theme.summary_length { - 1 => 500, - 2 => 2000, - _ => 4000, - }; - let body_snippet: String = body_text.chars().take(snippet_size).collect(); - let title = page_title.clone(); - let url = final_url.clone(); - let pool = state.pool.clone(); - let uid = user_id; - let jid = job_id; - - let (sys, usr) = crate::services::prompts::build_article_classify_prompt(&title, &body_snippet, &cats, theme.summary_length); - - classify_set.spawn(async move { - let llm_start = std::time::Instant::now(); - let result = provider_clone.call_llm(&model, &sys, &usr, &schema).await; - let duration = llm_start.elapsed().as_millis() as u64; - - if let Ok(ref resp) = result { - let resp_str = serde_json::to_string_pretty(resp).unwrap_or_default(); - crate::db::llm_call_log::insert(&pool, uid, jid, "classify_summarize", &model, &sys, &usr, &resp_str, duration as i32, Some(&url)).await.ok(); - } - - (url, title, result) - }); - } - - while let Some(join_result) = classify_set.join_next().await { - if let Ok((final_url, page_title, llm_result)) = join_result { - let class_response = match llm_result { - Ok(resp) => resp, - Err(e) => { - tracing::warn!(url = %final_url, error = %e, "LLM classify failed, skipping article"); - continue; - } - }; - - // Check if LLM considers this a real article - let is_article = class_response.get("is_article").and_then(|v| v.as_bool()).unwrap_or(true); - if !is_article { - tracing::info!(url = %final_url, "Article filtered by LLM: not a real article"); - pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace { - url: &final_url, title: &page_title, source_type: "brave_search", - source_url: None, category: None, synthesis_id: None, - status: "filtered_not_article", scraped_ok: true, - published_date: None, - })); - continue; - } - - // Check LLM-extracted date as fallback - if let Some(date_str) = class_response.get("date").and_then(|d| d.as_str()) { - if !date_str.is_empty() { - if let Some(parsed) = scraper::parse_date_string(date_str) { - if scraper::is_article_too_old(Some(parsed), theme.max_age_days as i64) { - tracing::info!(url = %final_url, date = date_str, "Article filtered by LLM-extracted date (too old)"); - pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace { - url: &final_url, title: &page_title, source_type: "brave_search", - source_url: None, category: None, synthesis_id: None, - status: "filtered_too_old", scraped_ok: true, - published_date: Some(date_str), - })); - continue; - } - } - } - } - - let llm_date = class_response.get("date").and_then(|d| d.as_str()).filter(|s| !s.is_empty()).map(|s| s.to_string()); - - // Articles without any date go to "Articles sans date" category - if llm_date.is_none() { - let llm_title = class_response.get("title").and_then(|t| t.as_str()).unwrap_or(&page_title).to_string(); - let llm_summary = class_response.get("summary").and_then(|s| s.as_str()).unwrap_or("").to_string(); - article_scraped.entry("category_no_date".to_string()).or_default().push(NewsItem { - title: llm_title, - url: final_url.clone(), - summary: llm_summary, - date: None, - }); - - if let Some(domain) = extract_domain(&final_url) { - *source_counts.entry(domain).or_insert(0) += 1; - } - continue; - } - - let Some((final_cat_key, final_cat_name, llm_title, llm_summary)) = assign_category( - &class_response, &page_title, &user_categories, &classification_categories, - &filled_counts, theme.max_items_per_category as usize, - ) else { - continue; - }; - - article_scraped.entry(final_cat_key).or_default().push(NewsItem { - title: llm_title, - url: final_url.clone(), - summary: llm_summary, - date: llm_date, - }); - *filled_counts.entry(final_cat_name).or_insert(0) += 1; - - if let Some(domain) = extract_domain(&final_url) { - *source_counts.entry(domain).or_insert(0) += 1; - } - } - } + scrape_and_classify_batch( + &ctx, &batch, &mut article_scraped, &mut filled_counts, + &mut source_counts, &mut pending_traces, &user_rate_limiter, &provider_name, + ).await?; processed += batch.len(); @@ -1060,14 +632,196 @@ fn sanitize_json_null_bytes(value: serde_json::Value) -> serde_json::Value { } } -/// Emit a progress event via the watch channel. -fn emit_progress(tx: &watch::Sender, step: &str, message: &str, percent: u8) { - tx.send(ProgressEvent::Progress { - step: step.into(), - message: message.into(), - percent, - }) - .ok(); +/// Context passed to [`scrape_and_classify_batch`] to avoid long argument lists. +struct ScrapeClassifyCtx<'a> { + state: &'a AppState, + user_id: Uuid, + job_id: Uuid, + provider: &'a Arc, + model_research: &'a Arc, + classify_schema: &'a Arc, + classification_categories: &'a Arc>, + user_categories: &'a [String], + snippet_size: usize, + summary_length: i32, + max_age_days: i64, + max_items_per_category: usize, + source_type: &'a str, +} + +/// Scrape and classify a batch of articles, updating shared state. +/// +/// Each article is represented as `(url, Option)`. +/// - `source_type` is used for tracing (e.g. "personalized_source", "brave_search"). +/// - When `source_url` is `Some`, it is recorded in traces; the domain for +/// source counting comes from the source URL. +/// - When `source_url` is `None`, traces have no source URL; the domain for +/// source counting comes from the article URL. +#[allow(clippy::too_many_arguments)] +async fn scrape_and_classify_batch( + ctx: &ScrapeClassifyCtx<'_>, + articles: &[(String, Option)], + article_scraped: &mut HashMap>, + filled_counts: &mut HashMap, + source_counts: &mut HashMap, + pending_traces: &mut Vec, + user_rate_limiter: &Option, + provider_name: &str, +) -> Result<(), AppError> { + // Phase A: Scrape batch in parallel + let mut scrape_set = tokio::task::JoinSet::new(); + for (url, source_url) in articles { + let client = ctx.state.http_client.clone(); + let u = url.clone(); + let su = source_url.clone(); + let mad = ctx.max_age_days; + scrape_set.spawn(async move { + let result = scrape_single_article(&client, &u, mad).await; + (u, su, result) + }); + } + + let mut scraped_articles: Vec<(String, Option, String, String)> = Vec::new(); + while let Some(join_result) = scrape_set.join_next().await { + if let Ok((_url, source_url, (body_text, page_title, final_url, drop_reason))) = join_result { + if let Some(reason) = drop_reason { + pending_traces.push(build_trace_entry(ctx.user_id, ctx.job_id, &ArticleTrace { + url: &final_url, title: &page_title, source_type: ctx.source_type, + source_url: source_url.as_deref(), category: None, synthesis_id: None, + status: reason, scraped_ok: false, + published_date: None, + })); + } else { + scraped_articles.push((final_url, source_url, body_text, page_title)); + } + } + } + + if scraped_articles.is_empty() { + return Ok(()); + } + + // Phase B: Classify/summarize batch in parallel + check_rate_limit(ctx.state, user_rate_limiter, provider_name).await?; + + let mut classify_set = tokio::task::JoinSet::new(); + for (final_url, source_url, body_text, page_title) in &scraped_articles { + let provider_clone = Arc::clone(ctx.provider); + let model = Arc::clone(ctx.model_research); + let schema = Arc::clone(ctx.classify_schema); + let cats = Arc::clone(ctx.classification_categories); + let body_snippet: String = body_text.chars().take(ctx.snippet_size).collect(); + let title = page_title.clone(); + let url = final_url.clone(); + let su = source_url.clone(); + let pool = ctx.state.pool.clone(); + let uid = ctx.user_id; + let jid = ctx.job_id; + let summary_length = ctx.summary_length; + + let (sys, usr) = crate::services::prompts::build_article_classify_prompt(&title, &body_snippet, &cats, summary_length); + + classify_set.spawn(async move { + let llm_start = std::time::Instant::now(); + let result = provider_clone.call_llm(&model, &sys, &usr, &schema).await; + let duration = llm_start.elapsed().as_millis() as u64; + + if let Ok(ref resp) = result { + let resp_str = serde_json::to_string_pretty(resp).unwrap_or_default(); + crate::db::llm_call_log::insert(&pool, uid, jid, "classify_summarize", &model, &sys, &usr, &resp_str, duration as i32, Some(&url)).await.ok(); + } + + (url, su, title, result) + }); + } + + while let Some(join_result) = classify_set.join_next().await { + if let Ok((final_url, source_url, page_title, llm_result)) = join_result { + let class_response = match llm_result { + Ok(resp) => resp, + Err(e) => { + tracing::warn!(url = %final_url, error = %e, "LLM classify failed, skipping article"); + continue; + } + }; + + // Check if LLM considers this a real article + let is_article = class_response.get("is_article").and_then(|v| v.as_bool()).unwrap_or(true); + if !is_article { + tracing::info!(url = %final_url, "Article filtered by LLM: not a real article"); + pending_traces.push(build_trace_entry(ctx.user_id, ctx.job_id, &ArticleTrace { + url: &final_url, title: &page_title, source_type: ctx.source_type, + source_url: source_url.as_deref(), category: None, synthesis_id: None, + status: "filtered_not_article", scraped_ok: true, + published_date: None, + })); + continue; + } + + // Check LLM-extracted date as fallback + if let Some(date_str) = class_response.get("date").and_then(|d| d.as_str()) { + if !date_str.is_empty() { + if let Some(parsed) = scraper::parse_date_string(date_str) { + if scraper::is_article_too_old(Some(parsed), ctx.max_age_days) { + tracing::info!(url = %final_url, date = date_str, "Article filtered by LLM-extracted date (too old)"); + pending_traces.push(build_trace_entry(ctx.user_id, ctx.job_id, &ArticleTrace { + url: &final_url, title: &page_title, source_type: ctx.source_type, + source_url: source_url.as_deref(), category: None, synthesis_id: None, + status: "filtered_too_old", scraped_ok: true, + published_date: Some(date_str), + })); + continue; + } + } + } + } + + let llm_date = class_response.get("date").and_then(|d| d.as_str()).filter(|s| !s.is_empty()).map(|s| s.to_string()); + + // Domain for source counting: prefer source_url if available, else article url + let count_domain = source_url.as_deref() + .and_then(extract_domain) + .or_else(|| extract_domain(&final_url)); + + // Articles without any date go to "Articles sans date" category + if llm_date.is_none() { + let llm_title = class_response.get("title").and_then(|t| t.as_str()).unwrap_or(&page_title).to_string(); + let llm_summary = class_response.get("summary").and_then(|s| s.as_str()).unwrap_or("").to_string(); + article_scraped.entry("category_no_date".to_string()).or_default().push(NewsItem { + title: llm_title, + url: final_url.clone(), + summary: llm_summary, + date: None, + }); + + if let Some(domain) = count_domain { + *source_counts.entry(domain).or_insert(0) += 1; + } + continue; + } + + let Some((final_cat_key, final_cat_name, llm_title, llm_summary)) = assign_category( + &class_response, &page_title, ctx.user_categories, ctx.classification_categories, + filled_counts, ctx.max_items_per_category, + ) else { + continue; + }; + + article_scraped.entry(final_cat_key).or_default().push(NewsItem { + title: llm_title, + url: final_url.clone(), + summary: llm_summary, + date: llm_date, + }); + *filled_counts.entry(final_cat_name).or_insert(0) += 1; + + if let Some(domain) = count_domain { + *source_counts.entry(domain).or_insert(0) += 1; + } + } + } + + Ok(()) } /// Structured parameters for article history tracing. @@ -1547,154 +1301,6 @@ fn sanitize_error_message(msg: &str) -> String { mod tests { use super::*; - // ── JobStore tests ─────────────────────────────────────────── - - #[test] - fn job_store_create_and_subscribe() { - let store = JobStore::new(); - let user_id = Uuid::new_v4(); - - let (job_id, tx, _cancelled) = store.create_job(user_id).unwrap(); - assert_eq!(store.len(), 1); - - // Subscribe - let rx = store.subscribe(job_id, user_id); - assert!(rx.is_some()); - - // Wrong user cannot subscribe - let other_user = Uuid::new_v4(); - assert!(store.subscribe(job_id, other_user).is_none()); - - // Check active job - assert_eq!(store.has_active_job(user_id), Some(job_id)); - assert_eq!(store.has_active_job(other_user), None); - - drop(tx); - } - - #[test] - fn job_store_prevents_duplicate_active_jobs() { - let store = JobStore::new(); - let user_id = Uuid::new_v4(); - - let _result1 = store.create_job(user_id); - assert!(_result1.is_some()); - - // Second job for same user should fail - let result2 = store.create_job(user_id); - assert!(result2.is_none()); - - // Different user should succeed - let other_user = Uuid::new_v4(); - let result3 = store.create_job(other_user); - assert!(result3.is_some()); - } - - #[test] - fn job_store_allows_new_job_after_completion() { - let store = JobStore::new(); - let user_id = Uuid::new_v4(); - - let (_job_id, tx, _cancelled) = store.create_job(user_id).unwrap(); - - // Complete the job and release the user lock (as the pipeline does) - tx.send(ProgressEvent::Complete { - synthesis_id: Uuid::new_v4(), - }) - .ok(); - store.release_user(user_id); - - // Should now allow a new job - let result2 = store.create_job(user_id); - assert!(result2.is_some()); - } - - #[test] - fn job_store_allows_new_job_after_error() { - let store = JobStore::new(); - let user_id = Uuid::new_v4(); - - let (_job_id, tx, _cancelled) = store.create_job(user_id).unwrap(); - - // Fail the job and release the user lock (as the pipeline does) - tx.send(ProgressEvent::Error { - message: "test error".into(), - }) - .ok(); - store.release_user(user_id); - - // Should now allow a new job - let result2 = store.create_job(user_id); - assert!(result2.is_some()); - } - - #[test] - fn job_store_cleanup_expired() { - let store = JobStore::new(); - let user_id = Uuid::new_v4(); - - // Create a job and manually set its created_at to the past - let (_job_id, _tx, _cancelled) = store.create_job(user_id).unwrap(); - assert_eq!(store.len(), 1); - - // Cleanup should not remove recent jobs - store.cleanup_expired(); - assert_eq!(store.len(), 1); - } - - #[test] - fn job_store_remove() { - let store = JobStore::new(); - let user_id = Uuid::new_v4(); - - let (job_id, _tx, _cancelled) = store.create_job(user_id).unwrap(); - assert_eq!(store.len(), 1); - - store.remove(&job_id); - assert!(store.is_empty()); - } - - // ── ProgressEvent serialization tests ──────────────────────── - - #[test] - fn progress_event_serialization_progress() { - let event = ProgressEvent::Progress { - step: "search".into(), - message: "Searching...".into(), - percent: 30, - }; - - let json = serde_json::to_value(&event).unwrap(); - assert_eq!(json["type"], "progress"); - assert_eq!(json["step"], "search"); - assert_eq!(json["message"], "Searching..."); - assert_eq!(json["percent"], 30); - } - - #[test] - fn progress_event_serialization_complete() { - let synthesis_id = Uuid::nil(); - let event = ProgressEvent::Complete { synthesis_id }; - - let json = serde_json::to_value(&event).unwrap(); - assert_eq!(json["type"], "complete"); - assert_eq!( - json["synthesis_id"], - "00000000-0000-0000-0000-000000000000" - ); - } - - #[test] - fn progress_event_serialization_error() { - let event = ProgressEvent::Error { - message: "Something went wrong".into(), - }; - - let json = serde_json::to_value(&event).unwrap(); - assert_eq!(json["type"], "error"); - assert_eq!(json["message"], "Something went wrong"); - } - // ── parse_llm_output tests ─────────────────────────────────── #[test]