feat: two-phase generation pipeline — personalized sources first, web search fallback

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
master
oabrivard 3 months ago
parent 51ea032838
commit 53ecce84b0

@ -29,9 +29,10 @@ use crate::models::synthesis::{
};
use crate::services::encryption;
use crate::services::llm::factory::create_provider;
use crate::services::llm::schema::build_category_schema;
use crate::services::prompts;
use crate::services::llm::schema::{build_category_schema, build_classification_schema};
use crate::services::prompts::{self, build_classification_prompt};
use crate::services::scraper;
use crate::services::source_scraper;
// ───────────────────────────────────────────────────────────────────
// Progress Events
@ -269,15 +270,11 @@ async fn run_generation_inner(
let sources = db::sources::list_for_user(&state.pool, user_id).await?;
// Step 3: Resolve provider + decrypt API key
emit_progress(tx, "provider", "Configuration du fournisseur IA...", 15);
emit_progress(tx, "provider", "Configuration du fournisseur IA...", 12);
let (provider_name, api_key) = resolve_provider_and_key(state, user_id, &settings).await?;
let provider = create_provider(&provider_name, api_key)?;
// Step 4: Build schema from categories
let schema = build_category_schema(&settings.categories, settings.max_items_per_category);
// Step 4b: Resolve models — user overrides take priority over admin config
// Step 4: Resolve models
let model_research = if !settings.ai_model.is_empty() {
settings.ai_model.clone()
} else {
@ -289,93 +286,318 @@ async fn run_generation_inner(
model_research.clone()
};
// Look up or create per-user rate limiter from AppState so limits persist across jobs.
let user_rate_limiter = get_user_rate_limiter(state, &settings, user_id);
// Step 5: Rate limit check (pass 1)
check_rate_limit(state, &user_rate_limiter, &provider_name)?;
// Build categories list with "Autre" appended for classification
let mut classification_categories = settings.categories.clone();
classification_categories.push("Autre".to_string());
// Track how many articles fill each category across both phases
let mut filled_counts: HashMap<String, usize> = HashMap::new();
// Combined scraped articles keyed by category
let mut all_scraped: HashMap<String, Vec<ScrapedNewsItem>> = HashMap::new();
// Track URLs of articles already placed in a category (used to drop Phase 2 duplicates)
let mut seen_urls: std::collections::HashSet<String> = std::collections::HashSet::new();
// ═══════════════════════════════════════════════════════════════
// PHASE 1: Personalized Sources (scrape-based, no LLM for discovery)
// ═══════════════════════════════════════════════════════════════
if !sources.is_empty() {
emit_progress(tx, "sources_scrape", "Analyse des sources personnalisees...", 15);
let max_sources = sources.len().min(10); // Cap at 10 sources
let max_links_per_source = (2 * settings.max_articles_per_source) as usize;
// 1a. Extract article links from each source page
let mut candidate_urls: Vec<String> = Vec::new();
for source in sources.iter().take(max_sources) {
match source_scraper::extract_article_links(
&state.http_client,
&source.url,
max_links_per_source,
)
.await
{
Ok(links) => {
tracing::info!(
source = %source.title,
url = %source.url,
links_found = links.len(),
"Extracted article links from source"
);
candidate_urls.extend(links);
}
Err(e) => {
tracing::warn!(
source = %source.title,
url = %source.url,
error = %e,
"Failed to extract links from source, skipping"
);
}
}
}
// Step 6: LLM search pass
emit_progress(tx, "search", "Recherche d'actualites en cours...", 30);
let current_date = Utc::now()
.format("%A %d %B %Y")
.to_string();
// Step 5b: Load recently-used domains for source diversity
let recent_domains = if settings.source_diversity_window > 0 {
let recent = db::syntheses::list_for_user(
&state.pool,
user_id,
settings.source_diversity_window as i64,
0,
)
.await
.unwrap_or_default();
// Deduplicate candidate URLs
let mut seen = std::collections::HashSet::new();
candidate_urls.retain(|url| seen.insert(url.to_lowercase()));
if !candidate_urls.is_empty() {
// 1b. Scrape candidate articles
let scraped_articles = scrape_flat_urls(
state,
&candidate_urls,
settings.max_age_days as i64,
tx,
)
.await;
// 1c. Filter empty content
let valid_articles: Vec<ScrapedNewsItem> = scraped_articles
.into_iter()
.filter(|a| !a.scraped_content.trim().is_empty())
.collect();
let mut domains: Vec<String> = recent
.iter()
.filter_map(|s| {
serde_json::from_value::<Vec<crate::models::synthesis::NewsSection>>(
s.sections.clone(),
)
.ok()
})
.flat_map(|sections| {
sections
.into_iter()
.flat_map(|sec| sec.items.into_iter())
.filter_map(|item| extract_domain(&item.url))
})
.collect();
tracing::info!(
valid_count = valid_articles.len(),
"Phase 1: valid articles from personalized sources"
);
domains.sort();
domains.dedup();
domains
} else {
Vec::new()
};
if !valid_articles.is_empty() {
// 1d. LLM classification call
emit_progress(tx, "classifying", "Classification des articles...", 35);
check_rate_limit(state, &user_rate_limiter, &provider_name)?;
let (class_system, class_user) = build_classification_prompt(
&valid_articles,
&classification_categories,
settings.max_items_per_category,
&filled_counts,
);
let class_schema = build_classification_schema();
let class_response = provider
.generate_rewrite_pass(
&model_research,
&class_system,
&class_user,
&class_schema,
)
.await?;
// 1e. Parse classification and fill categories
let phase1_classified = parse_classification_response(
&class_response,
&valid_articles,
&classification_categories,
settings.max_items_per_category,
&mut filled_counts,
);
// Merge into all_scraped and track URLs
for (cat_key, items) in phase1_classified {
for item in &items {
seen_urls.insert(item.url.to_lowercase());
}
all_scraped.entry(cat_key).or_default().extend(items);
}
let (system_prompt, user_prompt) =
prompts::build_search_prompt(&settings, &sources, &current_date, &recent_domains, None);
// 1f. Enforce max_articles_per_source across all categories:
// count kept articles per domain and drop any beyond the cap (URLs without a parsable domain are kept)
let max_per_source = settings.max_articles_per_source as usize;
let mut domain_counts: HashMap<String, usize> = HashMap::new();
for (_, items) in &mut all_scraped {
items.retain(|item| {
if let Some(domain) = extract_domain(&item.url) {
let count = domain_counts.entry(domain).or_insert(0);
if *count >= max_per_source {
false
} else {
*count += 1;
true
}
} else {
true
}
});
}
let raw_results = provider
.generate_search_pass(&model_research, &system_prompt, &user_prompt, &schema)
.await?;
// Recount filled_counts after trimming
filled_counts.clear();
for (cat_key, items) in &all_scraped {
let cat_name = if cat_key == "category_autre" {
"Autre".to_string()
} else {
let idx: usize = cat_key
.strip_prefix("category_")
.and_then(|s| s.parse().ok())
.unwrap_or(0);
settings.categories.get(idx).cloned().unwrap_or_default()
};
*filled_counts.entry(cat_name).or_insert(0) += items.len();
}
}
}
}
// Step 7: Parse structured output into (category_key, Vec<NewsItem>)
emit_progress(tx, "parsing", "Analyse des resultats...", 40);
let parsed = parse_llm_output(&raw_results, &settings.categories)?;
// ═══════════════════════════════════════════════════════════════
// PHASE 2: Web Search Fallback (LLM-based)
// Only runs if any user-defined category is under-filled
// ═══════════════════════════════════════════════════════════════
let category_gaps: Vec<(String, i32)> = settings
.categories
.iter()
.filter_map(|cat| {
let filled = filled_counts.get(cat).copied().unwrap_or(0);
let needed = settings.max_items_per_category as usize - filled.min(settings.max_items_per_category as usize);
if needed > 0 {
Some((cat.clone(), needed as i32))
} else {
None
}
})
.collect();
// Step 7b: Filter out homepage URLs (path == "/" or empty)
let parsed = filter_homepage_urls(parsed);
if !category_gaps.is_empty() {
emit_progress(tx, "search", "Recherche d'actualites complementaires...", 45);
// Step 7c: Deduplicate articles with the same URL across categories
let parsed = dedup_by_url(parsed);
// Rate limit check before search pass
check_rate_limit(state, &user_rate_limiter, &provider_name)?;
// Step 7d: Limit articles per source for diversity
let parsed = limit_articles_per_source(parsed, settings.max_articles_per_source);
// Load recently-used domains for diversity (Phase 2 only)
let recent_domains = if settings.source_diversity_window > 0 {
let recent = db::syntheses::list_for_user(
&state.pool,
user_id,
settings.source_diversity_window as i64,
0,
)
.await
.unwrap_or_default();
// Step 8: Scrape + rewrite pass
//
// Always run the full pipeline: the search pass URLs can be hallucinated
// by the LLM (Wikipedia, corporate sites instead of actual articles).
// The scrape pass fetches each URL and validates the content exists,
// then the rewrite pass produces summaries based on actual article content.
emit_progress(tx, "scraping", "Verification des sources...", 45);
let scraped = scrape_articles(state, &parsed, settings.max_age_days as i64, tx).await;
let mut domains: Vec<String> = recent
.iter()
.filter_map(|s| {
serde_json::from_value::<Vec<crate::models::synthesis::NewsSection>>(
s.sections.clone(),
)
.ok()
})
.flat_map(|sections| {
sections
.into_iter()
.flat_map(|sec| sec.items.into_iter())
.filter_map(|item| extract_domain(&item.url))
})
.collect();
// Remove articles with empty scraped content (too old, soft 404, scrape failure).
// These would produce empty/low-quality output in the rewrite pass.
let scraped = filter_empty_scraped_articles(scraped);
domains.sort();
domains.dedup();
domains
} else {
Vec::new()
};
// Rate limit check (pass 2)
check_rate_limit(state, &user_rate_limiter, &provider_name)?;
// Build the search schema over all configured categories (the gaps are passed to the prompt, not the schema)
let search_schema = build_category_schema(&settings.categories, settings.max_items_per_category);
let current_date = Utc::now().format("%A %d %B %Y").to_string();
let (system_prompt, user_prompt) = prompts::build_search_prompt(
&settings,
&sources,
&current_date,
&recent_domains,
Some(&category_gaps),
);
let raw_results = provider
.generate_search_pass(&model_research, &system_prompt, &user_prompt, &search_schema)
.await?;
// Parse + filter
emit_progress(tx, "parsing", "Analyse des resultats...", 55);
let parsed = parse_llm_output(&raw_results, &settings.categories)?;
let parsed = filter_homepage_urls(parsed);
// Cross-phase dedup: remove URLs already found in Phase 1
let parsed: Vec<(String, Vec<NewsItem>)> = parsed
.into_iter()
.map(|(cat_key, items)| {
let deduped: Vec<NewsItem> = items
.into_iter()
.filter(|item| !seen_urls.contains(&item.url.to_lowercase()))
.collect();
(cat_key, deduped)
})
.collect();
let parsed = dedup_by_url(parsed);
let parsed = limit_articles_per_source(parsed, settings.max_articles_per_source);
// Scrape web search results
emit_progress(tx, "scraping", "Verification des sources web...", 60);
let scraped = scrape_articles(state, &parsed, settings.max_age_days as i64, tx).await;
let scraped = filter_empty_scraped_articles(scraped);
// Flatten scraped articles for classification
let phase2_articles: Vec<ScrapedNewsItem> = scraped
.into_values()
.flat_map(|items| items.into_iter())
.collect();
if !phase2_articles.is_empty() {
// LLM classification for Phase 2 articles
emit_progress(tx, "classifying", "Classification des resultats web...", 70);
check_rate_limit(state, &user_rate_limiter, &provider_name)?;
let (class_system, class_user) = build_classification_prompt(
&phase2_articles,
&classification_categories,
settings.max_items_per_category,
&filled_counts,
);
let class_schema = build_classification_schema();
let class_response = provider
.generate_rewrite_pass(
&model_research,
&class_system,
&class_user,
&class_schema,
)
.await?;
let phase2_classified = parse_classification_response(
&class_response,
&phase2_articles,
&classification_categories,
settings.max_items_per_category,
&mut filled_counts,
);
// Merge Phase 2 into all_scraped
for (cat_key, items) in phase2_classified {
for item in &items {
seen_urls.insert(item.url.to_lowercase());
}
all_scraped.entry(cat_key).or_default().extend(items);
}
}
}
// ═══════════════════════════════════════════════════════════════
// COMBINED REWRITE PASS
// ═══════════════════════════════════════════════════════════════
if all_scraped.values().all(|items| items.is_empty()) {
return Err(AppError::BadRequest(
"Aucun article valide trouve. Verifiez vos sources et categories.".into(),
));
}
// LLM rewrite pass — use a schema that matches the actual scraped item counts
// (which may be less than max_items_per_category after filtering empty content)
emit_progress(tx, "rewrite", "Redaction des resumes...", 80);
let (rewrite_system, rewrite_user) = prompts::build_rewrite_prompt(&scraped);
check_rate_limit(state, &user_rate_limiter, &provider_name)?;
let rewrite_schema = build_rewrite_schema(&scraped, &settings.categories);
let (rewrite_system, rewrite_user) = prompts::build_rewrite_prompt(&all_scraped);
let rewrite_schema = build_rewrite_schema(&all_scraped, &settings.categories);
let final_results = provider
.generate_rewrite_pass(&model_writing, &rewrite_system, &rewrite_user, &rewrite_schema)
@ -384,19 +606,14 @@ async fn run_generation_inner(
emit_progress(tx, "finalizing", "Finalisation...", 90);
let mut final_sections = build_final_sections(&final_results, &settings.categories)?;
// Restore validated URLs from scraped data — the LLM rewrite pass may
// hallucinate different URLs despite being told to preserve them.
restore_scraped_urls(&mut final_sections, &scraped, &settings.categories);
restore_scraped_urls(&mut final_sections, &all_scraped, &settings.categories);
// Step 12: Save synthesis to DB
// Save synthesis to DB
emit_progress(tx, "saving", "Sauvegarde de la synthese...", 95);
let week = get_iso_week_string(Utc::now().date_naive());
let sections_json = serde_json::to_value(&final_sections).map_err(|e| {
AppError::Internal(anyhow::anyhow!("Failed to serialize sections: {}", e))
})?;
// Strip \u0000 null bytes — LLM output occasionally contains them and
// PostgreSQL rejects them in JSONB columns.
let sections_json = sanitize_json_null_bytes(sections_json);
let synthesis =

Loading…
Cancel
Save