Source Priority Pipeline — Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.
Goal: Redesign the synthesis generation pipeline to scrape personalized sources first, classify articles with the LLM, then fall back to web search for unfilled categories.
Architecture: Two-phase pipeline. Phase 1: scrape user source pages → extract article links → scrape articles → LLM classification → fill categories. Phase 2: LLM web search for gaps → scrape → classify → fill remaining. Combined rewrite pass at the end.
Tech Stack: Rust (reqwest, scraper, url, serde_json), existing LLM providers
Spec: docs/superpowers/specs/2026-03-24-source-priority-pipeline-design.md
Task 1: Source page scraper module
Files:
- Create: backend/src/services/source_scraper.rs
- Modify: backend/src/services/mod.rs
Create a new module that fetches a source page and extracts article links.
- Step 1: Create backend/src/services/source_scraper.rs
The module should contain:
//! Source page scraper: fetches a source URL and extracts article links.
//!
//! Used in Phase 1 of the generation pipeline to discover articles
//! from user-configured sources before falling back to LLM web search.
use crate::errors::AppError;
use scraper::{Html, Selector};
use url::Url;
/// Patterns in URL paths that indicate non-article pages.
const EXCLUDED_PATH_PATTERNS: &[&str] = &[
"/tag/", "/category/", "/author/", "/page/", "/login", "/signup",
"/privacy", "/terms", "/search", "/contact", "/about",
];
/// File extensions that indicate static assets, not articles.
const EXCLUDED_EXTENSIONS: &[&str] = &[
".css", ".js", ".png", ".jpg", ".jpeg", ".gif", ".svg",
".pdf", ".zip", ".xml", ".json", ".ico", ".woff", ".woff2",
];
/// Extract article links from a source page.
///
/// Fetches the HTML at `source_url`, extracts all `<a href>` links,
/// filters to same-domain article-like URLs, deduplicates, and returns
/// up to `max_links` candidate URLs.
pub async fn extract_article_links(
http_client: &reqwest::Client,
source_url: &str,
max_links: usize,
) -> Result<Vec<String>, AppError> {
// Parse source URL to get base domain
let base_url = Url::parse(source_url)
.map_err(|e| AppError::BadRequest(format!("Invalid source URL: {}", e)))?;
let base_domain = base_url.host_str().unwrap_or("").to_lowercase();
// Fetch the page
let response = http_client
.get(source_url)
.send()
.await
.map_err(|e| {
tracing::warn!(url = source_url, error = %e, "Failed to fetch source page");
AppError::Internal(anyhow::anyhow!("Failed to fetch source page"))
})?;
if !response.status().is_success() {
tracing::warn!(url = source_url, status = %response.status(), "Source page returned non-2xx status");
return Ok(Vec::new());
}
let html_text = response.text().await.map_err(|e| {
AppError::Internal(anyhow::anyhow!("Failed to read source page body: {}", e))
})?;
let links = extract_links_from_html(&html_text, &base_url, &base_domain);
Ok(links.into_iter().take(max_links).collect())
}
/// Extract and filter article links from HTML content.
///
/// This is a pure function (no I/O) for easy testing.
pub fn extract_links_from_html(
html: &str,
base_url: &Url,
base_domain: &str,
) -> Vec<String> {
let document = Html::parse_document(html);
let selector = Selector::parse("a[href]").unwrap();
let mut seen = std::collections::HashSet::new();
let mut links = Vec::new();
for element in document.select(&selector) {
if let Some(href) = element.value().attr("href") {
// Resolve relative URLs against the base
let resolved = match base_url.join(href) {
Ok(u) => u,
Err(_) => continue,
};
// Must be http(s)
if resolved.scheme() != "http" && resolved.scheme() != "https" {
continue;
}
// Must be same domain
let link_domain = resolved.host_str().unwrap_or("").to_lowercase();
if link_domain != base_domain {
continue;
}
// Must have a non-empty path (not just "/")
let path = resolved.path();
if path.is_empty() || path == "/" {
continue;
}
// Exclude non-article patterns
let path_lower = path.to_lowercase();
if EXCLUDED_PATH_PATTERNS.iter().any(|p| path_lower.contains(p)) {
continue;
}
// Exclude static assets
if EXCLUDED_EXTENSIONS.iter().any(|ext| path_lower.ends_with(ext)) {
continue;
}
// Normalize: strip fragment, keep path+query
let mut normalized = resolved.clone();
normalized.set_fragment(None);
let url_str = normalized.to_string();
// Deduplicate
if seen.insert(url_str.clone()) {
links.push(url_str);
}
}
}
links
}
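One behavior of the path filter above is worth seeing in isolation: because it uses substring matching (`contains`), the patterns without a trailing slash (`/login`, `/about`, and so on) also exclude article slugs that merely begin with those words. The sketch below reproduces only that check; `is_excluded_path` is a hypothetical helper name used for illustration and is not part of the planned module.

```rust
/// Copied from the module above.
const EXCLUDED_PATH_PATTERNS: &[&str] = &[
    "/tag/", "/category/", "/author/", "/page/", "/login", "/signup",
    "/privacy", "/terms", "/search", "/contact", "/about",
];

/// Hypothetical helper: the same case-insensitive substring check that
/// `extract_links_from_html` performs inline on each resolved path.
fn is_excluded_path(path: &str) -> bool {
    let path_lower = path.to_lowercase();
    EXCLUDED_PATH_PATTERNS.iter().any(|p| path_lower.contains(p))
}
```

With this check, `/tag/ai` and `/about` are excluded as intended, but so is a slug such as `/about-our-new-model`, while `/blog/tagged` passes because `/tag/` requires the trailing slash. If that turns out to be too aggressive for some sources, matching on whole path segments would be one possible refinement; the plan as written does not call for it.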
- Step 2: Register module in backend/src/services/mod.rs
Add pub mod source_scraper; after pub mod scraper;.
- Step 3: Add unit tests
Add to the bottom of source_scraper.rs:
#[cfg(test)]
mod tests {
use super::*;
fn base_url(s: &str) -> Url {
Url::parse(s).unwrap()
}
#[test]
fn extracts_article_links_from_html() {
let html = r#"
<html><body>
<a href="/blog/article-1">Article 1</a>
<a href="/blog/article-2">Article 2</a>
<a href="/">Home</a>
</body></html>"#;
let base = base_url("https://example.com/blog");
let links = extract_links_from_html(html, &base, "example.com");
assert_eq!(links.len(), 2);
assert!(links[0].contains("/blog/article-1"));
assert!(links[1].contains("/blog/article-2"));
}
#[test]
fn filters_external_links() {
let html = r#"<a href="https://other.com/article">External</a>"#;
let base = base_url("https://example.com");
let links = extract_links_from_html(html, &base, "example.com");
assert!(links.is_empty());
}
#[test]
fn filters_non_article_patterns() {
let html = r#"
<a href="/tag/ai">Tag</a>
<a href="/category/tech">Category</a>
<a href="/author/john">Author</a>
<a href="/login">Login</a>
"#;
let base = base_url("https://example.com");
let links = extract_links_from_html(html, &base, "example.com");
assert!(links.is_empty());
}
#[test]
fn filters_static_assets() {
let html = r#"
<a href="/style.css">CSS</a>
<a href="/script.js">JS</a>
<a href="/logo.png">Image</a>
"#;
let base = base_url("https://example.com");
let links = extract_links_from_html(html, &base, "example.com");
assert!(links.is_empty());
}
#[test]
fn deduplicates_links() {
let html = r#"
<a href="/article">Link 1</a>
<a href="/article">Link 2</a>
<a href="/article#section">Link 3</a>
"#;
let base = base_url("https://example.com");
let links = extract_links_from_html(html, &base, "example.com");
assert_eq!(links.len(), 1);
}
#[test]
fn resolves_relative_urls() {
let html = r#"<a href="my-post">Relative</a>"#;
let base = base_url("https://example.com/blog/");
let links = extract_links_from_html(html, &base, "example.com");
assert_eq!(links.len(), 1);
assert!(links[0].contains("/blog/my-post"));
}
#[test]
fn allows_single_segment_paths() {
let html = r#"<a href="/my-great-article">Article</a>"#;
let base = base_url("https://example.com");
let links = extract_links_from_html(html, &base, "example.com");
assert_eq!(links.len(), 1);
}
#[test]
fn empty_html_returns_empty() {
let links = extract_links_from_html("", &base_url("https://example.com"), "example.com");
assert!(links.is_empty());
}
}
- Step 4: Run tests
Run: cd backend && cargo test --lib source_scraper
Expected: 8 tests pass
- Step 5: Commit
git add backend/src/services/source_scraper.rs backend/src/services/mod.rs
git commit -m "feat: add source_scraper module for extracting article links from source pages"
Task 2: Classification prompt and schema
Files:
- Modify: backend/src/services/prompts.rs
- Modify: backend/src/services/llm/schema.rs
- Step 1: Add build_classification_schema to schema.rs
/// Build a JSON Schema for the article classification response.
///
/// The LLM returns an array of assignments mapping article indices to category names.
pub fn build_classification_schema() -> Value {
serde_json::json!({
"type": "object",
"properties": {
"assignments": {
"type": "array",
"items": {
"type": "object",
"properties": {
"index": { "type": "integer", "description": "Article index from the input list" },
"category": { "type": "string", "description": "Category name to assign this article to" }
},
"required": ["index", "category"],
"additionalProperties": false
}
}
},
"required": ["assignments"],
"additionalProperties": false
})
}
- Step 2: Add build_classification_prompt to prompts.rs
use crate::models::synthesis::ScrapedNewsItem;
/// Build a prompt for classifying scraped articles into categories.
///
/// # Arguments
/// * `articles` — scraped articles to classify (title + body snippet used)
/// * `categories` — user categories + "Autre"
/// * `max_per_category` — max items allowed per category
/// * `filled_counts` — how many items already fill each category (for Phase 2)
pub fn build_classification_prompt(
articles: &[ScrapedNewsItem],
categories: &[String],
max_per_category: i32,
filled_counts: &std::collections::HashMap<String, usize>,
) -> (String, String) {
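// System prompt (French). English: "You are an assistant that classifies
// articles into categories. Reply only in the requested JSON format."
let system_prompt =
"Tu es un assistant qui classe des articles dans des categories. \
Reponds uniquement au format JSON demande."
.to_string();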
let articles_json: Vec<serde_json::Value> = articles
.iter()
.enumerate()
.map(|(i, a)| {
let snippet: String = a.scraped_content.chars().take(500).collect();
serde_json::json!({
"index": i,
"title": a.title,
"url": a.url,
"snippet": snippet
})
})
.collect();
let categories_info: Vec<String> = categories
.iter()
.map(|cat| {
let filled = filled_counts.get(cat).copied().unwrap_or(0);
let remaining = (max_per_category as usize).saturating_sub(filled);
format!("- \"{}\" (encore {} places)", cat, remaining)
})
.collect();
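// User prompt (French). English: "Here is a list of articles: ... Available
// categories: ... Assign each article to the most appropriate category. If an
// article matches no category, assign it to "Autre". Respect the number of
// remaining slots per category."
let user_prompt = format!(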
"Voici une liste d'articles :\n{articles}\n\n\
Categories disponibles :\n{categories}\n\n\
Classe chaque article dans la categorie la plus appropriee. \
Si un article ne correspond a aucune categorie, classe-le dans \"Autre\".\n\
Respecte le nombre de places restantes par categorie.",
articles = serde_json::to_string_pretty(&articles_json).unwrap_or_default(),
categories = categories_info.join("\n"),
);
(system_prompt, user_prompt)
}
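The snippet in the prompt builder is truncated with `chars().take(500)` rather than a byte slice. That matters for scraped text containing accented or other multi-byte characters: a byte slice such as `&content[..500]` panics if the cut lands inside a character, whereas counting `char`s always cuts on a boundary. A minimal sketch of the same logic, with `snippet` as a hypothetical helper name:

```rust
/// Hypothetical helper mirroring the snippet logic in
/// `build_classification_prompt`: keep at most `max_chars` Unicode scalar
/// values, never splitting a multi-byte character.
fn snippet(content: &str, max_chars: usize) -> String {
    content.chars().take(max_chars).collect()
}
```

For example, `snippet("R\u{e9}sum\u{e9} d\u{e9}taill\u{e9}", 6)` yields the six characters of "Résumé" even though they occupy eight bytes, and content shorter than the limit is returned unchanged.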
- Step 3: Update test_settings() in prompts.rs if needed
If ScrapedNewsItem is not already imported, add the import. The function uses types from models::synthesis.
- Step 4: Add tests
#[test]
fn classification_prompt_includes_categories_and_articles() {
let articles = vec![
ScrapedNewsItem {
title: "GPT-5 Released".into(),
url: "https://openai.com/blog/gpt5".into(),
summary: "s".into(),
original_title: "t".into(),
scraped_content: "OpenAI released GPT-5 today with major improvements".into(),
},
];
let categories = vec!["AI News".to_string(), "Autre".to_string()];
let filled = std::collections::HashMap::new();
let (_, user_prompt) = build_classification_prompt(&articles, &categories, 4, &filled);
assert!(user_prompt.contains("GPT-5 Released"));
assert!(user_prompt.contains("AI News"));
assert!(user_prompt.contains("Autre"));
assert!(user_prompt.contains("encore 4 places"));
}
#[test]
fn classification_prompt_shows_reduced_capacity() {
let articles = vec![
ScrapedNewsItem {
title: "T".into(), url: "https://a.com/1".into(),
summary: "s".into(), original_title: "t".into(),
scraped_content: "Content".into(),
},
];
let categories = vec!["AI News".to_string(), "Autre".to_string()];
let mut filled = std::collections::HashMap::new();
filled.insert("AI News".to_string(), 3);
let (_, user_prompt) = build_classification_prompt(&articles, &categories, 4, &filled);
assert!(user_prompt.contains("encore 1 places"));
}
- Step 5: Add a test for build_classification_schema in schema.rs
#[test]
fn classification_schema_has_assignments_array() {
let schema = build_classification_schema();
assert_eq!(schema["type"], "object");
let assignments = &schema["properties"]["assignments"];
assert_eq!(assignments["type"], "array");
let item_props = &assignments["items"]["properties"];
assert!(item_props.get("index").is_some());
assert!(item_props.get("category").is_some());
assert_eq!(assignments["items"]["additionalProperties"], false);
}
- Step 6: Run tests
Run: cd backend && cargo test --lib
Expected: all tests pass
- Step 7: Commit
git add backend/src/services/prompts.rs backend/src/services/llm/schema.rs
git commit -m "feat: add classification prompt and schema for article categorization"
Task 3: Classification response parsing + category filling
Files:
- Modify: backend/src/services/synthesis.rs
Add helper functions for parsing the LLM classification response and filling categories.
- Step 1: Add parse_classification_response function
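/// Parse the LLM classification response and assign articles to categories.
///
/// Returns a HashMap of category_key → Vec<ScrapedNewsItem> and updates
/// `filled_counts` in place.
/// Category names are matched case-insensitively; unknown names map to "Autre".
/// Out-of-range and duplicate indices are skipped, and articles the LLM left
/// unassigned fall back to "Autre". When a category is full, overflow goes to
/// "Autre"; articles are dropped once "Autre" is also full.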
fn parse_classification_response(
response: &serde_json::Value,
articles: &[ScrapedNewsItem],
categories: &[String],
max_per_category: i32,
filled_counts: &mut HashMap<String, usize>,
) -> HashMap<String, Vec<ScrapedNewsItem>> {
let max = max_per_category as usize;
let mut result: HashMap<String, Vec<ScrapedNewsItem>> = HashMap::new();
// Build category name → key mapping (case-insensitive)
let mut name_to_key: HashMap<String, String> = HashMap::new();
for (i, cat) in categories.iter().enumerate() {
let key = if cat == "Autre" {
"category_autre".to_string()
} else {
format!("category_{}", i)
};
name_to_key.insert(cat.to_lowercase(), key);
}
let assignments = response
.get("assignments")
.and_then(|a| a.as_array())
.cloned()
.unwrap_or_default();
let mut assigned_indices = std::collections::HashSet::new();
for assignment in &assignments {
let index = match assignment.get("index").and_then(|i| i.as_u64()) {
Some(i) => i as usize,
None => continue,
};
if index >= articles.len() || assigned_indices.contains(&index) {
continue;
}
let cat_name = assignment
.get("category")
.and_then(|c| c.as_str())
.unwrap_or("Autre")
.to_string();
let cat_key = name_to_key
.get(&cat_name.to_lowercase())
.cloned()
.unwrap_or_else(|| "category_autre".to_string());
// Resolve the category name for counting
let cat_display = categories
.iter()
.find(|c| c.to_lowercase() == cat_name.to_lowercase())
.cloned()
.unwrap_or_else(|| "Autre".to_string());
let filled = filled_counts.get(&cat_display).copied().unwrap_or(0);
if filled >= max {
// Category full — assign to Autre if Autre has room
let autre_filled = filled_counts.get("Autre").copied().unwrap_or(0);
if autre_filled < max {
result.entry("category_autre".to_string()).or_default().push(articles[index].clone());
*filled_counts.entry("Autre".to_string()).or_insert(0) += 1;
assigned_indices.insert(index);
}
continue;
}
result.entry(cat_key).or_default().push(articles[index].clone());
*filled_counts.entry(cat_display).or_insert(0) += 1;
assigned_indices.insert(index);
}
// Unclassified articles → Autre
for (i, article) in articles.iter().enumerate() {
if !assigned_indices.contains(&i) {
let autre_filled = filled_counts.get("Autre").copied().unwrap_or(0);
if autre_filled < max {
result.entry("category_autre".to_string()).or_default().push(article.clone());
*filled_counts.entry("Autre".to_string()).or_insert(0) += 1;
}
}
}
result
}
- Step 2: Add unit tests
// ── parse_classification_response tests ─────────────────────
#[test]
fn classification_assigns_articles_to_categories() {
use crate::models::synthesis::ScrapedNewsItem;
let articles = vec![
ScrapedNewsItem { title: "A".into(), url: "https://a.com/1".into(), summary: "s".into(), original_title: "t".into(), scraped_content: "c".into() },
ScrapedNewsItem { title: "B".into(), url: "https://b.com/2".into(), summary: "s".into(), original_title: "t".into(), scraped_content: "c".into() },
];
let categories = vec!["AI News".to_string(), "Autre".to_string()];
let response = serde_json::json!({
"assignments": [
{"index": 0, "category": "AI News"},
{"index": 1, "category": "Autre"}
]
});
let mut filled = HashMap::new();
let result = parse_classification_response(&response, &articles, &categories, 4, &mut filled);
assert_eq!(result.get("category_0").map(|v| v.len()), Some(1));
assert_eq!(result.get("category_autre").map(|v| v.len()), Some(1));
}
#[test]
fn classification_unknown_category_goes_to_autre() {
use crate::models::synthesis::ScrapedNewsItem;
let articles = vec![
ScrapedNewsItem { title: "A".into(), url: "https://a.com/1".into(), summary: "s".into(), original_title: "t".into(), scraped_content: "c".into() },
];
let categories = vec!["AI News".to_string(), "Autre".to_string()];
let response = serde_json::json!({
"assignments": [{"index": 0, "category": "Unknown Category"}]
});
let mut filled = HashMap::new();
let result = parse_classification_response(&response, &articles, &categories, 4, &mut filled);
assert_eq!(result.get("category_autre").map(|v| v.len()), Some(1));
}
#[test]
fn classification_respects_max_per_category() {
use crate::models::synthesis::ScrapedNewsItem;
let articles: Vec<ScrapedNewsItem> = (0..5).map(|i| ScrapedNewsItem {
title: format!("Art{}", i), url: format!("https://a.com/{}", i),
summary: "s".into(), original_title: "t".into(), scraped_content: "c".into(),
}).collect();
let categories = vec!["AI News".to_string(), "Autre".to_string()];
let response = serde_json::json!({
"assignments": (0..5).map(|i| serde_json::json!({"index": i, "category": "AI News"})).collect::<Vec<_>>()
});
let mut filled = HashMap::new();
let result = parse_classification_response(&response, &articles, &categories, 2, &mut filled);
// AI News capped at 2, overflow goes to Autre
assert_eq!(result.get("category_0").map(|v| v.len()), Some(2));
assert!(result.get("category_autre").map(|v| v.len()).unwrap_or(0) > 0);
}
#[test]
fn classification_invalid_index_ignored() {
use crate::models::synthesis::ScrapedNewsItem;
let articles = vec![
ScrapedNewsItem { title: "A".into(), url: "https://a.com/1".into(), summary: "s".into(), original_title: "t".into(), scraped_content: "c".into() },
];
let categories = vec!["AI News".to_string(), "Autre".to_string()];
let response = serde_json::json!({
"assignments": [{"index": 99, "category": "AI News"}]
});
let mut filled = HashMap::new();
let result = parse_classification_response(&response, &articles, &categories, 4, &mut filled);
// Index 99 is invalid → article 0 is unclassified → goes to Autre
assert_eq!(result.get("category_autre").map(|v| v.len()), Some(1));
}
- Step 3: Run tests
Run: cd backend && cargo test --lib
Expected: all tests pass
- Step 4: Commit
git add backend/src/services/synthesis.rs
git commit -m "feat: add classification response parsing with category filling and Autre fallback"
Task 4: Rewrite run_generation_inner with two-phase pipeline
Files:
- Modify: backend/src/services/synthesis.rs
- Modify: backend/src/services/prompts.rs
This is the core task — replace the existing single-pass pipeline with the two-phase approach.
- Step 1: Modify build_search_prompt to accept category gaps
In prompts.rs, add an optional parameter category_gaps: Option<&[(String, i32)]> to build_search_prompt. When provided, append to the prompt: "Find specifically: N articles for category X, M articles for category Y."
If None, use the existing behavior (all categories, max_items_per_category each).
Update the call signature and all existing callers (add , None to existing calls).
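As a rough illustration of the gap-aware addition, the sketch below renders the optional gap list into the extra sentence described above. `render_category_gaps` is a hypothetical helper name, and the real `build_search_prompt` signature and surrounding prompt text are not shown in this plan, so treat this as one possible shape rather than the actual change.

```rust
/// Hypothetical helper: turns the optional gap list into the sentence that
/// Step 1 asks to append. Returns an empty string for `None` or an empty
/// list, so the existing prompt stays unchanged in that case.
fn render_category_gaps(category_gaps: Option<&[(String, i32)]>) -> String {
    let gaps = match category_gaps {
        Some(g) if !g.is_empty() => g,
        _ => return String::new(),
    };
    let parts: Vec<String> = gaps
        .iter()
        .map(|(name, count)| format!("{} articles for category {}", count, name))
        .collect();
    format!("Find specifically: {}.", parts.join(", "))
}
```

Returning an empty string for `None` keeps the existing callers' output identical once they pass `None`. The wording follows the sentence quoted in Step 1 literally, so a gap of 1 renders as "1 articles"; adjust the wording if the prompt language or pluralization matters.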
- Step 2: Rewrite run_generation_inner
Replace the body of run_generation_inner with the two-phase pipeline. The new flow:
Steps 1-4b: Same as today (load settings, sources, resolve provider, build schema, resolve models)
Step 5: Build rate limiter (same)
PHASE 1 (if sources not empty):
5a. Scrape source pages → extract article links (source_scraper::extract_article_links)
5b. Scrape candidate articles (existing scrape_articles, but from flat URL list)
5c. Filter empty content (filter_empty_scraped_articles)
5d. LLM classification call (build_classification_prompt + generate_rewrite_pass + parse_classification_response)
5e. Enforce max_articles_per_source on classified results
PHASE 2 (if any user-defined category is under-filled):
6a. Compute category gaps
6b. Load recent domains for diversity (same as today)
6c. LLM search pass with gap-aware prompt
6d. Parse + filter_homepage_urls + dedup_by_url (cross-phase) + limit_articles_per_source (cross-phase)
6e. Scrape + filter empty
6f. LLM classification call (same function, with filled_counts from Phase 1)
6g. Fill remaining slots
COMBINED:
7. Merge Phase 1 + Phase 2 results into single HashMap
8. Build rewrite schema (actual counts, omit empty categories, include "Autre" if non-empty)
9. Build rewrite prompt + LLM rewrite pass
10. Build final sections (include "Autre" if non-empty)
11. Restore scraped URLs
12. Sanitize + save to DB
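Step 6a in the flow above (compute category gaps) could look roughly like the following. This is a sketch under assumptions: `compute_category_gaps` is a hypothetical name, `filled_counts` is assumed to be keyed by category display name as in `parse_classification_response`, and "Autre" is skipped because Phase 2 is triggered only by under-filled user-defined categories.

```rust
use std::collections::HashMap;

/// Hypothetical helper for step 6a: for each user-defined category, report
/// how many slots remain open after Phase 1. Full categories are omitted,
/// so an empty result means Phase 2 can be skipped.
fn compute_category_gaps(
    categories: &[String],
    filled_counts: &HashMap<String, usize>,
    max_per_category: i32,
) -> Vec<(String, i32)> {
    // Treat a negative limit as zero rather than wrapping on the cast.
    let max = max_per_category.max(0) as usize;
    categories
        .iter()
        .filter(|cat| cat.as_str() != "Autre")
        .filter_map(|cat| {
            let filled = filled_counts.get(cat).copied().unwrap_or(0);
            let remaining = max.saturating_sub(filled);
            if remaining > 0 {
                Some((cat.clone(), remaining as i32))
            } else {
                None
            }
        })
        .collect()
}
```

The result has the same `(String, i32)` shape as the `category_gaps` parameter planned for `build_search_prompt`, so it can be passed through as `Some(&gaps)` when non-empty.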
Key changes to existing functions:
- build_rewrite_schema: skip categories with 0 articles (remove .max(1)), add "Autre" support
- build_final_sections: add "Autre" section if category_autre has articles
- scrape_articles: needs a variant that takes a flat Vec<String> of URLs (for Phase 1 source articles) instead of Vec<(String, Vec<NewsItem>)>
- Step 3: Remove dead code
Delete url_quality_sufficient, URL_QUALITY_THRESHOLD, and their tests.
- Step 4: Run tests
Run: cd backend && cargo test --lib
Expected: all tests pass (some existing synthesis tests may need updates for the new pipeline shape)
- Step 5: Commit
git add backend/src/services/synthesis.rs backend/src/services/prompts.rs
git commit -m "feat: two-phase generation pipeline — personalized sources first, web search fallback"
Task 5: Update integration tests
Files:
- Modify: backend/tests/api_syntheses_test.rs
- Step 1: Update generate_pipeline_resolves_model_from_admin_config
The test triggers generation with a fake API key. With the new pipeline, Phase 1 scrapes the configured source and finds no usable articles, so the pipeline falls through to Phase 2, where the LLM search fails because of the fake key. The test should still verify that the pipeline does not crash with a database error: it should fail at the LLM call level, not at model resolution.
Use a source URL that returns a valid HTML page but no article links (e.g., keep https://example.com, which returns a simple page). The pipeline should gracefully skip Phase 1 and attempt Phase 2.
- Step 2: Run integration tests
Run: cd backend && cargo test --no-run (compile check, integration tests need Postgres)
Expected: compiles without errors
- Step 3: Commit
git add backend/tests/api_syntheses_test.rs
git commit -m "test: update integration tests for two-phase generation pipeline"
Task 6: Update E2E test
Files:
- Modify: e2e/tests/generation-live.spec.ts
- Step 1: Update settings payload
Ensure max_articles_per_source and source_diversity_window are included (already done from previous work, but verify).
- Step 2: Add personalized source assertion
After generation completes, verify that at least one article URL matches the configured source domain (e.g., if source is https://openai.com/blog, check that at least one article URL contains openai.com).
- Step 3: Verify "Autre" behavior
Add an assertion that if "Autre" appears as a section title, it has at most max_items_per_category articles and each article has valid content.
- Step 4: Run E2E test
Start Docker stack, seed, run:
cd e2e && docker compose -f docker-compose.test.yml up --build -d
sleep 25 && npx tsx seed.ts
npx playwright test generation-live --reporter=list
Expected: test passes
- Step 5: Commit
git add e2e/tests/generation-live.spec.ts
git commit -m "test: update E2E test for two-phase pipeline with source priority"