123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256 |
- from __future__ import annotations
-
- import io
- import typing
- from base64 import b64encode
- from enum import Enum
-
- from ..exceptions import UnrewindableBodyError
- from .util import to_bytes
-
- if typing.TYPE_CHECKING:
- from typing_extensions import Final
-
# Sentinel header value: use it as a value within ``headers`` to suppress
# a header that would otherwise be added automatically.
# Only ``Accept-Encoding``, ``Host``, and ``User-Agent`` can be
# skipped this way.
SKIP_HEADER = "@@@SKIP_HEADER@@@"
SKIPPABLE_HEADERS = frozenset(
    (
        "accept-encoding",
        "host",
        "user-agent",
    )
)
-
# Default ``Accept-Encoding`` value. "gzip" and "deflate" are always
# included; "br" and "zstd" are appended only when the corresponding
# optional module can be imported.
_encodings = ["gzip", "deflate"]

try:
    # Either brotli binding is sufficient; ``brotlicffi`` is tried first.
    try:
        import brotlicffi as _unused_module_brotli  # type: ignore[import] # noqa: F401
    except ImportError:
        import brotli as _unused_module_brotli  # type: ignore[import] # noqa: F401
except ImportError:
    pass
else:
    _encodings.append("br")

try:
    import zstandard as _unused_module_zstd  # type: ignore[import] # noqa: F401
except ImportError:
    pass
else:
    _encodings.append("zstd")

ACCEPT_ENCODING = ",".join(_encodings)
del _encodings
-
-
class _TYPE_FAILEDTELL(Enum):
    """Single-member enum whose only member serves as a typed sentinel."""

    token = 0


# Sentinel recorded in place of a body position when ``tell()`` fails.
# It is distinct from ``None`` so the failure can be detected later.
_FAILEDTELL: Final[_TYPE_FAILEDTELL] = _TYPE_FAILEDTELL.token

# A body position is either an integer offset or the failed-tell sentinel.
_TYPE_BODY_POSITION = typing.Union[int, _TYPE_FAILEDTELL]
-
# Requests using these methods are not expected to carry a body, so no
# explicit 'Content-Length: 0' needs to be set for them.
# The set is expressed in the negative (instead of listing the methods
# that 'should' have a body) because unknown methods should be treated
# like 'POST', which *does* expect a body.
_METHODS_NOT_EXPECTING_BODY = {
    "GET",
    "HEAD",
    "DELETE",
    "TRACE",
    "OPTIONS",
    "CONNECT",
}
-
-
def make_headers(
    keep_alive: bool | None = None,
    accept_encoding: bool | list[str] | tuple[str, ...] | str | None = None,
    user_agent: str | None = None,
    basic_auth: str | None = None,
    proxy_basic_auth: str | None = None,
    disable_cache: bool | None = None,
) -> dict[str, str]:
    """
    Shortcuts for generating request headers.

    :param keep_alive:
        If ``True``, adds 'connection: keep-alive' header.

    :param accept_encoding:
        Can be a boolean, list, tuple, or string.
        ``True`` translates to the module-level ``ACCEPT_ENCODING`` value:
        'gzip,deflate', with ',br' appended if either the ``brotli`` or
        ``brotlicffi`` package is installed and ',zstd' appended if the
        ``zstandard`` package is installed.
        List or tuple will get joined by comma.
        String will be used as provided.

    :param user_agent:
        String representing the user-agent you want, such as
        "python-urllib3/0.6"

    :param basic_auth:
        Colon-separated username:password string for 'authorization: basic ...'
        auth header.

    :param proxy_basic_auth:
        Colon-separated username:password string for 'proxy-authorization: basic ...'
        auth header.

    :param disable_cache:
        If ``True``, adds 'cache-control: no-cache' header.

    :returns:
        A new dict mapping lower-case header names to their values. Only
        headers whose corresponding argument is truthy are included.

    Example:

    .. code-block:: python

        import urllib3

        print(urllib3.util.make_headers(keep_alive=True, user_agent="Batman/1.0"))
        # {'user-agent': 'Batman/1.0', 'connection': 'keep-alive'}
        print(urllib3.util.make_headers(accept_encoding=True))
        # {'accept-encoding': 'gzip,deflate'}
        # (plus ',br' and/or ',zstd' when the optional packages are installed)
    """
    headers: dict[str, str] = {}

    if accept_encoding:
        if isinstance(accept_encoding, (list, tuple)):
            # Tuples are joined exactly like lists instead of being silently
            # replaced by the default encodings.
            accept_encoding = ",".join(accept_encoding)
        elif not isinstance(accept_encoding, str):
            # Any other truthy value (typically ``True``) selects the default.
            accept_encoding = ACCEPT_ENCODING
        headers["accept-encoding"] = accept_encoding

    if user_agent:
        headers["user-agent"] = user_agent

    if keep_alive:
        headers["connection"] = "keep-alive"

    if basic_auth:
        # Credentials are encoded as latin-1 before being base64-encoded.
        encoded_auth = b64encode(basic_auth.encode("latin-1")).decode()
        headers["authorization"] = f"Basic {encoded_auth}"

    if proxy_basic_auth:
        encoded_proxy_auth = b64encode(proxy_basic_auth.encode("latin-1")).decode()
        headers["proxy-authorization"] = f"Basic {encoded_proxy_auth}"

    if disable_cache:
        headers["cache-control"] = "no-cache"

    return headers
