Add openssl_certificate_info module (#54709)

* Add certificate_info module.

* Improve normalization.

* Add extension dump.

* Add support for basic_constraints and ocsp_must_staple.

* Update docs.

* Add serial number.

* Remove superfluous code.

* Fix formulation.

* Improve examples.

* Improve result docs.

* Forgot to add tests.

* Adjust when no fingerprints can be computed.
This commit is contained in:
Felix Fontein 2019-04-05 16:47:05 +02:00 committed by John R Barker
parent c0e7b643bf
commit 65d7f0d17b
6 changed files with 1179 additions and 3 deletions

View file

@ -14,9 +14,16 @@
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
#
# --------------------------------------------------------------
# A clearly marked portion of this file is licensed under the BSD
# Copyright (c) 2015, 2016 Paul Kehrer (@reaperhulk)
# Copyright (c) 2017 Fraser Tweedale (@frasertweedale)
# For more details, search for the function _obj2txt().
try:
import OpenSSL
from OpenSSL import crypto
except ImportError:
# An error will be raised in the calling class to let the end
@ -35,6 +42,8 @@ except ImportError:
import abc
import base64
import binascii
import datetime
import errno
import hashlib
@ -332,6 +341,225 @@ class OpenSSLObject(object):
pass
_OID_MAP = {
# First entry is 'canonical' name
"2.5.29.37.0": ('Any Extended Key Usage', 'anyExtendedKeyUsage'),
"1.3.6.1.5.5.7.1.3": ('qcStatements', ),
"1.3.6.1.5.5.7.3.10": ('DVCS', 'dvcs'),
"1.3.6.1.5.5.7.3.7": ('IPSec User', 'ipsecUser'),
"1.3.6.1.5.5.7.1.2": ('Biometric Info', 'biometricInfo'),
}
_NORMALIZE_NAMES = {
'CN': 'commonName',
'C': 'countryName',
'L': 'localityName',
'ST': 'stateOrProvinceName',
'street': 'streetAddress',
'O': 'organizationName',
'OU': 'organizationalUnitName',
'SN': 'surname',
'GN': 'givenName',
'UID': 'userId',
'userID': 'userId',
'DC': 'domainComponent',
'jurisdictionC': 'jurisdictionCountryName',
'jurisdictionL': 'jurisdictionLocalityName',
'jurisdictionST': 'jurisdictionStateOrProvinceName',
'serverAuth': 'TLS Web Server Authentication',
'clientAuth': 'TLS Web Client Authentication',
'codeSigning': 'Code Signing',
'emailProtection': 'E-mail Protection',
'timeStamping': 'Time Stamping',
'OCSPSigning': 'OCSP Signing',
}
for dotted, names in _OID_MAP.items():
for name in names[1:]:
_NORMALIZE_NAMES[name] = names[0]
def pyopenssl_normalize_name(name):
nid = OpenSSL._util.lib.OBJ_txt2nid(to_bytes(name))
if nid != 0:
b_name = OpenSSL._util.lib.OBJ_nid2ln(nid)
name = to_text(OpenSSL._util.ffi.string(b_name))
return _NORMALIZE_NAMES.get(name, name)
# #####################################################################################
# #####################################################################################
# # This excerpt is dual licensed under the terms of the Apache License, Version
# # 2.0, and the BSD License. See the LICENSE file at
# # https://github.com/pyca/cryptography/blob/master/LICENSE for complete details.
# #
# # Adapted from cryptography's hazmat/backends/openssl/decode_asn1.py
# #
# # Copyright (c) 2015, 2016 Paul Kehrer (@reaperhulk)
# # Copyright (c) 2017 Fraser Tweedale (@frasertweedale)
# #
# # Relevant commits from cryptography project (https://github.com/pyca/cryptography):
# # pyca/cryptography@719d536dd691e84e208534798f2eb4f82aaa2e07
# # pyca/cryptography@5ab6d6a5c05572bd1c75f05baf264a2d0001894a
# # pyca/cryptography@2e776e20eb60378e0af9b7439000d0e80da7c7e3
# # pyca/cryptography@fb309ed24647d1be9e319b61b1f2aa8ebb87b90b
# # pyca/cryptography@2917e460993c475c72d7146c50dc3bbc2414280d
# # pyca/cryptography@3057f91ea9a05fb593825006d87a391286a4d828
# # pyca/cryptography@d607dd7e5bc5c08854ec0c9baff70ba4a35be36f
def _obj2txt(openssl_lib, openssl_ffi, obj):
# Set to 80 on the recommendation of
# https://www.openssl.org/docs/crypto/OBJ_nid2ln.html#return_values
#
# But OIDs longer than this occur in real life (e.g. Active
# Directory makes some very long OIDs). So we need to detect
# and properly handle the case where the default buffer is not
# big enough.
#
buf_len = 80
buf = openssl_ffi.new("char[]", buf_len)
# 'res' is the number of bytes that *would* be written if the
# buffer is large enough. If 'res' > buf_len - 1, we need to
# alloc a big-enough buffer and go again.
res = openssl_lib.OBJ_obj2txt(buf, buf_len, obj, 1)
if res > buf_len - 1: # account for terminating null byte
buf_len = res + 1
buf = openssl_ffi.new("char[]", buf_len)
res = openssl_lib.OBJ_obj2txt(buf, buf_len, obj, 1)
return openssl_ffi.buffer(buf, res)[:].decode()
# #####################################################################################
# #####################################################################################
def cryptography_get_extensions_from_cert(cert):
# Since cryptography won't give us the DER value for an extension
# (that is only stored for unrecognized extensions), we have to re-do
# the extension parsing outselves.
result = dict()
backend = cert._backend
x509_obj = cert._x509
for i in range(backend._lib.X509_get_ext_count(x509_obj)):
ext = backend._lib.X509_get_ext(x509_obj, i)
if ext == backend._ffi.NULL:
continue
crit = backend._lib.X509_EXTENSION_get_critical(ext)
data = backend._lib.X509_EXTENSION_get_data(ext)
backend.openssl_assert(data != backend._ffi.NULL)
der = backend._ffi.buffer(data.data, data.length)[:]
entry = dict(
critical=(crit == 1),
value=base64.b64encode(der),
)
oid = _obj2txt(backend._lib, backend._ffi, backend._lib.X509_EXTENSION_get_object(ext))
result[oid] = entry
return result
def pyopenssl_get_extensions_from_cert(cert):
# While pyOpenSSL allows us to get an extension's DER value, it won't
# give us the dotted string for an OID. So we have to do some magic to
# get hold of it.
result = dict()
ext_count = cert.get_extension_count()
for i in range(0, ext_count):
ext = cert.get_extension(i)
entry = dict(
critical=bool(ext.get_critical()),
value=base64.b64encode(ext.get_data()),
)
oid = _obj2txt(
OpenSSL._util.lib,
OpenSSL._util.ffi,
OpenSSL._util.lib.X509_EXTENSION_get_object(ext._extension)
)
# This could also be done a bit simpler:
#
# oid = _obj2txt(OpenSSL._util.lib, OpenSSL._util.ffi, OpenSSL._util.lib.OBJ_nid2obj(ext._nid))
#
# Unfortunately this gives the wrong result in case the linked OpenSSL
# doesn't know the OID. That's why we have to get the OID dotted string
# similarly to how cryptography does it.
result[oid] = entry
return result
def crpytography_name_to_oid(name):
if name in ('CN', 'commonName'):
return x509.oid.NameOID.COMMON_NAME
if name in ('C', 'countryName'):
return x509.oid.NameOID.COUNTRY_NAME
if name in ('L', 'localityName'):
return x509.oid.NameOID.LOCALITY_NAME
if name in ('ST', 'stateOrProvinceName'):
return x509.oid.NameOID.STATE_OR_PROVINCE_NAME
if name in ('street', 'streetAddress'):
return x509.oid.NameOID.STREET_ADDRESS
if name in ('O', 'organizationName'):
return x509.oid.NameOID.ORGANIZATION_NAME
if name in ('OU', 'organizationalUnitName'):
return x509.oid.NameOID.ORGANIZATIONAL_UNIT_NAME
if name in ('serialNumber', ):
return x509.oid.NameOID.SERIAL_NUMBER
if name in ('SN', 'surname'):
return x509.oid.NameOID.SURNAME
if name in ('GN', 'givenName'):
return x509.oid.NameOID.GIVEN_NAME
if name in ('title', ):
return x509.oid.NameOID.TITLE
if name in ('generationQualifier', ):
return x509.oid.NameOID.GENERATION_QUALIFIER
if name in ('x500UniqueIdentifier', ):
return x509.oid.NameOID.X500_UNIQUE_IDENTIFIER
if name in ('dnQualifier', ):
return x509.oid.NameOID.DN_QUALIFIER
if name in ('pseudonym', ):
return x509.oid.NameOID.PSEUDONYM
if name in ('UID', 'userId', 'UserID'):
return x509.oid.NameOID.USER_ID
if name in ('DC', 'domainComponent'):
return x509.oid.NameOID.DOMAIN_COMPONENT
if name in ('emailAddress', ):
return x509.oid.NameOID.EMAIL_ADDRESS
if name in ('jurisdictionC', 'jurisdictionCountryName'):
return x509.oid.NameOID.JURISDICTION_COUNTRY_NAME
if name in ('jurisdictionL', 'jurisdictionLocalityName'):
return x509.oid.NameOID.JURISDICTION_LOCALITY_NAME
if name in ('jurisdictionST', 'jurisdictionStateOrProvinceName'):
return x509.oid.NameOID.JURISDICTION_STATE_OR_PROVINCE_NAME
if name in ('businessCategory', ):
return x509.oid.NameOID.BUSINESS_CATEGORY
if name in ('postalAddress', ):
return x509.oid.NameOID.POSTAL_ADDRESS
if name in ('postalCode', ):
return x509.oid.NameOID.POSTAL_CODE
if name in ('serverAuth', 'TLS Web Server Authentication'):
return x509.oid.ExtendedKeyUsageOID.SERVER_AUTH
if name in ('clientAuth', 'TLS Web Client Authentication'):
return x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH
if name in ('codeSigning', 'Code Signing'):
return x509.oid.ExtendedKeyUsageOID.CODE_SIGNING
if name in ('emailProtection', 'E-mail Protection'):
return x509.oid.ExtendedKeyUsageOID.EMAIL_PROTECTION
if name in ('timeStamping', 'Time Stamping'):
return x509.oid.ExtendedKeyUsageOID.TIME_STAMPING
if name in ('OCSPSigning', 'OCSP Signing'):
return x509.oid.ExtendedKeyUsageOID.OCSP_SIGNING
if name in ('anyExtendedKeyUsage', 'Any Extended Key Usage'):
return x509.oid.ExtendedKeyUsageOID.ANY_EXTENDED_KEY_USAGE
for dotted, names in _OID_MAP.items():
if name in names:
return x509.oid.ObjectIdentifier(dotted)
raise OpenSSLObjectError('Cannot find OID for "{0}"'.format(name))
def crpytography_oid_to_name(oid):
dotted_string = oid.dotted_string
names = _OID_MAP.get(dotted_string)
name = names[0] if names else oid._name
return _NORMALIZE_NAMES.get(name, name)
def cryptography_get_name_oid(id):
'''
Given a symbolic ID, finds the appropriate OID for use with cryptography.
@ -367,7 +595,7 @@ def cryptography_get_name_oid(id):
return x509.oid.NameOID.DN_QUALIFIER
if id in ('pseudonym', ):
return x509.oid.NameOID.PSEUDONYM
if id in ('UID', 'userId'):
if id in ('UID', 'userId', 'userID'):
return x509.oid.NameOID.USER_ID
if id in ('DC', 'domainComponent'):
return x509.oid.NameOID.DOMAIN_COMPONENT
@ -409,6 +637,39 @@ def cryptography_get_name(name):
raise OpenSSLObjectError('Cannot parse Subject Alternative Name "{0}" (potentially unsupported by cryptography backend)'.format(name))
def _get_hex(bytes):
if bytes is None:
return bytes
data = binascii.hexlify(bytes)
data = to_text(b':'.join(data[i:i + 2] for i in range(0, len(data), 2)))
return data
def cryptography_decode_name(name):
'''
Given a cryptography x509.Name object, returns a string.
Raises an OpenSSLObjectError if the name is not supported.
'''
if isinstance(name, x509.DNSName):
return 'DNS:{0}'.format(name.value)
if isinstance(name, x509.IPAddress):
return 'IP:{0}'.format(name.value.compressed)
if isinstance(name, x509.RFC822Name):
return 'email:{0}'.format(name.value)
if isinstance(name, x509.UniformResourceIdentifier):
return 'URI:{0}'.format(name.value)
if isinstance(name, x509.DirectoryName):
# FIXME: test
return 'DirName:' + ''.join(['/{0}:{1}'.format(attribute.oid._name, attribute.value) for attribute in name.value])
if isinstance(name, x509.RegisteredID):
# FIXME: test
return 'RegisteredID:{0}'.format(name.value)
if isinstance(name, x509.OtherName):
# FIXME: test
return '{0}:{1}'.format(name.type_id.dotted_string, _get_hex(name.value))
raise OpenSSLObjectError('Cannot decode name "{0}"'.format(name))
def _cryptography_get_keyusage(usage):
'''
Given a key usage identifier string, returns the parameter name used by cryptography's x509.KeyUsage().
@ -474,10 +735,10 @@ def cryptography_get_ext_keyusage(usage):
if usage in ('OCSPSigning', 'OCSP Signing'):
return x509.oid.ExtendedKeyUsageOID.OCSP_SIGNING
if usage in ('anyExtendedKeyUsage', 'Any Extended Key Usage'):
return x509.oid.ExtendedKeyUsageOID.ANY_EXTENDED_KEY_USAGE
return x509.oid.ObjectIdentifier("2.5.29.37.0")
if usage in ('qcStatements', ):
return x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.1.3")
if usage in ('DVCS', ):
if usage in ('DVCS', 'dvcs'):
return x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.3.10")
if usage in ('IPSec User', 'ipsecUser'):
return x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.3.7")

View file

@ -0,0 +1,708 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright: (c) 2016-2017, Yanis Guenane <yanis+ansible@guenane.org>
# Copyright: (c) 2017, Markus Teufelberger <mteufelberger+ansible@mgit.at>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
ANSIBLE_METADATA = {'metadata_version': '1.1',
'status': ['preview'],
'supported_by': 'community'}
DOCUMENTATION = r'''
---
module: openssl_certificate_info
version_added: '2.8'
short_description: Provide information of OpenSSL X.509 certificates
description:
- This module allows one to query information on OpenSSL certificates.
- It uses the pyOpenSSL or cryptography python library to interact with OpenSSL. If both the
cryptography and PyOpenSSL libraries are available (and meet the minimum version requirements)
cryptography will be preferred as a backend over PyOpenSSL (unless the backend is forced with
C(select_crypto_backend))
requirements:
- PyOpenSSL >= 0.15 or cryptography >= 1.6
author:
- Felix Fontein (@felixfontein)
- Yanis Guenane (@Spredzy)
- Markus Teufelberger (@MarkusTeufelberger)
options:
path:
description:
- Remote absolute path where the certificate file is loaded from.
type: path
required: true
valid_at:
description:
- A dict of names mapping to time specifications. Every time specified here
will be checked whether the certificate is valid at this point. See the
C(valid_at) return value for informations on the result.
- Time can be specified either as relative time or as absolute timestamp.
- Time will always be interpreted as UTC.
- Valid format is C([+-]timespec | ASN.1 TIME) where timespec can be an integer
+ C([w | d | h | m | s]) (e.g. C(+32w1d2h), and ASN.1 TIME (i.e. pattern C(YYYYMMDDHHMMSSZ)).
Note that all timestamps will be treated as being in UTC.
select_crypto_backend:
description:
- Determines which crypto backend to use.
- The default choice is C(auto), which tries to use C(cryptography) if available, and falls back to C(pyopenssl).
- If set to C(pyopenssl), will try to use the L(pyOpenSSL,https://pypi.org/project/pyOpenSSL/) library.
- If set to C(cryptography), will try to use the L(cryptography,https://cryptography.io/) library.
type: str
default: auto
choices: [ auto, cryptography, pyopenssl ]
notes:
- All timestamp values are provided in ASN.1 TIME format, i.e. following the C(YYYYMMDDHHMMSSZ) pattern.
They are all in UTC.
seealso:
- module: openssl_certificate
'''
EXAMPLES = r'''
- name: Generate a Self Signed OpenSSL certificate
openssl_certificate:
path: /etc/ssl/crt/ansible.com.crt
privatekey_path: /etc/ssl/private/ansible.com.pem
csr_path: /etc/ssl/csr/ansible.com.csr
provider: selfsigned
# Get information on the certificate
- name: Get information on generated certificate
openssl_certificate_info:
path: /etc/ssl/crt/ansible.com.crt
register: result
- name: Dump information
debug:
var: result
# Check whether the certificate is valid or not valid at certain times, fail
# if this is not the case. The first task (openssl_certificate_info) collects
# the information, and the second task (assert) validates the result and
# makes the playbook fail in case something is not as expected.
- name: Test whether that certificate is valid tomorrow and/or in three weeks
openssl_certificate_info:
path: /etc/ssl/crt/ansible.com.crt
valid_at:
point_1: "+1d"
point_2: "+3w"
register: result
- name: Validate that certificate is valid tomorrow, but not in three weeks
assert:
that:
- result.valid_at.point_1 # valid in one day
- not result.valid_at.point_2 # not valid in three weeks
'''
RETURN = r'''
expired:
description: Whether the certificate is expired (i.e. C(notAfter) is in the past)
returned: success
type: bool
basic_constraints:
description: Entries in the C(basic_constraints) extension, or C(none) if extension is not present.
returned: success
type: list
sample: "[CA:TRUE, pathlen:1]"
basic_constraints_critical:
description: Whether the C(basic_constraints) extension is critical.
returned: success
type: bool
extended_key_usage:
description: Entries in the C(extended_key_usage) extension, or C(none) if extension is not present.
returned: success
type: list
sample: "[Biometric Info, DVCS, Time Stamping]"
extended_key_usage_critical:
description: Whether the C(extended_key_usage) extension is critical.
returned: success
type: bool
extensions_by_oid:
description: Returns a dictionary for every extension OID
returned: success
type: complex
contains:
critical:
description: Whether the extension is critical.
returned: success
type: bool
value:
description: The Base64 encoded value (in DER format) of the extension
returned: success
type: str
sample: "MAMCAQU="
sample: '{"1.3.6.1.5.5.7.1.24": { "critical": false, "value": "MAMCAQU="}}'
key_usage:
description: Entries in the C(key_usage) extension, or C(none) if extension is not present.
returned: success
type: str
sample: "[Key Agreement, Data Encipherment]"
key_usage_critical:
description: Whether the C(key_usage) extension is critical.
returned: success
type: bool
subject_alt_name:
description: Entries in the C(subject_alt_name) extension, or C(none) if extension is not present.
returned: success
type: list
sample: "[DNS:www.ansible.com, IP:1.2.3.4]"
subject_alt_name_critical:
description: Whether the C(subject_alt_name) extension is critical.
returned: success
type: bool
ocsp_must_staple:
description: C(yes) if the OCSP Must Staple extension is present, C(none) otherwise.
returned: success
type: bool
ocsp_must_staple_critical:
description: Whether the C(ocsp_must_staple) extension is critical.
returned: success
type: bool
issuer:
description: The certificate's issuer.
returned: success
type: dict
sample: '{"organizationName": "Ansible"}'
subject:
description: The certificate's subject.
returned: success
type: dict
sample: '{"commonName": "www.example.com", "emailAddress": "test@example.com"}'
not_after:
description: C(notAfter) date as ASN.1 TIME
returned: success
type: str
sample: 20190413202428Z
not_before:
description: C(notBefore) date as ASN.1 TIME
returned: success
type: str
sample: 20190331202428Z
public_key:
description: Certificate's public key in PEM format
returned: success
type: str
sample: "-----BEGIN PUBLIC KEY-----\nMIICIjANBgkqhkiG9w0BAQEFAAOCAg8A..."
public_key_fingerprints:
description:
- Fingerprints of certificate's public key.
- For every hash algorithm available, the fingerprint is computed.
returned: success
type: dict
sample: "{'sha256': 'd4:b3:aa:6d:c8:04:ce:4e:ba:f6:29:4d:92:a3:94:b0:c2:ff:bd:bf:33:63:11:43:34:0f:51:b0:95:09:2f:63',
'sha512': 'f7:07:4a:f0:b0:f0:e6:8b:95:5f:f9:e6:61:0a:32:68:f1..."
signature_algorithm:
description: The signature algorithm used to sign the certificate.
returned: success
type: str
sample: sha256WithRSAEncryption
serial_number:
description: The certificate's serial number.
returned: success
type: int
sample: 1234
version:
description: The certificate version.
returned: success
type: int
sample: 3
valid_at:
description: For every time stamp provided in the I(valid_at) option, a
boolean whether the certificate is valid at that point in time
or not.
returned: success
type: dict
'''
import abc
import datetime
import os
import traceback
from distutils.version import LooseVersion
from ansible.module_utils import crypto as crypto_utils
from ansible.module_utils.basic import AnsibleModule, missing_required_lib
from ansible.module_utils.six import string_types
from ansible.module_utils._text import to_native, to_text, to_bytes
MINIMAL_CRYPTOGRAPHY_VERSION = '1.6'
MINIMAL_PYOPENSSL_VERSION = '0.15'
PYOPENSSL_IMP_ERR = None
try:
import OpenSSL
from OpenSSL import crypto
import ipaddress
PYOPENSSL_VERSION = LooseVersion(OpenSSL.__version__)
if OpenSSL.SSL.OPENSSL_VERSION_NUMBER >= 0x10100000:
# OpenSSL 1.1.0 or newer
OPENSSL_MUST_STAPLE_NAME = b"tlsfeature"
OPENSSL_MUST_STAPLE_VALUE = b"status_request"
else:
# OpenSSL 1.0.x or older
OPENSSL_MUST_STAPLE_NAME = b"1.3.6.1.5.5.7.1.24"
OPENSSL_MUST_STAPLE_VALUE = b"DER:30:03:02:01:05"
except ImportError:
PYOPENSSL_IMP_ERR = traceback.format_exc()
PYOPENSSL_FOUND = False
else:
PYOPENSSL_FOUND = True
CRYPTOGRAPHY_IMP_ERR = None
try:
import cryptography
from cryptography import x509
from cryptography.hazmat.primitives import serialization
CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__)
except ImportError:
CRYPTOGRAPHY_IMP_ERR = traceback.format_exc()
CRYPTOGRAPHY_FOUND = False
else:
CRYPTOGRAPHY_FOUND = True
TIMESTAMP_FORMAT = "%Y%m%d%H%M%SZ"
def get_relative_time_option(input_string, input_name):
"""Return an ASN1 formatted string if a relative timespec
or an ASN1 formatted string is provided."""
result = input_string
if result.startswith("+") or result.startswith("-"):
return crypto_utils.convert_relative_to_datetime(result)
if result is None:
raise crypto_utils.CertificateError(
'The timespec "%s" for %s is not valid' %
input_string, input_name)
for date_fmt in ['%Y%m%d%H%M%SZ', '%Y%m%d%H%MZ', '%Y%m%d%H%M%S%z', '%Y%m%d%H%M%z']:
try:
result = datetime.datetime.strptime(input_string, date_fmt)
break
except ValueError:
pass
if not isinstance(result, datetime.datetime):
raise crypto_utils.CertificateError(
'The time spec "%s" for %s is invalid' %
(input_string, input_name)
)
return result
class CertificateInfo(crypto_utils.OpenSSLObject):
def __init__(self, module, backend):
super(CertificateInfo, self).__init__(
module.params['path'],
'present',
False,
module.check_mode,
)
self.backend = backend
self.module = module
self.valid_at = module.params['valid_at']
if self.valid_at:
for k, v in self.valid_at.items():
if not isinstance(v, string_types):
self.module.fail_json(
msg='The value for valid_at.{0} must be of type string (got {1})'.format(k, type(v))
)
self.valid_at[k] = get_relative_time_option(v, 'valid_at.{0}'.format(k))
def generate(self):
# Empty method because crypto_utils.OpenSSLObject wants this
pass
def dump(self):
# Empty method because crypto_utils.OpenSSLObject wants this
pass
@abc.abstractmethod
def _get_signature_algorithm(self):
pass
@abc.abstractmethod
def _get_subject(self):
pass
@abc.abstractmethod
def _get_issuer(self):
pass
@abc.abstractmethod
def _get_version(self):
pass
@abc.abstractmethod
def _get_key_usage(self):
pass
@abc.abstractmethod
def _get_extended_key_usage(self):
pass
@abc.abstractmethod
def _get_basic_constraints(self):
pass
@abc.abstractmethod
def _get_ocsp_must_staple(self):
pass
@abc.abstractmethod
def _get_subject_alt_name(self):
pass
@abc.abstractmethod
def _get_not_before(self):
pass
@abc.abstractmethod
def _get_not_after(self):
pass
@abc.abstractmethod
def _get_public_key(self, binary):
pass
@abc.abstractmethod
def _get_serial_number(self):
pass
@abc.abstractmethod
def _get_all_extensions(self):
pass
def get_info(self):
result = dict()
self.cert = crypto_utils.load_certificate(self.path, backend=self.backend)
result['signature_algorithm'] = self._get_signature_algorithm()
result['subject'] = self._get_subject()
result['issuer'] = self._get_issuer()
result['version'] = self._get_version()
result['key_usage'], result['key_usage_critical'] = self._get_key_usage()
result['extended_key_usage'], result['extended_key_usage_critical'] = self._get_extended_key_usage()
result['basic_constraints'], result['basic_constraints_critical'] = self._get_basic_constraints()
result['ocsp_must_staple'], result['ocsp_must_staple_critical'] = self._get_ocsp_must_staple()
result['subject_alt_name'], result['subject_alt_name_critical'] = self._get_subject_alt_name()
not_before = self._get_not_before()
not_after = self._get_not_after()
result['not_before'] = not_before.strftime(TIMESTAMP_FORMAT)
result['not_after'] = not_after.strftime(TIMESTAMP_FORMAT)
result['expired'] = not_after < datetime.datetime.utcnow()
result['valid_at'] = dict()
if self.valid_at:
for k, v in self.valid_at.items():
result['valid_at'][k] = not_before <= v <= not_after
result['public_key'] = self._get_public_key(binary=False)
pk = self._get_public_key(binary=True)
result['public_key_fingerprints'] = crypto_utils.get_fingerprint_of_bytes(pk) if pk is not None else dict()
result['serial_number'] = self._get_serial_number()
result['extensions_by_oid'] = self._get_all_extensions()
return result
class CertificateInfoCryptography(CertificateInfo):
"""Validate the supplied cert, using the cryptography backend"""
def __init__(self, module):
super(CertificateInfoCryptography, self).__init__(module, 'cryptography')
def _get_signature_algorithm(self):
return crypto_utils.crpytography_oid_to_name(self.cert.signature_algorithm_oid)
def _get_subject(self):
result = dict()
for attribute in self.cert.subject:
result[crypto_utils.crpytography_oid_to_name(attribute.oid)] = attribute.value
return result
def _get_issuer(self):
result = dict()
for attribute in self.cert.issuer:
result[crypto_utils.crpytography_oid_to_name(attribute.oid)] = attribute.value
return result
def _get_version(self):
if self.cert.version == x509.Version.v1:
return 1
if self.cert.version == x509.Version.v3:
return 3
return "unknown"
def _get_key_usage(self):
try:
current_key_ext = self.cert.extensions.get_extension_for_class(x509.KeyUsage)
current_key_usage = current_key_ext.value
key_usage = dict(
digital_signature=current_key_usage.digital_signature,
content_commitment=current_key_usage.content_commitment,
key_encipherment=current_key_usage.key_encipherment,
data_encipherment=current_key_usage.data_encipherment,
key_agreement=current_key_usage.key_agreement,
key_cert_sign=current_key_usage.key_cert_sign,
crl_sign=current_key_usage.crl_sign,
encipher_only=False,
decipher_only=False,
)
if key_usage['key_agreement']:
key_usage.update(dict(
encipher_only=current_key_usage.encipher_only,
decipher_only=current_key_usage.decipher_only
))
key_usage_names = dict(
digital_signature='Digital Signature',
content_commitment='Non Repudiation',
key_encipherment='Key Encipherment',
data_encipherment='Data Encipherment',
key_agreement='Key Agreement',
key_cert_sign='Certificate Sign',
crl_sign='CRL Sign',
encipher_only='Encipher Only',
decipher_only='Decipher Only',
)
return sorted([
key_usage_names[name] for name, value in key_usage.items() if value
]), current_key_ext.critical
except cryptography.x509.ExtensionNotFound:
return None, False
def _get_extended_key_usage(self):
try:
ext_keyusage_ext = self.cert.extensions.get_extension_for_class(x509.ExtendedKeyUsage)
return sorted([
crypto_utils.crpytography_oid_to_name(eku) for eku in ext_keyusage_ext.value
]), ext_keyusage_ext.critical
except cryptography.x509.ExtensionNotFound:
return None, False
def _get_basic_constraints(self):
try:
ext_keyusage_ext = self.cert.extensions.get_extension_for_class(x509.BasicConstraints)
result = []
result.append('CA:{0}'.format('TRUE' if ext_keyusage_ext.value.ca else 'FALSE'))
if ext_keyusage_ext.value.path_length is not None:
result.append('pathlen:{0}'.format(ext_keyusage_ext.value.path_length))
return sorted(result), ext_keyusage_ext.critical
except cryptography.x509.ExtensionNotFound:
return None, False
def _get_ocsp_must_staple(self):
try:
try:
# This only works with cryptography >= 2.1
tlsfeature_ext = self.cert.extensions.get_extension_for_class(x509.TLSFeature)
value = cryptography.x509.TLSFeatureType.status_request in tlsfeature_ext.value
except AttributeError as dummy:
# Fallback for cryptography < 2.1
oid = x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.1.24")
tlsfeature_ext = self.cert.extensions.get_extension_for_oid(oid)
value = tlsfeature_ext.value.value == b"\x30\x03\x02\x01\x05"
return value, tlsfeature_ext.critical
except cryptography.x509.ExtensionNotFound:
return None, False
def _get_subject_alt_name(self):
try:
san_ext = self.cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
result = [crypto_utils.cryptography_decode_name(san) for san in san_ext.value]
return result, san_ext.critical
except cryptography.x509.ExtensionNotFound:
return None, False
def _get_not_before(self):
return self.cert.not_valid_before
def _get_not_after(self):
return self.cert.not_valid_after
def _get_public_key(self, binary):
return self.cert.public_key().public_bytes(
serialization.Encoding.DER if binary else serialization.Encoding.PEM,
serialization.PublicFormat.SubjectPublicKeyInfo
)
def _get_serial_number(self):
return self.cert.serial_number
def _get_all_extensions(self):
return crypto_utils.cryptography_get_extensions_from_cert(self.cert)
class CertificateInfoPyOpenSSL(CertificateInfo):
"""validate the supplied certificate."""
def __init__(self, module):
super(CertificateInfoPyOpenSSL, self).__init__(module, 'pyopenssl')
def _get_signature_algorithm(self):
return to_text(self.cert.get_signature_algorithm())
def __get_name(self, name):
result = dict()
for sub in name.get_components():
result[crypto_utils.pyopenssl_normalize_name(sub[0])] = to_text(sub[1])
return result
def _get_subject(self):
return self.__get_name(self.cert.get_subject())
def _get_issuer(self):
return self.__get_name(self.cert.get_issuer())
def _get_version(self):
# Version numbers in certs are off by one:
# v1: 0, v2: 1, v3: 2 ...
return self.cert.get_version() + 1
def _get_extension(self, short_name):
for extension_idx in range(0, self.cert.get_extension_count()):
extension = self.cert.get_extension(extension_idx)
if extension.get_short_name() == short_name:
result = [
crypto_utils.pyopenssl_normalize_name(usage.strip()) for usage in to_text(extension, errors='surrogate_or_strict').split(',')
]
return sorted(result), bool(extension.get_critical())
return None, False
def _get_key_usage(self):
return self._get_extension(b'keyUsage')
def _get_extended_key_usage(self):
return self._get_extension(b'extendedKeyUsage')
def _get_basic_constraints(self):
return self._get_extension(b'basicConstraints')
def _get_ocsp_must_staple(self):
extensions = [self.cert.get_extension(i) for i in range(0, self.cert.get_extension_count())]
oms_ext = [
ext for ext in extensions
if to_bytes(ext.get_short_name()) == OPENSSL_MUST_STAPLE_NAME and to_bytes(ext) == OPENSSL_MUST_STAPLE_VALUE
]
if OpenSSL.SSL.OPENSSL_VERSION_NUMBER < 0x10100000:
# Older versions of libssl don't know about OCSP Must Staple
oms_ext.extend([ext for ext in extensions if ext.get_short_name() == b'UNDEF' and ext.get_data() == b'\x30\x03\x02\x01\x05'])
if oms_ext:
return True, bool(oms_ext[0].get_critical())
else:
return None, False
def _normalize_san(self, san):
if san.startswith('IP Address:'):
san = 'IP:' + san[len('IP Address:'):]
if san.startswith('IP:'):
ip = ipaddress.ip_address(san[3:])
san = 'IP:{0}'.format(ip.compressed)
return san
def _get_subject_alt_name(self):
for extension_idx in range(0, self.cert.get_extension_count()):
extension = self.cert.get_extension(extension_idx)
if extension.get_short_name() == b'subjectAltName':
result = [self._normalize_san(altname.strip()) for altname in
to_text(extension, errors='surrogate_or_strict').split(', ')]
return result, bool(extension.get_critical())
return None, False
def _get_not_before(self):
time_string = to_native(self.cert.get_notBefore())
return datetime.datetime.strptime(time_string, "%Y%m%d%H%M%SZ")
def _get_not_after(self):
time_string = to_native(self.cert.get_notAfter())
return datetime.datetime.strptime(time_string, "%Y%m%d%H%M%SZ")
def _get_public_key(self, binary):
try:
return crypto.dump_publickey(
crypto.FILETYPE_ASN1 if binary else crypto.FILETYPE_PEM,
self.cert.get_pubkey()
)
except AttributeError:
self.module.warn('Your pyOpenSSL version does not support dumping public keys. '
'Please upgrade to version 16.0 or newer, or use the cryptography backend.')
def _get_serial_number(self):
return self.cert.get_serial_number()
def _get_all_extensions(self):
return crypto_utils.pyopenssl_get_extensions_from_cert(self.cert)
def main():
module = AnsibleModule(
argument_spec=dict(
path=dict(type='path', required=True),
valid_at=dict(type='dict'),
select_crypto_backend=dict(type='str', default='auto', choices=['auto', 'cryptography', 'pyopenssl']),
),
supports_check_mode=True,
)
try:
base_dir = os.path.dirname(module.params['path']) or '.'
if not os.path.isdir(base_dir):
module.fail_json(
name=base_dir,
msg='The directory %s does not exist or the file is not a directory' % base_dir
)
backend = module.params['select_crypto_backend']
if backend == 'auto':
# Detect what backend we can use
can_use_cryptography = CRYPTOGRAPHY_FOUND and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION)
can_use_pyopenssl = PYOPENSSL_FOUND and PYOPENSSL_VERSION >= LooseVersion(MINIMAL_PYOPENSSL_VERSION)
# If cryptography is available we'll use it
if can_use_cryptography:
backend = 'cryptography'
elif can_use_pyopenssl:
backend = 'pyopenssl'
# Fail if no backend has been found
if backend == 'auto':
module.fail_json(msg=("Can't detect any of the required Python libraries "
"cryptography (>= {0}) or PyOpenSSL (>= {1})").format(
MINIMAL_CRYPTOGRAPHY_VERSION,
MINIMAL_PYOPENSSL_VERSION))
if backend == 'pyopenssl':
if not PYOPENSSL_FOUND:
module.fail_json(msg=missing_required_lib('pyOpenSSL'), exception=PYOPENSSL_IMP_ERR)
try:
getattr(crypto.X509Req, 'get_extensions')
except AttributeError:
module.fail_json(msg='You need to have PyOpenSSL>=0.15')
certificate = CertificateInfoPyOpenSSL(module)
elif backend == 'cryptography':
if not CRYPTOGRAPHY_FOUND:
module.fail_json(msg=missing_required_lib('cryptography'), exception=CRYPTOGRAPHY_IMP_ERR)
certificate = CertificateInfoCryptography(module)
result = certificate.get_info()
module.exit_json(**result)
except crypto_utils.OpenSSLObjectError as exc:
module.fail_json(msg=to_native(exc))
if __name__ == "__main__":
main()

