From 53ecce84b0cc05743caf21b7d7c537c0401fe5ee Mon Sep 17 00:00:00 2001 From: oabrivard Date: Tue, 24 Mar 2026 01:55:58 +0100 Subject: [PATCH] =?UTF-8?q?feat:=20two-phase=20generation=20pipeline=20?= =?UTF-8?q?=E2=80=94=20personalized=20sources=20first,=20web=20search=20fa?= =?UTF-8?q?llback?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Opus 4.6 (1M context) --- backend/src/services/synthesis.rs | 387 +++++++++++++++++++++++------- 1 file changed, 302 insertions(+), 85 deletions(-) diff --git a/backend/src/services/synthesis.rs b/backend/src/services/synthesis.rs index 21dcdba..98ed162 100644 --- a/backend/src/services/synthesis.rs +++ b/backend/src/services/synthesis.rs @@ -29,9 +29,10 @@ use crate::models::synthesis::{ }; use crate::services::encryption; use crate::services::llm::factory::create_provider; -use crate::services::llm::schema::build_category_schema; -use crate::services::prompts; +use crate::services::llm::schema::{build_category_schema, build_classification_schema}; +use crate::services::prompts::{self, build_classification_prompt}; use crate::services::scraper; +use crate::services::source_scraper; // ─────────────────────────────────────────────────────────────────── // Progress Events @@ -269,15 +270,11 @@ async fn run_generation_inner( let sources = db::sources::list_for_user(&state.pool, user_id).await?; // Step 3: Resolve provider + decrypt API key - emit_progress(tx, "provider", "Configuration du fournisseur IA...", 15); + emit_progress(tx, "provider", "Configuration du fournisseur IA...", 12); let (provider_name, api_key) = resolve_provider_and_key(state, user_id, &settings).await?; - let provider = create_provider(&provider_name, api_key)?; - // Step 4: Build schema from categories - let schema = build_category_schema(&settings.categories, settings.max_items_per_category); - - // Step 4b: Resolve models — user overrides take priority over admin config + // Step 4: Resolve models let model_research = if !settings.ai_model.is_empty() { settings.ai_model.clone() } else { @@ -289,93 +286,318 @@ async fn run_generation_inner( model_research.clone() }; - // Look up or create per-user rate limiter from AppState so limits persist across jobs. let user_rate_limiter = get_user_rate_limiter(state, &settings, user_id); - // Step 5: Rate limit check (pass 1) - check_rate_limit(state, &user_rate_limiter, &provider_name)?; + // Build categories list with "Autre" appended for classification + let mut classification_categories = settings.categories.clone(); + classification_categories.push("Autre".to_string()); + + // Track how many articles fill each category across both phases + let mut filled_counts: HashMap = HashMap::new(); + // Combined scraped articles keyed by category + let mut all_scraped: HashMap> = HashMap::new(); + // Track all URLs seen (for cross-phase dedup) + let mut seen_urls: std::collections::HashSet = std::collections::HashSet::new(); + + // ═══════════════════════════════════════════════════════════════ + // PHASE 1: Personalized Sources (scrape-based, no LLM for discovery) + // ═══════════════════════════════════════════════════════════════ + if !sources.is_empty() { + emit_progress(tx, "sources_scrape", "Analyse des sources personnalisees...", 15); + let max_sources = sources.len().min(10); // Cap at 10 sources + let max_links_per_source = (2 * settings.max_articles_per_source) as usize; + + // 1a. Extract article links from each source page + let mut candidate_urls: Vec = Vec::new(); + for source in sources.iter().take(max_sources) { + match source_scraper::extract_article_links( + &state.http_client, + &source.url, + max_links_per_source, + ) + .await + { + Ok(links) => { + tracing::info!( + source = %source.title, + url = %source.url, + links_found = links.len(), + "Extracted article links from source" + ); + candidate_urls.extend(links); + } + Err(e) => { + tracing::warn!( + source = %source.title, + url = %source.url, + error = %e, + "Failed to extract links from source, skipping" + ); + } + } + } - // Step 6: LLM search pass - emit_progress(tx, "search", "Recherche d'actualites en cours...", 30); - let current_date = Utc::now() - .format("%A %d %B %Y") - .to_string(); - // Step 5b: Load recently-used domains for source diversity - let recent_domains = if settings.source_diversity_window > 0 { - let recent = db::syntheses::list_for_user( - &state.pool, - user_id, - settings.source_diversity_window as i64, - 0, - ) - .await - .unwrap_or_default(); + // Deduplicate candidate URLs + let mut seen = std::collections::HashSet::new(); + candidate_urls.retain(|url| seen.insert(url.to_lowercase())); + + if !candidate_urls.is_empty() { + // 1b. Scrape candidate articles + let scraped_articles = scrape_flat_urls( + state, + &candidate_urls, + settings.max_age_days as i64, + tx, + ) + .await; + + // 1c. Filter empty content + let valid_articles: Vec = scraped_articles + .into_iter() + .filter(|a| !a.scraped_content.trim().is_empty()) + .collect(); - let mut domains: Vec = recent - .iter() - .filter_map(|s| { - serde_json::from_value::>( - s.sections.clone(), - ) - .ok() - }) - .flat_map(|sections| { - sections - .into_iter() - .flat_map(|sec| sec.items.into_iter()) - .filter_map(|item| extract_domain(&item.url)) - }) - .collect(); + tracing::info!( + valid_count = valid_articles.len(), + "Phase 1: valid articles from personalized sources" + ); - domains.sort(); - domains.dedup(); - domains - } else { - Vec::new() - }; + if !valid_articles.is_empty() { + // 1d. LLM classification call + emit_progress(tx, "classifying", "Classification des articles...", 35); + check_rate_limit(state, &user_rate_limiter, &provider_name)?; + + let (class_system, class_user) = build_classification_prompt( + &valid_articles, + &classification_categories, + settings.max_items_per_category, + &filled_counts, + ); + let class_schema = build_classification_schema(); + + let class_response = provider + .generate_rewrite_pass( + &model_research, + &class_system, + &class_user, + &class_schema, + ) + .await?; + + // 1e. Parse classification and fill categories + let phase1_classified = parse_classification_response( + &class_response, + &valid_articles, + &classification_categories, + settings.max_items_per_category, + &mut filled_counts, + ); + + // Merge into all_scraped and track URLs + for (cat_key, items) in phase1_classified { + for item in &items { + seen_urls.insert(item.url.to_lowercase()); + } + all_scraped.entry(cat_key).or_default().extend(items); + } - let (system_prompt, user_prompt) = - prompts::build_search_prompt(&settings, &sources, ¤t_date, &recent_domains, None); + // 1f. Enforce max_articles_per_source across all categories + // (reuse domain counting logic) + let max_per_source = settings.max_articles_per_source as usize; + let mut domain_counts: HashMap = HashMap::new(); + for (_, items) in &mut all_scraped { + items.retain(|item| { + if let Some(domain) = extract_domain(&item.url) { + let count = domain_counts.entry(domain).or_insert(0); + if *count >= max_per_source { + false + } else { + *count += 1; + true + } + } else { + true + } + }); + } - let raw_results = provider - .generate_search_pass(&model_research, &system_prompt, &user_prompt, &schema) - .await?; + // Recount filled_counts after trimming + filled_counts.clear(); + for (cat_key, items) in &all_scraped { + let cat_name = if cat_key == "category_autre" { + "Autre".to_string() + } else { + let idx: usize = cat_key + .strip_prefix("category_") + .and_then(|s| s.parse().ok()) + .unwrap_or(0); + settings.categories.get(idx).cloned().unwrap_or_default() + }; + *filled_counts.entry(cat_name).or_insert(0) += items.len(); + } + } + } + } - // Step 7: Parse structured output into (category_key, Vec) - emit_progress(tx, "parsing", "Analyse des resultats...", 40); - let parsed = parse_llm_output(&raw_results, &settings.categories)?; + // ═══════════════════════════════════════════════════════════════ + // PHASE 2: Web Search Fallback (LLM-based) + // Only runs if any user-defined category is under-filled + // ═══════════════════════════════════════════════════════════════ + let category_gaps: Vec<(String, i32)> = settings + .categories + .iter() + .filter_map(|cat| { + let filled = filled_counts.get(cat).copied().unwrap_or(0); + let needed = settings.max_items_per_category as usize - filled.min(settings.max_items_per_category as usize); + if needed > 0 { + Some((cat.clone(), needed as i32)) + } else { + None + } + }) + .collect(); - // Step 7b: Filter out homepage URLs (path == "/" or empty) - let parsed = filter_homepage_urls(parsed); + if !category_gaps.is_empty() { + emit_progress(tx, "search", "Recherche d'actualites complementaires...", 45); - // Step 7c: Deduplicate articles with the same URL across categories - let parsed = dedup_by_url(parsed); + // Rate limit check before search pass + check_rate_limit(state, &user_rate_limiter, &provider_name)?; - // Step 7d: Limit articles per source for diversity - let parsed = limit_articles_per_source(parsed, settings.max_articles_per_source); + // Load recently-used domains for diversity (Phase 2 only) + let recent_domains = if settings.source_diversity_window > 0 { + let recent = db::syntheses::list_for_user( + &state.pool, + user_id, + settings.source_diversity_window as i64, + 0, + ) + .await + .unwrap_or_default(); - // Step 8: Scrape + rewrite pass - // - // Always run the full pipeline: the search pass URLs can be hallucinated - // by the LLM (Wikipedia, corporate sites instead of actual articles). - // The scrape pass fetches each URL and validates the content exists, - // then the rewrite pass produces summaries based on actual article content. - emit_progress(tx, "scraping", "Verification des sources...", 45); - let scraped = scrape_articles(state, &parsed, settings.max_age_days as i64, tx).await; + let mut domains: Vec = recent + .iter() + .filter_map(|s| { + serde_json::from_value::>( + s.sections.clone(), + ) + .ok() + }) + .flat_map(|sections| { + sections + .into_iter() + .flat_map(|sec| sec.items.into_iter()) + .filter_map(|item| extract_domain(&item.url)) + }) + .collect(); - // Remove articles with empty scraped content (too old, soft 404, scrape failure). - // These would produce empty/low-quality output in the rewrite pass. - let scraped = filter_empty_scraped_articles(scraped); + domains.sort(); + domains.dedup(); + domains + } else { + Vec::new() + }; - // Rate limit check (pass 2) - check_rate_limit(state, &user_rate_limiter, &provider_name)?; + // Build search schema for gap categories + let search_schema = build_category_schema(&settings.categories, settings.max_items_per_category); + + let current_date = Utc::now().format("%A %d %B %Y").to_string(); + let (system_prompt, user_prompt) = prompts::build_search_prompt( + &settings, + &sources, + ¤t_date, + &recent_domains, + Some(&category_gaps), + ); + + let raw_results = provider + .generate_search_pass(&model_research, &system_prompt, &user_prompt, &search_schema) + .await?; + + // Parse + filter + emit_progress(tx, "parsing", "Analyse des resultats...", 55); + let parsed = parse_llm_output(&raw_results, &settings.categories)?; + let parsed = filter_homepage_urls(parsed); + + // Cross-phase dedup: remove URLs already found in Phase 1 + let parsed: Vec<(String, Vec)> = parsed + .into_iter() + .map(|(cat_key, items)| { + let deduped: Vec = items + .into_iter() + .filter(|item| !seen_urls.contains(&item.url.to_lowercase())) + .collect(); + (cat_key, deduped) + }) + .collect(); + + let parsed = dedup_by_url(parsed); + let parsed = limit_articles_per_source(parsed, settings.max_articles_per_source); + + // Scrape web search results + emit_progress(tx, "scraping", "Verification des sources web...", 60); + let scraped = scrape_articles(state, &parsed, settings.max_age_days as i64, tx).await; + let scraped = filter_empty_scraped_articles(scraped); + + // Flatten scraped articles for classification + let phase2_articles: Vec = scraped + .into_values() + .flat_map(|items| items.into_iter()) + .collect(); + + if !phase2_articles.is_empty() { + // LLM classification for Phase 2 articles + emit_progress(tx, "classifying", "Classification des resultats web...", 70); + check_rate_limit(state, &user_rate_limiter, &provider_name)?; + + let (class_system, class_user) = build_classification_prompt( + &phase2_articles, + &classification_categories, + settings.max_items_per_category, + &filled_counts, + ); + let class_schema = build_classification_schema(); + + let class_response = provider + .generate_rewrite_pass( + &model_research, + &class_system, + &class_user, + &class_schema, + ) + .await?; + + let phase2_classified = parse_classification_response( + &class_response, + &phase2_articles, + &classification_categories, + settings.max_items_per_category, + &mut filled_counts, + ); + + // Merge Phase 2 into all_scraped + for (cat_key, items) in phase2_classified { + for item in &items { + seen_urls.insert(item.url.to_lowercase()); + } + all_scraped.entry(cat_key).or_default().extend(items); + } + } + } + + // ═══════════════════════════════════════════════════════════════ + // COMBINED REWRITE PASS + // ═══════════════════════════════════════════════════════════════ + if all_scraped.values().all(|items| items.is_empty()) { + return Err(AppError::BadRequest( + "Aucun article valide trouve. Verifiez vos sources et categories.".into(), + )); + } - // LLM rewrite pass — use a schema that matches the actual scraped item counts - // (which may be less than max_items_per_category after filtering empty content) emit_progress(tx, "rewrite", "Redaction des resumes...", 80); - let (rewrite_system, rewrite_user) = prompts::build_rewrite_prompt(&scraped); + check_rate_limit(state, &user_rate_limiter, &provider_name)?; - let rewrite_schema = build_rewrite_schema(&scraped, &settings.categories); + let (rewrite_system, rewrite_user) = prompts::build_rewrite_prompt(&all_scraped); + let rewrite_schema = build_rewrite_schema(&all_scraped, &settings.categories); let final_results = provider .generate_rewrite_pass(&model_writing, &rewrite_system, &rewrite_user, &rewrite_schema) @@ -384,19 +606,14 @@ async fn run_generation_inner( emit_progress(tx, "finalizing", "Finalisation...", 90); let mut final_sections = build_final_sections(&final_results, &settings.categories)?; - // Restore validated URLs from scraped data — the LLM rewrite pass may - // hallucinate different URLs despite being told to preserve them. - restore_scraped_urls(&mut final_sections, &scraped, &settings.categories); + restore_scraped_urls(&mut final_sections, &all_scraped, &settings.categories); - // Step 12: Save synthesis to DB + // Save synthesis to DB emit_progress(tx, "saving", "Sauvegarde de la synthese...", 95); let week = get_iso_week_string(Utc::now().date_naive()); let sections_json = serde_json::to_value(&final_sections).map_err(|e| { AppError::Internal(anyhow::anyhow!("Failed to serialize sections: {}", e)) })?; - - // Strip \u0000 null bytes — LLM output occasionally contains them and - // PostgreSQL rejects them in JSONB columns. let sections_json = sanitize_json_null_bytes(sections_json); let synthesis =