-
-
def set_file_position(
    body: typing.Any, pos: _TYPE_BODY_POSITION | None
) -> _TYPE_BODY_POSITION | None:
    """
    Rewind ``body`` to ``pos`` when a position is given; otherwise try to
    record the body's current position for future use.

    :param body:
        The request body. Its position is only recorded if it exposes a
        ``tell`` attribute.

    :param pos:
        A previously recorded position, or ``None`` if nothing has been
        recorded yet.

    :returns:
        ``pos`` unchanged when it was provided; otherwise the result of
        ``body.tell()``, the ``_FAILEDTELL`` sentinel if ``tell()`` raised
        ``OSError``, or ``None`` if ``body`` has no ``tell`` attribute.
    """
    # A known position means we are replaying the body: rewind to it.
    if pos is not None:
        rewind_body(body, pos)
        return pos

    # Nothing recorded and no way to ask the body where it is.
    if getattr(body, "tell", None) is None:
        return None

    try:
        return body.tell()
    except OSError:
        # Distinct from None so that a failed `tell()` can be detected
        # later when trying to rewind the body.
        return _FAILEDTELL
-
-
def rewind_body(body: typing.IO[typing.AnyStr], body_pos: _TYPE_BODY_POSITION) -> None:
    """
    Attempt to rewind body to a certain position.
    Primarily used for request redirects and retries.

    :param body:
        File-like object that supports seek.

    :param body_pos:
        Integer position to seek to in the file, or the ``_FAILEDTELL``
        sentinel if recording the position failed earlier.

    :raises UnrewindableBodyError:
        If seeking raises ``OSError``, or if ``body_pos`` is ``_FAILEDTELL``.

    :raises ValueError:
        If ``body_pos`` is an integer but ``body`` has no ``seek``
        attribute, or if ``body_pos`` is neither an integer nor
        ``_FAILEDTELL``.
    """
    body_seek = getattr(body, "seek", None)

    if isinstance(body_pos, int):
        if body_seek is None:
            # Still a ValueError, but the message now names the real problem
            # instead of claiming that an integer position is not an integer.
            raise ValueError(
                f"body must support seek() to be rewound, instead it was {type(body)}."
            )
        try:
            body_seek(body_pos)
        except OSError as e:
            raise UnrewindableBodyError(
                "An error occurred when rewinding request body for redirect/retry."
            ) from e
    elif body_pos is _FAILEDTELL:
        raise UnrewindableBodyError(
            "Unable to record file position for rewinding "
            "request body during a redirect/retry."
        )
    else:
        raise ValueError(
            f"body_pos must be of type integer, instead it was {type(body_pos)}."
        )
-
-
class ChunksAndContentLength(typing.NamedTuple):
    """Pair returned by ``body_to_chunks``: the body chunks and its length."""

    # Iterable of chunks making up the body, or None when there is no body.
    chunks: typing.Iterable[bytes] | None
    # Value for the 'Content-Length' header, or None when no length is given.
    content_length: int | None
-
-
def body_to_chunks(
    body: typing.Any | None, method: str, blocksize: int
) -> ChunksAndContentLength:
    """Turn a request body into an iterable of chunks to pass to
    socket.sendall(), together with an optional 'Content-Length' value.

    A 'Content-Length' of 'None' indicates the length of the body
    can't be determined so should use 'Transfer-Encoding: chunked'
    for framing instead.

    :param body:
        ``None``, a ``str``/``bytes`` value, an object with a ``read``
        attribute, an object implementing the buffer API, or an iterable.

    :param method:
        The HTTP request method; only consulted when ``body`` is ``None``.

    :param blocksize:
        Size passed to each ``body.read()`` call for file-like bodies.

    :raises TypeError:
        If ``body`` matches none of the supported kinds.
    """
    # No body: recommend a 'Content-Length' based on whether this
    # request method is expected to have a body or not.
    if body is None:
        expects_body = method.upper() not in _METHODS_NOT_EXPECTING_BODY
        return ChunksAndContentLength(
            chunks=None, content_length=0 if expects_body else None
        )

    # Bytes or strings become a single bytes chunk of known length.
    if isinstance(body, (str, bytes)):
        payload = to_bytes(body)
        return ChunksAndContentLength(chunks=(payload,), content_length=len(payload))

    # File-like object, TODO: use seek() and tell() for length?
    if hasattr(body, "read"):

        def read_blocks() -> typing.Iterable[bytes]:
            # Text streams yield str, so each block is encoded before sending.
            needs_encoding = isinstance(body, io.TextIOBase)
            while True:
                block = body.read(blocksize)  # type: ignore[union-attr]
                if not block:
                    return
                yield block.encode("iso-8859-1") if needs_encoding else block

        return ChunksAndContentLength(chunks=read_blocks(), content_length=None)

    # Otherwise fall back to duck-typing.
    try:
        # Does the body implement the buffer API?
        view = memoryview(body)
    except TypeError:
        try:
            # Is the body an iterable?
            body_iter = iter(body)
        except TypeError:
            raise TypeError(
                f"'body' must be a bytes-like object, file-like "
                f"object, or iterable. Instead was {body!r}"
            ) from None
        return ChunksAndContentLength(chunks=body_iter, content_length=None)

    # Since it implements the buffer API, the body itself can be passed
    # directly to socket.sendall().
    return ChunksAndContentLength(chunks=(body,), content_length=view.nbytes)
|