refactor: extract scrape_and_classify_batch from synthesis pipeline

Replaces ~200 duplicated lines in Phase 1 (personalized sources) and
Phase 2 (Brave Search) with a shared scrape_and_classify_batch function.
Uses ScrapeClassifyCtx to bundle shared parameters. Also prepares
synthesis.rs for JobStore extraction by re-exporting from job_store.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
master
oabrivard 3 months ago
parent 381e11a5e4
commit 1ab9c817e4

@ -1,4 +1,4 @@
//! Synthesis generation pipeline and job management. //! Synthesis generation pipeline.
//! //!
//! Orchestrates the two-phase pipeline: //! Orchestrates the two-phase pipeline:
//! 1. Personalized sources: scrape user sources, classify+summarize per article //! 1. Personalized sources: scrape user sources, classify+summarize per article
@ -10,12 +10,9 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::Duration;
use chrono::Utc; use chrono::Utc;
use dashmap::DashMap;
use dashmap::DashSet;
use serde::Serialize;
use tokio::sync::watch; use tokio::sync::watch;
use uuid::Uuid; use uuid::Uuid;
@ -31,166 +28,8 @@ use crate::services::llm::factory::create_provider;
use crate::services::scraper; use crate::services::scraper;
use crate::services::source_scraper; use crate::services::source_scraper;
// ─────────────────────────────────────────────────────────────────── // Re-export for downstream consumers that previously imported from synthesis.
// Progress Events pub use crate::services::job_store::{emit_progress, JobStore, ProgressEvent};
// ───────────────────────────────────────────────────────────────────
/// Progress event sent to SSE clients during generation.
///
/// The `watch` channel always holds the latest event, and new subscribers
/// immediately receive the current state.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type")]
pub enum ProgressEvent {
/// Generation is in progress.
#[serde(rename = "progress")]
Progress {
step: String,
message: String,
percent: u8,
},
/// Generation completed successfully.
#[serde(rename = "complete")]
Complete { synthesis_id: Uuid },
/// Generation failed with an error.
#[serde(rename = "error")]
Error { message: String },
}
// ───────────────────────────────────────────────────────────────────
// Job Store
// ───────────────────────────────────────────────────────────────────
/// Entry in the job store, holding the progress channel and metadata.
struct JobEntry {
/// Sender side of the watch channel for progress updates.
/// Wrapped in Arc so it can be shared with the background task
/// without cloning the Sender itself.
tx: Arc<watch::Sender<ProgressEvent>>,
/// A receiver kept alive to prevent the channel from closing.
/// Without at least one receiver, `Sender::send()` returns an error
/// and does NOT update the stored value.
_rx: watch::Receiver<ProgressEvent>,
/// User who owns this job.
user_id: Uuid,
/// When the job was created (for TTL cleanup).
created_at: Instant,
/// Flag set to true when the user requests cancellation.
cancelled: Arc<AtomicBool>,
}
/// In-memory store for active generation jobs.
///
/// Uses `DashMap` for lock-free concurrent access. Jobs are keyed by
/// a random UUID and automatically cleaned up after a TTL.
#[derive(Clone)]
pub struct JobStore {
inner: Arc<DashMap<Uuid, JobEntry>>,
generating_users: Arc<DashSet<Uuid>>,
}
/// Jobs expire after 1 hour (allows SSE reconnection).
const JOB_TTL: Duration = Duration::from_secs(3600);
impl Default for JobStore {
fn default() -> Self {
Self::new()
}
}
impl JobStore {
/// Create a new empty job store.
pub fn new() -> Self {
Self {
inner: Arc::new(DashMap::new()),
generating_users: Arc::new(DashSet::new()),
}
}
/// Create a new job for a user, returning the job ID, the watch Sender, and a cancellation flag.
///
/// Returns `None` if the user already has an active job.
/// Uses an atomic DashSet insert to prevent race conditions on double-click.
pub fn create_job(&self, user_id: Uuid) -> Option<(Uuid, Arc<watch::Sender<ProgressEvent>>, Arc<AtomicBool>)> {
if !self.generating_users.insert(user_id) {
return None;
}
let job_id = Uuid::new_v4();
let (tx, rx) = watch::channel(ProgressEvent::Progress {
step: "init".into(),
message: "Initialisation...".into(),
percent: 0,
});
let tx = Arc::new(tx);
let cancelled = Arc::new(AtomicBool::new(false));
self.inner.insert(job_id, JobEntry {
tx: Arc::clone(&tx), _rx: rx, user_id, created_at: Instant::now(),
cancelled: Arc::clone(&cancelled),
});
Some((job_id, tx, cancelled))
}
/// Get a watch receiver for a job, if it exists and belongs to the given user.
pub fn subscribe(&self, job_id: Uuid, user_id: Uuid) -> Option<watch::Receiver<ProgressEvent>> {
self.inner.get(&job_id).and_then(|entry| {
if entry.value().user_id == user_id {
Some(entry.value().tx.subscribe())
} else {
None
}
})
}
/// Check if a user has an active (in-progress) job.
pub fn has_active_job(&self, user_id: Uuid) -> Option<Uuid> {
if !self.generating_users.contains(&user_id) { return None; }
for entry in self.inner.iter() {
if entry.value().user_id == user_id { return Some(*entry.key()); }
}
None
}
/// Signal a job to stop. Returns true if the job was found and belongs to the user.
pub fn cancel_job(&self, job_id: Uuid, user_id: Uuid) -> bool {
if let Some(entry) = self.inner.get(&job_id) {
if entry.value().user_id == user_id {
entry.value().cancelled.store(true, Ordering::Relaxed);
return true;
}
}
false
}
/// Release the generating lock for a user (called when job completes, errors, or times out).
pub fn release_user(&self, user_id: Uuid) {
self.generating_users.remove(&user_id);
}
/// Remove expired jobs (older than TTL).
pub fn cleanup_expired(&self) {
let now = Instant::now();
self.inner.retain(|_, entry| {
let keep = now.duration_since(entry.created_at) < JOB_TTL;
if !keep { self.generating_users.remove(&entry.user_id); }
keep
});
}
/// Remove a specific job.
pub fn remove(&self, job_id: &Uuid) {
self.inner.remove(job_id);
}
/// Get the number of active jobs (for testing/monitoring).
pub fn len(&self) -> usize {
self.inner.len()
}
/// Check if the store is empty (for testing).
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
}
// ─────────────────────────────────────────────────────────────────── // ───────────────────────────────────────────────────────────────────
// Generation Pipeline // Generation Pipeline
@ -439,9 +278,19 @@ pub async fn run_generation_inner(
let mut candidates_iter = wave_urls.into_iter(); let mut candidates_iter = wave_urls.into_iter();
let mut done = false; let mut done = false;
let ctx = ScrapeClassifyCtx {
state, user_id, job_id, provider: &provider,
model_research: &model_research, classify_schema: &classify_schema,
classification_categories: &classification_categories,
user_categories: &user_categories, snippet_size, summary_length: theme.summary_length,
max_age_days: theme.max_age_days as i64,
max_items_per_category: theme.max_items_per_category as usize,
source_type: "personalized_source",
};
while !done { while !done {
// Take next batch of candidates, filtering source limits // Take next batch of candidates, filtering source limits
let mut batch: Vec<(String, String)> = Vec::new(); let mut batch: Vec<(String, Option<String>)> = Vec::new();
while batch.len() < batch_size { while batch.len() < batch_size {
let Some((url, source_url)) = candidates_iter.next() else { let Some((url, source_url)) = candidates_iter.next() else {
break; break;
@ -457,7 +306,7 @@ pub async fn run_generation_inner(
})); }));
continue; continue;
} }
batch.push((url, source_url)); batch.push((url, Some(source_url)));
} }
if batch.is_empty() { if batch.is_empty() {
@ -468,152 +317,10 @@ pub async fn run_generation_inner(
let pct = 5 + ((articles_so_far as u32 * 60) / max_total.max(1) as u32).min(60); let pct = 5 + ((articles_so_far as u32 * 60) / max_total.max(1) as u32).min(60);
emit_progress(tx, "sources", &format!("Vague {}/{} : articles {}/{}...", wave_idx + 1, total_waves, processed + 1, total_candidates), pct as u8); emit_progress(tx, "sources", &format!("Vague {}/{} : articles {}/{}...", wave_idx + 1, total_waves, processed + 1, total_candidates), pct as u8);
// Phase A: Scrape batch in parallel scrape_and_classify_batch(
let mut scrape_set = tokio::task::JoinSet::new(); &ctx, &batch, &mut article_scraped, &mut filled_counts,
for (url, source_url) in &batch { &mut source_counts, &mut pending_traces, &user_rate_limiter, &provider_name,
let client = state.http_client.clone(); ).await?;
let u = url.clone();
let su = source_url.clone();
let mad = theme.max_age_days as i64;
scrape_set.spawn(async move {
let result = scrape_single_article(&client, &u, mad).await;
(u, su, result)
});
}
let mut scraped_articles: Vec<(String, String, String, String)> = Vec::new(); // (url, source_url, body_text, page_title)
while let Some(join_result) = scrape_set.join_next().await {
if let Ok((_url, source_url, (body_text, page_title, final_url, drop_reason))) = join_result {
if let Some(reason) = drop_reason {
pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace {
url: &final_url, title: &page_title, source_type: "personalized_source",
source_url: Some(&source_url), category: None, synthesis_id: None,
status: reason, scraped_ok: false,
published_date: None,
}));
} else {
scraped_articles.push((final_url, source_url, body_text, page_title));
}
}
}
if scraped_articles.is_empty() {
processed += batch.len();
continue;
}
// Phase B: Classify/summarize batch in parallel
check_rate_limit(state, &user_rate_limiter, &provider_name).await?;
let mut classify_set = tokio::task::JoinSet::new();
for (final_url, source_url, body_text, page_title) in &scraped_articles {
let provider_clone = std::sync::Arc::clone(&provider);
let model = Arc::clone(&model_research);
let schema = Arc::clone(&classify_schema);
let cats = Arc::clone(&classification_categories);
let body_snippet: String = body_text.chars().take(snippet_size).collect();
let title = page_title.clone();
let url = final_url.clone();
let su = source_url.clone();
let pool = state.pool.clone();
let uid = user_id;
let jid = job_id;
let (sys, usr) = crate::services::prompts::build_article_classify_prompt(&title, &body_snippet, &cats, theme.summary_length);
classify_set.spawn(async move {
let llm_start = std::time::Instant::now();
let result = provider_clone.call_llm(&model, &sys, &usr, &schema).await;
let duration = llm_start.elapsed().as_millis() as u64;
// Log the LLM call
if let Ok(ref resp) = result {
let resp_str = serde_json::to_string_pretty(resp).unwrap_or_default();
crate::db::llm_call_log::insert(&pool, uid, jid, "classify_summarize", &model, &sys, &usr, &resp_str, duration as i32, Some(&url)).await.ok();
}
(url, su, title, result)
});
}
while let Some(join_result) = classify_set.join_next().await {
if let Ok((final_url, source_url, page_title, llm_result)) = join_result {
let class_response = match llm_result {
Ok(resp) => resp,
Err(e) => {
tracing::warn!(url = %final_url, error = %e, "LLM classify failed, skipping article");
continue;
}
};
// Check if LLM considers this a real article
let is_article = class_response.get("is_article").and_then(|v| v.as_bool()).unwrap_or(true);
if !is_article {
tracing::info!(url = %final_url, "Article filtered by LLM: not a real article");
pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace {
url: &final_url, title: &page_title, source_type: "personalized_source",
source_url: Some(&source_url), category: None, synthesis_id: None,
status: "filtered_not_article", scraped_ok: true,
published_date: None,
}));
continue;
}
// Check LLM-extracted date as fallback for articles without a scraper date
if let Some(date_str) = class_response.get("date").and_then(|d| d.as_str()) {
if !date_str.is_empty() {
if let Some(parsed) = scraper::parse_date_string(date_str) {
if scraper::is_article_too_old(Some(parsed), theme.max_age_days as i64) {
tracing::info!(url = %final_url, date = date_str, "Article filtered by LLM-extracted date (too old)");
pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace {
url: &final_url, title: &page_title, source_type: "personalized_source",
source_url: Some(&source_url), category: None, synthesis_id: None,
status: "filtered_too_old", scraped_ok: true,
published_date: Some(date_str),
}));
continue;
}
}
}
}
let llm_date = class_response.get("date").and_then(|d| d.as_str()).filter(|s| !s.is_empty()).map(|s| s.to_string());
// Articles without any date go to "Articles sans date" category
if llm_date.is_none() {
let llm_title = class_response.get("title").and_then(|t| t.as_str()).unwrap_or(&page_title).to_string();
let llm_summary = class_response.get("summary").and_then(|s| s.as_str()).unwrap_or("").to_string();
article_scraped.entry("category_no_date".to_string()).or_default().push(NewsItem {
title: llm_title,
url: final_url.clone(),
summary: llm_summary,
date: None,
});
let source_domain = extract_domain(&source_url).unwrap_or_default();
*source_counts.entry(source_domain).or_insert(0) += 1;
continue;
}
let Some((final_cat_key, final_cat_name, llm_title, llm_summary)) = assign_category(
&class_response, &page_title, &user_categories, &classification_categories,
&filled_counts, theme.max_items_per_category as usize,
) else {
continue;
};
article_scraped.entry(final_cat_key).or_default().push(NewsItem {
title: llm_title,
url: final_url.clone(),
summary: llm_summary,
date: llm_date,
});
*filled_counts.entry(final_cat_name).or_insert(0) += 1;
let source_domain = extract_domain(&source_url).unwrap_or_default();
*source_counts.entry(source_domain).or_insert(0) += 1;
}
}
processed += batch.len(); processed += batch.len();
@ -705,15 +412,26 @@ pub async fn run_generation_inner(
emit_progress(tx, "websearch", "Traitement des articles Brave...", 75); emit_progress(tx, "websearch", "Traitement des articles Brave...", 75);
let total_candidates = brave_urls.len(); let total_candidates = brave_urls.len();
let batch_size = settings.batch_size.max(1) as usize; let batch_size = settings.batch_size.max(1) as usize;
let snippet_size = match theme.summary_length { 1 => 500, 2 => 2000, _ => 4000 };
let mut processed = 0usize; let mut processed = 0usize;
let mut candidates_iter = brave_urls.into_iter(); let mut candidates_iter = brave_urls.into_iter();
let mut done = false; let mut done = false;
let ctx = ScrapeClassifyCtx {
state, user_id, job_id, provider: &provider,
model_research: &model_research, classify_schema: &classify_schema,
classification_categories: &classification_categories,
user_categories: &user_categories, snippet_size, summary_length: theme.summary_length,
max_age_days: theme.max_age_days as i64,
max_items_per_category: theme.max_items_per_category as usize,
source_type: "brave_search",
};
while !done { while !done {
let mut batch: Vec<String> = Vec::new(); let mut batch: Vec<(String, Option<String>)> = Vec::new();
while batch.len() < batch_size { while batch.len() < batch_size {
let Some(url) = candidates_iter.next() else { break }; let Some(url) = candidates_iter.next() else { break };
batch.push(url); batch.push((url, None));
} }
if batch.is_empty() { break; } if batch.is_empty() { break; }
@ -721,156 +439,10 @@ pub async fn run_generation_inner(
let pct = 75 + ((processed as u32 * 10) / total_candidates.max(1) as u32).min(10); let pct = 75 + ((processed as u32 * 10) / total_candidates.max(1) as u32).min(10);
emit_progress(tx, "websearch", &format!("Verification des sources {}/{}...", processed + 1, total_candidates), pct as u8); emit_progress(tx, "websearch", &format!("Verification des sources {}/{}...", processed + 1, total_candidates), pct as u8);
// Scrape batch in parallel scrape_and_classify_batch(
let mut scrape_set = tokio::task::JoinSet::new(); &ctx, &batch, &mut article_scraped, &mut filled_counts,
for url in &batch { &mut source_counts, &mut pending_traces, &user_rate_limiter, &provider_name,
let client = state.http_client.clone(); ).await?;
let u = url.clone();
let mad = theme.max_age_days as i64;
scrape_set.spawn(async move {
let result = scrape_single_article(&client, &u, mad).await;
(u, result)
});
}
let mut scraped_articles: Vec<(String, String, String)> = Vec::new(); // (url, body_text, page_title)
while let Some(join_result) = scrape_set.join_next().await {
if let Ok((_url, (body_text, page_title, final_url, drop_reason))) = join_result {
if let Some(reason) = drop_reason {
pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace {
url: &final_url, title: &page_title, source_type: "brave_search",
source_url: None, category: None, synthesis_id: None,
status: reason, scraped_ok: false,
published_date: None,
}));
} else {
scraped_articles.push((final_url, body_text, page_title));
}
}
}
if scraped_articles.is_empty() {
processed += batch.len();
continue;
}
// Classify/summarize in parallel
check_rate_limit(state, &user_rate_limiter, &provider_name).await?;
let mut classify_set = tokio::task::JoinSet::new();
for (final_url, body_text, page_title) in &scraped_articles {
let provider_clone = std::sync::Arc::clone(&provider);
let model = Arc::clone(&model_research);
let schema = Arc::clone(&classify_schema);
let cats = Arc::clone(&classification_categories);
let snippet_size = match theme.summary_length {
1 => 500,
2 => 2000,
_ => 4000,
};
let body_snippet: String = body_text.chars().take(snippet_size).collect();
let title = page_title.clone();
let url = final_url.clone();
let pool = state.pool.clone();
let uid = user_id;
let jid = job_id;
let (sys, usr) = crate::services::prompts::build_article_classify_prompt(&title, &body_snippet, &cats, theme.summary_length);
classify_set.spawn(async move {
let llm_start = std::time::Instant::now();
let result = provider_clone.call_llm(&model, &sys, &usr, &schema).await;
let duration = llm_start.elapsed().as_millis() as u64;
if let Ok(ref resp) = result {
let resp_str = serde_json::to_string_pretty(resp).unwrap_or_default();
crate::db::llm_call_log::insert(&pool, uid, jid, "classify_summarize", &model, &sys, &usr, &resp_str, duration as i32, Some(&url)).await.ok();
}
(url, title, result)
});
}
while let Some(join_result) = classify_set.join_next().await {
if let Ok((final_url, page_title, llm_result)) = join_result {
let class_response = match llm_result {
Ok(resp) => resp,
Err(e) => {
tracing::warn!(url = %final_url, error = %e, "LLM classify failed, skipping article");
continue;
}
};
// Check if LLM considers this a real article
let is_article = class_response.get("is_article").and_then(|v| v.as_bool()).unwrap_or(true);
if !is_article {
tracing::info!(url = %final_url, "Article filtered by LLM: not a real article");
pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace {
url: &final_url, title: &page_title, source_type: "brave_search",
source_url: None, category: None, synthesis_id: None,
status: "filtered_not_article", scraped_ok: true,
published_date: None,
}));
continue;
}
// Check LLM-extracted date as fallback
if let Some(date_str) = class_response.get("date").and_then(|d| d.as_str()) {
if !date_str.is_empty() {
if let Some(parsed) = scraper::parse_date_string(date_str) {
if scraper::is_article_too_old(Some(parsed), theme.max_age_days as i64) {
tracing::info!(url = %final_url, date = date_str, "Article filtered by LLM-extracted date (too old)");
pending_traces.push(build_trace_entry(user_id, job_id, &ArticleTrace {
url: &final_url, title: &page_title, source_type: "brave_search",
source_url: None, category: None, synthesis_id: None,
status: "filtered_too_old", scraped_ok: true,
published_date: Some(date_str),
}));
continue;
}
}
}
}
let llm_date = class_response.get("date").and_then(|d| d.as_str()).filter(|s| !s.is_empty()).map(|s| s.to_string());
// Articles without any date go to "Articles sans date" category
if llm_date.is_none() {
let llm_title = class_response.get("title").and_then(|t| t.as_str()).unwrap_or(&page_title).to_string();
let llm_summary = class_response.get("summary").and_then(|s| s.as_str()).unwrap_or("").to_string();
article_scraped.entry("category_no_date".to_string()).or_default().push(NewsItem {
title: llm_title,
url: final_url.clone(),
summary: llm_summary,
date: None,
});
if let Some(domain) = extract_domain(&final_url) {
*source_counts.entry(domain).or_insert(0) += 1;
}
continue;
}
let Some((final_cat_key, final_cat_name, llm_title, llm_summary)) = assign_category(
&class_response, &page_title, &user_categories, &classification_categories,
&filled_counts, theme.max_items_per_category as usize,
) else {
continue;
};
article_scraped.entry(final_cat_key).or_default().push(NewsItem {
title: llm_title,
url: final_url.clone(),
summary: llm_summary,
date: llm_date,
});
*filled_counts.entry(final_cat_name).or_insert(0) += 1;
if let Some(domain) = extract_domain(&final_url) {
*source_counts.entry(domain).or_insert(0) += 1;
}
}
}
processed += batch.len(); processed += batch.len();
@ -1060,14 +632,196 @@ fn sanitize_json_null_bytes(value: serde_json::Value) -> serde_json::Value {
} }
} }
/// Emit a progress event via the watch channel. /// Context passed to [`scrape_and_classify_batch`] to avoid long argument lists.
fn emit_progress(tx: &watch::Sender<ProgressEvent>, step: &str, message: &str, percent: u8) { struct ScrapeClassifyCtx<'a> {
tx.send(ProgressEvent::Progress { state: &'a AppState,
step: step.into(), user_id: Uuid,
message: message.into(), job_id: Uuid,
percent, provider: &'a Arc<dyn crate::services::llm::LlmProvider>,
}) model_research: &'a Arc<String>,
.ok(); classify_schema: &'a Arc<serde_json::Value>,
classification_categories: &'a Arc<Vec<String>>,
user_categories: &'a [String],
snippet_size: usize,
summary_length: i32,
max_age_days: i64,
max_items_per_category: usize,
source_type: &'a str,
}
/// Scrape and classify a batch of articles, updating shared state.
///
/// Each article is represented as `(url, Option<source_url>)`.
/// - `source_type` is used for tracing (e.g. "personalized_source", "brave_search").
/// - When `source_url` is `Some`, it is recorded in traces; the domain for
/// source counting comes from the source URL.
/// - When `source_url` is `None`, traces have no source URL; the domain for
/// source counting comes from the article URL.
#[allow(clippy::too_many_arguments)]
async fn scrape_and_classify_batch(
ctx: &ScrapeClassifyCtx<'_>,
articles: &[(String, Option<String>)],
article_scraped: &mut HashMap<String, Vec<NewsItem>>,
filled_counts: &mut HashMap<String, usize>,
source_counts: &mut HashMap<String, usize>,
pending_traces: &mut Vec<db::article_history::ArticleHistoryEntry>,
user_rate_limiter: &Option<crate::services::rate_limiter::RateLimiter>,
provider_name: &str,
) -> Result<(), AppError> {
// Phase A: Scrape batch in parallel
let mut scrape_set = tokio::task::JoinSet::new();
for (url, source_url) in articles {
let client = ctx.state.http_client.clone();
let u = url.clone();
let su = source_url.clone();
let mad = ctx.max_age_days;
scrape_set.spawn(async move {
let result = scrape_single_article(&client, &u, mad).await;
(u, su, result)
});
}
let mut scraped_articles: Vec<(String, Option<String>, String, String)> = Vec::new();
while let Some(join_result) = scrape_set.join_next().await {
if let Ok((_url, source_url, (body_text, page_title, final_url, drop_reason))) = join_result {
if let Some(reason) = drop_reason {
pending_traces.push(build_trace_entry(ctx.user_id, ctx.job_id, &ArticleTrace {
url: &final_url, title: &page_title, source_type: ctx.source_type,
source_url: source_url.as_deref(), category: None, synthesis_id: None,
status: reason, scraped_ok: false,
published_date: None,
}));
} else {
scraped_articles.push((final_url, source_url, body_text, page_title));
}
}
}
if scraped_articles.is_empty() {
return Ok(());
}
// Phase B: Classify/summarize batch in parallel
check_rate_limit(ctx.state, user_rate_limiter, provider_name).await?;
let mut classify_set = tokio::task::JoinSet::new();
for (final_url, source_url, body_text, page_title) in &scraped_articles {
let provider_clone = Arc::clone(ctx.provider);
let model = Arc::clone(ctx.model_research);
let schema = Arc::clone(ctx.classify_schema);
let cats = Arc::clone(ctx.classification_categories);
let body_snippet: String = body_text.chars().take(ctx.snippet_size).collect();
let title = page_title.clone();
let url = final_url.clone();
let su = source_url.clone();
let pool = ctx.state.pool.clone();
let uid = ctx.user_id;
let jid = ctx.job_id;
let summary_length = ctx.summary_length;
let (sys, usr) = crate::services::prompts::build_article_classify_prompt(&title, &body_snippet, &cats, summary_length);
classify_set.spawn(async move {
let llm_start = std::time::Instant::now();
let result = provider_clone.call_llm(&model, &sys, &usr, &schema).await;
let duration = llm_start.elapsed().as_millis() as u64;
if let Ok(ref resp) = result {
let resp_str = serde_json::to_string_pretty(resp).unwrap_or_default();
crate::db::llm_call_log::insert(&pool, uid, jid, "classify_summarize", &model, &sys, &usr, &resp_str, duration as i32, Some(&url)).await.ok();
}
(url, su, title, result)
});
}
while let Some(join_result) = classify_set.join_next().await {
if let Ok((final_url, source_url, page_title, llm_result)) = join_result {
let class_response = match llm_result {
Ok(resp) => resp,
Err(e) => {
tracing::warn!(url = %final_url, error = %e, "LLM classify failed, skipping article");
continue;
}
};
// Check if LLM considers this a real article
let is_article = class_response.get("is_article").and_then(|v| v.as_bool()).unwrap_or(true);
if !is_article {
tracing::info!(url = %final_url, "Article filtered by LLM: not a real article");
pending_traces.push(build_trace_entry(ctx.user_id, ctx.job_id, &ArticleTrace {
url: &final_url, title: &page_title, source_type: ctx.source_type,
source_url: source_url.as_deref(), category: None, synthesis_id: None,
status: "filtered_not_article", scraped_ok: true,
published_date: None,
}));
continue;
}
// Check LLM-extracted date as fallback
if let Some(date_str) = class_response.get("date").and_then(|d| d.as_str()) {
if !date_str.is_empty() {
if let Some(parsed) = scraper::parse_date_string(date_str) {
if scraper::is_article_too_old(Some(parsed), ctx.max_age_days) {
tracing::info!(url = %final_url, date = date_str, "Article filtered by LLM-extracted date (too old)");
pending_traces.push(build_trace_entry(ctx.user_id, ctx.job_id, &ArticleTrace {
url: &final_url, title: &page_title, source_type: ctx.source_type,
source_url: source_url.as_deref(), category: None, synthesis_id: None,
status: "filtered_too_old", scraped_ok: true,
published_date: Some(date_str),
}));
continue;
}
}
}
}
let llm_date = class_response.get("date").and_then(|d| d.as_str()).filter(|s| !s.is_empty()).map(|s| s.to_string());
// Domain for source counting: prefer source_url if available, else article url
let count_domain = source_url.as_deref()
.and_then(extract_domain)
.or_else(|| extract_domain(&final_url));
// Articles without any date go to "Articles sans date" category
if llm_date.is_none() {
let llm_title = class_response.get("title").and_then(|t| t.as_str()).unwrap_or(&page_title).to_string();
let llm_summary = class_response.get("summary").and_then(|s| s.as_str()).unwrap_or("").to_string();
article_scraped.entry("category_no_date".to_string()).or_default().push(NewsItem {
title: llm_title,
url: final_url.clone(),
summary: llm_summary,
date: None,
});
if let Some(domain) = count_domain {
*source_counts.entry(domain).or_insert(0) += 1;
}
continue;
}
let Some((final_cat_key, final_cat_name, llm_title, llm_summary)) = assign_category(
&class_response, &page_title, ctx.user_categories, ctx.classification_categories,
filled_counts, ctx.max_items_per_category,
) else {
continue;
};
article_scraped.entry(final_cat_key).or_default().push(NewsItem {
title: llm_title,
url: final_url.clone(),
summary: llm_summary,
date: llm_date,
});
*filled_counts.entry(final_cat_name).or_insert(0) += 1;
if let Some(domain) = count_domain {
*source_counts.entry(domain).or_insert(0) += 1;
}
}
}
Ok(())
} }
/// Structured parameters for article history tracing. /// Structured parameters for article history tracing.
@ -1547,154 +1301,6 @@ fn sanitize_error_message(msg: &str) -> String {
mod tests { mod tests {
use super::*; use super::*;
// ── JobStore tests ───────────────────────────────────────────
#[test]
fn job_store_create_and_subscribe() {
let store = JobStore::new();
let user_id = Uuid::new_v4();
let (job_id, tx, _cancelled) = store.create_job(user_id).unwrap();
assert_eq!(store.len(), 1);
// Subscribe
let rx = store.subscribe(job_id, user_id);
assert!(rx.is_some());
// Wrong user cannot subscribe
let other_user = Uuid::new_v4();
assert!(store.subscribe(job_id, other_user).is_none());
// Check active job
assert_eq!(store.has_active_job(user_id), Some(job_id));
assert_eq!(store.has_active_job(other_user), None);
drop(tx);
}
#[test]
fn job_store_prevents_duplicate_active_jobs() {
let store = JobStore::new();
let user_id = Uuid::new_v4();
let _result1 = store.create_job(user_id);
assert!(_result1.is_some());
// Second job for same user should fail
let result2 = store.create_job(user_id);
assert!(result2.is_none());
// Different user should succeed
let other_user = Uuid::new_v4();
let result3 = store.create_job(other_user);
assert!(result3.is_some());
}
#[test]
fn job_store_allows_new_job_after_completion() {
let store = JobStore::new();
let user_id = Uuid::new_v4();
let (_job_id, tx, _cancelled) = store.create_job(user_id).unwrap();
// Complete the job and release the user lock (as the pipeline does)
tx.send(ProgressEvent::Complete {
synthesis_id: Uuid::new_v4(),
})
.ok();
store.release_user(user_id);
// Should now allow a new job
let result2 = store.create_job(user_id);
assert!(result2.is_some());
}
#[test]
fn job_store_allows_new_job_after_error() {
let store = JobStore::new();
let user_id = Uuid::new_v4();
let (_job_id, tx, _cancelled) = store.create_job(user_id).unwrap();
// Fail the job and release the user lock (as the pipeline does)
tx.send(ProgressEvent::Error {
message: "test error".into(),
})
.ok();
store.release_user(user_id);
// Should now allow a new job
let result2 = store.create_job(user_id);
assert!(result2.is_some());
}
#[test]
fn job_store_cleanup_expired() {
let store = JobStore::new();
let user_id = Uuid::new_v4();
// Create a job and manually set its created_at to the past
let (_job_id, _tx, _cancelled) = store.create_job(user_id).unwrap();
assert_eq!(store.len(), 1);
// Cleanup should not remove recent jobs
store.cleanup_expired();
assert_eq!(store.len(), 1);
}
#[test]
fn job_store_remove() {
let store = JobStore::new();
let user_id = Uuid::new_v4();
let (job_id, _tx, _cancelled) = store.create_job(user_id).unwrap();
assert_eq!(store.len(), 1);
store.remove(&job_id);
assert!(store.is_empty());
}
// ── ProgressEvent serialization tests ────────────────────────
#[test]
fn progress_event_serialization_progress() {
let event = ProgressEvent::Progress {
step: "search".into(),
message: "Searching...".into(),
percent: 30,
};
let json = serde_json::to_value(&event).unwrap();
assert_eq!(json["type"], "progress");
assert_eq!(json["step"], "search");
assert_eq!(json["message"], "Searching...");
assert_eq!(json["percent"], 30);
}
#[test]
fn progress_event_serialization_complete() {
let synthesis_id = Uuid::nil();
let event = ProgressEvent::Complete { synthesis_id };
let json = serde_json::to_value(&event).unwrap();
assert_eq!(json["type"], "complete");
assert_eq!(
json["synthesis_id"],
"00000000-0000-0000-0000-000000000000"
);
}
#[test]
fn progress_event_serialization_error() {
let event = ProgressEvent::Error {
message: "Something went wrong".into(),
};
let json = serde_json::to_value(&event).unwrap();
assert_eq!(json["type"], "error");
assert_eq!(json["message"], "Something went wrong");
}
// ── parse_llm_output tests ─────────────────────────────────── // ── parse_llm_output tests ───────────────────────────────────
#[test] #[test]

Loading…
Cancel
Save