|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363 |
- # -*- test-case-name: twisted.conch.test.test_ckeygen -*-
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
-
- """
- Implementation module for the `ckeygen` command.
- """
-
-
- import getpass
- import os
- import socket
- import sys
- from functools import wraps
- from imp import reload
-
- from twisted.conch.ssh import keys
- from twisted.python import failure, filepath, log, usage
-
- if getpass.getpass == getpass.unix_getpass: # type: ignore[attr-defined]
- try:
- import termios # hack around broken termios
-
- termios.tcgetattr, termios.tcsetattr
- except (ImportError, AttributeError):
- sys.modules["termios"] = None # type: ignore[assignment]
- reload(getpass)
-
- supportedKeyTypes = dict()
-
-
- def _keyGenerator(keyType):
- def assignkeygenerator(keygenerator):
- @wraps(keygenerator)
- def wrapper(*args, **kwargs):
- return keygenerator(*args, **kwargs)
-
- supportedKeyTypes[keyType] = wrapper
- return wrapper
-
- return assignkeygenerator
-
-
- class GeneralOptions(usage.Options):
- synopsis = """Usage: ckeygen [options]
- """
-
- longdesc = "ckeygen manipulates public/private keys in various ways."
-
- optParameters = [
- ["bits", "b", None, "Number of bits in the key to create."],
- ["filename", "f", None, "Filename of the key file."],
- ["type", "t", None, "Specify type of key to create."],
- ["comment", "C", None, "Provide new comment."],
- ["newpass", "N", None, "Provide new passphrase."],
- ["pass", "P", None, "Provide old passphrase."],
- ["format", "o", "sha256-base64", "Fingerprint format of key file."],
- [
- "private-key-subtype",
- None,
- None,
- 'OpenSSH private key subtype to write ("PEM" or "v1").',
- ],
- ]
-
- optFlags = [
- ["fingerprint", "l", "Show fingerprint of key file."],
- ["changepass", "p", "Change passphrase of private key file."],
- ["quiet", "q", "Quiet."],
- ["no-passphrase", None, "Create the key with no passphrase."],
- ["showpub", "y", "Read private key file and print public key."],
- ]
-
- compData = usage.Completions(
- optActions={
- "type": usage.CompleteList(list(supportedKeyTypes.keys())),
- "private-key-subtype": usage.CompleteList(["PEM", "v1"]),
- }
- )
-
-
- def run():
- options = GeneralOptions()
- try:
- options.parseOptions(sys.argv[1:])
- except usage.UsageError as u:
- print("ERROR: %s" % u)
- options.opt_help()
- sys.exit(1)
- log.discardLogs()
- log.deferr = handleError # HACK
- if options["type"]:
- if options["type"].lower() in supportedKeyTypes:
- print("Generating public/private %s key pair." % (options["type"]))
- supportedKeyTypes[options["type"].lower()](options)
- else:
- sys.exit(
- "Key type was %s, must be one of %s"
- % (options["type"], ", ".join(supportedKeyTypes.keys()))
- )
- elif options["fingerprint"]:
- printFingerprint(options)
- elif options["changepass"]:
- changePassPhrase(options)
- elif options["showpub"]:
- displayPublicKey(options)
- else:
- options.opt_help()
- sys.exit(1)
-
-
- def enumrepresentation(options):
- if options["format"] == "md5-hex":
- options["format"] = keys.FingerprintFormats.MD5_HEX
- return options
- elif options["format"] == "sha256-base64":
- options["format"] = keys.FingerprintFormats.SHA256_BASE64
- return options
- else:
- raise keys.BadFingerPrintFormat(
- "Unsupported fingerprint format: {}".format(options["format"])
- )
-
-
- def handleError():
- global exitStatus
- exitStatus = 2
- log.err(failure.Failure())
- raise
-
-
- @_keyGenerator("rsa")
- def generateRSAkey(options):
- from cryptography.hazmat.backends import default_backend
- from cryptography.hazmat.primitives.asymmetric import rsa
-
- if not options["bits"]:
- options["bits"] = 2048
- keyPrimitive = rsa.generate_private_key(
- key_size=int(options["bits"]),
- public_exponent=65537,
- backend=default_backend(),
- )
- key = keys.Key(keyPrimitive)
- _saveKey(key, options)
-
-
- @_keyGenerator("dsa")
- def generateDSAkey(options):
- from cryptography.hazmat.backends import default_backend
- from cryptography.hazmat.primitives.asymmetric import dsa
-
- if not options["bits"]:
- options["bits"] = 1024
- keyPrimitive = dsa.generate_private_key(
- key_size=int(options["bits"]),
- backend=default_backend(),
- )
- key = keys.Key(keyPrimitive)
- _saveKey(key, options)
-
-
- @_keyGenerator("ecdsa")
- def generateECDSAkey(options):
- from cryptography.hazmat.backends import default_backend
- from cryptography.hazmat.primitives.asymmetric import ec
-
- if not options["bits"]:
- options["bits"] = 256
- # OpenSSH supports only mandatory sections of RFC5656.
- # See https://www.openssh.com/txt/release-5.7
- curve = b"ecdsa-sha2-nistp" + str(options["bits"]).encode("ascii")
- keyPrimitive = ec.generate_private_key(
- curve=keys._curveTable[curve], backend=default_backend()
- )
- key = keys.Key(keyPrimitive)
- _saveKey(key, options)
-
-
- @_keyGenerator("ed25519")
- def generateEd25519key(options):
- keyPrimitive = keys.Ed25519PrivateKey.generate()
- key = keys.Key(keyPrimitive)
- _saveKey(key, options)
-
-
- def _defaultPrivateKeySubtype(keyType):
- """
- Return a reasonable default private key subtype for a given key type.
-
- @type keyType: L{str}
- @param keyType: A key type, as returned by
- L{twisted.conch.ssh.keys.Key.type}.
-
- @rtype: L{str}
- @return: A private OpenSSH key subtype (C{'PEM'} or C{'v1'}).
- """
- if keyType == "Ed25519":
- # No PEM format is defined for Ed25519 keys.
- return "v1"
- else:
- return "PEM"
-
-
- def printFingerprint(options):
- if not options["filename"]:
- filename = os.path.expanduser("~/.ssh/id_rsa")
- options["filename"] = input("Enter file in which the key is (%s): " % filename)
- if os.path.exists(options["filename"] + ".pub"):
- options["filename"] += ".pub"
- options = enumrepresentation(options)
- try:
- key = keys.Key.fromFile(options["filename"])
- print(
- "%s %s %s"
- % (
- key.size(),
- key.fingerprint(options["format"]),
- os.path.basename(options["filename"]),
- )
- )
- except keys.BadKeyError:
- sys.exit("bad key")
-
-
- def changePassPhrase(options):
- if not options["filename"]:
- filename = os.path.expanduser("~/.ssh/id_rsa")
- options["filename"] = input("Enter file in which the key is (%s): " % filename)
- try:
- key = keys.Key.fromFile(options["filename"])
- except keys.EncryptedKeyError:
- # Raised if password not supplied for an encrypted key
- if not options.get("pass"):
- options["pass"] = getpass.getpass("Enter old passphrase: ")
- try:
- key = keys.Key.fromFile(options["filename"], passphrase=options["pass"])
- except keys.BadKeyError:
- sys.exit("Could not change passphrase: old passphrase error")
- except keys.EncryptedKeyError as e:
- sys.exit(f"Could not change passphrase: {e}")
- except keys.BadKeyError as e:
- sys.exit(f"Could not change passphrase: {e}")
-
- if not options.get("newpass"):
- while 1:
- p1 = getpass.getpass("Enter new passphrase (empty for no passphrase): ")
- p2 = getpass.getpass("Enter same passphrase again: ")
- if p1 == p2:
- break
- print("Passphrases do not match. Try again.")
- options["newpass"] = p1
-
- if options.get("private-key-subtype") is None:
- options["private-key-subtype"] = _defaultPrivateKeySubtype(key.type())
-
- try:
- newkeydata = key.toString(
- "openssh",
- subtype=options["private-key-subtype"],
- passphrase=options["newpass"],
- )
- except Exception as e:
- sys.exit(f"Could not change passphrase: {e}")
-
- try:
- keys.Key.fromString(newkeydata, passphrase=options["newpass"])
- except (keys.EncryptedKeyError, keys.BadKeyError) as e:
- sys.exit(f"Could not change passphrase: {e}")
-
- with open(options["filename"], "wb") as fd:
- fd.write(newkeydata)
-
- print("Your identification has been saved with the new passphrase.")
-
-
- def displayPublicKey(options):
- if not options["filename"]:
- filename = os.path.expanduser("~/.ssh/id_rsa")
- options["filename"] = input("Enter file in which the key is (%s): " % filename)
- try:
- key = keys.Key.fromFile(options["filename"])
- except keys.EncryptedKeyError:
- if not options.get("pass"):
- options["pass"] = getpass.getpass("Enter passphrase: ")
- key = keys.Key.fromFile(options["filename"], passphrase=options["pass"])
- displayKey = key.public().toString("openssh").decode("ascii")
- print(displayKey)
-
-
- def _inputSaveFile(prompt: str) -> str:
- """
- Ask the user where to save the key.
-
- This needs to be a separate function so the unit test can patch it.
- """
- return input(prompt)
-
-
- def _saveKey(key, options):
- """
- Persist a SSH key on local filesystem.
-
- @param key: Key which is persisted on local filesystem.
- @type key: C{keys.Key} implementation.
-
- @param options:
- @type options: L{dict}
- """
- KeyTypeMapping = {"EC": "ecdsa", "Ed25519": "ed25519", "RSA": "rsa", "DSA": "dsa"}
- keyTypeName = KeyTypeMapping[key.type()]
- if not options["filename"]:
- defaultPath = os.path.expanduser(f"~/.ssh/id_{keyTypeName}")
- newPath = _inputSaveFile(
- f"Enter file in which to save the key ({defaultPath}): "
- )
-
- options["filename"] = newPath.strip() or defaultPath
-
- if os.path.exists(options["filename"]):
- print("{} already exists.".format(options["filename"]))
- yn = input("Overwrite (y/n)? ")
- if yn[0].lower() != "y":
- sys.exit()
-
- if options.get("no-passphrase"):
- options["pass"] = b""
- elif not options["pass"]:
- while 1:
- p1 = getpass.getpass("Enter passphrase (empty for no passphrase): ")
- p2 = getpass.getpass("Enter same passphrase again: ")
- if p1 == p2:
- break
- print("Passphrases do not match. Try again.")
- options["pass"] = p1
-
- if options.get("private-key-subtype") is None:
- options["private-key-subtype"] = _defaultPrivateKeySubtype(key.type())
-
- comment = f"{getpass.getuser()}@{socket.gethostname()}"
-
- filepath.FilePath(options["filename"]).setContent(
- key.toString(
- "openssh",
- subtype=options["private-key-subtype"],
- passphrase=options["pass"],
- )
- )
- os.chmod(options["filename"], 33152)
-
- filepath.FilePath(options["filename"] + ".pub").setContent(
- key.public().toString("openssh", comment=comment)
- )
- options = enumrepresentation(options)
-
- print("Your identification has been saved in {}".format(options["filename"]))
- print("Your public key has been saved in {}.pub".format(options["filename"]))
- print("The key fingerprint in {} is:".format(options["format"]))
- print(key.fingerprint(options["format"]))
-
-
- if __name__ == "__main__":
- run()
|