crypto modules: use module_utils.compat.ipaddress when possible (#55278)
* Use module_utils.compat.ipaddress where possible.
* Simplify reverse pointer computation.
* Use dummy for unused variables.
* Remove from ignore list.
* Adjust fix.
* Fix text handling for Python 2.
* Add changelog.
(cherry picked from commit c8a15b9dbc
)
This commit is contained in:
parent
368375810b
commit
2e80441948
7 changed files with 25 additions and 70 deletions
|
@ -0,0 +1,3 @@
|
|||
bugfixes:
|
||||
- "openssl_csr, openssl_csr_info - use ``ipaddress`` module bundled with Ansible for normalizations needed for pyOpenSSL backend."
|
||||
- "acme_certificate - use ``ipaddress`` module bundled with Ansible for normalizations needed for OpenSSL backend."
|
|
@ -29,6 +29,7 @@ import traceback
|
|||
|
||||
from ansible.module_utils._text import to_native, to_text, to_bytes
|
||||
from ansible.module_utils.urls import fetch_url
|
||||
from ansible.module_utils.compat import ipaddress as compat_ipaddress
|
||||
|
||||
try:
|
||||
import cryptography
|
||||
|
@ -46,7 +47,7 @@ try:
|
|||
HAS_CURRENT_CRYPTOGRAPHY = (LooseVersion(CRYPTOGRAPHY_VERSION) >= LooseVersion('1.5'))
|
||||
if HAS_CURRENT_CRYPTOGRAPHY:
|
||||
_cryptography_backend = cryptography.hazmat.backends.default_backend()
|
||||
except Exception as _:
|
||||
except Exception as dummy:
|
||||
HAS_CURRENT_CRYPTOGRAPHY = False
|
||||
|
||||
|
||||
|
@ -90,7 +91,7 @@ def write_file(module, dest, content):
|
|||
except Exception as err:
|
||||
try:
|
||||
f.close()
|
||||
except Exception as e:
|
||||
except Exception as dummy:
|
||||
pass
|
||||
os.remove(tmpsrc)
|
||||
raise ModuleFailException("failed to create temporary content file: %s" % to_native(err), exception=traceback.format_exc())
|
||||
|
@ -101,7 +102,7 @@ def write_file(module, dest, content):
|
|||
if not os.path.exists(tmpsrc):
|
||||
try:
|
||||
os.remove(tmpsrc)
|
||||
except Exception as e:
|
||||
except Exception as dummy:
|
||||
pass
|
||||
raise ModuleFailException("Source %s does not exist" % (tmpsrc))
|
||||
if not os.access(tmpsrc, os.R_OK):
|
||||
|
@ -174,7 +175,7 @@ def _parse_key_openssl(openssl_binary, module, key_file=None, key_content=None):
|
|||
except Exception as err:
|
||||
try:
|
||||
f.close()
|
||||
except Exception as e:
|
||||
except Exception as dummy:
|
||||
pass
|
||||
raise ModuleFailException("failed to create temporary content file: %s" % to_native(err), exception=traceback.format_exc())
|
||||
f.close()
|
||||
|
@ -824,44 +825,11 @@ class ACMEAccount(object):
|
|||
|
||||
|
||||
def _normalize_ip(ip):
|
||||
if ':' not in ip:
|
||||
# For IPv4 addresses: remove trailing zeros per nibble
|
||||
ip = '.'.join([nibble.lstrip('0') or '0' for nibble in ip.split('.')])
|
||||
try:
|
||||
return to_native(compat_ipaddress.ip_address(to_text(ip)).compressed)
|
||||
except ValueError:
|
||||
# We don't want to error out on something IPAddress() can't parse
|
||||
return ip
|
||||
# For IPv6 addresses:
|
||||
# 1. Make them lowercase and split
|
||||
ip = ip.lower()
|
||||
i = ip.find('::')
|
||||
if i >= 0:
|
||||
front = ip[:i].split(':') or []
|
||||
back = ip[i + 2:].split(':') or []
|
||||
ip = front + ['0'] * (8 - len(front) - len(back)) + back
|
||||
else:
|
||||
ip = ip.split(':')
|
||||
# 2. Remove trailing zeros per nibble
|
||||
ip = [nibble.lstrip('0') or '0' for nibble in ip]
|
||||
# 3. Find longest consecutive sequence of zeros
|
||||
zeros_start = -1
|
||||
zeros_length = -1
|
||||
current_start = -1
|
||||
for i, nibble in enumerate(ip):
|
||||
if nibble == '0':
|
||||
if current_start < 0:
|
||||
current_start = i
|
||||
elif current_start >= 0:
|
||||
if i - current_start > zeros_length:
|
||||
zeros_start = current_start
|
||||
zeros_length = i - current_start
|
||||
current_start = -1
|
||||
if current_start >= 0:
|
||||
if 8 - current_start > zeros_length:
|
||||
zeros_start = current_start
|
||||
zeros_length = 8 - current_start
|
||||
# 4. If the sequence has at least two elements, contract
|
||||
if zeros_length >= 2:
|
||||
return ':'.join(ip[:zeros_start]) + '::' + ':'.join(ip[zeros_start + zeros_length:])
|
||||
# 5. If not, return full IP
|
||||
return ':'.join(ip)
|
||||
|
||||
|
||||
def openssl_get_csr_identifiers(openssl_binary, module, csr_filename):
|
||||
|
@ -910,7 +878,7 @@ def cryptography_get_csr_identifiers(module, csr_filename):
|
|||
if isinstance(name, cryptography.x509.DNSName):
|
||||
identifiers.add(('dns', name.value))
|
||||
elif isinstance(name, cryptography.x509.IPAddress):
|
||||
identifiers.add(('ip', _normalize_ip(str(name.value))))
|
||||
identifiers.add(('ip', name.value.compressed))
|
||||
else:
|
||||
raise ModuleFailException('Found unsupported SAN identifier {0}'.format(name))
|
||||
return identifiers
|
||||
|
@ -952,7 +920,7 @@ def set_crypto_backend(module):
|
|||
elif backend == 'cryptography':
|
||||
try:
|
||||
cryptography.__version__
|
||||
except Exception as _:
|
||||
except Exception as dummy:
|
||||
module.fail_json(msg='Cannot find cryptography module!')
|
||||
HAS_CURRENT_CRYPTOGRAPHY = True
|
||||
else:
|
||||
|
|
|
@ -395,6 +395,7 @@ from datetime import datetime
|
|||
|
||||
from ansible.module_utils.basic import AnsibleModule
|
||||
from ansible.module_utils._text import to_bytes
|
||||
from ansible.module_utils.compat import ipaddress as compat_ipaddress
|
||||
|
||||
|
||||
def get_cert_days(module, cert_file):
|
||||
|
@ -550,26 +551,10 @@ class ACMEClient(object):
|
|||
elif challenge_type == 'tls-alpn-01':
|
||||
# https://tools.ietf.org/html/draft-ietf-acme-tls-alpn-05#section-3
|
||||
if identifier_type == 'ip':
|
||||
if ':' in identifier:
|
||||
# IPv6 address: use reverse IP6.ARPA mapping (RFC3596)
|
||||
i = identifier.find('::')
|
||||
if i >= 0:
|
||||
nibbles = [nibble for nibble in identifier[:i].split(':') if nibble]
|
||||
suffix = [nibble for nibble in identifier[i + 1:].split(':') if nibble]
|
||||
if len(nibbles) + len(suffix) < 8:
|
||||
nibbles.extend(['0'] * (8 - len(nibbles) - len(suffix)))
|
||||
nibbles.extend(suffix)
|
||||
else:
|
||||
nibbles = identifier.split(':')
|
||||
resource = []
|
||||
for nibble in reversed(nibbles):
|
||||
nibble = '0' * (4 - len(nibble)) + nibble.lower()
|
||||
for octet in reversed(nibble):
|
||||
resource.append(octet)
|
||||
resource = '.'.join(resource) + '.ip6.arpa.'
|
||||
else:
|
||||
# IPv4 address: use reverse IN-ADDR.ARPA mapping (RFC1034)
|
||||
resource = '.'.join(reversed(identifier.split('.'))) + '.in-addr.arpa.'
|
||||
# IPv4/IPv6 address: use reverse mapping (RFC1034, RFC3596)
|
||||
resource = compat_ipaddress.ip_address(identifier).reverse_pointer
|
||||
if not resource.endswith('.'):
|
||||
resource += '.'
|
||||
else:
|
||||
resource = identifier
|
||||
value = base64.b64encode(hashlib.sha256(to_bytes(keyauthorization)).digest())
|
||||
|
|
|
@ -235,6 +235,7 @@ from ansible.module_utils import crypto as crypto_utils
|
|||
from ansible.module_utils.basic import AnsibleModule, missing_required_lib
|
||||
from ansible.module_utils.six import string_types
|
||||
from ansible.module_utils._text import to_native, to_text, to_bytes
|
||||
from ansible.module_utils.compat import ipaddress as compat_ipaddress
|
||||
|
||||
MINIMAL_CRYPTOGRAPHY_VERSION = '1.6'
|
||||
MINIMAL_PYOPENSSL_VERSION = '0.15'
|
||||
|
@ -243,7 +244,6 @@ PYOPENSSL_IMP_ERR = None
|
|||
try:
|
||||
import OpenSSL
|
||||
from OpenSSL import crypto
|
||||
import ipaddress
|
||||
PYOPENSSL_VERSION = LooseVersion(OpenSSL.__version__)
|
||||
if OpenSSL.SSL.OPENSSL_VERSION_NUMBER >= 0x10100000:
|
||||
# OpenSSL 1.1.0 or newer
|
||||
|
@ -609,7 +609,7 @@ class CertificateInfoPyOpenSSL(CertificateInfo):
|
|||
if san.startswith('IP Address:'):
|
||||
san = 'IP:' + san[len('IP Address:'):]
|
||||
if san.startswith('IP:'):
|
||||
ip = ipaddress.ip_address(san[3:])
|
||||
ip = compat_ipaddress.ip_address(san[3:])
|
||||
san = 'IP:{0}'.format(ip.compressed)
|
||||
return san
|
||||
|
||||
|
|
|
@ -336,6 +336,7 @@ from distutils.version import LooseVersion
|
|||
from ansible.module_utils import crypto as crypto_utils
|
||||
from ansible.module_utils.basic import AnsibleModule, missing_required_lib
|
||||
from ansible.module_utils._text import to_native, to_bytes, to_text
|
||||
from ansible.module_utils.compat import ipaddress as compat_ipaddress
|
||||
|
||||
MINIMAL_PYOPENSSL_VERSION = '0.15'
|
||||
MINIMAL_CRYPTOGRAPHY_VERSION = '1.3'
|
||||
|
@ -368,7 +369,6 @@ try:
|
|||
import cryptography.hazmat.backends
|
||||
import cryptography.hazmat.primitives.serialization
|
||||
import cryptography.hazmat.primitives.hashes
|
||||
import ipaddress
|
||||
CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__)
|
||||
except ImportError:
|
||||
CRYPTOGRAPHY_IMP_ERR = traceback.format_exc()
|
||||
|
@ -560,7 +560,7 @@ class CertificateSigningRequestPyOpenSSL(CertificateSigningRequestBase):
|
|||
if san.startswith('IP Address:'):
|
||||
san = 'IP:' + san[len('IP Address:'):]
|
||||
if san.startswith('IP:'):
|
||||
ip = ipaddress.ip_address(san[3:])
|
||||
ip = compat_ipaddress.ip_address(san[3:])
|
||||
san = 'IP:{0}'.format(ip.compressed)
|
||||
return san
|
||||
|
||||
|
|
|
@ -165,6 +165,7 @@ from ansible.module_utils import crypto as crypto_utils
|
|||
from ansible.module_utils.basic import AnsibleModule, missing_required_lib
|
||||
from ansible.module_utils.six import string_types
|
||||
from ansible.module_utils._text import to_native, to_text, to_bytes
|
||||
from ansible.module_utils.compat import ipaddress as compat_ipaddress
|
||||
|
||||
MINIMAL_CRYPTOGRAPHY_VERSION = '1.3'
|
||||
MINIMAL_PYOPENSSL_VERSION = '0.15'
|
||||
|
@ -173,7 +174,6 @@ PYOPENSSL_IMP_ERR = None
|
|||
try:
|
||||
import OpenSSL
|
||||
from OpenSSL import crypto
|
||||
import ipaddress
|
||||
PYOPENSSL_VERSION = LooseVersion(OpenSSL.__version__)
|
||||
if OpenSSL.SSL.OPENSSL_VERSION_NUMBER >= 0x10100000:
|
||||
# OpenSSL 1.1.0 or newer
|
||||
|
@ -444,7 +444,7 @@ class CertificateSigningRequestInfoPyOpenSSL(CertificateSigningRequestInfo):
|
|||
if san.startswith('IP Address:'):
|
||||
san = 'IP:' + san[len('IP Address:'):]
|
||||
if san.startswith('IP:'):
|
||||
ip = ipaddress.ip_address(san[3:])
|
||||
ip = compat_ipaddress.ip_address(san[3:])
|
||||
san = 'IP:{0}'.format(ip.compressed)
|
||||
return san
|
||||
|
||||
|
|
|
@ -3,7 +3,6 @@ lib/ansible/cli/console.py blacklisted-name
|
|||
lib/ansible/compat/selectors/_selectors2.py blacklisted-name
|
||||
lib/ansible/executor/playbook_executor.py blacklisted-name
|
||||
lib/ansible/executor/task_queue_manager.py blacklisted-name
|
||||
lib/ansible/module_utils/acme.py blacklisted-name
|
||||
lib/ansible/module_utils/facts/network/linux.py blacklisted-name
|
||||
lib/ansible/module_utils/network/edgeswitch/edgeswitch_interface.py duplicate-string-formatting-argument
|
||||
lib/ansible/module_utils/urls.py blacklisted-name
|
||||
|
|
Loading…
Add table
Reference in a new issue