|
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851 |
- import os
- import socket
- from errno import errorcode
- from functools import partial, wraps
- from itertools import chain, count
- from sys import platform
- from weakref import WeakValueDictionary
-
- from OpenSSL._util import (
- UNSPECIFIED as _UNSPECIFIED,
- exception_from_error_queue as _exception_from_error_queue,
- ffi as _ffi,
- lib as _lib,
- make_assert as _make_assert,
- no_zero_allocator as _no_zero_allocator,
- path_bytes as _path_bytes,
- text_to_bytes_and_warn as _text_to_bytes_and_warn,
- )
- from OpenSSL.crypto import (
- FILETYPE_PEM,
- PKey,
- X509,
- X509Name,
- X509Store,
- _PassphraseHelper,
- )
-
# Names exported by ``from OpenSSL.SSL import *``.  Constants that only
# exist in some builds (OP_NO_TLSv1_3, OP_NO_RENEGOTIATION,
# OP_IGNORE_UNEXPECTED_EOF) are appended to this list where they are defined.
__all__ = [
    "OPENSSL_VERSION_NUMBER",
    "SSLEAY_VERSION",
    "SSLEAY_CFLAGS",
    "SSLEAY_PLATFORM",
    "SSLEAY_DIR",
    "SSLEAY_BUILT_ON",
    "OPENSSL_VERSION",
    "OPENSSL_CFLAGS",
    "OPENSSL_PLATFORM",
    "OPENSSL_DIR",
    "OPENSSL_BUILT_ON",
    "SENT_SHUTDOWN",
    "RECEIVED_SHUTDOWN",
    "SSLv23_METHOD",
    "TLSv1_METHOD",
    "TLSv1_1_METHOD",
    "TLSv1_2_METHOD",
    "TLS_METHOD",
    "TLS_SERVER_METHOD",
    "TLS_CLIENT_METHOD",
    "DTLS_METHOD",
    "DTLS_SERVER_METHOD",
    "DTLS_CLIENT_METHOD",
    "SSL3_VERSION",
    "TLS1_VERSION",
    "TLS1_1_VERSION",
    "TLS1_2_VERSION",
    "TLS1_3_VERSION",
    "OP_NO_SSLv2",
    "OP_NO_SSLv3",
    "OP_NO_TLSv1",
    "OP_NO_TLSv1_1",
    "OP_NO_TLSv1_2",
    "MODE_RELEASE_BUFFERS",
    "OP_SINGLE_DH_USE",
    "OP_SINGLE_ECDH_USE",
    "OP_EPHEMERAL_RSA",
    "OP_MICROSOFT_SESS_ID_BUG",
    "OP_NETSCAPE_CHALLENGE_BUG",
    "OP_NETSCAPE_REUSE_CIPHER_CHANGE_BUG",
    "OP_SSLREF2_REUSE_CERT_TYPE_BUG",
    "OP_MICROSOFT_BIG_SSLV3_BUFFER",
    "OP_MSIE_SSLV2_RSA_PADDING",
    "OP_SSLEAY_080_CLIENT_DH_BUG",
    "OP_TLS_D5_BUG",
    "OP_TLS_BLOCK_PADDING_BUG",
    "OP_DONT_INSERT_EMPTY_FRAGMENTS",
    "OP_CIPHER_SERVER_PREFERENCE",
    "OP_TLS_ROLLBACK_BUG",
    "OP_PKCS1_CHECK_1",
    "OP_PKCS1_CHECK_2",
    "OP_NETSCAPE_CA_DN_BUG",
    "OP_NETSCAPE_DEMO_CIPHER_CHANGE_BUG",
    "OP_NO_COMPRESSION",
    "OP_NO_QUERY_MTU",
    "OP_COOKIE_EXCHANGE",
    "OP_NO_TICKET",
    "OP_ALL",
    "VERIFY_PEER",
    "VERIFY_FAIL_IF_NO_PEER_CERT",
    "VERIFY_CLIENT_ONCE",
    "VERIFY_NONE",
    "SESS_CACHE_OFF",
    "SESS_CACHE_CLIENT",
    "SESS_CACHE_SERVER",
    "SESS_CACHE_BOTH",
    "SESS_CACHE_NO_AUTO_CLEAR",
    "SESS_CACHE_NO_INTERNAL_LOOKUP",
    "SESS_CACHE_NO_INTERNAL_STORE",
    "SESS_CACHE_NO_INTERNAL",
    "SSL_ST_CONNECT",
    "SSL_ST_ACCEPT",
    "SSL_ST_MASK",
    "SSL_CB_LOOP",
    "SSL_CB_EXIT",
    "SSL_CB_READ",
    "SSL_CB_WRITE",
    "SSL_CB_ALERT",
    "SSL_CB_READ_ALERT",
    "SSL_CB_WRITE_ALERT",
    "SSL_CB_ACCEPT_LOOP",
    "SSL_CB_ACCEPT_EXIT",
    "SSL_CB_CONNECT_LOOP",
    "SSL_CB_CONNECT_EXIT",
    "SSL_CB_HANDSHAKE_START",
    "SSL_CB_HANDSHAKE_DONE",
    "Error",
    "WantReadError",
    "WantWriteError",
    "WantX509LookupError",
    "ZeroReturnError",
    "SysCallError",
    "NO_OVERLAPPING_PROTOCOLS",
    "SSLeay_version",
    "Session",
    "Context",
    "Connection",
    "X509VerificationCodes",
]
-
-
# Library version number and the selectors accepted by ``OpenSSL_version``.
# Every SSLEAY_* name is an alias bound to the same value as its OPENSSL_*
# counterpart.
OPENSSL_VERSION_NUMBER = _lib.OPENSSL_VERSION_NUMBER
OPENSSL_VERSION = _lib.OPENSSL_VERSION
SSLEAY_VERSION = OPENSSL_VERSION
OPENSSL_CFLAGS = _lib.OPENSSL_CFLAGS
SSLEAY_CFLAGS = OPENSSL_CFLAGS
OPENSSL_PLATFORM = _lib.OPENSSL_PLATFORM
SSLEAY_PLATFORM = OPENSSL_PLATFORM
OPENSSL_DIR = _lib.OPENSSL_DIR
SSLEAY_DIR = OPENSSL_DIR
OPENSSL_BUILT_ON = _lib.OPENSSL_BUILT_ON
SSLEAY_BUILT_ON = OPENSSL_BUILT_ON

# Shutdown state flags.
SENT_SHUTDOWN = _lib.SSL_SENT_SHUTDOWN
RECEIVED_SHUTDOWN = _lib.SSL_RECEIVED_SHUTDOWN

# Method identifiers.  These are integers chosen by this module rather than
# values read from ``_lib``.
SSLv23_METHOD = 3
TLSv1_METHOD = 4
TLSv1_1_METHOD = 5
TLSv1_2_METHOD = 6
TLS_METHOD = 7
TLS_SERVER_METHOD = 8
TLS_CLIENT_METHOD = 9
DTLS_METHOD = 10
DTLS_SERVER_METHOD = 11
DTLS_CLIENT_METHOD = 12

# Protocol version numbers.  If any one of them is missing from the binding,
# all five fall back to the hard-coded values below.
try:
    SSL3_VERSION = _lib.SSL3_VERSION
    TLS1_VERSION = _lib.TLS1_VERSION
    TLS1_1_VERSION = _lib.TLS1_1_VERSION
    TLS1_2_VERSION = _lib.TLS1_2_VERSION
    TLS1_3_VERSION = _lib.TLS1_3_VERSION
except AttributeError:
    # Hardcode constants for cryptography < 3.4, see
    # https://github.com/pyca/pyopenssl/pull/985#issuecomment-775186682
    SSL3_VERSION = 0x0300  # 768
    TLS1_VERSION = 0x0301  # 769
    TLS1_1_VERSION = 0x0302  # 770
    TLS1_2_VERSION = 0x0303  # 771
    TLS1_3_VERSION = 0x0304  # 772

# Options that disable individual protocol versions.
OP_NO_SSLv2 = _lib.SSL_OP_NO_SSLv2
OP_NO_SSLv3 = _lib.SSL_OP_NO_SSLv3
OP_NO_TLSv1 = _lib.SSL_OP_NO_TLSv1
OP_NO_TLSv1_1 = _lib.SSL_OP_NO_TLSv1_1
OP_NO_TLSv1_2 = _lib.SSL_OP_NO_TLSv1_2
# Only defined (and exported) when the binding provides it.
try:
    OP_NO_TLSv1_3 = _lib.SSL_OP_NO_TLSv1_3
except AttributeError:
    pass
else:
    __all__.append("OP_NO_TLSv1_3")

MODE_RELEASE_BUFFERS = _lib.SSL_MODE_RELEASE_BUFFERS

# Remaining SSL_OP_* option flags, mirrored one-to-one from the binding.
OP_SINGLE_DH_USE = _lib.SSL_OP_SINGLE_DH_USE
OP_SINGLE_ECDH_USE = _lib.SSL_OP_SINGLE_ECDH_USE
OP_EPHEMERAL_RSA = _lib.SSL_OP_EPHEMERAL_RSA
OP_MICROSOFT_SESS_ID_BUG = _lib.SSL_OP_MICROSOFT_SESS_ID_BUG
OP_NETSCAPE_CHALLENGE_BUG = _lib.SSL_OP_NETSCAPE_CHALLENGE_BUG
OP_NETSCAPE_REUSE_CIPHER_CHANGE_BUG = (
    _lib.SSL_OP_NETSCAPE_REUSE_CIPHER_CHANGE_BUG
)
OP_SSLREF2_REUSE_CERT_TYPE_BUG = _lib.SSL_OP_SSLREF2_REUSE_CERT_TYPE_BUG
OP_MICROSOFT_BIG_SSLV3_BUFFER = _lib.SSL_OP_MICROSOFT_BIG_SSLV3_BUFFER
OP_MSIE_SSLV2_RSA_PADDING = _lib.SSL_OP_MSIE_SSLV2_RSA_PADDING
OP_SSLEAY_080_CLIENT_DH_BUG = _lib.SSL_OP_SSLEAY_080_CLIENT_DH_BUG
OP_TLS_D5_BUG = _lib.SSL_OP_TLS_D5_BUG
OP_TLS_BLOCK_PADDING_BUG = _lib.SSL_OP_TLS_BLOCK_PADDING_BUG
OP_DONT_INSERT_EMPTY_FRAGMENTS = _lib.SSL_OP_DONT_INSERT_EMPTY_FRAGMENTS
OP_CIPHER_SERVER_PREFERENCE = _lib.SSL_OP_CIPHER_SERVER_PREFERENCE
OP_TLS_ROLLBACK_BUG = _lib.SSL_OP_TLS_ROLLBACK_BUG
OP_PKCS1_CHECK_1 = _lib.SSL_OP_PKCS1_CHECK_1
OP_PKCS1_CHECK_2 = _lib.SSL_OP_PKCS1_CHECK_2
OP_NETSCAPE_CA_DN_BUG = _lib.SSL_OP_NETSCAPE_CA_DN_BUG
OP_NETSCAPE_DEMO_CIPHER_CHANGE_BUG = (
    _lib.SSL_OP_NETSCAPE_DEMO_CIPHER_CHANGE_BUG
)
OP_NO_COMPRESSION = _lib.SSL_OP_NO_COMPRESSION

OP_NO_QUERY_MTU = _lib.SSL_OP_NO_QUERY_MTU
OP_COOKIE_EXCHANGE = _lib.SSL_OP_COOKIE_EXCHANGE
OP_NO_TICKET = _lib.SSL_OP_NO_TICKET

# Two more options that are only defined (and exported) when available.
try:
    OP_NO_RENEGOTIATION = _lib.SSL_OP_NO_RENEGOTIATION
except AttributeError:
    pass
else:
    __all__.append("OP_NO_RENEGOTIATION")

try:
    OP_IGNORE_UNEXPECTED_EOF = _lib.SSL_OP_IGNORE_UNEXPECTED_EOF
except AttributeError:
    pass
else:
    __all__.append("OP_IGNORE_UNEXPECTED_EOF")

OP_ALL = _lib.SSL_OP_ALL

# Peer verification mode flags.
VERIFY_PEER = _lib.SSL_VERIFY_PEER
VERIFY_FAIL_IF_NO_PEER_CERT = _lib.SSL_VERIFY_FAIL_IF_NO_PEER_CERT
VERIFY_CLIENT_ONCE = _lib.SSL_VERIFY_CLIENT_ONCE
VERIFY_NONE = _lib.SSL_VERIFY_NONE

# Session cache mode flags.
SESS_CACHE_OFF = _lib.SSL_SESS_CACHE_OFF
SESS_CACHE_CLIENT = _lib.SSL_SESS_CACHE_CLIENT
SESS_CACHE_SERVER = _lib.SSL_SESS_CACHE_SERVER
SESS_CACHE_BOTH = _lib.SSL_SESS_CACHE_BOTH
SESS_CACHE_NO_AUTO_CLEAR = _lib.SSL_SESS_CACHE_NO_AUTO_CLEAR
SESS_CACHE_NO_INTERNAL_LOOKUP = _lib.SSL_SESS_CACHE_NO_INTERNAL_LOOKUP
SESS_CACHE_NO_INTERNAL_STORE = _lib.SSL_SESS_CACHE_NO_INTERNAL_STORE
SESS_CACHE_NO_INTERNAL = _lib.SSL_SESS_CACHE_NO_INTERNAL

# Handshake state constants.
SSL_ST_CONNECT = _lib.SSL_ST_CONNECT
SSL_ST_ACCEPT = _lib.SSL_ST_ACCEPT
SSL_ST_MASK = _lib.SSL_ST_MASK

