feat: integrate feed_parser into Phase 1 pipeline with HTML fallback

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
oabrivard 2 months ago
parent d4cbcf47ae
commit 2dee7628a7

@ -12,7 +12,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use chrono::Utc;
use chrono::{DateTime, Utc};
use tokio::sync::watch;
use uuid::Uuid;
@ -27,6 +27,7 @@ use crate::services::encryption;
use crate::services::llm::factory::create_provider;
use crate::services::scraper;
use crate::services::source_scraper;
use crate::services::feed_parser;
mod helpers;
pub(crate) use helpers::{
@ -189,23 +190,72 @@ pub async fn run_generation_inner(
&format!("Vague {}/{} : extraction des sources...", wave_idx + 1, total_waves),
pct as u8);
// 1a. Extract links from this wave's sources (all in parallel)
// 1a. Extract links from this wave's sources (RSS-first with HTML fallback)
let mut wave_urls: Vec<(String, String)> = Vec::new();
let mut rss_updates: Vec<(Uuid, Option<String>, Option<DateTime<Utc>>)> = Vec::new();
{
let mut join_set = tokio::task::JoinSet::new();
for source in wave_sources {
let client = state.http_client.clone();
let source_id = source.id;
let source_url = source.url.clone();
let source_title = source.title.clone();
let rss_url = source.rss_url.clone();
let rss_discovered_at = source.rss_discovered_at;
let max_l = max_links;
join_set.spawn(async move {
let links = source_scraper::extract_article_links(&client, &source_url, max_l).await;
(source_url, source_title, links)
// Try RSS feed first
let feed_result = feed_parser::detect_and_parse_feed(
&client,
&source_url,
rss_url.as_deref(),
rss_discovered_at,
max_l,
).await;
match feed_result {
feed_parser::FeedResult::Found { feed_url, entries }
if entries.len() >= feed_parser::MIN_FEED_ENTRIES =>
{
let links: Vec<String> = entries.into_iter().map(|e| e.url).collect();
tracing::info!(
source = %source_title,
feed = %feed_url,
links = links.len(),
"Extracted links from RSS feed"
);
// Signal RSS URL update if it changed
let rss_changed = rss_url.as_deref() != Some(&feed_url);
let rss_stale = rss_discovered_at
.map(|d| Utc::now().signed_duration_since(d).num_days() >= feed_parser::REDISCOVERY_DAYS)
.unwrap_or(true);
let update = if rss_changed || rss_stale {
Some((source_id, Some(feed_url), Some(Utc::now())))
} else {
None
};
(source_url, source_title, Ok(links), update)
}
_ => {
// Fallback to HTML extraction
let links = source_scraper::extract_article_links(&client, &source_url, max_l).await;
// If we had a cached RSS URL but feed failed, clear it
let update = if rss_url.is_some() {
Some((source_id, None, None))
} else {
None
};
(source_url, source_title, links, update)
}
}
});
}
while let Some(join_result) = join_set.join_next().await {
if let Ok((source_url, source_title, links_result)) = join_result {
if let Ok((source_url, source_title, links_result, rss_update)) = join_result {
if let Some(update) = rss_update {
rss_updates.push(update);
}
match links_result {
Ok(links) => {
tracing::info!(source = %source_title, links = links.len(), "Extracted links from source");
@ -223,6 +273,16 @@ pub async fn run_generation_inner(
}
}
// Persist RSS URL updates (fire-and-forget)
for (source_id, new_rss_url, new_discovered_at) in rss_updates {
db::sources::update_source_rss(
&state.pool,
source_id,
new_rss_url.as_deref(),
new_discovered_at,
).await.ok();
}
// 1b. Filter against article history
if settings.article_history_days > 0 && !wave_urls.is_empty() {
let hashes: Vec<String> = wave_urls.iter().map(|(url, _)| hash_article_url(url)).collect();

Loading…
Cancel
Save