feat: add background scheduler for automated synthesis generation

Spawns a tokio task that checks for due schedules every 60 seconds,
runs generation via run_generation_inner, and sends emails to configured
recipients before marking each schedule as run.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
master
oabrivard 3 months ago
parent 384649b2b6
commit a068d04fa8

@ -84,6 +84,18 @@ async fn main() -> anyhow::Result<()> {
}); });
} }
// Scheduled synthesis generation (check every 60 seconds)
{
let scheduler_state = state.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));
loop {
interval.tick().await;
ai_synth_backend::services::scheduler::run_scheduled_jobs(&scheduler_state).await;
}
});
}
let shutdown_pool = state.pool.clone(); let shutdown_pool = state.pool.clone();
let app = router::build_router(state, &config); let app = router::build_router(state, &config);

@ -7,6 +7,7 @@ pub mod export;
pub mod llm; pub mod llm;
pub mod prompts; pub mod prompts;
pub mod rate_limiter; pub mod rate_limiter;
pub mod scheduler;
pub mod scraper; pub mod scraper;
pub mod source_scraper; pub mod source_scraper;
pub mod synthesis; pub mod synthesis;

@ -0,0 +1,92 @@
//! Background scheduler for automated synthesis generation.
use chrono::Datelike;
use crate::app_state::AppState;
use crate::db;
use crate::services::{email, synthesis};
use crate::models::synthesis::NewsSection;
use std::sync::atomic::AtomicBool;
use tokio::sync::watch;
use uuid::Uuid;
/// Get the day code for the current UTC day.
fn current_day_code() -> &'static str {
match chrono::Utc::now().weekday() {
chrono::Weekday::Mon => "mon",
chrono::Weekday::Tue => "tue",
chrono::Weekday::Wed => "wed",
chrono::Weekday::Thu => "thu",
chrono::Weekday::Fri => "fri",
chrono::Weekday::Sat => "sat",
chrono::Weekday::Sun => "sun",
}
}
/// Check for due schedules and run them sequentially.
pub async fn run_scheduled_jobs(state: &AppState) {
let day = current_day_code();
let time = chrono::Utc::now().format("%H:%M").to_string();
let due = match db::schedules::find_due_schedules(&state.pool, day, &time).await {
Ok(s) => s,
Err(e) => {
tracing::warn!(error = %e, "Failed to query due schedules");
return;
}
};
if due.is_empty() { return; }
tracing::info!(count = due.len(), "Found due scheduled jobs");
for schedule in due {
if state.job_store.has_active_job(schedule.user_id).is_some() {
tracing::info!(user_id = %schedule.user_id, "Skipping scheduled job — manual generation in progress");
continue;
}
tracing::info!(schedule_id = %schedule.id, theme_id = %schedule.theme_id, "Running scheduled generation");
let (tx, _rx) = watch::channel(synthesis::ProgressEvent::Progress {
step: "init".into(), message: "Scheduled generation...".into(), percent: 0,
});
let job_id = Uuid::new_v4();
let cancelled = AtomicBool::new(false);
let result = synthesis::run_generation_inner(
job_id, state, schedule.user_id, schedule.theme_id,
&tx, None, &cancelled,
).await;
match result {
Ok(synthesis_id) => {
tracing::info!(synthesis_id = %synthesis_id, "Scheduled generation completed");
// Send emails
let emails: Vec<String> = serde_json::from_value(schedule.emails.clone()).unwrap_or_default();
if !emails.is_empty() {
if let Ok(Some(synth)) = db::syntheses::get_by_id(&state.pool, synthesis_id).await {
let sections: Vec<NewsSection> = serde_json::from_value(synth.sections).unwrap_or_default();
let date = synth.created_at.format("%d %B %Y").to_string();
for addr in &emails {
match email::send_synthesis_email(
&state.http_client, &state.config.resend_api_key,
&state.config.email_from, addr, &synth.week, &date, &sections,
).await {
Ok(()) => tracing::info!(to = addr, "Scheduled email sent"),
Err(e) => tracing::warn!(to = addr, error = %e, "Failed to send scheduled email"),
}
}
}
}
db::schedules::mark_run(&state.pool, schedule.id).await.ok();
}
Err(e) => {
tracing::error!(schedule_id = %schedule.id, error = %e, "Scheduled generation failed");
}
}
}
}
Loading…
Cancel
Save