You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

270 lines
8.9 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
from datetime import date, datetime, time, timedelta
from typing import Any
from zoneinfo import ZoneInfo
import holidays
@dataclass(frozen=True)
class AvailabilityResult:
    """Outcome of a free/busy check over a single time range.

    Instances are produced by ``CalendarAvailabilityAgent.get_availability``.
    The dataclass is frozen, so attributes cannot be reassigned after
    construction (the list attributes themselves remain mutable objects).
    """

    # ISO-8601 rendering of the start of the checked range.
    start: str
    # ISO-8601 rendering of the end of the checked range.
    end: str
    # True when no busy slot was reported for any checked calendar.
    available: bool
    # One dict per busy slot, with "calendar_id", "start" and "end" keys.
    busy_slots: list[dict[str, str]]
    # Ids of the calendars that were queried.
    checked_calendars: list[str]
@dataclass(frozen=True)
class MeetingIntervalsResult:
    """Free meeting intervals found inside a requested time range.

    Instances are produced by
    ``CalendarAvailabilityAgent.get_available_meeting_intervals``.
    The dataclass is frozen, so attributes cannot be reassigned after
    construction (the list attributes themselves remain mutable objects).
    """

    # ISO-8601 rendering of the start of the requested range.
    start: str
    # ISO-8601 rendering of the end of the requested range.
    end: str
    # Name of the time zone in which the intervals are expressed.
    timezone: str
    # One dict per free interval, with "start" and "end" ISO-8601 strings.
    meeting_intervals: list[dict[str, str]]
    # Ids of the calendars that were queried.
    checked_calendars: list[str]
