From 2dee7628a715c2446b3e68d261c5becdd5271828 Mon Sep 17 00:00:00 2001 From: oabrivard Date: Fri, 3 Apr 2026 14:11:16 +0200 Subject: [PATCH] feat: integrate feed_parser into Phase 1 pipeline with HTML fallback Co-Authored-By: Claude Opus 4.6 (1M context) --- backend/src/services/synthesis/mod.rs | 70 +++++++++++++++++++++++++-- 1 file changed, 65 insertions(+), 5 deletions(-) diff --git a/backend/src/services/synthesis/mod.rs b/backend/src/services/synthesis/mod.rs index 7a8245b..eee9545 100644 --- a/backend/src/services/synthesis/mod.rs +++ b/backend/src/services/synthesis/mod.rs @@ -12,7 +12,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; -use chrono::Utc; +use chrono::{DateTime, Utc}; use tokio::sync::watch; use uuid::Uuid; @@ -27,6 +27,7 @@ use crate::services::encryption; use crate::services::llm::factory::create_provider; use crate::services::scraper; use crate::services::source_scraper; +use crate::services::feed_parser; mod helpers; pub(crate) use helpers::{ @@ -189,23 +190,72 @@ pub async fn run_generation_inner( &format!("Vague {}/{} : extraction des sources...", wave_idx + 1, total_waves), pct as u8); - // 1a. Extract links from this wave's sources (all in parallel) + // 1a. 
Extract links from this wave's sources (RSS-first with HTML fallback) let mut wave_urls: Vec<(String, String)> = Vec::new(); + let mut rss_updates: Vec<(Uuid, Option<String>, Option<DateTime<Utc>>)> = Vec::new(); { let mut join_set = tokio::task::JoinSet::new(); for source in wave_sources { let client = state.http_client.clone(); + let source_id = source.id; let source_url = source.url.clone(); let source_title = source.title.clone(); + let rss_url = source.rss_url.clone(); + let rss_discovered_at = source.rss_discovered_at; let max_l = max_links; join_set.spawn(async move { - let links = source_scraper::extract_article_links(&client, &source_url, max_l).await; - (source_url, source_title, links) + // Try RSS feed first + let feed_result = feed_parser::detect_and_parse_feed( + &client, + &source_url, + rss_url.as_deref(), + rss_discovered_at, + max_l, + ).await; + + match feed_result { + feed_parser::FeedResult::Found { feed_url, entries } + if entries.len() >= feed_parser::MIN_FEED_ENTRIES => + { + let links: Vec<String> = entries.into_iter().map(|e| e.url).collect(); + tracing::info!( + source = %source_title, + feed = %feed_url, + links = links.len(), + "Extracted links from RSS feed" + ); + // Signal RSS URL update if it changed + let rss_changed = rss_url.as_deref() != Some(&feed_url); + let rss_stale = rss_discovered_at + .map(|d| Utc::now().signed_duration_since(d).num_days() >= feed_parser::REDISCOVERY_DAYS) + .unwrap_or(true); + let update = if rss_changed || rss_stale { + Some((source_id, Some(feed_url), Some(Utc::now()))) + } else { + None + }; + (source_url, source_title, Ok(links), update) + } + _ => { + // Fallback to HTML extraction + let links = source_scraper::extract_article_links(&client, &source_url, max_l).await; + // If we had a cached RSS URL but feed failed, clear it + let update = if rss_url.is_some() { + Some((source_id, None, None)) + } else { + None + }; + (source_url, source_title, links, update) + } + } }); } while let Some(join_result) = join_set.join_next().await 
{ - if let Ok((source_url, source_title, links_result)) = join_result { + if let Ok((source_url, source_title, links_result, rss_update)) = join_result { + if let Some(update) = rss_update { + rss_updates.push(update); + } match links_result { Ok(links) => { tracing::info!(source = %source_title, links = links.len(), "Extracted links from source"); @@ -223,6 +273,16 @@ pub async fn run_generation_inner( } } + // Persist RSS URL updates (fire-and-forget) + for (source_id, new_rss_url, new_discovered_at) in rss_updates { + db::sources::update_source_rss( + &state.pool, + source_id, + new_rss_url.as_deref(), + new_discovered_at, + ).await.ok(); + } + // 1b. Filter against article history if settings.article_history_days > 0 && !wave_urls.is_empty() { let hashes: Vec = wave_urls.iter().map(|(url, _)| hash_article_url(url)).collect();