# SSL_CB_* constants, mirrored one-to-one from the binding.
SSL_CB_LOOP = _lib.SSL_CB_LOOP
SSL_CB_EXIT = _lib.SSL_CB_EXIT
SSL_CB_READ = _lib.SSL_CB_READ
SSL_CB_WRITE = _lib.SSL_CB_WRITE
SSL_CB_ALERT = _lib.SSL_CB_ALERT
SSL_CB_READ_ALERT = _lib.SSL_CB_READ_ALERT
SSL_CB_WRITE_ALERT = _lib.SSL_CB_WRITE_ALERT
SSL_CB_ACCEPT_LOOP = _lib.SSL_CB_ACCEPT_LOOP
SSL_CB_ACCEPT_EXIT = _lib.SSL_CB_ACCEPT_EXIT
SSL_CB_CONNECT_LOOP = _lib.SSL_CB_CONNECT_LOOP
SSL_CB_CONNECT_EXIT = _lib.SSL_CB_CONNECT_EXIT
SSL_CB_HANDSHAKE_START = _lib.SSL_CB_HANDSHAKE_START
SSL_CB_HANDSHAKE_DONE = _lib.SSL_CB_HANDSHAKE_DONE
-
-
class X509VerificationCodes:
    """
    Success and error codes for X509 verification, as returned by the
    underlying ``X509_STORE_CTX_get_error()`` function and passed by pyOpenSSL
    to verification callback functions.

    See `OpenSSL Verification Errors
    <https://www.openssl.org/docs/manmaster/man3/X509_verify_cert_error_string.html#ERROR-CODES>`_
    for details.
    """

    # Every attribute below mirrors the ``_lib.X509_V_*`` constant of the
    # same name, with the ``X509_V_`` prefix dropped.
    OK = _lib.X509_V_OK
    ERR_UNABLE_TO_GET_ISSUER_CERT = _lib.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT
    ERR_UNABLE_TO_GET_CRL = _lib.X509_V_ERR_UNABLE_TO_GET_CRL
    ERR_UNABLE_TO_DECRYPT_CERT_SIGNATURE = (
        _lib.X509_V_ERR_UNABLE_TO_DECRYPT_CERT_SIGNATURE
    )
    ERR_UNABLE_TO_DECRYPT_CRL_SIGNATURE = (
        _lib.X509_V_ERR_UNABLE_TO_DECRYPT_CRL_SIGNATURE
    )
    ERR_UNABLE_TO_DECODE_ISSUER_PUBLIC_KEY = (
        _lib.X509_V_ERR_UNABLE_TO_DECODE_ISSUER_PUBLIC_KEY
    )
    ERR_CERT_SIGNATURE_FAILURE = _lib.X509_V_ERR_CERT_SIGNATURE_FAILURE
    ERR_CRL_SIGNATURE_FAILURE = _lib.X509_V_ERR_CRL_SIGNATURE_FAILURE
    ERR_CERT_NOT_YET_VALID = _lib.X509_V_ERR_CERT_NOT_YET_VALID
    ERR_CERT_HAS_EXPIRED = _lib.X509_V_ERR_CERT_HAS_EXPIRED
    ERR_CRL_NOT_YET_VALID = _lib.X509_V_ERR_CRL_NOT_YET_VALID
    ERR_CRL_HAS_EXPIRED = _lib.X509_V_ERR_CRL_HAS_EXPIRED
    ERR_ERROR_IN_CERT_NOT_BEFORE_FIELD = (
        _lib.X509_V_ERR_ERROR_IN_CERT_NOT_BEFORE_FIELD
    )
    ERR_ERROR_IN_CERT_NOT_AFTER_FIELD = (
        _lib.X509_V_ERR_ERROR_IN_CERT_NOT_AFTER_FIELD
    )
    ERR_ERROR_IN_CRL_LAST_UPDATE_FIELD = (
        _lib.X509_V_ERR_ERROR_IN_CRL_LAST_UPDATE_FIELD
    )
    ERR_ERROR_IN_CRL_NEXT_UPDATE_FIELD = (
        _lib.X509_V_ERR_ERROR_IN_CRL_NEXT_UPDATE_FIELD
    )
    ERR_OUT_OF_MEM = _lib.X509_V_ERR_OUT_OF_MEM
    ERR_DEPTH_ZERO_SELF_SIGNED_CERT = (
        _lib.X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT
    )
    ERR_SELF_SIGNED_CERT_IN_CHAIN = _lib.X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN
    ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY = (
        _lib.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY
    )
    ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE = (
        _lib.X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE
    )
    ERR_CERT_CHAIN_TOO_LONG = _lib.X509_V_ERR_CERT_CHAIN_TOO_LONG
    ERR_CERT_REVOKED = _lib.X509_V_ERR_CERT_REVOKED
    ERR_INVALID_CA = _lib.X509_V_ERR_INVALID_CA
    ERR_PATH_LENGTH_EXCEEDED = _lib.X509_V_ERR_PATH_LENGTH_EXCEEDED
    ERR_INVALID_PURPOSE = _lib.X509_V_ERR_INVALID_PURPOSE
    ERR_CERT_UNTRUSTED = _lib.X509_V_ERR_CERT_UNTRUSTED
    ERR_CERT_REJECTED = _lib.X509_V_ERR_CERT_REJECTED
    ERR_SUBJECT_ISSUER_MISMATCH = _lib.X509_V_ERR_SUBJECT_ISSUER_MISMATCH
    ERR_AKID_SKID_MISMATCH = _lib.X509_V_ERR_AKID_SKID_MISMATCH
    ERR_AKID_ISSUER_SERIAL_MISMATCH = (
        _lib.X509_V_ERR_AKID_ISSUER_SERIAL_MISMATCH
    )
    ERR_KEYUSAGE_NO_CERTSIGN = _lib.X509_V_ERR_KEYUSAGE_NO_CERTSIGN
    ERR_UNABLE_TO_GET_CRL_ISSUER = _lib.X509_V_ERR_UNABLE_TO_GET_CRL_ISSUER
    ERR_UNHANDLED_CRITICAL_EXTENSION = (
        _lib.X509_V_ERR_UNHANDLED_CRITICAL_EXTENSION
    )
    ERR_KEYUSAGE_NO_CRL_SIGN = _lib.X509_V_ERR_KEYUSAGE_NO_CRL_SIGN
    ERR_UNHANDLED_CRITICAL_CRL_EXTENSION = (
        _lib.X509_V_ERR_UNHANDLED_CRITICAL_CRL_EXTENSION
    )
    ERR_INVALID_NON_CA = _lib.X509_V_ERR_INVALID_NON_CA
    ERR_PROXY_PATH_LENGTH_EXCEEDED = _lib.X509_V_ERR_PROXY_PATH_LENGTH_EXCEEDED
    ERR_KEYUSAGE_NO_DIGITAL_SIGNATURE = (
        _lib.X509_V_ERR_KEYUSAGE_NO_DIGITAL_SIGNATURE
    )
    ERR_PROXY_CERTIFICATES_NOT_ALLOWED = (
        _lib.X509_V_ERR_PROXY_CERTIFICATES_NOT_ALLOWED
    )
    ERR_INVALID_EXTENSION = _lib.X509_V_ERR_INVALID_EXTENSION
    ERR_INVALID_POLICY_EXTENSION = _lib.X509_V_ERR_INVALID_POLICY_EXTENSION
    ERR_NO_EXPLICIT_POLICY = _lib.X509_V_ERR_NO_EXPLICIT_POLICY
    ERR_DIFFERENT_CRL_SCOPE = _lib.X509_V_ERR_DIFFERENT_CRL_SCOPE
    ERR_UNSUPPORTED_EXTENSION_FEATURE = (
        _lib.X509_V_ERR_UNSUPPORTED_EXTENSION_FEATURE
    )
    ERR_UNNESTED_RESOURCE = _lib.X509_V_ERR_UNNESTED_RESOURCE
    ERR_PERMITTED_VIOLATION = _lib.X509_V_ERR_PERMITTED_VIOLATION
    ERR_EXCLUDED_VIOLATION = _lib.X509_V_ERR_EXCLUDED_VIOLATION
    ERR_SUBTREE_MINMAX = _lib.X509_V_ERR_SUBTREE_MINMAX
    ERR_UNSUPPORTED_CONSTRAINT_TYPE = (
        _lib.X509_V_ERR_UNSUPPORTED_CONSTRAINT_TYPE
    )
    ERR_UNSUPPORTED_CONSTRAINT_SYNTAX = (
        _lib.X509_V_ERR_UNSUPPORTED_CONSTRAINT_SYNTAX
    )
    ERR_UNSUPPORTED_NAME_SYNTAX = _lib.X509_V_ERR_UNSUPPORTED_NAME_SYNTAX
    ERR_CRL_PATH_VALIDATION_ERROR = _lib.X509_V_ERR_CRL_PATH_VALIDATION_ERROR
    ERR_HOSTNAME_MISMATCH = _lib.X509_V_ERR_HOSTNAME_MISMATCH
    ERR_EMAIL_MISMATCH = _lib.X509_V_ERR_EMAIL_MISMATCH
    ERR_IP_ADDRESS_MISMATCH = _lib.X509_V_ERR_IP_ADDRESS_MISMATCH
    ERR_APPLICATION_VERIFICATION = _lib.X509_V_ERR_APPLICATION_VERIFICATION
-
-
# Candidate CA bundle files; the trailing comment on each entry names the
# distributions that ship it.
# NOTE(review): the code that consumes these lists is outside this section.
# Taken from https://golang.org/src/crypto/x509/root_linux.go
_CERTIFICATE_FILE_LOCATIONS = [
    "/etc/ssl/certs/ca-certificates.crt",  # Debian/Ubuntu/Gentoo etc.
    "/etc/pki/tls/certs/ca-bundle.crt",  # Fedora/RHEL 6
    "/etc/ssl/ca-bundle.pem",  # OpenSUSE
    "/etc/pki/tls/cacert.pem",  # OpenELEC
    "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem",  # CentOS/RHEL 7
]

# Candidate CA certificate directories.
_CERTIFICATE_PATH_LOCATIONS = [
    "/etc/ssl/certs",  # SLES10/SLES11
]

# These values are compared to output from cffi's ffi.string so they must be
# byte strings.
_CRYPTOGRAPHY_MANYLINUX_CA_DIR = b"/opt/pyca/cryptography/openssl/certs"
_CRYPTOGRAPHY_MANYLINUX_CA_FILE = b"/opt/pyca/cryptography/openssl/cert.pem"
-
-
class Error(Exception):
    """
    Base class for errors raised by the `OpenSSL.SSL` API.

    The more specific exceptions defined in this module
    (:class:`WantReadError`, :class:`WantWriteError`,
    :class:`WantX509LookupError`, :class:`ZeroReturnError` and
    :class:`SysCallError`) all derive from it.
    """
-
-
# ``_exception_from_error_queue`` with its first argument pre-bound to
# ``Error``, so callers in this module can invoke it without arguments.
_raise_current_error = partial(_exception_from_error_queue, Error)
# Assertion helper built by ``_make_assert`` for ``Error``.
# NOTE(review): presumably raises ``Error`` when the asserted condition is
# false; ``_make_assert`` is defined in ``OpenSSL._util`` -- confirm there.
_openssl_assert = _make_assert(Error)
-
-
class WantReadError(Error):
    """
    An :class:`Error` signalling that an operation wants to read.

    NOTE(review): the raising code is outside this section; presumably this
    corresponds to OpenSSL's ``SSL_ERROR_WANT_READ`` -- confirm.
    """
-
-
class WantWriteError(Error):
    """
    An :class:`Error` signalling that an operation wants to write.

    NOTE(review): the raising code is outside this section; presumably this
    corresponds to OpenSSL's ``SSL_ERROR_WANT_WRITE`` -- confirm.
    """
-
-
class WantX509LookupError(Error):
    """
    An :class:`Error` signalling that an operation wants an X509 lookup.

    NOTE(review): the raising code is outside this section; presumably this
    corresponds to OpenSSL's ``SSL_ERROR_WANT_X509_LOOKUP`` -- confirm.
    """
-
-
class ZeroReturnError(Error):
    """
    An :class:`Error` signalling a "zero return" condition.

    NOTE(review): the raising code is outside this section; presumably this
    corresponds to OpenSSL's ``SSL_ERROR_ZERO_RETURN`` -- confirm.
    """
-
-
class SysCallError(Error):
    """
    An :class:`Error` signalling a failure at the system-call level.

    NOTE(review): the raising code is outside this section; presumably this
    corresponds to OpenSSL's ``SSL_ERROR_SYSCALL`` -- confirm.
    """
-
-
- class _CallbackExceptionHelper:
- """
- A base class for wrapper classes that allow for intelligent exception
- handling in OpenSSL callbacks.
-
- :ivar list _problems: Any exceptions that occurred while executing in a
- context where they could not be raised in the normal way. Typically
- this is because OpenSSL has called into some Python code and requires a
- return value. The exceptions are saved to be raised later when it is
- possible to do so.
- """
-
- def __init__(self):
- self._problems = []
-
- def raise_if_problem(self):
- """
- Raise an exception from the OpenSSL error queue or that was previously
- captured whe running a callback.
- """
- if self._problems:
- try:
- _raise_current_error()
- except Error:
- pass
- raise self._problems.pop(0)
-
-
class _VerifyHelper(_CallbackExceptionHelper):
    """
    Adapter that exposes a Python callable as a certificate verification
    callback.

    The cffi callback is stored on ``self.callback``.
    """

    def __init__(self, callback):
        """
        :param callback: Callable invoked as
            ``callback(connection, cert, error_number, error_depth, ok)``.
            A true result accepts the certificate, a false one rejects it.
        """
        super().__init__()

        @wraps(callback)
        def wrapper(ok, store_ctx):
            raw_cert = _lib.X509_STORE_CTX_get_current_cert(store_ctx)
            # Bump the reference count before wrapping the pointer in a
            # Python-level X509 object.
            _lib.X509_up_ref(raw_cert)
            cert = X509._from_raw_x509_ptr(raw_cert)
            error_number = _lib.X509_STORE_CTX_get_error(store_ctx)
            error_depth = _lib.X509_STORE_CTX_get_error_depth(store_ctx)

            # Look up the SSL pointer stored on the store context and map
            # it back to the owning Connection.
            ex_data_index = _lib.SSL_get_ex_data_X509_STORE_CTX_idx()
            ssl = _lib.X509_STORE_CTX_get_ex_data(store_ctx, ex_data_index)
            connection = Connection._reverse_mapping[ssl]

            try:
                accepted = callback(
                    connection, cert, error_number, error_depth, ok
                )
            except Exception as e:
                # Record the exception for later and fail verification.
                self._problems.append(e)
                return 0

            if not accepted:
                return 0

            # The callback accepted the certificate: clear the stored error.
            _lib.X509_STORE_CTX_set_error(store_ctx, _lib.X509_V_OK)
            return 1

        self.callback = _ffi.callback(
            "int (*)(int, X509_STORE_CTX *)", wrapper
        )
