Skip tests with unsatisfied deps ()

* Skip gitlab tests if dependencies aren't met

* Skip certain unittests if passlib is not installed

* Fix tests with deps on paramiko to skip if paramiko is not installed

* Use pytest to skip for cloudstack

If either on Python-2.6 or the cs library is not installed we cannot run
this test so skip it
This commit is contained in:
Toshio Kuratomi 2019-06-28 13:09:36 -07:00 committed by GitHub
parent 35b6345bdc
commit 8acf71f78f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 318 additions and 107 deletions

View file

@ -1,13 +1,22 @@
import sys import sys
import pytest
import units.compat.unittest as unittest import units.compat.unittest as unittest
from units.compat.mock import MagicMock from units.compat.mock import MagicMock
from units.compat.unittest import TestCase from units.compat.unittest import TestCase
from units.modules.utils import set_module_args from units.modules.utils import set_module_args
# Exoscale's cs doesn't support Python 2.6 # Exoscale's cs doesn't support Python 2.6
pytestmark = []
if sys.version_info[:2] != (2, 6): if sys.version_info[:2] != (2, 6):
from ansible.modules.cloud.cloudstack.cs_traffic_type import AnsibleCloudStackTrafficType, setup_module_object from ansible.modules.cloud.cloudstack.cs_traffic_type import AnsibleCloudStackTrafficType, setup_module_object
from ansible.module_utils.cloudstack import HAS_LIB_CS
if not HAS_LIB_CS:
pytestmark.append(pytest.mark.skip('The cloudstack library, "cs", is needed to test cs_traffic_type'))
else:
pytestmark.append(pytest.mark.skip('Exoscale\'s cs doesn\'t support Python 2.6'))
EXISTING_TRAFFIC_TYPES_RESPONSE = { EXISTING_TRAFFIC_TYPES_RESPONSE = {
@ -85,13 +94,11 @@ base_module_args = {
class TestAnsibleCloudstackTraffiType(TestCase): class TestAnsibleCloudstackTraffiType(TestCase):
@unittest.skipUnless(sys.version_info[:2] >= (2, 7), "Exoscale's cs doesn't support Python 2.6")
def test_module_is_created_sensibly(self): def test_module_is_created_sensibly(self):
set_module_args(base_module_args) set_module_args(base_module_args)
module = setup_module_object() module = setup_module_object()
assert module.params['traffic_type'] == 'Guest' assert module.params['traffic_type'] == 'Guest'
@unittest.skipUnless(sys.version_info[:2] >= (2, 7), "Exoscale's cs doesn't support Python 2.6")
def test_update_called_when_traffic_type_exists(self): def test_update_called_when_traffic_type_exists(self):
set_module_args(base_module_args) set_module_args(base_module_args)
module = setup_module_object() module = setup_module_object()
@ -101,7 +108,6 @@ class TestAnsibleCloudstackTraffiType(TestCase):
actt.present_traffic_type() actt.present_traffic_type()
self.assertTrue(actt.update_traffic_type.called) self.assertTrue(actt.update_traffic_type.called)
@unittest.skipUnless(sys.version_info[:2] >= (2, 7), "Exoscale's cs doesn't support Python 2.6")
def test_update_not_called_when_traffic_type_doesnt_exist(self): def test_update_not_called_when_traffic_type_doesnt_exist(self):
set_module_args(base_module_args) set_module_args(base_module_args)
module = setup_module_object() module = setup_module_object()
@ -113,7 +119,6 @@ class TestAnsibleCloudstackTraffiType(TestCase):
self.assertFalse(actt.update_traffic_type.called) self.assertFalse(actt.update_traffic_type.called)
self.assertTrue(actt.add_traffic_type.called) self.assertTrue(actt.add_traffic_type.called)
@unittest.skipUnless(sys.version_info[:2] >= (2, 7), "Exoscale's cs doesn't support Python 2.6")
def test_traffic_type_returned_if_exists(self): def test_traffic_type_returned_if_exists(self):
set_module_args(base_module_args) set_module_args(base_module_args)
module = setup_module_object() module = setup_module_object()

View file

@ -1,5 +1,8 @@
import pytest
from ansible.module_utils.source_control.bitbucket import BitbucketHelper from ansible.module_utils.source_control.bitbucket import BitbucketHelper
from ansible.modules.source_control.bitbucket import bitbucket_pipeline_known_host from ansible.modules.source_control.bitbucket import bitbucket_pipeline_known_host
from ansible.modules.source_control.bitbucket.bitbucket_pipeline_known_host import HAS_PARAMIKO
from units.compat import unittest from units.compat import unittest
from units.compat.mock import patch from units.compat.mock import patch
from units.modules.utils import AnsibleExitJson, ModuleTestCase, set_module_args from units.modules.utils import AnsibleExitJson, ModuleTestCase, set_module_args
@ -10,6 +13,7 @@ class TestBucketPipelineKnownHostModule(ModuleTestCase):
super(TestBucketPipelineKnownHostModule, self).setUp() super(TestBucketPipelineKnownHostModule, self).setUp()
self.module = bitbucket_pipeline_known_host self.module = bitbucket_pipeline_known_host
@pytest.mark.skipif(not HAS_PARAMIKO, reason='paramiko must be installed to test key creation')
@patch.object(BitbucketHelper, 'fetch_access_token', return_value='token') @patch.object(BitbucketHelper, 'fetch_access_token', return_value='token')
@patch.object(bitbucket_pipeline_known_host, 'get_existing_known_host', return_value=None) @patch.object(bitbucket_pipeline_known_host, 'get_existing_known_host', return_value=None)
def test_create_known_host(self, *args): def test_create_known_host(self, *args):
@ -48,6 +52,7 @@ class TestBucketPipelineKnownHostModule(ModuleTestCase):
self.assertEqual(get_host_key_mock.call_count, 0) self.assertEqual(get_host_key_mock.call_count, 0)
self.assertEqual(exec_info.exception.args[0]['changed'], True) self.assertEqual(exec_info.exception.args[0]['changed'], True)
@pytest.mark.skipif(not HAS_PARAMIKO, reason='paramiko must be installed to test key creation')
@patch.object(BitbucketHelper, 'fetch_access_token', return_value='token') @patch.object(BitbucketHelper, 'fetch_access_token', return_value='token')
@patch.object(bitbucket_pipeline_known_host, 'get_existing_known_host', return_value={ @patch.object(bitbucket_pipeline_known_host, 'get_existing_known_host', return_value={
'type': 'pipeline_known_host', 'type': 'pipeline_known_host',
@ -77,6 +82,7 @@ class TestBucketPipelineKnownHostModule(ModuleTestCase):
self.assertEqual(create_known_host_mock.call_count, 0) self.assertEqual(create_known_host_mock.call_count, 0)
self.assertEqual(exec_info.exception.args[0]['changed'], False) self.assertEqual(exec_info.exception.args[0]['changed'], False)
@pytest.mark.skipif(not HAS_PARAMIKO, reason='paramiko must be installed to test key creation')
@patch.object(BitbucketHelper, 'fetch_access_token', return_value='token') @patch.object(BitbucketHelper, 'fetch_access_token', return_value='token')
@patch.object(bitbucket_pipeline_known_host, 'get_existing_known_host', return_value=None) @patch.object(bitbucket_pipeline_known_host, 'get_existing_known_host', return_value=None)
def test_create_known_host_check_mode(self, *args): def test_create_known_host_check_mode(self, *args):
@ -96,6 +102,7 @@ class TestBucketPipelineKnownHostModule(ModuleTestCase):
self.assertEqual(create_known_host_mock.call_count, 0) self.assertEqual(create_known_host_mock.call_count, 0)
self.assertEqual(exec_info.exception.args[0]['changed'], True) self.assertEqual(exec_info.exception.args[0]['changed'], True)
@pytest.mark.skipif(not HAS_PARAMIKO, reason='paramiko must be installed to test key creation')
@patch.object(BitbucketHelper, 'fetch_access_token', return_value='token') @patch.object(BitbucketHelper, 'fetch_access_token', return_value='token')
@patch.object(bitbucket_pipeline_known_host, 'get_existing_known_host', return_value={ @patch.object(bitbucket_pipeline_known_host, 'get_existing_known_host', return_value={
'type': 'pipeline_known_host', 'type': 'pipeline_known_host',
@ -125,6 +132,7 @@ class TestBucketPipelineKnownHostModule(ModuleTestCase):
self.assertEqual(delete_known_host_mock.call_count, 1) self.assertEqual(delete_known_host_mock.call_count, 1)
self.assertEqual(exec_info.exception.args[0]['changed'], True) self.assertEqual(exec_info.exception.args[0]['changed'], True)
@pytest.mark.skipif(not HAS_PARAMIKO, reason='paramiko must be installed to test key creation')
@patch.object(BitbucketHelper, 'fetch_access_token', return_value='token') @patch.object(BitbucketHelper, 'fetch_access_token', return_value='token')
@patch.object(bitbucket_pipeline_known_host, 'get_existing_known_host', return_value=None) @patch.object(bitbucket_pipeline_known_host, 'get_existing_known_host', return_value=None)
def test_delete_absent_known_host(self, *args): def test_delete_absent_known_host(self, *args):
@ -143,6 +151,7 @@ class TestBucketPipelineKnownHostModule(ModuleTestCase):
self.assertEqual(delete_known_host_mock.call_count, 0) self.assertEqual(delete_known_host_mock.call_count, 0)
self.assertEqual(exec_info.exception.args[0]['changed'], False) self.assertEqual(exec_info.exception.args[0]['changed'], False)
@pytest.mark.skipif(not HAS_PARAMIKO, reason='paramiko must be installed to test key creation')
@patch.object(BitbucketHelper, 'fetch_access_token', return_value='token') @patch.object(BitbucketHelper, 'fetch_access_token', return_value='token')
@patch.object(bitbucket_pipeline_known_host, 'get_existing_known_host', return_value={ @patch.object(bitbucket_pipeline_known_host, 'get_existing_known_host', return_value={
'type': 'pipeline_known_host', 'type': 'pipeline_known_host',

View file

@ -5,19 +5,42 @@
from __future__ import absolute_import from __future__ import absolute_import
import pytest
from ansible.modules.source_control.gitlab_deploy_key import GitLabDeployKey from ansible.modules.source_control.gitlab_deploy_key import GitLabDeployKey
from .gitlab import (GitlabModuleTestCase,
python_version_match_requirement,
resp_get_project, resp_find_project_deploy_key,
resp_create_project_deploy_key, resp_delete_project_deploy_key)
# Gitlab module requirements def _dummy(x):
if python_version_match_requirement(): """Dummy function. Only used as a placeholder for toplevel definitions when the test is going
from gitlab.v4.objects import ProjectKey to be skipped anyway"""
return x
pytestmark = []
try:
from .gitlab import (GitlabModuleTestCase,
python_version_match_requirement,
resp_get_project, resp_find_project_deploy_key,
resp_create_project_deploy_key, resp_delete_project_deploy_key)
# Gitlab module requirements
if python_version_match_requirement():
from gitlab.v4.objects import ProjectKey
except ImportError:
pytestmark.append(pytest.mark.skip("Could not load gitlab module required for testing"))
# Need to set these to something so that we don't fail when parsing
GitlabModuleTestCase = object
resp_get_project = _dummy
resp_find_project_deploy_key = _dummy
resp_create_project_deploy_key = _dummy
resp_delete_project_deploy_key = _dummy
# Unit tests requirements # Unit tests requirements
from httmock import with_httmock # noqa try:
from httmock import with_httmock # noqa
except ImportError:
pytestmark.append(pytest.mark.skip("Could not load httmock module required for testing"))
with_httmock = _dummy
class TestGitlabDeployKey(GitlabModuleTestCase): class TestGitlabDeployKey(GitlabModuleTestCase):

View file

@ -5,19 +5,44 @@
from __future__ import absolute_import from __future__ import absolute_import
import pytest
from ansible.modules.source_control.gitlab_group import GitLabGroup from ansible.modules.source_control.gitlab_group import GitLabGroup
from .gitlab import (GitlabModuleTestCase,
python_version_match_requirement,
resp_get_group, resp_get_missing_group, resp_create_group,
resp_create_subgroup, resp_delete_group, resp_find_group_project)
# Gitlab module requirements def _dummy(x):
if python_version_match_requirement(): """Dummy function. Only used as a placeholder for toplevel definitions when the test is going
from gitlab.v4.objects import Group to be skipped anyway"""
return x
pytestmark = []
try:
from .gitlab import (GitlabModuleTestCase,
python_version_match_requirement,
resp_get_group, resp_get_missing_group, resp_create_group,
resp_create_subgroup, resp_delete_group, resp_find_group_project)
# Gitlab module requirements
if python_version_match_requirement():
from gitlab.v4.objects import Group
except ImportError:
pytestmark.append(pytest.mark.skip("Could not load gitlab module required for testing"))
# Need to set these to something so that we don't fail when parsing
GitlabModuleTestCase = object
resp_get_group = _dummy
resp_get_missing_group = _dummy
resp_create_group = _dummy
resp_create_subgroup = _dummy
resp_delete_group = _dummy
resp_find_group_project = _dummy
# Unit tests requirements # Unit tests requirements
from httmock import with_httmock # noqa try:
from httmock import with_httmock # noqa
except ImportError:
pytestmark.append(pytest.mark.skip("Could not load httmock module required for testing"))
with_httmock = _dummy
class TestGitlabGroup(GitlabModuleTestCase): class TestGitlabGroup(GitlabModuleTestCase):

View file

@ -4,20 +4,42 @@
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import from __future__ import absolute_import
import pytest
from ansible.modules.source_control.gitlab_hook import GitLabHook from ansible.modules.source_control.gitlab_hook import GitLabHook
from .gitlab import (GitlabModuleTestCase,
python_version_match_requirement,
resp_get_project, resp_find_project_hook,
resp_create_project_hook, resp_delete_project_hook)
# Gitlab module requirements def _dummy(x):
if python_version_match_requirement(): """Dummy function. Only used as a placeholder for toplevel definitions when the test is going
from gitlab.v4.objects import ProjectHook to be skipped anyway"""
return x
pytestmark = []
try:
from .gitlab import (GitlabModuleTestCase,
python_version_match_requirement,
resp_get_project, resp_find_project_hook,
resp_create_project_hook, resp_delete_project_hook)
# Gitlab module requirements
if python_version_match_requirement():
from gitlab.v4.objects import ProjectHook
except ImportError:
pytestmark.append(pytest.mark.skip("Could not load gitlab module required for testing"))
# Need to set these to something so that we don't fail when parsing
GitlabModuleTestCase = object
resp_get_project = _dummy
resp_find_project_hook = _dummy
resp_create_project_hook = _dummy
resp_delete_project_hook = _dummy
# Unit tests requirements # Unit tests requirements
from httmock import with_httmock # noqa try:
from httmock import with_httmock # noqa
except ImportError:
pytestmark.append(pytest.mark.skip("Could not load httmock module required for testing"))
with_httmock = _dummy
class TestGitlabHook(GitlabModuleTestCase): class TestGitlabHook(GitlabModuleTestCase):

View file

@ -5,19 +5,44 @@
from __future__ import absolute_import from __future__ import absolute_import
import pytest
from ansible.modules.source_control.gitlab_project import GitLabProject from ansible.modules.source_control.gitlab_project import GitLabProject
from .gitlab import (GitlabModuleTestCase,
python_version_match_requirement,
resp_get_group, resp_get_project_by_name, resp_create_project,
resp_get_project, resp_delete_project, resp_get_user)
# Gitlab module requirements def _dummy(x):
if python_version_match_requirement(): """Dummy function. Only used as a placeholder for toplevel definitions when the test is going
from gitlab.v4.objects import Project to be skipped anyway"""
return x
pytestmark = []
try:
from .gitlab import (GitlabModuleTestCase,
python_version_match_requirement,
resp_get_group, resp_get_project_by_name, resp_create_project,
resp_get_project, resp_delete_project, resp_get_user)
# Gitlab module requirements
if python_version_match_requirement():
from gitlab.v4.objects import Project
except ImportError:
pytestmark.append(pytest.mark.skip("Could not load gitlab module required for testing"))
# Need to set these to something so that we don't fail when parsing
GitlabModuleTestCase = object
resp_get_group = _dummy
resp_get_project_by_name = _dummy
resp_create_project = _dummy
resp_get_project = _dummy
resp_delete_project = _dummy
resp_get_user = _dummy
# Unit tests requirements # Unit tests requirements
from httmock import with_httmock # noqa try:
from httmock import with_httmock # noqa
except ImportError:
pytestmark.append(pytest.mark.skip("Could not load httmock module required for testing"))
with_httmock = _dummy
class TestGitlabProject(GitlabModuleTestCase): class TestGitlabProject(GitlabModuleTestCase):

View file

@ -5,19 +5,42 @@
from __future__ import absolute_import from __future__ import absolute_import
import pytest
from ansible.modules.source_control.gitlab_runner import GitLabRunner from ansible.modules.source_control.gitlab_runner import GitLabRunner
from .gitlab import (GitlabModuleTestCase,
python_version_match_requirement,
resp_find_runners_list, resp_get_runner,
resp_create_runner, resp_delete_runner)
# Gitlab module requirements def _dummy(x):
if python_version_match_requirement(): """Dummy function. Only used as a placeholder for toplevel definitions when the test is going
from gitlab.v4.objects import Runner to be skipped anyway"""
return x
pytestmark = []
try:
from .gitlab import (GitlabModuleTestCase,
python_version_match_requirement,
resp_find_runners_list, resp_get_runner,
resp_create_runner, resp_delete_runner)
# Gitlab module requirements
if python_version_match_requirement():
from gitlab.v4.objects import Runner
except ImportError:
pytestmark.append(pytest.mark.skip("Could not load gitlab module required for testing"))
# Need to set these to something so that we don't fail when parsing
GitlabModuleTestCase = object
resp_find_runners = _dummy
resp_get_runner = _dummy
resp_create_runner = _dummy
resp_delete_runner = _dummy
# Unit tests requirements # Unit tests requirements
from httmock import with_httmock # noqa try:
from httmock import with_httmock # noqa
except ImportError:
pytestmark.append(pytest.mark.skip("Could not load httmock module required for testing"))
with_httmock = _dummy
class TestGitlabRunner(GitlabModuleTestCase): class TestGitlabRunner(GitlabModuleTestCase):

View file

@ -5,21 +5,51 @@
from __future__ import absolute_import from __future__ import absolute_import
import pytest
from ansible.modules.source_control.gitlab_user import GitLabUser from ansible.modules.source_control.gitlab_user import GitLabUser
from .gitlab import (GitlabModuleTestCase,
python_version_match_requirement,
resp_find_user, resp_get_user, resp_get_user_keys,
resp_create_user_keys, resp_create_user, resp_delete_user,
resp_get_member, resp_get_group, resp_add_member,
resp_update_member, resp_get_member)
# Gitlab module requirements def _dummy(x):
if python_version_match_requirement(): """Dummy function. Only used as a placeholder for toplevel definitions when the test is going
from gitlab.v4.objects import User to be skipped anyway"""
return x
pytestmark = []
try:
from .gitlab import (GitlabModuleTestCase,
python_version_match_requirement,
resp_find_user, resp_get_user, resp_get_user_keys,
resp_create_user_keys, resp_create_user, resp_delete_user,
resp_get_member, resp_get_group, resp_add_member,
resp_update_member, resp_get_member)
# Gitlab module requirements
if python_version_match_requirement():
from gitlab.v4.objects import User
except ImportError:
pytestmark.append(pytest.mark.skip("Could not load gitlab module required for testing"))
# Need to set these to something so that we don't fail when parsing
GitlabModuleTestCase = object
resp_find_user = _dummy
resp_get_user = _dummy
resp_get_user_keys = _dummy
resp_create_user_keys = _dummy
resp_create_user = _dummy
resp_delete_user = _dummy
resp_get_member = _dummy
resp_get_group = _dummy
resp_add_member = _dummy
resp_update_member = _dummy
resp_get_member = _dummy
# Unit tests requirements # Unit tests requirements
from httmock import with_httmock # noqa try:
from httmock import with_httmock # noqa
except ImportError:
pytestmark.append(pytest.mark.skip("Could not load httmock module required for testing"))
with_httmock = _dummy
class TestGitlabUser(GitlabModuleTestCase): class TestGitlabUser(GitlabModuleTestCase):

View file

@ -23,7 +23,8 @@ import sys
import pytest import pytest
from units.compat import unittest from units.compat import unittest
from ansible.plugins.filter.network import parse_xml, type5_pw, hash_salt, comp_type5, vlan_parser from ansible.plugins.filter.network import (HAS_PASSLIB, parse_xml, type5_pw, hash_salt,
comp_type5, vlan_parser)
from ansible.errors import AnsibleFilterError from ansible.errors import AnsibleFilterError
@ -84,6 +85,7 @@ class TestNetworkParseFilter(unittest.TestCase):
self.assertEqual("parse_xml works on string input, but given input of : %s" % type(output), str(e.exception)) self.assertEqual("parse_xml works on string input, but given input of : %s" % type(output), str(e.exception))
@pytest.mark.skipif(not HAS_PASSLIB, reason="Current type5_pw filter needs passlib to function")
class TestNetworkType5(unittest.TestCase): class TestNetworkType5(unittest.TestCase):
def test_defined_salt_success(self): def test_defined_salt_success(self):
@ -147,6 +149,7 @@ class TestHashSalt(unittest.TestCase):
self.assertEqual("Could not parse salt out password correctly from $nTc1$Z28sUTcWfXlvVe2x.3XAa.", str(e.exception)) self.assertEqual("Could not parse salt out password correctly from $nTc1$Z28sUTcWfXlvVe2x.3XAa.", str(e.exception))
@pytest.mark.skipif(not HAS_PASSLIB, reason="Current comp_type5 filter needs passlib to function")
class TestCompareType5(unittest.TestCase): class TestCompareType5(unittest.TestCase):
def test_compare_type5_boolean(self): def test_compare_type5_boolean(self):
@ -161,7 +164,7 @@ class TestCompareType5(unittest.TestCase):
parsed = comp_type5(unencrypted_password, encrypted_password, True) parsed = comp_type5(unencrypted_password, encrypted_password, True)
self.assertEqual(parsed, '$1$nTc1$Z28sUTcWfXlvVe2x.3XAa.') self.assertEqual(parsed, '$1$nTc1$Z28sUTcWfXlvVe2x.3XAa.')
def test_compate_type5_fail(self): def test_compare_type5_fail(self):
unencrypted_password = 'invalid_password' unencrypted_password = 'invalid_password'
encrypted_password = '$1$nTc1$Z28sUTcWfXlvVe2x.3XAa.' encrypted_password = '$1$nTc1$Z28sUTcWfXlvVe2x.3XAa.'
parsed = comp_type5(unencrypted_password, encrypted_password) parsed = comp_type5(unencrypted_password, encrypted_password)

View file

@ -20,8 +20,15 @@
from __future__ import (absolute_import, division, print_function) from __future__ import (absolute_import, division, print_function)
__metaclass__ = type __metaclass__ = type
import passlib try:
from passlib.handlers import pbkdf2 import passlib
from passlib.handlers import pbkdf2
except ImportError:
passlib = None
pbkdf2 = None
import pytest
from units.mock.loader import DictDataLoader from units.mock.loader import DictDataLoader
from units.compat import unittest from units.compat import unittest
@ -359,7 +366,7 @@ class TestWritePasswordFile(unittest.TestCase):
m().write.assert_called_once_with(u'Testing Café\n'.encode('utf-8')) m().write.assert_called_once_with(u'Testing Café\n'.encode('utf-8'))
class TestLookupModule(unittest.TestCase): class BaseTestLookupModule(unittest.TestCase):
def setUp(self): def setUp(self):
self.fake_loader = DictDataLoader({'/path/to/somewhere': 'sdfsdf'}) self.fake_loader = DictDataLoader({'/path/to/somewhere': 'sdfsdf'})
self.password_lookup = password.LookupModule(loader=self.fake_loader) self.password_lookup = password.LookupModule(loader=self.fake_loader)
@ -373,19 +380,15 @@ class TestLookupModule(unittest.TestCase):
self.makedirs_safe = password.makedirs_safe self.makedirs_safe = password.makedirs_safe
password.makedirs_safe = lambda path, mode: None password.makedirs_safe = lambda path, mode: None
# Different releases of passlib default to a different number of rounds
self.sha256 = passlib.registry.get_crypt_handler('pbkdf2_sha256')
sha256_for_tests = pbkdf2.create_pbkdf2_hash("sha256", 32, 20000)
passlib.registry.register_crypt_handler(sha256_for_tests, force=True)
def tearDown(self): def tearDown(self):
password.os.path.exists = self.os_path_exists password.os.path.exists = self.os_path_exists
password.os.open = self.os_open password.os.open = self.os_open
password.os.close = self.os_close password.os.close = self.os_close
password.os.remove = self.os_remove password.os.remove = self.os_remove
password.makedirs_safe = self.makedirs_safe password.makedirs_safe = self.makedirs_safe
passlib.registry.register_crypt_handler(self.sha256, force=True)
class TestLookupModuleWithoutPasslib(BaseTestLookupModule):
@patch.object(PluginLoader, '_get_paths') @patch.object(PluginLoader, '_get_paths')
@patch('ansible.plugins.lookup.password._write_password_file') @patch('ansible.plugins.lookup.password._write_password_file')
def test_no_encrypt(self, mock_get_paths, mock_write_file): def test_no_encrypt(self, mock_get_paths, mock_write_file):
@ -395,46 +398,8 @@ class TestLookupModule(unittest.TestCase):
# FIXME: assert something useful # FIXME: assert something useful
for result in results: for result in results:
self.assertEquals(len(result), password.DEFAULT_LENGTH) assert len(result) == password.DEFAULT_LENGTH
self.assertIsInstance(result, text_type) assert isinstance(result, text_type)
@patch.object(PluginLoader, '_get_paths')
@patch('ansible.plugins.lookup.password._write_password_file')
def test_encrypt(self, mock_get_paths, mock_write_file):
mock_get_paths.return_value = ['/path/one', '/path/two', '/path/three']
results = self.password_lookup.run([u'/path/to/somewhere encrypt=pbkdf2_sha256'], None)
# pbkdf2 format plus hash
expected_password_length = 76
for result in results:
self.assertEquals(len(result), expected_password_length)
# result should have 5 parts split by '$'
str_parts = result.split('$', 5)
# verify the result is parseable by the passlib
crypt_parts = passlib.hash.pbkdf2_sha256.parsehash(result)
# verify it used the right algo type
self.assertEquals(str_parts[1], 'pbkdf2-sha256')
self.assertEquals(len(str_parts), 5)
# verify the string and parsehash agree on the number of rounds
self.assertEquals(int(str_parts[2]), crypt_parts['rounds'])
self.assertIsInstance(result, text_type)
@patch.object(PluginLoader, '_get_paths')
@patch('ansible.plugins.lookup.password._write_password_file')
def test_password_already_created_encrypt(self, mock_get_paths, mock_write_file):
mock_get_paths.return_value = ['/path/one', '/path/two', '/path/three']
password.os.path.exists = lambda x: x == to_bytes('/path/to/somewhere')
with patch.object(builtins, 'open', mock_open(read_data=b'hunter42 salt=87654321\n')) as m:
results = self.password_lookup.run([u'/path/to/somewhere chars=anything encrypt=pbkdf2_sha256'], None)
for result in results:
self.assertEqual(result, u'$pbkdf2-sha256$20000$ODc2NTQzMjE$Uikde0cv0BKaRaAXMrUQB.zvG4GmnjClwjghwIRf2gU')
@patch.object(PluginLoader, '_get_paths') @patch.object(PluginLoader, '_get_paths')
@patch('ansible.plugins.lookup.password._write_password_file') @patch('ansible.plugins.lookup.password._write_password_file')
@ -480,3 +445,57 @@ class TestLookupModule(unittest.TestCase):
for result in results: for result in results:
self.assertEqual(result, u'hunter42') self.assertEqual(result, u'hunter42')
@pytest.mark.skipif(passlib is None, reason='passlib must be installed to run these tests')
class TestLookupModuleWithPasslib(BaseTestLookupModule):
def setUp(self):
super(TestLookupModuleWithPasslib, self).setUp()
# Different releases of passlib default to a different number of rounds
self.sha256 = passlib.registry.get_crypt_handler('pbkdf2_sha256')
sha256_for_tests = pbkdf2.create_pbkdf2_hash("sha256", 32, 20000)
passlib.registry.register_crypt_handler(sha256_for_tests, force=True)
def tearDown(self):
super(TestLookupModuleWithPasslib, self).tearDown()
passlib.registry.register_crypt_handler(self.sha256, force=True)
@patch.object(PluginLoader, '_get_paths')
@patch('ansible.plugins.lookup.password._write_password_file')
def test_encrypt(self, mock_get_paths, mock_write_file):
mock_get_paths.return_value = ['/path/one', '/path/two', '/path/three']
results = self.password_lookup.run([u'/path/to/somewhere encrypt=pbkdf2_sha256'], None)
# pbkdf2 format plus hash
expected_password_length = 76
for result in results:
self.assertEquals(len(result), expected_password_length)
# result should have 5 parts split by '$'
str_parts = result.split('$', 5)
# verify the result is parseable by the passlib
crypt_parts = passlib.hash.pbkdf2_sha256.parsehash(result)
# verify it used the right algo type
self.assertEquals(str_parts[1], 'pbkdf2-sha256')
self.assertEquals(len(str_parts), 5)
# verify the string and parsehash agree on the number of rounds
self.assertEquals(int(str_parts[2]), crypt_parts['rounds'])
self.assertIsInstance(result, text_type)
@patch.object(PluginLoader, '_get_paths')
@patch('ansible.plugins.lookup.password._write_password_file')
def test_password_already_created_encrypt(self, mock_get_paths, mock_write_file):
mock_get_paths.return_value = ['/path/one', '/path/two', '/path/three']
password.os.path.exists = lambda x: x == to_bytes('/path/to/somewhere')
with patch.object(builtins, 'open', mock_open(read_data=b'hunter42 salt=87654321\n')) as m:
results = self.password_lookup.run([u'/path/to/somewhere chars=anything encrypt=pbkdf2_sha256'], None)
for result in results:
self.assertEqual(result, u'$pbkdf2-sha256$20000$ODc2NTQzMjE$Uikde0cv0BKaRaAXMrUQB.zvG4GmnjClwjghwIRf2gU')

View file

@ -36,16 +36,29 @@ class passlib_off(object):
def assert_hash(expected, secret, algorithm, **settings): def assert_hash(expected, secret, algorithm, **settings):
assert encrypt.CryptHash(algorithm).hash(secret, **settings) == expected
if encrypt.PASSLIB_AVAILABLE: if encrypt.PASSLIB_AVAILABLE:
assert encrypt.passlib_or_crypt(secret, algorithm, **settings) == expected assert encrypt.passlib_or_crypt(secret, algorithm, **settings) == expected
assert encrypt.PasslibHash(algorithm).hash(secret, **settings) == expected assert encrypt.PasslibHash(algorithm).hash(secret, **settings) == expected
else: else:
with pytest.raises(AnsibleFilterError): assert encrypt.passlib_or_crypt(secret, algorithm, **settings) == expected
with pytest.raises(AnsibleError) as excinfo:
encrypt.PasslibHash(algorithm).hash(secret, **settings) encrypt.PasslibHash(algorithm).hash(secret, **settings)
assert excinfo.value.args[0] == "passlib must be installed to hash with '%s'" % algorithm
def test_encrypt_with_rounds_no_passlib():
with passlib_off():
assert_hash("$5$12345678$uAZsE3BenI2G.nA8DpTl.9Dc8JiqacI53pEqRr5ppT7",
secret="123", algorithm="sha256_crypt", salt="12345678", rounds=5000)
assert_hash("$5$rounds=10000$12345678$JBinliYMFEcBeAXKZnLjenhgEhTmJBvZn3aR8l70Oy/",
secret="123", algorithm="sha256_crypt", salt="12345678", rounds=10000)
assert_hash("$6$12345678$LcV9LQiaPekQxZ.OfkMADjFdSO2k9zfbDQrHPVcYjSLqSdjLYpsgqviYvTEP/R41yPmhH3CCeEDqVhW1VHr3L.",
secret="123", algorithm="sha512_crypt", salt="12345678", rounds=5000)
# If passlib is not installed. this is identical to the test_encrypt_with_rounds_no_passlib() test
@pytest.mark.skipif(not encrypt.PASSLIB_AVAILABLE, reason='passlib must be installed to run this test')
def test_encrypt_with_rounds(): def test_encrypt_with_rounds():
assert_hash("$5$12345678$uAZsE3BenI2G.nA8DpTl.9Dc8JiqacI53pEqRr5ppT7", assert_hash("$5$12345678$uAZsE3BenI2G.nA8DpTl.9Dc8JiqacI53pEqRr5ppT7",
secret="123", algorithm="sha256_crypt", salt="12345678", rounds=5000) secret="123", algorithm="sha256_crypt", salt="12345678", rounds=5000)
@ -55,6 +68,20 @@ def test_encrypt_with_rounds():
secret="123", algorithm="sha512_crypt", salt="12345678", rounds=5000) secret="123", algorithm="sha512_crypt", salt="12345678", rounds=5000)
def test_encrypt_default_rounds_no_passlib():
with passlib_off():
assert_hash("$1$12345678$tRy4cXc3kmcfRZVj4iFXr/",
secret="123", algorithm="md5_crypt", salt="12345678")
assert_hash("$5$12345678$uAZsE3BenI2G.nA8DpTl.9Dc8JiqacI53pEqRr5ppT7",
secret="123", algorithm="sha256_crypt", salt="12345678")
assert_hash("$6$12345678$LcV9LQiaPekQxZ.OfkMADjFdSO2k9zfbDQrHPVcYjSLqSdjLYpsgqviYvTEP/R41yPmhH3CCeEDqVhW1VHr3L.",
secret="123", algorithm="sha512_crypt", salt="12345678")
assert encrypt.CryptHash("md5_crypt").hash("123")
# If passlib is not installed. this is identical to the test_encrypt_default_rounds_no_passlib() test
@pytest.mark.skipif(not encrypt.PASSLIB_AVAILABLE, reason='passlib must be installed to run this test')
def test_encrypt_default_rounds(): def test_encrypt_default_rounds():
assert_hash("$1$12345678$tRy4cXc3kmcfRZVj4iFXr/", assert_hash("$1$12345678$tRy4cXc3kmcfRZVj4iFXr/",
secret="123", algorithm="md5_crypt", salt="12345678") secret="123", algorithm="md5_crypt", salt="12345678")