diff --git a/backend/src/main.rs b/backend/src/main.rs index f13c52c..38fc20f 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -84,6 +84,18 @@ async fn main() -> anyhow::Result<()> { }); } + // Scheduled synthesis generation (check every 60 seconds) + { + let scheduler_state = state.clone(); + tokio::spawn(async move { + let mut interval = tokio::time::interval(std::time::Duration::from_secs(60)); + loop { + interval.tick().await; + ai_synth_backend::services::scheduler::run_scheduled_jobs(&scheduler_state).await; + } + }); + } + let shutdown_pool = state.pool.clone(); let app = router::build_router(state, &config); diff --git a/backend/src/services/mod.rs b/backend/src/services/mod.rs index 36aee0a..a4f9ae7 100644 --- a/backend/src/services/mod.rs +++ b/backend/src/services/mod.rs @@ -7,6 +7,7 @@ pub mod export; pub mod llm; pub mod prompts; pub mod rate_limiter; +pub mod scheduler; pub mod scraper; pub mod source_scraper; pub mod synthesis; diff --git a/backend/src/services/scheduler.rs b/backend/src/services/scheduler.rs new file mode 100644 index 0000000..73994f9 --- /dev/null +++ b/backend/src/services/scheduler.rs @@ -0,0 +1,92 @@ +//! Background scheduler for automated synthesis generation. + +use chrono::Datelike; + +use crate::app_state::AppState; +use crate::db; +use crate::services::{email, synthesis}; +use crate::models::synthesis::NewsSection; +use std::sync::atomic::AtomicBool; +use tokio::sync::watch; +use uuid::Uuid; + +/// Get the day code for the current UTC day. +fn current_day_code() -> &'static str { + match chrono::Utc::now().weekday() { + chrono::Weekday::Mon => "mon", + chrono::Weekday::Tue => "tue", + chrono::Weekday::Wed => "wed", + chrono::Weekday::Thu => "thu", + chrono::Weekday::Fri => "fri", + chrono::Weekday::Sat => "sat", + chrono::Weekday::Sun => "sun", + } +} + +/// Check for due schedules and run them sequentially. +pub async fn run_scheduled_jobs(state: &AppState) { + let day = current_day_code(); + let time = chrono::Utc::now().format("%H:%M").to_string(); + + let due = match db::schedules::find_due_schedules(&state.pool, day, &time).await { + Ok(s) => s, + Err(e) => { + tracing::warn!(error = %e, "Failed to query due schedules"); + return; + } + }; + + if due.is_empty() { return; } + + tracing::info!(count = due.len(), "Found due scheduled jobs"); + + for schedule in due { + if state.job_store.has_active_job(schedule.user_id).is_some() { + tracing::info!(user_id = %schedule.user_id, "Skipping scheduled job — manual generation in progress"); + continue; + } + + tracing::info!(schedule_id = %schedule.id, theme_id = %schedule.theme_id, "Running scheduled generation"); + + let (tx, _rx) = watch::channel(synthesis::ProgressEvent::Progress { + step: "init".into(), message: "Scheduled generation...".into(), percent: 0, + }); + let job_id = Uuid::new_v4(); + let cancelled = AtomicBool::new(false); + + let result = synthesis::run_generation_inner( + job_id, state, schedule.user_id, schedule.theme_id, + &tx, None, &cancelled, + ).await; + + match result { + Ok(synthesis_id) => { + tracing::info!(synthesis_id = %synthesis_id, "Scheduled generation completed"); + + // Send emails + let emails: Vec = serde_json::from_value(schedule.emails.clone()).unwrap_or_default(); + if !emails.is_empty() { + if let Ok(Some(synth)) = db::syntheses::get_by_id(&state.pool, synthesis_id).await { + let sections: Vec = serde_json::from_value(synth.sections).unwrap_or_default(); + let date = synth.created_at.format("%d %B %Y").to_string(); + + for addr in &emails { + match email::send_synthesis_email( + &state.http_client, &state.config.resend_api_key, + &state.config.email_from, addr, &synth.week, &date, §ions, + ).await { + Ok(()) => tracing::info!(to = addr, "Scheduled email sent"), + Err(e) => tracing::warn!(to = addr, error = %e, "Failed to send scheduled email"), + } + } + } + } + + db::schedules::mark_run(&state.pool, schedule.id).await.ok(); + } + Err(e) => { + tracing::error!(schedule_id = %schedule.id, error = %e, "Scheduled generation failed"); + } + } + } +}