acme_certificate: allow to download alternate certificate chains (#56334)

* Improve link handling.

* Also fetch alternate certificate chains.

* Add retrieve_all_alternates option.

* Simplify code.

* Forgot when condition.

* Add tests for retrieve_all_alternates.

* Fixes.

* Moved utility function for link parsing to module_utils.

* Fix grammar.
This commit is contained in:
Felix Fontein 2019-08-09 23:54:48 +02:00 committed by GitHub
parent e9fc095123
commit 8b68feb67e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 122 additions and 67 deletions

View file

@ -0,0 +1,2 @@
minor_changes:
- "acme_certificate - all alternate chains can be retrieved using the new ``retrieve_all_alternates`` option."

View file

@ -30,6 +30,7 @@ import traceback
from ansible.module_utils._text import to_native, to_text, to_bytes
from ansible.module_utils.urls import fetch_url
from ansible.module_utils.compat import ipaddress as compat_ipaddress
from ansible.module_utils.six.moves.urllib.parse import unquote
try:
import cryptography
@ -930,3 +931,13 @@ def set_crypto_backend(module):
module.debug('Using cryptography backend (library version {0})'.format(CRYPTOGRAPHY_VERSION))
else:
module.debug('Using OpenSSL binary backend')
def process_links(info, callback):
'''
Process link header, calls callback for every link header with the URL and relation as options.
'''
if 'link' in info:
link = info['link']
for url, relation in re.findall(r'<([^>]+)>;rel="(\w+)"', link):
callback(unquote(url), relation)

View file

@ -197,6 +197,15 @@ options:
type: bool
default: no
version_added: 2.6
retrieve_all_alternates:
description:
- "When set to C(yes), will retrieve all alternate chains offered by the ACME CA.
These will not be written to disk, but will be returned together with the main
chain as C(all_chains). See the documentation for the C(all_chains) return
value for details."
type: bool
default: no
version_added: "2.9"
'''
EXAMPLES = r'''
@ -372,6 +381,26 @@ account_uri:
returned: changed
type: str
version_added: "2.5"
all_chains:
description:
- When I(retrieve_all_alternates) is set to C(yes), the module will query the ACME server
for alternate chains. This return value will contain a list of all chains returned,
the first entry being the main chain returned by the server.
- See L(Section 7.4.2 of RFC8555,https://tools.ietf.org/html/rfc8555#section-7.4.2) for details.
returned: when certificate was retrieved and I(retrieve_all_alternates) is set to C(yes)
type: list
contains:
chain:
description:
- The certificate chain, excluding the root, as concatenated PEM certificates.
type: str
returned: always
full_chain:
description:
- The certificate chain, excluding the root, but including the leaf certificate,
as concatenated PEM certificates.
type: str
returned: always
'''
from ansible.module_utils.acme import (
@ -383,6 +412,7 @@ from ansible.module_utils.acme import (
openssl_get_csr_identifiers,
cryptography_get_cert_days,
set_crypto_backend,
process_links,
)
import base64
@ -392,6 +422,7 @@ import os
import re
import textwrap
import time
import urllib
from datetime import datetime
from ansible.module_utils.basic import AnsibleModule
@ -684,19 +715,23 @@ class ACMEClient(object):
chain.append(''.join(current))
current = []
# Process link-up headers if there was no chain in reply
if not chain and 'link' in info:
link = info['link']
parsed_link = re.match(r'<(.+)>;rel="(\w+)"', link)
if parsed_link and parsed_link.group(2) == "up":
chain_link = parsed_link.group(1)
chain_result, chain_info = self.account.get_request(chain_link, parse_json_result=False)
if chain_info['status'] in [200, 201]:
chain.append(self._der_to_pem(chain_result))
alternates = []
def f(link, relation):
if relation == 'up':
# Process link-up headers if there was no chain in reply
if not chain:
chain_result, chain_info = self.account.get_request(link, parse_json_result=False)
if chain_info['status'] in [200, 201]:
chain.append(self._der_to_pem(chain_result))
elif relation == 'alternate':
alternates.append(link)
process_links(info, f)
if cert is None or current:
raise ModuleFailException("Failed to parse certificate chain download from {0}: {1} (headers: {2})".format(url, content, info))
return {'cert': cert, 'chain': chain}
return {'cert': cert, 'chain': chain, 'alternates': alternates}
def _new_cert_v1(self):
'''
@ -712,14 +747,15 @@ class ACMEClient(object):
result, info = self.account.send_signed_request(self.directory['new-cert'], new_cert)
chain = []
if 'link' in info:
link = info['link']
parsed_link = re.match(r'<(.+)>;rel="(\w+)"', link)
if parsed_link and parsed_link.group(2) == "up":
chain_link = parsed_link.group(1)
chain_result, chain_info = self.account.get_request(chain_link, parse_json_result=False)
def f(link, relation):
if relation == 'up':
chain_result, chain_info = self.account.get_request(link, parse_json_result=False)
if chain_info['status'] in [200, 201]:
chain = [self._der_to_pem(chain_result)]
chain.clear()
chain.append(self._der_to_pem(chain_result))
process_links(info, f)
if info['status'] not in [200, 201]:
raise ModuleFailException("Error new cert: CODE: {0} RESULT: {1}".format(info['status'], result))
@ -873,6 +909,30 @@ class ACMEClient(object):
else:
cert_uri = self._finalize_cert()
cert = self._download_cert(cert_uri)
if self.module.params['retrieve_all_alternates']:
alternate_chains = []
for alternate in cert['alternates']:
try:
alt_cert = self._download_cert(alternate)
except ModuleFailException as e:
self.module.warn('Error while downloading alternative certificate {0}: {1}'.format(alternate, e))
continue
alt_chain = alt_cert.get('chain', [])
if alt_chain:
alternate_chains.append(alt_chain)
else:
self.module.warn('Alternative certificate {0} chain is empty'.format(alternate))
self.all_chains = []
def _append_all_chains(chain):
self.all_chains.append(dict(
chain=("\n".join(chain)).encode('utf8'),
full_chain=(cert['cert'] + "\n".join(chain)).encode('utf8'),
))
_append_all_chains(cert.get('chain', []))
for alt_chain in alternate_chains:
_append_all_chains(alt_chain.get('chain', []))
if cert['cert'] is not None:
pem_cert = cert['cert']
@ -939,6 +999,7 @@ def main():
remaining_days=dict(type='int', default=10),
deactivate_authzs=dict(type='bool', default=False),
force=dict(type='bool', default=False),
retrieve_all_alternates=dict(type='bool', default=False),
select_crypto_backend=dict(type='str', default='auto', choices=['auto', 'openssl', 'cryptography']),
),
required_one_of=(
@ -979,6 +1040,7 @@ def main():
else:
client = ACMEClient(module)
client.cert_days = cert_days
other = dict()
if client.is_first_step():
# First run: start challenges / start new order
client.start_challenges()
@ -987,6 +1049,8 @@ def main():
try:
client.finish_challenges()
client.get_certificate()
if module.params['retrieve_all_alternates']:
other['all_chains'] = client.all_chains
finally:
if module.params['deactivate_authzs']:
client.deactivate_authzs()
@ -1003,7 +1067,8 @@ def main():
account_uri=client.account.uri,
challenge_data=data,
challenge_data_dns=data_dns,
cert_days=client.cert_days
cert_days=client.cert_days,
**other
)
else:
module.exit_json(changed=False, cert_days=cert_days)

View file

@ -57,6 +57,10 @@
remaining_days: 10
terms_agreed: yes
account_email: "example@example.org"
retrieve_all_alternates: yes
- name: Store obtain results for cert 1
set_fact:
cert_1_obtain_results: "{{ certificate_obtain_result }}"
- name: Obtain cert 2
include_tasks: obtain-cert.yml
vars:
@ -73,6 +77,9 @@
remaining_days: 10
terms_agreed: no
account_email: ""
- name: Store obtain results for cert 2
set_fact:
cert_2_obtain_results: "{{ certificate_obtain_result }}"
- name: Obtain cert 3
include_tasks: obtain-cert.yml
vars:

View file

@ -7,6 +7,14 @@
assert:
that:
- "'DNS:example.com' in cert_1_text.stdout"
- name: Check that certificate 1 retrieval got all chains
assert:
that:
- "'all_chains' in cert_1_obtain_results"
- "'chain' in cert_1_obtain_results.all_chains[0]"
- "'full_chain' in cert_1_obtain_results.all_chains[0]"
- "lookup('file', output_dir ~ '/cert-1-chain.pem', rstrip=False) == cert_1_obtain_results.all_chains[0].chain"
- "lookup('file', output_dir ~ '/cert-1-fullchain.pem', rstrip=False) == cert_1_obtain_results.all_chains[0].full_chain"
- name: Check that certificate 2 is valid
assert:
@ -17,6 +25,10 @@
that:
- "'DNS:*.example.com' in cert_2_text.stdout"
- "'DNS:example.com' in cert_2_text.stdout"
- name: Check that certificate 2 retrieval did not get all chains
assert:
that:
- "'all_chains' not in cert_2_obtain_results"
- name: Check that certificate 3 is valid
assert:

View file

@ -26,7 +26,8 @@
acme_version: 2
acme_directory: https://{{ acme_host }}:14000/dir
validate_certs: no
account_key: "{{ output_dir }}/{{ account_key }}.pem"
account_key: "{{ (output_dir ~ '/' ~ account_key ~ '.pem') if account_key_content is not defined else omit }}"
account_key_content: "{{ account_key_content | default(omit) }}"
modify_account: "{{ modify_account }}"
csr: "{{ output_dir }}/{{ certificate_name }}.csr"
dest: "{{ output_dir }}/{{ certificate_name }}.pem"
@ -39,31 +40,6 @@
terms_agreed: "{{ terms_agreed }}"
account_email: "{{ account_email }}"
register: challenge_data
when: account_key_content is not defined
- name: ({{ certgen_title }}) Obtain cert, step 1 (using account key data)
acme_certificate:
select_crypto_backend: "{{ select_crypto_backend }}"
acme_version: 2
acme_directory: https://{{ acme_host }}:14000/dir
validate_certs: no
account_key_content: "{{ account_key_content }}"
modify_account: "{{ modify_account }}"
csr: "{{ output_dir }}/{{ certificate_name }}.csr"
dest: "{{ output_dir }}/{{ certificate_name }}.pem"
fullchain_dest: "{{ output_dir }}/{{ certificate_name }}-fullchain.pem"
chain_dest: "{{ output_dir }}/{{ certificate_name }}-chain.pem"
challenge: "{{ challenge }}"
deactivate_authzs: "{{ deactivate_authzs }}"
force: "{{ force }}"
remaining_days: "{{ remaining_days }}"
terms_agreed: "{{ terms_agreed }}"
account_email: "{{ account_email }}"
register: challenge_data_content
when: account_key_content is defined
- name: ({{ certgen_title }}) Copy challenge data (when using account key data)
set_fact:
challenge_data: "{{ challenge_data_content }}"
when: account_key_content is defined
- name: ({{ certgen_title }}) Print challenge data
debug:
var: challenge_data
@ -120,7 +96,8 @@
acme_version: 2
acme_directory: https://{{ acme_host }}:14000/dir
validate_certs: no
account_key: "{{ output_dir }}/{{ account_key }}.pem"
account_key: "{{ (output_dir ~ '/' ~ account_key ~ '.pem') if account_key_content is not defined else omit }}"
account_key_content: "{{ account_key_content | default(omit) }}"
account_uri: "{{ challenge_data.account_uri }}"
modify_account: "{{ modify_account }}"
csr: "{{ output_dir }}/{{ certificate_name }}.csr"
@ -134,28 +111,9 @@
terms_agreed: "{{ terms_agreed }}"
account_email: "{{ account_email }}"
data: "{{ challenge_data }}"
when: challenge_data is changed and account_key_content is not defined
- name: ({{ certgen_title }}) Obtain cert, step 2 (using account key data)
acme_certificate:
select_crypto_backend: "{{ select_crypto_backend }}"
acme_version: 2
acme_directory: https://{{ acme_host }}:14000/dir
validate_certs: no
account_key_content: "{{ account_key_content }}"
account_uri: "{{ challenge_data.account_uri }}"
modify_account: "{{ modify_account }}"
csr: "{{ output_dir }}/{{ certificate_name }}.csr"
dest: "{{ output_dir }}/{{ certificate_name }}.pem"
fullchain_dest: "{{ output_dir }}/{{ certificate_name }}-fullchain.pem"
chain_dest: "{{ output_dir }}/{{ certificate_name }}-chain.pem"
challenge: "{{ challenge }}"
deactivate_authzs: "{{ deactivate_authzs }}"
force: "{{ force }}"
remaining_days: "{{ remaining_days }}"
terms_agreed: "{{ terms_agreed }}"
account_email: "{{ account_email }}"
data: "{{ challenge_data }}"
when: challenge_data is changed and account_key_content is defined
retrieve_all_alternates: "{{ retrieve_all_alternates | default(omit) }}"
register: certificate_obtain_result
when: challenge_data is changed
- name: ({{ certgen_title }}) Deleting HTTP challenges
uri:
url: "http://{{ acme_host }}:5000/http/{{ item.key }}/{{ item.value['http-01'].resource[('.well-known/acme-challenge/'|length):] }}"