diff --git a/backend/src/db/article_history.rs b/backend/src/db/article_history.rs index ba8ff55..305a339 100644 --- a/backend/src/db/article_history.rs +++ b/backend/src/db/article_history.rs @@ -89,6 +89,47 @@ pub async fn insert_urls( Ok(()) } +/// Insert multiple article history entries in a single query. +pub async fn batch_insert_entries(pool: &PgPool, entries: &[ArticleHistoryEntry]) -> Result<(), AppError> { + if entries.is_empty() { + return Ok(()); + } + + let user_ids: Vec = entries.iter().map(|e| e.user_id).collect(); + let urls: Vec<&str> = entries.iter().map(|e| e.url.as_str()).collect(); + let url_hashes: Vec<&str> = entries.iter().map(|e| e.url_hash.as_str()).collect(); + let titles: Vec<&str> = entries.iter().map(|e| e.title.as_str()).collect(); + let source_types: Vec<&str> = entries.iter().map(|e| e.source_type.as_str()).collect(); + let source_urls: Vec> = entries.iter().map(|e| e.source_url.as_deref()).collect(); + let categories: Vec> = entries.iter().map(|e| e.category.as_deref()).collect(); + let synthesis_ids: Vec> = entries.iter().map(|e| e.synthesis_id).collect(); + let statuses: Vec<&str> = entries.iter().map(|e| e.status.as_str()).collect(); + let scraped_oks: Vec = entries.iter().map(|e| e.scraped_ok).collect(); + let job_ids: Vec = entries.iter().map(|e| e.job_id).collect(); + + sqlx::query( + r#" + INSERT INTO article_history (user_id, url, url_hash, title, source_type, source_url, category, synthesis_id, status, scraped_ok, job_id) + SELECT * FROM unnest($1::uuid[], $2::text[], $3::text[], $4::text[], $5::text[], $6::text[], $7::text[], $8::uuid[], $9::text[], $10::bool[], $11::uuid[]) + "#, + ) + .bind(&user_ids) + .bind(&urls) + .bind(&url_hashes) + .bind(&titles) + .bind(&source_types) + .bind(&source_urls) + .bind(&categories) + .bind(&synthesis_ids) + .bind(&statuses) + .bind(&scraped_oks) + .bind(&job_ids) + .execute(pool) + .await?; + + Ok(()) +} + /// Insert a single article history entry with full tracing metadata. pub async fn insert_entry(pool: &PgPool, entry: &ArticleHistoryEntry) -> Result<(), AppError> { sqlx::query( diff --git a/backend/src/services/synthesis.rs b/backend/src/services/synthesis.rs index 8e708c7..03ef22d 100644 --- a/backend/src/services/synthesis.rs +++ b/backend/src/services/synthesis.rs @@ -230,6 +230,9 @@ async fn run_generation_inner( user_id: Uuid, tx: &watch::Sender, ) -> Result { + // Batch buffer for article history traces (flushed at logical boundaries) + let mut pending_traces: Vec = Vec::new(); + // === INITIALIZATION === emit_progress(tx, "settings", "Chargement des parametres...", 5); let settings = db::settings::get_or_create_default(&state.pool, user_id).await?; @@ -365,14 +368,19 @@ async fn run_generation_inner( if !existing.is_empty() { for (url, source_url) in &candidate_urls { if existing.contains(&hash_article_url(url)) { - trace_article(&state.pool, user_id, job_id, &ArticleTrace { + pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace { url, title: "", source_type: "personalized_source", source_url: Some(source_url), category: None, synthesis_id: None, status: "filtered_history", scraped_ok: false, - }).await; + })); } } candidate_urls.retain(|(url, _)| !existing.contains(&hash_article_url(url))); + // Flush history dedup traces + if !pending_traces.is_empty() { + db::article_history::batch_insert_entries(&state.pool, &pending_traces).await.ok(); + pending_traces.clear(); + } } } @@ -403,11 +411,11 @@ async fn run_generation_inner( let source_domain = extract_domain(&source_url).unwrap_or_default(); let source_count = source_counts.get(&source_domain).copied().unwrap_or(0); if source_count >= settings.max_articles_per_source as usize { - trace_article(&state.pool, user_id, job_id, &ArticleTrace { + pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace { url: &url, title: "", source_type: "personalized_source", source_url: Some(&source_url), category: None, synthesis_id: None, status: "filtered_diversity", scraped_ok: false, - }).await; + })); continue; } batch.push((url, source_url)); @@ -437,11 +445,11 @@ async fn run_generation_inner( while let Some(join_result) = scrape_set.join_next().await { if let Ok((_url, source_url, (body_text, page_title, final_url, drop_reason))) = join_result { if let Some(reason) = drop_reason { - trace_article(&state.pool, user_id, job_id, &ArticleTrace { + pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace { url: &final_url, title: &page_title, source_type: "personalized_source", source_url: Some(&source_url), category: None, synthesis_id: None, status: reason, scraped_ok: false, - }).await; + })); } else { scraped_articles.push((final_url, source_url, body_text, page_title)); } @@ -524,6 +532,12 @@ async fn run_generation_inner( done = true; } } + + // Flush Phase 1 traces + if !pending_traces.is_empty() { + db::article_history::batch_insert_entries(&state.pool, &pending_traces).await.ok(); + pending_traces.clear(); + } } // === PHASE 2: Web Search Fallback === @@ -553,11 +567,11 @@ async fn run_generation_inner( &state.pool, user_id, &result.url, &seen_urls, &source_counts, settings.article_history_days, settings.max_articles_per_source as usize, ).await { - trace_article(&state.pool, user_id, job_id, &ArticleTrace { + pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace { url: &result.url, title: &result.title, source_type: "brave_search", source_url: None, category: None, synthesis_id: None, status: reason, scraped_ok: false, - }).await; + })); continue; } @@ -566,6 +580,12 @@ async fn run_generation_inner( brave_urls.push(result.url.clone()); } + // Flush Brave filter traces + if !pending_traces.is_empty() { + db::article_history::batch_insert_entries(&state.pool, &pending_traces).await.ok(); + pending_traces.clear(); + } + // Scrape + classify in batches (same as Phase 1) if !brave_urls.is_empty() { emit_progress(tx, "processing", "Traitement des articles Brave...", 75); @@ -603,11 +623,11 @@ async fn run_generation_inner( while let Some(join_result) = scrape_set.join_next().await { if let Ok((_url, (body_text, page_title, final_url, drop_reason))) = join_result { if let Some(reason) = drop_reason { - trace_article(&state.pool, user_id, job_id, &ArticleTrace { + pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace { url: &final_url, title: &page_title, source_type: "brave_search", source_url: None, category: None, synthesis_id: None, status: reason, scraped_ok: false, - }).await; + })); } else { scraped_articles.push((final_url, body_text, page_title)); } @@ -688,6 +708,12 @@ async fn run_generation_inner( done = true; } } + + // Flush Brave scrape/classify traces + if !pending_traces.is_empty() { + db::article_history::batch_insert_entries(&state.pool, &pending_traces).await.ok(); + pending_traces.clear(); + } } } else { // === EXISTING LLM SEARCH PATH === @@ -715,11 +741,11 @@ async fn run_generation_inner( &state.pool, user_id, &item.url, &seen_urls, &source_counts, settings.article_history_days, settings.max_articles_per_source as usize, ).await { - trace_article(&state.pool, user_id, job_id, &ArticleTrace { + pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace { url: &item.url, title: &item.title, source_type: "web_search", source_url: None, category: None, synthesis_id: None, status: reason, scraped_ok: false, - }).await; + })); continue; } @@ -728,17 +754,23 @@ async fn run_generation_inner( } } + // Flush Phase 2 filter traces + if !pending_traces.is_empty() { + db::article_history::batch_insert_entries(&state.pool, &pending_traces).await.ok(); + pending_traces.clear(); + } + // Scrape Phase 2 for validation emit_progress(tx, "scraping", "Verification des sources web...", 80); for (cat_key, item) in phase2_items { let (_body_text, _, final_url, drop_reason) = scrape_single_article(&state.http_client, &item.url, settings.max_age_days as i64).await; if let Some(reason) = drop_reason { - trace_article(&state.pool, user_id, job_id, &ArticleTrace { + pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace { url: &final_url, title: &item.title, source_type: "web_search", source_url: None, category: None, synthesis_id: None, status: reason, scraped_ok: false, - }).await; + })); continue; } @@ -752,6 +784,12 @@ async fn run_generation_inner( *source_counts.entry(domain).or_insert(0) += 1; } } + + // Flush Phase 2 scrape traces + if !pending_traces.is_empty() { + db::article_history::batch_insert_entries(&state.pool, &pending_traces).await.ok(); + pending_traces.clear(); + } } } @@ -790,14 +828,20 @@ async fn run_generation_inner( Some(_) => "personalized_source", None => "web_search", }; - trace_article(&state.pool, user_id, job_id, &ArticleTrace { + pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace { url: &item.url, title: &item.title, source_type, source_url: if source_type == "personalized_source" { url_source.get(&item.url).map(|s| s.as_str()) } else { None }, category: Some(§ion.title), synthesis_id: Some(synthesis.id), status: "used", scraped_ok: true, - }).await; + })); } } + + // Flush final "used" traces + if !pending_traces.is_empty() { + db::article_history::batch_insert_entries(&state.pool, &pending_traces).await.ok(); + pending_traces.clear(); + } } Ok(synthesis.id) @@ -848,14 +892,13 @@ struct ArticleTrace<'a> { scraped_ok: bool, } -/// Insert a trace entry into article_history for debugging pipeline behavior. -async fn trace_article( - pool: &sqlx::PgPool, +/// Build an article history entry from trace parameters (no DB call). +fn build_trace_entry( user_id: Uuid, job_id: Uuid, trace: &ArticleTrace<'_>, -) { - let entry = db::article_history::ArticleHistoryEntry { +) -> db::article_history::ArticleHistoryEntry { + db::article_history::ArticleHistoryEntry { user_id, url: trace.url.to_string(), url_hash: hash_article_url(trace.url), @@ -867,8 +910,7 @@ async fn trace_article( status: trace.status.to_string(), scraped_ok: trace.scraped_ok, job_id, - }; - db::article_history::insert_entry(pool, &entry).await.ok(); + } } /// Log an LLM call with full prompt, response, and timing.