feat: parallel source extraction, shuffle candidates, clear history endpoint

- Remove 10-source cap; all sources are now processed
- Increase max links per source from 10 to 15
- Extract article links in parallel (up to 5 concurrent) using JoinSet
- Shuffle candidate URLs after history filtering to interleave sources
- Add DELETE /api/v1/article-history endpoint to clear all history for a user

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
master
oabrivard 3 months ago
parent 2d623c6ced
commit 7f7d584314

@ -201,6 +201,15 @@ pub async fn get_last_source_url(
Ok(result) Ok(result)
} }
/// Remove every `article_history` row whose `user_id` matches the given user.
///
/// Unlike the age-based cleanup, this applies no other filter: the statement
/// is a plain `DELETE ... WHERE user_id = $1`.
///
/// # Returns
///
/// The number of rows the database reports as affected by the delete.
///
/// # Errors
///
/// Any error raised while executing the statement is propagated with `?`
/// and converted into [`AppError`].
pub async fn delete_all_for_user(pool: &PgPool, user_id: Uuid) -> Result<u64, AppError> {
    // The user id is bound as a parameter rather than interpolated into the SQL text.
    let statement = sqlx::query("DELETE FROM article_history WHERE user_id = $1").bind(user_id);
    let outcome = statement.execute(pool).await?;
    Ok(outcome.rows_affected())
}
/// Delete history entries older than N days for this user. /// Delete history entries older than N days for this user.
/// ///
/// Only removes entries where synthesis_id IS NULL (dropped articles). /// Only removes entries where synthesis_id IS NULL (dropped articles).

@ -54,6 +54,18 @@ pub async fn list_history(
}))) })))
} }
/// DELETE /api/v1/article-history
///
/// Wipes the complete article history of the authenticated user and reports
/// how many entries were removed.
///
/// On success the response body is a JSON object of the form
/// `{ "deleted": <count> }`, where `<count>` is the value returned by
/// `db::article_history::delete_all_for_user`.
///
/// # Errors
///
/// Returns the [`AppError`] produced by the database layer if the delete fails.
pub async fn clear_history(
    auth_user: AuthUser,
    State(state): State<AppState>,
) -> Result<impl IntoResponse, AppError> {
    let removed = db::article_history::delete_all_for_user(&state.pool, auth_user.id).await?;

    // Record who cleared their history and how much was removed.
    tracing::info!(user_id = %auth_user.id, deleted = removed, "Cleared article history");

    let payload = serde_json::json!({ "deleted": removed });
    Ok(Json(payload))
}
/// GET /api/v1/syntheses/:id/provenance /// GET /api/v1/syntheses/:id/provenance
/// ///
/// Returns all article history entries for the generation run /// Returns all article history entries for the generation run

@ -55,7 +55,7 @@ pub fn build_router(state: AppState, config: &AppConfig) -> Router {
.route("/syntheses/generate", post(handlers::generation::trigger_generate)) .route("/syntheses/generate", post(handlers::generation::trigger_generate))
.route("/syntheses/generate/{job_id}/progress", get(handlers::generation::progress_stream)) .route("/syntheses/generate/{job_id}/progress", get(handlers::generation::progress_stream))
// Article history & provenance routes (authenticated) // Article history & provenance routes (authenticated)
.route("/article-history", get(handlers::article_history::list_history)) .route("/article-history", get(handlers::article_history::list_history).delete(handlers::article_history::clear_history))
.route("/syntheses/{id}/provenance", get(handlers::article_history::get_provenance)) .route("/syntheses/{id}/provenance", get(handlers::article_history::get_provenance))
// LLM call log routes (authenticated) // LLM call log routes (authenticated)
.route("/llm-logs/{job_id}", get(handlers::llm_logs::get_logs)) .route("/llm-logs/{job_id}", get(handlers::llm_logs::get_logs))

@ -281,32 +281,79 @@ async fn run_generation_inner(
let last_source = db::article_history::get_last_source_url(&state.pool, user_id).await.unwrap_or(None); let last_source = db::article_history::get_last_source_url(&state.pool, user_id).await.unwrap_or(None);
let rotated_sources = rotate_sources(sources.clone(), last_source.as_deref()); let rotated_sources = rotate_sources(sources.clone(), last_source.as_deref());
let max_sources = rotated_sources.len().min(10); let max_links = 15usize;
let max_links = 10usize;
// 1a. Extract article links from source pages (parallel, max 5 concurrent)
// 1a. Extract article links + filter against history let mut candidate_urls: Vec<(String, String)> = Vec::new();
let mut candidate_urls: Vec<(String, String)> = Vec::new(); // (article_url, source_url) {
let mut join_set = tokio::task::JoinSet::new();
for source in rotated_sources.iter().take(max_sources) { let mut pending = rotated_sources.iter().peekable();
let links = if settings.use_llm_for_source_links { let max_concurrent = 5;
source_scraper::extract_article_links_with_llm(
&state.http_client, &source.url, max_links, &provider, &model_research, // Seed initial tasks
).await for _ in 0..max_concurrent {
} else { if let Some(source) = pending.next() {
source_scraper::extract_article_links( let client = state.http_client.clone();
&state.http_client, &source.url, max_links, let source_url = source.url.clone();
).await let source_title = source.title.clone();
}; let use_llm = settings.use_llm_for_source_links;
let provider_clone = std::sync::Arc::clone(&provider);
let model = model_research.clone();
let max_l = max_links;
join_set.spawn(async move {
let links = if use_llm {
source_scraper::extract_article_links_with_llm(
&client, &source_url, max_l, &provider_clone, &model,
).await
} else {
source_scraper::extract_article_links(
&client, &source_url, max_l,
).await
};
(source_url, source_title, links)
});
}
}
if let Ok(links) = links { while let Some(join_result) = join_set.join_next().await {
tracing::info!(source = %source.title, links = links.len(), "Extracted links from source"); if let Ok((source_url, source_title, links_result)) = join_result {
for link in links { match links_result {
if seen_urls.insert(link.to_lowercase()) { Ok(links) => {
candidate_urls.push((link, source.url.clone())); tracing::info!(source = %source_title, links = links.len(), "Extracted links from source");
for link in links {
if seen_urls.insert(link.to_lowercase()) {
candidate_urls.push((link, source_url.clone()));
}
}
}
Err(e) => {
tracing::warn!(source = %source_title, error = %e, "Failed to extract links");
}
} }
} }
} else if let Err(e) = links {
tracing::warn!(source = %source.title, error = %e, "Failed to extract links"); // Spawn next task
if let Some(source) = pending.next() {
let client = state.http_client.clone();
let source_url = source.url.clone();
let source_title = source.title.clone();
let use_llm = settings.use_llm_for_source_links;
let provider_clone = std::sync::Arc::clone(&provider);
let model = model_research.clone();
let max_l = max_links;
join_set.spawn(async move {
let links = if use_llm {
source_scraper::extract_article_links_with_llm(
&client, &source_url, max_l, &provider_clone, &model,
).await
} else {
source_scraper::extract_article_links(
&client, &source_url, max_l,
).await
};
(source_url, source_title, links)
});
}
} }
} }
@ -324,6 +371,10 @@ async fn run_generation_inner(
} }
} }
// Shuffle candidates to interleave articles from different sources
use rand::seq::SliceRandom;
candidate_urls.shuffle(&mut rand::thread_rng());
// Track url -> source // Track url -> source
for (url, source_url) in &candidate_urls { for (url, source_url) in &candidate_urls {
url_source.insert(url.clone(), source_url.clone()); url_source.insert(url.clone(), source_url.clone());

Loading…
Cancel
Save