diff --git a/backend/src/services/synthesis.rs b/backend/src/services/synthesis.rs index 99a35a5..7c87e00 100644 --- a/backend/src/services/synthesis.rs +++ b/backend/src/services/synthesis.rs @@ -559,85 +559,286 @@ async fn run_generation_inner( }).collect(); if !category_gaps.is_empty() { - emit_progress(tx, "search", "Recherche d'actualites complementaires...", 70); - check_rate_limit(state, &user_rate_limiter, &provider_name).await?; + if settings.use_brave_search { + // === BRAVE SEARCH PATH === + emit_progress(tx, "search", "Recherche Brave Search...", 70); - let search_schema = crate::services::llm::schema::build_category_schema(&user_categories, settings.max_items_per_category); - let current_date = Utc::now().format("%A %d %B %Y").to_string(); - let (sys_prompt, usr_prompt) = crate::services::prompts::build_search_prompt(&settings, &[], ¤t_date, &[], Some(&category_gaps)); + let brave_key = resolve_brave_key(state, user_id).await?; + let query = format!("{} actualites", settings.theme); + let brave_results = crate::services::brave_search::search( + &state.http_client, &brave_key, &query, 20, settings.max_age_days, + ).await?; - let llm_start = std::time::Instant::now(); - let raw_results = provider.call_llm(&model_websearch, &sys_prompt, &usr_prompt, &search_schema).await?; - let llm_duration = llm_start.elapsed().as_millis() as u64; - log_llm_call(&state.pool, user_id, job_id, "search", &model_websearch, &sys_prompt, &usr_prompt, &raw_results, llm_duration, None).await; + tracing::info!(results = brave_results.len(), "Brave Search returned results"); - emit_progress(tx, "parsing", "Analyse des resultats...", 75); - let parsed = parse_llm_output(&raw_results, &user_categories)?; - - // Filter and validate Phase 2 articles - let mut phase2_items: Vec<(String, NewsItem)> = Vec::new(); - - for (cat_key, items) in parsed { - for item in items { - let url_lower = item.url.to_lowercase(); + // Filter Brave results + let mut brave_urls: Vec = Vec::new(); + for result in &brave_results { + let url_lower = result.url.to_lowercase(); // Homepage filter - if let Ok(parsed_url) = url::Url::parse(&item.url) { + if let Ok(parsed_url) = url::Url::parse(&result.url) { let path = parsed_url.path(); if path.is_empty() || path == "/" { - trace_article(&state.pool, user_id, job_id, &item.url, &item.title, "web_search", None, None, None, "filtered_homepage", false).await; + trace_article(&state.pool, user_id, job_id, &result.url, &result.title, "brave_search", None, None, None, "filtered_homepage", false).await; continue; } } // Cross-phase dedup if seen_urls.contains(&url_lower) { - trace_article(&state.pool, user_id, job_id, &item.url, &item.title, "web_search", None, None, None, "filtered_cross_phase_dedup", false).await; + trace_article(&state.pool, user_id, job_id, &result.url, &result.title, "brave_search", None, None, None, "filtered_cross_phase_dedup", false).await; continue; } // History dedup if settings.article_history_days > 0 { - let hash = hash_article_url(&item.url); + let hash = hash_article_url(&result.url); let exists = db::article_history::check_urls_exist(&state.pool, user_id, std::slice::from_ref(&hash)).await.unwrap_or_default(); if exists.contains(&hash) { - trace_article(&state.pool, user_id, job_id, &item.url, &item.title, "web_search", None, None, None, "filtered_history", false).await; + trace_article(&state.pool, user_id, job_id, &result.url, &result.title, "brave_search", None, None, None, "filtered_history", false).await; continue; } } - // Source limit - if let Some(domain) = extract_domain(&item.url) { + // Source diversity + if let Some(domain) = extract_domain(&result.url) { let count = source_counts.get(&domain).copied().unwrap_or(0); if count >= settings.max_articles_per_source as usize { - trace_article(&state.pool, user_id, job_id, &item.url, &item.title, "web_search", None, None, None, "filtered_diversity", false).await; + trace_article(&state.pool, user_id, job_id, &result.url, &result.title, "brave_search", None, None, None, "filtered_diversity", false).await; continue; } } seen_urls.insert(url_lower); - phase2_items.push((cat_key.clone(), item)); + url_source.insert(result.url.clone(), "brave_search".to_string()); + brave_urls.push(result.url.clone()); } - } - // Scrape Phase 2 for validation - emit_progress(tx, "scraping", "Verification des sources web...", 80); - for (cat_key, item) in phase2_items { - let (_body_text, _, final_url, drop_reason) = scrape_single_article(&state.http_client, &item.url, settings.max_age_days as i64).await; + // Scrape + classify in batches (same as Phase 1) + if !brave_urls.is_empty() { + emit_progress(tx, "processing", "Traitement des articles Brave...", 75); + let total_candidates = brave_urls.len(); + let batch_size = settings.batch_size.max(1) as usize; + let mut processed = 0usize; + let mut candidates_iter = brave_urls.into_iter(); + let mut done = false; + + while !done { + let mut batch: Vec = Vec::new(); + while batch.len() < batch_size { + let Some(url) = candidates_iter.next() else { break }; + batch.push(url); + } - if let Some(reason) = drop_reason { - trace_article(&state.pool, user_id, job_id, &final_url, &item.title, "web_search", None, None, None, reason, false).await; - continue; + if batch.is_empty() { break; } + + let pct = 75 + ((processed as u32 * 15) / total_candidates.max(1) as u32).min(15); + emit_progress(tx, "processing", &format!("Articles Brave {}-{}/{}...", processed + 1, processed + batch.len(), total_candidates), pct as u8); + + // Scrape batch in parallel + let mut scrape_set = tokio::task::JoinSet::new(); + for url in &batch { + let client = state.http_client.clone(); + let u = url.clone(); + let mad = settings.max_age_days as i64; + scrape_set.spawn(async move { + let result = scrape_single_article(&client, &u, mad).await; + (u, result) + }); + } + + let mut scraped_articles: Vec<(String, String, String)> = Vec::new(); // (url, body_text, page_title) + while let Some(join_result) = scrape_set.join_next().await { + if let Ok((_url, (body_text, page_title, final_url, drop_reason))) = join_result { + if let Some(reason) = drop_reason { + trace_article(&state.pool, user_id, job_id, &final_url, &page_title, "brave_search", None, None, None, reason, false).await; + } else { + scraped_articles.push((final_url, body_text, page_title)); + } + } + } + + if scraped_articles.is_empty() { + processed += batch.len(); + continue; + } + + // Classify/summarize in parallel + check_rate_limit(state, &user_rate_limiter, &provider_name).await?; + + let mut classify_set = tokio::task::JoinSet::new(); + for (final_url, body_text, page_title) in &scraped_articles { + let provider_clone = std::sync::Arc::clone(&provider); + let model = model_research.clone(); + let schema = classify_schema.clone(); + let cats = classification_categories.clone(); + let body_snippet: String = body_text.chars().take(500).collect(); + let title = page_title.clone(); + let url = final_url.clone(); + let pool = state.pool.clone(); + let uid = user_id; + let jid = job_id; + + let (class_sys, class_user) = crate::services::prompts::build_article_classify_prompt(&title, &body_snippet, &cats); + let sys = class_sys.clone(); + let usr = class_user.clone(); + let mdl = model.clone(); + + classify_set.spawn(async move { + let llm_start = std::time::Instant::now(); + let result = provider_clone.call_llm(&mdl, &sys, &usr, &schema).await; + let duration = llm_start.elapsed().as_millis() as u64; + + if let Ok(ref resp) = result { + let resp_str = serde_json::to_string_pretty(resp).unwrap_or_default(); + crate::db::llm_call_log::insert(&pool, uid, jid, "classify_summarize", &mdl, &sys, &usr, &resp_str, duration as i32, Some(&url)).await.ok(); + } + + (url, title, result) + }); + } + + while let Some(join_result) = classify_set.join_next().await { + if let Ok((final_url, page_title, llm_result)) = join_result { + let class_response = match llm_result { + Ok(resp) => resp, + Err(e) => { + tracing::warn!(url = %final_url, error = %e, "LLM classify failed, skipping article"); + continue; + } + }; + + let llm_title = class_response.get("title").and_then(|t| t.as_str()).unwrap_or(&page_title).to_string(); + let llm_summary = class_response.get("summary").and_then(|s| s.as_str()).unwrap_or("").to_string(); + let mut llm_category = class_response.get("category").and_then(|c| c.as_str()).unwrap_or("Autre").to_string(); + + if !classification_categories.iter().any(|c| c.to_lowercase() == llm_category.to_lowercase()) { + llm_category = "Autre".to_string(); + } + + let cat_key = if llm_category.to_lowercase() == "autre" { + "category_autre".to_string() + } else { + user_categories.iter().position(|c| c.to_lowercase() == llm_category.to_lowercase()) + .map(|i| format!("category_{}", i)) + .unwrap_or_else(|| "category_autre".to_string()) + }; + + let cat_filled = filled_counts.get(&llm_category).copied().unwrap_or(0); + let (final_cat_key, final_cat_name) = if cat_filled >= settings.max_items_per_category as usize && llm_category.to_lowercase() != "autre" { + let autre_filled = filled_counts.get("Autre").copied().unwrap_or(0); + if autre_filled >= settings.max_items_per_category as usize { + continue; + } + ("category_autre".to_string(), "Autre".to_string()) + } else { + (cat_key, llm_category) + }; + + article_scraped.entry(final_cat_key).or_default().push(NewsItem { + title: llm_title, + url: final_url.clone(), + summary: llm_summary, + }); + *filled_counts.entry(final_cat_name).or_insert(0) += 1; + + if let Some(domain) = extract_domain(&final_url) { + *source_counts.entry(domain).or_insert(0) += 1; + } + } + } + + processed += batch.len(); + + let total: usize = article_scraped.values().map(|v| v.len()).sum(); + if total >= max_total { + done = true; + } + } } + } else { + // === EXISTING LLM SEARCH PATH === + emit_progress(tx, "search", "Recherche d'actualites complementaires...", 70); + check_rate_limit(state, &user_rate_limiter, &provider_name).await?; + + let search_schema = crate::services::llm::schema::build_category_schema(&user_categories, settings.max_items_per_category); + let current_date = Utc::now().format("%A %d %B %Y").to_string(); + let (sys_prompt, usr_prompt) = crate::services::prompts::build_search_prompt(&settings, &[], ¤t_date, &[], Some(&category_gaps)); + + let llm_start = std::time::Instant::now(); + let raw_results = provider.call_llm(&model_websearch, &sys_prompt, &usr_prompt, &search_schema).await?; + let llm_duration = llm_start.elapsed().as_millis() as u64; + log_llm_call(&state.pool, user_id, job_id, "search", &model_websearch, &sys_prompt, &usr_prompt, &raw_results, llm_duration, None).await; - article_scraped.entry(cat_key).or_default().push(NewsItem { - title: item.title, - url: final_url, - summary: item.summary, - }); + emit_progress(tx, "parsing", "Analyse des resultats...", 75); + let parsed = parse_llm_output(&raw_results, &user_categories)?; - if let Some(domain) = extract_domain(&item.url) { - *source_counts.entry(domain).or_insert(0) += 1; + // Filter and validate Phase 2 articles + let mut phase2_items: Vec<(String, NewsItem)> = Vec::new(); + + for (cat_key, items) in parsed { + for item in items { + let url_lower = item.url.to_lowercase(); + + // Homepage filter + if let Ok(parsed_url) = url::Url::parse(&item.url) { + let path = parsed_url.path(); + if path.is_empty() || path == "/" { + trace_article(&state.pool, user_id, job_id, &item.url, &item.title, "web_search", None, None, None, "filtered_homepage", false).await; + continue; + } + } + + // Cross-phase dedup + if seen_urls.contains(&url_lower) { + trace_article(&state.pool, user_id, job_id, &item.url, &item.title, "web_search", None, None, None, "filtered_cross_phase_dedup", false).await; + continue; + } + + // History dedup + if settings.article_history_days > 0 { + let hash = hash_article_url(&item.url); + let exists = db::article_history::check_urls_exist(&state.pool, user_id, std::slice::from_ref(&hash)).await.unwrap_or_default(); + if exists.contains(&hash) { + trace_article(&state.pool, user_id, job_id, &item.url, &item.title, "web_search", None, None, None, "filtered_history", false).await; + continue; + } + } + + // Source limit + if let Some(domain) = extract_domain(&item.url) { + let count = source_counts.get(&domain).copied().unwrap_or(0); + if count >= settings.max_articles_per_source as usize { + trace_article(&state.pool, user_id, job_id, &item.url, &item.title, "web_search", None, None, None, "filtered_diversity", false).await; + continue; + } + } + + seen_urls.insert(url_lower); + phase2_items.push((cat_key.clone(), item)); + } + } + + // Scrape Phase 2 for validation + emit_progress(tx, "scraping", "Verification des sources web...", 80); + for (cat_key, item) in phase2_items { + let (_body_text, _, final_url, drop_reason) = scrape_single_article(&state.http_client, &item.url, settings.max_age_days as i64).await; + + if let Some(reason) = drop_reason { + trace_article(&state.pool, user_id, job_id, &final_url, &item.title, "web_search", None, None, None, reason, false).await; + continue; + } + + article_scraped.entry(cat_key).or_default().push(NewsItem { + title: item.title, + url: final_url, + summary: item.summary, + }); + + if let Some(domain) = extract_domain(&item.url) { + *source_counts.entry(domain).or_insert(0) += 1; + } } } } @@ -672,10 +873,15 @@ async fn run_generation_inner( if settings.article_history_days > 0 { for section in &final_sections { for item in §ion.items { - let source_url = url_source.get(&item.url).map(|s| s.as_str()); + let source_type = match url_source.get(&item.url).map(|s| s.as_str()) { + Some("brave_search") => "brave_search", + Some(_) => "personalized_source", + None => "web_search", + }; trace_article(&state.pool, user_id, job_id, &item.url, &item.title, - if source_url.is_some() { "personalized_source" } else { "web_search" }, - source_url, Some(§ion.title), Some(synthesis.id), "used", true).await; + source_type, + if source_type == "personalized_source" { url_source.get(&item.url).map(|s| s.as_str()) } else { None }, + Some(§ion.title), Some(synthesis.id), "used", true).await; } } } @@ -906,6 +1112,23 @@ fn hash_article_url(url: &str) -> String { crate::util::token::hash_token(&normalized) } +/// Decrypt the Brave Search API key for a user. +async fn resolve_brave_key( + state: &AppState, + user_id: Uuid, +) -> Result { + let master_key = encryption::MasterKey::from_hex(&state.config.master_encryption_key)?; + let key_record = db::api_keys::get_for_user_and_provider( + &state.pool, user_id, "brave_search", + ).await? + .ok_or_else(|| AppError::BadRequest( + "Brave Search est active mais aucune cle API Brave n'est configuree. \ + Veuillez ajouter une cle API Brave Search dans vos parametres.".into(), + ))?; + + encryption::decrypt(&master_key, &key_record.encrypted_key, &key_record.nonce) +} + /// Resolve the LLM provider and decrypt the user's API key. /// /// If the user has a preferred provider in settings, looks for a key matching