View file

@ -0,0 +1,2 @@
shippable/posix/group1
destructive

View file

@ -0,0 +1,2 @@
dependencies:
- setup_openssl

View file

@ -0,0 +1,52 @@
---
- debug:
msg: "Executing tests with backend {{ select_crypto_backend }}"
- name: ({{select_crypto_backend}}) Get certificate info
openssl_certificate_info:
path: '{{ output_dir }}/cert_1.pem'
select_crypto_backend: '{{ select_crypto_backend }}'
register: result
- name: Update result list
set_fact:
info_results: "{{ info_results + [result] }}"
- name: ({{select_crypto_backend}}) Get certificate info
openssl_certificate_info:
path: '{{ output_dir }}/cert_2.pem'
select_crypto_backend: '{{ select_crypto_backend }}'
valid_at:
today: "+0d"
past: "20190101235901Z"
twentydays: "+20d"
register: result
- assert:
that:
- result.valid_at.today
- not result.valid_at.past
- not result.valid_at.twentydays
- name: Update result list
set_fact:
info_results: "{{ info_results + [result] }}"
- name: ({{select_crypto_backend}}) Get certificate info
openssl_certificate_info:
path: '{{ output_dir }}/cert_3.pem'
select_crypto_backend: '{{ select_crypto_backend }}'
register: result
- name: Update result list
set_fact:
info_results: "{{ info_results + [result] }}"
- name: ({{select_crypto_backend}}) Get certificate info
openssl_certificate_info:
path: '{{ output_dir }}/cert_4.pem'
select_crypto_backend: '{{ select_crypto_backend }}'
register: result
- name: Update result list
set_fact:
info_results: "{{ info_results + [result] }}"

