perf: batch article history INSERTs to reduce DB round-trips

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
master
oabrivard 3 months ago
parent f37e0b42a0
commit 4bbdd5c4d1

@ -89,6 +89,47 @@ pub async fn insert_urls(
Ok(())
}
/// Insert multiple article history entries in a single query.
pub async fn batch_insert_entries(pool: &PgPool, entries: &[ArticleHistoryEntry]) -> Result<(), AppError> {
if entries.is_empty() {
return Ok(());
}
let user_ids: Vec<Uuid> = entries.iter().map(|e| e.user_id).collect();
let urls: Vec<&str> = entries.iter().map(|e| e.url.as_str()).collect();
let url_hashes: Vec<&str> = entries.iter().map(|e| e.url_hash.as_str()).collect();
let titles: Vec<&str> = entries.iter().map(|e| e.title.as_str()).collect();
let source_types: Vec<&str> = entries.iter().map(|e| e.source_type.as_str()).collect();
let source_urls: Vec<Option<&str>> = entries.iter().map(|e| e.source_url.as_deref()).collect();
let categories: Vec<Option<&str>> = entries.iter().map(|e| e.category.as_deref()).collect();
let synthesis_ids: Vec<Option<Uuid>> = entries.iter().map(|e| e.synthesis_id).collect();
let statuses: Vec<&str> = entries.iter().map(|e| e.status.as_str()).collect();
let scraped_oks: Vec<bool> = entries.iter().map(|e| e.scraped_ok).collect();
let job_ids: Vec<Uuid> = entries.iter().map(|e| e.job_id).collect();
sqlx::query(
r#"
INSERT INTO article_history (user_id, url, url_hash, title, source_type, source_url, category, synthesis_id, status, scraped_ok, job_id)
SELECT * FROM unnest($1::uuid[], $2::text[], $3::text[], $4::text[], $5::text[], $6::text[], $7::text[], $8::uuid[], $9::text[], $10::bool[], $11::uuid[])
"#,
)
.bind(&user_ids)
.bind(&urls)
.bind(&url_hashes)
.bind(&titles)
.bind(&source_types)
.bind(&source_urls)
.bind(&categories)
.bind(&synthesis_ids)
.bind(&statuses)
.bind(&scraped_oks)
.bind(&job_ids)
.execute(pool)
.await?;
Ok(())
}
/// Insert a single article history entry with full tracing metadata.
pub async fn insert_entry(pool: &PgPool, entry: &ArticleHistoryEntry) -> Result<(), AppError> {
sqlx::query(

@ -230,6 +230,9 @@ async fn run_generation_inner(
user_id: Uuid,
tx: &watch::Sender<ProgressEvent>,
) -> Result<Uuid, AppError> {
// Batch buffer for article history traces (flushed at logical boundaries)
let mut pending_traces: Vec<db::article_history::ArticleHistoryEntry> = Vec::new();
// === INITIALIZATION ===
emit_progress(tx, "settings", "Chargement des parametres...", 5);
let settings = db::settings::get_or_create_default(&state.pool, user_id).await?;
@ -365,14 +368,19 @@ async fn run_generation_inner(
if !existing.is_empty() {
for (url, source_url) in &candidate_urls {
if existing.contains(&hash_article_url(url)) {
trace_article(&state.pool, user_id, job_id, &ArticleTrace {
pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace {
url, title: "", source_type: "personalized_source",
source_url: Some(source_url), category: None, synthesis_id: None,
status: "filtered_history", scraped_ok: false,
}).await;
}));
}
}
candidate_urls.retain(|(url, _)| !existing.contains(&hash_article_url(url)));
// Flush history dedup traces
if !pending_traces.is_empty() {
db::article_history::batch_insert_entries(&state.pool, &pending_traces).await.ok();
pending_traces.clear();
}
}
}
@ -403,11 +411,11 @@ async fn run_generation_inner(
let source_domain = extract_domain(&source_url).unwrap_or_default();
let source_count = source_counts.get(&source_domain).copied().unwrap_or(0);
if source_count >= settings.max_articles_per_source as usize {
trace_article(&state.pool, user_id, job_id, &ArticleTrace {
pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace {
url: &url, title: "", source_type: "personalized_source",
source_url: Some(&source_url), category: None, synthesis_id: None,
status: "filtered_diversity", scraped_ok: false,
}).await;
}));
continue;
}
batch.push((url, source_url));
@ -437,11 +445,11 @@ async fn run_generation_inner(
while let Some(join_result) = scrape_set.join_next().await {
if let Ok((_url, source_url, (body_text, page_title, final_url, drop_reason))) = join_result {
if let Some(reason) = drop_reason {
trace_article(&state.pool, user_id, job_id, &ArticleTrace {
pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace {
url: &final_url, title: &page_title, source_type: "personalized_source",
source_url: Some(&source_url), category: None, synthesis_id: None,
status: reason, scraped_ok: false,
}).await;
}));
} else {
scraped_articles.push((final_url, source_url, body_text, page_title));
}
@ -524,6 +532,12 @@ async fn run_generation_inner(
done = true;
}
}
// Flush Phase 1 traces
if !pending_traces.is_empty() {
db::article_history::batch_insert_entries(&state.pool, &pending_traces).await.ok();
pending_traces.clear();
}
}
// === PHASE 2: Web Search Fallback ===
@ -553,11 +567,11 @@ async fn run_generation_inner(
&state.pool, user_id, &result.url, &seen_urls, &source_counts,
settings.article_history_days, settings.max_articles_per_source as usize,
).await {
trace_article(&state.pool, user_id, job_id, &ArticleTrace {
pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace {
url: &result.url, title: &result.title, source_type: "brave_search",
source_url: None, category: None, synthesis_id: None,
status: reason, scraped_ok: false,
}).await;
}));
continue;
}
@ -566,6 +580,12 @@ async fn run_generation_inner(
brave_urls.push(result.url.clone());
}
// Flush Brave filter traces
if !pending_traces.is_empty() {
db::article_history::batch_insert_entries(&state.pool, &pending_traces).await.ok();
pending_traces.clear();
}
// Scrape + classify in batches (same as Phase 1)
if !brave_urls.is_empty() {
emit_progress(tx, "processing", "Traitement des articles Brave...", 75);
@ -603,11 +623,11 @@ async fn run_generation_inner(
while let Some(join_result) = scrape_set.join_next().await {
if let Ok((_url, (body_text, page_title, final_url, drop_reason))) = join_result {
if let Some(reason) = drop_reason {
trace_article(&state.pool, user_id, job_id, &ArticleTrace {
pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace {
url: &final_url, title: &page_title, source_type: "brave_search",
source_url: None, category: None, synthesis_id: None,
status: reason, scraped_ok: false,
}).await;
}));
} else {
scraped_articles.push((final_url, body_text, page_title));
}
@ -688,6 +708,12 @@ async fn run_generation_inner(
done = true;
}
}
// Flush Brave scrape/classify traces
if !pending_traces.is_empty() {
db::article_history::batch_insert_entries(&state.pool, &pending_traces).await.ok();
pending_traces.clear();
}
}
} else {
// === EXISTING LLM SEARCH PATH ===
@ -715,11 +741,11 @@ async fn run_generation_inner(
&state.pool, user_id, &item.url, &seen_urls, &source_counts,
settings.article_history_days, settings.max_articles_per_source as usize,
).await {
trace_article(&state.pool, user_id, job_id, &ArticleTrace {
pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace {
url: &item.url, title: &item.title, source_type: "web_search",
source_url: None, category: None, synthesis_id: None,
status: reason, scraped_ok: false,
}).await;
}));
continue;
}
@ -728,17 +754,23 @@ async fn run_generation_inner(
}
}
// Flush Phase 2 filter traces
if !pending_traces.is_empty() {
db::article_history::batch_insert_entries(&state.pool, &pending_traces).await.ok();
pending_traces.clear();
}
// Scrape Phase 2 for validation
emit_progress(tx, "scraping", "Verification des sources web...", 80);
for (cat_key, item) in phase2_items {
let (_body_text, _, final_url, drop_reason) = scrape_single_article(&state.http_client, &item.url, settings.max_age_days as i64).await;
if let Some(reason) = drop_reason {
trace_article(&state.pool, user_id, job_id, &ArticleTrace {
pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace {
url: &final_url, title: &item.title, source_type: "web_search",
source_url: None, category: None, synthesis_id: None,
status: reason, scraped_ok: false,
}).await;
}));
continue;
}
@ -752,6 +784,12 @@ async fn run_generation_inner(
*source_counts.entry(domain).or_insert(0) += 1;
}
}
// Flush Phase 2 scrape traces
if !pending_traces.is_empty() {
db::article_history::batch_insert_entries(&state.pool, &pending_traces).await.ok();
pending_traces.clear();
}
}
}
@ -790,14 +828,20 @@ async fn run_generation_inner(
Some(_) => "personalized_source",
None => "web_search",
};
trace_article(&state.pool, user_id, job_id, &ArticleTrace {
pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace {
url: &item.url, title: &item.title, source_type,
source_url: if source_type == "personalized_source" { url_source.get(&item.url).map(|s| s.as_str()) } else { None },
category: Some(&section.title), synthesis_id: Some(synthesis.id),
status: "used", scraped_ok: true,
}).await;
}));
}
}
// Flush final "used" traces
if !pending_traces.is_empty() {
db::article_history::batch_insert_entries(&state.pool, &pending_traces).await.ok();
pending_traces.clear();
}
}
Ok(synthesis.id)
@ -848,14 +892,13 @@ struct ArticleTrace<'a> {
scraped_ok: bool,
}
/// Insert a trace entry into article_history for debugging pipeline behavior.
async fn trace_article(
pool: &sqlx::PgPool,
/// Build an article history entry from trace parameters (no DB call).
fn build_trace_entry(
user_id: Uuid,
job_id: Uuid,
trace: &ArticleTrace<'_>,
) {
let entry = db::article_history::ArticleHistoryEntry {
) -> db::article_history::ArticleHistoryEntry {
db::article_history::ArticleHistoryEntry {
user_id,
url: trace.url.to_string(),
url_hash: hash_article_url(trace.url),
@ -867,8 +910,7 @@ async fn trace_article(
status: trace.status.to_string(),
scraped_ok: trace.scraped_ok,
job_id,
};
db::article_history::insert_entry(pool, &entry).await.ok();
}
}
/// Log an LLM call with full prompt, response, and timing.

Loading…
Cancel
Save