docs: add implementation plan for windowed source extraction

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
master
oabrivard 3 months ago
parent 5426b342ef
commit 94a200a2ab

@ -0,0 +1,222 @@
# Windowed Source Extraction — Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Restructure Phase 1 to extract links in waves of `source_extraction_window` sources, stopping early when the synthesis is full.
**Architecture:** New setting controls wave size. Phase 1's flat "extract all → process all" becomes a loop: "extract wave → filter → shuffle → batch process → check if full → next wave". All existing tracking (filled_counts, seen_urls, source_counts) carries across waves.
**Tech Stack:** Rust (sqlx, tokio), SolidJS, PostgreSQL
**Spec:** `docs/superpowers/specs/2026-03-26-windowed-source-extraction-design.md`
---
### Task 1: Add `source_extraction_window` setting
**Files:**
- Create: `backend/migrations/20260326000025_add_source_extraction_window.sql`
- Modify: `backend/src/models/settings.rs`
- Modify: `backend/src/db/settings.rs`
- Modify: `backend/src/services/prompts.rs` (test fixture)
- Modify: `backend/tests/api_syntheses_test.rs` (test fixture)
- Modify: `backend/tests/pipeline_test.rs` (test fixture)
- Modify: `frontend/src/types.ts`
- Modify: `frontend/src/i18n/fr.ts`
- Modify: `frontend/src/pages/Settings.tsx`
- Modify: `e2e/tests/generation-live.spec.ts`
- Modify: `CLAUDE.md`
Same pattern as `batch_size`, `summary_length`, etc.
- [ ] **Step 1: Create migration**
```sql
ALTER TABLE settings ADD COLUMN source_extraction_window INTEGER NOT NULL DEFAULT 3;
```
- [ ] **Step 2: Add to all Rust settings structs**
In `backend/src/models/settings.rs`:
- Add `pub source_extraction_window: i32` to `UserSettings` and `UpdateSettingsRequest`
- Add to `Default`: `source_extraction_window: 3,`
- Add to `valid_request()`: `source_extraction_window: 3,`
- Add validation:
```rust
if !(1..=10).contains(&self.source_extraction_window) {
return Err("source_extraction_window must be between 1 and 10".into());
}
```
- [ ] **Step 3: Update DB queries**
Add `source_extraction_window` to `SettingsRow`, `TryFrom`, both SQL queries, `.bind()` calls.
- [ ] **Step 4: Update all test fixtures**
Add `source_extraction_window: 3,` (Rust) or `"source_extraction_window": 3` (JSON) to:
- `prompts.rs``test_settings()`
- `api_syntheses_test.rs` → settings JSON
- `pipeline_test.rs` → settings JSON
- `e2e/tests/generation-live.spec.ts` → settings payload
- [ ] **Step 5: Frontend types + i18n + UI**
In `frontend/src/types.ts`: add `source_extraction_window: number` and default `3`.
In `frontend/src/i18n/fr.ts`:
```typescript
'settings.sourceExtractionWindow': 'Sources par vague d\'extraction',
'settings.sourceExtractionWindowHelp': 'Nombre de sources analysees en parallele a chaque vague. Reduit le nombre d\'appels IA quand peu de sources suffisent.',
```
In `frontend/src/pages/Settings.tsx`: add a number input (min=1, max=10) near `batch_size`.
- [ ] **Step 6: Update CLAUDE.md**
Change to `## Database (25 migrations)`.
- [ ] **Step 7: Build and test**
Run: `cd backend && cargo build && cargo test --lib`
Run: `cd frontend && npx tsc --noEmit`
- [ ] **Step 8: Commit**
```bash
git add backend/migrations/20260326000025_add_source_extraction_window.sql \
backend/src/models/settings.rs backend/src/db/settings.rs \
backend/src/services/prompts.rs backend/tests/api_syntheses_test.rs \
backend/tests/pipeline_test.rs frontend/src/types.ts \
frontend/src/i18n/fr.ts frontend/src/pages/Settings.tsx \
e2e/tests/generation-live.spec.ts CLAUDE.md
git commit -m "feat: add source_extraction_window setting (default 3)"
```
---
### Task 2: Restructure Phase 1 into wave loop
**Files:**
- Modify: `backend/src/services/synthesis.rs`
This is the core change. The current Phase 1 structure is:
```
1a. Extract links from ALL sources (parallel JoinSet)
1b. Filter history
1c. Shuffle
1d. Batch scrape+classify loop
```
The new structure wraps this in a wave loop:
```
For each wave (chunk of source_extraction_window sources):
1a. Extract links from this wave's sources (parallel JoinSet)
1b. Filter history
1c. Shuffle
1d. Batch scrape+classify (existing loop)
1e. Check if max_total reached → break if full
1f. Flush traces
```
- [ ] **Step 1: Read the current Phase 1 code**
Read `backend/src/services/synthesis.rs` from line ~288 ("PHASE 1: Personalized Sources") to line ~553 (end of Phase 1). Understand the structure:
- Lines ~296-376: parallel link extraction JoinSet
- Lines ~378-400: history filter
- Lines ~402-409: shuffle + url_source tracking
- Lines ~411-553: batch scrape+classify loop
- [ ] **Step 2: Wrap in a wave loop**
Replace the Phase 1 block. The new structure:
```rust
if !sources.is_empty() {
emit_progress(tx, "sources_scrape", "Analyse des sources personnalisees...", 15);
let last_source = db::article_history::get_last_source_url(&state.pool, user_id).await.unwrap_or(None);
let rotated_sources = rotate_sources(sources.clone(), last_source.as_deref());
let max_links = 15usize;
let window_size = settings.source_extraction_window.max(1) as usize;
let source_waves: Vec<&[Source]> = rotated_sources.chunks(window_size).collect();
let total_waves = source_waves.len();
'wave_loop: for (wave_idx, wave_sources) in source_waves.iter().enumerate() {
emit_progress(tx, "sources_scrape",
&format!("Extraction des sources (vague {}/{})", wave_idx + 1, total_waves),
15 + ((wave_idx as u32 * 10) / total_waves.max(1) as u32).min(10) as u8);
// 1a. Extract links from this wave's sources (parallel)
let mut wave_urls: Vec<(String, String)> = Vec::new();
{
let mut join_set = tokio::task::JoinSet::new();
for source in *wave_sources {
// ... existing spawn logic (same as current code)
}
while let Some(join_result) = join_set.join_next().await {
// ... existing result handling (same as current code)
// Push to wave_urls instead of candidate_urls
}
}
// 1b. Filter against article history
if settings.article_history_days > 0 && !wave_urls.is_empty() {
// ... existing history filter logic (same as current code)
// Uses seen_urls which carries across waves
}
// 1c. Shuffle this wave's candidates
use rand::seq::SliceRandom;
wave_urls.shuffle(&mut rand::thread_rng());
// Track url -> source
for (url, source_url) in &wave_urls {
url_source.insert(url.clone(), source_url.clone());
}
// 1d. Batch scrape+classify (existing loop, operates on wave_urls)
if !wave_urls.is_empty() {
// ... existing batch loop (same as current code)
// Uses wave_urls instead of candidate_urls
// filled_counts, source_counts, article_scraped carry across waves
}
// 1e. Check if full after this wave
let total: usize = article_scraped.values().map(|v| v.len()).sum();
if total >= max_total {
tracing::info!(wave = wave_idx + 1, total_waves, "Synthesis full after wave, skipping remaining sources");
break 'wave_loop;
}
// 1f. Flush traces
if !pending_traces.is_empty() {
db::article_history::batch_insert_entries(&state.pool, &pending_traces).await.ok();
pending_traces.clear();
}
}
}
```
The key changes:
- `candidate_urls` becomes `wave_urls` (scoped to each wave)
- The parallel JoinSet no longer needs `max_concurrent=5` throttling — it processes at most `window_size` sources
- `seen_urls`, `filled_counts`, `source_counts`, `article_scraped` are NOT reset between waves
- The batch scrape+classify loop is unchanged — it just operates on `wave_urls` instead of `candidate_urls`
- Progress percentages adjusted per wave
**IMPORTANT:** Read the actual file carefully. The spawn logic inside the JoinSet is duplicated (seed tasks + continuation on join). With the windowed approach, the JoinSet only has `window_size` sources, so the `max_concurrent` throttle is no longer needed — just spawn all sources in the wave at once.
- [ ] **Step 3: Build and test**
Run: `cd backend && cargo build && cargo test --lib`
Expected: All pass
- [ ] **Step 4: Commit**
```bash
git add backend/src/services/synthesis.rs
git commit -m "feat: restructure Phase 1 into windowed source extraction waves"
```
Loading…
Cancel
Save