-
-
# Unique sentinel an ALPN selection callback may return instead of a
# bytestring; it is recognised by identity (``is``) in ``_ALPNSelectHelper``.
NO_OVERLAPPING_PROTOCOLS = object()
-
-
class _ALPNSelectHelper(_CallbackExceptionHelper):
    """
    Adapter that exposes a Python callable as an ALPN selection callback.

    The cffi callback is stored on ``self.callback``.
    """

    def __init__(self, callback):
        """
        :param callback: Callable invoked as ``callback(conn, protocols)``
            where ``protocols`` is a list of bytestrings.  It must return a
            bytestring or ``NO_OVERLAPPING_PROTOCOLS``.
        """
        super().__init__()

        @wraps(callback)
        def wrapper(ssl, out, outlen, in_, inlen, arg):
            try:
                conn = Connection._reverse_mapping[ssl]

                # The input is a run of length-prefixed bytestrings: one
                # length byte followed by that many bytes, repeated.  Walk
                # it with an offset to build the list of protocols.
                wire = _ffi.buffer(in_, inlen)[:]
                offered = []
                position = 0
                while position < len(wire):
                    start = position + 1
                    end = start + wire[position]
                    offered.append(wire[start:end])
                    position = end

                selected = callback(conn, offered)
                accepted = selected is not NO_OVERLAPPING_PROTOCOLS
                if not accepted:
                    selected = b""
                elif not isinstance(selected, bytes):
                    raise TypeError(
                        "ALPN callback must return a bytestring or the "
                        "special NO_OVERLAPPING_PROTOCOLS sentinel value."
                    )

                # Keep the cffi allocations referenced from the connection
                # so they are not freed before OpenSSL has used them, then
                # publish them through the output parameters.
                length_cell = _ffi.new("unsigned char *", len(selected))
                data_buffer = _ffi.new("unsigned char[]", selected)
                conn._alpn_select_callback_args = [length_cell, data_buffer]
                outlen[0] = length_cell[0]
                out[0] = data_buffer

                if accepted:
                    return _lib.SSL_TLSEXT_ERR_OK
                return _lib.SSL_TLSEXT_ERR_NOACK
            except Exception as e:
                self._problems.append(e)
                return _lib.SSL_TLSEXT_ERR_ALERT_FATAL

        self.callback = _ffi.callback(
            (
                "int (*)(SSL *, unsigned char **, unsigned char *, "
                "const unsigned char *, unsigned int, void *)"
            ),
            wrapper,
        )
-
-
class _OCSPServerCallbackHelper(_CallbackExceptionHelper):
    """
    Wrap a callback such that it can be used as an OCSP callback for the server
    side.

    Annoyingly, OpenSSL defines one OCSP callback but uses it in two different
    ways. For servers, that callback is expected to retrieve some OCSP data and
    hand it to OpenSSL, and may return only SSL_TLSEXT_ERR_OK,
    SSL_TLSEXT_ERR_ALERT_FATAL, and SSL_TLSEXT_ERR_NOACK. For clients, that
    callback is expected to check the OCSP data, and returns a negative value
    on error, 0 if the response is not acceptable, or positive if it is. These
    are mutually exclusive return code behaviours, and they mean that we need
    two helpers so that we always return an appropriate error code if the
    user's code throws an exception.

    Given that we have to have two helpers anyway, these helpers are a bit more
    helpery than most: specifically, they hide a few more of the OpenSSL
    functions so that the user has an easier time writing these callbacks.

    This helper implements the server side.
    """

    def __init__(self, callback):
        """
        :param callback: Callable invoked as ``callback(conn, data)``.  It
            must return a bytestring holding the OCSP response; an empty
            bytestring means there is nothing to send.
        """
        _CallbackExceptionHelper.__init__(self)

        @wraps(callback)
        def wrapper(ssl, cdata):
            try:
                conn = Connection._reverse_mapping[ssl]

                # Extract the data if any was provided.
                if cdata != _ffi.NULL:
                    data = _ffi.from_handle(cdata)
                else:
                    data = None

                # Call the callback.
                ocsp_data = callback(conn, data)

                if not isinstance(ocsp_data, bytes):
                    raise TypeError("OCSP callback must return a bytestring.")

                # If the OCSP data was provided, we will pass it to OpenSSL.
                # However, we have an early exit here: if no OCSP data was
                # provided we will just exit out and tell OpenSSL that there
                # is nothing to do.
                if not ocsp_data:
                    return _lib.SSL_TLSEXT_ERR_NOACK

                # OpenSSL takes ownership of this data and expects it to have
                # been allocated by OPENSSL_malloc.
                ocsp_data_length = len(ocsp_data)
                data_ptr = _lib.OPENSSL_malloc(ocsp_data_length)
                if data_ptr == _ffi.NULL:
                    # Writing through a NULL pointer would crash the
                    # process; report the failure as an exception instead.
                    raise MemoryError(
                        "OPENSSL_malloc failed to allocate the OCSP response."
                    )
                _ffi.buffer(data_ptr, ocsp_data_length)[:] = ocsp_data

                _lib.SSL_set_tlsext_status_ocsp_resp(
                    ssl, data_ptr, ocsp_data_length
                )

                return _lib.SSL_TLSEXT_ERR_OK
            except Exception as e:
                self._problems.append(e)
                return _lib.SSL_TLSEXT_ERR_ALERT_FATAL

        self.callback = _ffi.callback("int (*)(SSL *, void *)", wrapper)
-
-
class _OCSPClientCallbackHelper(_CallbackExceptionHelper):
    """
    Adapter that exposes a Python callable as the client-side OCSP callback.

    OpenSSL has a single OCSP callback slot that it uses in two incompatible
    ways.  On a server the callback supplies OCSP data and may only return
    SSL_TLSEXT_ERR_OK, SSL_TLSEXT_ERR_ALERT_FATAL or SSL_TLSEXT_ERR_NOACK.
    On a client the callback checks the OCSP data and returns a negative
    value on error, 0 when the response is unacceptable and a positive value
    when it is acceptable.  Because the return codes differ, each side has
    its own helper, so that an exception in user code always maps onto a
    suitable code.

    As with the server-side helper, some OpenSSL plumbing is hidden from the
    user callback to make it easier to write.

    This helper implements the client side.
    """

    def __init__(self, callback):
        """
        :param callback: Callable invoked as
            ``callback(conn, ocsp_data, data)``; a true result accepts the
            OCSP response and a false one rejects it.
        """
        super().__init__()

        @wraps(callback)
        def wrapper(ssl, cdata):
            try:
                conn = Connection._reverse_mapping[ssl]

                # Recover the user data object, if one was attached.
                data = None if cdata == _ffi.NULL else _ffi.from_handle(cdata)

                # Fetch the OCSP response; a negative length means there is
                # none, otherwise copy the bytes out of OpenSSL's buffer.
                response_ptr = _ffi.new("unsigned char **")
                response_len = _lib.SSL_get_tlsext_status_ocsp_resp(
                    ssl, response_ptr
                )
                if response_len >= 0:
                    ocsp_data = _ffi.buffer(response_ptr[0], response_len)[:]
                else:
                    ocsp_data = b""

                # 1 when the callback accepts the response, 0 otherwise.
                return 1 if callback(conn, ocsp_data, data) else 0

            except Exception as e:
                self._problems.append(e)
                # A negative value tells OpenSSL an error occurred.
                return -1

        self.callback = _ffi.callback("int (*)(SSL *, void *)", wrapper)
-
-
class _CookieGenerateCallbackHelper(_CallbackExceptionHelper):
    """
    Wrap a callback such that it can be used as a cookie generation callback.

    The cffi callback is stored on ``self.callback``.
    """

    # OpenSSL hands the callback a fixed-size cookie buffer and rejects any
    # cookie longer than 255 bytes, so a longer cookie can never succeed.
    # Checking here prevents writing past the end of that buffer.
    _MAX_COOKIE_LENGTH = 255

    def __init__(self, callback):
        """
        :param callback: Callable invoked as ``callback(conn)`` that returns
            the cookie as a bytestring of at most 255 bytes.
        """
        _CallbackExceptionHelper.__init__(self)

        @wraps(callback)
        def wrapper(ssl, out, outlen):
            try:
                conn = Connection._reverse_mapping[ssl]
                cookie = callback(conn)
                cookie_length = len(cookie)
                if cookie_length > self._MAX_COOKIE_LENGTH:
                    raise ValueError(
                        "cookie is too long (%d bytes, maximum is %d)"
                        % (cookie_length, self._MAX_COOKIE_LENGTH)
                    )
                out[0:cookie_length] = cookie
                outlen[0] = cookie_length
                return 1
            except Exception as e:
                self._problems.append(e)
                # "a zero return value can be used to abort the handshake"
                return 0

        self.callback = _ffi.callback(
            "int (*)(SSL *, unsigned char *, unsigned int *)",
            wrapper,
        )
-
-
class _CookieVerifyCallbackHelper(_CallbackExceptionHelper):
    """
    Wrap a callback such that it can be used as a cookie verification
    callback.

    The cffi callback is stored on ``self.callback``.
    """

    def __init__(self, callback):
        """
        :param callback: Callable invoked as ``callback(conn, cookie)`` with
            the cookie as a bytestring; a true result accepts the cookie and
            a false one rejects it.
        """
        _CallbackExceptionHelper.__init__(self)

        @wraps(callback)
        def wrapper(ssl, c_cookie, cookie_len):
            try:
                conn = Connection._reverse_mapping[ssl]
                valid = callback(conn, bytes(c_cookie[0:cookie_len]))
                # Normalise to 0/1 so that a result such as None cannot fail
                # during cffi's conversion of the return value, which would
                # happen outside this try block.
                return int(bool(valid))
            except Exception as e:
                self._problems.append(e)
                return 0

        self.callback = _ffi.callback(
            "int (*)(SSL *, unsigned char *, unsigned int)",
            wrapper,
        )
-
-
def _asFileDescriptor(obj):
    """
    Turn *obj* into a non-negative integer file descriptor.

    :param obj: Either an ``int`` or an object with a ``fileno()`` method.
    :return: The file descriptor as an ``int``.
    :raises TypeError: If no integer can be obtained from *obj*.
    :raises ValueError: If the resulting integer is negative.
    """
    candidate = obj
    if not isinstance(candidate, int):
        # Not an int itself: fall back to whatever fileno() reports.
        fileno = getattr(candidate, "fileno", None)
        if fileno is not None:
            candidate = fileno()

    if not isinstance(candidate, int):
        raise TypeError("argument must be an int, or have a fileno() method.")
    if candidate < 0:
        raise ValueError(
            "file descriptor cannot be a negative integer (%i)" % (candidate,)
        )
    return candidate
-
-
def OpenSSL_version(type):
    """
    Return a string describing the version of OpenSSL in use.

    :param type: One of the :const:`OPENSSL_` constants defined in this module.
    :return: The value produced by ``_ffi.string`` for the C string that
        OpenSSL reports for *type*.
    """
    version_ptr = _lib.OpenSSL_version(type)
    return _ffi.string(version_ptr)


# The same function is also exposed under the name ``SSLeay_version``.
SSLeay_version = OpenSSL_version
-
-
def _make_requires(flag, error):
    """
    Build a decorator that guards functions depending on optional OpenSSL
    features, so that calling them on a build without the feature raises
    NotImplementedError rather than an AttributeError from cryptography.

    :param flag: A cryptography flag that guards the functions, e.g.
        ``Cryptography_HAS_NEXTPROTONEG``.
    :param error: The message of the NotImplementedError raised when the
        flag is false.
    :return: A decorator.  With a true *flag* it returns the function
        unchanged; otherwise it returns a stand-in that always raises.
    """

    def _requires_decorator(func):
        if flag:
            # Feature available: no wrapping needed.
            return func

        @wraps(func)
        def explode(*args, **kwargs):
            raise NotImplementedError(error)

        return explode

    return _requires_decorator
-
-
# Decorator for methods that need ALPN support in the linked OpenSSL build.
_requires_alpn = _make_requires(
    _lib.Cryptography_HAS_ALPN,
    "ALPN not available",
)


# Decorator for methods that need key logging support.  The flag is looked up
# with ``getattr`` so that a binding without the attribute counts as
# "not available" instead of failing here.
_requires_keylog = _make_requires(
    getattr(_lib, "Cryptography_HAS_KEYLOG", None),
    "Key logging not available",
)
-
-
class Session:
    """
    An SSL session.

    A session captures connection parameters that can be re-used to speed
    up the setup of subsequent connections.

    .. versionadded:: 0.14
    """
-
-
class Context:
    """
    :class:`OpenSSL.SSL.Context` instances define the parameters for setting
    up new SSL connections.

    :param method: One of TLS_METHOD, TLS_CLIENT_METHOD, TLS_SERVER_METHOD,
        DTLS_METHOD, DTLS_CLIENT_METHOD, or DTLS_SERVER_METHOD.
        SSLv23_METHOD, TLSv1_METHOD, etc. are deprecated and should
        not be used.
    """

    # Maps every accepted ``method`` constant to a pair
    # ``(method factory, protocol version)``.  When the version is not
    # ``None`` the constructor sets it as both the minimum and the maximum
    # protocol version of the new context.
    _methods = {
        # Version-specific methods: generic TLS method, pinned where a
        # version is given.
        SSLv23_METHOD: (_lib.TLS_method, None),
        TLSv1_METHOD: (_lib.TLS_method, TLS1_VERSION),
        TLSv1_1_METHOD: (_lib.TLS_method, TLS1_1_VERSION),
        TLSv1_2_METHOD: (_lib.TLS_method, TLS1_2_VERSION),
        # Version-flexible TLS methods.
        TLS_METHOD: (_lib.TLS_method, None),
        TLS_SERVER_METHOD: (_lib.TLS_server_method, None),
        TLS_CLIENT_METHOD: (_lib.TLS_client_method, None),
        # DTLS methods.
        DTLS_METHOD: (_lib.DTLS_method, None),
        DTLS_SERVER_METHOD: (_lib.DTLS_server_method, None),
        DTLS_CLIENT_METHOD: (_lib.DTLS_client_method, None),
    }
-
def __init__(self, method):
    """
    Create the underlying ``SSL_CTX`` for *method* and reset all callback
    bookkeeping.

    :param method: One of the integer method constants listed in
        ``_methods``.
    :raises TypeError: If *method* is not an integer.
    :raises ValueError: If *method* is not a known method constant.
    """
    if not isinstance(method, int):
        raise TypeError("method must be an integer")

    try:
        factory, pinned_version = self._methods[method]
    except KeyError:
        raise ValueError("No such protocol")

    method_obj = factory()
    _openssl_assert(method_obj != _ffi.NULL)

    raw_context = _lib.SSL_CTX_new(method_obj)
    _openssl_assert(raw_context != _ffi.NULL)
    # Free the SSL_CTX when the cdata wrapper is garbage collected.
    self._context = _ffi.gc(raw_context, _lib.SSL_CTX_free)

    # Helpers, cffi callbacks and user data are stored on the instance;
    # all of them start out unset.
    self._passphrase_helper = None
    self._passphrase_callback = None
    self._passphrase_userdata = None

    self._verify_helper = None
    self._verify_callback = None

    self._info_callback = None
    self._keylog_callback = None
    self._tlsext_servername_callback = None
    self._app_data = None

    self._alpn_select_helper = None
    self._alpn_select_callback = None

    self._ocsp_helper = None
    self._ocsp_callback = None
    self._ocsp_data = None

    self._cookie_generate_helper = None
    self._cookie_verify_helper = None

    self.set_mode(_lib.SSL_MODE_ENABLE_PARTIAL_WRITE)
    if pinned_version is not None:
        # Version-specific method: restrict the context to that version.
        self.set_min_proto_version(pinned_version)
        self.set_max_proto_version(pinned_version)
