fix: P0 audit bugs — theme-scoped imports/preferred, creation flow, scheduler timeout, job cleanup

- Bulk/CSV import now passes theme_id through to DB
- Preferred source update scoped by theme_id (no cross-theme reset)
- Theme creation sends sensible defaults from frontend
- Scheduler wraps generation in 15-minute timeout
- Job store cleanup runs every 5 minutes

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
master
oabrivard 2 months ago
parent 58f42d0a87
commit d5d624b896

@ -142,24 +142,28 @@ pub async fn count_for_user(pool: &PgPool, user_id: Uuid) -> Result<i64, AppErro
Ok(row.0)
}
/// Bulk-update the `is_preferred` flag for a user's sources.
/// Bulk-update the `is_preferred` flag for a user's sources within a theme.
///
/// First resets all of the user's sources to non-preferred, then sets the
/// specified source IDs to preferred. This ensures a clean toggle behavior.
/// First resets all of the user's sources in the given theme to non-preferred,
/// then sets the specified source IDs to preferred. This ensures a clean toggle
/// behavior scoped to a single theme, without affecting other themes.
pub async fn update_preferred(
pool: &PgPool,
user_id: Uuid,
preferred_ids: &[Uuid],
theme_id: Uuid,
) -> Result<(), AppError> {
sqlx::query("UPDATE sources SET is_preferred = false WHERE user_id = $1")
sqlx::query("UPDATE sources SET is_preferred = false WHERE user_id = $1 AND theme_id = $2")
.bind(user_id)
.bind(theme_id)
.execute(pool)
.await?;
if !preferred_ids.is_empty() {
sqlx::query("UPDATE sources SET is_preferred = true WHERE user_id = $1 AND id = ANY($2)")
sqlx::query("UPDATE sources SET is_preferred = true WHERE user_id = $1 AND id = ANY($2) AND theme_id = $3")
.bind(user_id)
.bind(preferred_ids)
.bind(theme_id)
.execute(pool)
.await?;
}

@ -108,6 +108,15 @@ pub async fn bulk_import(
return Err(AppError::Validation("No sources provided".into()));
}
// Verify theme ownership if theme_id is provided
let theme_id = body.theme_id;
if let Some(tid) = theme_id {
let theme = db::themes::get_by_id(&state.pool, auth_user.id, tid).await?;
if theme.is_none() {
return Err(AppError::NotFound("Theme not found".into()));
}
}
let mut valid_sources: Vec<(String, String)> = Vec::new();
let mut errors: Vec<String> = Vec::new();
@ -124,6 +133,7 @@ pub async fn bulk_import(
auth_user.id,
&mut valid_sources,
&mut errors,
theme_id,
"Bulk import",
)
.await?;
@ -138,6 +148,7 @@ async fn do_bulk_import(
user_id: uuid::Uuid,
valid_sources: &mut Vec<(String, String)>,
errors: &mut Vec<String>,
theme_id: Option<Uuid>,
log_label: &str,
) -> Result<BulkImportResponse, AppError> {
let current_count = db::sources::count_for_user(pool, user_id).await?;
@ -151,7 +162,7 @@ async fn do_bulk_import(
));
}
let created = db::sources::bulk_create(pool, user_id, valid_sources, None).await?;
let created = db::sources::bulk_create(pool, user_id, valid_sources, theme_id).await?;
let imported = created.len();
let skipped = valid_sources.len() - imported;
@ -170,6 +181,12 @@ async fn do_bulk_import(
})
}
/// Query parameters for `POST /api/v1/sources/import-csv`.
#[derive(Debug, Deserialize)]
pub struct CsvImportQuery {
pub theme_id: Option<Uuid>,
}
/// `POST /api/v1/sources/import-csv`
///
/// Imports sources from a CSV file uploaded via multipart form data.
@ -178,8 +195,17 @@ async fn do_bulk_import(
pub async fn import_csv(
auth_user: AuthUser,
State(state): State<AppState>,
Query(params): Query<CsvImportQuery>,
mut multipart: Multipart,
) -> Result<impl IntoResponse, AppError> {
// Verify theme ownership if theme_id is provided
let theme_id = params.theme_id;
if let Some(tid) = theme_id {
let theme = db::themes::get_by_id(&state.pool, auth_user.id, tid).await?;
if theme.is_none() {
return Err(AppError::NotFound("Theme not found".into()));
}
}
// Extract the first file field from the multipart upload
let field = multipart
.next_field()
@ -232,7 +258,7 @@ pub async fn import_csv(
}
let response =
do_bulk_import(&state.pool, auth_user.id, &mut valid_sources, &mut errors, "CSV import")
do_bulk_import(&state.pool, auth_user.id, &mut valid_sources, &mut errors, theme_id, "CSV import")
.await?;
Ok(Json(response))
@ -268,12 +294,19 @@ pub async fn export_csv(
/// `PUT /api/v1/sources/preferred`
///
/// Bulk-update which sources are marked as preferred.
/// Accepts a list of source IDs; all other sources are set to non-preferred.
/// Accepts a list of source IDs and a theme_id; all other sources
/// in that theme are set to non-preferred.
pub async fn update_preferred(
auth_user: AuthUser,
State(state): State<AppState>,
Json(body): Json<UpdatePreferredRequest>,
) -> Result<impl IntoResponse, AppError> {
db::sources::update_preferred(&state.pool, auth_user.id, &body.source_ids).await?;
// Verify theme ownership
let theme = db::themes::get_by_id(&state.pool, auth_user.id, body.theme_id).await?;
if theme.is_none() {
return Err(AppError::NotFound("Theme not found".into()));
}
db::sources::update_preferred(&state.pool, auth_user.id, &body.source_ids, body.theme_id).await?;
Ok(StatusCode::OK)
}

@ -84,6 +84,18 @@ async fn main() -> anyhow::Result<()> {
});
}
// Periodic job store cleanup (every 5 minutes)
{
let cleanup_state = state.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(300));
loop {
interval.tick().await;
cleanup_state.job_store.cleanup_expired();
}
});
}
// Scheduled synthesis generation (check every 60 seconds)
{
let scheduler_state = state.clone();

@ -68,12 +68,14 @@ impl CreateSourceRequest {
#[derive(Debug, Deserialize)]
pub struct UpdatePreferredRequest {
pub source_ids: Vec<Uuid>,
pub theme_id: Uuid,
}
/// Request body for `POST /api/v1/sources/bulk`.
#[derive(Debug, Deserialize)]
pub struct BulkImportRequest {
pub sources: Vec<CreateSourceRequest>,
pub theme_id: Option<Uuid>,
}
/// Response for bulk import operations (JSON and CSV).

@ -54,11 +54,22 @@ pub async fn run_scheduled_jobs(state: &AppState) {
let job_id = Uuid::new_v4();
let cancelled = AtomicBool::new(false);
let result = synthesis::run_generation_inner(
job_id, state, schedule.user_id, schedule.theme_id,
&tx, None, &cancelled,
let timeout_result = tokio::time::timeout(
std::time::Duration::from_secs(900),
synthesis::run_generation_inner(
job_id, state, schedule.user_id, schedule.theme_id,
&tx, None, &cancelled,
),
).await;
let result = match timeout_result {
Ok(inner) => inner,
Err(_) => {
tracing::error!(schedule_id = %schedule.id, "Scheduled generation timed out after 15 minutes");
continue;
}
};
match result {
Ok(synthesis_id) => {
tracing::info!(synthesis_id = %synthesis_id, "Scheduled generation completed");

@ -56,7 +56,8 @@ async fn update_preferred_sets_sources() {
// PUT /sources/preferred with [id1, id3]
let pref_body = serde_json::json!({
"source_ids": [source_ids[0], source_ids[2]]
"source_ids": [source_ids[0], source_ids[2]],
"theme_id": theme_id
});
let (pref_status, _) = app
.put_with_session("/api/v1/sources/preferred", &pref_body, &session)
@ -125,7 +126,8 @@ async fn update_preferred_clears_all_when_empty() {
// Set some as preferred
let pref_body = serde_json::json!({
"source_ids": [source_ids[0]]
"source_ids": [source_ids[0]],
"theme_id": theme_id
});
let (pref_status, _) = app
.put_with_session("/api/v1/sources/preferred", &pref_body, &session)
@ -134,7 +136,8 @@ async fn update_preferred_clears_all_when_empty() {
// Clear all preferred
let clear_body = serde_json::json!({
"source_ids": []
"source_ids": [],
"theme_id": theme_id
});
let (clear_status, _) = app
.put_with_session("/api/v1/sources/preferred", &clear_body, &session)
@ -169,7 +172,8 @@ async fn update_preferred_without_auth_returns_401() {
let app = common::TestApp::new().await;
let body = serde_json::json!({
"source_ids": []
"source_ids": [],
"theme_id": "00000000-0000-0000-0000-000000000000"
});
let (status, resp) = app
.put_with_session("/api/v1/sources/preferred", &body, "invalid-session-token")

@ -120,7 +120,7 @@ test.describe('Sources management', () => {
// Step 8: Mark a source as preferred via API
const sourceId2 = addResp2.data.id;
const prefResp = await page.evaluate(async (ids: string[]) => {
const prefResp = await page.evaluate(async ({ ids, tid }: { ids: string[]; tid: string }) => {
const resp = await fetch('/api/v1/sources/preferred', {
method: 'PUT',
headers: {
@ -128,10 +128,10 @@ test.describe('Sources management', () => {
'X-Requested-With': 'XMLHttpRequest',
},
credentials: 'same-origin',
body: JSON.stringify({ source_ids: ids }),
body: JSON.stringify({ source_ids: ids, theme_id: tid }),
});
return { status: resp.status };
}, [sourceId2]);
}, { ids: [sourceId2], tid: themeId });
expect(prefResp.status).toBe(200);
// Step 9: Verify source is now preferred

@ -25,10 +25,11 @@ export const sourcesApi = {
api.post<BulkImportResponse>('/sources/bulk', data),
/** POST /sources/import-csv -- import sources from an uploaded CSV file. */
importCsv: async (file: File): Promise<BulkImportResponse> => {
importCsv: async (file: File, themeId?: string): Promise<BulkImportResponse> => {
const formData = new FormData();
formData.append('file', file);
return api.post<BulkImportResponse>('/sources/import-csv', formData);
const query = themeId ? `?theme_id=${themeId}` : '';
return api.post<BulkImportResponse>(`/sources/import-csv${query}`, formData);
},
/** GET /sources/export-csv -- download all sources as a CSV file. */
@ -37,7 +38,7 @@ export const sourcesApi = {
await triggerDownload(response, 'sources.csv');
},
/** PUT /sources/preferred -- bulk-update which sources are preferred. */
updatePreferred: (sourceIds: string[]): Promise<void> =>
api.put<void>('/sources/preferred', { source_ids: sourceIds }),
/** PUT /sources/preferred -- bulk-update which sources are preferred (scoped to a theme). */
updatePreferred: (sourceIds: string[], themeId: string): Promise<void> =>
api.put<void>('/sources/preferred', { source_ids: sourceIds, theme_id: themeId }),
};

@ -259,6 +259,7 @@ const fr = {
'themes.saved': 'Theme enregistre',
'themes.created': 'Theme cree',
'themes.noThemes': 'Aucun theme configure. Creez votre premier theme pour commencer.',
'themes.defaultCategory': 'Actualites',
// Settings - API Keys
'settings.apiKeys.title': 'Vos cles API',

@ -143,8 +143,8 @@ const ThemeManager: Component = () => {
try {
const data: CreateThemeRequest = {
name: t('themes.createTheme'),
theme: '',
categories: [],
theme: t('themes.createTheme'),
categories: [t('themes.defaultCategory')],
};
const created = await themesApi.create(data);
setThemes((prev) => [...prev, created]);
@ -318,6 +318,9 @@ const ThemeManager: Component = () => {
// ---- Toggle preferred source ----
const handleTogglePreferred = async (sourceId: string) => {
const themeId = selectedThemeId();
if (!themeId) return;
const current = sources();
const toggled = current.map((s) =>
s.id === sourceId ? { ...s, is_preferred: !s.is_preferred } : s,
@ -328,7 +331,7 @@ const ThemeManager: Component = () => {
setSources(toggled);
try {
await sourcesApi.updatePreferred(newPreferredIds);
await sourcesApi.updatePreferred(newPreferredIds, themeId);
} catch (err) {
// Revert on error
setSources(current);
@ -361,7 +364,7 @@ const ThemeManager: Component = () => {
setCsvError(null);
try {
await sourcesApi.importCsv(file);
await sourcesApi.importCsv(file, themeId);
await fetchSources(themeId);
} catch (err) {
if (isApiError(err)) {

Loading…
Cancel
Save