openssl_privatekey: add support for format option (#60388)

* Add support for format option.

* Improve private key format detection.

* Fix raw format handling.

* Improve error handling.

* Improve raw key handling.

* Add failed raw test.

* Improve raw key loading.

* Simplify tests.

* Add raw format tests.

* Fail if format != 'auto_ignore' is specified for pyopenssl backend.

* Fix quoting.

* Bump version.

* Allow to convert private keys between different formats.

* Improve description.
This commit is contained in:
Felix Fontein 2019-10-17 10:40:13 +02:00 committed by GitHub
parent e3c7e35656
commit d00d0c81b3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 346 additions and 21 deletions

View file

@ -0,0 +1,2 @@
minor_changes:
- "openssl_privatekey - add ``format`` and ``format_mismatch`` options."

View file

@ -1903,3 +1903,29 @@ else:
no >>= 1
count += 1
return count
PEM_START = '-----BEGIN '
PEM_END = '-----'
PKCS8_PRIVATEKEY_NAMES = ('PRIVATE KEY', 'ENCRYPTED PRIVATE KEY')
PKCS1_PRIVATEKEY_SUFFIX = ' PRIVATE KEY'
def identify_private_key_format(content):
'''Given the contents of a private key file, identifies its format.'''
# See https://github.com/openssl/openssl/blob/master/crypto/pem/pem_pkey.c#L40-L85
# (PEM_read_bio_PrivateKey)
# and https://github.com/openssl/openssl/blob/master/include/openssl/pem.h#L46-L47
# (PEM_STRING_PKCS8, PEM_STRING_PKCS8INF)
try:
lines = content.decode('utf-8').splitlines(False)
if lines[0].startswith(PEM_START) and lines[0].endswith(PEM_END) and len(lines[0]) > len(PEM_START) + len(PEM_END):
name = lines[0][len(PEM_START):-len(PEM_END)]
if name in PKCS8_PRIVATEKEY_NAMES:
return 'pkcs8'
if len(name) > len(PKCS1_PRIVATEKEY_SUFFIX) and name.endswith(PKCS1_PRIVATEKEY_SUFFIX):
return 'pkcs1'
return 'unknown-pem'
except UnicodeDecodeError:
pass
return 'raw'

View file

@ -123,6 +123,32 @@ options:
default: auto
choices: [ auto, cryptography, pyopenssl ]
version_added: "2.8"
format:
description:
- Determines which format the private key is written in. By default, PKCS1 (traditional OpenSSL format)
is used for all keys which support it. Please note that not every key can be exported in any format.
- The value C(auto) selects a fromat based on the key format. The value C(auto_ignore) does the same,
but for existing private key files, it will not force a regenerate when its format is not the automatically
selected one for generation.
- Note that if the format for an existing private key mismatches, the key is *regenerated* by default.
To change this behavior, use the I(format_mismatch) option.
- The I(format) option is only supported by the C(cryptography) backend. The C(pyopenssl) backend will
fail if a value different from C(auto_ignore) is used.
type: str
default: auto_ignore
choices: [ pkcs1, pkcs8, raw, auto, auto_ignore ]
version_added: "2.10"
format_mismatch:
description:
- Determines behavior of the module if the format of a private key does not match the expected format, but all
other parameters are as expected.
- If set to C(regenerate) (default), generates a new private key.
- If set to C(convert), the key will be converted to the new format instead.
- Only supported by the C(cryptography) backend.
type: str
default: regenerate
choices: [ regenerate, convert ]
version_added: "2.10"
backup:
description:
- Create a backup file including a timestamp so you can get
@ -293,6 +319,8 @@ class PrivateKeyBase(crypto_utils.OpenSSLObject):
self.cipher = module.params['cipher']
self.privatekey = None
self.fingerprint = {}
self.format = module.params['format']
self.format_mismatch = module.params['format_mismatch']
self.backup = module.params['backup']
self.backup_file = None
@ -301,7 +329,13 @@ class PrivateKeyBase(crypto_utils.OpenSSLObject):
module.params['mode'] = '0600'
@abc.abstractmethod
def _generate_private_key_data(self):
def _generate_private_key(self):
"""(Re-)Generate private key."""
pass
@abc.abstractmethod
def _get_private_key_data(self):
"""Return bytes for self.privatekey"""
pass
@abc.abstractmethod
@ -311,10 +345,19 @@ class PrivateKeyBase(crypto_utils.OpenSSLObject):
def generate(self, module):
"""Generate a keypair."""
if not self.check(module, perms_required=False) or self.force:
if not self.check(module, perms_required=False, ignore_conversion=True) or self.force:
# Regenerate
if self.backup:
self.backup_file = module.backup_local(self.path)
privatekey_data = self._generate_private_key_data()
self._generate_private_key()
privatekey_data = self._get_private_key_data()
crypto_utils.write_file(module, privatekey_data, 0o600)
self.changed = True
elif not self.check(module, perms_required=False, ignore_conversion=False):
# Convert
if self.backup:
self.backup_file = module.backup_local(self.path)
privatekey_data = self._get_private_key_data()
crypto_utils.write_file(module, privatekey_data, 0o600)
self.changed = True
@ -336,7 +379,11 @@ class PrivateKeyBase(crypto_utils.OpenSSLObject):
def _check_size_and_type(self):
pass
def check(self, module, perms_required=True):
@abc.abstractmethod
def _check_format(self):
pass
def check(self, module, perms_required=True, ignore_conversion=True):
"""Ensure the resource is in its desired state."""
state_and_perms = super(PrivateKeyBase, self).check(module, perms_required)
@ -344,7 +391,14 @@ class PrivateKeyBase(crypto_utils.OpenSSLObject):
if not state_and_perms or not self._check_passphrase():
return False
return self._check_size_and_type()
if not self._check_size_and_type():
return False
if not self._check_format():
if ignore_conversion or self.format_mismatch != 'convert':
return False
return True
def dump(self):
"""Serialize the object into a dictionary."""
@ -374,14 +428,19 @@ class PrivateKeyPyOpenSSL(PrivateKeyBase):
else:
module.fail_json(msg="PyOpenSSL backend only supports RSA and DSA keys.")
def _generate_private_key_data(self):
self.privatekey = crypto.PKey()
if self.format != 'auto_ignore':
module.fail_json(msg="PyOpenSSL backend only supports auto_ignore format.")
def _generate_private_key(self):
"""(Re-)Generate private key."""
self.privatekey = crypto.PKey()
try:
self.privatekey.generate_key(self.type, self.size)
except (TypeError, ValueError) as exc:
raise PrivateKeyError(exc)
def _get_private_key_data(self):
"""Return bytes for self.privatekey"""
if self.cipher and self.passphrase:
return crypto.dump_privatekey(crypto.FILETYPE_PEM, self.privatekey,
self.cipher, to_bytes(self.passphrase))
@ -412,6 +471,10 @@ class PrivateKeyPyOpenSSL(PrivateKeyBase):
return _check_size(privatekey) and _check_type(privatekey)
def _check_format(self):
# Not supported by this backend
return True
def dump(self):
"""Serialize the object into a dictionary."""
@ -489,8 +552,16 @@ class PrivateKeyCryptography(PrivateKeyBase):
if not CRYPTOGRAPHY_HAS_ED448 and self.type == 'Ed448':
self.module.fail_json(msg='Your cryptography version does not support Ed448')
def _generate_private_key_data(self):
export_format = cryptography.hazmat.primitives.serialization.PrivateFormat.TraditionalOpenSSL
def _get_wanted_format(self):
if self.format not in ('auto', 'auto_ignore'):
return self.format
if self.type in ('X25519', 'X448', 'Ed25519', 'Ed448'):
return 'pkcs8'
else:
return 'pkcs1'
def _generate_private_key(self):
"""(Re-)Generate private key."""
try:
if self.type == 'RSA':
self.privatekey = cryptography.hazmat.primitives.asymmetric.rsa.generate_private_key(
@ -505,16 +576,12 @@ class PrivateKeyCryptography(PrivateKeyBase):
)
if CRYPTOGRAPHY_HAS_X25519_FULL and self.type == 'X25519':
self.privatekey = cryptography.hazmat.primitives.asymmetric.x25519.X25519PrivateKey.generate()
export_format = cryptography.hazmat.primitives.serialization.PrivateFormat.PKCS8
if CRYPTOGRAPHY_HAS_X448 and self.type == 'X448':
self.privatekey = cryptography.hazmat.primitives.asymmetric.x448.X448PrivateKey.generate()
export_format = cryptography.hazmat.primitives.serialization.PrivateFormat.PKCS8
if CRYPTOGRAPHY_HAS_ED25519 and self.type == 'Ed25519':
self.privatekey = cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey.generate()
export_format = cryptography.hazmat.primitives.serialization.PrivateFormat.PKCS8
if CRYPTOGRAPHY_HAS_ED448 and self.type == 'Ed448':
self.privatekey = cryptography.hazmat.primitives.asymmetric.ed448.Ed448PrivateKey.generate()
export_format = cryptography.hazmat.primitives.serialization.PrivateFormat.PKCS8
if self.type == 'ECC' and self.curve in self.curves:
if self.curves[self.curve]['deprecated']:
self.module.warn('Elliptic curves of type {0} should not be used for new keys!'.format(self.curve))
@ -525,6 +592,23 @@ class PrivateKeyCryptography(PrivateKeyBase):
except cryptography.exceptions.UnsupportedAlgorithm as dummy:
self.module.fail_json(msg='Cryptography backend does not support the algorithm required for {0}'.format(self.type))
def _get_private_key_data(self):
"""Return bytes for self.privatekey"""
# Select export format and encoding
try:
export_format = self._get_wanted_format()
export_encoding = cryptography.hazmat.primitives.serialization.Encoding.PEM
if export_format == 'pkcs1':
# "TraditionalOpenSSL" format is PKCS1
export_format = cryptography.hazmat.primitives.serialization.PrivateFormat.TraditionalOpenSSL
elif export_format == 'pkcs8':
export_format = cryptography.hazmat.primitives.serialization.PrivateFormat.PKCS8
elif export_format == 'raw':
export_format = cryptography.hazmat.primitives.serialization.PrivateFormat.Raw
export_encoding = cryptography.hazmat.primitives.serialization.Encoding.Raw
except AttributeError:
self.module.fail_json(msg='Cryptography backend does not support the selected output format "{0}"'.format(self.format))
# Select key encryption
encryption_algorithm = cryptography.hazmat.primitives.serialization.NoEncryption()
if self.cipher and self.passphrase:
@ -534,17 +618,48 @@ class PrivateKeyCryptography(PrivateKeyBase):
self.module.fail_json(msg='Cryptography backend can only use "auto" for cipher option.')
# Serialize key
return self.privatekey.private_bytes(
encoding=cryptography.hazmat.primitives.serialization.Encoding.PEM,
format=export_format,
encryption_algorithm=encryption_algorithm
)
try:
return self.privatekey.private_bytes(
encoding=export_encoding,
format=export_format,
encryption_algorithm=encryption_algorithm
)
except ValueError as e:
self.module.fail_json(
msg='Cryptography backend cannot serialize the private key in the required format "{0}"'.format(self.format)
)
except Exception as dummy:
self.module.fail_json(
msg='Error while serializing the private key in the required format "{0}"'.format(self.format),
exception=traceback.format_exc()
)
def _load_privatekey(self):
try:
# Read bytes
with open(self.path, 'rb') as f:
data = f.read()
# Interpret bytes depending on format.
format = crypto_utils.identify_private_key_format(data)
if format == 'raw':
if len(data) == 56 and CRYPTOGRAPHY_HAS_X448:
return cryptography.hazmat.primitives.asymmetric.x448.X448PrivateKey.from_private_bytes(data)
if len(data) == 57 and CRYPTOGRAPHY_HAS_ED448:
return cryptography.hazmat.primitives.asymmetric.ed448.Ed448PrivateKey.from_private_bytes(data)
if len(data) == 32:
if CRYPTOGRAPHY_HAS_X25519 and (self.type == 'X25519' or not CRYPTOGRAPHY_HAS_ED25519):
return cryptography.hazmat.primitives.asymmetric.x25519.X25519PrivateKey.from_private_bytes(data)
if CRYPTOGRAPHY_HAS_ED25519 and (self.type == 'Ed25519' or not CRYPTOGRAPHY_HAS_X25519):
return cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey.from_private_bytes(data)
if CRYPTOGRAPHY_HAS_X25519 and CRYPTOGRAPHY_HAS_ED25519:
try:
return cryptography.hazmat.primitives.asymmetric.x25519.X25519PrivateKey.from_private_bytes(data)
except Exception:
return cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey.from_private_bytes(data)
raise PrivateKeyError('Cannot load raw key')
else:
return cryptography.hazmat.primitives.serialization.load_pem_private_key(
f.read(),
data,
None if self.passphrase is None else to_bytes(self.passphrase),
backend=self.cryptography_backend
)
@ -565,12 +680,17 @@ class PrivateKeyCryptography(PrivateKeyBase):
def _check_passphrase(self):
try:
with open(self.path, 'rb') as f:
data = f.read()
format = crypto_utils.identify_private_key_format(data)
if format == 'raw':
# Raw keys cannot be encrypted
return self.passphrase is None
else:
return cryptography.hazmat.primitives.serialization.load_pem_private_key(
f.read(),
data,
None if self.passphrase is None else to_bytes(self.passphrase),
backend=self.cryptography_backend
)
return True
except Exception as dummy:
return False
@ -598,6 +718,17 @@ class PrivateKeyCryptography(PrivateKeyBase):
return False
def _check_format(self):
if self.format == 'auto_ignore':
return True
try:
with open(self.path, 'rb') as f:
content = f.read()
format = crypto_utils.identify_private_key_format(content)
return format == self._get_wanted_format()
except Exception as dummy:
return False
def dump(self):
"""Serialize the object into a dictionary."""
result = super(PrivateKeyCryptography, self).dump()
@ -627,6 +758,8 @@ def main():
passphrase=dict(type='str', no_log=True),
cipher=dict(type='str'),
backup=dict(type='bool', default=False),
format=dict(type='str', default='auto_ignore', choices=['pkcs1', 'pkcs8', 'raw', 'auto', 'auto_ignore']),
format_mismatch=dict(type='str', default='regenerate', choices=['regenerate', 'convert']),
select_crypto_backend=dict(type='str', choices=['auto', 'pyopenssl', 'cryptography'], default='auto'),
),
supports_check_mode=True,