class CalendarAvailabilityAgent:
    """Answer availability questions from a calendar free/busy service.

    ``calendar_service`` is only ever used through
    ``freebusy().query(body=...).execute()``.
    NOTE(review): that call shape matches the Google Calendar API client;
    confirm against the code that constructs this agent.
    """

    # Widest range accepted by ``get_available_meeting_intervals``.
    MAX_SEARCH_RANGE = timedelta(days=90)
    # Free gaps shorter than this are not reported as meeting intervals.
    MIN_MEETING_DURATION = timedelta(minutes=30)
    # IANA name of the zone in which meeting intervals are computed.
    MEETING_TIMEZONE = "Europe/Paris"

    def __init__(self, calendar_service: Any) -> None:
        """Store the service object used for every free/busy query."""
        self.calendar_service = calendar_service

    def get_availability(
        self, start: str, end: str, calendar_ids: list[str] | None = None
    ) -> AvailabilityResult:
        """Report every busy slot of the given calendars between two instants.

        Args:
            start: ISO-8601 timestamp with a timezone offset.
            end: ISO-8601 timestamp with a timezone offset, after ``start``.
            calendar_ids: Calendars to query; ``["primary"]`` when omitted
                or empty.

        Returns:
            An ``AvailabilityResult`` whose ``available`` flag is true only
            when no busy slot was reported.

        Raises:
            ValueError: if a timestamp cannot be parsed, lacks an offset,
                or ``end`` is not strictly after ``start``.
        """
        start_dt, end_dt = self._parse_range(start, end)
        calendars = self._resolve_calendars(calendar_ids)
        calendars_payload = self._fetch_calendars_payload(start_dt, end_dt, calendars)

        busy_slots: list[dict[str, str]] = [
            {
                "calendar_id": calendar_id,
                "start": busy_slot["start"],
                "end": busy_slot["end"],
            }
            for calendar_id, data in calendars_payload.items()
            for busy_slot in data.get("busy", [])
        ]
        return AvailabilityResult(
            start=start_dt.isoformat(),
            end=end_dt.isoformat(),
            available=not busy_slots,
            busy_slots=busy_slots,
            checked_calendars=calendars,
        )

    def get_available_meeting_intervals(
        self, start: str, end: str, calendar_ids: list[str] | None = None
    ) -> MeetingIntervalsResult:
        """List free intervals that fall inside the allowed meeting windows.

        Busy slots of all queried calendars are merged and subtracted from
        the allowed daily windows (holidays excluded); remaining gaps shorter
        than ``MIN_MEETING_DURATION`` are dropped. Intervals are expressed in
        ``MEETING_TIMEZONE``.

        Args:
            start: ISO-8601 timestamp with a timezone offset.
            end: ISO-8601 timestamp with a timezone offset, after ``start``.
            calendar_ids: Calendars to query; ``["primary"]`` when omitted
                or empty.

        Returns:
            A ``MeetingIntervalsResult`` with the free intervals in
            chronological window order.

        Raises:
            ValueError: if a timestamp cannot be parsed, lacks an offset,
                ``end`` is not strictly after ``start``, or the range is
                longer than ``MAX_SEARCH_RANGE``.
        """
        start_dt, end_dt = self._parse_range(start, end)
        # Validate the span before touching the remote service.
        if end_dt - start_dt > self.MAX_SEARCH_RANGE:
            raise ValueError(
                f"time range cannot exceed {self.MAX_SEARCH_RANGE.days} days."
            )
        calendars = self._resolve_calendars(calendar_ids)
        calendars_payload = self._fetch_calendars_payload(start_dt, end_dt, calendars)

        local_tz = ZoneInfo(self.MEETING_TIMEZONE)
        local_start = start_dt.astimezone(local_tz)
        local_end = end_dt.astimezone(local_tz)

        merged_busy = _merge_intervals(
            _collect_busy_intervals(
                calendars_payload=calendars_payload,
                start_bound=local_start,
                end_bound=local_end,
                timezone=local_tz,
            )
        )
        holiday_dates = _holiday_dates(local_start.date(), local_end.date())
        allowed_windows = _build_allowed_windows(
            local_start, local_end, holiday_dates, local_tz
        )

        meeting_intervals: list[dict[str, str]] = [
            {"start": free_start.isoformat(), "end": free_end.isoformat()}
            for window_start, window_end in allowed_windows
            for free_start, free_end in _subtract_busy(
                window_start=window_start,
                window_end=window_end,
                busy_intervals=merged_busy,
            )
            if free_end - free_start >= self.MIN_MEETING_DURATION
        ]
        return MeetingIntervalsResult(
            start=start_dt.isoformat(),
            end=end_dt.isoformat(),
            timezone=self.MEETING_TIMEZONE,
            meeting_intervals=meeting_intervals,
            checked_calendars=calendars,
        )

    @staticmethod
    def _parse_range(start: str, end: str) -> tuple[datetime, datetime]:
        """Parse both bounds and require ``end`` to be strictly after ``start``."""
        start_dt = _parse_iso_datetime(start)
        end_dt = _parse_iso_datetime(end)
        if end_dt <= start_dt:
            raise ValueError("end must be after start.")
        return start_dt, end_dt

    @staticmethod
    def _resolve_calendars(calendar_ids: list[str] | None) -> list[str]:
        """Return a private copy of the ids, or ``["primary"]`` when none given."""
        # Copy so that later mutation of the caller's list cannot change the
        # ``checked_calendars`` stored in an already-returned result.
        return list(calendar_ids) if calendar_ids else ["primary"]

    def _fetch_calendars_payload(
        self, start_dt: datetime, end_dt: datetime, calendars: list[str]
    ) -> Any:
        """Run one free/busy query and return its ``"calendars"`` entry.

        A missing or null ``"calendars"`` entry yields an empty dict.
        NOTE(review): if the service reports per-calendar failures (the
        Google Calendar freebusy response documents an ``errors`` list per
        calendar), they are not inspected, so such a calendar contributes no
        busy slots -- confirm this is the intended handling.
        """
        query_body: dict[str, Any] = {
            "timeMin": start_dt.isoformat(),
            "timeMax": end_dt.isoformat(),
            "items": [{"id": calendar_id} for calendar_id in calendars],
        }
        freebusy = self.calendar_service.freebusy().query(body=query_body).execute()
        return freebusy.get("calendars") or {}
