feat: instrument pipeline with article tracing at every filtering step

Add source_url field to ScrapedNewsItem and a trace_article helper that
inserts into article_history with full provenance metadata.  Instrument
Phase 1 (empty content, history dedup, source diversity) and Phase 2
(homepage filter, cross-phase dedup, history dedup, empty content) so
every dropped article is recorded with its filter reason.  Replace the
old insert_urls call with per-article trace_article calls for used
articles, preserving dedup semantics via url_hash.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
master
oabrivard 3 months ago
parent 0e2c69edf7
commit b9003cde54

@ -153,6 +153,8 @@ pub struct ScrapedNewsItem {
pub original_title: String, pub original_title: String,
#[serde(rename = "scrapedContent")] #[serde(rename = "scrapedContent")]
pub scraped_content: String, pub scraped_content: String,
#[serde(default)]
pub source_url: Option<String>,
} }
#[cfg(test)] #[cfg(test)]

@ -402,6 +402,7 @@ mod tests {
summary: "A summary".into(), summary: "A summary".into(),
original_title: "Original Test Article".into(), original_title: "Original Test Article".into(),
scraped_content: "Full article text here...".into(), scraped_content: "Full article text here...".into(),
source_url: None,
}], }],
); );
@ -453,6 +454,7 @@ mod tests {
summary: "s".into(), summary: "s".into(),
original_title: "t".into(), original_title: "t".into(),
scraped_content: "OpenAI released GPT-5 today with major improvements".into(), scraped_content: "OpenAI released GPT-5 today with major improvements".into(),
source_url: None,
}, },
]; ];
let categories = vec!["AI News".to_string(), "Autre".to_string()]; let categories = vec!["AI News".to_string(), "Autre".to_string()];
@ -470,7 +472,7 @@ mod tests {
ScrapedNewsItem { ScrapedNewsItem {
title: "T".into(), url: "https://a.com/1".into(), title: "T".into(), url: "https://a.com/1".into(),
summary: "s".into(), original_title: "t".into(), summary: "s".into(), original_title: "t".into(),
scraped_content: "Content".into(), scraped_content: "Content".into(), source_url: None,
}, },
]; ];
let categories = vec!["AI News".to_string(), "Autre".to_string()]; let categories = vec!["AI News".to_string(), "Autre".to_string()];

