From 027c5763029dbb6e143e7bcfb4b11b825a52a370 Mon Sep 17 00:00:00 2001 From: oabrivard Date: Fri, 3 Apr 2026 14:11:16 +0200 Subject: [PATCH] feat: integrate feed_parser into Phase 1 pipeline with HTML fallback Co-Authored-By: Claude Opus 4.6 (1M context) --- backend/src/services/feed_parser.rs | 20 ++++++++ backend/src/services/synthesis/mod.rs | 74 +++++++++++++++++++++++++-- 2 files changed, 89 insertions(+), 5 deletions(-) diff --git a/backend/src/services/feed_parser.rs b/backend/src/services/feed_parser.rs index 06d09bf..e6d9943 100644 --- a/backend/src/services/feed_parser.rs +++ b/backend/src/services/feed_parser.rs @@ -265,8 +265,14 @@ mod tests { use wiremock::{Mock, MockServer, ResponseTemplate}; use wiremock::matchers::method; + /// Set SKIP_SSRF_CHECK for tests using wiremock (localhost). + fn skip_ssrf_for_test() { + unsafe { std::env::set_var("SKIP_SSRF_CHECK", "1"); } + } + #[tokio::test] async fn parse_feed_rss2() { + skip_ssrf_for_test(); let server = MockServer::start().await; let rss_body = r#" @@ -307,6 +313,7 @@ mod tests { #[tokio::test] async fn parse_feed_atom() { + skip_ssrf_for_test(); let server = MockServer::start().await; let atom_body = r#" @@ -334,6 +341,7 @@ mod tests { #[tokio::test] async fn parse_feed_respects_max_links() { + skip_ssrf_for_test(); let server = MockServer::start().await; let rss_body = r#" @@ -359,6 +367,7 @@ mod tests { #[tokio::test] async fn parse_feed_entries_without_dates_come_last() { + skip_ssrf_for_test(); let server = MockServer::start().await; let rss_body = r#" @@ -384,6 +393,7 @@ mod tests { #[tokio::test] async fn parse_feed_404_returns_empty() { + skip_ssrf_for_test(); let server = MockServer::start().await; Mock::given(method("GET")) @@ -398,6 +408,7 @@ mod tests { #[tokio::test] async fn parse_feed_invalid_xml_returns_error() { + skip_ssrf_for_test(); let server = MockServer::start().await; Mock::given(method("GET")) @@ -412,6 +423,7 @@ mod tests { #[tokio::test] async fn 
discover_feed_from_link_rss() { + skip_ssrf_for_test(); let server = MockServer::start().await; let html = format!( r#" @@ -434,6 +446,7 @@ mod tests { #[tokio::test] async fn discover_feed_from_link_atom() { + skip_ssrf_for_test(); let server = MockServer::start().await; let html = format!( r#" @@ -456,6 +469,7 @@ mod tests { #[tokio::test] async fn discover_feed_direct_rss_url() { + skip_ssrf_for_test(); let server = MockServer::start().await; let rss_body = r#"T"#; @@ -476,6 +490,7 @@ mod tests { #[tokio::test] async fn discover_feed_no_feed_found() { + skip_ssrf_for_test(); let server = MockServer::start().await; let html = "No feed"; @@ -492,6 +507,7 @@ mod tests { #[tokio::test] async fn discover_feed_resolves_relative_href() { + skip_ssrf_for_test(); let server = MockServer::start().await; let html = r#" @@ -513,6 +529,7 @@ mod tests { #[tokio::test] async fn detect_and_parse_cached_fresh_feed() { + skip_ssrf_for_test(); let server = MockServer::start().await; let rss_body = r#" T @@ -543,6 +560,7 @@ mod tests { #[tokio::test] async fn detect_and_parse_no_cache_discovers_feed() { + skip_ssrf_for_test(); let server = MockServer::start().await; // First request: HTML page with feed link @@ -595,6 +613,7 @@ mod tests { #[tokio::test] async fn detect_and_parse_no_feed_returns_not_found() { + skip_ssrf_for_test(); let server = MockServer::start().await; let html = "No feed"; @@ -617,6 +636,7 @@ mod tests { #[tokio::test] async fn detect_and_parse_stale_cache_rediscovers() { + skip_ssrf_for_test(); let server = MockServer::start().await; let feed_path = format!("{}/feed.xml", server.uri()); diff --git a/backend/src/services/synthesis/mod.rs b/backend/src/services/synthesis/mod.rs index 7a8245b..f4ac128 100644 --- a/backend/src/services/synthesis/mod.rs +++ b/backend/src/services/synthesis/mod.rs @@ -12,7 +12,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; -use chrono::Utc; +use chrono::{DateTime, Utc}; use 
tokio::sync::watch; use uuid::Uuid; @@ -27,6 +27,7 @@ use crate::services::encryption; use crate::services::llm::factory::create_provider; use crate::services::scraper; use crate::services::source_scraper; +use crate::services::feed_parser; mod helpers; pub(crate) use helpers::{ @@ -189,23 +190,76 @@ pub async fn run_generation_inner( &format!("Vague {}/{} : extraction des sources...", wave_idx + 1, total_waves), pct as u8); - // 1a. Extract links from this wave's sources (all in parallel) + // 1a. Extract links from this wave's sources (RSS-first with HTML fallback) let mut wave_urls: Vec<(String, String)> = Vec::new(); + let mut rss_updates: Vec<(Uuid, Option<String>, Option<DateTime<Utc>>)> = Vec::new(); { let mut join_set = tokio::task::JoinSet::new(); for source in wave_sources { let client = state.http_client.clone(); + let source_id = source.id; let source_url = source.url.clone(); let source_title = source.title.clone(); + let rss_url = source.rss_url.clone(); + let rss_discovered_at = source.rss_discovered_at; let max_l = max_links; join_set.spawn(async move { - let links = source_scraper::extract_article_links(&client, &source_url, max_l).await; - (source_url, source_title, links) + // Try RSS feed first + let feed_result = feed_parser::detect_and_parse_feed( + &client, + &source_url, + rss_url.as_deref(), + rss_discovered_at, + max_l, + ).await; + + match feed_result { + feed_parser::FeedResult::Found { feed_url, entries } + if entries.len() >= feed_parser::MIN_FEED_ENTRIES => + { + let links: Vec<String> = entries.into_iter().map(|e| e.url).collect(); + tracing::info!( + source = %source_title, + feed = %feed_url, + links = links.len(), + "Extracted links from RSS feed" + ); + // Signal RSS URL update if it changed + let rss_changed = rss_url.as_deref() != Some(&feed_url); + let rss_stale = rss_discovered_at + .map(|d| Utc::now().signed_duration_since(d).num_days() >= feed_parser::REDISCOVERY_DAYS) + .unwrap_or(true); + let update = if rss_changed || rss_stale { + Some((source_id,
Some(feed_url), Some(Utc::now()))) + } else { + None + }; + (source_url, source_title, Ok(links), update) + } + feed_parser::FeedResult::Found { .. } => { + // Feed found but too few entries — keep the cache, fall back to HTML + let links = source_scraper::extract_article_links(&client, &source_url, max_l).await; + (source_url, source_title, links, None) + } + feed_parser::FeedResult::NotFound => { + // No feed discovered — fall back to HTML and clear any stale cache + let links = source_scraper::extract_article_links(&client, &source_url, max_l).await; + let update = if rss_url.is_some() { + Some((source_id, None, None)) + } else { + None + }; + (source_url, source_title, links, update) + } + } }); } while let Some(join_result) = join_set.join_next().await { - if let Ok((source_url, source_title, links_result)) = join_result { + if let Ok((source_url, source_title, links_result, rss_update)) = join_result { + if let Some(update) = rss_update { + rss_updates.push(update); + } match links_result { Ok(links) => { tracing::info!(source = %source_title, links = links.len(), "Extracted links from source"); @@ -223,6 +277,16 @@ pub async fn run_generation_inner( } } + // Persist RSS URL updates (fire-and-forget) + for (source_id, new_rss_url, new_discovered_at) in rss_updates { + db::sources::update_source_rss( + &state.pool, + source_id, + new_rss_url.as_deref(), + new_discovered_at, + ).await.ok(); + } + // 1b. Filter against article history if settings.article_history_days > 0 && !wave_urls.is_empty() { let hashes: Vec = wave_urls.iter().map(|(url, _)| hash_article_url(url)).collect();