feat: rate limiter waits instead of failing — sleeps until window passes (max 60s)

master
oabrivard 3 months ago
parent ed399e9a6e
commit 3353e5261f

@ -76,6 +76,29 @@ impl RateLimiter {
}
}
/// Returns how long to wait until the next request is allowed, or None if allowed now.
pub fn time_until_available(&self, key: &str) -> Option<Duration> {
let now = Instant::now();
let cutoff = now - self.inner.window;
let entry = self.inner.entries.get(key)?;
let timestamps = entry.value();
let recent: Vec<&Instant> = timestamps.iter().filter(|t| **t > cutoff).collect();
if recent.len() < self.inner.max_requests {
return None; // Allowed now
}
// The oldest timestamp in the window — when it expires, a slot opens
let oldest = recent.iter().min().copied()?;
let expires_at = *oldest + self.inner.window;
if expires_at > now {
Some(expires_at - now)
} else {
None
}
}
/// Returns the number of remaining requests allowed for the given key.
pub fn remaining(&self, key: &str) -> usize {
let now = Instant::now();
@ -200,6 +223,28 @@ impl ProviderRateLimiter {
}
}
/// Returns how long to wait until the next request is allowed, or None if allowed now.
pub fn time_until_available(&self, provider: &str) -> Option<Duration> {
let now = Instant::now();
let bucket = self.inner.global_buckets.get(provider)?;
let cutoff = now - bucket.time_window;
let recent_count = bucket.timestamps.iter().filter(|t| **t >= cutoff).count();
if recent_count < bucket.max_requests as usize {
return None; // Allowed now
}
// The oldest timestamp in the window
let oldest = bucket.timestamps.iter().filter(|t| **t >= cutoff).min()?;
let expires_at = *oldest + bucket.time_window;
if expires_at > now {
Some(expires_at - now)
} else {
None
}
}
/// Returns the number of remaining requests allowed for a provider.
pub fn remaining(&self, provider: &str) -> usize {
let now = Instant::now();

@ -450,7 +450,7 @@ async fn run_generation_inner(
}
// Phase B: Classify/summarize batch in parallel
check_rate_limit(state, &user_rate_limiter, &provider_name)?;
check_rate_limit(state, &user_rate_limiter, &provider_name).await?;
let mut classify_set = tokio::task::JoinSet::new();
for (final_url, source_url, body_text, page_title) in &scraped_articles {
@ -554,7 +554,7 @@ async fn run_generation_inner(
if !category_gaps.is_empty() {
emit_progress(tx, "search", "Recherche d'actualites complementaires...", 70);
check_rate_limit(state, &user_rate_limiter, &provider_name)?;
check_rate_limit(state, &user_rate_limiter, &provider_name).await?;
let search_schema = crate::services::llm::schema::build_category_schema(&user_categories, settings.max_items_per_category);
let current_date = Utc::now().format("%A %d %B %Y").to_string();
@ -798,22 +798,46 @@ fn get_user_rate_limiter(
}
/// Check rate limits using the user's limiter if provided, otherwise the global limiter.
fn check_rate_limit(
/// Check rate limits, waiting if necessary (up to 60 seconds).
///
/// Instead of failing with an error, this function sleeps until the rate
/// limit window passes. If still rate limited after 60 seconds, returns an error.
async fn check_rate_limit(
state: &AppState,
user_limiter: &Option<crate::services::rate_limiter::RateLimiter>,
provider_name: &str,
) -> Result<(), AppError> {
let max_wait = std::time::Duration::from_secs(60);
let start = std::time::Instant::now();
loop {
let allowed = match user_limiter {
Some(limiter) => limiter.check(&format!("user_gen_{}", provider_name)),
None => state.provider_rate_limiter.check(provider_name),
};
if !allowed {
if allowed {
return Ok(());
}
// Calculate how long to wait
let wait_time = match user_limiter {
Some(limiter) => limiter.time_until_available(&format!("user_gen_{}", provider_name)),
None => state.provider_rate_limiter.time_until_available(provider_name),
};
let wait = wait_time.unwrap_or(std::time::Duration::from_secs(1));
if start.elapsed() + wait > max_wait {
return Err(AppError::RateLimited(
"Limite de requetes atteinte. Veuillez reessayer dans quelques instants.".into(),
));
}
tracing::info!(wait_ms = wait.as_millis() as u64, "Rate limited, waiting...");
tokio::time::sleep(wait).await;
}
Ok(())
}

Loading…
Cancel
Save