# Source Priority Pipeline: Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Redesign the synthesis generation pipeline to scrape personalized sources first, classify articles with the LLM, then fall back to web search for unfilled categories.

**Architecture:** Two-phase pipeline. Phase 1: scrape user source pages → extract article links → scrape articles → LLM classification → fill categories. Phase 2: LLM web search for gaps → scrape → classify → fill remaining. Combined rewrite pass at the end.

**Tech Stack:** Rust (reqwest, scraper crate, serde_json), existing LLM providers

**Spec:** `docs/superpowers/specs/2026-03-24-source-priority-pipeline-design.md`

---

### Task 1: Source page scraper module

**Files:**
- Create: `backend/src/services/source_scraper.rs`
- Modify: `backend/src/services/mod.rs`

Create a new module that fetches a source page and extracts article links.

- [ ] **Step 1: Create `backend/src/services/source_scraper.rs`**

The module should contain:

```rust
//! Source page scraper: fetches a source URL and extracts article links.
//!
//! Used in Phase 1 of the generation pipeline to discover articles
//! from user-configured sources before falling back to LLM web search.

use crate::errors::AppError;
use scraper::{Html, Selector};
use url::Url;

/// Patterns in URL paths that indicate non-article pages.
const EXCLUDED_PATH_PATTERNS: &[&str] = &[
    "/tag/", "/category/", "/author/", "/page/", "/login", "/signup",
    "/privacy", "/terms", "/search", "/contact", "/about",
];

/// File extensions that indicate static assets, not articles.
const EXCLUDED_EXTENSIONS: &[&str] = &[
    ".css", ".js", ".png", ".jpg", ".jpeg", ".gif", ".svg", ".pdf",
    ".zip", ".xml", ".json", ".ico", ".woff", ".woff2",
];

/// Extract article links from a source page.
///
/// Fetches the HTML at `source_url`, extracts all `<a href>` links,
/// filters to same-domain article-like URLs, deduplicates, and returns
/// up to `max_links` candidate URLs.
///
/// A network failure is returned as an error; a non-success HTTP status
/// is logged and yields an empty list so the pipeline can continue.
pub async fn extract_article_links(
    http_client: &reqwest::Client,
    source_url: &str,
    max_links: usize,
) -> Result<Vec<String>, AppError> {
    // Parse source URL to get base domain
    let base_url = Url::parse(source_url)
        .map_err(|e| AppError::BadRequest(format!("Invalid source URL: {}", e)))?;
    let base_domain = base_url.host_str().unwrap_or("").to_lowercase();

    // Fetch the page
    let response = http_client
        .get(source_url)
        .send()
        .await
        .map_err(|e| {
            tracing::warn!(url = source_url, error = %e, "Failed to fetch source page");
            AppError::Internal(anyhow::anyhow!("Failed to fetch source page"))
        })?;

    if !response.status().is_success() {
        tracing::warn!(
            url = source_url,
            status = %response.status(),
            "Source page returned non-success status"
        );
        return Ok(Vec::new());
    }

    let html_text = response.text().await.map_err(|e| {
        AppError::Internal(anyhow::anyhow!("Failed to read source page body: {}", e))
    })?;

    let links = extract_links_from_html(&html_text, &base_url, &base_domain);
    Ok(links.into_iter().take(max_links).collect())
}

/// Extract and filter article links from HTML content.
///
/// This is a pure function (no I/O) for easy testing.
pub fn extract_links_from_html(
    html: &str,
    base_url: &Url,
    base_domain: &str,
) -> Vec<String> {
    let document = Html::parse_document(html);
    let selector = Selector::parse("a[href]").unwrap();

    let mut seen = std::collections::HashSet::new();
    let mut links = Vec::new();

    for element in document.select(&selector) {
        if let Some(href) = element.value().attr("href") {
            // Resolve relative URLs against the base
            let resolved = match base_url.join(href) {
                Ok(u) => u,
                Err(_) => continue,
            };

            // Must be http(s)
            if resolved.scheme() != "http" && resolved.scheme() != "https" {
                continue;
            }

            // Must be same domain
            let link_domain = resolved.host_str().unwrap_or("").to_lowercase();
            if link_domain != base_domain {
                continue;
            }

            // Must have a non-empty path (not just "/")
            let path = resolved.path();
            if path.is_empty() || path == "/" {
                continue;
            }

            // Exclude non-article patterns
            let path_lower = path.to_lowercase();
            if EXCLUDED_PATH_PATTERNS.iter().any(|p| path_lower.contains(p)) {
                continue;
            }

            // Exclude static assets
            if EXCLUDED_EXTENSIONS.iter().any(|ext| path_lower.ends_with(ext)) {
                continue;
            }

            // Normalize: strip fragment, keep path+query
            let mut normalized = resolved.clone();
            normalized.set_fragment(None);
            let url_str = normalized.to_string();

            // Deduplicate (first occurrence wins, order preserved)
            if seen.insert(url_str.clone()) {
                links.push(url_str);
            }
        }
    }

    links
}
```

- [ ] **Step 2: Register module in `backend/src/services/mod.rs`**

Add `pub mod source_scraper;` after `pub mod scraper;`.
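To see which paths the filters in `extract_links_from_html` let through, the path checks can be lifted into a small standalone predicate. The sketch below is illustrative only: `is_article_path` is a hypothetical helper that does not appear in the plan, it relies on `std` alone, and it copies the two exclusion lists from Step 1.

```rust
// Copied from Step 1 so the sketch compiles on its own.
const EXCLUDED_PATH_PATTERNS: &[&str] = &[
    "/tag/", "/category/", "/author/", "/page/", "/login", "/signup",
    "/privacy", "/terms", "/search", "/contact", "/about",
];

const EXCLUDED_EXTENSIONS: &[&str] = &[
    ".css", ".js", ".png", ".jpg", ".jpeg", ".gif", ".svg", ".pdf",
    ".zip", ".xml", ".json", ".ico", ".woff", ".woff2",
];

/// Hypothetical helper (not part of the plan): applies the same path
/// checks as `extract_links_from_html` to an already-resolved URL path.
pub fn is_article_path(path: &str) -> bool {
    // Reject the site root and empty paths.
    if path.is_empty() || path == "/" {
        return false;
    }

    // Matching is case-insensitive and substring-based, as in Step 1.
    let lower = path.to_lowercase();
    if EXCLUDED_PATH_PATTERNS.iter().any(|p| lower.contains(p)) {
        return false;
    }

    // Static assets are recognised by their file extension.
    !EXCLUDED_EXTENSIONS.iter().any(|ext| lower.ends_with(ext))
}
```

Under these rules `/blog/gpt-5-released` passes, while `/`, `/tag/rust` and `/assets/app.js` are rejected. Because the patterns are matched with `contains`, a path such as `/blog/about-transformers` is rejected as well (it contains `/about`), which may be worth keeping in mind when reviewing the links a source yields.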
- [ ] **Step 3: Add unit tests**

Add to the bottom of `source_scraper.rs`:

```rust
#[cfg(test)]
mod tests {
    use super::*;

    // The `href` values in these fixtures are illustrative; any URLs that
    // exercise the same filters would do.

    fn base_url(s: &str) -> Url {
        Url::parse(s).unwrap()
    }

    #[test]
    fn extracts_article_links_from_html() {
        let html = r#"
            <a href="/blog/article-1">Article 1</a>
            <a href="/blog/article-2">Article 2</a>
            <a href="/">Home</a>
        "#;
        let base = base_url("https://example.com/blog");
        let links = extract_links_from_html(html, &base, "example.com");
        assert_eq!(links.len(), 2);
        assert!(links[0].contains("/blog/article-1"));
        assert!(links[1].contains("/blog/article-2"));
    }

    #[test]
    fn filters_external_links() {
        let html = r#"<a href="https://other.example.org/post">External</a>"#;
        let base = base_url("https://example.com");
        let links = extract_links_from_html(html, &base, "example.com");
        assert!(links.is_empty());
    }

    #[test]
    fn filters_non_article_patterns() {
        let html = r#"
            <a href="/tag/rust">Tag</a>
            <a href="/category/news">Category</a>
            <a href="/author/jane">Author</a>
            <a href="/login">Login</a>
        "#;
        let base = base_url("https://example.com");
        let links = extract_links_from_html(html, &base, "example.com");
        assert!(links.is_empty());
    }

    #[test]
    fn filters_static_assets() {
        let html = r#"
            <a href="/style.css">CSS</a>
            <a href="/app.js">JS</a>
            <a href="/logo.png">Image</a>
        "#;
        let base = base_url("https://example.com");
        let links = extract_links_from_html(html, &base, "example.com");
        assert!(links.is_empty());
    }

    #[test]
    fn deduplicates_links() {
        // The fragment is stripped during normalization, so all three collapse.
        let html = r#"
            <a href="/blog/post">Link 1</a>
            <a href="/blog/post">Link 2</a>
            <a href="/blog/post#comments">Link 3</a>
        "#;
        let base = base_url("https://example.com");
        let links = extract_links_from_html(html, &base, "example.com");
        assert_eq!(links.len(), 1);
    }

    #[test]
    fn resolves_relative_urls() {
        let html = r#"<a href="my-post">Relative</a>"#;
        let base = base_url("https://example.com/blog/");
        let links = extract_links_from_html(html, &base, "example.com");
        assert_eq!(links.len(), 1);
        assert!(links[0].contains("/blog/my-post"));
    }

    #[test]
    fn allows_single_segment_paths() {
        let html = r#"<a href="/my-article">Article</a>"#;
        let base = base_url("https://example.com");
        let links = extract_links_from_html(html, &base, "example.com");
        assert_eq!(links.len(), 1);
    }

    #[test]
    fn empty_html_returns_empty() {
        let links = extract_links_from_html("", &base_url("https://example.com"), "example.com");
        assert!(links.is_empty());
    }
}
```

- [ ] **Step 4: Run tests**

Run: `cd backend && cargo test --lib source_scraper`
Expected: 8 tests pass

- [ ] **Step 5: Commit**

```bash
git add backend/src/services/source_scraper.rs backend/src/services/mod.rs
git commit -m "feat: add source_scraper module for extracting article links from source pages"
```

---

### Task 2: Classification prompt and schema

**Files:**
- Modify: `backend/src/services/prompts.rs`
- Modify: `backend/src/services/llm/schema.rs`

- [ ] **Step 1: Add `build_classification_schema` to `schema.rs`**

```rust
/// Build a JSON Schema for the article classification response.
///
/// The LLM returns an array of assignments mapping article indices to category names.
pub fn build_classification_schema() -> Value {
    serde_json::json!({
        "type": "object",
        "properties": {
            "assignments": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "index": {
                            "type": "integer",
                            "description": "Article index from the input list"
                        },
                        "category": {
                            "type": "string",
                            "description": "Category name to assign this article to"
                        }
                    },
                    "required": ["index", "category"],
                    "additionalProperties": false
                }
            }
        },
        "required": ["assignments"],
        "additionalProperties": false
    })
}
```

- [ ] **Step 2: Add `build_classification_prompt` to `prompts.rs`**

```rust
use crate::models::synthesis::ScrapedNewsItem;

/// Build a prompt for classifying scraped articles into categories.
///
/// Returns `(system_prompt, user_prompt)`. The prompt text is in French.
///
/// # Arguments
/// * `articles` - scraped articles to classify (title + body snippet used)
/// * `categories` - user categories + "Autre"
/// * `max_per_category` - max items allowed per category
/// * `filled_counts` - how many items already fill each category (for Phase 2)
pub fn build_classification_prompt(
    articles: &[ScrapedNewsItem],
    categories: &[String],
    max_per_category: i32,
    filled_counts: &std::collections::HashMap<String, usize>,
) -> (String, String) {
    // "You are an assistant that classifies articles into categories.
    //  Reply only in the requested JSON format."
    let system_prompt = "Tu es un assistant qui classe des articles dans des categories. \
        Reponds uniquement au format JSON demande."
        .to_string();

    // Only the first 500 characters of each article body are sent to the LLM.
    let articles_json: Vec<serde_json::Value> = articles
        .iter()
        .enumerate()
        .map(|(i, a)| {
            let snippet: String = a.scraped_content.chars().take(500).collect();
            serde_json::json!({
                "index": i,
                "title": a.title,
                "url": a.url,
                "snippet": snippet
            })
        })
        .collect();

    // One line per category with its remaining capacity ("encore N places").
    let categories_info: Vec<String> = categories
        .iter()
        .map(|cat| {
            let filled = filled_counts.get(cat).copied().unwrap_or(0);
            let remaining = (max_per_category as usize).saturating_sub(filled);
            format!("- \"{}\" (encore {} places)", cat, remaining)
        })
        .collect();

    // "Here is a list of articles ... Available categories ... Put each article
    //  in the most appropriate category. If none fits, use "Autre". Respect the
    //  number of remaining places per category."
    let user_prompt = format!(
        "Voici une liste d'articles :\n{articles}\n\n\
         Categories disponibles :\n{categories}\n\n\
         Classe chaque article dans la categorie la plus appropriee. \
         Si un article ne correspond a aucune categorie, classe-le dans \"Autre\".\n\
         Respecte le nombre de places restantes par categorie.",
        articles = serde_json::to_string_pretty(&articles_json).unwrap_or_default(),
        categories = categories_info.join("\n"),
    );

    (system_prompt, user_prompt)
}
```

- [ ] **Step 3: Update `test_settings()` in prompts.rs if needed**

If `ScrapedNewsItem` is not already imported in `prompts.rs`, add the import. The new function uses types from `models::synthesis`.
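One detail of the prompt builder in Step 2 is that the snippet is cut with `chars().take(500)` rather than by slicing bytes. A minimal sketch of why that matters for accented text follows; `snippet` is a hypothetical standalone helper introduced here for illustration and is not a function the plan defines.

```rust
/// Hypothetical helper mirroring the snippet logic in
/// `build_classification_prompt`: keep at most `max_chars` Unicode
/// scalar values of `content`, never splitting a multi-byte character.
pub fn snippet(content: &str, max_chars: usize) -> String {
    content.chars().take(max_chars).collect()
}

/// Returns true when cutting `content` at byte offset `offset` would be
/// safe, i.e. when `&content[..offset]` would not panic.
pub fn byte_cut_is_safe(content: &str, offset: usize) -> bool {
    content.is_char_boundary(offset)
}
```

With a body made of 600 copies of `é` (two bytes each in UTF-8), `snippet(&body, 500)` yields 500 characters occupying 1000 bytes, whereas a byte slice at an odd offset such as 501 would land inside a character and panic. Short inputs are returned unchanged.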
- [ ] **Step 4: Add tests**

```rust
#[test]
fn classification_prompt_includes_categories_and_articles() {
    let articles = vec![ScrapedNewsItem {
        title: "GPT-5 Released".into(),
        url: "https://openai.com/blog/gpt5".into(),
        summary: "s".into(),
        original_title: "t".into(),
        scraped_content: "OpenAI released GPT-5 today with major improvements".into(),
    }];
    let categories = vec!["AI News".to_string(), "Autre".to_string()];
    let filled = std::collections::HashMap::new();

    let (_, user_prompt) = build_classification_prompt(&articles, &categories, 4, &filled);

    assert!(user_prompt.contains("GPT-5 Released"));
    assert!(user_prompt.contains("AI News"));
    assert!(user_prompt.contains("Autre"));
    assert!(user_prompt.contains("encore 4 places"));
}

#[test]
fn classification_prompt_shows_reduced_capacity() {
    let articles = vec![ScrapedNewsItem {
        title: "T".into(),
        url: "https://a.com/1".into(),
        summary: "s".into(),
        original_title: "t".into(),
        scraped_content: "Content".into(),
    }];
    let categories = vec!["AI News".to_string(), "Autre".to_string()];
    let mut filled = std::collections::HashMap::new();
    filled.insert("AI News".to_string(), 3);

    let (_, user_prompt) = build_classification_prompt(&articles, &categories, 4, &filled);

    assert!(user_prompt.contains("encore 1 places"));
}
```

- [ ] **Step 5: Add a test for `build_classification_schema` in schema.rs**

```rust
#[test]
fn classification_schema_has_assignments_array() {
    let schema = build_classification_schema();
    assert_eq!(schema["type"], "object");

    let assignments = &schema["properties"]["assignments"];
    assert_eq!(assignments["type"], "array");

    let item_props = &assignments["items"]["properties"];
    assert!(item_props.get("index").is_some());
    assert!(item_props.get("category").is_some());
    assert_eq!(assignments["items"]["additionalProperties"], false);
}
```

- [ ] **Step 6: Run tests**

Run: `cd backend && cargo test --lib`
Expected: all tests pass

- [ ] **Step 7: Commit**

```bash
git add backend/src/services/prompts.rs backend/src/services/llm/schema.rs
git commit -m "feat: add classification prompt and schema for article categorization"
```

---

### Task 3: Classification response parsing + category filling

**Files:**
- Modify: `backend/src/services/synthesis.rs`

Add helper functions for parsing the LLM classification response and filling categories.

- [ ] **Step 1: Add `parse_classification_response` function**

```rust
/// Parse the LLM classification response and assign articles to categories.
///
/// Returns a HashMap of category_key → Vec<ScrapedNewsItem>.
/// Invalid indices are ignored. Unknown categories default to "Autre".
/// Case-insensitive category matching.
/// `filled_counts` is updated in place so it can be reused across phases.
fn parse_classification_response(
    response: &serde_json::Value,
    articles: &[ScrapedNewsItem],
    categories: &[String],
    max_per_category: i32,
    filled_counts: &mut HashMap<String, usize>,
) -> HashMap<String, Vec<ScrapedNewsItem>> {
    let max = max_per_category as usize;
    let mut result: HashMap<String, Vec<ScrapedNewsItem>> = HashMap::new();

    // Build category name → key mapping (case-insensitive)
    let mut name_to_key: HashMap<String, String> = HashMap::new();
    for (i, cat) in categories.iter().enumerate() {
        let key = if cat == "Autre" {
            "category_autre".to_string()
        } else {
            format!("category_{}", i)
        };
        name_to_key.insert(cat.to_lowercase(), key);
    }

    let assignments = response
        .get("assignments")
        .and_then(|a| a.as_array())
        .cloned()
        .unwrap_or_default();

    let mut assigned_indices = std::collections::HashSet::new();

    for assignment in &assignments {
        let index = match assignment.get("index").and_then(|i| i.as_u64()) {
            Some(i) => i as usize,
            None => continue,
        };

        // Skip out-of-range indices and articles that were already assigned
        if index >= articles.len() || assigned_indices.contains(&index) {
            continue;
        }

        let cat_name = assignment
            .get("category")
            .and_then(|c| c.as_str())
            .unwrap_or("Autre")
            .to_string();

        let cat_key = name_to_key
            .get(&cat_name.to_lowercase())
            .cloned()
            .unwrap_or_else(|| "category_autre".to_string());

        // Resolve the category name for counting
        let cat_display = categories
            .iter()
            .find(|c| c.to_lowercase() == cat_name.to_lowercase())
            .cloned()
            .unwrap_or_else(|| "Autre".to_string());

        let filled = filled_counts.get(&cat_display).copied().unwrap_or(0);
        if filled >= max {
            // Category full: assign to Autre if Autre has room, otherwise
            // leave the article for the unclassified pass below
            let autre_filled = filled_counts.get("Autre").copied().unwrap_or(0);
            if autre_filled < max {
                result
                    .entry("category_autre".to_string())
                    .or_default()
                    .push(articles[index].clone());
                *filled_counts.entry("Autre".to_string()).or_insert(0) += 1;
                assigned_indices.insert(index);
            }
            continue;
        }

        result.entry(cat_key).or_default().push(articles[index].clone());
        *filled_counts.entry(cat_display).or_insert(0) += 1;
        assigned_indices.insert(index);
    }

    // Unclassified articles → Autre (dropped once Autre is full)
    for (i, article) in articles.iter().enumerate() {
        if !assigned_indices.contains(&i) {
            let autre_filled = filled_counts.get("Autre").copied().unwrap_or(0);
            if autre_filled < max {
                result
                    .entry("category_autre".to_string())
                    .or_default()
                    .push(article.clone());
                *filled_counts.entry("Autre".to_string()).or_insert(0) += 1;
            }
        }
    }

    result
}
```

- [ ] **Step 2: Add unit tests**

```rust
// --- parse_classification_response tests ---

#[test]
fn classification_assigns_articles_to_categories() {
    use crate::models::synthesis::ScrapedNewsItem;

    let articles = vec![
        ScrapedNewsItem { title: "A".into(), url: "https://a.com/1".into(), summary: "s".into(), original_title: "t".into(), scraped_content: "c".into() },
        ScrapedNewsItem { title: "B".into(), url: "https://b.com/2".into(), summary: "s".into(), original_title: "t".into(), scraped_content: "c".into() },
    ];
    let categories = vec!["AI News".to_string(), "Autre".to_string()];
    let response = serde_json::json!({
        "assignments": [
            {"index": 0, "category": "AI News"},
            {"index": 1, "category": "Autre"}
        ]
    });
    let mut filled = HashMap::new();

    let result = parse_classification_response(&response, &articles, &categories, 4, &mut filled);

    assert_eq!(result.get("category_0").map(|v| v.len()), Some(1));
    assert_eq!(result.get("category_autre").map(|v| v.len()), Some(1));
}

#[test]
fn classification_unknown_category_goes_to_autre() {
    use crate::models::synthesis::ScrapedNewsItem;

    let articles = vec![
        ScrapedNewsItem { title: "A".into(), url: "https://a.com/1".into(), summary: "s".into(), original_title: "t".into(), scraped_content: "c".into() },
    ];
    let categories = vec!["AI News".to_string(), "Autre".to_string()];
    let response = serde_json::json!({
        "assignments": [{"index": 0, "category": "Unknown Category"}]
    });
    let mut filled = HashMap::new();

    let result = parse_classification_response(&response, &articles, &categories, 4, &mut filled);

    assert_eq!(result.get("category_autre").map(|v| v.len()), Some(1));
}

#[test]
fn classification_respects_max_per_category() {
    use crate::models::synthesis::ScrapedNewsItem;

    let articles: Vec<ScrapedNewsItem> = (0..5)
        .map(|i| ScrapedNewsItem {
            title: format!("Art{}", i),
            url: format!("https://a.com/{}", i),
            summary: "s".into(),
            original_title: "t".into(),
            scraped_content: "c".into(),
        })
        .collect();
    let categories = vec!["AI News".to_string(), "Autre".to_string()];
    let response = serde_json::json!({
        "assignments": (0..5)
            .map(|i| serde_json::json!({"index": i, "category": "AI News"}))
            .collect::<Vec<_>>()
    });
    let mut filled = HashMap::new();

    let result = parse_classification_response(&response, &articles, &categories, 2, &mut filled);

    // AI News capped at 2, overflow goes to Autre
    assert_eq!(result.get("category_0").map(|v| v.len()), Some(2));
    assert!(result.get("category_autre").map(|v| v.len()).unwrap_or(0) > 0);
}

#[test]
fn classification_invalid_index_ignored() {
    use crate::models::synthesis::ScrapedNewsItem;

    let articles = vec![
        ScrapedNewsItem { title: "A".into(), url: "https://a.com/1".into(), summary: "s".into(), original_title: "t".into(), scraped_content: "c".into() },
    ];
    let categories = vec!["AI News".to_string(), "Autre".to_string()];
    let response = serde_json::json!({
        "assignments": [{"index": 99, "category": "AI News"}]
    });
    let mut filled = HashMap::new();

    let result = parse_classification_response(&response, &articles, &categories, 4, &mut filled);

    // Index 99 is invalid → article 0 is unclassified → goes to Autre
    assert_eq!(result.get("category_autre").map(|v| v.len()), Some(1));
}
```

- [ ] **Step 3: Run tests**

Run: `cd backend && cargo test --lib`
Expected: all tests pass

- [ ] **Step 4: Commit**

```bash
git add backend/src/services/synthesis.rs
git commit -m "feat: add classification response parsing with category filling and Autre fallback"
```

---

### Task 4: Rewrite `run_generation_inner` with two-phase pipeline

**Files:**
- Modify: `backend/src/services/synthesis.rs`
- Modify: `backend/src/services/prompts.rs`

This is the core task: replace the existing single-pass pipeline with the two-phase approach.

- [ ] **Step 1: Modify `build_search_prompt` to accept category gaps**

In `prompts.rs`, add an optional parameter `category_gaps: Option<&[(String, i32)]>` to `build_search_prompt`. When provided, append to the prompt: "Find specifically: N articles for category X, M articles for category Y." If `None`, use the existing behavior (all categories, `max_items_per_category` each).

Update the function signature and all existing callers (add `, None` to existing calls).

- [ ] **Step 2: Rewrite `run_generation_inner`**

Replace the body of `run_generation_inner` with the two-phase pipeline. The new flow:

```
Steps 1-4b: Same as today (load settings, sources, resolve provider, build schema, resolve models)
Step 5: Build rate limiter (same)

PHASE 1 (if sources not empty):
  5a. Scrape source pages → extract article links (source_scraper::extract_article_links)
  5b. Scrape candidate articles (existing scrape_articles, but from flat URL list)
  5c. Filter empty content (filter_empty_scraped_articles)
  5d. LLM classification call (build_classification_prompt + generate_rewrite_pass + parse_classification_response)
  5e. Enforce max_articles_per_source on classified results

PHASE 2 (if any user-defined category is under-filled):
  6a. Compute category gaps
  6b. Load recent domains for diversity (same as today)
  6c. LLM search pass with gap-aware prompt
  6d. Parse + filter_homepage_urls + dedup_by_url (cross-phase) + limit_articles_per_source (cross-phase)
  6e. Scrape + filter empty
  6f. LLM classification call (same function, with filled_counts from Phase 1)
  6g. Fill remaining slots

COMBINED:
  7. Merge Phase 1 + Phase 2 results into single HashMap
  8. Build rewrite schema (actual counts, omit empty categories, include "Autre" if non-empty)
  9. Build rewrite prompt + LLM rewrite pass
  10. Build final sections (include "Autre" if non-empty)
  11. Restore scraped URLs
  12. Sanitize + save to DB
```

Key changes to existing functions:
- `build_rewrite_schema`: skip categories with 0 articles (remove `.max(1)`), add "Autre" support
- `build_final_sections`: add "Autre" section if `category_autre` has articles
- `scrape_articles`: needs a variant that takes a flat `Vec<String>` of URLs (for Phase 1 source articles) instead of `Vec<(String, Vec)>`

- [ ] **Step 3: Remove dead code**

Delete `url_quality_sufficient`, `URL_QUALITY_THRESHOLD`, and their tests.

- [ ] **Step 4: Run tests**

Run: `cd backend && cargo test --lib`
Expected: all tests pass (some existing synthesis tests may need updates for the new pipeline shape)

- [ ] **Step 5: Commit**

```bash
git add backend/src/services/synthesis.rs backend/src/services/prompts.rs
git commit -m "feat: two-phase generation pipeline (personalized sources first, web search fallback)"
```

---

### Task 5: Update integration tests

**Files:**
- Modify: `backend/tests/api_syntheses_test.rs`

- [ ] **Step 1: Update `generate_pipeline_resolves_model_from_admin_config`**

The test triggers generation with a fake API key. With the new pipeline, Phase 1 will scrape the configured source and find no usable content, then fall through to Phase 2 (LLM search, which will also fail with the fake key). The test should still verify that the pipeline doesn't crash with a database error: it should fail at the LLM call level, not at model resolution.

Update the test's source URL to something that returns a valid HTML page but no article links (e.g., keep `https://example.com`, which returns a simple page). The pipeline should gracefully skip Phase 1 and attempt Phase 2.

- [ ] **Step 2: Run integration tests**

Run: `cd backend && cargo test --no-run` (compile check only; running the integration tests requires Postgres)
Expected: compiles without errors

- [ ] **Step 3: Commit**

```bash
git add backend/tests/api_syntheses_test.rs
git commit -m "test: update integration tests for two-phase generation pipeline"
```

---

### Task 6: Update E2E test

**Files:**
- Modify: `e2e/tests/generation-live.spec.ts`

- [ ] **Step 1: Update settings payload**

Ensure `max_articles_per_source` and `source_diversity_window` are included (already done in previous work, but verify).

- [ ] **Step 2: Add personalized source assertion**

After generation completes, verify that at least one article URL matches the configured source domain (e.g., if the source is `https://openai.com/blog`, check that at least one article URL contains `openai.com`).

- [ ] **Step 3: Verify "Autre" behavior**

Add an assertion that if "Autre" appears as a section title, it has at most `max_items_per_category` articles and each article has valid content.

- [ ] **Step 4: Run E2E test**

Start the Docker stack, seed the database, then run the test:

```bash
cd e2e && docker compose -f docker-compose.test.yml up --build -d
sleep 25 && npx tsx seed.ts
npx playwright test generation-live --reporter=list
```

Expected: test passes

- [ ] **Step 5: Commit**

```bash
git add e2e/tests/generation-live.spec.ts
git commit -m "test: update E2E test for two-phase pipeline with source priority"
```
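As a closing note on Task 4, step 6a ("Compute category gaps") and the `category_gaps` parameter of `build_search_prompt` are described but not spelled out. One possible shape is sketched below, under stated assumptions: `compute_category_gaps` and `gap_sentence` are hypothetical names, "Autre" is skipped because Phase 2 is only triggered by under-filled user-defined categories, and the sentence format follows the wording quoted in Task 4, Step 1.

```rust
use std::collections::HashMap;

/// Hypothetical helper: for each user-defined category, how many
/// articles are still missing after Phase 1. Categories that are
/// already full, and the "Autre" fallback, produce no entry.
pub fn compute_category_gaps(
    categories: &[String],
    max_per_category: i32,
    filled_counts: &HashMap<String, usize>,
) -> Vec<(String, i32)> {
    categories
        .iter()
        .filter(|cat| cat.as_str() != "Autre")
        .filter_map(|cat| {
            let filled = filled_counts.get(cat).copied().unwrap_or(0) as i32;
            let missing = max_per_category - filled;
            // Keep only categories that still have room.
            (missing > 0).then(|| (cat.clone(), missing))
        })
        .collect()
}

/// Hypothetical helper: renders the gaps as the sentence appended to
/// the search prompt when `category_gaps` is provided.
pub fn gap_sentence(gaps: &[(String, i32)]) -> String {
    let parts: Vec<String> = gaps
        .iter()
        .map(|(cat, n)| format!("{} articles for category {}", n, cat))
        .collect();
    format!("Find specifically: {}.", parts.join(", "))
}
```

An empty result from `compute_category_gaps` would mean every user-defined category is full, in which case Phase 2 can be skipped entirely.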