def _parse_iso_datetime(value: str) -> datetime:
normalized = value.strip()
if normalized.endswith("Z"):
normalized = normalized[:-1] + "+00:00"
parsed = datetime.fromisoformat(normalized)
if parsed.tzinfo is None:
raise ValueError("datetime must include a timezone offset, for example +01:00.")
return parsed
def _collect_busy_intervals(
    *,
    calendars_payload: Any,
    start_bound: datetime,
    end_bound: datetime,
    timezone: ZoneInfo,
) -> list[tuple[datetime, datetime]]:
    """Extract busy intervals from a free/busy payload, clipped to a range.

    Entries that do not have the expected shape (payload not a dict,
    calendar entry not a dict, ``"busy"`` not a list, slot not a dict,
    ``"start"``/``"end"`` not strings) are skipped silently. Each remaining
    slot is converted to ``timezone``, clipped to
    ``[start_bound, end_bound]`` and kept only if something is left.

    Raises:
        ValueError: propagated from ``_parse_iso_datetime`` when a slot
            bound is a string that cannot be parsed or has no offset.
    """
    collected: list[tuple[datetime, datetime]] = []
    if not isinstance(calendars_payload, dict):
        return collected

    for calendar_entry in calendars_payload.values():
        slots = calendar_entry.get("busy", []) if isinstance(calendar_entry, dict) else None
        if not isinstance(slots, list):
            continue
        for slot in slots:
            if not isinstance(slot, dict):
                continue
            raw_from, raw_to = slot.get("start"), slot.get("end")
            if not (isinstance(raw_from, str) and isinstance(raw_to, str)):
                continue
            local_from = _parse_iso_datetime(raw_from).astimezone(timezone)
            local_to = _parse_iso_datetime(raw_to).astimezone(timezone)
            lower = max(local_from, start_bound)
            upper = min(local_to, end_bound)
            # Keep only slots that still overlap the requested range.
            if lower < upper:
                collected.append((lower, upper))
    return collected
def _merge_intervals(intervals: list[tuple[datetime, datetime]]) -> list[tuple[datetime, datetime]]:
if not intervals:
return []
sorted_intervals = sorted(intervals, key=lambda interval: (interval[0], interval[1]))
merged: list[tuple[datetime, datetime]] = [sorted_intervals[0]]
for current_start, current_end in sorted_intervals[1:]:
last_start, last_end = merged[-1]
if current_start <= last_end:
merged[-1] = (last_start, max(last_end, current_end))
continue
merged.append((current_start, current_end))
return merged
def _build_allowed_windows(
    start: datetime,
    end: datetime,
    holiday_dates: set[date],
    timezone: ZoneInfo,
) -> list[tuple[datetime, datetime]]:
    """Build the per-day meeting windows that intersect ``[start, end]``.

    Every calendar day from ``start`` to ``end`` is considered. Days listed
    in ``holiday_dates`` and days for which ``_daily_allowed_window`` returns
    ``None`` are skipped. The daily hours are localized to ``timezone`` and
    clipped to the requested range; empty results are dropped.
    """
    working_days = (
        candidate
        for candidate in _date_range(start.date(), end.date())
        if candidate not in holiday_dates
    )
    result: list[tuple[datetime, datetime]] = []
    for current_day in working_days:
        hours = _daily_allowed_window(current_day)
        if hours is None:
            continue
        opens_at, closes_at = (
            datetime.combine(current_day, bound, tzinfo=timezone) for bound in hours
        )
        clipped_open = max(start, opens_at)
        clipped_close = min(end, closes_at)
        if clipped_open < clipped_close:
            result.append((clipped_open, clipped_close))
    return result
def _daily_allowed_window(day: date) -> tuple[time, time] | None:
weekday = day.weekday()
if weekday == 6:
return None
if weekday == 5:
return time(hour=9, minute=30), time(hour=12, minute=0)
return time(hour=8, minute=30), time(hour=21, minute=30)
def _subtract_busy(
*,
window_start: datetime,
window_end: datetime,
busy_intervals: list[tuple[datetime, datetime]],
) -> list[tuple[datetime, datetime]]:
free_intervals: list[tuple[datetime, datetime]] = []
cursor = window_start
for busy_start, busy_end in busy_intervals:
if busy_end <= cursor:
continue
if busy_start >= window_end:
break
if busy_start > cursor:
free_end = min(busy_start, window_end)
if free_end > cursor:
free_intervals.append((cursor, free_end))
cursor = max(cursor, busy_end)
if cursor >= window_end:
break
if cursor < window_end:
free_intervals.append((cursor, window_end))
return free_intervals
def _holiday_dates(start_day: date, end_day: date) -> set[date]:
    """Return the public-holiday dates for the years spanned by the range.

    The result is the union of the France ("FR") and Canada/Quebec
    ("CA", subdivision "QC") holiday calendars, without observed-day
    substitutions, for every year from ``start_day.year`` to
    ``end_day.year`` inclusive.
    """
    covered_years = range(start_day.year, end_day.year + 1)
    french = holidays.country_holidays("FR", years=covered_years, observed=False)
    quebecois = holidays.country_holidays(
        "CA", subdiv="QC", years=covered_years, observed=False
    )
    return {*french.keys(), *quebecois.keys()}
def _date_range(start_day: date, end_day: date) -> list[date]:
current_day = start_day
days: list[date] = []
while current_day <= end_day:
days.append(current_day)
current_day += timedelta(days=1)
return days