Relative time support for crypto modules (openssl_certificate) (#50570)

* Move relative time handling to module_utils and rewrite it

* Fix cases with no seconds defined

* fix a small typo along the way

* add relative time handling to the ownca provider in openssl_certificate

* add initial integration test for relative time ownca

* quote the documentation to produce valid yaml

* move timespec conversion and validation to the init function

* fix small edge case in conversion function

* add relative timestamp handling to the selfsigned provider

* add get_relative_time_option

* add relative timestamp handling to valid_in

* pep8 fix indentation

* add quotes in error message

* add changelog fragment

* Update changelogs/fragments/50570-relative_time_crypto.yaml

Co-Authored-By: MarkusTeufelberger <mteufelberger@mgit.at>
This commit is contained in:
MarkusTeufelberger 2019-01-22 21:41:02 +01:00 committed by John R Barker
parent 152d7b674d
commit c1bc556b0a
5 changed files with 128 additions and 72 deletions

View file

@ -0,0 +1,2 @@
minor_changes:
- "openssl_certificate - Add support for relative time offsets in the ``selfsigned_not_before``/``selfsigned_not_after``/``ownca_not_before``/``ownca_not_after`` and ``valid_in`` parameters."

View file

@ -24,9 +24,11 @@ except ImportError:
pass
import abc
import datetime
import errno
import hashlib
import os
import re
from ansible.module_utils import six
from ansible.module_utils._text import to_bytes
@ -129,6 +131,37 @@ def parse_name_field(input_dict):
return result
def convert_relative_to_datetime(relative_time_string):
"""Get a datetime.datetime or None from a string in the time format described in sshd_config(5)"""
parsed_result = re.match(
r"^(?P<prefix>[+-])((?P<weeks>\d+)[wW])?((?P<days>\d+)[dD])?((?P<hours>\d+)[hH])?((?P<minutes>\d+)[mM])?((?P<seconds>\d+)[sS]?)?$",
relative_time_string)
if parsed_result is None or len(relative_time_string) == 1:
# not matched or only a single "+" or "-"
return None
offset = datetime.timedelta(0)
if parsed_result.group("weeks") is not None:
offset += datetime.timedelta(weeks=int(parsed_result.group("weeks")))
if parsed_result.group("days") is not None:
offset += datetime.timedelta(days=int(parsed_result.group("days")))
if parsed_result.group("hours") is not None:
offset += datetime.timedelta(hours=int(parsed_result.group("hours")))
if parsed_result.group("minutes") is not None:
offset += datetime.timedelta(
minutes=int(parsed_result.group("minutes")))
if parsed_result.group("seconds") is not None:
offset += datetime.timedelta(
seconds=int(parsed_result.group("seconds")))
if parsed_result.group("prefix") == "+":
return datetime.datetime.utcnow() + offset
else:
return datetime.datetime.utcnow() - offset
@six.add_metaclass(abc.ABCMeta)
class OpenSSLObject(object):

View file

@ -21,7 +21,7 @@ author: "David Kainz (@lolcube)"
version_added: "2.8"
short_description: Generate OpenSSH host or user certificates.
description:
- Generate and regenerate OpensSSH host or user certificates.
- Generate and regenerate OpenSSH host or user certificates.
requirements:
- "ssh-keygen"
options:
@ -203,6 +203,7 @@ from datetime import timedelta
from shutil import copy2
from shutil import rmtree
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.crypto import convert_relative_to_datetime
from ansible.module_utils._text import to_native
@ -332,24 +333,12 @@ class Certificate(object):
def convert_to_datetime(self, module, timestring):
if self.is_relative(timestring):
dispatched_time = re.findall("^([+\\-])((\\d+)[w])?((\\d+)[d])?((\\d+)[h])?((\\d+)[m])?((\\d+)[s])?$", timestring, re.I)
if not dispatched_time:
module.fail_json(msg="'%s' is not a valid time format." % timestring)
dispatched_time = dispatched_time[0]
if dispatched_time[0] == "+":
return datetime.utcnow() + timedelta(
weeks=int('0' + dispatched_time[2]),
days=int('0' + dispatched_time[4]),
hours=int('0' + dispatched_time[6]),
minutes=int('0' + dispatched_time[8]),
seconds=int('0' + dispatched_time[10]))
result = convert_relative_to_datetime(timestring)
if result is None:
module.fail_json(
msg="'%s' is not a valid time format." % timestring)
else:
return datetime.utcnow() - timedelta(
weeks=int('0' + dispatched_time[2]),
days=int('0' + dispatched_time[4]),
hours=int('0' + dispatched_time[6]),
minutes=int('0' + dispatched_time[8]),
seconds=int('0' + dispatched_time[10]))
return result
else:
formats = ["%Y-%m-%d",
"%Y-%m-%d %H:%M:%S",

View file

@ -85,15 +85,23 @@ options:
- Digest algorithm to be used when self-signing the certificate
selfsigned_not_before:
default: +0s
description:
- The timestamp at which the certificate starts being valid. The timestamp is formatted as an ASN.1 TIME.
If this value is not specified, certificate will start being valid from now.
- "The point in time the certificate is valid from. Time can be specified either as relative time or as absolute timestamp.
Time will always be interpreted as UTC. Valid formats are: C([+-]timespec | ASN.1 TIME)
where timespec can be an integer + C([w | d | h | m | s]) (e.g. C(+32w1d2h).
Note that if using relative time this module is NOT idempotent.
If this value is not specified, the certificate will start being valid from now."
aliases: [ selfsigned_notBefore ]
selfsigned_not_after:
default: +3650d
description:
- The timestamp at which the certificate stops being valid. The timestamp is formatted as an ASN.1 TIME.
If this value is not specified, certificate will stop being valid 10 years from now.
- "The point in time at which the certificate stops being valid. Time can be specified either as relative time or as absolute timestamp.
Time will always be interpreted as UTC. Valid formats are: C([+-]timespec | ASN.1 TIME)
where timespec can be an integer + C([w | d | h | m | s]) (e.g. C(+32w1d2h).
Note that if using relative time this module is NOT idempotent.
If this value is not specified, the certificate will stop being valid 10 years from now."
aliases: [ selfsigned_notAfter ]
ownca_path:
@ -124,15 +132,23 @@ options:
version_added: "2.7"
ownca_not_before:
default: +0s
description:
- The timestamp at which the certificate starts being valid. The timestamp is formatted as an ASN.1 TIME.
If this value is not specified, certificate will start being valid from now.
- "The point in time the certificate is valid from. Time can be specified either as relative time or as absolute timestamp.
Time will always be interpreted as UTC. Valid formats are: C([+-]timespec | ASN.1 TIME)
where timespec can be an integer + C([w | d | h | m | s]) (e.g. C(+32w1d2h).
Note that if using relative time this module is NOT idempotent.
If this value is not specified, the certificate will start being valid from now."
version_added: "2.7"
ownca_not_after:
default: +3650d
description:
- The timestamp at which the certificate stops being valid. The timestamp is formatted as an ASN.1 TIME.
If this value is not specified, certificate will stop being valid 10 years from now.
- "The point in time at which the certificate stops being valid. Time can be specified either as relative time or as absolute timestamp.
Time will always be interpreted as UTC. Valid formats are: C([+-]timespec | ASN.1 TIME)
where timespec can be an integer + C([w | d | h | m | s]) (e.g. C(+32w1d2h).
Note that if using relative time this module is NOT idempotent.
If this value is not specified, the certificate will stop being valid 10 years from now."
version_added: "2.7"
acme_accountkey_path:
@ -209,7 +225,10 @@ options:
valid_in:
description:
- The certificate must still be valid in I(valid_in) seconds from now.
- "The certificate must still be valid at this relative time offset from now.
Valid formats are: C([+-]timespec | number_of_seconds)
where timespec can be an integer + C([w | d | h | m | s]) (e.g. C(+32w1d2h).
Note that if using this parameter, this module is NOT idempotent."
key_usage:
description:
@ -436,6 +455,19 @@ class Certificate(crypto_utils.OpenSSLObject):
self.csr = None
self.module = module
def get_relative_time_option(self, input_string, input_name):
"""Return an ASN1 formatted string if a relative timespec
or an ASN1 formatted string is provided."""
result = input_string
if result.startswith("+") or result.startswith("-"):
result = crypto_utils.convert_relative_to_datetime(
result).strftime("%Y%m%d%H%M%SZ")
if result is None:
raise CertificateError(
'The timespec "%s" for %s is not valid' %
input_string, input_name)
return result
def check(self, module, perms_required=True):
"""Ensure the resource is in its desired state."""
@ -495,8 +527,8 @@ class SelfSignedCertificate(Certificate):
def __init__(self, module):
super(SelfSignedCertificate, self).__init__(module)
self.notBefore = module.params['selfsigned_notBefore']
self.notAfter = module.params['selfsigned_notAfter']
self.notBefore = self.get_relative_time_option(module.params['selfsigned_not_before'], 'selfsigned_not_before')
self.notAfter = self.get_relative_time_option(module.params['selfsigned_not_after'], 'selfsigned_not_after')
self.digest = module.params['selfsigned_digest']
self.version = module.params['selfsigned_version']
self.serial_number = randint(1000, 99999)
@ -520,16 +552,8 @@ class SelfSignedCertificate(Certificate):
if not self.check(module, perms_required=False) or self.force:
cert = crypto.X509()
cert.set_serial_number(self.serial_number)
if self.notBefore:
cert.set_notBefore(to_bytes(self.notBefore))
else:
cert.gmtime_adj_notBefore(0)
if self.notAfter:
cert.set_notAfter(to_bytes(self.notAfter))
else:
# If no NotAfter specified, expire in
# 10 years. 315360000 is 10 years in seconds.
cert.gmtime_adj_notAfter(315360000)
cert.set_notBefore(to_bytes(self.notBefore))
cert.set_notAfter(to_bytes(self.notAfter))
cert.set_subject(self.csr.get_subject())
cert.set_issuer(self.csr.get_subject())
cert.set_version(self.version - 1)
@ -561,11 +585,9 @@ class SelfSignedCertificate(Certificate):
}
if check_mode:
now = datetime.datetime.utcnow()
ten = now.replace(now.year + 10)
result.update({
'notBefore': self.notBefore if self.notBefore else now.strftime("%Y%m%d%H%M%SZ"),
'notAfter': self.notAfter if self.notAfter else ten.strftime("%Y%m%d%H%M%SZ"),
'notBefore': self.notBefore,
'notAfter': self.notAfter,
'serial_number': self.serial_number,
})
else:
@ -583,8 +605,8 @@ class OwnCACertificate(Certificate):
def __init__(self, module):
super(OwnCACertificate, self).__init__(module)
self.notBefore = module.params['ownca_not_before']
self.notAfter = module.params['ownca_not_after']
self.notBefore = self.get_relative_time_option(module.params['ownca_not_before'], 'ownca_not_before')
self.notAfter = self.get_relative_time_option(module.params['ownca_not_after'], 'ownca_not_after')
self.digest = module.params['ownca_digest']
self.version = module.params['ownca_version']
self.serial_number = randint(1000, 99999)
@ -617,16 +639,8 @@ class OwnCACertificate(Certificate):
if not self.check(module, perms_required=False) or self.force:
cert = crypto.X509()
cert.set_serial_number(self.serial_number)
if self.notBefore:
cert.set_notBefore(to_bytes(self.notBefore))
else:
cert.gmtime_adj_notBefore(0)
if self.notAfter:
cert.set_notAfter(to_bytes(self.notAfter))
else:
# If no NotAfter specified, expire in
# 10 years. 315360000 is 10 years in seconds.
cert.gmtime_adj_notAfter(315360000)
cert.set_notBefore(to_bytes(self.notBefore))
cert.set_notAfter(to_bytes(self.notAfter))
cert.set_subject(self.csr.get_subject())
cert.set_issuer(self.ca_cert.get_subject())
cert.set_version(self.version - 1)
@ -660,11 +674,9 @@ class OwnCACertificate(Certificate):
}
if check_mode:
now = datetime.datetime.utcnow()
ten = now.replace(now.year + 10)
result.update({
'notBefore': self.notBefore if self.notBefore else now.strftime("%Y%m%d%H%M%SZ"),
'notAfter': self.notAfter if self.notAfter else ten.strftime("%Y%m%d%H%M%SZ"),
'notBefore': self.notBefore,
'notAfter': self.notAfter,
'serial_number': self.serial_number,
})
else:
@ -854,15 +866,22 @@ class AssertOnlyCertificate(Certificate):
def _validate_valid_in():
if self.valid_in:
valid_in_date = datetime.datetime.utcnow() + datetime.timedelta(seconds=self.valid_in)
valid_in_date = to_bytes(valid_in_date.strftime('%Y%m%d%H%M%SZ'), errors='surrogate_or_strict')
if not (self.cert.get_notBefore() <= valid_in_date <= self.cert.get_notAfter()):
if not self.valid_in.startswith("+") and not self.valid_in.startswith("-"):
try:
int(self.valid_in)
except ValueError:
raise CertificateError(
'The supplied value for "valid_in" (%s) is not an integer or a valid timespec' % self.valid_in)
self.valid_in = "+" + self.valid_in + "s"
valid_in_asn1 = self.get_relative_time_option(self.valid_in, "valid_in")
valid_in_date = to_bytes(valid_in_asn1, errors='surrogate_or_strict')
if not (self.cert.get_notBefore() <= valid_in_date <=
self.cert.get_notAfter()):
self.message.append(
'Certificate is not valid in %s seconds from now (%s) - notBefore: %s - notAfter: %s' % (self.valid_in,
valid_in_date,
self.cert.get_notBefore(),
self.cert.get_notAfter())
)
'Certificate is not valid in %s from now (that would be %s) - notBefore: %s - notAfter: %s'
% (self.valid_in, valid_in_date,
self.cert.get_notBefore(),
self.cert.get_notAfter()))
for validation in ['signature_algorithms', 'subject', 'issuer',
'has_expired', 'version', 'keyUsage',
@ -1004,13 +1023,15 @@ def main():
notAfter=dict(type='str', aliases=['not_after']),
valid_at=dict(type='str'),
invalid_at=dict(type='str'),
valid_in=dict(type='int'),
valid_in=dict(type='str'),
# provider: selfsigned
selfsigned_version=dict(type='int', default='3'),
selfsigned_digest=dict(type='str', default='sha256'),
selfsigned_notBefore=dict(type='str', aliases=['selfsigned_not_before']),
selfsigned_notAfter=dict(type='str', aliases=['selfsigned_not_after']),
selfsigned_not_before=dict(
type='str', default='+0s', aliases=['selfsigned_notBefore']),
selfsigned_not_after=dict(
type='str', default='+3650d', aliases=['selfsigned_notAfter']),
# provider: ownca
ownca_path=dict(type='path'),
@ -1018,8 +1039,8 @@ def main():
ownca_privatekey_passphrase=dict(type='path', no_log=True),
ownca_digest=dict(type='str', default='sha256'),
ownca_version=dict(type='int', default='3'),
ownca_not_before=dict(type='str'),
ownca_not_after=dict(type='str'),
ownca_not_before=dict(type='str', default='+0s'),
ownca_not_after=dict(type='str', default='+3650d'),
# provider: acme
acme_accountkey_path=dict(type='path'),

View file

@ -129,6 +129,17 @@
ownca_path: '{{ output_dir }}/ca_cert.pem'
ownca_privatekey_path: '{{ output_dir }}/ca_privatekey.pem'
- name: Create ownca certificate with relative notBefore and notAfter
openssl_certificate:
provider: ownca
ownca_not_before: +1s
ownca_not_after: +52w
path: "{{ output_dir }}/ownca_cert4.pem"
csr_path: "{{ output_dir }}/csr.csr"
privatekey_path: "{{ output_dir }}/privatekey3.pem"
ownca_path: '{{ output_dir }}/ca_cert.pem'
ownca_privatekey_path: '{{ output_dir }}/ca_privatekey.pem'
- name: Generate ownca ECC certificate
openssl_certificate:
path: '{{ output_dir }}/ownca_cert_ecc.pem'