change detection and check mode fixes for apt_key (#73334)

Change detection and check mode fixes for apt_key

* allow apt-key to use proxies
* add note about deprecation of apt-key itself
* expanded error msgs
* show all keys
* fix short_format parsing
* added more return info and documented it
This commit is contained in:
Brian Coca 2021-01-27 18:40:58 -05:00 committed by GitHub
parent 595413d113
commit 5aa4295d74
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 201 additions and 81 deletions

View file

@ -0,0 +1,2 @@
bugfixes:
- several fixes to make apt_key better at identifying needs for change and also to avoid changes in check_mode.

View file

@ -20,7 +20,8 @@ short_description: Add or remove an apt key
description:
- Add or remove an I(apt) key, optionally downloading it.
notes:
- Doesn't download the key unless it really needs it.
- The apt-key command has been deprecated and suggests to 'manage keyring files in trusted.gpg.d instead'. See the Debian wiki for details.
This module is kept for backwards compatiblity for systems that still use apt-key as the main way to manage apt repository keys.
- As a sanity check, downloaded key id must match the one specified.
- "Use full fingerprint (40 characters) key ids to avoid key collisions.
To generate a full-fingerprint imported key: C(apt-key adv --list-public-keys --with-fingerprint --with-colons)."
@ -114,7 +115,40 @@ EXAMPLES = '''
state: present
'''
RETURN = '''#'''
RETURN = '''
after:
description: List of apt key ids or fingerprints after any modification
returned: on change
type: list
sample: ["D8576A8BA88D21E9", "3B4FE6ACC0B21F32", "D94AA3F0EFE21092", "871920D1991BC93C"]
before:
description: List of apt key ids or fingprints before any modifications
returned: always
type: list
sample: ["3B4FE6ACC0B21F32", "D94AA3F0EFE21092", "871920D1991BC93C"]
fp:
description: Fingerprint of the key to import
returned: always
type: str
sample: "D8576A8BA88D21E9"
id:
description: key id from source
returned: always
type: str
sample: "36A1D7869245C8950F966E92D8576A8BA88D21E9"
key_id:
description: calculated key id, it should be same as 'id', but can be different
returned: always
type: str
sample: "36A1D7869245C8950F966E92D8576A8BA88D21E9"
short_id:
description: caclulated short key id
returned: always
type: str
sample: "A88D21E9"
'''
import os
# FIXME: standardize into module_common
from traceback import format_exc
@ -125,18 +159,28 @@ from ansible.module_utils.urls import fetch_url
apt_key_bin = None
gpg_bin = None
lang_env = dict(LANG='C', LC_ALL='C', LC_MESSAGES='C')
def find_needed_binaries(module):
global apt_key_bin
global gpg_bin
apt_key_bin = module.get_bin_path('apt-key', required=True)
gpg_bin = module.get_bin_path('gpg', required=True)
# FIXME: Is there a reason that gpg and grep are checked? Is it just
# cruft or does the apt .deb package not require them (and if they're not
# installed, /usr/bin/apt-key fails?)
module.get_bin_path('gpg', required=True)
module.get_bin_path('grep', required=True)
def add_http_proxy(cmd):
for envvar in ('HTTPS_PROXY', 'https_proxy', 'HTTP_PROXY', 'http_proxy'):
proxy = os.environ.get(envvar)
if proxy:
break
if proxy:
cmd += ' --keyserver-options http-proxy=%s' % proxy
return cmd
def parse_key_id(key_id):
@ -157,7 +201,7 @@ def parse_key_id(key_id):
"""
# Make sure the key_id is valid hexadecimal
int(key_id, 16)
int(to_native(key_id), 16)
key_id = key_id.upper()
if key_id.startswith('0X'):
@ -176,23 +220,41 @@ def parse_key_id(key_id):
return short_key_id, fingerprint, key_id
def parse_output_for_keys(output, short_format=False):
found = []
lines = to_native(output).split('\n')
for line in lines:
if (line.startswith("pub") or line.startswith("sub")) and "expired" not in line:
try:
# apt key format
tokens = line.split()
code = tokens[1]
(len_type, real_code) = code.split("/")
except (IndexError, ValueError):
# gpg format
try:
tokens = line.split(':')
real_code = tokens[4]
except (IndexError, ValueError):
# invalid line, skip
continue
found.append(real_code)
if found and short_format:
found = shorten_key_ids(found)
return found
def all_keys(module, keyring, short_format):
if keyring:
if keyring is not None:
cmd = "%s --keyring %s adv --list-public-keys --keyid-format=long" % (apt_key_bin, keyring)
else:
cmd = "%s adv --list-public-keys --keyid-format=long" % apt_key_bin
(rc, out, err) = module.run_command(cmd)
results = []
lines = to_native(out).split('\n')
for line in lines:
if (line.startswith("pub") or line.startswith("sub")) and "expired" not in line:
tokens = line.split()
code = tokens[1]
(len_type, real_code) = code.split("/")
results.append(real_code)
if short_format:
results = shorten_key_ids(results)
return results
return parse_output_for_keys(out, short_format)
def shorten_key_ids(key_id_list):
@ -207,13 +269,10 @@ def shorten_key_ids(key_id_list):
def download_key(module, url):
# FIXME: move get_url code to common, allow for in-memory D/L, support proxies
# and reuse here
if url is None:
module.fail_json(msg="needed a URL but was not specified")
try:
rsp, info = fetch_url(module, url)
# note: validate_certs and other args are pulled from module directly
rsp, info = fetch_url(module, url, use_proxy=True)
if info['status'] != 200:
module.fail_json(msg="Failed to download key at %s: %s" % (url, info['msg']))
@ -222,13 +281,41 @@ def download_key(module, url):
module.fail_json(msg="error getting key id from url: %s" % url, traceback=format_exc())
def get_key_id_from_file(module, filename, data=None):
global lang_env
key = None
cmd = [gpg_bin, '--with-colons', filename]
(rc, out, err) = module.run_command(cmd, environ_update=lang_env, data=to_native(data))
if rc != 0:
module.fail_json(msg="Unable to extract key from '%s'" % ('inline data' if data is None else filename), stdout=out, stderr=err)
keys = parse_output_for_keys(out)
# assume we only want first key?
if keys:
key = keys[0]
return key
def get_key_id_from_data(module, data):
return get_key_id_from_file(module, '-', data)
def import_key(module, keyring, keyserver, key_id):
global lang_env
if keyring:
cmd = "%s --keyring %s adv --no-tty --keyserver %s --recv %s" % (apt_key_bin, keyring, keyserver, key_id)
else:
cmd = "%s adv --no-tty --keyserver %s --recv %s" % (apt_key_bin, keyserver, key_id)
# check for proxy
cmd = add_http_proxy(cmd)
for retry in range(5):
lang_env = dict(LANG='C', LC_ALL='C', LC_MESSAGES='C')
(rc, out, err) = module.run_command(cmd, environ_update=lang_env)
if rc == 0:
break
@ -286,6 +373,7 @@ def main():
mutually_exclusive=(('data', 'file', 'keyserver', 'url'),),
)
# parameters
key_id = module.params['id']
url = module.params['url']
data = module.params['data']
@ -293,72 +381,91 @@ def main():
keyring = module.params['keyring']
state = module.params['state']
keyserver = module.params['keyserver']
changed = False
fingerprint = short_key_id = key_id
# internal vars
short_format = False
if key_id:
try:
short_key_id, fingerprint, key_id = parse_key_id(key_id)
except ValueError:
module.fail_json(msg='Invalid key_id', id=key_id)
if len(fingerprint) == 8:
short_format = True
short_key_id = None
fingerprint = None
error_no_error = "apt-key did not return an error, but %s (check that the id is correct and *not* a subkey)"
# ensure we have requirements met
find_needed_binaries(module)
keys = all_keys(module, keyring, short_format)
return_values = {}
# initialize result dict
r = {'changed': False}
if not key_id:
if keyserver:
module.fail_json(msg="Missing key_id, required with keyserver.")
if url:
data = download_key(module, url)
if filename:
key_id = get_key_id_from_file(module, filename)
elif data:
key_id = get_key_id_from_data(module, data)
r['id'] = key_id
try:
short_key_id, fingerprint, key_id = parse_key_id(key_id)
r['short_id'] = short_key_id
r['fp'] = fingerprint
r['key_id'] = key_id
except ValueError:
module.fail_json(msg='Invalid key_id', **r)
if not fingerprint:
# invalid key should fail well before this point, but JIC ...
module.fail_json(msg="Unable to continue as we could not extract a valid fingerprint to compare against existing keys.", **r)
if len(key_id) == 8:
short_format = True
# get existing keys to verify if we need to change
r['before'] = keys = all_keys(module, keyring, short_format)
keys2 = []
if state == 'present':
if fingerprint and fingerprint in keys:
module.exit_json(changed=False)
elif fingerprint and fingerprint not in keys and module.check_mode:
# TODO: Someday we could go further -- write keys out to
# a temporary file and then extract the key id from there via gpg
# to decide if the key is installed or not.
module.exit_json(changed=True)
else:
if not filename and not data and not keyserver:
data = download_key(module, url)
if (short_format and short_key_id not in keys) or (not short_format and fingerprint not in keys):
r['changed'] = True
if not module.check_mode:
if filename:
add_key(module, filename, keyring)
elif keyserver:
import_key(module, keyring, keyserver, key_id)
elif data:
# this also takes care of url if key_id was not provided
add_key(module, "-", keyring, data)
elif url:
# we hit this branch only if key_id is supplied with url
data = download_key(module, url)
add_key(module, "-", keyring, data)
else:
module.fail_json(msg="No key to add ... how did i get here?!?!", **r)
if filename:
add_key(module, filename, keyring)
elif keyserver:
import_key(module, keyring, keyserver, key_id)
else:
add_key(module, "-", keyring, data)
changed = False
keys2 = all_keys(module, keyring, short_format)
if len(keys) != len(keys2):
changed = True
if fingerprint and fingerprint not in keys2:
module.fail_json(msg="key does not seem to have been added", id=key_id)
module.exit_json(changed=changed)
# verify it got added
r['after'] = keys2 = all_keys(module, keyring, short_format)
if (short_format and short_key_id not in keys2) or (not short_format and fingerprint not in keys2):
module.fail_json(msg=error_no_error % 'failed to add the key', **r)
elif state == 'absent':
if not key_id:
module.fail_json(msg="key is required")
module.fail_json(msg="key is required to remove a key", **r)
if fingerprint in keys:
if module.check_mode:
module.exit_json(changed=True)
r['changed'] = True
if not module.check_mode:
# we use the "short" id: key_id[-8:], short_format=True
# it's a workaround for https://bugs.launchpad.net/ubuntu/+source/apt/+bug/1481871
if short_key_id is not None and remove_key(module, short_key_id, keyring):
r['after'] = keys2 = all_keys(module, keyring, short_format)
if fingerprint in keys2:
module.fail_json(msg=error_no_error % 'the key was not removed', **r)
else:
module.fail_json(msg="error removing key_id", **r)
# we use the "short" id: key_id[-8:], short_format=True
# it's a workaround for https://bugs.launchpad.net/ubuntu/+source/apt/+bug/1481871
if remove_key(module, short_key_id, keyring):
keys = all_keys(module, keyring, short_format)
if fingerprint in keys:
module.fail_json(msg="apt-key del did not return an error but the key was not removed (check that the id is correct and *not* a subkey)",
id=key_id)
changed = True
else:
# FIXME: module.fail_json or exit-json immediately at point of failure
module.fail_json(msg="error removing key_id", **return_values)
module.exit_json(changed=changed, **return_values)
module.exit_json(**r)
if __name__ == '__main__':

View file

@ -1,8 +1,14 @@
- name: Ensure key is not there to start with.
apt_key:
id: 36A1D7869245C8950F966E92D8576A8BA88D21E9
state: absent
- name: run first docs example
apt_key:
keyserver: keyserver.ubuntu.com
id: 36A1D7869245C8950F966E92D8576A8BA88D21E9
register: apt_key_test0
- debug: var=apt_key_test0
- name: re-run first docs example

View file

@ -3,6 +3,11 @@
url: https://getfedora.org/static/fedora.gpg
dest: /tmp/fedora.gpg
- name: Ensure clean slate
apt_key:
id: 49FD77499570FF31
state: absent
- name: Run apt_key with both file and keyserver
apt_key:
file: /tmp/fedora.gpg
@ -31,7 +36,7 @@
- name: remove fedora.gpg
apt_key:
id: 97A1AE57C3A2372CCA3A4ABA6C13026D12C944D0
id: 49FD77499570FF31
state: absent
register: remove_fedora