Funktionierender Prototyp des Serious Games zur Vermittlung von Wissen zu Software-Engineering-Arbeitsmodellen.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

test_sslverify.py 114KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
7327832793280328132823283328432853286328732883289329032913292329332943295329632973298329933003301330233033304330533063307330833093310331133123313331433153316331733183319332033213322332333243325332633273328332933303331333233333334333533363337333833393340334133423343334433453346334733483349335033513352335333543355335633573358335933603361336233633364336533663367
  1. # Copyright 2005 Divmod, Inc. See LICENSE file for details
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
  5. Tests for L{twisted.internet._sslverify}.
  6. """
  7. import datetime
  8. import itertools
  9. import sys
  10. from unittest import skipIf
  11. from zope.interface import implementer
  12. from incremental import Version
  13. from twisted.internet import defer, interfaces, protocol, reactor
  14. from twisted.internet._idna import _idnaText
  15. from twisted.internet.error import CertificateError, ConnectionClosed, ConnectionLost
  16. from twisted.python.compat import nativeString
  17. from twisted.python.filepath import FilePath
  18. from twisted.python.modules import getModule
  19. from twisted.python.reflect import requireModule
  20. from twisted.test.iosim import connectedServerAndClient
  21. from twisted.test.test_twisted import SetAsideModule
  22. from twisted.trial import util
  23. from twisted.trial.unittest import SkipTest, SynchronousTestCase, TestCase
  24. skipSSL = ""
  25. skipSNI = ""
  26. skipNPN = ""
  27. skipALPN = ""
  28. if requireModule("OpenSSL"):
  29. import ipaddress
  30. from OpenSSL import SSL
  31. from OpenSSL.crypto import FILETYPE_PEM, TYPE_RSA, X509, PKey, get_elliptic_curves
  32. from cryptography import x509
  33. from cryptography.hazmat.backends import default_backend
  34. from cryptography.hazmat.primitives import hashes
  35. from cryptography.hazmat.primitives.asymmetric import rsa
  36. from cryptography.hazmat.primitives.serialization import (
  37. Encoding,
  38. NoEncryption,
  39. PrivateFormat,
  40. )
  41. from cryptography.x509.oid import NameOID
  42. from twisted.internet import ssl
  43. try:
  44. ctx = SSL.Context(SSL.SSLv23_METHOD)
  45. ctx.set_npn_advertise_callback(lambda c: None)
  46. except (NotImplementedError, AttributeError):
  47. skipNPN = (
  48. "NPN is deprecated (and OpenSSL 1.0.1 or greater required for NPN"
  49. " support)"
  50. )
  51. try:
  52. ctx = SSL.Context(SSL.SSLv23_METHOD)
  53. ctx.set_alpn_select_callback(lambda c: None) # type: ignore[misc,arg-type]
  54. except NotImplementedError:
  55. skipALPN = "OpenSSL 1.0.2 or greater required for ALPN support"
  56. else:
  57. skipSSL = "OpenSSL is required for SSL tests."
  58. skipSNI = skipSSL
  59. skipNPN = skipSSL
  60. skipALPN = skipSSL
  61. if not skipSSL:
  62. from twisted.internet import _sslverify as sslverify
  63. from twisted.internet.ssl import VerificationError, platformTrust
  64. from twisted.protocols.tls import TLSMemoryBIOFactory
  65. # A couple of static PEM-format certificates to be used by various tests.
  66. A_HOST_CERTIFICATE_PEM = """
  67. -----BEGIN CERTIFICATE-----
  68. MIIC2jCCAkMCAjA5MA0GCSqGSIb3DQEBBAUAMIG0MQswCQYDVQQGEwJVUzEiMCAG
  69. A1UEAxMZZXhhbXBsZS50d2lzdGVkbWF0cml4LmNvbTEPMA0GA1UEBxMGQm9zdG9u
  70. MRwwGgYDVQQKExNUd2lzdGVkIE1hdHJpeCBMYWJzMRYwFAYDVQQIEw1NYXNzYWNo
  71. dXNldHRzMScwJQYJKoZIhvcNAQkBFhhub2JvZHlAdHdpc3RlZG1hdHJpeC5jb20x
  72. ETAPBgNVBAsTCFNlY3VyaXR5MB4XDTA2MDgxNjAxMDEwOFoXDTA3MDgxNjAxMDEw
  73. OFowgbQxCzAJBgNVBAYTAlVTMSIwIAYDVQQDExlleGFtcGxlLnR3aXN0ZWRtYXRy
  74. aXguY29tMQ8wDQYDVQQHEwZCb3N0b24xHDAaBgNVBAoTE1R3aXN0ZWQgTWF0cml4
  75. IExhYnMxFjAUBgNVBAgTDU1hc3NhY2h1c2V0dHMxJzAlBgkqhkiG9w0BCQEWGG5v
  76. Ym9keUB0d2lzdGVkbWF0cml4LmNvbTERMA8GA1UECxMIU2VjdXJpdHkwgZ8wDQYJ
  77. KoZIhvcNAQEBBQADgY0AMIGJAoGBAMzH8CDF/U91y/bdbdbJKnLgnyvQ9Ig9ZNZp
  78. 8hpsu4huil60zF03+Lexg2l1FIfURScjBuaJMR6HiMYTMjhzLuByRZ17KW4wYkGi
  79. KXstz03VIKy4Tjc+v4aXFI4XdRw10gGMGQlGGscXF/RSoN84VoDKBfOMWdXeConJ
  80. VyC4w3iJAgMBAAEwDQYJKoZIhvcNAQEEBQADgYEAviMT4lBoxOgQy32LIgZ4lVCj
  81. JNOiZYg8GMQ6y0ugp86X80UjOvkGtNf/R7YgED/giKRN/q/XJiLJDEhzknkocwmO
  82. S+4b2XpiaZYxRyKWwL221O7CGmtWYyZl2+92YYmmCiNzWQPfP6BOMlfax0AGLHls
  83. fXzCWdG0O/3Lk2SRM0I=
  84. -----END CERTIFICATE-----
  85. """
  86. A_PEER_CERTIFICATE_PEM = """
  87. -----BEGIN CERTIFICATE-----
  88. MIIC3jCCAkcCAjA6MA0GCSqGSIb3DQEBBAUAMIG2MQswCQYDVQQGEwJVUzEiMCAG
  89. A1UEAxMZZXhhbXBsZS50d2lzdGVkbWF0cml4LmNvbTEPMA0GA1UEBxMGQm9zdG9u
  90. MRwwGgYDVQQKExNUd2lzdGVkIE1hdHJpeCBMYWJzMRYwFAYDVQQIEw1NYXNzYWNo
  91. dXNldHRzMSkwJwYJKoZIhvcNAQkBFhpzb21lYm9keUB0d2lzdGVkbWF0cml4LmNv
  92. bTERMA8GA1UECxMIU2VjdXJpdHkwHhcNMDYwODE2MDEwMTU2WhcNMDcwODE2MDEw
  93. MTU2WjCBtjELMAkGA1UEBhMCVVMxIjAgBgNVBAMTGWV4YW1wbGUudHdpc3RlZG1h
  94. dHJpeC5jb20xDzANBgNVBAcTBkJvc3RvbjEcMBoGA1UEChMTVHdpc3RlZCBNYXRy
  95. aXggTGFiczEWMBQGA1UECBMNTWFzc2FjaHVzZXR0czEpMCcGCSqGSIb3DQEJARYa
  96. c29tZWJvZHlAdHdpc3RlZG1hdHJpeC5jb20xETAPBgNVBAsTCFNlY3VyaXR5MIGf
  97. MA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCnm+WBlgFNbMlHehib9ePGGDXF+Nz4
  98. CjGuUmVBaXCRCiVjg3kSDecwqfb0fqTksBZ+oQ1UBjMcSh7OcvFXJZnUesBikGWE
  99. JE4V8Bjh+RmbJ1ZAlUPZ40bAkww0OpyIRAGMvKG+4yLFTO4WDxKmfDcrOb6ID8WJ
  100. e1u+i3XGkIf/5QIDAQABMA0GCSqGSIb3DQEBBAUAA4GBAD4Oukm3YYkhedUepBEA
  101. vvXIQhVDqL7mk6OqYdXmNj6R7ZMC8WWvGZxrzDI1bZuB+4aIxxd1FXC3UOHiR/xg
  102. i9cDl1y8P/qRp4aEBNF6rI0D4AxTbfnHQx4ERDAOShJdYZs/2zifPJ6va6YvrEyr
  103. yqDtGhklsWW3ZwBzEh5VEOUp
  104. -----END CERTIFICATE-----
  105. """
  106. A_KEYPAIR = getModule(__name__).filePath.sibling("server.pem").getContent()
  107. def counter(counter=itertools.count()):
  108. """
  109. Each time we're called, return the next integer in the natural numbers.
  110. """
  111. return next(counter)
  112. def makeCertificate(**kw):
  113. keypair = PKey()
  114. keypair.generate_key(TYPE_RSA, 2048)
  115. certificate = X509()
  116. certificate.gmtime_adj_notBefore(0)
  117. certificate.gmtime_adj_notAfter(60 * 60 * 24 * 365) # One year
  118. for xname in certificate.get_issuer(), certificate.get_subject():
  119. for (k, v) in kw.items():
  120. setattr(xname, k, nativeString(v))
  121. certificate.set_serial_number(counter())
  122. certificate.set_pubkey(keypair)
  123. certificate.sign(keypair, "md5")
  124. return keypair, certificate
  125. def certificatesForAuthorityAndServer(serviceIdentity="example.com"):
  126. """
  127. Create a self-signed CA certificate and server certificate signed by the
  128. CA.
  129. @param serviceIdentity: The identity (hostname) of the server.
  130. @type serviceIdentity: L{unicode}
  131. @return: a 2-tuple of C{(certificate_authority_certificate,
  132. server_certificate)}
  133. @rtype: L{tuple} of (L{sslverify.Certificate},
  134. L{sslverify.PrivateCertificate})
  135. """
  136. commonNameForCA = x509.Name(
  137. [x509.NameAttribute(NameOID.COMMON_NAME, "Testing Example CA")]
  138. )
  139. commonNameForServer = x509.Name(
  140. [x509.NameAttribute(NameOID.COMMON_NAME, "Testing Example Server")]
  141. )
  142. oneDay = datetime.timedelta(1, 0, 0)
  143. privateKeyForCA = rsa.generate_private_key(
  144. public_exponent=65537, key_size=4096, backend=default_backend()
  145. )
  146. publicKeyForCA = privateKeyForCA.public_key()
  147. caCertificate = (
  148. x509.CertificateBuilder()
  149. .subject_name(commonNameForCA)
  150. .issuer_name(commonNameForCA)
  151. .not_valid_before(datetime.datetime.today() - oneDay)
  152. .not_valid_after(datetime.datetime.today() + oneDay)
  153. .serial_number(x509.random_serial_number())
  154. .public_key(publicKeyForCA)
  155. .add_extension(
  156. x509.BasicConstraints(ca=True, path_length=9),
  157. critical=True,
  158. )
  159. .sign(
  160. private_key=privateKeyForCA,
  161. algorithm=hashes.SHA256(),
  162. backend=default_backend(),
  163. )
  164. )
  165. privateKeyForServer = rsa.generate_private_key(
  166. public_exponent=65537, key_size=4096, backend=default_backend()
  167. )
  168. publicKeyForServer = privateKeyForServer.public_key()
  169. try:
  170. ipAddress = ipaddress.ip_address(serviceIdentity)
  171. except ValueError:
  172. subjectAlternativeNames = [
  173. x509.DNSName(serviceIdentity.encode("idna").decode("ascii"))
  174. ]
  175. else:
  176. subjectAlternativeNames = [x509.IPAddress(ipAddress)]
  177. serverCertificate = (
  178. x509.CertificateBuilder()
  179. .subject_name(commonNameForServer)
  180. .issuer_name(commonNameForCA)
  181. .not_valid_before(datetime.datetime.today() - oneDay)
  182. .not_valid_after(datetime.datetime.today() + oneDay)
  183. .serial_number(x509.random_serial_number())
  184. .public_key(publicKeyForServer)
  185. .add_extension(
  186. x509.BasicConstraints(ca=False, path_length=None),
  187. critical=True,
  188. )
  189. .add_extension(
  190. x509.SubjectAlternativeName(subjectAlternativeNames),
  191. critical=True,
  192. )
  193. .sign(
  194. private_key=privateKeyForCA,
  195. algorithm=hashes.SHA256(),
  196. backend=default_backend(),
  197. )
  198. )
  199. caSelfCert = sslverify.Certificate.loadPEM(caCertificate.public_bytes(Encoding.PEM))
  200. serverCert = sslverify.PrivateCertificate.loadPEM(
  201. b"\n".join(
  202. [
  203. privateKeyForServer.private_bytes(
  204. Encoding.PEM,
  205. PrivateFormat.TraditionalOpenSSL,
  206. NoEncryption(),
  207. ),
  208. serverCertificate.public_bytes(Encoding.PEM),
  209. ]
  210. )
  211. )
  212. return caSelfCert, serverCert
  213. def _loopbackTLSConnection(serverOpts, clientOpts):
  214. """
  215. Common implementation code for both L{loopbackTLSConnection} and
  216. L{loopbackTLSConnectionInMemory}. Creates a loopback TLS connection
  217. using the provided server and client context factories.
  218. @param serverOpts: An OpenSSL context factory for the server.
  219. @type serverOpts: C{OpenSSLCertificateOptions}, or any class with an
  220. equivalent API.
  221. @param clientOpts: An OpenSSL context factory for the client.
  222. @type clientOpts: C{OpenSSLCertificateOptions}, or any class with an
  223. equivalent API.
  224. @return: 5-tuple of server-tls-protocol, server-inner-protocol,
  225. client-tls-protocol, client-inner-protocol and L{IOPump}
  226. @rtype: L{tuple}
  227. """
  228. class GreetingServer(protocol.Protocol):
  229. greeting = b"greetings!"
  230. def connectionMade(self):
  231. self.transport.write(self.greeting)
  232. class ListeningClient(protocol.Protocol):
  233. data = b""
  234. lostReason = None
  235. def dataReceived(self, data):
  236. self.data += data
  237. def connectionLost(self, reason):
  238. self.lostReason = reason
  239. clientWrappedProto = ListeningClient()
  240. serverWrappedProto = GreetingServer()
  241. plainClientFactory = protocol.Factory()
  242. plainClientFactory.protocol = lambda: clientWrappedProto
  243. plainServerFactory = protocol.Factory()
  244. plainServerFactory.protocol = lambda: serverWrappedProto
  245. clientFactory = TLSMemoryBIOFactory(
  246. clientOpts, isClient=True, wrappedFactory=plainServerFactory
  247. )
  248. serverFactory = TLSMemoryBIOFactory(
  249. serverOpts, isClient=False, wrappedFactory=plainClientFactory
  250. )
  251. sProto, cProto, pump = connectedServerAndClient(
  252. lambda: serverFactory.buildProtocol(None),
  253. lambda: clientFactory.buildProtocol(None),
  254. )
  255. return sProto, cProto, serverWrappedProto, clientWrappedProto, pump
  256. def loopbackTLSConnection(trustRoot, privateKeyFile, chainedCertFile=None):
  257. """
  258. Create a loopback TLS connection with the given trust and keys.
  259. @param trustRoot: the C{trustRoot} argument for the client connection's
  260. context.
  261. @type trustRoot: L{sslverify.IOpenSSLTrustRoot}
  262. @param privateKeyFile: The name of the file containing the private key.
  263. @type privateKeyFile: L{str} (native string; file name)
  264. @param chainedCertFile: The name of the chained certificate file.
  265. @type chainedCertFile: L{str} (native string; file name)
  266. @return: 3-tuple of server-protocol, client-protocol, and L{IOPump}
  267. @rtype: L{tuple}
  268. """
  269. class ContextFactory:
  270. def getContext(self):
  271. """
  272. Create a context for the server side of the connection.
  273. @return: an SSL context using a certificate and key.
  274. @rtype: C{OpenSSL.SSL.Context}
  275. """
  276. ctx = SSL.Context(SSL.SSLv23_METHOD)
  277. if chainedCertFile is not None:
  278. ctx.use_certificate_chain_file(chainedCertFile)
  279. ctx.use_privatekey_file(privateKeyFile)
  280. # Let the test author know if they screwed something up.
  281. ctx.check_privatekey()
  282. return ctx
  283. serverOpts = ContextFactory()
  284. clientOpts = sslverify.OpenSSLCertificateOptions(trustRoot=trustRoot)
  285. return _loopbackTLSConnection(serverOpts, clientOpts)
  286. def loopbackTLSConnectionInMemory(
  287. trustRoot,
  288. privateKey,
  289. serverCertificate,
  290. clientProtocols=None,
  291. serverProtocols=None,
  292. clientOptions=None,
  293. ):
  294. """
  295. Create a loopback TLS connection with the given trust and keys. Like
  296. L{loopbackTLSConnection}, but using in-memory certificates and keys rather
  297. than writing them to disk.
  298. @param trustRoot: the C{trustRoot} argument for the client connection's
  299. context.
  300. @type trustRoot: L{sslverify.IOpenSSLTrustRoot}
  301. @param privateKey: The private key.
  302. @type privateKey: L{str} (native string)
  303. @param serverCertificate: The certificate used by the server.
  304. @type chainedCertFile: L{str} (native string)
  305. @param clientProtocols: The protocols the client is willing to negotiate
  306. using NPN/ALPN.
  307. @param serverProtocols: The protocols the server is willing to negotiate
  308. using NPN/ALPN.
  309. @param clientOptions: The type of C{OpenSSLCertificateOptions} class to
  310. use for the client. Defaults to C{OpenSSLCertificateOptions}.
  311. @return: 3-tuple of server-protocol, client-protocol, and L{IOPump}
  312. @rtype: L{tuple}
  313. """
  314. if clientOptions is None:
  315. clientOptions = sslverify.OpenSSLCertificateOptions
  316. clientCertOpts = clientOptions(
  317. trustRoot=trustRoot, acceptableProtocols=clientProtocols
  318. )
  319. serverCertOpts = sslverify.OpenSSLCertificateOptions(
  320. privateKey=privateKey,
  321. certificate=serverCertificate,
  322. acceptableProtocols=serverProtocols,
  323. )
  324. return _loopbackTLSConnection(serverCertOpts, clientCertOpts)
  325. def pathContainingDumpOf(testCase, *dumpables):
  326. """
  327. Create a temporary file to store some serializable-as-PEM objects in, and
  328. return its name.
  329. @param testCase: a test case to use for generating a temporary directory.
  330. @type testCase: L{twisted.trial.unittest.TestCase}
  331. @param dumpables: arguments are objects from pyOpenSSL with a C{dump}
  332. method, taking a pyOpenSSL file-type constant, such as
  333. L{OpenSSL.crypto.FILETYPE_PEM} or L{OpenSSL.crypto.FILETYPE_ASN1}.
  334. @type dumpables: L{tuple} of L{object} with C{dump} method taking L{int}
  335. returning L{bytes}
  336. @return: the path to a file where all of the dumpables were dumped in PEM
  337. format.
  338. @rtype: L{str}
  339. """
  340. fname = testCase.mktemp()
  341. with open(fname, "wb") as f:
  342. for dumpable in dumpables:
  343. f.write(dumpable.dump(FILETYPE_PEM))
  344. return fname
  345. class DataCallbackProtocol(protocol.Protocol):
  346. def dataReceived(self, data):
  347. d, self.factory.onData = self.factory.onData, None
  348. if d is not None:
  349. d.callback(data)
  350. def connectionLost(self, reason):
  351. d, self.factory.onLost = self.factory.onLost, None
  352. if d is not None:
  353. d.errback(reason)
  354. class WritingProtocol(protocol.Protocol):
  355. byte = b"x"
  356. def connectionMade(self):
  357. self.transport.write(self.byte)
  358. def connectionLost(self, reason):
  359. self.factory.onLost.errback(reason)
  360. class FakeContext:
  361. """
  362. Introspectable fake of an C{OpenSSL.SSL.Context}.
  363. Saves call arguments for later introspection.
  364. Necessary because C{Context} offers poor introspection. cf. this
  365. U{pyOpenSSL bug<https://bugs.launchpad.net/pyopenssl/+bug/1173899>}.
  366. @ivar _method: See C{method} parameter of L{__init__}.
  367. @ivar _options: L{int} of C{OR}ed values from calls of L{set_options}.
  368. @ivar _certificate: Set by L{use_certificate}.
  369. @ivar _privateKey: Set by L{use_privatekey}.
  370. @ivar _verify: Set by L{set_verify}.
  371. @ivar _verifyDepth: Set by L{set_verify_depth}.
  372. @ivar _mode: Set by L{set_mode}.
  373. @ivar _sessionID: Set by L{set_session_id}.
  374. @ivar _extraCertChain: Accumulated L{list} of all extra certificates added
  375. by L{add_extra_chain_cert}.
  376. @ivar _cipherList: Set by L{set_cipher_list}.
  377. @ivar _dhFilename: Set by L{load_tmp_dh}.
  378. @ivar _defaultVerifyPathsSet: Set by L{set_default_verify_paths}
  379. @ivar _ecCurve: Set by L{set_tmp_ecdh}
  380. """
  381. _options = 0
  382. def __init__(self, method):
  383. self._method = method
  384. self._extraCertChain = []
  385. self._defaultVerifyPathsSet = False
  386. self._ecCurve = None
  387. # Note that this value is explicitly documented as the default by
  388. # https://www.openssl.org/docs/man1.1.1/man3/
  389. # SSL_CTX_set_session_cache_mode.html
  390. self._sessionCacheMode = SSL.SESS_CACHE_SERVER
  391. def set_options(self, options):
  392. self._options |= options
  393. def use_certificate(self, certificate):
  394. self._certificate = certificate
  395. def use_privatekey(self, privateKey):
  396. self._privateKey = privateKey
  397. def check_privatekey(self):
  398. return None
  399. def set_mode(self, mode):
  400. """
  401. Set the mode. See L{SSL.Context.set_mode}.
  402. @param mode: See L{SSL.Context.set_mode}.
  403. """
  404. self._mode = mode
  405. def set_verify(self, flags, callback=None):
  406. self._verify = flags, callback
  407. def set_verify_depth(self, depth):
  408. self._verifyDepth = depth
  409. def set_session_id(self, sessionIDContext):
  410. # This fake should change when the upstream changes:
  411. # https://github.com/pyca/pyopenssl/issues/845
  412. self._sessionIDContext = sessionIDContext
  413. def set_session_cache_mode(self, cacheMode):
  414. """
  415. Set the session cache mode on the context, as per
  416. L{SSL.Context.set_session_cache_mode}.
  417. """
  418. self._sessionCacheMode = cacheMode
  419. def get_session_cache_mode(self):
  420. """
  421. Retrieve the session cache mode from the context, as per
  422. L{SSL.Context.get_session_cache_mode}.
  423. """
  424. return self._sessionCacheMode
  425. def add_extra_chain_cert(self, cert):
  426. self._extraCertChain.append(cert)
  427. def set_cipher_list(self, cipherList):
  428. self._cipherList = cipherList
  429. def load_tmp_dh(self, dhfilename):
  430. self._dhFilename = dhfilename
  431. def set_default_verify_paths(self):
  432. """
  433. Set the default paths for the platform.
  434. """
  435. self._defaultVerifyPathsSet = True
  436. def set_tmp_ecdh(self, curve):
  437. """
  438. Set an ECDH curve. Should only be called by OpenSSL 1.0.1
  439. code.
  440. @param curve: See L{OpenSSL.SSL.Context.set_tmp_ecdh}
  441. """
  442. self._ecCurve = curve
  class ClientOptionsTests(SynchronousTestCase):
      """
      Tests for L{sslverify.optionsForClientTLS}.
      """

      if skipSSL:
          skip = skipSSL

      def test_extraKeywords(self):
          """
          A keyword argument other than C{extraCertificateOptions} makes
          L{sslverify.optionsForClientTLS} raise L{TypeError} with the same
          message an ordinary Python function would produce.
          """
          with self.assertRaises(TypeError) as caught:
              sslverify.optionsForClientTLS(
                  hostname="alpha",
                  someRandomThing="beta",
              )
          self.assertEqual(
              str(caught.exception),
              "optionsForClientTLS() got an unexpected keyword argument "
              "'someRandomThing'",
          )

      def test_bytesFailFast(self):
          """
          Passing L{bytes} as the hostname to
          L{sslverify.optionsForClientTLS} raises L{TypeError} right away.
          """
          with self.assertRaises(TypeError) as caught:
              sslverify.optionsForClientTLS(b"not-actually-a-hostname.com")
          expectedText = (
              f"optionsForClientTLS requires text for host names, not {bytes.__name__}"
          )
          self.assertEqual(str(caught.exception), expectedText)

      def test_dNSNameHostname(self):
          """
          A dNSName given to L{sslverify.optionsForClientTLS} results in
          L{_hostnameIsDnsName} being true.
          """
          clientOptions = sslverify.optionsForClientTLS("example.com")
          self.assertTrue(clientOptions._hostnameIsDnsName)

      def test_IPv4AddressHostname(self):
          """
          An IPv4 address given to L{sslverify.optionsForClientTLS} results
          in L{_hostnameIsDnsName} being false.
          """
          clientOptions = sslverify.optionsForClientTLS("127.0.0.1")
          self.assertFalse(clientOptions._hostnameIsDnsName)

      def test_IPv6AddressHostname(self):
          """
          An IPv6 address given to L{sslverify.optionsForClientTLS} results
          in L{_hostnameIsDnsName} being false.
          """
          clientOptions = sslverify.optionsForClientTLS("::1")
          self.assertFalse(clientOptions._hostnameIsDnsName)
  class FakeChooseDiffieHellmanEllipticCurve:
      """
      A stand-in for L{_ChooseDiffieHellmanEllipticCurve} whose methods do
      nothing.
      """

      def __init__(self, versionNumber, openSSLlib, openSSLcrypto):
          """
          Accept the constructor arguments and ignore them all.

          @param versionNumber: Ignored.
          @param openSSLlib: Ignored.
          @param openSSLcrypto: Ignored.
          """

      def configureECDHCurve(self, ctx):
          """
          Leave the given context untouched.

          @param ctx: An L{OpenSSL.SSL.Context} that would be
              configured.
          """
  class OpenSSLOptionsTestsMixin:
      """
      A mixin for L{OpenSSLOptions} test cases.  It builds server, client and
      CA test certificates with L{makeCertificate} and offers a L{loopback}
      helper that opens a TLS connection between a listening port and a
      client on the local machine.
      """

      if skipSSL:
          skip = skipSSL

      # Populated by loopback(); None means there is nothing for tearDown to
      # clean up.
      serverPort = clientConn = None
      onServerLost = onClientLost = None

      def setUp(self):
          """
          Generate the keys and certificates used by the tests.
          """
          self.sKey, self.sCert = makeCertificate(
              O=b"Server Test Certificate",
              CN=b"server",
          )
          self.cKey, self.cCert = makeCertificate(
              O=b"Client Test Certificate",
              CN=b"client",
          )
          # Only the certificate half of each CA pair is kept.
          _, self.caCert1 = makeCertificate(O=b"CA Test Certificate 1", CN=b"ca1")
          _, self.caCert2 = makeCertificate(O=b"CA Test Certificate", CN=b"ca2")
          self.caCerts = [self.caCert1, self.caCert2]
          self.extraCertChain = self.caCerts

      def tearDown(self):
          """
          Stop the listening port, drop the client connection, and wait for
          whichever connection-lost Deferreds L{loopback} created.

          @return: A L{defer.DeferredList} over the pending Deferreds.
          """
          if self.serverPort is not None:
              self.serverPort.stopListening()
          if self.clientConn is not None:
              self.clientConn.disconnect()
          pending = [
              lost
              for lost in (self.onServerLost, self.onClientLost)
              if lost is not None
          ]
          return defer.DeferredList(pending, consumeErrors=True)

      def loopback(
          self,
          serverCertOpts,
          clientCertOpts,
          onServerLost=None,
          onClientLost=None,
          onData=None,
      ):
          """
          Listen with C{serverCertOpts} on an ephemeral port and connect to
          it from 127.0.0.1 using C{clientCertOpts}.

          @param serverCertOpts: The context factory given to
              C{reactor.listenSSL}.
          @param clientCertOpts: The context factory given to
              C{reactor.connectSSL}.
          @param onServerLost: Assigned to the server factory's C{onLost};
              when omitted a new Deferred is created and also kept on C{self}
              for L{tearDown}.
          @param onClientLost: Assigned to the client factory's C{onLost};
              when omitted a new Deferred is created and also kept on C{self}
              for L{tearDown}.
          @param onData: Assigned to the server factory's C{onData}; a new
              Deferred is created when omitted.
          """
          if onServerLost is None:
              onServerLost = defer.Deferred()
              self.onServerLost = onServerLost
          if onClientLost is None:
              onClientLost = defer.Deferred()
              self.onClientLost = onClientLost
          if onData is None:
              onData = defer.Deferred()

          listeningFactory = protocol.ServerFactory()
          listeningFactory.protocol = DataCallbackProtocol
          listeningFactory.onLost = onServerLost
          listeningFactory.onData = onData

          connectingFactory = protocol.ClientFactory()
          connectingFactory.protocol = WritingProtocol
          connectingFactory.onLost = onClientLost

          self.serverPort = reactor.listenSSL(0, listeningFactory, serverCertOpts)
          boundPort = self.serverPort.getHost().port
          self.clientConn = reactor.connectSSL(
              "127.0.0.1", boundPort, connectingFactory, clientCertOpts
          )
  573. class OpenSSLOptionsTests(OpenSSLOptionsTestsMixin, TestCase):
  574. """
  575. Tests for L{sslverify.OpenSSLOptions}.
  576. """
  def setUp(self):
      """
      Run L{OpenSSLOptionsTestsMixin.setUp}, then replace
      L{sslverify._ChooseDiffieHellmanEllipticCurve} with
      L{FakeChooseDiffieHellmanEllipticCurve} for the duration of the test.
      """
      super().setUp()
      replacement = FakeChooseDiffieHellmanEllipticCurve
      self.patch(sslverify, "_ChooseDiffieHellmanEllipticCurve", replacement)
  def test_constructorWithOnlyPrivateKey(self):
      """
      Supplying C{privateKey} without C{certificate} is rejected with
      L{ValueError}; the two only make sense together.
      """
      with self.assertRaises(ValueError):
          sslverify.OpenSSLCertificateOptions(privateKey=self.sKey)
  def test_constructorWithOnlyCertificate(self):
      """
      Supplying C{certificate} without C{privateKey} is rejected with
      L{ValueError}; the two only make sense together.
      """
      with self.assertRaises(ValueError):
          sslverify.OpenSSLCertificateOptions(certificate=self.sCert)
  def test_constructorWithCertificateAndPrivateKey(self):
      """
      When both C{privateKey} and C{certificate} are given they are stored
      on the options object and the extra chain defaults to an empty list.
      """
      keyAndCert = {"privateKey": self.sKey, "certificate": self.sCert}
      certOptions = sslverify.OpenSSLCertificateOptions(**keyAndCert)
      self.assertEqual(certOptions.privateKey, self.sKey)
      self.assertEqual(certOptions.certificate, self.sCert)
      self.assertEqual(certOptions.extraCertChain, [])
  def test_constructorDoesNotAllowVerifyWithoutCACerts(self):
      """
      Asking for C{verify=True} without providing C{caCerts} raises
      L{ValueError}.
      """
      with self.assertRaises(ValueError):
          sslverify.OpenSSLCertificateOptions(
              privateKey=self.sKey,
              certificate=self.sCert,
              verify=True,
          )
  def test_constructorDoesNotAllowLegacyWithTrustRoot(self):
      """
      The legacy arguments C{verify}, C{requireCertificate} and C{caCerts}
      may not be passed at all (not even with their default values) together
      with C{trustRoot}; doing so raises L{TypeError}.
      """
      legacyCombinations = (
          {"verify": True, "trustRoot": None, "caCerts": self.caCerts},
          {"trustRoot": None, "requireCertificate": True},
      )
      for legacyArguments in legacyCombinations:
          with self.assertRaises(TypeError):
              sslverify.OpenSSLCertificateOptions(
                  privateKey=self.sKey,
                  certificate=self.sCert,
                  **legacyArguments,
              )
  def test_constructorAllowsCACertsWithoutVerify(self):
      """
      Passing C{caCerts} without C{verify} is accepted (currently a no-op):
      verification stays off and the CA list is stored.
      """
      certOptions = sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey,
          certificate=self.sCert,
          caCerts=self.caCerts,
      )
      self.assertFalse(certOptions.verify)
      self.assertEqual(self.caCerts, certOptions.caCerts)
  def test_constructorWithVerifyAndCACerts(self):
      """
      Passing both C{verify} and C{caCerts} turns verification on and stores
      the CA list.
      """
      keyAndCert = {"privateKey": self.sKey, "certificate": self.sCert}
      certOptions = sslverify.OpenSSLCertificateOptions(
          verify=True, caCerts=self.caCerts, **keyAndCert
      )
      self.assertTrue(certOptions.verify)
      self.assertEqual(self.caCerts, certOptions.caCerts)
  def test_constructorSetsExtraChain(self):
      """
      C{extraCertChain} is stored when it is supplied together with
      C{certificate} and C{privateKey}.
      """
      chain = self.extraCertChain
      certOptions = sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey, certificate=self.sCert, extraCertChain=chain
      )
      self.assertEqual(self.extraCertChain, certOptions.extraCertChain)
  def test_constructorDoesNotAllowExtraChainWithoutPrivateKey(self):
      """
      An C{extraCertChain} supplied without a C{privateKey} is meaningless
      and is rejected with L{ValueError}.
      """
      with self.assertRaises(ValueError):
          sslverify.OpenSSLCertificateOptions(
              certificate=self.sCert,
              extraCertChain=self.extraCertChain,
          )
  def test_constructorDoesNotAllowExtraChainWithOutPrivateKey(self):
      """
      An C{extraCertChain} supplied without a C{certificate} is meaningless
      and is rejected with L{ValueError}.  (Despite the method name, the
      argument omitted here is C{certificate}; C{privateKey} is provided.)
      """
      with self.assertRaises(ValueError):
          sslverify.OpenSSLCertificateOptions(
              privateKey=self.sKey,
              extraCertChain=self.extraCertChain,
          )
  def test_extraChainFilesAreAddedIfSupplied(self):
      """
      With C{extraCertChain} set and its prerequisites met, contexts that
      get created receive the private key, the certificate and every chain
      certificate.
      """
      certOptions = sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey,
          certificate=self.sCert,
          extraCertChain=self.extraCertChain,
      )
      # Swap in the recording fake so the applied settings can be inspected.
      certOptions._contextFactory = FakeContext
      fakeCtx = certOptions.getContext()
      self.assertEqual(self.sKey, fakeCtx._privateKey)
      self.assertEqual(self.sCert, fakeCtx._certificate)
      self.assertEqual(self.extraCertChain, fakeCtx._extraCertChain)
  def test_extraChainDoesNotBreakPyOpenSSL(self):
      """
      Supplying C{extraCertChain} still yields a real C{OpenSSL.SSL.Context}
      from C{getContext}.
      """
      certOptions = sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey,
          certificate=self.sCert,
          extraCertChain=self.extraCertChain,
      )
      self.assertIsInstance(certOptions.getContext(), SSL.Context)
  def test_acceptableCiphersAreAlwaysSet(self):
      """
      Without user-supplied acceptable ciphers a shipped secure default is
      applied.  The effective cipher string varies between platforms, so the
      check compares the context's cipher list with the options' own
      C{_cipherString} rather than with a fixed value.
      """
      certOptions = sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey, certificate=self.sCert
      )
      certOptions._contextFactory = FakeContext
      fakeCtx = certOptions.getContext()
      expectedCiphers = certOptions._cipherString.encode("ascii")
      self.assertEqual(expectedCiphers, fakeCtx._cipherList)
  def test_givesMeaningfulErrorMessageIfNoCipherMatches(self):
      """
      When no valid cipher matches what the user asked for, a L{ValueError}
      is raised.
      """
      # Built outside the assertion so that only the constructor call itself
      # is expected to raise.
      noCiphers = sslverify.OpenSSLAcceptableCiphers.fromOpenSSLCipherString("")
      with self.assertRaises(ValueError):
          sslverify.OpenSSLCertificateOptions(
              privateKey=self.sKey,
              certificate=self.sCert,
              acceptableCiphers=noCiphers,
          )
  def test_honorsAcceptableCiphersArgument(self):
      """
      Ciphers chosen by a caller-supplied C{acceptableCiphers} object are the
      ones set on the context.
      """

      @implementer(interfaces.IAcceptableCiphers)
      class SentinelCiphers:
          # Always selects a single recognisable cipher, whatever is offered.
          def selectCiphers(self, availableCiphers):
              return [sslverify.OpenSSLCipher("sentinel")]

      certOptions = sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey,
          certificate=self.sCert,
          acceptableCiphers=SentinelCiphers(),
      )
      certOptions._contextFactory = FakeContext
      fakeCtx = certOptions.getContext()
      self.assertEqual(b"sentinel", fakeCtx._cipherList)
  def test_basicSecurityOptionsAreSet(self):
      """
      C{OP_NO_SSLv2}, C{OP_NO_COMPRESSION} and
      C{OP_CIPHER_SERVER_PREFERENCE} must be set on every context.
      """
      certOptions = sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey, certificate=self.sCert
      )
      certOptions._contextFactory = FakeContext
      fakeCtx = certOptions.getContext()
      expectedFlags = SSL.OP_NO_SSLv2
      expectedFlags |= SSL.OP_NO_COMPRESSION
      expectedFlags |= SSL.OP_CIPHER_SERVER_PREFERENCE
      # Masking with the expected bits ignores any additional options.
      self.assertEqual(expectedFlags, fakeCtx._options & expectedFlags)
  def test_modeIsSet(self):
      """
      C{MODE_RELEASE_BUFFERS} must be the mode of every context.
      """
      keyAndCert = {"privateKey": self.sKey, "certificate": self.sCert}
      certOptions = sslverify.OpenSSLCertificateOptions(**keyAndCert)
      certOptions._contextFactory = FakeContext
      fakeCtx = certOptions.getContext()
      self.assertEqual(SSL.MODE_RELEASE_BUFFERS, fakeCtx._mode)
  def test_singleUseKeys(self):
      """
      With C{enableSingleUseKeys} on, both C{OP_SINGLE_DH_USE} and
      C{OP_SINGLE_ECDH_USE} must be set on every context.
      """
      certOptions = sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey,
          certificate=self.sCert,
          enableSingleUseKeys=True,
      )
      certOptions._contextFactory = FakeContext
      fakeCtx = certOptions.getContext()
      expectedFlags = SSL.OP_SINGLE_DH_USE
      expectedFlags |= SSL.OP_SINGLE_ECDH_USE
      self.assertEqual(expectedFlags, fakeCtx._options & expectedFlags)
  def test_methodIsDeprecated(self):
      """
      Giving C{method} to L{sslverify.OpenSSLCertificateOptions} emits
      exactly one L{DeprecationWarning} with the expected text.
      """
      # The constructor is called directly from this method so that the
      # warning is attributed to it when flushed below.
      sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey,
          certificate=self.sCert,
          method=SSL.SSLv23_METHOD,
      )
      expectedMessage = (
          "Passing method to twisted.internet.ssl.CertificateOptions "
          "was deprecated in Twisted 17.1.0. Please use a "
          "combination of insecurelyLowerMinimumTo, raiseMinimumTo, "
          "and lowerMaximumSecurityTo instead, as Twisted will "
          "correctly configure the method."
      )
      emitted = self.flushWarnings([self.test_methodIsDeprecated])
      self.assertEqual(1, len(emitted))
      onlyWarning = emitted[0]
      self.assertEqual(DeprecationWarning, onlyWarning["category"])
      self.assertEqual(expectedMessage, onlyWarning["message"])
  def test_tlsv12ByDefault(self):
      """
      When neither C{method} nor C{insecurelyLowerMinimumTo} is given,
      L{sslverify.OpenSSLCertificateOptions} defaults the minimum TLS
      version to v1.2.
      """
      certOptions = sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey,
          certificate=self.sCert,
      )
      certOptions._contextFactory = FakeContext
      fakeCtx = certOptions.getContext()
      expectedFlags = 0
      for flag in (
          SSL.OP_NO_SSLv2,
          SSL.OP_NO_COMPRESSION,
          SSL.OP_CIPHER_SERVER_PREFERENCE,
          SSL.OP_NO_SSLv3,
          SSL.OP_NO_TLSv1,
          SSL.OP_NO_TLSv1_1,
      ):
          expectedFlags |= flag
      self.assertEqual(expectedFlags, fakeCtx._options & expectedFlags)
  def test_tlsProtocolsAtLeastWithMinimum(self):
      """
      C{insecurelyLowerMinimumTo} and C{raiseMinimumTo} cannot be combined:
      L{sslverify.OpenSSLCertificateOptions} raises L{TypeError} naming both
      arguments as exclusive.
      """
      with self.assertRaises(TypeError) as caught:
          sslverify.OpenSSLCertificateOptions(
              privateKey=self.sKey,
              certificate=self.sCert,
              raiseMinimumTo=sslverify.TLSVersion.TLSv1_2,
              insecurelyLowerMinimumTo=sslverify.TLSVersion.TLSv1_2,
          )
      errorText = caught.exception.args[0]
      for fragment in ("raiseMinimumTo", "insecurelyLowerMinimumTo", "exclusive"):
          self.assertIn(fragment, errorText)
  def test_tlsProtocolsNoMethodWithAtLeast(self):
      """
      C{raiseMinimumTo} and C{method} cannot be combined:
      L{sslverify.OpenSSLCertificateOptions} raises L{TypeError} naming both
      arguments as exclusive.
      """
      with self.assertRaises(TypeError) as caught:
          sslverify.OpenSSLCertificateOptions(
              privateKey=self.sKey,
              certificate=self.sCert,
              method=SSL.SSLv23_METHOD,
              raiseMinimumTo=sslverify.TLSVersion.TLSv1_2,
          )
      errorText = caught.exception.args[0]
      for fragment in ("method", "raiseMinimumTo", "exclusive"):
          self.assertIn(fragment, errorText)
  def test_tlsProtocolsNoMethodWithMinimum(self):
      """
      C{insecurelyLowerMinimumTo} and C{method} cannot be combined:
      L{sslverify.OpenSSLCertificateOptions} raises L{TypeError} naming both
      arguments as exclusive.
      """
      with self.assertRaises(TypeError) as caught:
          sslverify.OpenSSLCertificateOptions(
              privateKey=self.sKey,
              certificate=self.sCert,
              method=SSL.SSLv23_METHOD,
              insecurelyLowerMinimumTo=sslverify.TLSVersion.TLSv1_2,
          )
      errorText = caught.exception.args[0]
      for fragment in ("method", "insecurelyLowerMinimumTo", "exclusive"):
          self.assertIn(fragment, errorText)
  def test_tlsProtocolsNoMethodWithMaximum(self):
      """
      C{lowerMaximumSecurityTo} and C{method} cannot be combined:
      L{sslverify.OpenSSLCertificateOptions} raises L{TypeError} naming both
      arguments as exclusive.
      """
      with self.assertRaises(TypeError) as caught:
          sslverify.OpenSSLCertificateOptions(
              privateKey=self.sKey,
              certificate=self.sCert,
              method=SSL.TLS_METHOD,
              lowerMaximumSecurityTo=sslverify.TLSVersion.TLSv1_2,
          )
      errorText = caught.exception.args[0]
      for fragment in ("method", "lowerMaximumSecurityTo", "exclusive"):
          self.assertIn(fragment, errorText)
  def test_tlsVersionRangeInOrder(self):
      """
      An C{insecurelyLowerMinimumTo} that is above C{lowerMaximumSecurityTo}
      is rejected with a L{ValueError} carrying an explanatory message.
      """
      with self.assertRaises(ValueError) as caught:
          sslverify.OpenSSLCertificateOptions(
              privateKey=self.sKey,
              certificate=self.sCert,
              insecurelyLowerMinimumTo=sslverify.TLSVersion.TLSv1_0,
              lowerMaximumSecurityTo=sslverify.TLSVersion.SSLv3,
          )
      expectedArgs = (
          "insecurelyLowerMinimumTo needs to be lower than lowerMaximumSecurityTo",
      )
      self.assertEqual(caught.exception.args, expectedArgs)
  def test_tlsVersionRangeInOrderAtLeast(self):
      """
      A C{raiseMinimumTo} that is above C{lowerMaximumSecurityTo} is
      rejected with a L{ValueError} carrying an explanatory message.
      """
      with self.assertRaises(ValueError) as caught:
          sslverify.OpenSSLCertificateOptions(
              privateKey=self.sKey,
              certificate=self.sCert,
              raiseMinimumTo=sslverify.TLSVersion.TLSv1_0,
              lowerMaximumSecurityTo=sslverify.TLSVersion.SSLv3,
          )
      expectedArgs = ("raiseMinimumTo needs to be lower than lowerMaximumSecurityTo",)
      self.assertEqual(caught.exception.args, expectedArgs)
  def test_tlsProtocolsreduceToMaxWithoutMin(self):
      """
      If only C{lowerMaximumSecurityTo} is given (no C{raiseMinimumTo} or
      C{insecurelyLowerMinimumTo}) and it lies below the default minimum,
      L{sslverify.OpenSSLCertificateOptions} uses that maximum as the
      minimum too.
      """
      certOptions = sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey,
          certificate=self.sCert,
          lowerMaximumSecurityTo=sslverify.TLSVersion.SSLv3,
      )
      certOptions._contextFactory = FakeContext
      fakeCtx = certOptions.getContext()
      expectedFlags = 0
      for flag in (
          SSL.OP_NO_SSLv2,
          SSL.OP_NO_COMPRESSION,
          SSL.OP_CIPHER_SERVER_PREFERENCE,
          SSL.OP_NO_TLSv1,
          SSL.OP_NO_TLSv1_1,
          SSL.OP_NO_TLSv1_2,
          certOptions._OP_NO_TLSv1_3,
      ):
          expectedFlags |= flag
      self.assertEqual(expectedFlags, fakeCtx._options & expectedFlags)
  def test_tlsProtocolsSSLv3Only(self):
      """
      With both C{insecurelyLowerMinimumTo} and C{lowerMaximumSecurityTo}
      set to SSLv3, L{sslverify.OpenSSLCertificateOptions} excludes every
      other protocol version.
      """
      onlyVersion = sslverify.TLSVersion.SSLv3
      certOptions = sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey,
          certificate=self.sCert,
          insecurelyLowerMinimumTo=onlyVersion,
          lowerMaximumSecurityTo=onlyVersion,
      )
      certOptions._contextFactory = FakeContext
      fakeCtx = certOptions.getContext()
      expectedFlags = 0
      for flag in (
          SSL.OP_NO_SSLv2,
          SSL.OP_NO_COMPRESSION,
          SSL.OP_CIPHER_SERVER_PREFERENCE,
          SSL.OP_NO_TLSv1,
          SSL.OP_NO_TLSv1_1,
          SSL.OP_NO_TLSv1_2,
          certOptions._OP_NO_TLSv1_3,
      ):
          expectedFlags |= flag
      self.assertEqual(expectedFlags, fakeCtx._options & expectedFlags)
  def test_tlsProtocolsTLSv1Point0Only(self):
      """
      With both C{insecurelyLowerMinimumTo} and C{lowerMaximumSecurityTo}
      set to v1.0, L{sslverify.OpenSSLCertificateOptions} excludes every
      other protocol version.
      """
      onlyVersion = sslverify.TLSVersion.TLSv1_0
      certOptions = sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey,
          certificate=self.sCert,
          insecurelyLowerMinimumTo=onlyVersion,
          lowerMaximumSecurityTo=onlyVersion,
      )
      certOptions._contextFactory = FakeContext
      fakeCtx = certOptions.getContext()
      expectedFlags = 0
      for flag in (
          SSL.OP_NO_SSLv2,
          SSL.OP_NO_COMPRESSION,
          SSL.OP_CIPHER_SERVER_PREFERENCE,
          SSL.OP_NO_SSLv3,
          SSL.OP_NO_TLSv1_1,
          SSL.OP_NO_TLSv1_2,
          certOptions._OP_NO_TLSv1_3,
      ):
          expectedFlags |= flag
      self.assertEqual(expectedFlags, fakeCtx._options & expectedFlags)
  def test_tlsProtocolsTLSv1Point1Only(self):
      """
      With both C{insecurelyLowerMinimumTo} and C{lowerMaximumSecurityTo}
      set to v1.1, L{sslverify.OpenSSLCertificateOptions} excludes every
      other protocol version.
      """
      onlyVersion = sslverify.TLSVersion.TLSv1_1
      certOptions = sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey,
          certificate=self.sCert,
          insecurelyLowerMinimumTo=onlyVersion,
          lowerMaximumSecurityTo=onlyVersion,
      )
      certOptions._contextFactory = FakeContext
      fakeCtx = certOptions.getContext()
      expectedFlags = 0
      for flag in (
          SSL.OP_NO_SSLv2,
          SSL.OP_NO_COMPRESSION,
          SSL.OP_CIPHER_SERVER_PREFERENCE,
          SSL.OP_NO_SSLv3,
          SSL.OP_NO_TLSv1,
          SSL.OP_NO_TLSv1_2,
          certOptions._OP_NO_TLSv1_3,
      ):
          expectedFlags |= flag
      self.assertEqual(expectedFlags, fakeCtx._options & expectedFlags)
  def test_tlsProtocolsTLSv1Point2Only(self):
      """
      With both C{insecurelyLowerMinimumTo} and C{lowerMaximumSecurityTo}
      set to v1.2, L{sslverify.OpenSSLCertificateOptions} excludes every
      other protocol version.
      """
      onlyVersion = sslverify.TLSVersion.TLSv1_2
      certOptions = sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey,
          certificate=self.sCert,
          insecurelyLowerMinimumTo=onlyVersion,
          lowerMaximumSecurityTo=onlyVersion,
      )
      certOptions._contextFactory = FakeContext
      fakeCtx = certOptions.getContext()
      expectedFlags = 0
      for flag in (
          SSL.OP_NO_SSLv2,
          SSL.OP_NO_COMPRESSION,
          SSL.OP_CIPHER_SERVER_PREFERENCE,
          SSL.OP_NO_SSLv3,
          SSL.OP_NO_TLSv1,
          SSL.OP_NO_TLSv1_1,
          certOptions._OP_NO_TLSv1_3,
      ):
          expectedFlags |= flag
      self.assertEqual(expectedFlags, fakeCtx._options & expectedFlags)
  def test_tlsProtocolsAllModernTLS(self):
      """
      With C{insecurelyLowerMinimumTo} at TLSv1.0 and
      C{lowerMaximumSecurityTo} at TLSv1.2,
      L{sslverify.OpenSSLCertificateOptions} excludes both SSL versions and
      the (unreleased) TLSv1.3.
      """
      certOptions = sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey,
          certificate=self.sCert,
          insecurelyLowerMinimumTo=sslverify.TLSVersion.TLSv1_0,
          lowerMaximumSecurityTo=sslverify.TLSVersion.TLSv1_2,
      )
      certOptions._contextFactory = FakeContext
      fakeCtx = certOptions.getContext()
      expectedFlags = 0
      for flag in (
          SSL.OP_NO_SSLv2,
          SSL.OP_NO_COMPRESSION,
          SSL.OP_CIPHER_SERVER_PREFERENCE,
          SSL.OP_NO_SSLv3,
          certOptions._OP_NO_TLSv1_3,
      ):
          expectedFlags |= flag
      self.assertEqual(expectedFlags, fakeCtx._options & expectedFlags)
  def test_tlsProtocolsAtLeastAllSecureTLS(self):
      """
      With C{raiseMinimumTo} set to TLSv1.2,
      L{sslverify.OpenSSLCertificateOptions} disables SSL and every TLS
      version below 1.2.
      """
      certOptions = sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey,
          certificate=self.sCert,
          raiseMinimumTo=sslverify.TLSVersion.TLSv1_2,
      )
      certOptions._contextFactory = FakeContext
      fakeCtx = certOptions.getContext()
      expectedFlags = 0
      for flag in (
          SSL.OP_NO_SSLv2,
          SSL.OP_NO_COMPRESSION,
          SSL.OP_CIPHER_SERVER_PREFERENCE,
          SSL.OP_NO_SSLv3,
          SSL.OP_NO_TLSv1,
          SSL.OP_NO_TLSv1_1,
      ):
          expectedFlags |= flag
      self.assertEqual(expectedFlags, fakeCtx._options & expectedFlags)
  def test_tlsProtocolsAtLeastWillAcceptHigherDefault(self):
      """
      A C{raiseMinimumTo} value below Twisted's default minimum makes
      L{sslverify.OpenSSLCertificateOptions} keep the more secure default.
      """
      certOptions = sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey,
          certificate=self.sCert,
          raiseMinimumTo=sslverify.TLSVersion.SSLv3,
      )
      certOptions._contextFactory = FakeContext
      fakeCtx = certOptions.getContext()
      # Future maintainer warning: this will break if we change our default
      # up, so you should change it to add the relevant OP_NO flags when we
      # do make that change and this test fails.
      expectedFlags = 0
      for flag in (
          SSL.OP_NO_SSLv2,
          SSL.OP_NO_COMPRESSION,
          SSL.OP_CIPHER_SERVER_PREFERENCE,
          SSL.OP_NO_SSLv3,
          SSL.OP_NO_TLSv1,
          SSL.OP_NO_TLSv1_1,
      ):
          expectedFlags |= flag
      self.assertEqual(expectedFlags, fakeCtx._options & expectedFlags)
      self.assertEqual(
          certOptions._defaultMinimumTLSVersion,
          sslverify.TLSVersion.TLSv1_2,
      )
  def test_tlsProtocolsAllSecureTLS(self):
      """
      With C{insecurelyLowerMinimumTo} set to TLSv1.2,
      L{sslverify.OpenSSLCertificateOptions} disables SSL and every TLS
      version below 1.2.
      """
      certOptions = sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey,
          certificate=self.sCert,
          insecurelyLowerMinimumTo=sslverify.TLSVersion.TLSv1_2,
      )
      certOptions._contextFactory = FakeContext
      fakeCtx = certOptions.getContext()
      expectedFlags = 0
      for flag in (
          SSL.OP_NO_SSLv2,
          SSL.OP_NO_COMPRESSION,
          SSL.OP_CIPHER_SERVER_PREFERENCE,
          SSL.OP_NO_SSLv3,
          SSL.OP_NO_TLSv1,
          SSL.OP_NO_TLSv1_1,
      ):
          expectedFlags |= flag
      self.assertEqual(expectedFlags, fakeCtx._options & expectedFlags)
  def test_dhParams(self):
      """
      When C{dhParameters} is supplied, the path of its C{_dhFile} is loaded
      into each new context.
      """

      class FakeDiffieHellmanParameters:
          # Only the attribute read while building the context is provided.
          _dhFile = FilePath(b"dh.params")

      certOptions = sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey,
          certificate=self.sCert,
          dhParameters=FakeDiffieHellmanParameters(),
      )
      certOptions._contextFactory = FakeContext
      fakeCtx = certOptions.getContext()
      expectedPath = FakeDiffieHellmanParameters._dhFile.path
      self.assertEqual(expectedPath, fakeCtx._dhFilename)
  def test_abbreviatingDistinguishedNames(self):
      """
      Abbreviated certificate field names (such as C{CN} and C{OU}) map
      onto their complete names, for construction, comparison and attribute
      access alike.
      """
      abbreviated = sslverify.DN(CN=b"a", OU=b"hello")
      spelledOut = sslverify.DistinguishedName(
          commonName=b"a",
          organizationalUnitName=b"hello",
      )
      self.assertEqual(abbreviated, spelledOut)

      withoutEmail = sslverify.DN(CN=b"a", OU=b"hello")
      withEmail = sslverify.DN(CN=b"a", OU=b"hello", emailAddress=b"xxx")
      self.assertNotEqual(withoutEmail, withEmail)

      name = sslverify.DN(CN=b"abcdefg")
      # A wrongly-cased abbreviation is not a recognised attribute.
      with self.assertRaises(AttributeError):
          name.Cn = b"x"
      self.assertEqual(name.CN, name.commonName)
      name.CN = b"bcdefga"
      self.assertEqual(name.CN, name.commonName)
  def testInspectDistinguishedName(self):
      """
      The C{inspect} output of a fully populated L{sslverify.DN} contains
      every field value and the title-cased form of each.
      """
      name = sslverify.DN(
          commonName=b"common name",
          organizationName=b"organization name",
          organizationalUnitName=b"organizational unit name",
          localityName=b"locality name",
          stateOrProvinceName=b"state or province name",
          countryName=b"country name",
          emailAddress=b"email address",
      )
      inspected = name.inspect()
      expectedLabels = (
          "common name",
          "organization name",
          "organizational unit name",
          "locality name",
          "state or province name",
          "country name",
          "email address",
      )
      for label in expectedLabels:
          failure = f"{label!r} was not in inspect output."
          self.assertIn(label, inspected, failure)
          self.assertIn(label.title(), inspected, failure)
  def testInspectDistinguishedNameWithoutAllFields(self):
      """
      The C{inspect} output of a L{sslverify.DN} holding only a locality
      mentions the locality and none of the unset fields.
      """
      name = sslverify.DN(localityName=b"locality name")
      inspected = name.inspect()
      absentLabels = (
          "common name",
          "organization name",
          "organizational unit name",
          "state or province name",
          "country name",
          "email address",
      )
      for label in absentLabels:
          failure = f"{label!r} was in inspect output."
          self.assertNotIn(label, inspected, failure)
          self.assertNotIn(label.title(), inspected, failure)
      self.assertIn("locality name", inspected)
      self.assertIn("Locality Name", inspected)
  def test_inspectCertificate(self):
      """
      L{sslverify.Certificate.inspect} returns a human-readable string with
      basic information about the certificate: subject, issuer, serial
      number, digest and public key hash.
      """
      cert = sslverify.Certificate.loadPEM(A_HOST_CERTIFICATE_PEM)
      publicKey = cert.getPublicKey()
      keyHash = publicKey.keyHash()
      # Maintenance Note: the algorithm used to compute the "public key hash"
      # is highly dubious and can differ between underlying versions of
      # OpenSSL (and across versions of Twisted), since it is not actually
      # the hash of the public key by itself.  If we can get the appropriate
      # APIs to get the hash of the key itself out of OpenSSL, then we should
      # be able to make it statically declared inline below again rather than
      # computing it here.

      # Subject and issuer are expected to print the same fields.
      # NOTE(review): confirm the leading whitespace of these lines against
      # the actual Certificate.inspect() output.
      identityLines = [
          " Common Name: example.twistedmatrix.com",
          " Country Name: US",
          " Email Address: nobody@twistedmatrix.com",
          " Locality Name: Boston",
          " Organization Name: Twisted Matrix Labs",
          " Organizational Unit Name: Security",
          " State Or Province Name: Massachusetts",
      ]
      expectedLines = [
          "Certificate For Subject:",
          *identityLines,
          "",
          "Issuer:",
          *identityLines,
          "",
          "Serial Number: 12345",
          "Digest: C4:96:11:00:30:C3:EC:EE:A3:55:AA:ED:8C:84:85:18",
          "Public Key with Hash: " + keyHash,
      ]
      self.assertEqual(cert.inspect().split("\n"), expectedLines)
  def test_publicKeyMatching(self):
      """
      L{PublicKey.matches} is true for keys taken from certificates that
      share a key and false for keys from certificates with different keys.
      """
      loadPEM = sslverify.Certificate.loadPEM
      hostKeyA = loadPEM(A_HOST_CERTIFICATE_PEM).getPublicKey()
      hostKeyB = loadPEM(A_HOST_CERTIFICATE_PEM).getPublicKey()
      peerKey = loadPEM(A_PEER_CERTIFICATE_PEM).getPublicKey()
      self.assertTrue(hostKeyA.matches(hostKeyB))
      self.assertFalse(peerKey.matches(hostKeyA))
  def test_enablingAndDisablingSessions(self):
      """
      C{enableSessions} controls the session cache mode of the context.  It
      defaults to False (at least until
      https://twistedmatrix.com/trac/ticket/9764 can be resolved).
      """
      scenarios = (
          ({}, False, SSL.SESS_CACHE_OFF),
          ({"enableSessions": True}, True, SSL.SESS_CACHE_SERVER),
      )
      for constructorArgs, expectedSetting, expectedMode in scenarios:
          certOptions = sslverify.OpenSSLCertificateOptions(**constructorArgs)
          self.assertEqual(certOptions.enableSessions, expectedSetting)
          realCtx = certOptions.getContext()
          self.assertEqual(realCtx.get_session_cache_mode(), expectedMode)
  def test_certificateOptionsSerialization(self):
      """
      Feeding the result of C{__getstate__} to C{__setstate__} on a fresh
      options object restores every setting, and the cached context is left
      out of the state.
      """
      original = sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey,
          certificate=self.sCert,
          method=SSL.SSLv23_METHOD,
          verify=True,
          caCerts=[self.sCert],
          verifyDepth=2,
          requireCertificate=False,
          verifyOnce=False,
          enableSingleUseKeys=False,
          enableSessions=False,
          fixBrokenPeers=True,
          enableSessionTickets=True,
      )
      # Build the context first so there is a cached one to be excluded.
      cachedCtx = original.getContext()
      self.assertIs(cachedCtx, original._context)
      self.assertIsNotNone(cachedCtx)

      savedState = original.__getstate__()
      self.assertNotIn("_context", savedState)

      restored = sslverify.OpenSSLCertificateOptions()
      restored.__setstate__(savedState)
      self.assertEqual(restored.privateKey, self.sKey)
      self.assertEqual(restored.certificate, self.sCert)
      self.assertEqual(restored.method, SSL.SSLv23_METHOD)
      self.assertTrue(restored.verify)
      self.assertEqual(restored.caCerts, [self.sCert])
      self.assertEqual(restored.verifyDepth, 2)
      self.assertFalse(restored.requireCertificate)
      self.assertFalse(restored.verifyOnce)
      self.assertFalse(restored.enableSingleUseKeys)
      self.assertFalse(restored.enableSessions)
      self.assertTrue(restored.fixBrokenPeers)
      self.assertTrue(restored.enableSessionTickets)

  # Silence the deprecation warnings matching the __getstate__ /
  # __setstate__ pattern while this test runs.
  test_certificateOptionsSerialization.suppress = [  # type: ignore[attr-defined]
      util.suppress(
          category=DeprecationWarning,
          message=r"twisted\.internet\._sslverify\.*__[gs]etstate__",
      )
  ]
  1347. def test_certificateOptionsSessionTickets(self):
  1348. """
  1349. Enabling session tickets should not set the OP_NO_TICKET option.
  1350. """
  1351. opts = sslverify.OpenSSLCertificateOptions(enableSessionTickets=True)
  1352. ctx = opts.getContext()
  1353. self.assertEqual(0, ctx.set_options(0) & 0x00004000)
  1354. def test_certificateOptionsSessionTicketsDisabled(self):
  1355. """
  1356. Enabling session tickets should set the OP_NO_TICKET option.
  1357. """
  1358. opts = sslverify.OpenSSLCertificateOptions(enableSessionTickets=False)
  1359. ctx = opts.getContext()
  1360. self.assertEqual(0x00004000, ctx.set_options(0) & 0x00004000)
  1361. def test_allowedAnonymousClientConnection(self):
  1362. """
  1363. Check that anonymous connections are allowed when certificates aren't
  1364. required on the server.
  1365. """
  1366. onData = defer.Deferred()
  1367. self.loopback(
  1368. sslverify.OpenSSLCertificateOptions(
  1369. privateKey=self.sKey, certificate=self.sCert, requireCertificate=False
  1370. ),
  1371. sslverify.OpenSSLCertificateOptions(requireCertificate=False),
  1372. onData=onData,
  1373. )
  1374. return onData.addCallback(
  1375. lambda result: self.assertEqual(result, WritingProtocol.byte)
  1376. )
  1377. def test_refusedAnonymousClientConnection(self):
  1378. """
  1379. Check that anonymous connections are refused when certificates are
  1380. required on the server.
  1381. """
  1382. onServerLost = defer.Deferred()
  1383. onClientLost = defer.Deferred()
  1384. self.loopback(
  1385. sslverify.OpenSSLCertificateOptions(
  1386. privateKey=self.sKey,
  1387. certificate=self.sCert,
  1388. verify=True,
  1389. caCerts=[self.sCert],
  1390. requireCertificate=True,
  1391. ),
  1392. sslverify.OpenSSLCertificateOptions(requireCertificate=False),
  1393. onServerLost=onServerLost,
  1394. onClientLost=onClientLost,
  1395. )
  1396. d = defer.DeferredList([onClientLost, onServerLost], consumeErrors=True)
  1397. def afterLost(result):
  1398. ((cSuccess, cResult), (sSuccess, sResult)) = result
  1399. self.assertFalse(cSuccess)
  1400. self.assertFalse(sSuccess)
  1401. # Win32 fails to report the SSL Error, and report a connection lost
  1402. # instead: there is a race condition so that's not totally
  1403. # surprising (see ticket #2877 in the tracker)
  1404. self.assertIsInstance(cResult.value, (SSL.Error, ConnectionLost))
  1405. self.assertIsInstance(sResult.value, SSL.Error)
  1406. return d.addCallback(afterLost)
  1407. def test_failedCertificateVerification(self):
  1408. """
  1409. Check that connecting with a certificate not accepted by the server CA
  1410. fails.
  1411. """
  1412. onServerLost = defer.Deferred()
  1413. onClientLost = defer.Deferred()
  1414. self.loopback(
  1415. sslverify.OpenSSLCertificateOptions(
  1416. privateKey=self.sKey,
  1417. certificate=self.sCert,
  1418. verify=False,
  1419. requireCertificate=False,
  1420. ),
  1421. sslverify.OpenSSLCertificateOptions(
  1422. verify=True, requireCertificate=False, caCerts=[self.cCert]
  1423. ),
  1424. onServerLost=onServerLost,
  1425. onClientLost=onClientLost,
  1426. )
  1427. d = defer.DeferredList([onClientLost, onServerLost], consumeErrors=True)
  1428. def afterLost(result):
  1429. ((cSuccess, cResult), (sSuccess, sResult)) = result
  1430. self.assertFalse(cSuccess)
  1431. self.assertFalse(sSuccess)
  1432. return d.addCallback(afterLost)
  1433. def test_successfulCertificateVerification(self):
  1434. """
  1435. Test a successful connection with client certificate validation on
  1436. server side.
  1437. """
  1438. onData = defer.Deferred()
  1439. self.loopback(
  1440. sslverify.OpenSSLCertificateOptions(
  1441. privateKey=self.sKey,
  1442. certificate=self.sCert,
  1443. verify=False,
  1444. requireCertificate=False,
  1445. ),
  1446. sslverify.OpenSSLCertificateOptions(
  1447. verify=True, requireCertificate=True, caCerts=[self.sCert]
  1448. ),
  1449. onData=onData,
  1450. )
  1451. return onData.addCallback(
  1452. lambda result: self.assertEqual(result, WritingProtocol.byte)
  1453. )
  1454. def test_successfulSymmetricSelfSignedCertificateVerification(self):
  1455. """
  1456. Test a successful connection with validation on both server and client
  1457. sides.
  1458. """
  1459. onData = defer.Deferred()
  1460. self.loopback(
  1461. sslverify.OpenSSLCertificateOptions(
  1462. privateKey=self.sKey,
  1463. certificate=self.sCert,
  1464. verify=True,
  1465. requireCertificate=True,
  1466. caCerts=[self.cCert],
  1467. ),
  1468. sslverify.OpenSSLCertificateOptions(
  1469. privateKey=self.cKey,
  1470. certificate=self.cCert,
  1471. verify=True,
  1472. requireCertificate=True,
  1473. caCerts=[self.sCert],
  1474. ),
  1475. onData=onData,
  1476. )
  1477. return onData.addCallback(
  1478. lambda result: self.assertEqual(result, WritingProtocol.byte)
  1479. )
  1480. def test_verification(self):
  1481. """
  1482. Check certificates verification building custom certificates data.
  1483. """
  1484. clientDN = sslverify.DistinguishedName(commonName="client")
  1485. clientKey = sslverify.KeyPair.generate()
  1486. clientCertReq = clientKey.certificateRequest(clientDN)
  1487. serverDN = sslverify.DistinguishedName(commonName="server")
  1488. serverKey = sslverify.KeyPair.generate()
  1489. serverCertReq = serverKey.certificateRequest(serverDN)
  1490. clientSelfCertReq = clientKey.certificateRequest(clientDN)
  1491. clientSelfCertData = clientKey.signCertificateRequest(
  1492. clientDN, clientSelfCertReq, lambda dn: True, 132
  1493. )
  1494. clientSelfCert = clientKey.newCertificate(clientSelfCertData)
  1495. serverSelfCertReq = serverKey.certificateRequest(serverDN)
  1496. serverSelfCertData = serverKey.signCertificateRequest(
  1497. serverDN, serverSelfCertReq, lambda dn: True, 516
  1498. )
  1499. serverSelfCert = serverKey.newCertificate(serverSelfCertData)
  1500. clientCertData = serverKey.signCertificateRequest(
  1501. serverDN, clientCertReq, lambda dn: True, 7
  1502. )
  1503. clientCert = clientKey.newCertificate(clientCertData)
  1504. serverCertData = clientKey.signCertificateRequest(
  1505. clientDN, serverCertReq, lambda dn: True, 42
  1506. )
  1507. serverCert = serverKey.newCertificate(serverCertData)
  1508. onData = defer.Deferred()
  1509. serverOpts = serverCert.options(serverSelfCert)
  1510. clientOpts = clientCert.options(clientSelfCert)
  1511. self.loopback(serverOpts, clientOpts, onData=onData)
  1512. return onData.addCallback(
  1513. lambda result: self.assertEqual(result, WritingProtocol.byte)
  1514. )
  1515. class OpenSSLOptionsECDHIntegrationTests(OpenSSLOptionsTestsMixin, TestCase):
  1516. """
  1517. ECDH-related integration tests for L{OpenSSLOptions}.
  1518. """
  1519. def test_ellipticCurveDiffieHellman(self):
  1520. """
  1521. Connections use ECDH when OpenSSL supports it.
  1522. """
  1523. if not get_elliptic_curves():
  1524. raise SkipTest("OpenSSL does not support ECDH.")
  1525. onData = defer.Deferred()
  1526. # TLS 1.3 cipher suites do not specify the key exchange
  1527. # mechanism:
  1528. # https://wiki.openssl.org/index.php/TLS1.3#Differences_with_TLS1.2_and_below
  1529. #
  1530. # and OpenSSL only supports ECHDE groups with TLS 1.3:
  1531. # https://wiki.openssl.org/index.php/TLS1.3#Groups
  1532. #
  1533. # so TLS 1.3 implies ECDHE. Force this test to use TLS 1.3 to
  1534. # ensure ECDH is selected when it might not be.
  1535. self.loopback(
  1536. sslverify.OpenSSLCertificateOptions(
  1537. privateKey=self.sKey,
  1538. certificate=self.sCert,
  1539. requireCertificate=False,
  1540. lowerMaximumSecurityTo=sslverify.TLSVersion.TLSv1_3,
  1541. ),
  1542. sslverify.OpenSSLCertificateOptions(
  1543. requireCertificate=False,
  1544. lowerMaximumSecurityTo=sslverify.TLSVersion.TLSv1_3,
  1545. ),
  1546. onData=onData,
  1547. )
  1548. @onData.addCallback
  1549. def assertECDH(_):
  1550. self.assertEqual(len(self.clientConn.factory.protocols), 1)
  1551. [clientProtocol] = self.clientConn.factory.protocols
  1552. cipher = clientProtocol.getHandle().get_cipher_name()
  1553. self.assertIn("ECDH", cipher)
  1554. return onData
  1555. class DeprecationTests(SynchronousTestCase):
  1556. """
  1557. Tests for deprecation of L{sslverify.OpenSSLCertificateOptions}'s support
  1558. of the pickle protocol.
  1559. """
  1560. if skipSSL:
  1561. skip = skipSSL
  1562. def test_getstateDeprecation(self):
  1563. """
  1564. L{sslverify.OpenSSLCertificateOptions.__getstate__} is deprecated.
  1565. """
  1566. self.callDeprecated(
  1567. (Version("Twisted", 15, 0, 0), "a real persistence system"),
  1568. sslverify.OpenSSLCertificateOptions().__getstate__,
  1569. )
  1570. def test_setstateDeprecation(self):
  1571. """
  1572. L{sslverify.OpenSSLCertificateOptions.__setstate__} is deprecated.
  1573. """
  1574. self.callDeprecated(
  1575. (Version("Twisted", 15, 0, 0), "a real persistence system"),
  1576. sslverify.OpenSSLCertificateOptions().__setstate__,
  1577. {},
  1578. )
  1579. class TrustRootTests(TestCase):
  1580. """
  1581. Tests for L{sslverify.OpenSSLCertificateOptions}' C{trustRoot} argument,
  1582. L{sslverify.platformTrust}, and their interactions.
  1583. """
  1584. if skipSSL:
  1585. skip = skipSSL
  1586. def setUp(self):
  1587. """
  1588. Patch L{sslverify._ChooseDiffieHellmanEllipticCurve}.
  1589. """
  1590. self.patch(
  1591. sslverify,
  1592. "_ChooseDiffieHellmanEllipticCurve",
  1593. FakeChooseDiffieHellmanEllipticCurve,
  1594. )
  1595. def test_caCertsPlatformDefaults(self):
  1596. """
  1597. Specifying a C{trustRoot} of L{sslverify.OpenSSLDefaultPaths} when
  1598. initializing L{sslverify.OpenSSLCertificateOptions} loads the
  1599. platform-provided trusted certificates via C{set_default_verify_paths}.
  1600. """
  1601. opts = sslverify.OpenSSLCertificateOptions(
  1602. trustRoot=sslverify.OpenSSLDefaultPaths(),
  1603. )
  1604. fc = FakeContext(SSL.TLSv1_METHOD)
  1605. opts._contextFactory = lambda method: fc
  1606. opts.getContext()
  1607. self.assertTrue(fc._defaultVerifyPathsSet)
  1608. def test_trustRootPlatformRejectsUntrustedCA(self):
  1609. """
  1610. Specifying a C{trustRoot} of L{platformTrust} when initializing
  1611. L{sslverify.OpenSSLCertificateOptions} causes certificates issued by a
  1612. newly created CA to be rejected by an SSL connection using these
  1613. options.
  1614. Note that this test should I{always} pass, even on platforms where the
  1615. CA certificates are not installed, as long as L{platformTrust} rejects
  1616. completely invalid / unknown root CA certificates. This is simply a
  1617. smoke test to make sure that verification is happening at all.
  1618. """
  1619. caSelfCert, serverCert = certificatesForAuthorityAndServer()
  1620. chainedCert = pathContainingDumpOf(self, serverCert, caSelfCert)
  1621. privateKey = pathContainingDumpOf(self, serverCert.privateKey)
  1622. sProto, cProto, sWrapped, cWrapped, pump = loopbackTLSConnection(
  1623. trustRoot=platformTrust(),
  1624. privateKeyFile=privateKey,
  1625. chainedCertFile=chainedCert,
  1626. )
  1627. # No data was received.
  1628. self.assertEqual(cWrapped.data, b"")
  1629. # It was an L{SSL.Error}.
  1630. self.assertEqual(cWrapped.lostReason.type, SSL.Error)
  1631. # Some combination of OpenSSL and PyOpenSSL is bad at reporting errors.
  1632. err = cWrapped.lostReason.value
  1633. self.assertEqual(err.args[0][0][2], "tlsv1 alert unknown ca")
  1634. def test_trustRootSpecificCertificate(self):
  1635. """
  1636. Specifying a L{Certificate} object for L{trustRoot} will result in that
  1637. certificate being the only trust root for a client.
  1638. """
  1639. caCert, serverCert = certificatesForAuthorityAndServer()
  1640. otherCa, otherServer = certificatesForAuthorityAndServer()
  1641. sProto, cProto, sWrapped, cWrapped, pump = loopbackTLSConnection(
  1642. trustRoot=caCert,
  1643. privateKeyFile=pathContainingDumpOf(self, serverCert.privateKey),
  1644. chainedCertFile=pathContainingDumpOf(self, serverCert),
  1645. )
  1646. pump.flush()
  1647. self.assertIsNone(cWrapped.lostReason)
  1648. self.assertEqual(cWrapped.data, sWrapped.greeting)
  1649. class ServiceIdentityTests(SynchronousTestCase):
  1650. """
  1651. Tests for the verification of the peer's service's identity via the
  1652. C{hostname} argument to L{sslverify.OpenSSLCertificateOptions}.
  1653. """
  1654. if skipSSL:
  1655. skip = skipSSL
  1656. def serviceIdentitySetup(
  1657. self,
  1658. clientHostname,
  1659. serverHostname,
  1660. serverContextSetup=lambda ctx: None,
  1661. validCertificate=True,
  1662. clientPresentsCertificate=False,
  1663. validClientCertificate=True,
  1664. serverVerifies=False,
  1665. buggyInfoCallback=False,
  1666. fakePlatformTrust=False,
  1667. useDefaultTrust=False,
  1668. ):
  1669. """
  1670. Connect a server and a client.
  1671. @param clientHostname: The I{client's idea} of the server's hostname;
  1672. passed as the C{hostname} to the
  1673. L{sslverify.OpenSSLCertificateOptions} instance.
  1674. @type clientHostname: L{unicode}
  1675. @param serverHostname: The I{server's own idea} of the server's
  1676. hostname; present in the certificate presented by the server.
  1677. @type serverHostname: L{unicode}
  1678. @param serverContextSetup: a 1-argument callable invoked with the
  1679. L{OpenSSL.SSL.Context} after it's produced.
  1680. @type serverContextSetup: L{callable} taking L{OpenSSL.SSL.Context}
  1681. returning L{None}.
  1682. @param validCertificate: Is the server's certificate valid? L{True} if
  1683. so, L{False} otherwise.
  1684. @type validCertificate: L{bool}
  1685. @param clientPresentsCertificate: Should the client present a
  1686. certificate to the server? Defaults to 'no'.
  1687. @type clientPresentsCertificate: L{bool}
  1688. @param validClientCertificate: If the client presents a certificate,
  1689. should it actually be a valid one, i.e. signed by the same CA that
  1690. the server is checking? Defaults to 'yes'.
  1691. @type validClientCertificate: L{bool}
  1692. @param serverVerifies: Should the server verify the client's
  1693. certificate? Defaults to 'no'.
  1694. @type serverVerifies: L{bool}
  1695. @param buggyInfoCallback: Should we patch the implementation so that
  1696. the C{info_callback} passed to OpenSSL to have a bug and raise an
  1697. exception (L{ZeroDivisionError})? Defaults to 'no'.
  1698. @type buggyInfoCallback: L{bool}
  1699. @param fakePlatformTrust: Should we fake the platformTrust to be the
  1700. same as our fake server certificate authority, so that we can test
  1701. it's being used? Defaults to 'no' and we just pass platform trust.
  1702. @type fakePlatformTrust: L{bool}
  1703. @param useDefaultTrust: Should we avoid passing the C{trustRoot} to
  1704. L{ssl.optionsForClientTLS}? Defaults to 'no'.
  1705. @type useDefaultTrust: L{bool}
  1706. @return: the client TLS protocol, the client wrapped protocol,
  1707. the server TLS protocol, the server wrapped protocol and
  1708. an L{IOPump} which, when its C{pump} and C{flush} methods are
  1709. called, will move data between the created client and server
  1710. protocol instances
  1711. @rtype: 5-L{tuple} of 4 L{IProtocol}s and L{IOPump}
  1712. """
  1713. serverCA, serverCert = certificatesForAuthorityAndServer(serverHostname)
  1714. other = {}
  1715. passClientCert = None
  1716. clientCA, clientCert = certificatesForAuthorityAndServer("client")
  1717. if serverVerifies:
  1718. other.update(trustRoot=clientCA)
  1719. if clientPresentsCertificate:
  1720. if validClientCertificate:
  1721. passClientCert = clientCert
  1722. else:
  1723. bogusCA, bogus = certificatesForAuthorityAndServer("client")
  1724. passClientCert = bogus
  1725. serverOpts = sslverify.OpenSSLCertificateOptions(
  1726. privateKey=serverCert.privateKey.original,
  1727. certificate=serverCert.original,
  1728. **other,
  1729. )
  1730. serverContextSetup(serverOpts.getContext())
  1731. if not validCertificate:
  1732. serverCA, otherServer = certificatesForAuthorityAndServer(serverHostname)
  1733. if buggyInfoCallback:
  1734. def broken(*a, **k):
  1735. """
  1736. Raise an exception.
  1737. @param a: Arguments for an C{info_callback}
  1738. @param k: Keyword arguments for an C{info_callback}
  1739. """
  1740. 1 / 0
  1741. self.patch(
  1742. sslverify.ClientTLSOptions,
  1743. "_identityVerifyingInfoCallback",
  1744. broken,
  1745. )
  1746. signature = {"hostname": clientHostname}
  1747. if passClientCert:
  1748. signature.update(clientCertificate=passClientCert)
  1749. if not useDefaultTrust:
  1750. signature.update(trustRoot=serverCA)
  1751. if fakePlatformTrust:
  1752. self.patch(sslverify, "platformTrust", lambda: serverCA)
  1753. clientOpts = sslverify.optionsForClientTLS(**signature)
  1754. class GreetingServer(protocol.Protocol):
  1755. greeting = b"greetings!"
  1756. lostReason = None
  1757. data = b""
  1758. def connectionMade(self):
  1759. self.transport.write(self.greeting)
  1760. def dataReceived(self, data):
  1761. self.data += data
  1762. def connectionLost(self, reason):
  1763. self.lostReason = reason
  1764. class GreetingClient(protocol.Protocol):
  1765. greeting = b"cheerio!"
  1766. data = b""
  1767. lostReason = None
  1768. def connectionMade(self):
  1769. self.transport.write(self.greeting)
  1770. def dataReceived(self, data):
  1771. self.data += data
  1772. def connectionLost(self, reason):
  1773. self.lostReason = reason
  1774. serverWrappedProto = GreetingServer()
  1775. clientWrappedProto = GreetingClient()
  1776. clientFactory = protocol.Factory()
  1777. clientFactory.protocol = lambda: clientWrappedProto
  1778. serverFactory = protocol.Factory()
  1779. serverFactory.protocol = lambda: serverWrappedProto
  1780. self.serverOpts = serverOpts
  1781. self.clientOpts = clientOpts
  1782. clientTLSFactory = TLSMemoryBIOFactory(
  1783. clientOpts, isClient=True, wrappedFactory=clientFactory
  1784. )
  1785. serverTLSFactory = TLSMemoryBIOFactory(
  1786. serverOpts, isClient=False, wrappedFactory=serverFactory
  1787. )
  1788. cProto, sProto, pump = connectedServerAndClient(
  1789. lambda: serverTLSFactory.buildProtocol(None),
  1790. lambda: clientTLSFactory.buildProtocol(None),
  1791. )
  1792. return cProto, sProto, clientWrappedProto, serverWrappedProto, pump
  1793. def test_invalidHostname(self):
  1794. """
  1795. When a certificate containing an invalid hostname is received from the
  1796. server, the connection is immediately dropped.
  1797. """
  1798. cProto, sProto, cWrapped, sWrapped, pump = self.serviceIdentitySetup(
  1799. "wrong-host.example.com",
  1800. "correct-host.example.com",
  1801. )
  1802. self.assertEqual(cWrapped.data, b"")
  1803. self.assertEqual(sWrapped.data, b"")
  1804. cErr = cWrapped.lostReason.value
  1805. sErr = sWrapped.lostReason.value
  1806. self.assertIsInstance(cErr, VerificationError)
  1807. self.assertIsInstance(sErr, ConnectionClosed)
  1808. def test_validHostname(self):
  1809. """
  1810. Whenever a valid certificate containing a valid hostname is received,
  1811. connection proceeds normally.
  1812. """
  1813. cProto, sProto, cWrapped, sWrapped, pump = self.serviceIdentitySetup(
  1814. "valid.example.com",
  1815. "valid.example.com",
  1816. )
  1817. self.assertEqual(cWrapped.data, b"greetings!")
  1818. cErr = cWrapped.lostReason
  1819. sErr = sWrapped.lostReason
  1820. self.assertIsNone(cErr)
  1821. self.assertIsNone(sErr)
  1822. def test_validHostnameInvalidCertificate(self):
  1823. """
  1824. When an invalid certificate containing a perfectly valid hostname is
  1825. received, the connection is aborted with an OpenSSL error.
  1826. """
  1827. cProto, sProto, cWrapped, sWrapped, pump = self.serviceIdentitySetup(
  1828. "valid.example.com",
  1829. "valid.example.com",
  1830. validCertificate=False,
  1831. )
  1832. self.assertEqual(cWrapped.data, b"")
  1833. self.assertEqual(sWrapped.data, b"")
  1834. cErr = cWrapped.lostReason.value
  1835. sErr = sWrapped.lostReason.value
  1836. self.assertIsInstance(cErr, SSL.Error)
  1837. self.assertIsInstance(sErr, SSL.Error)
  1838. def test_realCAsBetterNotSignOurBogusTestCerts(self):
  1839. """
  1840. If we use the default trust from the platform, our dinky certificate
  1841. should I{really} fail.
  1842. """
  1843. cProto, sProto, cWrapped, sWrapped, pump = self.serviceIdentitySetup(
  1844. "valid.example.com",
  1845. "valid.example.com",
  1846. validCertificate=False,
  1847. useDefaultTrust=True,
  1848. )
  1849. self.assertEqual(cWrapped.data, b"")
  1850. self.assertEqual(sWrapped.data, b"")
  1851. cErr = cWrapped.lostReason.value
  1852. sErr = sWrapped.lostReason.value
  1853. self.assertIsInstance(cErr, SSL.Error)
  1854. self.assertIsInstance(sErr, SSL.Error)
  1855. def test_butIfTheyDidItWouldWork(self):
  1856. """
  1857. L{ssl.optionsForClientTLS} should be using L{ssl.platformTrust} by
  1858. default, so if we fake that out then it should trust ourselves again.
  1859. """
  1860. cProto, sProto, cWrapped, sWrapped, pump = self.serviceIdentitySetup(
  1861. "valid.example.com",
  1862. "valid.example.com",
  1863. useDefaultTrust=True,
  1864. fakePlatformTrust=True,
  1865. )
  1866. self.assertEqual(cWrapped.data, b"greetings!")
  1867. cErr = cWrapped.lostReason
  1868. sErr = sWrapped.lostReason
  1869. self.assertIsNone(cErr)
  1870. self.assertIsNone(sErr)
  1871. def test_clientPresentsCertificate(self):
  1872. """
  1873. When the server verifies and the client presents a valid certificate
  1874. for that verification by passing it to
  1875. L{sslverify.optionsForClientTLS}, communication proceeds.
  1876. """
  1877. cProto, sProto, cWrapped, sWrapped, pump = self.serviceIdentitySetup(
  1878. "valid.example.com",
  1879. "valid.example.com",
  1880. validCertificate=True,
  1881. serverVerifies=True,
  1882. clientPresentsCertificate=True,
  1883. )
  1884. self.assertEqual(cWrapped.data, b"greetings!")
  1885. cErr = cWrapped.lostReason
  1886. sErr = sWrapped.lostReason
  1887. self.assertIsNone(cErr)
  1888. self.assertIsNone(sErr)
  1889. def test_clientPresentsBadCertificate(self):
  1890. """
  1891. When the server verifies and the client presents an invalid certificate
  1892. for that verification by passing it to
  1893. L{sslverify.optionsForClientTLS}, the connection cannot be established
  1894. with an SSL error.
  1895. """
  1896. cProto, sProto, cWrapped, sWrapped, pump = self.serviceIdentitySetup(
  1897. "valid.example.com",
  1898. "valid.example.com",
  1899. validCertificate=True,
  1900. serverVerifies=True,
  1901. validClientCertificate=False,
  1902. clientPresentsCertificate=True,
  1903. )
  1904. self.assertEqual(cWrapped.data, b"")
  1905. cErr = cWrapped.lostReason.value
  1906. sErr = sWrapped.lostReason.value
  1907. self.assertIsInstance(cErr, SSL.Error)
  1908. self.assertIsInstance(sErr, SSL.Error)
  1909. @skipIf(skipSNI, skipSNI)
  1910. def test_hostnameIsIndicated(self):
  1911. """
  1912. Specifying the C{hostname} argument to L{CertificateOptions} also sets
  1913. the U{Server Name Extension
  1914. <https://en.wikipedia.org/wiki/Server_Name_Indication>} TLS indication
  1915. field to the correct value.
  1916. """
  1917. names = []
  1918. def setupServerContext(ctx):
  1919. def servername_received(conn):
  1920. names.append(conn.get_servername().decode("ascii"))
  1921. ctx.set_tlsext_servername_callback(servername_received)
  1922. cProto, sProto, cWrapped, sWrapped, pump = self.serviceIdentitySetup(
  1923. "valid.example.com", "valid.example.com", setupServerContext
  1924. )
  1925. self.assertEqual(names, ["valid.example.com"])
  1926. @skipIf(skipSNI, skipSNI)
  1927. def test_hostnameEncoding(self):
  1928. """
  1929. Hostnames are encoded as IDNA.
  1930. """
  1931. names = []
  1932. hello = "h\N{LATIN SMALL LETTER A WITH ACUTE}llo.example.com"
  1933. def setupServerContext(ctx):
  1934. def servername_received(conn):
  1935. serverIDNA = _idnaText(conn.get_servername())
  1936. names.append(serverIDNA)
  1937. ctx.set_tlsext_servername_callback(servername_received)
  1938. cProto, sProto, cWrapped, sWrapped, pump = self.serviceIdentitySetup(
  1939. hello, hello, setupServerContext
  1940. )
  1941. self.assertEqual(names, [hello])
  1942. self.assertEqual(cWrapped.data, b"greetings!")
  1943. cErr = cWrapped.lostReason
  1944. sErr = sWrapped.lostReason
  1945. self.assertIsNone(cErr)
  1946. self.assertIsNone(sErr)
  1947. def test_fallback(self):
  1948. """
  1949. L{sslverify.simpleVerifyHostname} checks string equality on the
  1950. commonName of a connection's certificate's subject, doing nothing if it
  1951. matches and raising L{VerificationError} if it doesn't.
  1952. """
  1953. name = "something.example.com"
  1954. class Connection:
  1955. def get_peer_certificate(self):
  1956. """
  1957. Fake of L{OpenSSL.SSL.Connection.get_peer_certificate}.
  1958. @return: A certificate with a known common name.
  1959. @rtype: L{OpenSSL.crypto.X509}
  1960. """
  1961. cert = X509()
  1962. cert.get_subject().commonName = name
  1963. return cert
  1964. conn = Connection()
  1965. self.assertIs(
  1966. sslverify.simpleVerifyHostname(conn, "something.example.com"), None
  1967. )
  1968. self.assertRaises(
  1969. sslverify.SimpleVerificationError,
  1970. sslverify.simpleVerifyHostname,
  1971. conn,
  1972. "nonsense",
  1973. )
  1974. def test_surpriseFromInfoCallback(self):
  1975. """
  1976. pyOpenSSL isn't always so great about reporting errors. If one occurs
  1977. in the verification info callback, it should be logged and the
  1978. connection should be shut down (if possible, anyway; the app_data could
  1979. be clobbered but there's no point testing for that).
  1980. """
  1981. cProto, sProto, cWrapped, sWrapped, pump = self.serviceIdentitySetup(
  1982. "correct-host.example.com",
  1983. "correct-host.example.com",
  1984. buggyInfoCallback=True,
  1985. )
  1986. self.assertEqual(cWrapped.data, b"")
  1987. self.assertEqual(sWrapped.data, b"")
  1988. cErr = cWrapped.lostReason.value
  1989. sErr = sWrapped.lostReason.value
  1990. self.assertIsInstance(cErr, ZeroDivisionError)
  1991. self.assertIsInstance(sErr, (ConnectionClosed, SSL.Error))
  1992. errors = self.flushLoggedErrors(ZeroDivisionError)
  1993. self.assertTrue(errors)
  1994. def negotiateProtocol(serverProtocols, clientProtocols, clientOptions=None):
  1995. """
  1996. Create the TLS connection and negotiate a next protocol.
  1997. @param serverProtocols: The protocols the server is willing to negotiate.
  1998. @param clientProtocols: The protocols the client is willing to negotiate.
  1999. @param clientOptions: The type of C{OpenSSLCertificateOptions} class to
  2000. use for the client. Defaults to C{OpenSSLCertificateOptions}.
  2001. @return: A L{tuple} of the negotiated protocol and the reason the
  2002. connection was lost.
  2003. """
  2004. caCertificate, serverCertificate = certificatesForAuthorityAndServer()
  2005. trustRoot = sslverify.OpenSSLCertificateAuthorities(
  2006. [
  2007. caCertificate.original,
  2008. ]
  2009. )
  2010. sProto, cProto, sWrapped, cWrapped, pump = loopbackTLSConnectionInMemory(
  2011. trustRoot=trustRoot,
  2012. privateKey=serverCertificate.privateKey.original,
  2013. serverCertificate=serverCertificate.original,
  2014. clientProtocols=clientProtocols,
  2015. serverProtocols=serverProtocols,
  2016. clientOptions=clientOptions,
  2017. )
  2018. pump.flush()
  2019. return (cProto.negotiatedProtocol, cWrapped.lostReason)
  2020. class NPNOrALPNTests(TestCase):
  2021. """
  2022. NPN and ALPN protocol selection.
  2023. These tests only run on platforms that have a PyOpenSSL version >= 0.15,
  2024. and OpenSSL version 1.0.1 or later.
  2025. """
  2026. if skipSSL:
  2027. skip = skipSSL
  2028. elif skipNPN:
  2029. skip = skipNPN
  2030. def test_nextProtocolMechanismsNPNIsSupported(self):
  2031. """
  2032. When at least NPN is available on the platform, NPN is in the set of
  2033. supported negotiation protocols.
  2034. """
  2035. supportedProtocols = sslverify.protocolNegotiationMechanisms()
  2036. self.assertTrue(sslverify.ProtocolNegotiationSupport.NPN in supportedProtocols)
  2037. def test_NPNAndALPNSuccess(self):
  2038. """
  2039. When both ALPN and NPN are used, and both the client and server have
  2040. overlapping protocol choices, a protocol is successfully negotiated.
  2041. Further, the negotiated protocol is the first one in the list.
  2042. """
  2043. protocols = [b"h2", b"http/1.1"]
  2044. negotiatedProtocol, lostReason = negotiateProtocol(
  2045. clientProtocols=protocols,
  2046. serverProtocols=protocols,
  2047. )
  2048. self.assertEqual(negotiatedProtocol, b"h2")
  2049. self.assertIsNone(lostReason)
  2050. def test_NPNAndALPNDifferent(self):
  2051. """
  2052. Client and server have different protocol lists: only the common
  2053. element is chosen.
  2054. """
  2055. serverProtocols = [b"h2", b"http/1.1", b"spdy/2"]
  2056. clientProtocols = [b"spdy/3", b"http/1.1"]
  2057. negotiatedProtocol, lostReason = negotiateProtocol(
  2058. clientProtocols=clientProtocols,
  2059. serverProtocols=serverProtocols,
  2060. )
  2061. self.assertEqual(negotiatedProtocol, b"http/1.1")
  2062. self.assertIsNone(lostReason)
  2063. def test_NPNAndALPNNoAdvertise(self):
  2064. """
  2065. When one peer does not advertise any protocols, the connection is set
  2066. up with no next protocol.
  2067. """
  2068. protocols = [b"h2", b"http/1.1"]
  2069. negotiatedProtocol, lostReason = negotiateProtocol(
  2070. clientProtocols=protocols,
  2071. serverProtocols=[],
  2072. )
  2073. self.assertIsNone(negotiatedProtocol)
  2074. self.assertIsNone(lostReason)
  2075. def test_NPNAndALPNNoOverlap(self):
  2076. """
  2077. When the client and server have no overlap of protocols, the connection
  2078. fails.
  2079. """
  2080. clientProtocols = [b"h2", b"http/1.1"]
  2081. serverProtocols = [b"spdy/3"]
  2082. negotiatedProtocol, lostReason = negotiateProtocol(
  2083. serverProtocols=clientProtocols,
  2084. clientProtocols=serverProtocols,
  2085. )
  2086. self.assertIsNone(negotiatedProtocol)
  2087. self.assertEqual(lostReason.type, SSL.Error)
  2088. class ALPNTests(TestCase):
  2089. """
  2090. ALPN protocol selection.
  2091. These tests only run on platforms that have a PyOpenSSL version >= 0.15,
  2092. and OpenSSL version 1.0.2 or later.
  2093. This covers only the ALPN specific logic, as any platform that has ALPN
  2094. will also have NPN and so will run the NPNAndALPNTest suite as well.
  2095. """
  2096. if skipSSL:
  2097. skip = skipSSL
  2098. elif skipALPN:
  2099. skip = skipALPN
  2100. def test_nextProtocolMechanismsALPNIsSupported(self):
  2101. """
  2102. When ALPN is available on a platform, protocolNegotiationMechanisms
  2103. includes ALPN in the suported protocols.
  2104. """
  2105. supportedProtocols = sslverify.protocolNegotiationMechanisms()
  2106. self.assertTrue(sslverify.ProtocolNegotiationSupport.ALPN in supportedProtocols)
  2107. class NPNAndALPNAbsentTests(TestCase):
  2108. """
  2109. NPN/ALPN operations fail on platforms that do not support them.
  2110. These tests only run on platforms that have a PyOpenSSL version < 0.15,
  2111. an OpenSSL version earlier than 1.0.1, or an OpenSSL/cryptography built
  2112. without NPN support.
  2113. """
  2114. if skipSSL:
  2115. skip = skipSSL
  2116. elif not skipNPN or not skipALPN:
  2117. skip = "NPN and/or ALPN is present on this platform"
  2118. def test_nextProtocolMechanismsNoNegotiationSupported(self):
  2119. """
  2120. When neither NPN or ALPN are available on a platform, there are no
  2121. supported negotiation protocols.
  2122. """
  2123. supportedProtocols = sslverify.protocolNegotiationMechanisms()
  2124. self.assertFalse(supportedProtocols)
  2125. def test_NPNAndALPNNotImplemented(self):
  2126. """
  2127. A NotImplementedError is raised when using acceptableProtocols on a
  2128. platform that does not support either NPN or ALPN.
  2129. """
  2130. protocols = [b"h2", b"http/1.1"]
  2131. self.assertRaises(
  2132. NotImplementedError,
  2133. negotiateProtocol,
  2134. serverProtocols=protocols,
  2135. clientProtocols=protocols,
  2136. )
  2137. def test_NegotiatedProtocolReturnsNone(self):
  2138. """
  2139. negotiatedProtocol return L{None} even when NPN/ALPN aren't supported.
  2140. This works because, as neither are supported, negotiation isn't even
  2141. attempted.
  2142. """
  2143. serverProtocols = None
  2144. clientProtocols = None
  2145. negotiatedProtocol, lostReason = negotiateProtocol(
  2146. clientProtocols=clientProtocols,
  2147. serverProtocols=serverProtocols,
  2148. )
  2149. self.assertIsNone(negotiatedProtocol)
  2150. self.assertIsNone(lostReason)
  2151. class _NotSSLTransport:
  2152. def getHandle(self):
  2153. return self
  2154. class _MaybeSSLTransport:
  2155. def getHandle(self):
  2156. return self
  2157. def get_peer_certificate(self):
  2158. return None
  2159. def get_host_certificate(self):
  2160. return None
  2161. class _ActualSSLTransport:
  2162. def getHandle(self):
  2163. return self
  2164. def get_host_certificate(self):
  2165. return sslverify.Certificate.loadPEM(A_HOST_CERTIFICATE_PEM).original
  2166. def get_peer_certificate(self):
  2167. return sslverify.Certificate.loadPEM(A_PEER_CERTIFICATE_PEM).original
  2168. class ConstructorsTests(TestCase):
  2169. if skipSSL:
  2170. skip = skipSSL
  2171. def test_peerFromNonSSLTransport(self):
  2172. """
  2173. Verify that peerFromTransport raises an exception if the transport
  2174. passed is not actually an SSL transport.
  2175. """
  2176. x = self.assertRaises(
  2177. CertificateError,
  2178. sslverify.Certificate.peerFromTransport,
  2179. _NotSSLTransport(),
  2180. )
  2181. self.assertTrue(str(x).startswith("non-TLS"))
  2182. def test_peerFromBlankSSLTransport(self):
  2183. """
  2184. Verify that peerFromTransport raises an exception if the transport
  2185. passed is an SSL transport, but doesn't have a peer certificate.
  2186. """
  2187. x = self.assertRaises(
  2188. CertificateError,
  2189. sslverify.Certificate.peerFromTransport,
  2190. _MaybeSSLTransport(),
  2191. )
  2192. self.assertTrue(str(x).startswith("TLS"))
  2193. def test_hostFromNonSSLTransport(self):
  2194. """
  2195. Verify that hostFromTransport raises an exception if the transport
  2196. passed is not actually an SSL transport.
  2197. """
  2198. x = self.assertRaises(
  2199. CertificateError,
  2200. sslverify.Certificate.hostFromTransport,
  2201. _NotSSLTransport(),
  2202. )
  2203. self.assertTrue(str(x).startswith("non-TLS"))
  2204. def test_hostFromBlankSSLTransport(self):
  2205. """
  2206. Verify that hostFromTransport raises an exception if the transport
  2207. passed is an SSL transport, but doesn't have a host certificate.
  2208. """
  2209. x = self.assertRaises(
  2210. CertificateError,
  2211. sslverify.Certificate.hostFromTransport,
  2212. _MaybeSSLTransport(),
  2213. )
  2214. self.assertTrue(str(x).startswith("TLS"))
  2215. def test_hostFromSSLTransport(self):
  2216. """
  2217. Verify that hostFromTransport successfully creates the correct
  2218. certificate if passed a valid SSL transport.
  2219. """
  2220. self.assertEqual(
  2221. sslverify.Certificate.hostFromTransport(
  2222. _ActualSSLTransport()
  2223. ).serialNumber(),
  2224. 12345,
  2225. )
  2226. def test_peerFromSSLTransport(self):
  2227. """
  2228. Verify that peerFromTransport successfully creates the correct
  2229. certificate if passed a valid SSL transport.
  2230. """
  2231. self.assertEqual(
  2232. sslverify.Certificate.peerFromTransport(
  2233. _ActualSSLTransport()
  2234. ).serialNumber(),
  2235. 12346,
  2236. )
  2237. class MultipleCertificateTrustRootTests(TestCase):
  2238. """
  2239. Test the behavior of the trustRootFromCertificates() API call.
  2240. """
  2241. if skipSSL:
  2242. skip = skipSSL
  2243. def test_trustRootFromCertificatesPrivatePublic(self):
  2244. """
  2245. L{trustRootFromCertificates} accepts either a L{sslverify.Certificate}
  2246. or a L{sslverify.PrivateCertificate} instance.
  2247. """
  2248. privateCert = sslverify.PrivateCertificate.loadPEM(A_KEYPAIR)
  2249. cert = sslverify.Certificate.loadPEM(A_HOST_CERTIFICATE_PEM)
  2250. mt = sslverify.trustRootFromCertificates([privateCert, cert])
  2251. # Verify that the returned object acts correctly when used as a
  2252. # trustRoot= param to optionsForClientTLS.
  2253. sProto, cProto, sWrap, cWrap, pump = loopbackTLSConnectionInMemory(
  2254. trustRoot=mt,
  2255. privateKey=privateCert.privateKey.original,
  2256. serverCertificate=privateCert.original,
  2257. )
  2258. # This connection should succeed
  2259. self.assertEqual(cWrap.data, b"greetings!")
  2260. self.assertIsNone(cWrap.lostReason)
  2261. def test_trustRootSelfSignedServerCertificate(self):
  2262. """
  2263. L{trustRootFromCertificates} called with a single self-signed
  2264. certificate will cause L{optionsForClientTLS} to accept client
  2265. connections to a server with that certificate.
  2266. """
  2267. key, cert = makeCertificate(O=b"Server Test Certificate", CN=b"server")
  2268. selfSigned = sslverify.PrivateCertificate.fromCertificateAndKeyPair(
  2269. sslverify.Certificate(cert),
  2270. sslverify.KeyPair(key),
  2271. )
  2272. trust = sslverify.trustRootFromCertificates([selfSigned])
  2273. # Since we trust this exact certificate, connections to this server
  2274. # should succeed.
  2275. sProto, cProto, sWrap, cWrap, pump = loopbackTLSConnectionInMemory(
  2276. trustRoot=trust,
  2277. privateKey=selfSigned.privateKey.original,
  2278. serverCertificate=selfSigned.original,
  2279. )
  2280. self.assertEqual(cWrap.data, b"greetings!")
  2281. self.assertIsNone(cWrap.lostReason)
  2282. def test_trustRootCertificateAuthorityTrustsConnection(self):
  2283. """
  2284. L{trustRootFromCertificates} called with certificate A will cause
  2285. L{optionsForClientTLS} to accept client connections to a server with
  2286. certificate B where B is signed by A.
  2287. """
  2288. caCert, serverCert = certificatesForAuthorityAndServer()
  2289. trust = sslverify.trustRootFromCertificates([caCert])
  2290. # Since we've listed the CA's certificate as a trusted cert, a
  2291. # connection to the server certificate it signed should succeed.
  2292. sProto, cProto, sWrap, cWrap, pump = loopbackTLSConnectionInMemory(
  2293. trustRoot=trust,
  2294. privateKey=serverCert.privateKey.original,
  2295. serverCertificate=serverCert.original,
  2296. )
  2297. self.assertEqual(cWrap.data, b"greetings!")
  2298. self.assertIsNone(cWrap.lostReason)
  2299. def test_trustRootFromCertificatesUntrusted(self):
  2300. """
  2301. L{trustRootFromCertificates} called with certificate A will cause
  2302. L{optionsForClientTLS} to disallow any connections to a server with
  2303. certificate B where B is not signed by A.
  2304. """
  2305. key, cert = makeCertificate(O=b"Server Test Certificate", CN=b"server")
  2306. serverCert = sslverify.PrivateCertificate.fromCertificateAndKeyPair(
  2307. sslverify.Certificate(cert),
  2308. sslverify.KeyPair(key),
  2309. )
  2310. untrustedCert = sslverify.Certificate(
  2311. makeCertificate(O=b"CA Test Certificate", CN=b"unknown CA")[1]
  2312. )
  2313. trust = sslverify.trustRootFromCertificates([untrustedCert])
  2314. # Since we only trust 'untrustedCert' which has not signed our
  2315. # server's cert, we should reject this connection
  2316. sProto, cProto, sWrap, cWrap, pump = loopbackTLSConnectionInMemory(
  2317. trustRoot=trust,
  2318. privateKey=serverCert.privateKey.original,
  2319. serverCertificate=serverCert.original,
  2320. )
  2321. # This connection should fail, so no data was received.
  2322. self.assertEqual(cWrap.data, b"")
  2323. # It was an L{SSL.Error}.
  2324. self.assertEqual(cWrap.lostReason.type, SSL.Error)
  2325. # Some combination of OpenSSL and PyOpenSSL is bad at reporting errors.
  2326. err = cWrap.lostReason.value
  2327. self.assertEqual(err.args[0][0][2], "tlsv1 alert unknown ca")
  2328. def test_trustRootFromCertificatesOpenSSLObjects(self):
  2329. """
  2330. L{trustRootFromCertificates} rejects any L{OpenSSL.crypto.X509}
  2331. instances in the list passed to it.
  2332. """
  2333. private = sslverify.PrivateCertificate.loadPEM(A_KEYPAIR)
  2334. certX509 = private.original
  2335. exception = self.assertRaises(
  2336. TypeError,
  2337. sslverify.trustRootFromCertificates,
  2338. [certX509],
  2339. )
  2340. self.assertEqual(
  2341. "certificates items must be twisted.internet.ssl.CertBase " "instances",
  2342. exception.args[0],
  2343. )
  2344. class OpenSSLCipherTests(TestCase):
  2345. """
  2346. Tests for twisted.internet._sslverify.OpenSSLCipher.
  2347. """
  2348. if skipSSL:
  2349. skip = skipSSL
  2350. cipherName = "CIPHER-STRING"
  2351. def test_constructorSetsFullName(self):
  2352. """
  2353. The first argument passed to the constructor becomes the full name.
  2354. """
  2355. self.assertEqual(
  2356. self.cipherName, sslverify.OpenSSLCipher(self.cipherName).fullName
  2357. )
  2358. def test_repr(self):
  2359. """
  2360. C{repr(cipher)} returns a valid constructor call.
  2361. """
  2362. cipher = sslverify.OpenSSLCipher(self.cipherName)
  2363. self.assertEqual(
  2364. cipher, eval(repr(cipher), {"OpenSSLCipher": sslverify.OpenSSLCipher})
  2365. )
  2366. def test_eqSameClass(self):
  2367. """
  2368. Equal type and C{fullName} means that the objects are equal.
  2369. """
  2370. cipher1 = sslverify.OpenSSLCipher(self.cipherName)
  2371. cipher2 = sslverify.OpenSSLCipher(self.cipherName)
  2372. self.assertEqual(cipher1, cipher2)
  2373. def test_eqSameNameDifferentType(self):
  2374. """
  2375. If ciphers have the same name but different types, they're still
  2376. different.
  2377. """
  2378. class DifferentCipher:
  2379. fullName = self.cipherName
  2380. self.assertNotEqual(
  2381. sslverify.OpenSSLCipher(self.cipherName),
  2382. DifferentCipher(),
  2383. )
  2384. class ExpandCipherStringTests(TestCase):
  2385. """
  2386. Tests for twisted.internet._sslverify._expandCipherString.
  2387. """
  2388. if skipSSL:
  2389. skip = skipSSL
  2390. def test_doesNotStumbleOverEmptyList(self):
  2391. """
  2392. If the expanded cipher list is empty, an empty L{list} is returned.
  2393. """
  2394. self.assertEqual(
  2395. tuple(), sslverify._expandCipherString("", SSL.SSLv23_METHOD, 0)
  2396. )
  2397. def test_doesNotSwallowOtherSSLErrors(self):
  2398. """
  2399. Only no cipher matches get swallowed, every other SSL error gets
  2400. propagated.
  2401. """
  2402. def raiser(_):
  2403. # Unfortunately, there seems to be no way to trigger a real SSL
  2404. # error artificially.
  2405. raise SSL.Error([["", "", ""]])
  2406. ctx = FakeContext(SSL.SSLv23_METHOD)
  2407. ctx.set_cipher_list = raiser
  2408. self.patch(sslverify.SSL, "Context", lambda _: ctx)
  2409. self.assertRaises(
  2410. SSL.Error, sslverify._expandCipherString, "ALL", SSL.SSLv23_METHOD, 0
  2411. )
  2412. def test_returnsTupleOfICiphers(self):
  2413. """
  2414. L{sslverify._expandCipherString} always returns a L{tuple} of
  2415. L{interfaces.ICipher}.
  2416. """
  2417. ciphers = sslverify._expandCipherString("ALL", SSL.SSLv23_METHOD, 0)
  2418. self.assertIsInstance(ciphers, tuple)
  2419. bogus = []
  2420. for c in ciphers:
  2421. if not interfaces.ICipher.providedBy(c):
  2422. bogus.append(c)
  2423. self.assertEqual([], bogus)
  2424. class AcceptableCiphersTests(TestCase):
  2425. """
  2426. Tests for twisted.internet._sslverify.OpenSSLAcceptableCiphers.
  2427. """
  2428. if skipSSL:
  2429. skip = skipSSL
  2430. def test_selectOnEmptyListReturnsEmptyList(self):
  2431. """
  2432. If no ciphers are available, nothing can be selected.
  2433. """
  2434. ac = sslverify.OpenSSLAcceptableCiphers(tuple())
  2435. self.assertEqual(tuple(), ac.selectCiphers(tuple()))
  2436. def test_selectReturnsOnlyFromAvailable(self):
  2437. """
  2438. Select only returns a cross section of what is available and what is
  2439. desirable.
  2440. """
  2441. ac = sslverify.OpenSSLAcceptableCiphers(
  2442. [
  2443. sslverify.OpenSSLCipher("A"),
  2444. sslverify.OpenSSLCipher("B"),
  2445. ]
  2446. )
  2447. self.assertEqual(
  2448. (sslverify.OpenSSLCipher("B"),),
  2449. ac.selectCiphers(
  2450. [sslverify.OpenSSLCipher("B"), sslverify.OpenSSLCipher("C")]
  2451. ),
  2452. )
  2453. def test_fromOpenSSLCipherStringExpandsToTupleOfCiphers(self):
  2454. """
  2455. If L{sslverify.OpenSSLAcceptableCiphers.fromOpenSSLCipherString} is
  2456. called it expands the string to a tuple of ciphers.
  2457. """
  2458. ac = sslverify.OpenSSLAcceptableCiphers.fromOpenSSLCipherString("ALL")
  2459. self.assertIsInstance(ac._ciphers, tuple)
  2460. self.assertTrue(all(sslverify.ICipher.providedBy(c) for c in ac._ciphers))
  2461. class DiffieHellmanParametersTests(TestCase):
  2462. """
  2463. Tests for twisted.internet._sslverify.OpenSSLDHParameters.
  2464. """
  2465. if skipSSL:
  2466. skip = skipSSL
  2467. filePath = FilePath(b"dh.params")
  2468. def test_fromFile(self):
  2469. """
  2470. Calling C{fromFile} with a filename returns an instance with that file
  2471. name saved.
  2472. """
  2473. params = sslverify.OpenSSLDiffieHellmanParameters.fromFile(self.filePath)
  2474. self.assertEqual(self.filePath, params._dhFile)
  2475. class FakeLibState:
  2476. """
  2477. State for L{FakeLib}
  2478. @param setECDHAutoRaises: An exception
  2479. L{FakeLib.SSL_CTX_set_ecdh_auto} should raise; if L{None},
  2480. nothing is raised.
  2481. @ivar ecdhContexts: A list of SSL contexts with which
  2482. L{FakeLib.SSL_CTX_set_ecdh_auto} was called
  2483. @type ecdhContexts: L{list} of L{OpenSSL.SSL.Context}s
  2484. @ivar ecdhValues: A list of boolean values with which
  2485. L{FakeLib.SSL_CTX_set_ecdh_auto} was called
  2486. @type ecdhValues: L{list} of L{boolean}s
  2487. """
  2488. __slots__ = ("setECDHAutoRaises", "ecdhContexts", "ecdhValues")
  2489. def __init__(self, setECDHAutoRaises):
  2490. self.setECDHAutoRaises = setECDHAutoRaises
  2491. self.ecdhContexts = []
  2492. self.ecdhValues = []
  2493. class FakeLib:
  2494. """
  2495. An introspectable fake of cryptography's lib object.
  2496. @param state: A L{FakeLibState} instance that contains this fake's
  2497. state.
  2498. """
  2499. def __init__(self, state):
  2500. self._state = state
  2501. def SSL_CTX_set_ecdh_auto(self, ctx, value):
  2502. """
  2503. Record the context and value under in the C{_state} instance
  2504. variable.
  2505. @see: L{FakeLibState}
  2506. @param ctx: An SSL context.
  2507. @type ctx: L{OpenSSL.SSL.Context}
  2508. @param value: A boolean value
  2509. @type value: L{bool}
  2510. """
  2511. self._state.ecdhContexts.append(ctx)
  2512. self._state.ecdhValues.append(value)
  2513. if self._state.setECDHAutoRaises is not None:
  2514. raise self._state.setECDHAutoRaises
  2515. class FakeLibTests(TestCase):
  2516. """
  2517. Tests for L{FakeLib}.
  2518. """
  2519. def test_SSL_CTX_set_ecdh_auto(self):
  2520. """
  2521. L{FakeLib.SSL_CTX_set_ecdh_auto} records context and value it
  2522. was called with.
  2523. """
  2524. state = FakeLibState(setECDHAutoRaises=None)
  2525. lib = FakeLib(state)
  2526. self.assertNot(state.ecdhContexts)
  2527. self.assertNot(state.ecdhValues)
  2528. context, value = "CONTEXT", True
  2529. lib.SSL_CTX_set_ecdh_auto(context, value)
  2530. self.assertEqual(state.ecdhContexts, [context])
  2531. self.assertEqual(state.ecdhValues, [True])
  2532. def test_SSL_CTX_set_ecdh_autoRaises(self):
  2533. """
  2534. L{FakeLib.SSL_CTX_set_ecdh_auto} raises the exception provided
  2535. by its state, while still recording its arguments.
  2536. """
  2537. state = FakeLibState(setECDHAutoRaises=ValueError)
  2538. lib = FakeLib(state)
  2539. self.assertNot(state.ecdhContexts)
  2540. self.assertNot(state.ecdhValues)
  2541. context, value = "CONTEXT", True
  2542. self.assertRaises(ValueError, lib.SSL_CTX_set_ecdh_auto, context, value)
  2543. self.assertEqual(state.ecdhContexts, [context])
  2544. self.assertEqual(state.ecdhValues, [True])
  2545. class FakeCryptoState:
  2546. """
  2547. State for L{FakeCrypto}
  2548. @param getEllipticCurveRaises: What
  2549. L{FakeCrypto.get_elliptic_curve} should raise; L{None} and it
  2550. won't raise anything
  2551. @param getEllipticCurveReturns: What
  2552. L{FakeCrypto.get_elliptic_curve} should return.
  2553. @ivar getEllipticCurveCalls: The arguments with which
  2554. L{FakeCrypto.get_elliptic_curve} has been called.
  2555. @type getEllipticCurveCalls: L{list}
  2556. """
  2557. __slots__ = (
  2558. "getEllipticCurveRaises",
  2559. "getEllipticCurveReturns",
  2560. "getEllipticCurveCalls",
  2561. )
  2562. def __init__(
  2563. self,
  2564. getEllipticCurveRaises,
  2565. getEllipticCurveReturns,
  2566. ):
  2567. self.getEllipticCurveRaises = getEllipticCurveRaises
  2568. self.getEllipticCurveReturns = getEllipticCurveReturns
  2569. self.getEllipticCurveCalls = []
  2570. class FakeCrypto:
  2571. """
  2572. An introspectable fake of pyOpenSSL's L{OpenSSL.crypto} module.
  2573. @ivar state: A L{FakeCryptoState} instance
  2574. """
  2575. def __init__(self, state):
  2576. self._state = state
  2577. def get_elliptic_curve(self, curve):
  2578. """
  2579. A fake that records the curve with which it was called.
  2580. @param curve: see L{crypto.get_elliptic_curve}
  2581. @return: see L{FakeCryptoState.getEllipticCurveReturns}
  2582. @raises: see L{FakeCryptoState.getEllipticCurveRaises}
  2583. """
  2584. self._state.getEllipticCurveCalls.append(curve)
  2585. if self._state.getEllipticCurveRaises is not None:
  2586. raise self._state.getEllipticCurveRaises
  2587. return self._state.getEllipticCurveReturns
  2588. class FakeCryptoTests(SynchronousTestCase):
  2589. """
  2590. Tests for L{FakeCrypto}.
  2591. """
  2592. def test_get_elliptic_curveRecordsArgument(self):
  2593. """
  2594. L{FakeCrypto.test_get_elliptic_curve} records the curve with
  2595. which it was called.
  2596. """
  2597. state = FakeCryptoState(
  2598. getEllipticCurveRaises=None,
  2599. getEllipticCurveReturns=None,
  2600. )
  2601. crypto = FakeCrypto(state)
  2602. crypto.get_elliptic_curve("a curve name")
  2603. self.assertEqual(state.getEllipticCurveCalls, ["a curve name"])
  2604. def test_get_elliptic_curveReturns(self):
  2605. """
  2606. L{FakeCrypto.test_get_elliptic_curve} returns the value
  2607. specified by its state object and records what it was called
  2608. with.
  2609. """
  2610. returnValue = "object"
  2611. state = FakeCryptoState(
  2612. getEllipticCurveRaises=None,
  2613. getEllipticCurveReturns=returnValue,
  2614. )
  2615. crypto = FakeCrypto(state)
  2616. self.assertIs(
  2617. crypto.get_elliptic_curve("another curve name"),
  2618. returnValue,
  2619. )
  2620. self.assertEqual(state.getEllipticCurveCalls, ["another curve name"])
  2621. def test_get_elliptic_curveRaises(self):
  2622. """
  2623. L{FakeCrypto.test_get_elliptic_curve} raises the exception
  2624. specified by its state object.
  2625. """
  2626. state = FakeCryptoState(
  2627. getEllipticCurveRaises=ValueError, getEllipticCurveReturns=None
  2628. )
  2629. crypto = FakeCrypto(state)
  2630. self.assertRaises(
  2631. ValueError,
  2632. crypto.get_elliptic_curve,
  2633. "yet another curve name",
  2634. )
  2635. self.assertEqual(
  2636. state.getEllipticCurveCalls,
  2637. ["yet another curve name"],
  2638. )
  2639. class ChooseDiffieHellmanEllipticCurveTests(SynchronousTestCase):
  2640. """
  2641. Tests for L{sslverify._ChooseDiffieHellmanEllipticCurve}.
  2642. @cvar OPENSSL_110: A version number for OpenSSL 1.1.0
  2643. @cvar OPENSSL_102: A version number for OpenSSL 1.0.2
  2644. @cvar OPENSSL_101: A version number for OpenSSL 1.0.1
  2645. @see:
  2646. U{https://wiki.openssl.org/index.php/Manual:OPENSSL_VERSION_NUMBER(3)}
  2647. """
  2648. if skipSSL:
  2649. skip = skipSSL
  2650. OPENSSL_110 = 0x1010007F
  2651. OPENSSL_102 = 0x100020EF
  2652. OPENSSL_101 = 0x1000114F
  2653. def setUp(self):
  2654. self.libState = FakeLibState(setECDHAutoRaises=False)
  2655. self.lib = FakeLib(self.libState)
  2656. self.cryptoState = FakeCryptoState(
  2657. getEllipticCurveReturns=None, getEllipticCurveRaises=None
  2658. )
  2659. self.crypto = FakeCrypto(self.cryptoState)
  2660. self.context = FakeContext(SSL.SSLv23_METHOD)
  2661. def test_openSSL110(self):
  2662. """
  2663. No configuration of contexts occurs under OpenSSL 1.1.0 and
  2664. later, because they create contexts with secure ECDH curves.
  2665. @see: U{http://twistedmatrix.com/trac/ticket/9210}
  2666. """
  2667. chooser = sslverify._ChooseDiffieHellmanEllipticCurve(
  2668. self.OPENSSL_110,
  2669. openSSLlib=self.lib,
  2670. openSSLcrypto=self.crypto,
  2671. )
  2672. chooser.configureECDHCurve(self.context)
  2673. self.assertFalse(self.libState.ecdhContexts)
  2674. self.assertFalse(self.libState.ecdhValues)
  2675. self.assertFalse(self.cryptoState.getEllipticCurveCalls)
  2676. self.assertIsNone(self.context._ecCurve)
  2677. def test_openSSL102(self):
  2678. """
  2679. OpenSSL 1.0.2 does not set ECDH curves by default, but
  2680. C{SSL_CTX_set_ecdh_auto} requests that a context choose a
  2681. secure set curves automatically.
  2682. """
  2683. context = SSL.Context(SSL.SSLv23_METHOD)
  2684. chooser = sslverify._ChooseDiffieHellmanEllipticCurve(
  2685. self.OPENSSL_102,
  2686. openSSLlib=self.lib,
  2687. openSSLcrypto=self.crypto,
  2688. )
  2689. chooser.configureECDHCurve(context)
  2690. self.assertEqual(self.libState.ecdhContexts, [context._context])
  2691. self.assertEqual(self.libState.ecdhValues, [True])
  2692. self.assertFalse(self.cryptoState.getEllipticCurveCalls)
  2693. self.assertIsNone(self.context._ecCurve)
  2694. def test_openSSL102SetECDHAutoRaises(self):
  2695. """
  2696. An exception raised by C{SSL_CTX_set_ecdh_auto} under OpenSSL
  2697. 1.0.2 is suppressed because ECDH is best-effort.
  2698. """
  2699. self.libState.setECDHAutoRaises = BaseException
  2700. context = SSL.Context(SSL.SSLv23_METHOD)
  2701. chooser = sslverify._ChooseDiffieHellmanEllipticCurve(
  2702. self.OPENSSL_102,
  2703. openSSLlib=self.lib,
  2704. openSSLcrypto=self.crypto,
  2705. )
  2706. chooser.configureECDHCurve(context)
  2707. self.assertEqual(self.libState.ecdhContexts, [context._context])
  2708. self.assertEqual(self.libState.ecdhValues, [True])
  2709. self.assertFalse(self.cryptoState.getEllipticCurveCalls)
  2710. def test_openSSL101(self):
  2711. """
  2712. OpenSSL 1.0.1 does not set ECDH curves by default, nor does
  2713. it expose L{SSL_CTX_set_ecdh_auto}. Instead, a single ECDH
  2714. curve can be set with L{OpenSSL.SSL.Context.set_tmp_ecdh}.
  2715. """
  2716. self.cryptoState.getEllipticCurveReturns = curve = "curve object"
  2717. chooser = sslverify._ChooseDiffieHellmanEllipticCurve(
  2718. self.OPENSSL_101,
  2719. openSSLlib=self.lib,
  2720. openSSLcrypto=self.crypto,
  2721. )
  2722. chooser.configureECDHCurve(self.context)
  2723. self.assertFalse(self.libState.ecdhContexts)
  2724. self.assertFalse(self.libState.ecdhValues)
  2725. self.assertEqual(
  2726. self.cryptoState.getEllipticCurveCalls,
  2727. [sslverify._defaultCurveName],
  2728. )
  2729. self.assertIs(self.context._ecCurve, curve)
  2730. def test_openSSL101SetECDHRaises(self):
  2731. """
  2732. An exception raised by L{OpenSSL.SSL.Context.set_tmp_ecdh}
  2733. under OpenSSL 1.0.1 is suppressed because ECHDE is best-effort.
  2734. """
  2735. def set_tmp_ecdh(ctx):
  2736. raise BaseException
  2737. self.context.set_tmp_ecdh = set_tmp_ecdh
  2738. chooser = sslverify._ChooseDiffieHellmanEllipticCurve(
  2739. self.OPENSSL_101,
  2740. openSSLlib=self.lib,
  2741. openSSLcrypto=self.crypto,
  2742. )
  2743. chooser.configureECDHCurve(self.context)
  2744. self.assertFalse(self.libState.ecdhContexts)
  2745. self.assertFalse(self.libState.ecdhValues)
  2746. self.assertEqual(
  2747. self.cryptoState.getEllipticCurveCalls,
  2748. [sslverify._defaultCurveName],
  2749. )
  2750. def test_openSSL101NoECC(self):
  2751. """
  2752. Contexts created under an OpenSSL 1.0.1 that doesn't support
  2753. ECC have no configuration applied.
  2754. """
  2755. self.cryptoState.getEllipticCurveRaises = ValueError
  2756. chooser = sslverify._ChooseDiffieHellmanEllipticCurve(
  2757. self.OPENSSL_101,
  2758. openSSLlib=self.lib,
  2759. openSSLcrypto=self.crypto,
  2760. )
  2761. chooser.configureECDHCurve(self.context)
  2762. self.assertFalse(self.libState.ecdhContexts)
  2763. self.assertFalse(self.libState.ecdhValues)
  2764. self.assertIsNone(self.context._ecCurve)
  2765. class KeyPairTests(TestCase):
  2766. """
  2767. Tests for L{sslverify.KeyPair}.
  2768. """
  2769. if skipSSL:
  2770. skip = skipSSL
  2771. def setUp(self):
  2772. """
  2773. Create test certificate.
  2774. """
  2775. self.sKey = makeCertificate(O=b"Server Test Certificate", CN=b"server")[0]
  2776. def test_getstateDeprecation(self):
  2777. """
  2778. L{sslverify.KeyPair.__getstate__} is deprecated.
  2779. """
  2780. self.callDeprecated(
  2781. (Version("Twisted", 15, 0, 0), "a real persistence system"),
  2782. sslverify.KeyPair(self.sKey).__getstate__,
  2783. )
  2784. def test_setstateDeprecation(self):
  2785. """
  2786. {sslverify.KeyPair.__setstate__} is deprecated.
  2787. """
  2788. state = sslverify.KeyPair(self.sKey).dump()
  2789. self.callDeprecated(
  2790. (Version("Twisted", 15, 0, 0), "a real persistence system"),
  2791. sslverify.KeyPair(self.sKey).__setstate__,
  2792. state,
  2793. )
  2794. def test_noTrailingNewlinePemCert(self):
  2795. noTrailingNewlineKeyPemPath = getModule("twisted.test").filePath.sibling(
  2796. "cert.pem.no_trailing_newline"
  2797. )
  2798. certPEM = noTrailingNewlineKeyPemPath.getContent()
  2799. ssl.Certificate.loadPEM(certPEM)
  2800. class SelectVerifyImplementationTests(SynchronousTestCase):
  2801. """
  2802. Tests for L{_selectVerifyImplementation}.
  2803. """
  2804. if skipSSL:
  2805. skip = skipSSL
  2806. def test_dependencyMissing(self):
  2807. """
  2808. If I{service_identity} cannot be imported then
  2809. L{_selectVerifyImplementation} returns L{simpleVerifyHostname} and
  2810. L{SimpleVerificationError}.
  2811. """
  2812. with SetAsideModule("service_identity"):
  2813. sys.modules["service_identity"] = None
  2814. result = sslverify._selectVerifyImplementation()
  2815. expected = (
  2816. sslverify.simpleVerifyHostname,
  2817. sslverify.simpleVerifyIPAddress,
  2818. sslverify.SimpleVerificationError,
  2819. )
  2820. self.assertEqual(expected, result)
  2821. test_dependencyMissing.suppress = [ # type: ignore[attr-defined]
  2822. util.suppress(
  2823. message=(
  2824. "You do not have a working installation of the "
  2825. "service_identity module"
  2826. ),
  2827. ),
  2828. ]
  2829. def test_dependencyMissingWarning(self):
  2830. """
  2831. If I{service_identity} cannot be imported then
  2832. L{_selectVerifyImplementation} emits a L{UserWarning} advising the user
  2833. of the exact error.
  2834. """
  2835. with SetAsideModule("service_identity"):
  2836. sys.modules["service_identity"] = None
  2837. sslverify._selectVerifyImplementation()
  2838. [warning] = list(
  2839. warning
  2840. for warning in self.flushWarnings()
  2841. if warning["category"] == UserWarning
  2842. )
  2843. expectedMessage = (
  2844. "You do not have a working installation of the "
  2845. "service_identity module: "
  2846. "'import of service_identity halted; None in sys.modules'. "
  2847. "Please install it from "
  2848. "<https://pypi.python.org/pypi/service_identity> "
  2849. "and make sure all of its dependencies are satisfied. "
  2850. "Without the service_identity module, Twisted can perform only"
  2851. " rudimentary TLS client hostname verification. Many valid "
  2852. "certificate/hostname mappings may be rejected."
  2853. )
  2854. self.assertEqual(warning["message"], expectedMessage)
  2855. # Make sure we're abusing the warning system to a sufficient
  2856. # degree: there is no filename or line number that makes sense for
  2857. # this warning to "blame" for the problem. It is a system
  2858. # misconfiguration. So the location information should be blank
  2859. # (or as blank as we can make it).
  2860. self.assertEqual(warning["filename"], "")
  2861. self.assertEqual(warning["lineno"], 0)