feat: integrate feed_parser into Phase 1 pipeline with HTML fallback

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
master
oabrivard 2 months ago
parent d4cbcf47ae
commit 027c576302

@ -265,8 +265,14 @@ mod tests {
use wiremock::{Mock, MockServer, ResponseTemplate};
use wiremock::matchers::method;
/// Set SKIP_SSRF_CHECK for tests using wiremock (localhost).
fn skip_ssrf_for_test() {
unsafe { std::env::set_var("SKIP_SSRF_CHECK", "1"); }
}
#[tokio::test]
async fn parse_feed_rss2() {
skip_ssrf_for_test();
let server = MockServer::start().await;
let rss_body = r#"<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0">
@ -307,6 +313,7 @@ mod tests {
#[tokio::test]
async fn parse_feed_atom() {
skip_ssrf_for_test();
let server = MockServer::start().await;
let atom_body = r#"<?xml version="1.0" encoding="UTF-8"?>
<feed xmlns="http://www.w3.org/2005/Atom">
@ -334,6 +341,7 @@ mod tests {
#[tokio::test]
async fn parse_feed_respects_max_links() {
skip_ssrf_for_test();
let server = MockServer::start().await;
let rss_body = r#"<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0">
@ -359,6 +367,7 @@ mod tests {
#[tokio::test]
async fn parse_feed_entries_without_dates_come_last() {
skip_ssrf_for_test();
let server = MockServer::start().await;
let rss_body = r#"<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0">
@ -384,6 +393,7 @@ mod tests {
#[tokio::test]
async fn parse_feed_404_returns_empty() {
skip_ssrf_for_test();
let server = MockServer::start().await;
Mock::given(method("GET"))
@ -398,6 +408,7 @@ mod tests {
#[tokio::test]
async fn parse_feed_invalid_xml_returns_error() {
skip_ssrf_for_test();
let server = MockServer::start().await;
Mock::given(method("GET"))
@ -412,6 +423,7 @@ mod tests {
#[tokio::test]
async fn discover_feed_from_link_rss() {
skip_ssrf_for_test();
let server = MockServer::start().await;
let html = format!(
r#"<html><head>
@ -434,6 +446,7 @@ mod tests {
#[tokio::test]
async fn discover_feed_from_link_atom() {
skip_ssrf_for_test();
let server = MockServer::start().await;
let html = format!(
r#"<html><head>
@ -456,6 +469,7 @@ mod tests {
#[tokio::test]
async fn discover_feed_direct_rss_url() {
skip_ssrf_for_test();
let server = MockServer::start().await;
let rss_body = r#"<?xml version="1.0"?><rss version="2.0"><channel><title>T</title></channel></rss>"#;
@ -476,6 +490,7 @@ mod tests {
#[tokio::test]
async fn discover_feed_no_feed_found() {
skip_ssrf_for_test();
let server = MockServer::start().await;
let html = "<html><head><title>No feed</title></head><body></body></html>";
@ -492,6 +507,7 @@ mod tests {
#[tokio::test]
async fn discover_feed_resolves_relative_href() {
skip_ssrf_for_test();
let server = MockServer::start().await;
let html = r#"<html><head>
<link rel="alternate" type="application/rss+xml" href="/feed.xml">
@ -513,6 +529,7 @@ mod tests {
#[tokio::test]
async fn detect_and_parse_cached_fresh_feed() {
skip_ssrf_for_test();
let server = MockServer::start().await;
let rss_body = r#"<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0"><channel><title>T</title>
@ -543,6 +560,7 @@ mod tests {
#[tokio::test]
async fn detect_and_parse_no_cache_discovers_feed() {
skip_ssrf_for_test();
let server = MockServer::start().await;
// First request: HTML page with feed link
@ -595,6 +613,7 @@ mod tests {
#[tokio::test]
async fn detect_and_parse_no_feed_returns_not_found() {
skip_ssrf_for_test();
let server = MockServer::start().await;
let html = "<html><head><title>No feed</title></head><body></body></html>";
@ -617,6 +636,7 @@ mod tests {
#[tokio::test]
async fn detect_and_parse_stale_cache_rediscovers() {
skip_ssrf_for_test();
let server = MockServer::start().await;
let feed_path = format!("{}/feed.xml", server.uri());

@ -12,7 +12,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use chrono::Utc;
use chrono::{DateTime, Utc};
use tokio::sync::watch;
use uuid::Uuid;
@ -27,6 +27,7 @@ use crate::services::encryption;
use crate::services::llm::factory::create_provider;
use crate::services::scraper;
use crate::services::source_scraper;
use crate::services::feed_parser;
mod helpers;
pub(crate) use helpers::{
@ -189,23 +190,76 @@ pub async fn run_generation_inner(
&format!("Vague {}/{} : extraction des sources...", wave_idx + 1, total_waves),
pct as u8);
// 1a. Extract links from this wave's sources (all in parallel)
// 1a. Extract links from this wave's sources (RSS-first with HTML fallback)
let mut wave_urls: Vec<(String, String)> = Vec::new();
let mut rss_updates: Vec<(Uuid, Option<String>, Option<DateTime<Utc>>)> = Vec::new();
{
let mut join_set = tokio::task::JoinSet::new();
for source in wave_sources {
let client = state.http_client.clone();
let source_id = source.id;
let source_url = source.url.clone();
let source_title = source.title.clone();
let rss_url = source.rss_url.clone();
let rss_discovered_at = source.rss_discovered_at;
let max_l = max_links;
join_set.spawn(async move {
// Try RSS feed first
let feed_result = feed_parser::detect_and_parse_feed(
&client,
&source_url,
rss_url.as_deref(),
rss_discovered_at,
max_l,
).await;
match feed_result {
feed_parser::FeedResult::Found { feed_url, entries }
if entries.len() >= feed_parser::MIN_FEED_ENTRIES =>
{
let links: Vec<String> = entries.into_iter().map(|e| e.url).collect();
tracing::info!(
source = %source_title,
feed = %feed_url,
links = links.len(),
"Extracted links from RSS feed"
);
// Signal RSS URL update if it changed
let rss_changed = rss_url.as_deref() != Some(&feed_url);
let rss_stale = rss_discovered_at
.map(|d| Utc::now().signed_duration_since(d).num_days() >= feed_parser::REDISCOVERY_DAYS)
.unwrap_or(true);
let update = if rss_changed || rss_stale {
Some((source_id, Some(feed_url), Some(Utc::now())))
} else {
None
};
(source_url, source_title, Ok(links), update)
}
feed_parser::FeedResult::Found { .. } => {
// Feed found but too few entries — keep the cache, fall back to HTML
let links = source_scraper::extract_article_links(&client, &source_url, max_l).await;
(source_url, source_title, links, None)
}
feed_parser::FeedResult::NotFound => {
// No feed discovered — fall back to HTML and clear any stale cache
let links = source_scraper::extract_article_links(&client, &source_url, max_l).await;
(source_url, source_title, links)
let update = if rss_url.is_some() {
Some((source_id, None, None))
} else {
None
};
(source_url, source_title, links, update)
}
}
});
}
while let Some(join_result) = join_set.join_next().await {
if let Ok((source_url, source_title, links_result)) = join_result {
if let Ok((source_url, source_title, links_result, rss_update)) = join_result {
if let Some(update) = rss_update {
rss_updates.push(update);
}
match links_result {
Ok(links) => {
tracing::info!(source = %source_title, links = links.len(), "Extracted links from source");
@ -223,6 +277,16 @@ pub async fn run_generation_inner(
}
}
// Persist RSS URL updates (fire-and-forget)
for (source_id, new_rss_url, new_discovered_at) in rss_updates {
db::sources::update_source_rss(
&state.pool,
source_id,
new_rss_url.as_deref(),
new_discovered_at,
).await.ok();
}
// 1b. Filter against article history
if settings.article_history_days > 0 && !wave_urls.is_empty() {
let hashes: Vec<String> = wave_urls.iter().map(|(url, _)| hash_article_url(url)).collect();

Loading…
Cancel
Save