-
def set_min_proto_version(self, version):
    """
    Set the minimum supported protocol version.

    Passing 0 enables protocol versions down to the lowest version
    supported by the library.

    :param version: The protocol version to use as the lower bound.
    :raises: An exception if the underlying OpenSSL build is missing
        support for the selected version.
    """
    result = _lib.SSL_CTX_set_min_proto_version(self._context, version)
    _openssl_assert(result == 1)
-
def set_max_proto_version(self, version):
    """
    Set the maximum supported protocol version.

    Passing 0 enables protocol versions up to the highest version
    supported by the library.

    :param version: The protocol version to use as the upper bound.
    :raises: An exception if the underlying OpenSSL build is missing
        support for the selected version.
    """
    result = _lib.SSL_CTX_set_max_proto_version(self._context, version)
    _openssl_assert(result == 1)
-
def load_verify_locations(self, cafile, capath=None):
    """
    Let SSL know where we can find trusted certificates for the certificate
    chain.  Note that the certificates have to be in PEM format.

    If *capath* is passed, it must be a directory prepared using the
    ``c_rehash`` tool included with OpenSSL.  Either, but not both, of
    *cafile* or *capath* may be :data:`None`.

    :param cafile: In which file we can find the certificates (``bytes`` or
        ``unicode``).
    :param capath: In which directory we can find the certificates
        (``bytes`` or ``unicode``).

    :return: None
    """
    # A missing location is passed to OpenSSL as a NULL pointer.
    cafile_arg = _ffi.NULL if cafile is None else _path_bytes(cafile)
    capath_arg = _ffi.NULL if capath is None else _path_bytes(capath)

    if not _lib.SSL_CTX_load_verify_locations(
        self._context, cafile_arg, capath_arg
    ):
        _raise_current_error()
-
def _wrap_callback(self, callback):
    """
    Build the ``_PassphraseHelper`` used for *callback*.

    The helper's own ``userdata`` argument is ignored; the user callback
    always receives ``self._passphrase_userdata``, read at call time.
    """

    @wraps(callback)
    def wrapper(size, verify, userdata):
        current_userdata = self._passphrase_userdata
        return callback(size, verify, current_userdata)

    helper = _PassphraseHelper(
        FILETYPE_PEM, wrapper, more_args=True, truncate=True
    )
    return helper
-
def set_passwd_cb(self, callback, userdata=None):
    """
    Set the passphrase callback.  This function will be called
    when a private key with a passphrase is loaded.

    :param callback: The Python callback to use.  This must accept three
        positional arguments.  First, an integer giving the maximum length
        of the passphrase it may return.  If the returned passphrase is
        longer than this, it will be truncated.  Second, a boolean value
        which will be true if the user should be prompted for the
        passphrase twice and the callback should verify that the two values
        supplied are equal.  Third, the value given as the *userdata*
        parameter to :meth:`set_passwd_cb`.  The *callback* must return
        a byte string.  If an error occurs, *callback* should return a false
        value (e.g. an empty string).
    :param userdata: (optional) A Python object which will be given as
        argument to the callback
    :return: None
    :raises TypeError: If *callback* is not callable.
    """
    if not callable(callback):
        raise TypeError("callback must be callable")

    helper = self._wrap_callback(callback)
    # Keep both the helper and its cffi callback referenced on the context.
    self._passphrase_helper = helper
    self._passphrase_callback = helper.callback
    _lib.SSL_CTX_set_default_passwd_cb(
        self._context, self._passphrase_callback
    )
    self._passphrase_userdata = userdata
-
def set_default_verify_paths(self):
    """
    Specify that the platform provided CA certificates are to be used for
    verification purposes.  This method has some caveats related to the
    binary wheels that cryptography (pyOpenSSL's primary dependency) ships:

    *   macOS will only load certificates using this method if the user has
        the ``openssl@1.1`` `Homebrew <https://brew.sh>`_ formula installed
        in the default location.
    *   Windows will not work.
    *   manylinux1 cryptography wheels will work on most common Linux
        distributions in pyOpenSSL 17.1.0 and above.  pyOpenSSL detects the
        manylinux1 wheel and attempts to load roots via a fallback path.

    :return: None
    """
    # SSL_CTX_set_default_verify_paths will attempt to load certs from
    # both a cafile and capath that are set at compile time.  However,
    # it will first check environment variables and, if present, load
    # those paths instead.
    set_result = _lib.SSL_CTX_set_default_verify_paths(self._context)
    _openssl_assert(set_result == 1)

    # Decide whether the fallback path is needed.  If the user has set
    # either environment variable they chose the location themselves and
    # nothing more is done.
    dir_env_var = _ffi.string(_lib.X509_get_default_cert_dir_env()).decode(
        "ascii"
    )
    file_env_var = _ffi.string(
        _lib.X509_get_default_cert_file_env()
    ).decode("ascii")
    if self._check_env_vars_set(dir_env_var, file_env_var):
        return

    # Only when the compiled-in defaults are exactly the values used in
    # the manylinux1 builds are the fallback locations loaded.
    default_dir = _ffi.string(_lib.X509_get_default_cert_dir())
    default_file = _ffi.string(_lib.X509_get_default_cert_file())
    if (
        default_dir != _CRYPTOGRAPHY_MANYLINUX_CA_DIR
        or default_file != _CRYPTOGRAPHY_MANYLINUX_CA_FILE
    ):
        return

    # This is manylinux1, let's load our fallback paths.
    self._fallback_default_verify_paths(
        _CERTIFICATE_FILE_LOCATIONS, _CERTIFICATE_PATH_LOCATIONS
    )
-
def _check_env_vars_set(self, dir_env_var, file_env_var):
    """
    Report whether either default-certificate environment variable exists.

    :param dir_env_var: Name of the certificate directory variable.
    :param file_env_var: Name of the certificate file variable.
    :return: ``True`` if at least one of the two names is present in
        ``os.environ`` (even with an empty value), ``False`` otherwise.
    """
    return file_env_var in os.environ or dir_env_var in os.environ
-
def _fallback_default_verify_paths(self, file_path, dir_path):
    """
    Default verify paths are based on the compiled version of OpenSSL.
    However, when pyca/cryptography is compiled as a manylinux1 wheel
    that compiled location can potentially be wrong.  So, like Go, we
    will try a predefined set of paths and attempt to load roots
    from there.

    Only the first existing file and the first existing directory are
    loaded.

    :param file_path: Iterable of candidate CA bundle files.
    :param dir_path: Iterable of candidate CA directories.
    :return: None
    """
    first_file = next(
        (path for path in file_path if os.path.isfile(path)), None
    )
    if first_file is not None:
        self.load_verify_locations(first_file)

    first_dir = next((path for path in dir_path if os.path.isdir(path)), None)
    if first_dir is not None:
        self.load_verify_locations(None, first_dir)
-
def use_certificate_chain_file(self, certfile):
    """
    Load a certificate chain from a file.

    :param certfile: The name of the certificate chain file (``bytes`` or
        ``unicode``).  Must be PEM encoded.

    :return: None
    """
    chain_path = _path_bytes(certfile)

    if not _lib.SSL_CTX_use_certificate_chain_file(self._context, chain_path):
        _raise_current_error()
-
def use_certificate_file(self, certfile, filetype=FILETYPE_PEM):
    """
    Load a certificate from a file.

    :param certfile: The name of the certificate file (``bytes`` or
        ``unicode``).
    :param filetype: (optional) The encoding of the file, which is either
        :const:`FILETYPE_PEM` or :const:`FILETYPE_ASN1`.  The default is
        :const:`FILETYPE_PEM`.

    :return: None
    :raises TypeError: If *filetype* is not an integer.
    """
    cert_path = _path_bytes(certfile)
    if not isinstance(filetype, int):
        raise TypeError("filetype must be an integer")

    if not _lib.SSL_CTX_use_certificate_file(
        self._context, cert_path, filetype
    ):
        _raise_current_error()
-
def use_certificate(self, cert):
    """
    Load a certificate from a X509 object.

    :param cert: The X509 object
    :return: None
    :raises TypeError: If *cert* is not an X509 instance.
    """
    # Mirrored at Connection.use_certificate
    if not isinstance(cert, X509):
        raise TypeError("cert must be an X509 instance")

    if not _lib.SSL_CTX_use_certificate(self._context, cert._x509):
        _raise_current_error()
-
def add_extra_chain_cert(self, certobj):
    """
    Add certificate to chain.

    A duplicate of the certificate is handed to OpenSSL, so *certobj*
    itself is left untouched.

    :param certobj: The X509 certificate object to add to the chain
    :return: None
    :raises TypeError: If *certobj* is not an X509 instance.
    """
    if not isinstance(certobj, X509):
        raise TypeError("certobj must be an X509 instance")

    copy = _lib.X509_dup(certobj._x509)
    # Fail right here if the duplication did not produce a certificate,
    # rather than passing a NULL pointer on to OpenSSL.
    _openssl_assert(copy != _ffi.NULL)

    add_result = _lib.SSL_CTX_add_extra_chain_cert(self._context, copy)
    if not add_result:
        # TODO: This is untested.
        # The copy is released here only on failure.
        _lib.X509_free(copy)
        _raise_current_error()
-
def _raise_passphrase_exception(self):
    """
    Raise the error for a failed private key load.

    A problem recorded by the passphrase helper takes precedence; if there
    is none, the current OpenSSL error is raised.
    """
    helper = self._passphrase_helper
    if helper is not None:
        helper.raise_if_problem(Error)

    _raise_current_error()
-
def use_privatekey_file(self, keyfile, filetype=_UNSPECIFIED):
    """
    Load a private key from a file.

    :param keyfile: The name of the key file (``bytes`` or ``unicode``)
    :param filetype: (optional) The encoding of the file, which is either
        :const:`FILETYPE_PEM` or :const:`FILETYPE_ASN1`.  The default is
        :const:`FILETYPE_PEM`.

    :return: None
    :raises TypeError: If *filetype* is given and is not an integer.
    """
    key_path = _path_bytes(keyfile)

    if filetype is not _UNSPECIFIED and not isinstance(filetype, int):
        raise TypeError("filetype must be an integer")
    encoding = FILETYPE_PEM if filetype is _UNSPECIFIED else filetype

    if not _lib.SSL_CTX_use_PrivateKey_file(
        self._context, key_path, encoding
    ):
        self._raise_passphrase_exception()
-
def use_privatekey(self, pkey):
    """
    Load a private key from a PKey object.

    :param pkey: The PKey object
    :return: None
    :raises TypeError: If *pkey* is not a PKey instance.
    """
    # Mirrored at Connection.use_privatekey
    if not isinstance(pkey, PKey):
        raise TypeError("pkey must be a PKey instance")

    if not _lib.SSL_CTX_use_PrivateKey(self._context, pkey._pkey):
        self._raise_passphrase_exception()
-
def check_privatekey(self):
    """
    Check if the private key (loaded with :meth:`use_privatekey`) matches
    the certificate (loaded with :meth:`use_certificate`).

    :return: :data:`None` (raises :exc:`Error` if something's wrong)
    """
    matches = _lib.SSL_CTX_check_private_key(self._context)
    if not matches:
        _raise_current_error()
-
def load_client_ca(self, cafile):
    """
    Load the trusted certificates that will be sent to the client.  Does
    not actually imply any of the certificates are trusted; that must be
    configured separately.

    :param bytes cafile: The path to a certificates file in PEM format.
    :return: None
    """
    cafile_bytes = _text_to_bytes_and_warn("cafile", cafile)
    ca_list = _lib.SSL_load_client_CA_file(cafile_bytes)
    _openssl_assert(ca_list != _ffi.NULL)
    _lib.SSL_CTX_set_client_CA_list(self._context, ca_list)
-
def set_session_id(self, buf):
    """
    Set the session id to *buf* within which a session can be reused for
    this Context object.  This is needed when doing session resumption,
    because there is no way for a stored session to know which Context
    object it is associated with.

    :param bytes buf: The session id.

    :returns: None
    """
    session_id = _text_to_bytes_and_warn("buf", buf)
    result = _lib.SSL_CTX_set_session_id_context(
        self._context, session_id, len(session_id)
    )
    _openssl_assert(result == 1)
-
def set_session_cache_mode(self, mode):
    """
    Set the behavior of the session cache used by all connections using
    this Context.  The previously set mode is returned.  See
    :const:`SESS_CACHE_*` for details about particular modes.

    :param mode: One or more of the SESS_CACHE_* flags (combine using
        bitwise or)
    :returns: The previously set caching mode.
    :raises TypeError: If *mode* is not an integer.

    .. versionadded:: 0.14
    """
    if isinstance(mode, int):
        return _lib.SSL_CTX_set_session_cache_mode(self._context, mode)
    raise TypeError("mode must be an integer")
-
def get_session_cache_mode(self):
    """
    Get the current session cache mode.

    :returns: The currently used cache mode.

    .. versionadded:: 0.14
    """
    current_mode = _lib.SSL_CTX_get_session_cache_mode(self._context)
    return current_mode
-
def set_verify(self, mode, callback=None):
    """
    Set the verification flags for this Context object to *mode* and
    specify that *callback* should be used for verification callbacks.

    :param mode: The verify mode, this should be one of
        :const:`VERIFY_NONE` and :const:`VERIFY_PEER`.  If
        :const:`VERIFY_PEER` is used, *mode* can be OR:ed with
        :const:`VERIFY_FAIL_IF_NO_PEER_CERT` and
        :const:`VERIFY_CLIENT_ONCE` to further control the behaviour.
    :param callback: The optional Python verification callback to use.
        This should take five arguments: A Connection object, an X509
        object, and three integer variables, which are in turn potential
        error number, error depth and return code.  *callback* should
        return True if verification passes and False otherwise.
        If omitted, OpenSSL's default verification is used.
    :return: None
    :raises TypeError: If *mode* is not an integer, or *callback* is
        neither ``None`` nor callable.

    See SSL_CTX_set_verify(3SSL) for further details.
    """
    if not isinstance(mode, int):
        raise TypeError("mode must be an integer")
    if callback is not None and not callable(callback):
        raise TypeError("callback must be callable")

    if callback is None:
        # No Python callback: clear our references and pass NULL.
        helper = None
        stored_callback = None
        c_callback = _ffi.NULL
    else:
        helper = _VerifyHelper(callback)
        stored_callback = c_callback = helper.callback

    self._verify_helper = helper
    self._verify_callback = stored_callback
    _lib.SSL_CTX_set_verify(self._context, mode, c_callback)
