openssl_* modules: allow direct input for some files (#66384)

* Allow to not read content from file.

* Allow to feed content directly into _info modules.

* Allow to feed non-primary content into openssl_certificate, openssl_csr and openssl_publickey.

* Rename changelog.
This commit is contained in:
Felix Fontein 2020-01-12 19:12:33 +01:00 committed by GitHub
parent 011e0176c2
commit c380b18dcf
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 412 additions and 128 deletions

View file

@ -0,0 +1,7 @@
minor_changes:
- "openssl_certificate_info - allow to provide certificate content via ``content`` option (https://github.com/ansible/ansible/issues/64776)."
- "openssl_csr_info - allow to provide CSR content via ``content`` option."
- "openssl_privatekey_info - allow to provide private key content via ``content`` option."
- "openssl_certificate - allow to provide content of some input files via the ``csr_content``, ``privatekey_content``, ``ownca_privatekey_content`` and ``ownca_content`` options."
- "openssl_csr - allow to provide private key content via ``private_key_content`` option."
- "openssl_publickey - allow to provide private key content via ``private_key_content`` option."

View file

@ -166,10 +166,10 @@ def get_fingerprint_of_bytes(source):
return fingerprint
def get_fingerprint(path, passphrase=None):
def get_fingerprint(path, passphrase=None, content=None):
"""Generate the fingerprint of the public key. """
privatekey = load_privatekey(path, passphrase, check_passphrase=False)
privatekey = load_privatekey(path, passphrase=passphrase, content=content, check_passphrase=False)
try:
publickey = crypto.dump_publickey(crypto.FILETYPE_ASN1, privatekey)
except AttributeError:
@ -252,12 +252,15 @@ def load_privatekey(path, passphrase=None, check_passphrase=True, content=None,
raise OpenSSLObjectError(exc)
def load_certificate(path, backend='pyopenssl'):
def load_certificate(path, content=None, backend='pyopenssl'):
"""Load the specified certificate."""
try:
if content is None:
with open(path, 'rb') as cert_fh:
cert_content = cert_fh.read()
else:
cert_content = content
if backend == 'pyopenssl':
return crypto.load_certificate(crypto.FILETYPE_PEM, cert_content)
elif backend == 'cryptography':
@ -266,11 +269,14 @@ def load_certificate(path, backend='pyopenssl'):
raise OpenSSLObjectError(exc)
def load_certificate_request(path, backend='pyopenssl'):
def load_certificate_request(path, content=None, backend='pyopenssl'):
"""Load the specified certificate signing request."""
try:
if content is None:
with open(path, 'rb') as csr_fh:
csr_content = csr_fh.read()
else:
csr_content = content
except (IOError, OSError) as exc:
raise OpenSSLObjectError(exc)
if backend == 'pyopenssl':

View file

@ -82,16 +82,31 @@ options:
description:
- Path to the Certificate Signing Request (CSR) used to generate this certificate.
- This is not required in C(assertonly) mode.
- This is mutually exclusive with I(csr_content).
type: path
csr_content:
description:
- Content of the Certificate Signing Request (CSR) used to generate this certificate.
- This is not required in C(assertonly) mode.
- This is mutually exclusive with I(csr_path).
type: str
version_added: "2.10"
privatekey_path:
description:
- Path to the private key to use when signing the certificate.
- This is mutually exclusive with I(privatekey_content).
type: path
privatekey_content:
description:
- Path to the private key to use when signing the certificate.
- This is mutually exclusive with I(privatekey_path).
type: str
version_added: "2.10"
privatekey_passphrase:
description:
- The passphrase for the I(privatekey_path).
- The passphrase for the I(privatekey_path) resp. I(privatekey_content).
- This is required if the private key is password protected.
type: str
@ -158,19 +173,35 @@ options:
description:
- Remote absolute path of the CA (Certificate Authority) certificate.
- This is only used by the C(ownca) provider.
- This is mutually exclusive with I(ownca_content).
type: path
version_added: "2.7"
ownca_content:
description:
- Content of the CA (Certificate Authority) certificate.
- This is only used by the C(ownca) provider.
- This is mutually exclusive with I(ownca_path).
type: str
version_added: "2.10"
ownca_privatekey_path:
description:
- Path to the CA (Certificate Authority) private key to use when signing the certificate.
- This is only used by the C(ownca) provider.
- This is mutually exclusive with I(ownca_privatekey_content).
type: path
version_added: "2.7"
ownca_privatekey_content:
description:
- Path to the CA (Certificate Authority) private key to use when signing the certificate.
- This is only used by the C(ownca) provider.
- This is mutually exclusive with I(ownca_privatekey_path).
type: str
version_added: "2.10"
ownca_privatekey_passphrase:
description:
- The passphrase for the I(ownca_privatekey_path).
- The passphrase for the I(ownca_privatekey_path) resp. I(ownca_privatekey_content).
- This is only used by the C(ownca) provider.
type: str
version_added: "2.7"
@ -812,7 +843,7 @@ EXAMPLES = r'''
RETURN = r'''
filename:
description: Path to the generated Certificate
description: Path to the generated certificate.
returned: changed or success
type: str
sample: /etc/ssl/crt/www.ansible.com.crt
@ -829,6 +860,7 @@ import abc
import datetime
import time
import os
import tempfile
import traceback
from distutils.version import LooseVersion
@ -884,8 +916,14 @@ class Certificate(crypto_utils.OpenSSLObject):
self.provider = module.params['provider']
self.privatekey_path = module.params['privatekey_path']
self.privatekey_content = module.params['privatekey_content']
if self.privatekey_content is not None:
self.privatekey_content = self.privatekey_content.encode('utf-8')
self.privatekey_passphrase = module.params['privatekey_passphrase']
self.csr_path = module.params['csr_path']
self.csr_content = module.params['csr_content']
if self.csr_content is not None:
self.csr_content = self.csr_content.encode('utf-8')
self.cert = None
self.privatekey = None
self.csr = None
@ -1011,11 +1049,12 @@ class Certificate(crypto_utils.OpenSSLObject):
except Exception as dummy:
return False
if self.privatekey_path:
if self.privatekey_path or self.privatekey_content:
try:
self.privatekey = crypto_utils.load_privatekey(
self.privatekey_path,
self.privatekey_passphrase,
path=self.privatekey_path,
content=self.privatekey_content,
passphrase=self.privatekey_passphrase,
backend=self.backend
)
except crypto_utils.OpenSSLBadPassphraseError as exc:
@ -1023,8 +1062,12 @@ class Certificate(crypto_utils.OpenSSLObject):
if not self._validate_privatekey():
return False
if self.csr_path:
self.csr = crypto_utils.load_certificate_request(self.csr_path, backend=self.backend)
if self.csr_path or self.csr_content:
self.csr = crypto_utils.load_certificate_request(
path=self.csr_path,
content=self.csr_content,
backend=self.backend
)
if not self._validate_csr():
return False
@ -1087,21 +1130,28 @@ class SelfSignedCertificateCryptography(Certificate):
self.version = module.params['selfsigned_version']
self.serial_number = x509.random_serial_number()
if not os.path.exists(self.csr_path):
if self.csr_content is None and not os.path.exists(self.csr_path):
raise CertificateError(
'The certificate signing request file {0} does not exist'.format(self.csr_path)
)
if not os.path.exists(self.privatekey_path):
if self.privatekey_content is None and not os.path.exists(self.privatekey_path):
raise CertificateError(
'The private key file {0} does not exist'.format(self.privatekey_path)
)
self.csr = crypto_utils.load_certificate_request(self.csr_path, backend=self.backend)
self.csr = crypto_utils.load_certificate_request(
path=self.csr_path,
content=self.csr_content,
backend=self.backend
)
self._module = module
try:
self.privatekey = crypto_utils.load_privatekey(
self.privatekey_path, self.privatekey_passphrase, backend=self.backend
path=self.privatekey_path,
content=self.privatekey_content,
passphrase=self.privatekey_passphrase,
backend=self.backend
)
except crypto_utils.OpenSSLBadPassphraseError as exc:
module.fail_json(msg=to_native(exc))
@ -1115,11 +1165,11 @@ class SelfSignedCertificateCryptography(Certificate):
self.digest = None
def generate(self, module):
if not os.path.exists(self.privatekey_path):
if self.privatekey_content is None and not os.path.exists(self.privatekey_path):
raise CertificateError(
'The private key %s does not exist' % self.privatekey_path
)
if not os.path.exists(self.csr_path):
if self.csr_content is None and not os.path.exists(self.csr_path):
raise CertificateError(
'The certificate signing request file %s does not exist' % self.csr_path
)
@ -1210,31 +1260,36 @@ class SelfSignedCertificate(Certificate):
self.version = module.params['selfsigned_version']
self.serial_number = randint(1000, 99999)
if not os.path.exists(self.csr_path):
if self.csr_content is None and not os.path.exists(self.csr_path):
raise CertificateError(
'The certificate signing request file {0} does not exist'.format(self.csr_path)
)
if not os.path.exists(self.privatekey_path):
if self.privatekey_content is None and not os.path.exists(self.privatekey_path):
raise CertificateError(
'The private key file {0} does not exist'.format(self.privatekey_path)
)
self.csr = crypto_utils.load_certificate_request(self.csr_path)
self.csr = crypto_utils.load_certificate_request(
path=self.csr_path,
content=self.csr_content,
)
try:
self.privatekey = crypto_utils.load_privatekey(
self.privatekey_path, self.privatekey_passphrase
path=self.privatekey_path,
content=self.privatekey_content,
passphrase=self.privatekey_passphrase,
)
except crypto_utils.OpenSSLBadPassphraseError as exc:
module.fail_json(msg=str(exc))
def generate(self, module):
if not os.path.exists(self.privatekey_path):
if self.privatekey_content is None and not os.path.exists(self.privatekey_path):
raise CertificateError(
'The private key %s does not exist' % self.privatekey_path
)
if not os.path.exists(self.csr_path):
if self.csr_content is None and not os.path.exists(self.csr_path):
raise CertificateError(
'The certificate signing request file %s does not exist' % self.csr_path
)
@ -1301,27 +1356,44 @@ class OwnCACertificateCryptography(Certificate):
self.version = module.params['ownca_version']
self.serial_number = x509.random_serial_number()
self.ca_cert_path = module.params['ownca_path']
self.ca_cert_content = module.params['ownca_content']
if self.ca_cert_content is not None:
self.ca_cert_content = self.ca_cert_content.encode('utf-8')
self.ca_privatekey_path = module.params['ownca_privatekey_path']
self.ca_privatekey_content = module.params['ownca_privatekey_content']
if self.ca_privatekey_content is not None:
self.ca_privatekey_content = self.ca_privatekey_content.encode('utf-8')
self.ca_privatekey_passphrase = module.params['ownca_privatekey_passphrase']
if not os.path.exists(self.csr_path):
if self.csr_content is None and not os.path.exists(self.csr_path):
raise CertificateError(
'The certificate signing request file {0} does not exist'.format(self.csr_path)
)
if not os.path.exists(self.ca_cert_path):
if self.ca_cert_content is None and not os.path.exists(self.ca_cert_path):
raise CertificateError(
'The CA certificate file {0} does not exist'.format(self.ca_cert_path)
)
if not os.path.exists(self.ca_privatekey_path):
if self.ca_privatekey_content is None and not os.path.exists(self.ca_privatekey_path):
raise CertificateError(
'The CA private key file {0} does not exist'.format(self.ca_privatekey_path)
)
self.csr = crypto_utils.load_certificate_request(self.csr_path, backend=self.backend)
self.ca_cert = crypto_utils.load_certificate(self.ca_cert_path, backend=self.backend)
self.csr = crypto_utils.load_certificate_request(
path=self.csr_path,
content=self.csr_content,
backend=self.backend
)
self.ca_cert = crypto_utils.load_certificate(
path=self.ca_cert_path,
content=self.ca_cert_content,
backend=self.backend
)
try:
self.ca_private_key = crypto_utils.load_privatekey(
self.ca_privatekey_path, self.ca_privatekey_passphrase, backend=self.backend
path=self.ca_privatekey_path,
content=self.ca_privatekey_content,
passphrase=self.ca_privatekey_passphrase,
backend=self.backend
)
except crypto_utils.OpenSSLBadPassphraseError as exc:
module.fail_json(msg=str(exc))
@ -1336,17 +1408,17 @@ class OwnCACertificateCryptography(Certificate):
def generate(self, module):
if not os.path.exists(self.ca_cert_path):
if self.ca_cert_content is None and not os.path.exists(self.ca_cert_path):
raise CertificateError(
'The CA certificate %s does not exist' % self.ca_cert_path
)
if not os.path.exists(self.ca_privatekey_path):
if self.ca_privatekey_content is None and not os.path.exists(self.ca_privatekey_path):
raise CertificateError(
'The CA private key %s does not exist' % self.ca_privatekey_path
)
if not os.path.exists(self.csr_path):
if self.csr_content is None and not os.path.exists(self.csr_path):
raise CertificateError(
'The certificate signing request file %s does not exist' % self.csr_path
)
@ -1481,44 +1553,58 @@ class OwnCACertificate(Certificate):
if module.params['ownca_create_authority_key_identifier']:
module.warn('ownca_create_authority_key_identifier is ignored by the pyOpenSSL backend!')
self.ca_cert_path = module.params['ownca_path']
self.ca_cert_content = module.params['ownca_content']
if self.ca_cert_content is not None:
self.ca_cert_content = self.ca_cert_content.encode('utf-8')
self.ca_privatekey_path = module.params['ownca_privatekey_path']
self.ca_privatekey_content = module.params['ownca_privatekey_content']
if self.ca_privatekey_content is not None:
self.ca_privatekey_content = self.ca_privatekey_content.encode('utf-8')
self.ca_privatekey_passphrase = module.params['ownca_privatekey_passphrase']
if not os.path.exists(self.csr_path):
if self.csr_content is None and not os.path.exists(self.csr_path):
raise CertificateError(
'The certificate signing request file {0} does not exist'.format(self.csr_path)
)
if not os.path.exists(self.ca_cert_path):
if self.ca_cert_content is None and not os.path.exists(self.ca_cert_path):
raise CertificateError(
'The CA certificate file {0} does not exist'.format(self.ca_cert_path)
)
if not os.path.exists(self.ca_privatekey_path):
if self.ca_privatekey_content is None and not os.path.exists(self.ca_privatekey_path):
raise CertificateError(
'The CA private key file {0} does not exist'.format(self.ca_privatekey_path)
)
self.csr = crypto_utils.load_certificate_request(self.csr_path)
self.ca_cert = crypto_utils.load_certificate(self.ca_cert_path)
self.csr = crypto_utils.load_certificate_request(
path=self.csr_path,
content=self.csr_content,
)
self.ca_cert = crypto_utils.load_certificate(
path=self.ca_cert_path,
content=self.ca_cert_content,
)
try:
self.ca_privatekey = crypto_utils.load_privatekey(
self.ca_privatekey_path, self.ca_privatekey_passphrase
path=self.ca_privatekey_path,
content=self.ca_privatekey_content,
passphrase=self.ca_privatekey_passphrase
)
except crypto_utils.OpenSSLBadPassphraseError as exc:
module.fail_json(msg=str(exc))
def generate(self, module):
if not os.path.exists(self.ca_cert_path):
if self.ca_cert_content is None and not os.path.exists(self.ca_cert_path):
raise CertificateError(
'The CA certificate %s does not exist' % self.ca_cert_path
)
if not os.path.exists(self.ca_privatekey_path):
if self.ca_privatekey_content is None and not os.path.exists(self.ca_privatekey_path):
raise CertificateError(
'The CA private key %s does not exist' % self.ca_privatekey_path
)
if not os.path.exists(self.csr_path):
if self.csr_content is None and not os.path.exists(self.csr_path):
raise CertificateError(
'The certificate signing request file %s does not exist' % self.csr_path
)
@ -1630,17 +1716,22 @@ class AssertOnlyCertificateBase(Certificate):
# Load objects
self.cert = crypto_utils.load_certificate(self.path, backend=self.backend)
if self.privatekey_path is not None:
if self.privatekey_path is not None or self.privatekey_content is not None:
try:
self.privatekey = crypto_utils.load_privatekey(
self.privatekey_path,
self.privatekey_passphrase,
path=self.privatekey_path,
content=self.privatekey_content,
passphrase=self.privatekey_passphrase,
backend=self.backend
)
except crypto_utils.OpenSSLBadPassphraseError as exc:
raise CertificateError(exc)
if self.csr_path is not None:
self.csr = crypto_utils.load_certificate_request(self.csr_path, backend=self.backend)
if self.csr_path is not None or self.csr_content is not None:
self.csr = crypto_utils.load_certificate_request(
path=self.csr_path,
content=self.csr_content,
backend=self.backend
)
@abc.abstractmethod
def _validate_privatekey(self):
@ -1712,28 +1803,28 @@ class AssertOnlyCertificateBase(Certificate):
def assertonly(self, module):
messages = []
if self.privatekey_path is not None:
if self.privatekey_path is not None or self.privatekey_content is not None:
if not self._validate_privatekey():
messages.append(
'Certificate %s and private key %s do not match' %
(self.path, self.privatekey_path)
(self.path, self.privatekey_path or '(provided in module options)')
)
if self.csr_path is not None:
if self.csr_path is not None or self.csr_content is not None:
if not self._validate_csr_signature():
messages.append(
'Certificate %s and CSR %s do not match: private key mismatch' %
(self.path, self.csr_path)
(self.path, self.csr_path or '(provided in module options)')
)
if not self._validate_csr_subject():
messages.append(
'Certificate %s and CSR %s do not match: subject mismatch' %
(self.path, self.csr_path)
(self.path, self.csr_path or '(provided in module options)')
)
if not self._validate_csr_extensions():
messages.append(
'Certificate %s and CSR %s do not match: extensions mismatch' %
(self.path, self.csr_path)
(self.path, self.csr_path or '(provided in module options)')
)
if self.signature_algorithms is not None:
@ -2179,12 +2270,16 @@ class EntrustCertificate(Certificate):
self.trackingId = None
self.notAfter = self.get_relative_time_option(module.params['entrust_not_after'], 'entrust_not_after')
if not os.path.exists(self.csr_path):
if self.csr_content is None or not os.path.exists(self.csr_path):
raise CertificateError(
'The certificate signing request file {0} does not exist'.format(self.csr_path)
)
self.csr = crypto_utils.load_certificate_request(self.csr_path, backend=self.backend)
self.csr = crypto_utils.load_certificate_request(
path=self.csr_path,
content=self.csr_content,
backend=self.backend,
)
# ECS API defaults to using the validated organization tied to the account.
# We want to always force behavior of trying to use the organization provided in the CSR.
@ -2229,6 +2324,9 @@ class EntrustCertificate(Certificate):
if not self.check(module, perms_required=False) or self.force:
# Read the CSR that was generated for us
body = {}
if self.csr_content is not None:
body['csr'] = self.csr_content
else:
with open(self.csr_path, 'r') as csr_file:
body['csr'] = csr_file.read()
@ -2346,12 +2444,7 @@ class AcmeCertificate(Certificate):
def generate(self, module):
if not os.path.exists(self.privatekey_path):
raise CertificateError(
'The private key %s does not exist' % self.privatekey_path
)
if not os.path.exists(self.csr_path):
if self.csr_content is None and not os.path.exists(self.csr_path):
raise CertificateError(
'The certificate signing request file %s does not exist' % self.csr_path
)
@ -2372,6 +2465,25 @@ class AcmeCertificate(Certificate):
if self.use_chain:
command.append('--chain')
command.extend(['--account-key', self.accountkey_path])
if self.csr_content is not None:
# We need to temporarily write the CSR to disk
fd, tmpsrc = tempfile.mkstemp()
module.add_cleanup_file(tmpsrc) # Ansible will delete the file on exit
f = os.fdopen(fd, 'wb')
try:
f.write(self.csr_content)
except Exception as err:
try:
f.close()
except Exception as dummy:
pass
module.fail_json(
msg="failed to create temporary CSR file: %s" % to_native(err),
exception=traceback.format_exc()
)
f.close()
command.extend(['--csr', tmpsrc])
else:
command.extend(['--csr', self.csr_path])
command.extend(['--acme-dir', self.challenge_path])
@ -2411,11 +2523,13 @@ def main():
provider=dict(type='str', choices=['acme', 'assertonly', 'entrust', 'ownca', 'selfsigned']),
force=dict(type='bool', default=False,),
csr_path=dict(type='path'),
csr_content=dict(type='str'),
backup=dict(type='bool', default=False),
select_crypto_backend=dict(type='str', default='auto', choices=['auto', 'cryptography', 'pyopenssl']),
# General properties of a certificate
privatekey_path=dict(type='path'),
privatekey_content=dict(type='str'),
privatekey_passphrase=dict(type='str', no_log=True),
# provider: assertonly
@ -2451,7 +2565,9 @@ def main():
# provider: ownca
ownca_path=dict(type='path'),
ownca_content=dict(type='str'),
ownca_privatekey_path=dict(type='path'),
ownca_privatekey_content=dict(type='str'),
ownca_privatekey_passphrase=dict(type='str', no_log=True),
ownca_digest=dict(type='str', default='sha256'),
ownca_version=dict(type='int', default=3),
@ -2490,7 +2606,13 @@ def main():
['provider', 'entrust', ['entrust_requester_email', 'entrust_requester_name', 'entrust_requester_phone',
'entrust_api_user', 'entrust_api_key', 'entrust_api_client_cert_path',
'entrust_api_client_cert_key_path']],
]
],
mutually_exclusive=[
['csr_path', 'csr_content'],
['privatekey_path', 'privatekey_content'],
['ownca_path', 'ownca_content'],
['ownca_privatekey_path', 'ownca_privatekey_content'],
],
)
try:
@ -2498,8 +2620,8 @@ def main():
certificate = CertificateAbsent(module)
else:
if module.params['provider'] != 'assertonly' and module.params['csr_path'] is None:
module.fail_json(msg='csr_path is required when provider is not assertonly')
if module.params['provider'] != 'assertonly' and module.params['csr_path'] is None and module.params['csr_content'] is None:
module.fail_json(msg='csr_path or csr_content is required when provider is not assertonly')
base_dir = os.path.dirname(module.params['path']) or '.'
if not os.path.isdir(base_dir):
@ -2513,6 +2635,19 @@ def main():
module.deprecate("The 'assertonly' provider is deprecated; please see the examples of "
"the 'openssl_certificate' module on how to replace it with other modules",
version='2.13')
elif provider == 'selfsigned':
if module.params['privatekey_path'] is None and module.params['privatekey_content'] is None:
module.fail_json(msg='One of privatekey_path and privatekey_content must be specified for the selfsigned provider.')
elif provider == 'acme':
if module.params['acme_accountkey_path'] is None:
module.fail_json(msg='The acme_accountkey_path option must be specified for the acme provider.')
if module.params['acme_challenge_path'] is None:
module.fail_json(msg='The acme_challenge_path option must be specified for the acme provider.')
elif provider == 'ownca':
if module.params['ownca_path'] is None and module.params['ownca_content'] is None:
module.fail_json(msg='One of ownca_path and ownca_content must be specified for the ownca provider.')
if module.params['ownca_privatekey_path'] is None and module.params['ownca_privatekey_content'] is None:
module.fail_json(msg='One of ownca_privatekey_path and ownca_privatekey_content must be specified for the ownca provider.')
backend = module.params['select_crypto_backend']
if backend == 'auto':

