diff --git a/backend/src/services/synthesis.rs b/backend/src/services/synthesis.rs index 934e3ba..7fbcb92 100644 --- a/backend/src/services/synthesis.rs +++ b/backend/src/services/synthesis.rs @@ -292,17 +292,25 @@ pub async fn run_generation_inner( let last_source = db::article_history::get_last_source_url(&state.pool, user_id).await.unwrap_or(None); let rotated_sources = rotate_sources(sources.clone(), last_source.as_deref()); let max_links = 15usize; - - // 1a. Extract article links from source pages (parallel, max 5 concurrent) - let mut candidate_urls: Vec<(String, String)> = Vec::new(); - { - let mut join_set = tokio::task::JoinSet::new(); - let mut pending = rotated_sources.iter().peekable(); - let max_concurrent = 5; - - // Seed initial tasks - for _ in 0..max_concurrent { - if let Some(source) = pending.next() { + let window_size = settings.source_extraction_window.max(1) as usize; + + // Process sources in waves of `window_size` + let source_chunks: Vec> = rotated_sources + .chunks(window_size) + .map(|chunk| chunk.iter().collect()) + .collect(); + let total_waves = source_chunks.len(); + + 'wave_loop: for (wave_idx, wave_sources) in source_chunks.iter().enumerate() { + emit_progress(tx, "sources_scrape", + &format!("Extraction des sources (vague {}/{})", wave_idx + 1, total_waves), + 15 + ((wave_idx as u32 * 10) / total_waves.max(1) as u32).min(10) as u8); + + // 1a. Extract links from this wave's sources (all in parallel) + let mut wave_urls: Vec<(String, String)> = Vec::new(); + { + let mut join_set = tokio::task::JoinSet::new(); + for source in wave_sources { let client = state.http_client.clone(); let source_url = source.url.clone(); let source_title = source.title.clone(); @@ -327,258 +335,235 @@ pub async fn run_generation_inner( (source_url, source_title, links) }); } - } - while let Some(join_result) = join_set.join_next().await { - if let Ok((source_url, source_title, links_result)) = join_result { - match links_result { - Ok(links) => { - tracing::info!(source = %source_title, links = links.len(), "Extracted links from source"); - for link in links { - if seen_urls.insert(link.to_lowercase()) { - candidate_urls.push((link, source_url.clone())); + while let Some(join_result) = join_set.join_next().await { + if let Ok((source_url, source_title, links_result)) = join_result { + match links_result { + Ok(links) => { + tracing::info!(source = %source_title, links = links.len(), "Extracted links from source"); + for link in links { + if seen_urls.insert(link.to_lowercase()) { + wave_urls.push((link, source_url.clone())); + } } } - } - Err(e) => { - tracing::warn!(source = %source_title, error = %e, "Failed to extract links"); + Err(e) => { + tracing::warn!(source = %source_title, error = %e, "Failed to extract links"); + } } } } - - // Spawn next task - if let Some(source) = pending.next() { - let client = state.http_client.clone(); - let source_url = source.url.clone(); - let source_title = source.title.clone(); - let use_llm = settings.use_llm_for_source_links; - let provider_clone = std::sync::Arc::clone(&provider); - let model = Arc::clone(&model_research); - let max_l = max_links; - let pool = state.pool.clone(); - let uid = user_id; - let jid = job_id; - join_set.spawn(async move { - let links = if use_llm { - source_scraper::extract_article_links_with_llm( - &client, &source_url, max_l, &provider_clone, &model, - Some(&pool), Some(uid), Some(jid), - ).await - } else { - source_scraper::extract_article_links( - &client, &source_url, max_l, - ).await - }; - (source_url, source_title, links) - }); - } } - } - // Filter against article history - if settings.article_history_days > 0 && !candidate_urls.is_empty() { - let hashes: Vec = candidate_urls.iter().map(|(url, _)| hash_article_url(url)).collect(); - let existing = db::article_history::check_urls_exist(&state.pool, user_id, &hashes).await.unwrap_or_default(); - if !existing.is_empty() { - for (url, source_url) in &candidate_urls { - if existing.contains(&hash_article_url(url)) { - pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace { - url, title: "", source_type: "personalized_source", - source_url: Some(source_url), category: None, synthesis_id: None, - status: "filtered_history", scraped_ok: false, - published_date: None, - })); + // 1b. Filter against article history + if settings.article_history_days > 0 && !wave_urls.is_empty() { + let hashes: Vec = wave_urls.iter().map(|(url, _)| hash_article_url(url)).collect(); + let existing = db::article_history::check_urls_exist(&state.pool, user_id, &hashes).await.unwrap_or_default(); + if !existing.is_empty() { + for (url, source_url) in &wave_urls { + if existing.contains(&hash_article_url(url)) { + pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace { + url, title: "", source_type: "personalized_source", + source_url: Some(source_url), category: None, synthesis_id: None, + status: "filtered_history", scraped_ok: false, + published_date: None, + })); + } + } + wave_urls.retain(|(url, _)| !existing.contains(&hash_article_url(url))); + // Flush history dedup traces + if !pending_traces.is_empty() { + db::article_history::batch_insert_entries(&state.pool, &pending_traces).await.ok(); + pending_traces.clear(); } - } - candidate_urls.retain(|(url, _)| !existing.contains(&hash_article_url(url))); - // Flush history dedup traces - if !pending_traces.is_empty() { - db::article_history::batch_insert_entries(&state.pool, &pending_traces).await.ok(); - pending_traces.clear(); } } - } - // Shuffle candidates to interleave articles from different sources - use rand::seq::SliceRandom; - candidate_urls.shuffle(&mut rand::thread_rng()); + // 1c. Shuffle this wave's candidates + use rand::seq::SliceRandom; + wave_urls.shuffle(&mut rand::thread_rng()); - // Track url -> source - for (url, source_url) in &candidate_urls { - url_source.insert(url.clone(), source_url.clone()); - } - - // 1b. Scrape, classify, summarize in batches of 5 - emit_progress(tx, "processing", "Traitement des articles...", 25); - let total_candidates = candidate_urls.len(); - let batch_size = settings.batch_size.max(1) as usize; - let mut processed = 0usize; - let mut candidates_iter = candidate_urls.into_iter(); - let mut done = false; - - while !done { - // Take next batch of candidates (up to 5), filtering source limits - let mut batch: Vec<(String, String)> = Vec::new(); - while batch.len() < batch_size { - let Some((url, source_url)) = candidates_iter.next() else { - break; - }; - let source_domain = extract_domain(&source_url).unwrap_or_default(); - let source_count = source_counts.get(&source_domain).copied().unwrap_or(0); - if source_count >= settings.max_articles_per_source as usize { - pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace { - url: &url, title: "", source_type: "personalized_source", - source_url: Some(&source_url), category: None, synthesis_id: None, - status: "filtered_diversity", scraped_ok: false, - published_date: None, - })); - continue; - } - batch.push((url, source_url)); + // Track url -> source + for (url, source_url) in &wave_urls { + url_source.insert(url.clone(), source_url.clone()); } - if batch.is_empty() { - break; - } + // 1d. Batch scrape+classify (operates on this wave's URLs) + if !wave_urls.is_empty() { + let total_candidates = wave_urls.len(); + let batch_size = settings.batch_size.max(1) as usize; + let snippet_size = match settings.summary_length { 1 => 500, 2 => 2000, _ => 4000 }; + let mut processed = 0usize; + let mut candidates_iter = wave_urls.into_iter(); + let mut done = false; - let pct = 25 + ((processed as u32 * 40) / total_candidates.max(1) as u32).min(40); - emit_progress(tx, "processing", &format!("Articles {}-{}/{}...", processed + 1, processed + batch.len(), total_candidates), pct as u8); - - // Phase A: Scrape batch in parallel - let mut scrape_set = tokio::task::JoinSet::new(); - for (url, source_url) in &batch { - let client = state.http_client.clone(); - let u = url.clone(); - let su = source_url.clone(); - let mad = settings.max_age_days as i64; - scrape_set.spawn(async move { - let result = scrape_single_article(&client, &u, mad).await; - (u, su, result) - }); - } + while !done { + // Take next batch of candidates, filtering source limits + let mut batch: Vec<(String, String)> = Vec::new(); + while batch.len() < batch_size { + let Some((url, source_url)) = candidates_iter.next() else { + break; + }; + let source_domain = extract_domain(&source_url).unwrap_or_default(); + let source_count = source_counts.get(&source_domain).copied().unwrap_or(0); + if source_count >= settings.max_articles_per_source as usize { + pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace { + url: &url, title: "", source_type: "personalized_source", + source_url: Some(&source_url), category: None, synthesis_id: None, + status: "filtered_diversity", scraped_ok: false, + published_date: None, + })); + continue; + } + batch.push((url, source_url)); + } - let mut scraped_articles: Vec<(String, String, String, String)> = Vec::new(); // (url, source_url, body_text, page_title) - while let Some(join_result) = scrape_set.join_next().await { - if let Ok((_url, source_url, (body_text, page_title, final_url, drop_reason))) = join_result { - if let Some(reason) = drop_reason { - pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace { - url: &final_url, title: &page_title, source_type: "personalized_source", - source_url: Some(&source_url), category: None, synthesis_id: None, - status: reason, scraped_ok: false, - published_date: None, - })); - } else { - scraped_articles.push((final_url, source_url, body_text, page_title)); + if batch.is_empty() { + break; } - } - } - if scraped_articles.is_empty() { - processed += batch.len(); - continue; - } + let pct = 25 + ((processed as u32 * 40) / total_candidates.max(1) as u32).min(40); + emit_progress(tx, "processing", &format!("Articles {}-{}/{}...", processed + 1, processed + batch.len(), total_candidates), pct as u8); - // Phase B: Classify/summarize batch in parallel - check_rate_limit(state, &user_rate_limiter, &provider_name).await?; + // Phase A: Scrape batch in parallel + let mut scrape_set = tokio::task::JoinSet::new(); + for (url, source_url) in &batch { + let client = state.http_client.clone(); + let u = url.clone(); + let su = source_url.clone(); + let mad = settings.max_age_days as i64; + scrape_set.spawn(async move { + let result = scrape_single_article(&client, &u, mad).await; + (u, su, result) + }); + } - let mut classify_set = tokio::task::JoinSet::new(); - for (final_url, source_url, body_text, page_title) in &scraped_articles { - let provider_clone = std::sync::Arc::clone(&provider); - let model = Arc::clone(&model_research); - let schema = Arc::clone(&classify_schema); - let cats = Arc::clone(&classification_categories); - let snippet_size = match settings.summary_length { - 1 => 500, - 2 => 2000, - _ => 4000, - }; - let body_snippet: String = body_text.chars().take(snippet_size).collect(); - let title = page_title.clone(); - let url = final_url.clone(); - let su = source_url.clone(); - let pool = state.pool.clone(); - let uid = user_id; - let jid = job_id; - - let (sys, usr) = crate::services::prompts::build_article_classify_prompt(&title, &body_snippet, &cats, settings.summary_length); - - classify_set.spawn(async move { - let llm_start = std::time::Instant::now(); - let result = provider_clone.call_llm(&model, &sys, &usr, &schema).await; - let duration = llm_start.elapsed().as_millis() as u64; - - // Log the LLM call - if let Ok(ref resp) = result { - let resp_str = serde_json::to_string_pretty(resp).unwrap_or_default(); - crate::db::llm_call_log::insert(&pool, uid, jid, "classify_summarize", &model, &sys, &usr, &resp_str, duration as i32, Some(&url)).await.ok(); + let mut scraped_articles: Vec<(String, String, String, String)> = Vec::new(); // (url, source_url, body_text, page_title) + while let Some(join_result) = scrape_set.join_next().await { + if let Ok((_url, source_url, (body_text, page_title, final_url, drop_reason))) = join_result { + if let Some(reason) = drop_reason { + pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace { + url: &final_url, title: &page_title, source_type: "personalized_source", + source_url: Some(&source_url), category: None, synthesis_id: None, + status: reason, scraped_ok: false, + published_date: None, + })); + } else { + scraped_articles.push((final_url, source_url, body_text, page_title)); + } + } } - (url, su, title, result) - }); - } + if scraped_articles.is_empty() { + processed += batch.len(); + continue; + } - while let Some(join_result) = classify_set.join_next().await { - if let Ok((final_url, source_url, page_title, llm_result)) = join_result { - let class_response = match llm_result { - Ok(resp) => resp, - Err(e) => { - tracing::warn!(url = %final_url, error = %e, "LLM classify failed, skipping article"); - continue; - } - }; - - // Check LLM-extracted date as fallback for articles without a scraper date - if let Some(date_str) = class_response.get("date").and_then(|d| d.as_str()) { - if !date_str.is_empty() { - if let Some(parsed) = scraper::parse_date_string(date_str) { - if scraper::is_article_too_old(Some(parsed), settings.max_age_days as i64) { - tracing::info!(url = %final_url, date = date_str, "Article filtered by LLM-extracted date (too old)"); - pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace { - url: &final_url, title: &page_title, source_type: "personalized_source", - source_url: Some(&source_url), category: None, synthesis_id: None, - status: "filtered_too_old", scraped_ok: true, - published_date: Some(date_str), - })); + // Phase B: Classify/summarize batch in parallel + check_rate_limit(state, &user_rate_limiter, &provider_name).await?; + + let mut classify_set = tokio::task::JoinSet::new(); + for (final_url, source_url, body_text, page_title) in &scraped_articles { + let provider_clone = std::sync::Arc::clone(&provider); + let model = Arc::clone(&model_research); + let schema = Arc::clone(&classify_schema); + let cats = Arc::clone(&classification_categories); + let body_snippet: String = body_text.chars().take(snippet_size).collect(); + let title = page_title.clone(); + let url = final_url.clone(); + let su = source_url.clone(); + let pool = state.pool.clone(); + let uid = user_id; + let jid = job_id; + + let (sys, usr) = crate::services::prompts::build_article_classify_prompt(&title, &body_snippet, &cats, settings.summary_length); + + classify_set.spawn(async move { + let llm_start = std::time::Instant::now(); + let result = provider_clone.call_llm(&model, &sys, &usr, &schema).await; + let duration = llm_start.elapsed().as_millis() as u64; + + // Log the LLM call + if let Ok(ref resp) = result { + let resp_str = serde_json::to_string_pretty(resp).unwrap_or_default(); + crate::db::llm_call_log::insert(&pool, uid, jid, "classify_summarize", &model, &sys, &usr, &resp_str, duration as i32, Some(&url)).await.ok(); + } + + (url, su, title, result) + }); + } + + while let Some(join_result) = classify_set.join_next().await { + if let Ok((final_url, source_url, page_title, llm_result)) = join_result { + let class_response = match llm_result { + Ok(resp) => resp, + Err(e) => { + tracing::warn!(url = %final_url, error = %e, "LLM classify failed, skipping article"); continue; } + }; + + // Check LLM-extracted date as fallback for articles without a scraper date + if let Some(date_str) = class_response.get("date").and_then(|d| d.as_str()) { + if !date_str.is_empty() { + if let Some(parsed) = scraper::parse_date_string(date_str) { + if scraper::is_article_too_old(Some(parsed), settings.max_age_days as i64) { + tracing::info!(url = %final_url, date = date_str, "Article filtered by LLM-extracted date (too old)"); + pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace { + url: &final_url, title: &page_title, source_type: "personalized_source", + source_url: Some(&source_url), category: None, synthesis_id: None, + status: "filtered_too_old", scraped_ok: true, + published_date: Some(date_str), + })); + continue; + } + } + } } + + let Some((final_cat_key, final_cat_name, llm_title, llm_summary)) = assign_category( + &class_response, &page_title, &user_categories, &classification_categories, + &filled_counts, settings.max_items_per_category as usize, + ) else { + continue; + }; + + let llm_date = class_response.get("date").and_then(|d| d.as_str()).filter(|s| !s.is_empty()).map(|s| s.to_string()); + article_scraped.entry(final_cat_key).or_default().push(NewsItem { + title: llm_title, + url: final_url.clone(), + summary: llm_summary, + date: llm_date, + }); + *filled_counts.entry(final_cat_name).or_insert(0) += 1; + + let source_domain = extract_domain(&source_url).unwrap_or_default(); + *source_counts.entry(source_domain).or_insert(0) += 1; } } - let Some((final_cat_key, final_cat_name, llm_title, llm_summary)) = assign_category( - &class_response, &page_title, &user_categories, &classification_categories, - &filled_counts, settings.max_items_per_category as usize, - ) else { - continue; - }; - - let llm_date = class_response.get("date").and_then(|d| d.as_str()).filter(|s| !s.is_empty()).map(|s| s.to_string()); - article_scraped.entry(final_cat_key).or_default().push(NewsItem { - title: llm_title, - url: final_url.clone(), - summary: llm_summary, - date: llm_date, - }); - *filled_counts.entry(final_cat_name).or_insert(0) += 1; + processed += batch.len(); - let source_domain = extract_domain(&source_url).unwrap_or_default(); - *source_counts.entry(source_domain).or_insert(0) += 1; + // Check if we've reached the maximum after this batch + let total: usize = article_scraped.values().map(|v| v.len()).sum(); + if total >= max_total { + done = true; + } } } - processed += batch.len(); - - // Check if we've reached the maximum after this batch + // 1e. Check if full after this wave let total: usize = article_scraped.values().map(|v| v.len()).sum(); if total >= max_total { - done = true; + tracing::info!(wave = wave_idx + 1, total_waves = total_waves, "Synthesis full after wave, skipping remaining sources"); + break 'wave_loop; } - } - // Flush Phase 1 traces - if !pending_traces.is_empty() { - db::article_history::batch_insert_entries(&state.pool, &pending_traces).await.ok(); - pending_traces.clear(); + // 1f. Flush traces between waves + if !pending_traces.is_empty() { + db::article_history::batch_insert_entries(&state.pool, &pending_traces).await.ok(); + pending_traces.clear(); + } } }