feat: article history filtering in pipeline — cleanup, Phase 1/2 filter, retry, insert

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
master
oabrivard 3 months ago
parent 0a87b7ed8f
commit 65eb6004d2

@ -259,6 +259,20 @@ async fn run_generation_inner(
emit_progress(tx, "settings", "Chargement des parametres...", 5);
let settings = db::settings::get_or_create_default(&state.pool, user_id).await?;
// Cleanup old article history entries
if settings.article_history_days > 0 {
let deleted = db::article_history::cleanup_old(
&state.pool,
user_id,
settings.article_history_days,
)
.await
.unwrap_or(0);
if deleted > 0 {
tracing::info!(deleted = deleted, "Cleaned up old article history entries");
}
}
if settings.categories.is_empty() {
return Err(AppError::BadRequest(
"Aucune categorie configuree. Veuillez configurer vos parametres.".into(),
@ -375,13 +389,87 @@ async fn run_generation_inner(
.filter(|a| !a.scraped_content.trim().is_empty())
.collect();
// 1d. Filter against article history (cross-synthesis dedup)
let mut valid_articles = if settings.article_history_days > 0 {
let hashes: Vec<String> = valid_articles.iter().map(|a| hash_article_url(&a.url)).collect();
let existing = db::article_history::check_urls_exist(&state.pool, user_id, &hashes)
.await
.unwrap_or_default();
if !existing.is_empty() {
tracing::info!(filtered = existing.len(), "Phase 1: filtered articles already in history");
}
valid_articles
.into_iter()
.filter(|a| !existing.contains(&hash_article_url(&a.url)))
.collect::<Vec<_>>()
} else {
valid_articles
};
// 1e. Retry if under-filled after history filtering (1 attempt)
let target = settings.categories.len() * settings.max_items_per_category as usize;
if valid_articles.len() < target && settings.article_history_days > 0 {
tracing::info!(
have = valid_articles.len(),
need = target,
"Phase 1 under-filled after history filter, retrying"
);
let already_fetched: std::collections::HashSet<String> = candidate_urls
.iter()
.map(|u| u.to_lowercase())
.collect();
let mut retry_urls: Vec<String> = Vec::new();
for source in sources.iter().take(max_sources) {
let links = if settings.use_llm_for_source_links {
source_scraper::extract_article_links_with_llm(
&state.http_client, &source.url, max_links_per_source,
&provider, &model_research,
).await
} else {
source_scraper::extract_article_links(
&state.http_client, &source.url, max_links_per_source,
).await
};
if let Ok(links) = links {
for link in links {
if !already_fetched.contains(&link.to_lowercase()) {
retry_urls.push(link);
}
}
}
}
if !retry_urls.is_empty() {
let retry_scraped = scrape_flat_urls(
state, &retry_urls, settings.max_age_days as i64, tx,
llm_for_scraping.clone(),
).await;
let mut retry_valid: Vec<ScrapedNewsItem> = retry_scraped
.into_iter()
.filter(|a| !a.scraped_content.trim().is_empty())
.collect();
if !retry_valid.is_empty() && settings.article_history_days > 0 {
let hashes: Vec<String> = retry_valid.iter().map(|a| hash_article_url(&a.url)).collect();
let existing = db::article_history::check_urls_exist(&state.pool, user_id, &hashes)
.await.unwrap_or_default();
retry_valid.retain(|a| !existing.contains(&hash_article_url(&a.url)));
}
valid_articles.extend(retry_valid);
tracing::info!(total = valid_articles.len(), "Phase 1 after retry");
}
}
tracing::info!(
valid_count = valid_articles.len(),
"Phase 1: valid articles from personalized sources"
);
if !valid_articles.is_empty() {
// 1d. LLM classification call
// 1f. LLM classification call
emit_progress(tx, "classifying", "Classification des articles...", 35);
check_rate_limit(state, &user_rate_limiter, &provider_name)?;
@ -551,6 +639,32 @@ async fn run_generation_inner(
let parsed = dedup_by_url(parsed);
let parsed = limit_articles_per_source(parsed, settings.max_articles_per_source);
// Filter against article history BEFORE scraping (saves HTTP requests)
let parsed = if settings.article_history_days > 0 {
let all_urls: Vec<String> = parsed.iter()
.flat_map(|(_, items)| items.iter().map(|i| i.url.clone()))
.collect();
let hashes: Vec<String> = all_urls.iter().map(|u| hash_article_url(u)).collect();
let existing = db::article_history::check_urls_exist(&state.pool, user_id, &hashes)
.await
.unwrap_or_default();
if !existing.is_empty() {
tracing::info!(filtered = existing.len(), "Phase 2: filtered articles already in history");
}
parsed
.into_iter()
.map(|(cat_key, items)| {
let filtered = items
.into_iter()
.filter(|item| !existing.contains(&hash_article_url(&item.url)))
.collect();
(cat_key, filtered)
})
.collect()
} else {
parsed
};
// Scrape web search results
emit_progress(tx, "scraping", "Verification des sources web...", 60);
let scraped = scrape_articles(state, &parsed, settings.max_age_days as i64, tx, llm_for_scraping.clone()).await;
@ -637,6 +751,18 @@ async fn run_generation_inner(
let synthesis =
db::syntheses::create(&state.pool, user_id, &week, &sections_json).await?;
// Record article URLs in history for cross-synthesis dedup
if settings.article_history_days > 0 {
let article_urls: Vec<(String, String)> = final_sections
.iter()
.flat_map(|section| section.items.iter())
.map(|item| (item.url.clone(), hash_article_url(&item.url)))
.collect();
db::article_history::insert_urls(&state.pool, user_id, &article_urls)
.await
.ok(); // Don't fail synthesis if history insert fails
}
Ok(synthesis.id)
}

Loading…
Cancel
Save