-
def set_verify_depth(self, depth):
    """
    Set the maximum depth for the certificate chain verification that shall
    be allowed for this Context object.

    :param depth: An integer specifying the verify depth
    :return: None
    :raises TypeError: If *depth* is not an integer.
    """
    if isinstance(depth, int):
        _lib.SSL_CTX_set_verify_depth(self._context, depth)
    else:
        raise TypeError("depth must be an integer")
-
def get_verify_mode(self):
    """
    Retrieve the Context object's verify mode, as set by
    :meth:`set_verify`.

    :return: The verify mode
    """
    verify_mode = _lib.SSL_CTX_get_verify_mode(self._context)
    return verify_mode
-
def get_verify_depth(self):
    """
    Retrieve the Context object's verify depth, as set by
    :meth:`set_verify_depth`.

    :return: The verify depth
    """
    verify_depth = _lib.SSL_CTX_get_verify_depth(self._context)
    return verify_depth
-
def load_tmp_dh(self, dhfile):
    """
    Load parameters for Ephemeral Diffie-Hellman.

    :param dhfile: The file to load EDH parameters from (``bytes`` or
        ``unicode``).

    :return: None
    :raises Error: If the file cannot be opened, does not contain readable
        DH parameters, or the parameters cannot be set on the context.
    """
    dhfile = _path_bytes(dhfile)

    bio = _lib.BIO_new_file(dhfile, b"r")
    if bio == _ffi.NULL:
        _raise_current_error()
    bio = _ffi.gc(bio, _lib.BIO_free)

    dh = _lib.PEM_read_bio_DHparams(bio, _ffi.NULL, _ffi.NULL, _ffi.NULL)
    # A file that opens fine but holds no valid DH parameters yields NULL;
    # report that right away instead of passing NULL further down.
    _openssl_assert(dh != _ffi.NULL)
    dh = _ffi.gc(dh, _lib.DH_free)

    res = _lib.SSL_CTX_set_tmp_dh(self._context, dh)
    _openssl_assert(res == 1)
-
def set_tmp_ecdh(self, curve):
    """
    Select a curve to use for ECDHE key exchange.

    :param curve: A curve object to use as returned by either
        :meth:`OpenSSL.crypto.get_elliptic_curve` or
        :meth:`OpenSSL.crypto.get_elliptic_curves`.

    :return: None
    """
    ec_key = curve._to_EC_KEY()
    _lib.SSL_CTX_set_tmp_ecdh(self._context, ec_key)
-
def set_cipher_list(self, cipher_list):
    """
    Set the list of ciphers to be used in this context.

    See the OpenSSL manual for more information (e.g.
    :manpage:`ciphers(1)`).

    :param bytes cipher_list: An OpenSSL cipher string.
    :return: None
    :raises TypeError: If *cipher_list* is not a byte string after
        conversion.
    :raises Error: If OpenSSL rejects the string, or if only the TLS 1.3
        cipher suites remain afterwards.
    """
    cipher_list = _text_to_bytes_and_warn("cipher_list", cipher_list)

    if not isinstance(cipher_list, bytes):
        raise TypeError("cipher_list must be a byte string.")

    set_result = _lib.SSL_CTX_set_cipher_list(self._context, cipher_list)
    _openssl_assert(set_result == 1)

    # In OpenSSL 1.1.1 setting the cipher list will always return TLS 1.3
    # ciphers even if you pass an invalid cipher.  Applications (like
    # Twisted) have tests that depend on an error being raised if an
    # invalid cipher string is passed, but without the following check
    # for the TLS 1.3 specific cipher suites it would never error.
    tls13_only = [
        "TLS_AES_256_GCM_SHA384",
        "TLS_CHACHA20_POLY1305_SHA256",
        "TLS_AES_128_GCM_SHA256",
    ]
    probe = Connection(self, None)
    if probe.get_cipher_list() == tls13_only:
        no_match = (
            "SSL routines",
            "SSL_CTX_set_cipher_list",
            "no cipher match",
        )
        raise Error([no_match])
-
def set_client_ca_list(self, certificate_authorities):
    """
    Set the list of preferred client certificate signers for this server
    context.

    This list of certificate authorities will be sent to the client when
    the server requests a client certificate.

    :param certificate_authorities: a sequence of X509Names.
    :return: None
    :raises TypeError: If any element is not an X509Name.

    .. versionadded:: 0.10
    """
    name_stack = _lib.sk_X509_NAME_new_null()
    _openssl_assert(name_stack != _ffi.NULL)

    try:
        for authority in certificate_authorities:
            if not isinstance(authority, X509Name):
                raise TypeError(
                    "client CAs must be X509Name objects, not %s "
                    "objects" % (type(authority).__name__,)
                )
            # Push a duplicate so the caller's object is not shared.
            duplicate = _lib.X509_NAME_dup(authority._name)
            _openssl_assert(duplicate != _ffi.NULL)
            if not _lib.sk_X509_NAME_push(name_stack, duplicate):
                _lib.X509_NAME_free(duplicate)
                _raise_current_error()
    except Exception:
        # NOTE(review): only the stack itself is freed here; confirm
        # whether names already pushed onto it need freeing as well.
        _lib.sk_X509_NAME_free(name_stack)
        raise
    else:
        _lib.SSL_CTX_set_client_CA_list(self._context, name_stack)
-
def add_client_ca(self, certificate_authority):
    """
    Add the CA certificate to the list of preferred signers for this
    context.

    The list of certificate authorities will be sent to the client when the
    server requests a client certificate.

    :param certificate_authority: certificate authority's X509 certificate.
    :return: None
    :raises TypeError: If *certificate_authority* is not an X509 instance.

    .. versionadded:: 0.10
    """
    if not isinstance(certificate_authority, X509):
        raise TypeError("certificate_authority must be an X509 instance")

    ca_x509 = certificate_authority._x509
    _openssl_assert(_lib.SSL_CTX_add_client_CA(self._context, ca_x509) == 1)
-
def set_timeout(self, timeout):
    """
    Set the timeout for newly created sessions for this Context object to
    *timeout*.  The default value is 300 seconds.  See the OpenSSL manual
    for more information (e.g. :manpage:`SSL_CTX_set_timeout(3)`).

    :param timeout: The timeout in (whole) seconds
    :return: The previous session timeout
    :raises TypeError: If *timeout* is not an integer.
    """
    if isinstance(timeout, int):
        return _lib.SSL_CTX_set_timeout(self._context, timeout)
    raise TypeError("timeout must be an integer")
-
def get_timeout(self):
    """
    Retrieve session timeout, as set by :meth:`set_timeout`.  The default
    is 300 seconds.

    :return: The session timeout
    """
    session_timeout = _lib.SSL_CTX_get_timeout(self._context)
    return session_timeout
-
def set_info_callback(self, callback):
    """
    Set the information callback to *callback*.  This function will be
    called from time to time during SSL handshakes.

    :param callback: The Python callback to use.  This should take three
        arguments: a Connection object and two integers.  The first integer
        specifies where in the SSL handshake the function was called, and
        the other the return code from a (possibly failed) internal
        function call.
    :return: None
    :raises TypeError: If *callback* is not callable.
    """
    # Validate up front, like set_passwd_cb and set_verify do: a
    # non-callable would otherwise only fail later, inside the C callback.
    if not callable(callback):
        raise TypeError("callback must be callable")

    @wraps(callback)
    def wrapper(ssl, where, return_code):
        callback(Connection._reverse_mapping[ssl], where, return_code)

    # Keep the cffi callback referenced on the context.
    self._info_callback = _ffi.callback(
        "void (*)(const SSL *, int, int)", wrapper
    )
    _lib.SSL_CTX_set_info_callback(self._context, self._info_callback)
-
@_requires_keylog
def set_keylog_callback(self, callback):
    """
    Set the TLS key logging callback to *callback*.  This function will be
    called whenever TLS key material is generated or received, in order
    to allow applications to store this keying material for debugging
    purposes.

    :param callback: The Python callback to use.  This should take two
        arguments: a Connection object and a bytestring that contains
        the key material in the format used by NSS for its SSLKEYLOGFILE
        debugging output.
    :return: None
    :raises TypeError: If *callback* is not callable.
    """
    # Validate up front, like set_passwd_cb and set_verify do: a
    # non-callable would otherwise only fail later, inside the C callback.
    if not callable(callback):
        raise TypeError("callback must be callable")

    @wraps(callback)
    def wrapper(ssl, line):
        line = _ffi.string(line)
        callback(Connection._reverse_mapping[ssl], line)

    # Keep the cffi callback referenced on the context.
    self._keylog_callback = _ffi.callback(
        "void (*)(const SSL *, const char *)", wrapper
    )
    _lib.SSL_CTX_set_keylog_callback(self._context, self._keylog_callback)
-
def get_app_data(self):
    """
    Return the application data stored on this context.

    :return: The object last passed to :meth:`set_app_data()`.
    """
    return self._app_data
-
def set_app_data(self, data):
    """
    Store application data on this context; :meth:`get_app_data()` returns
    it later.

    :param data: Any Python object
    :return: None
    """
    self._app_data = data
-
def get_cert_store(self):
    """
    Get the certificate store for the context.  This can be used to add
    "trusted" certificates without using the
    :meth:`load_verify_locations` method.

    :return: A X509Store object or None if it does not have one.
    """
    raw_store = _lib.SSL_CTX_get_cert_store(self._context)
    if raw_store == _ffi.NULL:
        # TODO: This is untested.
        return None

    # Wrap the existing store pointer without running X509Store.__init__.
    wrapper = X509Store.__new__(X509Store)
    wrapper._store = raw_store
    return wrapper
-
def set_options(self, options):
    """
    Add options.  Options set before are not cleared!
    This method should be used with the :const:`OP_*` constants.

    :param options: The options to add.
    :return: The new option bitmask.
    :raises TypeError: If *options* is not an integer.
    """
    if isinstance(options, int):
        return _lib.SSL_CTX_set_options(self._context, options)
    raise TypeError("options must be an integer")
-
def set_mode(self, mode):
    """
    Add modes via bitmask.  Modes set before are not cleared!  This method
    should be used with the :const:`MODE_*` constants.

    :param mode: The mode to add.
    :return: The new mode bitmask.
    :raises TypeError: If *mode* is not an integer.
    """
    if isinstance(mode, int):
        return _lib.SSL_CTX_set_mode(self._context, mode)
    raise TypeError("mode must be an integer")
-
def set_tlsext_servername_callback(self, callback):
    """
    Specify a callback function to be called when clients specify a server
    name.

    :param callback: The callback function.  It will be invoked with one
        argument, the Connection instance.
    :raises TypeError: If *callback* is not callable.

    .. versionadded:: 0.13
    """
    # Validate up front, like set_passwd_cb and set_verify do: a
    # non-callable would otherwise only fail later, inside the C callback.
    if not callable(callback):
        raise TypeError("callback must be callable")

    @wraps(callback)
    def wrapper(ssl, alert, arg):
        callback(Connection._reverse_mapping[ssl])
        # The user callback's return value is not used; 0 is always
        # reported back to OpenSSL.
        return 0

    # Keep the cffi callback referenced on the context.
    self._tlsext_servername_callback = _ffi.callback(
        "int (*)(SSL *, int *, void *)", wrapper
    )
    _lib.SSL_CTX_set_tlsext_servername_callback(
        self._context, self._tlsext_servername_callback
    )
-
def set_tlsext_use_srtp(self, profiles):
    """
    Enable support for negotiating SRTP keying material.

    :param bytes profiles: A colon delimited list of protection profile
        names, like ``b'SRTP_AES128_CM_SHA1_80:SRTP_AES128_CM_SHA1_32'``.
    :return: None
    :raises TypeError: If *profiles* is not a byte string.
    """
    if not isinstance(profiles, bytes):
        raise TypeError("profiles must be a byte string.")

    # Success is signalled by a return value of 0 here.
    result = _lib.SSL_CTX_set_tlsext_use_srtp(self._context, profiles)
    _openssl_assert(result == 0)
-
@_requires_alpn
def set_alpn_protos(self, protos):
    """
    Specify the protocols that the client is prepared to speak after the
    TLS connection has been negotiated using Application Layer Protocol
    Negotiation.

    :param protos: A list of the protocols to be offered to the server.
        This list should be a Python list of bytestrings representing the
        protocols to offer, e.g. ``[b'http/1.1', b'spdy/2']``.
    :raises ValueError: If *protos* is empty.
    """
    # Different versions of OpenSSL are inconsistent about how they handle
    # empty proto lists (see #1043), so we avoid the problem entirely by
    # rejecting them ourselves.
    if not protos:
        raise ValueError("at least one protocol must be specified")

    # Serialise the list: every protocol name is preceded by a single
    # byte holding its length.
    pieces = []
    for proto in protos:
        pieces.extend((bytes((len(proto),)), proto))
    protostr = b"".join(pieces)

    # Build a C string from the list.  We don't need to save this off
    # because OpenSSL immediately copies the data out.
    input_str = _ffi.new("unsigned char[]", protostr)

    # https://www.openssl.org/docs/man1.1.0/man3/SSL_CTX_set_alpn_protos.html:
    # SSL_CTX_set_alpn_protos() and SSL_set_alpn_protos()
    # return 0 on success, and non-0 on failure.
    # WARNING: these functions reverse the return value convention.
    result = _lib.SSL_CTX_set_alpn_protos(
        self._context, input_str, len(protostr)
    )
    _openssl_assert(result == 0)
-
@_requires_alpn
def set_alpn_select_callback(self, callback):
    """
    Specify a callback function that will be called on the server when a
    client offers protocols using ALPN.

    :param callback: The callback function.  It will be invoked with two
        arguments: the Connection, and a list of offered protocols as
        bytestrings, e.g ``[b'http/1.1', b'spdy/2']``.  It can return
        one of those bytestrings to indicate the chosen protocol, the
        empty bytestring to terminate the TLS connection, or the
        :py:obj:`NO_OVERLAPPING_PROTOCOLS` to indicate that no offered
        protocol was selected, but that the connection should not be
        aborted.
    """
    helper = _ALPNSelectHelper(callback)
    # Keep both the helper and its cffi callback referenced on the context.
    self._alpn_select_helper = helper
    self._alpn_select_callback = helper.callback
    _lib.SSL_CTX_set_alpn_select_cb(
        self._context, self._alpn_select_callback, _ffi.NULL
    )