@ -386,6 +386,16 @@ async fn run_generation_inner(
.await; .await;
// 1c. Filter empty content // 1c. Filter empty content
// Trace articles with empty content
if settings.article_history_days > 0 {
for article in &scraped_articles {
if article.scraped_content.trim().is_empty() {
trace_article(&state.pool, user_id, job_id, &article.url, &article.title,
"personalized_source", article.source_url.as_deref(), None, None,
"filtered_empty", false).await;
}
}
}
let valid_articles: Vec<ScrapedNewsItem> = scraped_articles let valid_articles: Vec<ScrapedNewsItem> = scraped_articles
.into_iter() .into_iter()
.filter(|a| !a.scraped_content.trim().is_empty()) .filter(|a| !a.scraped_content.trim().is_empty())
@ -393,6 +403,7 @@ async fn run_generation_inner(
// 1d. Filter against article history (cross-synthesis dedup) // 1d. Filter against article history (cross-synthesis dedup)
let mut valid_articles = if settings.article_history_days > 0 { let mut valid_articles = if settings.article_history_days > 0 {
let pre_history_articles = valid_articles.clone();
let hashes: Vec<String> = valid_articles.iter().map(|a| hash_article_url(&a.url)).collect(); let hashes: Vec<String> = valid_articles.iter().map(|a| hash_article_url(&a.url)).collect();
let existing = db::article_history::check_urls_exist(&state.pool, user_id, &hashes) let existing = db::article_history::check_urls_exist(&state.pool, user_id, &hashes)
.await .await
@ -400,6 +411,16 @@ async fn run_generation_inner(
if !existing.is_empty() { if !existing.is_empty() {
tracing::info!(filtered = existing.len(), "Phase 1: filtered articles already in history"); tracing::info!(filtered = existing.len(), "Phase 1: filtered articles already in history");
} }
// Trace history-filtered articles
if !existing.is_empty() {
for article in &pre_history_articles {
if existing.contains(&hash_article_url(&article.url)) {
trace_article(&state.pool, user_id, job_id, &article.url, &article.title,
"personalized_source", article.source_url.as_deref(), None, None,
"filtered_history", true).await;
}
}
}
valid_articles valid_articles
.into_iter() .into_iter()
.filter(|a| !existing.contains(&hash_article_url(&a.url))) .filter(|a| !existing.contains(&hash_article_url(&a.url)))
@ -515,6 +536,12 @@ async fn run_generation_inner(
// (reuse domain counting logic) // (reuse domain counting logic)
let max_per_source = settings.max_articles_per_source as usize; let max_per_source = settings.max_articles_per_source as usize;
let mut domain_counts: HashMap<String, usize> = HashMap::new(); let mut domain_counts: HashMap<String, usize> = HashMap::new();
// Collect items before diversity pruning for tracing
let pre_diversity_snapshot: Vec<ScrapedNewsItem> = if settings.article_history_days > 0 {
all_scraped.values().flat_map(|items| items.iter().cloned()).collect()
} else {
Vec::new()
};
for (_, items) in &mut all_scraped { for (_, items) in &mut all_scraped {
items.retain(|item| { items.retain(|item| {
if let Some(domain) = extract_domain(&item.url) { if let Some(domain) = extract_domain(&item.url) {
@ -530,6 +557,19 @@ async fn run_generation_inner(
} }
}); });
} }
// Trace diversity-filtered articles
if settings.article_history_days > 0 {
let post_urls: std::collections::HashSet<String> = all_scraped.values()
.flat_map(|items| items.iter().map(|i| i.url.clone()))
.collect();
for article in &pre_diversity_snapshot {
if !post_urls.contains(&article.url) {
trace_article(&state.pool, user_id, job_id, &article.url, &article.title,
"personalized_source", article.source_url.as_deref(), None, None,
"filtered_diversity", true).await;
}
}
}
// Recount filled_counts after trimming // Recount filled_counts after trimming
filled_counts.clear(); filled_counts.clear();
@ -626,7 +666,28 @@ async fn run_generation_inner(
// Parse + filter // Parse + filter
emit_progress(tx, "parsing", "Analyse des resultats...", 55); emit_progress(tx, "parsing", "Analyse des resultats...", 55);
let parsed = parse_llm_output(&raw_results, &settings.categories)?; let parsed = parse_llm_output(&raw_results, &settings.categories)?;
// Trace homepage-filtered articles
let pre_homepage_parsed = if settings.article_history_days > 0 {
Some(parsed.clone())
} else {
None
};
let parsed = filter_homepage_urls(parsed); let parsed = filter_homepage_urls(parsed);
if let Some(ref pre) = pre_homepage_parsed {
let post_urls: std::collections::HashSet<String> = parsed.iter()
.flat_map(|(_, items)| items.iter().map(|i| i.url.clone()))
.collect();
for (_, items) in pre {
for item in items {
if !post_urls.contains(&item.url) {
trace_article(&state.pool, user_id, job_id, &item.url, &item.title,
"web_search", None, None, None,
"filtered_homepage", false).await;
}
}
}
}
// Cross-phase dedup: remove URLs already found in Phase 1 // Cross-phase dedup: remove URLs already found in Phase 1
let parsed: Vec<(String, Vec<NewsItem>)> = parsed let parsed: Vec<(String, Vec<NewsItem>)> = parsed
@ -634,11 +695,36 @@ async fn run_generation_inner(
.map(|(cat_key, items)| { .map(|(cat_key, items)| {
let deduped: Vec<NewsItem> = items let deduped: Vec<NewsItem> = items
.into_iter() .into_iter()
.filter(|item| !seen_urls.contains(&item.url.to_lowercase())) .filter(|item| {
if seen_urls.contains(&item.url.to_lowercase()) {
// Tracing is handled below for async context
false
} else {
true
}
})
.collect(); .collect();
(cat_key, deduped) (cat_key, deduped)
}) })
.collect(); .collect();
// Trace cross-phase dedup drops (we need to check pre vs post since filter is sync)
if settings.article_history_days > 0 {
if let Some(ref pre) = pre_homepage_parsed {
let post_urls: std::collections::HashSet<String> = parsed.iter()
.flat_map(|(_, items)| items.iter().map(|i| i.url.to_lowercase()))
.collect();
for (_, items) in pre {
for item in items {
let lower = item.url.to_lowercase();
if seen_urls.contains(&lower) && !post_urls.contains(&lower) {
trace_article(&state.pool, user_id, job_id, &item.url, &item.title,
"web_search", None, None, None,
"filtered_cross_phase_dedup", false).await;
}
}
}
}
}
let parsed = dedup_by_url(parsed); let parsed = dedup_by_url(parsed);
let parsed = limit_articles_per_source(parsed, settings.max_articles_per_source); let parsed = limit_articles_per_source(parsed, settings.max_articles_per_source);
@ -654,6 +740,16 @@ async fn run_generation_inner(
.unwrap_or_default(); .unwrap_or_default();
if !existing.is_empty() { if !existing.is_empty() {
tracing::info!(filtered = existing.len(), "Phase 2: filtered articles already in history"); tracing::info!(filtered = existing.len(), "Phase 2: filtered articles already in history");
// Trace history-filtered articles
for (_, items) in &parsed {
for item in items {
if existing.contains(&hash_article_url(&item.url)) {
trace_article(&state.pool, user_id, job_id, &item.url, &item.title,
"web_search", None, None, None,
"filtered_history", false).await;
}
}
}
} }
parsed parsed
.into_iter() .into_iter()
@ -672,6 +768,18 @@ async fn run_generation_inner(
// Scrape web search results // Scrape web search results
emit_progress(tx, "scraping", "Verification des sources web...", 60); emit_progress(tx, "scraping", "Verification des sources web...", 60);
let scraped = scrape_articles(state, &parsed, settings.max_age_days as i64, tx, llm_for_scraping.clone()).await; let scraped = scrape_articles(state, &parsed, settings.max_age_days as i64, tx, llm_for_scraping.clone()).await;
// Trace empty content drops from Phase 2 scraping
if settings.article_history_days > 0 {
for (_, items) in &scraped {
for item in items {
if item.scraped_content.trim().is_empty() {
trace_article(&state.pool, user_id, job_id, &item.url, &item.title,
"web_search", None, None, None,
"filtered_empty", false).await;
}
}
}
}
let scraped = filter_empty_scraped_articles(scraped); let scraped = filter_empty_scraped_articles(scraped);
// Flatten scraped articles for classification // Flatten scraped articles for classification
@ -810,16 +918,15 @@ async fn run_generation_inner(
let synthesis = let synthesis =
db::syntheses::create(&state.pool, user_id, &week, &sections_json, job_id).await?; db::syntheses::create(&state.pool, user_id, &week, &sections_json, job_id).await?;
// Record article URLs in history for cross-synthesis dedup // Record used articles in history with full tracing metadata
if settings.article_history_days > 0 { if settings.article_history_days > 0 {
let article_urls: Vec<(String, String)> = final_sections for section in &final_sections {
.iter() for item in &section.items {
.flat_map(|section| section.items.iter()) trace_article(&state.pool, user_id, job_id, &item.url, &item.title,
.map(|item| (item.url.clone(), hash_article_url(&item.url))) "used", None, Some(&section.title), Some(synthesis.id),
.collect(); "used", true).await;
db::article_history::insert_urls(&state.pool, user_id, &article_urls) }
.await }
.ok(); // Don't fail synthesis if history insert fails
} }
Ok(synthesis.id) Ok(synthesis.id)
@ -862,6 +969,36 @@ fn emit_progress(tx: &watch::Sender<ProgressEvent>, step: &str, message: &str, p
.ok(); .ok();
} }
/// Insert a trace entry into article_history for debugging pipeline behavior.
async fn trace_article(
pool: &sqlx::PgPool,
user_id: Uuid,
job_id: Uuid,
url: &str,
title: &str,
source_type: &str,
source_url: Option<&str>,
category: Option<&str>,
synthesis_id: Option<Uuid>,
status: &str,
scraped_ok: bool,
) {
let entry = db::article_history::ArticleHistoryEntry {
user_id,
url: url.to_string(),
url_hash: hash_article_url(url),
title: title.to_string(),
source_type: source_type.to_string(),
source_url: source_url.map(|s| s.to_string()),
category: category.map(|s| s.to_string()),
synthesis_id,
status: status.to_string(),
scraped_ok,
job_id,
};
db::article_history::insert_entry(pool, &entry).await.ok();
}
/// Look up or create a per-user rate limiter stored in AppState. /// Look up or create a per-user rate limiter stored in AppState.
/// ///
/// Returns `None` if the user has no rate limit overrides, in which case the /// Returns `None` if the user has no rate limit overrides, in which case the
@ -1393,6 +1530,7 @@ async fn scrape_articles(
summary: item.summary, summary: item.summary,
original_title: page_title, original_title: page_title,
scraped_content, scraped_content,
source_url: None,
}; };
result result
@ -1494,6 +1632,7 @@ async fn scrape_flat_urls(
summary: String::new(), // No LLM summary yet summary: String::new(), // No LLM summary yet
original_title: page_title, original_title: page_title,
scraped_content, scraped_content,
source_url: None,
}); });
} }
@ -2266,17 +2405,17 @@ mod tests {
ScrapedNewsItem { ScrapedNewsItem {
title: "Good".into(), url: "https://a.com/1".into(), title: "Good".into(), url: "https://a.com/1".into(),
summary: "s".into(), original_title: "t".into(), summary: "s".into(), original_title: "t".into(),
scraped_content: "Real content here".into(), scraped_content: "Real content here".into(), source_url: None,
}, },
ScrapedNewsItem { ScrapedNewsItem {
title: "Empty".into(), url: "https://b.com/2".into(), title: "Empty".into(), url: "https://b.com/2".into(),
summary: "s".into(), original_title: "t".into(), summary: "s".into(), original_title: "t".into(),
scraped_content: "".into(), scraped_content: "".into(), source_url: None,
}, },
ScrapedNewsItem { ScrapedNewsItem {
title: "Whitespace".into(), url: "https://c.com/3".into(), title: "Whitespace".into(), url: "https://c.com/3".into(),
summary: "s".into(), original_title: "t".into(), summary: "s".into(), original_title: "t".into(),
scraped_content: " ".into(), scraped_content: " ".into(), source_url: None,
}, },
]); ]);
@ -2293,7 +2432,7 @@ mod tests {
ScrapedNewsItem { ScrapedNewsItem {
title: "A".into(), url: "https://a.com/1".into(), title: "A".into(), url: "https://a.com/1".into(),
summary: "s".into(), original_title: "t".into(), summary: "s".into(), original_title: "t".into(),
scraped_content: "Content".into(), scraped_content: "Content".into(), source_url: None,
}, },
]); ]);
@ -2312,7 +2451,7 @@ mod tests {
ScrapedNewsItem { ScrapedNewsItem {
title: "T".into(), url: "https://real-source.com/article".into(), title: "T".into(), url: "https://real-source.com/article".into(),
summary: "s".into(), original_title: "t".into(), summary: "s".into(), original_title: "t".into(),
scraped_content: "c".into(), scraped_content: "c".into(), source_url: None,
}, },
]); ]);
@ -2342,7 +2481,7 @@ mod tests {
ScrapedNewsItem { ScrapedNewsItem {
title: "T".into(), url: "https://correct.com/article".into(), title: "T".into(), url: "https://correct.com/article".into(),
summary: "s".into(), original_title: "t".into(), summary: "s".into(), original_title: "t".into(),
scraped_content: "c".into(), scraped_content: "c".into(), source_url: None,
}, },
]); ]);
@ -2473,8 +2612,8 @@ mod tests {
fn classification_assigns_articles_to_categories() { fn classification_assigns_articles_to_categories() {
use crate::models::synthesis::ScrapedNewsItem; use crate::models::synthesis::ScrapedNewsItem;
let articles = vec![ let articles = vec![
ScrapedNewsItem { title: "A".into(), url: "https://a.com/1".into(), summary: "s".into(), original_title: "t".into(), scraped_content: "c".into() }, ScrapedNewsItem { title: "A".into(), url: "https://a.com/1".into(), summary: "s".into(), original_title: "t".into(), scraped_content: "c".into(), source_url: None },
ScrapedNewsItem { title: "B".into(), url: "https://b.com/2".into(), summary: "s".into(), original_title: "t".into(), scraped_content: "c".into() }, ScrapedNewsItem { title: "B".into(), url: "https://b.com/2".into(), summary: "s".into(), original_title: "t".into(), scraped_content: "c".into(), source_url: None },
]; ];
let categories = vec!["AI News".to_string(), "Autre".to_string()]; let categories = vec!["AI News".to_string(), "Autre".to_string()];
let response = serde_json::json!({ let response = serde_json::json!({
@ -2493,7 +2632,7 @@ mod tests {
fn classification_unknown_category_goes_to_autre() { fn classification_unknown_category_goes_to_autre() {
use crate::models::synthesis::ScrapedNewsItem; use crate::models::synthesis::ScrapedNewsItem;
let articles = vec![ let articles = vec![
ScrapedNewsItem { title: "A".into(), url: "https://a.com/1".into(), summary: "s".into(), original_title: "t".into(), scraped_content: "c".into() }, ScrapedNewsItem { title: "A".into(), url: "https://a.com/1".into(), summary: "s".into(), original_title: "t".into(), scraped_content: "c".into(), source_url: None },
]; ];
let categories = vec!["AI News".to_string(), "Autre".to_string()]; let categories = vec!["AI News".to_string(), "Autre".to_string()];
let response = serde_json::json!({ let response = serde_json::json!({
@ -2509,7 +2648,7 @@ mod tests {
use crate::models::synthesis::ScrapedNewsItem; use crate::models::synthesis::ScrapedNewsItem;
let articles: Vec<ScrapedNewsItem> = (0..5).map(|i| ScrapedNewsItem { let articles: Vec<ScrapedNewsItem> = (0..5).map(|i| ScrapedNewsItem {
title: format!("Art{}", i), url: format!("https://a.com/{}", i), title: format!("Art{}", i), url: format!("https://a.com/{}", i),
summary: "s".into(), original_title: "t".into(), scraped_content: "c".into(), summary: "s".into(), original_title: "t".into(), scraped_content: "c".into(), source_url: None,
}).collect(); }).collect();
let categories = vec!["AI News".to_string(), "Autre".to_string()]; let categories = vec!["AI News".to_string(), "Autre".to_string()];
let response = serde_json::json!({ let response = serde_json::json!({
@ -2528,7 +2667,7 @@ mod tests {
fn classification_invalid_index_ignored() { fn classification_invalid_index_ignored() {
use crate::models::synthesis::ScrapedNewsItem; use crate::models::synthesis::ScrapedNewsItem;
let articles = vec![ let articles = vec![
ScrapedNewsItem { title: "A".into(), url: "https://a.com/1".into(), summary: "s".into(), original_title: "t".into(), scraped_content: "c".into() }, ScrapedNewsItem { title: "A".into(), url: "https://a.com/1".into(), summary: "s".into(), original_title: "t".into(), scraped_content: "c".into(), source_url: None },
]; ];
let categories = vec!["AI News".to_string(), "Autre".to_string()]; let categories = vec!["AI News".to_string(), "Autre".to_string()];
let response = serde_json::json!({ let response = serde_json::json!({
@ -2544,7 +2683,7 @@ mod tests {
fn classification_case_insensitive() { fn classification_case_insensitive() {
use crate::models::synthesis::ScrapedNewsItem; use crate::models::synthesis::ScrapedNewsItem;
let articles = vec![ let articles = vec![
ScrapedNewsItem { title: "A".into(), url: "https://a.com/1".into(), summary: "s".into(), original_title: "t".into(), scraped_content: "c".into() }, ScrapedNewsItem { title: "A".into(), url: "https://a.com/1".into(), summary: "s".into(), original_title: "t".into(), scraped_content: "c".into(), source_url: None },
]; ];
let categories = vec!["AI News".to_string(), "Autre".to_string()]; let categories = vec!["AI News".to_string(), "Autre".to_string()];
let response = serde_json::json!({ let response = serde_json::json!({
@ -2654,7 +2793,7 @@ mod tests {
use crate::models::synthesis::ScrapedNewsItem; use crate::models::synthesis::ScrapedNewsItem;
let articles: Vec<ScrapedNewsItem> = (0..6).map(|i| ScrapedNewsItem { let articles: Vec<ScrapedNewsItem> = (0..6).map(|i| ScrapedNewsItem {
title: format!("Art{}", i), url: format!("https://a.com/{}", i), title: format!("Art{}", i), url: format!("https://a.com/{}", i),
summary: "s".into(), original_title: "t".into(), scraped_content: "c".into(), summary: "s".into(), original_title: "t".into(), scraped_content: "c".into(), source_url: None,
}).collect(); }).collect();
let categories = vec!["AI News".to_string(), "Autre".to_string()]; let categories = vec!["AI News".to_string(), "Autre".to_string()];
let response = serde_json::json!({ let response = serde_json::json!({

Loading…
Cancel
Save