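// NOTE: repository web-view residue captured with this file (not part of the module):
// "You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long."
//
// 1789 lines
// 72 KiB
// Rust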

//! Synthesis generation pipeline and job management.
//!
//! Orchestrates the two-phase pipeline:
//! 1. Personalized sources: scrape user sources, classify+summarize per article
//! 2. Web search fallback: LLM search for missing categories, scrape to validate
//!
//! Progress is reported via `tokio::sync::watch` channels per job,
//! consumed by SSE endpoints for real-time client updates.
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use chrono::Utc;
use dashmap::DashMap;
use dashmap::DashSet;
use serde::Serialize;
use tokio::sync::watch;
use uuid::Uuid;
use crate::app_state::AppState;
use crate::db;
use crate::errors::AppError;
use crate::models::settings::UserSettings;
use crate::models::synthesis::{
get_iso_week_string, NewsItem, NewsSection,
};
use crate::services::encryption;
use crate::services::llm::factory::create_provider;
use crate::services::scraper;
use crate::services::source_scraper;
// ───────────────────────────────────────────────────────────────────
// Progress Events
// ───────────────────────────────────────────────────────────────────
/// Progress event sent to SSE clients during generation.
///
/// Serialized as an internally tagged object: the `type` field carries the
/// lowercase variant name (`"progress"`, `"complete"` or `"error"`).
///
/// The `watch` channel always holds the latest event, and new subscribers
/// immediately receive the current state.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum ProgressEvent {
    /// Generation is in progress.
    Progress {
        /// Short machine-readable identifier of the current pipeline step.
        step: String,
        /// Human-readable status text for the current step.
        message: String,
        /// Progress estimate reported by the pipeline.
        percent: u8,
    },
    /// Generation completed successfully.
    Complete {
        /// Identifier of the synthesis produced by the job.
        synthesis_id: Uuid,
    },
    /// Generation failed with an error.
    Error {
        /// Description of the failure, as sent by the job runner.
        message: String,
    },
}
// ───────────────────────────────────────────────────────────────────
// Job Store
// ───────────────────────────────────────────────────────────────────
/// Entry in the job store, holding the progress channel and metadata.
///
/// One entry is inserted per job by `JobStore::create_job` and dropped by
/// `JobStore::remove` or `JobStore::cleanup_expired`.
struct JobEntry {
/// Sender side of the watch channel for progress updates.
/// Wrapped in Arc so it can be shared with the background task
/// without cloning the Sender itself. `JobStore::subscribe` also uses it
/// to hand out new receivers.
tx: Arc<watch::Sender<ProgressEvent>>,
/// A receiver kept alive to prevent the channel from closing.
/// Without at least one receiver, `Sender::send()` returns an error
/// and does NOT update the stored value. Never read; it only exists to
/// keep the channel open.
_rx: watch::Receiver<ProgressEvent>,
/// User who owns this job (checked by `subscribe` and `has_active_job`).
user_id: Uuid,
/// When the job was created (for TTL cleanup).
created_at: Instant,
}
/// In-memory store for active generation jobs.
///
/// Uses `DashMap`/`DashSet` so the store can be shared and mutated through
/// `&self` from concurrent tasks without an external lock. Jobs are keyed by
/// a random UUID; entries older than `JOB_TTL` are dropped whenever
/// `cleanup_expired` is called (the caller that schedules it is not in this file).
///
/// Cloning is cheap: both fields are `Arc`s, so clones share the same state.
#[derive(Clone)]
pub struct JobStore {
/// Job id -> progress channel and metadata.
inner: Arc<DashMap<Uuid, JobEntry>>,
/// Users currently holding the "generating" lock; inserted by `create_job`,
/// removed by `release_user` / `cleanup_expired`.
generating_users: Arc<DashSet<Uuid>>,
}
/// Maximum age of a job entry before `cleanup_expired` drops it: 1 hour.
///
/// Kept long so that SSE clients can reconnect to a job.
const JOB_TTL: Duration = Duration::from_secs(3600);
impl Default for JobStore {
fn default() -> Self {
Self::new()
}
}
impl JobStore {
    /// Create a new empty job store.
    pub fn new() -> Self {
        Self {
            inner: Arc::new(DashMap::new()),
            generating_users: Arc::new(DashSet::new()),
        }
    }

    /// Create a new job for a user, returning the job ID and the watch Sender.
    ///
    /// Returns `None` if the user already has an active job.
    /// Uses an atomic DashSet insert to prevent race conditions on double-click.
    ///
    /// The channel is seeded with an initial `"init"` progress event, and a
    /// receiver is stored alongside the sender so the channel stays open even
    /// when no SSE client is connected.
    pub fn create_job(&self, user_id: Uuid) -> Option<(Uuid, Arc<watch::Sender<ProgressEvent>>)> {
        // `insert` returns false when the user was already present.
        if !self.generating_users.insert(user_id) {
            return None;
        }
        let job_id = Uuid::new_v4();
        let (tx, rx) = watch::channel(ProgressEvent::Progress {
            step: "init".into(),
            message: "Initialisation...".into(),
            percent: 0,
        });
        let tx = Arc::new(tx);
        self.inner.insert(
            job_id,
            JobEntry {
                tx: Arc::clone(&tx),
                _rx: rx,
                user_id,
                created_at: Instant::now(),
            },
        );
        Some((job_id, tx))
    }

    /// Get a watch receiver for a job, if it exists and belongs to the given user.
    pub fn subscribe(&self, job_id: Uuid, user_id: Uuid) -> Option<watch::Receiver<ProgressEvent>> {
        let entry = self.inner.get(&job_id)?;
        (entry.value().user_id == user_id).then(|| entry.value().tx.subscribe())
    }

    /// Check if a user has an active (in-progress) job.
    ///
    /// Returns the ID of the user's most recently created job. Finished jobs
    /// may still be present in the store (they are retained for a while to
    /// allow SSE reconnection), so picking the newest entry avoids returning
    /// a stale job when the user has started a new one.
    pub fn has_active_job(&self, user_id: Uuid) -> Option<Uuid> {
        if !self.generating_users.contains(&user_id) {
            return None;
        }
        self.inner
            .iter()
            .filter(|entry| entry.value().user_id == user_id)
            // Copy out the plain values so no map guard is held while scanning on.
            .map(|entry| (entry.value().created_at, *entry.key()))
            .max_by_key(|&(created_at, _)| created_at)
            .map(|(_, job_id)| job_id)
    }

    /// Release the generating lock for a user (called when job completes, errors, or times out).
    pub fn release_user(&self, user_id: Uuid) {
        self.generating_users.remove(&user_id);
    }

    /// Remove expired jobs (older than TTL).
    ///
    /// The owner's generating lock is released only when the user has no
    /// remaining job in the store, so expiring an old entry cannot unlock a
    /// user whose newer job is still running.
    pub fn cleanup_expired(&self) {
        let now = Instant::now();
        let mut expired_owners: Vec<Uuid> = Vec::new();
        self.inner.retain(|_, entry| {
            let keep = now.duration_since(entry.created_at) < JOB_TTL;
            if !keep {
                expired_owners.push(entry.user_id);
            }
            keep
        });
        // Done after `retain` so the map is not re-entered while it is being pruned.
        for owner in expired_owners {
            let has_remaining_job = self.inner.iter().any(|entry| entry.value().user_id == owner);
            if !has_remaining_job {
                self.generating_users.remove(&owner);
            }
        }
    }

    /// Remove a specific job.
    ///
    /// Only the job entry is dropped; the owner's generating lock is untouched.
    pub fn remove(&self, job_id: &Uuid) {
        self.inner.remove(job_id);
    }

    /// Get the number of jobs currently stored (for testing/monitoring).
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// Check if the store is empty (for testing).
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }
}
// ───────────────────────────────────────────────────────────────────
// Generation Pipeline
// ───────────────────────────────────────────────────────────────────
/// Run the full generation pipeline for a user.
///
/// This is the core orchestration function. It is spawned as a background
/// tokio task and communicates progress via the `watch` channel.
///
/// # Phases
/// 1. Personalized sources: extract links, scrape, classify+summarize per article
/// 2. Web search fallback: LLM search for under-filled categories, scrape to validate
/// 3. Save synthesis to DB
pub async fn run_generation(
job_id: Uuid,
state: AppState,
user_id: Uuid,
tx: Arc<watch::Sender<ProgressEvent>>,
provider_override: Option<Arc<dyn crate::services::llm::LlmProvider>>,
) {
let result = run_generation_inner(job_id, &state, user_id, &tx, provider_override).await;
match result {
Ok(synthesis_id) => {
tx.send(ProgressEvent::Complete { synthesis_id }).ok();
tracing::info!(job_id = %job_id, synthesis_id = %synthesis_id, "Generation completed");
}
Err(e) => {
tracing::error!(job_id = %job_id, error = %e, "Generation failed");
// Sanitize error message — never expose API keys or internal details
let safe_message = sanitize_error_message(&e.to_string());
tx.send(ProgressEvent::Error {
message: safe_message,
})
.ok();
}
}
// Keep the job in the store for 5 minutes after completion
// to allow SSE reconnection
let store = state.job_store.clone();
let jid = job_id;
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(300)).await;
store.remove(&jid);
});
}
/// Inner implementation of the generation pipeline, returning a Result.
///
/// Steps, in order:
/// 1. Load user settings, sources, LLM provider and model names.
/// 2. Phase 1 (only if the user has sources): extract article links from each
///    source, drop URLs already in the article history, then scrape and
///    classify/summarize the remaining candidates in batches.
/// 3. Phase 2 (only if some user category is still under-filled): fill the
///    gaps through Brave Search or through an LLM web-search call, depending
///    on `settings.use_brave_search`.
/// 4. Build the final sections, store the synthesis, and record "used"
///    entries in the article history.
///
/// Progress is published on `tx` via `emit_progress`. Article history traces
/// are buffered in `pending_traces` and flushed in batches; flush failures
/// are ignored (`.ok()`).
///
/// # Errors
/// Propagates errors from loading settings/sources, provider or key
/// resolution, rate-limit checks, the Brave Search call, the LLM search call,
/// parsing of the LLM search output, serialization and the synthesis insert.
/// Returns `AppError::BadRequest` when no article survived the pipeline.
pub async fn run_generation_inner(
job_id: Uuid,
state: &AppState,
user_id: Uuid,
tx: &watch::Sender<ProgressEvent>,
provider_override: Option<Arc<dyn crate::services::llm::LlmProvider>>,
) -> Result<Uuid, AppError> {
// Batch buffer for article history traces (flushed at logical boundaries)
let mut pending_traces: Vec<db::article_history::ArticleHistoryEntry> = Vec::new();
// === INITIALIZATION ===
emit_progress(tx, "settings", "Chargement des parametres...", 5);
let settings = db::settings::get_or_create_default(&state.pool, user_id).await?;
// Best-effort housekeeping of old history / LLM log rows; failures are ignored.
if settings.article_history_days > 0 {
db::article_history::cleanup_old(&state.pool, user_id, settings.article_history_days).await.unwrap_or(0);
db::llm_call_log::truncate_old(&state.pool, user_id, settings.article_history_days).await.ok();
}
// NOTE(review): both branches produce the same value as `settings.categories.clone()`.
let user_categories = if settings.categories.is_empty() {
Vec::new()
} else {
settings.categories.clone()
};
// Classification always offers the catch-all "Autre" category in addition to the user's own.
let mut classification_categories = user_categories.clone();
classification_categories.push("Autre".to_string());
emit_progress(tx, "sources", "Chargement des sources...", 10);
let sources = db::sources::list_for_user(&state.pool, user_id).await?;
emit_progress(tx, "provider", "Configuration du fournisseur IA...", 12);
// An injected provider (named "mock" here) bypasses key resolution and provider creation.
let (provider_name, provider) = if let Some(mock_provider) = provider_override {
("mock".to_string(), mock_provider)
} else {
let (pname, api_key) = resolve_provider_and_key(state, user_id, &settings).await?;
let p = create_provider(&pname, api_key)?;
(pname, p)
};
// Model selection: user settings win; otherwise fall back to "mock-model" (mock provider)
// or to `resolve_model`. The web-search model defaults to the research model.
let (model_research, model_websearch) = if provider_name == "mock" {
let research = if settings.ai_model.is_empty() { "mock-model".to_string() } else { settings.ai_model.clone() };
let websearch = if settings.ai_model_websearch.is_empty() { "mock-model".to_string() } else { settings.ai_model_websearch.clone() };
(research, websearch)
} else {
let model_research = if !settings.ai_model.is_empty() { settings.ai_model.clone() } else { resolve_model(state, &provider_name).await? };
let model_websearch = if !settings.ai_model_websearch.is_empty() { settings.ai_model_websearch.clone() } else { model_research.clone() };
(model_research, model_websearch)
};
let user_rate_limiter = get_user_rate_limiter(state, &settings, user_id);
// Tracking structures
// Accepted articles; read back in the SAVE step under keys `category_{i}` / `category_autre`.
let mut article_scraped: HashMap<String, Vec<NewsItem>> = HashMap::new();
// Accepted articles per domain, used for the per-source diversity limit.
let mut source_counts: HashMap<String, usize> = HashMap::new();
// Candidate URL -> originating source URL (or the literal "brave_search").
let mut url_source: HashMap<String, String> = HashMap::new();
// Accepted articles per category name.
let mut filled_counts: HashMap<String, usize> = HashMap::new();
// Lowercased URLs already considered in this run.
let mut seen_urls: std::collections::HashSet<String> = std::collections::HashSet::new();
// Overall cap: every user category plus "Autre", each at `max_items_per_category`.
let max_total = (user_categories.len() + 1) * settings.max_items_per_category as usize;
// Shared via Arc so each spawned classify task can hold them without deep copies.
let classify_schema = Arc::new(crate::services::llm::schema::build_article_classify_schema());
let model_research = Arc::new(model_research);
let classification_categories = Arc::new(classification_categories);
// === PHASE 1: Personalized Sources ===
if !sources.is_empty() {
emit_progress(tx, "sources_scrape", "Analyse des sources personnalisees...", 15);
let last_source = db::article_history::get_last_source_url(&state.pool, user_id).await.unwrap_or(None);
// NOTE(review): presumably reorders sources relative to the last one used — confirm in `rotate_sources`.
let rotated_sources = rotate_sources(sources.clone(), last_source.as_deref());
// Maximum number of links requested from each source page.
let max_links = 15usize;
// 1a. Extract article links from source pages (parallel, max 5 concurrent)
let mut candidate_urls: Vec<(String, String)> = Vec::new();
{
let mut join_set = tokio::task::JoinSet::new();
let mut pending = rotated_sources.iter().peekable();
let max_concurrent = 5;
// Seed initial tasks
for _ in 0..max_concurrent {
if let Some(source) = pending.next() {
let client = state.http_client.clone();
let source_url = source.url.clone();
let source_title = source.title.clone();
let use_llm = settings.use_llm_for_source_links;
let provider_clone = std::sync::Arc::clone(&provider);
let model = Arc::clone(&model_research);
let max_l = max_links;
let pool = state.pool.clone();
let uid = user_id;
let jid = job_id;
join_set.spawn(async move {
let links = if use_llm {
source_scraper::extract_article_links_with_llm(
&client, &source_url, max_l, &provider_clone, &model,
Some(&pool), Some(uid), Some(jid),
).await
} else {
source_scraper::extract_article_links(
&client, &source_url, max_l,
).await
};
(source_url, source_title, links)
});
}
}
// Each finished task is replaced by the next pending source, keeping at most
// `max_concurrent` extractions in flight. Join errors are silently skipped.
while let Some(join_result) = join_set.join_next().await {
if let Ok((source_url, source_title, links_result)) = join_result {
match links_result {
Ok(links) => {
tracing::info!(source = %source_title, links = links.len(), "Extracted links from source");
for link in links {
// Case-insensitive de-duplication across all sources.
if seen_urls.insert(link.to_lowercase()) {
candidate_urls.push((link, source_url.clone()));
}
}
}
Err(e) => {
tracing::warn!(source = %source_title, error = %e, "Failed to extract links");
}
}
}
// Spawn next task
if let Some(source) = pending.next() {
let client = state.http_client.clone();
let source_url = source.url.clone();
let source_title = source.title.clone();
let use_llm = settings.use_llm_for_source_links;
let provider_clone = std::sync::Arc::clone(&provider);
let model = Arc::clone(&model_research);
let max_l = max_links;
let pool = state.pool.clone();
let uid = user_id;
let jid = job_id;
join_set.spawn(async move {
let links = if use_llm {
source_scraper::extract_article_links_with_llm(
&client, &source_url, max_l, &provider_clone, &model,
Some(&pool), Some(uid), Some(jid),
).await
} else {
source_scraper::extract_article_links(
&client, &source_url, max_l,
).await
};
(source_url, source_title, links)
});
}
}
}
// Filter against article history
if settings.article_history_days > 0 && !candidate_urls.is_empty() {
let hashes: Vec<String> = candidate_urls.iter().map(|(url, _)| hash_article_url(url)).collect();
// A failed lookup is treated as "nothing seen before".
let existing = db::article_history::check_urls_exist(&state.pool, user_id, &hashes).await.unwrap_or_default();
if !existing.is_empty() {
for (url, source_url) in &candidate_urls {
if existing.contains(&hash_article_url(url)) {
pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace {
url, title: "", source_type: "personalized_source",
source_url: Some(source_url), category: None, synthesis_id: None,
status: "filtered_history", scraped_ok: false,
}));
}
}
candidate_urls.retain(|(url, _)| !existing.contains(&hash_article_url(url)));
// Flush history dedup traces
if !pending_traces.is_empty() {
db::article_history::batch_insert_entries(&state.pool, &pending_traces).await.ok();
pending_traces.clear();
}
}
}
// Shuffle candidates to interleave articles from different sources
use rand::seq::SliceRandom;
candidate_urls.shuffle(&mut rand::thread_rng());
// Track url -> source
for (url, source_url) in &candidate_urls {
url_source.insert(url.clone(), source_url.clone());
}
// 1b. Scrape, classify, summarize in batches of `settings.batch_size` (at least 1)
emit_progress(tx, "processing", "Traitement des articles...", 25);
let total_candidates = candidate_urls.len();
let batch_size = settings.batch_size.max(1) as usize;
let mut processed = 0usize;
let mut candidates_iter = candidate_urls.into_iter();
let mut done = false;
while !done {
// Take next batch of candidates (up to `batch_size`), filtering source limits
let mut batch: Vec<(String, String)> = Vec::new();
while batch.len() < batch_size {
let Some((url, source_url)) = candidates_iter.next() else {
break;
};
let source_domain = extract_domain(&source_url).unwrap_or_default();
let source_count = source_counts.get(&source_domain).copied().unwrap_or(0);
if source_count >= settings.max_articles_per_source as usize {
pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace {
url: &url, title: "", source_type: "personalized_source",
source_url: Some(&source_url), category: None, synthesis_id: None,
status: "filtered_diversity", scraped_ok: false,
}));
// Skipped candidates are not added to `processed`.
continue;
}
batch.push((url, source_url));
}
if batch.is_empty() {
break;
}
// Phase 1 processing maps onto the 25..=65 percent range.
let pct = 25 + ((processed as u32 * 40) / total_candidates.max(1) as u32).min(40);
emit_progress(tx, "processing", &format!("Articles {}-{}/{}...", processed + 1, processed + batch.len(), total_candidates), pct as u8);
// Phase A: Scrape batch in parallel
let mut scrape_set = tokio::task::JoinSet::new();
for (url, source_url) in &batch {
let client = state.http_client.clone();
let u = url.clone();
let su = source_url.clone();
let mad = settings.max_age_days as i64;
scrape_set.spawn(async move {
let result = scrape_single_article(&client, &u, mad).await;
(u, su, result)
});
}
let mut scraped_articles: Vec<(String, String, String, String)> = Vec::new(); // (url, source_url, body_text, page_title)
while let Some(join_result) = scrape_set.join_next().await {
if let Ok((_url, source_url, (body_text, page_title, final_url, drop_reason))) = join_result {
// A drop reason means the article was rejected by the scraper step; trace it and move on.
if let Some(reason) = drop_reason {
pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace {
url: &final_url, title: &page_title, source_type: "personalized_source",
source_url: Some(&source_url), category: None, synthesis_id: None,
status: reason, scraped_ok: false,
}));
} else {
scraped_articles.push((final_url, source_url, body_text, page_title));
}
}
}
if scraped_articles.is_empty() {
processed += batch.len();
continue;
}
// Phase B: Classify/summarize batch in parallel
// The rate limit is checked once per batch, not once per LLM call.
check_rate_limit(state, &user_rate_limiter, &provider_name).await?;
let mut classify_set = tokio::task::JoinSet::new();
for (final_url, source_url, body_text, page_title) in &scraped_articles {
let provider_clone = std::sync::Arc::clone(&provider);
let model = Arc::clone(&model_research);
let schema = Arc::clone(&classify_schema);
let cats = Arc::clone(&classification_categories);
// Number of body characters sent to the LLM, by requested summary length.
let snippet_size = match settings.summary_length {
1 => 500,
2 => 2000,
_ => 4000,
};
let body_snippet: String = body_text.chars().take(snippet_size).collect();
let title = page_title.clone();
let url = final_url.clone();
let su = source_url.clone();
let pool = state.pool.clone();
let uid = user_id;
let jid = job_id;
let (sys, usr) = crate::services::prompts::build_article_classify_prompt(&title, &body_snippet, &cats, settings.summary_length);
classify_set.spawn(async move {
let llm_start = std::time::Instant::now();
let result = provider_clone.call_llm(&model, &sys, &usr, &schema).await;
let duration = llm_start.elapsed().as_millis() as u64;
// Log the LLM call (successful calls only; logging failures are ignored)
if let Ok(ref resp) = result {
let resp_str = serde_json::to_string_pretty(resp).unwrap_or_default();
crate::db::llm_call_log::insert(&pool, uid, jid, "classify_summarize", &model, &sys, &usr, &resp_str, duration as i32, Some(&url)).await.ok();
}
(url, su, title, result)
});
}
while let Some(join_result) = classify_set.join_next().await {
if let Ok((final_url, source_url, page_title, llm_result)) = join_result {
let class_response = match llm_result {
Ok(resp) => resp,
Err(e) => {
tracing::warn!(url = %final_url, error = %e, "LLM classify failed, skipping article");
continue;
}
};
// Check LLM-extracted date as fallback for articles without a scraper date
if let Some(date_str) = class_response.get("date").and_then(|d| d.as_str()) {
if !date_str.is_empty() {
if let Some(parsed) = scraper::parse_date_string(date_str) {
if scraper::is_article_too_old(Some(parsed), settings.max_age_days as i64) {
tracing::info!(url = %final_url, date = date_str, "Article filtered by LLM-extracted date (too old)");
pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace {
url: &final_url, title: &page_title, source_type: "personalized_source",
source_url: Some(&source_url), category: None, synthesis_id: None,
status: "filtered_too_old", scraped_ok: true,
}));
continue;
}
}
}
}
// `None` from `assign_category` drops the article without recording a trace.
let Some((final_cat_key, final_cat_name, llm_title, llm_summary)) = assign_category(
&class_response, &page_title, &user_categories, &classification_categories,
&filled_counts, settings.max_items_per_category as usize,
) else {
continue;
};
article_scraped.entry(final_cat_key).or_default().push(NewsItem {
title: llm_title,
url: final_url.clone(),
summary: llm_summary,
});
*filled_counts.entry(final_cat_name).or_insert(0) += 1;
// Phase 1 counts against the domain of the *source* page, not of the article itself.
let source_domain = extract_domain(&source_url).unwrap_or_default();
*source_counts.entry(source_domain).or_insert(0) += 1;
}
}
processed += batch.len();
// Check if we've reached the maximum after this batch
let total: usize = article_scraped.values().map(|v| v.len()).sum();
if total >= max_total {
done = true;
}
}
// Flush Phase 1 traces
if !pending_traces.is_empty() {
db::article_history::batch_insert_entries(&state.pool, &pending_traces).await.ok();
pending_traces.clear();
}
}
// === PHASE 2: Web Search Fallback ===
// Gaps are computed for user categories only; "Autre" is never topped up.
let category_gaps: Vec<(String, i32)> = user_categories.iter().filter_map(|cat| {
let filled = filled_counts.get(cat).copied().unwrap_or(0);
let needed = (settings.max_items_per_category as usize).saturating_sub(filled);
if needed > 0 { Some((cat.clone(), needed as i32)) } else { None }
}).collect();
if !category_gaps.is_empty() {
if settings.use_brave_search {
// === BRAVE SEARCH PATH ===
emit_progress(tx, "search", "Recherche Brave Search...", 70);
let brave_key = resolve_brave_key(state, user_id).await?;
// A single query built from the user's theme; up to 20 results requested.
let query = format!("{} actualites", settings.theme);
let brave_results = crate::services::brave_search::search(
&state.http_client, &brave_key, &query, 20, settings.max_age_days,
).await?;
tracing::info!(results = brave_results.len(), "Brave Search returned results");
// Filter Brave results
let mut brave_urls: Vec<String> = Vec::new();
for result in &brave_results {
// `Some(reason)` means the URL is rejected; the reason becomes the trace status.
if let Some(reason) = filter_phase2_url(
&state.pool, user_id, &result.url, &seen_urls, &source_counts,
settings.article_history_days, settings.max_articles_per_source as usize,
).await {
pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace {
url: &result.url, title: &result.title, source_type: "brave_search",
source_url: None, category: None, synthesis_id: None,
status: reason, scraped_ok: false,
}));
continue;
}
seen_urls.insert(result.url.to_lowercase());
url_source.insert(result.url.clone(), "brave_search".to_string());
brave_urls.push(result.url.clone());
}
// Flush Brave filter traces
if !pending_traces.is_empty() {
db::article_history::batch_insert_entries(&state.pool, &pending_traces).await.ok();
pending_traces.clear();
}
// Scrape + classify in batches (same as Phase 1)
if !brave_urls.is_empty() {
emit_progress(tx, "processing", "Traitement des articles Brave...", 75);
let total_candidates = brave_urls.len();
let batch_size = settings.batch_size.max(1) as usize;
let mut processed = 0usize;
let mut candidates_iter = brave_urls.into_iter();
let mut done = false;
while !done {
let mut batch: Vec<String> = Vec::new();
while batch.len() < batch_size {
let Some(url) = candidates_iter.next() else { break };
batch.push(url);
}
if batch.is_empty() { break; }
// Brave processing maps onto the 75..=90 percent range.
let pct = 75 + ((processed as u32 * 15) / total_candidates.max(1) as u32).min(15);
emit_progress(tx, "processing", &format!("Articles Brave {}-{}/{}...", processed + 1, processed + batch.len(), total_candidates), pct as u8);
// Scrape batch in parallel
let mut scrape_set = tokio::task::JoinSet::new();
for url in &batch {
let client = state.http_client.clone();
let u = url.clone();
let mad = settings.max_age_days as i64;
scrape_set.spawn(async move {
let result = scrape_single_article(&client, &u, mad).await;
(u, result)
});
}
let mut scraped_articles: Vec<(String, String, String)> = Vec::new(); // (url, body_text, page_title)
while let Some(join_result) = scrape_set.join_next().await {
if let Ok((_url, (body_text, page_title, final_url, drop_reason))) = join_result {
if let Some(reason) = drop_reason {
pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace {
url: &final_url, title: &page_title, source_type: "brave_search",
source_url: None, category: None, synthesis_id: None,
status: reason, scraped_ok: false,
}));
} else {
scraped_articles.push((final_url, body_text, page_title));
}
}
}
if scraped_articles.is_empty() {
processed += batch.len();
continue;
}
// Classify/summarize in parallel
check_rate_limit(state, &user_rate_limiter, &provider_name).await?;
let mut classify_set = tokio::task::JoinSet::new();
for (final_url, body_text, page_title) in &scraped_articles {
let provider_clone = std::sync::Arc::clone(&provider);
let model = Arc::clone(&model_research);
let schema = Arc::clone(&classify_schema);
let cats = Arc::clone(&classification_categories);
let snippet_size = match settings.summary_length {
1 => 500,
2 => 2000,
_ => 4000,
};
let body_snippet: String = body_text.chars().take(snippet_size).collect();
let title = page_title.clone();
let url = final_url.clone();
let pool = state.pool.clone();
let uid = user_id;
let jid = job_id;
let (sys, usr) = crate::services::prompts::build_article_classify_prompt(&title, &body_snippet, &cats, settings.summary_length);
classify_set.spawn(async move {
let llm_start = std::time::Instant::now();
let result = provider_clone.call_llm(&model, &sys, &usr, &schema).await;
let duration = llm_start.elapsed().as_millis() as u64;
if let Ok(ref resp) = result {
let resp_str = serde_json::to_string_pretty(resp).unwrap_or_default();
crate::db::llm_call_log::insert(&pool, uid, jid, "classify_summarize", &model, &sys, &usr, &resp_str, duration as i32, Some(&url)).await.ok();
}
(url, title, result)
});
}
while let Some(join_result) = classify_set.join_next().await {
if let Ok((final_url, page_title, llm_result)) = join_result {
let class_response = match llm_result {
Ok(resp) => resp,
Err(e) => {
tracing::warn!(url = %final_url, error = %e, "LLM classify failed, skipping article");
continue;
}
};
// Check LLM-extracted date as fallback
if let Some(date_str) = class_response.get("date").and_then(|d| d.as_str()) {
if !date_str.is_empty() {
if let Some(parsed) = scraper::parse_date_string(date_str) {
if scraper::is_article_too_old(Some(parsed), settings.max_age_days as i64) {
tracing::info!(url = %final_url, date = date_str, "Article filtered by LLM-extracted date (too old)");
pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace {
url: &final_url, title: &page_title, source_type: "brave_search",
source_url: None, category: None, synthesis_id: None,
status: "filtered_too_old", scraped_ok: true,
}));
continue;
}
}
}
}
let Some((final_cat_key, final_cat_name, llm_title, llm_summary)) = assign_category(
&class_response, &page_title, &user_categories, &classification_categories,
&filled_counts, settings.max_items_per_category as usize,
) else {
continue;
};
article_scraped.entry(final_cat_key).or_default().push(NewsItem {
title: llm_title,
url: final_url.clone(),
summary: llm_summary,
});
*filled_counts.entry(final_cat_name).or_insert(0) += 1;
// Unlike Phase 1, the count is keyed by the article's own (final) domain.
if let Some(domain) = extract_domain(&final_url) {
*source_counts.entry(domain).or_insert(0) += 1;
}
}
}
processed += batch.len();
let total: usize = article_scraped.values().map(|v| v.len()).sum();
if total >= max_total {
done = true;
}
}
// Flush Brave scrape/classify traces
if !pending_traces.is_empty() {
db::article_history::batch_insert_entries(&state.pool, &pending_traces).await.ok();
pending_traces.clear();
}
}
} else {
// === EXISTING LLM SEARCH PATH ===
emit_progress(tx, "search", "Recherche d'actualites complementaires...", 70);
check_rate_limit(state, &user_rate_limiter, &provider_name).await?;
let search_schema = crate::services::llm::schema::build_category_schema(&user_categories, settings.max_items_per_category);
let current_date = Utc::now().format("%A %d %B %Y").to_string();
let (sys_prompt, usr_prompt) = crate::services::prompts::build_search_prompt(&settings, &[], &current_date, &[], Some(&category_gaps));
let llm_start = std::time::Instant::now();
// Unlike per-article classification, a failure of this single search call aborts the job.
let raw_results = provider.call_llm(&model_websearch, &sys_prompt, &usr_prompt, &search_schema).await?;
let llm_duration = llm_start.elapsed().as_millis() as u64;
log_llm_call(&state.pool, user_id, job_id, "search", &model_websearch, &sys_prompt, &usr_prompt, &raw_results, llm_duration, None).await;
emit_progress(tx, "parsing", "Analyse des resultats...", 75);
let parsed = parse_llm_output(&raw_results, &user_categories)?;
// Filter and validate Phase 2 articles
let mut phase2_items: Vec<(String, NewsItem)> = Vec::new();
for (cat_key, items) in parsed {
for item in items {
if let Some(reason) = filter_phase2_url(
&state.pool, user_id, &item.url, &seen_urls, &source_counts,
settings.article_history_days, settings.max_articles_per_source as usize,
).await {
pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace {
url: &item.url, title: &item.title, source_type: "web_search",
source_url: None, category: None, synthesis_id: None,
status: reason, scraped_ok: false,
}));
continue;
}
seen_urls.insert(item.url.to_lowercase());
phase2_items.push((cat_key.clone(), item));
}
}
// Flush Phase 2 filter traces
if !pending_traces.is_empty() {
db::article_history::batch_insert_entries(&state.pool, &pending_traces).await.ok();
pending_traces.clear();
}
// Scrape Phase 2 for validation
// Items are scraped one at a time; the scraped body is discarded and the
// title/summary returned by the search LLM are kept as-is.
emit_progress(tx, "scraping", "Verification des sources web...", 80);
for (cat_key, item) in phase2_items {
let (_body_text, _, final_url, drop_reason) = scrape_single_article(&state.http_client, &item.url, settings.max_age_days as i64).await;
if let Some(reason) = drop_reason {
pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace {
url: &final_url, title: &item.title, source_type: "web_search",
source_url: None, category: None, synthesis_id: None,
status: reason, scraped_ok: false,
}));
continue;
}
// NOTE(review): this path does not update `filled_counts` and applies no
// per-category cap here — presumably the search schema bounds it; confirm.
article_scraped.entry(cat_key).or_default().push(NewsItem {
title: item.title,
url: final_url,
summary: item.summary,
});
// The domain is taken from the URL the LLM returned, not from `final_url`.
if let Some(domain) = extract_domain(&item.url) {
*source_counts.entry(domain).or_insert(0) += 1;
}
}
// Flush Phase 2 scrape traces
if !pending_traces.is_empty() {
db::article_history::batch_insert_entries(&state.pool, &pending_traces).await.ok();
pending_traces.clear();
}
}
}
// === SAVE ===
// Nothing survived either phase: report it to the user instead of saving an empty synthesis.
if article_scraped.values().all(|items| items.is_empty()) {
return Err(AppError::BadRequest("Aucun article valide trouve. Verifiez vos sources et categories.".into()));
}
emit_progress(tx, "saving", "Sauvegarde de la synthese...", 90);
// Sections follow the order of the user's categories, with "Autre" last; empty ones are omitted.
let mut final_sections: Vec<NewsSection> = Vec::new();
for (i, cat_name) in user_categories.iter().enumerate() {
let key = format!("category_{}", i);
if let Some(items) = article_scraped.get(&key) {
if !items.is_empty() {
final_sections.push(NewsSection { title: cat_name.clone(), items: items.clone() });
}
}
}
if let Some(autre_items) = article_scraped.get("category_autre") {
if !autre_items.is_empty() {
final_sections.push(NewsSection { title: "Autre".to_string(), items: autre_items.clone() });
}
}
let sections_json = serde_json::to_value(&final_sections).map_err(|e| AppError::Internal(anyhow::anyhow!("Failed to serialize: {}", e)))?;
// Null bytes are stripped before the insert (see `sanitize_json_null_bytes`).
let sections_json = sanitize_json_null_bytes(sections_json);
let synthesis = db::syntheses::create(&state.pool, user_id, &get_iso_week_string(Utc::now().date_naive()), &sections_json, job_id).await?;
if settings.article_history_days > 0 {
for section in &final_sections {
for item in &section.items {
// NOTE(review): `url_source` is keyed by the candidate URL, while `item.url` is the
// final URL returned by the scraper; if those differ the lookup misses and the
// article is recorded as "web_search" — confirm whether that is intended.
let source_type = match url_source.get(&item.url).map(|s| s.as_str()) {
Some("brave_search") => "brave_search",
Some(_) => "personalized_source",
None => "web_search",
};
pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace {
url: &item.url, title: &item.title, source_type,
source_url: if source_type == "personalized_source" { url_source.get(&item.url).map(|s| s.as_str()) } else { None },
category: Some(&section.title), synthesis_id: Some(synthesis.id),
status: "used", scraped_ok: true,
}));
}
}
// Flush final "used" traces
if !pending_traces.is_empty() {
db::article_history::batch_insert_entries(&state.pool, &pending_traces).await.ok();
pending_traces.clear();
}
}
Ok(synthesis.id)
}
// ───────────────────────────────────────────────────────────────────
// Helper Functions
// ───────────────────────────────────────────────────────────────────
/// Recursively strip `\u0000` null bytes from JSON values.
///
/// PostgreSQL rejects null bytes in JSONB text. LLM output occasionally
/// contains them (e.g., `Meta AI a annonc\u0000...`).
fn sanitize_json_null_bytes(value: serde_json::Value) -> serde_json::Value {
match value {
serde_json::Value::String(s) => serde_json::Value::String(s.replace('\0', "")),
serde_json::Value::Array(arr) => {
serde_json::Value::Array(arr.into_iter().map(sanitize_json_null_bytes).collect())
}
serde_json::Value::Object(map) => serde_json::Value::Object(
map.into_iter()
.map(|(k, v)| (k, sanitize_json_null_bytes(v)))
.collect(),
),
other => other,
}
}
/// Publish a `Progress` event on the job's watch channel.
///
/// Best-effort: the result of `send` is discarded, so a failed send never
/// interrupts the pipeline.
fn emit_progress(tx: &watch::Sender<ProgressEvent>, step: &str, message: &str, percent: u8) {
    let event = ProgressEvent::Progress {
        step: step.to_owned(),
        message: message.to_owned(),
        percent,
    };
    let _ = tx.send(event);
}
/// Structured parameters for article history tracing.
///
/// A borrowed view of one article's outcome; `build_trace_entry` copies the
/// fields into an owned `ArticleHistoryEntry`.
struct ArticleTrace<'a> {
/// Article URL; also the input for the entry's URL hash.
url: &'a str,
/// Article title (callers pass `""` when no title is known yet).
title: &'a str,
/// Origin of the article, e.g. `"personalized_source"`, `"brave_search"` or `"web_search"`.
source_type: &'a str,
/// URL of the user source the article came from, when there is one.
source_url: Option<&'a str>,
/// Category the article was filed under, when it was used.
category: Option<&'a str>,
/// Synthesis the article ended up in, when it was used.
synthesis_id: Option<Uuid>,
/// Outcome label, e.g. `"used"`, `"filtered_history"`, `"filtered_diversity"`, `"filtered_too_old"`.
status: &'a str,
/// Whether the article had been scraped successfully when the trace was recorded.
scraped_ok: bool,
}
/// Convert borrowed trace parameters into an owned article history entry.
///
/// Pure construction: no database access happens here. The entry's
/// `url_hash` is derived from the trace URL with `hash_article_url`.
fn build_trace_entry(
    user_id: Uuid,
    job_id: Uuid,
    trace: &ArticleTrace<'_>,
) -> db::article_history::ArticleHistoryEntry {
    let url = trace.url.to_owned();
    let url_hash = hash_article_url(trace.url);
    let source_url = trace.source_url.map(str::to_owned);
    let category = trace.category.map(str::to_owned);

    db::article_history::ArticleHistoryEntry {
        user_id,
        url,
        url_hash,
        title: trace.title.to_owned(),
        source_type: trace.source_type.to_owned(),
        source_url,
        category,
        synthesis_id: trace.synthesis_id,
        status: trace.status.to_owned(),
        scraped_ok: trace.scraped_ok,
        job_id,
    }
}
/// Log an LLM call with full prompt, response, and timing.
#[allow(clippy::too_many_arguments)]
async fn log_llm_call(
pool: &sqlx::PgPool,
user_id: Uuid,
job_id: Uuid,
call_type: &str,
model: &str,
system_prompt: &str,
user_prompt: &str,
response: &serde_json::Value,
duration_ms: u64,
article_url: Option<&str>,
) {
let response_str = serde_json::to_string_pretty(response).unwrap_or_default();
db::llm_call_log::insert(
pool, user_id, job_id, call_type, model,
system_prompt, user_prompt, &response_str, duration_ms as i32,
article_url,
)
.await
.ok(); // Don't fail synthesis if logging fails
}
/// Fetch the per-user rate limiter kept in `AppState`, creating it on demand.
///
/// A limiter exists only while the user has both override settings
/// (`rate_limit_max_requests` and `rate_limit_time_window_seconds`). When
/// either is missing, any stored limiter for the user is removed and `None`
/// is returned, in which case the global provider rate limiter should be
/// used instead.
///
/// The lookup-or-insert goes through DashMap's entry API so that concurrent
/// generation jobs for the same user share one limiter instead of each
/// creating their own.
fn get_user_rate_limiter(
    state: &AppState,
    settings: &UserSettings,
    user_id: Uuid,
) -> Option<crate::services::rate_limiter::RateLimiter> {
    use crate::app_state::UserRateLimitEntry;

    let (Some(max_req), Some(window_sec)) = (
        settings.rate_limit_max_requests,
        settings.rate_limit_time_window_seconds,
    ) else {
        // No (complete) override: drop any limiter left from earlier settings.
        state.user_rate_limiters.remove(&user_id);
        return None;
    };

    let mut slot = state
        .user_rate_limiters
        .entry(user_id)
        .or_insert_with(|| UserRateLimitEntry::new(max_req, window_sec));

    // Rebuild the limiter if the user's settings changed since it was created.
    if slot.settings_changed(max_req, window_sec) {
        *slot = UserRateLimitEntry::new(max_req, window_sec);
    }

    Some(slot.limiter.clone())
}
/// Wait until a request is allowed by the applicable rate limiter.
///
/// The user's limiter is consulted when one is provided (under the key
/// `user_gen_<provider>`); otherwise the global provider limiter is used.
///
/// Instead of failing immediately, the function sleeps for the time the
/// limiter reports (1 second when it reports none) and tries again. It gives
/// up with `AppError::RateLimited` as soon as the time already spent plus the
/// next wait would exceed 60 seconds.
async fn check_rate_limit(
    state: &AppState,
    user_limiter: &Option<crate::services::rate_limiter::RateLimiter>,
    provider_name: &str,
) -> Result<(), AppError> {
    let max_wait = std::time::Duration::from_secs(60);
    let fallback_wait = std::time::Duration::from_secs(1);
    // The per-user key never changes between attempts, so build it once.
    let user_key = format!("user_gen_{}", provider_name);
    let started = std::time::Instant::now();

    loop {
        let allowed = if let Some(limiter) = user_limiter {
            limiter.check(&user_key)
        } else {
            state.provider_rate_limiter.check(provider_name)
        };
        if allowed {
            return Ok(());
        }

        // Ask the same limiter how long until the next slot frees up.
        let reported = if let Some(limiter) = user_limiter {
            limiter.time_until_available(&user_key)
        } else {
            state.provider_rate_limiter.time_until_available(provider_name)
        };
        let wait = reported.unwrap_or(fallback_wait);

        if started.elapsed() + wait > max_wait {
            return Err(AppError::RateLimited(
                "Limite de requetes atteinte. Veuillez reessayer dans quelques instants.".into(),
            ));
        }

        tracing::info!(wait_ms = wait.as_millis() as u64, "Rate limited, waiting...");
        tokio::time::sleep(wait).await;
    }
}
/// Return the lower-cased host of `url`, or `None` when the URL cannot be
/// parsed or has no host component.
fn extract_domain(url: &str) -> Option<String> {
    let parsed = url::Url::parse(url).ok()?;
    parsed.host_str().map(str::to_lowercase)
}
/// Decide which category an article is filed under, based on the LLM's
/// classification response.
///
/// Reads `title`, `summary` and `category` from `llm_response`, falling back
/// to `page_title`, an empty summary and `"Autre"` respectively when a field
/// is missing or not a string.
///
/// The category is matched case-insensitively against
/// `classification_categories`. The *configured* spelling of the match is
/// what gets looked up in `filled_counts` and returned, so differently-cased
/// LLM answers ("ai news" / "AI News") share one counter and one name.
/// Unknown categories, and any casing of "autre", map to `"Autre"`.
///
/// Returns `Some((cat_key, cat_name, title, summary))`, where `cat_key` is
/// `category_<index in user_categories>` or `category_autre`:
/// * the matched category, while it holds fewer than
///   `max_items_per_category` items;
/// * otherwise `"Autre"` as overflow, while that still has room;
/// * `None` when both the matched category and `"Autre"` are full.
///
/// NOTE(review): an article classified directly as "Autre" is accepted even
/// when "Autre" is already at the limit; only the overflow path enforces the
/// cap. This matches the previous behavior — confirm it is intended.
fn assign_category(
    llm_response: &serde_json::Value,
    page_title: &str,
    user_categories: &[String],
    classification_categories: &[String],
    filled_counts: &HashMap<String, usize>,
    max_items_per_category: usize,
) -> Option<(String, String, String, String)> {
    const FALLBACK_KEY: &str = "category_autre";
    const FALLBACK_NAME: &str = "Autre";

    let title = llm_response
        .get("title")
        .and_then(|t| t.as_str())
        .unwrap_or(page_title)
        .to_string();
    let summary = llm_response
        .get("summary")
        .and_then(|s| s.as_str())
        .unwrap_or("")
        .to_string();

    // Lower-case the requested category once; every comparison below uses it.
    let requested = llm_response
        .get("category")
        .and_then(|c| c.as_str())
        .unwrap_or(FALLBACK_NAME)
        .to_lowercase();

    // Resolve to the configured spelling; anything unknown becomes the fallback.
    let matched = if requested == "autre" {
        None
    } else {
        classification_categories
            .iter()
            .find(|candidate| candidate.to_lowercase() == requested)
    };

    let Some(category) = matched else {
        return Some((
            FALLBACK_KEY.to_string(),
            FALLBACK_NAME.to_string(),
            title,
            summary,
        ));
    };

    let filled = |name: &str| filled_counts.get(name).copied().unwrap_or(0);

    if filled(category.as_str()) < max_items_per_category {
        let cat_key = user_categories
            .iter()
            .position(|candidate| candidate.to_lowercase() == requested)
            .map(|index| format!("category_{}", index))
            .unwrap_or_else(|| FALLBACK_KEY.to_string());
        return Some((cat_key, category.clone(), title, summary));
    }

    // The requested category is full: overflow into "Autre" if it has room.
    if filled(FALLBACK_NAME) >= max_items_per_category {
        return None;
    }
    Some((
        FALLBACK_KEY.to_string(),
        FALLBACK_NAME.to_string(),
        title,
        summary,
    ))
}
/// Run a Phase 2 candidate URL through the acceptance filters.
///
/// Returns the reason label when the URL is rejected, `None` when it passes.
/// Filters are applied in this order:
/// 1. `filtered_homepage` — the URL parses and its path is empty or `/`;
/// 2. `filtered_cross_phase_dedup` — the lower-cased URL is in `seen_urls`;
/// 3. `filtered_history` — only when `article_history_days > 0`: the URL's
///    hash is already in the user's article history (a failed lookup counts
///    as "not found");
/// 4. `filtered_diversity` — the URL's domain already has at least
///    `max_articles_per_source` entries in `source_counts`.
async fn filter_phase2_url(
    pool: &sqlx::PgPool,
    user_id: Uuid,
    url: &str,
    seen_urls: &std::collections::HashSet<String>,
    source_counts: &HashMap<String, usize>,
    article_history_days: i32,
    max_articles_per_source: usize,
) -> Option<&'static str> {
    // Unparseable URLs are not treated as homepages; they fall through.
    let is_homepage = url::Url::parse(url)
        .map(|parsed| matches!(parsed.path(), "" | "/"))
        .unwrap_or(false);
    if is_homepage {
        return Some("filtered_homepage");
    }

    if seen_urls.contains(&url.to_lowercase()) {
        return Some("filtered_cross_phase_dedup");
    }

    if article_history_days > 0 {
        let url_hash = hash_article_url(url);
        let known = db::article_history::check_urls_exist(
            pool,
            user_id,
            std::slice::from_ref(&url_hash),
        )
        .await
        .unwrap_or_default();
        if known.contains(&url_hash) {
            return Some("filtered_history");
        }
    }

    let source_is_saturated = extract_domain(url).map_or(false, |domain| {
        source_counts.get(&domain).copied().unwrap_or(0) >= max_articles_per_source
    });
    if source_is_saturated {
        Some("filtered_diversity")
    } else {
        None
    }
}
/// Normalize an article URL for consistent history hashing.
///
/// Strips fragments, trailing slashes, and known tracking query parameters
/// so that the same article with different UTM tags is recognized as a duplicate.
fn normalize_article_url(url_str: &str) -> String {
let Ok(mut parsed) = url::Url::parse(url_str) else {
return url_str.to_lowercase();
};
// Strip fragment
parsed.set_fragment(None);
// Strip known tracking query parameters
let tracking_params: &[&str] = &[
"utm_source", "utm_medium", "utm_campaign", "utm_term", "utm_content",
"ref", "source", "fbclid", "gclid",
];
let filtered_pairs: Vec<(String, String)> = parsed
.query_pairs()
.filter(|(key, _)| !tracking_params.contains(&key.as_ref()))
.map(|(k, v)| (k.into_owned(), v.into_owned()))
.collect();
if filtered_pairs.is_empty() {
parsed.set_query(None);
} else {
let query_string = filtered_pairs
.iter()
.map(|(k, v)| format!("{}={}", k, v))
.collect::<Vec<_>>()
.join("&");
parsed.set_query(Some(&query_string));
}
// Strip trailing slash (unless path is just "/")
let path = parsed.path().to_string();
if path.len() > 1 && path.ends_with('/') {
parsed.set_path(&path[..path.len() - 1]);
}
parsed.to_string().to_lowercase()
}
/// Hash an article URL for history lookups.
///
/// The URL is first passed through `normalize_article_url`, so variants of a
/// link that normalize to the same string produce the same hash.
fn hash_article_url(url: &str) -> String {
    crate::util::token::hash_token(&normalize_article_url(url))
}
/// Load and decrypt the user's Brave Search API key.
///
/// Fails if the master encryption key cannot be parsed, if the lookup fails,
/// if decryption fails, or — with `AppError::BadRequest` — if the user has
/// no key stored for the `brave_search` provider.
async fn resolve_brave_key(
    state: &AppState,
    user_id: Uuid,
) -> Result<String, AppError> {
    let master_key = encryption::MasterKey::from_hex(&state.config.master_encryption_key)?;

    let lookup = db::api_keys::get_for_user_and_provider(
        &state.pool, user_id, "brave_search",
    )
    .await?;

    let Some(key_record) = lookup else {
        return Err(AppError::BadRequest(
            "Brave Search est active mais aucune cle API Brave n'est configuree. \
             Veuillez ajouter une cle API Brave Search dans vos parametres.".into(),
        ));
    };

    encryption::decrypt(&master_key, &key_record.encrypted_key, &key_record.nonce)
}
/// Pick the LLM provider for a user and decrypt the matching API key.
///
/// Returns `(provider_name, api_key)`.
///
/// * When `settings.ai_provider` is non-empty, only a key for that provider
///   is accepted; a missing key is a `BadRequest` naming the provider.
/// * Otherwise the first key returned by `list_for_user` is used; having no
///   key at all is a `BadRequest`.
async fn resolve_provider_and_key(
    state: &AppState,
    user_id: Uuid,
    settings: &UserSettings,
) -> Result<(String, String), AppError> {
    let master_key = encryption::MasterKey::from_hex(&state.config.master_encryption_key)?;

    // Preferred provider configured: require a key for exactly that provider.
    if !settings.ai_provider.is_empty() {
        let Some(record) = db::api_keys::get_for_user_and_provider(
            &state.pool,
            user_id,
            &settings.ai_provider,
        )
        .await?
        else {
            return Err(AppError::BadRequest(format!(
                "Aucune cle API configuree pour le fournisseur '{}'. \
                 Veuillez ajouter une cle API pour ce fournisseur dans vos parametres.",
                settings.ai_provider
            )));
        };

        let api_key = encryption::decrypt(&master_key, &record.encrypted_key, &record.nonce)?;
        return Ok((record.provider_name.clone(), api_key));
    }

    // No preference: fall back to the first key the user has.
    let keys = db::api_keys::list_for_user(&state.pool, user_id).await?;
    let Some(first_key) = keys.first() else {
        return Err(AppError::BadRequest(
            "Aucune cle API configuree. Veuillez ajouter une cle API dans vos parametres.".into(),
        ));
    };

    let api_key = encryption::decrypt(&master_key, &first_key.encrypted_key, &first_key.nonce)?;
    Ok((first_key.provider_name.clone(), api_key))
}
/// Choose the model id to use for `provider_name`.
///
/// Queries `admin_providers` for an enabled row of that provider and takes
/// the entry of its `models_scraping` JSONB array that is flagged
/// `is_default`. When no such entry exists, a built-in default is used for
/// `gemini`, `openai` and `anthropic`; any other provider yields
/// `AppError::BadRequest`.
async fn resolve_model(state: &AppState, provider_name: &str) -> Result<String, AppError> {
    let configured = sqlx::query_scalar::<_, String>(
        r#"
SELECT m->>'model_id'
FROM admin_providers, jsonb_array_elements(models_scraping) AS m
WHERE provider_name = $1 AND is_enabled = true AND (m->>'is_default')::boolean = true
LIMIT 1
"#,
    )
    .bind(provider_name)
    .fetch_optional(&state.pool)
    .await?;

    if let Some(model_id) = configured {
        return Ok(model_id);
    }

    // Nothing configured by an admin: fall back to a built-in default.
    let fallback = match provider_name {
        "gemini" => "gemini-2.5-pro",
        "openai" => "gpt-4o",
        "anthropic" => "claude-sonnet-4-20250514",
        _ => {
            return Err(AppError::BadRequest(format!(
                "Aucun modele configure pour le fournisseur '{}'",
                provider_name
            )))
        }
    };
    Ok(fallback.into())
}
/// Parse the LLM's structured JSON output into category-keyed news items.
///
/// For each index `i` of `categories` the key `category_<i>` is read from
/// `raw`; it is expected to hold an array of `{title, url, summary}` objects.
/// The result has exactly one `(key, items)` pair per category, in order.
///
/// Parsing is lenient:
/// * a missing key, or a value that is not an array, yields an empty list;
/// * array elements that do not deserialize into a `NewsItem` are skipped
///   individually, so one malformed item no longer discards its whole
///   category.
///
/// The function currently never returns `Err`; the `Result` is kept for the
/// callers' sake.
fn parse_llm_output(
    raw: &serde_json::Value,
    categories: &[String],
) -> Result<Vec<(String, Vec<NewsItem>)>, AppError> {
    let parsed = (0..categories.len())
        .map(|index| {
            let key = format!("category_{}", index);
            let items: Vec<NewsItem> = raw
                .get(&key)
                .and_then(|value| value.as_array())
                .map(|entries| {
                    entries
                        .iter()
                        .filter_map(|entry| {
                            serde_json::from_value::<NewsItem>(entry.clone()).ok()
                        })
                        .collect()
                })
                .unwrap_or_default();
            (key, items)
        })
        .collect();
    Ok(parsed)
}
/// Rotate the sources list so that the source after the last-used one comes first.
///
/// The source whose `url` equals `last_source_url` is looked up; the list is
/// then rotated so that its successor is at the front, wrapping around at
/// the end. The list is returned unchanged when `last_source_url` is `None`
/// or matches no source.
///
/// The rotation happens in place on the owned vector, so no `Source` is
/// cloned.
fn rotate_sources(mut sources: Vec<crate::models::source::Source>, last_source_url: Option<&str>) -> Vec<crate::models::source::Source> {
    let last_position = last_source_url
        .and_then(|last_url| sources.iter().position(|s| s.url == last_url));

    if let Some(idx) = last_position {
        // A match implies the list is non-empty, so the modulo cannot divide by zero.
        let next = (idx + 1) % sources.len();
        sources.rotate_left(next);
    }
    sources
}
/// Scrape a single article. Returns (body_text, page_title, final_url, drop_reason).
/// `drop_reason` is `Some("filtered_empty")` or `Some("filtered_too_old")` if rejected, `None` if OK.
async fn scrape_single_article(
http_client: &reqwest::Client,
url: &str,
max_age_days: i64,
) -> (String, String, String, Option<&'static str>) {
match scraper::scrape_url(http_client, url).await {
Ok(content) => {
let final_url = content.url.clone();
if !content.ok || content.is_soft_404 {
tracing::warn!(url = url, "Soft 404 or error page detected, skipping content");
return (String::new(), String::new(), final_url, Some("filtered_empty"));
}
if scraper::is_article_too_old(content.published_date, max_age_days) {
tracing::warn!(url = url, "Article too old, skipping content");
return (String::new(), String::new(), final_url, Some("filtered_too_old"));
}
let title = content.title.unwrap_or_default();
(content.body_text, title, final_url, None)
}
Err(e) => {
tracing::warn!(url = url, error = %e, "Failed to scrape URL, keeping article with empty content");
(String::new(), String::new(), url.to_string(), Some("filtered_empty"))
}
}
}
/// Sanitize error messages to prevent leaking sensitive information.
///
/// Messages that look like they concern credentials, provider rate limits or
/// the database are replaced wholesale by a generic user-facing message; the
/// categories are checked in that order. Descriptive phrases are matched
/// case-insensitively (so "Rate Limit" and "Postgres" are caught as well),
/// while API-key prefixes (`AIza`, `sk-`) are matched exactly.
///
/// Any other message is returned as is when it is at most 200 bytes long.
/// Longer messages are cut to at most 200 bytes — backing off to the nearest
/// character boundary so multi-byte text is never split — and get `...`
/// appended.
fn sanitize_error_message(msg: &str) -> String {
    const MAX_LEN: usize = 200;
    // Key prefixes are case-sensitive; `sk-` also covers `sk-ant-`.
    const KEY_PREFIXES: &[&str] = &["AIza", "sk-"];
    // Phrases below are compared against the lower-cased message.
    const AUTH_PHRASES: &[&str] = &["api key", "api_key", "permission_denied"];
    const RATE_PHRASES: &[&str] = &["rate limit", "quota", "429"];
    const DB_PHRASES: &[&str] = &["database", "sqlx", "postgres"];

    fn mentions_any(haystack: &str, needles: &[&str]) -> bool {
        needles.iter().any(|needle| haystack.contains(*needle))
    }

    let lowered = msg.to_lowercase();

    if mentions_any(msg, KEY_PREFIXES) || mentions_any(&lowered, AUTH_PHRASES) {
        return "Erreur d'authentification avec le fournisseur IA. Verifiez votre cle API.".into();
    }
    if mentions_any(&lowered, RATE_PHRASES) {
        return "Limite de requetes du fournisseur IA atteinte. Reessayez plus tard.".into();
    }
    if mentions_any(&lowered, DB_PHRASES) {
        return "Erreur interne du serveur. Veuillez reessayer.".into();
    }

    if msg.len() <= MAX_LEN {
        return msg.to_string();
    }

    // Index 0 is always a boundary, so this loop terminates.
    let mut cut = MAX_LEN;
    while !msg.is_char_boundary(cut) {
        cut -= 1;
    }
    format!("{}...", &msg[..cut])
}
// Unit tests for the job store, progress-event serialization and the pure
// helper functions of this module.
#[cfg(test)]
mod tests {
use super::*;
// ── JobStore tests ───────────────────────────────────────────
#[test]
fn job_store_create_and_subscribe() {
let store = JobStore::new();
let user_id = Uuid::new_v4();
let (job_id, tx) = store.create_job(user_id).unwrap();
assert_eq!(store.len(), 1);
// Subscribe
let rx = store.subscribe(job_id, user_id);
assert!(rx.is_some());
// Wrong user cannot subscribe
let other_user = Uuid::new_v4();
assert!(store.subscribe(job_id, other_user).is_none());
// Check active job
assert_eq!(store.has_active_job(user_id), Some(job_id));
assert_eq!(store.has_active_job(other_user), None);
drop(tx);
}
#[test]
fn job_store_prevents_duplicate_active_jobs() {
let store = JobStore::new();
let user_id = Uuid::new_v4();
let result1 = store.create_job(user_id);
assert!(result1.is_some());
// Second job for same user should fail
let result2 = store.create_job(user_id);
assert!(result2.is_none());
// Different user should succeed
let other_user = Uuid::new_v4();
let result3 = store.create_job(other_user);
assert!(result3.is_some());
}
#[test]
fn job_store_allows_new_job_after_completion() {
let store = JobStore::new();
let user_id = Uuid::new_v4();
let (_job_id, tx) = store.create_job(user_id).unwrap();
// Complete the job and release the user lock (as the pipeline does)
tx.send(ProgressEvent::Complete {
synthesis_id: Uuid::new_v4(),
})
.ok();
store.release_user(user_id);
// Should now allow a new job
let result2 = store.create_job(user_id);
assert!(result2.is_some());
}
#[test]
fn job_store_allows_new_job_after_error() {
let store = JobStore::new();
let user_id = Uuid::new_v4();
let (_job_id, tx) = store.create_job(user_id).unwrap();
// Fail the job and release the user lock (as the pipeline does)
tx.send(ProgressEvent::Error {
message: "test error".into(),
})
.ok();
store.release_user(user_id);
// Should now allow a new job
let result2 = store.create_job(user_id);
assert!(result2.is_some());
}
#[test]
fn job_store_cleanup_expired() {
let store = JobStore::new();
let user_id = Uuid::new_v4();
// Create a fresh job; its creation time is left untouched, so this test
// only covers the "recent job is kept" side of the cleanup.
let (_job_id, _tx) = store.create_job(user_id).unwrap();
assert_eq!(store.len(), 1);
// Cleanup should not remove recent jobs
store.cleanup_expired();
assert_eq!(store.len(), 1);
}
#[test]
fn job_store_remove() {
let store = JobStore::new();
let user_id = Uuid::new_v4();
let (job_id, _tx) = store.create_job(user_id).unwrap();
assert_eq!(store.len(), 1);
store.remove(&job_id);
assert!(store.is_empty());
}
// ── ProgressEvent serialization tests ────────────────────────
#[test]
fn progress_event_serialization_progress() {
let event = ProgressEvent::Progress {
step: "search".into(),
message: "Searching...".into(),
percent: 30,
};
let json = serde_json::to_value(&event).unwrap();
assert_eq!(json["type"], "progress");
assert_eq!(json["step"], "search");
assert_eq!(json["message"], "Searching...");
assert_eq!(json["percent"], 30);
}
#[test]
fn progress_event_serialization_complete() {
let synthesis_id = Uuid::nil();
let event = ProgressEvent::Complete { synthesis_id };
let json = serde_json::to_value(&event).unwrap();
assert_eq!(json["type"], "complete");
assert_eq!(
json["synthesis_id"],
"00000000-0000-0000-0000-000000000000"
);
}
#[test]
fn progress_event_serialization_error() {
let event = ProgressEvent::Error {
message: "Something went wrong".into(),
};
let json = serde_json::to_value(&event).unwrap();
assert_eq!(json["type"], "error");
assert_eq!(json["message"], "Something went wrong");
}
// ── parse_llm_output tests ───────────────────────────────────
#[test]
fn parse_llm_output_valid() {
let raw = serde_json::json!({
"category_0": [
{"title": "Art 1", "url": "https://a.com", "summary": "Sum 1"},
{"title": "Art 2", "url": "https://b.com", "summary": "Sum 2"}
],
"category_1": [
{"title": "Art 3", "url": "https://c.com", "summary": "Sum 3"}
]
});
let categories = vec!["AI News".into(), "Research".into()];
let result = parse_llm_output(&raw, &categories).unwrap();
assert_eq!(result.len(), 2);
assert_eq!(result[0].0, "category_0");
assert_eq!(result[0].1.len(), 2);
assert_eq!(result[1].0, "category_1");
assert_eq!(result[1].1.len(), 1);
}
#[test]
fn parse_llm_output_missing_category() {
let raw = serde_json::json!({
"category_0": [
{"title": "Art 1", "url": "https://a.com", "summary": "Sum 1"}
]
// category_1 is missing
});
let categories = vec!["AI News".into(), "Research".into()];
let result = parse_llm_output(&raw, &categories).unwrap();
assert_eq!(result.len(), 2);
assert_eq!(result[0].1.len(), 1);
assert_eq!(result[1].1.len(), 0); // Missing category → empty
}
// ── sanitize_error_message tests ─────────────────────────────
#[test]
fn sanitize_hides_api_key_references() {
let msg = "Invalid API key: AIzaSyB-test-key";
let sanitized = sanitize_error_message(msg);
assert!(sanitized.contains("cle API"));
assert!(!sanitized.contains("AIza"));
}
#[test]
fn sanitize_hides_rate_limit_details() {
let msg = "Resource exhausted: rate limit exceeded for project 12345";
let sanitized = sanitize_error_message(msg);
assert!(sanitized.contains("Limite"));
assert!(!sanitized.contains("12345"));
}
#[test]
fn sanitize_hides_database_details() {
let msg = "Database connection to postgres://user:pass@localhost failed";
let sanitized = sanitize_error_message(msg);
assert!(sanitized.contains("Erreur interne"));
assert!(!sanitized.contains("postgres"));
}
#[test]
fn sanitize_truncates_long_messages() {
let msg = "x".repeat(300);
let sanitized = sanitize_error_message(&msg);
assert!(sanitized.len() < 210);
assert!(sanitized.ends_with("..."));
}
#[test]
fn sanitize_passes_normal_messages() {
let msg = "Generation failed due to network timeout";
let sanitized = sanitize_error_message(msg);
assert_eq!(sanitized, msg);
}
// ── sanitize_json_null_bytes tests ──────────────────────────
#[test]
fn sanitize_null_bytes_in_json_strings() {
let json = serde_json::json!({
"title": "Hello\u{0000}World",
"items": [{"summary": "Text\u{0000}with\u{0000}nulls"}]
});
let sanitized = sanitize_json_null_bytes(json);
assert_eq!(sanitized["title"], "HelloWorld");
assert_eq!(sanitized["items"][0]["summary"], "Textwithnulls");
}
#[test]
fn sanitize_preserves_clean_json() {
let json = serde_json::json!({
"title": "Clean text",
"count": 42,
"active": true,
"items": [{"url": "https://example.com"}]
});
let sanitized = sanitize_json_null_bytes(json.clone());
assert_eq!(sanitized, json);
}
// ── normalize_article_url tests ─────────────────────────────
#[test]
fn normalize_strips_fragment() {
assert_eq!(
normalize_article_url("https://example.com/article#section"),
"https://example.com/article"
);
}
#[test]
fn normalize_strips_utm_params() {
assert_eq!(
normalize_article_url("https://example.com/article?utm_source=twitter&utm_medium=social"),
"https://example.com/article"
);
}
#[test]
fn normalize_keeps_non_tracking_params() {
let result = normalize_article_url("https://example.com/search?q=test&utm_source=twitter");
assert!(result.contains("q=test"));
assert!(!result.contains("utm_source"));
}
#[test]
fn normalize_strips_trailing_slash() {
assert_eq!(
normalize_article_url("https://example.com/article/"),
"https://example.com/article"
);
}
#[test]
fn normalize_keeps_root_slash() {
assert_eq!(
normalize_article_url("https://example.com/"),
"https://example.com/"
);
}
#[test]
fn normalize_lowercases() {
assert_eq!(
normalize_article_url("https://Example.COM/Article"),
"https://example.com/article"
);
}
#[test]
fn normalize_strips_fbclid() {
let result = normalize_article_url("https://example.com/post?fbclid=abc123");
assert!(!result.contains("fbclid"));
assert!(!result.contains("?"));
}
#[test]
fn normalize_handles_invalid_url() {
let result = normalize_article_url("not a url at all");
assert_eq!(result, "not a url at all");
}
#[test]
fn hash_article_url_deterministic() {
let h1 = hash_article_url("https://example.com/article?utm_source=twitter");
let h2 = hash_article_url("https://example.com/article?utm_source=newsletter");
assert_eq!(h1, h2, "Same article with different UTM params should hash the same");
}
#[test]
fn hash_article_url_different_articles() {
let h1 = hash_article_url("https://example.com/article-1");
let h2 = hash_article_url("https://example.com/article-2");
assert_ne!(h1, h2);
}
// ── rotate_sources tests ──────────────────────────────────
#[test]
fn rotate_sources_no_last_url() {
let sources = vec![
crate::models::source::Source { id: Uuid::new_v4(), user_id: Uuid::new_v4(), title: "A".into(), url: "https://a.com".into(), created_at: chrono::Utc::now() },
crate::models::source::Source { id: Uuid::new_v4(), user_id: Uuid::new_v4(), title: "B".into(), url: "https://b.com".into(), created_at: chrono::Utc::now() },
];
let result = rotate_sources(sources.clone(), None);
assert_eq!(result.len(), 2);
assert_eq!(result[0].url, "https://a.com");
}
#[test]
fn rotate_sources_with_last_url() {
let sources = vec![
crate::models::source::Source { id: Uuid::new_v4(), user_id: Uuid::new_v4(), title: "A".into(), url: "https://a.com".into(), created_at: chrono::Utc::now() },
crate::models::source::Source { id: Uuid::new_v4(), user_id: Uuid::new_v4(), title: "B".into(), url: "https://b.com".into(), created_at: chrono::Utc::now() },
crate::models::source::Source { id: Uuid::new_v4(), user_id: Uuid::new_v4(), title: "C".into(), url: "https://c.com".into(), created_at: chrono::Utc::now() },
];
let result = rotate_sources(sources, Some("https://a.com"));
assert_eq!(result[0].url, "https://b.com");
assert_eq!(result[1].url, "https://c.com");
assert_eq!(result[2].url, "https://a.com");
}
#[test]
fn rotate_sources_last_url_not_found() {
let sources = vec![
crate::models::source::Source { id: Uuid::new_v4(), user_id: Uuid::new_v4(), title: "A".into(), url: "https://a.com".into(), created_at: chrono::Utc::now() },
];
let result = rotate_sources(sources.clone(), Some("https://notfound.com"));
assert_eq!(result[0].url, "https://a.com");
}
// ── sanitize_error_message: multi-byte input ──────────────
// Guards against slicing in the middle of a multi-byte character when a
// long non-ASCII message is shortened.
#[test]
fn sanitize_error_message_handles_multibyte_utf8() {
let msg = "é".repeat(150); // 300 bytes, 150 chars
let result = sanitize_error_message(&msg);
assert!(result.ends_with("..."));
}
}