View file

@ -0,0 +1,151 @@
---
- name: Generate privatekey
openssl_privatekey:
path: '{{ output_dir }}/privatekey.pem'
- name: Generate privatekey with password
openssl_privatekey:
path: '{{ output_dir }}/privatekeypw.pem'
passphrase: hunter2
cipher: auto
select_crypto_backend: cryptography
- name: Generate CSR 1
openssl_csr:
path: '{{ output_dir }}/csr_1.csr'
privatekey_path: '{{ output_dir }}/privatekey.pem'
subject:
commonName: www.example.com
C: de
L: Somewhere
ST: Zurich
streetAddress: Welcome Street
O: Ansible
organizationalUnitName: Crypto Department
serialNumber: "1234"
SN: Last Name
GN: First Name
title: Chief
pseudonym: test
UID: asdf
emailAddress: test@example.com
postalAddress: 1234 Somewhere
postalCode: "1234"
useCommonNameForSAN: no
key_usage:
- digitalSignature
- keyAgreement
- Non Repudiation
- Key Encipherment
- dataEncipherment
- Certificate Sign
- cRLSign
- Encipher Only
- decipherOnly
key_usage_critical: yes
extended_key_usage:
- serverAuth # the same as "TLS Web Server Authentication"
- TLS Web Server Authentication
- TLS Web Client Authentication
- Code Signing
- E-mail Protection
- timeStamping
- OCSPSigning
- Any Extended Key Usage
- qcStatements
- DVCS
- IPSec User
- biometricInfo
subject_alt_name:
- "DNS:www.ansible.com"
- "IP:1.2.3.4"
- "IP:::1"
- "email:test@example.org"
- "URI:https://example.org/test/index.html"
basic_constraints:
- "CA:TRUE"
- "pathlen:23"
basic_constraints_critical: yes
ocsp_must_staple: yes
- name: Generate CSR 2
openssl_csr:
path: '{{ output_dir }}/csr_2.csr'
privatekey_path: '{{ output_dir }}/privatekeypw.pem'
privatekey_passphrase: hunter2
useCommonNameForSAN: no
basic_constraints:
- "CA:TRUE"
- name: Generate CSR 3
openssl_csr:
path: '{{ output_dir }}/csr_3.csr'
privatekey_path: '{{ output_dir }}/privatekey.pem'
useCommonNameForSAN: no
subject_alt_name:
- "DNS:*.ansible.com"
- "DNS:*.example.org"
- "IP:DEAD:BEEF::1"
basic_constraints:
- "CA:FALSE"
- name: Generate CSR 4
openssl_csr:
path: '{{ output_dir }}/csr_4.csr'
privatekey_path: '{{ output_dir }}/privatekey.pem'
useCommonNameForSAN: no
- name: Generate selfsigned certificates
openssl_certificate:
path: '{{ output_dir }}/cert_{{ item }}.pem'
csr_path: '{{ output_dir }}/csr_{{ item }}.csr'
privatekey_path: '{{ output_dir }}/privatekey.pem'
provider: selfsigned
selfsigned_digest: sha256
selfsigned_not_after: "+10d"
selfsigned_not_before: "-3d"
loop:
- 1
- 2
- 3
- 4
- name: Prepare result list
set_fact:
info_results: []
- name: Running tests with pyOpenSSL backend
include_tasks: impl.yml
vars:
select_crypto_backend: pyopenssl
when: pyopenssl_version.stdout is version('0.15', '>=')
- name: Prepare result list
set_fact:
pyopenssl_info_results: "{{ info_results }}"
info_results: []
- name: Running tests with cryptography backend
include_tasks: impl.yml
vars:
select_crypto_backend: cryptography
when: cryptography_version.stdout is version('1.6', '>=')
- name: Prepare result list
set_fact:
cryptography_info_results: "{{ info_results }}"
- block:
- name: Dump pyOpenSSL results
debug:
var: pyopenssl_info_results
- name: Dump cryptography results
debug:
var: cryptography_info_results
- name: Compare results
assert:
that:
- item.0 == item.1
quiet: yes
loop: "{{ pyopenssl_info_results | zip(cryptography_info_results) | list }}"
when: pyopenssl_version.stdout is version('0.15', '>=') and cryptography_version.stdout is version('1.6', '>=')