Add verify subcommand to 'ansible-galaxy collection' (#65618)

* [WIP] Add verify subcommand command to 'ansible-galaxy collection'

* Fix pep8 and use consistent display order

* WIP - docs

* Remove some redundancy in verify display messages by using an error queue for each collection

* Share common code and improve output format

* clean up documentation

* typo

* Apply suggestions from code review

Co-Authored-By: Sandra McCann <samccann@redhat.com>

* Move ModifiedContent namedtuple to the global scope

Add a public metadata property

Rename function to _get_json_from_tar_file

* Add some unit tests

* fix using common functions after rebase

* changelog

* Improve efficiency finding specific installed collections

Improve efficiency by only downloading the tar.gz from the galaxy server for comparison after checking that the collection has been installed

Handle multiple collection paths

Fix up tests

* pep8

* reword that for accuracy

* use more common code and verify collection name

* Improve error message and add documentation

* Update unit tests and add coverage for both local and remote collections that are unable to be located

* Only validate collections using the format collection_namespace.collection_name

Fix tests to reflect that

Fix documentation

* Fix a bug when a local collection does not contain a file

* pep8

* Fix formatting

Co-authored-by: Sandra McCann <samccann@redhat.com>
This commit is contained in:
Sloane Hertel 2020-02-14 09:57:04 -05:00 committed by GitHub
parent 1b9b551b61
commit 97f011cf94
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 934 additions and 47 deletions

View file

@ -0,0 +1,4 @@
minor_changes:
- ansible-galaxy - Add a `verify` subcommand to `ansible-galaxy collection`. The collection found on
the galaxy server is downloaded to a tempfile to compare the checksums of the files listed in the
MANIFEST.json and the FILES.json with the contents of the installed collection.

View file

@ -47,6 +47,54 @@ Configuring the ``ansible-galaxy`` client
.. _using_collections:
Verifying collections
=====================
Verifying collections with ``ansible-galaxy``
---------------------------------------------
Once installed, you can verify that the content of the installed collection matches the content of the collection on the server. This feature expects that the collection is installed in one of the configured collection paths and that the collection exists on one of the configured galaxy servers.
.. code-block:: bash
ansible-galaxy collection verify my_namespace.my_collection
The output of the ``ansible-galaxy collection verify`` command is quiet if it is successful. If a collection has been modified, the altered files are listed under the collection name.
.. code-block:: bash
ansible-galaxy collection verify my_namespace.my_collection
Collection my_namespace.my_collection contains modified content in the following files:
my_namespace.my_collection
plugins/inventory/my_inventory.py
plugins/modules/my_module.py
You can use the ``-vvv`` flag to display additional information, such as the version and path of the installed collection, the URL of the remote collection used for validation, and successful verification output.
.. code-block:: bash
ansible-galaxy collection verify my_namespace.my_collection -vvv
...
Verifying 'my_namespace.my_collection:1.0.0'.
Installed collection found at '/path/to/ansible_collections/my_namespace/my_collection/'
Remote collection found at 'https://galaxy.ansible.com/download/my_namespace-my_collection-1.0.0.tar.gz'
Successfully verified that checksums for 'my_namespace.my_collection:1.0.0' match the remote collection
If you have a pre-release or non-latest version of a collection installed you should include the specific version to verify. If the version is omitted, the installed collection is verified against the latest version available on the server.
.. code-block:: bash
ansible-galaxy collection verify my_namespace.my_collection:1.0.0
In addition to the ``namespace.collection_name:version`` format, you can provide the collections to verify in a ``requirements.yml`` file. Dependencies listed in ``requirements.yml`` are not included in the verify process and should be verified separately.
.. code-block:: bash
ansible-galaxy collection verify -r requirements.yml
Verifying against ``tar.gz`` files is not supported. If your ``requirements.yml`` contains paths to tar files or URLs for installation, you can use the ``--ignore-errors`` flag to ensure that all collections using the ``namespace.name`` format in the file are processed.
Using collections in a Playbook
===============================

View file

@ -28,6 +28,7 @@ from ansible.galaxy.collection import (
publish_collection,
validate_collection_name,
validate_collection_path,
verify_collections
)
from ansible.galaxy.login import GalaxyLogin
from ansible.galaxy.role import GalaxyRole
@ -117,6 +118,7 @@ class GalaxyCLI(CLI):
self.add_build_options(collection_parser, parents=[common, force])
self.add_publish_options(collection_parser, parents=[common])
self.add_install_options(collection_parser, parents=[common, force])
self.add_verify_options(collection_parser, parents=[common, collections_path])
# Add sub parser for the Galaxy role actions
role = type_parser.add_parser('role', help='Manage an Ansible Galaxy role.')
@ -235,6 +237,19 @@ class GalaxyCLI(CLI):
info_parser.add_argument('args', nargs='+', help='role', metavar='role_name[,version]')
def add_verify_options(self, parser, parents=None):
galaxy_type = 'collection'
verify_parser = parser.add_parser('verify', parents=parents, help='Compare checksums with the collection(s) '
'found on the server and the installed copy. This does not verify dependencies.')
verify_parser.set_defaults(func=self.execute_verify)
verify_parser.add_argument('args', metavar='{0}_name'.format(galaxy_type), nargs='*', help='The collection(s) name or '
'path/url to a tar.gz collection artifact. This is mutually exclusive with --requirements-file.')
verify_parser.add_argument('-i', '--ignore-errors', dest='ignore_errors', action='store_true', default=False,
help='Ignore errors during verification and continue with the next specified collection.')
verify_parser.add_argument('-r', '--requirements-file', dest='requirements',
help='A file containing a list of collections to be verified.')
def add_install_options(self, parser, parents=None):
galaxy_type = 'collection' if parser.metavar == 'COLLECTION_ACTION' else 'role'
@ -583,6 +598,27 @@ class GalaxyCLI(CLI):
return meta_value
def _require_one_of_collections_requirements(self, collections, requirements_file):
if collections and requirements_file:
raise AnsibleError("The positional collection_name arg and --requirements-file are mutually exclusive.")
elif not collections and not requirements_file:
raise AnsibleError("You must specify a collection name or a requirements file.")
elif requirements_file:
requirements_file = GalaxyCLI._resolve_path(requirements_file)
requirements = self._parse_requirements_file(requirements_file, allow_old_format=False)['collections']
else:
requirements = []
for collection_input in collections:
requirement = None
if os.path.isfile(to_bytes(collection_input, errors='surrogate_or_strict')) or \
urlparse(collection_input).scheme.lower() in ['http', 'https']:
# Arg is a file path or URL to a collection
name = collection_input
else:
name, dummy, requirement = collection_input.partition(':')
requirements.append((name, requirement or '*', None))
return requirements
############################
# execute actions
############################
@ -794,6 +830,22 @@ class GalaxyCLI(CLI):
self.pager(data)
def execute_verify(self):
collections = context.CLIARGS['args']
search_paths = context.CLIARGS['collections_path']
ignore_certs = context.CLIARGS['ignore_certs']
ignore_errors = context.CLIARGS['ignore_errors']
requirements_file = context.CLIARGS['requirements']
requirements = self._require_one_of_collections_requirements(collections, requirements_file)
resolved_paths = [validate_collection_path(GalaxyCLI._resolve_path(path)) for path in search_paths]
verify_collections(requirements, resolved_paths, self.api_servers, (not ignore_certs), ignore_errors)
return 0
def execute_install(self):
"""
Install one or more roles(``ansible-galaxy role install``), or one or more collections(``ansible-galaxy collection install``).
@ -811,25 +863,7 @@ class GalaxyCLI(CLI):
no_deps = context.CLIARGS['no_deps']
force_deps = context.CLIARGS['force_with_deps']
if collections and requirements_file:
raise AnsibleError("The positional collection_name arg and --requirements-file are mutually exclusive.")
elif not collections and not requirements_file:
raise AnsibleError("You must specify a collection name or a requirements file.")
if requirements_file:
requirements_file = GalaxyCLI._resolve_path(requirements_file)
requirements = self._parse_requirements_file(requirements_file, allow_old_format=False)['collections']
else:
requirements = []
for collection_input in collections:
requirement = None
if os.path.isfile(to_bytes(collection_input, errors='surrogate_or_strict')) or \
urlparse(collection_input).scheme.lower() in ['http', 'https']:
# Arg is a file path or URL to a collection
name = collection_input
else:
name, dummy, requirement = collection_input.partition(':')
requirements.append((name, requirement or '*', None))
requirements = self._require_one_of_collections_requirements(collections, requirements_file)
output_path = GalaxyCLI._resolve_path(output_path)
collections_path = C.COLLECTIONS_PATHS

View file

@ -16,6 +16,7 @@ import threading
import time
import yaml
from collections import namedtuple
from contextlib import contextmanager
from distutils.version import LooseVersion, StrictVersion
from hashlib import sha256
@ -47,6 +48,8 @@ display = Display()
MANIFEST_FORMAT = 1
ModifiedContent = namedtuple('ModifiedContent', ['filename', 'expected', 'installed'])
class CollectionRequirement:
@ -191,6 +194,68 @@ class CollectionRequirement:
self.versions = set([self.latest_version])
self._get_metadata()
def verify(self, remote_collection, path, b_temp_tar_path):
if not self.skip:
display.display("'%s' has not been installed, nothing to verify" % (to_text(self)))
return
collection_path = os.path.join(path, self.namespace, self.name)
b_collection_path = to_bytes(collection_path, errors='surrogate_or_strict')
display.vvv("Verifying '%s:%s'." % (to_text(self), self.latest_version))
display.vvv("Installed collection found at '%s'" % collection_path)
display.vvv("Remote collection found at '%s'" % remote_collection.metadata.download_url)
# Compare installed version versus requirement version
if self.latest_version != remote_collection.latest_version:
err = "%s has the version '%s' but is being compared to '%s'" % (to_text(self), self.latest_version, remote_collection.latest_version)
display.display(err)
return
modified_content = []
# Verify the manifest hash matches before verifying the file manifest
expected_hash = _get_tar_file_hash(b_temp_tar_path, 'MANIFEST.json')
self._verify_file_hash(b_collection_path, 'MANIFEST.json', expected_hash, modified_content)
manifest = _get_json_from_tar_file(b_temp_tar_path, 'MANIFEST.json')
# Use the manifest to verify the file manifest checksum
file_manifest_data = manifest['file_manifest_file']
file_manifest_filename = file_manifest_data['name']
expected_hash = file_manifest_data['chksum_%s' % file_manifest_data['chksum_type']]
# Verify the file manifest before using it to verify individual files
self._verify_file_hash(b_collection_path, file_manifest_filename, expected_hash, modified_content)
file_manifest = _get_json_from_tar_file(b_temp_tar_path, file_manifest_filename)
# Use the file manifest to verify individual file checksums
for manifest_data in file_manifest['files']:
if manifest_data['ftype'] == 'file':
expected_hash = manifest_data['chksum_%s' % manifest_data['chksum_type']]
self._verify_file_hash(b_collection_path, manifest_data['name'], expected_hash, modified_content)
if modified_content:
display.display("Collection %s contains modified content in the following files:" % to_text(self))
display.display(to_text(self))
display.vvv(to_text(self.b_path))
for content_change in modified_content:
display.display(' %s' % content_change.filename)
display.vvv(" Expected: %s\n Found: %s" % (content_change.expected, content_change.installed))
else:
display.vvv("Successfully verified that checksums for '%s:%s' match the remote collection" % (to_text(self), self.latest_version))
def _verify_file_hash(self, b_path, filename, expected_hash, error_queue):
b_file_path = to_bytes(os.path.join(to_text(b_path), filename), errors='surrogate_or_strict')
if not os.path.isfile(b_file_path):
actual_hash = None
else:
with open(b_file_path, mode='rb') as file_object:
actual_hash = _consume_file(file_object)
if expected_hash != actual_hash:
error_queue.append(ModifiedContent(filename=filename, expected=expected_hash, installed=actual_hash))
def _get_metadata(self):
if self._metadata:
return
@ -488,6 +553,55 @@ def validate_collection_path(collection_path):
return collection_path
def verify_collections(collections, search_paths, apis, validate_certs, ignore_errors):
with _display_progress():
with _tempdir() as b_temp_path:
for collection in collections:
try:
local_collection = None
b_collection = to_bytes(collection[0], errors='surrogate_or_strict')
if os.path.isfile(b_collection) or urlparse(collection[0]).scheme.lower() in ['http', 'https'] or len(collection[0].split('.')) != 2:
raise AnsibleError(message="'%s' is not a valid collection name. The format namespace.name is expected." % collection[0])
collection_name = collection[0]
namespace, name = collection_name.split('.')
collection_version = collection[1]
# Verify local collection exists before downloading it from a galaxy server
for search_path in search_paths:
b_search_path = to_bytes(os.path.join(search_path, namespace, name), errors='surrogate_or_strict')
if os.path.isdir(b_search_path):
local_collection = CollectionRequirement.from_path(b_search_path, False)
break
if local_collection is None:
raise AnsibleError(message='Collection %s is not installed in any of the collection paths.' % collection_name)
# Download collection on a galaxy server for comparison
try:
remote_collection = CollectionRequirement.from_name(collection_name, apis, collection_version, False, parent=None)
except AnsibleError as e:
if e.message == 'Failed to find collection %s:%s' % (collection[0], collection[1]):
raise AnsibleError('Failed to find remote collection %s:%s on any of the galaxy servers' % (collection[0], collection[1]))
raise
download_url = remote_collection.metadata.download_url
headers = {}
remote_collection.api._add_auth_token(headers, download_url, required=False)
b_temp_tar_path = _download_file(download_url, b_temp_path, None, validate_certs, headers=headers)
local_collection.verify(remote_collection, search_path, b_temp_tar_path)
except AnsibleError as err:
if ignore_errors:
display.warning("Failed to verify collection %s but skipping due to --ignore-errors being set. "
"Error: %s" % (collection[0], to_text(err)))
else:
raise
@contextmanager
def _tempdir():
b_temp_path = tempfile.mkdtemp(dir=to_bytes(C.DEFAULT_LOCAL_TMP, errors='surrogate_or_strict'))
@ -907,14 +1021,9 @@ def _download_file(url, b_path, expected_hash, validate_certs, headers=None):
unredirected_headers=['Authorization'], http_agent=user_agent())
with open(b_file_path, 'wb') as download_file:
data = resp.read(bufsize)
while data:
digest.update(data)
download_file.write(data)
data = resp.read(bufsize)
actual_hash = _consume_file(resp, download_file)
if expected_hash:
actual_hash = digest.hexdigest()
display.vvvv("Validating downloaded file hash %s with expected hash %s" % (actual_hash, expected_hash))
if expected_hash != actual_hash:
raise AnsibleError("Mismatch artifact hash with downloaded file")
@ -923,29 +1032,13 @@ def _download_file(url, b_path, expected_hash, validate_certs, headers=None):
def _extract_tar_file(tar, filename, b_dest, b_temp_path, expected_hash=None):
n_filename = to_native(filename, errors='surrogate_or_strict')
try:
member = tar.getmember(n_filename)
except KeyError:
raise AnsibleError("Collection tar at '%s' does not contain the expected file '%s'." % (to_native(tar.name),
n_filename))
with tempfile.NamedTemporaryFile(dir=b_temp_path, delete=False) as tmpfile_obj:
bufsize = 65536
sha256_digest = sha256()
with _tarfile_extract(tar, member) as tar_obj:
data = tar_obj.read(bufsize)
while data:
tmpfile_obj.write(data)
tmpfile_obj.flush()
sha256_digest.update(data)
data = tar_obj.read(bufsize)
actual_hash = sha256_digest.hexdigest()
with _get_tar_file_member(tar, filename) as tar_obj:
with tempfile.NamedTemporaryFile(dir=b_temp_path, delete=False) as tmpfile_obj:
actual_hash = _consume_file(tar_obj, tmpfile_obj)
if expected_hash and actual_hash != expected_hash:
raise AnsibleError("Checksum mismatch for '%s' inside collection at '%s'"
% (n_filename, to_native(tar.name)))
% (to_native(filename, errors='surrogate_or_strict'), to_native(tar.name)))
b_dest_filepath = os.path.join(b_dest, to_bytes(filename, errors='surrogate_or_strict'))
b_parent_dir = os.path.split(b_dest_filepath)[0]
@ -955,3 +1048,49 @@ def _extract_tar_file(tar, filename, b_dest, b_temp_path, expected_hash=None):
os.makedirs(b_parent_dir)
shutil.move(to_bytes(tmpfile_obj.name, errors='surrogate_or_strict'), b_dest_filepath)
def _get_tar_file_member(tar, filename):
n_filename = to_native(filename, errors='surrogate_or_strict')
try:
member = tar.getmember(n_filename)
except KeyError:
raise AnsibleError("Collection tar at '%s' does not contain the expected file '%s'." % (
to_native(tar.name),
n_filename))
return _tarfile_extract(tar, member)
def _get_json_from_tar_file(b_path, filename):
file_contents = ''
with tarfile.open(b_path, mode='r') as collection_tar:
with _get_tar_file_member(collection_tar, filename) as tar_obj:
bufsize = 65536
data = tar_obj.read(bufsize)
while data:
file_contents += to_text(data)
data = tar_obj.read(bufsize)
return json.loads(file_contents)
def _get_tar_file_hash(b_path, filename):
with tarfile.open(b_path, mode='r') as collection_tar:
with _get_tar_file_member(collection_tar, filename) as tar_obj:
return _consume_file(tar_obj)
def _consume_file(read_from, write_to=None):
bufsize = 65536
sha256_digest = sha256()
data = read_from.read(bufsize)
while data:
if write_to is not None:
write_to.write(data)
write_to.flush()
sha256_digest.update(data)
data = read_from.read(bufsize)
return sha256_digest.hexdigest()

View file

@ -14,13 +14,14 @@ import uuid
from hashlib import sha256
from io import BytesIO
from units.compat.mock import MagicMock
from units.compat.mock import MagicMock, mock_open, patch
from ansible import context
from ansible.cli.galaxy import GalaxyCLI
from ansible.errors import AnsibleError
from ansible.galaxy import api, collection, token
from ansible.module_utils._text import to_bytes, to_native, to_text
from ansible.module_utils.six.moves import builtins
from ansible.utils import context_objects as co
from ansible.utils.display import Display
from ansible.utils.hashing import secure_hash_s
@ -84,7 +85,7 @@ def galaxy_yml(request, tmp_path_factory):
@pytest.fixture()
def tmp_tarfile(tmp_path_factory):
def tmp_tarfile(tmp_path_factory, manifest_info):
''' Creates a temporary tar file for _extract_tar_file tests '''
filename = u'ÅÑŚÌβŁÈ'
temp_dir = to_bytes(tmp_path_factory.mktemp('test-%s Collections' % to_native(filename)))
@ -98,6 +99,13 @@ def tmp_tarfile(tmp_path_factory):
tar_info.mode = 0o0644
tfile.addfile(tarinfo=tar_info, fileobj=b_io)
b_data = to_bytes(json.dumps(manifest_info, indent=True), errors='surrogate_or_strict')
b_io = BytesIO(b_data)
tar_info = tarfile.TarInfo('MANIFEST.json')
tar_info.size = len(b_data)
tar_info.mode = 0o0644
tfile.addfile(tarinfo=tar_info, fileobj=b_io)
sha256_hash = sha256()
sha256_hash.update(data)
@ -113,6 +121,101 @@ def galaxy_server():
return galaxy_api
@pytest.fixture()
def manifest_template():
def get_manifest_info(namespace='ansible_namespace', name='collection', version='0.1.0'):
return {
"collection_info": {
"namespace": namespace,
"name": name,
"version": version,
"authors": [
"shertel"
],
"readme": "README.md",
"tags": [
"test",
"collection"
],
"description": "Test",
"license": [
"MIT"
],
"license_file": None,
"dependencies": {},
"repository": "https://github.com/{0}/{1}".format(namespace, name),
"documentation": None,
"homepage": None,
"issues": None
},
"file_manifest_file": {
"name": "FILES.json",
"ftype": "file",
"chksum_type": "sha256",
"chksum_sha256": "files_manifest_checksum",
"format": 1
},
"format": 1
}
return get_manifest_info
@pytest.fixture()
def manifest_info(manifest_template):
return manifest_template()
@pytest.fixture()
def files_manifest_info():
return {
"files": [
{
"name": ".",
"ftype": "dir",
"chksum_type": None,
"chksum_sha256": None,
"format": 1
},
{
"name": "README.md",
"ftype": "file",
"chksum_type": "sha256",
"chksum_sha256": "individual_file_checksum",
"format": 1
}
],
"format": 1}
@pytest.fixture()
def manifest(manifest_info):
b_data = to_bytes(json.dumps(manifest_info))
with patch.object(builtins, 'open', mock_open(read_data=b_data)) as m:
with open('MANIFEST.json', mode='rb') as fake_file:
yield fake_file, sha256(b_data).hexdigest()
@pytest.fixture()
def mock_collection(galaxy_server):
def create_mock_collection(namespace='ansible_namespace', name='collection', version='0.1.0', local=True, local_installed=True):
b_path = None
force = False
if local:
mock_collection = collection.CollectionRequirement(namespace, name, b_path, galaxy_server, [version], version, force, skip=local_installed)
else:
download_url = 'https://galaxy.ansible.com/download/{0}-{1}-{2}.tar.gz'.format(namespace, name, version)
digest = '19415a6a6df831df61cffde4a09d1d89ac8d8ca5c0586e85bea0b106d6dff29a'
dependencies = {}
metadata = api.CollectionVersionMetadata(namespace, name, version, download_url, digest, dependencies)
mock_collection = collection.CollectionRequirement(namespace, name, b_path, galaxy_server, [version], version, force, metadata=metadata)
return mock_collection
return create_mock_collection
def test_build_collection_no_galaxy_yaml():
fake_path = u'/fake/ÅÑŚÌβŁÈ/path'
expected = to_native("The collection galaxy.yml path '%s/galaxy.yml' does not exist." % fake_path)
@ -630,3 +733,562 @@ def test_extract_tar_file_missing_parent_dir(tmp_tarfile):
collection._extract_tar_file(tfile, filename, output_dir, temp_dir, checksum)
os.path.isfile(output_file)
def test_require_one_of_collections_requirements_with_both():
cli = GalaxyCLI(args=['ansible-galaxy', 'collection', 'verify', 'namespace.collection', '-r', 'requirements.yml'])
with pytest.raises(AnsibleError) as req_err:
cli._require_one_of_collections_requirements(('namespace.collection',), 'requirements.yml')
with pytest.raises(AnsibleError) as cli_err:
cli.run()
assert req_err.value.message == cli_err.value.message == 'The positional collection_name arg and --requirements-file are mutually exclusive.'
def test_require_one_of_collections_requirements_with_neither():
cli = GalaxyCLI(args=['ansible-galaxy', 'collection', 'verify'])
with pytest.raises(AnsibleError) as req_err:
cli._require_one_of_collections_requirements((), '')
with pytest.raises(AnsibleError) as cli_err:
cli.run()
assert req_err.value.message == cli_err.value.message == 'You must specify a collection name or a requirements file.'
def test_require_one_of_collections_requirements_with_collections():
cli = GalaxyCLI(args=['ansible-galaxy', 'collection', 'verify', 'namespace1.collection1', 'namespace2.collection1:1.0.0'])
collections = ('namespace1.collection1', 'namespace2.collection1:1.0.0',)
requirements = cli._require_one_of_collections_requirements(collections, '')
assert requirements == [('namespace1.collection1', '*', None), ('namespace2.collection1', '1.0.0', None)]
@patch('ansible.cli.galaxy.GalaxyCLI._parse_requirements_file')
def test_require_one_of_collections_requirements_with_requirements(mock_parse_requirements_file, galaxy_server):
cli = GalaxyCLI(args=['ansible-galaxy', 'collection', 'verify', '-r', 'requirements.yml', 'namespace.collection'])
mock_parse_requirements_file.return_value = {'collections': [('namespace.collection', '1.0.5', galaxy_server)]}
requirements = cli._require_one_of_collections_requirements((), 'requirements.yml')
assert mock_parse_requirements_file.call_count == 1
assert requirements == [('namespace.collection', '1.0.5', galaxy_server)]
@patch('ansible.cli.galaxy.GalaxyCLI.execute_verify', spec=True)
def test_call_GalaxyCLI(execute_verify):
galaxy_args = ['ansible-galaxy', 'collection', 'verify', 'namespace.collection']
GalaxyCLI(args=galaxy_args).run()
assert execute_verify.call_count == 1
@patch('ansible.cli.galaxy.GalaxyCLI.execute_verify')
def test_call_GalaxyCLI_with_implicit_role(execute_verify):
galaxy_args = ['ansible-galaxy', 'verify', 'namespace.implicit_role']
with pytest.raises(SystemExit):
GalaxyCLI(args=galaxy_args).run()
assert not execute_verify.called
@patch('ansible.cli.galaxy.GalaxyCLI.execute_verify')
def test_call_GalaxyCLI_with_role(execute_verify):
galaxy_args = ['ansible-galaxy', 'role', 'verify', 'namespace.role']
with pytest.raises(SystemExit):
GalaxyCLI(args=galaxy_args).run()
assert not execute_verify.called
@patch('ansible.cli.galaxy.verify_collections', spec=True)
def test_execute_verify_with_defaults(mock_verify_collections):
galaxy_args = ['ansible-galaxy', 'collection', 'verify', 'namespace.collection:1.0.4']
GalaxyCLI(args=galaxy_args).run()
assert mock_verify_collections.call_count == 1
requirements, search_paths, galaxy_apis, validate, ignore_errors = mock_verify_collections.call_args[0]
assert requirements == [('namespace.collection', '1.0.4', None)]
for install_path in search_paths:
assert install_path.endswith('ansible_collections')
assert galaxy_apis[0].api_server == 'https://galaxy.ansible.com'
assert validate is True
assert ignore_errors is False
@patch('ansible.cli.galaxy.verify_collections', spec=True)
def test_execute_verify(mock_verify_collections):
GalaxyCLI(args=[
'ansible-galaxy', 'collection', 'verify', 'namespace.collection:1.0.4', '--ignore-certs',
'-p', '~/.ansible', '--ignore-errors', '--server', 'http://galaxy-dev.com',
]).run()
assert mock_verify_collections.call_count == 1
requirements, search_paths, galaxy_apis, validate, ignore_errors = mock_verify_collections.call_args[0]
assert requirements == [('namespace.collection', '1.0.4', None)]
for install_path in search_paths:
assert install_path.endswith('ansible_collections')
assert galaxy_apis[0].api_server == 'http://galaxy-dev.com'
assert validate is False
assert ignore_errors is True
def test_verify_file_hash_deleted_file(manifest_info):
data = to_bytes(json.dumps(manifest_info))
digest = sha256(data).hexdigest()
namespace = manifest_info['collection_info']['namespace']
name = manifest_info['collection_info']['name']
version = manifest_info['collection_info']['version']
server = 'http://galaxy.ansible.com'
error_queue = []
with patch.object(builtins, 'open', mock_open(read_data=data)) as m:
with patch.object(collection.os.path, 'isfile', MagicMock(return_value=False)) as mock_isfile:
collection_req = collection.CollectionRequirement(namespace, name, './', server, [version], version, False)
collection_req._verify_file_hash(b'path/', 'file', digest, error_queue)
assert mock_isfile.called_once
assert len(error_queue) == 1
assert error_queue[0].installed is None
assert error_queue[0].expected == digest
def test_verify_file_hash_matching_hash(manifest_info):
data = to_bytes(json.dumps(manifest_info))
digest = sha256(data).hexdigest()
namespace = manifest_info['collection_info']['namespace']
name = manifest_info['collection_info']['name']
version = manifest_info['collection_info']['version']
server = 'http://galaxy.ansible.com'
error_queue = []
with patch.object(builtins, 'open', mock_open(read_data=data)) as m:
with patch.object(collection.os.path, 'isfile', MagicMock(return_value=True)) as mock_isfile:
collection_req = collection.CollectionRequirement(namespace, name, './', server, [version], version, False)
collection_req._verify_file_hash(b'path/', 'file', digest, error_queue)
assert mock_isfile.called_once
assert error_queue == []
def test_verify_file_hash_mismatching_hash(manifest_info):
data = to_bytes(json.dumps(manifest_info))
digest = sha256(data).hexdigest()
different_digest = 'not_{0}'.format(digest)
namespace = manifest_info['collection_info']['namespace']
name = manifest_info['collection_info']['name']
version = manifest_info['collection_info']['version']
server = 'http://galaxy.ansible.com'
error_queue = []
with patch.object(builtins, 'open', mock_open(read_data=data)) as m:
with patch.object(collection.os.path, 'isfile', MagicMock(return_value=True)) as mock_isfile:
collection_req = collection.CollectionRequirement(namespace, name, './', server, [version], version, False)
collection_req._verify_file_hash(b'path/', 'file', different_digest, error_queue)
assert mock_isfile.called_once
assert len(error_queue) == 1
assert error_queue[0].installed == digest
assert error_queue[0].expected == different_digest
def test_consume_file(manifest):
manifest_file, checksum = manifest
assert checksum == collection._consume_file(manifest_file)
def test_consume_file_and_write_contents(manifest, manifest_info):
manifest_file, checksum = manifest
write_to = BytesIO()
actual_hash = collection._consume_file(manifest_file, write_to)
write_to.seek(0)
assert to_bytes(json.dumps(manifest_info)) == write_to.read()
assert actual_hash == checksum
def test_get_tar_file_member(tmp_tarfile):
temp_dir, tfile, filename, checksum = tmp_tarfile
with collection._get_tar_file_member(tfile, filename) as tar_file_obj:
assert isinstance(tar_file_obj, tarfile.ExFileObject)
def test_get_nonexistent_tar_file_member(tmp_tarfile):
temp_dir, tfile, filename, checksum = tmp_tarfile
file_does_not_exist = filename + 'nonexistent'
with pytest.raises(AnsibleError) as err:
collection._get_tar_file_member(tfile, file_does_not_exist)
assert to_text(err.value.message) == "Collection tar at '%s' does not contain the expected file '%s'." % (to_text(tfile.name), file_does_not_exist)
def test_get_tar_file_hash(tmp_tarfile):
temp_dir, tfile, filename, checksum = tmp_tarfile
assert checksum == collection._get_tar_file_hash(tfile.name, filename)
def test_get_json_from_tar_file(tmp_tarfile):
temp_dir, tfile, filename, checksum = tmp_tarfile
assert 'MANIFEST.json' in tfile.getnames()
data = collection._get_json_from_tar_file(tfile.name, 'MANIFEST.json')
assert isinstance(data, dict)
def test_verify_collection_not_installed(mock_collection):
local_collection = mock_collection(local_installed=False)
remote_collection = mock_collection(local=False)
with patch.object(collection.display, 'display') as mocked_display:
local_collection.verify(remote_collection, './', './')
assert mocked_display.called
assert mocked_display.call_args[0][0] == "'%s.%s' has not been installed, nothing to verify" % (local_collection.namespace, local_collection.name)
def test_verify_successful_debug_info(monkeypatch, mock_collection):
local_collection = mock_collection()
remote_collection = mock_collection(local=False)
monkeypatch.setattr(collection, '_get_tar_file_hash', MagicMock())
monkeypatch.setattr(collection.CollectionRequirement, '_verify_file_hash', MagicMock())
monkeypatch.setattr(collection, '_get_json_from_tar_file', MagicMock())
with patch.object(collection.display, 'vvv') as mock_display:
local_collection.verify(remote_collection, './', './')
namespace = local_collection.namespace
name = local_collection.name
version = local_collection.latest_version
assert mock_display.call_count == 4
assert mock_display.call_args_list[0][0][0] == "Verifying '%s.%s:%s'." % (namespace, name, version)
assert mock_display.call_args_list[1][0][0] == "Installed collection found at './%s/%s'" % (namespace, name)
located = "Remote collection found at 'https://galaxy.ansible.com/download/%s-%s-%s.tar.gz'" % (namespace, name, version)
assert mock_display.call_args_list[2][0][0] == located
verified = "Successfully verified that checksums for '%s.%s:%s' match the remote collection" % (namespace, name, version)
assert mock_display.call_args_list[3][0][0] == verified
def test_verify_different_versions(mock_collection):
local_collection = mock_collection(version='0.1.0')
remote_collection = mock_collection(local=False, version='3.0.0')
with patch.object(collection.display, 'display') as mock_display:
local_collection.verify(remote_collection, './', './')
namespace = local_collection.namespace
name = local_collection.name
installed_version = local_collection.latest_version
compared_version = remote_collection.latest_version
msg = "%s.%s has the version '%s' but is being compared to '%s'" % (namespace, name, installed_version, compared_version)
assert mock_display.call_count == 1
assert mock_display.call_args[0][0] == msg
@patch.object(builtins, 'open', mock_open())
def test_verify_modified_manifest(monkeypatch, mock_collection, manifest_info):
local_collection = mock_collection()
remote_collection = mock_collection(local=False)
monkeypatch.setattr(collection, '_get_tar_file_hash', MagicMock(side_effect=['manifest_checksum']))
monkeypatch.setattr(collection, '_consume_file', MagicMock(side_effect=['manifest_checksum_modified', 'files_manifest_checksum']))
monkeypatch.setattr(collection, '_get_json_from_tar_file', MagicMock(side_effect=[manifest_info, {'files': []}]))
monkeypatch.setattr(collection.os.path, 'isfile', MagicMock(return_value=True))
with patch.object(collection.display, 'display') as mock_display:
with patch.object(collection.display, 'vvv') as mock_debug:
local_collection.verify(remote_collection, './', './')
namespace = local_collection.namespace
name = local_collection.name
assert mock_display.call_count == 3
assert mock_display.call_args_list[0][0][0] == 'Collection %s.%s contains modified content in the following files:' % (namespace, name)
assert mock_display.call_args_list[1][0][0] == '%s.%s' % (namespace, name)
assert mock_display.call_args_list[2][0][0] == ' MANIFEST.json'
# The -vvv output should show details (the checksums do not match)
assert mock_debug.call_count == 5
assert mock_debug.call_args_list[-1][0][0] == ' Expected: manifest_checksum\n Found: manifest_checksum_modified'
@patch.object(builtins, 'open', mock_open())
def test_verify_modified_files_manifest(monkeypatch, mock_collection, manifest_info):
local_collection = mock_collection()
remote_collection = mock_collection(local=False)
monkeypatch.setattr(collection, '_get_tar_file_hash', MagicMock(side_effect=['manifest_checksum']))
monkeypatch.setattr(collection, '_consume_file', MagicMock(side_effect=['manifest_checksum', 'files_manifest_checksum_modified']))
monkeypatch.setattr(collection, '_get_json_from_tar_file', MagicMock(side_effect=[manifest_info, {'files': []}]))
monkeypatch.setattr(collection.os.path, 'isfile', MagicMock(return_value=True))
with patch.object(collection.display, 'display') as mock_display:
with patch.object(collection.display, 'vvv') as mock_debug:
local_collection.verify(remote_collection, './', './')
namespace = local_collection.namespace
name = local_collection.name
assert mock_display.call_count == 3
assert mock_display.call_args_list[0][0][0] == 'Collection %s.%s contains modified content in the following files:' % (namespace, name)
assert mock_display.call_args_list[1][0][0] == '%s.%s' % (namespace, name)
assert mock_display.call_args_list[2][0][0] == ' FILES.json'
# The -vvv output should show details (the checksums do not match)
assert mock_debug.call_count == 5
assert mock_debug.call_args_list[-1][0][0] == ' Expected: files_manifest_checksum\n Found: files_manifest_checksum_modified'
@patch.object(builtins, 'open', mock_open())
def test_verify_modified_files(monkeypatch, mock_collection, manifest_info, files_manifest_info):
local_collection = mock_collection()
remote_collection = mock_collection(local=False)
monkeypatch.setattr(collection, '_get_tar_file_hash', MagicMock(side_effect=['manifest_checksum']))
fakehashes = ['manifest_checksum', 'files_manifest_checksum', 'individual_file_checksum_modified']
monkeypatch.setattr(collection, '_consume_file', MagicMock(side_effect=fakehashes))
monkeypatch.setattr(collection, '_get_json_from_tar_file', MagicMock(side_effect=[manifest_info, files_manifest_info]))
monkeypatch.setattr(collection.os.path, 'isfile', MagicMock(return_value=True))
with patch.object(collection.display, 'display') as mock_display:
with patch.object(collection.display, 'vvv') as mock_debug:
local_collection.verify(remote_collection, './', './')
namespace = local_collection.namespace
name = local_collection.name
assert mock_display.call_count == 3
assert mock_display.call_args_list[0][0][0] == 'Collection %s.%s contains modified content in the following files:' % (namespace, name)
assert mock_display.call_args_list[1][0][0] == '%s.%s' % (namespace, name)
assert mock_display.call_args_list[2][0][0] == ' README.md'
# The -vvv output should show details (the checksums do not match)
assert mock_debug.call_count == 5
assert mock_debug.call_args_list[-1][0][0] == ' Expected: individual_file_checksum\n Found: individual_file_checksum_modified'
@patch.object(builtins, 'open', mock_open())
def test_verify_identical(monkeypatch, mock_collection, manifest_info, files_manifest_info):
local_collection = mock_collection()
remote_collection = mock_collection(local=False)
monkeypatch.setattr(collection, '_get_tar_file_hash', MagicMock(side_effect=['manifest_checksum']))
monkeypatch.setattr(collection, '_consume_file', MagicMock(side_effect=['manifest_checksum', 'files_manifest_checksum', 'individual_file_checksum']))
monkeypatch.setattr(collection, '_get_json_from_tar_file', MagicMock(side_effect=[manifest_info, files_manifest_info]))
monkeypatch.setattr(collection.os.path, 'isfile', MagicMock(return_value=True))
with patch.object(collection.display, 'display') as mock_display:
with patch.object(collection.display, 'vvv') as mock_debug:
local_collection.verify(remote_collection, './', './')
# Successful verification is quiet
assert mock_display.call_count == 0
# The -vvv output should show the checksums not matching
namespace = local_collection.namespace
name = local_collection.name
version = local_collection.latest_version
success_msg = "Successfully verified that checksums for '%s.%s:%s' match the remote collection" % (namespace, name, version)
assert mock_debug.call_count == 4
assert mock_debug.call_args_list[-1][0][0] == success_msg
@patch.object(collection.CollectionRequirement, 'verify')
def test_verify_collections_not_installed(mock_verify, mock_collection, monkeypatch):
namespace = 'ansible_namespace'
name = 'collection'
version = '1.0.0'
local_collection = mock_collection(local_installed=False)
found_remote = MagicMock(return_value=mock_collection(local=False))
monkeypatch.setattr(collection.CollectionRequirement, 'from_name', found_remote)
collections = [('%s.%s' % (namespace, name), version, None)]
search_path = './'
validate_certs = False
ignore_errors = False
apis = [local_collection.api]
with patch.object(collection, '_download_file') as mock_download_file:
with pytest.raises(AnsibleError) as err:
collection.verify_collections(collections, search_path, apis, validate_certs, ignore_errors)
assert err.value.message == "Collection %s.%s is not installed in any of the collection paths." % (namespace, name)
@patch.object(collection.CollectionRequirement, 'verify')
def test_verify_collections_not_installed_ignore_errors(mock_verify, mock_collection, monkeypatch):
namespace = 'ansible_namespace'
name = 'collection'
version = '1.0.0'
local_collection = mock_collection(local_installed=False)
found_remote = MagicMock(return_value=mock_collection(local=False))
monkeypatch.setattr(collection.CollectionRequirement, 'from_name', found_remote)
collections = [('%s.%s' % (namespace, name), version, None)]
search_path = './'
validate_certs = False
ignore_errors = True
apis = [local_collection.api]
with patch.object(collection, '_download_file') as mock_download_file:
with patch.object(Display, 'warning') as mock_warning:
collection.verify_collections(collections, search_path, apis, validate_certs, ignore_errors)
skip_message = "Failed to verify collection %s.%s but skipping due to --ignore-errors being set." % (namespace, name)
original_err = "Error: Collection %s.%s is not installed in any of the collection paths." % (namespace, name)
assert mock_warning.called
assert mock_warning.call_args[0][0] == skip_message + " " + original_err
@patch.object(os.path, 'isdir', return_value=True)
@patch.object(collection.CollectionRequirement, 'verify')
def test_verify_collections_no_remote(mock_verify, mock_isdir, mock_collection):
namespace = 'ansible_namespace'
name = 'collection'
version = '1.0.0'
collections = [('%s.%s' % (namespace, name), version, None)]
search_path = './'
validate_certs = False
ignore_errors = False
apis = []
with pytest.raises(AnsibleError) as err:
collection.verify_collections(collections, search_path, apis, validate_certs, ignore_errors)
assert err.value.message == "Failed to find remote collection %s.%s:%s on any of the galaxy servers" % (namespace, name, version)
@patch.object(os.path, 'isdir', return_value=True)
@patch.object(collection.CollectionRequirement, 'verify')
def test_verify_collections_no_remote_ignore_errors(mock_verify, mock_isdir, mock_collection):
namespace = 'ansible_namespace'
name = 'collection'
version = '1.0.0'
local_collection = mock_collection(local_installed=False)
collections = [('%s.%s' % (namespace, name), version, None)]
search_path = './'
validate_certs = False
ignore_errors = True
apis = []
with patch.object(Display, 'warning') as mock_warning:
collection.verify_collections(collections, search_path, apis, validate_certs, ignore_errors)
skip_message = "Failed to verify collection %s.%s but skipping due to --ignore-errors being set." % (namespace, name)
original_err = "Error: Failed to find remote collection %s.%s:%s on any of the galaxy servers" % (namespace, name, version)
assert mock_warning.called
assert mock_warning.call_args[0][0] == skip_message + " " + original_err
def test_verify_collections_tarfile(monkeypatch):
monkeypatch.setattr(os.path, 'isfile', MagicMock(return_value=True))
invalid_format = 'ansible_namespace-collection-0.1.0.tar.gz'
collections = [(invalid_format, '*', None)]
with pytest.raises(AnsibleError) as err:
collection.verify_collections(collections, './', [], False, False)
msg = "'%s' is not a valid collection name. The format namespace.name is expected." % invalid_format
assert err.value.message == msg
def test_verify_collections_path(monkeypatch):
monkeypatch.setattr(os.path, 'isfile', MagicMock(return_value=False))
invalid_format = 'collections/collection_namespace/collection_name'
collections = [(invalid_format, '*', None)]
with pytest.raises(AnsibleError) as err:
collection.verify_collections(collections, './', [], False, False)
msg = "'%s' is not a valid collection name. The format namespace.name is expected." % invalid_format
assert err.value.message == msg
def test_verify_collections_url(monkeypatch):
monkeypatch.setattr(os.path, 'isfile', MagicMock(return_value=False))
invalid_format = 'https://galaxy.ansible.com/download/ansible_namespace-collection-0.1.0.tar.gz'
collections = [(invalid_format, '*', None)]
with pytest.raises(AnsibleError) as err:
collection.verify_collections(collections, './', [], False, False)
msg = "'%s' is not a valid collection name. The format namespace.name is expected." % invalid_format
assert err.value.message == msg
@patch.object(os.path, 'isfile', return_value=False)
@patch.object(os.path, 'isdir', return_value=True)
@patch.object(collection.CollectionRequirement, 'verify')
def test_verify_collections_name(mock_verify, mock_isdir, mock_isfile, mock_collection, monkeypatch):
local_collection = mock_collection()
monkeypatch.setattr(collection.CollectionRequirement, 'from_path', MagicMock(return_value=local_collection))
located_remote_from_name = MagicMock(return_value=mock_collection(local=False))
monkeypatch.setattr(collection.CollectionRequirement, 'from_name', located_remote_from_name)
with patch.object(collection, '_download_file') as mock_download_file:
collections = [('%s.%s' % (local_collection.namespace, local_collection.name), '%s' % local_collection.latest_version, None)]
search_path = './'
validate_certs = False
ignore_errors = False
apis = [local_collection.api]
collection.verify_collections(collections, search_path, apis, validate_certs, ignore_errors)
assert mock_download_file.call_count == 1
assert located_remote_from_name.call_count == 1