# Windowed Source Extraction — Implementation Plan > **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. **Goal:** Restructure Phase 1 to extract links in waves of `source_extraction_window` sources, stopping early when the synthesis is full. **Architecture:** New setting controls wave size. Phase 1's flat "extract all → process all" becomes a loop: "extract wave → filter → shuffle → batch process → check if full → next wave". All existing tracking (filled_counts, seen_urls, source_counts) carries across waves. **Tech Stack:** Rust (sqlx, tokio), SolidJS, PostgreSQL **Spec:** `docs/superpowers/specs/2026-03-26-windowed-source-extraction-design.md` --- ### Task 1: Add `source_extraction_window` setting **Files:** - Create: `backend/migrations/20260326000025_add_source_extraction_window.sql` - Modify: `backend/src/models/settings.rs` - Modify: `backend/src/db/settings.rs` - Modify: `backend/src/services/prompts.rs` (test fixture) - Modify: `backend/tests/api_syntheses_test.rs` (test fixture) - Modify: `backend/tests/pipeline_test.rs` (test fixture) - Modify: `frontend/src/types.ts` - Modify: `frontend/src/i18n/fr.ts` - Modify: `frontend/src/pages/Settings.tsx` - Modify: `e2e/tests/generation-live.spec.ts` - Modify: `CLAUDE.md` Same pattern as `batch_size`, `summary_length`, etc. - [ ] **Step 1: Create migration** ```sql ALTER TABLE settings ADD COLUMN source_extraction_window INTEGER NOT NULL DEFAULT 3; ``` - [ ] **Step 2: Add to all Rust settings structs** In `backend/src/models/settings.rs`: - Add `pub source_extraction_window: i32` to `UserSettings` and `UpdateSettingsRequest` - Add to `Default`: `source_extraction_window: 3,` - Add to `valid_request()`: `source_extraction_window: 3,` - Add validation: ```rust if !(1..=10).contains(&self.source_extraction_window) { return Err("source_extraction_window must be between 1 and 10".into()); } ``` - [ ] **Step 3: Update DB queries** Add `source_extraction_window` to `SettingsRow`, `TryFrom`, both SQL queries, `.bind()` calls. - [ ] **Step 4: Update all test fixtures** Add `source_extraction_window: 3,` (Rust) or `"source_extraction_window": 3` (JSON) to: - `prompts.rs` → `test_settings()` - `api_syntheses_test.rs` → settings JSON - `pipeline_test.rs` → settings JSON - `e2e/tests/generation-live.spec.ts` → settings payload - [ ] **Step 5: Frontend types + i18n + UI** In `frontend/src/types.ts`: add `source_extraction_window: number` and default `3`. In `frontend/src/i18n/fr.ts`: ```typescript 'settings.sourceExtractionWindow': 'Sources par vague d\'extraction', 'settings.sourceExtractionWindowHelp': 'Nombre de sources analysees en parallele a chaque vague. Reduit le nombre d\'appels IA quand peu de sources suffisent.', ``` In `frontend/src/pages/Settings.tsx`: add a number input (min=1, max=10) near `batch_size`. - [ ] **Step 6: Update CLAUDE.md** Change to `## Database (25 migrations)`. - [ ] **Step 7: Build and test** Run: `cd backend && cargo build && cargo test --lib` Run: `cd frontend && npx tsc --noEmit` - [ ] **Step 8: Commit** ```bash git add backend/migrations/20260326000025_add_source_extraction_window.sql \ backend/src/models/settings.rs backend/src/db/settings.rs \ backend/src/services/prompts.rs backend/tests/api_syntheses_test.rs \ backend/tests/pipeline_test.rs frontend/src/types.ts \ frontend/src/i18n/fr.ts frontend/src/pages/Settings.tsx \ e2e/tests/generation-live.spec.ts CLAUDE.md git commit -m "feat: add source_extraction_window setting (default 3)" ``` --- ### Task 2: Restructure Phase 1 into wave loop **Files:** - Modify: `backend/src/services/synthesis.rs` This is the core change. The current Phase 1 structure is: ``` 1a. Extract links from ALL sources (parallel JoinSet) 1b. Filter history 1c. Shuffle 1d. Batch scrape+classify loop ``` The new structure wraps this in a wave loop: ``` For each wave (chunk of source_extraction_window sources): 1a. Extract links from this wave's sources (parallel JoinSet) 1b. Filter history 1c. Shuffle 1d. Batch scrape+classify (existing loop) 1e. Check if max_total reached → break if full 1f. Flush traces ``` - [ ] **Step 1: Read the current Phase 1 code** Read `backend/src/services/synthesis.rs` from line ~288 ("PHASE 1: Personalized Sources") to line ~553 (end of Phase 1). Understand the structure: - Lines ~296-376: parallel link extraction JoinSet - Lines ~378-400: history filter - Lines ~402-409: shuffle + url_source tracking - Lines ~411-553: batch scrape+classify loop - [ ] **Step 2: Wrap in a wave loop** Replace the Phase 1 block. The new structure: ```rust if !sources.is_empty() { emit_progress(tx, "sources_scrape", "Analyse des sources personnalisees...", 15); let last_source = db::article_history::get_last_source_url(&state.pool, user_id).await.unwrap_or(None); let rotated_sources = rotate_sources(sources.clone(), last_source.as_deref()); let max_links = 15usize; let window_size = settings.source_extraction_window.max(1) as usize; let source_waves: Vec<&[Source]> = rotated_sources.chunks(window_size).collect(); let total_waves = source_waves.len(); 'wave_loop: for (wave_idx, wave_sources) in source_waves.iter().enumerate() { emit_progress(tx, "sources_scrape", &format!("Extraction des sources (vague {}/{})", wave_idx + 1, total_waves), 15 + ((wave_idx as u32 * 10) / total_waves.max(1) as u32).min(10) as u8); // 1a. Extract links from this wave's sources (parallel) let mut wave_urls: Vec<(String, String)> = Vec::new(); { let mut join_set = tokio::task::JoinSet::new(); for source in *wave_sources { // ... existing spawn logic (same as current code) } while let Some(join_result) = join_set.join_next().await { // ... existing result handling (same as current code) // Push to wave_urls instead of candidate_urls } } // 1b. Filter against article history if settings.article_history_days > 0 && !wave_urls.is_empty() { // ... existing history filter logic (same as current code) // Uses seen_urls which carries across waves } // 1c. Shuffle this wave's candidates use rand::seq::SliceRandom; wave_urls.shuffle(&mut rand::thread_rng()); // Track url -> source for (url, source_url) in &wave_urls { url_source.insert(url.clone(), source_url.clone()); } // 1d. Batch scrape+classify (existing loop, operates on wave_urls) if !wave_urls.is_empty() { // ... existing batch loop (same as current code) // Uses wave_urls instead of candidate_urls // filled_counts, source_counts, article_scraped carry across waves } // 1e. Check if full after this wave let total: usize = article_scraped.values().map(|v| v.len()).sum(); if total >= max_total { tracing::info!(wave = wave_idx + 1, total_waves, "Synthesis full after wave, skipping remaining sources"); break 'wave_loop; } // 1f. Flush traces if !pending_traces.is_empty() { db::article_history::batch_insert_entries(&state.pool, &pending_traces).await.ok(); pending_traces.clear(); } } } ``` The key changes: - `candidate_urls` becomes `wave_urls` (scoped to each wave) - The parallel JoinSet no longer needs `max_concurrent=5` throttling — it processes at most `window_size` sources - `seen_urls`, `filled_counts`, `source_counts`, `article_scraped` are NOT reset between waves - The batch scrape+classify loop is unchanged — it just operates on `wave_urls` instead of `candidate_urls` - Progress percentages adjusted per wave **IMPORTANT:** Read the actual file carefully. The spawn logic inside the JoinSet is duplicated (seed tasks + continuation on join). With the windowed approach, the JoinSet only has `window_size` sources, so the `max_concurrent` throttle is no longer needed — just spawn all sources in the wave at once. - [ ] **Step 3: Build and test** Run: `cd backend && cargo build && cargo test --lib` Expected: All pass - [ ] **Step 4: Commit** ```bash git add backend/src/services/synthesis.rs git commit -m "feat: restructure Phase 1 into windowed source extraction waves" ```