Windowed Source Extraction — Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.
Goal: Restructure Phase 1 to extract links in waves of source_extraction_window sources, stopping early when the synthesis is full.
Architecture: New setting controls wave size. Phase 1's flat "extract all → process all" becomes a loop: "extract wave → filter → shuffle → batch process → check if full → next wave". All existing tracking (filled_counts, seen_urls, source_counts) carries across waves.
Tech Stack: Rust (sqlx, tokio), SolidJS, PostgreSQL
Spec: docs/superpowers/specs/2026-03-26-windowed-source-extraction-design.md
Task 1: Add source_extraction_window setting
Files:
- Create: backend/migrations/20260326000025_add_source_extraction_window.sql
- Modify: backend/src/models/settings.rs
- Modify: backend/src/db/settings.rs
- Modify: backend/src/services/prompts.rs (test fixture)
- Modify: backend/tests/api_syntheses_test.rs (test fixture)
- Modify: backend/tests/pipeline_test.rs (test fixture)
- Modify: frontend/src/types.ts
- Modify: frontend/src/i18n/fr.ts
- Modify: frontend/src/pages/Settings.tsx
- Modify: e2e/tests/generation-live.spec.ts
- Modify: CLAUDE.md
Same pattern as batch_size, summary_length, etc.
- Step 1: Create migration
ALTER TABLE settings ADD COLUMN source_extraction_window INTEGER NOT NULL DEFAULT 3;
- Step 2: Add to all Rust settings structs
In backend/src/models/settings.rs:
- Add pub source_extraction_window: i32 to UserSettings and UpdateSettingsRequest
- Add to Default: source_extraction_window: 3,
- Add to valid_request(): source_extraction_window: 3,
- Add validation:
if !(1..=10).contains(&self.source_extraction_window) {
return Err("source_extraction_window must be between 1 and 10".into());
}
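As a minimal sketch of how the range check above might sit on the request type: the struct here is a trimmed-down, hypothetical stand-in (the real UpdateSettingsRequest has more fields), and the method name validate is an assumption, not taken from the codebase.

```rust
/// Hypothetical, trimmed-down request struct; the real one has more fields.
pub struct UpdateSettingsRequest {
    pub source_extraction_window: i32,
}

impl UpdateSettingsRequest {
    /// Rejects wave sizes outside 1..=10, mirroring the check in this step.
    /// The method name `validate` is an assumption for illustration.
    pub fn validate(&self) -> Result<(), String> {
        if !(1..=10).contains(&self.source_extraction_window) {
            return Err("source_extraction_window must be between 1 and 10".into());
        }
        Ok(())
    }
}
```

Both bounds are inclusive, so 1 and 10 pass while 0 and 11 are rejected.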
- Step 3: Update DB queries
Add source_extraction_window to SettingsRow, the TryFrom impl, both SQL queries, and their .bind() calls.
- Step 4: Update all test fixtures
Add source_extraction_window: 3, (Rust) or "source_extraction_window": 3 (JSON) to:
- prompts.rs → test_settings()
- api_syntheses_test.rs → settings JSON
- pipeline_test.rs → settings JSON
- e2e/tests/generation-live.spec.ts → settings payload
- Step 5: Frontend types + i18n + UI
In frontend/src/types.ts: add source_extraction_window: number and default 3.
In frontend/src/i18n/fr.ts:
'settings.sourceExtractionWindow': 'Sources par vague d\'extraction',
'settings.sourceExtractionWindowHelp': 'Nombre de sources analysees en parallele a chaque vague. Reduit le nombre d\'appels IA quand peu de sources suffisent.',
In frontend/src/pages/Settings.tsx: add a number input (min=1, max=10) near batch_size.
- Step 6: Update CLAUDE.md
Change the database section heading to ## Database (25 migrations).
- Step 7: Build and test
Run: cd backend && cargo build && cargo test --lib
Run: cd frontend && npx tsc --noEmit
- Step 8: Commit
git add backend/migrations/20260326000025_add_source_extraction_window.sql \
backend/src/models/settings.rs backend/src/db/settings.rs \
backend/src/services/prompts.rs backend/tests/api_syntheses_test.rs \
backend/tests/pipeline_test.rs frontend/src/types.ts \
frontend/src/i18n/fr.ts frontend/src/pages/Settings.tsx \
e2e/tests/generation-live.spec.ts CLAUDE.md
git commit -m "feat: add source_extraction_window setting (default 3)"
Task 2: Restructure Phase 1 into wave loop
Files:
- Modify:
backend/src/services/synthesis.rs
This is the core change. The current Phase 1 structure is:
1a. Extract links from ALL sources (parallel JoinSet)
1b. Filter history
1c. Shuffle
1d. Batch scrape+classify loop
The new structure wraps this in a wave loop:
For each wave (chunk of source_extraction_window sources):
1a. Extract links from this wave's sources (parallel JoinSet)
1b. Filter history
1c. Shuffle
1d. Batch scrape+classify (existing loop)
1e. Check if max_total reached → break if full
1f. Flush traces
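The control flow of the wave structure above can be sketched in isolation. This is a simplified model, not the real Phase 1 code: run_waves and articles_from are hypothetical names, and articles_from stands in for steps 1a to 1d by returning how many articles a single source contributed.

```rust
/// Walks `sources` in waves of `window_size` and stops once `max_total`
/// articles have been collected. Returns (waves_run, total_articles).
/// `articles_from` is a hypothetical stand-in for steps 1a-1d.
fn run_waves(
    sources: &[&str],
    window_size: usize,
    max_total: usize,
    articles_from: impl Fn(&str) -> usize,
) -> (usize, usize) {
    let mut total = 0; // carries across waves, like the existing counters
    let mut waves_run = 0;
    // Guard against a zero window, which would make `chunks` panic.
    for wave in sources.chunks(window_size.max(1)) {
        waves_run += 1;
        // 1a-1d: extract, filter, shuffle, and batch process this wave.
        total += wave.iter().map(|&s| articles_from(s)).sum::<usize>();
        // 1e: stop as soon as the synthesis is full.
        if total >= max_total {
            break;
        }
    }
    (waves_run, total)
}
```

With seven sources, a window of 3, and two articles per source, a max_total of 10 is reached after the second wave, so the third wave is never extracted. The sketch only checks fullness between waves, so it can overshoot max_total; in the plan, the existing batch loop is what enforces the per-category limits.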
- Step 1: Read the current Phase 1 code
Read backend/src/services/synthesis.rs from line ~288 ("PHASE 1: Personalized Sources") to line ~553 (end of Phase 1). Understand the structure:
- Lines ~296-376: parallel link extraction JoinSet
- Lines ~378-400: history filter
- Lines ~402-409: shuffle + url_source tracking
- Lines ~411-553: batch scrape+classify loop
- Step 2: Wrap in a wave loop
Replace the Phase 1 block. The new structure:
if !sources.is_empty() {
    emit_progress(tx, "sources_scrape", "Analyse des sources personnalisees...", 15);
    let last_source = db::article_history::get_last_source_url(&state.pool, user_id).await.unwrap_or(None);
    let rotated_sources = rotate_sources(sources.clone(), last_source.as_deref());
    let max_links = 15usize;
    let window_size = settings.source_extraction_window.max(1) as usize;
    let source_waves: Vec<&[Source]> = rotated_sources.chunks(window_size).collect();
    let total_waves = source_waves.len();
    'wave_loop: for (wave_idx, wave_sources) in source_waves.iter().enumerate() {
        emit_progress(tx, "sources_scrape",
            &format!("Extraction des sources (vague {}/{})", wave_idx + 1, total_waves),
            15 + ((wave_idx as u32 * 10) / total_waves.max(1) as u32).min(10) as u8);
        // 1a. Extract links from this wave's sources (parallel)
        let mut wave_urls: Vec<(String, String)> = Vec::new();
        {
            let mut join_set = tokio::task::JoinSet::new();
            for source in *wave_sources {
                // ... existing spawn logic (same as current code)
            }
            while let Some(join_result) = join_set.join_next().await {
                // ... existing result handling (same as current code)
                // Push to wave_urls instead of candidate_urls
            }
        }
        // 1b. Filter against article history
        if settings.article_history_days > 0 && !wave_urls.is_empty() {
            // ... existing history filter logic (same as current code)
            // Uses seen_urls which carries across waves
        }
        // 1c. Shuffle this wave's candidates
        use rand::seq::SliceRandom;
        wave_urls.shuffle(&mut rand::thread_rng());
        // Track url -> source
        for (url, source_url) in &wave_urls {
            url_source.insert(url.clone(), source_url.clone());
        }
        // 1d. Batch scrape+classify (existing loop, operates on wave_urls)
        if !wave_urls.is_empty() {
            // ... existing batch loop (same as current code)
            // Uses wave_urls instead of candidate_urls
            // filled_counts, source_counts, article_scraped carry across waves
        }
        // 1e. Check if full after this wave
        let total: usize = article_scraped.values().map(|v| v.len()).sum();
        if total >= max_total {
            tracing::info!(wave = wave_idx + 1, total_waves, "Synthesis full after wave, skipping remaining sources");
            break 'wave_loop;
        }
        // 1f. Flush traces
        if !pending_traces.is_empty() {
            db::article_history::batch_insert_entries(&state.pool, &pending_traces).await.ok();
            pending_traces.clear();
        }
    }
}
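The progress expression passed to emit_progress in the loop above is dense, so here it is pulled out into a helper as a sketch. The function name wave_progress is hypothetical; the arithmetic is copied from the expression as written.

```rust
/// Progress value for the wave at `wave_idx` (0-based). Starts at 15 and
/// advances by at most 10 points across all waves, using integer division.
/// `wave_progress` is a hypothetical name; the formula matches the loop above.
fn wave_progress(wave_idx: usize, total_waves: usize) -> u8 {
    // `max(1)` avoids a division by zero when there are no waves.
    let step = (wave_idx as u32 * 10) / total_waves.max(1) as u32;
    15 + step.min(10) as u8
}
```

For four waves this gives 15, 17, 20, and 22, so Phase 1 extraction stays within the 15 to 25 range regardless of how many waves there are.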
The key changes:
- candidate_urls becomes wave_urls (scoped to each wave)
- The parallel JoinSet no longer needs max_concurrent=5 throttling: it processes at most window_size sources
- seen_urls, filled_counts, source_counts, article_scraped are NOT reset between waves
- The batch scrape+classify loop is unchanged: it just operates on wave_urls instead of candidate_urls
- Progress percentages adjusted per wave
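To illustrate why the shared state must not be reset between waves, here is a minimal sketch of URL deduplication with a set owned outside the loop. It assumes seen_urls is a HashSet<String> of already-handled URLs; the helper name filter_new_urls is hypothetical, and the real history filter may do more than this.

```rust
use std::collections::HashSet;

/// Keeps only the (url, source_url) pairs whose URL has not been seen in
/// this or any earlier wave. `seen_urls` is owned by the caller, so it
/// survives from one wave to the next. (Assumed shape, for illustration.)
fn filter_new_urls(
    wave_urls: Vec<(String, String)>,
    seen_urls: &mut HashSet<String>,
) -> Vec<(String, String)> {
    wave_urls
        .into_iter()
        // `insert` returns false when the URL was already present.
        .filter(|(url, _source)| seen_urls.insert(url.clone()))
        .collect()
}
```

If seen_urls were recreated inside the wave loop, a link published by sources in two different waves would be scraped and classified twice.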
IMPORTANT: Read the actual file carefully. The spawn logic inside the JoinSet is duplicated (seed tasks + continuation on join). With the windowed approach, the JoinSet only has window_size sources, so the max_concurrent throttle is no longer needed — just spawn all sources in the wave at once.
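The "spawn everything, then join everything" shape can be sketched without the seed-plus-continuation duplication. To stay dependency-free, this sketch swaps in std::thread::scope for tokio's JoinSet, so it shows the structure only, not the async code to write; extract_links and extract_wave are hypothetical names. Since validation caps the window at 10, a wave never spawns more than 10 tasks.

```rust
use std::thread;

/// Hypothetical stand-in for the real per-source link extraction.
fn extract_links(source: &str) -> Vec<(String, String)> {
    vec![(format!("{source}/article-1"), source.to_string())]
}

/// Spawns one task per source in the wave with no concurrency cap,
/// then joins them in spawn order and collects (url, source_url) pairs.
/// Uses OS threads as a stand-in for tokio tasks (assumption).
fn extract_wave(wave_sources: &[String]) -> Vec<(String, String)> {
    thread::scope(|scope| {
        let handles: Vec<_> = wave_sources
            .iter()
            .map(|source| scope.spawn(move || extract_links(source)))
            .collect();
        let mut wave_urls = Vec::new();
        for handle in handles {
            // A panicked task is skipped rather than aborting the wave.
            if let Ok(urls) = handle.join() {
                wave_urls.extend(urls);
            }
        }
        wave_urls
    })
}
```

In the tokio version the same two phases apply: one spawn per source in the for loop, then a single join_next loop that pushes into wave_urls.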
- Step 3: Build and test
Run: cd backend && cargo build && cargo test --lib
Expected: All pass
- Step 4: Commit
git add backend/src/services/synthesis.rs
git commit -m "feat: restructure Phase 1 into windowed source extraction waves"