You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1337 lines
58 KiB
Rust
1337 lines
58 KiB
Rust
//! Synthesis generation pipeline.
|
|
//!
|
|
//! Orchestrates the two-phase pipeline:
|
|
//! 1. Personalized sources: scrape user sources, classify+summarize per article
|
|
//! 2. Web search fallback: LLM search for missing categories, scrape to validate
|
|
//!
|
|
//! Progress is reported via `tokio::sync::watch` channels per job,
|
|
//! consumed by SSE endpoints for real-time client updates.
|
|
|
|
use std::collections::HashMap;
|
|
use std::sync::atomic::{AtomicBool, Ordering};
|
|
use std::sync::Arc;
|
|
use std::time::Duration;
|
|
|
|
use chrono::{DateTime, Utc};
|
|
use tokio::sync::watch;
|
|
use uuid::Uuid;
|
|
|
|
use crate::app_state::AppState;
|
|
use crate::db;
|
|
use crate::errors::AppError;
|
|
use crate::models::settings::UserSettings;
|
|
use crate::models::synthesis::{
|
|
get_iso_week_string, NewsItem, NewsSection,
|
|
};
|
|
use crate::services::encryption;
|
|
use crate::services::llm::factory::create_provider;
|
|
use crate::services::scraper;
|
|
use crate::services::source_scraper;
|
|
use crate::services::feed_parser;
|
|
|
|
mod helpers;
|
|
pub(crate) use helpers::{
|
|
ArticleTrace, assign_category, build_trace_entry, extract_domain,
|
|
filter_phase2_url, hash_article_url,
|
|
};
|
|
|
|
// Re-export for downstream consumers that previously imported from synthesis.
|
|
pub use crate::services::job_store::{emit_progress, JobStore, ProgressEvent};
|
|
|
|
// ───────────────────────────────────────────────────────────────────
|
|
// Generation Pipeline
|
|
// ───────────────────────────────────────────────────────────────────
|
|
|
|
/// Run the full generation pipeline for a user.
|
|
///
|
|
/// This is the core orchestration function. It is spawned as a background
|
|
/// tokio task and communicates progress via the `watch` channel.
|
|
///
|
|
/// # Phases
|
|
/// 1. Personalized sources: extract links, scrape, classify+summarize per article
|
|
/// 2. Web search fallback: LLM search for under-filled categories, scrape to validate
|
|
/// 3. Save synthesis to DB
|
|
pub async fn run_generation(
|
|
job_id: Uuid,
|
|
state: AppState,
|
|
user_id: Uuid,
|
|
theme_id: Uuid,
|
|
tx: Arc<watch::Sender<ProgressEvent>>,
|
|
provider_override: Option<Arc<dyn crate::services::llm::LlmProvider>>,
|
|
cancelled: Arc<AtomicBool>,
|
|
) {
|
|
let result = run_generation_inner(job_id, &state, user_id, theme_id, &tx, provider_override, &cancelled).await;
|
|
|
|
match result {
|
|
Ok(synthesis_id) => {
|
|
tx.send(ProgressEvent::Complete { synthesis_id }).ok();
|
|
tracing::info!(job_id = %job_id, synthesis_id = %synthesis_id, "Generation completed");
|
|
}
|
|
Err(e) => {
|
|
tracing::error!(job_id = %job_id, error = %e, "Generation failed");
|
|
// Sanitize error message — never expose API keys or internal details
|
|
let safe_message = sanitize_error_message(&e.to_string());
|
|
tx.send(ProgressEvent::Error {
|
|
message: safe_message,
|
|
})
|
|
.ok();
|
|
}
|
|
}
|
|
|
|
// Keep the job in the store for 5 minutes after completion
|
|
// to allow SSE reconnection
|
|
let store = state.job_store.clone();
|
|
let jid = job_id;
|
|
tokio::spawn(async move {
|
|
tokio::time::sleep(Duration::from_secs(300)).await;
|
|
store.remove(&jid);
|
|
});
|
|
}
|
|
|
|
/// Inner implementation of the generation pipeline, returning a Result.
|
|
pub async fn run_generation_inner(
|
|
job_id: Uuid,
|
|
state: &AppState,
|
|
user_id: Uuid,
|
|
theme_id: Uuid,
|
|
tx: &watch::Sender<ProgressEvent>,
|
|
provider_override: Option<Arc<dyn crate::services::llm::LlmProvider>>,
|
|
cancelled: &AtomicBool,
|
|
) -> Result<Uuid, AppError> {
|
|
// Batch buffer for article history traces (flushed at logical boundaries)
|
|
let mut pending_traces: Vec<db::article_history::ArticleHistoryEntry> = Vec::new();
|
|
|
|
// === INITIALIZATION ===
|
|
emit_progress(tx, "sources", "Chargement des parametres...", 5);
|
|
let settings = db::settings::get_or_create_default(&state.pool, user_id).await?;
|
|
|
|
// Load theme
|
|
let theme = db::themes::get_by_id(&state.pool, user_id, theme_id).await?
|
|
.ok_or_else(|| AppError::BadRequest("Theme introuvable.".into()))?;
|
|
let theme_categories: Vec<String> = serde_json::from_value(theme.categories).unwrap_or_default();
|
|
|
|
if settings.article_history_days > 0 {
|
|
db::article_history::cleanup_old(&state.pool, user_id, settings.article_history_days).await.unwrap_or(0);
|
|
db::llm_call_log::truncate_old(&state.pool, user_id, settings.article_history_days).await.ok();
|
|
}
|
|
|
|
let user_categories = if theme_categories.is_empty() {
|
|
Vec::new()
|
|
} else {
|
|
theme_categories.clone()
|
|
};
|
|
let mut classification_categories = user_categories.clone();
|
|
classification_categories.push("Divers".to_string());
|
|
|
|
emit_progress(tx, "sources", "Chargement des sources...", 10);
|
|
let sources = db::sources::list_for_user(&state.pool, user_id, Some(theme_id)).await?;
|
|
|
|
emit_progress(tx, "sources", "Configuration du fournisseur IA...", 12);
|
|
let (provider_name, provider) = if let Some(mock_provider) = provider_override {
|
|
("mock".to_string(), mock_provider)
|
|
} else {
|
|
let (pname, api_key) = resolve_provider_and_key(state, user_id, &settings).await?;
|
|
let p = create_provider(&pname, api_key)?;
|
|
(pname, p)
|
|
};
|
|
let (model_research, model_websearch) = if provider_name == "mock" {
|
|
let research = if settings.ai_model.is_empty() { "mock-model".to_string() } else { settings.ai_model.clone() };
|
|
let websearch = if settings.ai_model_websearch.is_empty() { "mock-model".to_string() } else { settings.ai_model_websearch.clone() };
|
|
(research, websearch)
|
|
} else {
|
|
let model_research = if !settings.ai_model.is_empty() { settings.ai_model.clone() } else { resolve_model(state, &provider_name).await? };
|
|
let model_websearch = if !settings.ai_model_websearch.is_empty() { settings.ai_model_websearch.clone() } else { model_research.clone() };
|
|
(model_research, model_websearch)
|
|
};
|
|
let user_rate_limiter = get_user_rate_limiter(state, &settings, user_id);
|
|
|
|
// Tracking structures
|
|
let mut article_scraped: HashMap<String, Vec<NewsItem>> = HashMap::new();
|
|
let mut source_counts: HashMap<String, usize> = HashMap::new();
|
|
let mut url_source: HashMap<String, String> = HashMap::new();
|
|
let mut filled_counts: HashMap<String, usize> = HashMap::new();
|
|
let mut seen_urls: std::collections::HashSet<String> = std::collections::HashSet::new();
|
|
let max_total = (user_categories.len() + 1) * theme.max_items_per_category as usize;
|
|
let classify_schema = Arc::new(crate::services::llm::schema::build_article_classify_schema());
|
|
let model_research = Arc::new(model_research);
|
|
let classification_categories = Arc::new(classification_categories);
|
|
|
|
// === PHASE 1: Personalized Sources ===
|
|
if !sources.is_empty() {
|
|
emit_progress(tx, "sources", "Analyse des sources personnalisees...", 15);
|
|
|
|
let last_source = db::article_history::get_last_source_url(&state.pool, user_id).await.unwrap_or(None);
|
|
let rotated_sources = rotate_sources(sources.clone(), last_source.as_deref());
|
|
// Preferred sources first, preserving rotation order within each group
|
|
let preferred: Vec<_> = rotated_sources.iter().filter(|s| s.is_preferred).cloned().collect();
|
|
let non_preferred: Vec<_> = rotated_sources.iter().filter(|s| !s.is_preferred).cloned().collect();
|
|
let ordered_sources = [preferred, non_preferred].concat();
|
|
let max_links = settings.max_links_per_source.max(1) as usize;
|
|
let window_size = settings.source_extraction_window.max(1) as usize;
|
|
|
|
// Process sources in waves of `window_size`
|
|
let source_chunks: Vec<Vec<&crate::models::source::Source>> = ordered_sources
|
|
.chunks(window_size)
|
|
.map(|chunk| chunk.iter().collect())
|
|
.collect();
|
|
let total_waves = source_chunks.len();
|
|
|
|
'wave_loop: for (wave_idx, wave_sources) in source_chunks.iter().enumerate() {
|
|
// Check cancellation before each wave
|
|
if cancelled.load(Ordering::Relaxed) {
|
|
tracing::info!(job_id = %job_id, "Generation cancelled by user (before wave)");
|
|
emit_progress(tx, "saving", "Generation arretee, sauvegarde...", 90);
|
|
break 'wave_loop;
|
|
}
|
|
|
|
let articles_so_far: usize = article_scraped.values().map(|v| v.len()).sum();
|
|
let pct = 5 + ((articles_so_far as u32 * 60) / max_total.max(1) as u32).min(60);
|
|
emit_progress(tx, "sources",
|
|
&format!("Vague {}/{} : extraction des sources...", wave_idx + 1, total_waves),
|
|
pct as u8);
|
|
|
|
// 1a. Extract links from this wave's sources (RSS-first with HTML fallback)
|
|
let mut wave_urls: Vec<(String, String)> = Vec::new();
|
|
let mut rss_updates: Vec<(Uuid, Option<String>, Option<DateTime<Utc>>)> = Vec::new();
|
|
{
|
|
let mut join_set = tokio::task::JoinSet::new();
|
|
for source in wave_sources {
|
|
let client = state.http_client.clone();
|
|
let source_id = source.id;
|
|
let source_url = source.url.clone();
|
|
let source_title = source.title.clone();
|
|
let rss_url = source.rss_url.clone();
|
|
let rss_discovered_at = source.rss_discovered_at;
|
|
let max_l = max_links;
|
|
join_set.spawn(async move {
|
|
// Try RSS feed first
|
|
let feed_result = feed_parser::detect_and_parse_feed(
|
|
&client,
|
|
&source_url,
|
|
rss_url.as_deref(),
|
|
rss_discovered_at,
|
|
max_l,
|
|
).await;
|
|
|
|
match feed_result {
|
|
feed_parser::FeedResult::Found { feed_url, entries }
|
|
if entries.len() >= feed_parser::MIN_FEED_ENTRIES =>
|
|
{
|
|
let links: Vec<String> = entries.into_iter().map(|e| e.url).collect();
|
|
tracing::info!(
|
|
source = %source_title,
|
|
feed = %feed_url,
|
|
links = links.len(),
|
|
"Extracted links from RSS feed"
|
|
);
|
|
// Signal RSS URL update if it changed
|
|
let rss_changed = rss_url.as_deref() != Some(&feed_url);
|
|
let rss_stale = rss_discovered_at
|
|
.map(|d| Utc::now().signed_duration_since(d).num_days() >= feed_parser::REDISCOVERY_DAYS)
|
|
.unwrap_or(true);
|
|
let update = if rss_changed || rss_stale {
|
|
Some((source_id, Some(feed_url), Some(Utc::now())))
|
|
} else {
|
|
None
|
|
};
|
|
(source_url, source_title, Ok(links), update)
|
|
}
|
|
feed_parser::FeedResult::Found { .. } => {
|
|
// Feed found but too few entries — keep the cache, fall back to HTML
|
|
let links = source_scraper::extract_article_links(&client, &source_url, max_l).await;
|
|
(source_url, source_title, links, None)
|
|
}
|
|
feed_parser::FeedResult::NotFound => {
|
|
// No feed discovered — fall back to HTML and clear any stale cache
|
|
let links = source_scraper::extract_article_links(&client, &source_url, max_l).await;
|
|
let update = if rss_url.is_some() {
|
|
Some((source_id, None, None))
|
|
} else {
|
|
None
|
|
};
|
|
(source_url, source_title, links, update)
|
|
}
|
|
}
|
|
});
|
|
}
|
|
|
|
while let Some(join_result) = join_set.join_next().await {
|
|
if let Ok((source_url, source_title, links_result, rss_update)) = join_result {
|
|
if let Some(update) = rss_update {
|
|
rss_updates.push(update);
|
|
}
|
|
match links_result {
|
|
Ok(links) => {
|
|
tracing::info!(source = %source_title, links = links.len(), "Extracted links from source");
|
|
for link in links {
|
|
if seen_urls.insert(link.to_lowercase()) {
|
|
wave_urls.push((link, source_url.clone()));
|
|
}
|
|
}
|
|
}
|
|
Err(e) => {
|
|
tracing::warn!(source = %source_title, error = %e, "Failed to extract links");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Persist RSS URL updates (fire-and-forget)
|
|
for (source_id, new_rss_url, new_discovered_at) in rss_updates {
|
|
db::sources::update_source_rss(
|
|
&state.pool,
|
|
source_id,
|
|
user_id,
|
|
new_rss_url.as_deref(),
|
|
new_discovered_at,
|
|
).await.ok();
|
|
}
|
|
|
|
// 1b. Filter against article history
|
|
if settings.article_history_days > 0 && !wave_urls.is_empty() {
|
|
let hashes: Vec<String> = wave_urls.iter().map(|(url, _)| hash_article_url(url)).collect();
|
|
let existing = db::article_history::check_urls_exist(&state.pool, user_id, &hashes).await.unwrap_or_default();
|
|
if !existing.is_empty() {
|
|
for (url, source_url) in &wave_urls {
|
|
if existing.contains(&hash_article_url(url)) {
|
|
pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace {
|
|
url, title: "", source_type: "personalized_source",
|
|
source_url: Some(source_url), category: None, synthesis_id: None,
|
|
status: "filtered_history", scraped_ok: false,
|
|
published_date: None,
|
|
}));
|
|
}
|
|
}
|
|
wave_urls.retain(|(url, _)| !existing.contains(&hash_article_url(url)));
|
|
// Flush history dedup traces
|
|
if !pending_traces.is_empty() {
|
|
db::article_history::batch_insert_entries(&state.pool, &pending_traces).await.ok();
|
|
pending_traces.clear();
|
|
}
|
|
}
|
|
}
|
|
|
|
// 1c. Preferred-first shuffle: preferred source URLs first, then others
|
|
use rand::seq::SliceRandom;
|
|
{
|
|
let preferred_source_urls: std::collections::HashSet<String> = wave_sources.iter()
|
|
.filter(|s| s.is_preferred)
|
|
.map(|s| s.url.clone())
|
|
.collect();
|
|
|
|
let mut preferred_urls: Vec<_> = wave_urls.iter()
|
|
.filter(|(_, source_url)| preferred_source_urls.contains(source_url))
|
|
.cloned()
|
|
.collect();
|
|
let mut other_urls: Vec<_> = wave_urls.iter()
|
|
.filter(|(_, source_url)| !preferred_source_urls.contains(source_url))
|
|
.cloned()
|
|
.collect();
|
|
|
|
preferred_urls.shuffle(&mut rand::thread_rng());
|
|
other_urls.shuffle(&mut rand::thread_rng());
|
|
|
|
wave_urls = [preferred_urls, other_urls].concat();
|
|
}
|
|
|
|
// Track url -> source
|
|
for (url, source_url) in &wave_urls {
|
|
url_source.insert(url.clone(), source_url.clone());
|
|
}
|
|
|
|
// 1d. Batch scrape+classify (operates on this wave's URLs)
|
|
if !wave_urls.is_empty() {
|
|
let total_candidates = wave_urls.len();
|
|
let batch_size = settings.batch_size.max(1) as usize;
|
|
let snippet_size = match theme.summary_length { 1 => 500, 2 => 2000, _ => 4000 };
|
|
let mut processed = 0usize;
|
|
let mut candidates_iter = wave_urls.into_iter();
|
|
let mut done = false;
|
|
|
|
let ctx = ScrapeClassifyCtx {
|
|
state, user_id, job_id, provider: &provider,
|
|
model_research: &model_research, classify_schema: &classify_schema,
|
|
classification_categories: &classification_categories,
|
|
user_categories: &user_categories, snippet_size, summary_length: theme.summary_length,
|
|
max_age_days: theme.max_age_days as i64,
|
|
max_items_per_category: theme.max_items_per_category as usize,
|
|
source_type: "personalized_source",
|
|
};
|
|
|
|
while !done {
|
|
// Take next batch of candidates, filtering source limits
|
|
let mut batch: Vec<(String, Option<String>)> = Vec::new();
|
|
while batch.len() < batch_size {
|
|
let Some((url, source_url)) = candidates_iter.next() else {
|
|
break;
|
|
};
|
|
let source_domain = extract_domain(&source_url).unwrap_or_default();
|
|
let source_count = source_counts.get(&source_domain).copied().unwrap_or(0);
|
|
if source_count >= settings.max_articles_per_source as usize {
|
|
pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace {
|
|
url: &url, title: "", source_type: "personalized_source",
|
|
source_url: Some(&source_url), category: None, synthesis_id: None,
|
|
status: "filtered_diversity", scraped_ok: false,
|
|
published_date: None,
|
|
}));
|
|
continue;
|
|
}
|
|
batch.push((url, Some(source_url)));
|
|
}
|
|
|
|
if batch.is_empty() {
|
|
break;
|
|
}
|
|
|
|
let articles_so_far: usize = article_scraped.values().map(|v| v.len()).sum();
|
|
let pct = 5 + ((articles_so_far as u32 * 60) / max_total.max(1) as u32).min(60);
|
|
emit_progress(tx, "sources", &format!("Vague {}/{} : articles {}/{}...", wave_idx + 1, total_waves, processed + 1, total_candidates), pct as u8);
|
|
|
|
scrape_and_classify_batch(
|
|
&ctx, &batch, &mut article_scraped, &mut filled_counts,
|
|
&mut source_counts, &mut pending_traces, &user_rate_limiter, &provider_name,
|
|
).await?;
|
|
|
|
processed += batch.len();
|
|
|
|
// Check cancellation after each batch
|
|
if cancelled.load(Ordering::Relaxed) {
|
|
tracing::info!(job_id = %job_id, "Generation cancelled by user (after batch)");
|
|
emit_progress(tx, "saving", "Generation arretee, sauvegarde...", 90);
|
|
break;
|
|
}
|
|
|
|
// Check if we've reached the maximum after this batch
|
|
let total: usize = article_scraped.values().map(|v| v.len()).sum();
|
|
if total >= max_total {
|
|
done = true;
|
|
}
|
|
}
|
|
}
|
|
|
|
// 1e. Check if full after this wave
|
|
let total: usize = article_scraped.values().map(|v| v.len()).sum();
|
|
if total >= max_total {
|
|
tracing::info!(wave = wave_idx + 1, total_waves = total_waves, "Synthesis full after wave, skipping remaining sources");
|
|
break 'wave_loop;
|
|
}
|
|
|
|
// 1f. Flush traces between waves
|
|
if !pending_traces.is_empty() {
|
|
db::article_history::batch_insert_entries(&state.pool, &pending_traces).await.ok();
|
|
pending_traces.clear();
|
|
}
|
|
}
|
|
}
|
|
|
|
// === PHASE 2: Web Search Fallback ===
|
|
// Skip Phase 2 if cancelled
|
|
let is_cancelled = cancelled.load(Ordering::Relaxed);
|
|
if is_cancelled {
|
|
tracing::info!(job_id = %job_id, "Skipping Phase 2 — generation cancelled by user");
|
|
}
|
|
|
|
let category_gaps: Vec<(String, i32)> = user_categories.iter().filter_map(|cat| {
|
|
let filled = filled_counts.get(cat).copied().unwrap_or(0);
|
|
let needed = (theme.max_items_per_category as usize).saturating_sub(filled);
|
|
if needed > 0 { Some((cat.clone(), needed as i32)) } else { None }
|
|
}).collect();
|
|
|
|
if !category_gaps.is_empty() && !is_cancelled {
|
|
if settings.use_brave_search {
|
|
// === BRAVE SEARCH PATH ===
|
|
emit_progress(tx, "websearch", "Recherche Brave Search...", 70);
|
|
|
|
let brave_key = resolve_brave_key(state, user_id).await?;
|
|
let query = format!("{} actualites", theme.theme);
|
|
let brave_results = crate::services::brave_search::search(
|
|
&state.http_client, &brave_key, &query, 20, theme.max_age_days,
|
|
).await?;
|
|
|
|
tracing::info!(results = brave_results.len(), "Brave Search returned results");
|
|
|
|
// Filter Brave results
|
|
let mut brave_urls: Vec<String> = Vec::new();
|
|
for result in &brave_results {
|
|
if let Some(reason) = filter_phase2_url(
|
|
&state.pool, user_id, &result.url, &seen_urls, &source_counts,
|
|
settings.article_history_days, settings.max_articles_per_source as usize,
|
|
).await {
|
|
pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace {
|
|
url: &result.url, title: &result.title, source_type: "brave_search",
|
|
source_url: None, category: None, synthesis_id: None,
|
|
status: reason, scraped_ok: false,
|
|
published_date: None,
|
|
}));
|
|
continue;
|
|
}
|
|
|
|
seen_urls.insert(result.url.to_lowercase());
|
|
url_source.insert(result.url.clone(), "brave_search".to_string());
|
|
brave_urls.push(result.url.clone());
|
|
}
|
|
|
|
// Flush Brave filter traces
|
|
if !pending_traces.is_empty() {
|
|
db::article_history::batch_insert_entries(&state.pool, &pending_traces).await.ok();
|
|
pending_traces.clear();
|
|
}
|
|
|
|
// Scrape + classify in batches (same as Phase 1)
|
|
if !brave_urls.is_empty() {
|
|
emit_progress(tx, "websearch", "Traitement des articles Brave...", 75);
|
|
let total_candidates = brave_urls.len();
|
|
let batch_size = settings.batch_size.max(1) as usize;
|
|
let snippet_size = match theme.summary_length { 1 => 500, 2 => 2000, _ => 4000 };
|
|
let mut processed = 0usize;
|
|
let mut candidates_iter = brave_urls.into_iter();
|
|
let mut done = false;
|
|
|
|
let ctx = ScrapeClassifyCtx {
|
|
state, user_id, job_id, provider: &provider,
|
|
model_research: &model_research, classify_schema: &classify_schema,
|
|
classification_categories: &classification_categories,
|
|
user_categories: &user_categories, snippet_size, summary_length: theme.summary_length,
|
|
max_age_days: theme.max_age_days as i64,
|
|
max_items_per_category: theme.max_items_per_category as usize,
|
|
source_type: "brave_search",
|
|
};
|
|
|
|
while !done {
|
|
let mut batch: Vec<(String, Option<String>)> = Vec::new();
|
|
while batch.len() < batch_size {
|
|
let Some(url) = candidates_iter.next() else { break };
|
|
batch.push((url, None));
|
|
}
|
|
|
|
if batch.is_empty() { break; }
|
|
|
|
let pct = 75 + ((processed as u32 * 10) / total_candidates.max(1) as u32).min(10);
|
|
emit_progress(tx, "websearch", &format!("Verification des sources {}/{}...", processed + 1, total_candidates), pct as u8);
|
|
|
|
scrape_and_classify_batch(
|
|
&ctx, &batch, &mut article_scraped, &mut filled_counts,
|
|
&mut source_counts, &mut pending_traces, &user_rate_limiter, &provider_name,
|
|
).await?;
|
|
|
|
processed += batch.len();
|
|
|
|
let total: usize = article_scraped.values().map(|v| v.len()).sum();
|
|
if total >= max_total {
|
|
done = true;
|
|
}
|
|
}
|
|
|
|
// Flush Brave scrape/classify traces
|
|
if !pending_traces.is_empty() {
|
|
db::article_history::batch_insert_entries(&state.pool, &pending_traces).await.ok();
|
|
pending_traces.clear();
|
|
}
|
|
}
|
|
} else {
|
|
// === EXISTING LLM SEARCH PATH ===
|
|
emit_progress(tx, "websearch", "Recherche d'actualites...", 70);
|
|
check_rate_limit(state, &user_rate_limiter, &provider_name).await?;
|
|
|
|
let search_schema = crate::services::llm::schema::build_category_schema(&user_categories, theme.max_items_per_category);
|
|
let current_date = Utc::now().format("%A %d %B %Y").to_string();
|
|
let (sys_prompt, usr_prompt) = crate::services::prompts::build_search_prompt(&theme.theme, &user_categories, theme.max_items_per_category, theme.max_age_days, &settings.search_agent_behavior, &[], ¤t_date, &[], Some(&category_gaps));
|
|
|
|
let llm_start = std::time::Instant::now();
|
|
let raw_results = provider.call_llm(&model_websearch, &sys_prompt, &usr_prompt, &search_schema).await?;
|
|
let llm_duration = llm_start.elapsed().as_millis() as u64;
|
|
log_llm_call(&state.pool, user_id, job_id, "search", &model_websearch, &sys_prompt, &usr_prompt, &raw_results, llm_duration, None).await;
|
|
|
|
emit_progress(tx, "websearch", "Analyse des resultats...", 75);
|
|
let parsed = parse_llm_output(&raw_results, &user_categories)?;
|
|
|
|
// Filter and validate Phase 2 articles
|
|
let mut phase2_items: Vec<(String, NewsItem)> = Vec::new();
|
|
|
|
for (cat_key, items) in parsed {
|
|
for item in items {
|
|
if let Some(reason) = filter_phase2_url(
|
|
&state.pool, user_id, &item.url, &seen_urls, &source_counts,
|
|
settings.article_history_days, settings.max_articles_per_source as usize,
|
|
).await {
|
|
pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace {
|
|
url: &item.url, title: &item.title, source_type: "web_search",
|
|
source_url: None, category: None, synthesis_id: None,
|
|
status: reason, scraped_ok: false,
|
|
published_date: None,
|
|
}));
|
|
continue;
|
|
}
|
|
|
|
seen_urls.insert(item.url.to_lowercase());
|
|
phase2_items.push((cat_key.clone(), item));
|
|
}
|
|
}
|
|
|
|
// Flush Phase 2 filter traces
|
|
if !pending_traces.is_empty() {
|
|
db::article_history::batch_insert_entries(&state.pool, &pending_traces).await.ok();
|
|
pending_traces.clear();
|
|
}
|
|
|
|
// Scrape Phase 2 for validation
|
|
emit_progress(tx, "websearch", "Verification des sources...", 80);
|
|
for (cat_key, item) in phase2_items {
|
|
let (_body_text, _, final_url, drop_reason) = scrape_single_article(&state.http_client, &item.url, theme.max_age_days as i64).await;
|
|
|
|
if let Some(reason) = drop_reason {
|
|
pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace {
|
|
url: &final_url, title: &item.title, source_type: "web_search",
|
|
source_url: None, category: None, synthesis_id: None,
|
|
status: reason, scraped_ok: false,
|
|
published_date: None,
|
|
}));
|
|
continue;
|
|
}
|
|
|
|
article_scraped.entry(cat_key).or_default().push(NewsItem {
|
|
title: item.title,
|
|
url: final_url,
|
|
summary: item.summary,
|
|
date: None,
|
|
});
|
|
|
|
if let Some(domain) = extract_domain(&item.url) {
|
|
*source_counts.entry(domain).or_insert(0) += 1;
|
|
}
|
|
}
|
|
|
|
// Flush Phase 2 scrape traces
|
|
if !pending_traces.is_empty() {
|
|
db::article_history::batch_insert_entries(&state.pool, &pending_traces).await.ok();
|
|
pending_traces.clear();
|
|
}
|
|
}
|
|
}
|
|
|
|
// === SAVE ===
|
|
let is_cancelled = cancelled.load(Ordering::Relaxed);
|
|
let has_articles = article_scraped.values().any(|items| !items.is_empty());
|
|
|
|
if !has_articles {
|
|
if is_cancelled {
|
|
return Err(AppError::BadRequest("Generation arretee. Aucun article n'avait encore ete collecte.".into()));
|
|
}
|
|
return Err(AppError::BadRequest("Aucun article valide trouve. Verifiez vos sources et categories.".into()));
|
|
}
|
|
|
|
if is_cancelled {
|
|
emit_progress(tx, "saving", "Generation arretee, sauvegarde des articles collectes...", 90);
|
|
} else {
|
|
emit_progress(tx, "saving", "Sauvegarde de la synthese...", 90);
|
|
}
|
|
|
|
let mut final_sections: Vec<NewsSection> = Vec::new();
|
|
for (i, cat_name) in user_categories.iter().enumerate() {
|
|
let key = format!("category_{}", i);
|
|
if let Some(items) = article_scraped.get(&key) {
|
|
if !items.is_empty() {
|
|
final_sections.push(NewsSection { title: cat_name.clone(), items: items.clone() });
|
|
}
|
|
}
|
|
}
|
|
if let Some(autre_items) = article_scraped.get("category_autre") {
|
|
if !autre_items.is_empty() {
|
|
final_sections.push(NewsSection { title: "Divers".to_string(), items: autre_items.clone() });
|
|
}
|
|
}
|
|
|
|
let sections_json = serde_json::to_value(&final_sections).map_err(|e| AppError::Internal(anyhow::anyhow!("Failed to serialize: {}", e)))?;
|
|
let sections_json = sanitize_json_null_bytes(sections_json);
|
|
|
|
let synthesis = db::syntheses::create(&state.pool, user_id, &get_iso_week_string(Utc::now().date_naive()), §ions_json, job_id, Some(theme_id)).await?;
|
|
|
|
if settings.article_history_days > 0 {
|
|
for section in &final_sections {
|
|
for item in §ion.items {
|
|
let source_type = match url_source.get(&item.url).map(|s| s.as_str()) {
|
|
Some("brave_search") => "brave_search",
|
|
Some(_) => "personalized_source",
|
|
None => "web_search",
|
|
};
|
|
pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace {
|
|
url: &item.url, title: &item.title, source_type,
|
|
source_url: if source_type == "personalized_source" { url_source.get(&item.url).map(|s| s.as_str()) } else { None },
|
|
category: Some(§ion.title), synthesis_id: Some(synthesis.id),
|
|
status: "used", scraped_ok: true,
|
|
published_date: item.date.as_deref(),
|
|
}));
|
|
}
|
|
}
|
|
|
|
// Flush final "used" traces
|
|
if !pending_traces.is_empty() {
|
|
db::article_history::batch_insert_entries(&state.pool, &pending_traces).await.ok();
|
|
pending_traces.clear();
|
|
}
|
|
}
|
|
|
|
Ok(synthesis.id)
|
|
}
|
|
|
|
// ───────────────────────────────────────────────────────────────────
|
|
// Helper Functions
|
|
// ───────────────────────────────────────────────────────────────────
|
|
|
|
/// Recursively strip `\u0000` null bytes from JSON values.
|
|
///
|
|
/// PostgreSQL rejects null bytes in JSONB text. LLM output occasionally
|
|
/// contains them (e.g., `Meta AI a annonc\u0000...`).
|
|
fn sanitize_json_null_bytes(value: serde_json::Value) -> serde_json::Value {
|
|
match value {
|
|
serde_json::Value::String(s) => serde_json::Value::String(s.replace('\0', "")),
|
|
serde_json::Value::Array(arr) => {
|
|
serde_json::Value::Array(arr.into_iter().map(sanitize_json_null_bytes).collect())
|
|
}
|
|
serde_json::Value::Object(map) => serde_json::Value::Object(
|
|
map.into_iter()
|
|
.map(|(k, v)| (k, sanitize_json_null_bytes(v)))
|
|
.collect(),
|
|
),
|
|
other => other,
|
|
}
|
|
}
|
|
|
|
/// Context passed to [`scrape_and_classify_batch`] to avoid long argument lists.
|
|
struct ScrapeClassifyCtx<'a> {
|
|
state: &'a AppState,
|
|
user_id: Uuid,
|
|
job_id: Uuid,
|
|
provider: &'a Arc<dyn crate::services::llm::LlmProvider>,
|
|
model_research: &'a Arc<String>,
|
|
classify_schema: &'a Arc<serde_json::Value>,
|
|
classification_categories: &'a Arc<Vec<String>>,
|
|
user_categories: &'a [String],
|
|
snippet_size: usize,
|
|
summary_length: i32,
|
|
max_age_days: i64,
|
|
max_items_per_category: usize,
|
|
source_type: &'a str,
|
|
}
|
|
|
|
/// Scrape and classify a batch of articles, updating shared state.
|
|
///
|
|
/// Each article is represented as `(url, Option<source_url>)`.
|
|
/// - `source_type` is used for tracing (e.g. "personalized_source", "brave_search").
|
|
/// - When `source_url` is `Some`, it is recorded in traces; the domain for
|
|
/// source counting comes from the source URL.
|
|
/// - When `source_url` is `None`, traces have no source URL; the domain for
|
|
/// source counting comes from the article URL.
|
|
#[allow(clippy::too_many_arguments)]
|
|
async fn scrape_and_classify_batch(
|
|
ctx: &ScrapeClassifyCtx<'_>,
|
|
articles: &[(String, Option<String>)],
|
|
article_scraped: &mut HashMap<String, Vec<NewsItem>>,
|
|
filled_counts: &mut HashMap<String, usize>,
|
|
source_counts: &mut HashMap<String, usize>,
|
|
pending_traces: &mut Vec<db::article_history::ArticleHistoryEntry>,
|
|
user_rate_limiter: &Option<crate::services::rate_limiter::RateLimiter>,
|
|
provider_name: &str,
|
|
) -> Result<(), AppError> {
|
|
// Phase A: Scrape batch in parallel
|
|
let mut scrape_set = tokio::task::JoinSet::new();
|
|
for (url, source_url) in articles {
|
|
let client = ctx.state.http_client.clone();
|
|
let u = url.clone();
|
|
let su = source_url.clone();
|
|
let mad = ctx.max_age_days;
|
|
scrape_set.spawn(async move {
|
|
let result = scrape_single_article(&client, &u, mad).await;
|
|
(u, su, result)
|
|
});
|
|
}
|
|
|
|
let mut scraped_articles: Vec<(String, Option<String>, String, String)> = Vec::new();
|
|
while let Some(join_result) = scrape_set.join_next().await {
|
|
if let Ok((_url, source_url, (body_text, page_title, final_url, drop_reason))) = join_result {
|
|
if let Some(reason) = drop_reason {
|
|
pending_traces.push(build_trace_entry(ctx.user_id, ctx.job_id, &ArticleTrace {
|
|
url: &final_url, title: &page_title, source_type: ctx.source_type,
|
|
source_url: source_url.as_deref(), category: None, synthesis_id: None,
|
|
status: reason, scraped_ok: false,
|
|
published_date: None,
|
|
}));
|
|
} else {
|
|
scraped_articles.push((final_url, source_url, body_text, page_title));
|
|
}
|
|
}
|
|
}
|
|
|
|
if scraped_articles.is_empty() {
|
|
return Ok(());
|
|
}
|
|
|
|
// Phase B: Classify/summarize batch in parallel
|
|
check_rate_limit(ctx.state, user_rate_limiter, provider_name).await?;
|
|
|
|
let mut classify_set = tokio::task::JoinSet::new();
|
|
for (final_url, source_url, body_text, page_title) in &scraped_articles {
|
|
let provider_clone = Arc::clone(ctx.provider);
|
|
let model = Arc::clone(ctx.model_research);
|
|
let schema = Arc::clone(ctx.classify_schema);
|
|
let cats = Arc::clone(ctx.classification_categories);
|
|
let body_snippet: String = body_text.chars().take(ctx.snippet_size).collect();
|
|
let title = page_title.clone();
|
|
let url = final_url.clone();
|
|
let su = source_url.clone();
|
|
let pool = ctx.state.pool.clone();
|
|
let uid = ctx.user_id;
|
|
let jid = ctx.job_id;
|
|
let summary_length = ctx.summary_length;
|
|
|
|
let (sys, usr) = crate::services::prompts::build_article_classify_prompt(&title, &body_snippet, &cats, summary_length);
|
|
|
|
classify_set.spawn(async move {
|
|
let llm_start = std::time::Instant::now();
|
|
let result = provider_clone.call_llm(&model, &sys, &usr, &schema).await;
|
|
let duration = llm_start.elapsed().as_millis() as u64;
|
|
|
|
if let Ok(ref resp) = result {
|
|
let resp_str = serde_json::to_string_pretty(resp).unwrap_or_default();
|
|
crate::db::llm_call_log::insert(&pool, uid, jid, "classify_summarize", &model, &sys, &usr, &resp_str, duration as i32, Some(&url)).await.ok();
|
|
}
|
|
|
|
(url, su, title, result)
|
|
});
|
|
}
|
|
|
|
while let Some(join_result) = classify_set.join_next().await {
|
|
if let Ok((final_url, source_url, page_title, llm_result)) = join_result {
|
|
let class_response = match llm_result {
|
|
Ok(resp) => resp,
|
|
Err(e) => {
|
|
tracing::warn!(url = %final_url, error = %e, "LLM classify failed, skipping article");
|
|
continue;
|
|
}
|
|
};
|
|
|
|
// Check if LLM considers this a real article
|
|
let is_article = class_response.get("is_article").and_then(|v| v.as_bool()).unwrap_or(true);
|
|
if !is_article {
|
|
tracing::info!(url = %final_url, "Article filtered by LLM: not a real article");
|
|
pending_traces.push(build_trace_entry(ctx.user_id, ctx.job_id, &ArticleTrace {
|
|
url: &final_url, title: &page_title, source_type: ctx.source_type,
|
|
source_url: source_url.as_deref(), category: None, synthesis_id: None,
|
|
status: "filtered_not_article", scraped_ok: true,
|
|
published_date: None,
|
|
}));
|
|
continue;
|
|
}
|
|
|
|
// Check LLM-extracted date as fallback
|
|
if let Some(date_str) = class_response.get("date").and_then(|d| d.as_str()) {
|
|
if !date_str.is_empty() {
|
|
if let Some(parsed) = scraper::parse_date_string(date_str) {
|
|
if scraper::is_article_too_old(Some(parsed), ctx.max_age_days) {
|
|
tracing::info!(url = %final_url, date = date_str, "Article filtered by LLM-extracted date (too old)");
|
|
pending_traces.push(build_trace_entry(ctx.user_id, ctx.job_id, &ArticleTrace {
|
|
url: &final_url, title: &page_title, source_type: ctx.source_type,
|
|
source_url: source_url.as_deref(), category: None, synthesis_id: None,
|
|
status: "filtered_too_old", scraped_ok: true,
|
|
published_date: Some(date_str),
|
|
}));
|
|
continue;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
let llm_date = class_response.get("date").and_then(|d| d.as_str()).filter(|s| !s.is_empty()).map(|s| s.to_string());
|
|
|
|
// Domain for source counting: prefer source_url if available, else article url
|
|
let count_domain = source_url.as_deref()
|
|
.and_then(extract_domain)
|
|
.or_else(|| extract_domain(&final_url));
|
|
|
|
let Some((final_cat_key, final_cat_name, llm_title, llm_summary)) = assign_category(
|
|
&class_response, &page_title, ctx.user_categories, ctx.classification_categories,
|
|
filled_counts, ctx.max_items_per_category,
|
|
) else {
|
|
continue;
|
|
};
|
|
|
|
article_scraped.entry(final_cat_key).or_default().push(NewsItem {
|
|
title: llm_title,
|
|
url: final_url.clone(),
|
|
summary: llm_summary,
|
|
date: llm_date,
|
|
});
|
|
*filled_counts.entry(final_cat_name).or_insert(0) += 1;
|
|
|
|
if let Some(domain) = count_domain {
|
|
*source_counts.entry(domain).or_insert(0) += 1;
|
|
}
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Log an LLM call with full prompt, response, and timing.
|
|
#[allow(clippy::too_many_arguments)]
|
|
async fn log_llm_call(
|
|
pool: &sqlx::PgPool,
|
|
user_id: Uuid,
|
|
job_id: Uuid,
|
|
call_type: &str,
|
|
model: &str,
|
|
system_prompt: &str,
|
|
user_prompt: &str,
|
|
response: &serde_json::Value,
|
|
duration_ms: u64,
|
|
article_url: Option<&str>,
|
|
) {
|
|
let response_str = serde_json::to_string_pretty(response).unwrap_or_default();
|
|
db::llm_call_log::insert(
|
|
pool, user_id, job_id, call_type, model,
|
|
system_prompt, user_prompt, &response_str, duration_ms as i32,
|
|
article_url,
|
|
)
|
|
.await
|
|
.ok(); // Don't fail synthesis if logging fails
|
|
}
|
|
|
|
/// Look up or create a per-user rate limiter stored in AppState.
|
|
///
|
|
/// Returns `None` if the user has no rate limit overrides, in which case the
|
|
/// global provider rate limiter should be used instead.
|
|
///
|
|
/// Uses DashMap's entry API for atomic check-and-insert, preventing concurrent
|
|
/// generation jobs from creating independent limiters for the same user.
|
|
fn get_user_rate_limiter(
|
|
state: &AppState,
|
|
settings: &UserSettings,
|
|
user_id: Uuid,
|
|
) -> Option<crate::services::rate_limiter::RateLimiter> {
|
|
use crate::app_state::UserRateLimitEntry;
|
|
|
|
match (
|
|
settings.rate_limit_max_requests,
|
|
settings.rate_limit_time_window_seconds,
|
|
) {
|
|
(Some(max_req), Some(window_sec)) => {
|
|
let mut entry = state
|
|
.user_rate_limiters
|
|
.entry(user_id)
|
|
.or_insert_with(|| UserRateLimitEntry::new(max_req, window_sec));
|
|
// Replace if user's settings changed since the limiter was created
|
|
if entry.settings_changed(max_req, window_sec) {
|
|
*entry = UserRateLimitEntry::new(max_req, window_sec);
|
|
}
|
|
Some(entry.limiter.clone())
|
|
}
|
|
_ => {
|
|
state.user_rate_limiters.remove(&user_id);
|
|
None
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Check rate limits using the user's limiter if provided, otherwise the global limiter.
|
|
/// Check rate limits, waiting if necessary (up to 60 seconds).
|
|
///
|
|
/// Instead of failing with an error, this function sleeps until the rate
|
|
/// limit window passes. If still rate limited after 60 seconds, returns an error.
|
|
async fn check_rate_limit(
|
|
state: &AppState,
|
|
user_limiter: &Option<crate::services::rate_limiter::RateLimiter>,
|
|
provider_name: &str,
|
|
) -> Result<(), AppError> {
|
|
let max_wait = std::time::Duration::from_secs(60);
|
|
let start = std::time::Instant::now();
|
|
|
|
loop {
|
|
let allowed = match user_limiter {
|
|
Some(limiter) => limiter.check(&format!("user_gen_{}", provider_name)),
|
|
None => state.provider_rate_limiter.check(provider_name),
|
|
};
|
|
|
|
if allowed {
|
|
return Ok(());
|
|
}
|
|
|
|
// Calculate how long to wait
|
|
let wait_time = match user_limiter {
|
|
Some(limiter) => limiter.time_until_available(&format!("user_gen_{}", provider_name)),
|
|
None => state.provider_rate_limiter.time_until_available(provider_name),
|
|
};
|
|
|
|
let wait = wait_time.unwrap_or(std::time::Duration::from_secs(1));
|
|
|
|
if start.elapsed() + wait > max_wait {
|
|
return Err(AppError::RateLimited(
|
|
"Limite de requetes atteinte. Veuillez reessayer dans quelques instants.".into(),
|
|
));
|
|
}
|
|
|
|
tracing::info!(wait_ms = wait.as_millis() as u64, "Rate limited, waiting...");
|
|
tokio::time::sleep(wait).await;
|
|
}
|
|
}
|
|
|
|
/// Decrypt the Brave Search API key for a user.
|
|
async fn resolve_brave_key(
|
|
state: &AppState,
|
|
user_id: Uuid,
|
|
) -> Result<String, AppError> {
|
|
let master_key = encryption::MasterKey::from_hex(&state.config.master_encryption_key)?;
|
|
let key_record = db::api_keys::get_for_user_and_provider(
|
|
&state.pool, user_id, "brave_search",
|
|
).await?
|
|
.ok_or_else(|| AppError::BadRequest(
|
|
"Brave Search est active mais aucune cle API Brave n'est configuree. \
|
|
Veuillez ajouter une cle API Brave Search dans vos parametres.".into(),
|
|
))?;
|
|
|
|
encryption::decrypt(&master_key, &key_record.encrypted_key, &key_record.nonce)
|
|
}
|
|
|
|
/// Resolve the LLM provider and decrypt the user's API key.
|
|
///
|
|
/// If the user has a preferred provider in settings, looks for a key matching
|
|
/// that provider specifically. Otherwise falls back to the first available key.
|
|
async fn resolve_provider_and_key(
|
|
state: &AppState,
|
|
user_id: Uuid,
|
|
settings: &UserSettings,
|
|
) -> Result<(String, String), AppError> {
|
|
let master_key = encryption::MasterKey::from_hex(&state.config.master_encryption_key)?;
|
|
|
|
// If the user has a preferred provider, look for that specific key
|
|
if !settings.ai_provider.is_empty() {
|
|
let key_record = db::api_keys::get_for_user_and_provider(
|
|
&state.pool,
|
|
user_id,
|
|
&settings.ai_provider,
|
|
)
|
|
.await?;
|
|
|
|
match key_record {
|
|
Some(record) => {
|
|
let api_key =
|
|
encryption::decrypt(&master_key, &record.encrypted_key, &record.nonce)?;
|
|
return Ok((record.provider_name.clone(), api_key));
|
|
}
|
|
None => {
|
|
return Err(AppError::BadRequest(format!(
|
|
"Aucune cle API configuree pour le fournisseur '{}'. \
|
|
Veuillez ajouter une cle API pour ce fournisseur dans vos parametres.",
|
|
settings.ai_provider
|
|
)));
|
|
}
|
|
}
|
|
}
|
|
|
|
// Fall back to first available key
|
|
let keys = db::api_keys::list_for_user(&state.pool, user_id).await?;
|
|
|
|
if keys.is_empty() {
|
|
return Err(AppError::BadRequest(
|
|
"Aucune cle API configuree. Veuillez ajouter une cle API dans vos parametres.".into(),
|
|
));
|
|
}
|
|
|
|
let key_record = &keys[0];
|
|
let api_key = encryption::decrypt(
|
|
&master_key,
|
|
&key_record.encrypted_key,
|
|
&key_record.nonce,
|
|
)?;
|
|
|
|
Ok((key_record.provider_name.clone(), api_key))
|
|
}
|
|
|
|
/// Resolve the model to use for a given provider.
|
|
///
|
|
/// Looks up the first enabled model for the provider from the admin config.
|
|
/// Falls back to sensible defaults if no admin-configured models exist.
|
|
async fn resolve_model(state: &AppState, provider_name: &str) -> Result<String, AppError> {
|
|
// Try to get the default model from the admin_providers JSONB models_scraping array
|
|
let model = sqlx::query_scalar::<_, String>(
|
|
r#"
|
|
SELECT m->>'model_id'
|
|
FROM admin_providers, jsonb_array_elements(models_scraping) AS m
|
|
WHERE provider_name = $1 AND is_enabled = true AND (m->>'is_default')::boolean = true
|
|
LIMIT 1
|
|
"#,
|
|
)
|
|
.bind(provider_name)
|
|
.fetch_optional(&state.pool)
|
|
.await?;
|
|
|
|
match model {
|
|
Some(m) => Ok(m),
|
|
None => {
|
|
// Fall back to sensible defaults
|
|
match provider_name {
|
|
"gemini" => Ok("gemini-2.5-pro".into()),
|
|
"openai" => Ok("gpt-4o".into()),
|
|
"anthropic" => Ok("claude-sonnet-4-20250514".into()),
|
|
_ => Err(AppError::BadRequest(format!(
|
|
"Aucun modele configure pour le fournisseur '{}'",
|
|
provider_name
|
|
))),
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Parse the LLM's structured JSON output into category-keyed news items.
|
|
///
|
|
/// Expects the output to have keys like `category_0`, `category_1`, etc.
|
|
/// Each key maps to an array of `{title, url, summary}` objects.
|
|
fn parse_llm_output(
|
|
raw: &serde_json::Value,
|
|
categories: &[String],
|
|
) -> Result<Vec<(String, Vec<NewsItem>)>, AppError> {
|
|
let mut result = Vec::new();
|
|
|
|
for (i, _cat) in categories.iter().enumerate() {
|
|
let key = format!("category_{}", i);
|
|
let items_val = raw.get(&key).cloned().unwrap_or(serde_json::json!([]));
|
|
|
|
let items: Vec<NewsItem> = serde_json::from_value(items_val).unwrap_or_default();
|
|
result.push((key, items));
|
|
}
|
|
|
|
Ok(result)
|
|
}
|
|
|
|
/// Rotate the sources list so that the source after the last-used source comes first.
|
|
fn rotate_sources(sources: Vec<crate::models::source::Source>, last_source_url: Option<&str>) -> Vec<crate::models::source::Source> {
|
|
let Some(last_url) = last_source_url else {
|
|
return sources;
|
|
};
|
|
let pos = sources.iter().position(|s| s.url == last_url);
|
|
match pos {
|
|
Some(idx) => {
|
|
let next = (idx + 1) % sources.len();
|
|
let mut rotated = sources[next..].to_vec();
|
|
rotated.extend_from_slice(&sources[..next]);
|
|
rotated
|
|
}
|
|
None => sources,
|
|
}
|
|
}
|
|
|
|
/// Scrape a single article. Returns (body_text, page_title, final_url, drop_reason).
|
|
/// `drop_reason` is `Some("filtered_empty")` or `Some("filtered_too_old")` if rejected, `None` if OK.
|
|
async fn scrape_single_article(
|
|
http_client: &reqwest::Client,
|
|
url: &str,
|
|
max_age_days: i64,
|
|
) -> (String, String, String, Option<&'static str>) {
|
|
match scraper::scrape_url(http_client, url).await {
|
|
Ok(content) => {
|
|
let final_url = content.url.clone();
|
|
if !content.ok || content.is_soft_404 {
|
|
tracing::warn!(url = url, "Soft 404 or error page detected, skipping content");
|
|
return (String::new(), String::new(), final_url, Some("filtered_empty"));
|
|
}
|
|
if scraper::is_article_too_old(content.published_date, max_age_days) {
|
|
tracing::warn!(url = url, "Article too old, skipping content");
|
|
return (String::new(), String::new(), final_url, Some("filtered_too_old"));
|
|
}
|
|
let title = content.title.unwrap_or_default();
|
|
(content.body_text, title, final_url, None)
|
|
}
|
|
Err(e) => {
|
|
tracing::warn!(url = url, error = %e, "Failed to scrape URL, keeping article with empty content");
|
|
(String::new(), String::new(), url.to_string(), Some("filtered_empty"))
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Turn an internal error message into text that is safe to show to the user.
///
/// Messages that look sensitive are replaced wholesale by a generic message:
///
/// - credential-related (`API key`, `api_key`, `PERMISSION_DENIED`, or the key
///   prefixes `AIza` / `sk-`) → authentication error;
/// - `rate limit`, `quota`, or `429` → provider rate-limit error;
/// - `database`, `sqlx`, or `postgres` → internal server error.
///
/// Keyword matching is ASCII case-insensitive, so `Rate limit reached` and
/// `database is locked` are caught too. The key prefixes `AIza` and `sk-` stay
/// case-sensitive.
///
/// Any other message is passed through. If it is longer than 200 bytes, it is cut
/// at the last character boundary at or before byte 200 and `...` is appended, so the
/// result never exceeds 203 bytes and multibyte characters are never split.
fn sanitize_error_message(msg: &str) -> String {
    /// Maximum number of bytes of the original message that may be passed through.
    const MAX_PASSTHROUGH_BYTES: usize = 200;

    let lowered = msg.to_ascii_lowercase();
    let mentions = |keyword: &str| lowered.contains(keyword);

    // `sk-` also covers Anthropic-style `sk-ant-` keys.
    if mentions("api key")
        || mentions("api_key")
        || mentions("permission_denied")
        || msg.contains("AIza")
        || msg.contains("sk-")
    {
        return "Erreur d'authentification avec le fournisseur IA. Verifiez votre cle API.".into();
    }

    if mentions("rate limit") || mentions("quota") || mentions("429") {
        return "Limite de requetes du fournisseur IA atteinte. Reessayez plus tard.".into();
    }

    if mentions("database") || mentions("sqlx") || mentions("postgres") {
        return "Erreur interne du serveur. Veuillez reessayer.".into();
    }

    if msg.len() <= MAX_PASSTHROUGH_BYTES {
        return msg.to_string();
    }

    // Back up to a character boundary so the slice below cannot panic. Index 0 is
    // always a boundary, so the loop terminates.
    let mut cut = MAX_PASSTHROUGH_BYTES;
    while !msg.is_char_boundary(cut) {
        cut -= 1;
    }
    format!("{}...", &msg[..cut])
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a non-preferred `Source` fixture with fresh ids and no theme or RSS data.
    fn make_source(title: &'static str, url: &'static str) -> crate::models::source::Source {
        crate::models::source::Source {
            id: Uuid::new_v4(),
            user_id: Uuid::new_v4(),
            title: title.into(),
            url: url.into(),
            theme_id: None,
            is_preferred: false,
            rss_url: None,
            rss_discovered_at: None,
            created_at: chrono::Utc::now(),
        }
    }

    // ── parse_llm_output tests ───────────────────────────────────

    #[test]
    fn parse_llm_output_valid() {
        let categories = vec![String::from("AI News"), String::from("Research")];
        let raw = serde_json::json!({
            "category_0": [
                {"title": "Art 1", "url": "https://a.com", "summary": "Sum 1"},
                {"title": "Art 2", "url": "https://b.com", "summary": "Sum 2"}
            ],
            "category_1": [
                {"title": "Art 3", "url": "https://c.com", "summary": "Sum 3"}
            ]
        });

        let parsed = parse_llm_output(&raw, &categories).unwrap();

        assert_eq!(parsed.len(), 2);
        let (first_key, first_items) = &parsed[0];
        let (second_key, second_items) = &parsed[1];
        assert_eq!(first_key, "category_0");
        assert_eq!(first_items.len(), 2);
        assert_eq!(second_key, "category_1");
        assert_eq!(second_items.len(), 1);
    }

    #[test]
    fn parse_llm_output_missing_category() {
        let categories = vec![String::from("AI News"), String::from("Research")];
        // Only category_0 is present; category_1 is deliberately absent.
        let raw = serde_json::json!({
            "category_0": [
                {"title": "Art 1", "url": "https://a.com", "summary": "Sum 1"}
            ]
        });

        let parsed = parse_llm_output(&raw, &categories).unwrap();

        assert_eq!(parsed.len(), 2);
        assert_eq!(parsed[0].1.len(), 1);
        // A missing category still gets an entry, with no items.
        assert_eq!(parsed[1].1.len(), 0);
    }

    // ── sanitize_error_message tests ─────────────────────────────

    #[test]
    fn sanitize_hides_api_key_references() {
        let cleaned = sanitize_error_message("Invalid API key: AIzaSyB-test-key");
        assert!(cleaned.contains("cle API"));
        assert!(!cleaned.contains("AIza"));
    }

    #[test]
    fn sanitize_hides_rate_limit_details() {
        let cleaned =
            sanitize_error_message("Resource exhausted: rate limit exceeded for project 12345");
        assert!(cleaned.contains("Limite"));
        assert!(!cleaned.contains("12345"));
    }

    #[test]
    fn sanitize_hides_database_details() {
        let cleaned = sanitize_error_message(
            "Database connection to postgres://user:pass@localhost failed",
        );
        assert!(cleaned.contains("Erreur interne"));
        assert!(!cleaned.contains("postgres"));
    }

    #[test]
    fn sanitize_truncates_long_messages() {
        let long_msg = "x".repeat(300);
        let cleaned = sanitize_error_message(&long_msg);
        assert!(cleaned.len() < 210);
        assert!(cleaned.ends_with("..."));
    }

    #[test]
    fn sanitize_passes_normal_messages() {
        let original = "Generation failed due to network timeout";
        assert_eq!(sanitize_error_message(original), original);
    }

    #[test]
    fn sanitize_error_message_handles_multibyte_utf8() {
        // 300 bytes but only 150 chars: must not panic on a char boundary.
        let accented = "é".repeat(150);
        let cleaned = sanitize_error_message(&accented);
        assert!(cleaned.ends_with("..."));
    }

    // ── sanitize_json_null_bytes tests ──────────────────────────

    #[test]
    fn sanitize_null_bytes_in_json_strings() {
        let dirty = serde_json::json!({
            "title": "Hello\u{0000}World",
            "items": [{"summary": "Text\u{0000}with\u{0000}nulls"}]
        });

        let cleaned = sanitize_json_null_bytes(dirty);

        assert_eq!(cleaned["title"], "HelloWorld");
        assert_eq!(cleaned["items"][0]["summary"], "Textwithnulls");
    }

    #[test]
    fn sanitize_preserves_clean_json() {
        let clean = serde_json::json!({
            "title": "Clean text",
            "count": 42,
            "active": true,
            "items": [{"url": "https://example.com"}]
        });

        let cleaned = sanitize_json_null_bytes(clean.clone());

        assert_eq!(cleaned, clean);
    }

    // ── rotate_sources tests ──────────────────────────────────

    #[test]
    fn rotate_sources_no_last_url() {
        let sources = vec![
            make_source("A", "https://a.com"),
            make_source("B", "https://b.com"),
        ];

        let rotated = rotate_sources(sources, None);

        assert_eq!(rotated.len(), 2);
        assert_eq!(rotated[0].url, "https://a.com");
    }

    #[test]
    fn rotate_sources_with_last_url() {
        let sources = vec![
            make_source("A", "https://a.com"),
            make_source("B", "https://b.com"),
            make_source("C", "https://c.com"),
        ];

        let rotated = rotate_sources(sources, Some("https://a.com"));

        let expected_order = ["https://b.com", "https://c.com", "https://a.com"];
        for (position, expected_url) in expected_order.iter().enumerate() {
            assert_eq!(rotated[position].url, *expected_url);
        }
    }

    #[test]
    fn rotate_sources_last_url_not_found() {
        let sources = vec![make_source("A", "https://a.com")];

        let rotated = rotate_sources(sources, Some("https://notfound.com"));

        assert_eq!(rotated[0].url, "https://a.com");
    }
}
|