Funktionierender Prototyp des Serious Games zur Vermittlung von Wissen zu Software-Engineering-Arbeitsmodellen.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

sspi.py 15KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412
  1. """
  2. Helper classes for SSPI authentication via the win32security module.
  3. SSPI authentication involves a token-exchange "dance", the exact details
  4. of which depends on the authentication provider used. There are also
  5. a number of complex flags and constants that need to be used - in most
  6. cases, there are reasonable defaults.
  7. These classes attempt to hide these details from you until you really need
  8. to know. They are not designed to handle all cases, just the common ones.
  9. If you need finer control than offered here, just use the win32security
  10. functions directly.
  11. """
  12. # Based on Roger Upole's sspi demos.
  13. # $Id$
  14. import sspicon
  15. import win32security
  16. error = win32security.error
  17. class _BaseAuth(object):
  18. def __init__(self):
  19. self.reset()
  20. def reset(self):
  21. """Reset everything to an unauthorized state"""
  22. self.ctxt = None
  23. self.authenticated = False
  24. self.initiator_name = None
  25. self.service_name = None
  26. # The next seq_num for an encrypt/sign operation
  27. self.next_seq_num = 0
  28. def _get_next_seq_num(self):
  29. """Get the next sequence number for a transmission. Default
  30. implementation is to increment a counter
  31. """
  32. ret = self.next_seq_num
  33. self.next_seq_num = self.next_seq_num + 1
  34. return ret
  35. def encrypt(self, data):
  36. """Encrypt a string, returning a tuple of (encrypted_data, trailer).
  37. These can be passed to decrypt to get back the original string.
  38. """
  39. pkg_size_info = self.ctxt.QueryContextAttributes(sspicon.SECPKG_ATTR_SIZES)
  40. trailersize = pkg_size_info["SecurityTrailer"]
  41. encbuf = win32security.PySecBufferDescType()
  42. encbuf.append(win32security.PySecBufferType(len(data), sspicon.SECBUFFER_DATA))
  43. encbuf.append(
  44. win32security.PySecBufferType(trailersize, sspicon.SECBUFFER_TOKEN)
  45. )
  46. encbuf[0].Buffer = data
  47. self.ctxt.EncryptMessage(0, encbuf, self._get_next_seq_num())
  48. return encbuf[0].Buffer, encbuf[1].Buffer
  49. def decrypt(self, data, trailer):
  50. """Decrypt a previously encrypted string, returning the orignal data"""
  51. encbuf = win32security.PySecBufferDescType()
  52. encbuf.append(win32security.PySecBufferType(len(data), sspicon.SECBUFFER_DATA))
  53. encbuf.append(
  54. win32security.PySecBufferType(len(trailer), sspicon.SECBUFFER_TOKEN)
  55. )
  56. encbuf[0].Buffer = data
  57. encbuf[1].Buffer = trailer
  58. self.ctxt.DecryptMessage(encbuf, self._get_next_seq_num())
  59. return encbuf[0].Buffer
  60. def sign(self, data):
  61. """sign a string suitable for transmission, returning the signature.
  62. Passing the data and signature to verify will determine if the data
  63. is unchanged.
  64. """
  65. pkg_size_info = self.ctxt.QueryContextAttributes(sspicon.SECPKG_ATTR_SIZES)
  66. sigsize = pkg_size_info["MaxSignature"]
  67. sigbuf = win32security.PySecBufferDescType()
  68. sigbuf.append(win32security.PySecBufferType(len(data), sspicon.SECBUFFER_DATA))
  69. sigbuf.append(win32security.PySecBufferType(sigsize, sspicon.SECBUFFER_TOKEN))
  70. sigbuf[0].Buffer = data
  71. self.ctxt.MakeSignature(0, sigbuf, self._get_next_seq_num())
  72. return sigbuf[1].Buffer
  73. def verify(self, data, sig):
  74. """Verifies data and its signature. If verification fails, an sspi.error
  75. will be raised.
  76. """
  77. sigbuf = win32security.PySecBufferDescType()
  78. sigbuf.append(win32security.PySecBufferType(len(data), sspicon.SECBUFFER_DATA))
  79. sigbuf.append(win32security.PySecBufferType(len(sig), sspicon.SECBUFFER_TOKEN))
  80. sigbuf[0].Buffer = data
  81. sigbuf[1].Buffer = sig
  82. self.ctxt.VerifySignature(sigbuf, self._get_next_seq_num())
  83. def unwrap(self, token):
  84. """
  85. GSSAPI's unwrap with SSPI.
  86. https://docs.microsoft.com/en-us/windows/win32/secauthn/sspi-kerberos-interoperability-with-gssapi
  87. Usable mainly with Kerberos SSPI package, but this is not enforced.
  88. Return the clear text, and a boolean that is True if the token was encrypted.
  89. """
  90. buffer = win32security.PySecBufferDescType()
  91. # This buffer will contain a "stream", which is the token coming from the other side
  92. buffer.append(
  93. win32security.PySecBufferType(len(token), sspicon.SECBUFFER_STREAM)
  94. )
  95. buffer[0].Buffer = token
  96. # This buffer will receive the clear, or just unwrapped text if no encryption was used.
  97. # Will be resized by the lib.
  98. buffer.append(win32security.PySecBufferType(0, sspicon.SECBUFFER_DATA))
  99. pfQOP = self.ctxt.DecryptMessage(buffer, self._get_next_seq_num())
  100. r = buffer[1].Buffer
  101. return r, not (pfQOP == sspicon.SECQOP_WRAP_NO_ENCRYPT)
  102. def wrap(self, msg, encrypt=False):
  103. """
  104. GSSAPI's wrap with SSPI.
  105. https://docs.microsoft.com/en-us/windows/win32/secauthn/sspi-kerberos-interoperability-with-gssapi
  106. Usable mainly with Kerberos SSPI package, but this is not enforced.
  107. Wrap a message to be sent to the other side. Encrypted if encrypt is True.
  108. """
  109. size_info = self.ctxt.QueryContextAttributes(sspicon.SECPKG_ATTR_SIZES)
  110. trailer_size = size_info["SecurityTrailer"]
  111. block_size = size_info["BlockSize"]
  112. buffer = win32security.PySecBufferDescType()
  113. # This buffer will contain unencrypted data to wrap, and maybe encrypt.
  114. buffer.append(win32security.PySecBufferType(len(msg), sspicon.SECBUFFER_DATA))
  115. buffer[0].Buffer = msg
  116. # Will receive the token that forms the beginning of the msg
  117. buffer.append(
  118. win32security.PySecBufferType(trailer_size, sspicon.SECBUFFER_TOKEN)
  119. )
  120. # The trailer is needed in case of block encryption
  121. buffer.append(
  122. win32security.PySecBufferType(block_size, sspicon.SECBUFFER_PADDING)
  123. )
  124. fQOP = 0 if encrypt else sspicon.SECQOP_WRAP_NO_ENCRYPT
  125. self.ctxt.EncryptMessage(fQOP, buffer, self._get_next_seq_num())
  126. # Sec token, then data, then padding
  127. r = buffer[1].Buffer + buffer[0].Buffer + buffer[2].Buffer
  128. return r
  129. def _amend_ctx_name(self):
  130. """Adds initiator and service names in the security context for ease of use"""
  131. if not self.authenticated:
  132. raise ValueError("Sec context is not completely authenticated")
  133. try:
  134. names = self.ctxt.QueryContextAttributes(sspicon.SECPKG_ATTR_NATIVE_NAMES)
  135. except error:
  136. # The SSP doesn't provide these attributes.
  137. pass
  138. else:
  139. self.initiator_name, self.service_name = names
  140. class ClientAuth(_BaseAuth):
  141. """Manages the client side of an SSPI authentication handshake"""
  142. def __init__(
  143. self,
  144. pkg_name, # Name of the package to used.
  145. client_name=None, # User for whom credentials are used.
  146. auth_info=None, # or a tuple of (username, domain, password)
  147. targetspn=None, # Target security context provider name.
  148. scflags=None, # security context flags
  149. datarep=sspicon.SECURITY_NETWORK_DREP,
  150. ):
  151. if scflags is None:
  152. scflags = (
  153. sspicon.ISC_REQ_INTEGRITY
  154. | sspicon.ISC_REQ_SEQUENCE_DETECT
  155. | sspicon.ISC_REQ_REPLAY_DETECT
  156. | sspicon.ISC_REQ_CONFIDENTIALITY
  157. )
  158. self.scflags = scflags
  159. self.datarep = datarep
  160. self.targetspn = targetspn
  161. self.pkg_info = win32security.QuerySecurityPackageInfo(pkg_name)
  162. (
  163. self.credentials,
  164. self.credentials_expiry,
  165. ) = win32security.AcquireCredentialsHandle(
  166. client_name,
  167. self.pkg_info["Name"],
  168. sspicon.SECPKG_CRED_OUTBOUND,
  169. None,
  170. auth_info,
  171. )
  172. _BaseAuth.__init__(self)
  173. def authorize(self, sec_buffer_in):
  174. """Perform *one* step of the client authentication process. Pass None for the first round"""
  175. if (
  176. sec_buffer_in is not None
  177. and type(sec_buffer_in) != win32security.PySecBufferDescType
  178. ):
  179. # User passed us the raw data - wrap it into a SecBufferDesc
  180. sec_buffer_new = win32security.PySecBufferDescType()
  181. tokenbuf = win32security.PySecBufferType(
  182. self.pkg_info["MaxToken"], sspicon.SECBUFFER_TOKEN
  183. )
  184. tokenbuf.Buffer = sec_buffer_in
  185. sec_buffer_new.append(tokenbuf)
  186. sec_buffer_in = sec_buffer_new
  187. sec_buffer_out = win32security.PySecBufferDescType()
  188. tokenbuf = win32security.PySecBufferType(
  189. self.pkg_info["MaxToken"], sspicon.SECBUFFER_TOKEN
  190. )
  191. sec_buffer_out.append(tokenbuf)
  192. ## input context handle should be NULL on first call
  193. ctxtin = self.ctxt
  194. if self.ctxt is None:
  195. self.ctxt = win32security.PyCtxtHandleType()
  196. err, attr, exp = win32security.InitializeSecurityContext(
  197. self.credentials,
  198. ctxtin,
  199. self.targetspn,
  200. self.scflags,
  201. self.datarep,
  202. sec_buffer_in,
  203. self.ctxt,
  204. sec_buffer_out,
  205. )
  206. # Stash these away incase someone needs to know the state from the
  207. # final call.
  208. self.ctxt_attr = attr
  209. self.ctxt_expiry = exp
  210. if err in (sspicon.SEC_I_COMPLETE_NEEDED, sspicon.SEC_I_COMPLETE_AND_CONTINUE):
  211. self.ctxt.CompleteAuthToken(sec_buffer_out)
  212. self.authenticated = err == 0
  213. if self.authenticated:
  214. self._amend_ctx_name()
  215. return err, sec_buffer_out
  216. class ServerAuth(_BaseAuth):
  217. """Manages the server side of an SSPI authentication handshake"""
  218. def __init__(
  219. self, pkg_name, spn=None, scflags=None, datarep=sspicon.SECURITY_NETWORK_DREP
  220. ):
  221. self.spn = spn
  222. self.datarep = datarep
  223. if scflags is None:
  224. scflags = (
  225. sspicon.ASC_REQ_INTEGRITY
  226. | sspicon.ASC_REQ_SEQUENCE_DETECT
  227. | sspicon.ASC_REQ_REPLAY_DETECT
  228. | sspicon.ASC_REQ_CONFIDENTIALITY
  229. )
  230. # Should we default to sspicon.KerbAddExtraCredentialsMessage
  231. # if pkg_name=='Kerberos'?
  232. self.scflags = scflags
  233. self.pkg_info = win32security.QuerySecurityPackageInfo(pkg_name)
  234. (
  235. self.credentials,
  236. self.credentials_expiry,
  237. ) = win32security.AcquireCredentialsHandle(
  238. spn, self.pkg_info["Name"], sspicon.SECPKG_CRED_INBOUND, None, None
  239. )
  240. _BaseAuth.__init__(self)
  241. def authorize(self, sec_buffer_in):
  242. """Perform *one* step of the server authentication process."""
  243. if (
  244. sec_buffer_in is not None
  245. and type(sec_buffer_in) != win32security.PySecBufferDescType
  246. ):
  247. # User passed us the raw data - wrap it into a SecBufferDesc
  248. sec_buffer_new = win32security.PySecBufferDescType()
  249. tokenbuf = win32security.PySecBufferType(
  250. self.pkg_info["MaxToken"], sspicon.SECBUFFER_TOKEN
  251. )
  252. tokenbuf.Buffer = sec_buffer_in
  253. sec_buffer_new.append(tokenbuf)
  254. sec_buffer_in = sec_buffer_new
  255. sec_buffer_out = win32security.PySecBufferDescType()
  256. tokenbuf = win32security.PySecBufferType(
  257. self.pkg_info["MaxToken"], sspicon.SECBUFFER_TOKEN
  258. )
  259. sec_buffer_out.append(tokenbuf)
  260. ## input context handle is None initially, then handle returned from last call thereafter
  261. ctxtin = self.ctxt
  262. if self.ctxt is None:
  263. self.ctxt = win32security.PyCtxtHandleType()
  264. err, attr, exp = win32security.AcceptSecurityContext(
  265. self.credentials,
  266. ctxtin,
  267. sec_buffer_in,
  268. self.scflags,
  269. self.datarep,
  270. self.ctxt,
  271. sec_buffer_out,
  272. )
  273. # Stash these away incase someone needs to know the state from the
  274. # final call.
  275. self.ctxt_attr = attr
  276. self.ctxt_expiry = exp
  277. if err in (sspicon.SEC_I_COMPLETE_NEEDED, sspicon.SEC_I_COMPLETE_AND_CONTINUE):
  278. self.ctxt.CompleteAuthToken(sec_buffer_out)
  279. self.authenticated = err == 0
  280. if self.authenticated:
  281. self._amend_ctx_name()
  282. return err, sec_buffer_out
  283. if __name__ == "__main__":
  284. # This is the security package (the security support provider / the security backend)
  285. # we want to use for this example.
  286. ssp = "Kerberos" # or "NTLM" or "Negotiate" which enable negotiation between
  287. # Kerberos (prefered) and NTLM (if not supported on the other side).
  288. flags = (
  289. sspicon.ISC_REQ_MUTUAL_AUTH
  290. | sspicon.ISC_REQ_INTEGRITY # mutual authentication
  291. | sspicon.ISC_REQ_SEQUENCE_DETECT # check for integrity
  292. | sspicon.ISC_REQ_CONFIDENTIALITY # enable out-of-order messages
  293. | sspicon.ISC_REQ_REPLAY_DETECT # request confidentiality # request replay detection
  294. )
  295. # Get our identity, mandatory for the Kerberos case *for this example*
  296. # Kerberos cannot be used if we don't tell it the target we want
  297. # to authenticate to.
  298. cred_handle, exp = win32security.AcquireCredentialsHandle(
  299. None, ssp, sspicon.SECPKG_CRED_INBOUND, None, None
  300. )
  301. cred = cred_handle.QueryCredentialsAttributes(sspicon.SECPKG_CRED_ATTR_NAMES)
  302. print("We are:", cred)
  303. # Setup the 2 contexts. In real life, only one is needed: the other one is
  304. # created in the process we want to communicate with.
  305. sspiclient = ClientAuth(ssp, scflags=flags, targetspn=cred)
  306. sspiserver = ServerAuth(ssp, scflags=flags)
  307. print(
  308. "SSP : %s (%s)" % (sspiclient.pkg_info["Name"], sspiclient.pkg_info["Comment"])
  309. )
  310. # Perform the authentication dance, each loop exchanging more information
  311. # on the way to completing authentication.
  312. sec_buffer = None
  313. client_step = 0
  314. server_step = 0
  315. while not (sspiclient.authenticated) or len(sec_buffer[0].Buffer):
  316. client_step += 1
  317. err, sec_buffer = sspiclient.authorize(sec_buffer)
  318. print("Client step %s" % client_step)
  319. if sspiserver.authenticated and len(sec_buffer[0].Buffer) == 0:
  320. break
  321. server_step += 1
  322. err, sec_buffer = sspiserver.authorize(sec_buffer)
  323. print("Server step %s" % server_step)
  324. # Authentication process is finished.
  325. print("Initiator name from the service side:", sspiserver.initiator_name)
  326. print("Service name from the client side: ", sspiclient.service_name)
  327. data = "hello".encode("ascii") # py3k-friendly
  328. # Simple signature, not compatible with GSSAPI.
  329. sig = sspiclient.sign(data)
  330. sspiserver.verify(data, sig)
  331. # Encryption
  332. encrypted, sig = sspiclient.encrypt(data)
  333. decrypted = sspiserver.decrypt(encrypted, sig)
  334. assert decrypted == data
  335. # GSSAPI wrapping, no encryption (NTLM always encrypts)
  336. wrapped = sspiclient.wrap(data)
  337. unwrapped, was_encrypted = sspiserver.unwrap(wrapped)
  338. print("encrypted ?", was_encrypted)
  339. assert data == unwrapped
  340. # GSSAPI wrapping, with encryption
  341. wrapped = sspiserver.wrap(data, encrypt=True)
  342. unwrapped, was_encrypted = sspiclient.unwrap(wrapped)
  343. print("encrypted ?", was_encrypted)
  344. assert data == unwrapped
  345. print("cool!")