From 7a8427316cde20fb735603ffcce019eb12f63747 Mon Sep 17 00:00:00 2001 From: oabrivard Date: Wed, 25 Mar 2026 01:07:53 +0100 Subject: [PATCH] =?UTF-8?q?feat:=20rewrite=20synthesis=20pipeline=20?= =?UTF-8?q?=E2=80=94=20per-article=20classify/summarize,=20no=20rewrite=20?= =?UTF-8?q?pass?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Opus 4.6 (1M context) --- backend/src/services/synthesis.rs | 1925 ++++------------------------- 1 file changed, 255 insertions(+), 1670 deletions(-) diff --git a/backend/src/services/synthesis.rs b/backend/src/services/synthesis.rs index 47b7bf0..52d295b 100644 --- a/backend/src/services/synthesis.rs +++ b/backend/src/services/synthesis.rs @@ -1,9 +1,8 @@ //! Synthesis generation pipeline and job management. //! -//! Orchestrates the two-pass LLM pipeline: -//! 1. Search pass: LLM generates initial articles via web search -//! 2. Scrape: Validate and fetch content from source URLs -//! 3. Rewrite pass: LLM rewrites titles/summaries using scraped content +//! Orchestrates the two-phase pipeline: +//! 1. Personalized sources: scrape user sources, classify+summarize per article +//! 2. Web search fallback: LLM search for missing categories, scrape to validate //! //! Progress is reported via `tokio::sync::watch` channels per job, //! consumed by SSE endpoints for real-time client updates. @@ -18,19 +17,15 @@ use serde::Serialize; use tokio::sync::watch; use uuid::Uuid; -use url::Url; - use crate::app_state::AppState; use crate::db; use crate::errors::AppError; use crate::models::settings::UserSettings; use crate::models::synthesis::{ - get_iso_week_string, NewsItem, NewsSection, ScrapedNewsItem, + get_iso_week_string, NewsItem, NewsSection, }; use crate::services::encryption; use crate::services::llm::factory::create_provider; -use crate::services::llm::schema::build_category_schema; -use crate::services::prompts; use crate::services::scraper; use crate::services::source_scraper; @@ -195,25 +190,15 @@ impl JobStore { // Generation Pipeline // ─────────────────────────────────────────────────────────────────── -/// Run the full two-pass generation pipeline for a user. +/// Run the full generation pipeline for a user. /// /// This is the core orchestration function. It is spawned as a background /// tokio task and communicates progress via the `watch` channel. /// -/// # Steps -/// 1. Load user settings -/// 2. Load user sources -/// 3. Resolve provider + decrypt API key -/// 4. Build schema from categories -/// 5. Rate limit check (pass 1) -/// 6. LLM search pass -/// 7. Parse structured output -/// 8. Validate/scrape URLs (parallel, bounded concurrency) -/// 9. Rate limit check (pass 2) -/// 10. LLM rewrite pass -/// 11. Parse final output -/// 12. Save synthesis to DB -/// 13. Complete +/// # Phases +/// 1. Personalized sources: extract links, scrape, classify+summarize per article +/// 2. Web search fallback: LLM search for under-filled categories, scrape to validate +/// 3. Save synthesis to DB pub async fn run_generation( job_id: Uuid, state: AppState, @@ -255,587 +240,299 @@ async fn run_generation_inner( user_id: Uuid, tx: &watch::Sender, ) -> Result { - // Step 1: Load user settings + // === INITIALIZATION === emit_progress(tx, "settings", "Chargement des parametres...", 5); let settings = db::settings::get_or_create_default(&state.pool, user_id).await?; - // Cleanup old article history entries if settings.article_history_days > 0 { - let deleted = db::article_history::cleanup_old( - &state.pool, - user_id, - settings.article_history_days, - ) - .await - .unwrap_or(0); - if deleted > 0 { - tracing::info!(deleted = deleted, "Cleaned up old article history entries"); - } - // Truncate old LLM call logs - db::llm_call_log::truncate_old(&state.pool, user_id, settings.article_history_days) - .await - .ok(); + db::article_history::cleanup_old(&state.pool, user_id, settings.article_history_days).await.unwrap_or(0); + db::llm_call_log::truncate_old(&state.pool, user_id, settings.article_history_days).await.ok(); } - if settings.categories.is_empty() { - return Err(AppError::BadRequest( - "Aucune categorie configuree. Veuillez configurer vos parametres.".into(), - )); - } + let user_categories = if settings.categories.is_empty() { + Vec::new() + } else { + settings.categories.clone() + }; + let mut classification_categories = user_categories.clone(); + classification_categories.push("Autre".to_string()); - // Step 2: Load user sources emit_progress(tx, "sources", "Chargement des sources...", 10); let sources = db::sources::list_for_user(&state.pool, user_id).await?; - // Step 3: Resolve provider + decrypt API key emit_progress(tx, "provider", "Configuration du fournisseur IA...", 12); let (provider_name, api_key) = resolve_provider_and_key(state, user_id, &settings).await?; let provider = create_provider(&provider_name, api_key)?; - - // Step 4: Resolve models - let model_research = if !settings.ai_model.is_empty() { - settings.ai_model.clone() - } else { - resolve_model(state, &provider_name).await? - }; - let model_writing = if !settings.ai_model_writing.is_empty() { - settings.ai_model_writing.clone() - } else { - model_research.clone() - }; - + let model_research = if !settings.ai_model.is_empty() { settings.ai_model.clone() } else { resolve_model(state, &provider_name).await? }; let user_rate_limiter = get_user_rate_limiter(state, &settings, user_id); - let llm_for_scraping: Option<(std::sync::Arc, String)> = None; - - // Build categories list with "Autre" appended for classification - let mut classification_categories = settings.categories.clone(); - classification_categories.push("Autre".to_string()); - - // Track how many articles fill each category across both phases + // Tracking structures + let mut article_scraped: HashMap> = HashMap::new(); + let mut source_counts: HashMap = HashMap::new(); + let mut url_source: HashMap = HashMap::new(); let mut filled_counts: HashMap = HashMap::new(); - // Combined scraped articles keyed by category - let mut all_scraped: HashMap> = HashMap::new(); - // Overflow articles that didn't fit any category (used for fill-up) - let mut all_overflow: Vec = Vec::new(); - // Track all URLs seen (for cross-phase dedup) let mut seen_urls: std::collections::HashSet = std::collections::HashSet::new(); + let max_total = (user_categories.len() + 1) * settings.max_items_per_category as usize; + let classify_schema = crate::services::llm::schema::build_article_classify_schema(); - // ═══════════════════════════════════════════════════════════════ - // PHASE 1: Personalized Sources (scrape-based, no LLM for discovery) - // ═══════════════════════════════════════════════════════════════ + // === PHASE 1: Personalized Sources === if !sources.is_empty() { emit_progress(tx, "sources_scrape", "Analyse des sources personnalisees...", 15); - let max_sources = sources.len().min(10); // Cap at 10 sources - let max_links_per_source = (2 * settings.max_articles_per_source) as usize; - // 1a. Extract article links from each source page - let mut candidate_urls: Vec = Vec::new(); - for source in sources.iter().take(max_sources) { + let last_source = db::article_history::get_last_source_url(&state.pool, user_id).await.unwrap_or(None); + let rotated_sources = rotate_sources(sources.clone(), last_source.as_deref()); + let max_sources = rotated_sources.len().min(10); + let max_links = 10usize; + + // 1a. Extract article links + filter against history + let mut candidate_urls: Vec<(String, String)> = Vec::new(); // (article_url, source_url) + + for source in rotated_sources.iter().take(max_sources) { let links = if settings.use_llm_for_source_links { source_scraper::extract_article_links_with_llm( - &state.http_client, - &source.url, - max_links_per_source, - &provider, - &model_research, - ) - .await + &state.http_client, &source.url, max_links, &provider, &model_research, + ).await } else { source_scraper::extract_article_links( - &state.http_client, - &source.url, - max_links_per_source, - ) - .await + &state.http_client, &source.url, max_links, + ).await }; - match links { - Ok(links) => { - tracing::info!( - source = %source.title, - url = %source.url, - links_found = links.len(), - "Extracted article links from source" - ); - candidate_urls.extend(links); - } - Err(e) => { - tracing::warn!( - source = %source.title, - url = %source.url, - error = %e, - "Failed to extract links from source, skipping" - ); + + if let Ok(links) = links { + tracing::info!(source = %source.title, links = links.len(), "Extracted links from source"); + for link in links { + if seen_urls.insert(link.to_lowercase()) { + candidate_urls.push((link, source.url.clone())); + } } + } else if let Err(e) = links { + tracing::warn!(source = %source.title, error = %e, "Failed to extract links"); } } - // Deduplicate candidate URLs - let mut seen = std::collections::HashSet::new(); - candidate_urls.retain(|url| seen.insert(url.to_lowercase())); - - if !candidate_urls.is_empty() { - // 1b. Scrape candidate articles - let scraped_articles = scrape_flat_urls( - state, - &candidate_urls, - settings.max_age_days as i64, - tx, - llm_for_scraping.clone(), - ) - .await; - - // 1c. Filter empty content - // Trace articles with empty content - if settings.article_history_days > 0 { - for article in &scraped_articles { - if article.scraped_content.trim().is_empty() { - trace_article(&state.pool, user_id, job_id, &article.url, &article.title, - "personalized_source", article.source_url.as_deref(), None, None, - "filtered_empty", false).await; + // Filter against article history + if settings.article_history_days > 0 && !candidate_urls.is_empty() { + let hashes: Vec = candidate_urls.iter().map(|(url, _)| hash_article_url(url)).collect(); + let existing = db::article_history::check_urls_exist(&state.pool, user_id, &hashes).await.unwrap_or_default(); + if !existing.is_empty() { + for (url, source_url) in &candidate_urls { + if existing.contains(&hash_article_url(url)) { + trace_article(&state.pool, user_id, job_id, url, "", "personalized_source", Some(source_url), None, None, "filtered_history", false).await; } } + candidate_urls.retain(|(url, _)| !existing.contains(&hash_article_url(url))); } - let valid_articles: Vec = scraped_articles - .into_iter() - .filter(|a| !a.scraped_content.trim().is_empty()) - .collect(); - - // 1d. Filter against article history (cross-synthesis dedup) - let mut valid_articles = if settings.article_history_days > 0 { - let pre_history_articles = valid_articles.clone(); - let hashes: Vec = valid_articles.iter().map(|a| hash_article_url(&a.url)).collect(); - let existing = db::article_history::check_urls_exist(&state.pool, user_id, &hashes) - .await - .unwrap_or_default(); - if !existing.is_empty() { - tracing::info!(filtered = existing.len(), "Phase 1: filtered articles already in history"); - } - // Trace history-filtered articles - if !existing.is_empty() { - for article in &pre_history_articles { - if existing.contains(&hash_article_url(&article.url)) { - trace_article(&state.pool, user_id, job_id, &article.url, &article.title, - "personalized_source", article.source_url.as_deref(), None, None, - "filtered_history", true).await; - } - } - } - valid_articles - .into_iter() - .filter(|a| !existing.contains(&hash_article_url(&a.url))) - .collect::>() - } else { - valid_articles - }; + } - // 1e. Retry if under-filled after history filtering (1 attempt) - let target = settings.categories.len() * settings.max_items_per_category as usize; - if valid_articles.len() < target && settings.article_history_days > 0 { - tracing::info!( - have = valid_articles.len(), - need = target, - "Phase 1 under-filled after history filter, retrying" - ); - - let already_fetched: std::collections::HashSet = candidate_urls - .iter() - .map(|u| u.to_lowercase()) - .collect(); - - let mut retry_urls: Vec = Vec::new(); - for source in sources.iter().take(max_sources) { - let links = if settings.use_llm_for_source_links { - source_scraper::extract_article_links_with_llm( - &state.http_client, &source.url, max_links_per_source, - &provider, &model_research, - ).await - } else { - source_scraper::extract_article_links( - &state.http_client, &source.url, max_links_per_source, - ).await - }; - if let Ok(links) = links { - for link in links { - if !already_fetched.contains(&link.to_lowercase()) { - retry_urls.push(link); - } - } - } - } + // Track url -> source + for (url, source_url) in &candidate_urls { + url_source.insert(url.clone(), source_url.clone()); + } - if !retry_urls.is_empty() { - let retry_scraped = scrape_flat_urls( - state, &retry_urls, settings.max_age_days as i64, tx, - llm_for_scraping.clone(), - ).await; - let mut retry_valid: Vec = retry_scraped - .into_iter() - .filter(|a| !a.scraped_content.trim().is_empty()) - .collect(); - - if !retry_valid.is_empty() && settings.article_history_days > 0 { - let hashes: Vec = retry_valid.iter().map(|a| hash_article_url(&a.url)).collect(); - let existing = db::article_history::check_urls_exist(&state.pool, user_id, &hashes) - .await.unwrap_or_default(); - retry_valid.retain(|a| !existing.contains(&hash_article_url(&a.url))); - } + // 1b. Scrape, classify, summarize each article + emit_progress(tx, "processing", "Traitement des articles...", 25); + let total_candidates = candidate_urls.len(); - valid_articles.extend(retry_valid); - tracing::info!(total = valid_articles.len(), "Phase 1 after retry"); - } + for (idx, (url, source_url)) in candidate_urls.into_iter().enumerate() { + let pct = 25 + ((idx as u32 * 40) / total_candidates.max(1) as u32).min(40); + emit_progress(tx, "processing", &format!("Article {}/{}...", idx + 1, total_candidates), pct as u8); + + // Check source limit + let source_domain = extract_domain(&source_url).unwrap_or_default(); + let source_count = source_counts.get(&source_domain).copied().unwrap_or(0); + if source_count >= settings.max_articles_per_source as usize { + trace_article(&state.pool, user_id, job_id, &url, "", "personalized_source", Some(&source_url), None, None, "filtered_diversity", false).await; + continue; } - tracing::info!( - valid_count = valid_articles.len(), - "Phase 1: valid articles from personalized sources" - ); - - if !valid_articles.is_empty() { - // 1f. LLM classification call - emit_progress(tx, "classifying", "Classification des articles...", 35); - check_rate_limit(state, &user_rate_limiter, &provider_name)?; - - // TODO(Task 5): replace with per-article classify pipeline - let _ = (&valid_articles, &classification_categories, &filled_counts); - let _ = (); // phase1 classification stub - - // 1f. Enforce max_articles_per_source across all categories - // (reuse domain counting logic) - let max_per_source = settings.max_articles_per_source as usize; - let mut domain_counts: HashMap = HashMap::new(); - // Collect items before diversity pruning for tracing - let pre_diversity_snapshot: Vec = if settings.article_history_days > 0 { - all_scraped.values().flat_map(|items| items.iter().cloned()).collect() - } else { - Vec::new() - }; - for (_, items) in &mut all_scraped { - items.retain(|item| { - if let Some(domain) = extract_domain(&item.url) { - let count = domain_counts.entry(domain).or_insert(0); - if *count >= max_per_source { - false - } else { - *count += 1; - true - } - } else { - true - } - }); - } - // Trace diversity-filtered articles - if settings.article_history_days > 0 { - let post_urls: std::collections::HashSet = all_scraped.values() - .flat_map(|items| items.iter().map(|i| i.url.clone())) - .collect(); - for article in &pre_diversity_snapshot { - if !post_urls.contains(&article.url) { - trace_article(&state.pool, user_id, job_id, &article.url, &article.title, - "personalized_source", article.source_url.as_deref(), None, None, - "filtered_diversity", true).await; - } - } - } + // Scrape + let (body_text, page_title, final_url) = scrape_single_article(&state.http_client, &url, settings.max_age_days as i64).await; + + if body_text.trim().is_empty() { + trace_article(&state.pool, user_id, job_id, &final_url, &page_title, "personalized_source", Some(&source_url), None, None, "filtered_empty", false).await; + continue; + } + + // LLM classify + summarize + check_rate_limit(state, &user_rate_limiter, &provider_name)?; + let body_snippet: String = body_text.chars().take(500).collect(); + let (class_sys, class_user) = crate::services::prompts::build_article_classify_prompt(&page_title, &body_snippet, &classification_categories); + + let llm_start = std::time::Instant::now(); + let class_response = provider.call_llm(&model_research, &class_sys, &class_user, &classify_schema).await?; + let llm_duration = llm_start.elapsed().as_millis() as u64; + log_llm_call(&state.pool, user_id, job_id, "classify_summarize", &model_research, &class_sys, &class_user, &class_response, llm_duration).await; + + let llm_title = class_response.get("title").and_then(|t| t.as_str()).unwrap_or(&page_title).to_string(); + let llm_summary = class_response.get("summary").and_then(|s| s.as_str()).unwrap_or("").to_string(); + let mut llm_category = class_response.get("category").and_then(|c| c.as_str()).unwrap_or("Autre").to_string(); - // Recount filled_counts after trimming - filled_counts.clear(); - for (cat_key, items) in &all_scraped { - let cat_name = if cat_key == "category_autre" { - "Autre".to_string() - } else { - let idx: usize = cat_key - .strip_prefix("category_") - .and_then(|s| s.parse().ok()) - .unwrap_or(0); - settings.categories.get(idx).cloned().unwrap_or_default() - }; - *filled_counts.entry(cat_name).or_insert(0) += items.len(); + // Validate category + if !classification_categories.iter().any(|c| c.to_lowercase() == llm_category.to_lowercase()) { + llm_category = "Autre".to_string(); + } + + // Map category to key + let cat_key = if llm_category.to_lowercase() == "autre" { + "category_autre".to_string() + } else { + user_categories.iter().position(|c| c.to_lowercase() == llm_category.to_lowercase()) + .map(|i| format!("category_{}", i)) + .unwrap_or_else(|| "category_autre".to_string()) + }; + + // Check if category is full -> overflow to "Autre" + let cat_filled = filled_counts.get(&llm_category).copied().unwrap_or(0); + let (final_cat_key, final_cat_name) = if cat_filled >= settings.max_items_per_category as usize && llm_category.to_lowercase() != "autre" { + let autre_filled = filled_counts.get("Autre").copied().unwrap_or(0); + if autre_filled >= settings.max_items_per_category as usize { + continue; // Both full -- skip } + ("category_autre".to_string(), "Autre".to_string()) + } else { + (cat_key, llm_category) + }; + + article_scraped.entry(final_cat_key).or_default().push(NewsItem { + title: llm_title, + url: final_url.clone(), + summary: llm_summary, + }); + *filled_counts.entry(final_cat_name).or_insert(0) += 1; + *source_counts.entry(source_domain).or_insert(0) += 1; + + let total: usize = article_scraped.values().map(|v| v.len()).sum(); + if total >= max_total { + break; } } } - // ═══════════════════════════════════════════════════════════════ - // PHASE 2: Web Search Fallback (LLM-based) - // Only runs if any user-defined category is under-filled - // ═══════════════════════════════════════════════════════════════ - let category_gaps: Vec<(String, i32)> = settings - .categories - .iter() - .filter_map(|cat| { - let filled = filled_counts.get(cat).copied().unwrap_or(0); - let needed = settings.max_items_per_category as usize - filled.min(settings.max_items_per_category as usize); - if needed > 0 { - Some((cat.clone(), needed as i32)) - } else { - None - } - }) - .collect(); + // === PHASE 2: Web Search Fallback === + let category_gaps: Vec<(String, i32)> = user_categories.iter().filter_map(|cat| { + let filled = filled_counts.get(cat).copied().unwrap_or(0); + let needed = (settings.max_items_per_category as usize).saturating_sub(filled); + if needed > 0 { Some((cat.clone(), needed as i32)) } else { None } + }).collect(); if !category_gaps.is_empty() { - emit_progress(tx, "search", "Recherche d'actualites complementaires...", 45); - - // Rate limit check before search pass + emit_progress(tx, "search", "Recherche d'actualites complementaires...", 70); check_rate_limit(state, &user_rate_limiter, &provider_name)?; - // Source diversity tracking removed (source_diversity_window setting dropped) - let recent_domains: Vec = Vec::new(); - - // Build search schema for gap categories - let search_schema = build_category_schema(&settings.categories, settings.max_items_per_category); - + let search_schema = crate::services::llm::schema::build_category_schema(&user_categories, settings.max_items_per_category); let current_date = Utc::now().format("%A %d %B %Y").to_string(); - let (system_prompt, user_prompt) = prompts::build_search_prompt( - &settings, - &sources, - ¤t_date, - &recent_domains, - Some(&category_gaps), - ); + let (sys_prompt, usr_prompt) = crate::services::prompts::build_search_prompt(&settings, &sources, ¤t_date, &[], Some(&category_gaps)); let llm_start = std::time::Instant::now(); - let raw_results = provider - .call_llm(&model_research, &system_prompt, &user_prompt, &search_schema) - .await?; + let raw_results = provider.call_llm(&model_research, &sys_prompt, &usr_prompt, &search_schema).await?; let llm_duration = llm_start.elapsed().as_millis() as u64; - log_llm_call(&state.pool, user_id, job_id, "search", &model_research, - &system_prompt, &user_prompt, &raw_results, llm_duration).await; + log_llm_call(&state.pool, user_id, job_id, "search", &model_research, &sys_prompt, &usr_prompt, &raw_results, llm_duration).await; - // Parse + filter - emit_progress(tx, "parsing", "Analyse des resultats...", 55); - let parsed = parse_llm_output(&raw_results, &settings.categories)?; + emit_progress(tx, "parsing", "Analyse des resultats...", 75); + let parsed = parse_llm_output(&raw_results, &user_categories)?; - // Trace homepage-filtered articles - let pre_homepage_parsed = if settings.article_history_days > 0 { - Some(parsed.clone()) - } else { - None - }; - let parsed = filter_homepage_urls(parsed); - if let Some(ref pre) = pre_homepage_parsed { - let post_urls: std::collections::HashSet = parsed.iter() - .flat_map(|(_, items)| items.iter().map(|i| i.url.clone())) - .collect(); - for (_, items) in pre { - for item in items { - if !post_urls.contains(&item.url) { - trace_article(&state.pool, user_id, job_id, &item.url, &item.title, - "web_search", None, None, None, - "filtered_homepage", false).await; + // Filter and validate Phase 2 articles + let mut phase2_items: Vec<(String, NewsItem)> = Vec::new(); + + for (cat_key, items) in parsed { + for item in items { + let url_lower = item.url.to_lowercase(); + + // Homepage filter + if let Ok(parsed_url) = url::Url::parse(&item.url) { + let path = parsed_url.path(); + if path.is_empty() || path == "/" { + trace_article(&state.pool, user_id, job_id, &item.url, &item.title, "web_search", None, None, None, "filtered_homepage", false).await; + continue; } } - } - } - // Cross-phase dedup: remove URLs already found in Phase 1 - let parsed: Vec<(String, Vec)> = parsed - .into_iter() - .map(|(cat_key, items)| { - let deduped: Vec = items - .into_iter() - .filter(|item| { - if seen_urls.contains(&item.url.to_lowercase()) { - // Tracing is handled below for async context - false - } else { - true - } - }) - .collect(); - (cat_key, deduped) - }) - .collect(); - // Trace cross-phase dedup drops (we need to check pre vs post since filter is sync) - if settings.article_history_days > 0 { - if let Some(ref pre) = pre_homepage_parsed { - let post_urls: std::collections::HashSet = parsed.iter() - .flat_map(|(_, items)| items.iter().map(|i| i.url.to_lowercase())) - .collect(); - for (_, items) in pre { - for item in items { - let lower = item.url.to_lowercase(); - if seen_urls.contains(&lower) && !post_urls.contains(&lower) { - trace_article(&state.pool, user_id, job_id, &item.url, &item.title, - "web_search", None, None, None, - "filtered_cross_phase_dedup", false).await; - } - } + // Cross-phase dedup + if seen_urls.contains(&url_lower) { + trace_article(&state.pool, user_id, job_id, &item.url, &item.title, "web_search", None, None, None, "filtered_cross_phase_dedup", false).await; + continue; } - } - } - let parsed = dedup_by_url(parsed); - let parsed = limit_articles_per_source(parsed, settings.max_articles_per_source); - - // Filter against article history BEFORE scraping (saves HTTP requests) - let parsed = if settings.article_history_days > 0 { - let all_urls: Vec = parsed.iter() - .flat_map(|(_, items)| items.iter().map(|i| i.url.clone())) - .collect(); - let hashes: Vec = all_urls.iter().map(|u| hash_article_url(u)).collect(); - let existing = db::article_history::check_urls_exist(&state.pool, user_id, &hashes) - .await - .unwrap_or_default(); - if !existing.is_empty() { - tracing::info!(filtered = existing.len(), "Phase 2: filtered articles already in history"); - // Trace history-filtered articles - for (_, items) in &parsed { - for item in items { - if existing.contains(&hash_article_url(&item.url)) { - trace_article(&state.pool, user_id, job_id, &item.url, &item.title, - "web_search", None, None, None, - "filtered_history", false).await; - } + // History dedup + if settings.article_history_days > 0 { + let hash = hash_article_url(&item.url); + let exists = db::article_history::check_urls_exist(&state.pool, user_id, &[hash.clone()]).await.unwrap_or_default(); + if exists.contains(&hash) { + trace_article(&state.pool, user_id, job_id, &item.url, &item.title, "web_search", None, None, None, "filtered_history", false).await; + continue; } } - } - parsed - .into_iter() - .map(|(cat_key, items)| { - let filtered = items - .into_iter() - .filter(|item| !existing.contains(&hash_article_url(&item.url))) - .collect(); - (cat_key, filtered) - }) - .collect() - } else { - parsed - }; - // Scrape web search results - emit_progress(tx, "scraping", "Verification des sources web...", 60); - let scraped = scrape_articles(state, &parsed, settings.max_age_days as i64, tx, llm_for_scraping.clone()).await; - // Trace empty content drops from Phase 2 scraping - if settings.article_history_days > 0 { - for (_, items) in &scraped { - for item in items { - if item.scraped_content.trim().is_empty() { - trace_article(&state.pool, user_id, job_id, &item.url, &item.title, - "web_search", None, None, None, - "filtered_empty", false).await; + // Source limit + if let Some(domain) = extract_domain(&item.url) { + let count = source_counts.get(&domain).copied().unwrap_or(0); + if count >= settings.max_articles_per_source as usize { + trace_article(&state.pool, user_id, job_id, &item.url, &item.title, "web_search", None, None, None, "filtered_diversity", false).await; + continue; } } - } - } - let scraped = filter_empty_scraped_articles(scraped); - // Flatten scraped articles for classification - let phase2_articles: Vec = scraped - .into_values() - .flat_map(|items| items.into_iter()) - .collect(); - - if !phase2_articles.is_empty() { - // LLM classification for Phase 2 articles - emit_progress(tx, "classifying", "Classification des resultats web...", 70); - check_rate_limit(state, &user_rate_limiter, &provider_name)?; - - // TODO(Task 5): replace with per-article classify pipeline - let _ = (&phase2_articles, &classification_categories, &filled_counts); - let _ = (); // phase2 classification stub + seen_urls.insert(url_lower); + phase2_items.push((cat_key.clone(), item)); + } } - } - // ═══════════════════════════════════════════════════════════════ - // COMBINED REWRITE PASS - // ═══════════════════════════════════════════════════════════════ - - // Fill-up: if total articles are below 75% of max, expand "Autre" with overflow - let total_articles: usize = all_scraped.values().map(|v| v.len()).sum(); - let max_articles = settings.categories.len() * settings.max_items_per_category as usize; - let target = (SYNTHESIS_MIN_FILL_RATIO * max_articles as f64).ceil() as usize; - let shortfall = target.saturating_sub(total_articles); - - if shortfall > 0 && !all_overflow.is_empty() { - tracing::info!( - total = total_articles, - target = target, - shortfall = shortfall, - overflow_available = all_overflow.len(), - "Synthesis under-filled, adding overflow to Autre" - ); + // Scrape Phase 2 for validation + emit_progress(tx, "scraping", "Verification des sources web...", 80); + for (cat_key, item) in phase2_items { + let (body_text, _, final_url) = scrape_single_article(&state.http_client, &item.url, settings.max_age_days as i64).await; - // Count domain occurrences across all categories for source diversity enforcement - let mut domain_counts: HashMap = HashMap::new(); - for items in all_scraped.values() { - for item in items { - if let Some(domain) = extract_domain(&item.url) { - *domain_counts.entry(domain).or_insert(0) += 1; - } + if body_text.trim().is_empty() { + trace_article(&state.pool, user_id, job_id, &final_url, &item.title, "web_search", None, None, None, "filtered_empty", false).await; + continue; } - } - let max_per_source = settings.max_articles_per_source as usize; - let mut added = 0usize; + article_scraped.entry(cat_key).or_default().push(NewsItem { + title: item.title, + url: final_url, + summary: item.summary, + }); - for article in all_overflow { - if added >= shortfall { - break; - } - // Enforce source diversity on overflow articles - if let Some(domain) = extract_domain(&article.url) { - let count = domain_counts.get(&domain).copied().unwrap_or(0); - if count >= max_per_source { - continue; - } - *domain_counts.entry(domain).or_insert(0) += 1; + if let Some(domain) = extract_domain(&item.url) { + *source_counts.entry(domain).or_insert(0) += 1; } - all_scraped - .entry("category_autre".to_string()) - .or_default() - .push(article); - added += 1; - } - - if added > 0 { - tracing::info!(added = added, "Added overflow articles to Autre"); } } - if all_scraped.values().all(|items| items.is_empty()) { - return Err(AppError::BadRequest( - "Aucun article valide trouve. Verifiez vos sources et categories.".into(), - )); + // === SAVE === + if article_scraped.values().all(|items| items.is_empty()) { + return Err(AppError::BadRequest("Aucun article valide trouve. Verifiez vos sources et categories.".into())); } - emit_progress(tx, "rewrite", "Redaction des resumes...", 80); - check_rate_limit(state, &user_rate_limiter, &provider_name)?; - - // TODO(Task 5): rewrite pass replaced by per-article classify pipeline - let rewrite_schema = build_rewrite_schema(&all_scraped, &settings.categories); - let _ = rewrite_schema; - - let llm_start = std::time::Instant::now(); - let _ = llm_start; - let final_results = serde_json::Value::Object(serde_json::Map::new()); // stub: replaced in Task 5 - - emit_progress(tx, "finalizing", "Finalisation...", 90); - let mut final_sections = build_final_sections(&final_results, &settings.categories)?; + emit_progress(tx, "saving", "Sauvegarde de la synthese...", 90); - restore_scraped_urls(&mut final_sections, &all_scraped, &settings.categories); + let mut final_sections: Vec = Vec::new(); + for (i, cat_name) in user_categories.iter().enumerate() { + let key = format!("category_{}", i); + if let Some(items) = article_scraped.get(&key) { + if !items.is_empty() { + final_sections.push(NewsSection { title: cat_name.clone(), items: items.clone() }); + } + } + } + if let Some(autre_items) = article_scraped.get("category_autre") { + if !autre_items.is_empty() { + final_sections.push(NewsSection { title: "Autre".to_string(), items: autre_items.clone() }); + } + } - // Save synthesis to DB - emit_progress(tx, "saving", "Sauvegarde de la synthese...", 95); - let week = get_iso_week_string(Utc::now().date_naive()); - let sections_json = serde_json::to_value(&final_sections).map_err(|e| { - AppError::Internal(anyhow::anyhow!("Failed to serialize sections: {}", e)) - })?; + let sections_json = serde_json::to_value(&final_sections).map_err(|e| AppError::Internal(anyhow::anyhow!("Failed to serialize: {}", e)))?; let sections_json = sanitize_json_null_bytes(sections_json); - let synthesis = - db::syntheses::create(&state.pool, user_id, &week, §ions_json, job_id).await?; + let synthesis = db::syntheses::create(&state.pool, user_id, &get_iso_week_string(Utc::now().date_naive()), §ions_json, job_id).await?; - // Record used articles in history with full tracing metadata if settings.article_history_days > 0 { for section in &final_sections { for item in §ion.items { + let source_url = url_source.get(&item.url).map(|s| s.as_str()); trace_article(&state.pool, user_id, job_id, &item.url, &item.title, - "used", None, Some(§ion.title), Some(synthesis.id), - "used", true).await; + if source_url.is_some() { "personalized_source" } else { "web_search" }, + source_url, Some(§ion.title), Some(synthesis.id), "used", true).await; } } } @@ -847,10 +544,6 @@ async fn run_generation_inner( // Helper Functions // ─────────────────────────────────────────────────────────────────── -/// Minimum fill ratio for synthesis. If total articles are below this percentage -/// of the maximum capacity, overflow articles are added to "Autre" to compensate. -const SYNTHESIS_MIN_FILL_RATIO: f64 = 0.75; - /// Recursively strip `\u0000` null bytes from JSON values. /// /// PostgreSQL rejects null bytes in JSONB text. LLM output occasionally @@ -987,233 +680,6 @@ fn check_rate_limit( Ok(()) } -/// Filter out articles whose URL is a homepage (path is "/" or empty). -/// -/// Homepage URLs are typically not useful as article sources and indicate -/// the LLM returned a domain root rather than a specific article. -fn filter_homepage_urls( - parsed: Vec<(String, Vec)>, -) -> Vec<(String, Vec)> { - let mut total_filtered = 0usize; - - let result: Vec<(String, Vec)> = parsed - .into_iter() - .map(|(cat_key, items)| { - let filtered: Vec = items - .into_iter() - .filter(|item| { - match Url::parse(&item.url) { - Ok(parsed_url) => { - let path = parsed_url.path(); - if path == "/" || path.is_empty() { - total_filtered += 1; - false - } else { - true - } - } - Err(_) => true, // Keep items with unparseable URLs (handled elsewhere) - } - }) - .collect(); - (cat_key, filtered) - }) - .collect(); - - if total_filtered > 0 { - tracing::warn!( - count = total_filtered, - "Filtered out homepage URLs from search results" - ); - } - - result -} - -/// Remove duplicate articles with the same URL across all categories. -/// -/// Keeps the first occurrence (in category order) and drops subsequent duplicates. -/// Remove scraped articles with empty content from the data passed to the rewrite pass. -/// -/// Articles with empty `scraped_content` are those where scraping failed (network error), -/// the page was a soft 404, or the article was too old. Keeping them would produce -/// empty or low-quality output in the final synthesis. -/// Build a JSON schema for the rewrite pass that matches the actual scraped item counts. -/// -/// Unlike the search pass schema (which uses `minItems`/`maxItems` from user settings), -/// the rewrite schema uses the actual number of items per category after scraping and -/// filtering. This prevents the LLM from duplicating content to fill a quota. -fn build_rewrite_schema( - scraped: &HashMap>, - categories: &[String], -) -> serde_json::Value { - // Build a schema where each category's minItems/maxItems matches the actual count - let news_item_schema = serde_json::json!({ - "type": "object", - "properties": { - "title": { "type": "string", "description": "The title of the news article" }, - "url": { "type": "string", "description": "The URL of the source article" }, - "summary": { "type": "string", "description": "A concise summary of the article" } - }, - "required": ["title", "url", "summary"], - "additionalProperties": false - }); - - let mut properties = serde_json::Map::new(); - let mut required = Vec::new(); - - // User categories - for (i, cat_name) in categories.iter().enumerate() { - let key = format!("category_{}", i); - let count = scraped.get(&key).map_or(0, |items| items.len() as i32); - if count == 0 { - continue; // Omit empty categories — no hallucinated articles - } - properties.insert( - key.clone(), - serde_json::json!({ - "type": "array", - "description": cat_name, - "items": news_item_schema, - "minItems": count, - "maxItems": count - }), - ); - required.push(serde_json::Value::String(key)); - } - - // "Autre" category (if it has articles) - if let Some(autre_items) = scraped.get("category_autre") { - let count = autre_items.len() as i32; - if count > 0 { - properties.insert( - "category_autre".to_string(), - serde_json::json!({ - "type": "array", - "description": "Autre", - "items": news_item_schema, - "minItems": count, - "maxItems": count - }), - ); - required.push(serde_json::Value::String("category_autre".to_string())); - } - } - - serde_json::json!({ - "type": "object", - "properties": properties, - "required": required, - "additionalProperties": false - }) -} - -fn filter_empty_scraped_articles( - scraped: HashMap>, -) -> HashMap> { - scraped - .into_iter() - .map(|(cat_key, items)| { - let filtered: Vec = items - .into_iter() - .filter(|item| !item.scraped_content.trim().is_empty()) - .collect(); - (cat_key, filtered) - }) - .collect() -} - -fn dedup_by_url(parsed: Vec<(String, Vec)>) -> Vec<(String, Vec)> { - let mut seen: std::collections::HashSet = std::collections::HashSet::new(); - parsed - .into_iter() - .map(|(cat_key, items)| { - let deduped = items - .into_iter() - .filter(|item| { - let url = item.url.to_lowercase(); - seen.insert(url) - }) - .collect(); - (cat_key, deduped) - }) - .collect() -} - -/// Limit the number of articles from the same domain across all categories. -/// -/// Spreads articles across categories first (at most 1 per domain per category), -/// then fills remaining slots from dropped articles in encounter order. -fn limit_articles_per_source( - parsed: Vec<(String, Vec)>, - max_per_source: i32, -) -> Vec<(String, Vec)> { - let max = max_per_source as usize; - - // Pass 1: keep at most 1 article per domain per category - let mut kept: Vec<(String, Vec)> = Vec::new(); - let mut dropped: Vec<(usize, NewsItem)> = Vec::new(); // (category_index, item) - - for (cat_idx, (cat_key, items)) in parsed.into_iter().enumerate() { - let mut cat_kept = Vec::new(); - let mut seen_in_cat: std::collections::HashSet = std::collections::HashSet::new(); - - for item in items { - let domain = extract_domain(&item.url); - if let Some(ref d) = domain { - if seen_in_cat.contains(d) { - dropped.push((cat_idx, item)); - continue; - } - seen_in_cat.insert(d.clone()); - } - cat_kept.push(item); - } - - kept.push((cat_key, cat_kept)); - } - - // Cap enforcement: if any domain exceeds max after pass 1 (when categories > max), - // keep the first max articles in category order, drop the rest. - let mut cap_counts: std::collections::HashMap = std::collections::HashMap::new(); - for (_, items) in &mut kept { - items.retain(|item| { - let domain = extract_domain(&item.url); - match domain { - Some(ref d) => { - let count = cap_counts.entry(d.clone()).or_insert(0); - if *count >= max { - false - } else { - *count += 1; - true - } - } - None => true, // keep unparseable URLs - } - }); - } - - // Use cap_counts as the authoritative domain counts going forward - let mut domain_counts = cap_counts; - - // Pass 2: fill from dropped articles, back into their original category - for (cat_idx, item) in dropped { - if let Some(d) = extract_domain(&item.url) { - let count = domain_counts.get(&d).copied().unwrap_or(0); - if count < max { - *domain_counts.entry(d).or_insert(0) += 1; - kept[cat_idx].1.push(item); - } - } else { - // Unparseable URL — keep it - kept[cat_idx].1.push(item); - } - } - - kept -} - /// Extract the domain (host) from a URL, or None if unparseable. fn extract_domain(url: &str) -> Option { url::Url::parse(url) @@ -1382,216 +848,21 @@ fn parse_llm_output( Ok(result) } -/// Scrape articles in parallel with bounded concurrency. -/// -/// For each category, scrapes all article URLs. Failed scrapes are -/// handled gracefully — the article is kept with empty scraped content -/// rather than being discarded. -/// When `llm` is `Some`, uses LLM-assisted extraction with reduced concurrency. -async fn scrape_articles( - state: &AppState, - parsed: &[(String, Vec)], - max_age_days: i64, - tx: &watch::Sender, - llm: Option<(std::sync::Arc, String)>, -) -> HashMap> { - let mut result: HashMap> = HashMap::new(); - - // Collect all (category_key, item) pairs for parallel processing - let mut tasks = Vec::new(); - for (cat_key, items) in parsed { - for item in items { - tasks.push((cat_key.clone(), item.clone())); - } - } - - let total = tasks.len(); - if total == 0 { - return result; - } - - // Use JoinSet for bounded concurrency - let mut join_set = tokio::task::JoinSet::new(); - let mut pending = tasks.into_iter().peekable(); - let mut completed = 0usize; - - let max_concurrent = if llm.is_some() { 5 } else { 10 }; - - // Seed initial tasks - for _ in 0..max_concurrent { - if let Some((cat_key, item)) = pending.next() { - if let Some((ref provider, ref model)) = llm { - let provider = std::sync::Arc::clone(provider); - let model = model.clone(); - let client = state.http_client.clone(); - let url = item.url.clone(); - let mad = max_age_days; - join_set.spawn(async move { - let (scraped_content, page_title, final_url) = scrape_single_article_with_llm(&client, &url, mad, provider, model).await; - (cat_key, item, (scraped_content, page_title, final_url)) - }); - } else { - let client = state.http_client.clone(); - let url = item.url.clone(); - let mad = max_age_days; - join_set.spawn(async move { - let scraped = scrape_single_article(&client, &url, mad).await; - (cat_key, item, scraped) - }); - } - } - } - - // Process results and spawn new tasks as slots open - while let Some(join_result) = join_set.join_next().await { - completed += 1; - - // Update progress (45% to 75% range for scraping) - let pct = 45 + ((completed as u32 * 30) / total as u32).min(30); - let progress_label = if llm.is_some() { - format!("Extraction IA des articles ({}/{})...", completed, total) - } else { - format!("Verification des sources ({}/{})...", completed, total) - }; - emit_progress(tx, "scraping", &progress_label, pct as u8); - - if let Ok((cat_key, item, (scraped_content, page_title, final_url))) = join_result { - let scraped_item = ScrapedNewsItem { - title: item.title, - url: final_url, - summary: item.summary, - original_title: page_title, - scraped_content, - source_url: None, - }; - - result - .entry(cat_key) - .or_default() - .push(scraped_item); - } - - // Spawn next task if available - if let Some((cat_key, item)) = pending.next() { - if let Some((ref provider, ref model)) = llm { - let provider = std::sync::Arc::clone(provider); - let model = model.clone(); - let client = state.http_client.clone(); - let url = item.url.clone(); - let mad = max_age_days; - join_set.spawn(async move { - let (scraped_content, page_title, final_url) = scrape_single_article_with_llm(&client, &url, mad, provider, model).await; - (cat_key, item, (scraped_content, page_title, final_url)) - }); - } else { - let client = state.http_client.clone(); - let url = item.url.clone(); - let mad = max_age_days; - join_set.spawn(async move { - let scraped = scrape_single_article(&client, &url, mad).await; - (cat_key, item, scraped) - }); - } - } - } - - result -} - -/// Scrape a flat list of URLs and return ScrapedNewsItems. -/// -/// Used in Phase 1 where articles haven't been classified yet. -/// Reuses the same scraper infrastructure as `scrape_articles`. -/// When `llm` is `Some`, uses LLM-assisted extraction with reduced concurrency. -async fn scrape_flat_urls( - state: &AppState, - urls: &[String], - max_age_days: i64, - tx: &watch::Sender, - llm: Option<(std::sync::Arc, String)>, -) -> Vec { - let total = urls.len(); - if total == 0 { - return Vec::new(); - } - - let mut join_set = tokio::task::JoinSet::new(); - let mut pending = urls.iter().enumerate().peekable(); - let mut completed = 0usize; - let mut results = Vec::new(); - - let max_concurrent = if llm.is_some() { 5 } else { 10 }; - - // Seed initial tasks - for _ in 0..max_concurrent { - if let Some((_, url)) = pending.next() { - if let Some((ref provider, ref model)) = llm { - let provider = std::sync::Arc::clone(provider); - let model = model.clone(); - let client = state.http_client.clone(); - let url = url.clone(); - let mad = max_age_days; - join_set.spawn(async move { - let (scraped_content, page_title, final_url) = scrape_single_article_with_llm(&client, &url, mad, provider, model).await; - (url, scraped_content, page_title, final_url) - }); - } else { - let client = state.http_client.clone(); - let url = url.clone(); - let mad = max_age_days; - join_set.spawn(async move { - let (scraped_content, page_title, final_url) = scrape_single_article(&client, &url, mad).await; - (url, scraped_content, page_title, final_url) - }); - } - } - } - - while let Some(join_result) = join_set.join_next().await { - completed += 1; - let pct = 15 + ((completed as u32 * 15) / total as u32).min(15); - let progress_label = if llm.is_some() { - format!("Extraction IA des articles ({}/{})...", completed, total) - } else { - format!("Analyse des sources ({}/{})...", completed, total) - }; - emit_progress(tx, "scraping_sources", &progress_label, pct as u8); - - if let Ok((_original_url, scraped_content, page_title, final_url)) = join_result { - results.push(ScrapedNewsItem { - title: page_title.clone(), - url: final_url, // Use redirect-resolved URL - summary: String::new(), // No LLM summary yet - original_title: page_title, - scraped_content, - source_url: None, - }); - } - - if let Some((_, url)) = pending.next() { - if let Some((ref provider, ref model)) = llm { - let provider = std::sync::Arc::clone(provider); - let model = model.clone(); - let client = state.http_client.clone(); - let url = url.clone(); - let mad = max_age_days; - join_set.spawn(async move { - let (scraped_content, page_title, final_url) = scrape_single_article_with_llm(&client, &url, mad, provider, model).await; - (url, scraped_content, page_title, final_url) - }); - } else { - let client = state.http_client.clone(); - let url = url.clone(); - let mad = max_age_days; - join_set.spawn(async move { - let (scraped_content, page_title, final_url) = scrape_single_article(&client, &url, mad).await; - (url, scraped_content, page_title, final_url) - }); - } +/// Rotate the sources list so that the source after the last-used source comes first. +fn rotate_sources(sources: Vec, last_source_url: Option<&str>) -> Vec { + let Some(last_url) = last_source_url else { + return sources; + }; + let pos = sources.iter().position(|s| s.url == last_url); + match pos { + Some(idx) => { + let next = (idx + 1) % sources.len(); + let mut rotated = sources[next..].to_vec(); + rotated.extend_from_slice(&sources[..next]); + rotated } + None => sources, } - - results } /// Scrape a single article URL, returning (body_text, page_title, final_url) or empty strings on failure. @@ -1626,118 +897,6 @@ async fn scrape_single_article( } } -/// Scrape an article URL using LLM for content extraction. -/// -/// Falls back to heuristic extraction if the LLM call fails. -async fn scrape_single_article_with_llm( - http_client: &reqwest::Client, - url: &str, - max_age_days: i64, - provider: std::sync::Arc, - model: String, -) -> (String, String, String) { - let content = match scraper::scrape_url(http_client, url).await { - Ok(c) => c, - Err(e) => { - tracing::warn!(url = url, error = %e, "Failed to fetch URL for LLM extraction"); - return (String::new(), String::new(), url.to_string()); - } - }; - - let final_url = content.url.clone(); - - if !content.ok || content.is_soft_404 { - return (String::new(), String::new(), final_url); - } - - // TODO(Task 5): LLM article extraction removed; use heuristic fallback only. - // The provider and model parameters are kept for future use. - let _ = (provider, model); - if scraper::is_article_too_old(content.published_date, max_age_days) { - return (String::new(), String::new(), final_url); - } - let title = content.title.unwrap_or_default(); - (content.body_text, title, final_url) -} - -/// Build the final sections array from the LLM's rewrite output. -/// -/// Maps `category_N` keys back to the user's category names. -fn build_final_sections( - raw: &serde_json::Value, - categories: &[String], -) -> Result, AppError> { - let mut sections = Vec::new(); - - // User categories - for (i, cat_name) in categories.iter().enumerate() { - let key = format!("category_{}", i); - let items_val = raw.get(&key).cloned().unwrap_or(serde_json::json!([])); - let items: Vec = serde_json::from_value(items_val).unwrap_or_default(); - if !items.is_empty() { - sections.push(NewsSection { - title: cat_name.clone(), - items, - }); - } - } - - // "Autre" category (if present in LLM output) - if let Some(autre_val) = raw.get("category_autre") { - let items: Vec = serde_json::from_value(autre_val.clone()).unwrap_or_default(); - if !items.is_empty() { - sections.push(NewsSection { - title: "Autre".to_string(), - items, - }); - } - } - - Ok(sections) -} - -/// Restore validated URLs from scraped data into the final sections. -/// -/// The LLM rewrite pass may hallucinate different URLs despite being -/// instructed to preserve them. This function replaces each article's URL -/// with the original scraped URL by matching on position (category index, -/// item index within category). -fn restore_scraped_urls( - sections: &mut [NewsSection], - scraped: &std::collections::HashMap>, - categories: &[String], -) { - for section in sections.iter_mut() { - // Determine the category key for this section - let key = if section.title == "Autre" { - "category_autre".to_string() - } else { - // Find the index of this category in the user categories list - categories - .iter() - .position(|c| c == §ion.title) - .map(|i| format!("category_{}", i)) - .unwrap_or_default() - }; - - if let Some(scraped_items) = scraped.get(&key) { - for (j, item) in section.items.iter_mut().enumerate() { - if let Some(scraped_item) = scraped_items.get(j) { - if item.url != scraped_item.url { - tracing::debug!( - category = %section.title, - original = %scraped_item.url, - hallucinated = %item.url, - "Restored hallucinated URL to scraped original" - ); - item.url = scraped_item.url.clone(); - } - } - } - } - } -} - /// Sanitize error messages to prevent leaking sensitive information. /// /// Removes potential API keys, internal paths, and other sensitive data. @@ -1769,110 +928,6 @@ fn sanitize_error_message(msg: &str) -> String { } } -/// Parse the LLM classification response and assign articles to categories. -/// -/// Returns a HashMap of category_key → Vec. -/// Invalid indices are ignored. Unknown categories default to "Autre". -/// Case-insensitive category matching. -fn parse_classification_response( - response: &serde_json::Value, - articles: &[ScrapedNewsItem], - categories: &[String], - max_per_category: i32, - filled_counts: &mut HashMap, -) -> (HashMap>, Vec) { - let max = max_per_category as usize; - let mut result: HashMap> = HashMap::new(); - let mut overflow: Vec = Vec::new(); - - // Build category name → key mapping (case-insensitive) - // "Autre" always maps to "category_autre" - // User categories map to "category_0", "category_1", etc. - // The index skips "Autre" — only user categories get numeric keys - let mut name_to_key: HashMap = HashMap::new(); - let mut user_cat_idx = 0; - for cat in categories { - let key = if cat == "Autre" { - "category_autre".to_string() - } else { - let key = format!("category_{}", user_cat_idx); - user_cat_idx += 1; - key - }; - name_to_key.insert(cat.to_lowercase(), key); - } - - let assignments = response - .get("assignments") - .and_then(|a| a.as_array()) - .cloned() - .unwrap_or_default(); - - let mut assigned_indices = std::collections::HashSet::new(); - - for assignment in &assignments { - let index = match assignment.get("index").and_then(|i| i.as_u64()) { - Some(i) => i as usize, - None => continue, - }; - if index >= articles.len() || assigned_indices.contains(&index) { - continue; - } - - let cat_name = assignment - .get("category") - .and_then(|c| c.as_str()) - .unwrap_or("Autre") - .to_string(); - - let cat_key = name_to_key - .get(&cat_name.to_lowercase()) - .cloned() - .unwrap_or_else(|| "category_autre".to_string()); - - // Resolve the display name for counting - let cat_display = categories - .iter() - .find(|c| c.to_lowercase() == cat_name.to_lowercase()) - .cloned() - .unwrap_or_else(|| "Autre".to_string()); - - let filled = filled_counts.get(&cat_display).copied().unwrap_or(0); - if filled >= max { - // Category full — assign to Autre if Autre has room - let autre_filled = filled_counts.get("Autre").copied().unwrap_or(0); - if autre_filled < max { - result.entry("category_autre".to_string()).or_default().push(articles[index].clone()); - *filled_counts.entry("Autre".to_string()).or_insert(0) += 1; - assigned_indices.insert(index); - } else { - overflow.push(articles[index].clone()); - assigned_indices.insert(index); - } - continue; - } - - result.entry(cat_key).or_default().push(articles[index].clone()); - *filled_counts.entry(cat_display).or_insert(0) += 1; - assigned_indices.insert(index); - } - - // Unclassified articles → Autre - for (i, article) in articles.iter().enumerate() { - if !assigned_indices.contains(&i) { - let autre_filled = filled_counts.get("Autre").copied().unwrap_or(0); - if autre_filled < max { - result.entry("category_autre".to_string()).or_default().push(article.clone()); - *filled_counts.entry("Autre".to_string()).or_insert(0) += 1; - } else { - overflow.push(article.clone()); - } - } - } - - (result, overflow) -} - #[cfg(test)] mod tests { use super::*; @@ -2064,45 +1119,6 @@ mod tests { assert_eq!(result[1].1.len(), 0); // Missing category → empty } - // ── build_final_sections tests ─────────────────────────────── - - #[test] - fn build_final_sections_maps_names() { - let raw = serde_json::json!({ - "category_0": [ - {"title": "Art", "url": "https://a.com", "summary": "Sum"} - ], - "category_1": [] - }); - - let categories = vec!["Annonces majeures".into(), "Recherche".into()]; - let sections = build_final_sections(&raw, &categories).unwrap(); - - // Only 1 section — empty categories are omitted - assert_eq!(sections.len(), 1); - assert_eq!(sections[0].title, "Annonces majeures"); - assert_eq!(sections[0].items.len(), 1); - } - - #[test] - fn build_final_sections_includes_autre() { - let raw = serde_json::json!({ - "category_0": [ - {"title": "A", "url": "https://a.com", "summary": "s"} - ], - "category_autre": [ - {"title": "B", "url": "https://b.com", "summary": "s"} - ] - }); - - let categories = vec!["AI News".into()]; - let sections = build_final_sections(&raw, &categories).unwrap(); - - assert_eq!(sections.len(), 2); - assert_eq!(sections[0].title, "AI News"); - assert_eq!(sections[1].title, "Autre"); - } - // ── sanitize_error_message tests ───────────────────────────── #[test] @@ -2144,72 +1160,7 @@ mod tests { assert_eq!(sanitized, msg); } - // ── filter_homepage_urls tests ────────────────────────────── - - #[test] - fn test_homepage_url_filtered() { - let parsed = vec![( - "category_0".into(), - vec![ - NewsItem { - title: "Homepage".into(), - url: "https://example.com/".into(), - summary: "Sum".into(), - }, - NewsItem { - title: "Homepage no slash".into(), - url: "https://example.com".into(), - summary: "Sum".into(), - }, - NewsItem { - title: "Real article".into(), - url: "https://example.com/article/123".into(), - summary: "Sum".into(), - }, - ], - )]; - - let result = filter_homepage_urls(parsed); - assert_eq!(result[0].1.len(), 1); - assert_eq!(result[0].1[0].title, "Real article"); - } - - #[test] - fn test_article_url_not_filtered() { - let parsed = vec![( - "category_0".into(), - vec![ - NewsItem { - title: "Article 1".into(), - url: "https://example.com/news/article-1".into(), - summary: "Sum 1".into(), - }, - NewsItem { - title: "Article 2".into(), - url: "https://blog.example.org/2026/03/post".into(), - summary: "Sum 2".into(), - }, - ], - )]; - - let result = filter_homepage_urls(parsed); - assert_eq!(result[0].1.len(), 2); - } - - #[test] - fn test_homepage_filter_keeps_unparseable_urls() { - let parsed = vec![( - "category_0".into(), - vec![NewsItem { - title: "Bad URL".into(), - url: "not-a-url".into(), - summary: "Sum".into(), - }], - )]; - - let result = filter_homepage_urls(parsed); - assert_eq!(result[0].1.len(), 1); - } + // ── sanitize_json_null_bytes tests ────────────────────────── #[test] fn sanitize_null_bytes_in_json_strings() { @@ -2234,369 +1185,6 @@ mod tests { assert_eq!(sanitized, json); } - // ── dedup_by_url tests ─────────────────────────────────────── - - #[test] - fn dedup_removes_same_url_across_categories() { - let parsed = vec![ - ("category_0".into(), vec![ - NewsItem { title: "A".into(), url: "https://example.com/article-1".into(), summary: "s".into() }, - NewsItem { title: "B".into(), url: "https://example.com/article-2".into(), summary: "s".into() }, - ]), - ("category_1".into(), vec![ - NewsItem { title: "C".into(), url: "https://example.com/article-1".into(), summary: "s".into() }, - NewsItem { title: "D".into(), url: "https://other.com/article-3".into(), summary: "s".into() }, - ]), - ]; - - let result = dedup_by_url(parsed); - assert_eq!(result[0].1.len(), 2, "Category 0 keeps both (first seen)"); - assert_eq!(result[1].1.len(), 1, "Category 1 loses the duplicate"); - assert_eq!(result[1].1[0].url, "https://other.com/article-3"); - } - - #[test] - fn dedup_removes_same_url_within_category() { - let parsed = vec![ - ("category_0".into(), vec![ - NewsItem { title: "A".into(), url: "https://example.com/same".into(), summary: "s".into() }, - NewsItem { title: "B".into(), url: "https://example.com/same".into(), summary: "s".into() }, - NewsItem { title: "C".into(), url: "https://example.com/different".into(), summary: "s".into() }, - ]), - ]; - - let result = dedup_by_url(parsed); - assert_eq!(result[0].1.len(), 2); - } - - #[test] - fn dedup_case_insensitive() { - let parsed = vec![ - ("category_0".into(), vec![ - NewsItem { title: "A".into(), url: "https://Example.COM/path".into(), summary: "s".into() }, - ]), - ("category_1".into(), vec![ - NewsItem { title: "B".into(), url: "https://example.com/path".into(), summary: "s".into() }, - ]), - ]; - - let result = dedup_by_url(parsed); - assert_eq!(result[0].1.len(), 1, "Keeps first"); - assert_eq!(result[1].1.len(), 0, "Drops case-insensitive duplicate"); - } - - #[test] - fn dedup_no_duplicates_unchanged() { - let parsed = vec![ - ("category_0".into(), vec![ - NewsItem { title: "A".into(), url: "https://a.com/1".into(), summary: "s".into() }, - NewsItem { title: "B".into(), url: "https://b.com/2".into(), summary: "s".into() }, - ]), - ]; - - let result = dedup_by_url(parsed); - assert_eq!(result[0].1.len(), 2); - } - - // ── filter_empty_scraped_articles tests ───────────────────────── - - #[test] - fn filter_empty_removes_articles_with_no_content() { - use crate::models::synthesis::ScrapedNewsItem; - let mut scraped = HashMap::new(); - scraped.insert("category_0".to_string(), vec![ - ScrapedNewsItem { - title: "Good".into(), url: "https://a.com/1".into(), - summary: "s".into(), original_title: "t".into(), - scraped_content: "Real content here".into(), source_url: None, - }, - ScrapedNewsItem { - title: "Empty".into(), url: "https://b.com/2".into(), - summary: "s".into(), original_title: "t".into(), - scraped_content: "".into(), source_url: None, - }, - ScrapedNewsItem { - title: "Whitespace".into(), url: "https://c.com/3".into(), - summary: "s".into(), original_title: "t".into(), - scraped_content: " ".into(), source_url: None, - }, - ]); - - let result = filter_empty_scraped_articles(scraped); - assert_eq!(result["category_0"].len(), 1); - assert_eq!(result["category_0"][0].title, "Good"); - } - - #[test] - fn filter_empty_keeps_all_when_all_have_content() { - use crate::models::synthesis::ScrapedNewsItem; - let mut scraped = HashMap::new(); - scraped.insert("category_0".to_string(), vec![ - ScrapedNewsItem { - title: "A".into(), url: "https://a.com/1".into(), - summary: "s".into(), original_title: "t".into(), - scraped_content: "Content".into(), source_url: None, - }, - ]); - - let result = filter_empty_scraped_articles(scraped); - assert_eq!(result["category_0"].len(), 1); - } - - // ── restore_scraped_urls tests ─────────────────────────────── - - #[test] - fn restore_urls_replaces_hallucinated_urls() { - use crate::models::synthesis::{ScrapedNewsItem, NewsSection}; - let categories = vec!["Cat A".to_string()]; - let mut scraped = HashMap::new(); - scraped.insert("category_0".to_string(), vec![ - ScrapedNewsItem { - title: "T".into(), url: "https://real-source.com/article".into(), - summary: "s".into(), original_title: "t".into(), - scraped_content: "c".into(), source_url: None, - }, - ]); - - let mut sections = vec![ - NewsSection { - title: "Cat A".into(), - items: vec![NewsItem { - title: "Rewritten title".into(), - url: "https://wikipedia.org/hallucinated".into(), - summary: "Rewritten summary".into(), - }], - }, - ]; - - restore_scraped_urls(&mut sections, &scraped, &categories); - assert_eq!(sections[0].items[0].url, "https://real-source.com/article"); - // Title and summary are preserved from LLM rewrite - assert_eq!(sections[0].items[0].title, "Rewritten title"); - } - - #[test] - fn restore_urls_no_change_when_urls_match() { - use crate::models::synthesis::{ScrapedNewsItem, NewsSection}; - let categories = vec!["Cat A".to_string()]; - let mut scraped = HashMap::new(); - scraped.insert("category_0".to_string(), vec![ - ScrapedNewsItem { - title: "T".into(), url: "https://correct.com/article".into(), - summary: "s".into(), original_title: "t".into(), - scraped_content: "c".into(), source_url: None, - }, - ]); - - let mut sections = vec![ - NewsSection { - title: "Cat A".into(), - items: vec![NewsItem { - title: "T".into(), - url: "https://correct.com/article".into(), - summary: "s".into(), - }], - }, - ]; - - restore_scraped_urls(&mut sections, &scraped, &categories); - assert_eq!(sections[0].items[0].url, "https://correct.com/article"); - } - - // ── limit_articles_per_source tests ──────────────────────────── - - #[test] - fn source_limit_spreads_across_categories() { - let parsed = vec![ - ("category_0".into(), vec![ - NewsItem { title: "A1".into(), url: "https://openai.com/blog/a".into(), summary: "s".into() }, - NewsItem { title: "A2".into(), url: "https://openai.com/blog/b".into(), summary: "s".into() }, - NewsItem { title: "A3".into(), url: "https://openai.com/blog/c".into(), summary: "s".into() }, - NewsItem { title: "A4".into(), url: "https://techcrunch.com/x".into(), summary: "s".into() }, - ]), - ("category_1".into(), vec![ - NewsItem { title: "B1".into(), url: "https://openai.com/research/d".into(), summary: "s".into() }, - NewsItem { title: "B2".into(), url: "https://openai.com/research/e".into(), summary: "s".into() }, - NewsItem { title: "B3".into(), url: "https://theverge.com/y".into(), summary: "s".into() }, - ]), - ]; - - let result = limit_articles_per_source(parsed, 3); - - // Count openai.com articles across all categories - let openai_count: usize = result.iter() - .flat_map(|(_, items)| items) - .filter(|i| i.url.contains("openai.com")) - .count(); - assert_eq!(openai_count, 3, "Should keep exactly 3 openai.com articles"); - - // Both categories should have at least 1 openai article (spread) - let cat0_openai = result[0].1.iter().filter(|i| i.url.contains("openai.com")).count(); - let cat1_openai = result[1].1.iter().filter(|i| i.url.contains("openai.com")).count(); - assert!(cat0_openai >= 1, "Category 0 should have at least 1 openai article"); - assert!(cat1_openai >= 1, "Category 1 should have at least 1 openai article"); - - // techcrunch and theverge should be untouched - let tc_count: usize = result.iter().flat_map(|(_, items)| items).filter(|i| i.url.contains("techcrunch")).count(); - assert_eq!(tc_count, 1); - } - - #[test] - fn source_limit_all_different_domains() { - let parsed = vec![ - ("category_0".into(), vec![ - NewsItem { title: "A".into(), url: "https://a.com/1".into(), summary: "s".into() }, - NewsItem { title: "B".into(), url: "https://b.com/2".into(), summary: "s".into() }, - ]), - ]; - - let result = limit_articles_per_source(parsed, 3); - assert_eq!(result[0].1.len(), 2, "Nothing dropped when all domains are unique"); - } - - #[test] - fn source_limit_max_one() { - let parsed = vec![ - ("category_0".into(), vec![ - NewsItem { title: "A".into(), url: "https://openai.com/a".into(), summary: "s".into() }, - NewsItem { title: "B".into(), url: "https://openai.com/b".into(), summary: "s".into() }, - ]), - ("category_1".into(), vec![ - NewsItem { title: "C".into(), url: "https://openai.com/c".into(), summary: "s".into() }, - ]), - ]; - - let result = limit_articles_per_source(parsed, 1); - let total: usize = result.iter().flat_map(|(_, items)| items).filter(|i| i.url.contains("openai.com")).count(); - assert_eq!(total, 1, "max=1 should keep exactly 1 openai article"); - } - - #[test] - fn source_limit_more_categories_than_max() { - // 5 categories, each with 1 openai article, max=2 - let parsed: Vec<(String, Vec)> = (0..5) - .map(|i| ( - format!("category_{}", i), - vec![NewsItem { - title: format!("Art{}", i), - url: format!("https://openai.com/{}", i), - summary: "s".into(), - }], - )) - .collect(); - - let result = limit_articles_per_source(parsed, 2); - let total: usize = result.iter().flat_map(|(_, items)| items).count(); - assert_eq!(total, 2, "Should cap at max_per_source even with more categories"); - } - - #[test] - fn source_limit_empty_input() { - let result = limit_articles_per_source(vec![], 3); - assert!(result.is_empty()); - } - - #[test] - fn source_limit_unparseable_urls_kept() { - let parsed = vec![ - ("category_0".into(), vec![ - NewsItem { title: "Good".into(), url: "https://openai.com/a".into(), summary: "s".into() }, - NewsItem { title: "Bad".into(), url: "not-a-url".into(), summary: "s".into() }, - ]), - ]; - - let result = limit_articles_per_source(parsed, 3); - assert_eq!(result[0].1.len(), 2, "Unparseable URLs should be kept"); - } - - // ── parse_classification_response tests ───────────────────── - - #[test] - fn classification_assigns_articles_to_categories() { - use crate::models::synthesis::ScrapedNewsItem; - let articles = vec![ - ScrapedNewsItem { title: "A".into(), url: "https://a.com/1".into(), summary: "s".into(), original_title: "t".into(), scraped_content: "c".into(), source_url: None }, - ScrapedNewsItem { title: "B".into(), url: "https://b.com/2".into(), summary: "s".into(), original_title: "t".into(), scraped_content: "c".into(), source_url: None }, - ]; - let categories = vec!["AI News".to_string(), "Autre".to_string()]; - let response = serde_json::json!({ - "assignments": [ - {"index": 0, "category": "AI News"}, - {"index": 1, "category": "Autre"} - ] - }); - let mut filled = HashMap::new(); - let (result, _overflow) = parse_classification_response(&response, &articles, &categories, 4, &mut filled); - assert_eq!(result.get("category_0").map(|v| v.len()), Some(1)); - assert_eq!(result.get("category_autre").map(|v| v.len()), Some(1)); - } - - #[test] - fn classification_unknown_category_goes_to_autre() { - use crate::models::synthesis::ScrapedNewsItem; - let articles = vec![ - ScrapedNewsItem { title: "A".into(), url: "https://a.com/1".into(), summary: "s".into(), original_title: "t".into(), scraped_content: "c".into(), source_url: None }, - ]; - let categories = vec!["AI News".to_string(), "Autre".to_string()]; - let response = serde_json::json!({ - "assignments": [{"index": 0, "category": "Unknown Category"}] - }); - let mut filled = HashMap::new(); - let (result, _overflow) = parse_classification_response(&response, &articles, &categories, 4, &mut filled); - assert_eq!(result.get("category_autre").map(|v| v.len()), Some(1)); - } - - #[test] - fn classification_respects_max_per_category() { - use crate::models::synthesis::ScrapedNewsItem; - let articles: Vec = (0..5).map(|i| ScrapedNewsItem { - title: format!("Art{}", i), url: format!("https://a.com/{}", i), - summary: "s".into(), original_title: "t".into(), scraped_content: "c".into(), source_url: None, - }).collect(); - let categories = vec!["AI News".to_string(), "Autre".to_string()]; - let response = serde_json::json!({ - "assignments": (0..5).map(|i| serde_json::json!({"index": i, "category": "AI News"})).collect::>() - }); - let mut filled = HashMap::new(); - let (result, overflow) = parse_classification_response(&response, &articles, &categories, 2, &mut filled); - assert_eq!(result.get("category_0").map(|v| v.len()), Some(2)); - assert_eq!(result.get("category_autre").map(|v| v.len()), Some(2)); - // Article at index 4 couldn't fit in AI News (capped at 2) or Autre (capped at 2) - assert_eq!(overflow.len(), 1); - assert_eq!(overflow[0].title, "Art4"); - } - - #[test] - fn classification_invalid_index_ignored() { - use crate::models::synthesis::ScrapedNewsItem; - let articles = vec![ - ScrapedNewsItem { title: "A".into(), url: "https://a.com/1".into(), summary: "s".into(), original_title: "t".into(), scraped_content: "c".into(), source_url: None }, - ]; - let categories = vec!["AI News".to_string(), "Autre".to_string()]; - let response = serde_json::json!({ - "assignments": [{"index": 99, "category": "AI News"}] - }); - let mut filled = HashMap::new(); - let (result, _overflow) = parse_classification_response(&response, &articles, &categories, 4, &mut filled); - // Index 99 is invalid → article 0 is unclassified → goes to Autre - assert_eq!(result.get("category_autre").map(|v| v.len()), Some(1)); - } - - #[test] - fn classification_case_insensitive() { - use crate::models::synthesis::ScrapedNewsItem; - let articles = vec![ - ScrapedNewsItem { title: "A".into(), url: "https://a.com/1".into(), summary: "s".into(), original_title: "t".into(), scraped_content: "c".into(), source_url: None }, - ]; - let categories = vec!["AI News".to_string(), "Autre".to_string()]; - let response = serde_json::json!({ - "assignments": [{"index": 0, "category": "ai news"}] - }); - let mut filled = HashMap::new(); - let (result, _overflow) = parse_classification_response(&response, &articles, &categories, 4, &mut filled); - assert_eq!(result.get("category_0").map(|v| v.len()), Some(1)); - } - // ── normalize_article_url tests ───────────────────────────── #[test] @@ -2673,41 +1261,38 @@ mod tests { assert_ne!(h1, h2); } - // ── fill-up calculation tests ─────────────────────────────── + // ── rotate_sources tests ────────────────────────────────── #[test] - fn fillup_target_calculation() { - // 4 categories x 4 items = 16 max, 75% = 12 - let max = 4 * 4; - let target = (0.75_f64 * max as f64).ceil() as usize; - assert_eq!(target, 12); + fn rotate_sources_no_last_url() { + let sources = vec![ + crate::models::source::Source { id: Uuid::new_v4(), user_id: Uuid::new_v4(), title: "A".into(), url: "https://a.com".into(), created_at: chrono::Utc::now() }, + crate::models::source::Source { id: Uuid::new_v4(), user_id: Uuid::new_v4(), title: "B".into(), url: "https://b.com".into(), created_at: chrono::Utc::now() }, + ]; + let result = rotate_sources(sources.clone(), None); + assert_eq!(result.len(), 2); + assert_eq!(result[0].url, "https://a.com"); } #[test] - fn fillup_shortfall_saturating() { - let target: usize = 12; - let total: usize = 15; - let shortfall = target.saturating_sub(total); - assert_eq!(shortfall, 0); + fn rotate_sources_with_last_url() { + let sources = vec![ + crate::models::source::Source { id: Uuid::new_v4(), user_id: Uuid::new_v4(), title: "A".into(), url: "https://a.com".into(), created_at: chrono::Utc::now() }, + crate::models::source::Source { id: Uuid::new_v4(), user_id: Uuid::new_v4(), title: "B".into(), url: "https://b.com".into(), created_at: chrono::Utc::now() }, + crate::models::source::Source { id: Uuid::new_v4(), user_id: Uuid::new_v4(), title: "C".into(), url: "https://c.com".into(), created_at: chrono::Utc::now() }, + ]; + let result = rotate_sources(sources, Some("https://a.com")); + assert_eq!(result[0].url, "https://b.com"); + assert_eq!(result[1].url, "https://c.com"); + assert_eq!(result[2].url, "https://a.com"); } #[test] - fn classification_overflow_collected_when_all_full() { - use crate::models::synthesis::ScrapedNewsItem; - let articles: Vec = (0..6).map(|i| ScrapedNewsItem { - title: format!("Art{}", i), url: format!("https://a.com/{}", i), - summary: "s".into(), original_title: "t".into(), scraped_content: "c".into(), source_url: None, - }).collect(); - let categories = vec!["AI News".to_string(), "Autre".to_string()]; - let response = serde_json::json!({ - "assignments": (0..6).map(|i| serde_json::json!({"index": i, "category": "AI News"})).collect::>() - }); - let mut filled = HashMap::new(); - let (result, overflow) = parse_classification_response(&response, &articles, &categories, 2, &mut filled); - - // AI News capped at 2, Autre gets 2, remaining 2 go to overflow - assert_eq!(result.get("category_0").map(|v| v.len()), Some(2)); - assert_eq!(result.get("category_autre").map(|v| v.len()), Some(2)); - assert_eq!(overflow.len(), 2, "2 articles should overflow when both categories are full"); + fn rotate_sources_last_url_not_found() { + let sources = vec![ + crate::models::source::Source { id: Uuid::new_v4(), user_id: Uuid::new_v4(), title: "A".into(), url: "https://a.com".into(), created_at: chrono::Utc::now() }, + ]; + let result = rotate_sources(sources.clone(), Some("https://notfound.com")); + assert_eq!(result[0].url, "https://a.com"); } }