feat: LLM-assisted article extraction with Arc provider, concurrency control, and progress

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
master
oabrivard 3 months ago
parent 357f06e405
commit 8a061c98db

@ -288,6 +288,12 @@ async fn run_generation_inner(
let user_rate_limiter = get_user_rate_limiter(state, &settings, user_id); let user_rate_limiter = get_user_rate_limiter(state, &settings, user_id);
let llm_for_scraping: Option<(std::sync::Arc<dyn crate::services::llm::LlmProvider>, String)> = if settings.use_llm_for_article_extraction {
Some((std::sync::Arc::clone(&provider), model_research.clone()))
} else {
None
};
// Build categories list with "Autre" appended for classification // Build categories list with "Autre" appended for classification
let mut classification_categories = settings.categories.clone(); let mut classification_categories = settings.categories.clone();
classification_categories.push("Autre".to_string()); classification_categories.push("Autre".to_string());
@ -310,13 +316,24 @@ async fn run_generation_inner(
// 1a. Extract article links from each source page // 1a. Extract article links from each source page
let mut candidate_urls: Vec<String> = Vec::new(); let mut candidate_urls: Vec<String> = Vec::new();
for source in sources.iter().take(max_sources) { for source in sources.iter().take(max_sources) {
match source_scraper::extract_article_links( let links = if settings.use_llm_for_source_links {
source_scraper::extract_article_links_with_llm(
&state.http_client, &state.http_client,
&source.url, &source.url,
max_links_per_source, max_links_per_source,
&provider,
&model_research,
) )
.await .await
{ } else {
source_scraper::extract_article_links(
&state.http_client,
&source.url,
max_links_per_source,
)
.await
};
match links {
Ok(links) => { Ok(links) => {
tracing::info!( tracing::info!(
source = %source.title, source = %source.title,
@ -348,6 +365,7 @@ async fn run_generation_inner(
&candidate_urls, &candidate_urls,
settings.max_age_days as i64, settings.max_age_days as i64,
tx, tx,
llm_for_scraping.clone(),
) )
.await; .await;
@ -535,7 +553,7 @@ async fn run_generation_inner(
// Scrape web search results // Scrape web search results
emit_progress(tx, "scraping", "Verification des sources web...", 60); emit_progress(tx, "scraping", "Verification des sources web...", 60);
let scraped = scrape_articles(state, &parsed, settings.max_age_days as i64, tx).await; let scraped = scrape_articles(state, &parsed, settings.max_age_days as i64, tx, llm_for_scraping.clone()).await;
let scraped = filter_empty_scraped_articles(scraped); let scraped = filter_empty_scraped_articles(scraped);
// Flatten scraped articles for classification // Flatten scraped articles for classification
@ -1061,11 +1079,13 @@ fn parse_llm_output(
/// For each category, scrapes all article URLs. Failed scrapes are /// For each category, scrapes all article URLs. Failed scrapes are
/// handled gracefully — the article is kept with empty scraped content /// handled gracefully — the article is kept with empty scraped content
/// rather than being discarded. /// rather than being discarded.
/// When `llm` is `Some`, uses LLM-assisted extraction with reduced concurrency.
async fn scrape_articles( async fn scrape_articles(
state: &AppState, state: &AppState,
parsed: &[(String, Vec<NewsItem>)], parsed: &[(String, Vec<NewsItem>)],
max_age_days: i64, max_age_days: i64,
tx: &watch::Sender<ProgressEvent>, tx: &watch::Sender<ProgressEvent>,
llm: Option<(std::sync::Arc<dyn crate::services::llm::LlmProvider>, String)>,
) -> HashMap<String, Vec<ScrapedNewsItem>> { ) -> HashMap<String, Vec<ScrapedNewsItem>> {
let mut result: HashMap<String, Vec<ScrapedNewsItem>> = HashMap::new(); let mut result: HashMap<String, Vec<ScrapedNewsItem>> = HashMap::new();
@ -1082,13 +1102,27 @@ async fn scrape_articles(
return result; return result;
} }
// Use JoinSet for bounded concurrency (max 10 concurrent scrapes) // Use JoinSet for bounded concurrency
let mut join_set = tokio::task::JoinSet::new(); let mut join_set = tokio::task::JoinSet::new();
let mut pending = tasks.into_iter().peekable(); let mut pending = tasks.into_iter().peekable();
let mut completed = 0usize; let mut completed = 0usize;
let spawn_task = let max_concurrent = if llm.is_some() { 5 } else { 10 };
|join_set: &mut tokio::task::JoinSet<_>, cat_key: String, item: NewsItem| {
// Seed initial tasks
for _ in 0..max_concurrent {
if let Some((cat_key, item)) = pending.next() {
if let Some((ref provider, ref model)) = llm {
let provider = std::sync::Arc::clone(provider);
let model = model.clone();
let client = state.http_client.clone();
let url = item.url.clone();
let mad = max_age_days;
join_set.spawn(async move {
let (scraped_content, page_title, final_url) = scrape_single_article_with_llm(&client, &url, mad, provider, model).await;
(cat_key, item, (scraped_content, page_title, final_url))
});
} else {
let client = state.http_client.clone(); let client = state.http_client.clone();
let url = item.url.clone(); let url = item.url.clone();
let mad = max_age_days; let mad = max_age_days;
@ -1096,13 +1130,7 @@ async fn scrape_articles(
let scraped = scrape_single_article(&client, &url, mad).await; let scraped = scrape_single_article(&client, &url, mad).await;
(cat_key, item, scraped) (cat_key, item, scraped)
}); });
}; }
// Seed the JoinSet with up to 10 initial tasks
let max_concurrent = 10;
for _ in 0..max_concurrent {
if let Some((cat_key, item)) = pending.next() {
spawn_task(&mut join_set, cat_key, item);
} }
} }
@ -1112,12 +1140,12 @@ async fn scrape_articles(
// Update progress (45% to 75% range for scraping) // Update progress (45% to 75% range for scraping)
let pct = 45 + ((completed as u32 * 30) / total as u32).min(30); let pct = 45 + ((completed as u32 * 30) / total as u32).min(30);
emit_progress( let progress_label = if llm.is_some() {
tx, format!("Extraction IA des articles ({}/{})...", completed, total)
"scraping", } else {
&format!("Verification des sources ({}/{})...", completed, total), format!("Verification des sources ({}/{})...", completed, total)
pct as u8, };
); emit_progress(tx, "scraping", &progress_label, pct as u8);
if let Ok((cat_key, item, (scraped_content, page_title, final_url))) = join_result { if let Ok((cat_key, item, (scraped_content, page_title, final_url))) = join_result {
let scraped_item = ScrapedNewsItem { let scraped_item = ScrapedNewsItem {
@ -1136,7 +1164,25 @@ async fn scrape_articles(
// Spawn next task if available // Spawn next task if available
if let Some((cat_key, item)) = pending.next() { if let Some((cat_key, item)) = pending.next() {
spawn_task(&mut join_set, cat_key, item); if let Some((ref provider, ref model)) = llm {
let provider = std::sync::Arc::clone(provider);
let model = model.clone();
let client = state.http_client.clone();
let url = item.url.clone();
let mad = max_age_days;
join_set.spawn(async move {
let (scraped_content, page_title, final_url) = scrape_single_article_with_llm(&client, &url, mad, provider, model).await;
(cat_key, item, (scraped_content, page_title, final_url))
});
} else {
let client = state.http_client.clone();
let url = item.url.clone();
let mad = max_age_days;
join_set.spawn(async move {
let scraped = scrape_single_article(&client, &url, mad).await;
(cat_key, item, scraped)
});
}
} }
} }
@ -1147,11 +1193,13 @@ async fn scrape_articles(
/// ///
/// Used in Phase 1 where articles haven't been classified yet. /// Used in Phase 1 where articles haven't been classified yet.
/// Reuses the same scraper infrastructure as `scrape_articles`. /// Reuses the same scraper infrastructure as `scrape_articles`.
/// When `llm` is `Some`, uses LLM-assisted extraction with reduced concurrency.
async fn scrape_flat_urls( async fn scrape_flat_urls(
state: &AppState, state: &AppState,
urls: &[String], urls: &[String],
max_age_days: i64, max_age_days: i64,
tx: &watch::Sender<ProgressEvent>, tx: &watch::Sender<ProgressEvent>,
llm: Option<(std::sync::Arc<dyn crate::services::llm::LlmProvider>, String)>,
) -> Vec<ScrapedNewsItem> { ) -> Vec<ScrapedNewsItem> {
let total = urls.len(); let total = urls.len();
if total == 0 { if total == 0 {
@ -1163,11 +1211,22 @@ async fn scrape_flat_urls(
let mut completed = 0usize; let mut completed = 0usize;
let mut results = Vec::new(); let mut results = Vec::new();
let max_concurrent = 10; let max_concurrent = if llm.is_some() { 5 } else { 10 };
// Seed initial tasks // Seed initial tasks
for _ in 0..max_concurrent { for _ in 0..max_concurrent {
if let Some((_, url)) = pending.next() { if let Some((_, url)) = pending.next() {
if let Some((ref provider, ref model)) = llm {
let provider = std::sync::Arc::clone(provider);
let model = model.clone();
let client = state.http_client.clone();
let url = url.clone();
let mad = max_age_days;
join_set.spawn(async move {
let (scraped_content, page_title, final_url) = scrape_single_article_with_llm(&client, &url, mad, provider, model).await;
(url, scraped_content, page_title, final_url)
});
} else {
let client = state.http_client.clone(); let client = state.http_client.clone();
let url = url.clone(); let url = url.clone();
let mad = max_age_days; let mad = max_age_days;
@ -1177,16 +1236,17 @@ async fn scrape_flat_urls(
}); });
} }
} }
}
while let Some(join_result) = join_set.join_next().await { while let Some(join_result) = join_set.join_next().await {
completed += 1; completed += 1;
let pct = 15 + ((completed as u32 * 15) / total as u32).min(15); let pct = 15 + ((completed as u32 * 15) / total as u32).min(15);
emit_progress( let progress_label = if llm.is_some() {
tx, format!("Extraction IA des articles ({}/{})...", completed, total)
"scraping_sources", } else {
&format!("Analyse des sources ({}/{})...", completed, total), format!("Analyse des sources ({}/{})...", completed, total)
pct as u8, };
); emit_progress(tx, "scraping_sources", &progress_label, pct as u8);
if let Ok((_original_url, scraped_content, page_title, final_url)) = join_result { if let Ok((_original_url, scraped_content, page_title, final_url)) = join_result {
results.push(ScrapedNewsItem { results.push(ScrapedNewsItem {
@ -1199,6 +1259,17 @@ async fn scrape_flat_urls(
} }
if let Some((_, url)) = pending.next() { if let Some((_, url)) = pending.next() {
if let Some((ref provider, ref model)) = llm {
let provider = std::sync::Arc::clone(provider);
let model = model.clone();
let client = state.http_client.clone();
let url = url.clone();
let mad = max_age_days;
join_set.spawn(async move {
let (scraped_content, page_title, final_url) = scrape_single_article_with_llm(&client, &url, mad, provider, model).await;
(url, scraped_content, page_title, final_url)
});
} else {
let client = state.http_client.clone(); let client = state.http_client.clone();
let url = url.clone(); let url = url.clone();
let mad = max_age_days; let mad = max_age_days;
@ -1208,6 +1279,7 @@ async fn scrape_flat_urls(
}); });
} }
} }
}
results results
} }
@ -1244,6 +1316,69 @@ async fn scrape_single_article(
} }
} }
/// Scrape an article URL, using the LLM to extract its title, body, and date.
///
/// Returns `(scraped_content, page_title, final_url)`. `scraped_content` and
/// `page_title` are empty strings when:
/// - the URL cannot be fetched (in which case `final_url` is the input `url`),
/// - the fetched page is not OK or is flagged as a soft 404,
/// - the LLM reports an error page or returns an empty body,
/// - the article is older than `max_age_days`.
///
/// If the LLM call itself fails, the heuristic extraction already produced by
/// `scraper::scrape_url` is used instead.
///
/// The age check on the LLM path prefers the LLM-reported `published_date`
/// (RFC 3339). When that is missing or unparseable, it falls back to the date
/// found by the heuristic scraper so the age filter is not silently skipped.
/// Articles with no usable date from either source are kept.
async fn scrape_single_article_with_llm(
    http_client: &reqwest::Client,
    url: &str,
    max_age_days: i64,
    provider: std::sync::Arc<dyn crate::services::llm::LlmProvider>,
    model: String,
) -> (String, String, String) {
    let content = match scraper::scrape_url(http_client, url).await {
        Ok(c) => c,
        Err(e) => {
            tracing::warn!(url = url, error = %e, "Failed to fetch URL for LLM extraction");
            return (String::new(), String::new(), url.to_string());
        }
    };

    let final_url = content.url.clone();

    if !content.ok || content.is_soft_404 {
        return (String::new(), String::new(), final_url);
    }

    let (system, user) = crate::services::prompts::build_article_extraction_prompt(
        &content.head_html,
        &content.body_text,
    );
    let schema = crate::services::llm::schema::build_article_extraction_schema();

    match provider.generate_rewrite_pass(&model, &system, &user, &schema).await {
        Ok(response) => {
            let title = response.get("title").and_then(|t| t.as_str()).unwrap_or("").to_string();
            let body = response.get("body_text").and_then(|b| b.as_str()).unwrap_or("").to_string();
            let is_error = response.get("is_error_page").and_then(|e| e.as_bool()).unwrap_or(false);
            let date_str = response
                .get("published_date")
                .and_then(|d| d.as_str())
                .unwrap_or("")
                .trim();

            if is_error || body.trim().is_empty() {
                return (String::new(), String::new(), final_url);
            }

            // Prefer the LLM-reported date, but only if it is valid RFC 3339.
            let llm_date = if date_str.is_empty() {
                None
            } else {
                match chrono::DateTime::parse_from_rfc3339(date_str) {
                    Ok(date) => Some(date.with_timezone(&chrono::Utc)),
                    Err(e) => {
                        tracing::debug!(
                            url = url,
                            error = %e,
                            "LLM returned an unparseable published_date, falling back to scraped date"
                        );
                        None
                    }
                }
            };

            // Fall back to the heuristic date so a missing or malformed LLM date
            // does not bypass the age filter. Undated articles are kept.
            if let Some(published) = llm_date.or(content.published_date) {
                if scraper::is_article_too_old(Some(published), max_age_days) {
                    tracing::warn!(url = url, "LLM-extracted article too old");
                    return (String::new(), String::new(), final_url);
                }
            }

            (body, title, final_url)
        }
        Err(e) => {
            tracing::warn!(url = url, error = %e, "LLM extraction failed, using heuristic fallback");
            if scraper::is_article_too_old(content.published_date, max_age_days) {
                return (String::new(), String::new(), final_url);
            }
            let title = content.title.unwrap_or_default();
            (content.body_text, title, final_url)
        }
    }
}
/// Build the final sections array from the LLM's rewrite output. /// Build the final sections array from the LLM's rewrite output.
/// ///
/// Maps `category_N` keys back to the user's category names. /// Maps `category_N` keys back to the user's category names.

Loading…
Cancel
Save