Fixed PyLance type errors

master
oabrivard 1 week ago
parent 39d47b5570
commit 83b1962758

@ -28,7 +28,7 @@ class CalendarAvailabilityAgent:
raise ValueError("end must be after start.")
calendars = calendar_ids or ["primary"]
query_body = {
query_body: dict[str, Any] = {
"timeMin": start_dt.isoformat(),
"timeMax": end_dt.isoformat(),
"items": [{"id": calendar_id} for calendar_id in calendars],

@ -146,11 +146,11 @@ class GmailTriageAgent:
# Hard requirement: scan only unread messages in root inbox.
base_query = (self.query or "").strip()
query_lower = base_query.lower()
required_terms = []
required_terms: list[str] = []
if "in:inbox" not in query_lower:
required_terms.append("in:inbox")
required_terms.append("in:inbox") # type: ignore
if "is:unread" not in query_lower:
required_terms.append("is:unread")
required_terms.append("is:unread") # type: ignore
if not base_query:
return " ".join(required_terms)
@ -226,15 +226,15 @@ class GmailTriageAgent:
if label == "LINKEDIN":
add_labels.insert(0, label_by_name["LinkedIn"])
remove_labels.append("INBOX")
remove_labels.append("INBOX") # type: ignore
outcome = "linkedin"
elif label == "ADVERTISING":
add_labels.insert(0, label_by_name["Advertising"])
remove_labels.append("INBOX")
remove_labels.append("INBOX") # type: ignore
outcome = "advertising"
elif label == "VEILLE_TECHNO":
add_labels.insert(0, label_by_name["VeilleTechno"])
remove_labels.append("INBOX")
remove_labels.append("INBOX") # type: ignore
outcome = "veille_techno"
else:
outcome = "skipped"

@ -3,7 +3,7 @@ import os
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.discovery import build, Resource # type: ignore
from app.config import GOOGLE_SCOPES, Settings
def get_google_credentials(settings: Settings) -> Credentials:
    """Load cached Google OAuth user credentials, refreshing or re-running
    the installed-app flow when needed.

    The token cache lives at ``settings.google_token_file``; the OAuth
    client definition is read from ``settings.google_client_secrets_file``.

    Raises:
        FileNotFoundError: if an interactive flow is required but the OAuth
            client secrets file is missing.
    """
    creds = None
    if os.path.exists(settings.google_token_file):
        creds = Credentials.from_authorized_user_file(  # type: ignore
            settings.google_token_file, GOOGLE_SCOPES
        )
        # A cached token with narrower scopes than we need is useless;
        # discard it so the interactive flow runs again.
        if not creds.has_scopes(GOOGLE_SCOPES):  # type: ignore
            creds = None
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:  # type: ignore
            creds.refresh(Request())  # type: ignore
        else:
            if not os.path.exists(settings.google_client_secrets_file):
                raise FileNotFoundError(
                    f"Missing OAuth client file at {settings.google_client_secrets_file}. "
                    "Create Google OAuth desktop credentials and save the JSON at this path."
                )
            flow = InstalledAppFlow.from_client_secrets_file(  # type: ignore
                settings.google_client_secrets_file, GOOGLE_SCOPES
            )
            creds = flow.run_local_server(port=0)  # type: ignore
        # Persist the refreshed/new token for the next run.
        with open(settings.google_token_file, "w", encoding="utf-8") as token_file:
            token_file.write(creds.to_json())  # type: ignore
    return creds  # type: ignore
def build_gmail_service(settings: Settings) -> Resource:
    """Return an authenticated Gmail v1 API client.

    Discovery caching is disabled (``cache_discovery=False``).
    """
    creds = get_google_credentials(settings)
    return build("gmail", "v1", credentials=creds, cache_discovery=False)  # type: ignore
def build_calendar_service(settings: Settings) -> Resource:
    """Return an authenticated Google Calendar v3 API client.

    Discovery caching is disabled (``cache_discovery=False``).
    """
    creds = get_google_credentials(settings)
    return build("calendar", "v3", credentials=creds, cache_discovery=False)  # type: ignore

@ -2,6 +2,7 @@ from __future__ import annotations
import asyncio
import logging
from contextlib import asynccontextmanager
from datetime import datetime
from typing import Annotated
@ -25,7 +26,6 @@ settings = get_settings()
logging.basicConfig(level=getattr(logging, settings.log_level.upper(), logging.INFO))
logger = logging.getLogger("personal-agent")
app = FastAPI(title="Personal Agent", version="0.3.0")
scheduler: AsyncIOScheduler | None = None
scan_lock: asyncio.Lock | None = None
unsubscribe_lock: asyncio.Lock | None = None
@ -358,8 +358,9 @@ async def _scheduled_unsubscribe_auto() -> None:
logger.exception("Scheduled unsubscribe auto run failed")
@app.on_event("startup")
async def startup_event() -> None:
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
global scheduler
_get_scan_lock()
_get_unsubscribe_lock()
@ -368,26 +369,26 @@ async def startup_event() -> None:
_is_api_auth_enabled(),
)
scheduler = AsyncIOScheduler()
scheduler.add_job(
scheduler.add_job( # type: ignore
_scheduled_scan,
"interval",
minutes=settings.gmail_scan_interval_minutes,
next_run_time=datetime.now(),
)
scheduler.add_job(
scheduler.add_job( # type: ignore
_scheduled_unsubscribe_digest,
"interval",
minutes=settings.unsubscribe_digest_interval_minutes,
next_run_time=datetime.now(),
)
if settings.unsubscribe_auto_enabled:
scheduler.add_job(
scheduler.add_job( # type: ignore
_scheduled_unsubscribe_auto,
"interval",
minutes=settings.unsubscribe_auto_interval_minutes,
next_run_time=datetime.now(),
)
scheduler.start()
scheduler.start() # type: ignore
logger.info(
"Scheduler started (scan=%s min, digest=%s min, auto_unsub=%s/%s min)",
settings.gmail_scan_interval_minutes,
@ -395,12 +396,13 @@ async def startup_event() -> None:
settings.unsubscribe_auto_enabled,
settings.unsubscribe_auto_interval_minutes,
)
yield
# Shutdown
if scheduler:
scheduler.shutdown(wait=False) # type: ignore
@app.on_event("shutdown")
async def shutdown_event() -> None:
if scheduler:
scheduler.shutdown(wait=False)
app = FastAPI(title="Personal Agent", version="0.3.0", lifespan=lifespan) # type: ignore
@app.get("/health")
@ -454,7 +456,7 @@ async def availability(request: AvailabilityRequest) -> AvailabilityResponse:
start=result.start,
end=result.end,
available=result.available,
busy_slots=result.busy_slots,
busy_slots=result.busy_slots, # type: ignore
checked_calendars=result.checked_calendars,
)
except ValueError as exc:

@ -63,7 +63,7 @@ class StrandsEmailClassifier:
precedence: str,
message_label_ids: set[str],
) -> EmailClassification:
prompt_payload = {
prompt_payload: dict[str, object] = {
"sender": sender,
"subject": subject,
"snippet": snippet,
@ -74,7 +74,7 @@ class StrandsEmailClassifier:
}
response = self._invoke_agent_with_temperature_fallback(prompt_payload)
parsed = _parse_json(str(response))
parsed: dict[str, object] = _parse_json(str(response))
label = str(parsed.get("label", "OTHER")).upper().strip()
if label not in ALLOWED_LABELS:
logger.warning("Unexpected Strands label '%s', falling back to OTHER.", label)
@ -84,7 +84,7 @@ class StrandsEmailClassifier:
reason = str(parsed.get("reason", "")).strip()
return EmailClassification(label=label, confidence=confidence, reason=reason)
def _invoke_agent_with_temperature_fallback(self, prompt_payload: dict) -> object:
def _invoke_agent_with_temperature_fallback(self, prompt_payload: dict[str, object]) -> object:
prompt = json.dumps(prompt_payload, ensure_ascii=True)
try:
return self.agent(prompt)
@ -101,7 +101,7 @@ class StrandsEmailClassifier:
raise
def _build_agent(self, *, include_temperature: bool) -> Agent:
client_args = {"api_key": self._api_key, "timeout": self._timeout_seconds}
client_args: dict[str, object] = {"api_key": self._api_key, "timeout": self._timeout_seconds}
if self._base_url:
client_args["base_url"] = self._base_url
@ -109,35 +109,35 @@ class StrandsEmailClassifier:
if include_temperature:
params = {"temperature": self._temperature}
model_kwargs = {
model_kwargs: dict[str, object] = {
"client_args": client_args,
"model_id": self._model_id,
}
if params is not None:
model_kwargs["params"] = params
model = OpenAIModel(**model_kwargs)
model = OpenAIModel(**model_kwargs) # type: ignore
return Agent(model=model, system_prompt=SYSTEM_PROMPT)
def _parse_json(content: str) -> dict:
def _parse_json(content: str) -> dict[str, object]:
if not content:
return {}
return {}
try:
return json.loads(content)
except json.JSONDecodeError:
match = re.search(r"\{.*\}", content, re.DOTALL)
if not match:
return {}
return {}
try:
return json.loads(match.group(0))
except json.JSONDecodeError:
return {}
return {}
def _to_confidence(raw_value: object) -> float:
try:
confidence = float(raw_value)
confidence = float(raw_value) # type: ignore
except (TypeError, ValueError):
return 0.0
if confidence < 0:

@ -0,0 +1,10 @@
"""
This type stub file was generated by pyright.
"""
import importlib.metadata as importlib_metadata
import sys
release = ...
version_info = ...
__version__ = ...

@ -0,0 +1,76 @@
"""
This type stub file was generated by pyright.
"""
__all__ = ("EVENT_ALL", "EVENT_ALL_JOBS_REMOVED", "EVENT_EXECUTOR_ADDED", "EVENT_EXECUTOR_REMOVED", "EVENT_JOBSTORE_ADDED", "EVENT_JOBSTORE_REMOVED", "EVENT_JOB_ADDED", "EVENT_JOB_ERROR", "EVENT_JOB_EXECUTED", "EVENT_JOB_MAX_INSTANCES", "EVENT_JOB_MISSED", "EVENT_JOB_MODIFIED", "EVENT_JOB_REMOVED", "EVENT_JOB_SUBMITTED", "EVENT_SCHEDULER_PAUSED", "EVENT_SCHEDULER_RESUMED", "EVENT_SCHEDULER_SHUTDOWN", "EVENT_SCHEDULER_STARTED", "JobEvent", "JobExecutionEvent", "JobSubmissionEvent", "SchedulerEvent")
EVENT_SCHEDULER_START = ...
EVENT_SCHEDULER_SHUTDOWN = ...
EVENT_SCHEDULER_PAUSED = ...
EVENT_SCHEDULER_RESUMED = ...
EVENT_EXECUTOR_ADDED = ...
EVENT_EXECUTOR_REMOVED = ...
EVENT_JOBSTORE_ADDED = ...
EVENT_JOBSTORE_REMOVED = ...
EVENT_ALL_JOBS_REMOVED = ...
EVENT_JOB_ADDED = ...
EVENT_JOB_REMOVED = ...
EVENT_JOB_MODIFIED = ...
EVENT_JOB_EXECUTED = ...
EVENT_JOB_ERROR = ...
EVENT_JOB_MISSED = ...
EVENT_JOB_SUBMITTED = ...
EVENT_JOB_MAX_INSTANCES = ...
EVENT_ALL = ...
class SchedulerEvent:
    """
    An event that concerns the scheduler itself.

    :ivar code: the type code of this event
    :ivar alias: alias of the job store or executor that was added or removed (if applicable)
    """
    def __init__(self, code, alias=...) -> None:
        ...

    def __repr__(self):  # -> str:
        ...
class JobEvent(SchedulerEvent):
    """
    An event that concerns a job.

    :ivar code: the type code of this event
    :ivar job_id: identifier of the job in question
    :ivar jobstore: alias of the job store containing the job in question
    """
    def __init__(self, code, job_id, jobstore) -> None:
        ...
class JobSubmissionEvent(JobEvent):
    """
    An event that concerns the submission of a job to its executor.

    :ivar scheduled_run_times: a list of datetimes when the job was intended to run
    """
    def __init__(self, code, job_id, jobstore, scheduled_run_times) -> None:
        ...
class JobExecutionEvent(JobEvent):
    """
    An event that concerns the running of a job within its executor.

    :ivar scheduled_run_time: the time when the job was scheduled to be run
    :ivar retval: the return value of the successfully executed job
    :ivar exception: the exception raised by the job
    :ivar traceback: a formatted traceback for the exception
    """
    def __init__(self, code, job_id, jobstore, scheduled_run_time, retval=..., exception=..., traceback=...) -> None:
        ...

@ -0,0 +1,24 @@
"""
This type stub file was generated by pyright.
"""
from apscheduler.executors.base import BaseExecutor
class AsyncIOExecutor(BaseExecutor):
    """
    Runs jobs in the default executor of the event loop.

    If the job function is a native coroutine function, it is scheduled to be run directly in the
    event loop as soon as possible. All other functions are run in the event loop's default
    executor which is usually a thread pool.

    Plugin alias: ``asyncio``
    """
    def start(self, scheduler, alias):  # -> None:
        ...

    def shutdown(self, wait=...):  # -> None:
        ...

@ -0,0 +1,68 @@
"""
This type stub file was generated by pyright.
"""
from abc import ABCMeta
class MaxInstancesReachedError(Exception):
    # Raised by executors when a job has reached its maximum number of
    # allowed concurrent instances (see BaseExecutor.submit_job).
    def __init__(self, job) -> None:
        ...
class BaseExecutor(metaclass=ABCMeta):
    """Abstract base class that defines the interface that every executor must implement."""
    _scheduler = ...
    _lock = ...
    _logger = ...
    def __init__(self) -> None:
        ...

    def start(self, scheduler, alias):  # -> None:
        """
        Called by the scheduler when the scheduler is being started or when the executor is being
        added to an already running scheduler.

        :param apscheduler.schedulers.base.BaseScheduler scheduler: the scheduler that is starting
            this executor
        :param str|unicode alias: alias of this executor as it was assigned to the scheduler
        """
        ...

    def shutdown(self, wait=...):  # -> None:
        """
        Shuts down this executor.

        :param bool wait: ``True`` to wait until all submitted jobs
            have been executed
        """
        ...

    def submit_job(self, job, run_times):  # -> None:
        """
        Submits job for execution.

        :param Job job: job to execute
        :param list[datetime] run_times: list of datetimes specifying
            when the job should have been run
        :raises MaxInstancesReachedError: if the maximum number of
            allowed instances for this job has been reached
        """
        ...
def run_job(job, jobstore_alias, run_times, logger_name):  # -> list[Any]:
    """
    Called by executors to run the job. Returns a list of scheduler events to be dispatched by the
    scheduler.
    """
    ...
async def run_coroutine_job(job, jobstore_alias, run_times, logger_name):  # -> list[Any]:
    """Coroutine version of run_job()."""
    ...

@ -0,0 +1,47 @@
"""
This type stub file was generated by pyright.
"""
from abc import abstractmethod
from apscheduler.executors.base import BaseExecutor
class BasePoolExecutor(BaseExecutor):
    # Shared base for executors backed by a concurrent.futures pool.
    @abstractmethod
    def __init__(self, pool) -> None:
        ...

    def shutdown(self, wait=...):  # -> None:
        ...
class ThreadPoolExecutor(BasePoolExecutor):
    """
    An executor that runs jobs in a concurrent.futures thread pool.

    Plugin alias: ``threadpool``

    :param max_workers: the maximum number of spawned threads.
    :param pool_kwargs: dict of keyword arguments to pass to the underlying
        ThreadPoolExecutor constructor
    """
    def __init__(self, max_workers=..., pool_kwargs=...) -> None:
        ...
class ProcessPoolExecutor(BasePoolExecutor):
    """
    An executor that runs jobs in a concurrent.futures process pool.

    Plugin alias: ``processpool``

    :param max_workers: the maximum number of spawned processes.
    :param pool_kwargs: dict of keyword arguments to pass to the underlying
        ProcessPoolExecutor constructor
    """
    def __init__(self, max_workers=..., pool_kwargs=...) -> None:
        ...

@ -0,0 +1,113 @@
"""
This type stub file was generated by pyright.
"""
UTC = ...
class Job:
    """
    Contains the options given when scheduling callables and its current schedule and other state.
    This class should never be instantiated by the user.

    :var str id: the unique identifier of this job
    :var str name: the description of this job
    :var func: the callable to execute
    :var tuple|list args: positional arguments to the callable
    :var dict kwargs: keyword arguments to the callable
    :var bool coalesce: whether to only run the job once when several run times are due
    :var trigger: the trigger object that controls the schedule of this job
    :var str executor: the name of the executor that will run this job
    :var int misfire_grace_time: the time (in seconds) how much this job's execution is allowed to
        be late (``None`` means "allow the job to run no matter how late it is")
    :var int max_instances: the maximum number of concurrently executing instances allowed for this
        job
    :var datetime.datetime next_run_time: the next scheduled run time of this job

    .. note::
        The ``misfire_grace_time`` has some non-obvious effects on job execution. See the
        :ref:`missed-job-executions` section in the documentation for an in-depth explanation.
    """
    # NOTE(review): stub placeholder — Ellipsis is not a valid __slots__
    # value at runtime, but .pyi stubs are only parsed, never executed.
    __slots__ = ...
    def __init__(self, scheduler, id=..., **kwargs) -> None:
        ...

    def modify(self, **changes):  # -> Self:
        """
        Makes the given changes to this job and saves it in the associated job store.
        Accepted keyword arguments are the same as the variables on this class.

        .. seealso:: :meth:`~apscheduler.schedulers.base.BaseScheduler.modify_job`

        :return Job: this job instance
        """
        ...

    def reschedule(self, trigger, **trigger_args):  # -> Self:
        """
        Shortcut for switching the trigger on this job.

        .. seealso:: :meth:`~apscheduler.schedulers.base.BaseScheduler.reschedule_job`

        :return Job: this job instance
        """
        ...

    def pause(self):  # -> Self:
        """
        Temporarily suspend the execution of this job.

        .. seealso:: :meth:`~apscheduler.schedulers.base.BaseScheduler.pause_job`

        :return Job: this job instance
        """
        ...

    def resume(self):  # -> Self:
        """
        Resume the schedule of this job if previously paused.

        .. seealso:: :meth:`~apscheduler.schedulers.base.BaseScheduler.resume_job`

        :return Job: this job instance
        """
        ...

    def remove(self):  # -> None:
        """
        Unschedules this job and removes it from its associated job store.

        .. seealso:: :meth:`~apscheduler.schedulers.base.BaseScheduler.remove_job`
        """
        ...

    @property
    def pending(self):  # -> bool:
        """
        Returns ``True`` if the referenced job is still waiting to be added to its designated job
        store.
        """
        ...

    def __getstate__(self):  # -> dict[str, Any]:
        ...

    def __setstate__(self, state):  # -> None:
        ...

    def __eq__(self, other) -> bool:
        ...

    def __repr__(self):  # -> str:
        ...

    def __str__(self) -> str:
        ...

@ -0,0 +1,138 @@
"""
This type stub file was generated by pyright.
"""
from abc import ABCMeta, abstractmethod
class JobLookupError(KeyError):
    """Raised when the job store cannot find a job for update or removal."""
    def __init__(self, job_id) -> None:
        ...
class ConflictingIdError(KeyError):
    """Raised when the uniqueness of job IDs is being violated."""
    def __init__(self, job_id) -> None:
        ...
class TransientJobError(ValueError):
    """
    Raised when an attempt to add transient (with no func_ref) job to a persistent job store is
    detected.
    """
    def __init__(self, job_id) -> None:
        ...
class BaseJobStore(metaclass=ABCMeta):
    """Abstract base class that defines the interface that every job store must implement."""
    _scheduler = ...
    _alias = ...
    _logger = ...
    def start(self, scheduler, alias):  # -> None:
        """
        Called by the scheduler when the scheduler is being started or when the job store is being
        added to an already running scheduler.

        :param apscheduler.schedulers.base.BaseScheduler scheduler: the scheduler that is starting
            this job store
        :param str|unicode alias: alias of this job store as it was assigned to the scheduler
        """
        ...

    def shutdown(self):  # -> None:
        """Frees any resources still bound to this job store."""
        ...

    @abstractmethod
    def lookup_job(self, job_id):  # -> None:
        """
        Returns a specific job, or ``None`` if it isn't found..

        The job store is responsible for setting the ``scheduler`` and ``jobstore`` attributes of
        the returned job to point to the scheduler and itself, respectively.

        :param str|unicode job_id: identifier of the job
        :rtype: Job
        """
        ...

    @abstractmethod
    def get_due_jobs(self, now):  # -> None:
        """
        Returns the list of jobs that have ``next_run_time`` earlier or equal to ``now``.
        The returned jobs must be sorted by next run time (ascending).

        :param datetime.datetime now: the current (timezone aware) datetime
        :rtype: list[Job]
        """
        ...

    @abstractmethod
    def get_next_run_time(self):  # -> None:
        """
        Returns the earliest run time of all the jobs stored in this job store, or ``None`` if
        there are no active jobs.

        :rtype: datetime.datetime
        """
        ...

    @abstractmethod
    def get_all_jobs(self):  # -> None:
        """
        Returns a list of all jobs in this job store.
        The returned jobs should be sorted by next run time (ascending).
        Paused jobs (next_run_time == None) should be sorted last.

        The job store is responsible for setting the ``scheduler`` and ``jobstore`` attributes of
        the returned jobs to point to the scheduler and itself, respectively.

        :rtype: list[Job]
        """
        ...

    @abstractmethod
    def add_job(self, job):  # -> None:
        """
        Adds the given job to this store.

        :param Job job: the job to add
        :raises ConflictingIdError: if there is another job in this store with the same ID
        """
        ...

    @abstractmethod
    def update_job(self, job):  # -> None:
        """
        Replaces the job in the store with the given newer version.

        :param Job job: the job to update
        :raises JobLookupError: if the job does not exist
        """
        ...

    @abstractmethod
    def remove_job(self, job_id):  # -> None:
        """
        Removes the given job from this store.

        :param str|unicode job_id: identifier of the job
        :raises JobLookupError: if the job does not exist
        """
        ...

    @abstractmethod
    def remove_all_jobs(self):  # -> None:
        """Removes all jobs from this store."""
        ...

    def __repr__(self):  # -> str:
        ...

@ -0,0 +1,44 @@
"""
This type stub file was generated by pyright.
"""
from apscheduler.jobstores.base import BaseJobStore
class MemoryJobStore(BaseJobStore):
    """
    Stores jobs in an array in RAM. Provides no persistence support.

    Plugin alias: ``memory``
    """
    def __init__(self) -> None:
        ...

    def lookup_job(self, job_id):
        ...

    def get_due_jobs(self, now):  # -> list[Any]:
        ...

    def get_next_run_time(self):  # -> None:
        ...

    def get_all_jobs(self):  # -> list[Any]:
        ...

    def add_job(self, job):  # -> None:
        ...

    def update_job(self, job):  # -> None:
        ...

    def remove_job(self, job_id):  # -> None:
        ...

    def remove_all_jobs(self):  # -> None:
        ...

    def shutdown(self):  # -> None:
        ...

@ -0,0 +1,18 @@
"""
This type stub file was generated by pyright.
"""
class SchedulerAlreadyRunningError(Exception):
    """Raised when attempting to start or configure the scheduler when it's already running."""
    def __str__(self) -> str:
        ...
class SchedulerNotRunningError(Exception):
    """Raised when attempting to shutdown the scheduler when it's not running."""
    def __str__(self) -> str:
        ...

@ -0,0 +1,35 @@
"""
This type stub file was generated by pyright.
"""
from apscheduler.schedulers.base import BaseScheduler
def run_in_event_loop(func):  # -> _Wrapped[Callable[..., Any], Any, Callable[..., Any], None]:
    # Decorator stub: wraps a scheduler method so it executes on the
    # scheduler's event loop (body elided by pyright).
    ...
class AsyncIOScheduler(BaseScheduler):
    """
    A scheduler that runs on an asyncio (:pep:`3156`) event loop.

    The default executor can run jobs based on native coroutines (``async def``).

    Extra options:

    ============== =============================================================
    ``event_loop`` AsyncIO event loop to use (defaults to the global event loop)
    ============== =============================================================
    """
    _eventloop = ...
    _timeout = ...
    def start(self, paused=...):  # -> None:
        ...

    def shutdown(self, wait=...):  # -> None:
        ...

    @run_in_event_loop
    def wakeup(self):  # -> None:
        ...

@ -0,0 +1,396 @@
"""
This type stub file was generated by pyright.
"""
import sys
from abc import ABCMeta, abstractmethod
# Scheduler lifecycle states referenced by BaseScheduler.state
# (concrete int values elided in this stub).
STATE_STOPPED = ...
STATE_RUNNING = ...
STATE_PAUSED = ...
class BaseScheduler(metaclass=ABCMeta):
"""
Abstract base class for all schedulers.
Takes the following keyword arguments:
:param str|logging.Logger logger: logger to use for the scheduler's logging (defaults to
apscheduler.scheduler)
:param str|datetime.tzinfo timezone: the default time zone (defaults to the local timezone)
:param int|float jobstore_retry_interval: the minimum number of seconds to wait between
retries in the scheduler's main loop if the job store raises an exception when getting
the list of due jobs
:param dict job_defaults: default values for newly added jobs
:param dict jobstores: a dictionary of job store alias -> job store instance or configuration
dict
:param dict executors: a dictionary of executor alias -> executor instance or configuration
dict
:ivar int state: current running state of the scheduler (one of the following constants from
``apscheduler.schedulers.base``: ``STATE_STOPPED``, ``STATE_RUNNING``, ``STATE_PAUSED``)
.. seealso:: :ref:`scheduler-config`
"""
if (3, 8) <= sys.version_info < (3, 10):
_trigger_plugins = ...
_executor_plugins = ...
_jobstore_plugins = ...
else:
_trigger_plugins = ...
_executor_plugins = ...
_jobstore_plugins = ...
_trigger_classes = ...
_executor_classes = ...
_jobstore_classes = ...
def __init__(self, gconfig=..., **options) -> None:
...
def __getstate__(self):
...
def configure(self, gconfig=..., prefix=..., **options): # -> None:
"""
Reconfigures the scheduler with the given options.
Can only be done when the scheduler isn't running.
:param dict gconfig: a "global" configuration dictionary whose values can be overridden by
keyword arguments to this method
:param str|unicode prefix: pick only those keys from ``gconfig`` that are prefixed with
this string (pass an empty string or ``None`` to use all keys)
:raises SchedulerAlreadyRunningError: if the scheduler is already running
"""
...
def start(self, paused=...): # -> None:
"""
Start the configured executors and job stores and begin processing scheduled jobs.
:param bool paused: if ``True``, don't start job processing until :meth:`resume` is called
:raises SchedulerAlreadyRunningError: if the scheduler is already running
:raises RuntimeError: if running under uWSGI with threads disabled
"""
...
@abstractmethod
def shutdown(self, wait=...): # -> None:
"""
Shuts down the scheduler, along with its executors and job stores.
Does not interrupt any currently running jobs.
:param bool wait: ``True`` to wait until all currently executing jobs have finished
:raises SchedulerNotRunningError: if the scheduler has not been started yet
"""
...
def pause(self): # -> None:
"""
Pause job processing in the scheduler.
This will prevent the scheduler from waking up to do job processing until :meth:`resume`
is called. It will not however stop any already running job processing.
"""
...
def resume(self): # -> None:
"""Resume job processing in the scheduler."""
...
@property
def running(self): # -> bool:
"""
Return ``True`` if the scheduler has been started.
This is a shortcut for ``scheduler.state != STATE_STOPPED``.
"""
...
def add_executor(self, executor, alias=..., **executor_opts): # -> None:
"""
Adds an executor to this scheduler.
Any extra keyword arguments will be passed to the executor plugin's constructor, assuming
that the first argument is the name of an executor plugin.
:param str|unicode|apscheduler.executors.base.BaseExecutor executor: either an executor
instance or the name of an executor plugin
:param str|unicode alias: alias for the scheduler
:raises ValueError: if there is already an executor by the given alias
"""
...
def remove_executor(self, alias, shutdown=...): # -> None:
"""
Removes the executor by the given alias from this scheduler.
:param str|unicode alias: alias of the executor
:param bool shutdown: ``True`` to shut down the executor after
removing it
"""
...
def add_jobstore(self, jobstore, alias=..., **jobstore_opts): # -> None:
"""
Adds a job store to this scheduler.
Any extra keyword arguments will be passed to the job store plugin's constructor, assuming
that the first argument is the name of a job store plugin.
:param str|unicode|apscheduler.jobstores.base.BaseJobStore jobstore: job store to be added
:param str|unicode alias: alias for the job store
:raises ValueError: if there is already a job store by the given alias
"""
...
def remove_jobstore(self, alias, shutdown=...): # -> None:
"""
Removes the job store by the given alias from this scheduler.
:param str|unicode alias: alias of the job store
:param bool shutdown: ``True`` to shut down the job store after removing it
"""
...
def add_listener(self, callback, mask=...): # -> None:
"""
add_listener(callback, mask=EVENT_ALL)
Adds a listener for scheduler events.
When a matching event occurs, ``callback`` is executed with the event object as its
sole argument. If the ``mask`` parameter is not provided, the callback will receive events
of all types.
:param callback: any callable that takes one argument
:param int mask: bitmask that indicates which events should be
listened to
.. seealso:: :mod:`apscheduler.events`
.. seealso:: :ref:`scheduler-events`
"""
...
def remove_listener(self, callback): # -> None:
"""Removes a previously added event listener."""
...
def add_job(self, func, trigger=..., args=..., kwargs=..., id=..., name=..., misfire_grace_time=..., coalesce=..., max_instances=..., next_run_time=..., jobstore=..., executor=..., replace_existing=..., **trigger_args): # -> Job:
"""
add_job(func, trigger=None, args=None, kwargs=None, id=None, \
name=None, misfire_grace_time=undefined, coalesce=undefined, \
max_instances=undefined, next_run_time=undefined, \
jobstore='default', executor='default', \
replace_existing=False, **trigger_args)
Adds the given job to the job list and wakes up the scheduler if it's already running.
Any option that defaults to ``undefined`` will be replaced with the corresponding default
value when the job is scheduled (which happens when the scheduler is started, or
immediately if the scheduler is already running).
The ``func`` argument can be given either as a callable object or a textual reference in
the ``package.module:some.object`` format, where the first half (separated by ``:``) is an
importable module and the second half is a reference to the callable object, relative to
the module.
The ``trigger`` argument can either be:
#. the alias name of the trigger (e.g. ``date``, ``interval`` or ``cron``), in which case
any extra keyword arguments to this method are passed on to the trigger's constructor
#. an instance of a trigger class
:param func: callable (or a textual reference to one) to run at the given time
:param str|apscheduler.triggers.base.BaseTrigger trigger: trigger that determines when
``func`` is called
:param list|tuple args: list of positional arguments to call func with
:param dict kwargs: dict of keyword arguments to call func with
:param str|unicode id: explicit identifier for the job (for modifying it later)
:param str|unicode name: textual description of the job
:param int misfire_grace_time: seconds after the designated runtime that the job is still
allowed to be run (or ``None`` to allow the job to run no matter how late it is)
:param bool coalesce: run once instead of many times if the scheduler determines that the
job should be run more than once in succession
:param int max_instances: maximum number of concurrently running instances allowed for this
job
:param datetime next_run_time: when to first run the job, regardless of the trigger (pass
``None`` to add the job as paused)
:param str|unicode jobstore: alias of the job store to store the job in
:param str|unicode executor: alias of the executor to run the job with
:param bool replace_existing: ``True`` to replace an existing job with the same ``id``
(but retain the number of runs from the existing one)
:rtype: Job
"""
...
def scheduled_job(self, trigger, args=..., kwargs=..., id=..., name=..., misfire_grace_time=..., coalesce=..., max_instances=..., next_run_time=..., jobstore=..., executor=..., **trigger_args): # -> Callable[..., Any]:
"""
scheduled_job(trigger, args=None, kwargs=None, id=None, \
name=None, misfire_grace_time=undefined, \
coalesce=undefined, max_instances=undefined, \
next_run_time=undefined, jobstore='default', \
executor='default',**trigger_args)
A decorator version of :meth:`add_job`, except that ``replace_existing`` is always
``True``.
.. important:: The ``id`` argument must be given if scheduling a job in a persistent job
store. The scheduler cannot, however, enforce this requirement.
"""
...
def modify_job(self, job_id, jobstore=..., **changes):
"""
Modifies the properties of a single job.
Modifications are passed to this method as extra keyword arguments.
:param str|unicode job_id: the identifier of the job
:param str|unicode jobstore: alias of the job store that contains the job
:return Job: the relevant job instance
"""
...
def reschedule_job(self, job_id, jobstore=..., trigger=..., **trigger_args):
"""
Constructs a new trigger for a job and updates its next run time.
Extra keyword arguments are passed directly to the trigger's constructor.
:param str|unicode job_id: the identifier of the job
:param str|unicode jobstore: alias of the job store that contains the job
:param trigger: alias of the trigger type or a trigger instance
:return Job: the relevant job instance
"""
...
def pause_job(self, job_id, jobstore=...):
"""
Causes the given job not to be executed until it is explicitly resumed.
:param str|unicode job_id: the identifier of the job
:param str|unicode jobstore: alias of the job store that contains the job
:return Job: the relevant job instance
"""
...
def resume_job(self, job_id, jobstore=...): # -> Job | None (generated stub inferred None):
    """
    Resumes the schedule of the given job, or removes the job if its schedule is finished.

    :param str|unicode job_id: the identifier of the job
    :param str|unicode jobstore: alias of the job store that contains the job
    :return Job|None: the relevant job instance if the job was rescheduled, or ``None`` if no
        next run time could be calculated and the job was removed
    """
    ...
def get_jobs(self, jobstore=..., pending=...): # -> list[Any]:
    """
    Returns a list of pending jobs (if the scheduler hasn't been started yet) and scheduled
    jobs, either from a specific job store or from all of them.

    If the scheduler has not been started yet, only pending jobs can be returned because the
    job stores haven't been started yet either.

    :param str|unicode jobstore: alias of the job store
    :param bool pending: **DEPRECATED**
    :rtype: list[Job]
    """
    ...
def get_job(self, job_id, jobstore=...): # -> Job | None (generated stub inferred None):
    """
    Returns the Job that matches the given ``job_id``.

    :param str|unicode job_id: the identifier of the job
    :param str|unicode jobstore: alias of the job store that most likely contains the job
    :return: the Job by the given ID, or ``None`` if it wasn't found
    :rtype: Job|None
    """
    ...
def remove_job(self, job_id, jobstore=...): # -> None:
    """
    Removes a job, preventing it from being run any more.

    :param str|unicode job_id: the identifier of the job
    :param str|unicode jobstore: alias of the job store that contains the job
    :raises JobLookupError: if the job was not found
    """
    ...
def remove_all_jobs(self, jobstore=...): # -> None:
    """
    Removes all jobs from the specified job store, or all job stores if none is given.

    :param str|unicode jobstore: alias of the job store
    """
    ...
def print_jobs(self, jobstore=..., out=...): # -> None:
    """
    print_jobs(jobstore=None, out=sys.stdout)

    Prints out a textual listing of all jobs currently scheduled on either all job stores or
    just a specific one.

    :param str|unicode jobstore: alias of the job store, ``None`` to list jobs from all stores
    :param file out: a file-like object to print to (defaults to **sys.stdout** if nothing is
        given)
    """
    ...
def export_jobs(self, outfile, jobstore=...): # -> None:
    """
    Export stored jobs as JSON.

    :param outfile: either a file object opened in text write mode ("w"), or a path
        to the target file
    :param jobstore: alias of the job store to export jobs from (if omitted, export
        from all configured job stores)
    """
    ...
def import_jobs(self, infile, jobstore=...): # -> None:
    """
    Import jobs previously exported via :meth:`export_jobs`.

    :param infile: either a file object opened in text read mode ("r") or a path to
        a JSON file containing previously exported jobs
    :param jobstore: the alias of the job store to import the jobs to
    """
    ...
@abstractmethod
def wakeup(self): # -> None:
    """
    Notifies the scheduler that there may be jobs due for execution.

    Triggers :meth:`_process_jobs` to be run in an implementation specific manner.
    """
    ...

@ -0,0 +1,22 @@
"""
This type stub file was generated by pyright.
"""
from abc import ABCMeta, abstractmethod
class BaseTrigger(metaclass=ABCMeta):
    """Abstract base class that defines the interface that every trigger must implement."""
    __slots__ = ...
    @abstractmethod
    def get_next_fire_time(self, previous_fire_time, now): # -> None:
        """
        Returns the next datetime to fire on. If no such datetime can be calculated, returns
        ``None``.

        :param datetime.datetime previous_fire_time: the previous time the trigger was fired
        :param datetime.datetime now: current datetime
        """
        ...

@ -0,0 +1,201 @@
"""
This type stub file was generated by pyright.
"""
import sys
from datetime import datetime, timedelta, tzinfo
"""This module contains several handy functions primarily meant for internal use."""
__all__ = ("asbool", "asint", "astimezone", "check_callable_args", "convert_to_datetime", "datetime_ceil", "datetime_to_utc_timestamp", "get_callable_name", "localize", "maybe_ref", "normalize", "obj_to_ref", "ref_to_obj", "undefined", "utc_timestamp_to_datetime")
if sys.version_info < (3, 14):
...
else:
...
if sys.version_info < (3, 9):
...
else:
...
UTC = ...
class _Undefined:
    """Falsy sentinel type used to distinguish "not given" from ``None``."""
    def __nonzero__(self): # -> Literal[False]:
        ...
    def __bool__(self): # -> Literal[False]:
        ...
    def __repr__(self): # -> Literal['<undefined>']:
        ...
# Module-level singleton instance of _Undefined (value elided by the stub generator).
undefined = ...
def asint(text): # -> int | None:
    """
    Safely converts a string to an integer, returning ``None`` if the string is ``None``.

    :type text: str
    :rtype: int|None
    """
    ...
def asbool(obj): # -> bool:
    """
    Interprets an object as a boolean value.

    :rtype: bool
    """
    ...
def astimezone(obj): # -> timezone | ZoneInfo | tzinfo | None:
    """
    Interprets an object as a timezone.

    :rtype: tzinfo
    """
    ...
def asdate(obj): # -> date:
    """Interprets an object as a :class:`datetime.date` (per the inferred return type)."""
    ...
# Compiled pattern for parsing date/datetime strings (value elided by the stub generator).
_DATE_REGEX = ...
def convert_to_datetime(input, tz, arg_name): # -> datetime | None:
    """
    Converts the given object to a timezone aware datetime object.

    If a timezone aware datetime object is passed, it is returned unmodified.
    If a native datetime object is passed, it is given the specified timezone.
    If the input is a string, it is parsed as a datetime with the given timezone.

    Date strings are accepted in three different forms: date only (Y-m-d), date with
    time (Y-m-d H:M:S) or with date+time with microseconds (Y-m-d H:M:S.micro).
    Additionally you can override the time zone by giving a specific offset in the
    format specified by ISO 8601: Z (UTC), +HH:MM or -HH:MM.

    :param str|datetime input: the datetime or string to convert to a timezone aware
        datetime
    :param datetime.tzinfo tz: timezone to interpret ``input`` in
    :param str arg_name: the name of the argument (used in an error message)
    :rtype: datetime
    """
    ...
def datetime_to_utc_timestamp(timeval): # -> float | None (generated stub inferred None):
    """
    Converts a datetime instance to a timestamp.

    :type timeval: datetime
    :rtype: float
    """
    ...
def utc_timestamp_to_datetime(timestamp): # -> datetime | None:
    """
    Converts the given timestamp to a datetime instance.

    :type timestamp: float
    :rtype: datetime
    """
    ...
def timedelta_seconds(delta):
    """
    Converts the given timedelta to seconds.

    :type delta: timedelta
    :rtype: float
    """
    ...
def datetime_ceil(dateval): # -> datetime:
    """
    Rounds the given datetime object upwards.

    :type dateval: datetime
    """
    ...
def datetime_utc_add(dateval: datetime, tdelta: timedelta) -> datetime:
    """
    Adds a timedelta to a datetime in UTC for correct datetime arithmetic across
    Daylight Saving Time changes.

    :param dateval: The date to add to
    :type dateval: datetime
    :param tdelta: The timedelta to add to the datetime
    :type tdelta: timedelta
    :return: The sum of the datetime and the timedelta
    :rtype: datetime
    """
    ...
def datetime_repr(dateval): # -> Literal['None']:
    """Returns a string representation of the datetime (``'None'`` when absent, per the inferred type)."""
    ...
def timezone_repr(timezone: tzinfo) -> str:
    """Returns a string representation of the given timezone."""
    ...
def get_callable_name(func): # -> str:
    """
    Returns the best available display name for the given function/callable.

    :rtype: str
    """
    ...
def obj_to_ref(obj): # -> str:
    """
    Returns the path to the given callable.

    :rtype: str
    :raises TypeError: if the given object is not callable
    :raises ValueError: if the given object is a :class:`~functools.partial`, lambda or a nested
        function
    """
    ...
def ref_to_obj(ref): # -> Any:
    """
    Returns the object pointed to by ``ref``.

    :type ref: str
    """
    ...
def maybe_ref(ref): # -> Any:
    """
    Returns the object that the given reference points to, if it is indeed a reference.
    If it is not a reference, the object is returned as-is.
    """
    ...
def check_callable_args(func, args, kwargs): # -> None:
    """
    Ensures that the given callable can be called with the given arguments.

    :type args: tuple
    :type kwargs: dict
    """
    ...
def iscoroutinefunction_partial(f): # -> bool:
    """Returns ``True`` if ``f`` is a coroutine function (presumably unwrapping functools.partial — confirm against implementation)."""
    ...
def normalize(dt): # -> datetime:
    """Normalizes the given datetime (returns a datetime per the inferred type)."""
    ...
def localize(dt, tzinfo): # -> datetime:
    """Attaches the given timezone to ``dt``, returning an aware datetime."""
    ...

@ -0,0 +1,12 @@
"""
This type stub file was generated by pyright.
"""
from .interactive import get_user_credentials
"""oauthlib integration for Google Auth
This library provides `oauthlib <https://oauthlib.readthedocs.io/>`__
integration with `google-auth <https://google-auth.readthedocs.io/>`__.
"""
__all__ = ["get_user_credentials"]

@ -0,0 +1,357 @@
"""
This type stub file was generated by pyright.
"""
import wsgiref.simple_server
"""OAuth 2.0 Authorization Flow
This module provides integration with `requests-oauthlib`_ for running the
`OAuth 2.0 Authorization Flow`_ and acquiring user credentials. See
`Using OAuth 2.0 to Access Google APIs`_ for an overview of OAuth 2.0
authorization scenarios Google APIs support.
Here's an example of using :class:`InstalledAppFlow`::
from google_auth_oauthlib.flow import InstalledAppFlow
# Create the flow using the client secrets file from the Google API
# Console.
flow = InstalledAppFlow.from_client_secrets_file(
'client_secrets.json',
scopes=['profile', 'email'])
flow.run_local_server()
# You can use flow.credentials, or you can just get a requests session
# using flow.authorized_session.
session = flow.authorized_session()
profile_info = session.get(
'https://www.googleapis.com/userinfo/v2/me').json()
print(profile_info)
# {'name': '...', 'email': '...', ...}
.. _requests-oauthlib: http://requests-oauthlib.readthedocs.io/en/latest/
.. _OAuth 2.0 Authorization Flow:
https://tools.ietf.org/html/rfc6749#section-1.2
.. _Using OAuth 2.0 to Access Google APIs:
https://developers.google.com/identity/protocols/oauth2
"""
_LOGGER = ...
class Flow:
    """OAuth 2.0 Authorization Flow

    This class uses a :class:`requests_oauthlib.OAuth2Session` instance at
    :attr:`oauth2session` to perform all of the OAuth 2.0 logic. This class
    just provides convenience methods and sane defaults for doing Google's
    particular flavors of OAuth 2.0.

    Typically you'll construct an instance of this flow using
    :meth:`from_client_secrets_file` and a `client secrets file`_ obtained
    from the `Google API Console`_.

    .. _client secrets file:
        https://developers.google.com/identity/protocols/oauth2/web-server
        #creatingcred
    .. _Google API Console:
        https://console.developers.google.com/apis/credentials
    """
    def __init__(self, oauth2session, client_type, client_config, redirect_uri=..., code_verifier=..., autogenerate_code_verifier=...) -> None:
        """
        Args:
            oauth2session (requests_oauthlib.OAuth2Session):
                The OAuth 2.0 session from ``requests-oauthlib``.
            client_type (str): The client type, either ``web`` or
                ``installed``.
            client_config (Mapping[str, Any]): The client
                configuration in the Google `client secrets`_ format.
            redirect_uri (str): The OAuth 2.0 redirect URI if known at flow
                creation time. Otherwise, it will need to be set using
                :attr:`redirect_uri`.
            code_verifier (str): random string of 43-128 chars used to verify
                the key exchange, using PKCE.
            autogenerate_code_verifier (bool): If true, auto-generate a
                code_verifier.

        .. _client secrets:
            https://github.com/googleapis/google-api-python-client/blob
            /main/docs/client-secrets.md
        """
        ...
    @classmethod
    def from_client_config(cls, client_config, scopes, **kwargs): # -> Self:
        """Creates a :class:`requests_oauthlib.OAuth2Session` from client
        configuration loaded from a Google-format client secrets file.

        Args:
            client_config (Mapping[str, Any]): The client
                configuration in the Google `client secrets`_ format.
            scopes (Sequence[str]): The list of scopes to request during the
                flow.
            kwargs: Any additional parameters passed to
                :class:`requests_oauthlib.OAuth2Session`

        Returns:
            Flow: The constructed Flow instance.

        Raises:
            ValueError: If the client configuration is not in the correct
                format.

        .. _client secrets:
            https://github.com/googleapis/google-api-python-client/blob/main/docs/client-secrets.md
        """
        ...
    @classmethod
    def from_client_secrets_file(cls, client_secrets_file, scopes, **kwargs): # -> Self:
        """Creates a :class:`Flow` instance from a Google client secrets file.

        Args:
            client_secrets_file (str): The path to the client secrets .json
                file.
            scopes (Sequence[str]): The list of scopes to request during the
                flow.
            kwargs: Any additional parameters passed to
                :class:`requests_oauthlib.OAuth2Session`

        Returns:
            Flow: The constructed Flow instance.
        """
        ...
    @property
    def redirect_uri(self):
        """The OAuth 2.0 redirect URI. Pass-through to
        ``self.oauth2session.redirect_uri``."""
        ...
    @redirect_uri.setter
    def redirect_uri(self, value): # -> None:
        """The OAuth 2.0 redirect URI. Pass-through to
        ``self.oauth2session.redirect_uri``."""
        ...
    def authorization_url(self, **kwargs): # -> tuple[Any, Any]:
        """Generates an authorization URL.

        This is the first step in the OAuth 2.0 Authorization Flow. The user's
        browser should be redirected to the returned URL.

        This method calls
        :meth:`requests_oauthlib.OAuth2Session.authorization_url`
        and specifies the client configuration's authorization URI (usually
        Google's authorization server) and specifies that "offline" access is
        desired. This is required in order to obtain a refresh token.

        Args:
            kwargs: Additional arguments passed through to
                :meth:`requests_oauthlib.OAuth2Session.authorization_url`

        Returns:
            Tuple[str, str]: The generated authorization URL and state. The
                user must visit the URL to complete the flow. The state is used
                when completing the flow to verify that the request originated
                from your application. If your application is using a different
                :class:`Flow` instance to obtain the token, you will need to
                specify the ``state`` when constructing the :class:`Flow`.
        """
        ...
    def fetch_token(self, **kwargs):
        """Completes the Authorization Flow and obtains an access token.

        This is the final step in the OAuth 2.0 Authorization Flow. This is
        called after the user consents.

        This method calls
        :meth:`requests_oauthlib.OAuth2Session.fetch_token`
        and specifies the client configuration's token URI (usually Google's
        token server).

        Args:
            kwargs: Arguments passed through to
                :meth:`requests_oauthlib.OAuth2Session.fetch_token`. At least
                one of ``code`` or ``authorization_response`` must be
                specified.

        Returns:
            Mapping[str, str]: The obtained tokens. Typically, you will not use
                return value of this function and instead use
                :meth:`credentials` to obtain a
                :class:`~google.auth.credentials.Credentials` instance.
        """
        ...
    @property
    def credentials(self): # -> google.auth.external_account_authorized_user.Credentials | google.oauth2.credentials.Credentials:
        """Returns credentials from the OAuth 2.0 session.

        :meth:`fetch_token` must be called before accessing this. This method
        constructs a :class:`google.oauth2.credentials.Credentials` class using
        the session's token and the client config.

        Returns:
            google.oauth2.credentials.Credentials: The constructed credentials.

        Raises:
            ValueError: If there is no access token in the session.
        """
        ...
    def authorized_session(self): # -> AuthorizedSession:
        """Returns a :class:`requests.Session` authorized with credentials.

        :meth:`fetch_token` must be called before this method. This method
        constructs a :class:`google.auth.transport.requests.AuthorizedSession`
        class using this flow's :attr:`credentials`.

        Returns:
            google.auth.transport.requests.AuthorizedSession: The constructed
                session.
        """
        ...
class InstalledAppFlow(Flow):
    """Authorization flow helper for installed applications.

    This :class:`Flow` subclass makes it easier to perform the
    `Installed Application Authorization Flow`_. This flow is useful for
    local development or applications that are installed on a desktop operating
    system.

    This flow uses a local server strategy provided by :meth:`run_local_server`.

    Example::

        from google_auth_oauthlib.flow import InstalledAppFlow

        flow = InstalledAppFlow.from_client_secrets_file(
            'client_secrets.json',
            scopes=['profile', 'email'])
        flow.run_local_server()

        session = flow.authorized_session()
        profile_info = session.get(
            'https://www.googleapis.com/userinfo/v2/me').json()
        print(profile_info)
        # {'name': '...', 'email': '...', ...}

    Note that this isn't the only way to accomplish the installed
    application flow, just one of the most common. You can use the
    :class:`Flow` class to perform the same flow with different methods of
    presenting the authorization URL to the user or obtaining the authorization
    response, such as using an embedded web view.

    .. _Installed Application Authorization Flow:
        https://github.com/googleapis/google-api-python-client/blob/main/docs/oauth-installed.md
    """
    # Default user-facing messages (values elided by the stub generator).
    _DEFAULT_AUTH_PROMPT_MESSAGE = ...
    _DEFAULT_AUTH_CODE_MESSAGE = ...
    _DEFAULT_WEB_SUCCESS_MESSAGE = ...
    def run_local_server(self, host=..., bind_addr=..., port=..., authorization_prompt_message=..., success_message=..., open_browser=..., redirect_uri_trailing_slash=..., timeout_seconds=..., token_audience=..., browser=..., **kwargs): # -> google.auth.external_account_authorized_user.Credentials | google.oauth2.credentials.Credentials:
        """Run the flow using the server strategy.

        The server strategy instructs the user to open the authorization URL in
        their browser and will attempt to automatically open the URL for them.
        It will start a local web server to listen for the authorization
        response. Once authorization is complete the authorization server will
        redirect the user's browser to the local web server. The web server
        will get the authorization code from the response and shutdown. The
        code is then exchanged for a token.

        Args:
            host (str): The hostname for the local redirect server. This will
                be served over http, not https.
            bind_addr (str): Optionally provide an ip address for the redirect
                server to listen on when it is not the same as host
                (e.g. in a container). Default value is None,
                which means that the redirect server will listen
                on the ip address specified in the host parameter.
            port (int): The port for the local redirect server.
            authorization_prompt_message (str | None): The message to display to tell
                the user to navigate to the authorization URL. If None or empty,
                don't display anything.
            success_message (str): The message to display in the web browser
                when the authorization flow is complete.
            open_browser (bool): Whether or not to open the authorization URL
                in the user's browser.
            redirect_uri_trailing_slash (bool): whether or not to add trailing
                slash when constructing the redirect_uri. Default value is True.
            timeout_seconds (int): It will raise a WSGITimeoutError exception after the
                timeout timing if there are no credentials response. The value is in
                seconds.
                When set to None there is no timeout.
                Default value is None.
            token_audience (str): Passed along with the request for an access
                token. Determines the endpoints with which the token can be
                used. Optional.
            browser (str): specify which browser to open for authentication. If not
                specified this defaults to default browser.
            kwargs: Additional keyword arguments passed through to
                :meth:`authorization_url`.

        Returns:
            google.oauth2.credentials.Credentials: The OAuth 2.0 credentials
                for the user.

        Raises:
            WSGITimeoutError: If there is a timeout when waiting for the response from the
                authorization server.
        """
        ...
class _WSGIRequestHandler(wsgiref.simple_server.WSGIRequestHandler):
    """Custom WSGIRequestHandler.

    Uses a named logger instead of printing to stderr.
    """
    def log_message(self, format, *args): # -> None:
        # Overrides the base handler's stderr logging with the module logger.
        ...
class _RedirectWSGIApp:
    """WSGI app to handle the authorization redirect.

    Stores the request URI and displays the given success message.
    """
    def __init__(self, success_message) -> None:
        """
        Args:
            success_message (str): The message to display in the web browser
                when the authorization flow is complete.
        """
        ...
    def __call__(self, environ, start_response): # -> list[Any]:
        """WSGI Callable.

        Args:
            environ (Mapping[str, Any]): The WSGI environment.
            start_response (Callable[str, list]): The WSGI start_response
                callable.

        Returns:
            Iterable[bytes]: The response body.
        """
        ...
# NOTE(review): subclassing AttributeError is unusual for a timeout error; this
# mirrors the stubbed library's declaration — verify against google_auth_oauthlib.
class WSGITimeoutError(AttributeError):
    """Raised when the WSGI server times out waiting for a response."""
    ...

@ -0,0 +1,83 @@
"""
This type stub file was generated by pyright.
"""
"""Integration helpers.
This module provides helpers for integrating with `requests-oauthlib`_.
Typically, you'll want to use the higher-level helpers in
:mod:`google_auth_oauthlib.flow`.
.. _requests-oauthlib: http://requests-oauthlib.readthedocs.io/en/latest/
"""
_REQUIRED_CONFIG_KEYS = ...
def session_from_client_config(client_config, scopes, **kwargs): # -> tuple[OAuth2Session, Any]:
    """Creates a :class:`requests_oauthlib.OAuth2Session` from client
    configuration loaded from a Google-format client secrets file.

    Args:
        client_config (Mapping[str, Any]): The client
            configuration in the Google `client secrets`_ format.
        scopes (Sequence[str]): The list of scopes to request during the
            flow.
        kwargs: Any additional parameters passed to
            :class:`requests_oauthlib.OAuth2Session`

    Raises:
        ValueError: If the client configuration is not in the correct
            format.

    Returns:
        Tuple[requests_oauthlib.OAuth2Session, Mapping[str, Any]]: The new
            oauthlib session and the validated client configuration.

    .. _client secrets:
        https://github.com/googleapis/google-api-python-client/blob/main/docs/client-secrets.md
    """
    ...
def session_from_client_secrets_file(client_secrets_file, scopes, **kwargs): # -> tuple[OAuth2Session, Any]:
    """Creates a :class:`requests_oauthlib.OAuth2Session` instance from a
    Google-format client secrets file.

    Args:
        client_secrets_file (str): The path to the `client secrets`_ .json
            file.
        scopes (Sequence[str]): The list of scopes to request during the
            flow.
        kwargs: Any additional parameters passed to
            :class:`requests_oauthlib.OAuth2Session`

    Returns:
        Tuple[requests_oauthlib.OAuth2Session, Mapping[str, Any]]: The new
            oauthlib session and the validated client configuration.

    .. _client secrets:
        https://github.com/googleapis/google-api-python-client/blob/main/docs/client-secrets.md
    """
    ...
def credentials_from_session(session, client_config=...): # -> google.auth.external_account_authorized_user.Credentials | google.oauth2.credentials.Credentials:
    """Creates :class:`google.oauth2.credentials.Credentials` from a
    :class:`requests_oauthlib.OAuth2Session`.

    :meth:`fetch_token` must be called on the session before calling
    this. This uses the session's auth token and the provided client
    configuration to create :class:`google.oauth2.credentials.Credentials`.
    This allows you to use the credentials from the session with Google
    API client libraries.

    Args:
        session (requests_oauthlib.OAuth2Session): The OAuth 2.0 session.
        client_config (Mapping[str, Any]): The subset of the client
            configuration to use. For example, if you have a web client
            you would pass in `client_config['web']`.

    Returns:
        google.oauth2.credentials.Credentials: The constructed credentials.

    Raises:
        ValueError: If there is no access token in the session.
    """
    ...

@ -0,0 +1,114 @@
"""
This type stub file was generated by pyright.
"""
"""Get user credentials from interactive code environments.
This module contains helpers for getting user credentials from interactive
code environments installed on a development machine, such as Jupyter
notebooks.
"""
LOCALHOST = ...
DEFAULT_PORTS_TO_TRY = ...
def is_port_open(port): # -> bool:
    """Check if a port is open on localhost.

    Based on StackOverflow answer: https://stackoverflow.com/a/43238489/101923

    Parameters
    ----------
    port : int
        A port to check on localhost.

    Returns
    -------
    is_open : bool
        True if a socket can be opened at the requested port.
    """
    ...
def find_open_port(start=..., stop=...): # -> int | None:
    """Find an open port between ``start`` and ``stop``.

    Parameters
    ----------
    start : Optional[int]
        Beginning of range of ports to try. Defaults to 8080.
    stop : Optional[int]
        End of range of ports to try (not including exactly equals ``stop``).
        This function tries 100 possible ports if no ``stop`` is specified.

    Returns
    -------
    Optional[int]
        ``None`` if no open port is found, otherwise an integer indicating an
        open port.
    """
    ...
def get_user_credentials(scopes, client_id, client_secret, minimum_port=..., maximum_port=...): # -> google.auth.external_account_authorized_user.Credentials | google.oauth2.credentials.Credentials:
    """Gets credentials associated with your Google user account.

    This function authenticates using your user credentials by going through
    the OAuth 2.0 flow. You'll open a browser window to authenticate to your
    Google account. The permissions it requests correspond to the scopes
    you've provided.

    To obtain the ``client_id`` and ``client_secret``, create an **OAuth
    client ID** with application type **Other** from the `Credentials page on
    the Google Developer's Console
    <https://console.developers.google.com/apis/credentials>`_. Learn more
    with the `Authenticating as an end user
    <https://cloud.google.com/docs/authentication/end-user>`_ guide.

    Args:
        scopes (Sequence[str]):
            A list of scopes to use when authenticating to Google APIs. See
            the `list of OAuth 2.0 scopes for Google APIs
            <https://developers.google.com/identity/protocols/googlescopes>`_.
        client_id (str):
            A string that identifies your application to Google APIs. Find
            this value in the `Credentials page on the Google Developer's
            Console
            <https://console.developers.google.com/apis/credentials>`_.
        client_secret (str):
            A string that verifies your application to Google APIs. Find this
            value in the `Credentials page on the Google Developer's Console
            <https://console.developers.google.com/apis/credentials>`_.
        minimum_port (int):
            Beginning of range of ports to try for redirect URI HTTP server.
            Defaults to 8080.
        maximum_port (Optional[int]):
            End of range of ports to try (not including exactly equals ``stop``).
            This function tries 100 possible ports if no ``stop`` is specified.

    Returns:
        google.oauth2.credentials.Credentials:
            The OAuth 2.0 credentials for the user.

    Examples:
        Get credentials for your user account and use them to run a query
        with BigQuery::

            import google_auth_oauthlib

            # TODO: Create a client ID for your project.
            client_id = "YOUR-CLIENT-ID.apps.googleusercontent.com"
            client_secret = "abc_ThIsIsAsEcReT"

            # TODO: Choose the needed scopes for your applications.
            scopes = ["https://www.googleapis.com/auth/cloud-platform"]

            credentials = google_auth_oauthlib.get_user_credentials(
                scopes, client_id, client_secret
            )

            # 1. Open the link.
            # 2. Authorize the application to have access to your account.
            # 3. Copy and paste the authorization code to the prompt.

            # Use the credentials to construct a client for Google APIs.
            from google.cloud import bigquery

            bigquery_client = bigquery.Client(
                credentials=credentials, project="your-project-id"
            )
            print(list(bigquery_client.query("SELECT 1").result()))
    """
    ...

@ -0,0 +1,7 @@
"""
This type stub file was generated by pyright.
"""
import logging
from logging import NullHandler

@ -0,0 +1,56 @@
"""
This type stub file was generated by pyright.
"""
"""Helpers for authentication using oauth2client or google-auth."""
HAS_GOOGLE_AUTH = ...
HAS_OAUTH2CLIENT = ...
def credentials_from_file(filename, scopes=..., quota_project_id=...): # -> Credentials | ServiceAccountCredentials:
    """Returns credentials loaded from a file."""
    ...
def default_credentials(scopes=..., quota_project_id=...):
    """Returns Application Default Credentials."""
    ...
def with_scopes(credentials, scopes): # -> Scoped:
    """Scopes the credentials if necessary.

    Args:
        credentials (Union[
            google.auth.credentials.Credentials,
            oauth2client.client.Credentials]): The credentials to scope.
        scopes (Sequence[str]): The list of scopes.

    Returns:
        Union[google.auth.credentials.Credentials,
            oauth2client.client.Credentials]: The scoped credentials.
    """
    ...
def authorized_http(credentials): # -> AuthorizedHttp:
    """Returns an http client that is authorized with the given credentials.

    Args:
        credentials (Union[
            google.auth.credentials.Credentials,
            oauth2client.client.Credentials]): The credentials to use.

    Returns:
        Union[httplib2.Http, google_auth_httplib2.AuthorizedHttp]: An
            authorized http client.
    """
    ...
def refresh_credentials(credentials):
    # Presumably refreshes the token on the given credentials in place —
    # confirm against the googleapiclient._auth implementation.
    ...
def apply_credentials(credentials, headers):
    # Presumably applies an Authorization header derived from the credentials
    # to ``headers`` — confirm against the implementation.
    ...
def is_valid(credentials): # -> bool:
    # Returns whether the credentials are currently valid (per the inferred bool).
    ...
def get_credentials_from_http(http): # -> None:
    # Extracts credentials from an http object, if any (stub inferred None).
    ...

@ -0,0 +1,120 @@
"""
This type stub file was generated by pyright.
"""
"""Helper functions for commonly used utilities."""
logger = ...
POSITIONAL_WARNING = ...
POSITIONAL_EXCEPTION = ...
POSITIONAL_IGNORE = ...
POSITIONAL_SET = ...
positional_parameters_enforcement = ...
_SYM_LINK_MESSAGE = ...
_IS_DIR_MESSAGE = ...
_MISSING_FILE_MESSAGE = ...
def positional(max_positional_args): # -> Callable[..., _Wrapped[Callable[..., Any], Any, Callable[..., Any], Any]]:
    """A decorator to declare that only the first N arguments may be positional.

    This decorator makes it easy to support Python 3 style keyword-only
    parameters. For example, in Python 3 it is possible to write::

        def fn(pos1, *, kwonly1=None, kwonly2=None):
            ...

    All named parameters after ``*`` must be a keyword::

        fn(10, 'kw1', 'kw2')  # Raises exception.
        fn(10, kwonly1='kw1')  # Ok.

    Example
    ^^^^^^^

    To define a function like above, do::

        @positional(1)
        def fn(pos1, kwonly1=None, kwonly2=None):
            ...

    If no default value is provided to a keyword argument, it becomes a
    required keyword argument::

        @positional(0)
        def fn(required_kw):
            ...

    This must be called with the keyword parameter::

        fn()  # Raises exception.
        fn(10)  # Raises exception.
        fn(required_kw=10)  # Ok.

    When defining instance or class methods always remember to account for
    ``self`` and ``cls``::

        class MyClass(object):

            @positional(2)
            def my_method(self, pos1, kwonly1=None):
                ...

            @classmethod
            @positional(2)
            def my_method(cls, pos1, kwonly1=None):
                ...

    The positional decorator behavior is controlled by
    ``_helpers.positional_parameters_enforcement``, which may be set to
    ``POSITIONAL_EXCEPTION``, ``POSITIONAL_WARNING`` or
    ``POSITIONAL_IGNORE`` to raise an exception, log a warning, or do
    nothing, respectively, if a declaration is violated.

    Args:
        max_positional_args: Maximum number of positional arguments. All
                             parameters after this index must be
                             keyword only.

    Returns:
        A decorator that prevents using arguments after max_positional_args
        from being used as positional parameters.

    Raises:
        TypeError: if a keyword-only argument is provided as a positional
                   parameter, but only if
                   _helpers.positional_parameters_enforcement is set to
                   POSITIONAL_EXCEPTION.
    """
    ...
def parse_unique_urlencoded(content): # -> dict[Any, Any]:
    """Parses unique key-value parameters from urlencoded content.

    Args:
        content: string, URL-encoded key-value pairs.

    Returns:
        dict, The key-value pairs from ``content``.

    Raises:
        ValueError: if one of the keys is repeated.
    """
    ...
def update_query_params(uri, params):
    """Updates a URI with new query parameters.

    If a given key from ``params`` is repeated in the ``uri``, then
    the URI will be considered invalid and an error will occur.

    If the URI is valid, then each value from ``params`` will
    replace the corresponding value in the query parameters (if
    it exists).

    Args:
        uri: string, A valid URI, with potential existing query parameters.
        params: dict, A dictionary of query parameters.

    Returns:
        The same URI but with the new query parameters added.
    """
    ...

@ -0,0 +1,333 @@
"""
This type stub file was generated by pyright.
"""
from email.generator import BytesGenerator
from googleapiclient._helpers import positional
"""Client for discovery based APIs.
A client library for Google's discovery based APIs.
"""
__author__ = ...
__all__ = ["build", "build_from_document", "fix_method_name", "key2param"]
HAS_UNIVERSE = ...
logger = ...
URITEMPLATE = ...
VARNAME = ...
DISCOVERY_URI = ...
V1_DISCOVERY_URI = ...
V2_DISCOVERY_URI = ...
DEFAULT_METHOD_DOC = ...
HTTP_PAYLOAD_METHODS = ...
_MEDIA_SIZE_BIT_SHIFTS = ...
BODY_PARAMETER_DEFAULT_VALUE = ...
MEDIA_BODY_PARAMETER_DEFAULT_VALUE = ...
MEDIA_MIME_TYPE_PARAMETER_DEFAULT_VALUE = ...
_PAGE_TOKEN_NAMES = ...
GOOGLE_API_USE_CLIENT_CERTIFICATE = ...
GOOGLE_API_USE_MTLS_ENDPOINT = ...
GOOGLE_CLOUD_UNIVERSE_DOMAIN = ...
DEFAULT_UNIVERSE = ...
STACK_QUERY_PARAMETERS = ...
STACK_QUERY_PARAMETER_DEFAULT_VALUE = ...
class APICoreVersionError(ValueError):
    """Raised when the installed google-api-core version is incompatible (message set in __init__)."""
    def __init__(self) -> None:
        ...
# Method names that must be suffixed to avoid clashing with Python reserved words
# (value elided by the stub generator; consumed by fix_method_name below).
RESERVED_WORDS = ...
class _BytesGenerator(BytesGenerator):
    # Overrides the private line-writing hook of email.generator.BytesGenerator.
    _write_lines = ...
def fix_method_name(name):
    """Fix method names to avoid '$' characters and reserved word conflicts.

    Args:
        name: string, method name.

    Returns:
        The name with '_' appended if the name is a reserved word and '$' and '-'
        replaced with '_'.
    """
    ...
def key2param(key: str) -> str:
    """Converts key names into parameter names.
    For example, converting "max-results" -> "max_results"
    Args:
        key: string, the method key name.
    Returns:
        A safe method name based on the key name.
    """
    ...
@positional(2)
def build(serviceName, version, http=..., discoveryServiceUrl=..., developerKey=..., model=..., requestBuilder=..., credentials=..., cache_discovery=..., cache=..., client_options=..., adc_cert_path=..., adc_key_path=..., num_retries=..., static_discovery=..., always_use_jwt_access=...) -> "Resource":
    """Construct a Resource for interacting with an API.
    Construct a Resource object for interacting with an API. The serviceName and
    version are the names from the Discovery service.
    Args:
        serviceName: string, name of the service.
        version: string, the version of the service.
        http: httplib2.Http, An instance of httplib2.Http or something that acts
            like it that HTTP requests will be made through.
        discoveryServiceUrl: string, a URI Template that points to the location of
            the discovery service. It should have two parameters {api} and
            {apiVersion} that when filled in produce an absolute URI to the discovery
            document for that service.
        developerKey: string, key obtained from
            https://code.google.com/apis/console.
        model: googleapiclient.Model, converts to and from the wire format.
        requestBuilder: googleapiclient.http.HttpRequest, encapsulator for an HTTP
            request.
        credentials: oauth2client.Credentials or
            google.auth.credentials.Credentials, credentials to be used for
            authentication.
        cache_discovery: Boolean, whether or not to cache the discovery doc.
        cache: googleapiclient.discovery_cache.base.CacheBase, an optional
            cache object for the discovery documents.
        client_options: Mapping object or google.api_core.client_options, client
            options to set user options on the client.
            (1) The API endpoint should be set through client_options. If API endpoint
            is not set, `GOOGLE_API_USE_MTLS_ENDPOINT` environment variable can be used
            to control which endpoint to use.
            (2) client_cert_source is not supported, client cert should be provided using
            client_encrypted_cert_source instead. In order to use the provided client
            cert, `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable must be
            set to `true`.
            More details on the environment variables are here:
            https://google.aip.dev/auth/4114
        adc_cert_path: str, client certificate file path to save the application
            default client certificate for mTLS. This field is required if you want to
            use the default client certificate. `GOOGLE_API_USE_CLIENT_CERTIFICATE`
            environment variable must be set to `true` in order to use this field,
            otherwise this field does nothing.
            More details on the environment variables are here:
            https://google.aip.dev/auth/4114
        adc_key_path: str, client encrypted private key file path to save the
            application default client encrypted private key for mTLS. This field is
            required if you want to use the default client certificate.
            `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable must be set to
            `true` in order to use this field, otherwise this field does nothing.
            More details on the environment variables are here:
            https://google.aip.dev/auth/4114
        num_retries: Integer, number of times to retry discovery with
            randomized exponential backoff in case of intermittent/connection issues.
        static_discovery: Boolean, whether or not to use the static discovery docs
            included in the library. The default value for `static_discovery` depends
            on the value of `discoveryServiceUrl`. `static_discovery` will default to
            `True` when `discoveryServiceUrl` is also not provided, otherwise it will
            default to `False`.
        always_use_jwt_access: Boolean, whether always use self signed JWT for service
            account credentials. This only applies to
            google.oauth2.service_account.Credentials.
    Returns:
        A Resource object with methods for interacting with the service.
    Raises:
        google.auth.exceptions.MutualTLSChannelError: if there are any problems
            setting up mutual TLS channel.
    """
    ...
@positional(1)
def build_from_document(service, base=..., future=..., http=..., developerKey=..., model=..., requestBuilder=..., credentials=..., client_options=..., adc_cert_path=..., adc_key_path=..., always_use_jwt_access=...) -> "Resource":
    """Create a Resource for interacting with an API.
    Same as `build()`, but constructs the Resource object from a discovery
    document that is it given, as opposed to retrieving one over HTTP.
    Args:
        service: string or object, the JSON discovery document describing the API.
            The value passed in may either be the JSON string or the deserialized
            JSON.
        base: string, base URI for all HTTP requests, usually the discovery URI.
            This parameter is no longer used as rootUrl and servicePath are included
            within the discovery document. (deprecated)
        future: string, discovery document with future capabilities (deprecated).
        http: httplib2.Http, An instance of httplib2.Http or something that acts
            like it that HTTP requests will be made through.
        developerKey: string, Key for controlling API usage, generated
            from the API Console.
        model: Model class instance that serializes and de-serializes requests and
            responses.
        requestBuilder: Takes an http request and packages it up to be executed.
        credentials: oauth2client.Credentials or
            google.auth.credentials.Credentials, credentials to be used for
            authentication.
        client_options: Mapping object or google.api_core.client_options, client
            options to set user options on the client.
            (1) The API endpoint should be set through client_options. If API endpoint
            is not set, `GOOGLE_API_USE_MTLS_ENDPOINT` environment variable can be used
            to control which endpoint to use.
            (2) client_cert_source is not supported, client cert should be provided using
            client_encrypted_cert_source instead. In order to use the provided client
            cert, `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable must be
            set to `true`.
            More details on the environment variables are here:
            https://google.aip.dev/auth/4114
        adc_cert_path: str, client certificate file path to save the application
            default client certificate for mTLS. This field is required if you want to
            use the default client certificate. `GOOGLE_API_USE_CLIENT_CERTIFICATE`
            environment variable must be set to `true` in order to use this field,
            otherwise this field does nothing.
            More details on the environment variables are here:
            https://google.aip.dev/auth/4114
        adc_key_path: str, client encrypted private key file path to save the
            application default client encrypted private key for mTLS. This field is
            required if you want to use the default client certificate.
            `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable must be set to
            `true` in order to use this field, otherwise this field does nothing.
            More details on the environment variables are here:
            https://google.aip.dev/auth/4114
        always_use_jwt_access: Boolean, whether always use self signed JWT for service
            account credentials. This only applies to
            google.oauth2.service_account.Credentials.
    Returns:
        A Resource object with methods for interacting with the service.
    Raises:
        google.auth.exceptions.MutualTLSChannelError: if there are any problems
            setting up mutual TLS channel.
    """
    ...
class ResourceMethodParameters:
    """Represents the parameters associated with a method.
    Attributes:
        argmap: Map from method parameter name (string) to query parameter name
            (string).
        required_params: List of required parameters (represented by parameter
            name as string).
        repeated_params: List of repeated parameters (represented by parameter
            name as string).
        pattern_params: Map from method parameter name (string) to regular
            expression (as a string). If the pattern is set for a parameter, the
            value for that parameter must match the regular expression.
        query_params: List of parameters (represented by parameter name as string)
            that will be used in the query string.
        path_params: Set of parameters (represented by parameter name as string)
            that will be used in the base URL path.
        param_types: Map from method parameter name (string) to parameter type. Type
            can be any valid JSON schema type; valid values are 'any', 'array',
            'boolean', 'integer', 'number', 'object', or 'string'. Reference:
            http://tools.ietf.org/html/draft-zyp-json-schema-03#section-5.1
        enum_params: Map from method parameter name (string) to list of strings,
            where each list of strings is the list of acceptable enum values.
    """
    def __init__(self, method_desc: dict) -> None:
        """Constructor for ResourceMethodParameters.
        Sets default values and defers to set_parameters to populate.
        Args:
            method_desc: Dictionary with metadata describing an API method. Value
                comes from the dictionary of methods stored in the 'methods' key in
                the deserialized discovery document.
        """
        ...
    def set_parameters(self, method_desc: dict) -> None:
        """Populates maps and lists based on method description.
        Iterates through each parameter for the method and parses the values from
        the parameter dictionary.
        Args:
            method_desc: Dictionary with metadata describing an API method. Value
                comes from the dictionary of methods stored in the 'methods' key in
                the deserialized discovery document.
        """
        ...
def createMethod(methodName, methodDesc, rootDesc, schema):  # -> tuple[Any, Callable[..., Any]]:
    """Creates a method for attaching to a Resource.
    Args:
        methodName: string, name of the method to use.
        methodDesc: object, fragment of deserialized discovery document that
            describes the method.
        rootDesc: object, the entire deserialized discovery document.
        schema: object, mapping of schema names to schema descriptions.
    Returns:
        A (name, callable) pair, per the generated return-type comment above.
    """
    ...
def createNextMethod(methodName, pageTokenName=..., nextPageTokenName=..., isPageTokenParameter=...):  # -> tuple[Any, Callable[..., Any | None]]:
    """Creates any _next methods for attaching to a Resource.
    The _next methods allow for easy iteration through list() responses.
    Args:
        methodName: string, name of the method to use.
        pageTokenName: string, name of request page token field.
        nextPageTokenName: string, name of response page token field.
        isPageTokenParameter: Boolean, True if request page token is a query
            parameter, False if request page token is a field of the request body.
    Returns:
        A (name, callable) pair, per the generated return-type comment above.
    """
    ...
class Resource:
    """A class for interacting with a resource."""
    def __init__(self, http, baseUrl, model, requestBuilder, developerKey, resourceDesc, rootDesc, schema, universe_domain=...) -> None:
        """Build a Resource from the API description.
        Args:
            http: httplib2.Http, Object to make http requests with.
            baseUrl: string, base URL for the API. All requests are relative to this
                URI.
            model: googleapiclient.Model, converts to and from the wire format.
            requestBuilder: class or callable that instantiates an
                googleapiclient.HttpRequest object.
            developerKey: string, key obtained from
                https://code.google.com/apis/console
            resourceDesc: object, section of deserialized discovery document that
                describes a resource. Note that the top level discovery document
                is considered a resource.
            rootDesc: object, the entire deserialized discovery document.
            schema: object, mapping of schema names to schema descriptions.
            universe_domain: string, the universe for the API. The default universe
                is "googleapis.com".
        """
        ...
    def __getstate__(self) -> dict:
        """Trim the state down to something that can be pickled.
        Uses the fact that the instance variable _dynamic_attrs holds attrs that
        will be wiped and restored on pickle serialization.
        """
        ...
    def __setstate__(self, state) -> None:
        """Reconstitute the state of the object from being pickled.
        Uses the fact that the instance variable _dynamic_attrs holds attrs that
        will be wiped and restored on pickle serialization.
        """
        ...
    def __enter__(self):  # -> Self:
        # Context-manager support; returns the resource itself per the
        # generated return-type comment.
        ...
    def __exit__(self, exc_type, exc, exc_tb) -> None:
        ...
    def close(self) -> None:
        """Close httplib2 connections."""
        ...

@ -0,0 +1,34 @@
"""
This type stub file was generated by pyright.
"""
import logging
import os
"""Caching utility for the discovery document."""
# Discovery-document cache configuration; values elided by the stub generator.
LOGGER = ...
DISCOVERY_DOC_MAX_AGE = ...
DISCOVERY_DOC_DIR = ...
def autodetect():  # -> googleapiclient.discovery_cache.appengine_memcache.Cache | googleapiclient.discovery_cache.file_cache.Cache | None:
    """Detects an appropriate cache module and returns it.
    Returns:
        googleapiclient.discovery_cache.base.Cache, a cache object which
        is auto detected, or None if no cache object is available.
    """
    # NOTE(review): per the generated return-type comment, the result is the
    # App Engine memcache cache, the file cache, or None -- presumably tried
    # in that order; confirm against the runtime implementation.
    ...
def get_static_doc(serviceName: str, version: str):  # -> str | None:
    """Retrieves the discovery document from the directory defined in
    DISCOVERY_DOC_DIR corresponding to the serviceName and version provided.
    Args:
        serviceName: string, name of the service.
        version: string, the version of the service.
    Returns:
        A string containing the contents of the JSON discovery document,
        otherwise None if the JSON discovery document was not found.
    """
    ...

@ -0,0 +1,28 @@
"""
This type stub file was generated by pyright.
"""
from . import base
"""App Engine memcache based cache for the discovery document."""
# Logger and memcache namespace; values elided by the stub generator.
LOGGER = ...
NAMESPACE = ...
class Cache(base.Cache):
    """A cache with app engine memcache API."""
    def __init__(self, max_age) -> None:
        """Constructor.
        Args:
            max_age: Cache expiration in seconds.
        """
        ...
    def get(self, url):  # -> None:
        # See base.Cache.get for the contract.
        ...
    def set(self, url, content):  # -> None:
        # See base.Cache.set for the contract.
        ...
cache = ...

@ -0,0 +1,35 @@
"""
This type stub file was generated by pyright.
"""
import abc
"""An abstract class for caching the discovery document."""
class Cache:
    """A base abstract cache class."""
    __metaclass__ = abc.ABCMeta
    @abc.abstractmethod
    def get(self, url: str):
        """Gets the content from the memcache with a given key.
        Args:
            url: string, the key for the cache.
        Returns:
            object, the value in the cache for the given key, or None if the key is
            not in the cache.
        """
        ...
    @abc.abstractmethod
    def set(self, url: str, content: str):
        """Sets the given key and content in the cache.
        Args:
            url: string, the key for the cache.
            content: string, the discovery document.
        """
        ...

@ -0,0 +1,35 @@
"""
This type stub file was generated by pyright.
"""
from . import base
"""File based cache for the discovery document.
The cache is stored in a single file so that multiple processes can
share the same cache. It locks the file whenever accessing to the
file. When the cache content is corrupted, it will be initialized with
an empty cache.
"""
# File-cache configuration constants; values elided by the stub generator.
LOGGER = ...
FILENAME = ...
EPOCH = ...
class Cache(base.Cache):
    """A file based cache for the discovery documents."""
    def __init__(self, max_age) -> None:
        """Constructor.
        Args:
            max_age: Cache expiration in seconds.
        """
        ...
    def get(self, url):  # -> Any | None:
        # See base.Cache.get for the contract.
        ...
    def set(self, url, content):  # -> None:
        # See base.Cache.set for the contract.
        ...
cache = ...

@ -0,0 +1,108 @@
"""
This type stub file was generated by pyright.
"""
from googleapiclient import _helpers as util
"""Errors for the library.
All exceptions defined by the library
should be defined in this file.
"""
__author__ = ...
# Root of the googleapiclient exception hierarchy; every other exception in
# this module derives (directly or via HttpError) from it.
class Error(Exception):
    """Base error for this module."""
    ...
class HttpError(Error):
    """HTTP data was invalid or unexpected."""
    @util.positional(3)
    def __init__(self, resp, content, uri=...) -> None:
        ...
    @property
    def status_code(self):
        """Return the HTTP status code from the response content."""
        ...
    def __repr__(self):  # -> str:
        ...
    # NOTE(review): the stub elides the value; at runtime __str__ presumably
    # aliases __repr__ -- confirm against googleapiclient.errors.
    __str__ = ...
# Lightweight exception subclasses carrying no extra state; each docstring
# states the condition it signals. All derive from Error except
# ResumableUploadError, which derives from HttpError.
class InvalidJsonError(Error):
    """The JSON returned could not be parsed."""
    ...
class UnknownFileType(Error):
    """File type unknown or unexpected."""
    ...
class UnknownLinkType(Error):
    """Link type unknown or unexpected."""
    ...
class UnknownApiNameOrVersion(Error):
    """No API with that name and version exists."""
    ...
class UnacceptableMimeTypeError(Error):
    """That is an unacceptable mimetype for this operation."""
    ...
class MediaUploadSizeError(Error):
    """Media is larger than the method can accept."""
    ...
class ResumableUploadError(HttpError):
    """Error occurred during resumable upload."""
    ...
class InvalidChunkSizeError(Error):
    """The given chunksize is not valid."""
    ...
class InvalidNotificationError(Error):
    """The channel Notification is invalid."""
    ...
class BatchError(HttpError):
    """Error occurred during batch operations."""
    @util.positional(2)
    def __init__(self, reason, resp=..., content=...) -> None:
        ...
    def __repr__(self):  # -> LiteralString:
        ...
    # NOTE(review): value elided by the stub; presumably aliases __repr__ --
    # confirm against googleapiclient.errors.
    __str__ = ...
class UnexpectedMethodError(Error):
    """Exception raised by RequestMockBuilder on unexpected calls."""
    @util.positional(1)
    def __init__(self, methodId=...) -> None:
        """Constructor for an UnexpectedMethodError."""
        ...
class UnexpectedBodyError(Error):
    """Exception raised by RequestMockBuilder on unexpected bodies."""
    def __init__(self, expected, provided) -> None:
        """Constructor for an UnexpectedBodyError."""
        ...

@ -0,0 +1,857 @@
"""
This type stub file was generated by pyright.
"""
from googleapiclient import _helpers as util
"""Classes to encapsulate a single HTTP request.
The classes implement a command pattern, with every
object supporting an execute() method that does the
actual HTTP request.
"""
# HTTP transport constants of googleapiclient.http; values elided by the
# stub generator.
__author__ = ...
LOGGER = ...
DEFAULT_CHUNK_SIZE = ...
MAX_URI_LENGTH = ...
MAX_BATCH_LIMIT = ...
_TOO_MANY_REQUESTS = ...
DEFAULT_HTTP_TIMEOUT_SEC = ...
_LEGACY_BATCH_URI = ...
class MediaUploadProgress:
    """Status of a resumable upload."""
    def __init__(self, resumable_progress: int, total_size) -> None:
        """Constructor.
        Args:
            resumable_progress: int, bytes sent so far.
            total_size: int, total bytes in complete upload, or None if the total
                upload size isn't known ahead of time.
        """
        ...
    def progress(self) -> float:
        """Percent of upload completed, as a float.
        Returns:
            the percentage complete as a float, returning 0.0 if the total size of
            the upload is unknown.
        """
        ...
class MediaDownloadProgress:
    """Status of a resumable download."""
    def __init__(self, resumable_progress: int, total_size) -> None:
        """Constructor.
        Args:
            resumable_progress: int, bytes received so far.
            total_size: int, total bytes in complete download.
        """
        ...
    def progress(self) -> float:
        """Percent of download completed, as a float.
        Returns:
            the percentage complete as a float, returning 0.0 if the total size of
            the download is unknown.
        """
        ...
class MediaUpload:
    """Describes a media object to upload.
    Base class that defines the interface of MediaUpload subclasses.
    Note that subclasses of MediaUpload may allow you to control the chunksize
    when uploading a media object. It is important to keep the size of the chunk
    as large as possible to keep the upload efficient. Other factors may influence
    the size of the chunk you use, particularly if you are working in an
    environment where individual HTTP requests may have a hardcoded time limit,
    such as under certain classes of requests under Google App Engine.
    Streams are io.Base compatible objects that support seek(). Some MediaUpload
    subclasses support using streams directly to upload data. Support for
    streaming may be indicated by a MediaUpload sub-class and if appropriate for a
    platform that stream will be used for uploading the media object. The support
    for streaming is indicated by has_stream() returning True. The stream() method
    should return an io.Base object that supports seek(). On platforms where the
    underlying httplib module supports streaming, for example Python 2.6 and
    later, the stream will be passed into the http library which will result in
    less memory being used and possibly faster uploads.
    If you need to upload media that can't be uploaded using any of the existing
    MediaUpload sub-class then you can sub-class MediaUpload for your particular
    needs.
    """
    def chunksize(self):
        """Chunk size for resumable uploads.
        Returns:
            Chunk size in bytes.
        """
        ...
    def mimetype(self):  # -> Literal['application/octet-stream']:
        """Mime type of the body.
        Returns:
            Mime type.
        """
        ...
    def size(self):  # -> None:
        """Size of upload.
        Returns:
            Size of the body, or None if the size is unknown.
        """
        ...
    def resumable(self):  # -> Literal[False]:
        """Whether this upload is resumable.
        Returns:
            True if resumable upload or False.
        """
        ...
    def getbytes(self, begin, end):
        """Get bytes from the media.
        Args:
            begin: int, offset from beginning of file.
            end: int, documented upstream as a byte count ("length: number of
                bytes to read, starting at begin") although the parameter is
                named ``end`` -- NOTE(review): confirm the intended semantics
                against the runtime library.
        Returns:
            A string of bytes read. May be shorter than length if EOF was reached
            first.
        """
        ...
    def has_stream(self):  # -> Literal[False]:
        """Does the underlying upload support a streaming interface.
        Streaming means it is an io.IOBase subclass that supports seek, i.e.
        seekable() returns True.
        Returns:
            True if the call to stream() will return an instance of a seekable io.Base
            subclass.
        """
        ...
    def stream(self):
        """A stream interface to the data being uploaded.
        Returns:
            The returned value is an io.IOBase subclass that supports seek, i.e.
            seekable() returns True.
        """
        ...
    def to_json(self) -> str:
        """Create a JSON representation of an instance of MediaUpload.
        Returns:
            string, a JSON representation of this instance, suitable to pass to
            from_json().
        """
        ...
    @classmethod
    def new_from_json(cls, s):  # -> Any:
        """Utility class method to instantiate a MediaUpload subclass from a JSON
        representation produced by to_json().
        Args:
            s: string, JSON from to_json().
        Returns:
            An instance of the subclass of MediaUpload that was serialized with
            to_json().
        """
        ...
class MediaIoBaseUpload(MediaUpload):
    """A MediaUpload for a io.Base objects.
    Note that the Python file object is compatible with io.Base and can be used
    with this class also.
    fh = BytesIO('...Some data to upload...')
    media = MediaIoBaseUpload(fh, mimetype='image/png',
    chunksize=1024*1024, resumable=True)
    farm.animals().insert(
    id='cow',
    name='cow.png',
    media_body=media).execute()
    Depending on the platform you are working on, you may pass -1 as the
    chunksize, which indicates that the entire file should be uploaded in a single
    request. If the underlying platform supports streams, such as Python 2.6 or
    later, then this can be very efficient as it avoids multiple connections, and
    also avoids loading the entire file into memory before sending it. Note that
    Google App Engine has a 5MB limit on request size, so you should never set
    your chunksize larger than 5MB, or to -1.
    """
    @util.positional(3)
    def __init__(self, fd, mimetype, chunksize=..., resumable=...) -> None:
        """Constructor.
        Args:
            fd: io.Base or file object, The source of the bytes to upload. MUST be
                opened in blocking mode, do not use streams opened in non-blocking mode.
                The given stream must be seekable, that is, it must be able to call
                seek() on fd.
            mimetype: string, Mime-type of the file.
            chunksize: int, File will be uploaded in chunks of this many bytes. Only
                used if resumable=True. Pass in a value of -1 if the file is to be
                uploaded as a single chunk. Note that Google App Engine has a 5MB limit
                on request size, so you should never set your chunksize larger than 5MB,
                or to -1.
            resumable: bool, True if this is a resumable upload. False means upload
                in a single request.
        """
        ...
    def chunksize(self) -> int:
        """Chunk size for resumable uploads.
        Returns:
            Chunk size in bytes.
        """
        ...
    def mimetype(self):  # -> Any:
        """Mime type of the body.
        Returns:
            Mime type.
        """
        ...
    def size(self):
        """Size of upload.
        Returns:
            Size of the body, or None if the size is unknown.
        """
        ...
    def resumable(self) -> bool:
        """Whether this upload is resumable.
        Returns:
            True if resumable upload or False.
        """
        ...
    def getbytes(self, begin, length):
        """Get bytes from the media.
        Args:
            begin: int, offset from beginning of file.
            length: int, number of bytes to read, starting at begin.
        Returns:
            A string of bytes read. May be shorter than length if EOF was reached
            first.
        """
        ...
    def has_stream(self):  # -> Literal[True]:
        """Does the underlying upload support a streaming interface.
        Streaming means it is an io.IOBase subclass that supports seek, i.e.
        seekable() returns True.
        Returns:
            True if the call to stream() will return an instance of a seekable io.Base
            subclass.
        """
        ...
    def stream(self):  # -> Any:
        """A stream interface to the data being uploaded.
        Returns:
            The returned value is an io.IOBase subclass that supports seek, i.e.
            seekable() returns True.
        """
        ...
    def to_json(self):
        """This upload type is not serializable."""
        ...
class MediaFileUpload(MediaIoBaseUpload):
    """A MediaUpload for a file.
    Construct a MediaFileUpload and pass as the media_body parameter of the
    method. For example, if we had a service that allowed uploading images:
    media = MediaFileUpload('cow.png', mimetype='image/png',
    chunksize=1024*1024, resumable=True)
    farm.animals().insert(
    id='cow',
    name='cow.png',
    media_body=media).execute()
    Depending on the platform you are working on, you may pass -1 as the
    chunksize, which indicates that the entire file should be uploaded in a single
    request. If the underlying platform supports streams, such as Python 2.6 or
    later, then this can be very efficient as it avoids multiple connections, and
    also avoids loading the entire file into memory before sending it. Note that
    Google App Engine has a 5MB limit on request size, so you should never set
    your chunksize larger than 5MB, or to -1.
    """
    @util.positional(2)
    def __init__(self, filename, mimetype=..., chunksize=..., resumable=...) -> None:
        """Constructor.
        Args:
            filename: string, Name of the file.
            mimetype: string, Mime-type of the file. If None then a mime-type will be
                guessed from the file extension.
            chunksize: int, File will be uploaded in chunks of this many bytes. Only
                used if resumable=True. Pass in a value of -1 if the file is to be
                uploaded in a single chunk. Note that Google App Engine has a 5MB limit
                on request size, so you should never set your chunksize larger than 5MB,
                or to -1.
            resumable: bool, True if this is a resumable upload. False means upload
                in a single request.
        """
        ...
    def __del__(self) -> None:
        # NOTE(review): presumably releases the file handle opened by
        # __init__ -- confirm against the runtime implementation.
        ...
    def to_json(self) -> str:
        """Creating a JSON representation of an instance of MediaFileUpload.
        Returns:
            string, a JSON representation of this instance, suitable to pass to
            from_json().
        """
        ...
    @staticmethod
    def from_json(s):  # -> MediaFileUpload:
        ...
# Deprecated shim retained for backward compatibility (see class docstring).
class MediaInMemoryUpload(MediaIoBaseUpload):
    """MediaUpload for a chunk of bytes.
    DEPRECATED: Use MediaIoBaseUpload with either io.TextIOBase or io.StringIO for
    the stream.
    """
    @util.positional(2)
    def __init__(self, body, mimetype=..., chunksize=..., resumable=...) -> None:
        """Create a new MediaInMemoryUpload.
        DEPRECATED: Use MediaIoBaseUpload with either io.TextIOBase or io.StringIO for
        the stream.
        Args:
            body: string, Bytes of body content.
            mimetype: string, Mime-type of the file or default of
                'application/octet-stream'.
            chunksize: int, File will be uploaded in chunks of this many bytes. Only
                used if resumable=True.
            resumable: bool, True if this is a resumable upload. False means upload
                in a single request.
        """
        ...
class MediaIoBaseDownload:
    """Download media resources.
    Note that the Python file object is compatible with io.Base and can be used
    with this class also.
    Example:
    request = farms.animals().get_media(id='cow')
    fh = io.FileIO('cow.png', mode='wb')
    downloader = MediaIoBaseDownload(fh, request, chunksize=1024*1024)
    done = False
    while done is False:
    status, done = downloader.next_chunk()
    if status:
    print "Download %d%%." % int(status.progress() * 100)
    print "Download Complete!"
    """
    @util.positional(3)
    def __init__(self, fd, request, chunksize=...) -> None:
        """Constructor.
        Args:
            fd: io.Base or file object, The stream in which to write the downloaded
                bytes.
            request: googleapiclient.http.HttpRequest, the media request to perform in
                chunks.
            chunksize: int, File will be downloaded in chunks of this many bytes.
        """
        ...
    @util.positional(1)
    def next_chunk(self, num_retries=...):  # -> tuple[MediaDownloadProgress, bool] | tuple[MediaDownloadProgress, Literal[True]]:
        """Get the next chunk of the download.
        Args:
            num_retries: Integer, number of times to retry with randomized
                exponential backoff. If all retries fail, the raised HttpError
                represents the last request. If zero (default), we attempt the
                request only once.
        Returns:
            (status, done): (MediaDownloadProgress, boolean)
            The value of 'done' will be True when the media has been fully
            downloaded or the total size of the media is unknown.
        Raises:
            googleapiclient.errors.HttpError if the response was not a 2xx.
            httplib2.HttpLib2Error if a transport error has occurred.
        """
        ...
class _StreamSlice:
    """Truncated stream.
    Takes a stream and presents a stream that is a slice of the original stream.
    This is used when uploading media in chunks. In later versions of Python a
    stream can be passed to httplib in place of the string of data to send. The
    problem is that httplib just blindly reads to the end of the stream. This
    wrapper presents a virtual stream that only reads to the end of the chunk.
    """
    def __init__(self, stream, begin: int, chunksize: int) -> None:
        """Constructor.
        Args:
            stream: (io.Base, file object), the stream to wrap.
            begin: int, the seek position the chunk begins at.
            chunksize: int, the size of the chunk.
        """
        ...
    def read(self, n=...):
        """Read n bytes.
        Args:
            n: int, the number of bytes to read.
        Returns:
            A string of length 'n', or less if EOF is reached.
        """
        ...
class HttpRequest:
    """Encapsulates a single HTTP request."""
    @util.positional(4)
    def __init__(self, http, postproc, uri, method=..., body=..., headers=..., methodId=..., resumable=...) -> None:
        """Constructor for an HttpRequest.
        Args:
            http: httplib2.Http, the transport object to use to make a request
            postproc: callable, called on the HTTP response and content to transform
                it into a data object before returning, or raising an exception
                on an error.
            uri: string, the absolute URI to send the request to
            method: string, the HTTP method to use
            body: string, the request body of the HTTP request.
            headers: dict, the HTTP request headers
            methodId: string, a unique identifier for the API method being called.
            resumable: MediaUpload, None if this is not a resumable request.
        """
        ...
    @util.positional(1)
    def execute(self, http=..., num_retries=...):
        """Execute the request.
        Args:
            http: httplib2.Http, an http object to be used in place of the
                one the HttpRequest request object was constructed with.
            num_retries: Integer, number of times to retry with randomized
                exponential backoff. If all retries fail, the raised HttpError
                represents the last request. If zero (default), we attempt the
                request only once.
        Returns:
            A deserialized object model of the response body as determined
            by the postproc.
        Raises:
            googleapiclient.errors.HttpError if the response was not a 2xx.
            httplib2.HttpLib2Error if a transport error has occurred.
        """
        ...
    @util.positional(2)
    def add_response_callback(self, cb) -> None:
        """add_response_headers_callback
        Args:
            cb: Callback to be called on receiving the response headers, of signature:
                def cb(resp):
                # Where resp is an instance of httplib2.Response
        """
        ...
    @util.positional(1)
    def next_chunk(self, http=..., num_retries=...):  # -> tuple[MediaUploadProgress | None, Any] | tuple[None, Any] | tuple[MediaUploadProgress, None]:
        """Execute the next step of a resumable upload.
        Can only be used if the method being executed supports media uploads and
        the MediaUpload object passed in was flagged as using resumable upload.
        Example:
        media = MediaFileUpload('cow.png', mimetype='image/png',
        chunksize=1000, resumable=True)
        request = farm.animals().insert(
        id='cow',
        name='cow.png',
        media_body=media)
        response = None
        while response is None:
        status, response = request.next_chunk()
        if status:
        print "Upload %d%% complete." % int(status.progress() * 100)
        Args:
            http: httplib2.Http, an http object to be used in place of the
                one the HttpRequest request object was constructed with.
            num_retries: Integer, number of times to retry with randomized
                exponential backoff. If all retries fail, the raised HttpError
                represents the last request. If zero (default), we attempt the
                request only once.
        Returns:
            (status, body): (ResumableMediaStatus, object)
            The body will be None until the resumable media is fully uploaded.
        Raises:
            googleapiclient.errors.HttpError if the response was not a 2xx.
            httplib2.HttpLib2Error if a transport error has occurred.
        """
        ...
    def to_json(self) -> str:
        """Returns a JSON representation of the HttpRequest."""
        ...
    @staticmethod
    def from_json(s, http, postproc):  # -> HttpRequest:
        """Returns an HttpRequest populated with info from a JSON object."""
        ...
    @staticmethod
    def null_postproc(resp, contents):  # -> tuple[Any, Any]:
        # Identity post-processor: returns the (resp, contents) pair unchanged,
        # per the generated return-type comment.
        ...
class BatchHttpRequest:
    """Batches multiple HttpRequest objects into a single HTTP request.

    Example:
        from googleapiclient.http import BatchHttpRequest

        def list_animals(request_id, response, exception):
            \"\"\"Do something with the animals list response.\"\"\"
            if exception is not None:
                # Do something with the exception.
                pass
            else:
                # Do something with the response.
                pass

        def list_farmers(request_id, response, exception):
            \"\"\"Do something with the farmers list response.\"\"\"
            if exception is not None:
                # Do something with the exception.
                pass
            else:
                # Do something with the response.
                pass

        service = build('farm', 'v2')
        batch = BatchHttpRequest()
        batch.add(service.animals().list(), list_animals)
        batch.add(service.farmers().list(), list_farmers)
        batch.execute(http=http)
    """

    @util.positional(1)
    def __init__(self, callback=..., batch_uri=...) -> None:
        """Constructor for a BatchHttpRequest.

        Args:
            callback: callable, A callback to be called for each response, of the
                form callback(id, response, exception). The first parameter is the
                request id, and the second is the deserialized response object. The
                third is a googleapiclient.errors.HttpError exception object if an
                HTTP error occurred while processing the request, or None if no
                error occurred.
            batch_uri: string, URI to send batch requests to.
        """
        ...

    @util.positional(2)
    def add(self, request, callback=..., request_id=...):  # -> None:
        """Add a new request.

        Every callback added will be paired with a unique id, the request_id.
        That unique id will be passed back to the callback when the response
        comes back from the server. The default behavior is to have the library
        generate its own unique id. If the caller passes in a request_id then
        they must ensure uniqueness for each request_id, and if they are not an
        exception is raised. Callers should either supply all request_ids or
        never supply a request id, to avoid such an error.

        Args:
            request: HttpRequest, Request to add to the batch.
            callback: callable, A callback to be called for this response, of the
                form callback(id, response, exception). The first parameter is the
                request id, and the second is the deserialized response object. The
                third is a googleapiclient.errors.HttpError exception object if an
                HTTP error occurred while processing the request, or None if no
                errors occurred.
            request_id: string, A unique id for the request. The id will be
                passed to the callback with the response.

        Returns:
            None

        Raises:
            BatchError if a media request is added to a batch.
            KeyError if the request_id is not unique.
        """
        ...

    @util.positional(1)
    def execute(self, http=...):  # -> None:
        """Execute all the requests as a single batched HTTP request.

        Args:
            http: httplib2.Http, an http object to be used in place of the one
                the HttpRequest request object was constructed with. If one
                isn't supplied then use a http object from the requests in this
                batch.

        Returns:
            None

        Raises:
            httplib2.HttpLib2Error if a transport error has occurred.
            googleapiclient.errors.BatchError if the response is the wrong format.
        """
        ...
class HttpRequestMock:
    """Mock of HttpRequest.

    Do not construct directly, instead use RequestMockBuilder.
    """

    def __init__(self, resp, content, postproc) -> None:
        """Constructor for HttpRequestMock.

        Args:
            resp: httplib2.Response, the response to emulate coming from the request
            content: string, the response body
            postproc: callable, the post processing function usually supplied by
                the model class. See model.JsonModel.response() as an example.
        """
        ...

    def execute(self, http=...):
        """Execute the request.

        Same behavior as HttpRequest.execute(), but the response is
        mocked and not really from an HTTP request/response.
        """
        ...
class RequestMockBuilder:
    """A simple mock of HttpRequest.

    Pass in a dictionary to the constructor that maps request methodIds to
    tuples of (httplib2.Response, content, opt_expected_body) that should be
    returned when that method is called. None may also be passed in for the
    httplib2.Response, in which case a 200 OK response will be generated.
    If an opt_expected_body (str or dict) is provided, it will be compared to
    the body and UnexpectedBodyError will be raised on inequality.

    Example:
        response = '{"data": {"id": "tag:google.c...'
        requestBuilder = RequestMockBuilder(
            {
                'plus.activities.get': (None, response),
            }
        )
        googleapiclient.discovery.build("plus", "v1", requestBuilder=requestBuilder)

    Methods that you do not supply a response for will return a
    200 OK with an empty string as the response content or raise an exception
    if check_unexpected is set to True. The methodId is taken from the rpcName
    in the discovery document.

    For more details see the project wiki.
    """

    def __init__(self, responses, check_unexpected=...) -> None:
        """Constructor for RequestMockBuilder.

        The constructed object should be a callable object
        that can replace the class HttpResponse.

        responses - A dictionary that maps methodIds into tuples
                    of (httplib2.Response, content). The methodId
                    comes from the 'rpcName' field in the discovery
                    document.
        check_unexpected - A boolean setting whether or not UnexpectedMethodError
                           should be raised on unsupplied method.
        """
        ...

    def __call__(self, http, postproc, uri, method=..., body=..., headers=..., methodId=..., resumable=...):  # -> HttpRequestMock:
        """Implements the callable interface that discovery.build() expects
        of requestBuilder, which is to build an object compatible with
        HttpRequest.execute(). See that method for the description of the
        parameters and the expected response.
        """
        ...
class HttpMock:
    """Mock of httplib2.Http that replays a single canned response."""

    def __init__(self, filename=..., headers=...) -> None:
        """
        Args:
            filename: string, absolute filename to read response from
            headers: dict, header to return with response
        """
        ...

    def request(self, uri, method=..., body=..., headers=..., redirections=..., connection_type=...):  # -> tuple[Response[str], bytes | None]:
        # Mirrors the httplib2.Http.request signature so it can stand in
        # transparently for a real transport.
        ...

    def close(self):  # -> None:
        ...
class HttpMockSequence:
    """Mock of httplib2.Http.

    Mocks a sequence of calls to request returning different responses for each
    call. Create an instance initialized with the desired response headers
    and content and then use as if an httplib2.Http instance.

        http = HttpMockSequence([
            ({'status': '401'}, ''),
            ({'status': '200'}, '{"access_token":"1/3w","expires_in":3600}'),
            ({'status': '200'}, 'echo_request_headers'),
        ])
        resp, content = http.request("http://examples.com")

    There are special values you can pass in for content to trigger
    behaviours that are helpful in testing.

    'echo_request_headers' means return the request headers in the response body
    'echo_request_headers_as_json' means return the request headers in
        the response body
    'echo_request_body' means return the request body in the response body
    'echo_request_uri' means return the request uri in the response body
    """

    def __init__(self, iterable) -> None:
        """
        Args:
            iterable: iterable, a sequence of pairs of (headers, body)
        """
        ...

    def request(self, uri, method=..., body=..., headers=..., redirections=..., connection_type=...):  # -> tuple[Response[Any], bytes | Any | bytearray | memoryview[_I] | None]:
        ...
def set_user_agent(http, user_agent):
    """Set the user-agent on every request.

    Args:
        http - An instance of httplib2.Http
            or something that acts like it.
        user_agent: string, the value for the user-agent header.

    Returns:
        A modified instance of http that was passed in.

    Example:
        h = httplib2.Http()
        h = set_user_agent(h, "my-app-name/6.0")

    Most of the time the user-agent will be set doing auth, this is for the rare
    cases where you are accessing an unauthenticated endpoint.
    """
    ...
def tunnel_patch(http):
    """Tunnel PATCH requests over POST.

    Args:
        http - An instance of httplib2.Http
            or something that acts like it.

    Returns:
        A modified instance of http that was passed in.

    Example:
        h = httplib2.Http()
        h = tunnel_patch(h)

    Useful if you are running on a platform that doesn't support PATCH.
    Apply this last if you are using OAuth 1.0, as changing the method
    will result in a different signature.
    """
    ...
def build_http():  # -> Http:
    """Builds httplib2.Http object.

    Returns:
        A httplib2.Http object, which is used to make http requests, and which
        has timeout set by default. To override default timeout call

            socket.setdefaulttimeout(timeout_in_sec)

        before interacting with this method.
    """
    ...

@ -0,0 +1,107 @@
"""
This type stub file was generated by pyright.
"""
"""MIME-Type Parser
This module provides basic functions for handling mime-types. It can handle
matching mime-types against a list of media-ranges. See section 14.1 of the
HTTP specification [RFC 2616] for a complete explanation.
http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.1
Contents:
- parse_mime_type(): Parses a mime-type into its component parts.
- parse_media_range(): Media-ranges are mime-types with wild-cards and a 'q'
quality parameter.
- quality(): Determines the quality ('q') of a mime-type when
compared against a list of media-ranges.
- quality_parsed(): Just like quality() except the second parameter must be
pre-parsed.
- best_match(): Choose the mime-type with the highest quality ('q')
from a list of candidates.
"""
__version__ = ...
__author__ = ...
__email__ = ...
__license__ = ...
__credits__ = ...
def parse_mime_type(mime_type):  # -> tuple[LiteralString | Any, LiteralString | Any, dict[Any, Any]]:
    """Parses a mime-type into its component parts.

    Carves up a mime-type and returns a tuple of the (type, subtype, params)
    where 'params' is a dictionary of all the parameters for the media range.
    For example, the media range 'application/xhtml;q=0.5' would get parsed
    into:

        ('application', 'xhtml', {'q': '0.5'})
    """
    ...
def parse_media_range(range):  # -> tuple[LiteralString | Any, LiteralString | Any, dict[Any, Any]]:
    """Parse a media-range into its component parts.

    Carves up a media range and returns a tuple of the (type, subtype,
    params) where 'params' is a dictionary of all the parameters for the media
    range. For example, the media range 'application/*;q=0.5' would get parsed
    into:

        ('application', '*', {'q': '0.5'})

    In addition this function also guarantees that there is a value for 'q'
    in the params dictionary, filling it in with a proper default if
    necessary.
    """
    ...
def fitness_and_quality_parsed(mime_type, parsed_ranges):  # -> tuple[Any | int, float]:
    """Find the best match for a mime-type amongst parsed media-ranges.

    Find the best match for a given mime-type against a list of media_ranges
    that have already been parsed by parse_media_range(). Returns a tuple of
    the fitness value and the value of the 'q' quality parameter of the best
    match, or (-1, 0) if no match was found. Just as for quality_parsed(),
    'parsed_ranges' must be a list of parsed media ranges.
    """
    ...
def quality_parsed(mime_type, parsed_ranges):  # -> float:
    """Find the best match for a mime-type amongst parsed media-ranges.

    Find the best match for a given mime-type against a list of media_ranges
    that have already been parsed by parse_media_range(). Returns the 'q'
    quality parameter of the best match, 0 if no match was found. This function
    behaves the same as quality() except that 'parsed_ranges' must be a list of
    parsed media ranges.
    """
    ...
def quality(mime_type, ranges):  # -> float:
    """Return the quality ('q') of a mime-type against a list of media-ranges.

    Returns the quality 'q' of a mime-type when compared against the
    media-ranges in ranges. For example:

        >>> quality('text/html','text/*;q=0.3, text/html;q=0.7,
                     text/html;level=1, text/html;level=2;q=0.4, */*;q=0.5')
        0.7
    """
    ...
def best_match(supported, header):  # -> Literal['']:
    """Return mime-type with the highest quality ('q') from list of candidates.

    Takes a list of supported mime-types and finds the best match for all the
    media-ranges listed in header. The value of header must be a string that
    conforms to the format of the HTTP Accept: header. The value of 'supported'
    is a list of mime-types. The list of supported mime-types should be sorted
    in order of increasing desirability, in case of a situation where there is
    a tie.

        >>> best_match(['application/xbel+xml', 'text/xml'],
                       'text/*;q=0.5,*/*; q=0.1')
        'text/xml'
    """
    ...

@ -0,0 +1,262 @@
"""
This type stub file was generated by pyright.
"""
"""Model objects for requests and responses.
Each API may support one or more serializations, such
as JSON, Atom, etc. The model classes are responsible
for converting between the wire format and the Python
object representation.
"""
__author__ = ...
HAS_API_VERSION = ...
_LIBRARY_VERSION = ...
_PY_VERSION = ...
LOGGER = ...
dump_request_response = ...
class Model:
    """Model base class.

    All Model classes should implement this interface.
    The Model serializes and de-serializes between a wire
    format such as JSON and a Python object representation.
    """

    def request(self, headers, path_params, query_params, body_value):  # -> None:
        """Updates outgoing requests with a serialized body.

        Args:
            headers: dict, request headers
            path_params: dict, parameters that appear in the request path
            query_params: dict, parameters that appear in the query
            body_value: object, the request body as a Python object, which must be
                serializable.

        Returns:
            A tuple of (headers, path_params, query, body)

            headers: dict, request headers
            path_params: dict, parameters that appear in the request path
            query: string, query part of the request URI
            body: string, the body serialized in the desired wire format.
        """
        ...

    def response(self, resp, content):  # -> None:
        """Convert the response wire format into a Python object.

        Args:
            resp: httplib2.Response, the HTTP response headers and status
            content: string, the body of the HTTP response

        Returns:
            The body de-serialized as a Python object.

        Raises:
            googleapiclient.errors.HttpError if a non 2xx response is received.
        """
        ...
class BaseModel(Model):
    """Base model class.

    Subclasses should provide implementations for the "serialize" and
    "deserialize" methods, as well as values for the following class attributes.

    Attributes:
        accept: The value to use for the HTTP Accept header.
        content_type: The value to use for the HTTP Content-type header.
        no_content_response: The value to return when deserializing a 204 "No
            Content" response.
        alt_param: The value to supply as the "alt" query parameter for requests.
    """

    # Class attributes documented above; concrete values are supplied by the
    # implementation module and its subclasses.
    accept = ...
    content_type = ...
    no_content_response = ...
    alt_param = ...

    def request(self, headers, path_params, query_params, body_value, api_version=...):  # -> tuple[Any, Any, Any, Any | None]:
        """Updates outgoing requests with a serialized body.

        Args:
            headers: dict, request headers
            path_params: dict, parameters that appear in the request path
            query_params: dict, parameters that appear in the query
            body_value: object, the request body as a Python object, which must be
                serializable by json.
            api_version: str, The precise API version represented by this request,
                which will result in an API Version header being sent along with the
                HTTP request.

        Returns:
            A tuple of (headers, path_params, query, body)

            headers: dict, request headers
            path_params: dict, parameters that appear in the request path
            query: string, query part of the request URI
            body: string, the body serialized as JSON
        """
        ...

    def response(self, resp, content):  # -> None:
        """Convert the response wire format into a Python object.

        Args:
            resp: httplib2.Response, the HTTP response headers and status
            content: string, the body of the HTTP response

        Returns:
            The body de-serialized as a Python object.

        Raises:
            googleapiclient.errors.HttpError if a non 2xx response is received.
        """
        ...

    def serialize(self, body_value):  # -> None:
        """Perform the actual Python object serialization.

        Args:
            body_value: object, the request body as a Python object.

        Returns:
            string, the body in serialized form.
        """
        ...

    def deserialize(self, content):  # -> None:
        """Perform the actual deserialization from response string to Python
        object.

        Args:
            content: string, the body of the HTTP response

        Returns:
            The body de-serialized as a Python object.
        """
        ...
class JsonModel(BaseModel):
    """Model class for JSON.

    Serializes and de-serializes between JSON and the Python
    object representation of HTTP request and response bodies.
    """

    accept = ...
    content_type = ...
    alt_param = ...

    def __init__(self, data_wrapper=...) -> None:
        """Construct a JsonModel.

        Args:
            data_wrapper: boolean, wrap requests and responses in a data wrapper
        """
        ...

    def serialize(self, body_value):  # -> str:
        ...

    def deserialize(self, content):  # -> Any:
        ...

    @property
    def no_content_response(self):  # -> dict[Any, Any]:
        # Overrides the class attribute from BaseModel as a computed property.
        ...
class RawModel(JsonModel):
    """Model class for requests that don't return JSON.

    Serializes and de-serializes between JSON and the Python
    object representation of HTTP request, and returns the raw bytes
    of the response body.
    """

    accept = ...
    content_type = ...
    alt_param = ...

    def deserialize(self, content):
        ...

    @property
    def no_content_response(self):  # -> Literal['']:
        ...
class MediaModel(JsonModel):
    """Model class for requests that return Media.

    Serializes and de-serializes between JSON and the Python
    object representation of HTTP request, and returns the raw bytes
    of the response body.
    """

    accept = ...
    content_type = ...
    alt_param = ...

    def deserialize(self, content):
        ...

    @property
    def no_content_response(self):  # -> Literal['']:
        ...
class ProtocolBufferModel(BaseModel):
    """Model class for protocol buffers.

    Serializes and de-serializes the binary protocol buffer sent in the HTTP
    request and response bodies.
    """

    accept = ...
    content_type = ...
    alt_param = ...

    def __init__(self, protocol_buffer) -> None:
        """Constructs a ProtocolBufferModel.

        The serialized protocol buffer returned in an HTTP response will be
        de-serialized using the given protocol buffer class.

        Args:
            protocol_buffer: The protocol buffer class used to de-serialize a
                response from the API.
        """
        ...

    def serialize(self, body_value):
        ...

    def deserialize(self, content):
        ...

    @property
    def no_content_response(self):
        ...
def makepatch(original, modified):  # -> dict[Any, Any]:
    """Create a patch object.

    Some methods support PATCH, an efficient way to send updates to a resource.
    This method allows the easy construction of patch bodies by looking at the
    differences between a resource before and after it was modified.

    Args:
        original: object, the original deserialized resource
        modified: object, the modified deserialized resource

    Returns:
        An object that contains only the changes from original to modified, in a
        form suitable to pass to a PATCH method.

    Example usage:
        item = service.activities().get(postid=postid, userid=userid).execute()
        original = copy.deepcopy(item)
        item['object']['content'] = 'This is updated.'
        service.activities.patch(postid=postid, userid=userid,
                                 body=makepatch(original, item)).execute()
    """
    ...

@ -0,0 +1,160 @@
"""
This type stub file was generated by pyright.
"""
from googleapiclient import _helpers as util
"""Schema processing for discovery based APIs
Schemas holds an APIs discovery schemas. It can return those schema as
deserialized JSON objects, or pretty print them as prototype objects that
conform to the schema.
For example, given the schema:
schema = \"\"\"{
"Foo": {
"type": "object",
"properties": {
"etag": {
"type": "string",
"description": "ETag of the collection."
},
"kind": {
"type": "string",
"description": "Type of the collection ('calendar#acl').",
"default": "calendar#acl"
},
"nextPageToken": {
"type": "string",
"description": "Token used to access the next
page of this result. Omitted if no further results are available."
}
}
}
}\"\"\"
s = Schemas(schema)
print s.prettyPrintByName('Foo')
Produces the following output:
{
"nextPageToken": "A String", # Token used to access the
# next page of this result. Omitted if no further results are available.
"kind": "A String", # Type of the collection ('calendar#acl').
"etag": "A String", # ETag of the collection.
},
The constructor takes a discovery document in which to look up named schema.
"""
__author__ = ...
class Schemas:
    """Schemas for an API."""

    def __init__(self, discovery) -> None:
        """Constructor.

        Args:
            discovery: object, Deserialized discovery document from which we pull
                out the named schema.
        """
        ...

    def prettyPrintByName(self, name):
        """Get pretty printed object prototype from the schema name.

        Args:
            name: string, Name of schema in the discovery document.

        Returns:
            string, A string that contains a prototype object with
            comments that conforms to the given schema.
        """
        ...

    def prettyPrintSchema(self, schema):  # -> LiteralString:
        """Get pretty printed object prototype of schema.

        Args:
            schema: object, Parsed JSON schema.

        Returns:
            string, A string that contains a prototype object with
            comments that conforms to the given schema.
        """
        ...

    def get(self, name, default=...):
        """Get deserialized JSON schema from the schema name.

        Args:
            name: string, Schema name.
            default: object, return value if name not found.
        """
        ...
class _SchemaToStruct:
    """Convert schema to a prototype object."""

    @util.positional(3)
    def __init__(self, schema, seen, dent=...) -> None:
        """Constructor.

        Args:
            schema: object, Parsed JSON schema.
            seen: list, List of names of schema already seen while parsing. Used to
                handle recursive definitions.
            dent: int, Initial indentation depth.
        """
        ...

    def emit(self, text):  # -> None:
        """Add text as a line to the output.

        Args:
            text: string, Text to output.
        """
        ...

    def emitBegin(self, text):  # -> None:
        """Add text to the output, but with no line terminator.

        Args:
            text: string, Text to output.
        """
        ...

    def emitEnd(self, text, comment):  # -> None:
        """Add text and comment to the output with line terminator.

        Args:
            text: string, Text to output.
            comment: string, Python comment.
        """
        ...

    def indent(self):  # -> None:
        """Increase indentation level."""
        ...

    def undent(self):  # -> None:
        """Decrease indentation level."""
        ...

    def to_str(self, from_cache):  # -> LiteralString:
        """Prototype object based on the schema, in Python code with comments.

        Args:
            from_cache: callable(name, seen), Callable that retrieves an object
                prototype for a schema with the given name. Seen is a list of schema
                names already seen as we recursively descend the schema definition.

        Returns:
            Prototype object based on the schema, in Python code with comments.
            The lines of the code will all be properly indented.
        """
        ...

@ -0,0 +1,5 @@
"""
This type stub file was generated by pyright.
"""
__version__ = ...
Loading…
Cancel
Save