From 8a061c98db985a431668581bdc028e5ec5b74fe5 Mon Sep 17 00:00:00 2001 From: oabrivard Date: Tue, 24 Mar 2026 10:49:10 +0100 Subject: [PATCH] feat: LLM-assisted article extraction with Arc provider, concurrency control, and progress Co-Authored-By: Claude Opus 4.6 (1M context) --- backend/src/services/synthesis.rs | 235 +++++++++++++++++++++++------- 1 file changed, 185 insertions(+), 50 deletions(-) diff --git a/backend/src/services/synthesis.rs b/backend/src/services/synthesis.rs index d1154ca..a0c6992 100644 --- a/backend/src/services/synthesis.rs +++ b/backend/src/services/synthesis.rs @@ -288,6 +288,12 @@ async fn run_generation_inner( let user_rate_limiter = get_user_rate_limiter(state, &settings, user_id); + let llm_for_scraping: Option<(std::sync::Arc, String)> = if settings.use_llm_for_article_extraction { + Some((std::sync::Arc::clone(&provider), model_research.clone())) + } else { + None + }; + // Build categories list with "Autre" appended for classification let mut classification_categories = settings.categories.clone(); classification_categories.push("Autre".to_string()); @@ -310,13 +316,24 @@ async fn run_generation_inner( // 1a. Extract article links from each source page let mut candidate_urls: Vec = Vec::new(); for source in sources.iter().take(max_sources) { - match source_scraper::extract_article_links( - &state.http_client, - &source.url, - max_links_per_source, - ) - .await - { + let links = if settings.use_llm_for_source_links { + source_scraper::extract_article_links_with_llm( + &state.http_client, + &source.url, + max_links_per_source, + &provider, + &model_research, + ) + .await + } else { + source_scraper::extract_article_links( + &state.http_client, + &source.url, + max_links_per_source, + ) + .await + }; + match links { Ok(links) => { tracing::info!( source = %source.title, @@ -348,6 +365,7 @@ async fn run_generation_inner( &candidate_urls, settings.max_age_days as i64, tx, + llm_for_scraping.clone(), ) .await; @@ -535,7 +553,7 @@ async fn run_generation_inner( // Scrape web search results emit_progress(tx, "scraping", "Verification des sources web...", 60); - let scraped = scrape_articles(state, &parsed, settings.max_age_days as i64, tx).await; + let scraped = scrape_articles(state, &parsed, settings.max_age_days as i64, tx, llm_for_scraping.clone()).await; let scraped = filter_empty_scraped_articles(scraped); // Flatten scraped articles for classification @@ -1061,11 +1079,13 @@ fn parse_llm_output( /// For each category, scrapes all article URLs. Failed scrapes are /// handled gracefully — the article is kept with empty scraped content /// rather than being discarded. +/// When `llm` is `Some`, uses LLM-assisted extraction with reduced concurrency. async fn scrape_articles( state: &AppState, parsed: &[(String, Vec)], max_age_days: i64, tx: &watch::Sender, + llm: Option<(std::sync::Arc, String)>, ) -> HashMap> { let mut result: HashMap> = HashMap::new(); @@ -1082,27 +1102,35 @@ async fn scrape_articles( return result; } - // Use JoinSet for bounded concurrency (max 10 concurrent scrapes) + // Use JoinSet for bounded concurrency let mut join_set = tokio::task::JoinSet::new(); let mut pending = tasks.into_iter().peekable(); let mut completed = 0usize; - let spawn_task = - |join_set: &mut tokio::task::JoinSet<_>, cat_key: String, item: NewsItem| { - let client = state.http_client.clone(); - let url = item.url.clone(); - let mad = max_age_days; - join_set.spawn(async move { - let scraped = scrape_single_article(&client, &url, mad).await; - (cat_key, item, scraped) - }); - }; + let max_concurrent = if llm.is_some() { 5 } else { 10 }; - // Seed the JoinSet with up to 10 initial tasks - let max_concurrent = 10; + // Seed initial tasks for _ in 0..max_concurrent { if let Some((cat_key, item)) = pending.next() { - spawn_task(&mut join_set, cat_key, item); + if let Some((ref provider, ref model)) = llm { + let provider = std::sync::Arc::clone(provider); + let model = model.clone(); + let client = state.http_client.clone(); + let url = item.url.clone(); + let mad = max_age_days; + join_set.spawn(async move { + let (scraped_content, page_title, final_url) = scrape_single_article_with_llm(&client, &url, mad, provider, model).await; + (cat_key, item, (scraped_content, page_title, final_url)) + }); + } else { + let client = state.http_client.clone(); + let url = item.url.clone(); + let mad = max_age_days; + join_set.spawn(async move { + let scraped = scrape_single_article(&client, &url, mad).await; + (cat_key, item, scraped) + }); + } } } @@ -1112,12 +1140,12 @@ async fn scrape_articles( // Update progress (45% to 75% range for scraping) let pct = 45 + ((completed as u32 * 30) / total as u32).min(30); - emit_progress( - tx, - "scraping", - &format!("Verification des sources ({}/{})...", completed, total), - pct as u8, - ); + let progress_label = if llm.is_some() { + format!("Extraction IA des articles ({}/{})...", completed, total) + } else { + format!("Verification des sources ({}/{})...", completed, total) + }; + emit_progress(tx, "scraping", &progress_label, pct as u8); if let Ok((cat_key, item, (scraped_content, page_title, final_url))) = join_result { let scraped_item = ScrapedNewsItem { @@ -1136,7 +1164,25 @@ async fn scrape_articles( // Spawn next task if available if let Some((cat_key, item)) = pending.next() { - spawn_task(&mut join_set, cat_key, item); + if let Some((ref provider, ref model)) = llm { + let provider = std::sync::Arc::clone(provider); + let model = model.clone(); + let client = state.http_client.clone(); + let url = item.url.clone(); + let mad = max_age_days; + join_set.spawn(async move { + let (scraped_content, page_title, final_url) = scrape_single_article_with_llm(&client, &url, mad, provider, model).await; + (cat_key, item, (scraped_content, page_title, final_url)) + }); + } else { + let client = state.http_client.clone(); + let url = item.url.clone(); + let mad = max_age_days; + join_set.spawn(async move { + let scraped = scrape_single_article(&client, &url, mad).await; + (cat_key, item, scraped) + }); + } } } @@ -1147,11 +1193,13 @@ async fn scrape_articles( /// /// Used in Phase 1 where articles haven't been classified yet. /// Reuses the same scraper infrastructure as `scrape_articles`. +/// When `llm` is `Some`, uses LLM-assisted extraction with reduced concurrency. async fn scrape_flat_urls( state: &AppState, urls: &[String], max_age_days: i64, tx: &watch::Sender, + llm: Option<(std::sync::Arc, String)>, ) -> Vec { let total = urls.len(); if total == 0 { @@ -1163,30 +1211,42 @@ async fn scrape_flat_urls( let mut completed = 0usize; let mut results = Vec::new(); - let max_concurrent = 10; + let max_concurrent = if llm.is_some() { 5 } else { 10 }; // Seed initial tasks for _ in 0..max_concurrent { if let Some((_, url)) = pending.next() { - let client = state.http_client.clone(); - let url = url.clone(); - let mad = max_age_days; - join_set.spawn(async move { - let (scraped_content, page_title, final_url) = scrape_single_article(&client, &url, mad).await; - (url, scraped_content, page_title, final_url) - }); + if let Some((ref provider, ref model)) = llm { + let provider = std::sync::Arc::clone(provider); + let model = model.clone(); + let client = state.http_client.clone(); + let url = url.clone(); + let mad = max_age_days; + join_set.spawn(async move { + let (scraped_content, page_title, final_url) = scrape_single_article_with_llm(&client, &url, mad, provider, model).await; + (url, scraped_content, page_title, final_url) + }); + } else { + let client = state.http_client.clone(); + let url = url.clone(); + let mad = max_age_days; + join_set.spawn(async move { + let (scraped_content, page_title, final_url) = scrape_single_article(&client, &url, mad).await; + (url, scraped_content, page_title, final_url) + }); + } } } while let Some(join_result) = join_set.join_next().await { completed += 1; let pct = 15 + ((completed as u32 * 15) / total as u32).min(15); - emit_progress( - tx, - "scraping_sources", - &format!("Analyse des sources ({}/{})...", completed, total), - pct as u8, - ); + let progress_label = if llm.is_some() { + format!("Extraction IA des articles ({}/{})...", completed, total) + } else { + format!("Analyse des sources ({}/{})...", completed, total) + }; + emit_progress(tx, "scraping_sources", &progress_label, pct as u8); if let Ok((_original_url, scraped_content, page_title, final_url)) = join_result { results.push(ScrapedNewsItem { @@ -1199,13 +1259,25 @@ async fn scrape_flat_urls( } if let Some((_, url)) = pending.next() { - let client = state.http_client.clone(); - let url = url.clone(); - let mad = max_age_days; - join_set.spawn(async move { - let (scraped_content, page_title, final_url) = scrape_single_article(&client, &url, mad).await; - (url, scraped_content, page_title, final_url) - }); + if let Some((ref provider, ref model)) = llm { + let provider = std::sync::Arc::clone(provider); + let model = model.clone(); + let client = state.http_client.clone(); + let url = url.clone(); + let mad = max_age_days; + join_set.spawn(async move { + let (scraped_content, page_title, final_url) = scrape_single_article_with_llm(&client, &url, mad, provider, model).await; + (url, scraped_content, page_title, final_url) + }); + } else { + let client = state.http_client.clone(); + let url = url.clone(); + let mad = max_age_days; + join_set.spawn(async move { + let (scraped_content, page_title, final_url) = scrape_single_article(&client, &url, mad).await; + (url, scraped_content, page_title, final_url) + }); + } } } @@ -1244,6 +1316,69 @@ async fn scrape_single_article( } } +/// Scrape an article URL using LLM for content extraction. +/// +/// Falls back to heuristic extraction if the LLM call fails. +async fn scrape_single_article_with_llm( + http_client: &reqwest::Client, + url: &str, + max_age_days: i64, + provider: std::sync::Arc, + model: String, +) -> (String, String, String) { + let content = match scraper::scrape_url(http_client, url).await { + Ok(c) => c, + Err(e) => { + tracing::warn!(url = url, error = %e, "Failed to fetch URL for LLM extraction"); + return (String::new(), String::new(), url.to_string()); + } + }; + + let final_url = content.url.clone(); + + if !content.ok || content.is_soft_404 { + return (String::new(), String::new(), final_url); + } + + let (system, user) = crate::services::prompts::build_article_extraction_prompt( + &content.head_html, + &content.body_text, + ); + let schema = crate::services::llm::schema::build_article_extraction_schema(); + + match provider.generate_rewrite_pass(&model, &system, &user, &schema).await { + Ok(response) => { + let title = response.get("title").and_then(|t| t.as_str()).unwrap_or("").to_string(); + let body = response.get("body_text").and_then(|b| b.as_str()).unwrap_or("").to_string(); + let is_error = response.get("is_error_page").and_then(|e| e.as_bool()).unwrap_or(false); + let date_str = response.get("published_date").and_then(|d| d.as_str()).unwrap_or(""); + + if is_error || body.trim().is_empty() { + return (String::new(), String::new(), final_url); + } + + if !date_str.is_empty() { + if let Ok(date) = chrono::DateTime::parse_from_rfc3339(date_str) { + if scraper::is_article_too_old(Some(date.with_timezone(&chrono::Utc)), max_age_days) { + tracing::warn!(url = url, "LLM-extracted article too old"); + return (String::new(), String::new(), final_url); + } + } + } + + (body, title, final_url) + } + Err(e) => { + tracing::warn!(url = url, error = %e, "LLM extraction failed, using heuristic fallback"); + if scraper::is_article_too_old(content.published_date, max_age_days) { + return (String::new(), String::new(), final_url); + } + let title = content.title.unwrap_or_default(); + (content.body_text, title, final_url) + } + } +} + /// Build the final sections array from the LLM's rewrite output. /// /// Maps `category_N` keys back to the user's category names.