openssl_* / x509_* modules: refactoring (#67540)

* Move common code to module_utils.

* Unify get_relative_time_option.
This commit is contained in:
Felix Fontein 2020-02-18 22:23:50 +01:00 committed by GitHub
parent b8f0cfc38a
commit de5c18af7c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 114 additions and 151 deletions

View file

@ -109,6 +109,7 @@ try:
except ImportError:
CRYPTOGRAPHY_HAS_ED448 = False
HAS_CRYPTOGRAPHY = True
except ImportError:
# Error handled in the calling module.
CRYPTOGRAPHY_HAS_X25519 = False
@ -116,6 +117,7 @@ except ImportError:
CRYPTOGRAPHY_HAS_X448 = False
CRYPTOGRAPHY_HAS_ED25519 = False
CRYPTOGRAPHY_HAS_ED448 = False
HAS_CRYPTOGRAPHY = False
import abc
@ -129,7 +131,7 @@ import re
import tempfile
from ansible.module_utils import six
from ansible.module_utils._text import to_bytes, to_text
from ansible.module_utils._text import to_native, to_bytes, to_text
class OpenSSLObjectError(Exception):
@ -362,6 +364,40 @@ def convert_relative_to_datetime(relative_time_string):
return datetime.datetime.utcnow() - offset
def get_relative_time_option(input_string, input_name, backend='cryptography'):
"""Return an absolute timespec if a relative timespec or an ASN1 formatted
string is provided.
The return value will be a datetime object for the cryptography backend,
and a ASN1 formatted string for the pyopenssl backend."""
result = to_native(input_string)
if result is None:
raise OpenSSLObjectError(
'The timespec "%s" for %s is not valid' %
input_string, input_name)
# Relative time
if result.startswith("+") or result.startswith("-"):
result_datetime = convert_relative_to_datetime(result)
if backend == 'pyopenssl':
return result_datetime.strftime("%Y%m%d%H%M%SZ")
elif backend == 'cryptography':
return result_datetime
# Absolute time
if backend == 'pyopenssl':
return input_string
elif backend == 'cryptography':
for date_fmt in ['%Y%m%d%H%M%SZ', '%Y%m%d%H%MZ', '%Y%m%d%H%M%S%z', '%Y%m%d%H%M%z']:
try:
return datetime.datetime.strptime(result, date_fmt)
except ValueError:
pass
raise OpenSSLObjectError(
'The time spec "%s" for %s is invalid' %
(input_string, input_name)
)
def select_message_digest(digest_string):
digest = None
if digest_string == 'sha256':
@ -2037,3 +2073,53 @@ def cryptography_compare_public_keys(key1, key2):
b = key2.public_bytes(serialization.Encoding.Raw, serialization.PublicFormat.Raw)
return a == b
return key1.public_numbers() == key2.public_numbers()
if HAS_CRYPTOGRAPHY:
REVOCATION_REASON_MAP = {
'unspecified': x509.ReasonFlags.unspecified,
'key_compromise': x509.ReasonFlags.key_compromise,
'ca_compromise': x509.ReasonFlags.ca_compromise,
'affiliation_changed': x509.ReasonFlags.affiliation_changed,
'superseded': x509.ReasonFlags.superseded,
'cessation_of_operation': x509.ReasonFlags.cessation_of_operation,
'certificate_hold': x509.ReasonFlags.certificate_hold,
'privilege_withdrawn': x509.ReasonFlags.privilege_withdrawn,
'aa_compromise': x509.ReasonFlags.aa_compromise,
'remove_from_crl': x509.ReasonFlags.remove_from_crl,
}
REVOCATION_REASON_MAP_INVERSE = dict()
for k, v in REVOCATION_REASON_MAP.items():
REVOCATION_REASON_MAP_INVERSE[v] = k
def cryptography_decode_revoked_certificate(cert):
result = {
'serial_number': cert.serial_number,
'revocation_date': cert.revocation_date,
'issuer': None,
'issuer_critical': False,
'reason': None,
'reason_critical': False,
'invalidity_date': None,
'invalidity_date_critical': False,
}
try:
ext = cert.extensions.get_extension_for_class(x509.CertificateIssuer)
result['issuer'] = list(ext.value)
result['issuer_critical'] = ext.critical
except x509.ExtensionNotFound:
pass
try:
ext = cert.extensions.get_extension_for_class(x509.CRLReason)
result['reason'] = ext.value.reason
result['reason_critical'] = ext.critical
except x509.ExtensionNotFound:
pass
try:
ext = cert.extensions.get_extension_for_class(x509.InvalidityDate)
result['invalidity_date'] = ext.value.invalidity_date
result['invalidity_date_critical'] = ext.critical
except x509.ExtensionNotFound:
pass
return result

View file

@ -959,34 +959,6 @@ class Certificate(crypto_utils.OpenSSLObject):
self.backup = module.params['backup']
self.backup_file = None
def get_relative_time_option(self, input_string, input_name):
"""Return an ASN1 formatted string if a relative timespec
or an ASN1 formatted string is provided."""
result = to_native(input_string)
if result is None:
raise CertificateError(
'The timespec "%s" for %s is not valid' %
input_string, input_name)
if result.startswith("+") or result.startswith("-"):
result_datetime = crypto_utils.convert_relative_to_datetime(
result)
if self.backend == 'pyopenssl':
return result_datetime.strftime("%Y%m%d%H%M%SZ")
elif self.backend == 'cryptography':
return result_datetime
if self.backend == 'cryptography':
for date_fmt in ['%Y%m%d%H%M%SZ', '%Y%m%d%H%MZ', '%Y%m%d%H%M%S%z', '%Y%m%d%H%M%z']:
try:
return datetime.datetime.strptime(result, date_fmt)
except ValueError:
pass
raise CertificateError(
'The time spec "%s" for %s is invalid' %
(input_string, input_name)
)
return input_string
def _validate_privatekey(self):
if self.backend == 'pyopenssl':
ctx = OpenSSL.SSL.Context(OpenSSL.SSL.TLSv1_2_METHOD)
@ -1147,8 +1119,8 @@ class SelfSignedCertificateCryptography(Certificate):
def __init__(self, module):
super(SelfSignedCertificateCryptography, self).__init__(module, 'cryptography')
self.create_subject_key_identifier = module.params['selfsigned_create_subject_key_identifier']
self.notBefore = self.get_relative_time_option(module.params['selfsigned_not_before'], 'selfsigned_not_before')
self.notAfter = self.get_relative_time_option(module.params['selfsigned_not_after'], 'selfsigned_not_after')
self.notBefore = crypto_utils.get_relative_time_option(module.params['selfsigned_not_before'], 'selfsigned_not_before', backend=self.backend)
self.notAfter = crypto_utils.get_relative_time_option(module.params['selfsigned_not_after'], 'selfsigned_not_after', backend=self.backend)
self.digest = crypto_utils.select_message_digest(module.params['selfsigned_digest'])
self.version = module.params['selfsigned_version']
self.serial_number = x509.random_serial_number()
@ -1280,8 +1252,8 @@ class SelfSignedCertificate(Certificate):
super(SelfSignedCertificate, self).__init__(module, 'pyopenssl')
if module.params['selfsigned_create_subject_key_identifier'] != 'create_if_not_provided':
module.fail_json(msg='selfsigned_create_subject_key_identifier cannot be used with the pyOpenSSL backend!')
self.notBefore = self.get_relative_time_option(module.params['selfsigned_not_before'], 'selfsigned_not_before')
self.notAfter = self.get_relative_time_option(module.params['selfsigned_not_after'], 'selfsigned_not_after')
self.notBefore = crypto_utils.get_relative_time_option(module.params['selfsigned_not_before'], 'selfsigned_not_before', backend=self.backend)
self.notAfter = crypto_utils.get_relative_time_option(module.params['selfsigned_not_after'], 'selfsigned_not_after', backend=self.backend)
self.digest = module.params['selfsigned_digest']
self.version = module.params['selfsigned_version']
self.serial_number = randint(1000, 99999)
@ -1379,8 +1351,8 @@ class OwnCACertificateCryptography(Certificate):
super(OwnCACertificateCryptography, self).__init__(module, 'cryptography')
self.create_subject_key_identifier = module.params['ownca_create_subject_key_identifier']
self.create_authority_key_identifier = module.params['ownca_create_authority_key_identifier']
self.notBefore = self.get_relative_time_option(module.params['ownca_not_before'], 'ownca_not_before')
self.notAfter = self.get_relative_time_option(module.params['ownca_not_after'], 'ownca_not_after')
self.notBefore = crypto_utils.get_relative_time_option(module.params['ownca_not_before'], 'ownca_not_before', backend=self.backend)
self.notAfter = crypto_utils.get_relative_time_option(module.params['ownca_not_after'], 'ownca_not_after', backend=self.backend)
self.digest = crypto_utils.select_message_digest(module.params['ownca_digest'])
self.version = module.params['ownca_version']
self.serial_number = x509.random_serial_number()
@ -1575,8 +1547,8 @@ class OwnCACertificate(Certificate):
def __init__(self, module):
super(OwnCACertificate, self).__init__(module, 'pyopenssl')
self.notBefore = self.get_relative_time_option(module.params['ownca_not_before'], 'ownca_not_before')
self.notAfter = self.get_relative_time_option(module.params['ownca_not_after'], 'ownca_not_after')
self.notBefore = crypto_utils.get_relative_time_option(module.params['ownca_not_before'], 'ownca_not_before', backend=self.backend)
self.notAfter = crypto_utils.get_relative_time_option(module.params['ownca_not_after'], 'ownca_not_after', backend=self.backend)
self.digest = module.params['ownca_digest']
self.version = module.params['ownca_version']
self.serial_number = randint(1000, 99999)
@ -1937,7 +1909,7 @@ class AssertOnlyCertificateBase(Certificate):
if self.not_before is not None:
cert_not_valid_before = self._validate_not_before()
if cert_not_valid_before != self.get_relative_time_option(self.not_before, 'not_before'):
if cert_not_valid_before != crypto_utils.get_relative_time_option(self.not_before, 'not_before', backend=self.backend):
messages.append(
'Invalid not_before component (got %s, expected %s to be present)' %
(cert_not_valid_before, self.not_before)
@ -1945,7 +1917,7 @@ class AssertOnlyCertificateBase(Certificate):
if self.not_after is not None:
cert_not_valid_after = self._validate_not_after()
if cert_not_valid_after != self.get_relative_time_option(self.not_after, 'not_after'):
if cert_not_valid_after != crypto_utils.get_relative_time_option(self.not_after, 'not_after', backend=self.backend):
messages.append(
'Invalid not_after component (got %s, expected %s to be present)' %
(cert_not_valid_after, self.not_after)
@ -2120,15 +2092,15 @@ class AssertOnlyCertificateCryptography(AssertOnlyCertificateBase):
return self.cert.not_valid_after
def _validate_valid_at(self):
rt = self.get_relative_time_option(self.valid_at, 'valid_at')
rt = crypto_utils.get_relative_time_option(self.valid_at, 'valid_at', backend=self.backend)
return self.cert.not_valid_before, rt, self.cert.not_valid_after
def _validate_invalid_at(self):
rt = self.get_relative_time_option(self.invalid_at, 'invalid_at')
rt = crypto_utils.get_relative_time_option(self.invalid_at, 'invalid_at', backend=self.backend)
return self.cert.not_valid_before, rt, self.cert.not_valid_after
def _validate_valid_in(self):
valid_in_date = self.get_relative_time_option(self.valid_in, "valid_in")
valid_in_date = crypto_utils.get_relative_time_option(self.valid_in, "valid_in", backend=self.backend)
return self.cert.not_valid_before, valid_in_date, self.cert.not_valid_after
@ -2285,17 +2257,17 @@ class AssertOnlyCertificate(AssertOnlyCertificateBase):
return self.cert.get_notAfter()
def _validate_valid_at(self):
rt = self.get_relative_time_option(self.valid_at, "valid_at")
rt = crypto_utils.get_relative_time_option(self.valid_at, "valid_at", backend=self.backend)
rt = to_bytes(rt, errors='surrogate_or_strict')
return self.cert.get_notBefore(), rt, self.cert.get_notAfter()
def _validate_invalid_at(self):
rt = self.get_relative_time_option(self.invalid_at, "invalid_at")
rt = crypto_utils.get_relative_time_option(self.invalid_at, "invalid_at", backend=self.backend)
rt = to_bytes(rt, errors='surrogate_or_strict')
return self.cert.get_notBefore(), rt, self.cert.get_notAfter()
def _validate_valid_in(self):
valid_in_asn1 = self.get_relative_time_option(self.valid_in, "valid_in")
valid_in_asn1 = crypto_utils.get_relative_time_option(self.valid_in, "valid_in", backend=self.backend)
valid_in_date = to_bytes(valid_in_asn1, errors='surrogate_or_strict')
return self.cert.get_notBefore(), valid_in_date, self.cert.get_notAfter()
@ -2306,7 +2278,7 @@ class EntrustCertificate(Certificate):
def __init__(self, module, backend):
super(EntrustCertificate, self).__init__(module, backend)
self.trackingId = None
self.notAfter = self.get_relative_time_option(module.params['entrust_not_after'], 'entrust_not_after')
self.notAfter = crypto_utils.get_relative_time_option(module.params['entrust_not_after'], 'entrust_not_after', backend=self.backend)
if self.csr_content is None or not os.path.exists(self.csr_path):
raise CertificateError(

View file

@ -348,31 +348,6 @@ else:
TIMESTAMP_FORMAT = "%Y%m%d%H%M%SZ"
def get_relative_time_option(input_string, input_name):
"""Return an ASN1 formatted string if a relative timespec
or an ASN1 formatted string is provided."""
result = input_string
if result.startswith("+") or result.startswith("-"):
return crypto_utils.convert_relative_to_datetime(result)
if result is None:
raise crypto_utils.OpenSSLObjectError(
'The timespec "%s" for %s is not valid' %
input_string, input_name)
for date_fmt in ['%Y%m%d%H%M%SZ', '%Y%m%d%H%MZ', '%Y%m%d%H%M%S%z', '%Y%m%d%H%M%z']:
try:
result = datetime.datetime.strptime(input_string, date_fmt)
break
except ValueError:
pass
if not isinstance(result, datetime.datetime):
raise crypto_utils.OpenSSLObjectError(
'The time spec "%s" for %s is invalid' %
(input_string, input_name)
)
return result
class CertificateInfo(crypto_utils.OpenSSLObject):
def __init__(self, module, backend):
super(CertificateInfo, self).__init__(
@ -394,7 +369,7 @@ class CertificateInfo(crypto_utils.OpenSSLObject):
self.module.fail_json(
msg='The value for valid_at.{0} must be of type string (got {1})'.format(k, type(v))
)
self.valid_at[k] = get_relative_time_option(v, 'valid_at.{0}'.format(k))
self.valid_at[k] = crypto_utils.get_relative_time_option(v, 'valid_at.{0}'.format(k))
def generate(self):
# Empty method because crypto_utils.OpenSSLObject wants this

View file

@ -347,7 +347,6 @@ crl:
'''
import datetime
import os
import traceback
from distutils.version import LooseVersion
@ -369,25 +368,8 @@ try:
RevokedCertificateBuilder,
NameAttribute,
Name,
ReasonFlags,
)
CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__)
REASON_MAP = {
'unspecified': ReasonFlags.unspecified,
'key_compromise': ReasonFlags.key_compromise,
'ca_compromise': ReasonFlags.ca_compromise,
'affiliation_changed': ReasonFlags.affiliation_changed,
'superseded': ReasonFlags.superseded,
'cessation_of_operation': ReasonFlags.cessation_of_operation,
'certificate_hold': ReasonFlags.certificate_hold,
'privilege_withdrawn': ReasonFlags.privilege_withdrawn,
'aa_compromise': ReasonFlags.aa_compromise,
'remove_from_crl': ReasonFlags.remove_from_crl,
}
REASON_MAP_INVERSE = dict()
for k, v in REASON_MAP.items():
REASON_MAP_INVERSE[v] = k
except ImportError:
CRYPTOGRAPHY_IMP_ERR = traceback.format_exc()
CRYPTOGRAPHY_FOUND = False
@ -404,27 +386,6 @@ class CRLError(crypto_utils.OpenSSLObjectError):
class CRL(crypto_utils.OpenSSLObject):
def get_relative_time_option(self, input_string, input_name):
"""Return an ASN1 formatted string if a relative timespec
or an ASN1 formatted string is provided."""
result = to_native(input_string)
if result is None:
raise CRLError(
'The timespec "%s" for %s is not valid' %
input_string, input_name)
if result.startswith("+") or result.startswith("-"):
return crypto_utils.convert_relative_to_datetime(result)
for date_fmt in ['%Y%m%d%H%M%SZ', '%Y%m%d%H%MZ', '%Y%m%d%H%M%S%z', '%Y%m%d%H%M%z']:
try:
return datetime.datetime.strptime(result, date_fmt)
except ValueError:
pass
raise CRLError(
'The time spec "%s" for %s is invalid' %
(input_string, input_name)
)
def __init__(self, module):
super(CRL, self).__init__(
module.params['path'],
@ -447,8 +408,8 @@ class CRL(crypto_utils.OpenSSLObject):
self.issuer = crypto_utils.parse_name_field(module.params['issuer'])
self.issuer = [(entry[0], entry[1]) for entry in self.issuer if entry[1]]
self.last_update = self.get_relative_time_option(module.params['last_update'], 'last_update')
self.next_update = self.get_relative_time_option(module.params['next_update'], 'next_update')
self.last_update = crypto_utils.get_relative_time_option(module.params['last_update'], 'last_update')
self.next_update = crypto_utils.get_relative_time_option(module.params['next_update'], 'next_update')
self.digest = crypto_utils.select_message_digest(module.params['digest'])
if self.digest is None:
@ -494,15 +455,15 @@ class CRL(crypto_utils.OpenSSLObject):
if rc['issuer']:
result['issuer'] = [crypto_utils.cryptography_get_name(issuer) for issuer in rc['issuer']]
result['issuer_critical'] = rc['issuer_critical']
result['revocation_date'] = self.get_relative_time_option(
result['revocation_date'] = crypto_utils.get_relative_time_option(
rc['revocation_date'],
path_prefix + 'revocation_date'
)
if rc['reason']:
result['reason'] = REASON_MAP[rc['reason']]
result['reason'] = crypto_utils.REVOCATION_REASON_MAP[rc['reason']]
result['reason_critical'] = rc['reason_critical']
if rc['invalidity_date']:
result['invalidity_date'] = self.get_relative_time_option(
result['invalidity_date'] = crypto_utils.get_relative_time_option(
rc['invalidity_date'],
path_prefix + 'invalidity_date'
)
@ -563,37 +524,6 @@ class CRL(crypto_utils.OpenSSLObject):
entry['invalidity_date_critical'],
)
def _decode_revoked(self, cert):
result = {
'serial_number': cert.serial_number,
'revocation_date': cert.revocation_date,
'issuer': None,
'issuer_critical': False,
'reason': None,
'reason_critical': False,
'invalidity_date': None,
'invalidity_date_critical': False,
}
try:
ext = cert.extensions.get_extension_for_class(x509.CertificateIssuer)
result['issuer'] = list(ext.value)
result['issuer_critical'] = ext.critical
except x509.ExtensionNotFound:
pass
try:
ext = cert.extensions.get_extension_for_class(x509.CRLReason)
result['reason'] = ext.value.reason
result['reason_critical'] = ext.critical
except x509.ExtensionNotFound:
pass
try:
ext = cert.extensions.get_extension_for_class(x509.InvalidityDate)
result['invalidity_date'] = ext.value.invalidity_date
result['invalidity_date_critical'] = ext.critical
except x509.ExtensionNotFound:
pass
return result
def check(self, perms_required=True):
"""Ensure the resource is in its desired state."""
@ -616,7 +546,7 @@ class CRL(crypto_utils.OpenSSLObject):
if want_issuer != [(sub.oid, sub.value) for sub in self.crl.issuer]:
return False
old_entries = [self._compress_entry(self._decode_revoked(cert)) for cert in self.crl]
old_entries = [self._compress_entry(crypto_utils.cryptography_decode_revoked_certificate(cert)) for cert in self.crl]
new_entries = [self._compress_entry(cert) for cert in self.revoked_certificates]
if self.update:
# We don't simply use a set so that duplicate entries are treated correctly
@ -649,7 +579,7 @@ class CRL(crypto_utils.OpenSSLObject):
if self.update and self.crl:
new_entries = set([self._compress_entry(entry) for entry in self.revoked_certificates])
for entry in self.crl:
decoded_entry = self._compress_entry(self._decode_revoked(entry))
decoded_entry = self._compress_entry(crypto_utils.cryptography_decode_revoked_certificate(entry))
if decoded_entry not in new_entries:
crl = crl.add_revoked_certificate(entry)
for entry in self.revoked_certificates:
@ -700,7 +630,7 @@ class CRL(crypto_utils.OpenSSLObject):
[crypto_utils.cryptography_decode_name(issuer) for issuer in entry['issuer']]
if entry['issuer'] is not None else None,
'issuer_critical': entry['issuer_critical'],
'reason': REASON_MAP_INVERSE.get(entry['reason']) if entry['reason'] is not None else None,
'reason': crypto_utils.REVOCATION_REASON_MAP_INVERSE.get(entry['reason']) if entry['reason'] is not None else None,
'reason_critical': entry['reason_critical'],
'invalidity_date':
entry['invalidity_date'].strftime(TIMESTAMP_FORMAT)
@ -758,7 +688,7 @@ class CRL(crypto_utils.OpenSSLObject):
result['issuer'][k] = v
result['revoked_certificates'] = []
for cert in self.crl:
entry = self._decode_revoked(cert)
entry = crypto_utils.cryptography_decode_revoked_certificate(cert)
result['revoked_certificates'].append(self._dump_revoked(entry))
if self.return_content: