from __future__ import annotations from dataclasses import dataclass from datetime import date, datetime, time, timedelta from typing import Any from zoneinfo import ZoneInfo import holidays @dataclass(frozen=True) class AvailabilityResult: start: str end: str available: bool busy_slots: list[dict[str, str]] checked_calendars: list[str] @dataclass(frozen=True) class MeetingIntervalsResult: start: str end: str timezone: str meeting_intervals: list[dict[str, str]] checked_calendars: list[str] class CalendarAvailabilityAgent: def __init__(self, calendar_service: Any) -> None: self.calendar_service = calendar_service def get_availability( self, start: str, end: str, calendar_ids: list[str] | None = None ) -> AvailabilityResult: start_dt = _parse_iso_datetime(start) end_dt = _parse_iso_datetime(end) if end_dt <= start_dt: raise ValueError("end must be after start.") calendars = calendar_ids or ["primary"] query_body: dict[str, Any] = { "timeMin": start_dt.isoformat(), "timeMax": end_dt.isoformat(), "items": [{"id": calendar_id} for calendar_id in calendars], } freebusy = self.calendar_service.freebusy().query(body=query_body).execute() calendars_payload = freebusy.get("calendars", {}) busy_slots: list[dict[str, str]] = [] for calendar_id, data in calendars_payload.items(): for busy_slot in data.get("busy", []): busy_slots.append( { "calendar_id": calendar_id, "start": busy_slot["start"], "end": busy_slot["end"], } ) return AvailabilityResult( start=start_dt.isoformat(), end=end_dt.isoformat(), available=len(busy_slots) == 0, busy_slots=busy_slots, checked_calendars=calendars, ) def get_available_meeting_intervals( self, start: str, end: str, calendar_ids: list[str] | None = None ) -> MeetingIntervalsResult: start_dt = _parse_iso_datetime(start) end_dt = _parse_iso_datetime(end) if end_dt <= start_dt: raise ValueError("end must be after start.") if end_dt - start_dt > timedelta(days=90): raise ValueError("time range cannot exceed 90 days.") calendars = calendar_ids or ["primary"] query_body: dict[str, Any] = { "timeMin": start_dt.isoformat(), "timeMax": end_dt.isoformat(), "items": [{"id": calendar_id} for calendar_id in calendars], } freebusy = self.calendar_service.freebusy().query(body=query_body).execute() calendars_payload = freebusy.get("calendars", {}) paris_tz = ZoneInfo("Europe/Paris") start_paris = start_dt.astimezone(paris_tz) end_paris = end_dt.astimezone(paris_tz) merged_busy = _merge_intervals( _collect_busy_intervals( calendars_payload=calendars_payload, start_bound=start_paris, end_bound=end_paris, timezone=paris_tz, ) ) holiday_dates = _holiday_dates(start_paris.date(), end_paris.date()) allowed_windows = _build_allowed_windows(start_paris, end_paris, holiday_dates, paris_tz) meeting_intervals: list[dict[str, str]] = [] for window_start, window_end in allowed_windows: for interval_start, interval_end in _subtract_busy( window_start=window_start, window_end=window_end, busy_intervals=merged_busy, ): if interval_end - interval_start < timedelta(minutes=30): continue meeting_intervals.append( { "start": interval_start.isoformat(), "end": interval_end.isoformat(), } ) return MeetingIntervalsResult( start=start_dt.isoformat(), end=end_dt.isoformat(), timezone="Europe/Paris", meeting_intervals=meeting_intervals, checked_calendars=calendars, ) def _parse_iso_datetime(value: str) -> datetime: normalized = value.strip() if normalized.endswith("Z"): normalized = normalized[:-1] + "+00:00" parsed = datetime.fromisoformat(normalized) if parsed.tzinfo is None: raise ValueError("datetime must include a timezone offset, for example +01:00.") return parsed def _collect_busy_intervals( *, calendars_payload: Any, start_bound: datetime, end_bound: datetime, timezone: ZoneInfo, ) -> list[tuple[datetime, datetime]]: if not isinstance(calendars_payload, dict): return [] intervals: list[tuple[datetime, datetime]] = [] for value in calendars_payload.values(): if not isinstance(value, dict): continue raw_busy_slots = value.get("busy", []) if not isinstance(raw_busy_slots, list): continue for raw_busy_slot in raw_busy_slots: if not isinstance(raw_busy_slot, dict): continue busy_start = raw_busy_slot.get("start") busy_end = raw_busy_slot.get("end") if not isinstance(busy_start, str) or not isinstance(busy_end, str): continue start_paris = _parse_iso_datetime(busy_start).astimezone(timezone) end_paris = _parse_iso_datetime(busy_end).astimezone(timezone) clipped_start = max(start_paris, start_bound) clipped_end = min(end_paris, end_bound) if clipped_end <= clipped_start: continue intervals.append((clipped_start, clipped_end)) return intervals def _merge_intervals(intervals: list[tuple[datetime, datetime]]) -> list[tuple[datetime, datetime]]: if not intervals: return [] sorted_intervals = sorted(intervals, key=lambda interval: (interval[0], interval[1])) merged: list[tuple[datetime, datetime]] = [sorted_intervals[0]] for current_start, current_end in sorted_intervals[1:]: last_start, last_end = merged[-1] if current_start <= last_end: merged[-1] = (last_start, max(last_end, current_end)) continue merged.append((current_start, current_end)) return merged def _build_allowed_windows( start: datetime, end: datetime, holiday_dates: set[date], timezone: ZoneInfo, ) -> list[tuple[datetime, datetime]]: windows: list[tuple[datetime, datetime]] = [] for day in _date_range(start.date(), end.date()): if day in holiday_dates: continue daily_window = _daily_allowed_window(day) if daily_window is None: continue raw_start, raw_end = daily_window day_start = datetime.combine(day, raw_start, tzinfo=timezone) day_end = datetime.combine(day, raw_end, tzinfo=timezone) window_start = max(start, day_start) window_end = min(end, day_end) if window_end <= window_start: continue windows.append((window_start, window_end)) return windows def _daily_allowed_window(day: date) -> tuple[time, time] | None: weekday = day.weekday() if weekday == 6: return None if weekday == 5: return time(hour=9, minute=30), time(hour=12, minute=0) return time(hour=8, minute=30), time(hour=21, minute=30) def _subtract_busy( *, window_start: datetime, window_end: datetime, busy_intervals: list[tuple[datetime, datetime]], ) -> list[tuple[datetime, datetime]]: free_intervals: list[tuple[datetime, datetime]] = [] cursor = window_start for busy_start, busy_end in busy_intervals: if busy_end <= cursor: continue if busy_start >= window_end: break if busy_start > cursor: free_end = min(busy_start, window_end) if free_end > cursor: free_intervals.append((cursor, free_end)) cursor = max(cursor, busy_end) if cursor >= window_end: break if cursor < window_end: free_intervals.append((cursor, window_end)) return free_intervals def _holiday_dates(start_day: date, end_day: date) -> set[date]: years = range(start_day.year, end_day.year + 1) france_holidays = holidays.country_holidays("FR", years=years, observed=False) quebec_holidays = holidays.country_holidays( "CA", subdiv="QC", years=years, observed=False, ) return set(france_holidays.keys()) | set(quebec_holidays.keys()) def _date_range(start_day: date, end_day: date) -> list[date]: current_day = start_day days: list[date] = [] while current_day <= end_day: days.append(current_day) current_day += timedelta(days=1) return days