feat: add Brave Search Phase 2 pipeline path

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
master
oabrivard 3 months ago
parent e05c2ae75a
commit f124b056fe

@ -559,85 +559,286 @@ async fn run_generation_inner(
}).collect(); }).collect();
if !category_gaps.is_empty() { if !category_gaps.is_empty() {
emit_progress(tx, "search", "Recherche d'actualites complementaires...", 70); if settings.use_brave_search {
check_rate_limit(state, &user_rate_limiter, &provider_name).await?; // === BRAVE SEARCH PATH ===
emit_progress(tx, "search", "Recherche Brave Search...", 70);
let search_schema = crate::services::llm::schema::build_category_schema(&user_categories, settings.max_items_per_category); let brave_key = resolve_brave_key(state, user_id).await?;
let current_date = Utc::now().format("%A %d %B %Y").to_string(); let query = format!("{} actualites", settings.theme);
let (sys_prompt, usr_prompt) = crate::services::prompts::build_search_prompt(&settings, &[], &current_date, &[], Some(&category_gaps)); let brave_results = crate::services::brave_search::search(
&state.http_client, &brave_key, &query, 20, settings.max_age_days,
).await?;
let llm_start = std::time::Instant::now(); tracing::info!(results = brave_results.len(), "Brave Search returned results");
let raw_results = provider.call_llm(&model_websearch, &sys_prompt, &usr_prompt, &search_schema).await?;
let llm_duration = llm_start.elapsed().as_millis() as u64;
log_llm_call(&state.pool, user_id, job_id, "search", &model_websearch, &sys_prompt, &usr_prompt, &raw_results, llm_duration, None).await;
emit_progress(tx, "parsing", "Analyse des resultats...", 75); // Filter Brave results
let parsed = parse_llm_output(&raw_results, &user_categories)?; let mut brave_urls: Vec<String> = Vec::new();
for result in &brave_results {
// Filter and validate Phase 2 articles let url_lower = result.url.to_lowercase();
let mut phase2_items: Vec<(String, NewsItem)> = Vec::new();
for (cat_key, items) in parsed {
for item in items {
let url_lower = item.url.to_lowercase();
// Homepage filter // Homepage filter
if let Ok(parsed_url) = url::Url::parse(&item.url) { if let Ok(parsed_url) = url::Url::parse(&result.url) {
let path = parsed_url.path(); let path = parsed_url.path();
if path.is_empty() || path == "/" { if path.is_empty() || path == "/" {
trace_article(&state.pool, user_id, job_id, &item.url, &item.title, "web_search", None, None, None, "filtered_homepage", false).await; trace_article(&state.pool, user_id, job_id, &result.url, &result.title, "brave_search", None, None, None, "filtered_homepage", false).await;
continue; continue;
} }
} }
// Cross-phase dedup // Cross-phase dedup
if seen_urls.contains(&url_lower) { if seen_urls.contains(&url_lower) {
trace_article(&state.pool, user_id, job_id, &item.url, &item.title, "web_search", None, None, None, "filtered_cross_phase_dedup", false).await; trace_article(&state.pool, user_id, job_id, &result.url, &result.title, "brave_search", None, None, None, "filtered_cross_phase_dedup", false).await;
continue; continue;
} }
// History dedup // History dedup
if settings.article_history_days > 0 { if settings.article_history_days > 0 {
let hash = hash_article_url(&item.url); let hash = hash_article_url(&result.url);
let exists = db::article_history::check_urls_exist(&state.pool, user_id, std::slice::from_ref(&hash)).await.unwrap_or_default(); let exists = db::article_history::check_urls_exist(&state.pool, user_id, std::slice::from_ref(&hash)).await.unwrap_or_default();
if exists.contains(&hash) { if exists.contains(&hash) {
trace_article(&state.pool, user_id, job_id, &item.url, &item.title, "web_search", None, None, None, "filtered_history", false).await; trace_article(&state.pool, user_id, job_id, &result.url, &result.title, "brave_search", None, None, None, "filtered_history", false).await;
continue; continue;
} }
} }
// Source limit // Source diversity
if let Some(domain) = extract_domain(&item.url) { if let Some(domain) = extract_domain(&result.url) {
let count = source_counts.get(&domain).copied().unwrap_or(0); let count = source_counts.get(&domain).copied().unwrap_or(0);
if count >= settings.max_articles_per_source as usize { if count >= settings.max_articles_per_source as usize {
trace_article(&state.pool, user_id, job_id, &item.url, &item.title, "web_search", None, None, None, "filtered_diversity", false).await; trace_article(&state.pool, user_id, job_id, &result.url, &result.title, "brave_search", None, None, None, "filtered_diversity", false).await;
continue; continue;
} }
} }
seen_urls.insert(url_lower); seen_urls.insert(url_lower);
phase2_items.push((cat_key.clone(), item)); url_source.insert(result.url.clone(), "brave_search".to_string());
brave_urls.push(result.url.clone());
} }
}
// Scrape Phase 2 for validation // Scrape + classify in batches (same as Phase 1)
emit_progress(tx, "scraping", "Verification des sources web...", 80); if !brave_urls.is_empty() {
for (cat_key, item) in phase2_items { emit_progress(tx, "processing", "Traitement des articles Brave...", 75);
let (_body_text, _, final_url, drop_reason) = scrape_single_article(&state.http_client, &item.url, settings.max_age_days as i64).await; let total_candidates = brave_urls.len();
let batch_size = settings.batch_size.max(1) as usize;
let mut processed = 0usize;
let mut candidates_iter = brave_urls.into_iter();
let mut done = false;
while !done {
let mut batch: Vec<String> = Vec::new();
while batch.len() < batch_size {
let Some(url) = candidates_iter.next() else { break };
batch.push(url);
}
if let Some(reason) = drop_reason { if batch.is_empty() { break; }
trace_article(&state.pool, user_id, job_id, &final_url, &item.title, "web_search", None, None, None, reason, false).await;
continue; let pct = 75 + ((processed as u32 * 15) / total_candidates.max(1) as u32).min(15);
emit_progress(tx, "processing", &format!("Articles Brave {}-{}/{}...", processed + 1, processed + batch.len(), total_candidates), pct as u8);
// Scrape batch in parallel
let mut scrape_set = tokio::task::JoinSet::new();
for url in &batch {
let client = state.http_client.clone();
let u = url.clone();
let mad = settings.max_age_days as i64;
scrape_set.spawn(async move {
let result = scrape_single_article(&client, &u, mad).await;
(u, result)
});
}
let mut scraped_articles: Vec<(String, String, String)> = Vec::new(); // (url, body_text, page_title)
while let Some(join_result) = scrape_set.join_next().await {
if let Ok((_url, (body_text, page_title, final_url, drop_reason))) = join_result {
if let Some(reason) = drop_reason {
trace_article(&state.pool, user_id, job_id, &final_url, &page_title, "brave_search", None, None, None, reason, false).await;
} else {
scraped_articles.push((final_url, body_text, page_title));
}
}
}
if scraped_articles.is_empty() {
processed += batch.len();
continue;
}
// Classify/summarize in parallel
check_rate_limit(state, &user_rate_limiter, &provider_name).await?;
let mut classify_set = tokio::task::JoinSet::new();
for (final_url, body_text, page_title) in &scraped_articles {
let provider_clone = std::sync::Arc::clone(&provider);
let model = model_research.clone();
let schema = classify_schema.clone();
let cats = classification_categories.clone();
let body_snippet: String = body_text.chars().take(500).collect();
let title = page_title.clone();
let url = final_url.clone();
let pool = state.pool.clone();
let uid = user_id;
let jid = job_id;
let (class_sys, class_user) = crate::services::prompts::build_article_classify_prompt(&title, &body_snippet, &cats);
let sys = class_sys.clone();
let usr = class_user.clone();
let mdl = model.clone();
classify_set.spawn(async move {
let llm_start = std::time::Instant::now();
let result = provider_clone.call_llm(&mdl, &sys, &usr, &schema).await;
let duration = llm_start.elapsed().as_millis() as u64;
if let Ok(ref resp) = result {
let resp_str = serde_json::to_string_pretty(resp).unwrap_or_default();
crate::db::llm_call_log::insert(&pool, uid, jid, "classify_summarize", &mdl, &sys, &usr, &resp_str, duration as i32, Some(&url)).await.ok();
}
(url, title, result)
});
}
while let Some(join_result) = classify_set.join_next().await {
if let Ok((final_url, page_title, llm_result)) = join_result {
let class_response = match llm_result {
Ok(resp) => resp,
Err(e) => {
tracing::warn!(url = %final_url, error = %e, "LLM classify failed, skipping article");
continue;
}
};
let llm_title = class_response.get("title").and_then(|t| t.as_str()).unwrap_or(&page_title).to_string();
let llm_summary = class_response.get("summary").and_then(|s| s.as_str()).unwrap_or("").to_string();
let mut llm_category = class_response.get("category").and_then(|c| c.as_str()).unwrap_or("Autre").to_string();
if !classification_categories.iter().any(|c| c.to_lowercase() == llm_category.to_lowercase()) {
llm_category = "Autre".to_string();
}
let cat_key = if llm_category.to_lowercase() == "autre" {
"category_autre".to_string()
} else {
user_categories.iter().position(|c| c.to_lowercase() == llm_category.to_lowercase())
.map(|i| format!("category_{}", i))
.unwrap_or_else(|| "category_autre".to_string())
};
let cat_filled = filled_counts.get(&llm_category).copied().unwrap_or(0);
let (final_cat_key, final_cat_name) = if cat_filled >= settings.max_items_per_category as usize && llm_category.to_lowercase() != "autre" {
let autre_filled = filled_counts.get("Autre").copied().unwrap_or(0);
if autre_filled >= settings.max_items_per_category as usize {
continue;
}
("category_autre".to_string(), "Autre".to_string())
} else {
(cat_key, llm_category)
};
article_scraped.entry(final_cat_key).or_default().push(NewsItem {
title: llm_title,
url: final_url.clone(),
summary: llm_summary,
});
*filled_counts.entry(final_cat_name).or_insert(0) += 1;
if let Some(domain) = extract_domain(&final_url) {
*source_counts.entry(domain).or_insert(0) += 1;
}
}
}
processed += batch.len();
let total: usize = article_scraped.values().map(|v| v.len()).sum();
if total >= max_total {
done = true;
}
}
} }
} else {
// === EXISTING LLM SEARCH PATH ===
emit_progress(tx, "search", "Recherche d'actualites complementaires...", 70);
check_rate_limit(state, &user_rate_limiter, &provider_name).await?;
let search_schema = crate::services::llm::schema::build_category_schema(&user_categories, settings.max_items_per_category);
let current_date = Utc::now().format("%A %d %B %Y").to_string();
let (sys_prompt, usr_prompt) = crate::services::prompts::build_search_prompt(&settings, &[], &current_date, &[], Some(&category_gaps));
let llm_start = std::time::Instant::now();
let raw_results = provider.call_llm(&model_websearch, &sys_prompt, &usr_prompt, &search_schema).await?;
let llm_duration = llm_start.elapsed().as_millis() as u64;
log_llm_call(&state.pool, user_id, job_id, "search", &model_websearch, &sys_prompt, &usr_prompt, &raw_results, llm_duration, None).await;
article_scraped.entry(cat_key).or_default().push(NewsItem { emit_progress(tx, "parsing", "Analyse des resultats...", 75);
title: item.title, let parsed = parse_llm_output(&raw_results, &user_categories)?;
url: final_url,
summary: item.summary,
});
if let Some(domain) = extract_domain(&item.url) { // Filter and validate Phase 2 articles
*source_counts.entry(domain).or_insert(0) += 1; let mut phase2_items: Vec<(String, NewsItem)> = Vec::new();
for (cat_key, items) in parsed {
for item in items {
let url_lower = item.url.to_lowercase();
// Homepage filter
if let Ok(parsed_url) = url::Url::parse(&item.url) {
let path = parsed_url.path();
if path.is_empty() || path == "/" {
trace_article(&state.pool, user_id, job_id, &item.url, &item.title, "web_search", None, None, None, "filtered_homepage", false).await;
continue;
}
}
// Cross-phase dedup
if seen_urls.contains(&url_lower) {
trace_article(&state.pool, user_id, job_id, &item.url, &item.title, "web_search", None, None, None, "filtered_cross_phase_dedup", false).await;
continue;
}
// History dedup
if settings.article_history_days > 0 {
let hash = hash_article_url(&item.url);
let exists = db::article_history::check_urls_exist(&state.pool, user_id, std::slice::from_ref(&hash)).await.unwrap_or_default();
if exists.contains(&hash) {
trace_article(&state.pool, user_id, job_id, &item.url, &item.title, "web_search", None, None, None, "filtered_history", false).await;
continue;
}
}
// Source limit
if let Some(domain) = extract_domain(&item.url) {
let count = source_counts.get(&domain).copied().unwrap_or(0);
if count >= settings.max_articles_per_source as usize {
trace_article(&state.pool, user_id, job_id, &item.url, &item.title, "web_search", None, None, None, "filtered_diversity", false).await;
continue;
}
}
seen_urls.insert(url_lower);
phase2_items.push((cat_key.clone(), item));
}
}
// Scrape Phase 2 for validation
emit_progress(tx, "scraping", "Verification des sources web...", 80);
for (cat_key, item) in phase2_items {
let (_body_text, _, final_url, drop_reason) = scrape_single_article(&state.http_client, &item.url, settings.max_age_days as i64).await;
if let Some(reason) = drop_reason {
trace_article(&state.pool, user_id, job_id, &final_url, &item.title, "web_search", None, None, None, reason, false).await;
continue;
}
article_scraped.entry(cat_key).or_default().push(NewsItem {
title: item.title,
url: final_url,
summary: item.summary,
});
if let Some(domain) = extract_domain(&item.url) {
*source_counts.entry(domain).or_insert(0) += 1;
}
} }
} }
} }
@ -672,10 +873,15 @@ async fn run_generation_inner(
if settings.article_history_days > 0 { if settings.article_history_days > 0 {
for section in &final_sections { for section in &final_sections {
for item in &section.items { for item in &section.items {
let source_url = url_source.get(&item.url).map(|s| s.as_str()); let source_type = match url_source.get(&item.url).map(|s| s.as_str()) {
Some("brave_search") => "brave_search",
Some(_) => "personalized_source",
None => "web_search",
};
trace_article(&state.pool, user_id, job_id, &item.url, &item.title, trace_article(&state.pool, user_id, job_id, &item.url, &item.title,
if source_url.is_some() { "personalized_source" } else { "web_search" }, source_type,
source_url, Some(&section.title), Some(synthesis.id), "used", true).await; if source_type == "personalized_source" { url_source.get(&item.url).map(|s| s.as_str()) } else { None },
Some(&section.title), Some(synthesis.id), "used", true).await;
} }
} }
} }
@ -906,6 +1112,23 @@ fn hash_article_url(url: &str) -> String {
crate::util::token::hash_token(&normalized) crate::util::token::hash_token(&normalized)
} }
/// Decrypt the Brave Search API key for a user.
async fn resolve_brave_key(
state: &AppState,
user_id: Uuid,
) -> Result<String, AppError> {
let master_key = encryption::MasterKey::from_hex(&state.config.master_encryption_key)?;
let key_record = db::api_keys::get_for_user_and_provider(
&state.pool, user_id, "brave_search",
).await?
.ok_or_else(|| AppError::BadRequest(
"Brave Search est active mais aucune cle API Brave n'est configuree. \
Veuillez ajouter une cle API Brave Search dans vos parametres.".into(),
))?;
encryption::decrypt(&master_key, &key_record.encrypted_key, &key_record.nonce)
}
/// Resolve the LLM provider and decrypt the user's API key. /// Resolve the LLM provider and decrypt the user's API key.
/// ///
/// If the user has a preferred provider in settings, looks for a key matching /// If the user has a preferred provider in settings, looks for a key matching

Loading…
Cancel
Save