View file

@ -34,8 +34,14 @@ options:
path:
description:
- Remote absolute path where the certificate file is loaded from.
- Either I(path) or I(content) must be specified, but not both.
type: path
required: true
content:
description:
- Content of the X.509 certificate in PEM format.
- Either I(path) or I(content) must be specified, but not both.
type: str
version_added: "2.10"
valid_at:
description:
- A dict of names mapping to time specifications. Every time specified here
@ -370,13 +376,16 @@ def get_relative_time_option(input_string, input_name):
class CertificateInfo(crypto_utils.OpenSSLObject):
def __init__(self, module, backend):
super(CertificateInfo, self).__init__(
module.params['path'],
module.params['path'] or '',
'present',
False,
module.check_mode,
)
self.backend = backend
self.module = module
self.content = module.params['content']
if self.content is not None:
self.content = self.content.encode('utf-8')
self.valid_at = module.params['valid_at']
if self.valid_at:
@ -465,7 +474,7 @@ class CertificateInfo(crypto_utils.OpenSSLObject):
def get_info(self):
result = dict()
self.cert = crypto_utils.load_certificate(self.path, backend=self.backend)
self.cert = crypto_utils.load_certificate(self.path, content=self.content, backend=self.backend)
result['signature_algorithm'] = self._get_signature_algorithm()
subject = self._get_subject_ordered()
@ -810,14 +819,22 @@ class CertificateInfoPyOpenSSL(CertificateInfo):
def main():
module = AnsibleModule(
argument_spec=dict(
path=dict(type='path', required=True),
path=dict(type='path'),
content=dict(type='str'),
valid_at=dict(type='dict'),
select_crypto_backend=dict(type='str', default='auto', choices=['auto', 'cryptography', 'pyopenssl']),
),
required_one_of=(
['path', 'content'],
),
mutually_exclusive=(
['path', 'content'],
),
supports_check_mode=True,
)
try:
if module.params['path'] is not None:
base_dir = os.path.dirname(module.params['path']) or '.'
if not os.path.isdir(base_dir):
module.fail_json(

View file

@ -48,8 +48,14 @@ options:
privatekey_path:
description:
- The path to the private key to use when signing the certificate signing request.
- Required if I(state) is C(present).
- Either I(privatekey_path) or I(privatekey_content) must be specified if I(state) is C(present), but not both.
type: path
privatekey_content:
description:
- The content of the private key to use when signing the certificate signing request.
- Either I(privatekey_path) or I(privatekey_content) must be specified if I(state) is C(present), but not both.
type: str
version_added: "2.10"
privatekey_passphrase:
description:
- The passphrase for the private key.
@ -295,6 +301,12 @@ EXAMPLES = r'''
privatekey_path: /etc/ssl/private/ansible.com.pem
common_name: www.ansible.com
- name: Generate an OpenSSL Certificate Signing Request with an inline key
openssl_csr:
path: /etc/ssl/csr/www.ansible.com.csr
privatekey_content: "{{ private_key_content }}"
common_name: www.ansible.com
- name: Generate an OpenSSL Certificate Signing Request with a passphrase protected private key
openssl_csr:
path: /etc/ssl/csr/www.ansible.com.csr
@ -355,7 +367,9 @@ EXAMPLES = r'''
RETURN = r'''
privatekey:
description: Path to the TLS/SSL private key the CSR was generated for
description:
- Path to the TLS/SSL private key the CSR was generated for
- Will be C(none) if the private key has been provided in I(privatekey_content).
returned: changed or success
type: str
sample: /etc/ssl/private/ansible.com.pem
@ -474,6 +488,9 @@ class CertificateSigningRequestBase(crypto_utils.OpenSSLObject):
)
self.digest = module.params['digest']
self.privatekey_path = module.params['privatekey_path']
self.privatekey_content = module.params['privatekey_content']
if self.privatekey_content is not None:
self.privatekey_content = self.privatekey_content.encode('utf-8')
self.privatekey_passphrase = module.params['privatekey_passphrase']
self.version = module.params['version']
self.subjectAltName = module.params['subject_alt_name']
@ -655,7 +672,11 @@ class CertificateSigningRequestPyOpenSSL(CertificateSigningRequestBase):
def _load_private_key(self):
try:
self.privatekey = crypto_utils.load_privatekey(self.privatekey_path, self.privatekey_passphrase)
self.privatekey = crypto_utils.load_privatekey(
path=self.privatekey_path,
content=self.privatekey_content,
passphrase=self.privatekey_passphrase
)
except crypto_utils.OpenSSLBadPassphraseError as exc:
raise CertificateSigningRequestError(exc)
@ -842,9 +863,13 @@ class CertificateSigningRequestCryptography(CertificateSigningRequestBase):
def _load_private_key(self):
try:
if self.privatekey_content is not None:
content = self.privatekey_content
else:
with open(self.privatekey_path, 'rb') as f:
content = f.read()
self.privatekey = cryptography.hazmat.primitives.serialization.load_pem_private_key(
f.read(),
content,
None if self.privatekey_passphrase is None else to_bytes(self.privatekey_passphrase),
backend=self.cryptography_backend
)
@ -1003,6 +1028,7 @@ def main():
state=dict(type='str', default='present', choices=['absent', 'present']),
digest=dict(type='str', default='sha256'),
privatekey_path=dict(type='path'),
privatekey_content=dict(type='str'),
privatekey_passphrase=dict(type='str', no_log=True),
version=dict(type='int', default=1),
force=dict(type='bool', default=False),
@ -1035,7 +1061,10 @@ def main():
select_crypto_backend=dict(type='str', default='auto', choices=['auto', 'cryptography', 'pyopenssl']),
),
required_together=[('authority_cert_issuer', 'authority_cert_serial_number')],
required_if=[('state', 'present', ['privatekey_path'])],
required_if=[('state', 'present', ['privatekey_path', 'privatekey_content'], True)],
mutually_exclusive=(
['privatekey_path', 'privatekey_content'],
),
add_file_common_args=True,
supports_check_mode=True,
)

View file

@ -35,9 +35,14 @@ options:
path:
description:
- Remote absolute path where the CSR file is loaded from.
- Either I(path) or I(content) must be specified, but not both.
type: path
required: true
content:
description:
- Content of the CSR file.
- Either I(path) or I(content) must be specified, but not both.
type: str
version_added: "2.10"
select_crypto_backend:
description:
- Determines which crypto backend to use.
@ -257,13 +262,16 @@ TIMESTAMP_FORMAT = "%Y%m%d%H%M%SZ"
class CertificateSigningRequestInfo(crypto_utils.OpenSSLObject):
def __init__(self, module, backend):
super(CertificateSigningRequestInfo, self).__init__(
module.params['path'],
module.params['path'] or '',
'present',
False,
module.check_mode,
)
self.backend = backend
self.module = module
self.content = module.params['content']
if self.content is not None:
self.content = self.content.encode('utf-8')
def generate(self):
# Empty method because crypto_utils.OpenSSLObject wants this
@ -319,7 +327,7 @@ class CertificateSigningRequestInfo(crypto_utils.OpenSSLObject):
def get_info(self):
result = dict()
self.csr = crypto_utils.load_certificate_request(self.path, backend=self.backend)
self.csr = crypto_utils.load_certificate_request(self.path, content=self.content, backend=self.backend)
subject = self._get_subject_ordered()
result['subject'] = dict()
@ -591,13 +599,21 @@ class CertificateSigningRequestInfoPyOpenSSL(CertificateSigningRequestInfo):
def main():
module = AnsibleModule(
argument_spec=dict(
path=dict(type='path', required=True),
path=dict(type='path'),
content=dict(type='str'),
select_crypto_backend=dict(type='str', default='auto', choices=['auto', 'cryptography', 'pyopenssl']),
),
required_one_of=(
['path', 'content'],
),
mutually_exclusive=(
['path', 'content'],
),
supports_check_mode=True,
)
try:
if module.params['path'] is not None:
base_dir = os.path.dirname(module.params['path']) or '.'
if not os.path.isdir(base_dir):
module.fail_json(

View file

@ -606,7 +606,7 @@ class PrivateKeyCryptography(PrivateKeyBase):
format=export_format,
encryption_algorithm=encryption_algorithm
)
except ValueError as e:
except ValueError as dummy:
self.module.fail_json(
msg='Cryptography backend cannot serialize the private key in the required format "{0}"'.format(self.format)
)

View file

@ -38,7 +38,12 @@ options:
description:
- Remote absolute path where the private key file is loaded from.
type: path
required: true
content:
description:
- Content of the private key file.
- Either I(path) or I(content) must be specified, but not both.
type: str
version_added: "2.10"
passphrase:
description:
- The passphrase for the private key.
@ -318,13 +323,14 @@ def _is_cryptography_key_consistent(key, key_public_data, key_private_data):
class PrivateKeyInfo(crypto_utils.OpenSSLObject):
def __init__(self, module, backend):
super(PrivateKeyInfo, self).__init__(
module.params['path'],
module.params['path'] or '',
'present',
False,
module.check_mode,
)
self.backend = backend
self.module = module
self.content = module.params['content']
self.passphrase = module.params['passphrase']
self.return_private_key_data = module.params['return_private_key_data']
@ -355,6 +361,10 @@ class PrivateKeyInfo(crypto_utils.OpenSSLObject):
can_parse_key=False,
key_is_consistent=None,
)
if self.content is not None:
priv_key_detail = self.content.encode('utf-8')
result['can_load_key'] = True
else:
try:
with open(self.path, 'rb') as b_priv_key_fh:
priv_key_detail = b_priv_key_fh.read()
@ -576,15 +586,23 @@ class PrivateKeyInfoPyOpenSSL(PrivateKeyInfo):
def main():
module = AnsibleModule(
argument_spec=dict(
path=dict(type='path', required=True),
path=dict(type='path'),
content=dict(type='str'),
passphrase=dict(type='str', no_log=True),
return_private_key_data=dict(type='bool', default=False),
select_crypto_backend=dict(type='str', default='auto', choices=['auto', 'cryptography', 'pyopenssl']),
),
required_one_of=(
['path', 'content'],
),
mutually_exclusive=(
['path', 'content'],
),
supports_check_mode=True,
)
try:
if module.params['path'] is not None:
base_dir = os.path.dirname(module.params['path']) or '.'
if not os.path.isdir(base_dir):
module.fail_json(

View file

@ -58,8 +58,16 @@ options:
privatekey_path:
description:
- Path to the TLS/SSL private key from which to generate the public key.
- Required if I(state) is C(present).
- Either I(privatekey_path) or I(privatekey_content) must be specified, but not both.
If I(state) is C(present), one of them is required.
type: path
privatekey_content:
description:
- The content of the TLS/SSL private key from which to generate the public key.
- Either I(privatekey_path) or I(privatekey_content) must be specified, but not both.
If I(state) is C(present), one of them is required.
type: str
version_added: "2.10"
privatekey_passphrase:
description:
- The passphrase for the private key.
@ -98,6 +106,11 @@ EXAMPLES = r'''
path: /etc/ssl/public/ansible.com.pem
privatekey_path: /etc/ssl/private/ansible.com.pem
- name: Generate an OpenSSL public key in PEM format from an inline key
openssl_publickey:
path: /etc/ssl/public/ansible.com.pem
privatekey_content: "{{ private_key_content }}"
- name: Generate an OpenSSL public key in OpenSSH v2 format
openssl_publickey:
path: /etc/ssl/public/ansible.com.pem
@ -119,13 +132,14 @@ EXAMPLES = r'''
- name: Remove an OpenSSL public key
openssl_publickey:
path: /etc/ssl/public/ansible.com.pem
privatekey_path: /etc/ssl/private/ansible.com.pem
state: absent
'''
RETURN = r'''
privatekey:
description: Path to the TLS/SSL private key the public key was generated from.
description:
- Path to the TLS/SSL private key the public key was generated from.
- Will be C(none) if the private key has been provided in I(privatekey_content).
returned: changed or success
type: str
sample: /etc/ssl/private/ansible.com.pem
@ -191,7 +205,7 @@ else:
CRYPTOGRAPHY_FOUND = True
from ansible.module_utils import crypto as crypto_utils
from ansible.module_utils._text import to_native, to_bytes
from ansible.module_utils._text import to_native
from ansible.module_utils.basic import AnsibleModule, missing_required_lib
@ -210,6 +224,9 @@ class PublicKey(crypto_utils.OpenSSLObject):
)
self.format = module.params['format']
self.privatekey_path = module.params['privatekey_path']
self.privatekey_content = module.params['privatekey_content']
if self.privatekey_content is not None:
self.privatekey_content = self.privatekey_content.encode('utf-8')
self.privatekey_passphrase = module.params['privatekey_passphrase']
self.privatekey = None
self.fingerprint = {}
@ -220,8 +237,9 @@ class PublicKey(crypto_utils.OpenSSLObject):
def _create_publickey(self, module):
self.privatekey = crypto_utils.load_privatekey(
self.privatekey_path,
self.privatekey_passphrase,
path=self.privatekey_path,
content=self.privatekey_content,
passphrase=self.privatekey_passphrase,
backend=self.backend
)
if self.backend == 'cryptography':
@ -244,7 +262,7 @@ class PublicKey(crypto_utils.OpenSSLObject):
def generate(self, module):
"""Generate the public key."""
if not os.path.exists(self.privatekey_path):
if self.privatekey_content is None and not os.path.exists(self.privatekey_path):
raise PublicKeyError(
'The private key %s does not exist' % self.privatekey_path
)
@ -264,8 +282,9 @@ class PublicKey(crypto_utils.OpenSSLObject):
raise PublicKeyError(exc)
self.fingerprint = crypto_utils.get_fingerprint(
self.privatekey_path,
self.privatekey_passphrase
path=self.privatekey_path,
content=self.privatekey_content,
passphrase=self.privatekey_passphrase
)
file_args = module.load_file_common_arguments(module.params)
if module.set_fs_attributes_if_different(file_args, False):
@ -277,7 +296,7 @@ class PublicKey(crypto_utils.OpenSSLObject):
state_and_perms = super(PublicKey, self).check(module, perms_required)
def _check_privatekey():
if not os.path.exists(self.privatekey_path):
if self.privatekey_content is None and not os.path.exists(self.privatekey_path):
return False
try:
@ -346,6 +365,7 @@ def main():
force=dict(type='bool', default=False),
path=dict(type='path', required=True),
privatekey_path=dict(type='path'),
privatekey_content=dict(type='str'),
format=dict(type='str', default='PEM', choices=['OpenSSH', 'PEM']),
privatekey_passphrase=dict(type='str', no_log=True),
backup=dict(type='bool', default=False),
@ -353,7 +373,10 @@ def main():
),
supports_check_mode=True,
add_file_common_args=True,
required_if=[('state', 'present', ['privatekey_path'])],
required_if=[('state', 'present', ['privatekey_path', 'privatekey_content'], True)],
mutually_exclusive=(
['privatekey_path', 'privatekey_content'],
),
)
minimal_cryptography_version = MINIMAL_CRYPTOGRAPHY_VERSION

View file

@ -35,6 +35,17 @@
set_fact:
info_results: "{{ info_results + [result] }}"
- name: ({{select_crypto_backend}}) Get certificate info directly
openssl_certificate_info:
content: '{{ lookup("file", output_dir ~ "/cert_1.pem") }}'
select_crypto_backend: '{{ select_crypto_backend }}'
register: result_direct
- name: ({{select_crypto_backend}}) Compare output of direct and loaded info
assert:
that:
- result == result_direct
- name: ({{select_crypto_backend}}) Get certificate info
openssl_certificate_info:
path: '{{ output_dir }}/cert_2.pem'

View file

@ -32,6 +32,17 @@
set_fact:
info_results: "{{ info_results + [result] }}"
- name: ({{select_crypto_backend}}) Get CSR info directly
openssl_csr_info:
content: '{{ lookup("file", output_dir ~ "/csr_1.csr") }}'
select_crypto_backend: '{{ select_crypto_backend }}'
register: result_direct
- name: ({{select_crypto_backend}}) Compare output of direct and loaded info
assert:
that:
- result == result_direct
- name: ({{select_crypto_backend}}) Get CSR info
openssl_csr_info:
path: '{{ output_dir }}/csr_2.csr'

View file

@ -24,6 +24,17 @@
set_fact:
info_results: "{{ info_results | combine({'key1': result}) }}"
- name: ({{select_crypto_backend}}) Get key 1 info directly
openssl_privatekey_info:
content: '{{ lookup("file", output_dir ~ "/privatekey_1.pem") }}'
select_crypto_backend: '{{ select_crypto_backend }}'
register: result_direct
- name: ({{select_crypto_backend}}) Compare output of direct and loaded info
assert:
that:
- result == result_direct
- name: ({{select_crypto_backend}}) Get key 2 info
openssl_privatekey_info:
path: '{{ output_dir }}/privatekey_2.pem'