diff --git a/changelogs/fragments/66384-openssl-content.yml b/changelogs/fragments/66384-openssl-content.yml new file mode 100644 index 00000000000..86e3342888a --- /dev/null +++ b/changelogs/fragments/66384-openssl-content.yml @@ -0,0 +1,7 @@ +minor_changes: +- "openssl_certificate_info - allow to provide certificate content via ``content`` option (https://github.com/ansible/ansible/issues/64776)." +- "openssl_csr_info - allow to provide CSR content via ``content`` option." +- "openssl_privatekey_info - allow to provide private key content via ``content`` option." +- "openssl_certificate - allow to provide content of some input files via the ``csr_content``, ``privatekey_content``, ``ownca_privatekey_content`` and ``ownca_content`` options." +- "openssl_csr - allow to provide private key content via ``private_key_content`` option." +- "openssl_publickey - allow to provide private key content via ``private_key_content`` option." diff --git a/lib/ansible/module_utils/crypto.py b/lib/ansible/module_utils/crypto.py index df5eea096ca..3467bee2d7f 100644 --- a/lib/ansible/module_utils/crypto.py +++ b/lib/ansible/module_utils/crypto.py @@ -166,10 +166,10 @@ def get_fingerprint_of_bytes(source): return fingerprint -def get_fingerprint(path, passphrase=None): +def get_fingerprint(path, passphrase=None, content=None): """Generate the fingerprint of the public key. """ - privatekey = load_privatekey(path, passphrase, check_passphrase=False) + privatekey = load_privatekey(path, passphrase=passphrase, content=content, check_passphrase=False) try: publickey = crypto.dump_publickey(crypto.FILETYPE_ASN1, privatekey) except AttributeError: @@ -252,12 +252,15 @@ def load_privatekey(path, passphrase=None, check_passphrase=True, content=None, raise OpenSSLObjectError(exc) -def load_certificate(path, backend='pyopenssl'): +def load_certificate(path, content=None, backend='pyopenssl'): """Load the specified certificate.""" try: - with open(path, 'rb') as cert_fh: - cert_content = cert_fh.read() + if content is None: + with open(path, 'rb') as cert_fh: + cert_content = cert_fh.read() + else: + cert_content = content if backend == 'pyopenssl': return crypto.load_certificate(crypto.FILETYPE_PEM, cert_content) elif backend == 'cryptography': @@ -266,11 +269,14 @@ def load_certificate(path, backend='pyopenssl'): raise OpenSSLObjectError(exc) -def load_certificate_request(path, backend='pyopenssl'): +def load_certificate_request(path, content=None, backend='pyopenssl'): """Load the specified certificate signing request.""" try: - with open(path, 'rb') as csr_fh: - csr_content = csr_fh.read() + if content is None: + with open(path, 'rb') as csr_fh: + csr_content = csr_fh.read() + else: + csr_content = content except (IOError, OSError) as exc: raise OpenSSLObjectError(exc) if backend == 'pyopenssl': diff --git a/lib/ansible/modules/crypto/openssl_certificate.py b/lib/ansible/modules/crypto/openssl_certificate.py index 37e0df485f9..dcb0544583d 100644 --- a/lib/ansible/modules/crypto/openssl_certificate.py +++ b/lib/ansible/modules/crypto/openssl_certificate.py @@ -82,16 +82,31 @@ options: description: - Path to the Certificate Signing Request (CSR) used to generate this certificate. - This is not required in C(assertonly) mode. + - This is mutually exclusive with I(csr_content). type: path + csr_content: + description: + - Content of the Certificate Signing Request (CSR) used to generate this certificate. + - This is not required in C(assertonly) mode. + - This is mutually exclusive with I(csr_path). + type: str + version_added: "2.10" privatekey_path: description: - Path to the private key to use when signing the certificate. + - This is mutually exclusive with I(privatekey_content). type: path + privatekey_content: + description: + - Path to the private key to use when signing the certificate. + - This is mutually exclusive with I(privatekey_path). + type: str + version_added: "2.10" privatekey_passphrase: description: - - The passphrase for the I(privatekey_path). + - The passphrase for the I(privatekey_path) resp. I(privatekey_content). - This is required if the private key is password protected. type: str @@ -158,19 +173,35 @@ options: description: - Remote absolute path of the CA (Certificate Authority) certificate. - This is only used by the C(ownca) provider. + - This is mutually exclusive with I(ownca_content). type: path version_added: "2.7" + ownca_content: + description: + - Content of the CA (Certificate Authority) certificate. + - This is only used by the C(ownca) provider. + - This is mutually exclusive with I(ownca_path). + type: str + version_added: "2.10" ownca_privatekey_path: description: - Path to the CA (Certificate Authority) private key to use when signing the certificate. - This is only used by the C(ownca) provider. + - This is mutually exclusive with I(ownca_privatekey_content). type: path version_added: "2.7" + ownca_privatekey_content: + description: + - Path to the CA (Certificate Authority) private key to use when signing the certificate. + - This is only used by the C(ownca) provider. + - This is mutually exclusive with I(ownca_privatekey_path). + type: str + version_added: "2.10" ownca_privatekey_passphrase: description: - - The passphrase for the I(ownca_privatekey_path). + - The passphrase for the I(ownca_privatekey_path) resp. I(ownca_privatekey_content). - This is only used by the C(ownca) provider. type: str version_added: "2.7" @@ -812,7 +843,7 @@ EXAMPLES = r''' RETURN = r''' filename: - description: Path to the generated Certificate + description: Path to the generated certificate. returned: changed or success type: str sample: /etc/ssl/crt/www.ansible.com.crt @@ -829,6 +860,7 @@ import abc import datetime import time import os +import tempfile import traceback from distutils.version import LooseVersion @@ -884,8 +916,14 @@ class Certificate(crypto_utils.OpenSSLObject): self.provider = module.params['provider'] self.privatekey_path = module.params['privatekey_path'] + self.privatekey_content = module.params['privatekey_content'] + if self.privatekey_content is not None: + self.privatekey_content = self.privatekey_content.encode('utf-8') self.privatekey_passphrase = module.params['privatekey_passphrase'] self.csr_path = module.params['csr_path'] + self.csr_content = module.params['csr_content'] + if self.csr_content is not None: + self.csr_content = self.csr_content.encode('utf-8') self.cert = None self.privatekey = None self.csr = None @@ -1011,11 +1049,12 @@ class Certificate(crypto_utils.OpenSSLObject): except Exception as dummy: return False - if self.privatekey_path: + if self.privatekey_path or self.privatekey_content: try: self.privatekey = crypto_utils.load_privatekey( - self.privatekey_path, - self.privatekey_passphrase, + path=self.privatekey_path, + content=self.privatekey_content, + passphrase=self.privatekey_passphrase, backend=self.backend ) except crypto_utils.OpenSSLBadPassphraseError as exc: @@ -1023,8 +1062,12 @@ class Certificate(crypto_utils.OpenSSLObject): if not self._validate_privatekey(): return False - if self.csr_path: - self.csr = crypto_utils.load_certificate_request(self.csr_path, backend=self.backend) + if self.csr_path or self.csr_content: + self.csr = crypto_utils.load_certificate_request( + path=self.csr_path, + content=self.csr_content, + backend=self.backend + ) if not self._validate_csr(): return False @@ -1087,21 +1130,28 @@ class SelfSignedCertificateCryptography(Certificate): self.version = module.params['selfsigned_version'] self.serial_number = x509.random_serial_number() - if not os.path.exists(self.csr_path): + if self.csr_content is None and not os.path.exists(self.csr_path): raise CertificateError( 'The certificate signing request file {0} does not exist'.format(self.csr_path) ) - if not os.path.exists(self.privatekey_path): + if self.privatekey_content is None and not os.path.exists(self.privatekey_path): raise CertificateError( 'The private key file {0} does not exist'.format(self.privatekey_path) ) - self.csr = crypto_utils.load_certificate_request(self.csr_path, backend=self.backend) + self.csr = crypto_utils.load_certificate_request( + path=self.csr_path, + content=self.csr_content, + backend=self.backend + ) self._module = module try: self.privatekey = crypto_utils.load_privatekey( - self.privatekey_path, self.privatekey_passphrase, backend=self.backend + path=self.privatekey_path, + content=self.privatekey_content, + passphrase=self.privatekey_passphrase, + backend=self.backend ) except crypto_utils.OpenSSLBadPassphraseError as exc: module.fail_json(msg=to_native(exc)) @@ -1115,11 +1165,11 @@ class SelfSignedCertificateCryptography(Certificate): self.digest = None def generate(self, module): - if not os.path.exists(self.privatekey_path): + if self.privatekey_content is None and not os.path.exists(self.privatekey_path): raise CertificateError( 'The private key %s does not exist' % self.privatekey_path ) - if not os.path.exists(self.csr_path): + if self.csr_content is None and not os.path.exists(self.csr_path): raise CertificateError( 'The certificate signing request file %s does not exist' % self.csr_path ) @@ -1210,31 +1260,36 @@ class SelfSignedCertificate(Certificate): self.version = module.params['selfsigned_version'] self.serial_number = randint(1000, 99999) - if not os.path.exists(self.csr_path): + if self.csr_content is None and not os.path.exists(self.csr_path): raise CertificateError( 'The certificate signing request file {0} does not exist'.format(self.csr_path) ) - if not os.path.exists(self.privatekey_path): + if self.privatekey_content is None and not os.path.exists(self.privatekey_path): raise CertificateError( 'The private key file {0} does not exist'.format(self.privatekey_path) ) - self.csr = crypto_utils.load_certificate_request(self.csr_path) + self.csr = crypto_utils.load_certificate_request( + path=self.csr_path, + content=self.csr_content, + ) try: self.privatekey = crypto_utils.load_privatekey( - self.privatekey_path, self.privatekey_passphrase + path=self.privatekey_path, + content=self.privatekey_content, + passphrase=self.privatekey_passphrase, ) except crypto_utils.OpenSSLBadPassphraseError as exc: module.fail_json(msg=str(exc)) def generate(self, module): - if not os.path.exists(self.privatekey_path): + if self.privatekey_content is None and not os.path.exists(self.privatekey_path): raise CertificateError( 'The private key %s does not exist' % self.privatekey_path ) - if not os.path.exists(self.csr_path): + if self.csr_content is None and not os.path.exists(self.csr_path): raise CertificateError( 'The certificate signing request file %s does not exist' % self.csr_path ) @@ -1301,27 +1356,44 @@ class OwnCACertificateCryptography(Certificate): self.version = module.params['ownca_version'] self.serial_number = x509.random_serial_number() self.ca_cert_path = module.params['ownca_path'] + self.ca_cert_content = module.params['ownca_content'] + if self.ca_cert_content is not None: + self.ca_cert_content = self.ca_cert_content.encode('utf-8') self.ca_privatekey_path = module.params['ownca_privatekey_path'] + self.ca_privatekey_content = module.params['ownca_privatekey_content'] + if self.ca_privatekey_content is not None: + self.ca_privatekey_content = self.ca_privatekey_content.encode('utf-8') self.ca_privatekey_passphrase = module.params['ownca_privatekey_passphrase'] - if not os.path.exists(self.csr_path): + if self.csr_content is None and not os.path.exists(self.csr_path): raise CertificateError( 'The certificate signing request file {0} does not exist'.format(self.csr_path) ) - if not os.path.exists(self.ca_cert_path): + if self.ca_cert_content is None and not os.path.exists(self.ca_cert_path): raise CertificateError( 'The CA certificate file {0} does not exist'.format(self.ca_cert_path) ) - if not os.path.exists(self.ca_privatekey_path): + if self.ca_privatekey_content is None and not os.path.exists(self.ca_privatekey_path): raise CertificateError( 'The CA private key file {0} does not exist'.format(self.ca_privatekey_path) ) - self.csr = crypto_utils.load_certificate_request(self.csr_path, backend=self.backend) - self.ca_cert = crypto_utils.load_certificate(self.ca_cert_path, backend=self.backend) + self.csr = crypto_utils.load_certificate_request( + path=self.csr_path, + content=self.csr_content, + backend=self.backend + ) + self.ca_cert = crypto_utils.load_certificate( + path=self.ca_cert_path, + content=self.ca_cert_content, + backend=self.backend + ) try: self.ca_private_key = crypto_utils.load_privatekey( - self.ca_privatekey_path, self.ca_privatekey_passphrase, backend=self.backend + path=self.ca_privatekey_path, + content=self.ca_privatekey_content, + passphrase=self.ca_privatekey_passphrase, + backend=self.backend ) except crypto_utils.OpenSSLBadPassphraseError as exc: module.fail_json(msg=str(exc)) @@ -1336,17 +1408,17 @@ class OwnCACertificateCryptography(Certificate): def generate(self, module): - if not os.path.exists(self.ca_cert_path): + if self.ca_cert_content is None and not os.path.exists(self.ca_cert_path): raise CertificateError( 'The CA certificate %s does not exist' % self.ca_cert_path ) - if not os.path.exists(self.ca_privatekey_path): + if self.ca_privatekey_content is None and not os.path.exists(self.ca_privatekey_path): raise CertificateError( 'The CA private key %s does not exist' % self.ca_privatekey_path ) - if not os.path.exists(self.csr_path): + if self.csr_content is None and not os.path.exists(self.csr_path): raise CertificateError( 'The certificate signing request file %s does not exist' % self.csr_path ) @@ -1481,44 +1553,58 @@ class OwnCACertificate(Certificate): if module.params['ownca_create_authority_key_identifier']: module.warn('ownca_create_authority_key_identifier is ignored by the pyOpenSSL backend!') self.ca_cert_path = module.params['ownca_path'] + self.ca_cert_content = module.params['ownca_content'] + if self.ca_cert_content is not None: + self.ca_cert_content = self.ca_cert_content.encode('utf-8') self.ca_privatekey_path = module.params['ownca_privatekey_path'] + self.ca_privatekey_content = module.params['ownca_privatekey_content'] + if self.ca_privatekey_content is not None: + self.ca_privatekey_content = self.ca_privatekey_content.encode('utf-8') self.ca_privatekey_passphrase = module.params['ownca_privatekey_passphrase'] - if not os.path.exists(self.csr_path): + if self.csr_content is None and not os.path.exists(self.csr_path): raise CertificateError( 'The certificate signing request file {0} does not exist'.format(self.csr_path) ) - if not os.path.exists(self.ca_cert_path): + if self.ca_cert_content is None and not os.path.exists(self.ca_cert_path): raise CertificateError( 'The CA certificate file {0} does not exist'.format(self.ca_cert_path) ) - if not os.path.exists(self.ca_privatekey_path): + if self.ca_privatekey_content is None and not os.path.exists(self.ca_privatekey_path): raise CertificateError( 'The CA private key file {0} does not exist'.format(self.ca_privatekey_path) ) - self.csr = crypto_utils.load_certificate_request(self.csr_path) - self.ca_cert = crypto_utils.load_certificate(self.ca_cert_path) + self.csr = crypto_utils.load_certificate_request( + path=self.csr_path, + content=self.csr_content, + ) + self.ca_cert = crypto_utils.load_certificate( + path=self.ca_cert_path, + content=self.ca_cert_content, + ) try: self.ca_privatekey = crypto_utils.load_privatekey( - self.ca_privatekey_path, self.ca_privatekey_passphrase + path=self.ca_privatekey_path, + content=self.ca_privatekey_content, + passphrase=self.ca_privatekey_passphrase ) except crypto_utils.OpenSSLBadPassphraseError as exc: module.fail_json(msg=str(exc)) def generate(self, module): - if not os.path.exists(self.ca_cert_path): + if self.ca_cert_content is None and not os.path.exists(self.ca_cert_path): raise CertificateError( 'The CA certificate %s does not exist' % self.ca_cert_path ) - if not os.path.exists(self.ca_privatekey_path): + if self.ca_privatekey_content is None and not os.path.exists(self.ca_privatekey_path): raise CertificateError( 'The CA private key %s does not exist' % self.ca_privatekey_path ) - if not os.path.exists(self.csr_path): + if self.csr_content is None and not os.path.exists(self.csr_path): raise CertificateError( 'The certificate signing request file %s does not exist' % self.csr_path ) @@ -1630,17 +1716,22 @@ class AssertOnlyCertificateBase(Certificate): # Load objects self.cert = crypto_utils.load_certificate(self.path, backend=self.backend) - if self.privatekey_path is not None: + if self.privatekey_path is not None or self.privatekey_content is not None: try: self.privatekey = crypto_utils.load_privatekey( - self.privatekey_path, - self.privatekey_passphrase, + path=self.privatekey_path, + content=self.privatekey_content, + passphrase=self.privatekey_passphrase, backend=self.backend ) except crypto_utils.OpenSSLBadPassphraseError as exc: raise CertificateError(exc) - if self.csr_path is not None: - self.csr = crypto_utils.load_certificate_request(self.csr_path, backend=self.backend) + if self.csr_path is not None or self.csr_content is not None: + self.csr = crypto_utils.load_certificate_request( + path=self.csr_path, + content=self.csr_content, + backend=self.backend + ) @abc.abstractmethod def _validate_privatekey(self): @@ -1712,28 +1803,28 @@ class AssertOnlyCertificateBase(Certificate): def assertonly(self, module): messages = [] - if self.privatekey_path is not None: + if self.privatekey_path is not None or self.privatekey_content is not None: if not self._validate_privatekey(): messages.append( 'Certificate %s and private key %s do not match' % - (self.path, self.privatekey_path) + (self.path, self.privatekey_path or '(provided in module options)') ) - if self.csr_path is not None: + if self.csr_path is not None or self.csr_content is not None: if not self._validate_csr_signature(): messages.append( 'Certificate %s and CSR %s do not match: private key mismatch' % - (self.path, self.csr_path) + (self.path, self.csr_path or '(provided in module options)') ) if not self._validate_csr_subject(): messages.append( 'Certificate %s and CSR %s do not match: subject mismatch' % - (self.path, self.csr_path) + (self.path, self.csr_path or '(provided in module options)') ) if not self._validate_csr_extensions(): messages.append( 'Certificate %s and CSR %s do not match: extensions mismatch' % - (self.path, self.csr_path) + (self.path, self.csr_path or '(provided in module options)') ) if self.signature_algorithms is not None: @@ -2179,12 +2270,16 @@ class EntrustCertificate(Certificate): self.trackingId = None self.notAfter = self.get_relative_time_option(module.params['entrust_not_after'], 'entrust_not_after') - if not os.path.exists(self.csr_path): + if self.csr_content is None or not os.path.exists(self.csr_path): raise CertificateError( 'The certificate signing request file {0} does not exist'.format(self.csr_path) ) - self.csr = crypto_utils.load_certificate_request(self.csr_path, backend=self.backend) + self.csr = crypto_utils.load_certificate_request( + path=self.csr_path, + content=self.csr_content, + backend=self.backend, + ) # ECS API defaults to using the validated organization tied to the account. # We want to always force behavior of trying to use the organization provided in the CSR. @@ -2229,8 +2324,11 @@ class EntrustCertificate(Certificate): if not self.check(module, perms_required=False) or self.force: # Read the CSR that was generated for us body = {} - with open(self.csr_path, 'r') as csr_file: - body['csr'] = csr_file.read() + if self.csr_content is not None: + body['csr'] = self.csr_content + else: + with open(self.csr_path, 'r') as csr_file: + body['csr'] = csr_file.read() body['certType'] = module.params['entrust_cert_type'] @@ -2346,12 +2444,7 @@ class AcmeCertificate(Certificate): def generate(self, module): - if not os.path.exists(self.privatekey_path): - raise CertificateError( - 'The private key %s does not exist' % self.privatekey_path - ) - - if not os.path.exists(self.csr_path): + if self.csr_content is None and not os.path.exists(self.csr_path): raise CertificateError( 'The certificate signing request file %s does not exist' % self.csr_path ) @@ -2372,7 +2465,26 @@ class AcmeCertificate(Certificate): if self.use_chain: command.append('--chain') command.extend(['--account-key', self.accountkey_path]) - command.extend(['--csr', self.csr_path]) + if self.csr_content is not None: + # We need to temporarily write the CSR to disk + fd, tmpsrc = tempfile.mkstemp() + module.add_cleanup_file(tmpsrc) # Ansible will delete the file on exit + f = os.fdopen(fd, 'wb') + try: + f.write(self.csr_content) + except Exception as err: + try: + f.close() + except Exception as dummy: + pass + module.fail_json( + msg="failed to create temporary CSR file: %s" % to_native(err), + exception=traceback.format_exc() + ) + f.close() + command.extend(['--csr', tmpsrc]) + else: + command.extend(['--csr', self.csr_path]) command.extend(['--acme-dir', self.challenge_path]) try: @@ -2411,11 +2523,13 @@ def main(): provider=dict(type='str', choices=['acme', 'assertonly', 'entrust', 'ownca', 'selfsigned']), force=dict(type='bool', default=False,), csr_path=dict(type='path'), + csr_content=dict(type='str'), backup=dict(type='bool', default=False), select_crypto_backend=dict(type='str', default='auto', choices=['auto', 'cryptography', 'pyopenssl']), # General properties of a certificate privatekey_path=dict(type='path'), + privatekey_content=dict(type='str'), privatekey_passphrase=dict(type='str', no_log=True), # provider: assertonly @@ -2451,7 +2565,9 @@ def main(): # provider: ownca ownca_path=dict(type='path'), + ownca_content=dict(type='str'), ownca_privatekey_path=dict(type='path'), + ownca_privatekey_content=dict(type='str'), ownca_privatekey_passphrase=dict(type='str', no_log=True), ownca_digest=dict(type='str', default='sha256'), ownca_version=dict(type='int', default=3), @@ -2490,7 +2606,13 @@ def main(): ['provider', 'entrust', ['entrust_requester_email', 'entrust_requester_name', 'entrust_requester_phone', 'entrust_api_user', 'entrust_api_key', 'entrust_api_client_cert_path', 'entrust_api_client_cert_key_path']], - ] + ], + mutually_exclusive=[ + ['csr_path', 'csr_content'], + ['privatekey_path', 'privatekey_content'], + ['ownca_path', 'ownca_content'], + ['ownca_privatekey_path', 'ownca_privatekey_content'], + ], ) try: @@ -2498,8 +2620,8 @@ def main(): certificate = CertificateAbsent(module) else: - if module.params['provider'] != 'assertonly' and module.params['csr_path'] is None: - module.fail_json(msg='csr_path is required when provider is not assertonly') + if module.params['provider'] != 'assertonly' and module.params['csr_path'] is None and module.params['csr_content'] is None: + module.fail_json(msg='csr_path or csr_content is required when provider is not assertonly') base_dir = os.path.dirname(module.params['path']) or '.' if not os.path.isdir(base_dir): @@ -2513,6 +2635,19 @@ def main(): module.deprecate("The 'assertonly' provider is deprecated; please see the examples of " "the 'openssl_certificate' module on how to replace it with other modules", version='2.13') + elif provider == 'selfsigned': + if module.params['privatekey_path'] is None and module.params['privatekey_content'] is None: + module.fail_json(msg='One of privatekey_path and privatekey_content must be specified for the selfsigned provider.') + elif provider == 'acme': + if module.params['acme_accountkey_path'] is None: + module.fail_json(msg='The acme_accountkey_path option must be specified for the acme provider.') + if module.params['acme_challenge_path'] is None: + module.fail_json(msg='The acme_challenge_path option must be specified for the acme provider.') + elif provider == 'ownca': + if module.params['ownca_path'] is None and module.params['ownca_content'] is None: + module.fail_json(msg='One of ownca_path and ownca_content must be specified for the ownca provider.') + if module.params['ownca_privatekey_path'] is None and module.params['ownca_privatekey_content'] is None: + module.fail_json(msg='One of ownca_privatekey_path and ownca_privatekey_content must be specified for the ownca provider.') backend = module.params['select_crypto_backend'] if backend == 'auto': diff --git a/lib/ansible/modules/crypto/openssl_certificate_info.py b/lib/ansible/modules/crypto/openssl_certificate_info.py index 7fa82e354a9..cf6d0fc2a06 100644 --- a/lib/ansible/modules/crypto/openssl_certificate_info.py +++ b/lib/ansible/modules/crypto/openssl_certificate_info.py @@ -34,8 +34,14 @@ options: path: description: - Remote absolute path where the certificate file is loaded from. + - Either I(path) or I(content) must be specified, but not both. type: path - required: true + content: + description: + - Content of the X.509 certificate in PEM format. + - Either I(path) or I(content) must be specified, but not both. + type: str + version_added: "2.10" valid_at: description: - A dict of names mapping to time specifications. Every time specified here @@ -370,13 +376,16 @@ def get_relative_time_option(input_string, input_name): class CertificateInfo(crypto_utils.OpenSSLObject): def __init__(self, module, backend): super(CertificateInfo, self).__init__( - module.params['path'], + module.params['path'] or '', 'present', False, module.check_mode, ) self.backend = backend self.module = module + self.content = module.params['content'] + if self.content is not None: + self.content = self.content.encode('utf-8') self.valid_at = module.params['valid_at'] if self.valid_at: @@ -465,7 +474,7 @@ class CertificateInfo(crypto_utils.OpenSSLObject): def get_info(self): result = dict() - self.cert = crypto_utils.load_certificate(self.path, backend=self.backend) + self.cert = crypto_utils.load_certificate(self.path, content=self.content, backend=self.backend) result['signature_algorithm'] = self._get_signature_algorithm() subject = self._get_subject_ordered() @@ -810,20 +819,28 @@ class CertificateInfoPyOpenSSL(CertificateInfo): def main(): module = AnsibleModule( argument_spec=dict( - path=dict(type='path', required=True), + path=dict(type='path'), + content=dict(type='str'), valid_at=dict(type='dict'), select_crypto_backend=dict(type='str', default='auto', choices=['auto', 'cryptography', 'pyopenssl']), ), + required_one_of=( + ['path', 'content'], + ), + mutually_exclusive=( + ['path', 'content'], + ), supports_check_mode=True, ) try: - base_dir = os.path.dirname(module.params['path']) or '.' - if not os.path.isdir(base_dir): - module.fail_json( - name=base_dir, - msg='The directory %s does not exist or the file is not a directory' % base_dir - ) + if module.params['path'] is not None: + base_dir = os.path.dirname(module.params['path']) or '.' + if not os.path.isdir(base_dir): + module.fail_json( + name=base_dir, + msg='The directory %s does not exist or the file is not a directory' % base_dir + ) backend = module.params['select_crypto_backend'] if backend == 'auto': diff --git a/lib/ansible/modules/crypto/openssl_csr.py b/lib/ansible/modules/crypto/openssl_csr.py index 2a72cac9c8d..d3a6b8cb995 100644 --- a/lib/ansible/modules/crypto/openssl_csr.py +++ b/lib/ansible/modules/crypto/openssl_csr.py @@ -48,8 +48,14 @@ options: privatekey_path: description: - The path to the private key to use when signing the certificate signing request. - - Required if I(state) is C(present). + - Either I(privatekey_path) or I(privatekey_content) must be specified if I(state) is C(present), but not both. type: path + privatekey_content: + description: + - The content of the private key to use when signing the certificate signing request. + - Either I(privatekey_path) or I(privatekey_content) must be specified if I(state) is C(present), but not both. + type: str + version_added: "2.10" privatekey_passphrase: description: - The passphrase for the private key. @@ -295,6 +301,12 @@ EXAMPLES = r''' privatekey_path: /etc/ssl/private/ansible.com.pem common_name: www.ansible.com +- name: Generate an OpenSSL Certificate Signing Request with an inline key + openssl_csr: + path: /etc/ssl/csr/www.ansible.com.csr + privatekey_content: "{{ private_key_content }}" + common_name: www.ansible.com + - name: Generate an OpenSSL Certificate Signing Request with a passphrase protected private key openssl_csr: path: /etc/ssl/csr/www.ansible.com.csr @@ -355,7 +367,9 @@ EXAMPLES = r''' RETURN = r''' privatekey: - description: Path to the TLS/SSL private key the CSR was generated for + description: + - Path to the TLS/SSL private key the CSR was generated for + - Will be C(none) if the private key has been provided in I(privatekey_content). returned: changed or success type: str sample: /etc/ssl/private/ansible.com.pem @@ -474,6 +488,9 @@ class CertificateSigningRequestBase(crypto_utils.OpenSSLObject): ) self.digest = module.params['digest'] self.privatekey_path = module.params['privatekey_path'] + self.privatekey_content = module.params['privatekey_content'] + if self.privatekey_content is not None: + self.privatekey_content = self.privatekey_content.encode('utf-8') self.privatekey_passphrase = module.params['privatekey_passphrase'] self.version = module.params['version'] self.subjectAltName = module.params['subject_alt_name'] @@ -655,7 +672,11 @@ class CertificateSigningRequestPyOpenSSL(CertificateSigningRequestBase): def _load_private_key(self): try: - self.privatekey = crypto_utils.load_privatekey(self.privatekey_path, self.privatekey_passphrase) + self.privatekey = crypto_utils.load_privatekey( + path=self.privatekey_path, + content=self.privatekey_content, + passphrase=self.privatekey_passphrase + ) except crypto_utils.OpenSSLBadPassphraseError as exc: raise CertificateSigningRequestError(exc) @@ -842,12 +863,16 @@ class CertificateSigningRequestCryptography(CertificateSigningRequestBase): def _load_private_key(self): try: - with open(self.privatekey_path, 'rb') as f: - self.privatekey = cryptography.hazmat.primitives.serialization.load_pem_private_key( - f.read(), - None if self.privatekey_passphrase is None else to_bytes(self.privatekey_passphrase), - backend=self.cryptography_backend - ) + if self.privatekey_content is not None: + content = self.privatekey_content + else: + with open(self.privatekey_path, 'rb') as f: + content = f.read() + self.privatekey = cryptography.hazmat.primitives.serialization.load_pem_private_key( + content, + None if self.privatekey_passphrase is None else to_bytes(self.privatekey_passphrase), + backend=self.cryptography_backend + ) except Exception as e: raise CertificateSigningRequestError(e) @@ -1003,6 +1028,7 @@ def main(): state=dict(type='str', default='present', choices=['absent', 'present']), digest=dict(type='str', default='sha256'), privatekey_path=dict(type='path'), + privatekey_content=dict(type='str'), privatekey_passphrase=dict(type='str', no_log=True), version=dict(type='int', default=1), force=dict(type='bool', default=False), @@ -1035,7 +1061,10 @@ def main(): select_crypto_backend=dict(type='str', default='auto', choices=['auto', 'cryptography', 'pyopenssl']), ), required_together=[('authority_cert_issuer', 'authority_cert_serial_number')], - required_if=[('state', 'present', ['privatekey_path'])], + required_if=[('state', 'present', ['privatekey_path', 'privatekey_content'], True)], + mutually_exclusive=( + ['privatekey_path', 'privatekey_content'], + ), add_file_common_args=True, supports_check_mode=True, ) diff --git a/lib/ansible/modules/crypto/openssl_csr_info.py b/lib/ansible/modules/crypto/openssl_csr_info.py index cb80b55b017..713ee33808c 100644 --- a/lib/ansible/modules/crypto/openssl_csr_info.py +++ b/lib/ansible/modules/crypto/openssl_csr_info.py @@ -35,9 +35,14 @@ options: path: description: - Remote absolute path where the CSR file is loaded from. + - Either I(path) or I(content) must be specified, but not both. type: path - required: true - + content: + description: + - Content of the CSR file. + - Either I(path) or I(content) must be specified, but not both. + type: str + version_added: "2.10" select_crypto_backend: description: - Determines which crypto backend to use. @@ -257,13 +262,16 @@ TIMESTAMP_FORMAT = "%Y%m%d%H%M%SZ" class CertificateSigningRequestInfo(crypto_utils.OpenSSLObject): def __init__(self, module, backend): super(CertificateSigningRequestInfo, self).__init__( - module.params['path'], + module.params['path'] or '', 'present', False, module.check_mode, ) self.backend = backend self.module = module + self.content = module.params['content'] + if self.content is not None: + self.content = self.content.encode('utf-8') def generate(self): # Empty method because crypto_utils.OpenSSLObject wants this @@ -319,7 +327,7 @@ class CertificateSigningRequestInfo(crypto_utils.OpenSSLObject): def get_info(self): result = dict() - self.csr = crypto_utils.load_certificate_request(self.path, backend=self.backend) + self.csr = crypto_utils.load_certificate_request(self.path, content=self.content, backend=self.backend) subject = self._get_subject_ordered() result['subject'] = dict() @@ -591,19 +599,27 @@ class CertificateSigningRequestInfoPyOpenSSL(CertificateSigningRequestInfo): def main(): module = AnsibleModule( argument_spec=dict( - path=dict(type='path', required=True), + path=dict(type='path'), + content=dict(type='str'), select_crypto_backend=dict(type='str', default='auto', choices=['auto', 'cryptography', 'pyopenssl']), ), + required_one_of=( + ['path', 'content'], + ), + mutually_exclusive=( + ['path', 'content'], + ), supports_check_mode=True, ) try: - base_dir = os.path.dirname(module.params['path']) or '.' - if not os.path.isdir(base_dir): - module.fail_json( - name=base_dir, - msg='The directory %s does not exist or the file is not a directory' % base_dir - ) + if module.params['path'] is not None: + base_dir = os.path.dirname(module.params['path']) or '.' + if not os.path.isdir(base_dir): + module.fail_json( + name=base_dir, + msg='The directory %s does not exist or the file is not a directory' % base_dir + ) backend = module.params['select_crypto_backend'] if backend == 'auto': diff --git a/lib/ansible/modules/crypto/openssl_privatekey.py b/lib/ansible/modules/crypto/openssl_privatekey.py index 00c715eb04f..f7632e923b2 100644 --- a/lib/ansible/modules/crypto/openssl_privatekey.py +++ b/lib/ansible/modules/crypto/openssl_privatekey.py @@ -606,7 +606,7 @@ class PrivateKeyCryptography(PrivateKeyBase): format=export_format, encryption_algorithm=encryption_algorithm ) - except ValueError as e: + except ValueError as dummy: self.module.fail_json( msg='Cryptography backend cannot serialize the private key in the required format "{0}"'.format(self.format) ) diff --git a/lib/ansible/modules/crypto/openssl_privatekey_info.py b/lib/ansible/modules/crypto/openssl_privatekey_info.py index e03de8ea030..c3f2b16dadb 100644 --- a/lib/ansible/modules/crypto/openssl_privatekey_info.py +++ b/lib/ansible/modules/crypto/openssl_privatekey_info.py @@ -38,7 +38,12 @@ options: description: - Remote absolute path where the private key file is loaded from. type: path - required: true + content: + description: + - Content of the private key file. + - Either I(path) or I(content) must be specified, but not both. + type: str + version_added: "2.10" passphrase: description: - The passphrase for the private key. @@ -318,13 +323,14 @@ def _is_cryptography_key_consistent(key, key_public_data, key_private_data): class PrivateKeyInfo(crypto_utils.OpenSSLObject): def __init__(self, module, backend): super(PrivateKeyInfo, self).__init__( - module.params['path'], + module.params['path'] or '', 'present', False, module.check_mode, ) self.backend = backend self.module = module + self.content = module.params['content'] self.passphrase = module.params['passphrase'] self.return_private_key_data = module.params['return_private_key_data'] @@ -355,12 +361,16 @@ class PrivateKeyInfo(crypto_utils.OpenSSLObject): can_parse_key=False, key_is_consistent=None, ) - try: - with open(self.path, 'rb') as b_priv_key_fh: - priv_key_detail = b_priv_key_fh.read() + if self.content is not None: + priv_key_detail = self.content.encode('utf-8') result['can_load_key'] = True - except (IOError, OSError) as exc: - self.module.fail_json(msg=to_native(exc), **result) + else: + try: + with open(self.path, 'rb') as b_priv_key_fh: + priv_key_detail = b_priv_key_fh.read() + result['can_load_key'] = True + except (IOError, OSError) as exc: + self.module.fail_json(msg=to_native(exc), **result) try: self.key = crypto_utils.load_privatekey( path=None, @@ -576,21 +586,29 @@ class PrivateKeyInfoPyOpenSSL(PrivateKeyInfo): def main(): module = AnsibleModule( argument_spec=dict( - path=dict(type='path', required=True), + path=dict(type='path'), + content=dict(type='str'), passphrase=dict(type='str', no_log=True), return_private_key_data=dict(type='bool', default=False), select_crypto_backend=dict(type='str', default='auto', choices=['auto', 'cryptography', 'pyopenssl']), ), + required_one_of=( + ['path', 'content'], + ), + mutually_exclusive=( + ['path', 'content'], + ), supports_check_mode=True, ) try: - base_dir = os.path.dirname(module.params['path']) or '.' - if not os.path.isdir(base_dir): - module.fail_json( - name=base_dir, - msg='The directory %s does not exist or the file is not a directory' % base_dir - ) + if module.params['path'] is not None: + base_dir = os.path.dirname(module.params['path']) or '.' + if not os.path.isdir(base_dir): + module.fail_json( + name=base_dir, + msg='The directory %s does not exist or the file is not a directory' % base_dir + ) backend = module.params['select_crypto_backend'] if backend == 'auto': diff --git a/lib/ansible/modules/crypto/openssl_publickey.py b/lib/ansible/modules/crypto/openssl_publickey.py index 10a1bffd6a0..9511448fa40 100644 --- a/lib/ansible/modules/crypto/openssl_publickey.py +++ b/lib/ansible/modules/crypto/openssl_publickey.py @@ -58,8 +58,16 @@ options: privatekey_path: description: - Path to the TLS/SSL private key from which to generate the public key. - - Required if I(state) is C(present). + - Either I(privatekey_path) or I(privatekey_content) must be specified, but not both. + If I(state) is C(present), one of them is required. type: path + privatekey_content: + description: + - The content of the TLS/SSL private key from which to generate the public key. + - Either I(privatekey_path) or I(privatekey_content) must be specified, but not both. + If I(state) is C(present), one of them is required. + type: str + version_added: "2.10" privatekey_passphrase: description: - The passphrase for the private key. @@ -98,6 +106,11 @@ EXAMPLES = r''' path: /etc/ssl/public/ansible.com.pem privatekey_path: /etc/ssl/private/ansible.com.pem +- name: Generate an OpenSSL public key in PEM format from an inline key + openssl_publickey: + path: /etc/ssl/public/ansible.com.pem + privatekey_content: "{{ private_key_content }}" + - name: Generate an OpenSSL public key in OpenSSH v2 format openssl_publickey: path: /etc/ssl/public/ansible.com.pem @@ -119,13 +132,14 @@ EXAMPLES = r''' - name: Remove an OpenSSL public key openssl_publickey: path: /etc/ssl/public/ansible.com.pem - privatekey_path: /etc/ssl/private/ansible.com.pem state: absent ''' RETURN = r''' privatekey: - description: Path to the TLS/SSL private key the public key was generated from. + description: + - Path to the TLS/SSL private key the public key was generated from. + - Will be C(none) if the private key has been provided in I(privatekey_content). returned: changed or success type: str sample: /etc/ssl/private/ansible.com.pem @@ -191,7 +205,7 @@ else: CRYPTOGRAPHY_FOUND = True from ansible.module_utils import crypto as crypto_utils -from ansible.module_utils._text import to_native, to_bytes +from ansible.module_utils._text import to_native from ansible.module_utils.basic import AnsibleModule, missing_required_lib @@ -210,6 +224,9 @@ class PublicKey(crypto_utils.OpenSSLObject): ) self.format = module.params['format'] self.privatekey_path = module.params['privatekey_path'] + self.privatekey_content = module.params['privatekey_content'] + if self.privatekey_content is not None: + self.privatekey_content = self.privatekey_content.encode('utf-8') self.privatekey_passphrase = module.params['privatekey_passphrase'] self.privatekey = None self.fingerprint = {} @@ -220,8 +237,9 @@ class PublicKey(crypto_utils.OpenSSLObject): def _create_publickey(self, module): self.privatekey = crypto_utils.load_privatekey( - self.privatekey_path, - self.privatekey_passphrase, + path=self.privatekey_path, + content=self.privatekey_content, + passphrase=self.privatekey_passphrase, backend=self.backend ) if self.backend == 'cryptography': @@ -244,7 +262,7 @@ class PublicKey(crypto_utils.OpenSSLObject): def generate(self, module): """Generate the public key.""" - if not os.path.exists(self.privatekey_path): + if self.privatekey_content is None and not os.path.exists(self.privatekey_path): raise PublicKeyError( 'The private key %s does not exist' % self.privatekey_path ) @@ -264,8 +282,9 @@ class PublicKey(crypto_utils.OpenSSLObject): raise PublicKeyError(exc) self.fingerprint = crypto_utils.get_fingerprint( - self.privatekey_path, - self.privatekey_passphrase + path=self.privatekey_path, + content=self.privatekey_content, + passphrase=self.privatekey_passphrase ) file_args = module.load_file_common_arguments(module.params) if module.set_fs_attributes_if_different(file_args, False): @@ -277,7 +296,7 @@ class PublicKey(crypto_utils.OpenSSLObject): state_and_perms = super(PublicKey, self).check(module, perms_required) def _check_privatekey(): - if not os.path.exists(self.privatekey_path): + if self.privatekey_content is None and not os.path.exists(self.privatekey_path): return False try: @@ -346,6 +365,7 @@ def main(): force=dict(type='bool', default=False), path=dict(type='path', required=True), privatekey_path=dict(type='path'), + privatekey_content=dict(type='str'), format=dict(type='str', default='PEM', choices=['OpenSSH', 'PEM']), privatekey_passphrase=dict(type='str', no_log=True), backup=dict(type='bool', default=False), @@ -353,7 +373,10 @@ def main(): ), supports_check_mode=True, add_file_common_args=True, - required_if=[('state', 'present', ['privatekey_path'])], + required_if=[('state', 'present', ['privatekey_path', 'privatekey_content'], True)], + mutually_exclusive=( + ['privatekey_path', 'privatekey_content'], + ), ) minimal_cryptography_version = MINIMAL_CRYPTOGRAPHY_VERSION diff --git a/test/integration/targets/openssl_certificate_info/tasks/impl.yml b/test/integration/targets/openssl_certificate_info/tasks/impl.yml index 2f65626dac7..2ae89ed7fd6 100644 --- a/test/integration/targets/openssl_certificate_info/tasks/impl.yml +++ b/test/integration/targets/openssl_certificate_info/tasks/impl.yml @@ -35,6 +35,17 @@ set_fact: info_results: "{{ info_results + [result] }}" +- name: ({{select_crypto_backend}}) Get certificate info directly + openssl_certificate_info: + content: '{{ lookup("file", output_dir ~ "/cert_1.pem") }}' + select_crypto_backend: '{{ select_crypto_backend }}' + register: result_direct + +- name: ({{select_crypto_backend}}) Compare output of direct and loaded info + assert: + that: + - result == result_direct + - name: ({{select_crypto_backend}}) Get certificate info openssl_certificate_info: path: '{{ output_dir }}/cert_2.pem' diff --git a/test/integration/targets/openssl_csr_info/tasks/impl.yml b/test/integration/targets/openssl_csr_info/tasks/impl.yml index 9516583ef5d..2ab8b87bfdf 100644 --- a/test/integration/targets/openssl_csr_info/tasks/impl.yml +++ b/test/integration/targets/openssl_csr_info/tasks/impl.yml @@ -32,6 +32,17 @@ set_fact: info_results: "{{ info_results + [result] }}" +- name: ({{select_crypto_backend}}) Get CSR info directly + openssl_csr_info: + content: '{{ lookup("file", output_dir ~ "/csr_1.csr") }}' + select_crypto_backend: '{{ select_crypto_backend }}' + register: result_direct + +- name: ({{select_crypto_backend}}) Compare output of direct and loaded info + assert: + that: + - result == result_direct + - name: ({{select_crypto_backend}}) Get CSR info openssl_csr_info: path: '{{ output_dir }}/csr_2.csr' diff --git a/test/integration/targets/openssl_privatekey_info/tasks/impl.yml b/test/integration/targets/openssl_privatekey_info/tasks/impl.yml index 2bb8d489ffc..c74acdec795 100644 --- a/test/integration/targets/openssl_privatekey_info/tasks/impl.yml +++ b/test/integration/targets/openssl_privatekey_info/tasks/impl.yml @@ -24,6 +24,17 @@ set_fact: info_results: "{{ info_results | combine({'key1': result}) }}" +- name: ({{select_crypto_backend}}) Get key 1 info directly + openssl_privatekey_info: + content: '{{ lookup("file", output_dir ~ "/privatekey_1.pem") }}' + select_crypto_backend: '{{ select_crypto_backend }}' + register: result_direct + +- name: ({{select_crypto_backend}}) Compare output of direct and loaded info + assert: + that: + - result == result_direct + - name: ({{select_crypto_backend}}) Get key 2 info openssl_privatekey_info: path: '{{ output_dir }}/privatekey_2.pem'