You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

858 lines
28 KiB
Python

"""
This type stub file was generated by pyright.
"""
from googleapiclient import _helpers as util
"""Classes to encapsulate a single HTTP request.
The classes implement a command pattern, with every
object supporting an execute() method that does the
actual HTTP request.
"""
__author__ = ...
LOGGER = ...
DEFAULT_CHUNK_SIZE = ...
MAX_URI_LENGTH = ...
MAX_BATCH_LIMIT = ...
_TOO_MANY_REQUESTS = ...
DEFAULT_HTTP_TIMEOUT_SEC = ...
_LEGACY_BATCH_URI = ...
class MediaUploadProgress:
"""Status of a resumable upload."""
def __init__(self, resumable_progress, total_size) -> None:
"""Constructor.
Args:
resumable_progress: int, bytes sent so far.
total_size: int, total bytes in complete upload, or None if the total
upload size isn't known ahead of time.
"""
...
def progress(self): # -> float:
"""Percent of upload completed, as a float.
Returns:
the percentage complete as a float, returning 0.0 if the total size of
the upload is unknown.
"""
...
class MediaDownloadProgress:
"""Status of a resumable download."""
def __init__(self, resumable_progress, total_size) -> None:
"""Constructor.
Args:
resumable_progress: int, bytes received so far.
total_size: int, total bytes in complete download.
"""
...
def progress(self): # -> float:
"""Percent of download completed, as a float.
Returns:
the percentage complete as a float, returning 0.0 if the total size of
the download is unknown.
"""
...
class MediaUpload:
"""Describes a media object to upload.
Base class that defines the interface of MediaUpload subclasses.
Note that subclasses of MediaUpload may allow you to control the chunksize
when uploading a media object. It is important to keep the size of the chunk
as large as possible to keep the upload efficient. Other factors may influence
the size of the chunk you use, particularly if you are working in an
environment where individual HTTP requests may have a hardcoded time limit,
such as under certain classes of requests under Google App Engine.
Streams are io.Base compatible objects that support seek(). Some MediaUpload
subclasses support using streams directly to upload data. Support for
streaming may be indicated by a MediaUpload sub-class and if appropriate for a
platform that stream will be used for uploading the media object. The support
for streaming is indicated by has_stream() returning True. The stream() method
should return an io.Base object that supports seek(). On platforms where the
underlying httplib module supports streaming, for example Python 2.6 and
later, the stream will be passed into the http library which will result in
less memory being used and possibly faster uploads.
If you need to upload media that can't be uploaded using any of the existing
MediaUpload sub-class then you can sub-class MediaUpload for your particular
needs.
"""
def chunksize(self):
"""Chunk size for resumable uploads.
Returns:
Chunk size in bytes.
"""
...
def mimetype(self): # -> Literal['application/octet-stream']:
"""Mime type of the body.
Returns:
Mime type.
"""
...
def size(self): # -> None:
"""Size of upload.
Returns:
Size of the body, or None of the size is unknown.
"""
...
def resumable(self): # -> Literal[False]:
"""Whether this upload is resumable.
Returns:
True if resumable upload or False.
"""
...
def getbytes(self, begin, end):
"""Get bytes from the media.
Args:
begin: int, offset from beginning of file.
length: int, number of bytes to read, starting at begin.
Returns:
A string of bytes read. May be shorter than length if EOF was reached
first.
"""
...
def has_stream(self): # -> Literal[False]:
"""Does the underlying upload support a streaming interface.
Streaming means it is an io.IOBase subclass that supports seek, i.e.
seekable() returns True.
Returns:
True if the call to stream() will return an instance of a seekable io.Base
subclass.
"""
...
def stream(self):
"""A stream interface to the data being uploaded.
Returns:
The returned value is an io.IOBase subclass that supports seek, i.e.
seekable() returns True.
"""
...
def to_json(self): # -> str:
"""Create a JSON representation of an instance of MediaUpload.
Returns:
string, a JSON representation of this instance, suitable to pass to
from_json().
"""
...
@classmethod
def new_from_json(cls, s): # -> Any:
"""Utility class method to instantiate a MediaUpload subclass from a JSON
representation produced by to_json().
Args:
s: string, JSON from to_json().
Returns:
An instance of the subclass of MediaUpload that was serialized with
to_json().
"""
...
class MediaIoBaseUpload(MediaUpload):
"""A MediaUpload for a io.Base objects.
Note that the Python file object is compatible with io.Base and can be used
with this class also.
fh = BytesIO('...Some data to upload...')
media = MediaIoBaseUpload(fh, mimetype='image/png',
chunksize=1024*1024, resumable=True)
farm.animals().insert(
id='cow',
name='cow.png',
media_body=media).execute()
Depending on the platform you are working on, you may pass -1 as the
chunksize, which indicates that the entire file should be uploaded in a single
request. If the underlying platform supports streams, such as Python 2.6 or
later, then this can be very efficient as it avoids multiple connections, and
also avoids loading the entire file into memory before sending it. Note that
Google App Engine has a 5MB limit on request size, so you should never set
your chunksize larger than 5MB, or to -1.
"""
@util.positional(3)
def __init__(self, fd, mimetype, chunksize=..., resumable=...) -> None:
"""Constructor.
Args:
fd: io.Base or file object, The source of the bytes to upload. MUST be
opened in blocking mode, do not use streams opened in non-blocking mode.
The given stream must be seekable, that is, it must be able to call
seek() on fd.
mimetype: string, Mime-type of the file.
chunksize: int, File will be uploaded in chunks of this many bytes. Only
used if resumable=True. Pass in a value of -1 if the file is to be
uploaded as a single chunk. Note that Google App Engine has a 5MB limit
on request size, so you should never set your chunksize larger than 5MB,
or to -1.
resumable: bool, True if this is a resumable upload. False means upload
in a single request.
"""
...
def chunksize(self): # -> int:
"""Chunk size for resumable uploads.
Returns:
Chunk size in bytes.
"""
...
def mimetype(self): # -> Any:
"""Mime type of the body.
Returns:
Mime type.
"""
...
def size(self):
"""Size of upload.
Returns:
Size of the body, or None of the size is unknown.
"""
...
def resumable(self): # -> bool:
"""Whether this upload is resumable.
Returns:
True if resumable upload or False.
"""
...
def getbytes(self, begin, length):
"""Get bytes from the media.
Args:
begin: int, offset from beginning of file.
length: int, number of bytes to read, starting at begin.
Returns:
A string of bytes read. May be shorted than length if EOF was reached
first.
"""
...
def has_stream(self): # -> Literal[True]:
"""Does the underlying upload support a streaming interface.
Streaming means it is an io.IOBase subclass that supports seek, i.e.
seekable() returns True.
Returns:
True if the call to stream() will return an instance of a seekable io.Base
subclass.
"""
...
def stream(self): # -> Any:
"""A stream interface to the data being uploaded.
Returns:
The returned value is an io.IOBase subclass that supports seek, i.e.
seekable() returns True.
"""
...
def to_json(self):
"""This upload type is not serializable."""
...
class MediaFileUpload(MediaIoBaseUpload):
"""A MediaUpload for a file.
Construct a MediaFileUpload and pass as the media_body parameter of the
method. For example, if we had a service that allowed uploading images:
media = MediaFileUpload('cow.png', mimetype='image/png',
chunksize=1024*1024, resumable=True)
farm.animals().insert(
id='cow',
name='cow.png',
media_body=media).execute()
Depending on the platform you are working on, you may pass -1 as the
chunksize, which indicates that the entire file should be uploaded in a single
request. If the underlying platform supports streams, such as Python 2.6 or
later, then this can be very efficient as it avoids multiple connections, and
also avoids loading the entire file into memory before sending it. Note that
Google App Engine has a 5MB limit on request size, so you should never set
your chunksize larger than 5MB, or to -1.
"""
@util.positional(2)
def __init__(self, filename, mimetype=..., chunksize=..., resumable=...) -> None:
"""Constructor.
Args:
filename: string, Name of the file.
mimetype: string, Mime-type of the file. If None then a mime-type will be
guessed from the file extension.
chunksize: int, File will be uploaded in chunks of this many bytes. Only
used if resumable=True. Pass in a value of -1 if the file is to be
uploaded in a single chunk. Note that Google App Engine has a 5MB limit
on request size, so you should never set your chunksize larger than 5MB,
or to -1.
resumable: bool, True if this is a resumable upload. False means upload
in a single request.
"""
...
def __del__(self): # -> None:
...
def to_json(self): # -> str:
"""Creating a JSON representation of an instance of MediaFileUpload.
Returns:
string, a JSON representation of this instance, suitable to pass to
from_json().
"""
...
@staticmethod
def from_json(s): # -> MediaFileUpload:
...
class MediaInMemoryUpload(MediaIoBaseUpload):
"""MediaUpload for a chunk of bytes.
DEPRECATED: Use MediaIoBaseUpload with either io.TextIOBase or io.StringIO for
the stream.
"""
@util.positional(2)
def __init__(self, body, mimetype=..., chunksize=..., resumable=...) -> None:
"""Create a new MediaInMemoryUpload.
DEPRECATED: Use MediaIoBaseUpload with either io.TextIOBase or io.StringIO for
the stream.
Args:
body: string, Bytes of body content.
mimetype: string, Mime-type of the file or default of
'application/octet-stream'.
chunksize: int, File will be uploaded in chunks of this many bytes. Only
used if resumable=True.
resumable: bool, True if this is a resumable upload. False means upload
in a single request.
"""
...
class MediaIoBaseDownload:
""" "Download media resources.
Note that the Python file object is compatible with io.Base and can be used
with this class also.
Example:
request = farms.animals().get_media(id='cow')
fh = io.FileIO('cow.png', mode='wb')
downloader = MediaIoBaseDownload(fh, request, chunksize=1024*1024)
done = False
while done is False:
status, done = downloader.next_chunk()
if status:
print "Download %d%%." % int(status.progress() * 100)
print "Download Complete!"
"""
@util.positional(3)
def __init__(self, fd, request, chunksize=...) -> None:
"""Constructor.
Args:
fd: io.Base or file object, The stream in which to write the downloaded
bytes.
request: googleapiclient.http.HttpRequest, the media request to perform in
chunks.
chunksize: int, File will be downloaded in chunks of this many bytes.
"""
...
@util.positional(1)
def next_chunk(self, num_retries=...): # -> tuple[MediaDownloadProgress, bool] | tuple[MediaDownloadProgress, Literal[True]]:
"""Get the next chunk of the download.
Args:
num_retries: Integer, number of times to retry with randomized
exponential backoff. If all retries fail, the raised HttpError
represents the last request. If zero (default), we attempt the
request only once.
Returns:
(status, done): (MediaDownloadProgress, boolean)
The value of 'done' will be True when the media has been fully
downloaded or the total size of the media is unknown.
Raises:
googleapiclient.errors.HttpError if the response was not a 2xx.
httplib2.HttpLib2Error if a transport error has occurred.
"""
...
class _StreamSlice:
"""Truncated stream.
Takes a stream and presents a stream that is a slice of the original stream.
This is used when uploading media in chunks. In later versions of Python a
stream can be passed to httplib in place of the string of data to send. The
problem is that httplib just blindly reads to the end of the stream. This
wrapper presents a virtual stream that only reads to the end of the chunk.
"""
def __init__(self, stream, begin, chunksize) -> None:
"""Constructor.
Args:
stream: (io.Base, file object), the stream to wrap.
begin: int, the seek position the chunk begins at.
chunksize: int, the size of the chunk.
"""
...
def read(self, n=...):
"""Read n bytes.
Args:
n, int, the number of bytes to read.
Returns:
A string of length 'n', or less if EOF is reached.
"""
...
class HttpRequest:
"""Encapsulates a single HTTP request."""
@util.positional(4)
def __init__(self, http, postproc, uri, method=..., body=..., headers=..., methodId=..., resumable=...) -> None:
"""Constructor for an HttpRequest.
Args:
http: httplib2.Http, the transport object to use to make a request
postproc: callable, called on the HTTP response and content to transform
it into a data object before returning, or raising an exception
on an error.
uri: string, the absolute URI to send the request to
method: string, the HTTP method to use
body: string, the request body of the HTTP request,
headers: dict, the HTTP request headers
methodId: string, a unique identifier for the API method being called.
resumable: MediaUpload, None if this is not a resumbale request.
"""
...
@util.positional(1)
def execute(self, http=..., num_retries=...):
"""Execute the request.
Args:
http: httplib2.Http, an http object to be used in place of the
one the HttpRequest request object was constructed with.
num_retries: Integer, number of times to retry with randomized
exponential backoff. If all retries fail, the raised HttpError
represents the last request. If zero (default), we attempt the
request only once.
Returns:
A deserialized object model of the response body as determined
by the postproc.
Raises:
googleapiclient.errors.HttpError if the response was not a 2xx.
httplib2.HttpLib2Error if a transport error has occurred.
"""
...
@util.positional(2)
def add_response_callback(self, cb): # -> None:
"""add_response_headers_callback
Args:
cb: Callback to be called on receiving the response headers, of signature:
def cb(resp):
# Where resp is an instance of httplib2.Response
"""
...
@util.positional(1)
def next_chunk(self, http=..., num_retries=...): # -> tuple[MediaUploadProgress | None, Any] | tuple[None, Any] | tuple[MediaUploadProgress, None]:
"""Execute the next step of a resumable upload.
Can only be used if the method being executed supports media uploads and
the MediaUpload object passed in was flagged as using resumable upload.
Example:
media = MediaFileUpload('cow.png', mimetype='image/png',
chunksize=1000, resumable=True)
request = farm.animals().insert(
id='cow',
name='cow.png',
media_body=media)
response = None
while response is None:
status, response = request.next_chunk()
if status:
print "Upload %d%% complete." % int(status.progress() * 100)
Args:
http: httplib2.Http, an http object to be used in place of the
one the HttpRequest request object was constructed with.
num_retries: Integer, number of times to retry with randomized
exponential backoff. If all retries fail, the raised HttpError
represents the last request. If zero (default), we attempt the
request only once.
Returns:
(status, body): (ResumableMediaStatus, object)
The body will be None until the resumable media is fully uploaded.
Raises:
googleapiclient.errors.HttpError if the response was not a 2xx.
httplib2.HttpLib2Error if a transport error has occurred.
"""
...
def to_json(self): # -> str:
"""Returns a JSON representation of the HttpRequest."""
...
@staticmethod
def from_json(s, http, postproc): # -> HttpRequest:
"""Returns an HttpRequest populated with info from a JSON object."""
...
@staticmethod
def null_postproc(resp, contents): # -> tuple[Any, Any]:
...
class BatchHttpRequest:
"""Batches multiple HttpRequest objects into a single HTTP request.
Example:
from googleapiclient.http import BatchHttpRequest
def list_animals(request_id, response, exception):
\"\"\"Do something with the animals list response.\"\"\"
if exception is not None:
# Do something with the exception.
pass
else:
# Do something with the response.
pass
def list_farmers(request_id, response, exception):
\"\"\"Do something with the farmers list response.\"\"\"
if exception is not None:
# Do something with the exception.
pass
else:
# Do something with the response.
pass
service = build('farm', 'v2')
batch = BatchHttpRequest()
batch.add(service.animals().list(), list_animals)
batch.add(service.farmers().list(), list_farmers)
batch.execute(http=http)
"""
@util.positional(1)
def __init__(self, callback=..., batch_uri=...) -> None:
"""Constructor for a BatchHttpRequest.
Args:
callback: callable, A callback to be called for each response, of the
form callback(id, response, exception). The first parameter is the
request id, and the second is the deserialized response object. The
third is an googleapiclient.errors.HttpError exception object if an HTTP error
occurred while processing the request, or None if no error occurred.
batch_uri: string, URI to send batch requests to.
"""
...
@util.positional(2)
def add(self, request, callback=..., request_id=...): # -> None:
"""Add a new request.
Every callback added will be paired with a unique id, the request_id. That
unique id will be passed back to the callback when the response comes back
from the server. The default behavior is to have the library generate it's
own unique id. If the caller passes in a request_id then they must ensure
uniqueness for each request_id, and if they are not an exception is
raised. Callers should either supply all request_ids or never supply a
request id, to avoid such an error.
Args:
request: HttpRequest, Request to add to the batch.
callback: callable, A callback to be called for this response, of the
form callback(id, response, exception). The first parameter is the
request id, and the second is the deserialized response object. The
third is an googleapiclient.errors.HttpError exception object if an HTTP error
occurred while processing the request, or None if no errors occurred.
request_id: string, A unique id for the request. The id will be passed
to the callback with the response.
Returns:
None
Raises:
BatchError if a media request is added to a batch.
KeyError is the request_id is not unique.
"""
...
@util.positional(1)
def execute(self, http=...): # -> None:
"""Execute all the requests as a single batched HTTP request.
Args:
http: httplib2.Http, an http object to be used in place of the one the
HttpRequest request object was constructed with. If one isn't supplied
then use a http object from the requests in this batch.
Returns:
None
Raises:
httplib2.HttpLib2Error if a transport error has occurred.
googleapiclient.errors.BatchError if the response is the wrong format.
"""
...
class HttpRequestMock:
"""Mock of HttpRequest.
Do not construct directly, instead use RequestMockBuilder.
"""
def __init__(self, resp, content, postproc) -> None:
"""Constructor for HttpRequestMock
Args:
resp: httplib2.Response, the response to emulate coming from the request
content: string, the response body
postproc: callable, the post processing function usually supplied by
the model class. See model.JsonModel.response() as an example.
"""
...
def execute(self, http=...):
"""Execute the request.
Same behavior as HttpRequest.execute(), but the response is
mocked and not really from an HTTP request/response.
"""
...
class RequestMockBuilder:
"""A simple mock of HttpRequest
Pass in a dictionary to the constructor that maps request methodIds to
tuples of (httplib2.Response, content, opt_expected_body) that should be
returned when that method is called. None may also be passed in for the
httplib2.Response, in which case a 200 OK response will be generated.
If an opt_expected_body (str or dict) is provided, it will be compared to
the body and UnexpectedBodyError will be raised on inequality.
Example:
response = '{"data": {"id": "tag:google.c...'
requestBuilder = RequestMockBuilder(
{
'plus.activities.get': (None, response),
}
)
googleapiclient.discovery.build("plus", "v1", requestBuilder=requestBuilder)
Methods that you do not supply a response for will return a
200 OK with an empty string as the response content or raise an excpetion
if check_unexpected is set to True. The methodId is taken from the rpcName
in the discovery document.
For more details see the project wiki.
"""
def __init__(self, responses, check_unexpected=...) -> None:
"""Constructor for RequestMockBuilder
The constructed object should be a callable object
that can replace the class HttpResponse.
responses - A dictionary that maps methodIds into tuples
of (httplib2.Response, content). The methodId
comes from the 'rpcName' field in the discovery
document.
check_unexpected - A boolean setting whether or not UnexpectedMethodError
should be raised on unsupplied method.
"""
...
def __call__(self, http, postproc, uri, method=..., body=..., headers=..., methodId=..., resumable=...): # -> HttpRequestMock:
"""Implements the callable interface that discovery.build() expects
of requestBuilder, which is to build an object compatible with
HttpRequest.execute(). See that method for the description of the
parameters and the expected response.
"""
...
class HttpMock:
"""Mock of httplib2.Http"""
def __init__(self, filename=..., headers=...) -> None:
"""
Args:
filename: string, absolute filename to read response from
headers: dict, header to return with response
"""
...
def request(self, uri, method=..., body=..., headers=..., redirections=..., connection_type=...): # -> tuple[Response[str], bytes | None]:
...
def close(self): # -> None:
...
class HttpMockSequence:
"""Mock of httplib2.Http
Mocks a sequence of calls to request returning different responses for each
call. Create an instance initialized with the desired response headers
and content and then use as if an httplib2.Http instance.
http = HttpMockSequence([
({'status': '401'}, ''),
({'status': '200'}, '{"access_token":"1/3w","expires_in":3600}'),
({'status': '200'}, 'echo_request_headers'),
])
resp, content = http.request("http://examples.com")
There are special values you can pass in for content to trigger
behavours that are helpful in testing.
'echo_request_headers' means return the request headers in the response body
'echo_request_headers_as_json' means return the request headers in
the response body
'echo_request_body' means return the request body in the response body
'echo_request_uri' means return the request uri in the response body
"""
def __init__(self, iterable) -> None:
"""
Args:
iterable: iterable, a sequence of pairs of (headers, body)
"""
...
def request(self, uri, method=..., body=..., headers=..., redirections=..., connection_type=...): # -> tuple[Response[Any], bytes | Any | bytearray | memoryview[_I] | None]:
...
def set_user_agent(http, user_agent):
"""Set the user-agent on every request.
Args:
http - An instance of httplib2.Http
or something that acts like it.
user_agent: string, the value for the user-agent header.
Returns:
A modified instance of http that was passed in.
Example:
h = httplib2.Http()
h = set_user_agent(h, "my-app-name/6.0")
Most of the time the user-agent will be set doing auth, this is for the rare
cases where you are accessing an unauthenticated endpoint.
"""
...
def tunnel_patch(http):
"""Tunnel PATCH requests over POST.
Args:
http - An instance of httplib2.Http
or something that acts like it.
Returns:
A modified instance of http that was passed in.
Example:
h = httplib2.Http()
h = tunnel_patch(h, "my-app-name/6.0")
Useful if you are running on a platform that doesn't support PATCH.
Apply this last if you are using OAuth 1.0, as changing the method
will result in a different signature.
"""
...
def build_http(): # -> Http:
"""Builds httplib2.Http object
Returns:
A httplib2.Http object, which is used to make http requests, and which has timeout set by default.
To override default timeout call
socket.setdefaulttimeout(timeout_in_sec)
before interacting with this method.
"""
...