You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

279 lines
8.5 KiB
Rust

//! Article history: tracks which article URLs have been used in past syntheses.
//!
//! Prevents the same article from appearing in multiple syntheses.
use std::collections::HashSet;
use chrono::{DateTime, Utc};
use serde::Serialize;
use sqlx::PgPool;
use uuid::Uuid;
use crate::errors::AppError;
/// Entry for inserting into article_history with full tracing metadata.
///
/// Consumed by `insert_entry` and `batch_insert_entries`; every field is written to the
/// `article_history` column of the same name.
#[derive(Debug, Clone)]
pub struct ArticleHistoryEntry {
    /// User the history row belongs to; every query in this module is scoped by it.
    pub user_id: Uuid,
    /// Article URL as stored in the `url` column.
    pub url: String,
    /// Hash of the URL; this is the key `check_urls_exist` matches on.
    /// NOTE(review): the hashing scheme is chosen by the caller and is not visible here.
    pub url_hash: String,
    /// Article title.
    pub title: String,
    /// Free-form source type label; `list_history` can filter on it.
    /// NOTE(review): the set of valid values is not defined in this module.
    pub source_type: String,
    /// Optional URL of the source the article came from (read back by `get_last_source_url`).
    pub source_url: Option<String>,
    /// Optional category label.
    pub category: Option<String>,
    /// Synthesis this article is linked to, if any. `cleanup_old` only deletes rows where
    /// this is NULL.
    pub synthesis_id: Option<Uuid>,
    /// Status label; `get_last_source_url` looks for the value `'used'`.
    /// NOTE(review): other valid values are not defined in this module.
    pub status: String,
    /// Presumably whether scraping the article succeeded — confirm against the producer.
    pub scraped_ok: bool,
    /// Job that produced this entry; `list_by_job_id` groups rows by it.
    pub job_id: Uuid,
    /// Optional publication date, sent to the database as text.
    /// NOTE(review): the expected date format is not visible here.
    pub published_date: Option<String>,
}
/// Row returned from article_history queries.
///
/// Populated through `sqlx::FromRow` by the SELECT statements in `list_history` and
/// `list_by_job_id`, which select exactly these columns, and serializable via
/// `serde::Serialize`. Columns such as `user_id`, `url_hash` and `published_date` are
/// written by the insert functions but are not part of this read model.
#[derive(Debug, Clone, Serialize, sqlx::FromRow)]
pub struct ArticleHistoryRow {
/// Value of the `id` column for this history row.
pub id: Uuid,
/// Article URL.
pub url: String,
/// Article title.
pub title: String,
/// Source type label; `list_history` can filter on this column.
pub source_type: String,
/// Optional URL of the source the article came from.
pub source_url: Option<String>,
/// Optional category label.
pub category: Option<String>,
/// Linked synthesis, if any; `cleanup_old` only deletes rows where this is NULL.
pub synthesis_id: Option<Uuid>,
/// Status label; `get_last_source_url` looks for the value `'used'`.
pub status: String,
/// Presumably whether scraping the article succeeded — confirm against the producer.
pub scraped_ok: bool,
/// Job that produced this row; `list_by_job_id` filters on it.
pub job_id: Uuid,
/// Value of the `created_at` column; the listing queries order by it.
pub created_at: DateTime<Utc>,
}
/// Look up which of the given URL hashes are already recorded for `user_id`.
///
/// Returns the `url_hash` values found in `article_history` for that user, collected into a
/// set. An empty `url_hashes` slice short-circuits to an empty set without querying.
///
/// # Errors
///
/// Propagates any database error, converted into [`AppError`].
pub async fn check_urls_exist(
    pool: &PgPool,
    user_id: Uuid,
    url_hashes: &[String],
) -> Result<HashSet<String>, AppError> {
    const LOOKUP_SQL: &str =
        "SELECT url_hash FROM article_history WHERE user_id = $1 AND url_hash = ANY($2)";

    // Nothing to look up: skip the round-trip entirely.
    if url_hashes.is_empty() {
        return Ok(HashSet::new());
    }

    let known_hashes = sqlx::query_scalar::<_, String>(LOOKUP_SQL)
        .bind(user_id)
        .bind(url_hashes)
        .fetch_all(pool)
        .await?;

    let already_used: HashSet<String> = known_hashes.into_iter().collect();
    Ok(already_used)
}
/// Insert article URLs into history.
///
/// `urls` is a slice of `(url, url_hash)` pairs. All pairs are written with a single
/// `INSERT ... SELECT ... FROM unnest(...)` statement, so the call costs one round-trip
/// regardless of how many URLs are passed and either every new row is written or none is.
///
/// Uses ON CONFLICT DO NOTHING to silently skip duplicates (both rows that already exist and
/// repeated pairs within `urls`).
///
/// # Errors
///
/// Propagates any database error, converted into [`AppError`].
pub async fn insert_urls(
    pool: &PgPool,
    user_id: Uuid,
    urls: &[(String, String)], // (url, url_hash) pairs
) -> Result<(), AppError> {
    if urls.is_empty() {
        return Ok(());
    }

    // Split the pairs into two parallel columns so they can be sent as Postgres arrays.
    let (raw_urls, hashes): (Vec<&str>, Vec<&str>) = urls
        .iter()
        .map(|(url, url_hash)| (url.as_str(), url_hash.as_str()))
        .unzip();

    sqlx::query(
        "INSERT INTO article_history (user_id, url_hash, url) \
         SELECT $1::uuid, pairs.url_hash, pairs.url \
         FROM unnest($2::text[], $3::text[]) AS pairs(url_hash, url) \
         ON CONFLICT DO NOTHING",
    )
    .bind(user_id)
    .bind(&hashes)
    .bind(&raw_urls)
    .execute(pool)
    .await?;

    Ok(())
}
/// Bulk-insert article history entries with one statement.
///
/// The entries are transposed into one array per column and expanded server-side with
/// `unnest`, so the whole batch costs a single round-trip. An empty slice is a no-op.
///
/// NOTE(review): the statement has no `ON CONFLICT` clause; if the table enforces a
/// uniqueness constraint (not visible here), a conflicting row would presumably fail the
/// whole batch — confirm against the schema.
///
/// # Errors
///
/// Propagates any database error, converted into [`AppError`].
pub async fn batch_insert_entries(pool: &PgPool, entries: &[ArticleHistoryEntry]) -> Result<(), AppError> {
    let count = entries.len();
    if count == 0 {
        return Ok(());
    }

    // One column-oriented buffer per target column, filled in a single pass below.
    let mut user_ids: Vec<Uuid> = Vec::with_capacity(count);
    let mut urls: Vec<&str> = Vec::with_capacity(count);
    let mut url_hashes: Vec<&str> = Vec::with_capacity(count);
    let mut titles: Vec<&str> = Vec::with_capacity(count);
    let mut source_types: Vec<&str> = Vec::with_capacity(count);
    let mut source_urls: Vec<Option<&str>> = Vec::with_capacity(count);
    let mut categories: Vec<Option<&str>> = Vec::with_capacity(count);
    let mut synthesis_ids: Vec<Option<Uuid>> = Vec::with_capacity(count);
    let mut statuses: Vec<&str> = Vec::with_capacity(count);
    let mut scraped_oks: Vec<bool> = Vec::with_capacity(count);
    let mut job_ids: Vec<Uuid> = Vec::with_capacity(count);
    let mut published_dates: Vec<Option<&str>> = Vec::with_capacity(count);

    for entry in entries {
        user_ids.push(entry.user_id);
        urls.push(entry.url.as_str());
        url_hashes.push(entry.url_hash.as_str());
        titles.push(entry.title.as_str());
        source_types.push(entry.source_type.as_str());
        source_urls.push(entry.source_url.as_deref());
        categories.push(entry.category.as_deref());
        synthesis_ids.push(entry.synthesis_id);
        statuses.push(entry.status.as_str());
        scraped_oks.push(entry.scraped_ok);
        job_ids.push(entry.job_id);
        published_dates.push(entry.published_date.as_deref());
    }

    // Bind order must follow the $1..$12 positions of the unnest() call.
    let statement = sqlx::query(
        r#"
INSERT INTO article_history (user_id, url, url_hash, title, source_type, source_url, category, synthesis_id, status, scraped_ok, job_id, published_date)
SELECT * FROM unnest($1::uuid[], $2::text[], $3::text[], $4::text[], $5::text[], $6::text[], $7::text[], $8::uuid[], $9::text[], $10::bool[], $11::uuid[], $12::text[])
"#,
    )
    .bind(&user_ids)
    .bind(&urls)
    .bind(&url_hashes)
    .bind(&titles)
    .bind(&source_types)
    .bind(&source_urls)
    .bind(&categories)
    .bind(&synthesis_ids)
    .bind(&statuses)
    .bind(&scraped_oks)
    .bind(&job_ids)
    .bind(&published_dates);

    statement.execute(pool).await?;
    Ok(())
}
/// Insert a single article history entry with full tracing metadata.
pub async fn insert_entry(pool: &PgPool, entry: &ArticleHistoryEntry) -> Result<(), AppError> {
sqlx::query(
r#"
INSERT INTO article_history (user_id, url_hash, url, title, source_type, source_url, category, synthesis_id, status, scraped_ok, job_id, published_date)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
"#,
)
.bind(entry.user_id)
.bind(&entry.url_hash)
.bind(&entry.url)
.bind(&entry.title)
.bind(&entry.source_type)
.bind(&entry.source_url)
.bind(&entry.category)
.bind(entry.synthesis_id)
.bind(&entry.status)
.bind(entry.scraped_ok)
.bind(entry.job_id)
.bind(&entry.published_date)
.execute(pool)
.await?;
Ok(())
}
/// List article history with optional filters, paginated.
///
/// Returns up to `limit` rows for `user_id`, skipping the first `offset`, newest first.
/// `status_filter` and `source_type_filter` restrict the result to an exact match on the
/// respective column when `Some`; `None` disables that filter.
///
/// Rows are ordered by `created_at DESC` with `id DESC` as a tie-breaker. `created_at` alone
/// is not guaranteed to be unique, and without a total order LIMIT/OFFSET pagination may
/// repeat or skip rows between pages.
///
/// # Errors
///
/// Propagates any database error, converted into [`AppError`].
pub async fn list_history(
    pool: &PgPool,
    user_id: Uuid,
    limit: i64,
    offset: i64,
    status_filter: Option<&str>,
    source_type_filter: Option<&str>,
) -> Result<Vec<ArticleHistoryRow>, AppError> {
    // Placeholders: $1 user, $2 limit, $3 offset, $4 status filter, $5 source-type filter.
    let rows = sqlx::query_as::<_, ArticleHistoryRow>(
        r#"
        SELECT id, url, title, source_type, source_url, category, synthesis_id, status, scraped_ok, job_id, created_at
        FROM article_history
        WHERE user_id = $1
          AND ($4::TEXT IS NULL OR status = $4)
          AND ($5::TEXT IS NULL OR source_type = $5)
        ORDER BY created_at DESC, id DESC
        LIMIT $2 OFFSET $3
        "#,
    )
    .bind(user_id)
    .bind(limit)
    .bind(offset)
    .bind(status_filter)
    .bind(source_type_filter)
    .fetch_all(pool)
    .await?;

    Ok(rows)
}
/// Count the history rows of `user_id` that match the optional filters.
///
/// `status_filter` and `source_type_filter` are exact-match filters on the respective column;
/// passing `None` disables that filter. The WHERE clause uses the same filter conditions as
/// `list_history`.
///
/// # Errors
///
/// Propagates any database error, converted into [`AppError`].
pub async fn count_history(
    pool: &PgPool,
    user_id: Uuid,
    status_filter: Option<&str>,
    source_type_filter: Option<&str>,
) -> Result<i64, AppError> {
    const COUNT_SQL: &str = r#"
SELECT COUNT(*) FROM article_history
WHERE user_id = $1
AND ($2::TEXT IS NULL OR status = $2)
AND ($3::TEXT IS NULL OR source_type = $3)
"#;

    let total = sqlx::query_scalar::<_, i64>(COUNT_SQL)
        .bind(user_id)
        .bind(status_filter)
        .bind(source_type_filter)
        .fetch_one(pool)
        .await?;

    Ok(total)
}
/// Fetch every history row that `user_id` owns for the given `job_id`, oldest first.
///
/// The result is not paginated.
///
/// # Errors
///
/// Propagates any database error, converted into [`AppError`].
pub async fn list_by_job_id(
    pool: &PgPool,
    user_id: Uuid,
    job_id: Uuid,
) -> Result<Vec<ArticleHistoryRow>, AppError> {
    const BY_JOB_SQL: &str = r#"
SELECT id, url, title, source_type, source_url, category, synthesis_id, status, scraped_ok, job_id, created_at
FROM article_history
WHERE user_id = $1 AND job_id = $2
ORDER BY created_at ASC
"#;

    let job_rows = sqlx::query_as::<_, ArticleHistoryRow>(BY_JOB_SQL)
        .bind(user_id)
        .bind(job_id)
        .fetch_all(pool)
        .await?;

    Ok(job_rows)
}
/// Return the `source_url` of the user's newest history row with status `'used'`.
///
/// Rows whose `source_url` is NULL are ignored. Yields `None` when the user has no matching
/// row. Intended for source rotation.
///
/// # Errors
///
/// Propagates any database error, converted into [`AppError`].
pub async fn get_last_source_url(
    pool: &PgPool,
    user_id: Uuid,
) -> Result<Option<String>, AppError> {
    const LAST_SOURCE_SQL: &str = "SELECT source_url FROM article_history WHERE user_id = $1 AND status = 'used' AND source_url IS NOT NULL ORDER BY created_at DESC LIMIT 1";

    let last_source = sqlx::query_scalar::<_, String>(LAST_SOURCE_SQL)
        .bind(user_id)
        .fetch_optional(pool)
        .await?;

    Ok(last_source)
}
/// Remove every `article_history` row belonging to `user_id`.
///
/// Returns the number of rows the DELETE affected.
///
/// # Errors
///
/// Propagates any database error, converted into [`AppError`].
pub async fn delete_all_for_user(pool: &PgPool, user_id: Uuid) -> Result<u64, AppError> {
    const DELETE_SQL: &str = "DELETE FROM article_history WHERE user_id = $1";

    let outcome = sqlx::query(DELETE_SQL).bind(user_id).execute(pool).await?;
    let removed = outcome.rows_affected();
    Ok(removed)
}
/// Purge this user's unlinked history rows that are more than `days` days old.
///
/// Only rows with `synthesis_id IS NULL` (dropped articles) are eligible; rows linked to a
/// synthesis are left in place until that synthesis is deleted. The age cut-off is computed
/// in the database as `now() - make_interval(days => $2)`.
///
/// Returns the number of deleted rows.
///
/// # Errors
///
/// Propagates any database error, converted into [`AppError`].
pub async fn cleanup_old(
    pool: &PgPool,
    user_id: Uuid,
    days: i32,
) -> Result<u64, AppError> {
    const CLEANUP_SQL: &str = "DELETE FROM article_history WHERE user_id = $1 AND created_at < now() - make_interval(days => $2) AND synthesis_id IS NULL";

    let outcome = sqlx::query(CLEANUP_SQL)
        .bind(user_id)
        .bind(days)
        .execute(pool)
        .await?;

    let purged = outcome.rows_affected();
    Ok(purged)
}