-
def _set_ocsp_callback(self, helper, data):
    """
    This internal helper does the common work for
    ``set_ocsp_server_callback`` and ``set_ocsp_client_callback``, which is
    almost all of it.

    :param helper: The OCSP callback helper whose ``callback`` attribute is
        registered with OpenSSL.
    :param data: Optional user data; passed as a cffi handle, or as NULL
        when it is ``None``.
    """
    self._ocsp_helper = helper
    self._ocsp_callback = helper.callback
    # The handle is stored on the context alongside the callback.
    self._ocsp_data = _ffi.NULL if data is None else _ffi.new_handle(data)

    cb_result = _lib.SSL_CTX_set_tlsext_status_cb(
        self._context, self._ocsp_callback
    )
    _openssl_assert(cb_result == 1)

    arg_result = _lib.SSL_CTX_set_tlsext_status_arg(
        self._context, self._ocsp_data
    )
    _openssl_assert(arg_result == 1)
-
def set_ocsp_server_callback(self, callback, data=None):
    """
    Set a callback to provide OCSP data to be stapled to the TLS handshake
    on the server side.

    :param callback: The callback function.  It will be invoked with two
        arguments: the Connection, and the optional arbitrary data you have
        provided.  The callback must return a bytestring that contains the
        OCSP data to staple to the handshake.  If no OCSP data is available
        for this connection, return the empty bytestring.
    :param data: Some opaque data that will be passed into the callback
        function when called.  This can be used to avoid needing to do
        complex data lookups or to keep track of what context is being
        used.  This parameter is optional.
    """
    self._set_ocsp_callback(_OCSPServerCallbackHelper(callback), data)
-
def set_ocsp_client_callback(self, callback, data=None):
    """
    Set a callback to validate OCSP data stapled to the TLS handshake on
    the client side.

    :param callback: The callback function.  It will be invoked with three
        arguments: the Connection, a bytestring containing the stapled OCSP
        assertion, and the optional arbitrary data you have provided.  The
        callback must return a boolean that indicates the result of
        validating the OCSP data: ``True`` if the OCSP data is valid and
        the certificate can be trusted, or ``False`` if either the OCSP
        data is invalid or the certificate has been revoked.
    :param data: Some opaque data that will be passed into the callback
        function when called.  This can be used to avoid needing to do
        complex data lookups or to keep track of what context is being
        used.  This parameter is optional.
    """
    self._set_ocsp_callback(_OCSPClientCallbackHelper(callback), data)
-
def set_cookie_generate_callback(self, callback):
    """
    Register *callback* as the cookie generation callback of this context.

    :param callback: The Python callback; it is wrapped in a
        ``_CookieGenerateCallbackHelper``, which calls it with the
        Connection and copies the returned cookie to OpenSSL.
    :return: None
    """
    helper = _CookieGenerateCallbackHelper(callback)
    # Keep the helper (and with it the cffi callback) referenced.
    self._cookie_generate_helper = helper
    _lib.SSL_CTX_set_cookie_generate_cb(self._context, helper.callback)
-
def set_cookie_verify_callback(self, callback):
    """
    Register *callback* as the cookie verification callback of this
    context.

    :param callback: The Python callback; it is wrapped in a
        ``_CookieVerifyCallbackHelper``, which calls it with the Connection
        and the received cookie as ``bytes``.
    :return: None
    """
    helper = _CookieVerifyCallbackHelper(callback)
    # Keep the helper (and with it the cffi callback) referenced.
    self._cookie_verify_helper = helper
    _lib.SSL_CTX_set_cookie_verify_cb(self._context, helper.callback)
-
-
- class Connection:
- _reverse_mapping = WeakValueDictionary()
-
def __init__(self, context, socket=None):
    """
    Create a new Connection object, using the given OpenSSL.SSL.Context
    instance and socket.

    :param context: An SSL Context to use for this connection
    :param socket: The socket to use for transport layer.  When it is
        ``None`` the connection is backed by a pair of memory BIOs instead.
    :raises TypeError: If *context* is not a Context instance.
    """
    if not isinstance(context, Context):
        raise TypeError("context must be a Context instance")

    raw_ssl = _lib.SSL_new(context._context)
    self._ssl = _ffi.gc(raw_ssl, _lib.SSL_free)
    # We set SSL_MODE_AUTO_RETRY to handle situations where OpenSSL returns
    # an SSL_ERROR_WANT_READ when processing a non-application data packet
    # even though there is still data on the underlying transport.
    # See https://github.com/openssl/openssl/issues/6234 for more details.
    _lib.SSL_set_mode(self._ssl, _lib.SSL_MODE_AUTO_RETRY)
    self._context = context
    self._app_data = None

    # References to strings used for Application Layer Protocol
    # Negotiation.  These strings get copied at some point but it's well
    # after the callback returns, so we have to hang them somewhere to
    # avoid them getting freed.
    self._alpn_select_callback_args = None

    # Reference the verify_callback of the Context.  This ensures that if
    # set_verify is called again after the SSL object has been created we
    # do not point to a dangling reference.
    self._verify_helper = context._verify_helper
    self._verify_callback = context._verify_callback

    # And likewise for the cookie callbacks.
    self._cookie_generate_helper = context._cookie_generate_helper
    self._cookie_verify_helper = context._cookie_verify_helper

    # Lets callbacks map the SSL pointer back to this Connection.
    self._reverse_mapping[self._ssl] = self

    if socket is not None:
        # Socket-backed connection: no memory BIOs.
        self._into_ssl = None
        self._from_ssl = None
        self._socket = socket
        set_result = _lib.SSL_set_fd(
            self._ssl, _asFileDescriptor(self._socket)
        )
        _openssl_assert(set_result == 1)
    else:
        self._socket = None
        # Don't set up any gc for these, SSL_free will take care of them.
        self._into_ssl = _lib.BIO_new(_lib.BIO_s_mem())
        _openssl_assert(self._into_ssl != _ffi.NULL)

        self._from_ssl = _lib.BIO_new(_lib.BIO_s_mem())
        _openssl_assert(self._from_ssl != _ffi.NULL)

        _lib.SSL_set_bio(self._ssl, self._into_ssl, self._from_ssl)
-
- def __getattr__(self, name):
- """
- Look up attributes on the wrapped socket object if they are not found
- on the Connection object.
- """
- if self._socket is None:
- raise AttributeError(
- "'%s' object has no attribute '%s'"
- % (self.__class__.__name__, name)
- )
- else:
- return getattr(self._socket, name)
-
- def _raise_ssl_error(self, ssl, result):
- if self._context._verify_helper is not None:
- self._context._verify_helper.raise_if_problem()
- if self._context._alpn_select_helper is not None:
- self._context._alpn_select_helper.raise_if_problem()
- if self._context._ocsp_helper is not None:
- self._context._ocsp_helper.raise_if_problem()
-
- error = _lib.SSL_get_error(ssl, result)
- if error == _lib.SSL_ERROR_WANT_READ:
- raise WantReadError()
- elif error == _lib.SSL_ERROR_WANT_WRITE:
- raise WantWriteError()
- elif error == _lib.SSL_ERROR_ZERO_RETURN:
- raise ZeroReturnError()
- elif error == _lib.SSL_ERROR_WANT_X509_LOOKUP:
- # TODO: This is untested.
- raise WantX509LookupError()
- elif error == _lib.SSL_ERROR_SYSCALL:
- if _lib.ERR_peek_error() == 0:
- if result < 0:
- if platform == "win32":
- errno = _ffi.getwinerror()[0]
- else:
- errno = _ffi.errno
-
- if errno != 0:
- raise SysCallError(errno, errorcode.get(errno))
- raise SysCallError(-1, "Unexpected EOF")
- else:
- # TODO: This is untested.
- _raise_current_error()
- elif error == _lib.SSL_ERROR_SSL and _lib.ERR_peek_error() != 0:
- # In 3.0.x an unexpected EOF no longer triggers syscall error
- # but we want to maintain compatibility so we check here and
- # raise syscall if it is an EOF. Since we're not actually sure
- # what else could raise SSL_ERROR_SSL we check for the presence
- # of the OpenSSL 3 constant SSL_R_UNEXPECTED_EOF_WHILE_READING
- # and if it's not present we just raise an error, which matches
- # the behavior before we added this elif section
- peeked_error = _lib.ERR_peek_error()
- reason = _lib.ERR_GET_REASON(peeked_error)
- if _lib.Cryptography_HAS_UNEXPECTED_EOF_WHILE_READING:
- _openssl_assert(
- reason == _lib.SSL_R_UNEXPECTED_EOF_WHILE_READING
- )
- _lib.ERR_clear_error()
- raise SysCallError(-1, "Unexpected EOF")
- else:
- _raise_current_error()
- elif error == _lib.SSL_ERROR_NONE:
- pass
- else:
- _raise_current_error()
-
def get_context(self):
    """
    Return the :class:`Context` object currently associated with this
    :class:`Connection`.

    :return: The value stored by the constructor or by
        :meth:`set_context`.
    """
    current_context = self._context
    return current_context
-
def set_context(self, context):
    """
    Switch this connection to a new session context.

    :param context: A :class:`Context` instance giving the new session
        context to use.
    :raises TypeError: If ``context`` is not a :class:`Context`.
    """
    is_context = isinstance(context, Context)
    if not is_context:
        raise TypeError("context must be a Context instance")

    # Update the native SSL object first, then the Python-side reference.
    _lib.SSL_set_SSL_CTX(self._ssl, context._context)
    self._context = context
-
def get_servername(self):
    """
    Retrieve the servername extension value if provided in the client hello
    message, or None if there wasn't one.

    :return: A byte string giving the server name or :data:`None`.

    .. versionadded:: 0.13
    """
    raw_name = _lib.SSL_get_servername(
        self._ssl, _lib.TLSEXT_NAMETYPE_host_name
    )
    # A NULL pointer means no name is available.
    return None if raw_name == _ffi.NULL else _ffi.string(raw_name)
-
def set_verify(self, mode, callback=None):
    """
    Override the Context object's verification flags for this specific
    connection. See :py:meth:`Context.set_verify` for details.

    :param mode: The verify flags, as an integer.
    :param callback: Optional callable; when given it is wrapped in a
        ``_VerifyHelper`` that is stored on this connection.
    :raises TypeError: If ``mode`` is not an integer or ``callback`` is
        neither ``None`` nor callable.
    """
    if not isinstance(mode, int):
        raise TypeError("mode must be an integer")

    if callback is None:
        helper = None
        wrapped = None
        native_callback = _ffi.NULL
    elif callable(callback):
        helper = _VerifyHelper(callback)
        wrapped = helper.callback
        native_callback = wrapped
    else:
        raise TypeError("callback must be callable")

    # Store the Python-side references before handing the callback over.
    self._verify_helper = helper
    self._verify_callback = wrapped
    _lib.SSL_set_verify(self._ssl, mode, native_callback)
-
def get_verify_mode(self):
    """
    Return this Connection's verify mode, as set by :meth:`set_verify`.

    :return: The verify mode
    """
    verify_mode = _lib.SSL_get_verify_mode(self._ssl)
    return verify_mode
-
def use_certificate(self, cert):
    """
    Load a certificate from a X509 object into this connection.

    :param cert: The X509 object
    :return: None
    :raises TypeError: If ``cert`` is not an ``X509`` instance.
    """
    # Same checks as Context.use_certificate.
    if not isinstance(cert, X509):
        raise TypeError("cert must be an X509 instance")

    if not _lib.SSL_use_certificate(self._ssl, cert._x509):
        _raise_current_error()
-
def use_privatekey(self, pkey):
    """
    Load a private key from a PKey object into this connection.

    :param pkey: The PKey object
    :return: None
    :raises TypeError: If ``pkey`` is not a ``PKey`` instance.
    """
    # Same checks as Context.use_privatekey.
    if not isinstance(pkey, PKey):
        raise TypeError("pkey must be a PKey instance")

    if not _lib.SSL_use_PrivateKey(self._ssl, pkey._pkey):
        # Failure reporting is delegated to the Context.
        self._context._raise_passphrase_exception()
-
def set_ciphertext_mtu(self, mtu):
    """
    For DTLS, set the maximum UDP payload size (*not* including IP/UDP
    overhead).

    Note that you might have to set :data:`OP_NO_QUERY_MTU` to prevent
    OpenSSL from spontaneously clearing this.

    :param mtu: An integer giving the maximum transmission unit.
    :return: None

    .. versionadded:: 21.1
    """
    ssl_handle = self._ssl
    _lib.SSL_set_mtu(ssl_handle, mtu)
-
def get_cleartext_mtu(self):
    """
    For DTLS, get the maximum size of unencrypted data you can pass to
    :meth:`write` without exceeding the MTU (as passed to
    :meth:`set_ciphertext_mtu`).

    :return: The effective MTU as an integer.
    :raises NotImplementedError: If the binding does not expose
        ``DTLS_get_data_mtu``.

    .. versionadded:: 21.1
    """
    if hasattr(_lib, "DTLS_get_data_mtu"):
        return _lib.DTLS_get_data_mtu(self._ssl)
    raise NotImplementedError("requires OpenSSL 1.1.1 or better")
-
def set_tlsext_host_name(self, name):
    """
    Set the value of the servername extension to send in the client hello.

    :param name: A byte string giving the name.
    :raises TypeError: If ``name`` is not ``bytes`` or contains a NUL
        byte.

    .. versionadded:: 0.13
    """
    if not isinstance(name, bytes):
        raise TypeError("name must be a byte string")
    if b"\0" in name:
        raise TypeError("name must not contain NUL byte")

    # XXX The return value is ignored; this can presumably fail sometimes.
    _lib.SSL_set_tlsext_host_name(self._ssl, name)
-
def pending(self):
    """
    Get the number of bytes that can be safely read from the SSL buffer
    (**not** the underlying transport buffer).

    :return: The number of bytes available in the receive buffer.
    """
    available = _lib.SSL_pending(self._ssl)
    return available
