From 65d7f0d17b448f47ab8e961db9a825cadc9e1b9c Mon Sep 17 00:00:00 2001 From: Felix Fontein Date: Fri, 5 Apr 2019 16:47:05 +0200 Subject: [PATCH] Add openssl_certificate_info module (#54709) * Add certificate_info module. * Improve normalization. * Add extension dump. * Add support for basic_constraints and ocsp_must_staple. * Update docs. * Add serial number. * Remove superfluous code. * Fix formulation. * Improve examples. * Improve result docs. * Forgot to add tests. * Adjust when no fingerprints can be computed. --- lib/ansible/module_utils/crypto.py | 267 ++++++- .../crypto/openssl_certificate_info.py | 708 ++++++++++++++++++ .../targets/openssl_certificate_info/aliases | 2 + .../openssl_certificate_info/meta/main.yml | 2 + .../openssl_certificate_info/tasks/impl.yml | 52 ++ .../openssl_certificate_info/tasks/main.yml | 151 ++++ 6 files changed, 1179 insertions(+), 3 deletions(-) create mode 100644 lib/ansible/modules/crypto/openssl_certificate_info.py create mode 100644 test/integration/targets/openssl_certificate_info/aliases create mode 100644 test/integration/targets/openssl_certificate_info/meta/main.yml create mode 100644 test/integration/targets/openssl_certificate_info/tasks/impl.yml create mode 100644 test/integration/targets/openssl_certificate_info/tasks/main.yml diff --git a/lib/ansible/module_utils/crypto.py b/lib/ansible/module_utils/crypto.py index 20d80f63354..8f036993ae2 100644 --- a/lib/ansible/module_utils/crypto.py +++ b/lib/ansible/module_utils/crypto.py @@ -14,9 +14,16 @@ # # You should have received a copy of the GNU General Public License # along with Ansible. If not, see . +# +# -------------------------------------------------------------- +# A clearly marked portion of this file is licensed under the BSD +# Copyright (c) 2015, 2016 Paul Kehrer (@reaperhulk) +# Copyright (c) 2017 Fraser Tweedale (@frasertweedale) +# For more details, search for the function _obj2txt(). try: + import OpenSSL from OpenSSL import crypto except ImportError: # An error will be raised in the calling class to let the end @@ -35,6 +42,8 @@ except ImportError: import abc +import base64 +import binascii import datetime import errno import hashlib @@ -332,6 +341,225 @@ class OpenSSLObject(object): pass +_OID_MAP = { + # First entry is 'canonical' name + "2.5.29.37.0": ('Any Extended Key Usage', 'anyExtendedKeyUsage'), + "1.3.6.1.5.5.7.1.3": ('qcStatements', ), + "1.3.6.1.5.5.7.3.10": ('DVCS', 'dvcs'), + "1.3.6.1.5.5.7.3.7": ('IPSec User', 'ipsecUser'), + "1.3.6.1.5.5.7.1.2": ('Biometric Info', 'biometricInfo'), +} + +_NORMALIZE_NAMES = { + 'CN': 'commonName', + 'C': 'countryName', + 'L': 'localityName', + 'ST': 'stateOrProvinceName', + 'street': 'streetAddress', + 'O': 'organizationName', + 'OU': 'organizationalUnitName', + 'SN': 'surname', + 'GN': 'givenName', + 'UID': 'userId', + 'userID': 'userId', + 'DC': 'domainComponent', + 'jurisdictionC': 'jurisdictionCountryName', + 'jurisdictionL': 'jurisdictionLocalityName', + 'jurisdictionST': 'jurisdictionStateOrProvinceName', + 'serverAuth': 'TLS Web Server Authentication', + 'clientAuth': 'TLS Web Client Authentication', + 'codeSigning': 'Code Signing', + 'emailProtection': 'E-mail Protection', + 'timeStamping': 'Time Stamping', + 'OCSPSigning': 'OCSP Signing', +} + +for dotted, names in _OID_MAP.items(): + for name in names[1:]: + _NORMALIZE_NAMES[name] = names[0] + + +def pyopenssl_normalize_name(name): + nid = OpenSSL._util.lib.OBJ_txt2nid(to_bytes(name)) + if nid != 0: + b_name = OpenSSL._util.lib.OBJ_nid2ln(nid) + name = to_text(OpenSSL._util.ffi.string(b_name)) + return _NORMALIZE_NAMES.get(name, name) + + +# ##################################################################################### +# ##################################################################################### +# # This excerpt is dual licensed under the terms of the Apache License, Version +# # 2.0, and the BSD License. See the LICENSE file at +# # https://github.com/pyca/cryptography/blob/master/LICENSE for complete details. +# # +# # Adapted from cryptography's hazmat/backends/openssl/decode_asn1.py +# # +# # Copyright (c) 2015, 2016 Paul Kehrer (@reaperhulk) +# # Copyright (c) 2017 Fraser Tweedale (@frasertweedale) +# # +# # Relevant commits from cryptography project (https://github.com/pyca/cryptography): +# # pyca/cryptography@719d536dd691e84e208534798f2eb4f82aaa2e07 +# # pyca/cryptography@5ab6d6a5c05572bd1c75f05baf264a2d0001894a +# # pyca/cryptography@2e776e20eb60378e0af9b7439000d0e80da7c7e3 +# # pyca/cryptography@fb309ed24647d1be9e319b61b1f2aa8ebb87b90b +# # pyca/cryptography@2917e460993c475c72d7146c50dc3bbc2414280d +# # pyca/cryptography@3057f91ea9a05fb593825006d87a391286a4d828 +# # pyca/cryptography@d607dd7e5bc5c08854ec0c9baff70ba4a35be36f +def _obj2txt(openssl_lib, openssl_ffi, obj): + # Set to 80 on the recommendation of + # https://www.openssl.org/docs/crypto/OBJ_nid2ln.html#return_values + # + # But OIDs longer than this occur in real life (e.g. Active + # Directory makes some very long OIDs). So we need to detect + # and properly handle the case where the default buffer is not + # big enough. + # + buf_len = 80 + buf = openssl_ffi.new("char[]", buf_len) + + # 'res' is the number of bytes that *would* be written if the + # buffer is large enough. If 'res' > buf_len - 1, we need to + # alloc a big-enough buffer and go again. + res = openssl_lib.OBJ_obj2txt(buf, buf_len, obj, 1) + if res > buf_len - 1: # account for terminating null byte + buf_len = res + 1 + buf = openssl_ffi.new("char[]", buf_len) + res = openssl_lib.OBJ_obj2txt(buf, buf_len, obj, 1) + return openssl_ffi.buffer(buf, res)[:].decode() +# ##################################################################################### +# ##################################################################################### + + +def cryptography_get_extensions_from_cert(cert): + # Since cryptography won't give us the DER value for an extension + # (that is only stored for unrecognized extensions), we have to re-do + # the extension parsing outselves. + result = dict() + backend = cert._backend + x509_obj = cert._x509 + + for i in range(backend._lib.X509_get_ext_count(x509_obj)): + ext = backend._lib.X509_get_ext(x509_obj, i) + if ext == backend._ffi.NULL: + continue + crit = backend._lib.X509_EXTENSION_get_critical(ext) + data = backend._lib.X509_EXTENSION_get_data(ext) + backend.openssl_assert(data != backend._ffi.NULL) + der = backend._ffi.buffer(data.data, data.length)[:] + entry = dict( + critical=(crit == 1), + value=base64.b64encode(der), + ) + oid = _obj2txt(backend._lib, backend._ffi, backend._lib.X509_EXTENSION_get_object(ext)) + result[oid] = entry + return result + + +def pyopenssl_get_extensions_from_cert(cert): + # While pyOpenSSL allows us to get an extension's DER value, it won't + # give us the dotted string for an OID. So we have to do some magic to + # get hold of it. + result = dict() + ext_count = cert.get_extension_count() + for i in range(0, ext_count): + ext = cert.get_extension(i) + entry = dict( + critical=bool(ext.get_critical()), + value=base64.b64encode(ext.get_data()), + ) + oid = _obj2txt( + OpenSSL._util.lib, + OpenSSL._util.ffi, + OpenSSL._util.lib.X509_EXTENSION_get_object(ext._extension) + ) + # This could also be done a bit simpler: + # + # oid = _obj2txt(OpenSSL._util.lib, OpenSSL._util.ffi, OpenSSL._util.lib.OBJ_nid2obj(ext._nid)) + # + # Unfortunately this gives the wrong result in case the linked OpenSSL + # doesn't know the OID. That's why we have to get the OID dotted string + # similarly to how cryptography does it. + result[oid] = entry + return result + + +def crpytography_name_to_oid(name): + if name in ('CN', 'commonName'): + return x509.oid.NameOID.COMMON_NAME + if name in ('C', 'countryName'): + return x509.oid.NameOID.COUNTRY_NAME + if name in ('L', 'localityName'): + return x509.oid.NameOID.LOCALITY_NAME + if name in ('ST', 'stateOrProvinceName'): + return x509.oid.NameOID.STATE_OR_PROVINCE_NAME + if name in ('street', 'streetAddress'): + return x509.oid.NameOID.STREET_ADDRESS + if name in ('O', 'organizationName'): + return x509.oid.NameOID.ORGANIZATION_NAME + if name in ('OU', 'organizationalUnitName'): + return x509.oid.NameOID.ORGANIZATIONAL_UNIT_NAME + if name in ('serialNumber', ): + return x509.oid.NameOID.SERIAL_NUMBER + if name in ('SN', 'surname'): + return x509.oid.NameOID.SURNAME + if name in ('GN', 'givenName'): + return x509.oid.NameOID.GIVEN_NAME + if name in ('title', ): + return x509.oid.NameOID.TITLE + if name in ('generationQualifier', ): + return x509.oid.NameOID.GENERATION_QUALIFIER + if name in ('x500UniqueIdentifier', ): + return x509.oid.NameOID.X500_UNIQUE_IDENTIFIER + if name in ('dnQualifier', ): + return x509.oid.NameOID.DN_QUALIFIER + if name in ('pseudonym', ): + return x509.oid.NameOID.PSEUDONYM + if name in ('UID', 'userId', 'UserID'): + return x509.oid.NameOID.USER_ID + if name in ('DC', 'domainComponent'): + return x509.oid.NameOID.DOMAIN_COMPONENT + if name in ('emailAddress', ): + return x509.oid.NameOID.EMAIL_ADDRESS + if name in ('jurisdictionC', 'jurisdictionCountryName'): + return x509.oid.NameOID.JURISDICTION_COUNTRY_NAME + if name in ('jurisdictionL', 'jurisdictionLocalityName'): + return x509.oid.NameOID.JURISDICTION_LOCALITY_NAME + if name in ('jurisdictionST', 'jurisdictionStateOrProvinceName'): + return x509.oid.NameOID.JURISDICTION_STATE_OR_PROVINCE_NAME + if name in ('businessCategory', ): + return x509.oid.NameOID.BUSINESS_CATEGORY + if name in ('postalAddress', ): + return x509.oid.NameOID.POSTAL_ADDRESS + if name in ('postalCode', ): + return x509.oid.NameOID.POSTAL_CODE + if name in ('serverAuth', 'TLS Web Server Authentication'): + return x509.oid.ExtendedKeyUsageOID.SERVER_AUTH + if name in ('clientAuth', 'TLS Web Client Authentication'): + return x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH + if name in ('codeSigning', 'Code Signing'): + return x509.oid.ExtendedKeyUsageOID.CODE_SIGNING + if name in ('emailProtection', 'E-mail Protection'): + return x509.oid.ExtendedKeyUsageOID.EMAIL_PROTECTION + if name in ('timeStamping', 'Time Stamping'): + return x509.oid.ExtendedKeyUsageOID.TIME_STAMPING + if name in ('OCSPSigning', 'OCSP Signing'): + return x509.oid.ExtendedKeyUsageOID.OCSP_SIGNING + if name in ('anyExtendedKeyUsage', 'Any Extended Key Usage'): + return x509.oid.ExtendedKeyUsageOID.ANY_EXTENDED_KEY_USAGE + for dotted, names in _OID_MAP.items(): + if name in names: + return x509.oid.ObjectIdentifier(dotted) + raise OpenSSLObjectError('Cannot find OID for "{0}"'.format(name)) + + +def crpytography_oid_to_name(oid): + dotted_string = oid.dotted_string + names = _OID_MAP.get(dotted_string) + name = names[0] if names else oid._name + return _NORMALIZE_NAMES.get(name, name) + + def cryptography_get_name_oid(id): ''' Given a symbolic ID, finds the appropriate OID for use with cryptography. @@ -367,7 +595,7 @@ def cryptography_get_name_oid(id): return x509.oid.NameOID.DN_QUALIFIER if id in ('pseudonym', ): return x509.oid.NameOID.PSEUDONYM - if id in ('UID', 'userId'): + if id in ('UID', 'userId', 'userID'): return x509.oid.NameOID.USER_ID if id in ('DC', 'domainComponent'): return x509.oid.NameOID.DOMAIN_COMPONENT @@ -409,6 +637,39 @@ def cryptography_get_name(name): raise OpenSSLObjectError('Cannot parse Subject Alternative Name "{0}" (potentially unsupported by cryptography backend)'.format(name)) +def _get_hex(bytes): + if bytes is None: + return bytes + data = binascii.hexlify(bytes) + data = to_text(b':'.join(data[i:i + 2] for i in range(0, len(data), 2))) + return data + + +def cryptography_decode_name(name): + ''' + Given a cryptography x509.Name object, returns a string. + Raises an OpenSSLObjectError if the name is not supported. + ''' + if isinstance(name, x509.DNSName): + return 'DNS:{0}'.format(name.value) + if isinstance(name, x509.IPAddress): + return 'IP:{0}'.format(name.value.compressed) + if isinstance(name, x509.RFC822Name): + return 'email:{0}'.format(name.value) + if isinstance(name, x509.UniformResourceIdentifier): + return 'URI:{0}'.format(name.value) + if isinstance(name, x509.DirectoryName): + # FIXME: test + return 'DirName:' + ''.join(['/{0}:{1}'.format(attribute.oid._name, attribute.value) for attribute in name.value]) + if isinstance(name, x509.RegisteredID): + # FIXME: test + return 'RegisteredID:{0}'.format(name.value) + if isinstance(name, x509.OtherName): + # FIXME: test + return '{0}:{1}'.format(name.type_id.dotted_string, _get_hex(name.value)) + raise OpenSSLObjectError('Cannot decode name "{0}"'.format(name)) + + def _cryptography_get_keyusage(usage): ''' Given a key usage identifier string, returns the parameter name used by cryptography's x509.KeyUsage(). @@ -474,10 +735,10 @@ def cryptography_get_ext_keyusage(usage): if usage in ('OCSPSigning', 'OCSP Signing'): return x509.oid.ExtendedKeyUsageOID.OCSP_SIGNING if usage in ('anyExtendedKeyUsage', 'Any Extended Key Usage'): - return x509.oid.ExtendedKeyUsageOID.ANY_EXTENDED_KEY_USAGE + return x509.oid.ObjectIdentifier("2.5.29.37.0") if usage in ('qcStatements', ): return x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.1.3") - if usage in ('DVCS', ): + if usage in ('DVCS', 'dvcs'): return x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.3.10") if usage in ('IPSec User', 'ipsecUser'): return x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.3.7") diff --git a/lib/ansible/modules/crypto/openssl_certificate_info.py b/lib/ansible/modules/crypto/openssl_certificate_info.py new file mode 100644 index 00000000000..f0937207a0f --- /dev/null +++ b/lib/ansible/modules/crypto/openssl_certificate_info.py @@ -0,0 +1,708 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# Copyright: (c) 2016-2017, Yanis Guenane +# Copyright: (c) 2017, Markus Teufelberger +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +ANSIBLE_METADATA = {'metadata_version': '1.1', + 'status': ['preview'], + 'supported_by': 'community'} + +DOCUMENTATION = r''' +--- +module: openssl_certificate_info +version_added: '2.8' +short_description: Provide information of OpenSSL X.509 certificates +description: + - This module allows one to query information on OpenSSL certificates. + - It uses the pyOpenSSL or cryptography python library to interact with OpenSSL. If both the + cryptography and PyOpenSSL libraries are available (and meet the minimum version requirements) + cryptography will be preferred as a backend over PyOpenSSL (unless the backend is forced with + C(select_crypto_backend)) +requirements: + - PyOpenSSL >= 0.15 or cryptography >= 1.6 +author: + - Felix Fontein (@felixfontein) + - Yanis Guenane (@Spredzy) + - Markus Teufelberger (@MarkusTeufelberger) +options: + path: + description: + - Remote absolute path where the certificate file is loaded from. + type: path + required: true + valid_at: + description: + - A dict of names mapping to time specifications. Every time specified here + will be checked whether the certificate is valid at this point. See the + C(valid_at) return value for informations on the result. + - Time can be specified either as relative time or as absolute timestamp. + - Time will always be interpreted as UTC. + - Valid format is C([+-]timespec | ASN.1 TIME) where timespec can be an integer + + C([w | d | h | m | s]) (e.g. C(+32w1d2h), and ASN.1 TIME (i.e. pattern C(YYYYMMDDHHMMSSZ)). + Note that all timestamps will be treated as being in UTC. + + select_crypto_backend: + description: + - Determines which crypto backend to use. + - The default choice is C(auto), which tries to use C(cryptography) if available, and falls back to C(pyopenssl). + - If set to C(pyopenssl), will try to use the L(pyOpenSSL,https://pypi.org/project/pyOpenSSL/) library. + - If set to C(cryptography), will try to use the L(cryptography,https://cryptography.io/) library. + type: str + default: auto + choices: [ auto, cryptography, pyopenssl ] + +notes: + - All timestamp values are provided in ASN.1 TIME format, i.e. following the C(YYYYMMDDHHMMSSZ) pattern. + They are all in UTC. +seealso: +- module: openssl_certificate +''' + +EXAMPLES = r''' +- name: Generate a Self Signed OpenSSL certificate + openssl_certificate: + path: /etc/ssl/crt/ansible.com.crt + privatekey_path: /etc/ssl/private/ansible.com.pem + csr_path: /etc/ssl/csr/ansible.com.csr + provider: selfsigned + + +# Get information on the certificate + +- name: Get information on generated certificate + openssl_certificate_info: + path: /etc/ssl/crt/ansible.com.crt + register: result + +- name: Dump information + debug: + var: result + + +# Check whether the certificate is valid or not valid at certain times, fail +# if this is not the case. The first task (openssl_certificate_info) collects +# the information, and the second task (assert) validates the result and +# makes the playbook fail in case something is not as expected. + +- name: Test whether that certificate is valid tomorrow and/or in three weeks + openssl_certificate_info: + path: /etc/ssl/crt/ansible.com.crt + valid_at: + point_1: "+1d" + point_2: "+3w" + register: result + +- name: Validate that certificate is valid tomorrow, but not in three weeks + assert: + that: + - result.valid_at.point_1 # valid in one day + - not result.valid_at.point_2 # not valid in three weeks +''' + +RETURN = r''' +expired: + description: Whether the certificate is expired (i.e. C(notAfter) is in the past) + returned: success + type: bool +basic_constraints: + description: Entries in the C(basic_constraints) extension, or C(none) if extension is not present. + returned: success + type: list + sample: "[CA:TRUE, pathlen:1]" +basic_constraints_critical: + description: Whether the C(basic_constraints) extension is critical. + returned: success + type: bool +extended_key_usage: + description: Entries in the C(extended_key_usage) extension, or C(none) if extension is not present. + returned: success + type: list + sample: "[Biometric Info, DVCS, Time Stamping]" +extended_key_usage_critical: + description: Whether the C(extended_key_usage) extension is critical. + returned: success + type: bool +extensions_by_oid: + description: Returns a dictionary for every extension OID + returned: success + type: complex + contains: + critical: + description: Whether the extension is critical. + returned: success + type: bool + value: + description: The Base64 encoded value (in DER format) of the extension + returned: success + type: str + sample: "MAMCAQU=" + sample: '{"1.3.6.1.5.5.7.1.24": { "critical": false, "value": "MAMCAQU="}}' +key_usage: + description: Entries in the C(key_usage) extension, or C(none) if extension is not present. + returned: success + type: str + sample: "[Key Agreement, Data Encipherment]" +key_usage_critical: + description: Whether the C(key_usage) extension is critical. + returned: success + type: bool +subject_alt_name: + description: Entries in the C(subject_alt_name) extension, or C(none) if extension is not present. + returned: success + type: list + sample: "[DNS:www.ansible.com, IP:1.2.3.4]" +subject_alt_name_critical: + description: Whether the C(subject_alt_name) extension is critical. + returned: success + type: bool +ocsp_must_staple: + description: C(yes) if the OCSP Must Staple extension is present, C(none) otherwise. + returned: success + type: bool +ocsp_must_staple_critical: + description: Whether the C(ocsp_must_staple) extension is critical. + returned: success + type: bool +issuer: + description: The certificate's issuer. + returned: success + type: dict + sample: '{"organizationName": "Ansible"}' +subject: + description: The certificate's subject. + returned: success + type: dict + sample: '{"commonName": "www.example.com", "emailAddress": "test@example.com"}' +not_after: + description: C(notAfter) date as ASN.1 TIME + returned: success + type: str + sample: 20190413202428Z +not_before: + description: C(notBefore) date as ASN.1 TIME + returned: success + type: str + sample: 20190331202428Z +public_key: + description: Certificate's public key in PEM format + returned: success + type: str + sample: "-----BEGIN PUBLIC KEY-----\nMIICIjANBgkqhkiG9w0BAQEFAAOCAg8A..." +public_key_fingerprints: + description: + - Fingerprints of certificate's public key. + - For every hash algorithm available, the fingerprint is computed. + returned: success + type: dict + sample: "{'sha256': 'd4:b3:aa:6d:c8:04:ce:4e:ba:f6:29:4d:92:a3:94:b0:c2:ff:bd:bf:33:63:11:43:34:0f:51:b0:95:09:2f:63', + 'sha512': 'f7:07:4a:f0:b0:f0:e6:8b:95:5f:f9:e6:61:0a:32:68:f1..." +signature_algorithm: + description: The signature algorithm used to sign the certificate. + returned: success + type: str + sample: sha256WithRSAEncryption +serial_number: + description: The certificate's serial number. + returned: success + type: int + sample: 1234 +version: + description: The certificate version. + returned: success + type: int + sample: 3 +valid_at: + description: For every time stamp provided in the I(valid_at) option, a + boolean whether the certificate is valid at that point in time + or not. + returned: success + type: dict +''' + + +import abc +import datetime +import os +import traceback +from distutils.version import LooseVersion + +from ansible.module_utils import crypto as crypto_utils +from ansible.module_utils.basic import AnsibleModule, missing_required_lib +from ansible.module_utils.six import string_types +from ansible.module_utils._text import to_native, to_text, to_bytes + +MINIMAL_CRYPTOGRAPHY_VERSION = '1.6' +MINIMAL_PYOPENSSL_VERSION = '0.15' + +PYOPENSSL_IMP_ERR = None +try: + import OpenSSL + from OpenSSL import crypto + import ipaddress + PYOPENSSL_VERSION = LooseVersion(OpenSSL.__version__) + if OpenSSL.SSL.OPENSSL_VERSION_NUMBER >= 0x10100000: + # OpenSSL 1.1.0 or newer + OPENSSL_MUST_STAPLE_NAME = b"tlsfeature" + OPENSSL_MUST_STAPLE_VALUE = b"status_request" + else: + # OpenSSL 1.0.x or older + OPENSSL_MUST_STAPLE_NAME = b"1.3.6.1.5.5.7.1.24" + OPENSSL_MUST_STAPLE_VALUE = b"DER:30:03:02:01:05" +except ImportError: + PYOPENSSL_IMP_ERR = traceback.format_exc() + PYOPENSSL_FOUND = False +else: + PYOPENSSL_FOUND = True + +CRYPTOGRAPHY_IMP_ERR = None +try: + import cryptography + from cryptography import x509 + from cryptography.hazmat.primitives import serialization + CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__) +except ImportError: + CRYPTOGRAPHY_IMP_ERR = traceback.format_exc() + CRYPTOGRAPHY_FOUND = False +else: + CRYPTOGRAPHY_FOUND = True + + +TIMESTAMP_FORMAT = "%Y%m%d%H%M%SZ" + + +def get_relative_time_option(input_string, input_name): + """Return an ASN1 formatted string if a relative timespec + or an ASN1 formatted string is provided.""" + result = input_string + if result.startswith("+") or result.startswith("-"): + return crypto_utils.convert_relative_to_datetime(result) + if result is None: + raise crypto_utils.CertificateError( + 'The timespec "%s" for %s is not valid' % + input_string, input_name) + for date_fmt in ['%Y%m%d%H%M%SZ', '%Y%m%d%H%MZ', '%Y%m%d%H%M%S%z', '%Y%m%d%H%M%z']: + try: + result = datetime.datetime.strptime(input_string, date_fmt) + break + except ValueError: + pass + + if not isinstance(result, datetime.datetime): + raise crypto_utils.CertificateError( + 'The time spec "%s" for %s is invalid' % + (input_string, input_name) + ) + return result + + +class CertificateInfo(crypto_utils.OpenSSLObject): + def __init__(self, module, backend): + super(CertificateInfo, self).__init__( + module.params['path'], + 'present', + False, + module.check_mode, + ) + self.backend = backend + self.module = module + + self.valid_at = module.params['valid_at'] + if self.valid_at: + for k, v in self.valid_at.items(): + if not isinstance(v, string_types): + self.module.fail_json( + msg='The value for valid_at.{0} must be of type string (got {1})'.format(k, type(v)) + ) + self.valid_at[k] = get_relative_time_option(v, 'valid_at.{0}'.format(k)) + + def generate(self): + # Empty method because crypto_utils.OpenSSLObject wants this + pass + + def dump(self): + # Empty method because crypto_utils.OpenSSLObject wants this + pass + + @abc.abstractmethod + def _get_signature_algorithm(self): + pass + + @abc.abstractmethod + def _get_subject(self): + pass + + @abc.abstractmethod + def _get_issuer(self): + pass + + @abc.abstractmethod + def _get_version(self): + pass + + @abc.abstractmethod + def _get_key_usage(self): + pass + + @abc.abstractmethod + def _get_extended_key_usage(self): + pass + + @abc.abstractmethod + def _get_basic_constraints(self): + pass + + @abc.abstractmethod + def _get_ocsp_must_staple(self): + pass + + @abc.abstractmethod + def _get_subject_alt_name(self): + pass + + @abc.abstractmethod + def _get_not_before(self): + pass + + @abc.abstractmethod + def _get_not_after(self): + pass + + @abc.abstractmethod + def _get_public_key(self, binary): + pass + + @abc.abstractmethod + def _get_serial_number(self): + pass + + @abc.abstractmethod + def _get_all_extensions(self): + pass + + def get_info(self): + result = dict() + self.cert = crypto_utils.load_certificate(self.path, backend=self.backend) + + result['signature_algorithm'] = self._get_signature_algorithm() + result['subject'] = self._get_subject() + result['issuer'] = self._get_issuer() + result['version'] = self._get_version() + result['key_usage'], result['key_usage_critical'] = self._get_key_usage() + result['extended_key_usage'], result['extended_key_usage_critical'] = self._get_extended_key_usage() + result['basic_constraints'], result['basic_constraints_critical'] = self._get_basic_constraints() + result['ocsp_must_staple'], result['ocsp_must_staple_critical'] = self._get_ocsp_must_staple() + result['subject_alt_name'], result['subject_alt_name_critical'] = self._get_subject_alt_name() + + not_before = self._get_not_before() + not_after = self._get_not_after() + result['not_before'] = not_before.strftime(TIMESTAMP_FORMAT) + result['not_after'] = not_after.strftime(TIMESTAMP_FORMAT) + result['expired'] = not_after < datetime.datetime.utcnow() + + result['valid_at'] = dict() + if self.valid_at: + for k, v in self.valid_at.items(): + result['valid_at'][k] = not_before <= v <= not_after + + result['public_key'] = self._get_public_key(binary=False) + pk = self._get_public_key(binary=True) + result['public_key_fingerprints'] = crypto_utils.get_fingerprint_of_bytes(pk) if pk is not None else dict() + + result['serial_number'] = self._get_serial_number() + result['extensions_by_oid'] = self._get_all_extensions() + + return result + + +class CertificateInfoCryptography(CertificateInfo): + """Validate the supplied cert, using the cryptography backend""" + def __init__(self, module): + super(CertificateInfoCryptography, self).__init__(module, 'cryptography') + + def _get_signature_algorithm(self): + return crypto_utils.crpytography_oid_to_name(self.cert.signature_algorithm_oid) + + def _get_subject(self): + result = dict() + for attribute in self.cert.subject: + result[crypto_utils.crpytography_oid_to_name(attribute.oid)] = attribute.value + return result + + def _get_issuer(self): + result = dict() + for attribute in self.cert.issuer: + result[crypto_utils.crpytography_oid_to_name(attribute.oid)] = attribute.value + return result + + def _get_version(self): + if self.cert.version == x509.Version.v1: + return 1 + if self.cert.version == x509.Version.v3: + return 3 + return "unknown" + + def _get_key_usage(self): + try: + current_key_ext = self.cert.extensions.get_extension_for_class(x509.KeyUsage) + current_key_usage = current_key_ext.value + key_usage = dict( + digital_signature=current_key_usage.digital_signature, + content_commitment=current_key_usage.content_commitment, + key_encipherment=current_key_usage.key_encipherment, + data_encipherment=current_key_usage.data_encipherment, + key_agreement=current_key_usage.key_agreement, + key_cert_sign=current_key_usage.key_cert_sign, + crl_sign=current_key_usage.crl_sign, + encipher_only=False, + decipher_only=False, + ) + if key_usage['key_agreement']: + key_usage.update(dict( + encipher_only=current_key_usage.encipher_only, + decipher_only=current_key_usage.decipher_only + )) + + key_usage_names = dict( + digital_signature='Digital Signature', + content_commitment='Non Repudiation', + key_encipherment='Key Encipherment', + data_encipherment='Data Encipherment', + key_agreement='Key Agreement', + key_cert_sign='Certificate Sign', + crl_sign='CRL Sign', + encipher_only='Encipher Only', + decipher_only='Decipher Only', + ) + return sorted([ + key_usage_names[name] for name, value in key_usage.items() if value + ]), current_key_ext.critical + except cryptography.x509.ExtensionNotFound: + return None, False + + def _get_extended_key_usage(self): + try: + ext_keyusage_ext = self.cert.extensions.get_extension_for_class(x509.ExtendedKeyUsage) + return sorted([ + crypto_utils.crpytography_oid_to_name(eku) for eku in ext_keyusage_ext.value + ]), ext_keyusage_ext.critical + except cryptography.x509.ExtensionNotFound: + return None, False + + def _get_basic_constraints(self): + try: + ext_keyusage_ext = self.cert.extensions.get_extension_for_class(x509.BasicConstraints) + result = [] + result.append('CA:{0}'.format('TRUE' if ext_keyusage_ext.value.ca else 'FALSE')) + if ext_keyusage_ext.value.path_length is not None: + result.append('pathlen:{0}'.format(ext_keyusage_ext.value.path_length)) + return sorted(result), ext_keyusage_ext.critical + except cryptography.x509.ExtensionNotFound: + return None, False + + def _get_ocsp_must_staple(self): + try: + try: + # This only works with cryptography >= 2.1 + tlsfeature_ext = self.cert.extensions.get_extension_for_class(x509.TLSFeature) + value = cryptography.x509.TLSFeatureType.status_request in tlsfeature_ext.value + except AttributeError as dummy: + # Fallback for cryptography < 2.1 + oid = x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.1.24") + tlsfeature_ext = self.cert.extensions.get_extension_for_oid(oid) + value = tlsfeature_ext.value.value == b"\x30\x03\x02\x01\x05" + return value, tlsfeature_ext.critical + except cryptography.x509.ExtensionNotFound: + return None, False + + def _get_subject_alt_name(self): + try: + san_ext = self.cert.extensions.get_extension_for_class(x509.SubjectAlternativeName) + result = [crypto_utils.cryptography_decode_name(san) for san in san_ext.value] + return result, san_ext.critical + except cryptography.x509.ExtensionNotFound: + return None, False + + def _get_not_before(self): + return self.cert.not_valid_before + + def _get_not_after(self): + return self.cert.not_valid_after + + def _get_public_key(self, binary): + return self.cert.public_key().public_bytes( + serialization.Encoding.DER if binary else serialization.Encoding.PEM, + serialization.PublicFormat.SubjectPublicKeyInfo + ) + + def _get_serial_number(self): + return self.cert.serial_number + + def _get_all_extensions(self): + return crypto_utils.cryptography_get_extensions_from_cert(self.cert) + + +class CertificateInfoPyOpenSSL(CertificateInfo): + """validate the supplied certificate.""" + + def __init__(self, module): + super(CertificateInfoPyOpenSSL, self).__init__(module, 'pyopenssl') + + def _get_signature_algorithm(self): + return to_text(self.cert.get_signature_algorithm()) + + def __get_name(self, name): + result = dict() + for sub in name.get_components(): + result[crypto_utils.pyopenssl_normalize_name(sub[0])] = to_text(sub[1]) + return result + + def _get_subject(self): + return self.__get_name(self.cert.get_subject()) + + def _get_issuer(self): + return self.__get_name(self.cert.get_issuer()) + + def _get_version(self): + # Version numbers in certs are off by one: + # v1: 0, v2: 1, v3: 2 ... + return self.cert.get_version() + 1 + + def _get_extension(self, short_name): + for extension_idx in range(0, self.cert.get_extension_count()): + extension = self.cert.get_extension(extension_idx) + if extension.get_short_name() == short_name: + result = [ + crypto_utils.pyopenssl_normalize_name(usage.strip()) for usage in to_text(extension, errors='surrogate_or_strict').split(',') + ] + return sorted(result), bool(extension.get_critical()) + return None, False + + def _get_key_usage(self): + return self._get_extension(b'keyUsage') + + def _get_extended_key_usage(self): + return self._get_extension(b'extendedKeyUsage') + + def _get_basic_constraints(self): + return self._get_extension(b'basicConstraints') + + def _get_ocsp_must_staple(self): + extensions = [self.cert.get_extension(i) for i in range(0, self.cert.get_extension_count())] + oms_ext = [ + ext for ext in extensions + if to_bytes(ext.get_short_name()) == OPENSSL_MUST_STAPLE_NAME and to_bytes(ext) == OPENSSL_MUST_STAPLE_VALUE + ] + if OpenSSL.SSL.OPENSSL_VERSION_NUMBER < 0x10100000: + # Older versions of libssl don't know about OCSP Must Staple + oms_ext.extend([ext for ext in extensions if ext.get_short_name() == b'UNDEF' and ext.get_data() == b'\x30\x03\x02\x01\x05']) + if oms_ext: + return True, bool(oms_ext[0].get_critical()) + else: + return None, False + + def _normalize_san(self, san): + if san.startswith('IP Address:'): + san = 'IP:' + san[len('IP Address:'):] + if san.startswith('IP:'): + ip = ipaddress.ip_address(san[3:]) + san = 'IP:{0}'.format(ip.compressed) + return san + + def _get_subject_alt_name(self): + for extension_idx in range(0, self.cert.get_extension_count()): + extension = self.cert.get_extension(extension_idx) + if extension.get_short_name() == b'subjectAltName': + result = [self._normalize_san(altname.strip()) for altname in + to_text(extension, errors='surrogate_or_strict').split(', ')] + return result, bool(extension.get_critical()) + return None, False + + def _get_not_before(self): + time_string = to_native(self.cert.get_notBefore()) + return datetime.datetime.strptime(time_string, "%Y%m%d%H%M%SZ") + + def _get_not_after(self): + time_string = to_native(self.cert.get_notAfter()) + return datetime.datetime.strptime(time_string, "%Y%m%d%H%M%SZ") + + def _get_public_key(self, binary): + try: + return crypto.dump_publickey( + crypto.FILETYPE_ASN1 if binary else crypto.FILETYPE_PEM, + self.cert.get_pubkey() + ) + except AttributeError: + self.module.warn('Your pyOpenSSL version does not support dumping public keys. ' + 'Please upgrade to version 16.0 or newer, or use the cryptography backend.') + + def _get_serial_number(self): + return self.cert.get_serial_number() + + def _get_all_extensions(self): + return crypto_utils.pyopenssl_get_extensions_from_cert(self.cert) + + +def main(): + module = AnsibleModule( + argument_spec=dict( + path=dict(type='path', required=True), + valid_at=dict(type='dict'), + select_crypto_backend=dict(type='str', default='auto', choices=['auto', 'cryptography', 'pyopenssl']), + ), + supports_check_mode=True, + ) + + try: + base_dir = os.path.dirname(module.params['path']) or '.' + if not os.path.isdir(base_dir): + module.fail_json( + name=base_dir, + msg='The directory %s does not exist or the file is not a directory' % base_dir + ) + + backend = module.params['select_crypto_backend'] + if backend == 'auto': + # Detect what backend we can use + can_use_cryptography = CRYPTOGRAPHY_FOUND and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION) + can_use_pyopenssl = PYOPENSSL_FOUND and PYOPENSSL_VERSION >= LooseVersion(MINIMAL_PYOPENSSL_VERSION) + + # If cryptography is available we'll use it + if can_use_cryptography: + backend = 'cryptography' + elif can_use_pyopenssl: + backend = 'pyopenssl' + + # Fail if no backend has been found + if backend == 'auto': + module.fail_json(msg=("Can't detect any of the required Python libraries " + "cryptography (>= {0}) or PyOpenSSL (>= {1})").format( + MINIMAL_CRYPTOGRAPHY_VERSION, + MINIMAL_PYOPENSSL_VERSION)) + + if backend == 'pyopenssl': + if not PYOPENSSL_FOUND: + module.fail_json(msg=missing_required_lib('pyOpenSSL'), exception=PYOPENSSL_IMP_ERR) + try: + getattr(crypto.X509Req, 'get_extensions') + except AttributeError: + module.fail_json(msg='You need to have PyOpenSSL>=0.15') + + certificate = CertificateInfoPyOpenSSL(module) + elif backend == 'cryptography': + if not CRYPTOGRAPHY_FOUND: + module.fail_json(msg=missing_required_lib('cryptography'), exception=CRYPTOGRAPHY_IMP_ERR) + certificate = CertificateInfoCryptography(module) + + result = certificate.get_info() + module.exit_json(**result) + except crypto_utils.OpenSSLObjectError as exc: + module.fail_json(msg=to_native(exc)) + + +if __name__ == "__main__": + main() diff --git a/test/integration/targets/openssl_certificate_info/aliases b/test/integration/targets/openssl_certificate_info/aliases new file mode 100644 index 00000000000..6eae8bd8ddc --- /dev/null +++ b/test/integration/targets/openssl_certificate_info/aliases @@ -0,0 +1,2 @@ +shippable/posix/group1 +destructive diff --git a/test/integration/targets/openssl_certificate_info/meta/main.yml b/test/integration/targets/openssl_certificate_info/meta/main.yml new file mode 100644 index 00000000000..800aff64284 --- /dev/null +++ b/test/integration/targets/openssl_certificate_info/meta/main.yml @@ -0,0 +1,2 @@ +dependencies: + - setup_openssl diff --git a/test/integration/targets/openssl_certificate_info/tasks/impl.yml b/test/integration/targets/openssl_certificate_info/tasks/impl.yml new file mode 100644 index 00000000000..dbf942602a7 --- /dev/null +++ b/test/integration/targets/openssl_certificate_info/tasks/impl.yml @@ -0,0 +1,52 @@ +--- +- debug: + msg: "Executing tests with backend {{ select_crypto_backend }}" + +- name: ({{select_crypto_backend}}) Get certificate info + openssl_certificate_info: + path: '{{ output_dir }}/cert_1.pem' + select_crypto_backend: '{{ select_crypto_backend }}' + register: result + +- name: Update result list + set_fact: + info_results: "{{ info_results + [result] }}" + +- name: ({{select_crypto_backend}}) Get certificate info + openssl_certificate_info: + path: '{{ output_dir }}/cert_2.pem' + select_crypto_backend: '{{ select_crypto_backend }}' + valid_at: + today: "+0d" + past: "20190101235901Z" + twentydays: "+20d" + register: result +- assert: + that: + - result.valid_at.today + - not result.valid_at.past + - not result.valid_at.twentydays + +- name: Update result list + set_fact: + info_results: "{{ info_results + [result] }}" + +- name: ({{select_crypto_backend}}) Get certificate info + openssl_certificate_info: + path: '{{ output_dir }}/cert_3.pem' + select_crypto_backend: '{{ select_crypto_backend }}' + register: result + +- name: Update result list + set_fact: + info_results: "{{ info_results + [result] }}" + +- name: ({{select_crypto_backend}}) Get certificate info + openssl_certificate_info: + path: '{{ output_dir }}/cert_4.pem' + select_crypto_backend: '{{ select_crypto_backend }}' + register: result + +- name: Update result list + set_fact: + info_results: "{{ info_results + [result] }}" diff --git a/test/integration/targets/openssl_certificate_info/tasks/main.yml b/test/integration/targets/openssl_certificate_info/tasks/main.yml new file mode 100644 index 00000000000..bff52bade14 --- /dev/null +++ b/test/integration/targets/openssl_certificate_info/tasks/main.yml @@ -0,0 +1,151 @@ +--- +- name: Generate privatekey + openssl_privatekey: + path: '{{ output_dir }}/privatekey.pem' + +- name: Generate privatekey with password + openssl_privatekey: + path: '{{ output_dir }}/privatekeypw.pem' + passphrase: hunter2 + cipher: auto + select_crypto_backend: cryptography + +- name: Generate CSR 1 + openssl_csr: + path: '{{ output_dir }}/csr_1.csr' + privatekey_path: '{{ output_dir }}/privatekey.pem' + subject: + commonName: www.example.com + C: de + L: Somewhere + ST: Zurich + streetAddress: Welcome Street + O: Ansible + organizationalUnitName: Crypto Department + serialNumber: "1234" + SN: Last Name + GN: First Name + title: Chief + pseudonym: test + UID: asdf + emailAddress: test@example.com + postalAddress: 1234 Somewhere + postalCode: "1234" + useCommonNameForSAN: no + key_usage: + - digitalSignature + - keyAgreement + - Non Repudiation + - Key Encipherment + - dataEncipherment + - Certificate Sign + - cRLSign + - Encipher Only + - decipherOnly + key_usage_critical: yes + extended_key_usage: + - serverAuth # the same as "TLS Web Server Authentication" + - TLS Web Server Authentication + - TLS Web Client Authentication + - Code Signing + - E-mail Protection + - timeStamping + - OCSPSigning + - Any Extended Key Usage + - qcStatements + - DVCS + - IPSec User + - biometricInfo + subject_alt_name: + - "DNS:www.ansible.com" + - "IP:1.2.3.4" + - "IP:::1" + - "email:test@example.org" + - "URI:https://example.org/test/index.html" + basic_constraints: + - "CA:TRUE" + - "pathlen:23" + basic_constraints_critical: yes + ocsp_must_staple: yes + +- name: Generate CSR 2 + openssl_csr: + path: '{{ output_dir }}/csr_2.csr' + privatekey_path: '{{ output_dir }}/privatekeypw.pem' + privatekey_passphrase: hunter2 + useCommonNameForSAN: no + basic_constraints: + - "CA:TRUE" + +- name: Generate CSR 3 + openssl_csr: + path: '{{ output_dir }}/csr_3.csr' + privatekey_path: '{{ output_dir }}/privatekey.pem' + useCommonNameForSAN: no + subject_alt_name: + - "DNS:*.ansible.com" + - "DNS:*.example.org" + - "IP:DEAD:BEEF::1" + basic_constraints: + - "CA:FALSE" + +- name: Generate CSR 4 + openssl_csr: + path: '{{ output_dir }}/csr_4.csr' + privatekey_path: '{{ output_dir }}/privatekey.pem' + useCommonNameForSAN: no + +- name: Generate selfsigned certificates + openssl_certificate: + path: '{{ output_dir }}/cert_{{ item }}.pem' + csr_path: '{{ output_dir }}/csr_{{ item }}.csr' + privatekey_path: '{{ output_dir }}/privatekey.pem' + provider: selfsigned + selfsigned_digest: sha256 + selfsigned_not_after: "+10d" + selfsigned_not_before: "-3d" + loop: + - 1 + - 2 + - 3 + - 4 + +- name: Prepare result list + set_fact: + info_results: [] + +- name: Running tests with pyOpenSSL backend + include_tasks: impl.yml + vars: + select_crypto_backend: pyopenssl + when: pyopenssl_version.stdout is version('0.15', '>=') + +- name: Prepare result list + set_fact: + pyopenssl_info_results: "{{ info_results }}" + info_results: [] + +- name: Running tests with cryptography backend + include_tasks: impl.yml + vars: + select_crypto_backend: cryptography + when: cryptography_version.stdout is version('1.6', '>=') + +- name: Prepare result list + set_fact: + cryptography_info_results: "{{ info_results }}" + +- block: + - name: Dump pyOpenSSL results + debug: + var: pyopenssl_info_results + - name: Dump cryptography results + debug: + var: cryptography_info_results + - name: Compare results + assert: + that: + - item.0 == item.1 + quiet: yes + loop: "{{ pyopenssl_info_results | zip(cryptography_info_results) | list }}" + when: pyopenssl_version.stdout is version('0.15', '>=') and cryptography_version.stdout is version('1.6', '>=')