diff --git a/lib/ansible/modules/crypto/openssl_publickey.py b/lib/ansible/modules/crypto/openssl_publickey.py index b8d1bd2f118..e57c0249dda 100644 --- a/lib/ansible/modules/crypto/openssl_publickey.py +++ b/lib/ansible/modules/crypto/openssl_publickey.py @@ -131,10 +131,12 @@ fingerprint: sha512: "fd:ed:5e:39:48:5f:9f:fe:7f:25:06:3f:79:08:cd:ee:a5:e7:b3:3d:13:82:87:1f:84:e1:f5:c7:28:77:53:94:86:56:38:69:f0:d9:35:22:01:1e:a6:60:...:0f:9b" ''' -import errno +import hashlib +import os + +from ansible.module_utils import crypto as crypto_utils +from ansible.module_utils._text import to_native from ansible.module_utils.basic import AnsibleModule -from ansible.module_utils.crypto import get_fingerprint -from ansible.module_utils.pycompat24 import get_exception try: from OpenSSL import crypto @@ -145,37 +147,38 @@ except ImportError: else: pyopenssl_found = True -import os -from ansible.module_utils._text import to_native - -class PublicKeyError(Exception): +class PublicKeyError(crypto_utils.OpenSSLObjectError): pass -class PublicKey(object): +class PublicKey(crypto_utils.OpenSSLObject): def __init__(self, module): - self.state = module.params['state'] - self.force = module.params['force'] + super(PublicKey, self).__init__( + module.params['path'], + module.params['state'], + module.params['force'], + module.check_mode + ) self.format = module.params['format'] - self.name = os.path.basename(module.params['path']) - self.path = module.params['path'] self.privatekey_path = module.params['privatekey_path'] self.privatekey_passphrase = module.params['privatekey_passphrase'] self.privatekey = None - self.changed = True self.fingerprint = {} - self.check_mode = module.check_mode def generate(self, module): """Generate the public key.""" - if not os.path.exists(self.path) or self.force: - try: - privatekey_content = open(self.privatekey_path, 'r').read() + if not os.path.exists(self.privatekey_path): + raise PublicKeyError( + 'The private key %s does not exist' % self.privatekey_path + ) + if not self.check(module, perms_required=False) or self.force: + try: if self.format == 'OpenSSH': + privatekey_content = open(self.privatekey_path, 'rb').read() key = crypto_serialization.load_pem_private_key(privatekey_content, password=self.privatekey_passphrase, backend=default_backend()) @@ -184,39 +187,57 @@ class PublicKey(object): crypto_serialization.PublicFormat.OpenSSH ) else: - self.privatekey = crypto.load_privatekey(crypto.FILETYPE_PEM, privatekey_content) + self.privatekey = crypto_utils.load_privatekey( + self.privatekey_path, self.privatekey_passphrase + ) publickey_content = crypto.dump_publickey(crypto.FILETYPE_PEM, self.privatekey) - publickey_file = open(self.path, 'wb') - publickey_file.write(publickey_content) - publickey_file.close() + with open(self.path, 'wb') as publickey_file: + publickey_file.write(publickey_content) - file_args = module.load_file_common_arguments(module.params) - if module.set_fs_attributes_if_different(file_args, False): - self.changed = True + self.changed = True except (IOError, OSError) as exc: raise PublicKeyError(exc) except AttributeError as exc: self.remove() raise PublicKeyError('You need to have PyOpenSSL>=16.0.0 to generate public keys') - else: - self.changed = False + self.fingerprint = crypto_utils.get_fingerprint( + self.privatekey_path, + self.privatekey_passphrase + ) file_args = module.load_file_common_arguments(module.params) - self.fingerprint = get_fingerprint(self.privatekey_path, self.privatekey_passphrase) if module.set_fs_attributes_if_different(file_args, False): self.changed = True - def remove(self): - """Remove the public key from the filesystem.""" + def check(self, module, perms_required=True): + """Ensure the resource is in its desired state.""" + + state_and_perms = super(PublicKey, self).check(module, perms_required) + + def _check_privatekey(): + if not os.path.exists(self.privatekey_path): + return False + + current_publickey = crypto.dump_publickey( + crypto.FILETYPE_ASN1, + crypto.load_publickey(crypto.FILETYPE_PEM, open(self.path, 'rb').read()) + ) + + desired_publickey = crypto.dump_publickey( + crypto.FILETYPE_ASN1, + crypto_utils.load_privatekey(self.privatekey_path, self.privatekey_passphrase) + ) + + return hashlib.md5(current_publickey).hexdigest() == hashlib.md5(desired_publickey).hexdigest() + + if not state_and_perms: + return state_and_perms + + return _check_privatekey() + + - try: - os.remove(self.path) - except OSError as exc: - if exc.errno != errno.ENOENT: - raise PublicKeyError(exc) - else: - self.changed = False def dump(self): """Serialize the object into a dictionary.""" @@ -235,7 +256,7 @@ class PublicKey(object): def main(): module = AnsibleModule( - argument_spec = dict( + argument_spec=dict( state=dict(default='present', choices=['present', 'absent'], type='str'), force=dict(default=False, type='bool'), path=dict(required=True, type='path'), @@ -243,34 +264,28 @@ def main(): format=dict(type='str', choices=['PEM', 'OpenSSH'], default='PEM'), privatekey_passphrase=dict(type='path', no_log=True), ), - supports_check_mode = True, - add_file_common_args = True, + supports_check_mode=True, + add_file_common_args=True, + required_if=[('state', 'present', ['privatekey_path'])] ) if not pyopenssl_found: module.fail_json(msg='the python pyOpenSSL module is required') - path = module.params['path'] - privatekey_path = module.params['privatekey_path'] base_dir = os.path.dirname(module.params['path']) - if not os.path.isdir(base_dir): - module.fail_json(name=base_dir, msg='The directory %s does not exist or the file is not a directory' % base_dir) + module.fail_json( + name=base_dir, + msg='The directory %s does not exist or the file is not a directory' % base_dir + ) public_key = PublicKey(module) + if public_key.state == 'present': - # This is only applicable when generating a new public key. - # When removing one the privatekey_path should not be required. - if not privatekey_path: - module.fail_json(msg='When generating a new public key you must specify a private key') - - if not os.path.exists(privatekey_path): - module.fail_json(name=privatekey_path, msg='The private key %s does not exist' % privatekey_path) - if module.check_mode: result = public_key.dump() - result['changed'] = module.params['force'] or not os.path.exists(path) + result['changed'] = module.params['force'] or not public_key.check(module) module.exit_json(**result) try: @@ -281,7 +296,7 @@ def main(): if module.check_mode: result = public_key.dump() - result['changed'] = os.path.exists(path) + result['changed'] = os.path.exists(module.params['path']) module.exit_json(**result) try: