feat: restructure Phase 1 into windowed source extraction waves

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
master
oabrivard 3 months ago
parent 0f1b0306e4
commit 37d17e577a

@ -292,17 +292,25 @@ pub async fn run_generation_inner(
let last_source = db::article_history::get_last_source_url(&state.pool, user_id).await.unwrap_or(None);
let rotated_sources = rotate_sources(sources.clone(), last_source.as_deref());
let max_links = 15usize;
// 1a. Extract article links from source pages (parallel, max 5 concurrent)
let mut candidate_urls: Vec<(String, String)> = Vec::new();
{
let mut join_set = tokio::task::JoinSet::new();
let mut pending = rotated_sources.iter().peekable();
let max_concurrent = 5;
// Seed initial tasks
for _ in 0..max_concurrent {
if let Some(source) = pending.next() {
let window_size = settings.source_extraction_window.max(1) as usize;
// Process sources in waves of `window_size`
let source_chunks: Vec<Vec<&crate::models::source::Source>> = rotated_sources
.chunks(window_size)
.map(|chunk| chunk.iter().collect())
.collect();
let total_waves = source_chunks.len();
'wave_loop: for (wave_idx, wave_sources) in source_chunks.iter().enumerate() {
emit_progress(tx, "sources_scrape",
&format!("Extraction des sources (vague {}/{})", wave_idx + 1, total_waves),
15 + ((wave_idx as u32 * 10) / total_waves.max(1) as u32).min(10) as u8);
// 1a. Extract links from this wave's sources (all in parallel)
let mut wave_urls: Vec<(String, String)> = Vec::new();
{
let mut join_set = tokio::task::JoinSet::new();
for source in wave_sources {
let client = state.http_client.clone();
let source_url = source.url.clone();
let source_title = source.title.clone();
@ -327,258 +335,235 @@ pub async fn run_generation_inner(
(source_url, source_title, links)
});
}
}
while let Some(join_result) = join_set.join_next().await {
if let Ok((source_url, source_title, links_result)) = join_result {
match links_result {
Ok(links) => {
tracing::info!(source = %source_title, links = links.len(), "Extracted links from source");
for link in links {
if seen_urls.insert(link.to_lowercase()) {
candidate_urls.push((link, source_url.clone()));
while let Some(join_result) = join_set.join_next().await {
if let Ok((source_url, source_title, links_result)) = join_result {
match links_result {
Ok(links) => {
tracing::info!(source = %source_title, links = links.len(), "Extracted links from source");
for link in links {
if seen_urls.insert(link.to_lowercase()) {
wave_urls.push((link, source_url.clone()));
}
}
}
}
Err(e) => {
tracing::warn!(source = %source_title, error = %e, "Failed to extract links");
Err(e) => {
tracing::warn!(source = %source_title, error = %e, "Failed to extract links");
}
}
}
}
// Spawn next task
if let Some(source) = pending.next() {
let client = state.http_client.clone();
let source_url = source.url.clone();
let source_title = source.title.clone();
let use_llm = settings.use_llm_for_source_links;
let provider_clone = std::sync::Arc::clone(&provider);
let model = Arc::clone(&model_research);
let max_l = max_links;
let pool = state.pool.clone();
let uid = user_id;
let jid = job_id;
join_set.spawn(async move {
let links = if use_llm {
source_scraper::extract_article_links_with_llm(
&client, &source_url, max_l, &provider_clone, &model,
Some(&pool), Some(uid), Some(jid),
).await
} else {
source_scraper::extract_article_links(
&client, &source_url, max_l,
).await
};
(source_url, source_title, links)
});
}
}
}
// Filter against article history
if settings.article_history_days > 0 && !candidate_urls.is_empty() {
let hashes: Vec<String> = candidate_urls.iter().map(|(url, _)| hash_article_url(url)).collect();
let existing = db::article_history::check_urls_exist(&state.pool, user_id, &hashes).await.unwrap_or_default();
if !existing.is_empty() {
for (url, source_url) in &candidate_urls {
if existing.contains(&hash_article_url(url)) {
pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace {
url, title: "", source_type: "personalized_source",
source_url: Some(source_url), category: None, synthesis_id: None,
status: "filtered_history", scraped_ok: false,
published_date: None,
}));
// 1b. Filter against article history
if settings.article_history_days > 0 && !wave_urls.is_empty() {
let hashes: Vec<String> = wave_urls.iter().map(|(url, _)| hash_article_url(url)).collect();
let existing = db::article_history::check_urls_exist(&state.pool, user_id, &hashes).await.unwrap_or_default();
if !existing.is_empty() {
for (url, source_url) in &wave_urls {
if existing.contains(&hash_article_url(url)) {
pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace {
url, title: "", source_type: "personalized_source",
source_url: Some(source_url), category: None, synthesis_id: None,
status: "filtered_history", scraped_ok: false,
published_date: None,
}));
}
}
wave_urls.retain(|(url, _)| !existing.contains(&hash_article_url(url)));
// Flush history dedup traces
if !pending_traces.is_empty() {
db::article_history::batch_insert_entries(&state.pool, &pending_traces).await.ok();
pending_traces.clear();
}
}
candidate_urls.retain(|(url, _)| !existing.contains(&hash_article_url(url)));
// Flush history dedup traces
if !pending_traces.is_empty() {
db::article_history::batch_insert_entries(&state.pool, &pending_traces).await.ok();
pending_traces.clear();
}
}
}
// Shuffle candidates to interleave articles from different sources
use rand::seq::SliceRandom;
candidate_urls.shuffle(&mut rand::thread_rng());
// 1c. Shuffle this wave's candidates
use rand::seq::SliceRandom;
wave_urls.shuffle(&mut rand::thread_rng());
// Track url -> source
for (url, source_url) in &candidate_urls {
url_source.insert(url.clone(), source_url.clone());
}
// 1b. Scrape, classify, summarize in batches of 5
emit_progress(tx, "processing", "Traitement des articles...", 25);
let total_candidates = candidate_urls.len();
let batch_size = settings.batch_size.max(1) as usize;
let mut processed = 0usize;
let mut candidates_iter = candidate_urls.into_iter();
let mut done = false;
while !done {
// Take next batch of candidates (up to 5), filtering source limits
let mut batch: Vec<(String, String)> = Vec::new();
while batch.len() < batch_size {
let Some((url, source_url)) = candidates_iter.next() else {
break;
};
let source_domain = extract_domain(&source_url).unwrap_or_default();
let source_count = source_counts.get(&source_domain).copied().unwrap_or(0);
if source_count >= settings.max_articles_per_source as usize {
pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace {
url: &url, title: "", source_type: "personalized_source",
source_url: Some(&source_url), category: None, synthesis_id: None,
status: "filtered_diversity", scraped_ok: false,
published_date: None,
}));
continue;
}
batch.push((url, source_url));
// Track url -> source
for (url, source_url) in &wave_urls {
url_source.insert(url.clone(), source_url.clone());
}
if batch.is_empty() {
break;
}
// 1d. Batch scrape+classify (operates on this wave's URLs)
if !wave_urls.is_empty() {
let total_candidates = wave_urls.len();
let batch_size = settings.batch_size.max(1) as usize;
let snippet_size = match settings.summary_length { 1 => 500, 2 => 2000, _ => 4000 };
let mut processed = 0usize;
let mut candidates_iter = wave_urls.into_iter();
let mut done = false;
let pct = 25 + ((processed as u32 * 40) / total_candidates.max(1) as u32).min(40);
emit_progress(tx, "processing", &format!("Articles {}-{}/{}...", processed + 1, processed + batch.len(), total_candidates), pct as u8);
// Phase A: Scrape batch in parallel
let mut scrape_set = tokio::task::JoinSet::new();
for (url, source_url) in &batch {
let client = state.http_client.clone();
let u = url.clone();
let su = source_url.clone();
let mad = settings.max_age_days as i64;
scrape_set.spawn(async move {
let result = scrape_single_article(&client, &u, mad).await;
(u, su, result)
});
}
while !done {
// Take next batch of candidates, filtering source limits
let mut batch: Vec<(String, String)> = Vec::new();
while batch.len() < batch_size {
let Some((url, source_url)) = candidates_iter.next() else {
break;
};
let source_domain = extract_domain(&source_url).unwrap_or_default();
let source_count = source_counts.get(&source_domain).copied().unwrap_or(0);
if source_count >= settings.max_articles_per_source as usize {
pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace {
url: &url, title: "", source_type: "personalized_source",
source_url: Some(&source_url), category: None, synthesis_id: None,
status: "filtered_diversity", scraped_ok: false,
published_date: None,
}));
continue;
}
batch.push((url, source_url));
}
let mut scraped_articles: Vec<(String, String, String, String)> = Vec::new(); // (url, source_url, body_text, page_title)
while let Some(join_result) = scrape_set.join_next().await {
if let Ok((_url, source_url, (body_text, page_title, final_url, drop_reason))) = join_result {
if let Some(reason) = drop_reason {
pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace {
url: &final_url, title: &page_title, source_type: "personalized_source",
source_url: Some(&source_url), category: None, synthesis_id: None,
status: reason, scraped_ok: false,
published_date: None,
}));
} else {
scraped_articles.push((final_url, source_url, body_text, page_title));
if batch.is_empty() {
break;
}
}
}
if scraped_articles.is_empty() {
processed += batch.len();
continue;
}
let pct = 25 + ((processed as u32 * 40) / total_candidates.max(1) as u32).min(40);
emit_progress(tx, "processing", &format!("Articles {}-{}/{}...", processed + 1, processed + batch.len(), total_candidates), pct as u8);
// Phase B: Classify/summarize batch in parallel
check_rate_limit(state, &user_rate_limiter, &provider_name).await?;
// Phase A: Scrape batch in parallel
let mut scrape_set = tokio::task::JoinSet::new();
for (url, source_url) in &batch {
let client = state.http_client.clone();
let u = url.clone();
let su = source_url.clone();
let mad = settings.max_age_days as i64;
scrape_set.spawn(async move {
let result = scrape_single_article(&client, &u, mad).await;
(u, su, result)
});
}
let mut classify_set = tokio::task::JoinSet::new();
for (final_url, source_url, body_text, page_title) in &scraped_articles {
let provider_clone = std::sync::Arc::clone(&provider);
let model = Arc::clone(&model_research);
let schema = Arc::clone(&classify_schema);
let cats = Arc::clone(&classification_categories);
let snippet_size = match settings.summary_length {
1 => 500,
2 => 2000,
_ => 4000,
};
let body_snippet: String = body_text.chars().take(snippet_size).collect();
let title = page_title.clone();
let url = final_url.clone();
let su = source_url.clone();
let pool = state.pool.clone();
let uid = user_id;
let jid = job_id;
let (sys, usr) = crate::services::prompts::build_article_classify_prompt(&title, &body_snippet, &cats, settings.summary_length);
classify_set.spawn(async move {
let llm_start = std::time::Instant::now();
let result = provider_clone.call_llm(&model, &sys, &usr, &schema).await;
let duration = llm_start.elapsed().as_millis() as u64;
// Log the LLM call
if let Ok(ref resp) = result {
let resp_str = serde_json::to_string_pretty(resp).unwrap_or_default();
crate::db::llm_call_log::insert(&pool, uid, jid, "classify_summarize", &model, &sys, &usr, &resp_str, duration as i32, Some(&url)).await.ok();
let mut scraped_articles: Vec<(String, String, String, String)> = Vec::new(); // (url, source_url, body_text, page_title)
while let Some(join_result) = scrape_set.join_next().await {
if let Ok((_url, source_url, (body_text, page_title, final_url, drop_reason))) = join_result {
if let Some(reason) = drop_reason {
pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace {
url: &final_url, title: &page_title, source_type: "personalized_source",
source_url: Some(&source_url), category: None, synthesis_id: None,
status: reason, scraped_ok: false,
published_date: None,
}));
} else {
scraped_articles.push((final_url, source_url, body_text, page_title));
}
}
}
(url, su, title, result)
});
}
if scraped_articles.is_empty() {
processed += batch.len();
continue;
}
while let Some(join_result) = classify_set.join_next().await {
if let Ok((final_url, source_url, page_title, llm_result)) = join_result {
let class_response = match llm_result {
Ok(resp) => resp,
Err(e) => {
tracing::warn!(url = %final_url, error = %e, "LLM classify failed, skipping article");
continue;
}
};
// Check LLM-extracted date as fallback for articles without a scraper date
if let Some(date_str) = class_response.get("date").and_then(|d| d.as_str()) {
if !date_str.is_empty() {
if let Some(parsed) = scraper::parse_date_string(date_str) {
if scraper::is_article_too_old(Some(parsed), settings.max_age_days as i64) {
tracing::info!(url = %final_url, date = date_str, "Article filtered by LLM-extracted date (too old)");
pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace {
url: &final_url, title: &page_title, source_type: "personalized_source",
source_url: Some(&source_url), category: None, synthesis_id: None,
status: "filtered_too_old", scraped_ok: true,
published_date: Some(date_str),
}));
// Phase B: Classify/summarize batch in parallel
check_rate_limit(state, &user_rate_limiter, &provider_name).await?;
let mut classify_set = tokio::task::JoinSet::new();
for (final_url, source_url, body_text, page_title) in &scraped_articles {
let provider_clone = std::sync::Arc::clone(&provider);
let model = Arc::clone(&model_research);
let schema = Arc::clone(&classify_schema);
let cats = Arc::clone(&classification_categories);
let body_snippet: String = body_text.chars().take(snippet_size).collect();
let title = page_title.clone();
let url = final_url.clone();
let su = source_url.clone();
let pool = state.pool.clone();
let uid = user_id;
let jid = job_id;
let (sys, usr) = crate::services::prompts::build_article_classify_prompt(&title, &body_snippet, &cats, settings.summary_length);
classify_set.spawn(async move {
let llm_start = std::time::Instant::now();
let result = provider_clone.call_llm(&model, &sys, &usr, &schema).await;
let duration = llm_start.elapsed().as_millis() as u64;
// Log the LLM call
if let Ok(ref resp) = result {
let resp_str = serde_json::to_string_pretty(resp).unwrap_or_default();
crate::db::llm_call_log::insert(&pool, uid, jid, "classify_summarize", &model, &sys, &usr, &resp_str, duration as i32, Some(&url)).await.ok();
}
(url, su, title, result)
});
}
while let Some(join_result) = classify_set.join_next().await {
if let Ok((final_url, source_url, page_title, llm_result)) = join_result {
let class_response = match llm_result {
Ok(resp) => resp,
Err(e) => {
tracing::warn!(url = %final_url, error = %e, "LLM classify failed, skipping article");
continue;
}
};
// Check LLM-extracted date as fallback for articles without a scraper date
if let Some(date_str) = class_response.get("date").and_then(|d| d.as_str()) {
if !date_str.is_empty() {
if let Some(parsed) = scraper::parse_date_string(date_str) {
if scraper::is_article_too_old(Some(parsed), settings.max_age_days as i64) {
tracing::info!(url = %final_url, date = date_str, "Article filtered by LLM-extracted date (too old)");
pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace {
url: &final_url, title: &page_title, source_type: "personalized_source",
source_url: Some(&source_url), category: None, synthesis_id: None,
status: "filtered_too_old", scraped_ok: true,
published_date: Some(date_str),
}));
continue;
}
}
}
}
let Some((final_cat_key, final_cat_name, llm_title, llm_summary)) = assign_category(
&class_response, &page_title, &user_categories, &classification_categories,
&filled_counts, settings.max_items_per_category as usize,
) else {
continue;
};
let llm_date = class_response.get("date").and_then(|d| d.as_str()).filter(|s| !s.is_empty()).map(|s| s.to_string());
article_scraped.entry(final_cat_key).or_default().push(NewsItem {
title: llm_title,
url: final_url.clone(),
summary: llm_summary,
date: llm_date,
});
*filled_counts.entry(final_cat_name).or_insert(0) += 1;
let source_domain = extract_domain(&source_url).unwrap_or_default();
*source_counts.entry(source_domain).or_insert(0) += 1;
}
}
let Some((final_cat_key, final_cat_name, llm_title, llm_summary)) = assign_category(
&class_response, &page_title, &user_categories, &classification_categories,
&filled_counts, settings.max_items_per_category as usize,
) else {
continue;
};
let llm_date = class_response.get("date").and_then(|d| d.as_str()).filter(|s| !s.is_empty()).map(|s| s.to_string());
article_scraped.entry(final_cat_key).or_default().push(NewsItem {
title: llm_title,
url: final_url.clone(),
summary: llm_summary,
date: llm_date,
});
*filled_counts.entry(final_cat_name).or_insert(0) += 1;
processed += batch.len();
let source_domain = extract_domain(&source_url).unwrap_or_default();
*source_counts.entry(source_domain).or_insert(0) += 1;
// Check if we've reached the maximum after this batch
let total: usize = article_scraped.values().map(|v| v.len()).sum();
if total >= max_total {
done = true;
}
}
}
processed += batch.len();
// Check if we've reached the maximum after this batch
// 1e. Check if full after this wave
let total: usize = article_scraped.values().map(|v| v.len()).sum();
if total >= max_total {
done = true;
tracing::info!(wave = wave_idx + 1, total_waves = total_waves, "Synthesis full after wave, skipping remaining sources");
break 'wave_loop;
}
}
// Flush Phase 1 traces
if !pending_traces.is_empty() {
db::article_history::batch_insert_entries(&state.pool, &pending_traces).await.ok();
pending_traces.clear();
// 1f. Flush traces between waves
if !pending_traces.is_empty() {
db::article_history::batch_insert_entries(&state.pool, &pending_traces).await.ok();
pending_traces.clear();
}
}
}

Loading…
Cancel
Save