You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
115 lines
4.1 KiB
Rust
115 lines
4.1 KiB
Rust
//! Background scheduler for automated synthesis generation.
|
|
|
|
use chrono::Datelike;
|
|
|
|
use crate::app_state::AppState;
|
|
use crate::db;
|
|
use crate::services::{email, job_store, synthesis};
|
|
use crate::models::synthesis::NewsSection;
|
|
use std::sync::atomic::AtomicBool;
|
|
use tokio::sync::watch;
|
|
use uuid::Uuid;
|
|
|
|
/// Get the day code for the current UTC day.
|
|
fn current_day_code() -> &'static str {
|
|
match chrono::Utc::now().weekday() {
|
|
chrono::Weekday::Mon => "mon",
|
|
chrono::Weekday::Tue => "tue",
|
|
chrono::Weekday::Wed => "wed",
|
|
chrono::Weekday::Thu => "thu",
|
|
chrono::Weekday::Fri => "fri",
|
|
chrono::Weekday::Sat => "sat",
|
|
chrono::Weekday::Sun => "sun",
|
|
}
|
|
}
|
|
|
|
/// Check for due schedules and run them sequentially.
|
|
pub async fn run_scheduled_jobs(state: &AppState) {
|
|
let day = current_day_code();
|
|
let time = chrono::Utc::now().format("%H:%M").to_string();
|
|
|
|
let due = match db::schedules::find_due_schedules(&state.pool, day, &time).await {
|
|
Ok(s) => s,
|
|
Err(e) => {
|
|
tracing::warn!(error = %e, "Failed to query due schedules");
|
|
return;
|
|
}
|
|
};
|
|
|
|
if due.is_empty() { return; }
|
|
|
|
tracing::info!(count = due.len(), "Found due scheduled jobs");
|
|
|
|
for schedule in due {
|
|
if state.job_store.has_active_job(schedule.user_id).is_some() {
|
|
tracing::info!(user_id = %schedule.user_id, "Skipping scheduled job — manual generation in progress");
|
|
continue;
|
|
}
|
|
|
|
tracing::info!(schedule_id = %schedule.id, theme_id = %schedule.theme_id, "Running scheduled generation");
|
|
|
|
let (tx, _rx) = watch::channel(job_store::ProgressEvent::Progress {
|
|
step: "init".into(), message: "Scheduled generation...".into(), percent: 0,
|
|
});
|
|
let job_id = Uuid::new_v4();
|
|
let cancelled = AtomicBool::new(false);
|
|
|
|
let timeout_result = tokio::time::timeout(
|
|
std::time::Duration::from_secs(900),
|
|
synthesis::run_generation_inner(
|
|
job_id, state, schedule.user_id, schedule.theme_id,
|
|
&tx, None, &cancelled,
|
|
),
|
|
).await;
|
|
|
|
let result = match timeout_result {
|
|
Ok(inner) => inner,
|
|
Err(_) => {
|
|
tracing::error!(schedule_id = %schedule.id, "Scheduled generation timed out after 15 minutes");
|
|
continue;
|
|
}
|
|
};
|
|
|
|
match result {
|
|
Ok(synthesis_id) => {
|
|
tracing::info!(synthesis_id = %synthesis_id, "Scheduled generation completed");
|
|
|
|
// Send emails
|
|
let emails: Vec<String> = serde_json::from_value(schedule.emails.clone()).unwrap_or_default();
|
|
if !emails.is_empty() {
|
|
if let Ok(Some(synth)) = db::syntheses::get_by_id(&state.pool, synthesis_id).await {
|
|
let sections: Vec<NewsSection> = serde_json::from_value(synth.sections).unwrap_or_default();
|
|
let date = synth.created_at.format("%d %B %Y").to_string();
|
|
|
|
for addr in &emails {
|
|
match email::send_synthesis_email(
|
|
&state.http_client, &state.config.resend_api_key,
|
|
&state.config.email_from, addr, &synth.week, &date, §ions,
|
|
).await {
|
|
Ok(()) => tracing::info!(to = addr, "Scheduled email sent"),
|
|
Err(e) => tracing::warn!(to = addr, error = %e, "Failed to send scheduled email"),
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
db::schedules::mark_run(&state.pool, schedule.id).await.ok();
|
|
}
|
|
Err(e) => {
|
|
tracing::error!(schedule_id = %schedule.id, error = %e, "Scheduled generation failed");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;

    /// Whatever day the test runs on, the result must be one of the seven known codes.
    #[test]
    fn current_day_code_returns_valid_code() {
        let code = current_day_code();
        assert!(matches!(
            code,
            "mon" | "tue" | "wed" | "thu" | "fri" | "sat" | "sun"
        ));
    }
}
|