-
def send(self, buf, flags=0):
    """
    Send data on the connection. NOTE: If you get one of the WantRead,
    WantWrite or WantX509Lookup exceptions on this, you have to call the
    method again with the SAME buffer.

    :param buf: The string, buffer or memoryview to send
    :param flags: (optional) Included for compatibility with the socket
        API, the value is ignored
    :return: The number of bytes written
    :raises ValueError: If ``buf`` is longer than 2**31-1 bytes.
    """
    # Backward compatibility
    payload = _text_to_bytes_and_warn("buf", buf)
    max_write = 2147483647

    with _ffi.from_buffer(payload) as view:
        # The length of the Python object is checked (not that of the
        # cffi view) for testability.
        if len(payload) > max_write:
            raise ValueError(
                "Cannot send more than 2**31-1 bytes at once."
            )

        written = _lib.SSL_write(self._ssl, view, len(view))
        self._raise_ssl_error(self._ssl, written)

        return written


write = send
-
def sendall(self, buf, flags=0):
    """
    Send "all" data on the connection. ``SSL_write`` is called repeatedly
    until all data is sent. If an error occurs, it's impossible to tell
    how much data has been sent.

    :param buf: The string, buffer or memoryview to send
    :param flags: (optional) Included for compatibility with the socket
        API, the value is ignored
    :return: The number of bytes written
    """
    payload = _text_to_bytes_and_warn("buf", buf)
    # SSL_write's num arg is an int, so a single call cannot be given more
    # than 2**31-1 bytes.
    max_chunk = 2147483647

    with _ffi.from_buffer(payload) as view:
        size = len(payload)
        sent = 0

        while sent != size:
            written = _lib.SSL_write(
                self._ssl, view + sent, min(size - sent, max_chunk)
            )
            self._raise_ssl_error(self._ssl, written)
            sent += written

        return sent
-
def recv(self, bufsiz, flags=None):
    """
    Receive data on the connection.

    :param bufsiz: The maximum number of bytes to read
    :param flags: (optional) The only supported flag is ``MSG_PEEK``,
        all other flags are ignored.
    :return: The string read from the Connection
    """
    scratch = _no_zero_allocator("char[]", bufsiz)
    peek_requested = flags is not None and flags & socket.MSG_PEEK
    # With MSG_PEEK the data is read through SSL_peek instead of SSL_read.
    reader = _lib.SSL_peek if peek_requested else _lib.SSL_read
    count_read = reader(self._ssl, scratch, bufsiz)
    self._raise_ssl_error(self._ssl, count_read)
    return _ffi.buffer(scratch, count_read)[:]


read = recv
-
def recv_into(self, buffer, nbytes=None, flags=None):
    """
    Receive data on the connection and copy it directly into the provided
    buffer, rather than creating a new string.

    :param buffer: The buffer to copy into.
    :param nbytes: (optional) The maximum number of bytes to read into the
        buffer. If not present, defaults to the size of the buffer. If
        larger than the size of the buffer, is reduced to the size of the
        buffer.
    :param flags: (optional) The only supported flag is ``MSG_PEEK``,
        all other flags are ignored.
    :return: The number of bytes read into the buffer.
    """
    capacity = len(buffer)
    nbytes = capacity if nbytes is None else min(nbytes, capacity)

    # A temporary buffer is needed. It would be better to pass memoryviews
    # straight into the SSL_read call, but right now that is not possible.
    # Revisit this if CFFI gets that ability.
    scratch = _no_zero_allocator("char[]", nbytes)
    peek_requested = flags is not None and flags & socket.MSG_PEEK
    reader = _lib.SSL_peek if peek_requested else _lib.SSL_read
    count_read = reader(self._ssl, scratch, nbytes)
    self._raise_ssl_error(self._ssl, count_read)

    # Wrapping the CFFI buffer in a memoryview avoids a memory copy. The
    # buffer protocol should allow assigning the CFFI buffer directly, but
    # on CPython 3.3+ that segfaults.
    buffer[:count_read] = memoryview(_ffi.buffer(scratch, count_read))

    return count_read
-
def _handle_bio_errors(self, bio, result):
    """
    Raise the exception matching the state of ``bio`` after a failed BIO
    call.

    :param bio: The BIO the failing call was made on.
    :param result: The return value of that call (not inspected here).
    """
    if not _lib.BIO_should_retry(bio):
        # TODO: This is untested.
        _raise_current_error()
        return

    if _lib.BIO_should_read(bio):
        raise WantReadError()
    if _lib.BIO_should_write(bio):
        # TODO: This is untested.
        raise WantWriteError()
    if _lib.BIO_should_io_special(bio):
        # TODO: This is untested. I think io_special means the socket
        # BIO has a not-yet connected socket.
        raise ValueError("BIO_should_io_special")
    # TODO: This is untested.
    raise ValueError("unknown bio failure")
-
def bio_read(self, bufsiz):
    """
    If the Connection was created with a memory BIO, this method can be
    used to read bytes from the write end of that memory BIO. Many
    Connection methods will add bytes which must be read in this manner or
    the buffer will eventually fill up and the Connection will be able to
    take no further actions.

    :param bufsiz: The maximum number of bytes to read
    :return: The string read.
    :raises TypeError: If the connection has no memory BIO or ``bufsiz``
        is not an integer.
    """
    outgoing_bio = self._from_ssl
    if outgoing_bio is None:
        raise TypeError("Connection sock was not None")
    if not isinstance(bufsiz, int):
        raise TypeError("bufsiz must be an integer")

    scratch = _no_zero_allocator("char[]", bufsiz)
    count_read = _lib.BIO_read(outgoing_bio, scratch, bufsiz)
    if count_read <= 0:
        self._handle_bio_errors(outgoing_bio, count_read)

    return _ffi.buffer(scratch, count_read)[:]
-
def bio_write(self, buf):
    """
    If the Connection was created with a memory BIO, this method can be
    used to add bytes to the read end of that memory BIO. The Connection
    can then read the bytes (for example, in response to a call to
    :meth:`recv`).

    :param buf: The string to put into the memory BIO.
    :return: The number of bytes written
    :raises TypeError: If the connection has no memory BIO.
    """
    payload = _text_to_bytes_and_warn("buf", buf)

    incoming_bio = self._into_ssl
    if incoming_bio is None:
        raise TypeError("Connection sock was not None")

    with _ffi.from_buffer(payload) as view:
        written = _lib.BIO_write(incoming_bio, view, len(view))
        if written <= 0:
            self._handle_bio_errors(incoming_bio, written)
        return written
-
def renegotiate(self):
    """
    Renegotiate the session.

    :return: True if the renegotiation can be started, False otherwise
    :rtype: bool
    """
    # Nothing is started while a renegotiation is already pending.
    if self.renegotiate_pending():
        return False

    _openssl_assert(_lib.SSL_renegotiate(self._ssl) == 1)
    return True
-
def do_handshake(self):
    """
    Perform an SSL handshake (usually called after :meth:`renegotiate` or
    one of :meth:`set_accept_state` or :meth:`set_connect_state`). This can
    raise the same exceptions as :meth:`send` and :meth:`recv`.

    :return: None.
    """
    handshake_status = _lib.SSL_do_handshake(self._ssl)
    self._raise_ssl_error(self._ssl, handshake_status)
-
def renegotiate_pending(self):
    """
    Check if there's a renegotiation in progress, it will return False once
    a renegotiation is finished.

    :return: Whether there's a renegotiation in progress
    :rtype: bool
    """
    pending_flag = _lib.SSL_renegotiate_pending(self._ssl)
    return pending_flag == 1
-
def total_renegotiations(self):
    """
    Find out the total number of renegotiations.

    :return: The number of renegotiations.
    :rtype: int
    """
    renegotiation_count = _lib.SSL_total_renegotiations(self._ssl)
    return renegotiation_count
-
def connect(self, addr):
    """
    Call the :meth:`connect` method of the underlying socket and set up SSL
    on the socket, using the :class:`Context` object supplied to this
    :class:`Connection` object at creation.

    :param addr: A remote address
    :return: What the socket's connect method returns
    """
    # The SSL object is switched to connect state before the socket call.
    _lib.SSL_set_connect_state(self._ssl)
    outcome = self._socket.connect(addr)
    return outcome
-
def connect_ex(self, addr):
    """
    Call the :meth:`connect_ex` method of the underlying socket and set up
    SSL on the socket, using the Context object supplied to this Connection
    object at creation. Note that if the :meth:`connect_ex` method of the
    socket doesn't return 0, SSL won't be initialized.

    :param addr: A remote address
    :return: What the socket's connect_ex method returns
    """
    # Look the socket method up before touching the SSL state.
    socket_connect_ex = self._socket.connect_ex
    self.set_connect_state()
    outcome = socket_connect_ex(addr)
    return outcome
-
def accept(self):
    """
    Call the :meth:`accept` method of the underlying socket and set up SSL
    on the returned socket, using the Context object supplied to this
    :class:`Connection` object at creation.

    :return: A *(conn, addr)* pair where *conn* is the new
        :class:`Connection` object created, and *address* is as returned by
        the socket's :meth:`accept`.
    """
    accepted_socket, peer_address = self._socket.accept()
    server_side = Connection(self._context, accepted_socket)
    server_side.set_accept_state()
    return (server_side, peer_address)
-
def DTLSv1_listen(self):
    """
    Call the OpenSSL function DTLSv1_listen on this connection. See the
    OpenSSL manual for more details.

    :return: None
    :raises WantReadError: If DTLSv1_listen returns zero.
    """
    # Possible future extension: return the BIO_ADDR in some form.
    peer_addr = _lib.BIO_ADDR_new()
    try:
        outcome = _lib.DTLSv1_listen(self._ssl, peer_addr)
    finally:
        _lib.BIO_ADDR_free(peer_addr)

    # Surface any exception recorded by the cookie callbacks.
    cookie_helpers = (
        self._cookie_generate_helper,
        self._cookie_verify_helper,
    )
    for helper in cookie_helpers:
        if helper is not None:
            helper.raise_if_problem()

    # DTLSv1_listen is weird. A zero return value means 'didn't find a
    # ClientHello with valid cookie, but keep trying'. So basically
    # WantReadError. But it doesn't work correctly with _raise_ssl_error,
    # so it is raised manually instead.
    if outcome == 0:
        raise WantReadError()
    if outcome < 0:
        self._raise_ssl_error(self._ssl, outcome)
-
def DTLSv1_get_timeout(self):
    """
    Determine when the DTLS SSL object next needs to perform internal
    processing due to the passage of time.

    When the returned number of seconds have passed, the
    :meth:`DTLSv1_handle_timeout` method needs to be called.

    :return: The time left in seconds before the next timeout or `None`
        if no timeout is currently active.
    """
    seconds_out = _ffi.new("time_t *")
    microseconds_out = _ffi.new("long *")
    has_timeout = _lib.Cryptography_DTLSv1_get_timeout(
        self._ssl, seconds_out, microseconds_out
    )
    if not has_timeout:
        return None
    # Combine whole seconds and microseconds into one number of seconds.
    return seconds_out[0] + (microseconds_out[0] / 1000000)
-
def DTLSv1_handle_timeout(self):
    """
    Handles any timeout events which have become pending on a DTLS SSL
    object.

    :return: `True` if there was a pending timeout, `False` otherwise.
    """
    outcome = _lib.DTLSv1_handle_timeout(self._ssl)
    if outcome >= 0:
        return bool(outcome)
    # Negative results are reported through the usual error translation.
    self._raise_ssl_error(self._ssl, outcome)
-
def bio_shutdown(self):
    """
    If the Connection was created with a memory BIO, this method can be
    used to indicate that *end of file* has been reached on the read end of
    that memory BIO.

    :return: None
    :raises TypeError: If the connection has no memory BIO.
    """
    # Guard on the BIO that is actually used below (the read end), rather
    # than on its sibling.
    incoming_bio = self._into_ssl
    if incoming_bio is None:
        raise TypeError("Connection sock was not None")

    _lib.BIO_set_mem_eof_return(incoming_bio, 0)
-
def shutdown(self):
    """
    Send the shutdown message to the Connection.

    :return: True if the shutdown completed successfully (i.e. both sides
        have sent closure alerts), False otherwise (in which case you
        call :meth:`recv` or :meth:`send` when the connection becomes
        readable/writeable).
    """
    status = _lib.SSL_shutdown(self._ssl)
    if status < 0:
        self._raise_ssl_error(self._ssl, status)
        return None
    return status > 0
-
def get_cipher_list(self):
    """
    Retrieve the list of ciphers used by the Connection object.

    :return: A list of native cipher strings.
    """
    names = []
    index = 0
    # Entries are fetched by increasing index until NULL comes back.
    while True:
        entry = _lib.SSL_get_cipher_list(self._ssl, index)
        if entry == _ffi.NULL:
            return names
        names.append(_ffi.string(entry).decode("utf-8"))
        index += 1
-
def get_client_ca_list(self):
    """
    Get CAs whose certificates are suggested for client authentication.

    :return: If this is a server connection, the list of certificate
        authorities that will be sent or has been sent to the client, as
        controlled by this :class:`Connection`'s :class:`Context`.

        If this is a client connection, the list will be empty until the
        connection with the server is established.

    .. versionadded:: 0.10
    """
    name_stack = _lib.SSL_get_client_CA_list(self._ssl)
    if name_stack == _ffi.NULL:
        # TODO: This is untested.
        return []

    authorities = []
    for index in range(_lib.sk_X509_NAME_num(name_stack)):
        # Each entry is duplicated; the copy is released through
        # X509_NAME_free when the wrapper is collected.
        duplicate = _lib.X509_NAME_dup(
            _lib.sk_X509_NAME_value(name_stack, index)
        )
        _openssl_assert(duplicate != _ffi.NULL)

        wrapper = X509Name.__new__(X509Name)
        wrapper._name = _ffi.gc(duplicate, _lib.X509_NAME_free)
        authorities.append(wrapper)
    return authorities
-
def makefile(self, *args, **kwargs):
    """
    The makefile() method is not implemented, since there is no dup
    semantics for SSL connections

    :raise: NotImplementedError
    """
    message = "Cannot make file object of OpenSSL.SSL.Connection"
    raise NotImplementedError(message)
-
def get_app_data(self):
    """
    Retrieve application data as set by :meth:`set_app_data`.

    :return: The application data
    """
    stored = self._app_data
    return stored
-
def set_app_data(self, data):
    """
    Set application data

    The value is stored on the connection as-is.

    :param data: The application data
    :return: None
    """
    setattr(self, "_app_data", data)
-
def get_shutdown(self):
    """
    Get the shutdown state of the Connection.

    :return: The shutdown state, a bitvector of SENT_SHUTDOWN,
        RECEIVED_SHUTDOWN.
    """
    shutdown_state = _lib.SSL_get_shutdown(self._ssl)
    return shutdown_state