View file

@ -277,3 +277,132 @@
stat:
path: '{{ output_dir }}/privatekey_mode.pem'
register: privatekey_mode_3_stat
- block:
- name: Generate privatekey_fmt_1 - auto format
openssl_privatekey:
path: '{{ output_dir }}/privatekey_fmt_1.pem'
format: auto
select_crypto_backend: '{{ select_crypto_backend }}'
register: privatekey_fmt_1_step_1
- name: Generate privatekey_fmt_1 - auto format (idempotent)
openssl_privatekey:
path: '{{ output_dir }}/privatekey_fmt_1.pem'
format: auto
select_crypto_backend: '{{ select_crypto_backend }}'
register: privatekey_fmt_1_step_2
- name: Generate privatekey_fmt_1 - PKCS1 format
openssl_privatekey:
path: '{{ output_dir }}/privatekey_fmt_1.pem'
format: pkcs1
select_crypto_backend: '{{ select_crypto_backend }}'
register: privatekey_fmt_1_step_3
- name: Generate privatekey_fmt_1 - PKCS8 format
openssl_privatekey:
path: '{{ output_dir }}/privatekey_fmt_1.pem'
format: pkcs8
select_crypto_backend: '{{ select_crypto_backend }}'
register: privatekey_fmt_1_step_4
- name: Generate privatekey_fmt_1 - PKCS8 format (idempotent)
openssl_privatekey:
path: '{{ output_dir }}/privatekey_fmt_1.pem'
format: pkcs8
select_crypto_backend: '{{ select_crypto_backend }}'
register: privatekey_fmt_1_step_5
- name: Generate privatekey_fmt_1 - auto format (ignore)
openssl_privatekey:
path: '{{ output_dir }}/privatekey_fmt_1.pem'
format: auto_ignore
select_crypto_backend: '{{ select_crypto_backend }}'
register: privatekey_fmt_1_step_6
- name: Generate privatekey_fmt_1 - auto format (no ignore)
openssl_privatekey:
path: '{{ output_dir }}/privatekey_fmt_1.pem'
format: auto
select_crypto_backend: '{{ select_crypto_backend }}'
register: privatekey_fmt_1_step_7
- name: Generate privatekey_fmt_1 - raw format (fail)
openssl_privatekey:
path: '{{ output_dir }}/privatekey_fmt_1.pem'
format: raw
select_crypto_backend: '{{ select_crypto_backend }}'
ignore_errors: yes
register: privatekey_fmt_1_step_8
- name: Generate privatekey_fmt_1 - PKCS8 format (convert)
openssl_privatekey_info:
path: '{{ output_dir }}/privatekey_fmt_1.pem'
select_crypto_backend: '{{ select_crypto_backend }}'
register: privatekey_fmt_1_step_9_before
- name: Generate privatekey_fmt_1 - PKCS8 format (convert)
openssl_privatekey:
path: '{{ output_dir }}/privatekey_fmt_1.pem'
format: pkcs8
format_mismatch: convert
select_crypto_backend: '{{ select_crypto_backend }}'
register: privatekey_fmt_1_step_9
- name: Generate privatekey_fmt_1 - PKCS8 format (convert)
openssl_privatekey_info:
path: '{{ output_dir }}/privatekey_fmt_1.pem'
select_crypto_backend: '{{ select_crypto_backend }}'
register: privatekey_fmt_1_step_9_after
when: 'select_crypto_backend == "cryptography"'
- block:
- name: Generate privatekey_fmt_2 - PKCS8 format
openssl_privatekey:
path: '{{ output_dir }}/privatekey_fmt_2.pem'
type: X448
format: pkcs8
select_crypto_backend: '{{ select_crypto_backend }}'
register: privatekey_fmt_2_step_1
- name: Generate privatekey_fmt_2 - PKCS8 format (idempotent)
openssl_privatekey:
path: '{{ output_dir }}/privatekey_fmt_2.pem'
type: X448
format: pkcs8
select_crypto_backend: '{{ select_crypto_backend }}'
register: privatekey_fmt_2_step_2
- name: Generate privatekey_fmt_2 - raw format
openssl_privatekey:
path: '{{ output_dir }}/privatekey_fmt_2.pem'
type: X448
format: raw
select_crypto_backend: '{{ select_crypto_backend }}'
register: privatekey_fmt_2_step_3
- name: Generate privatekey_fmt_2 - raw format (idempotent)
openssl_privatekey:
path: '{{ output_dir }}/privatekey_fmt_2.pem'
type: X448
format: raw
select_crypto_backend: '{{ select_crypto_backend }}'
register: privatekey_fmt_2_step_4
- name: Generate privatekey_fmt_2 - auto format (ignore)
openssl_privatekey:
path: '{{ output_dir }}/privatekey_fmt_2.pem'
format: auto_ignore
select_crypto_backend: '{{ select_crypto_backend }}'
register: privatekey_fmt_1_step_5
- name: Generate privatekey_fmt_2 - auto format (no ignore)
openssl_privatekey:
path: '{{ output_dir }}/privatekey_fmt_2.pem'
format: auto
select_crypto_backend: '{{ select_crypto_backend }}'
register: privatekey_fmt_1_step_6
when: 'select_crypto_backend == "cryptography" and cryptography_version.stdout is version("2.6", ">=")'

