123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640 |
- # -*- test-case-name: twisted.conch.test.test_checkers -*-
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
-
- """
- Provide L{ICredentialsChecker} implementations to be used in Conch protocols.
- """
-
-
- import binascii
- import errno
- import sys
- from base64 import decodebytes
- from typing import IO, Callable, Iterable, Iterator, Mapping, Optional, Tuple, cast
-
- from zope.interface import Interface, implementer, providedBy
-
- from incremental import Version
- from typing_extensions import Literal, Protocol
-
- from twisted.conch import error
- from twisted.conch.ssh import keys
- from twisted.cred.checkers import ICredentialsChecker
- from twisted.cred.credentials import ISSHPrivateKey, IUsernamePassword
- from twisted.cred.error import UnauthorizedLogin, UnhandledCredentials
- from twisted.internet import defer
- from twisted.logger import Logger
- from twisted.plugins.cred_unix import verifyCryptedPassword
- from twisted.python import failure, reflect
- from twisted.python.deprecate import deprecatedModuleAttribute
- from twisted.python.filepath import FilePath
- from twisted.python.util import runAsEffectiveUser
-
- _log = Logger()
-
-
- class UserRecord(Tuple[str, str, int, int, str, str, str]):
- """
- A record in a UNIX-style password database. See L{pwd} for field details.
-
- This corresponds to the undocumented type L{pwd.struct_passwd}, but lacks named
- field accessors.
- """
-
- @property
- def pw_dir(self) -> str:
- ...
-
-
- class UserDB(Protocol):
- """
- A database of users by name, like the stdlib L{pwd} module.
-
- See L{twisted.python.fakepwd} for an in-memory implementation.
- """
-
- def getpwnam(self, username: str) -> UserRecord:
- """
- Lookup a user record by name.
-
- @raises KeyError: when no such user exists
- """
-
-
- pwd: Optional[UserDB]
- try:
- import pwd as _pwd
- except ImportError:
- pwd = None
- else:
- pwd = cast(UserDB, _pwd)
-
-
- try:
- import spwd as _spwd
- except ImportError:
- spwd = None
- else:
- spwd = _spwd
-
-
- class CryptedPasswordRecord(Protocol):
- """
- A sequence where the item at index 1 may be a crypted password.
-
- Both L{pwd.struct_passwd} and L{spwd.struct_spwd} conform to this protocol.
- """
-
- def __getitem__(self, index: Literal[1]) -> str:
- """
- Get the crypted password.
- """
-
-
- def _lookupUser(userdb: UserDB, username: bytes) -> UserRecord:
- """
- Lookup a user by name in a L{pwd}-style database.
-
- @param userdb: The user database.
-
- @param username: Identifying name in bytes. This will be decoded according
- to the filesystem encoding, as the L{pwd} module does internally.
-
- @raises KeyError: when the user doesn't exist
- """
- return userdb.getpwnam(username.decode(sys.getfilesystemencoding()))
-
-
- def _pwdGetByName(username: str) -> Optional[CryptedPasswordRecord]:
- """
- Look up a user in the /etc/passwd database using the pwd module. If the
- pwd module is not available, return None.
-
- @param username: the username of the user to return the passwd database
- information for.
-
- @returns: A L{pwd.struct_passwd}, where field 1 may contain a crypted
- password, or L{None} when the L{pwd} database is unavailable.
-
- @raises KeyError: when no such user exists
- """
- if pwd is None:
- return None
- return cast(CryptedPasswordRecord, pwd.getpwnam(username))
-
-
- def _shadowGetByName(username: str) -> Optional[CryptedPasswordRecord]:
- """
- Look up a user in the /etc/shadow database using the spwd module. If it is
- not available, return L{None}.
-
- @param username: the username of the user to return the shadow database
- information for.
- @type username: L{str}
-
- @returns: A L{spwd.struct_spwd}, where field 1 may contain a crypted
- password, or L{None} when the L{spwd} database is unavailable.
-
- @raises KeyError: when no such user exists
- """
- if spwd is not None:
- f = spwd.getspnam
- else:
- return None
- return cast(CryptedPasswordRecord, runAsEffectiveUser(0, 0, f, username))
-
-
- @implementer(ICredentialsChecker)
- class UNIXPasswordDatabase:
- """
- A checker which validates users out of the UNIX password databases, or
- databases of a compatible format.
-
- @ivar _getByNameFunctions: a C{list} of functions which are called in order
- to validate a user. The default value is such that the C{/etc/passwd}
- database will be tried first, followed by the C{/etc/shadow} database.
- """
-
- credentialInterfaces = (IUsernamePassword,)
-
- def __init__(self, getByNameFunctions=None):
- if getByNameFunctions is None:
- getByNameFunctions = [_pwdGetByName, _shadowGetByName]
- self._getByNameFunctions = getByNameFunctions
-
- def requestAvatarId(self, credentials):
- # We get bytes, but the Py3 pwd module uses str. So attempt to decode
- # it using the same method that CPython does for the file on disk.
- username = credentials.username.decode(sys.getfilesystemencoding())
- password = credentials.password.decode(sys.getfilesystemencoding())
-
- for func in self._getByNameFunctions:
- try:
- pwnam = func(username)
- except KeyError:
- return defer.fail(UnauthorizedLogin("invalid username"))
- else:
- if pwnam is not None:
- crypted = pwnam[1]
- if crypted == "":
- continue
-
- if verifyCryptedPassword(crypted, password):
- return defer.succeed(credentials.username)
- # fallback
- return defer.fail(UnauthorizedLogin("unable to verify password"))
-
-
- @implementer(ICredentialsChecker)
- class SSHPublicKeyDatabase:
- """
- Checker that authenticates SSH public keys, based on public keys listed in
- authorized_keys and authorized_keys2 files in user .ssh/ directories.
- """
-
- credentialInterfaces = (ISSHPrivateKey,)
-
- _userdb: UserDB = cast(UserDB, pwd)
-
- def requestAvatarId(self, credentials):
- d = defer.maybeDeferred(self.checkKey, credentials)
- d.addCallback(self._cbRequestAvatarId, credentials)
- d.addErrback(self._ebRequestAvatarId)
- return d
-
- def _cbRequestAvatarId(self, validKey, credentials):
- """
- Check whether the credentials themselves are valid, now that we know
- if the key matches the user.
-
- @param validKey: A boolean indicating whether or not the public key
- matches a key in the user's authorized_keys file.
-
- @param credentials: The credentials offered by the user.
- @type credentials: L{ISSHPrivateKey} provider
-
- @raise UnauthorizedLogin: (as a failure) if the key does not match the
- user in C{credentials}. Also raised if the user provides an invalid
- signature.
-
- @raise ValidPublicKey: (as a failure) if the key matches the user but
- the credentials do not include a signature. See
- L{error.ValidPublicKey} for more information.
-
- @return: The user's username, if authentication was successful.
- """
- if not validKey:
- return failure.Failure(UnauthorizedLogin("invalid key"))
- if not credentials.signature:
- return failure.Failure(error.ValidPublicKey())
- else:
- try:
- pubKey = keys.Key.fromString(credentials.blob)
- if pubKey.verify(credentials.signature, credentials.sigData):
- return credentials.username
- except Exception: # any error should be treated as a failed login
- _log.failure("Error while verifying key")
- return failure.Failure(UnauthorizedLogin("error while verifying key"))
- return failure.Failure(UnauthorizedLogin("unable to verify key"))
-
- def getAuthorizedKeysFiles(self, credentials):
- """
- Return a list of L{FilePath} instances for I{authorized_keys} files
- which might contain information about authorized keys for the given
- credentials.
-
- On OpenSSH servers, the default location of the file containing the
- list of authorized public keys is
- U{$HOME/.ssh/authorized_keys<http://www.openbsd.org/cgi-bin/man.cgi?query=sshd_config>}.
-
- I{$HOME/.ssh/authorized_keys2} is also returned, though it has been
- U{deprecated by OpenSSH since
- 2001<http://marc.info/?m=100508718416162>}.
-
- @return: A list of L{FilePath} instances to files with the authorized keys.
- """
- pwent = _lookupUser(self._userdb, credentials.username)
- root = FilePath(pwent.pw_dir).child(".ssh")
- files = ["authorized_keys", "authorized_keys2"]
- return [root.child(f) for f in files]
-
- def checkKey(self, credentials):
- """
- Retrieve files containing authorized keys and check against user
- credentials.
- """
- ouid, ogid = _lookupUser(self._userdb, credentials.username)[2:4]
- for filepath in self.getAuthorizedKeysFiles(credentials):
- if not filepath.exists():
- continue
- try:
- lines = filepath.open()
- except OSError as e:
- if e.errno == errno.EACCES:
- lines = runAsEffectiveUser(ouid, ogid, filepath.open)
- else:
- raise
- with lines:
- for l in lines:
- l2 = l.split()
- if len(l2) < 2:
- continue
- try:
- if decodebytes(l2[1]) == credentials.blob:
- return True
- except binascii.Error:
- continue
- return False
-
- def _ebRequestAvatarId(self, f):
- if not f.check(UnauthorizedLogin):
- _log.error(
- "Unauthorized login due to internal error: {error}", error=f.value
- )
- return failure.Failure(UnauthorizedLogin("unable to get avatar id"))
- return f
-
-
- @implementer(ICredentialsChecker)
- class SSHProtocolChecker:
- """
- SSHProtocolChecker is a checker that requires multiple authentications
- to succeed. To add a checker, call my registerChecker method with
- the checker and the interface.
-
- After each successful authenticate, I call my areDone method with the
- avatar id. To get a list of the successful credentials for an avatar id,
- use C{SSHProcotolChecker.successfulCredentials[avatarId]}. If L{areDone}
- returns True, the authentication has succeeded.
- """
-
- def __init__(self):
- self.checkers = {}
- self.successfulCredentials = {}
-
- @property
- def credentialInterfaces(self):
- return list(self.checkers.keys())
-
- def registerChecker(self, checker, *credentialInterfaces):
- if not credentialInterfaces:
- credentialInterfaces = checker.credentialInterfaces
- for credentialInterface in credentialInterfaces:
- self.checkers[credentialInterface] = checker
-
- def requestAvatarId(self, credentials):
- """
- Part of the L{ICredentialsChecker} interface. Called by a portal with
- some credentials to check if they'll authenticate a user. We check the
- interfaces that the credentials provide against our list of acceptable
- checkers. If one of them matches, we ask that checker to verify the
- credentials. If they're valid, we call our L{_cbGoodAuthentication}
- method to continue.
-
- @param credentials: the credentials the L{Portal} wants us to verify
- """
- ifac = providedBy(credentials)
- for i in ifac:
- c = self.checkers.get(i)
- if c is not None:
- d = defer.maybeDeferred(c.requestAvatarId, credentials)
- return d.addCallback(self._cbGoodAuthentication, credentials)
- return defer.fail(
- UnhandledCredentials(
- "No checker for %s" % ", ".join(map(reflect.qual, ifac))
- )
- )
-
- def _cbGoodAuthentication(self, avatarId, credentials):
- """
- Called if a checker has verified the credentials. We call our
- L{areDone} method to see if the whole of the successful authentications
- are enough. If they are, we return the avatar ID returned by the first
- checker.
- """
- if avatarId not in self.successfulCredentials:
- self.successfulCredentials[avatarId] = []
- self.successfulCredentials[avatarId].append(credentials)
- if self.areDone(avatarId):
- del self.successfulCredentials[avatarId]
- return avatarId
- else:
- raise error.NotEnoughAuthentication()
-
- def areDone(self, avatarId):
- """
- Override to determine if the authentication is finished for a given
- avatarId.
-
- @param avatarId: the avatar returned by the first checker. For
- this checker to function correctly, all the checkers must
- return the same avatar ID.
- """
- return True
-
-
- deprecatedModuleAttribute(
- Version("Twisted", 15, 0, 0),
- (
- "Please use twisted.conch.checkers.SSHPublicKeyChecker, "
- "initialized with an instance of "
- "twisted.conch.checkers.UNIXAuthorizedKeysFiles instead."
- ),
- __name__,
- "SSHPublicKeyDatabase",
- )
-
-
- class IAuthorizedKeysDB(Interface):
- """
- An object that provides valid authorized ssh keys mapped to usernames.
-
- @since: 15.0
- """
-
- def getAuthorizedKeys(avatarId):
- """
- Gets an iterable of authorized keys that are valid for the given
- C{avatarId}.
-
- @param avatarId: the ID of the avatar
- @type avatarId: valid return value of
- L{twisted.cred.checkers.ICredentialsChecker.requestAvatarId}
-
- @return: an iterable of L{twisted.conch.ssh.keys.Key}
- """
-
-
- def readAuthorizedKeyFile(
- fileobj: IO[bytes], parseKey: Callable[[bytes], keys.Key] = keys.Key.fromString
- ) -> Iterator[keys.Key]:
- """
- Reads keys from an authorized keys file. Any non-comment line that cannot
- be parsed as a key will be ignored, although that particular line will
- be logged.
-
- @param fileobj: something from which to read lines which can be parsed
- as keys
- @param parseKey: a callable that takes bytes and returns a
- L{twisted.conch.ssh.keys.Key}, mainly to be used for testing. The
- default is L{twisted.conch.ssh.keys.Key.fromString}.
- @return: an iterable of L{twisted.conch.ssh.keys.Key}
- @since: 15.0
- """
- for line in fileobj:
- line = line.strip()
- if line and not line.startswith(b"#"): # for comments
- try:
- yield parseKey(line)
- except keys.BadKeyError as e:
- _log.error(
- "Unable to parse line {line!r} as a key: {error!s}",
- line=line,
- error=e,
- )
-
-
- def _keysFromFilepaths(
- filepaths: Iterable[FilePath], parseKey: Callable[[bytes], keys.Key]
- ) -> Iterable[keys.Key]:
- """
- Helper function that turns an iterable of filepaths into a generator of
- keys. If any file cannot be read, a message is logged but it is
- otherwise ignored.
-
- @param filepaths: iterable of L{twisted.python.filepath.FilePath}.
- @type filepaths: iterable
-
- @param parseKey: a callable that takes a string and returns a
- L{twisted.conch.ssh.keys.Key}
- @type parseKey: L{callable}
-
- @return: generator of L{twisted.conch.ssh.keys.Key}
-
- @since: 15.0
- """
- for fp in filepaths:
- if fp.exists():
- try:
- with fp.open() as f:
- yield from readAuthorizedKeyFile(f, parseKey)
- except OSError as e:
- _log.error("Unable to read {path!r}: {error!s}", path=fp.path, error=e)
-
-
- @implementer(IAuthorizedKeysDB)
- class InMemorySSHKeyDB:
- """
- Object that provides SSH public keys based on a dictionary of usernames
- mapped to L{twisted.conch.ssh.keys.Key}s.
-
- @since: 15.0
- """
-
- def __init__(self, mapping: Mapping[bytes, Iterable[keys.Key]]) -> None:
- """
- Initializes a new L{InMemorySSHKeyDB}.
-
- @param mapping: mapping of usernames to iterables of
- L{twisted.conch.ssh.keys.Key}s
-
- """
- self._mapping = mapping
-
- def getAuthorizedKeys(self, username: bytes) -> Iterable[keys.Key]:
- """
- Look up the authorized keys for a user.
-
- @param username: Name of the user
- """
- return self._mapping.get(username, [])
-
-
- @implementer(IAuthorizedKeysDB)
- class UNIXAuthorizedKeysFiles:
- """
- Object that provides SSH public keys based on public keys listed in
- authorized_keys and authorized_keys2 files in UNIX user .ssh/ directories.
- If any of the files cannot be read, a message is logged but that file is
- otherwise ignored.
-
- @since: 15.0
- """
-
- _userdb: UserDB
-
- def __init__(
- self,
- userdb: Optional[UserDB] = None,
- parseKey: Callable[[bytes], keys.Key] = keys.Key.fromString,
- ):
- """
- Initializes a new L{UNIXAuthorizedKeysFiles}.
-
- @param userdb: access to the Unix user account and password database
- (default is the Python module L{pwd}, if available)
-
- @param parseKey: a callable that takes a string and returns a
- L{twisted.conch.ssh.keys.Key}, mainly to be used for testing. The
- default is L{twisted.conch.ssh.keys.Key.fromString}.
- """
- if userdb is not None:
- self._userdb = userdb
- elif pwd is not None:
- self._userdb = pwd
- else:
- raise ValueError("No pwd module found, and no userdb argument passed.")
- self._parseKey = parseKey
-
- def getAuthorizedKeys(self, username: bytes) -> Iterable[keys.Key]:
- try:
- passwd = _lookupUser(self._userdb, username)
- except KeyError:
- return ()
-
- root = FilePath(passwd.pw_dir).child(".ssh")
- files = ["authorized_keys", "authorized_keys2"]
- return _keysFromFilepaths((root.child(f) for f in files), self._parseKey)
-
-
- @implementer(ICredentialsChecker)
- class SSHPublicKeyChecker:
- """
- Checker that authenticates SSH public keys, based on public keys listed in
- authorized_keys and authorized_keys2 files in user .ssh/ directories.
-
- Initializing this checker with a L{UNIXAuthorizedKeysFiles} should be
- used instead of L{twisted.conch.checkers.SSHPublicKeyDatabase}.
-
- @since: 15.0
- """
-
- credentialInterfaces = (ISSHPrivateKey,)
-
- def __init__(self, keydb: IAuthorizedKeysDB) -> None:
- """
- Initializes a L{SSHPublicKeyChecker}.
-
- @param keydb: a provider of L{IAuthorizedKeysDB}
- """
- self._keydb = keydb
-
- def requestAvatarId(self, credentials):
- d = defer.execute(self._sanityCheckKey, credentials)
- d.addCallback(self._checkKey, credentials)
- d.addCallback(self._verifyKey, credentials)
- return d
-
- def _sanityCheckKey(self, credentials):
- """
- Checks whether the provided credentials are a valid SSH key with a
- signature (does not actually verify the signature).
-
- @param credentials: the credentials offered by the user
- @type credentials: L{ISSHPrivateKey} provider
-
- @raise ValidPublicKey: the credentials do not include a signature. See
- L{error.ValidPublicKey} for more information.
-
- @raise BadKeyError: The key included with the credentials is not
- recognized as a key.
-
- @return: the key in the credentials
- @rtype: L{twisted.conch.ssh.keys.Key}
- """
- if not credentials.signature:
- raise error.ValidPublicKey()
-
- return keys.Key.fromString(credentials.blob)
-
- def _checkKey(self, pubKey, credentials):
- """
- Checks the public key against all authorized keys (if any) for the
- user.
-
- @param pubKey: the key in the credentials (just to prevent it from
- having to be calculated again)
- @type pubKey:
-
- @param credentials: the credentials offered by the user
- @type credentials: L{ISSHPrivateKey} provider
-
- @raise UnauthorizedLogin: If the key is not authorized, or if there
- was any error obtaining a list of authorized keys for the user.
-
- @return: C{pubKey} if the key is authorized
- @rtype: L{twisted.conch.ssh.keys.Key}
- """
- if any(
- key == pubKey for key in self._keydb.getAuthorizedKeys(credentials.username)
- ):
- return pubKey
-
- raise UnauthorizedLogin("Key not authorized")
-
- def _verifyKey(self, pubKey, credentials):
- """
- Checks whether the credentials themselves are valid, now that we know
- if the key matches the user.
-
- @param pubKey: the key in the credentials (just to prevent it from
- having to be calculated again)
- @type pubKey: L{twisted.conch.ssh.keys.Key}
-
- @param credentials: the credentials offered by the user
- @type credentials: L{ISSHPrivateKey} provider
-
- @raise UnauthorizedLogin: If the key signature is invalid or there
- was any error verifying the signature.
-
- @return: The user's username, if authentication was successful
- @rtype: L{bytes}
- """
- try:
- if pubKey.verify(credentials.signature, credentials.sigData):
- return credentials.username
- except Exception as e: # Any error should be treated as a failed login
- raise UnauthorizedLogin("Error while verifying key") from e
-
- raise UnauthorizedLogin("Key signature invalid.")
|