123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557 |
- """
- Module for using pyOpenSSL as a TLS backend. This module was relevant before
- the standard library ``ssl`` module supported SNI, but now that we've dropped
- support for Python 2.7 all relevant Python versions support SNI so
- **this module is no longer recommended**.
-
- This needs the following packages installed:
-
- * `pyOpenSSL`_ (tested with 16.0.0)
- * `cryptography`_ (minimum 1.3.4, from pyopenssl)
- * `idna`_ (minimum 2.0, from cryptography)
-
- However, pyOpenSSL depends on cryptography, which depends on idna, so while we
- use all three directly here we end up having relatively few packages required.
-
- You can install them with the following command:
-
- .. code-block:: bash
-
- $ python -m pip install pyopenssl cryptography idna
-
- To activate certificate checking, call
- :func:`~urllib3.contrib.pyopenssl.inject_into_urllib3` from your Python code
- before you begin making HTTP requests. This can be done in a ``sitecustomize``
- module, or at any other time before your application begins using ``urllib3``,
- like this:
-
- .. code-block:: python
-
- try:
- import urllib3.contrib.pyopenssl
- urllib3.contrib.pyopenssl.inject_into_urllib3()
- except ImportError:
- pass
-
- .. _pyopenssl: https://www.pyopenssl.org
- .. _cryptography: https://cryptography.io
- .. _idna: https://github.com/kjd/idna
- """
-
- from __future__ import annotations
-
- import OpenSSL.SSL # type: ignore[import]
- from cryptography import x509
-
- try:
- from cryptography.x509 import UnsupportedExtension # type: ignore[attr-defined]
- except ImportError:
- # UnsupportedExtension is gone in cryptography >= 2.1.0
- class UnsupportedExtension(Exception): # type: ignore[no-redef]
- pass
-
-
- import logging
- import ssl
- import typing
- import warnings
- from io import BytesIO
- from socket import socket as socket_cls
- from socket import timeout
-
- from .. import util
-
- warnings.warn(
- "'urllib3.contrib.pyopenssl' module is deprecated and will be removed "
- "in urllib3 v2.1.0. Read more in this issue: "
- "https://github.com/urllib3/urllib3/issues/2680",
- category=DeprecationWarning,
- stacklevel=2,
- )
-
- if typing.TYPE_CHECKING:
- from OpenSSL.crypto import X509 # type: ignore[import]
-
-
- __all__ = ["inject_into_urllib3", "extract_from_urllib3"]
-
- # Map from urllib3 to PyOpenSSL compatible parameter-values.
- _openssl_versions = {
- util.ssl_.PROTOCOL_TLS: OpenSSL.SSL.SSLv23_METHOD, # type: ignore[attr-defined]
- util.ssl_.PROTOCOL_TLS_CLIENT: OpenSSL.SSL.SSLv23_METHOD, # type: ignore[attr-defined]
- ssl.PROTOCOL_TLSv1: OpenSSL.SSL.TLSv1_METHOD,
- }
-
- if hasattr(ssl, "PROTOCOL_TLSv1_1") and hasattr(OpenSSL.SSL, "TLSv1_1_METHOD"):
- _openssl_versions[ssl.PROTOCOL_TLSv1_1] = OpenSSL.SSL.TLSv1_1_METHOD
-
- if hasattr(ssl, "PROTOCOL_TLSv1_2") and hasattr(OpenSSL.SSL, "TLSv1_2_METHOD"):
- _openssl_versions[ssl.PROTOCOL_TLSv1_2] = OpenSSL.SSL.TLSv1_2_METHOD
-
-
- _stdlib_to_openssl_verify = {
- ssl.CERT_NONE: OpenSSL.SSL.VERIFY_NONE,
- ssl.CERT_OPTIONAL: OpenSSL.SSL.VERIFY_PEER,
- ssl.CERT_REQUIRED: OpenSSL.SSL.VERIFY_PEER
- + OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
- }
- _openssl_to_stdlib_verify = {v: k for k, v in _stdlib_to_openssl_verify.items()}
-
- # The SSLvX values are the most likely to be missing in the future
- # but we check them all just to be sure.
- _OP_NO_SSLv2_OR_SSLv3: int = getattr(OpenSSL.SSL, "OP_NO_SSLv2", 0) | getattr(
- OpenSSL.SSL, "OP_NO_SSLv3", 0
- )
- _OP_NO_TLSv1: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1", 0)
- _OP_NO_TLSv1_1: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1_1", 0)
- _OP_NO_TLSv1_2: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1_2", 0)
- _OP_NO_TLSv1_3: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1_3", 0)
-
- _openssl_to_ssl_minimum_version: dict[int, int] = {
- ssl.TLSVersion.MINIMUM_SUPPORTED: _OP_NO_SSLv2_OR_SSLv3,
- ssl.TLSVersion.TLSv1: _OP_NO_SSLv2_OR_SSLv3,
- ssl.TLSVersion.TLSv1_1: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1,
- ssl.TLSVersion.TLSv1_2: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1 | _OP_NO_TLSv1_1,
- ssl.TLSVersion.TLSv1_3: (
- _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1 | _OP_NO_TLSv1_1 | _OP_NO_TLSv1_2
- ),
- ssl.TLSVersion.MAXIMUM_SUPPORTED: (
- _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1 | _OP_NO_TLSv1_1 | _OP_NO_TLSv1_2
- ),
- }
- _openssl_to_ssl_maximum_version: dict[int, int] = {
- ssl.TLSVersion.MINIMUM_SUPPORTED: (
- _OP_NO_SSLv2_OR_SSLv3
- | _OP_NO_TLSv1
- | _OP_NO_TLSv1_1
- | _OP_NO_TLSv1_2
- | _OP_NO_TLSv1_3
- ),
- ssl.TLSVersion.TLSv1: (
- _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1_1 | _OP_NO_TLSv1_2 | _OP_NO_TLSv1_3
- ),
- ssl.TLSVersion.TLSv1_1: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1_2 | _OP_NO_TLSv1_3,
- ssl.TLSVersion.TLSv1_2: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1_3,
- ssl.TLSVersion.TLSv1_3: _OP_NO_SSLv2_OR_SSLv3,
- ssl.TLSVersion.MAXIMUM_SUPPORTED: _OP_NO_SSLv2_OR_SSLv3,
- }
-
- # OpenSSL will only write 16K at a time
- SSL_WRITE_BLOCKSIZE = 16384
-
- orig_util_SSLContext = util.ssl_.SSLContext
-
-
- log = logging.getLogger(__name__)
-
-
- def inject_into_urllib3() -> None:
- "Monkey-patch urllib3 with PyOpenSSL-backed SSL-support."
-
- _validate_dependencies_met()
-
- util.SSLContext = PyOpenSSLContext # type: ignore[assignment]
- util.ssl_.SSLContext = PyOpenSSLContext # type: ignore[assignment]
- util.IS_PYOPENSSL = True
- util.ssl_.IS_PYOPENSSL = True
-
-
- def extract_from_urllib3() -> None:
- "Undo monkey-patching by :func:`inject_into_urllib3`."
-
- util.SSLContext = orig_util_SSLContext
- util.ssl_.SSLContext = orig_util_SSLContext
- util.IS_PYOPENSSL = False
- util.ssl_.IS_PYOPENSSL = False
-
-
- def _validate_dependencies_met() -> None:
- """
- Verifies that PyOpenSSL's package-level dependencies have been met.
- Throws `ImportError` if they are not met.
- """
- # Method added in `cryptography==1.1`; not available in older versions
- from cryptography.x509.extensions import Extensions
-
- if getattr(Extensions, "get_extension_for_class", None) is None:
- raise ImportError(
- "'cryptography' module missing required functionality. "
- "Try upgrading to v1.3.4 or newer."
- )
-
- # pyOpenSSL 0.14 and above use cryptography for OpenSSL bindings. The _x509
- # attribute is only present on those versions.
- from OpenSSL.crypto import X509
-
- x509 = X509()
- if getattr(x509, "_x509", None) is None:
- raise ImportError(
- "'pyOpenSSL' module missing required functionality. "
- "Try upgrading to v0.14 or newer."
- )
-
-
- def _dnsname_to_stdlib(name: str) -> str | None:
- """
- Converts a dNSName SubjectAlternativeName field to the form used by the
- standard library on the given Python version.
-
- Cryptography produces a dNSName as a unicode string that was idna-decoded
- from ASCII bytes. We need to idna-encode that string to get it back, and
- then on Python 3 we also need to convert to unicode via UTF-8 (the stdlib
- uses PyUnicode_FromStringAndSize on it, which decodes via UTF-8).
-
- If the name cannot be idna-encoded then we return None signalling that
- the name given should be skipped.
- """
-
- def idna_encode(name: str) -> bytes | None:
- """
- Borrowed wholesale from the Python Cryptography Project. It turns out
- that we can't just safely call `idna.encode`: it can explode for
- wildcard names. This avoids that problem.
- """
- import idna
-
- try:
- for prefix in ["*.", "."]:
- if name.startswith(prefix):
- name = name[len(prefix) :]
- return prefix.encode("ascii") + idna.encode(name)
- return idna.encode(name)
- except idna.core.IDNAError:
- return None
-
- # Don't send IPv6 addresses through the IDNA encoder.
- if ":" in name:
- return name
-
- encoded_name = idna_encode(name)
- if encoded_name is None:
- return None
- return encoded_name.decode("utf-8")
-
-
- def get_subj_alt_name(peer_cert: X509) -> list[tuple[str, str]]:
- """
- Given an PyOpenSSL certificate, provides all the subject alternative names.
- """
- cert = peer_cert.to_cryptography()
-
- # We want to find the SAN extension. Ask Cryptography to locate it (it's
- # faster than looping in Python)
- try:
- ext = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName).value
- except x509.ExtensionNotFound:
- # No such extension, return the empty list.
- return []
- except (
- x509.DuplicateExtension,
- UnsupportedExtension,
- x509.UnsupportedGeneralNameType,
- UnicodeError,
- ) as e:
- # A problem has been found with the quality of the certificate. Assume
- # no SAN field is present.
- log.warning(
- "A problem was encountered with the certificate that prevented "
- "urllib3 from finding the SubjectAlternativeName field. This can "
- "affect certificate validation. The error was %s",
- e,
- )
- return []
-
- # We want to return dNSName and iPAddress fields. We need to cast the IPs
- # back to strings because the match_hostname function wants them as
- # strings.
- # Sadly the DNS names need to be idna encoded and then, on Python 3, UTF-8
- # decoded. This is pretty frustrating, but that's what the standard library
- # does with certificates, and so we need to attempt to do the same.
- # We also want to skip over names which cannot be idna encoded.
- names = [
- ("DNS", name)
- for name in map(_dnsname_to_stdlib, ext.get_values_for_type(x509.DNSName))
- if name is not None
- ]
- names.extend(
- ("IP Address", str(name)) for name in ext.get_values_for_type(x509.IPAddress)
- )
-
- return names
-
-
- class WrappedSocket:
- """API-compatibility wrapper for Python OpenSSL's Connection-class."""
-
- def __init__(
- self,
- connection: OpenSSL.SSL.Connection,
- socket: socket_cls,
- suppress_ragged_eofs: bool = True,
- ) -> None:
- self.connection = connection
- self.socket = socket
- self.suppress_ragged_eofs = suppress_ragged_eofs
- self._io_refs = 0
- self._closed = False
-
- def fileno(self) -> int:
- return self.socket.fileno()
-
- # Copy-pasted from Python 3.5 source code
- def _decref_socketios(self) -> None:
- if self._io_refs > 0:
- self._io_refs -= 1
- if self._closed:
- self.close()
-
- def recv(self, *args: typing.Any, **kwargs: typing.Any) -> bytes:
- try:
- data = self.connection.recv(*args, **kwargs)
- except OpenSSL.SSL.SysCallError as e:
- if self.suppress_ragged_eofs and e.args == (-1, "Unexpected EOF"):
- return b""
- else:
- raise OSError(e.args[0], str(e)) from e
- except OpenSSL.SSL.ZeroReturnError:
- if self.connection.get_shutdown() == OpenSSL.SSL.RECEIVED_SHUTDOWN:
- return b""
- else:
- raise
- except OpenSSL.SSL.WantReadError as e:
- if not util.wait_for_read(self.socket, self.socket.gettimeout()):
- raise timeout("The read operation timed out") from e
- else:
- return self.recv(*args, **kwargs)
-
- # TLS 1.3 post-handshake authentication
- except OpenSSL.SSL.Error as e:
- raise ssl.SSLError(f"read error: {e!r}") from e
- else:
- return data # type: ignore[no-any-return]
-
- def recv_into(self, *args: typing.Any, **kwargs: typing.Any) -> int:
- try:
- return self.connection.recv_into(*args, **kwargs) # type: ignore[no-any-return]
- except OpenSSL.SSL.SysCallError as e:
- if self.suppress_ragged_eofs and e.args == (-1, "Unexpected EOF"):
- return 0
- else:
- raise OSError(e.args[0], str(e)) from e
- except OpenSSL.SSL.ZeroReturnError:
- if self.connection.get_shutdown() == OpenSSL.SSL.RECEIVED_SHUTDOWN:
- return 0
- else:
- raise
- except OpenSSL.SSL.WantReadError as e:
- if not util.wait_for_read(self.socket, self.socket.gettimeout()):
- raise timeout("The read operation timed out") from e
- else:
- return self.recv_into(*args, **kwargs)
-
- # TLS 1.3 post-handshake authentication
- except OpenSSL.SSL.Error as e:
- raise ssl.SSLError(f"read error: {e!r}") from e
-
- def settimeout(self, timeout: float) -> None:
- return self.socket.settimeout(timeout)
-
- def _send_until_done(self, data: bytes) -> int:
- while True:
- try:
- return self.connection.send(data) # type: ignore[no-any-return]
- except OpenSSL.SSL.WantWriteError as e:
- if not util.wait_for_write(self.socket, self.socket.gettimeout()):
- raise timeout() from e
- continue
- except OpenSSL.SSL.SysCallError as e:
- raise OSError(e.args[0], str(e)) from e
-
- def sendall(self, data: bytes) -> None:
- total_sent = 0
- while total_sent < len(data):
- sent = self._send_until_done(
- data[total_sent : total_sent + SSL_WRITE_BLOCKSIZE]
- )
- total_sent += sent
-
- def shutdown(self) -> None:
- # FIXME rethrow compatible exceptions should we ever use this
- self.connection.shutdown()
-
- def close(self) -> None:
- self._closed = True
- if self._io_refs <= 0:
- self._real_close()
-
- def _real_close(self) -> None:
- try:
- return self.connection.close() # type: ignore[no-any-return]
- except OpenSSL.SSL.Error:
- return
-
- def getpeercert(
- self, binary_form: bool = False
- ) -> dict[str, list[typing.Any]] | None:
- x509 = self.connection.get_peer_certificate()
-
- if not x509:
- return x509 # type: ignore[no-any-return]
-
- if binary_form:
- return OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_ASN1, x509) # type: ignore[no-any-return]
-
- return {
- "subject": ((("commonName", x509.get_subject().CN),),), # type: ignore[dict-item]
- "subjectAltName": get_subj_alt_name(x509),
- }
-
- def version(self) -> str:
- return self.connection.get_protocol_version_name() # type: ignore[no-any-return]
-
-
- WrappedSocket.makefile = socket_cls.makefile # type: ignore[attr-defined]
-
-
- class PyOpenSSLContext:
- """
- I am a wrapper class for the PyOpenSSL ``Context`` object. I am responsible
- for translating the interface of the standard library ``SSLContext`` object
- to calls into PyOpenSSL.
- """
-
- def __init__(self, protocol: int) -> None:
- self.protocol = _openssl_versions[protocol]
- self._ctx = OpenSSL.SSL.Context(self.protocol)
- self._options = 0
- self.check_hostname = False
- self._minimum_version: int = ssl.TLSVersion.MINIMUM_SUPPORTED
- self._maximum_version: int = ssl.TLSVersion.MAXIMUM_SUPPORTED
-
- @property
- def options(self) -> int:
- return self._options
-
- @options.setter
- def options(self, value: int) -> None:
- self._options = value
- self._set_ctx_options()
-
- @property
- def verify_mode(self) -> int:
- return _openssl_to_stdlib_verify[self._ctx.get_verify_mode()]
-
- @verify_mode.setter
- def verify_mode(self, value: ssl.VerifyMode) -> None:
- self._ctx.set_verify(_stdlib_to_openssl_verify[value], _verify_callback)
-
- def set_default_verify_paths(self) -> None:
- self._ctx.set_default_verify_paths()
-
- def set_ciphers(self, ciphers: bytes | str) -> None:
- if isinstance(ciphers, str):
- ciphers = ciphers.encode("utf-8")
- self._ctx.set_cipher_list(ciphers)
-
- def load_verify_locations(
- self,
- cafile: str | None = None,
- capath: str | None = None,
- cadata: bytes | None = None,
- ) -> None:
- if cafile is not None:
- cafile = cafile.encode("utf-8") # type: ignore[assignment]
- if capath is not None:
- capath = capath.encode("utf-8") # type: ignore[assignment]
- try:
- self._ctx.load_verify_locations(cafile, capath)
- if cadata is not None:
- self._ctx.load_verify_locations(BytesIO(cadata))
- except OpenSSL.SSL.Error as e:
- raise ssl.SSLError(f"unable to load trusted certificates: {e!r}") from e
-
- def load_cert_chain(
- self,
- certfile: str,
- keyfile: str | None = None,
- password: str | None = None,
- ) -> None:
- try:
- self._ctx.use_certificate_chain_file(certfile)
- if password is not None:
- if not isinstance(password, bytes):
- password = password.encode("utf-8") # type: ignore[assignment]
- self._ctx.set_passwd_cb(lambda *_: password)
- self._ctx.use_privatekey_file(keyfile or certfile)
- except OpenSSL.SSL.Error as e:
- raise ssl.SSLError(f"Unable to load certificate chain: {e!r}") from e
-
- def set_alpn_protocols(self, protocols: list[bytes | str]) -> None:
- protocols = [util.util.to_bytes(p, "ascii") for p in protocols]
- return self._ctx.set_alpn_protos(protocols) # type: ignore[no-any-return]
-
- def wrap_socket(
- self,
- sock: socket_cls,
- server_side: bool = False,
- do_handshake_on_connect: bool = True,
- suppress_ragged_eofs: bool = True,
- server_hostname: bytes | str | None = None,
- ) -> WrappedSocket:
- cnx = OpenSSL.SSL.Connection(self._ctx, sock)
-
- # If server_hostname is an IP, don't use it for SNI, per RFC6066 Section 3
- if server_hostname and not util.ssl_.is_ipaddress(server_hostname):
- if isinstance(server_hostname, str):
- server_hostname = server_hostname.encode("utf-8")
- cnx.set_tlsext_host_name(server_hostname)
-
- cnx.set_connect_state()
-
- while True:
- try:
- cnx.do_handshake()
- except OpenSSL.SSL.WantReadError as e:
- if not util.wait_for_read(sock, sock.gettimeout()):
- raise timeout("select timed out") from e
- continue
- except OpenSSL.SSL.Error as e:
- raise ssl.SSLError(f"bad handshake: {e!r}") from e
- break
-
- return WrappedSocket(cnx, sock)
-
- def _set_ctx_options(self) -> None:
- self._ctx.set_options(
- self._options
- | _openssl_to_ssl_minimum_version[self._minimum_version]
- | _openssl_to_ssl_maximum_version[self._maximum_version]
- )
-
- @property
- def minimum_version(self) -> int:
- return self._minimum_version
-
- @minimum_version.setter
- def minimum_version(self, minimum_version: int) -> None:
- self._minimum_version = minimum_version
- self._set_ctx_options()
-
- @property
- def maximum_version(self) -> int:
- return self._maximum_version
-
- @maximum_version.setter
- def maximum_version(self, maximum_version: int) -> None:
- self._maximum_version = maximum_version
- self._set_ctx_options()
-
-
- def _verify_callback(
- cnx: OpenSSL.SSL.Connection,
- x509: X509,
- err_no: int,
- err_depth: int,
- return_code: int,
- ) -> bool:
- return err_no == 0
|