Windowed Source Extraction — Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Restructure Phase 1 to extract links in waves of source_extraction_window sources, stopping early when the synthesis is full.

Architecture: New setting controls wave size. Phase 1's flat "extract all → process all" becomes a loop: "extract wave → filter → shuffle → batch process → check if full → next wave". All existing tracking (filled_counts, seen_urls, source_counts) carries across waves.

Tech Stack: Rust (sqlx, tokio), SolidJS, PostgreSQL

Spec: docs/superpowers/specs/2026-03-26-windowed-source-extraction-design.md


Task 1: Add source_extraction_window setting

Files:

  • Create: backend/migrations/20260326000025_add_source_extraction_window.sql
  • Modify: backend/src/models/settings.rs
  • Modify: backend/src/db/settings.rs
  • Modify: backend/src/services/prompts.rs (test fixture)
  • Modify: backend/tests/api_syntheses_test.rs (test fixture)
  • Modify: backend/tests/pipeline_test.rs (test fixture)
  • Modify: frontend/src/types.ts
  • Modify: frontend/src/i18n/fr.ts
  • Modify: frontend/src/pages/Settings.tsx
  • Modify: e2e/tests/generation-live.spec.ts
  • Modify: CLAUDE.md

Follow the same pattern as the existing batch_size and summary_length settings.

  • Step 1: Create migration
ALTER TABLE settings ADD COLUMN source_extraction_window INTEGER NOT NULL DEFAULT 3;
  • Step 2: Add to all Rust settings structs

In backend/src/models/settings.rs:

  • Add pub source_extraction_window: i32 to UserSettings and UpdateSettingsRequest
  • Add to Default: source_extraction_window: 3,
  • Add to valid_request(): source_extraction_window: 3,
  • Add validation:
if !(1..=10).contains(&self.source_extraction_window) {
    return Err("source_extraction_window must be between 1 and 10".into());
}
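The range check above can be exercised in isolation. The sketch below is a minimal illustration, assuming a trimmed-down `UpdateSettingsRequest` that holds only the new field and a method named `validate` returning `Result<(), String>`; the real struct has more fields and its validation entry point may be named differently.

```rust
/// Hypothetical, trimmed-down stand-in for the real `UpdateSettingsRequest`.
/// Only the new field is shown; the actual struct carries the other settings.
pub struct UpdateSettingsRequest {
    pub source_extraction_window: i32,
}

impl UpdateSettingsRequest {
    /// Rejects wave sizes outside the inclusive range 1..=10.
    /// The method name and return type are assumptions for this sketch.
    pub fn validate(&self) -> Result<(), String> {
        if !(1..=10).contains(&self.source_extraction_window) {
            return Err("source_extraction_window must be between 1 and 10".into());
        }
        Ok(())
    }
}
```

Both bounds are inclusive, so 1 and 10 pass while 0 and 11 are rejected.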
  • Step 3: Update DB queries

Add source_extraction_window to SettingsRow, the TryFrom conversion, both SQL queries, and their .bind() calls.

  • Step 4: Update all test fixtures

Add source_extraction_window: 3, (Rust) or "source_extraction_window": 3 (JSON) to:

  • prompts.rs → test_settings()

  • api_syntheses_test.rs → settings JSON

  • pipeline_test.rs → settings JSON

  • e2e/tests/generation-live.spec.ts → settings payload

  • Step 5: Frontend types + i18n + UI

In frontend/src/types.ts: add source_extraction_window: number and default 3.

In frontend/src/i18n/fr.ts:

'settings.sourceExtractionWindow': 'Sources par vague d\'extraction',
'settings.sourceExtractionWindowHelp': 'Nombre de sources analysees en parallele a chaque vague. Reduit le nombre d\'appels IA quand peu de sources suffisent.',

In frontend/src/pages/Settings.tsx: add a number input (min=1, max=10) near batch_size.

  • Step 6: Update CLAUDE.md

Update the database heading to read ## Database (25 migrations).

  • Step 7: Build and test

Run: cd backend && cargo build && cargo test --lib

Run: cd frontend && npx tsc --noEmit

  • Step 8: Commit
git add backend/migrations/20260326000025_add_source_extraction_window.sql \
  backend/src/models/settings.rs backend/src/db/settings.rs \
  backend/src/services/prompts.rs backend/tests/api_syntheses_test.rs \
  backend/tests/pipeline_test.rs frontend/src/types.ts \
  frontend/src/i18n/fr.ts frontend/src/pages/Settings.tsx \
  e2e/tests/generation-live.spec.ts CLAUDE.md
git commit -m "feat: add source_extraction_window setting (default 3)"

Task 2: Restructure Phase 1 into wave loop

Files:

  • Modify: backend/src/services/synthesis.rs

This is the core change. The current Phase 1 structure is:

1a. Extract links from ALL sources (parallel JoinSet)
1b. Filter history
1c. Shuffle
1d. Batch scrape+classify loop

The new structure wraps this in a wave loop:

For each wave (chunk of source_extraction_window sources):
  1a. Extract links from this wave's sources (parallel JoinSet)
  1b. Filter history
  1c. Shuffle
  1d. Batch scrape+classify (existing loop)
  1e. Check if max_total reached → break if full
  1f. Flush traces
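The wave structure above can be sketched as a small synchronous model to check the control flow before touching synthesis.rs. Everything here is an assumption made for illustration: `Source`, `WaveRun`, and `run_waves` are hypothetical names, link extraction is replaced by a precomputed list, the shuffle is omitted (it needs the rand crate), and "scrape+classify" is reduced to accepting every unseen URL.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for a configured source: a name plus the links it yields.
pub struct Source {
    pub name: &'static str,
    pub links: Vec<&'static str>,
}

/// Result of a run: accepted URLs and how many waves were actually processed.
pub struct WaveRun {
    pub accepted: Vec<String>,
    pub waves_run: usize,
}

/// Processes sources in chunks of `window_size`, stopping once `max_total`
/// URLs have been accepted. `seen_urls` and `accepted` carry across waves.
pub fn run_waves(sources: &[Source], window_size: usize, max_total: usize) -> WaveRun {
    let mut seen_urls: HashSet<String> = HashSet::new();
    let mut accepted: Vec<String> = Vec::new();
    let mut waves_run = 0;

    // `chunks` panics on a size of 0, so clamp to at least 1.
    for wave in sources.chunks(window_size.max(1)) {
        waves_run += 1;

        // 1a. Collect this wave's candidate links only.
        let wave_urls: Vec<&str> = wave
            .iter()
            .flat_map(|source| source.links.iter().copied())
            .collect();

        // 1b + 1d. Skip URLs seen in any earlier wave, accept the rest.
        for url in wave_urls {
            if accepted.len() >= max_total {
                break;
            }
            if seen_urls.insert(url.to_string()) {
                accepted.push(url.to_string());
            }
        }

        // 1e. Stop before extracting the next wave if the synthesis is full.
        if accepted.len() >= max_total {
            break;
        }
    }

    WaveRun { accepted, waves_run }
}
```

With four sources, a window of 2 and max_total of 3, the first wave alone can fill the synthesis, so the second wave's sources are never extracted. That skipped extraction is the saving this plan targets.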
  • Step 1: Read the current Phase 1 code

Read backend/src/services/synthesis.rs from line ~288 ("PHASE 1: Personalized Sources") to line ~553 (end of Phase 1). Understand the structure:

  • Lines ~296-376: parallel link extraction JoinSet

  • Lines ~378-400: history filter

  • Lines ~402-409: shuffle + url_source tracking

  • Lines ~411-553: batch scrape+classify loop

  • Step 2: Wrap in a wave loop

Replace the Phase 1 block. The new structure:

if !sources.is_empty() {
    emit_progress(tx, "sources_scrape", "Analyse des sources personnalisees...", 15);

    let last_source = db::article_history::get_last_source_url(&state.pool, user_id).await.unwrap_or(None);
    let rotated_sources = rotate_sources(sources.clone(), last_source.as_deref());
    let max_links = 15usize;
    let window_size = settings.source_extraction_window.max(1) as usize;
    let source_waves: Vec<&[Source]> = rotated_sources.chunks(window_size).collect();
    let total_waves = source_waves.len();

    'wave_loop: for (wave_idx, wave_sources) in source_waves.iter().enumerate() {
        emit_progress(tx, "sources_scrape",
            &format!("Extraction des sources (vague {}/{})", wave_idx + 1, total_waves),
            15 + ((wave_idx as u32 * 10) / total_waves.max(1) as u32).min(10) as u8);

        // 1a. Extract links from this wave's sources (parallel)
        let mut wave_urls: Vec<(String, String)> = Vec::new();
        {
            let mut join_set = tokio::task::JoinSet::new();
            for source in *wave_sources {
                // ... existing spawn logic (same as current code)
            }
            while let Some(join_result) = join_set.join_next().await {
                // ... existing result handling (same as current code)
                // Push to wave_urls instead of candidate_urls
            }
        }

        // 1b. Filter against article history
        if settings.article_history_days > 0 && !wave_urls.is_empty() {
            // ... existing history filter logic (same as current code)
            // Uses seen_urls which carries across waves
        }

        // 1c. Shuffle this wave's candidates
        use rand::seq::SliceRandom;
        wave_urls.shuffle(&mut rand::thread_rng());

        // Track url -> source
        for (url, source_url) in &wave_urls {
            url_source.insert(url.clone(), source_url.clone());
        }

        // 1d. Batch scrape+classify (existing loop, operates on wave_urls)
        if !wave_urls.is_empty() {
            // ... existing batch loop (same as current code)
            // Uses wave_urls instead of candidate_urls
            // filled_counts, source_counts, article_scraped carry across waves
        }

        // 1e. Check if full after this wave
        let total: usize = article_scraped.values().map(|v| v.len()).sum();
        if total >= max_total {
            tracing::info!(wave = wave_idx + 1, total_waves, "Synthesis full after wave, skipping remaining sources");
            break 'wave_loop;
        }

        // 1f. Flush traces
        if !pending_traces.is_empty() {
            db::article_history::batch_insert_entries(&state.pool, &pending_traces).await.ok();
            pending_traces.clear();
        }
    }
}

The key changes:

  • candidate_urls becomes wave_urls (scoped to each wave)
  • The parallel JoinSet no longer needs max_concurrent=5 throttling, because it processes at most window_size sources per wave (up to 10, given the validation range in Task 1)
  • seen_urls, filled_counts, source_counts, article_scraped are NOT reset between waves
  • The batch scrape+classify loop is unchanged — it just operates on wave_urls instead of candidate_urls
  • Progress percentages adjusted per wave
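The per-wave progress value is the same integer arithmetic as the emit_progress call in the code above, pulled out into a function so it can be checked by hand. The function name `wave_progress` is hypothetical; the plan computes the value inline.

```rust
/// Progress percentage reported at the start of wave `wave_idx` (0-based).
/// Phase 1 starts at 15% and advances by at most 10 points across all waves.
pub fn wave_progress(wave_idx: usize, total_waves: usize) -> u8 {
    // Integer division; `max(1)` guards against dividing by zero.
    let advance = (wave_idx as u32 * 10) / total_waves.max(1) as u32;
    // Clamp to 10 so the result stays within 15..=25.
    15 + advance.min(10) as u8
}
```

For four waves this yields 15, 17, 20, and 22, so the bar moves in small steps and never exceeds 25 during Phase 1.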

IMPORTANT: Read the actual file carefully. The spawn logic inside the JoinSet currently appears twice: once to seed the initial tasks and once to spawn a continuation each time a task is joined. With the windowed approach the JoinSet holds at most window_size sources, so the max_concurrent throttle is no longer needed. Spawn all sources in the wave at once, which makes the continuation spawn unnecessary.
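The "spawn everything, then drain" shape can be sketched without the seed-and-continue duplication. To keep the example dependency-free it swaps tokio's JoinSet for std::thread::scope, so it illustrates the control flow only and is not the async code that belongs in synthesis.rs. `extract_links` and `extract_wave` are hypothetical names, and the extractor returns a fixed fake link instead of scraping.

```rust
use std::thread;

/// Hypothetical extractor returning (article_url, source_url) pairs.
/// The real one fetches the source page and collects its links.
fn extract_links(source: &str) -> Vec<(String, String)> {
    vec![(format!("{source}/article-1"), source.to_string())]
}

/// Spawns one worker per source in the wave, with no concurrency cap,
/// then joins them all and gathers the results into `wave_urls`.
pub fn extract_wave(wave_sources: &[&str]) -> Vec<(String, String)> {
    let mut wave_urls = Vec::new();
    thread::scope(|scope| {
        // Seed every task up front: the wave is already bounded by window_size.
        let handles: Vec<_> = wave_sources
            .iter()
            .map(|&source| scope.spawn(move || extract_links(source)))
            .collect();

        // Drain results; a panicked worker is skipped rather than aborting the wave.
        for handle in handles {
            if let Ok(links) = handle.join() {
                wave_urls.extend(links);
            }
        }
    });
    wave_urls
}
```

In the tokio version the same two phases become a plain `for` loop calling `join_set.spawn(...)` followed by the existing `while let Some(...) = join_set.join_next().await` drain.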

  • Step 3: Build and test

Run: cd backend && cargo build && cargo test --lib

Expected: All pass

  • Step 4: Commit
git add backend/src/services/synthesis.rs
git commit -m "feat: restructure Phase 1 into windowed source extraction waves"