|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913 |
- """
- SecureTranport support for urllib3 via ctypes.
-
- This makes platform-native TLS available to urllib3 users on macOS without the
- use of a compiler. This is an important feature because the Python Package
- Index is moving to become a TLSv1.2-or-higher server, and the default OpenSSL
- that ships with macOS is not capable of doing TLSv1.2. The only way to resolve
- this is to give macOS users an alternative solution to the problem, and that
- solution is to use SecureTransport.
-
- We use ctypes here because this solution must not require a compiler. That's
- because pip is not allowed to require a compiler either.
-
- This is not intended to be a seriously long-term solution to this problem.
- The hope is that PEP 543 will eventually solve this issue for us, at which
- point we can retire this contrib module. But in the short term, we need to
- solve the impending tire fire that is Python on Mac without this kind of
- contrib module. So...here we are.
-
- To use this module, simply import and inject it::
-
- import urllib3.contrib.securetransport
- urllib3.contrib.securetransport.inject_into_urllib3()
-
- Happy TLSing!
-
- This code is a bastardised version of the code found in Will Bond's oscrypto
- library. An enormous debt is owed to him for blazing this trail for us. For
- that reason, this code should be considered to be covered both by urllib3's
- license and by oscrypto's:
-
- .. code-block::
-
- Copyright (c) 2015-2016 Will Bond <will@wbond.net>
-
- Permission is hereby granted, free of charge, to any person obtaining a
- copy of this software and associated documentation files (the "Software"),
- to deal in the Software without restriction, including without limitation
- the rights to use, copy, modify, merge, publish, distribute, sublicense,
- and/or sell copies of the Software, and to permit persons to whom the
- Software is furnished to do so, subject to the following conditions:
-
- The above copyright notice and this permission notice shall be included in
- all copies or substantial portions of the Software.
-
- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
- FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
- DEALINGS IN THE SOFTWARE.
- """
-
- from __future__ import annotations
-
- import contextlib
- import ctypes
- import errno
- import os.path
- import shutil
- import socket
- import ssl
- import struct
- import threading
- import typing
- import warnings
- import weakref
- from socket import socket as socket_cls
-
- from .. import util
- from ._securetransport.bindings import ( # type: ignore[attr-defined]
- CoreFoundation,
- Security,
- )
- from ._securetransport.low_level import (
- SecurityConst,
- _assert_no_error,
- _build_tls_unknown_ca_alert,
- _cert_array_from_pem,
- _create_cfstring_array,
- _load_client_cert_chain,
- _temporary_keychain,
- )
-
- warnings.warn(
- "'urllib3.contrib.securetransport' module is deprecated and will be removed "
- "in urllib3 v2.1.0. Read more in this issue: "
- "https://github.com/urllib3/urllib3/issues/2681",
- category=DeprecationWarning,
- stacklevel=2,
- )
-
- if typing.TYPE_CHECKING:
- from typing_extensions import Literal
-
- __all__ = ["inject_into_urllib3", "extract_from_urllib3"]
-
- orig_util_SSLContext = util.ssl_.SSLContext
-
- # This dictionary is used by the read callback to obtain a handle to the
- # calling wrapped socket. This is a pretty silly approach, but for now it'll
- # do. I feel like I should be able to smuggle a handle to the wrapped socket
- # directly in the SSLConnectionRef, but for now this approach will work I
- # guess.
- #
- # We need to lock around this structure for inserts, but we don't do it for
- # reads/writes in the callbacks. The reasoning here goes as follows:
- #
- # 1. It is not possible to call into the callbacks before the dictionary is
- # populated, so once in the callback the id must be in the dictionary.
- # 2. The callbacks don't mutate the dictionary, they only read from it, and
- # so cannot conflict with any of the insertions.
- #
- # This is good: if we had to lock in the callbacks we'd drastically slow down
- # the performance of this code.
- _connection_refs: weakref.WeakValueDictionary[
- int, WrappedSocket
- ] = weakref.WeakValueDictionary()
- _connection_ref_lock = threading.Lock()
-
- # Limit writes to 16kB. This is OpenSSL's limit, but we'll cargo-cult it over
- # for no better reason than we need *a* limit, and this one is right there.
- SSL_WRITE_BLOCKSIZE = 16384
-
- # Basically this is simple: for PROTOCOL_SSLv23 we turn it into a low of
- # TLSv1 and a high of TLSv1.2. For everything else, we pin to that version.
- # TLSv1 to 1.2 are supported on macOS 10.8+
- _protocol_to_min_max = {
- util.ssl_.PROTOCOL_TLS: (SecurityConst.kTLSProtocol1, SecurityConst.kTLSProtocol12), # type: ignore[attr-defined]
- util.ssl_.PROTOCOL_TLS_CLIENT: ( # type: ignore[attr-defined]
- SecurityConst.kTLSProtocol1,
- SecurityConst.kTLSProtocol12,
- ),
- }
-
- if hasattr(ssl, "PROTOCOL_SSLv2"):
- _protocol_to_min_max[ssl.PROTOCOL_SSLv2] = (
- SecurityConst.kSSLProtocol2,
- SecurityConst.kSSLProtocol2,
- )
- if hasattr(ssl, "PROTOCOL_SSLv3"):
- _protocol_to_min_max[ssl.PROTOCOL_SSLv3] = (
- SecurityConst.kSSLProtocol3,
- SecurityConst.kSSLProtocol3,
- )
- if hasattr(ssl, "PROTOCOL_TLSv1"):
- _protocol_to_min_max[ssl.PROTOCOL_TLSv1] = (
- SecurityConst.kTLSProtocol1,
- SecurityConst.kTLSProtocol1,
- )
- if hasattr(ssl, "PROTOCOL_TLSv1_1"):
- _protocol_to_min_max[ssl.PROTOCOL_TLSv1_1] = (
- SecurityConst.kTLSProtocol11,
- SecurityConst.kTLSProtocol11,
- )
- if hasattr(ssl, "PROTOCOL_TLSv1_2"):
- _protocol_to_min_max[ssl.PROTOCOL_TLSv1_2] = (
- SecurityConst.kTLSProtocol12,
- SecurityConst.kTLSProtocol12,
- )
-
-
- _tls_version_to_st: dict[int, int] = {
- ssl.TLSVersion.MINIMUM_SUPPORTED: SecurityConst.kTLSProtocol1,
- ssl.TLSVersion.TLSv1: SecurityConst.kTLSProtocol1,
- ssl.TLSVersion.TLSv1_1: SecurityConst.kTLSProtocol11,
- ssl.TLSVersion.TLSv1_2: SecurityConst.kTLSProtocol12,
- ssl.TLSVersion.MAXIMUM_SUPPORTED: SecurityConst.kTLSProtocol12,
- }
-
-
- def inject_into_urllib3() -> None:
- """
- Monkey-patch urllib3 with SecureTransport-backed SSL-support.
- """
- util.SSLContext = SecureTransportContext # type: ignore[assignment]
- util.ssl_.SSLContext = SecureTransportContext # type: ignore[assignment]
- util.IS_SECURETRANSPORT = True
- util.ssl_.IS_SECURETRANSPORT = True
-
-
- def extract_from_urllib3() -> None:
- """
- Undo monkey-patching by :func:`inject_into_urllib3`.
- """
- util.SSLContext = orig_util_SSLContext
- util.ssl_.SSLContext = orig_util_SSLContext
- util.IS_SECURETRANSPORT = False
- util.ssl_.IS_SECURETRANSPORT = False
-
-
- def _read_callback(
- connection_id: int, data_buffer: int, data_length_pointer: bytearray
- ) -> int:
- """
- SecureTransport read callback. This is called by ST to request that data
- be returned from the socket.
- """
- wrapped_socket = None
- try:
- wrapped_socket = _connection_refs.get(connection_id)
- if wrapped_socket is None:
- return SecurityConst.errSSLInternal
- base_socket = wrapped_socket.socket
-
- requested_length = data_length_pointer[0]
-
- timeout = wrapped_socket.gettimeout()
- error = None
- read_count = 0
-
- try:
- while read_count < requested_length:
- if timeout is None or timeout >= 0:
- if not util.wait_for_read(base_socket, timeout):
- raise OSError(errno.EAGAIN, "timed out")
-
- remaining = requested_length - read_count
- buffer = (ctypes.c_char * remaining).from_address(
- data_buffer + read_count
- )
- chunk_size = base_socket.recv_into(buffer, remaining)
- read_count += chunk_size
- if not chunk_size:
- if not read_count:
- return SecurityConst.errSSLClosedGraceful
- break
- except OSError as e:
- error = e.errno
-
- if error is not None and error != errno.EAGAIN:
- data_length_pointer[0] = read_count
- if error == errno.ECONNRESET or error == errno.EPIPE:
- return SecurityConst.errSSLClosedAbort
- raise
-
- data_length_pointer[0] = read_count
-
- if read_count != requested_length:
- return SecurityConst.errSSLWouldBlock
-
- return 0
- except Exception as e:
- if wrapped_socket is not None:
- wrapped_socket._exception = e
- return SecurityConst.errSSLInternal
-
-
- def _write_callback(
- connection_id: int, data_buffer: int, data_length_pointer: bytearray
- ) -> int:
- """
- SecureTransport write callback. This is called by ST to request that data
- actually be sent on the network.
- """
- wrapped_socket = None
- try:
- wrapped_socket = _connection_refs.get(connection_id)
- if wrapped_socket is None:
- return SecurityConst.errSSLInternal
- base_socket = wrapped_socket.socket
-
- bytes_to_write = data_length_pointer[0]
- data = ctypes.string_at(data_buffer, bytes_to_write)
-
- timeout = wrapped_socket.gettimeout()
- error = None
- sent = 0
-
- try:
- while sent < bytes_to_write:
- if timeout is None or timeout >= 0:
- if not util.wait_for_write(base_socket, timeout):
- raise OSError(errno.EAGAIN, "timed out")
- chunk_sent = base_socket.send(data)
- sent += chunk_sent
-
- # This has some needless copying here, but I'm not sure there's
- # much value in optimising this data path.
- data = data[chunk_sent:]
- except OSError as e:
- error = e.errno
-
- if error is not None and error != errno.EAGAIN:
- data_length_pointer[0] = sent
- if error == errno.ECONNRESET or error == errno.EPIPE:
- return SecurityConst.errSSLClosedAbort
- raise
-
- data_length_pointer[0] = sent
-
- if sent != bytes_to_write:
- return SecurityConst.errSSLWouldBlock
-
- return 0
- except Exception as e:
- if wrapped_socket is not None:
- wrapped_socket._exception = e
- return SecurityConst.errSSLInternal
-
-
- # We need to keep these two objects references alive: if they get GC'd while
- # in use then SecureTransport could attempt to call a function that is in freed
- # memory. That would be...uh...bad. Yeah, that's the word. Bad.
- _read_callback_pointer = Security.SSLReadFunc(_read_callback)
- _write_callback_pointer = Security.SSLWriteFunc(_write_callback)
-
-
- class WrappedSocket:
- """
- API-compatibility wrapper for Python's OpenSSL wrapped socket object.
- """
-
- def __init__(self, socket: socket_cls) -> None:
- self.socket = socket
- self.context = None
- self._io_refs = 0
- self._closed = False
- self._real_closed = False
- self._exception: Exception | None = None
- self._keychain = None
- self._keychain_dir: str | None = None
- self._client_cert_chain = None
-
- # We save off the previously-configured timeout and then set it to
- # zero. This is done because we use select and friends to handle the
- # timeouts, but if we leave the timeout set on the lower socket then
- # Python will "kindly" call select on that socket again for us. Avoid
- # that by forcing the timeout to zero.
- self._timeout = self.socket.gettimeout()
- self.socket.settimeout(0)
-
- @contextlib.contextmanager
- def _raise_on_error(self) -> typing.Generator[None, None, None]:
- """
- A context manager that can be used to wrap calls that do I/O from
- SecureTransport. If any of the I/O callbacks hit an exception, this
- context manager will correctly propagate the exception after the fact.
- This avoids silently swallowing those exceptions.
-
- It also correctly forces the socket closed.
- """
- self._exception = None
-
- # We explicitly don't catch around this yield because in the unlikely
- # event that an exception was hit in the block we don't want to swallow
- # it.
- yield
- if self._exception is not None:
- exception, self._exception = self._exception, None
- self._real_close()
- raise exception
-
- def _set_alpn_protocols(self, protocols: list[bytes] | None) -> None:
- """
- Sets up the ALPN protocols on the context.
- """
- if not protocols:
- return
- protocols_arr = _create_cfstring_array(protocols)
- try:
- result = Security.SSLSetALPNProtocols(self.context, protocols_arr)
- _assert_no_error(result)
- finally:
- CoreFoundation.CFRelease(protocols_arr)
-
- def _custom_validate(self, verify: bool, trust_bundle: bytes | None) -> None:
- """
- Called when we have set custom validation. We do this in two cases:
- first, when cert validation is entirely disabled; and second, when
- using a custom trust DB.
- Raises an SSLError if the connection is not trusted.
- """
- # If we disabled cert validation, just say: cool.
- if not verify or trust_bundle is None:
- return
-
- successes = (
- SecurityConst.kSecTrustResultUnspecified,
- SecurityConst.kSecTrustResultProceed,
- )
- try:
- trust_result = self._evaluate_trust(trust_bundle)
- if trust_result in successes:
- return
- reason = f"error code: {int(trust_result)}"
- exc = None
- except Exception as e:
- # Do not trust on error
- reason = f"exception: {e!r}"
- exc = e
-
- # SecureTransport does not send an alert nor shuts down the connection.
- rec = _build_tls_unknown_ca_alert(self.version())
- self.socket.sendall(rec)
- # close the connection immediately
- # l_onoff = 1, activate linger
- # l_linger = 0, linger for 0 seoncds
- opts = struct.pack("ii", 1, 0)
- self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, opts)
- self._real_close()
- raise ssl.SSLError(f"certificate verify failed, {reason}") from exc
-
- def _evaluate_trust(self, trust_bundle: bytes) -> int:
- # We want data in memory, so load it up.
- if os.path.isfile(trust_bundle):
- with open(trust_bundle, "rb") as f:
- trust_bundle = f.read()
-
- cert_array = None
- trust = Security.SecTrustRef()
-
- try:
- # Get a CFArray that contains the certs we want.
- cert_array = _cert_array_from_pem(trust_bundle)
-
- # Ok, now the hard part. We want to get the SecTrustRef that ST has
- # created for this connection, shove our CAs into it, tell ST to
- # ignore everything else it knows, and then ask if it can build a
- # chain. This is a buuuunch of code.
- result = Security.SSLCopyPeerTrust(self.context, ctypes.byref(trust))
- _assert_no_error(result)
- if not trust:
- raise ssl.SSLError("Failed to copy trust reference")
-
- result = Security.SecTrustSetAnchorCertificates(trust, cert_array)
- _assert_no_error(result)
-
- result = Security.SecTrustSetAnchorCertificatesOnly(trust, True)
- _assert_no_error(result)
-
- trust_result = Security.SecTrustResultType()
- result = Security.SecTrustEvaluate(trust, ctypes.byref(trust_result))
- _assert_no_error(result)
- finally:
- if trust:
- CoreFoundation.CFRelease(trust)
-
- if cert_array is not None:
- CoreFoundation.CFRelease(cert_array)
-
- return trust_result.value # type: ignore[no-any-return]
-
- def handshake(
- self,
- server_hostname: bytes | str | None,
- verify: bool,
- trust_bundle: bytes | None,
- min_version: int,
- max_version: int,
- client_cert: str | None,
- client_key: str | None,
- client_key_passphrase: typing.Any,
- alpn_protocols: list[bytes] | None,
- ) -> None:
- """
- Actually performs the TLS handshake. This is run automatically by
- wrapped socket, and shouldn't be needed in user code.
- """
- # First, we do the initial bits of connection setup. We need to create
- # a context, set its I/O funcs, and set the connection reference.
- self.context = Security.SSLCreateContext(
- None, SecurityConst.kSSLClientSide, SecurityConst.kSSLStreamType
- )
- result = Security.SSLSetIOFuncs(
- self.context, _read_callback_pointer, _write_callback_pointer
- )
- _assert_no_error(result)
-
- # Here we need to compute the handle to use. We do this by taking the
- # id of self modulo 2**31 - 1. If this is already in the dictionary, we
- # just keep incrementing by one until we find a free space.
- with _connection_ref_lock:
- handle = id(self) % 2147483647
- while handle in _connection_refs:
- handle = (handle + 1) % 2147483647
- _connection_refs[handle] = self
-
- result = Security.SSLSetConnection(self.context, handle)
- _assert_no_error(result)
-
- # If we have a server hostname, we should set that too.
- # RFC6066 Section 3 tells us not to use SNI when the host is an IP, but we have
- # to do it anyway to match server_hostname against the server certificate
- if server_hostname:
- if not isinstance(server_hostname, bytes):
- server_hostname = server_hostname.encode("utf-8")
-
- result = Security.SSLSetPeerDomainName(
- self.context, server_hostname, len(server_hostname)
- )
- _assert_no_error(result)
-
- # Setup the ALPN protocols.
- self._set_alpn_protocols(alpn_protocols)
-
- # Set the minimum and maximum TLS versions.
- result = Security.SSLSetProtocolVersionMin(self.context, min_version)
- _assert_no_error(result)
-
- result = Security.SSLSetProtocolVersionMax(self.context, max_version)
- _assert_no_error(result)
-
- # If there's a trust DB, we need to use it. We do that by telling
- # SecureTransport to break on server auth. We also do that if we don't
- # want to validate the certs at all: we just won't actually do any
- # authing in that case.
- if not verify or trust_bundle is not None:
- result = Security.SSLSetSessionOption(
- self.context, SecurityConst.kSSLSessionOptionBreakOnServerAuth, True
- )
- _assert_no_error(result)
-
- # If there's a client cert, we need to use it.
- if client_cert:
- self._keychain, self._keychain_dir = _temporary_keychain()
- self._client_cert_chain = _load_client_cert_chain(
- self._keychain, client_cert, client_key
- )
- result = Security.SSLSetCertificate(self.context, self._client_cert_chain)
- _assert_no_error(result)
-
- while True:
- with self._raise_on_error():
- result = Security.SSLHandshake(self.context)
-
- if result == SecurityConst.errSSLWouldBlock:
- raise socket.timeout("handshake timed out")
- elif result == SecurityConst.errSSLServerAuthCompleted:
- self._custom_validate(verify, trust_bundle)
- continue
- else:
- _assert_no_error(result)
- break
-
- def fileno(self) -> int:
- return self.socket.fileno()
-
- # Copy-pasted from Python 3.5 source code
- def _decref_socketios(self) -> None:
- if self._io_refs > 0:
- self._io_refs -= 1
- if self._closed:
- self.close()
-
- def recv(self, bufsiz: int) -> bytes:
- buffer = ctypes.create_string_buffer(bufsiz)
- bytes_read = self.recv_into(buffer, bufsiz)
- data = buffer[:bytes_read]
- return typing.cast(bytes, data)
-
- def recv_into(
- self, buffer: ctypes.Array[ctypes.c_char], nbytes: int | None = None
- ) -> int:
- # Read short on EOF.
- if self._real_closed:
- return 0
-
- if nbytes is None:
- nbytes = len(buffer)
-
- buffer = (ctypes.c_char * nbytes).from_buffer(buffer)
- processed_bytes = ctypes.c_size_t(0)
-
- with self._raise_on_error():
- result = Security.SSLRead(
- self.context, buffer, nbytes, ctypes.byref(processed_bytes)
- )
-
- # There are some result codes that we want to treat as "not always
- # errors". Specifically, those are errSSLWouldBlock,
- # errSSLClosedGraceful, and errSSLClosedNoNotify.
- if result == SecurityConst.errSSLWouldBlock:
- # If we didn't process any bytes, then this was just a time out.
- # However, we can get errSSLWouldBlock in situations when we *did*
- # read some data, and in those cases we should just read "short"
- # and return.
- if processed_bytes.value == 0:
- # Timed out, no data read.
- raise socket.timeout("recv timed out")
- elif result in (
- SecurityConst.errSSLClosedGraceful,
- SecurityConst.errSSLClosedNoNotify,
- ):
- # The remote peer has closed this connection. We should do so as
- # well. Note that we don't actually return here because in
- # principle this could actually be fired along with return data.
- # It's unlikely though.
- self._real_close()
- else:
- _assert_no_error(result)
-
- # Ok, we read and probably succeeded. We should return whatever data
- # was actually read.
- return processed_bytes.value
-
- def settimeout(self, timeout: float) -> None:
- self._timeout = timeout
-
- def gettimeout(self) -> float | None:
- return self._timeout
-
- def send(self, data: bytes) -> int:
- processed_bytes = ctypes.c_size_t(0)
-
- with self._raise_on_error():
- result = Security.SSLWrite(
- self.context, data, len(data), ctypes.byref(processed_bytes)
- )
-
- if result == SecurityConst.errSSLWouldBlock and processed_bytes.value == 0:
- # Timed out
- raise socket.timeout("send timed out")
- else:
- _assert_no_error(result)
-
- # We sent, and probably succeeded. Tell them how much we sent.
- return processed_bytes.value
-
- def sendall(self, data: bytes) -> None:
- total_sent = 0
- while total_sent < len(data):
- sent = self.send(data[total_sent : total_sent + SSL_WRITE_BLOCKSIZE])
- total_sent += sent
-
- def shutdown(self) -> None:
- with self._raise_on_error():
- Security.SSLClose(self.context)
-
- def close(self) -> None:
- self._closed = True
- # TODO: should I do clean shutdown here? Do I have to?
- if self._io_refs <= 0:
- self._real_close()
-
- def _real_close(self) -> None:
- self._real_closed = True
- if self.context:
- CoreFoundation.CFRelease(self.context)
- self.context = None
- if self._client_cert_chain:
- CoreFoundation.CFRelease(self._client_cert_chain)
- self._client_cert_chain = None
- if self._keychain:
- Security.SecKeychainDelete(self._keychain)
- CoreFoundation.CFRelease(self._keychain)
- shutil.rmtree(self._keychain_dir)
- self._keychain = self._keychain_dir = None
- return self.socket.close()
-
- def getpeercert(self, binary_form: bool = False) -> bytes | None:
- # Urgh, annoying.
- #
- # Here's how we do this:
- #
- # 1. Call SSLCopyPeerTrust to get hold of the trust object for this
- # connection.
- # 2. Call SecTrustGetCertificateAtIndex for index 0 to get the leaf.
- # 3. To get the CN, call SecCertificateCopyCommonName and process that
- # string so that it's of the appropriate type.
- # 4. To get the SAN, we need to do something a bit more complex:
- # a. Call SecCertificateCopyValues to get the data, requesting
- # kSecOIDSubjectAltName.
- # b. Mess about with this dictionary to try to get the SANs out.
- #
- # This is gross. Really gross. It's going to be a few hundred LoC extra
- # just to repeat something that SecureTransport can *already do*. So my
- # operating assumption at this time is that what we want to do is
- # instead to just flag to urllib3 that it shouldn't do its own hostname
- # validation when using SecureTransport.
- if not binary_form:
- raise ValueError("SecureTransport only supports dumping binary certs")
- trust = Security.SecTrustRef()
- certdata = None
- der_bytes = None
-
- try:
- # Grab the trust store.
- result = Security.SSLCopyPeerTrust(self.context, ctypes.byref(trust))
- _assert_no_error(result)
- if not trust:
- # Probably we haven't done the handshake yet. No biggie.
- return None
-
- cert_count = Security.SecTrustGetCertificateCount(trust)
- if not cert_count:
- # Also a case that might happen if we haven't handshaked.
- # Handshook? Handshaken?
- return None
-
- leaf = Security.SecTrustGetCertificateAtIndex(trust, 0)
- assert leaf
-
- # Ok, now we want the DER bytes.
- certdata = Security.SecCertificateCopyData(leaf)
- assert certdata
-
- data_length = CoreFoundation.CFDataGetLength(certdata)
- data_buffer = CoreFoundation.CFDataGetBytePtr(certdata)
- der_bytes = ctypes.string_at(data_buffer, data_length)
- finally:
- if certdata:
- CoreFoundation.CFRelease(certdata)
- if trust:
- CoreFoundation.CFRelease(trust)
-
- return der_bytes
-
- def version(self) -> str:
- protocol = Security.SSLProtocol()
- result = Security.SSLGetNegotiatedProtocolVersion(
- self.context, ctypes.byref(protocol)
- )
- _assert_no_error(result)
- if protocol.value == SecurityConst.kTLSProtocol13:
- raise ssl.SSLError("SecureTransport does not support TLS 1.3")
- elif protocol.value == SecurityConst.kTLSProtocol12:
- return "TLSv1.2"
- elif protocol.value == SecurityConst.kTLSProtocol11:
- return "TLSv1.1"
- elif protocol.value == SecurityConst.kTLSProtocol1:
- return "TLSv1"
- elif protocol.value == SecurityConst.kSSLProtocol3:
- return "SSLv3"
- elif protocol.value == SecurityConst.kSSLProtocol2:
- return "SSLv2"
- else:
- raise ssl.SSLError(f"Unknown TLS version: {protocol!r}")
-
-
- def makefile(
- self: socket_cls,
- mode: (
- Literal["r"] | Literal["w"] | Literal["rw"] | Literal["wr"] | Literal[""]
- ) = "r",
- buffering: int | None = None,
- *args: typing.Any,
- **kwargs: typing.Any,
- ) -> typing.BinaryIO | typing.TextIO:
- # We disable buffering with SecureTransport because it conflicts with
- # the buffering that ST does internally (see issue #1153 for more).
- buffering = 0
- return socket_cls.makefile(self, mode, buffering, *args, **kwargs)
-
-
- WrappedSocket.makefile = makefile # type: ignore[attr-defined]
-
-
- class SecureTransportContext:
- """
- I am a wrapper class for the SecureTransport library, to translate the
- interface of the standard library ``SSLContext`` object to calls into
- SecureTransport.
- """
-
- def __init__(self, protocol: int) -> None:
- self._minimum_version: int = ssl.TLSVersion.MINIMUM_SUPPORTED
- self._maximum_version: int = ssl.TLSVersion.MAXIMUM_SUPPORTED
- if protocol not in (None, ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS_CLIENT):
- self._min_version, self._max_version = _protocol_to_min_max[protocol]
-
- self._options = 0
- self._verify = False
- self._trust_bundle: bytes | None = None
- self._client_cert: str | None = None
- self._client_key: str | None = None
- self._client_key_passphrase = None
- self._alpn_protocols: list[bytes] | None = None
-
- @property
- def check_hostname(self) -> Literal[True]:
- """
- SecureTransport cannot have its hostname checking disabled. For more,
- see the comment on getpeercert() in this file.
- """
- return True
-
- @check_hostname.setter
- def check_hostname(self, value: typing.Any) -> None:
- """
- SecureTransport cannot have its hostname checking disabled. For more,
- see the comment on getpeercert() in this file.
- """
-
- @property
- def options(self) -> int:
- # TODO: Well, crap.
- #
- # So this is the bit of the code that is the most likely to cause us
- # trouble. Essentially we need to enumerate all of the SSL options that
- # users might want to use and try to see if we can sensibly translate
- # them, or whether we should just ignore them.
- return self._options
-
- @options.setter
- def options(self, value: int) -> None:
- # TODO: Update in line with above.
- self._options = value
-
- @property
- def verify_mode(self) -> int:
- return ssl.CERT_REQUIRED if self._verify else ssl.CERT_NONE
-
- @verify_mode.setter
- def verify_mode(self, value: int) -> None:
- self._verify = value == ssl.CERT_REQUIRED
-
- def set_default_verify_paths(self) -> None:
- # So, this has to do something a bit weird. Specifically, what it does
- # is nothing.
- #
- # This means that, if we had previously had load_verify_locations
- # called, this does not undo that. We need to do that because it turns
- # out that the rest of the urllib3 code will attempt to load the
- # default verify paths if it hasn't been told about any paths, even if
- # the context itself was sometime earlier. We resolve that by just
- # ignoring it.
- pass
-
- def load_default_certs(self) -> None:
- return self.set_default_verify_paths()
-
- def set_ciphers(self, ciphers: typing.Any) -> None:
- raise ValueError("SecureTransport doesn't support custom cipher strings")
-
- def load_verify_locations(
- self,
- cafile: str | None = None,
- capath: str | None = None,
- cadata: bytes | None = None,
- ) -> None:
- # OK, we only really support cadata and cafile.
- if capath is not None:
- raise ValueError("SecureTransport does not support cert directories")
-
- # Raise if cafile does not exist.
- if cafile is not None:
- with open(cafile):
- pass
-
- self._trust_bundle = cafile or cadata # type: ignore[assignment]
-
- def load_cert_chain(
- self,
- certfile: str,
- keyfile: str | None = None,
- password: str | None = None,
- ) -> None:
- self._client_cert = certfile
- self._client_key = keyfile
- self._client_cert_passphrase = password
-
- def set_alpn_protocols(self, protocols: list[str | bytes]) -> None:
- """
- Sets the ALPN protocols that will later be set on the context.
-
- Raises a NotImplementedError if ALPN is not supported.
- """
- if not hasattr(Security, "SSLSetALPNProtocols"):
- raise NotImplementedError(
- "SecureTransport supports ALPN only in macOS 10.12+"
- )
- self._alpn_protocols = [util.util.to_bytes(p, "ascii") for p in protocols]
-
- def wrap_socket(
- self,
- sock: socket_cls,
- server_side: bool = False,
- do_handshake_on_connect: bool = True,
- suppress_ragged_eofs: bool = True,
- server_hostname: bytes | str | None = None,
- ) -> WrappedSocket:
- # So, what do we do here? Firstly, we assert some properties. This is a
- # stripped down shim, so there is some functionality we don't support.
- # See PEP 543 for the real deal.
- assert not server_side
- assert do_handshake_on_connect
- assert suppress_ragged_eofs
-
- # Ok, we're good to go. Now we want to create the wrapped socket object
- # and store it in the appropriate place.
- wrapped_socket = WrappedSocket(sock)
-
- # Now we can handshake
- wrapped_socket.handshake(
- server_hostname,
- self._verify,
- self._trust_bundle,
- _tls_version_to_st[self._minimum_version],
- _tls_version_to_st[self._maximum_version],
- self._client_cert,
- self._client_key,
- self._client_key_passphrase,
- self._alpn_protocols,
- )
- return wrapped_socket
-
- @property
- def minimum_version(self) -> int:
- return self._minimum_version
-
- @minimum_version.setter
- def minimum_version(self, minimum_version: int) -> None:
- self._minimum_version = minimum_version
-
- @property
- def maximum_version(self) -> int:
- return self._maximum_version
-
- @maximum_version.setter
- def maximum_version(self, maximum_version: int) -> None:
- self._maximum_version = maximum_version
|