View file

@ -37,6 +37,8 @@
select_crypto_backend: pyopenssl
- import_tasks: ../tests/validate.yml
vars:
select_crypto_backend: pyopenssl
# FIXME: minimal pyOpenSSL version?!
when: pyopenssl_version.stdout is version('0.6', '>=')
@ -58,6 +60,8 @@
select_crypto_backend: cryptography
- import_tasks: ../tests/validate.yml
vars:
select_crypto_backend: pyopenssl
when: cryptography_version.stdout is version('0.5', '>=')
@ -85,6 +89,11 @@
- DSA
register: fingerprint_cryptography
- name: Verify that keys were not regenerated
assert:
that:
- fingerprint_cryptography is not changed
- name: Verify that fingerprints match
assert:
that: item.0.fingerprint[item.2] == item.1.fingerprint[item.2]

View file

@ -160,3 +160,29 @@
- privatekey_mode_3 is changed
- privatekey_mode_3_stat.stat.mode == '0400'
- privatekey_mode_1_stat.stat.mtime != privatekey_mode_3_stat.stat.mtime
- name: Validate format 1
assert:
that:
- privatekey_fmt_1_step_1 is changed
- privatekey_fmt_1_step_2 is not changed
- privatekey_fmt_1_step_3 is not changed
- privatekey_fmt_1_step_4 is changed
- privatekey_fmt_1_step_5 is not changed
- privatekey_fmt_1_step_6 is not changed
- privatekey_fmt_1_step_7 is changed
- privatekey_fmt_1_step_8 is failed
- privatekey_fmt_1_step_9 is changed
- privatekey_fmt_1_step_9_before.public_key == privatekey_fmt_1_step_9_after.public_key
when: 'select_crypto_backend == "cryptography"'
- name: Validate format 2
assert:
that:
- privatekey_fmt_2_step_1 is changed
- privatekey_fmt_2_step_2 is not changed
- privatekey_fmt_2_step_3 is changed
- privatekey_fmt_2_step_4 is not changed
- privatekey_fmt_2_step_5 is not changed
- privatekey_fmt_2_step_6 is changed
when: 'select_crypto_backend == "cryptography" and cryptography_version.stdout is version("2.6", ">=")'