-
def set_shutdown(self, state):
    """
    Set the shutdown state of the Connection.

    :param state: bitvector of SENT_SHUTDOWN, RECEIVED_SHUTDOWN.
    :return: None
    :raises TypeError: If ``state`` is not an integer.
    """
    if isinstance(state, int):
        _lib.SSL_set_shutdown(self._ssl, state)
        return

    raise TypeError("state must be an integer")
-
def get_state_string(self):
    """
    Retrieve a verbose string detailing the state of the Connection.

    :return: A string representing the state
    :rtype: bytes
    """
    description = _lib.SSL_state_string_long(self._ssl)
    return _ffi.string(description)
-
def server_random(self):
    """
    Retrieve the random value used with the server hello message.

    :return: The bytes copied out by ``SSL_get_server_random``, or
        :data:`None` if the connection has no session.
    """
    if _lib.SSL_get_session(self._ssl) == _ffi.NULL:
        return None

    # First call with a NULL buffer to learn the length, then copy.
    size = _lib.SSL_get_server_random(self._ssl, _ffi.NULL, 0)
    _openssl_assert(size > 0)
    scratch = _no_zero_allocator("unsigned char[]", size)
    _lib.SSL_get_server_random(self._ssl, scratch, size)
    return _ffi.buffer(scratch, size)[:]
-
def client_random(self):
    """
    Retrieve the random value used with the client hello message.

    :return: The bytes copied out by ``SSL_get_client_random``, or
        :data:`None` if the connection has no session.
    """
    if _lib.SSL_get_session(self._ssl) == _ffi.NULL:
        return None

    # First call with a NULL buffer to learn the length, then copy.
    size = _lib.SSL_get_client_random(self._ssl, _ffi.NULL, 0)
    _openssl_assert(size > 0)
    scratch = _no_zero_allocator("unsigned char[]", size)
    _lib.SSL_get_client_random(self._ssl, scratch, size)
    return _ffi.buffer(scratch, size)[:]
-
def master_key(self):
    """
    Retrieve the value of the master key for this session.

    :return: The bytes copied out by ``SSL_SESSION_get_master_key``, or
        :data:`None` if the connection has no session.
    """
    current_session = _lib.SSL_get_session(self._ssl)
    if current_session == _ffi.NULL:
        return None

    # First call with a NULL buffer to learn the length, then copy.
    size = _lib.SSL_SESSION_get_master_key(current_session, _ffi.NULL, 0)
    _openssl_assert(size > 0)
    scratch = _no_zero_allocator("unsigned char[]", size)
    _lib.SSL_SESSION_get_master_key(current_session, scratch, size)
    return _ffi.buffer(scratch, size)[:]
-
def export_keying_material(self, label, olen, context=None):
    """
    Obtain keying material for application use.

    :param: label - a disambiguating label string as described in RFC 5705
    :param: olen - the length of the exported key material in bytes
    :param: context - a per-association context value
    :return: the exported key material bytes
    """
    scratch = _no_zero_allocator("unsigned char[]", olen)
    if context is None:
        # No context: pass NULL with zero length and the flag cleared.
        context_args = (_ffi.NULL, 0, 0)
    else:
        context_args = (context, len(context), 1)

    status = _lib.SSL_export_keying_material(
        self._ssl, scratch, olen, label, len(label), *context_args
    )
    _openssl_assert(status == 1)
    return _ffi.buffer(scratch, olen)[:]
-
def sock_shutdown(self, *args, **kwargs):
    """
    Call the :meth:`shutdown` method of the underlying socket.
    See :manpage:`shutdown(2)`.

    All positional and keyword arguments are forwarded unchanged.

    :return: What the socket's shutdown() method returns
    """
    socket_shutdown = self._socket.shutdown
    return socket_shutdown(*args, **kwargs)
-
def get_certificate(self):
    """
    Retrieve the local certificate (if any)

    :return: The local certificate, or :data:`None` if there is none.
    """
    local_cert = _lib.SSL_get_certificate(self._ssl)
    if local_cert == _ffi.NULL:
        return None

    # Take a reference of our own before wrapping the pointer.
    _lib.X509_up_ref(local_cert)
    return X509._from_raw_x509_ptr(local_cert)
-
def get_peer_certificate(self):
    """
    Retrieve the other side's certificate (if any)

    :return: The peer's certificate, or :data:`None` if there is none.
    """
    peer_cert = _lib.SSL_get_peer_certificate(self._ssl)
    if peer_cert == _ffi.NULL:
        return None
    return X509._from_raw_x509_ptr(peer_cert)
-
@staticmethod
def _cert_stack_to_list(cert_stack):
    """
    Internal helper to convert a STACK_OF(X509) to a list of X509
    instances.

    :param cert_stack: The native certificate stack to walk.
    :return: A list with one X509 wrapper per stack entry.
    """
    certificates = []
    total = _lib.sk_X509_num(cert_stack)
    for position in range(total):
        raw_cert = _lib.sk_X509_value(cert_stack, position)
        _openssl_assert(raw_cert != _ffi.NULL)
        # Take a reference for the wrapper created below.
        refcount = _lib.X509_up_ref(raw_cert)
        _openssl_assert(refcount >= 1)
        certificates.append(X509._from_raw_x509_ptr(raw_cert))
    return certificates
-
def get_peer_cert_chain(self):
    """
    Retrieve the other side's certificate chain (if any)

    :return: A list of X509 instances giving the peer's certificate chain,
        or None if it does not have one.
    """
    chain_stack = _lib.SSL_get_peer_cert_chain(self._ssl)
    if chain_stack != _ffi.NULL:
        return self._cert_stack_to_list(chain_stack)
    return None
-
def get_verified_chain(self):
    """
    Retrieve the verified certificate chain of the peer including the
    peer's end entity certificate. It must be called after a session has
    been successfully established. If peer verification was not successful
    the chain may be incomplete, invalid, or None.

    :return: A list of X509 instances giving the peer's verified
        certificate chain, or None if it does not have one.

    .. versionadded:: 20.0
    """
    # OpenSSL 1.1+
    verified_stack = _lib.SSL_get0_verified_chain(self._ssl)
    if verified_stack != _ffi.NULL:
        return self._cert_stack_to_list(verified_stack)
    return None
-
def want_read(self):
    """
    Checks if more data has to be read from the transport layer to complete
    an operation.

    :return: True iff more data has to be read
    """
    needs_read = _lib.SSL_want_read(self._ssl)
    return needs_read
-
def want_write(self):
    """
    Checks if there is data to write to the transport layer to complete an
    operation.

    :return: True iff there is data to write
    """
    needs_write = _lib.SSL_want_write(self._ssl)
    return needs_write
-
def set_accept_state(self):
    """
    Set the connection to work in server mode. The handshake will be
    handled automatically by read/write.

    :return: None
    """
    ssl_handle = self._ssl
    _lib.SSL_set_accept_state(ssl_handle)
-
def set_connect_state(self):
    """
    Set the connection to work in client mode. The handshake will be
    handled automatically by read/write.

    :return: None
    """
    ssl_handle = self._ssl
    _lib.SSL_set_connect_state(ssl_handle)
-
def get_session(self):
    """
    Returns the Session currently used.

    :return: An instance of :class:`OpenSSL.SSL.Session` or
        :obj:`None` if no session exists.

    .. versionadded:: 0.14
    """
    raw_session = _lib.SSL_get1_session(self._ssl)
    if raw_session == _ffi.NULL:
        return None

    # Build the wrapper without running Session.__init__ and let the
    # garbage collector release the native session.
    wrapper = Session.__new__(Session)
    wrapper._session = _ffi.gc(raw_session, _lib.SSL_SESSION_free)
    return wrapper
-
def set_session(self, session):
    """
    Set the session to be used when the TLS/SSL connection is established.

    :param session: A Session instance representing the session to use.
    :returns: None
    :raises TypeError: If ``session`` is not a ``Session`` instance.

    .. versionadded:: 0.14
    """
    if not isinstance(session, Session):
        raise TypeError("session must be a Session instance")

    status = _lib.SSL_set_session(self._ssl, session._session)
    _openssl_assert(status == 1)
-
def _get_finished_message(self, function):
    """
    Helper to implement :meth:`get_finished` and
    :meth:`get_peer_finished`.

    :param function: Either :data:`SSL_get_finished`: or
        :data:`SSL_get_peer_finished`.

    :return: :data:`None` if the desired message has not yet been
        received, otherwise the contents of the message.
    :rtype: :class:`bytes` or :class:`NoneType`
    """
    # The size probe passes a tiny real buffer instead of NULL. The
    # OpenSSL documentation does not say whether the output buffer may be
    # NULL when the count is zero, and inspection of the implementation
    # reveals that it calls memcpy() unconditionally. Section 7.1.4,
    # paragraph 1 of the C standard suggests that memcpy(NULL, source, 0)
    # is not guaranteed to produce defined behavior (though it probably
    # does on just about every implementation), so stay on the safe side.
    probe = _ffi.new("char[]", 0)
    message_size = function(self._ssl, probe, 0)
    if message_size == 0:
        # No Finished message so far.
        return None

    message = _no_zero_allocator("char[]", message_size)
    function(self._ssl, message, message_size)
    return _ffi.buffer(message, message_size)[:]
-
def get_finished(self):
    """
    Obtain the latest TLS Finished message that we sent.

    :return: The contents of the message or :obj:`None` if the TLS
        handshake has not yet completed.
    :rtype: :class:`bytes` or :class:`NoneType`

    .. versionadded:: 0.15
    """
    fetch = _lib.SSL_get_finished
    return self._get_finished_message(fetch)
-
def get_peer_finished(self):
    """
    Obtain the latest TLS Finished message that we received from the peer.

    :return: The contents of the message or :obj:`None` if the TLS
        handshake has not yet completed.
    :rtype: :class:`bytes` or :class:`NoneType`

    .. versionadded:: 0.15
    """
    fetch = _lib.SSL_get_peer_finished
    return self._get_finished_message(fetch)
-
def get_cipher_name(self):
    """
    Obtain the name of the currently used cipher.

    :returns: The name of the currently used cipher or :obj:`None`
        if no connection has been established.
    :rtype: :class:`unicode` or :class:`NoneType`

    .. versionadded:: 0.15
    """
    current = _lib.SSL_get_current_cipher(self._ssl)
    if current == _ffi.NULL:
        return None

    raw_name = _ffi.string(_lib.SSL_CIPHER_get_name(current))
    return raw_name.decode("utf-8")
-
def get_cipher_bits(self):
    """
    Obtain the number of secret bits of the currently used cipher.

    :returns: The number of secret bits of the currently used cipher
        or :obj:`None` if no connection has been established.
    :rtype: :class:`int` or :class:`NoneType`

    .. versionadded:: 0.15
    """
    current = _lib.SSL_get_current_cipher(self._ssl)
    if current == _ffi.NULL:
        return None

    # The optional output argument is not requested (NULL).
    return _lib.SSL_CIPHER_get_bits(current, _ffi.NULL)
-
def get_cipher_version(self):
    """
    Obtain the protocol version of the currently used cipher.

    :returns: The protocol name of the currently used cipher
        or :obj:`None` if no connection has been established.
    :rtype: :class:`unicode` or :class:`NoneType`

    .. versionadded:: 0.15
    """
    current = _lib.SSL_get_current_cipher(self._ssl)
    if current == _ffi.NULL:
        return None

    raw_version = _ffi.string(_lib.SSL_CIPHER_get_version(current))
    return raw_version.decode("utf-8")
-
def get_protocol_version_name(self):
    """
    Retrieve the protocol version of the current connection.

    :returns: The TLS version of the current connection, for example
        the value for TLS 1.2 would be ``TLSv1.2`` or ``Unknown``
        for connections that were not successfully established.
    :rtype: :class:`unicode`
    """
    raw_version = _lib.SSL_get_version(self._ssl)
    return _ffi.string(raw_version).decode("utf-8")
-
def get_protocol_version(self):
    """
    Retrieve the SSL or TLS protocol version of the current connection.

    :returns: The TLS version of the current connection. For example,
        it will return ``0x769`` for connections made over TLS version 1.
    :rtype: :class:`int`
    """
    return _lib.SSL_version(self._ssl)
-
@_requires_alpn
def set_alpn_protos(self, protos):
    """
    Specify the client's ALPN protocol list.

    These protocols are offered to the server during protocol negotiation.

    :param protos: A list of the protocols to be offered to the server.
        This list should be a Python list of bytestrings representing the
        protocols to offer, e.g. ``[b'http/1.1', b'spdy/2']``.
    :raises ValueError: If ``protos`` is empty.
    """
    # Different versions of OpenSSL are inconsistent about how they handle
    # empty proto lists (see #1043), so empty lists are rejected here.
    if not protos:
        raise ValueError("at least one protocol must be specified")

    # Wire format: every protocol is preceded by a single length byte.
    pieces = []
    for proto in protos:
        pieces.append(bytes((len(proto),)))
        pieces.append(proto)
    wire = b"".join(pieces)

    # The C string does not need to be saved off because OpenSSL
    # immediately copies the data out.
    c_wire = _ffi.new("unsigned char[]", wire)

    # https://www.openssl.org/docs/man1.1.0/man3/SSL_CTX_set_alpn_protos.html:
    # SSL_CTX_set_alpn_protos() and SSL_set_alpn_protos()
    # return 0 on success, and non-0 on failure.
    # WARNING: these functions reverse the return value convention.
    status = _lib.SSL_set_alpn_protos(self._ssl, c_wire, len(wire))
    _openssl_assert(status == 0)
-
@_requires_alpn
def get_alpn_proto_negotiated(self):
    """
    Get the protocol that was negotiated by ALPN.

    :returns: A bytestring of the protocol name. If no protocol has been
        negotiated yet, returns an empty bytestring.
    """
    data = _ffi.new("unsigned char **")
    data_len = _ffi.new("unsigned int *")

    _lib.SSL_get0_alpn_selected(self._ssl, data, data_len)

    # Test the length that was written through the pointer. The pointer
    # object itself is a freshly allocated cdata, so testing it directly
    # would never take this branch.
    if not data_len[0]:
        return b""

    return _ffi.buffer(data[0], data_len[0])[:]
-
def request_ocsp(self):
    """
    Called to request that the server sends stapled OCSP data, if
    available. If this is not called on the client side then the server
    will not send OCSP data. Should be used in conjunction with
    :meth:`Context.set_ocsp_client_callback`.

    :return: None
    """
    status = _lib.SSL_set_tlsext_status_type(
        self._ssl, _lib.TLSEXT_STATUSTYPE_ocsp
    )
    _openssl_assert(status == 1)
|